mirror of
https://github.com/DominicBreuker/pspy.git
synced 2025-12-21 03:34:50 +00:00
add tests for fswatcher
This commit is contained in:
@@ -24,6 +24,7 @@ type FSWatcher struct {
|
||||
i Inotify
|
||||
w Walker
|
||||
maxWatchers int
|
||||
eventSize int
|
||||
}
|
||||
|
||||
func NewFSWatcher() (*FSWatcher, error) {
|
||||
@@ -31,11 +32,12 @@ func NewFSWatcher() (*FSWatcher, error) {
|
||||
i: inotify.NewInotify(),
|
||||
w: walker.NewWalker(),
|
||||
maxWatchers: inotify.MaxWatchers,
|
||||
eventSize: inotify.EventSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (iw *FSWatcher) Close() {
|
||||
iw.i.Close()
|
||||
func (fs *FSWatcher) Close() {
|
||||
fs.i.Close()
|
||||
}
|
||||
|
||||
func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
|
||||
@@ -48,10 +50,10 @@ func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
|
||||
errCh <- fmt.Errorf("setting up inotify: %v", err)
|
||||
}
|
||||
for _, dir := range rdirs {
|
||||
addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh)
|
||||
fs.addWatchers(dir, -1, errCh)
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh)
|
||||
fs.addWatchers(dir, 0, errCh)
|
||||
}
|
||||
close(doneCh)
|
||||
}()
|
||||
@@ -59,47 +61,65 @@ func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
|
||||
return errCh, doneCh
|
||||
}
|
||||
|
||||
func addWatchers(dir string, depth int, i Inotify, maxWatchers int, w Walker, errCh chan error) {
|
||||
dirCh, walkErrCh, doneCh := w.Walk(dir, depth)
|
||||
loop:
|
||||
func (fs *FSWatcher) addWatchers(dir string, depth int, errCh chan error) {
|
||||
dirCh, walkErrCh, doneCh := fs.w.Walk(dir, depth)
|
||||
|
||||
for {
|
||||
if maxWatchers > 0 && i.NumWatchers() >= maxWatchers {
|
||||
if fs.maxWatchers > 0 && fs.i.NumWatchers() >= fs.maxWatchers {
|
||||
close(doneCh)
|
||||
break loop
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-walkErrCh:
|
||||
errCh <- fmt.Errorf("adding inotift watchers: %v", err)
|
||||
case dir, ok := <-dirCh:
|
||||
if !ok {
|
||||
break loop
|
||||
return
|
||||
}
|
||||
if err := i.Watch(dir); err != nil {
|
||||
if err := fs.i.Watch(dir); err != nil {
|
||||
errCh <- fmt.Errorf("Can't create watcher: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *FSWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
|
||||
err := fs.i.Init()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("setting up inotify: %v", err)
|
||||
}
|
||||
func (fs *FSWatcher) Run() (chan struct{}, chan string, chan error) {
|
||||
triggerCh, dataCh, eventCh, errCh := make(chan struct{}), make(chan []byte), make(chan string), make(chan error)
|
||||
|
||||
for _, dir := range rdirs {
|
||||
addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh)
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh)
|
||||
}
|
||||
go fs.observe(triggerCh, dataCh, errCh)
|
||||
go fs.parseEvents(dataCh, eventCh, errCh)
|
||||
|
||||
triggerCh := make(chan struct{})
|
||||
dataCh := make(chan []byte)
|
||||
go Observe(fs.i, triggerCh, dataCh, errCh)
|
||||
|
||||
eventCh := make(chan string)
|
||||
go parseEvents(fs.i, dataCh, eventCh, errCh)
|
||||
|
||||
return triggerCh, eventCh, nil
|
||||
return triggerCh, eventCh, errCh
|
||||
}
|
||||
|
||||
func (fs *FSWatcher) observe(triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
|
||||
buf := make([]byte, 5*fs.eventSize)
|
||||
|
||||
for {
|
||||
n, err := fs.i.Read(buf)
|
||||
triggerCh <- struct{}{}
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("reading inotify buffer: %v", err)
|
||||
continue
|
||||
}
|
||||
bufCopy := make([]byte, n)
|
||||
copy(bufCopy, buf)
|
||||
dataCh <- bufCopy
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *FSWatcher) parseEvents(dataCh chan []byte, eventCh chan string, errCh chan error) {
|
||||
for buf := range dataCh {
|
||||
var ptr uint32
|
||||
for len(buf[ptr:]) > 0 {
|
||||
event, size, err := fs.i.ParseNextEvent(buf[ptr:])
|
||||
ptr += size
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("parsing events: %v", err)
|
||||
continue
|
||||
}
|
||||
eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,14 +2,16 @@ package fswatcher
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
|
||||
)
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
func initObjs() (*MockInotify, *MockWalker, *FSWatcher) {
|
||||
i := NewMockInotify()
|
||||
w := &MockWalker{
|
||||
subdirs: map[string][]string{
|
||||
@@ -22,7 +24,13 @@ func TestInit(t *testing.T) {
|
||||
i: i,
|
||||
w: w,
|
||||
maxWatchers: 999,
|
||||
eventSize: 11,
|
||||
}
|
||||
return i, w, fs
|
||||
}
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
i, _, fs := initObjs()
|
||||
rdirs := []string{"mydir1"}
|
||||
dirs := []string{"mydir2"}
|
||||
|
||||
@@ -45,6 +53,76 @@ loop:
|
||||
}
|
||||
}
|
||||
|
||||
func TestRun(t *testing.T) {
|
||||
i, _, fs := initObjs()
|
||||
triggerCh, eventCh, errCh := fs.Run()
|
||||
|
||||
// send data (len=11)
|
||||
go func() {
|
||||
sendInotifyData(t, i.bufReads, "name:type__") // single event
|
||||
sendInotifyData(t, i.bufReads, "error:read_") // read error
|
||||
sendInotifyData(t, i.bufReads, "error:parse") // parse error
|
||||
sendInotifyData(t, i.bufReads, "name1:type1name2:type2") // 2 events
|
||||
}()
|
||||
|
||||
// parse first datum
|
||||
expectTrigger(t, triggerCh)
|
||||
expectEvent(t, eventCh, "type__ | name")
|
||||
|
||||
// parse second datum
|
||||
expectTrigger(t, triggerCh)
|
||||
expectError(t, errCh, "reading inotify buffer: error-inotify-read")
|
||||
|
||||
// parse third datum
|
||||
expectTrigger(t, triggerCh)
|
||||
expectError(t, errCh, "parsing events: parse-event-error")
|
||||
|
||||
// parse fourth datum
|
||||
expectTrigger(t, triggerCh)
|
||||
expectEvent(t, eventCh, "type1 | name1")
|
||||
expectEvent(t, eventCh, "type2 | name2")
|
||||
}
|
||||
|
||||
const timeout = 500 * time.Millisecond
|
||||
|
||||
func sendInotifyData(t *testing.T, dataCh chan []byte, s string) {
|
||||
select {
|
||||
case dataCh <- []byte(s):
|
||||
case <-time.After(timeout):
|
||||
t.Fatalf("Could not send data in time: %s", s)
|
||||
}
|
||||
}
|
||||
|
||||
func expectTrigger(t *testing.T, triggerCh chan struct{}) {
|
||||
select {
|
||||
case <-triggerCh:
|
||||
case <-time.After(timeout):
|
||||
t.Fatalf("Timeout: did not receive trigger in time")
|
||||
}
|
||||
}
|
||||
|
||||
func expectEvent(t *testing.T, eventCh chan string, exp string) {
|
||||
select {
|
||||
case e := <-eventCh:
|
||||
if strings.TrimSpace(e) != exp {
|
||||
t.Errorf("Wrong event: %+v", e)
|
||||
}
|
||||
case <-time.After(timeout):
|
||||
t.Fatalf("Timeout: did not receive event in time")
|
||||
}
|
||||
}
|
||||
|
||||
func expectError(t *testing.T, errCh chan error, exp string) {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err.Error() != exp {
|
||||
t.Errorf("Wrong error: %v", err)
|
||||
}
|
||||
case <-time.After(timeout):
|
||||
t.Fatalf("Timeout: did not receive error in time")
|
||||
}
|
||||
}
|
||||
|
||||
// mocks
|
||||
|
||||
// Mock Inotify
|
||||
@@ -52,12 +130,14 @@ loop:
|
||||
type MockInotify struct {
|
||||
initialized bool
|
||||
watching []string
|
||||
bufReads chan []byte
|
||||
}
|
||||
|
||||
func NewMockInotify() *MockInotify {
|
||||
return &MockInotify{
|
||||
initialized: false,
|
||||
watching: make([]string, 0),
|
||||
bufReads: make(chan []byte),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,11 +159,22 @@ func (i *MockInotify) NumWatchers() int {
|
||||
}
|
||||
|
||||
func (i *MockInotify) Read(buf []byte) (int, error) {
|
||||
return 32, nil
|
||||
b := <-i.bufReads
|
||||
t := strings.Split(string(b), ":")
|
||||
if t[0] == "error" && t[1] == "read_" {
|
||||
return -1, fmt.Errorf("error-inotify-read")
|
||||
}
|
||||
copy(buf, b)
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (i *MockInotify) ParseNextEvent(buf []byte) (*inotify.Event, uint32, error) {
|
||||
return &inotify.Event{Name: "name", Op: "CREATE"}, 32, nil
|
||||
s := string(buf[:11])
|
||||
t := strings.Split(s, ":")
|
||||
if t[0] == "error" && t[1] == "parse" {
|
||||
return nil, uint32(len(buf)), fmt.Errorf("parse-event-error")
|
||||
}
|
||||
return &inotify.Event{Name: t[0], Op: t[1]}, 11, nil
|
||||
}
|
||||
|
||||
func (i *MockInotify) Close() error {
|
||||
|
||||
@@ -17,6 +17,8 @@ const maximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches"
|
||||
// set to -1 if the number cannot be determined
|
||||
var MaxWatchers int = -1
|
||||
|
||||
const EventSize int = unix.SizeofInotifyEvent
|
||||
|
||||
func init() {
|
||||
mw, err := getMaxWatchers()
|
||||
if err == nil {
|
||||
@@ -83,6 +85,13 @@ func (i *Inotify) ParseNextEvent(buf []byte) (*Event, uint32, error) {
|
||||
sys := (*unix.InotifyEvent)(unsafe.Pointer(&buf[0]))
|
||||
offset := unix.SizeofInotifyEvent + sys.Len
|
||||
|
||||
if sys.Wd == -1 {
|
||||
// watch descriptors should never be negative, yet there appears to be an unfixed bug causing them to be:
|
||||
// https://rachelbythebay.com/w/2014/11/24/touch/
|
||||
// https://code.launchpad.net/~jamesodhunt/libnih/libnih-inotify-overflow-fix-for-777093/+merge/65372
|
||||
return nil, offset, fmt.Errorf("possible inotify event overflow")
|
||||
}
|
||||
|
||||
watcher, ok := i.Watchers[int(sys.Wd)]
|
||||
if !ok {
|
||||
return nil, offset, fmt.Errorf("unknown watcher ID: %d", sys.Wd)
|
||||
|
||||
Reference in New Issue
Block a user