// Package fswatcher registers inotify watches on directory trees and turns
// the raw inotify byte stream into formatted event strings.
package fswatcher

import (
	"fmt"

	"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
	"github.com/dominicbreuker/pspy/internal/fswatcher/walker"
)

// Inotify is the subset of inotify functionality FSWatcher depends on.
// It is an interface so the real implementation can be swapped out.
type Inotify interface {
	// Init prepares the inotify instance; it is called once from FSWatcher.Init.
	Init() error
	// Watch registers a watch on dir.
	Watch(dir string) error
	// NumWatchers reports how many watches are currently registered.
	NumWatchers() int
	// Read fills buf with raw event data and returns the number of bytes read.
	Read(buf []byte) (int, error)
	// ParseNextEvent decodes the first event in buf and returns it together
	// with the number of bytes it consumed.
	ParseNextEvent(buf []byte) (*inotify.Event, uint32, error)
	// Close releases the inotify instance.
	Close() error
}

// Walker enumerates directories below a root.
type Walker interface {
	// Walk returns a channel of directories, a channel of walk errors and a
	// channel that the caller closes to signal that it wants no more results.
	Walk(dir string, depth int) (chan string, chan error, chan struct{})
}

// FSWatcher wires a Walker and an Inotify together: the walker discovers
// directories, inotify watches them, and Run streams the resulting events.
type FSWatcher struct {
	i           Inotify
	w           Walker
	maxWatchers int // upper bound on registered watches; <= 0 disables the limit
	eventSize   int // used to size the read buffer in observe
	// drain makes observe discard everything it reads until Enable is called.
	// NOTE(review): drain is written by Enable and read by the observe
	// goroutine without synchronization — confirm whether that is acceptable.
	drain bool
}

// NewFSWatcher returns an FSWatcher backed by the real inotify and walker
// implementations. It starts in draining mode; call Enable to receive events.
func NewFSWatcher() *FSWatcher {
	return &FSWatcher{
		i:           inotify.NewInotify(),
		w:           walker.NewWalker(),
		maxWatchers: inotify.MaxWatchers,
		eventSize:   inotify.EventSize,
		drain:       true,
	}
}

// Enable stops draining, so subsequent reads in observe are forwarded.
func (fs *FSWatcher) Enable() {
	fs.drain = false
}

// Close closes the underlying inotify instance.
func (fs *FSWatcher) Close() {
	// The error is discarded because this method's signature has no way to
	// report it to the caller.
	_ = fs.i.Close()
}

// Init initialises inotify and registers watches in a background goroutine.
//
// rdirs are walked with depth -1 and dirs with depth 0 (presumably
// "recursive" and "this directory only" — confirm against the walker).
//
// The returned error channel is unbuffered and never closed; the returned
// done channel is closed when setup has finished. Callers must keep receiving
// from the error channel until the done channel is closed, otherwise the
// goroutine blocks on its next error send.
func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
	errCh := make(chan error)
	doneCh := make(chan struct{})

	go func() {
		defer close(doneCh)

		err := fs.i.Init()
		if err != nil {
			errCh <- fmt.Errorf("setting up inotify: %v", err)
			return
		}

		fs.addWatchers(rdirs, dirs, errCh)
	}()

	return errCh, doneCh
}

// addWatchers adds watches for every entry of rdirs (depth -1) followed by
// every entry of dirs (depth 0), reporting problems on errCh.
func (fs *FSWatcher) addWatchers(rdirs, dirs []string, errCh chan error) {
	for _, dir := range rdirs {
		fs.addWatchersToDir(dir, -1, errCh)
	}
	for _, dir := range dirs {
		fs.addWatchersToDir(dir, 0, errCh)
	}
}

