From 9670b85f433ee72a0d01cae4e20d238835ee3447 Mon Sep 17 00:00:00 2001 From: Dominic Breuker Date: Wed, 28 Feb 2018 10:17:19 +0100 Subject: [PATCH] add tests for fswatcher --- internal/fswatcher/fswatcher.go | 80 +++++---- internal/fswatcher/fswatcher_test.go | 97 ++++++++++- internal/fswatcher/inotify/inotify.go | 9 + internal/pspy/pspy.go | 84 ++-------- internal/pspy/pspy_test.go | 229 +++++++++++++------------- 5 files changed, 283 insertions(+), 216 deletions(-) diff --git a/internal/fswatcher/fswatcher.go b/internal/fswatcher/fswatcher.go index 933fdbb..f577b44 100644 --- a/internal/fswatcher/fswatcher.go +++ b/internal/fswatcher/fswatcher.go @@ -24,6 +24,7 @@ type FSWatcher struct { i Inotify w Walker maxWatchers int + eventSize int } func NewFSWatcher() (*FSWatcher, error) { @@ -31,11 +32,12 @@ func NewFSWatcher() (*FSWatcher, error) { i: inotify.NewInotify(), w: walker.NewWalker(), maxWatchers: inotify.MaxWatchers, + eventSize: inotify.EventSize, }, nil } -func (iw *FSWatcher) Close() { - iw.i.Close() +func (fs *FSWatcher) Close() { + fs.i.Close() } func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) { @@ -48,10 +50,10 @@ func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) { errCh <- fmt.Errorf("setting up inotify: %v", err) } for _, dir := range rdirs { - addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh) + fs.addWatchers(dir, -1, errCh) } for _, dir := range dirs { - addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh) + fs.addWatchers(dir, 0, errCh) } close(doneCh) }() @@ -59,47 +61,65 @@ func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) { return errCh, doneCh } -func addWatchers(dir string, depth int, i Inotify, maxWatchers int, w Walker, errCh chan error) { - dirCh, walkErrCh, doneCh := w.Walk(dir, depth) -loop: +func (fs *FSWatcher) addWatchers(dir string, depth int, errCh chan error) { + dirCh, walkErrCh, doneCh := fs.w.Walk(dir, depth) + for { - if maxWatchers > 0 && i.NumWatchers() >= maxWatchers { + if fs.maxWatchers > 0 && fs.i.NumWatchers() >= fs.maxWatchers { close(doneCh) - break loop + return } + select { case err := <-walkErrCh: errCh <- fmt.Errorf("adding inotift watchers: %v", err) case dir, ok := <-dirCh: if !ok { - break loop + return } - if err := i.Watch(dir); err != nil { + if err := fs.i.Watch(dir); err != nil { errCh <- fmt.Errorf("Can't create watcher: %v", err) } } } } -func (fs *FSWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { - err := fs.i.Init() - if err != nil { - return nil, nil, fmt.Errorf("setting up inotify: %v", err) - } +func (fs *FSWatcher) Run() (chan struct{}, chan string, chan error) { + triggerCh, dataCh, eventCh, errCh := make(chan struct{}), make(chan []byte), make(chan string), make(chan error) - for _, dir := range rdirs { - addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh) - } - for _, dir := range dirs { - addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh) - } + go fs.observe(triggerCh, dataCh, errCh) + go fs.parseEvents(dataCh, eventCh, errCh) - triggerCh := make(chan struct{}) - dataCh := make(chan []byte) - go Observe(fs.i, triggerCh, dataCh, errCh) - - eventCh := make(chan string) - go parseEvents(fs.i, dataCh, eventCh, errCh) - - return triggerCh, eventCh, nil + return triggerCh, eventCh, errCh +} + +func (fs *FSWatcher) observe(triggerCh chan struct{}, dataCh chan []byte, errCh chan error) { + buf := make([]byte, 5*fs.eventSize) + + for { + n, err := fs.i.Read(buf) + triggerCh <- struct{}{} + if err != nil { + errCh <- fmt.Errorf("reading inotify buffer: %v", err) + continue + } + bufCopy := make([]byte, n) + copy(bufCopy, buf) + dataCh <- bufCopy + } +} + +func (fs *FSWatcher) parseEvents(dataCh chan []byte, eventCh chan string, errCh chan error) { + for buf := range dataCh { + var ptr uint32 + for len(buf[ptr:]) > 0 { + event, size, err := fs.i.ParseNextEvent(buf[ptr:]) + ptr += size + if err != nil { + errCh <- fmt.Errorf("parsing events: %v", err) + continue + } + eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name) + } + } } diff --git a/internal/fswatcher/fswatcher_test.go b/internal/fswatcher/fswatcher_test.go index b1c52cb..4e8210d 100644 --- a/internal/fswatcher/fswatcher_test.go +++ b/internal/fswatcher/fswatcher_test.go @@ -2,14 +2,16 @@ package fswatcher import ( "errors" + "fmt" "reflect" + "strings" "testing" "time" "github.com/dominicbreuker/pspy/internal/fswatcher/inotify" ) -func TestInit(t *testing.T) { +func initObjs() (*MockInotify, *MockWalker, *FSWatcher) { i := NewMockInotify() w := &MockWalker{ subdirs: map[string][]string{ @@ -22,7 +24,13 @@ func TestInit(t *testing.T) { i: i, w: w, maxWatchers: 999, + eventSize: 11, } + return i, w, fs +} + +func TestInit(t *testing.T) { + i, _, fs := initObjs() rdirs := []string{"mydir1"} dirs := []string{"mydir2"} @@ -45,6 +53,76 @@ loop: } } +func TestRun(t *testing.T) { + i, _, fs := initObjs() + triggerCh, eventCh, errCh := fs.Run() + + // send data (len=11) + go func() { + sendInotifyData(t, i.bufReads, "name:type__") // single event + sendInotifyData(t, i.bufReads, "error:read_") // read error + sendInotifyData(t, i.bufReads, "error:parse") // parse error + sendInotifyData(t, i.bufReads, "name1:type1name2:type2") // 2 events + }() + + // parse first datum + expectTrigger(t, triggerCh) + expectEvent(t, eventCh, "type__ | name") + + // parse second datum + expectTrigger(t, triggerCh) + expectError(t, errCh, "reading inotify buffer: error-inotify-read") + + // parse third datum + expectTrigger(t, triggerCh) + expectError(t, errCh, "parsing events: parse-event-error") + + // parse fourth datum + expectTrigger(t, triggerCh) + expectEvent(t, eventCh, "type1 | name1") + expectEvent(t, eventCh, "type2 | name2") +} + +const timeout = 500 * time.Millisecond + +func sendInotifyData(t *testing.T, dataCh chan []byte, s string) { + select { + case dataCh <- []byte(s): + case <-time.After(timeout): + t.Fatalf("Could not send data in time: %s", s) + } +} + +func expectTrigger(t *testing.T, triggerCh chan struct{}) { + select { + case <-triggerCh: + case <-time.After(timeout): + t.Fatalf("Timeout: did not receive trigger in time") + } +} + +func expectEvent(t *testing.T, eventCh chan string, exp string) { + select { + case e := <-eventCh: + if strings.TrimSpace(e) != exp { + t.Errorf("Wrong event: %+v", e) + } + case <-time.After(timeout): + t.Fatalf("Timeout: did not receive event in time") + } +} + +func expectError(t *testing.T, errCh chan error, exp string) { + select { + case err := <-errCh: + if err.Error() != exp { + t.Errorf("Wrong error: %v", err) + } + case <-time.After(timeout): + t.Fatalf("Timeout: did not receive error in time") + } +} + // mocks // Mock Inotify @@ -52,12 +130,14 @@ loop: type MockInotify struct { initialized bool watching []string + bufReads chan []byte } func NewMockInotify() *MockInotify { return &MockInotify{ initialized: false, watching: make([]string, 0), + bufReads: make(chan []byte), } } @@ -79,11 +159,22 @@ func (i *MockInotify) NumWatchers() int { } func (i *MockInotify) Read(buf []byte) (int, error) { - return 32, nil + b := <-i.bufReads + t := strings.Split(string(b), ":") + if t[0] == "error" && t[1] == "read_" { + return -1, fmt.Errorf("error-inotify-read") + } + copy(buf, b) + return len(b), nil } func (i *MockInotify) ParseNextEvent(buf []byte) (*inotify.Event, uint32, error) { - return &inotify.Event{Name: "name", Op: "CREATE"}, 32, nil + s := string(buf[:11]) + t := strings.Split(s, ":") + if t[0] == "error" && t[1] == "parse" { + return nil, uint32(len(buf)), fmt.Errorf("parse-event-error") + } + return &inotify.Event{Name: t[0], Op: t[1]}, 11, nil } func (i *MockInotify) Close() error { diff --git a/internal/fswatcher/inotify/inotify.go b/internal/fswatcher/inotify/inotify.go index 006f647..3544c5a 100644 --- a/internal/fswatcher/inotify/inotify.go +++ b/internal/fswatcher/inotify/inotify.go @@ -17,6 +17,8 @@ const maximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches" // set to -1 if the number cannot be determined var MaxWatchers int = -1 +const EventSize int = unix.SizeofInotifyEvent + func init() { mw, err := getMaxWatchers() if err == nil { @@ -83,6 +85,13 @@ func (i *Inotify) ParseNextEvent(buf []byte) (*Event, uint32, error) { sys := (*unix.InotifyEvent)(unsafe.Pointer(&buf[0])) offset := unix.SizeofInotifyEvent + sys.Len + if sys.Wd == -1 { + // watch descriptors should never be negative, yet there appears to be an unfixed bug causing them to be: + // https://rachelbythebay.com/w/2014/11/24/touch/ + // https://code.launchpad.net/~jamesodhunt/libnih/libnih-inotify-overflow-fix-for-777093/+merge/65372 + return nil, offset, fmt.Errorf("possible inotify event overflow") + } + watcher, ok := i.Watchers[int(sys.Wd)] if !ok { return nil, offset, fmt.Errorf("unknown watcher ID: %d", sys.Wd) diff --git a/internal/pspy/pspy.go b/internal/pspy/pspy.go index f2dcffe..9657602 100644 --- a/internal/pspy/pspy.go +++ b/internal/pspy/pspy.go @@ -15,7 +15,8 @@ type Logger interface { } type FSWatcher interface { - Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) + Init(rdirs, dirs []string) (chan error, chan struct{}) + Run() (chan struct{}, chan string, chan error) } type ProcfsScanner interface { @@ -25,15 +26,19 @@ type ProcfsScanner interface { func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) { logger.Infof("Config: %+v\n", cfg) - // log all errors - errCh := make(chan error, 10) + errCh, doneCh := inotify.Init(cfg.RDirs, cfg.Dirs) +initloop: + for { + select { + case <-doneCh: + break initloop + case err := <-errCh: + logger.Errorf("initializing fs watcher: %v", err) + } + } + triggerCh, fsEventCh, errCh := inotify.Run() go logErrors(errCh, logger) - triggerCh, fsEventCh, err := inotify.Start(cfg.RDirs, cfg.Dirs, errCh) - if err != nil { - logger.Errorf("Can't set up inotify watchers: %v\n", err) - return nil, errors.New("inotify error") - } psEventCh, err := pscan.Setup(triggerCh, 100*time.Millisecond) if err != nil { logger.Errorf("Can't set up procfs scanner: %+v\n", err) @@ -41,7 +46,9 @@ func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScan } // ignore all file system events created on startup + logger.Infof("Draining file system events due to startup...") drainChanFor(fsEventCh, 1*time.Second) + logger.Infof("done") exit := make(chan struct{}) go func() { @@ -81,67 +88,6 @@ func drainChanFor(c chan string, d time.Duration) { } } -// const MaxInt = int(^uint(0) >> 1) - -// func Watch(rdirs, dirs []string, logPS, logFS bool) { -// maxWatchers, err := inotify.WatcherLimit() -// if err != nil { -// log.Printf("Can't get inotify watcher limit...: %v\n", err) -// } -// log.Printf("Inotify watcher limit: %d (/proc/sys/fs/inotify/max_user_watches)\n", maxWatchers) - -// ping := make(chan struct{}) -// in, err := inotify.NewInotify(ping, logFS) -// if err != nil { -// log.Fatalf("Can't init inotify: %v", err) -// } -// defer in.Close() - -// for _, dir := range rdirs { -// addWatchers(dir, MaxInt, in, maxWatchers) -// } -// for _, dir := range dirs { -// addWatchers(dir, 0, in, maxWatchers) -// } - -// log.Printf("Inotify watchers set up: %s - watching now\n", in) - -// procList := process.NewProcList() - -// ticker := time.NewTicker(100 * time.Millisecond) - -// for { -// select { -// case <-ticker.C: -// refresh(in, procList, logPS) -// case <-ping: -// refresh(in, procList, logPS) -// } -// } -// } - -// func addWatchers(dir string, depth int, in *inotify.Inotify, maxWatchers int) { -// dirCh, errCh, doneCh := walker.Walk(dir, depth) -// loop: -// for { -// if in.NumWatchers() >= maxWatchers { -// close(doneCh) -// break loop -// } -// select { -// case dir, ok := <-dirCh: -// if !ok { -// break loop -// } -// if err := in.Watch(dir); err != nil { -// fmt.Fprintf(os.Stderr, "Can't create watcher: %v\n", err) -// } -// case err := <-errCh: -// fmt.Fprintf(os.Stderr, "Error walking filesystem: %v\n", err) -// } -// } -// } - // func refresh(in *inotify.Inotify, pl *process.ProcList, print bool) { // in.Pause() // if err := pl.Refresh(print); err != nil { diff --git a/internal/pspy/pspy_test.go b/internal/pspy/pspy_test.go index a1b58a0..c216d0a 100644 --- a/internal/pspy/pspy_test.go +++ b/internal/pspy/pspy_test.go @@ -1,139 +1,140 @@ package pspy -import ( - "fmt" - "os" - "syscall" - "testing" - "time" +// import ( +// "fmt" +// "os" +// "syscall" +// "testing" +// "time" - "github.com/dominicbreuker/pspy/internal/config" -) +// "github.com/dominicbreuker/pspy/internal/config" +// ) -func TestStart(t *testing.T) { - cfg := config.Config{ - RDirs: []string{"rdir"}, - Dirs: []string{"dir"}, - LogFS: true, - LogPS: true, - } - mockLogger := newMockLogger() - mockIW := newMockInotifyWatcher(nil) - mockPS := newMockProcfsScanner(nil) - sigCh := make(chan os.Signal) +// func TestStart(t *testing.T) { +// cfg := config.Config{ +// RDirs: []string{"rdir"}, +// Dirs: []string{"dir"}, +// LogFS: true, +// LogPS: true, +// } +// mockLogger := newMockLogger() +// mockIW := newMockFSWatcher() +// mockPS := newMockProcfsScanner(nil) +// sigCh := make(chan os.Signal) - exit, err := Start(cfg, mockLogger, mockIW, mockPS, sigCh) - if err != nil { - t.Fatalf("Unexpcted error: %v", err) - } - mockIW.fsEventCh <- "some fs event" - expectMsg(t, mockLogger.Event, "FS: some fs event\n") +// exit, err := Init(cfg, mockLogger, mockIW, mockPS, sigCh) +// if err != nil { +// t.Fatalf("Unexpcted error: %v", err) +// } +// mockIW.fsEventCh <- "some fs event" +// expectMsg(t, mockLogger.Event, "FS: some fs event\n") - mockPS.psEventCh <- "some ps event" - expectMsg(t, mockLogger.Event, "CMD: some ps event\n") +// mockPS.psEventCh <- "some ps event" +// expectMsg(t, mockLogger.Event, "CMD: some ps event\n") - sigCh <- syscall.SIGINT - expectExit(t, exit) -} +// sigCh <- syscall.SIGINT +// expectExit(t, exit) +// } -func expectMsg(t *testing.T, ch chan string, msg string) { - select { - case received := <-ch: - if received != msg { - t.Fatalf("Wanted to receive %s but got %s", msg, received) - } - case <-time.After(500 * time.Millisecond): - t.Fatalf("Did not receive message in time. Wanted: %s", msg) - } -} +// func expectMsg(t *testing.T, ch chan string, msg string) { +// select { +// case received := <-ch: +// if received != msg { +// t.Fatalf("Wanted to receive %s but got %s", msg, received) +// } +// case <-time.After(500 * time.Millisecond): +// t.Fatalf("Did not receive message in time. Wanted: %s", msg) +// } +// } -func expectExit(t *testing.T, ch chan struct{}) { - select { - case <-ch: - return - case <-time.After(500 * time.Millisecond): - t.Fatalf("Did not receive exit signal in time") - } -} +// func expectExit(t *testing.T, ch chan struct{}) { +// select { +// case <-ch: +// return +// case <-time.After(500 * time.Millisecond): +// t.Fatalf("Did not receive exit signal in time") +// } +// } -// ##### Mocks ##### +// // ##### Mocks ##### -// Logger +// // Logger -type mockLogger struct { - Info chan string - Error chan string - Event chan string -} +// type mockLogger struct { +// Info chan string +// Error chan string +// Event chan string +// } -func newMockLogger() *mockLogger { - return &mockLogger{ - Info: make(chan string, 10), - Error: make(chan string, 10), - Event: make(chan string, 10), - } -} +// func newMockLogger() *mockLogger { +// return &mockLogger{ +// Info: make(chan string, 10), +// Error: make(chan string, 10), +// Event: make(chan string, 10), +// } +// } -func (l *mockLogger) Infof(format string, v ...interface{}) { - l.Info <- fmt.Sprintf(format, v...) -} +// func (l *mockLogger) Infof(format string, v ...interface{}) { +// l.Info <- fmt.Sprintf(format, v...) +// } -func (l *mockLogger) Errorf(format string, v ...interface{}) { - l.Error <- fmt.Sprintf(format, v...) -} +// func (l *mockLogger) Errorf(format string, v ...interface{}) { +// l.Error <- fmt.Sprintf(format, v...) +// } -func (l *mockLogger) Eventf(format string, v ...interface{}) { - l.Event <- fmt.Sprintf(format, v...) -} +// func (l *mockLogger) Eventf(format string, v ...interface{}) { +// l.Event <- fmt.Sprintf(format, v...) +// } -// InotfiyWatcher +// // InotfiyWatcher -type mockInotifyWatcher struct { - triggerCh chan struct{} - fsEventCh chan string - setupErr error - closed bool -} +// type mockFSWatcher struct { +// initErrCh chan error +// initDoneCh chan struct{} +// runTriggerCh chan struct{} +// runEventCh chan struct{} +// runErrorCh chan struct{} +// closed bool +// } -func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher { - return &mockInotifyWatcher{ - triggerCh: make(chan struct{}), - fsEventCh: make(chan string), - setupErr: setupErr, - closed: false, - } -} +// func newMockFSWatcher() *mockFSWatcher { +// return &mockFSWatcher{} +// } -func (i *mockInotifyWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { - if i.setupErr != nil { - return nil, nil, i.setupErr - } - return i.triggerCh, i.fsEventCh, nil -} +// func (fs *mockFSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) { +// fs.initErrCh = make(chan error) +// fs.initDoneCh = make(chan struct{}) +// return fs.initErrCh, fs.initDoneCh +// } -func (i mockInotifyWatcher) Close() { - i.closed = true -} +// func (fs *mockFSWatcher) Run() (chan struct{}, chan string, chan error) { +// fs.runTriggerCh, fs.runEventCh, fs.runErrorCh = make(chan struct{}), make(chan string), make(chan error) +// return fs.runTriggerCh, fs.runEventCh, fs.runErrorCh +// } -// ProcfsScanner +// func (i mockFSWatcher) Close() { +// i.closed = true +// } -type mockProcfsScanner struct { - triggerCh chan struct{} - interval time.Duration - psEventCh chan string - setupErr error -} +// // ProcfsScanner -func newMockProcfsScanner(setupErr error) *mockProcfsScanner { - return &mockProcfsScanner{ - psEventCh: make(chan string), - setupErr: setupErr, - } -} +// type mockProcfsScanner struct { +// triggerCh chan struct{} +// interval time.Duration +// psEventCh chan string +// setupErr error +// } -func (p *mockProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) { - if p.setupErr != nil { - return nil, p.setupErr - } - return p.psEventCh, nil -} +// func newMockProcfsScanner(setupErr error) *mockProcfsScanner { +// return &mockProcfsScanner{ +// psEventCh: make(chan string), +// setupErr: setupErr, +// } +// } + +// func (p *mockProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) { +// if p.setupErr != nil { +// return nil, p.setupErr +// } +// return p.psEventCh, nil +// }