avoid spamming trigger and event channels while draining

This commit is contained in:
Dominic Breuker
2022-10-25 22:58:55 +02:00
parent 2bf9f913ae
commit d6985d700f
7 changed files with 31 additions and 18 deletions

View File

@@ -25,6 +25,7 @@ type FSWatcher struct {
w Walker
maxWatchers int
eventSize int
drain bool
}
func NewFSWatcher() *FSWatcher {
@@ -33,9 +34,14 @@ func NewFSWatcher() *FSWatcher {
w: walker.NewWalker(),
maxWatchers: inotify.MaxWatchers,
eventSize: inotify.EventSize,
drain: true,
}
}
func (fs *FSWatcher) Enable() {
fs.drain = false
}
func (fs *FSWatcher) Close() {
fs.i.Close()
}
@@ -116,6 +122,10 @@ func (fs *FSWatcher) observe(triggerCh chan struct{}, dataCh chan []byte, errCh
for {
n, err := fs.i.Read(buf)
if fs.drain {
continue
}
triggerCh <- struct{}{}
if err != nil {
errCh <- fmt.Errorf("reading inotify buffer: %v", err)

View File

@@ -24,6 +24,7 @@ type Logger interface {
type FSWatcher interface {
Init(rdirs, dirs []string) (chan error, chan struct{})
Run() (chan struct{}, chan string, chan error)
Enable()
}
type PSScanner interface {
@@ -110,7 +111,7 @@ func startFSW(fsw FSWatcher, logger Logger, drainFor time.Duration, sigCh <-chan
// ignore all file system events created on startup
logger.Infof("Draining file system events due to startup...")
ok = drainEventsFor(triggerCh, fsEventCh, drainFor, sigCh)
ok = drainEventsFor(triggerCh, fsEventCh, drainFor, sigCh, fsw)
logger.Infof("done")
return
}
@@ -137,14 +138,13 @@ func logErrors(errCh chan error, logger Logger) {
}
}
func drainEventsFor(triggerCh chan struct{}, eventCh chan string, d time.Duration, sigCh <-chan os.Signal) bool {
func drainEventsFor(triggerCh chan struct{}, eventCh chan string, d time.Duration, sigCh <-chan os.Signal, fsw FSWatcher) bool {
for {
select {
case <-sigCh:
return false
case <-triggerCh:
case <-eventCh:
case <-time.After(d):
fsw.Enable()
return true
}
}

View File

@@ -68,8 +68,6 @@ func TestStartFSW(t *testing.T) {
sigCh := make(chan os.Signal)
go func() {
fsw.runTriggerCh <- struct{}{} // trigger sent while draining
fsw.runEventCh <- "event sent while draining"
fsw.runErrCh <- errors.New("error sent while draining")
<-time.After(drainFor) // ensure draining is over
fsw.runTriggerCh <- struct{}{}
@@ -294,6 +292,10 @@ func (fsw *mockFSWatcher) Run() (chan struct{}, chan string, chan error) {
return fsw.runTriggerCh, fsw.runEventCh, fsw.runErrCh
}
func (fsw *mockFSWatcher) Enable() {
return
}
// PSScanner
type mockPSScanner struct {