restructure inotify package and add some tests

This commit is contained in:
Dominic Breuker
2018-02-25 14:21:16 +01:00
parent f5ca2dad75
commit d1c18d901a
16 changed files with 315 additions and 166 deletions

View File

@@ -3,9 +3,7 @@ package inotify
import (
"bytes"
"fmt"
"log"
"strconv"
"time"
"unsafe"
"golang.org/x/sys/unix"
@@ -37,38 +35,11 @@ var InotifyEvents = map[uint32]string{
(unix.IN_OPEN | unix.IN_ISDIR): "OPEN DIR",
}
type Event struct {
name string
op string
}
func (e Event) String() string {
return fmt.Sprintf("%20s | %s", e.op, e.name)
}
func newEvent(name string, mask uint32) Event {
e := Event{name: name}
op, ok := InotifyEvents[mask]
if !ok {
op = strconv.FormatInt(int64(mask), 2)
}
e.op = op
return e
}
func eventLogger(i *Inotify, buffers chan bufRead, print bool) {
// enable printing only after delay since setting up watchers causes flood of events
printEnabled := false
go func() {
<-time.After(1 * time.Second)
printEnabled = print
}()
for bf := range buffers {
n := bf.n
buf := bf.buf
func parseEvents(i *Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) {
for buf := range dataCh {
n := len(buf)
if n < unix.SizeofInotifyEvent {
// incomplete or erroneous read
errCh <- fmt.Errorf("Inotify event parser: incomplete read: n=%d", n)
continue
}
@@ -80,6 +51,7 @@ func eventLogger(i *Inotify, buffers chan bufRead, print bool) {
watcher, ok := i.watchers[int(sys.Wd)]
if !ok {
errCh <- fmt.Errorf("Inotify event parser: unknown watcher ID: %d", sys.Wd)
continue
}
name = watcher.dir + "/"
@@ -88,10 +60,15 @@ func eventLogger(i *Inotify, buffers chan bufRead, print bool) {
ptr += sys.Len
}
ev := newEvent(name, sys.Mask)
if printEnabled {
log.Printf("\x1b[32;1mFS: %+v\x1b[0m", ev)
}
eventCh <- formatEvent(name, sys.Mask)
}
}
}
func formatEvent(name string, mask uint32) string {
op, ok := InotifyEvents[mask]
if !ok {
op = strconv.FormatInt(int64(mask), 2)
}
return fmt.Sprintf("%20s | %s", op, name)
}

View File

@@ -6,26 +6,12 @@ import (
"golang.org/x/sys/unix"
)
type InotifyWatcher struct{}
func NewInotifyWatcher() *InotifyWatcher {
return &InotifyWatcher{}
}
func (i *InotifyWatcher) Setup(rdirs, dirs []string) (chan struct{}, chan string, error) {
triggerCh := make(chan struct{})
fsEventCh := make(chan string)
return triggerCh, fsEventCh, nil
}
type Inotify struct {
fd int
watchers map[int]*watcher
ping chan struct{}
paused bool
}
func NewInotify(ping chan struct{}, print bool) (*Inotify, error) {
func NewInotify() (*Inotify, error) {
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
if fd == -1 {
return nil, fmt.Errorf("Can't init inotify: %d", errno)
@@ -34,16 +20,13 @@ func NewInotify(ping chan struct{}, print bool) (*Inotify, error) {
i := &Inotify{
fd: fd,
watchers: make(map[int]*watcher),
ping: ping,
paused: false,
}
go watch(i, print)
return i, nil
}
func (i *Inotify) Watch(dir string) error {
w, err := newWatcher(i.fd, dir, i.ping)
w, err := newWatcher(i.fd, dir)
if err != nil {
return fmt.Errorf("creating watcher: %v", err)
}
@@ -58,14 +41,6 @@ func (i *Inotify) Close() error {
return nil
}
func (i *Inotify) Pause() {
i.paused = true
}
func (i *Inotify) UnPause() {
i.paused = false
}
func (i *Inotify) NumWatchers() int {
return len(i.watchers)
}
@@ -81,24 +56,3 @@ func (i *Inotify) String() string {
return fmt.Sprintf("Watching %d directories", len(i.watchers))
}
}
type bufRead struct {
n int
buf []byte
}
func watch(i *Inotify, print bool) {
buf := make([]byte, 5*unix.SizeofInotifyEvent)
buffers := make(chan bufRead)
go eventLogger(i, buffers, print)
for {
n, _ := unix.Read(i.fd, buf)
if !i.paused {
i.ping <- struct{}{}
}
buffers <- bufRead{
n: n,
buf: buf,
}
}
}

View File

@@ -0,0 +1,23 @@
package inotify
import (
"fmt"
"golang.org/x/sys/unix"
)
func Observe(i *Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
buf := make([]byte, 5*unix.SizeofInotifyEvent)
for {
n, errno := unix.Read(i.fd, buf)
if n == -1 {
errCh <- fmt.Errorf("reading from inotify fd: errno: %d", errno)
return
}
triggerCh <- struct{}{}
bufCopy := make([]byte, n)
copy(bufCopy, buf)
dataCh <- bufCopy
}
}

69
internal/inotify/setup.go Normal file
View File

@@ -0,0 +1,69 @@
package inotify
import (
"fmt"
"github.com/dominicbreuker/pspy/internal/inotify/walker"
)
type InotifyWatcher struct {
i *Inotify
}
func (iw *InotifyWatcher) Close() {
iw.i.Close()
}
func NewInotifyWatcher() (*InotifyWatcher, error) {
i, err := NewInotify()
if err != nil {
return nil, fmt.Errorf("setting up inotify: %v", err)
}
return &InotifyWatcher{
i: i,
}, nil
}
func (iw *InotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
maxWatchers, err := getLimit()
if err != nil {
errCh <- fmt.Errorf("Can't get inotify watcher limit...: %v\n", err)
maxWatchers = -1
}
for _, dir := range rdirs {
addWatchers(dir, -1, iw.i, maxWatchers, errCh)
}
for _, dir := range dirs {
addWatchers(dir, 0, iw.i, maxWatchers, errCh)
}
triggerCh := make(chan struct{})
dataCh := make(chan []byte)
go Observe(iw.i, triggerCh, dataCh, errCh)
eventCh := make(chan string)
go parseEvents(iw.i, dataCh, eventCh, errCh)
return triggerCh, eventCh, nil
}
func addWatchers(dir string, depth int, i *Inotify, maxWatchers int, errCh chan error) {
dirCh, doneCh := walker.Walk(dir, depth, errCh)
loop:
for {
if maxWatchers > 0 && i.NumWatchers() >= maxWatchers {
close(doneCh)
break loop
}
select {
case dir, ok := <-dirCh:
if !ok {
break loop
}
if err := i.Watch(dir); err != nil {
errCh <- fmt.Errorf("Can't create watcher: %v", err)
}
}
}
}

View File

View File

View File

View File

@@ -0,0 +1,52 @@
package walker
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
)
const maxInt = int(^uint(0) >> 1)
func Walk(root string, depth int, errCh chan error) (dirCh chan string, doneCh chan struct{}) {
if depth < 0 {
depth = maxInt
}
dirCh = make(chan string)
doneCh = make(chan struct{})
go func() {
descent(root, depth-1, dirCh, errCh, doneCh)
close(dirCh)
}()
return dirCh, doneCh
}
func descent(dir string, depth int, dirCh chan string, errCh chan error, doneCh chan struct{}) {
_, err := os.Stat(dir)
if err != nil {
errCh <- fmt.Errorf("Can't walk directory %s: %v", dir, err)
return
}
select {
case dirCh <- dir:
case <-doneCh:
return
}
if depth < 0 {
return
}
ls, err := ioutil.ReadDir(dir)
if err != nil {
errCh <- fmt.Errorf("opening dir %s: %v", dir, err)
}
for _, e := range ls {
if e.IsDir() {
newDir := filepath.Join(dir, e.Name())
descent(newDir, depth-1, dirCh, errCh, doneCh)
}
}
}

View File

@@ -0,0 +1,84 @@
package walker
import (
"reflect"
"strings"
"testing"
)
func TestWalk(t *testing.T) {
tests := []struct {
root string
depth int
errCh chan error
result []string
errs []string
}{
{root: "testdata", depth: 999, errCh: newErrCh(), result: []string{
"testdata",
"testdata/subdir",
"testdata/subdir/subsubdir",
}, errs: make([]string, 0)},
{root: "testdata", depth: -1, errCh: newErrCh(), result: []string{
"testdata",
"testdata/subdir",
"testdata/subdir/subsubdir",
}, errs: []string{}},
{root: "testdata", depth: 1, errCh: newErrCh(), result: []string{
"testdata",
"testdata/subdir",
}, errs: []string{}},
{root: "testdata", depth: 0, errCh: newErrCh(), result: []string{
"testdata",
}, errs: []string{}},
{root: "testdata/subdir", depth: 1, errCh: newErrCh(), result: []string{
"testdata/subdir",
"testdata/subdir/subsubdir",
}, errs: []string{}},
{root: "testdata/non-existing-dir", depth: 1, errCh: newErrCh(), result: []string{}, errs: []string{"Can't walk directory testdata/non-existing-dir"}},
}
for i, tt := range tests {
dirCh, doneCh := Walk(tt.root, tt.depth, tt.errCh)
dirs, errs := getAllDirsAndErrors(dirCh, tt.errCh)
if !reflect.DeepEqual(dirs, tt.result) {
t.Fatalf("[%d] Wrong number of dirs found: %+v", i, dirs)
}
if !reflect.DeepEqual(errs, tt.errs) {
t.Fatalf("[%d] Wrong number of errs found: %+v vs %+v", i, errs, tt.errs)
}
close(doneCh)
}
}
func getAllDirsAndErrors(dirCh chan string, errCh chan error) ([]string, []string) {
dirs := make([]string, 0)
errs := make([]string, 0)
doneDirsCh := make(chan struct{})
go func() {
for d := range dirCh {
dirs = append(dirs, d)
}
close(errCh)
close(doneDirsCh)
}()
doneErrsCh := make(chan struct{})
go func() {
for err := range errCh {
tokens := strings.SplitN(err.Error(), ":", 2)
errs = append(errs, tokens[0])
}
close(doneErrsCh)
}()
<-doneDirsCh
<-doneErrsCh
return dirs, errs
}
func newErrCh() chan error {
return make(chan error)
}

View File

@@ -17,7 +17,7 @@ type watcher struct {
dir string
}
func newWatcher(fd int, dir string, ping chan struct{}) (*watcher, error) {
func newWatcher(fd int, dir string) (*watcher, error) {
wd, errno := unix.InotifyAddWatch(fd, dir, events)
if wd == -1 {
return nil, fmt.Errorf("adding watcher on %s: %d", dir, errno)
@@ -28,7 +28,7 @@ func newWatcher(fd int, dir string, ping chan struct{}) (*watcher, error) {
}, nil
}
func WatcherLimit() (int, error) {
func getLimit() (int, error) {
b, err := ioutil.ReadFile(MaximumWatchersFile)
if err != nil {
return 0, fmt.Errorf("reading from %s: %v", MaximumWatchersFile, err)