add tests for fswatcher package

This commit is contained in:
Dominic Breuker
2018-02-27 09:56:05 +01:00
parent d38cb18712
commit a2dfb51d51
14 changed files with 277 additions and 199 deletions

View File

@@ -74,7 +74,7 @@ func root(cmd *cobra.Command, args []string) {
LogPS: logPS,
LogFS: logFS,
}
iw, err := fswatcher.NewInotifyWatcher()
iw, err := fswatcher.NewFSWatcher()
if err != nil {
logger.Errorf("Can't initialize fswatcher: %v", err)
os.Exit(1)

View File

@@ -2,20 +2,18 @@ package fswatcher
import (
"fmt"
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
)
func parseEvents(i *inotify.Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) {
func parseEvents(i Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) {
for buf := range dataCh {
var ptr uint32
for len(buf[ptr:]) > 0 {
event, size, err := i.ParseNextEvent(buf[ptr:])
ptr += size
if err != nil {
errCh <- fmt.Errorf("parsing events: %v", err)
continue
}
ptr += size
eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name)
}
}

View File

@@ -0,0 +1,105 @@
package fswatcher
import (
"fmt"
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
"github.com/dominicbreuker/pspy/internal/fswatcher/walker"
)
type Inotify interface {
Init() error
Watch(dir string) error
NumWatchers() int
Read(buf []byte) (int, error)
ParseNextEvent(buf []byte) (*inotify.Event, uint32, error)
Close() error
}
type Walker interface {
Walk(dir string, depth int) (chan string, chan error, chan struct{})
}
type FSWatcher struct {
i Inotify
w Walker
maxWatchers int
}
func NewFSWatcher() (*FSWatcher, error) {
return &FSWatcher{
i: inotify.NewInotify(),
w: walker.NewWalker(),
maxWatchers: inotify.MaxWatchers,
}, nil
}
func (iw *FSWatcher) Close() {
iw.i.Close()
}
func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
errCh := make(chan error)
doneCh := make(chan struct{})
go func() {
err := fs.i.Init()
if err != nil {
errCh <- fmt.Errorf("setting up inotify: %v", err)
}
for _, dir := range rdirs {
addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh)
}
for _, dir := range dirs {
addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh)
}
close(doneCh)
}()
return errCh, doneCh
}
func addWatchers(dir string, depth int, i Inotify, maxWatchers int, w Walker, errCh chan error) {
dirCh, walkErrCh, doneCh := w.Walk(dir, depth)
loop:
for {
if maxWatchers > 0 && i.NumWatchers() >= maxWatchers {
close(doneCh)
break loop
}
select {
case err := <-walkErrCh:
errCh <- fmt.Errorf("adding inotift watchers: %v", err)
case dir, ok := <-dirCh:
if !ok {
break loop
}
if err := i.Watch(dir); err != nil {
errCh <- fmt.Errorf("Can't create watcher: %v", err)
}
}
}
}
func (fs *FSWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
err := fs.i.Init()
if err != nil {
return nil, nil, fmt.Errorf("setting up inotify: %v", err)
}
for _, dir := range rdirs {
addWatchers(dir, -1, fs.i, fs.maxWatchers, fs.w, errCh)
}
for _, dir := range dirs {
addWatchers(dir, 0, fs.i, fs.maxWatchers, fs.w, errCh)
}
triggerCh := make(chan struct{})
dataCh := make(chan []byte)
go Observe(fs.i, triggerCh, dataCh, errCh)
eventCh := make(chan string)
go parseEvents(fs.i, dataCh, eventCh, errCh)
return triggerCh, eventCh, nil
}

View File

