From 1deb4838a5c4c92f550a8c0e82fb79d3310b0ca8 Mon Sep 17 00:00:00 2001 From: Dominic Breuker Date: Tue, 27 Feb 2018 09:56:05 +0100 Subject: [PATCH] add tests for fswatcher package --- cmd/root.go | 2 +- internal/fswatcher/event.go | 6 +- internal/fswatcher/fswatcher.go | 105 +++++++++++++++++ internal/fswatcher/fswatcher_test.go | 127 +++++++++++++++++++++ internal/fswatcher/inotify.go | 58 ---------- internal/fswatcher/inotify/inotify.go | 34 ++++-- internal/fswatcher/inotify/inotify_test.go | 4 +- internal/fswatcher/observer.go | 3 +- internal/fswatcher/setup.go | 72 ------------ internal/fswatcher/walker/walker.go | 8 +- internal/fswatcher/walker/walker_test.go | 3 +- internal/fswatcher/watcher.go | 44 ------- internal/pspy/pspy.go | 8 +- internal/pspy/pspy_test.go | 2 +- 14 files changed, 277 insertions(+), 199 deletions(-) create mode 100644 internal/fswatcher/fswatcher.go create mode 100644 internal/fswatcher/fswatcher_test.go delete mode 100644 internal/fswatcher/inotify.go delete mode 100644 internal/fswatcher/setup.go delete mode 100644 internal/fswatcher/watcher.go diff --git a/cmd/root.go b/cmd/root.go index 78d26bc..7ff9989 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -74,7 +74,7 @@ func root(cmd *cobra.Command, args []string) { LogPS: logPS, LogFS: logFS, } - iw, err := fswatcher.NewInotifyWatcher() + iw, err := fswatcher.NewFSWatcher() if err != nil { logger.Errorf("Can't initialize fswatcher: %v", err) os.Exit(1) diff --git a/internal/fswatcher/event.go b/internal/fswatcher/event.go index a2c7092..19a9285 100644 --- a/internal/fswatcher/event.go +++ b/internal/fswatcher/event.go @@ -2,20 +2,18 @@ package fswatcher import ( "fmt" - - "github.com/dominicbreuker/pspy/internal/fswatcher/inotify" ) -func parseEvents(i *inotify.Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) { +func parseEvents(i Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) { for buf := range dataCh { var ptr uint32 for len(buf[ptr:]) > 0 { event, size, err := i.ParseNextEvent(buf[ptr:]) + ptr += size if err != nil { errCh <- fmt.Errorf("parsing events: %v", err) continue } - ptr += size eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name) } } diff --git a/internal/fswatcher/fswatcher.go b/internal/fswatcher/fswatcher.go new file mode 100644 index 0000000..933fdbb --- /dev/null +++ b/internal/fswatcher/fswatcher.go @@ -0,0 +1,105 @@ +package fswatcher + +import ( + "fmt" + + "github.com/dominicbreuker/pspy/internal/fswatcher/inotify" + "github.com/dominicbreuker/pspy/internal/fswatcher/walker" +) + +type Inotify interface { + Init() error + Watch(dir string) error + NumWatchers() int + Read(buf []byte) (int, error) + ParseNextEvent(buf []byte) (*inotify.Event, uint32, error) + Close() error +} + +type Walker interface { + Walk(dir string, depth int) (chan string, chan error, chan struct{}) +} + +type FSWatcher struct { + i Inotify + w Walker + maxWatchers int +} + +func NewFSWatcher() (*FSWatcher, error) { + return &FSWatcher{ + i: inotify.NewInotify(), + w: walker.NewWalker(), + maxWatchers: inotify.MaxWatchers, + }, nil +} + +func (iw *FSWatcher) Close() { + iw.i.Close() +} + +func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) { + errCh := make(chan error) + doneCh := make(chan struct{}) + + go func() { + err := fs.i.Init() + if err != nil { + errCh <- fmt.Errorf("setting up inotify: %v", err) + } + for _, dir := range rdirs { + addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh) + } + for _, dir := range dirs { + addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh) + } + close(doneCh) + }() + + return errCh, doneCh +} + +func addWatchers(dir string, depth int, i Inotify, maxWatchers int, w Walker, errCh chan error) { + dirCh, walkErrCh, doneCh := w.Walk(dir, depth) +loop: + for { + if maxWatchers > 0 && i.NumWatchers() >= maxWatchers { + close(doneCh) + break loop + } + select { + case err := <-walkErrCh: + errCh <- fmt.Errorf("adding inotift watchers: %v", err) + case dir, ok := <-dirCh: + if !ok { + break loop + } + if err := i.Watch(dir); err != nil { + errCh <- fmt.Errorf("Can't create watcher: %v", err) + } + } + } +} + +func (fs *FSWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { + err := fs.i.Init() + if err != nil { + return nil, nil, fmt.Errorf("setting up inotify: %v", err) + } + + for _, dir := range rdirs { + addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh) + } + for _, dir := range dirs { + addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh) + } + + triggerCh := make(chan struct{}) + dataCh := make(chan []byte) + go Observe(fs.i, triggerCh, dataCh, errCh) + + eventCh := make(chan string) + go parseEvents(fs.i, dataCh, eventCh, errCh) + + return triggerCh, eventCh, nil +} diff --git a/internal/fswatcher/fswatcher_test.go b/internal/fswatcher/fswatcher_test.go new file mode 100644 index 0000000..b1c52cb --- /dev/null +++ b/internal/fswatcher/fswatcher_test.go @@ -0,0 +1,127 @@ +package fswatcher + +import ( + "errors" + "reflect" + "testing" + "time" + + "github.com/dominicbreuker/pspy/internal/fswatcher/inotify" +) + +func TestInit(t *testing.T) { + i := NewMockInotify() + w := &MockWalker{ + subdirs: map[string][]string{ + "mydir1": []string{"dir1", "dir2"}, + "mydir2": []string{"dir3"}, + "dir1": []string{"another-dir"}, + }, + } + fs := &FSWatcher{ + i: i, + w: w, + maxWatchers: 999, + } + rdirs := []string{"mydir1"} + dirs := []string{"mydir2"} + + errCh, doneCh := fs.Init(rdirs, dirs) + +loop: + for { + select { + case <-doneCh: + break loop + case err := <-errCh: + t.Errorf("Unexpected error: %v", err) + case <-time.After(1 * time.Second): + t.Fatalf("Test timeout") + } + } + + if !reflect.DeepEqual(i.watching, []string{"mydir1", "dir1", "another-dir", "dir2", "mydir2"}) { + t.Fatalf("Watching wrong directories: %+v", i.watching) + } +} + +// mocks + +// Mock Inotify + +type MockInotify struct { + initialized bool + watching []string +} + +func NewMockInotify() *MockInotify { + return &MockInotify{ + initialized: false, + watching: make([]string, 0), + } +} + +func (i *MockInotify) Init() error { + i.initialized = true + return nil +} + +func (i *MockInotify) Watch(dir string) error { + if !i.initialized { + return errors.New("Not yet initialized") + } + i.watching = append(i.watching, dir) + return nil +} + +func (i *MockInotify) NumWatchers() int { + return len(i.watching) +} + +func (i *MockInotify) Read(buf []byte) (int, error) { + return 32, nil +} + +func (i *MockInotify) ParseNextEvent(buf []byte) (*inotify.Event, uint32, error) { + return &inotify.Event{Name: "name", Op: "CREATE"}, 32, nil +} + +func (i *MockInotify) Close() error { + if !i.initialized { + return errors.New("Not yet initialized") + } + return nil +} + +// Mock Walker + +type MockWalker struct { + subdirs map[string][]string +} + +func (w *MockWalker) Walk(dir string, depth int) (chan string, chan error, chan struct{}) { + dirCh := make(chan string) + errCh := make(chan error) + doneCh := make(chan struct{}) + + go func() { + defer close(dirCh) + sendDir(w, depth, dir, dirCh) + }() + + return dirCh, errCh, doneCh +} + +func sendDir(w *MockWalker, depth int, dir string, dirCh chan string) { + dirCh <- dir + if depth == 0 { + return + } + subdirs, ok := w.subdirs[dir] + if !ok { + return + } + for _, sdir := range subdirs { + sendDir(w, depth-1, sdir, dirCh) + } +} diff --git a/internal/fswatcher/inotify.go b/internal/fswatcher/inotify.go deleted file mode 100644 index 432d25f..0000000 --- a/internal/fswatcher/inotify.go +++ /dev/null @@ -1,58 +0,0 @@ -package fswatcher - -import ( - "fmt" - - "golang.org/x/sys/unix" -) - -type Inotify struct { - fd int - watchers map[int]*watcher -} - -func NewInotify() (*Inotify, error) { - fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC) - if fd == -1 { - return nil, fmt.Errorf("Can't init inotify: %d", errno) - } - - i := &Inotify{ - fd: fd, - watchers: make(map[int]*watcher), - } - - return i, nil -} - -func (i *Inotify) Watch(dir string) error { - w, err := newWatcher(i.fd, dir) - if err != nil { - return fmt.Errorf("creating watcher: %v", err) - } - i.watchers[w.wd] = w - return nil -} - -func (i *Inotify) Close() error { - if err := unix.Close(i.fd); err != nil { - return fmt.Errorf("closing inotify file descriptor: %v", err) - } - return nil -} - -func (i *Inotify) NumWatchers() int { - return len(i.watchers) -} - -func (i *Inotify) String() string { - if len(i.watchers) < 20 { - dirs := make([]string, 0) - for _, w := range i.watchers { - dirs = append(dirs, w.dir) - } - return fmt.Sprintf("Watching: %v", dirs) - } else { - return fmt.Sprintf("Watching %d directories", len(i.watchers)) - } -} diff --git a/internal/fswatcher/inotify/inotify.go b/internal/fswatcher/inotify/inotify.go index 549b251..006f647 100644 --- a/internal/fswatcher/inotify/inotify.go +++ b/internal/fswatcher/inotify/inotify.go @@ -13,6 +13,17 @@ import ( const maximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches" +// MaxWatchers is the maximum number of inotify watches supported by the Kernel +// set to -1 if the number cannot be determined +var MaxWatchers int = -1 + +func init() { + mw, err := getMaxWatchers() + if err == nil { + MaxWatchers = mw + } +} + type Inotify struct { FD int Watchers map[int]*Watcher @@ -28,17 +39,20 @@ type Event struct { Op string } -func NewInotify() (*Inotify, error) { - fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC) - if fd < 0 { - return nil, fmt.Errorf("initializing inotify: errno: %d", errno) - } - - i := &Inotify{ - FD: fd, +func NewInotify() *Inotify { + return &Inotify{ + FD: 0, Watchers: make(map[int]*Watcher), } - return i, nil +} + +func (i *Inotify) Init() error { + fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC) + if fd < 0 { + return fmt.Errorf("initializing inotify: errno: %d", errno) + } + i.FD = fd + return nil } func (i *Inotify) Watch(dir string) error { @@ -112,7 +126,7 @@ func (i *Inotify) String() string { } } -func GetMaxWatchers() (int, error) { +func getMaxWatchers() (int, error) { b, err := ioutil.ReadFile(maximumWatchersFile) if err != nil { return 0, fmt.Errorf("reading from %s: %v", maximumWatchersFile, err) diff --git a/internal/fswatcher/inotify/inotify_test.go b/internal/fswatcher/inotify/inotify_test.go index dbfc068..7b36006 100644 --- a/internal/fswatcher/inotify/inotify_test.go +++ b/internal/fswatcher/inotify/inotify_test.go @@ -9,7 +9,9 @@ import ( ) func TestInotify(t *testing.T) { - i, err := NewInotify() + i := NewInotify() + + err := i.Init() expectNoError(t, err) err = i.Watch("testdata/folder") diff --git a/internal/fswatcher/observer.go b/internal/fswatcher/observer.go index 3b2771f..15c5d19 100644 --- a/internal/fswatcher/observer.go +++ b/internal/fswatcher/observer.go @@ -1,11 +1,10 @@ package fswatcher import ( - "github.com/dominicbreuker/pspy/internal/fswatcher/inotify" "golang.org/x/sys/unix" ) -func Observe(i *inotify.Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) { +func Observe(i Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) { buf := make([]byte, 5*unix.SizeofInotifyEvent) for { diff --git a/internal/fswatcher/setup.go b/internal/fswatcher/setup.go deleted file mode 100644 index 44b973e..0000000 --- a/internal/fswatcher/setup.go +++ /dev/null @@ -1,72 +0,0 @@ -package fswatcher - -import ( - "fmt" - - "github.com/dominicbreuker/pspy/internal/fswatcher/inotify" - "github.com/dominicbreuker/pspy/internal/fswatcher/walker" -) - -type InotifyWatcher struct { - i *inotify.Inotify -} - -func (iw *InotifyWatcher) Close() { - iw.i.Close() -} - -func NewInotifyWatcher() (*InotifyWatcher, error) { - i, err := inotify.NewInotify() - if err != nil { - return nil, fmt.Errorf("setting up inotify: %v", err) - } - return &InotifyWatcher{ - i: i, - }, nil -} - -func (iw *InotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { - maxWatchers, err := inotify.GetMaxWatchers() - if err != nil { - errCh <- fmt.Errorf("Can't get inotify watcher limit...: %v\n", err) - maxWatchers = -1 - } - - for _, dir := range rdirs { - addWatchers(dir, -1, iw.i, maxWatchers, errCh) - } - for _, dir := range dirs { - addWatchers(dir, 0, iw.i, maxWatchers, errCh) - } - - triggerCh := make(chan struct{}) - dataCh := make(chan []byte) - go Observe(iw.i, triggerCh, dataCh, errCh) - - eventCh := make(chan string) - go parseEvents(iw.i, dataCh, eventCh, errCh) - - return triggerCh, eventCh, nil -} - -func addWatchers(dir string, depth int, i *inotify.Inotify, maxWatchers int, errCh chan error) { - dirCh, walkErrCh, doneCh := walker.Walk(dir, depth) -loop: - for { - if maxWatchers > 0 && i.NumWatchers() >= maxWatchers { - close(doneCh) - break loop - } - select { - case err := <-walkErrCh: - errCh <- fmt.Errorf("adding inotift watchers: %v", err) - case dir, ok := <-dirCh: - if !ok { - break loop - } - if err := i.Watch(dir); err != nil { - errCh <- fmt.Errorf("Can't create watcher: %v", err) - } - } - } -} diff --git a/internal/fswatcher/walker/walker.go b/internal/fswatcher/walker/walker.go index 93128f2..8d8a07f 100644 --- a/internal/fswatcher/walker/walker.go +++ b/internal/fswatcher/walker/walker.go @@ -7,9 +7,15 @@ import ( "path/filepath" ) +type Walker struct{} + +func NewWalker() *Walker { + return &Walker{} +} + const maxInt = int(^uint(0) >> 1) -func Walk(root string, depth int) (dirCh chan string, errCh chan error, doneCh chan struct{}) { +func (w *Walker) Walk(root string, depth int) (dirCh chan string, errCh chan error, doneCh chan struct{}) { if depth < 0 { depth = maxInt } diff --git a/internal/fswatcher/walker/walker_test.go b/internal/fswatcher/walker/walker_test.go index 76419a2..5f91be8 100644 --- a/internal/fswatcher/walker/walker_test.go +++ b/internal/fswatcher/walker/walker_test.go @@ -39,7 +39,8 @@ func TestWalk(t *testing.T) { } for i, tt := range tests { - dirCh, errCh, doneCh := Walk(tt.root, tt.depth) + w := &Walker{} + dirCh, errCh, doneCh := w.Walk(tt.root, tt.depth) dirs, errs := getAllDirsAndErrors(dirCh, errCh) if !reflect.DeepEqual(dirs, tt.result) { diff --git a/internal/fswatcher/watcher.go b/internal/fswatcher/watcher.go deleted file mode 100644 index c02080a..0000000 --- a/internal/fswatcher/watcher.go +++ /dev/null @@ -1,44 +0,0 @@ -package fswatcher - -import ( - "fmt" - "io/ioutil" - "strconv" - "strings" - - "golang.org/x/sys/unix" -) - -const events = unix.IN_ALL_EVENTS -const MaximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches" - -type watcher struct { - wd int - dir string -} - -func newWatcher(fd int, dir string) (*watcher, error) { - wd, errno := unix.InotifyAddWatch(fd, dir, events) - if wd == -1 { - return nil, fmt.Errorf("adding watcher on %s: %d", dir, errno) - } - return &watcher{ - wd: wd, - dir: dir, - }, nil -} - -func getLimit() (int, error) { - b, err := ioutil.ReadFile(MaximumWatchersFile) - if err != nil { - return 0, fmt.Errorf("reading from %s: %v", MaximumWatchersFile, err) - } - - s := strings.TrimSpace(string(b)) - m, err := strconv.Atoi(s) - if err != nil { - return 0, fmt.Errorf("converting to integer: %v", err) - } - - return m, nil -} diff --git a/internal/pspy/pspy.go b/internal/pspy/pspy.go index 95b57f9..f2dcffe 100644 --- a/internal/pspy/pspy.go +++ b/internal/pspy/pspy.go @@ -14,22 +14,22 @@ type Logger interface { Eventf(format string, v ...interface{}) } -type InotifyWatcher interface { - Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) +type FSWatcher interface { + Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) } type ProcfsScanner interface { Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) } -func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) { +func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) { logger.Infof("Config: %+v\n", cfg) // log all errors errCh := make(chan error, 10) go logErrors(errCh, logger) - triggerCh, fsEventCh, err := inotify.Setup(cfg.RDirs, cfg.Dirs, errCh) + triggerCh, fsEventCh, err := inotify.Start(cfg.RDirs, cfg.Dirs, errCh) if err != nil { logger.Errorf("Can't set up inotify watchers: %v\n", err) return nil, errors.New("inotify error") diff --git a/internal/pspy/pspy_test.go b/internal/pspy/pspy_test.go index 54d0b45..a1b58a0 100644 --- a/internal/pspy/pspy_test.go +++ b/internal/pspy/pspy_test.go @@ -104,7 +104,7 @@ func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher { } } -func (i *mockInotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { +func (i *mockInotifyWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { if i.setupErr != nil { return nil, nil, i.setupErr }