diff --git a/cmd/root.go b/cmd/root.go index c01e489..27d0e29 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -66,20 +66,23 @@ func init() { } func root(cmd *cobra.Command, args []string) { - fmt.Printf("Watching recursively : %+v (%d)\n", rDirs, len(rDirs)) - fmt.Printf("Watching non-recursively: %+v (%d)\n", dirs, len(dirs)) - fmt.Printf("Printing: processes=%t file-system events=%t\n", logPS, logFS) + logger := logging.NewLogger() + cfg := config.Config{ RDirs: rDirs, Dirs: dirs, LogPS: logPS, LogFS: logFS, } - logger := logging.NewLogger() - iw := inotify.NewInotifyWatcher() + iw, err := inotify.NewInotifyWatcher() + if err != nil { + logger.Errorf("Can't initialize inotify: %v", err) + os.Exit(1) + } + defer iw.Close() pscan := process.NewProcfsScanner() - sigCh := make(chan os.Signal, 1) + sigCh := make(chan os.Signal) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) exit, err := pspy.Start(cfg, logger, iw, pscan, sigCh) diff --git a/internal/config/config.go b/internal/config/config.go index 287a97a..4a34e00 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,5 +10,5 @@ type Config struct { } func (c Config) String() string { - return fmt.Sprintf("Printing events: processes: %t | file system events: %t | Watching directories: %+v (recursive) | %+v (non-recursive)", c.LogPS, c.LogFS, c.RDirs, c.Dirs) + return fmt.Sprintf("Printing events: processes=%t | file-system-events=%t ||| Watching directories: %+v (recursive) | %+v (non-recursive)", c.LogPS, c.LogFS, c.RDirs, c.Dirs) } diff --git a/internal/inotify/event.go b/internal/inotify/event.go index 62e8ad4..4599156 100644 --- a/internal/inotify/event.go +++ b/internal/inotify/event.go @@ -3,9 +3,7 @@ package inotify import ( "bytes" "fmt" - "log" "strconv" - "time" "unsafe" "golang.org/x/sys/unix" @@ -37,38 +35,11 @@ var InotifyEvents = map[uint32]string{ (unix.IN_OPEN | unix.IN_ISDIR): "OPEN DIR", } -type Event struct { - name string - op string -} - -func (e Event) String() string { - return fmt.Sprintf("%20s | %s", e.op, e.name) -} - -func newEvent(name string, mask uint32) Event { - e := Event{name: name} - op, ok := InotifyEvents[mask] - if !ok { - op = strconv.FormatInt(int64(mask), 2) - } - e.op = op - return e -} - -func eventLogger(i *Inotify, buffers chan bufRead, print bool) { - // enable printing only after delay since setting up watchers causes flood of events - printEnabled := false - go func() { - <-time.After(1 * time.Second) - printEnabled = print - }() - for bf := range buffers { - n := bf.n - buf := bf.buf - +func parseEvents(i *Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) { + for buf := range dataCh { + n := len(buf) if n < unix.SizeofInotifyEvent { - // incomplete or erroneous read + errCh <- fmt.Errorf("Inotify event parser: incomplete read: n=%d", n) continue } @@ -80,6 +51,7 @@ func eventLogger(i *Inotify, buffers chan bufRead, print bool) { watcher, ok := i.watchers[int(sys.Wd)] if !ok { + errCh <- fmt.Errorf("Inotify event parser: unknown watcher ID: %d", sys.Wd) continue } name = watcher.dir + "/" @@ -88,10 +60,15 @@ func eventLogger(i *Inotify, buffers chan bufRead, print bool) { ptr += sys.Len } - ev := newEvent(name, sys.Mask) - if printEnabled { - log.Printf("\x1b[32;1mFS: %+v\x1b[0m", ev) - } + eventCh <- formatEvent(name, sys.Mask) } } } + +func formatEvent(name string, mask uint32) string { + op, ok := InotifyEvents[mask] + if !ok { + op = strconv.FormatInt(int64(mask), 2) + } + return fmt.Sprintf("%20s | %s", op, name) +} diff --git a/internal/inotify/inotify.go b/internal/inotify/inotify.go index 90a3d55..990feb6 100644 --- a/internal/inotify/inotify.go +++ b/internal/inotify/inotify.go @@ -6,26 +6,12 @@ import ( "golang.org/x/sys/unix" ) -type InotifyWatcher struct{} - -func NewInotifyWatcher() *InotifyWatcher { - return &InotifyWatcher{} -} - -func (i *InotifyWatcher) Setup(rdirs, dirs []string) (chan struct{}, chan string, error) { - triggerCh := make(chan struct{}) - fsEventCh := make(chan string) - return triggerCh, fsEventCh, nil -} - type Inotify struct { fd int watchers map[int]*watcher - ping chan struct{} - paused bool } -func NewInotify(ping chan struct{}, print bool) (*Inotify, error) { +func NewInotify() (*Inotify, error) { fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC) if fd == -1 { return nil, fmt.Errorf("Can't init inotify: %d", errno) @@ -34,16 +20,13 @@ func NewInotify(ping chan struct{}, print bool) (*Inotify, error) { i := &Inotify{ fd: fd, watchers: make(map[int]*watcher), - ping: ping, - paused: false, } - go watch(i, print) return i, nil } func (i *Inotify) Watch(dir string) error { - w, err := newWatcher(i.fd, dir, i.ping) + w, err := newWatcher(i.fd, dir) if err != nil { return fmt.Errorf("creating watcher: %v", err) } @@ -58,14 +41,6 @@ func (i *Inotify) Close() error { return nil } -func (i *Inotify) Pause() { - i.paused = true -} - -func (i *Inotify) UnPause() { - i.paused = false -} - func (i *Inotify) NumWatchers() int { return len(i.watchers) } @@ -81,24 +56,3 @@ func (i *Inotify) String() string { return fmt.Sprintf("Watching %d directories", len(i.watchers)) } } - -type bufRead struct { - n int - buf []byte -} - -func watch(i *Inotify, print bool) { - buf := make([]byte, 5*unix.SizeofInotifyEvent) - buffers := make(chan bufRead) - go eventLogger(i, buffers, print) - for { - n, _ := unix.Read(i.fd, buf) - if !i.paused { - i.ping <- struct{}{} - } - buffers <- bufRead{ - n: n, - buf: buf, - } - } -} diff --git a/internal/inotify/observer.go b/internal/inotify/observer.go new file mode 100644 index 0000000..ea5d574 --- /dev/null +++ b/internal/inotify/observer.go @@ -0,0 +1,23 @@ +package inotify + +import ( + "fmt" + + "golang.org/x/sys/unix" +) + +func Observe(i *Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) { + buf := make([]byte, 5*unix.SizeofInotifyEvent) + + for { + n, errno := unix.Read(i.fd, buf) + if n == -1 { + errCh <- fmt.Errorf("reading from inotify fd: errno: %d", errno) + return + } + triggerCh <- struct{}{} + bufCopy := make([]byte, n) + copy(bufCopy, buf) + dataCh <- bufCopy + } +} diff --git a/internal/inotify/setup.go b/internal/inotify/setup.go new file mode 100644 index 0000000..3c94e8e --- /dev/null +++ b/internal/inotify/setup.go @@ -0,0 +1,69 @@ +package inotify + +import ( + "fmt" + + "github.com/dominicbreuker/pspy/internal/inotify/walker" +) + +type InotifyWatcher struct { + i *Inotify +} + +func (iw *InotifyWatcher) Close() { + iw.i.Close() +} + +func NewInotifyWatcher() (*InotifyWatcher, error) { + i, err := NewInotify() + if err != nil { + return nil, fmt.Errorf("setting up inotify: %v", err) + } + return &InotifyWatcher{ + i: i, + }, nil +} + +func (iw *InotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { + maxWatchers, err := getLimit() + if err != nil { + errCh <- fmt.Errorf("Can't get inotify watcher limit...: %v\n", err) + maxWatchers = -1 + } + + for _, dir := range rdirs { + addWatchers(dir, -1, iw.i, maxWatchers, errCh) + } + for _, dir := range dirs { + addWatchers(dir, 0, iw.i, maxWatchers, errCh) + } + + triggerCh := make(chan struct{}) + dataCh := make(chan []byte) + go Observe(iw.i, triggerCh, dataCh, errCh) + + eventCh := make(chan string) + go parseEvents(iw.i, dataCh, eventCh, errCh) + + return triggerCh, eventCh, nil +} + +func addWatchers(dir string, depth int, i *Inotify, maxWatchers int, errCh chan error) { + dirCh, doneCh := walker.Walk(dir, depth, errCh) +loop: + for { + if maxWatchers > 0 && i.NumWatchers() >= maxWatchers { + close(doneCh) + break loop + } + select { + case dir, ok := <-dirCh: + if !ok { + break loop + } + if err := i.Watch(dir); err != nil { + errCh <- fmt.Errorf("Can't create watcher: %v", err) + } + } + } +} diff --git a/internal/inotify/walker/testdata/f11.txt b/internal/inotify/walker/testdata/f11.txt new file mode 100644 index 0000000..e69de29 diff --git a/internal/inotify/walker/testdata/f12.txt b/internal/inotify/walker/testdata/f12.txt new file mode 100644 index 0000000..e69de29 diff --git a/internal/inotify/walker/testdata/subdir/f21.txt b/internal/inotify/walker/testdata/subdir/f21.txt new file mode 100644 index 0000000..e69de29 diff --git a/internal/inotify/walker/testdata/subdir/subsubdir/f31.txt b/internal/inotify/walker/testdata/subdir/subsubdir/f31.txt new file mode 100644 index 0000000..e69de29 diff --git a/internal/walker/walker.go b/internal/inotify/walker/walker.go similarity index 67% rename from internal/walker/walker.go rename to internal/inotify/walker/walker.go index 6c8b285..8e85832 100644 --- a/internal/walker/walker.go +++ b/internal/inotify/walker/walker.go @@ -3,22 +3,32 @@ package walker import ( "fmt" "io/ioutil" + "os" "path/filepath" ) -func Walk(root string, depth int) (dirCh chan string, errCh chan error, doneCh chan struct{}) { +const maxInt = int(^uint(0) >> 1) + +func Walk(root string, depth int, errCh chan error) (dirCh chan string, doneCh chan struct{}) { + if depth < 0 { + depth = maxInt + } dirCh = make(chan string) - errCh = make(chan error) doneCh = make(chan struct{}) go func() { descent(root, depth-1, dirCh, errCh, doneCh) close(dirCh) }() - return dirCh, errCh, doneCh + return dirCh, doneCh } func descent(dir string, depth int, dirCh chan string, errCh chan error, doneCh chan struct{}) { + _, err := os.Stat(dir) + if err != nil { + errCh <- fmt.Errorf("Can't walk directory %s: %v", dir, err) + return + } select { case dirCh <- dir: case <-doneCh: diff --git a/internal/inotify/walker/walker_test.go b/internal/inotify/walker/walker_test.go new file mode 100644 index 0000000..974c2c0 --- /dev/null +++ b/internal/inotify/walker/walker_test.go @@ -0,0 +1,84 @@ +package walker + +import ( + "reflect" + "strings" + "testing" +) + +func TestWalk(t *testing.T) { + tests := []struct { + root string + depth int + errCh chan error + result []string + errs []string + }{ + {root: "testdata", depth: 999, errCh: newErrCh(), result: []string{ + "testdata", + "testdata/subdir", + "testdata/subdir/subsubdir", + }, errs: make([]string, 0)}, + {root: "testdata", depth: -1, errCh: newErrCh(), result: []string{ + "testdata", + "testdata/subdir", + "testdata/subdir/subsubdir", + }, errs: []string{}}, + {root: "testdata", depth: 1, errCh: newErrCh(), result: []string{ + "testdata", + "testdata/subdir", + }, errs: []string{}}, + {root: "testdata", depth: 0, errCh: newErrCh(), result: []string{ + "testdata", + }, errs: []string{}}, + {root: "testdata/subdir", depth: 1, errCh: newErrCh(), result: []string{ + "testdata/subdir", + "testdata/subdir/subsubdir", + }, errs: []string{}}, + {root: "testdata/non-existing-dir", depth: 1, errCh: newErrCh(), result: []string{}, errs: []string{"Can't walk directory testdata/non-existing-dir"}}, + } + + for i, tt := range tests { + dirCh, doneCh := Walk(tt.root, tt.depth, tt.errCh) + dirs, errs := getAllDirsAndErrors(dirCh, tt.errCh) + + if !reflect.DeepEqual(dirs, tt.result) { + t.Fatalf("[%d] Wrong number of dirs found: %+v", i, dirs) + } + if !reflect.DeepEqual(errs, tt.errs) { + t.Fatalf("[%d] Wrong number of errs found: %+v vs %+v", i, errs, tt.errs) + } + close(doneCh) + } + +} + +func getAllDirsAndErrors(dirCh chan string, errCh chan error) ([]string, []string) { + dirs := make([]string, 0) + errs := make([]string, 0) + + doneDirsCh := make(chan struct{}) + go func() { + for d := range dirCh { + dirs = append(dirs, d) + } + close(errCh) + close(doneDirsCh) + }() + + doneErrsCh := make(chan struct{}) + go func() { + for err := range errCh { + tokens := strings.SplitN(err.Error(), ":", 2) + errs = append(errs, tokens[0]) + } + close(doneErrsCh) + }() + <-doneDirsCh + <-doneErrsCh + return dirs, errs +} + +func newErrCh() chan error { + return make(chan error) +} diff --git a/internal/inotify/watcher.go b/internal/inotify/watcher.go index 98fe61b..a055c25 100644 --- a/internal/inotify/watcher.go +++ b/internal/inotify/watcher.go @@ -17,7 +17,7 @@ type watcher struct { dir string } -func newWatcher(fd int, dir string, ping chan struct{}) (*watcher, error) { +func newWatcher(fd int, dir string) (*watcher, error) { wd, errno := unix.InotifyAddWatch(fd, dir, events) if wd == -1 { return nil, fmt.Errorf("adding watcher on %s: %d", dir, errno) @@ -28,7 +28,7 @@ func newWatcher(fd int, dir string, ping chan struct{}) (*watcher, error) { }, nil } -func WatcherLimit() (int, error) { +func getLimit() (int, error) { b, err := ioutil.ReadFile(MaximumWatchersFile) if err != nil { return 0, fmt.Errorf("reading from %s: %v", MaximumWatchersFile, err) diff --git a/internal/process/process.go b/internal/process/process.go index 8fae7d6..2759de1 100644 --- a/internal/process/process.go +++ b/internal/process/process.go @@ -17,6 +17,11 @@ func NewProcfsScanner() *ProcfsScanner { func (p *ProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) { psEventCh := make(chan string) + go func() { + for { + <-triggerCh + } + }() return psEventCh, nil } diff --git a/internal/pspy/pspy.go b/internal/pspy/pspy.go index 79303ab..95b57f9 100644 --- a/internal/pspy/pspy.go +++ b/internal/pspy/pspy.go @@ -2,15 +2,10 @@ package pspy import ( "errors" - "fmt" - "log" "os" "time" "github.com/dominicbreuker/pspy/internal/config" - "github.com/dominicbreuker/pspy/internal/inotify" - "github.com/dominicbreuker/pspy/internal/process" - "github.com/dominicbreuker/pspy/internal/walker" ) type Logger interface { @@ -20,7 +15,7 @@ type Logger interface { } type InotifyWatcher interface { - Setup(rdirs, dirs []string) (chan struct{}, chan string, error) + Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) } type ProcfsScanner interface { @@ -30,7 +25,11 @@ type ProcfsScanner interface { func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) { logger.Infof("Config: %+v\n", cfg) - triggerCh, fsEventCh, err := inotify.Setup(cfg.RDirs, cfg.Dirs) + // log all errors + errCh := make(chan error, 10) + go logErrors(errCh, logger) + + triggerCh, fsEventCh, err := inotify.Setup(cfg.RDirs, cfg.Dirs, errCh) if err != nil { logger.Errorf("Can't set up inotify watchers: %v\n", err) return nil, errors.New("inotify error") @@ -41,8 +40,10 @@ func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan Procf return nil, errors.New("procfs scanner error") } - exit := make(chan struct{}) + // ignore all file system events created on startup + drainChanFor(fsEventCh, 1*time.Second) + exit := make(chan struct{}) go func() { for { select { @@ -63,72 +64,89 @@ func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan Procf return exit, nil } -const MaxInt = int(^uint(0) >> 1) - -func Watch(rdirs, dirs []string, logPS, logFS bool) { - maxWatchers, err := inotify.WatcherLimit() - if err != nil { - log.Printf("Can't get inotify watcher limit...: %v\n", err) +func logErrors(errCh chan error, logger Logger) { + for { + err := <-errCh + logger.Errorf("ERROR: %v\n", err) } - log.Printf("Inotify watcher limit: %d (/proc/sys/fs/inotify/max_user_watches)\n", maxWatchers) - - ping := make(chan struct{}) - in, err := inotify.NewInotify(ping, logFS) - if err != nil { - log.Fatalf("Can't init inotify: %v", err) - } - defer in.Close() - - for _, dir := range rdirs { - addWatchers(dir, MaxInt, in, maxWatchers) - } - for _, dir := range dirs { - addWatchers(dir, 0, in, maxWatchers) - } - - log.Printf("Inotify watchers set up: %s - watching now\n", in) - - procList := process.NewProcList() - - ticker := time.NewTicker(100 * time.Millisecond) +} +func drainChanFor(c chan string, d time.Duration) { for { select { - case <-ticker.C: - refresh(in, procList, logPS) - case <-ping: - refresh(in, procList, logPS) + case <-c: + case <-time.After(d): + return } } } -func addWatchers(dir string, depth int, in *inotify.Inotify, maxWatchers int) { - dirCh, errCh, doneCh := walker.Walk(dir, depth) -loop: - for { - if in.NumWatchers() >= maxWatchers { - close(doneCh) - break loop - } - select { - case dir, ok := <-dirCh: - if !ok { - break loop - } - if err := in.Watch(dir); err != nil { - fmt.Fprintf(os.Stderr, "Can't create watcher: %v\n", err) - } - case err := <-errCh: - fmt.Fprintf(os.Stderr, "Error walking filesystem: %v\n", err) - } - } -} +// const MaxInt = int(^uint(0) >> 1) -func refresh(in *inotify.Inotify, pl *process.ProcList, print bool) { - in.Pause() - if err := pl.Refresh(print); err != nil { - log.Printf("ERROR refreshing process list: %v", err) - } - time.Sleep(5 * time.Millisecond) - in.UnPause() -} +// func Watch(rdirs, dirs []string, logPS, logFS bool) { +// maxWatchers, err := inotify.WatcherLimit() +// if err != nil { +// log.Printf("Can't get inotify watcher limit...: %v\n", err) +// } +// log.Printf("Inotify watcher limit: %d (/proc/sys/fs/inotify/max_user_watches)\n", maxWatchers) + +// ping := make(chan struct{}) +// in, err := inotify.NewInotify(ping, logFS) +// if err != nil { +// log.Fatalf("Can't init inotify: %v", err) +// } +// defer in.Close() + +// for _, dir := range rdirs { +// addWatchers(dir, MaxInt, in, maxWatchers) +// } +// for _, dir := range dirs { +// addWatchers(dir, 0, in, maxWatchers) +// } + +// log.Printf("Inotify watchers set up: %s - watching now\n", in) + +// procList := process.NewProcList() + +// ticker := time.NewTicker(100 * time.Millisecond) + +// for { +// select { +// case <-ticker.C: +// refresh(in, procList, logPS) +// case <-ping: +// refresh(in, procList, logPS) +// } +// } +// } + +// func addWatchers(dir string, depth int, in *inotify.Inotify, maxWatchers int) { +// dirCh, errCh, doneCh := walker.Walk(dir, depth) +// loop: +// for { +// if in.NumWatchers() >= maxWatchers { +// close(doneCh) +// break loop +// } +// select { +// case dir, ok := <-dirCh: +// if !ok { +// break loop +// } +// if err := in.Watch(dir); err != nil { +// fmt.Fprintf(os.Stderr, "Can't create watcher: %v\n", err) +// } +// case err := <-errCh: +// fmt.Fprintf(os.Stderr, "Error walking filesystem: %v\n", err) +// } +// } +// } + +// func refresh(in *inotify.Inotify, pl *process.ProcList, print bool) { +// in.Pause() +// if err := pl.Refresh(print); err != nil { +// log.Printf("ERROR refreshing process list: %v", err) +// } +// time.Sleep(5 * time.Millisecond) +// in.UnPause() +// } diff --git a/internal/pspy/pspy_test.go b/internal/pspy/pspy_test.go index dc9f0f2..54d0b45 100644 --- a/internal/pspy/pspy_test.go +++ b/internal/pspy/pspy_test.go @@ -92,6 +92,7 @@ type mockInotifyWatcher struct { triggerCh chan struct{} fsEventCh chan string setupErr error + closed bool } func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher { @@ -99,16 +100,21 @@ func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher { triggerCh: make(chan struct{}), fsEventCh: make(chan string), setupErr: setupErr, + closed: false, } } -func (i *mockInotifyWatcher) Setup(rdirs, dirs []string) (chan struct{}, chan string, error) { +func (i *mockInotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) { if i.setupErr != nil { return nil, nil, i.setupErr } return i.triggerCh, i.fsEventCh, nil } +func (i mockInotifyWatcher) Close() { + i.closed = true +} + // ProcfsScanner type mockProcfsScanner struct {