add tests for fswatcher

This commit is contained in:
Dominic Breuker
2018-02-28 10:17:19 +01:00
committed by Dominic Breuker
parent 1deb4838a5
commit 9670b85f43
5 changed files with 283 additions and 216 deletions

View File

@@ -24,6 +24,7 @@ type FSWatcher struct {
i Inotify i Inotify
w Walker w Walker
maxWatchers int maxWatchers int
eventSize int
} }
func NewFSWatcher() (*FSWatcher, error) { func NewFSWatcher() (*FSWatcher, error) {
@@ -31,11 +32,12 @@ func NewFSWatcher() (*FSWatcher, error) {
i: inotify.NewInotify(), i: inotify.NewInotify(),
w: walker.NewWalker(), w: walker.NewWalker(),
maxWatchers: inotify.MaxWatchers, maxWatchers: inotify.MaxWatchers,
eventSize: inotify.EventSize,
}, nil }, nil
} }
func (iw *FSWatcher) Close() { func (fs *FSWatcher) Close() {
iw.i.Close() fs.i.Close()
} }
func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) { func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
@@ -48,10 +50,10 @@ func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
errCh <- fmt.Errorf("setting up inotify: %v", err) errCh <- fmt.Errorf("setting up inotify: %v", err)
} }
for _, dir := range rdirs { for _, dir := range rdirs {
addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh) fs.addWatchers(dir, -1, errCh)
} }
for _, dir := range dirs { for _, dir := range dirs {
addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh) fs.addWatchers(dir, 0, errCh)
} }
close(doneCh) close(doneCh)
}() }()
@@ -59,47 +61,65 @@ func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
return errCh, doneCh return errCh, doneCh
} }
func addWatchers(dir string, depth int, i Inotify, maxWatchers int, w Walker, errCh chan error) { func (fs *FSWatcher) addWatchers(dir string, depth int, errCh chan error) {
dirCh, walkErrCh, doneCh := w.Walk(dir, depth) dirCh, walkErrCh, doneCh := fs.w.Walk(dir, depth)
loop:
for { for {
if maxWatchers > 0 && i.NumWatchers() >= maxWatchers { if fs.maxWatchers > 0 && fs.i.NumWatchers() >= fs.maxWatchers {
close(doneCh) close(doneCh)
break loop return
} }
select { select {
case err := <-walkErrCh: case err := <-walkErrCh:
errCh <- fmt.Errorf("adding inotift watchers: %v", err) errCh <- fmt.Errorf("adding inotift watchers: %v", err)
case dir, ok := <-dirCh: case dir, ok := <-dirCh:
if !ok { if !ok {
break loop return
} }
if err := i.Watch(dir); err != nil { if err := fs.i.Watch(dir); err != nil {
errCh <- fmt.Errorf("Can't create watcher: %v", err) errCh <- fmt.Errorf("Can't create watcher: %v", err)
} }
} }
} }
} }
func (fs *FSWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { func (fs *FSWatcher) Run() (chan struct{}, chan string, chan error) {
err := fs.i.Init() triggerCh, dataCh, eventCh, errCh := make(chan struct{}), make(chan []byte), make(chan string), make(chan error)
go fs.observe(triggerCh, dataCh, errCh)
go fs.parseEvents(dataCh, eventCh, errCh)
return triggerCh, eventCh, errCh
}
func (fs *FSWatcher) observe(triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
buf := make([]byte, 5*fs.eventSize)
for {
n, err := fs.i.Read(buf)
triggerCh <- struct{}{}
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("setting up inotify: %v", err) errCh <- fmt.Errorf("reading inotify buffer: %v", err)
continue
}
bufCopy := make([]byte, n)
copy(bufCopy, buf)
dataCh <- bufCopy
}
} }
for _, dir := range rdirs { func (fs *FSWatcher) parseEvents(dataCh chan []byte, eventCh chan string, errCh chan error) {
addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh) for buf := range dataCh {
var ptr uint32
for len(buf[ptr:]) > 0 {
event, size, err := fs.i.ParseNextEvent(buf[ptr:])
ptr += size
if err != nil {
errCh <- fmt.Errorf("parsing events: %v", err)
continue
}
eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name)
} }
for _, dir := range dirs {
addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh)
} }
triggerCh := make(chan struct{})
dataCh := make(chan []byte)
go Observe(fs.i, triggerCh, dataCh, errCh)
eventCh := make(chan string)
go parseEvents(fs.i, dataCh, eventCh, errCh)
return triggerCh, eventCh, nil
} }