@@ -0,0 +1,127 @@
package fswatcher
import (
"errors"
"reflect"
"testing"
"time"
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
)
func TestInit(t *testing.T) {
i := NewMockInotify()
w := &MockWalker{
subdirs: map[string][]string{
"mydir1": []string{"dir1", "dir2"},
"mydir2": []string{"dir3"},
"dir1": []string{"another-dir"},
},
}
fs := &FSWatcher{
i: i,
w: w,
maxWatchers: 999,
}
rdirs := []string{"mydir1"}
dirs := []string{"mydir2"}
errCh, doneCh := fs.Init(rdirs, dirs)
loop:
for {
select {
case <-doneCh:
break loop
case err := <-errCh:
t.Errorf("Unexpected error: %v", err)
case <-time.After(1 * time.Second):
t.Fatalf("Test timeout")
}
}
if !reflect.DeepEqual(i.watching, []string{"mydir1", "dir1", "another-dir", "dir2", "mydir2"}) {
t.Fatalf("Watching wrong directories: %+v", i.watching)
}
}
// mocks
// Mock Inotify
type MockInotify struct {
initialized bool
watching []string
}
func NewMockInotify() *MockInotify {
return &MockInotify{
initialized: false,
watching: make([]string, 0),
}
}
func (i *MockInotify) Init() error {
i.initialized = true
return nil
}
func (i *MockInotify) Watch(dir string) error {
if !i.initialized {
return errors.New("Not yet initialized")
}
i.watching = append(i.watching, dir)
return nil
}
func (i *MockInotify) NumWatchers() int {
return len(i.watching)
}
func (i *MockInotify) Read(buf []byte) (int, error) {
return 32, nil
}
func (i *MockInotify) ParseNextEvent(buf []byte) (*inotify.Event, uint32, error) {
return &inotify.Event{Name: "name", Op: "CREATE"}, 32, nil
}
func (i *MockInotify) Close() error {
if !i.initialized {
return errors.New("Not yet initialized")
}
return nil
}
// Mock Walker
type MockWalker struct {
subdirs map[string][]string
}
func (w *MockWalker) Walk(dir string, depth int) (chan string, chan error, chan struct{}) {
dirCh := make(chan string)
errCh := make(chan error)
doneCh := make(chan struct{})
go func() {
defer close(dirCh)
sendDir(w, depth, dir, dirCh)
}()
return dirCh, errCh, doneCh
}
func sendDir(w *MockWalker, depth int, dir string, dirCh chan string) {
dirCh <- dir
if depth == 0 {
return
}
subdirs, ok := w.subdirs[dir]
if !ok {
return
}
for _, sdir := range subdirs {
sendDir(w, depth-1, sdir, dirCh)
}
}

View File

@@ -1,58 +0,0 @@
package fswatcher
import (
"fmt"
"golang.org/x/sys/unix"
)
type Inotify struct {
fd int
watchers map[int]*watcher
}
func NewInotify() (*Inotify, error) {
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
if fd == -1 {
return nil, fmt.Errorf("Can't init inotify: %d", errno)
}
i := &Inotify{
fd: fd,
watchers: make(map[int]*watcher),
}
return i, nil
}
func (i *Inotify) Watch(dir string) error {
w, err := newWatcher(i.fd, dir)
if err != nil {
return fmt.Errorf("creating watcher: %v", err)
}
i.watchers[w.wd] = w
return nil
}
func (i *Inotify) Close() error {
if err := unix.Close(i.fd); err != nil {
return fmt.Errorf("closing inotify file descriptor: %v", err)
}
return nil
}
func (i *Inotify) NumWatchers() int {
return len(i.watchers)
}
func (i *Inotify) String() string {
if len(i.watchers) < 20 {
dirs := make([]string, 0)
for _, w := range i.watchers {
dirs = append(dirs, w.dir)
}
return fmt.Sprintf("Watching: %v", dirs)
} else {
return fmt.Sprintf("Watching %d directories", len(i.watchers))
}
}

View File

