mirror of
https://github.com/DominicBreuker/pspy.git
synced 2025-12-21 11:44:51 +00:00
126 lines
2.7 KiB
Go
126 lines
2.7 KiB
Go
package fswatcher
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
|
|
"github.com/dominicbreuker/pspy/internal/fswatcher/walker"
|
|
)
|
|
|
|
type Inotify interface {
|
|
Init() error
|
|
Watch(dir string) error
|
|
NumWatchers() int
|
|
Read(buf []byte) (int, error)
|
|
ParseNextEvent(buf []byte) (*inotify.Event, uint32, error)
|
|
Close() error
|
|
}
|
|
|
|
type Walker interface {
|
|
Walk(dir string, depth int) (chan string, chan error, chan struct{})
|
|
}
|
|
|
|
type FSWatcher struct {
|
|
i Inotify
|
|
w Walker
|
|
maxWatchers int
|
|
eventSize int
|
|
}
|
|
|
|
func NewFSWatcher() *FSWatcher {
|
|
return &FSWatcher{
|
|
i: inotify.NewInotify(),
|
|
w: walker.NewWalker(),
|
|
maxWatchers: inotify.MaxWatchers,
|
|
eventSize: inotify.EventSize,
|
|
}
|
|
}
|
|
|
|
func (fs *FSWatcher) Close() {
|
|
fs.i.Close()
|
|
}
|
|
|
|
func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
|
|
errCh := make(chan error)
|
|
doneCh := make(chan struct{})
|
|
|
|
go func() {
|
|
err := fs.i.Init()
|
|
if err != nil {
|
|
errCh <- fmt.Errorf("setting up inotify: %v", err)
|
|
}
|
|
for _, dir := range rdirs {
|
|
fs.addWatchers(dir, -1, errCh)
|
|
}
|
|
for _, dir := range dirs {
|
|
fs.addWatchers(dir, 0, errCh)
|
|
}
|
|
close(doneCh)
|
|
}()
|
|
|
|
return errCh, doneCh
|
|
}
|
|
|
|
func (fs *FSWatcher) addWatchers(dir string, depth int, errCh chan error) {
|
|
dirCh, walkErrCh, doneCh := fs.w.Walk(dir, depth)
|
|
|
|
for {
|
|
if fs.maxWatchers > 0 && fs.i.NumWatchers() >= fs.maxWatchers {
|
|
close(doneCh)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case err := <-walkErrCh:
|
|
errCh <- fmt.Errorf("adding inotift watchers: %v", err)
|
|
case dir, ok := <-dirCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
if err := fs.i.Watch(dir); err != nil {
|
|
errCh <- fmt.Errorf("Can't create watcher: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (fs *FSWatcher) Run() (chan struct{}, chan string, chan error) {
|
|
triggerCh, dataCh, eventCh, errCh := make(chan struct{}), make(chan []byte), make(chan string), make(chan error)
|
|
|
|
go fs.observe(triggerCh, dataCh, errCh)
|
|
go fs.parseEvents(dataCh, eventCh, errCh)
|
|
|
|
return triggerCh, eventCh, errCh
|
|
}
|
|
|
|
func (fs *FSWatcher) observe(triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
|
|
buf := make([]byte, 5*fs.eventSize)
|
|
|
|
for {
|
|
n, err := fs.i.Read(buf)
|
|
triggerCh <- struct{}{}
|
|
if err != nil {
|
|
errCh <- fmt.Errorf("reading inotify buffer: %v", err)
|
|
continue
|
|
}
|
|
bufCopy := make([]byte, n)
|
|
copy(bufCopy, buf)
|
|
dataCh <- bufCopy
|
|
}
|
|
}
|
|
|
|
func (fs *FSWatcher) parseEvents(dataCh chan []byte, eventCh chan string, errCh chan error) {
|
|
for buf := range dataCh {
|
|
var ptr uint32
|
|
for len(buf[ptr:]) > 0 {
|
|
event, size, err := fs.i.ParseNextEvent(buf[ptr:])
|
|
ptr += size
|
|
if err != nil {
|
|
errCh <- fmt.Errorf("parsing events: %v", err)
|
|
continue
|
|
}
|
|
eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name)
|
|
}
|
|
}
|
|
}
|