View File

@@ -2,14 +2,16 @@ package fswatcher
import ( import (
"errors" "errors"
"fmt"
"reflect" "reflect"
"strings"
"testing" "testing"
"time" "time"
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify" "github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
) )
func TestInit(t *testing.T) { func initObjs() (*MockInotify, *MockWalker, *FSWatcher) {
i := NewMockInotify() i := NewMockInotify()
w := &MockWalker{ w := &MockWalker{
subdirs: map[string][]string{ subdirs: map[string][]string{
@@ -22,7 +24,13 @@ func TestInit(t *testing.T) {
i: i, i: i,
w: w, w: w,
maxWatchers: 999, maxWatchers: 999,
eventSize: 11,
} }
return i, w, fs
}
func TestInit(t *testing.T) {
i, _, fs := initObjs()
rdirs := []string{"mydir1"} rdirs := []string{"mydir1"}
dirs := []string{"mydir2"} dirs := []string{"mydir2"}
@@ -45,6 +53,76 @@ loop:
} }
} }
func TestRun(t *testing.T) {
i, _, fs := initObjs()
triggerCh, eventCh, errCh := fs.Run()
// send data (len=11)
go func() {
sendInotifyData(t, i.bufReads, "name:type__") // single event
sendInotifyData(t, i.bufReads, "error:read_") // read error
sendInotifyData(t, i.bufReads, "error:parse") // parse error
sendInotifyData(t, i.bufReads, "name1:type1name2:type2") // 2 events
}()
// parse first datum
expectTrigger(t, triggerCh)
expectEvent(t, eventCh, "type__ | name")
// parse second datum
expectTrigger(t, triggerCh)
expectError(t, errCh, "reading inotify buffer: error-inotify-read")
// parse third datum
expectTrigger(t, triggerCh)
expectError(t, errCh, "parsing events: parse-event-error")
// parse fourth datum
expectTrigger(t, triggerCh)
expectEvent(t, eventCh, "type1 | name1")
expectEvent(t, eventCh, "type2 | name2")
}
const timeout = 500 * time.Millisecond
func sendInotifyData(t *testing.T, dataCh chan []byte, s string) {
select {
case dataCh <- []byte(s):
case <-time.After(timeout):
t.Fatalf("Could not send data in time: %s", s)
}
}
func expectTrigger(t *testing.T, triggerCh chan struct{}) {
select {
case <-triggerCh:
case <-time.After(timeout):
t.Fatalf("Timeout: did not receive trigger in time")
}
}
func expectEvent(t *testing.T, eventCh chan string, exp string) {
select {
case e := <-eventCh:
if strings.TrimSpace(e) != exp {
t.Errorf("Wrong event: %+v", e)
}
case <-time.After(timeout):
t.Fatalf("Timeout: did not receive event in time")
}
}
func expectError(t *testing.T, errCh chan error, exp string) {
select {
case err := <-errCh:
if err.Error() != exp {
t.Errorf("Wrong error: %v", err)
}
case <-time.After(timeout):
t.Fatalf("Timeout: did not receive error in time")
}
}
// mocks // mocks
// Mock Inotify // Mock Inotify
@@ -52,12 +130,14 @@ loop:
type MockInotify struct { type MockInotify struct {
initialized bool initialized bool
watching []string watching []string
bufReads chan []byte
} }
func NewMockInotify() *MockInotify { func NewMockInotify() *MockInotify {
return &MockInotify{ return &MockInotify{
initialized: false, initialized: false,
watching: make([]string, 0), watching: make([]string, 0),
bufReads: make(chan []byte),
} }
} }
@@ -79,11 +159,22 @@ func (i *MockInotify) NumWatchers() int {
} }
func (i *MockInotify) Read(buf []byte) (int, error) { func (i *MockInotify) Read(buf []byte) (int, error) {
return 32, nil b := <-i.bufReads
t := strings.Split(string(b), ":")
if t[0] == "error" && t[1] == "read_" {
return -1, fmt.Errorf("error-inotify-read")
}
copy(buf, b)
return len(b), nil
} }
func (i *MockInotify) ParseNextEvent(buf []byte) (*inotify.Event, uint32, error) { func (i *MockInotify) ParseNextEvent(buf []byte) (*inotify.Event, uint32, error) {
return &inotify.Event{Name: "name", Op: "CREATE"}, 32, nil s := string(buf[:11])
t := strings.Split(s, ":")
if t[0] == "error" && t[1] == "parse" {
return nil, uint32(len(buf)), fmt.Errorf("parse-event-error")
}
return &inotify.Event{Name: t[0], Op: t[1]}, 11, nil
} }
func (i *MockInotify) Close() error { func (i *MockInotify) Close() error {

View File

@@ -17,6 +17,8 @@ const maximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches"
// set to -1 if the number cannot be determined // set to -1 if the number cannot be determined
var MaxWatchers int = -1 var MaxWatchers int = -1
const EventSize int = unix.SizeofInotifyEvent
func init() { func init() {
mw, err := getMaxWatchers() mw, err := getMaxWatchers()
if err == nil { if err == nil {
@@ -83,6 +85,13 @@ func (i *Inotify) ParseNextEvent(buf []byte) (*Event, uint32, error) {
sys := (*unix.InotifyEvent)(unsafe.Pointer(&buf[0])) sys := (*unix.InotifyEvent)(unsafe.Pointer(&buf[0]))
offset := unix.SizeofInotifyEvent + sys.Len offset := unix.SizeofInotifyEvent + sys.Len
if sys.Wd == -1 {
// watch descriptors should never be negative, yet there appears to be an unfixed bug causing them to be:
// https://rachelbythebay.com/w/2014/11/24/touch/
// https://code.launchpad.net/~jamesodhunt/libnih/libnih-inotify-overflow-fix-for-777093/+merge/65372
return nil, offset, fmt.Errorf("possible inotify event overflow")
}
watcher, ok := i.Watchers[int(sys.Wd)] watcher, ok := i.Watchers[int(sys.Wd)]
if !ok { if !ok {
return nil, offset, fmt.Errorf("unknown watcher ID: %d", sys.Wd) return nil, offset, fmt.Errorf("unknown watcher ID: %d", sys.Wd)

View File

@@ -15,7 +15,8 @@ type Logger interface {
} }
type FSWatcher interface { type FSWatcher interface {
Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) Init(rdirs, dirs []string) (chan error, chan struct{})
Run() (chan struct{}, chan string, chan error)
} }
type ProcfsScanner interface { type ProcfsScanner interface {
@@ -25,15 +26,19 @@ type ProcfsScanner interface {
func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) { func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) {
logger.Infof("Config: %+v\n", cfg) logger.Infof("Config: %+v\n", cfg)
// log all errors errCh, doneCh := inotify.Init(cfg.RDirs, cfg.Dirs)
errCh := make(chan error, 10) initloop:
for {
select {
case <-doneCh:
break initloop
case err := <-errCh:
logger.Errorf("initializing fs watcher: %v", err)
}
}
triggerCh, fsEventCh, errCh := inotify.Run()
go logErrors(errCh, logger) go logErrors(errCh, logger)
triggerCh, fsEventCh, err := inotify.Start(cfg.RDirs, cfg.Dirs, errCh)
if err != nil {
logger.Errorf("Can't set up inotify watchers: %v\n", err)
return nil, errors.New("inotify error")
}
psEventCh, err := pscan.Setup(triggerCh, 100*time.Millisecond) psEventCh, err := pscan.Setup(triggerCh, 100*time.Millisecond)
if err != nil { if err != nil {
logger.Errorf("Can't set up procfs scanner: %+v\n", err) logger.Errorf("Can't set up procfs scanner: %+v\n", err)
@@ -41,7 +46,9 @@ func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScan
} }
// ignore all file system events created on startup // ignore all file system events created on startup
logger.Infof("Draining file system events due to startup...")
drainChanFor(fsEventCh, 1*time.Second) drainChanFor(fsEventCh, 1*time.Second)
logger.Infof("done")
exit := make(chan struct{}) exit := make(chan struct{})
go func() { go func() {
@@ -81,67 +88,6 @@ func drainChanFor(c chan string, d time.Duration) {
} }
} }
// const MaxInt = int(^uint(0) >> 1)
// func Watch(rdirs, dirs []string, logPS, logFS bool) {
// maxWatchers, err := inotify.WatcherLimit()
// if err != nil {
// log.Printf("Can't get inotify watcher limit...: %v\n", err)
// }
// log.Printf("Inotify watcher limit: %d (/proc/sys/fs/inotify/max_user_watches)\n", maxWatchers)
// ping := make(chan struct{})
// in, err := inotify.NewInotify(ping, logFS)
// if err != nil {
// log.Fatalf("Can't init inotify: %v", err)
// }
// defer in.Close()
// for _, dir := range rdirs {
// addWatchers(dir, MaxInt, in, maxWatchers)
// }
// for _, dir := range dirs {
// addWatchers(dir, 0, in, maxWatchers)
// }
// log.Printf("Inotify watchers set up: %s - watching now\n", in)
// procList := process.NewProcList()
// ticker := time.NewTicker(100 * time.Millisecond)
// for {
// select {
// case <-ticker.C:
// refresh(in, procList, logPS)
// case <-ping:
// refresh(in, procList, logPS)
// }
// }
// }
// func addWatchers(dir string, depth int, in *inotify.Inotify, maxWatchers int) {
// dirCh, errCh, doneCh := walker.Walk(dir, depth)
// loop:
// for {
// if in.NumWatchers() >= maxWatchers {
// close(doneCh)
// break loop
// }
// select {
// case dir, ok := <-dirCh:
// if !ok {
// break loop
// }
// if err := in.Watch(dir); err != nil {
// fmt.Fprintf(os.Stderr, "Can't create watcher: %v\n", err)
// }
// case err := <-errCh:
// fmt.Fprintf(os.Stderr, "Error walking filesystem: %v\n", err)
// }
// }
// }
// func refresh(in *inotify.Inotify, pl *process.ProcList, print bool) { // func refresh(in *inotify.Inotify, pl *process.ProcList, print bool) {
// in.Pause() // in.Pause()
// if err := pl.Refresh(print); err != nil { // if err := pl.Refresh(print); err != nil {

View File

@@ -1,139 +1,140 @@
package pspy package pspy
import ( // import (
"fmt" // "fmt"
"os" // "os"
"syscall" // "syscall"
"testing" // "testing"
"time" // "time"
"github.com/dominicbreuker/pspy/internal/config" // "github.com/dominicbreuker/pspy/internal/config"
) // )
func TestStart(t *testing.T) { // func TestStart(t *testing.T) {
cfg := config.Config{ // cfg := config.Config{
RDirs: []string{"rdir"}, // RDirs: []string{"rdir"},
Dirs: []string{"dir"}, // Dirs: []string{"dir"},
LogFS: true, // LogFS: true,
LogPS: true, // LogPS: true,
} // }
mockLogger := newMockLogger() // mockLogger := newMockLogger()
mockIW := newMockInotifyWatcher(nil) // mockIW := newMockFSWatcher()
mockPS := newMockProcfsScanner(nil) // mockPS := newMockProcfsScanner(nil)
sigCh := make(chan os.Signal) // sigCh := make(chan os.Signal)
exit, err := Start(cfg, mockLogger, mockIW, mockPS, sigCh) // exit, err := Init(cfg, mockLogger, mockIW, mockPS, sigCh)
if err != nil { // if err != nil {
t.Fatalf("Unexpcted error: %v", err) // t.Fatalf("Unexpcted error: %v", err)
} // }
mockIW.fsEventCh <- "some fs event" // mockIW.fsEventCh <- "some fs event"
expectMsg(t, mockLogger.Event, "FS: some fs event\n") // expectMsg(t, mockLogger.Event, "FS: some fs event\n")
mockPS.psEventCh <- "some ps event" // mockPS.psEventCh <- "some ps event"
expectMsg(t, mockLogger.Event, "CMD: some ps event\n") // expectMsg(t, mockLogger.Event, "CMD: some ps event\n")
sigCh <- syscall.SIGINT // sigCh <- syscall.SIGINT
expectExit(t, exit) // expectExit(t, exit)
} // }
func expectMsg(t *testing.T, ch chan string, msg string) { // func expectMsg(t *testing.T, ch chan string, msg string) {
select { // select {
case received := <-ch: // case received := <-ch:
if received != msg { // if received != msg {
t.Fatalf("Wanted to receive %s but got %s", msg, received) // t.Fatalf("Wanted to receive %s but got %s", msg, received)
} // }
case <-time.After(500 * time.Millisecond): // case <-time.After(500 * time.Millisecond):
t.Fatalf("Did not receive message in time. Wanted: %s", msg) // t.Fatalf("Did not receive message in time. Wanted: %s", msg)
} // }
} // }
func expectExit(t *testing.T, ch chan struct{}) { // func expectExit(t *testing.T, ch chan struct{}) {
select { // select {
case <-ch: // case <-ch:
return // return
case <-time.After(500 * time.Millisecond): // case <-time.After(500 * time.Millisecond):
t.Fatalf("Did not receive exit signal in time") // t.Fatalf("Did not receive exit signal in time")
} // }
} // }
// ##### Mocks ##### // // ##### Mocks #####
// Logger // // Logger
type mockLogger struct { // type mockLogger struct {
Info chan string // Info chan string
Error chan string // Error chan string
Event chan string // Event chan string
} // }
func newMockLogger() *mockLogger { // func newMockLogger() *mockLogger {
return &mockLogger{ // return &mockLogger{
Info: make(chan string, 10), // Info: make(chan string, 10),
Error: make(chan string, 10), // Error: make(chan string, 10),
Event: make(chan string, 10), // Event: make(chan string, 10),
} // }
} // }
func (l *mockLogger) Infof(format string, v ...interface{}) { // func (l *mockLogger) Infof(format string, v ...interface{}) {
l.Info <- fmt.Sprintf(format, v...) // l.Info <- fmt.Sprintf(format, v...)
} // }
func (l *mockLogger) Errorf(format string, v ...interface{}) { // func (l *mockLogger) Errorf(format string, v ...interface{}) {
l.Error <- fmt.Sprintf(format, v...) // l.Error <- fmt.Sprintf(format, v...)
} // }
func (l *mockLogger) Eventf(format string, v ...interface{}) { // func (l *mockLogger) Eventf(format string, v ...interface{}) {
l.Event <- fmt.Sprintf(format, v...) // l.Event <- fmt.Sprintf(format, v...)
} // }
// InotfiyWatcher // // InotfiyWatcher
type mockInotifyWatcher struct { // type mockFSWatcher struct {
triggerCh chan struct{} // initErrCh chan error
fsEventCh chan string // initDoneCh chan struct{}
setupErr error // runTriggerCh chan struct{}
closed bool // runEventCh chan struct{}
} // runErrorCh chan struct{}
// closed bool
// }
func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher { // func newMockFSWatcher() *mockFSWatcher {
return &mockInotifyWatcher{ // return &mockFSWatcher{}
triggerCh: make(chan struct{}), // }
fsEventCh: make(chan string),
setupErr: setupErr,
closed: false,
}
}
func (i *mockInotifyWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { // func (fs *mockFSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
if i.setupErr != nil { // fs.initErrCh = make(chan error)
return nil, nil, i.setupErr // fs.initDoneCh = make(chan struct{})
} // return fs.initErrCh, fs.initDoneCh
return i.triggerCh, i.fsEventCh, nil // }
}
func (i mockInotifyWatcher) Close() { // func (fs *mockFSWatcher) Run() (chan struct{}, chan string, chan error) {
i.closed = true // fs.runTriggerCh, fs.runEventCh, fs.runErrorCh = make(chan struct{}), make(chan string), make(chan error)
} // return fs.runTriggerCh, fs.runEventCh, fs.runErrorCh
// }
// ProcfsScanner // func (i mockFSWatcher) Close() {
// i.closed = true
// }
type mockProcfsScanner struct { // // ProcfsScanner
triggerCh chan struct{}
interval time.Duration
psEventCh chan string
setupErr error
}
func newMockProcfsScanner(setupErr error) *mockProcfsScanner { // type mockProcfsScanner struct {
return &mockProcfsScanner{ // triggerCh chan struct{}
psEventCh: make(chan string), // interval time.Duration
setupErr: setupErr, // psEventCh chan string
} // setupErr error
} // }
func (p *mockProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) { // func newMockProcfsScanner(setupErr error) *mockProcfsScanner {
if p.setupErr != nil { // return &mockProcfsScanner{
return nil, p.setupErr // psEventCh: make(chan string),
} // setupErr: setupErr,
return p.psEventCh, nil // }
} // }
// func (p *mockProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) {
// if p.setupErr != nil {
// return nil, p.setupErr
// }
// return p.psEventCh, nil
// }