@@ -13,6 +13,17 @@ import (
const maximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches"
// MaxWatchers is the maximum number of inotify watches supported by the Kernel
// set to -1 if the number cannot be determined
var MaxWatchers int = -1
func init() {
mw, err := getMaxWatchers()
if err == nil {
MaxWatchers = mw
}
}
type Inotify struct {
FD int
Watchers map[int]*Watcher
@@ -28,17 +39,20 @@ type Event struct {
Op string
}
func NewInotify() (*Inotify, error) {
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
if fd < 0 {
return nil, fmt.Errorf("initializing inotify: errno: %d", errno)
}
i := &Inotify{
FD: fd,
func NewInotify() *Inotify {
return &Inotify{
FD: 0,
Watchers: make(map[int]*Watcher),
}
return i, nil
}
func (i *Inotify) Init() error {
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
if fd < 0 {
return fmt.Errorf("initializing inotify: errno: %d", errno)
}
i.FD = fd
return nil
}
func (i *Inotify) Watch(dir string) error {
@@ -112,7 +126,7 @@ func (i *Inotify) String() string {
}
}
func GetMaxWatchers() (int, error) {
func getMaxWatchers() (int, error) {
b, err := ioutil.ReadFile(maximumWatchersFile)
if err != nil {
return 0, fmt.Errorf("reading from %s: %v", maximumWatchersFile, err)

View File

@@ -9,7 +9,9 @@ import (
)
func TestInotify(t *testing.T) {
i, err := NewInotify()
i := NewInotify()
err := i.Init()
expectNoError(t, err)
err = i.Watch("testdata/folder")

View File

@@ -1,11 +1,10 @@
package fswatcher
import (
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
"golang.org/x/sys/unix"
)
func Observe(i *inotify.Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
func Observe(i Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
buf := make([]byte, 5*unix.SizeofInotifyEvent)
for {

View File

@@ -1,72 +0,0 @@
package fswatcher
import (
"fmt"
"github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
"github.com/dominicbreuker/pspy/internal/fswatcher/walker"
)
type InotifyWatcher struct {
i *inotify.Inotify
}
func (iw *InotifyWatcher) Close() {
iw.i.Close()
}
func NewInotifyWatcher() (*InotifyWatcher, error) {
i, err := inotify.NewInotify()
if err != nil {
return nil, fmt.Errorf("setting up inotify: %v", err)
}
return &InotifyWatcher{
i: i,
}, nil
}
func (iw *InotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
maxWatchers, err := inotify.GetMaxWatchers()
if err != nil {
errCh <- fmt.Errorf("Can't get inotify watcher limit...: %v\n", err)
maxWatchers = -1
}
for _, dir := range rdirs {
addWatchers(dir, -1, iw.i, maxWatchers, errCh)
}
for _, dir := range dirs {
addWatchers(dir, 0, iw.i, maxWatchers, errCh)
}
triggerCh := make(chan struct{})
dataCh := make(chan []byte)
go Observe(iw.i, triggerCh, dataCh, errCh)
eventCh := make(chan string)
go parseEvents(iw.i, dataCh, eventCh, errCh)
return triggerCh, eventCh, nil
}
func addWatchers(dir string, depth int, i *inotify.Inotify, maxWatchers int, errCh chan error) {
dirCh, walkErrCh, doneCh := walker.Walk(dir, depth)
loop:
for {
if maxWatchers > 0 && i.NumWatchers() >= maxWatchers {
close(doneCh)
break loop
}
select {
case err := <-walkErrCh:
errCh <- fmt.Errorf("adding inotift watchers: %v", err)
case dir, ok := <-dirCh:
if !ok {
break loop
}
if err := i.Watch(dir); err != nil {
errCh <- fmt.Errorf("Can't create watcher: %v", err)
}
}
}
}

View File

@@ -7,9 +7,15 @@ import (
"path/filepath"
)
type Walker struct{}
func NewWalker() *Walker {
return &Walker{}
}
const maxInt = int(^uint(0) >> 1)
func Walk(root string, depth int) (dirCh chan string, errCh chan error, doneCh chan struct{}) {
func (w *Walker) Walk(root string, depth int) (dirCh chan string, errCh chan error, doneCh chan struct{}) {
if depth < 0 {
depth = maxInt
}

View File

@@ -39,7 +39,8 @@ func TestWalk(t *testing.T) {
}
for i, tt := range tests {
dirCh, errCh, doneCh := Walk(tt.root, tt.depth)
w := &Walker{}
dirCh, errCh, doneCh := w.Walk(tt.root, tt.depth)
dirs, errs := getAllDirsAndErrors(dirCh, errCh)
if !reflect.DeepEqual(dirs, tt.result) {

View File

@@ -1,44 +0,0 @@
package fswatcher
import (
"fmt"
"io/ioutil"
"strconv"
"strings"
"golang.org/x/sys/unix"
)
const events = unix.IN_ALL_EVENTS
const MaximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches"
type watcher struct {
wd int
dir string
}
func newWatcher(fd int, dir string) (*watcher, error) {
wd, errno := unix.InotifyAddWatch(fd, dir, events)
if wd == -1 {
return nil, fmt.Errorf("adding watcher on %s: %d", dir, errno)
}
return &watcher{
wd: wd,
dir: dir,
}, nil
}
func getLimit() (int, error) {
b, err := ioutil.ReadFile(MaximumWatchersFile)
if err != nil {
return 0, fmt.Errorf("reading from %s: %v", MaximumWatchersFile, err)
}
s := strings.TrimSpace(string(b))
m, err := strconv.Atoi(s)
if err != nil {
return 0, fmt.Errorf("converting to integer: %v", err)
}
return m, nil
}

View File

@@ -14,22 +14,22 @@ type Logger interface {
Eventf(format string, v ...interface{})
}
type InotifyWatcher interface {
Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error)
type FSWatcher interface {
Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error)
}
type ProcfsScanner interface {
Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error)
}
func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) {
func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) {
logger.Infof("Config: %+v\n", cfg)
// log all errors
errCh := make(chan error, 10)
go logErrors(errCh, logger)
triggerCh, fsEventCh, err := inotify.Setup(cfg.RDirs, cfg.Dirs, errCh)
triggerCh, fsEventCh, err := inotify.Start(cfg.RDirs, cfg.Dirs, errCh)
if err != nil {
logger.Errorf("Can't set up inotify watchers: %v\n", err)
return nil, errors.New("inotify error")

View File

@@ -104,7 +104,7 @@ func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher {
}
}
func (i *mockInotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
func (i *mockInotifyWatcher) Start(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
if i.setupErr != nil {
return nil, nil, i.setupErr
}