mirror of
https://github.com/DominicBreuker/pspy.git
synced 2025-12-21 03:34:50 +00:00
add tests for fswatcher package
This commit is contained in:
committed by
Dominic Breuker
parent
94a12cf031
commit
1deb4838a5
@@ -2,20 +2,18 @@ package fswatcher
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
|
||||
)
|
||||
|
||||
func parseEvents(i *inotify.Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) {
|
||||
func parseEvents(i Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) {
|
||||
for buf := range dataCh {
|
||||
var ptr uint32
|
||||
for len(buf[ptr:]) > 0 {
|
||||
event, size, err := i.ParseNextEvent(buf[ptr:])
|
||||
ptr += size
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("parsing events: %v", err)
|
||||
continue
|
||||
}
|
||||
ptr += size
|
||||
eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name)
|
||||
}
|
||||
}
|
||||
|
||||
105
internal/fswatcher/fswatcher.go
Normal file
105
internal/fswatcher/fswatcher.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package fswatcher
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
|
||||
"github.com/dominicbreuker/pspy/internal/fswatcher/walker"
|
||||
)
|
||||
|
||||
type Inotify interface {
|
||||
Init() error
|
||||
Watch(dir string) error
|
||||
NumWatchers() int
|
||||
Read(buf []byte) (int, error)
|
||||
ParseNextEvent(buf []byte) (*inotify.Event, uint32, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type Walker interface {
|
||||
Walk(dir string, depth int) (chan string, chan error, chan struct{})
|
||||
}
|
||||
|
||||
type FSWatcher struct {
|
||||
i Inotify
|
||||
w Walker
|
||||
maxWatchers int
|
||||
}
|
||||
|
||||
func NewFSWatcher() (*FSWatcher, error) {
|
||||
return &FSWatcher{
|
||||
i: inotify.NewInotify(),
|
||||
w: walker.NewWalker(),
|
||||
maxWatchers: inotify.MaxWatchers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (iw *FSWatcher) Close() {
|
||||
iw.i.Close()
|
||||
}
|
||||
|
||||
func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
|
||||
errCh := make(chan error)
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
err := fs.i.Init()
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("setting up inotify: %v", err)
|
||||
}
|
||||
for _, dir := range rdirs {
|
||||
addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh)
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh)
|
||||
}
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
return errCh, doneCh
|
||||
}
|
||||
|
||||
func addWatchers(dir string, depth int, i Inotify, maxWatchers int, w Walker, errCh chan error) {
|
||||
dirCh, walkErrCh, doneCh := w.Walk(dir, depth)
|
||||
loop:
|
||||
for {
|
||||
if maxWatchers > 0 && i.NumWatchers() >= maxWatchers {
|
||||
close(doneCh)
|
||||
break loop
|
||||
}
|
||||
select {
|
||||
case err := <-walkErrCh:
|
||||
errCh <- fmt.Errorf("adding inotift watchers: %v", err)
|
||||
case dir, ok := <-dirCh:
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
if err := i.Watch(dir); err != nil {
|
||||
errCh <- fmt.Errorf("Can't create watcher: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *FSWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
|
||||
err := fs.i.Init()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("setting up inotify: %v", err)
|
||||
}
|
||||
|
||||
for _, dir := range rdirs {
|
||||
addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh)
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh)
|
||||
}
|
||||
|
||||
triggerCh := make(chan struct{})
|
||||
dataCh := make(chan []byte)
|
||||
go Observe(fs.i, triggerCh, dataCh, errCh)
|
||||
|
||||
eventCh := make(chan string)
|
||||
go parseEvents(fs.i, dataCh, eventCh, errCh)
|
||||
|
||||
return triggerCh, eventCh, nil
|
||||
}
|
||||
127
internal/fswatcher/fswatcher_test.go
Normal file
127
internal/fswatcher/fswatcher_test.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package fswatcher
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
|
||||
)
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
i := NewMockInotify()
|
||||
w := &MockWalker{
|
||||
subdirs: map[string][]string{
|
||||
"mydir1": []string{"dir1", "dir2"},
|
||||
"mydir2": []string{"dir3"},
|
||||
"dir1": []string{"another-dir"},
|
||||
},
|
||||
}
|
||||
fs := &FSWatcher{
|
||||
i: i,
|
||||
w: w,
|
||||
maxWatchers: 999,
|
||||
}
|
||||
rdirs := []string{"mydir1"}
|
||||
dirs := []string{"mydir2"}
|
||||
|
||||
errCh, doneCh := fs.Init(rdirs, dirs)
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-doneCh:
|
||||
break loop
|
||||
case err := <-errCh:
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("Test timeout")
|
||||
}
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(i.watching, []string{"mydir1", "dir1", "another-dir", "dir2", "mydir2"}) {
|
||||
t.Fatalf("Watching wrong directories: %+v", i.watching)
|
||||
}
|
||||
}
|
||||
|
||||
// mocks
|
||||
|
||||
// Mock Inotify
|
||||
|
||||
type MockInotify struct {
|
||||
initialized bool
|
||||
watching []string
|
||||
}
|
||||
|
||||
func NewMockInotify() *MockInotify {
|
||||
return &MockInotify{
|
||||
initialized: false,
|
||||
watching: make([]string, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (i *MockInotify) Init() error {
|
||||
i.initialized = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *MockInotify) Watch(dir string) error {
|
||||
if !i.initialized {
|
||||
return errors.New("Not yet initialized")
|
||||
}
|
||||
i.watching = append(i.watching, dir)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *MockInotify) NumWatchers() int {
|
||||
return len(i.watching)
|
||||
}
|
||||
|
||||
func (i *MockInotify) Read(buf []byte) (int, error) {
|
||||
return 32, nil
|
||||
}
|
||||
|
||||
func (i *MockInotify) ParseNextEvent(buf []byte) (*inotify.Event, uint32, error) {
|
||||
return &inotify.Event{Name: "name", Op: "CREATE"}, 32, nil
|
||||
}
|
||||
|
||||
func (i *MockInotify) Close() error {
|
||||
if !i.initialized {
|
||||
return errors.New("Not yet initialized")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Mock Walker
|
||||
|
||||
type MockWalker struct {
|
||||
subdirs map[string][]string
|
||||
}
|
||||
|
||||
func (w *MockWalker) Walk(dir string, depth int) (chan string, chan error, chan struct{}) {
|
||||
dirCh := make(chan string)
|
||||
errCh := make(chan error)
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
defer close(dirCh)
|
||||
sendDir(w, depth, dir, dirCh)
|
||||
}()
|
||||
|
||||
return dirCh, errCh, doneCh
|
||||
}
|
||||
|
||||
func sendDir(w *MockWalker, depth int, dir string, dirCh chan string) {
|
||||
dirCh <- dir
|
||||
if depth == 0 {
|
||||
return
|
||||
}
|
||||
subdirs, ok := w.subdirs[dir]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
for _, sdir := range subdirs {
|
||||
sendDir(w, depth-1, sdir, dirCh)
|
||||
}
|
||||
}
|
||||
@@ -1,58 +0,0 @@
|
||||
package fswatcher
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
type Inotify struct {
|
||||
fd int
|
||||
watchers map[int]*watcher
|
||||
}
|
||||
|
||||
func NewInotify() (*Inotify, error) {
|
||||
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
|
||||
if fd == -1 {
|
||||
return nil, fmt.Errorf("Can't init inotify: %d", errno)
|
||||
}
|
||||
|
||||
i := &Inotify{
|
||||
fd: fd,
|
||||
watchers: make(map[int]*watcher),
|
||||
}
|
||||
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (i *Inotify) Watch(dir string) error {
|
||||
w, err := newWatcher(i.fd, dir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating watcher: %v", err)
|
||||
}
|
||||
i.watchers[w.wd] = w
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Inotify) Close() error {
|
||||
if err := unix.Close(i.fd); err != nil {
|
||||
return fmt.Errorf("closing inotify file descriptor: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Inotify) NumWatchers() int {
|
||||
return len(i.watchers)
|
||||
}
|
||||
|
||||
func (i *Inotify) String() string {
|
||||
if len(i.watchers) < 20 {
|
||||
dirs := make([]string, 0)
|
||||
for _, w := range i.watchers {
|
||||
dirs = append(dirs, w.dir)
|
||||
}
|
||||
return fmt.Sprintf("Watching: %v", dirs)
|
||||
} else {
|
||||
return fmt.Sprintf("Watching %d directories", len(i.watchers))
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,17 @@ import (
|
||||
|
||||
const maximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches"
|
||||
|
||||
// MaxWatchers is the maximum number of inotify watches supported by the Kernel
|
||||
// set to -1 if the number cannot be determined
|
||||
var MaxWatchers int = -1
|
||||
|
||||
func init() {
|
||||
mw, err := getMaxWatchers()
|
||||
if err == nil {
|
||||
MaxWatchers = mw
|
||||
}
|
||||
}
|
||||
|
||||
type Inotify struct {
|
||||
FD int
|
||||
Watchers map[int]*Watcher
|
||||
@@ -28,17 +39,20 @@ type Event struct {
|
||||
Op string
|
||||
}
|
||||
|
||||
func NewInotify() (*Inotify, error) {
|
||||
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
|
||||
if fd < 0 {
|
||||
return nil, fmt.Errorf("initializing inotify: errno: %d", errno)
|
||||
}
|
||||
|
||||
i := &Inotify{
|
||||
FD: fd,
|
||||
func NewInotify() *Inotify {
|
||||
return &Inotify{
|
||||
FD: 0,
|
||||
Watchers: make(map[int]*Watcher),
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func (i *Inotify) Init() error {
|
||||
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
|
||||
if fd < 0 {
|
||||
return fmt.Errorf("initializing inotify: errno: %d", errno)
|
||||
}
|
||||
i.FD = fd
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Inotify) Watch(dir string) error {
|
||||
@@ -112,7 +126,7 @@ func (i *Inotify) String() string {
|
||||
}
|
||||
}
|
||||
|
||||
func GetMaxWatchers() (int, error) {
|
||||
func getMaxWatchers() (int, error) {
|
||||
b, err := ioutil.ReadFile(maximumWatchersFile)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("reading from %s: %v", maximumWatchersFile, err)
|
||||
|
||||
@@ -9,7 +9,9 @@ import (
|
||||
)
|
||||
|
||||
func TestInotify(t *testing.T) {
|
||||
i, err := NewInotify()
|
||||
i := NewInotify()
|
||||
|
||||
err := i.Init()
|
||||
expectNoError(t, err)
|
||||
|
||||
err = i.Watch("testdata/folder")
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
package fswatcher
|
||||
|
||||
import (
|
||||
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func Observe(i *inotify.Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
|
||||
func Observe(i Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
|
||||
buf := make([]byte, 5*unix.SizeofInotifyEvent)
|
||||
|
||||
for {
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
package fswatcher
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
|
||||
"github.com/dominicbreuker/pspy/internal/fswatcher/walker"
|
||||
)
|
||||
|
||||
type InotifyWatcher struct {
|
||||
i *inotify.Inotify
|
||||
}
|
||||
|
||||
func (iw *InotifyWatcher) Close() {
|
||||
iw.i.Close()
|
||||
}
|
||||
|
||||
func NewInotifyWatcher() (*InotifyWatcher, error) {
|
||||
i, err := inotify.NewInotify()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("setting up inotify: %v", err)
|
||||
}
|
||||
return &InotifyWatcher{
|
||||
i: i,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (iw *InotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
|
||||
maxWatchers, err := inotify.GetMaxWatchers()
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("Can't get inotify watcher limit...: %v\n", err)
|
||||
maxWatchers = -1
|
||||
}
|
||||
|
||||
for _, dir := range rdirs {
|
||||
addWatchers(dir, -1, iw.i, maxWatchers, errCh)
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
addWatchers(dir, 0, iw.i, maxWatchers, errCh)
|
||||
}
|
||||
|
||||
triggerCh := make(chan struct{})
|
||||
dataCh := make(chan []byte)
|
||||
go Observe(iw.i, triggerCh, dataCh, errCh)
|
||||
|
||||
eventCh := make(chan string)
|
||||
go parseEvents(iw.i, dataCh, eventCh, errCh)
|
||||
|
||||
return triggerCh, eventCh, nil
|
||||
}
|
||||
|
||||
func addWatchers(dir string, depth int, i *inotify.Inotify, maxWatchers int, errCh chan error) {
|
||||
dirCh, walkErrCh, doneCh := walker.Walk(dir, depth)
|
||||
loop:
|
||||
for {
|
||||
if maxWatchers > 0 && i.NumWatchers() >= maxWatchers {
|
||||
close(doneCh)
|
||||
break loop
|
||||
}
|
||||
select {
|
||||
case err := <-walkErrCh:
|
||||
errCh <- fmt.Errorf("adding inotift watchers: %v", err)
|
||||
case dir, ok := <-dirCh:
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
if err := i.Watch(dir); err != nil {
|
||||
errCh <- fmt.Errorf("Can't create watcher: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7,9 +7,15 @@ import (
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
type Walker struct{}
|
||||
|
||||
func NewWalker() *Walker {
|
||||
return &Walker{}
|
||||
}
|
||||
|
||||
const maxInt = int(^uint(0) >> 1)
|
||||
|
||||
func Walk(root string, depth int) (dirCh chan string, errCh chan error, doneCh chan struct{}) {
|
||||
func (w *Walker) Walk(root string, depth int) (dirCh chan string, errCh chan error, doneCh chan struct{}) {
|
||||
if depth < 0 {
|
||||
depth = maxInt
|
||||
}
|
||||
|
||||
@@ -39,7 +39,8 @@ func TestWalk(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
dirCh, errCh, doneCh := Walk(tt.root, tt.depth)
|
||||
w := &Walker{}
|
||||
dirCh, errCh, doneCh := w.Walk(tt.root, tt.depth)
|
||||
dirs, errs := getAllDirsAndErrors(dirCh, errCh)
|
||||
|
||||
if !reflect.DeepEqual(dirs, tt.result) {
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
package fswatcher
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
const events = unix.IN_ALL_EVENTS
|
||||
const MaximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches"
|
||||
|
||||
type watcher struct {
|
||||
wd int
|
||||
dir string
|
||||
}
|
||||
|
||||
func newWatcher(fd int, dir string) (*watcher, error) {
|
||||
wd, errno := unix.InotifyAddWatch(fd, dir, events)
|
||||
if wd == -1 {
|
||||
return nil, fmt.Errorf("adding watcher on %s: %d", dir, errno)
|
||||
}
|
||||
return &watcher{
|
||||
wd: wd,
|
||||
dir: dir,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getLimit() (int, error) {
|
||||
b, err := ioutil.ReadFile(MaximumWatchersFile)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("reading from %s: %v", MaximumWatchersFile, err)
|
||||
}
|
||||
|
||||
s := strings.TrimSpace(string(b))
|
||||
m, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("converting to integer: %v", err)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
@@ -14,22 +14,22 @@ type Logger interface {
|
||||
Eventf(format string, v ...interface{})
|
||||
}
|
||||
|
||||
type InotifyWatcher interface {
|
||||
Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error)
|
||||
type FSWatcher interface {
|
||||
Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error)
|
||||
}
|
||||
|
||||
type ProcfsScanner interface {
|
||||
Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error)
|
||||
}
|
||||
|
||||
func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) {
|
||||
func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) {
|
||||
logger.Infof("Config: %+v\n", cfg)
|
||||
|
||||
// log all errors
|
||||
errCh := make(chan error, 10)
|
||||
go logErrors(errCh, logger)
|
||||
|
||||
triggerCh, fsEventCh, err := inotify.Setup(cfg.RDirs, cfg.Dirs, errCh)
|
||||
triggerCh, fsEventCh, err := inotify.Start(cfg.RDirs, cfg.Dirs, errCh)
|
||||
if err != nil {
|
||||
logger.Errorf("Can't set up inotify watchers: %v\n", err)
|
||||
return nil, errors.New("inotify error")
|
||||
|
||||
@@ -104,7 +104,7 @@ func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher {
|
||||
}
|
||||
}
|
||||
|
||||
func (i *mockInotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
|
||||
func (i *mockInotifyWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
|
||||
if i.setupErr != nil {
|
||||
return nil, nil, i.setupErr
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user