// addWatchersToDir walks dir up to depth and watches each directory the
// walker yields. It stops when the walker's directory channel is closed or
// when the watcher limit is reached; in the latter case it closes the
// walker's done channel before returning.
func (fs *FSWatcher) addWatchersToDir(dir string, depth int, errCh chan error) {
	dirCh, walkErrCh, doneCh := fs.w.Walk(dir, depth)

	for {
		if fs.maximumWatchersExceeded() {
			close(doneCh)
			return
		}

		if done := fs.handleNextWalkerResult(dirCh, walkErrCh, errCh); done {
			return
		}
	}
}

// maximumWatchersExceeded reports whether the number of registered watches
// has reached maxWatchers. A non-positive maxWatchers disables the limit.
func (fs *FSWatcher) maximumWatchersExceeded() bool {
	return fs.maxWatchers > 0 && fs.i.NumWatchers() >= fs.maxWatchers
}

// handleNextWalkerResult consumes one item from the walker: either a walk
// error, which is forwarded to errCh, or a directory, which is watched.
// It returns true only when dirCh has been closed, i.e. the walk is over.
func (fs *FSWatcher) handleNextWalkerResult(dirCh chan string, walkErrCh chan error, errCh chan error) bool {
	select {
	case err := <-walkErrCh:
		errCh <- fmt.Errorf("adding inotify watchers: %v", err)
	case dir, ok := <-dirCh:
		if !ok {
			return true
		}
		if err := fs.i.Watch(dir); err != nil {
			errCh <- fmt.Errorf("can't create watcher: %v", err)
		}
	}
	return false
}

// Run starts the reader and parser goroutines and returns, in order: a
// trigger channel that receives a value after every non-drained read, a
// channel of formatted event strings, and an error channel.
//
// All channels are unbuffered and are never closed, and the goroutines have
// no stop mechanism, so the caller must keep receiving from all three.
func (fs *FSWatcher) Run() (chan struct{}, chan string, chan error) {
	triggerCh, dataCh, eventCh, errCh := make(chan struct{}), make(chan []byte), make(chan string), make(chan error)

	go fs.observe(triggerCh, dataCh, errCh)
	go fs.parseEvents(dataCh, eventCh, errCh)

	return triggerCh, eventCh, errCh
}

// observe reads from inotify in an endless loop. While drain is set the read
// result is thrown away. Otherwise a trigger is sent first (even if the read
// failed), then either the read error goes to errCh or a copy of the bytes
// read goes to dataCh.
func (fs *FSWatcher) observe(triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
	buf := make([]byte, 5*fs.eventSize)

	for {
		n, err := fs.i.Read(buf)
		if fs.drain {
			continue
		}

		triggerCh <- struct{}{}

		if err != nil {
			errCh <- fmt.Errorf("reading inotify buffer: %v", err)
			continue
		}

		// buf is reused by the next Read, so hand the parser its own copy.
		bufCopy := make([]byte, n)
		copy(bufCopy, buf)
		dataCh <- bufCopy
	}
}

// parseEvents handles every chunk received on dataCh until it is closed.
func (fs *FSWatcher) parseEvents(dataCh chan []byte, eventCh chan string, errCh chan error) {
	for buf := range dataCh {
		fs.handleChunk(buf, eventCh, errCh)
	}
}

// handleChunk parses buf event by event. Each successfully parsed event is
// sent to eventCh as "<op> | <name>"; each parse failure is sent to errCh.
//
// Parsing of the chunk stops as soon as ParseNextEvent reports a size of zero
// (no progress would be possible) or a size that reaches or exceeds the
// remaining bytes. This prevents an endless loop and an out-of-range slice
// when the parser returns an unexpected size.
func (fs *FSWatcher) handleChunk(buf []byte, eventCh chan string, errCh chan error) {
	for len(buf) > 0 {
		event, size, err := fs.i.ParseNextEvent(buf)
		if err != nil {
			errCh <- fmt.Errorf("parsing events: %v", err)
		} else {
			eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name)
		}

		// Compare in uint64 so a large uint32 size cannot wrap on 32-bit ints.
		if size == 0 || uint64(size) >= uint64(len(buf)) {
			return
		}
		buf = buf[size:]
	}
}