restructure inotify package and add some tests

This commit is contained in:
Dominic Breuker
2018-02-25 14:21:16 +01:00
parent d59ec7f1a8
commit dd123848f2
16 changed files with 315 additions and 166 deletions

View File

@@ -66,20 +66,23 @@ func init() {
} }
func root(cmd *cobra.Command, args []string) { func root(cmd *cobra.Command, args []string) {
fmt.Printf("Watching recursively : %+v (%d)\n", rDirs, len(rDirs)) logger := logging.NewLogger()
fmt.Printf("Watching non-recursively: %+v (%d)\n", dirs, len(dirs))
fmt.Printf("Printing: processes=%t file-system events=%t\n", logPS, logFS)
cfg := config.Config{ cfg := config.Config{
RDirs: rDirs, RDirs: rDirs,
Dirs: dirs, Dirs: dirs,
LogPS: logPS, LogPS: logPS,
LogFS: logFS, LogFS: logFS,
} }
logger := logging.NewLogger() iw, err := inotify.NewInotifyWatcher()
iw := inotify.NewInotifyWatcher() if err != nil {
logger.Errorf("Can't initialize inotify: %v", err)
os.Exit(1)
}
defer iw.Close()
pscan := process.NewProcfsScanner() pscan := process.NewProcfsScanner()
sigCh := make(chan os.Signal, 1) sigCh := make(chan os.Signal)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
exit, err := pspy.Start(cfg, logger, iw, pscan, sigCh) exit, err := pspy.Start(cfg, logger, iw, pscan, sigCh)

View File

@@ -10,5 +10,5 @@ type Config struct {
} }
func (c Config) String() string { func (c Config) String() string {
return fmt.Sprintf("Printing events: processes: %t | file system events: %t | Watching directories: %+v (recursive) | %+v (non-recursive)", c.LogPS, c.LogFS, c.RDirs, c.Dirs) return fmt.Sprintf("Printing events: processes=%t | file-system-events=%t ||| Watching directories: %+v (recursive) | %+v (non-recursive)", c.LogPS, c.LogFS, c.RDirs, c.Dirs)
} }

View File

@@ -3,9 +3,7 @@ package inotify
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"log"
"strconv" "strconv"
"time"
"unsafe" "unsafe"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
@@ -37,38 +35,11 @@ var InotifyEvents = map[uint32]string{
(unix.IN_OPEN | unix.IN_ISDIR): "OPEN DIR", (unix.IN_OPEN | unix.IN_ISDIR): "OPEN DIR",
} }
type Event struct { func parseEvents(i *Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) {
name string for buf := range dataCh {
op string n := len(buf)
}
func (e Event) String() string {
return fmt.Sprintf("%20s | %s", e.op, e.name)
}
func newEvent(name string, mask uint32) Event {
e := Event{name: name}
op, ok := InotifyEvents[mask]
if !ok {
op = strconv.FormatInt(int64(mask), 2)
}
e.op = op
return e
}
func eventLogger(i *Inotify, buffers chan bufRead, print bool) {
// enable printing only after delay since setting up watchers causes flood of events
printEnabled := false
go func() {
<-time.After(1 * time.Second)
printEnabled = print
}()
for bf := range buffers {
n := bf.n
buf := bf.buf
if n < unix.SizeofInotifyEvent { if n < unix.SizeofInotifyEvent {
// incomplete or erroneous read errCh <- fmt.Errorf("Inotify event parser: incomplete read: n=%d", n)
continue continue
} }
@@ -80,6 +51,7 @@ func eventLogger(i *Inotify, buffers chan bufRead, print bool) {
watcher, ok := i.watchers[int(sys.Wd)] watcher, ok := i.watchers[int(sys.Wd)]
if !ok { if !ok {
errCh <- fmt.Errorf("Inotify event parser: unknown watcher ID: %d", sys.Wd)
continue continue
} }
name = watcher.dir + "/" name = watcher.dir + "/"
@@ -88,10 +60,15 @@ func eventLogger(i *Inotify, buffers chan bufRead, print bool) {
ptr += sys.Len ptr += sys.Len
} }
ev := newEvent(name, sys.Mask) eventCh <- formatEvent(name, sys.Mask)
if printEnabled {
log.Printf("\x1b[32;1mFS: %+v\x1b[0m", ev)
}
} }
} }
} }
func formatEvent(name string, mask uint32) string {
op, ok := InotifyEvents[mask]
if !ok {
op = strconv.FormatInt(int64(mask), 2)
}
return fmt.Sprintf("%20s | %s", op, name)
}

View File

@@ -6,26 +6,12 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
type InotifyWatcher struct{}
func NewInotifyWatcher() *InotifyWatcher {
return &InotifyWatcher{}
}
func (i *InotifyWatcher) Setup(rdirs, dirs []string) (chan struct{}, chan string, error) {
triggerCh := make(chan struct{})
fsEventCh := make(chan string)
return triggerCh, fsEventCh, nil
}
type Inotify struct { type Inotify struct {
fd int fd int
watchers map[int]*watcher watchers map[int]*watcher
ping chan struct{}
paused bool
} }
func NewInotify(ping chan struct{}, print bool) (*Inotify, error) { func NewInotify() (*Inotify, error) {
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC) fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
if fd == -1 { if fd == -1 {
return nil, fmt.Errorf("Can't init inotify: %d", errno) return nil, fmt.Errorf("Can't init inotify: %d", errno)
@@ -34,16 +20,13 @@ func NewInotify(ping chan struct{}, print bool) (*Inotify, error) {
i := &Inotify{ i := &Inotify{
fd: fd, fd: fd,
watchers: make(map[int]*watcher), watchers: make(map[int]*watcher),
ping: ping,
paused: false,
} }
go watch(i, print)
return i, nil return i, nil
} }
func (i *Inotify) Watch(dir string) error { func (i *Inotify) Watch(dir string) error {
w, err := newWatcher(i.fd, dir, i.ping) w, err := newWatcher(i.fd, dir)
if err != nil { if err != nil {
return fmt.Errorf("creating watcher: %v", err) return fmt.Errorf("creating watcher: %v", err)
} }
@@ -58,14 +41,6 @@ func (i *Inotify) Close() error {
return nil return nil
} }
func (i *Inotify) Pause() {
i.paused = true
}
func (i *Inotify) UnPause() {
i.paused = false
}
func (i *Inotify) NumWatchers() int { func (i *Inotify) NumWatchers() int {
return len(i.watchers) return len(i.watchers)
} }
@@ -81,24 +56,3 @@ func (i *Inotify) String() string {
return fmt.Sprintf("Watching %d directories", len(i.watchers)) return fmt.Sprintf("Watching %d directories", len(i.watchers))
} }
} }
type bufRead struct {
n int
buf []byte
}
func watch(i *Inotify, print bool) {
buf := make([]byte, 5*unix.SizeofInotifyEvent)
buffers := make(chan bufRead)
go eventLogger(i, buffers, print)
for {
n, _ := unix.Read(i.fd, buf)
if !i.paused {
i.ping <- struct{}{}
}
buffers <- bufRead{
n: n,
buf: buf,
}
}
}

View File

@@ -0,0 +1,23 @@
package inotify
import (
"fmt"
"golang.org/x/sys/unix"
)
func Observe(i *Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
buf := make([]byte, 5*unix.SizeofInotifyEvent)
for {
n, errno := unix.Read(i.fd, buf)
if n == -1 {
errCh <- fmt.Errorf("reading from inotify fd: errno: %d", errno)
return
}
triggerCh <- struct{}{}
bufCopy := make([]byte, n)
copy(bufCopy, buf)
dataCh <- bufCopy
}
}

69
internal/inotify/setup.go Normal file
View File

@@ -0,0 +1,69 @@
package inotify
import (
"fmt"
"github.com/dominicbreuker/pspy/internal/inotify/walker"
)
type InotifyWatcher struct {
i *Inotify
}
func (iw *InotifyWatcher) Close() {
iw.i.Close()
}
func NewInotifyWatcher() (*InotifyWatcher, error) {
i, err := NewInotify()
if err != nil {
return nil, fmt.Errorf("setting up inotify: %v", err)
}
return &InotifyWatcher{
i: i,
}, nil
}
func (iw *InotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
maxWatchers, err := getLimit()
if err != nil {
errCh <- fmt.Errorf("Can't get inotify watcher limit...: %v\n", err)
maxWatchers = -1
}
for _, dir := range rdirs {
addWatchers(dir, -1, iw.i, maxWatchers, errCh)
}
for _, dir := range dirs {
addWatchers(dir, 0, iw.i, maxWatchers, errCh)
}
triggerCh := make(chan struct{})
dataCh := make(chan []byte)
go Observe(iw.i, triggerCh, dataCh, errCh)
eventCh := make(chan string)
go parseEvents(iw.i, dataCh, eventCh, errCh)
return triggerCh, eventCh, nil
}
func addWatchers(dir string, depth int, i *Inotify, maxWatchers int, errCh chan error) {
dirCh, doneCh := walker.Walk(dir, depth, errCh)
loop:
for {
if maxWatchers > 0 && i.NumWatchers() >= maxWatchers {
close(doneCh)
break loop
}
select {
case dir, ok := <-dirCh:
if !ok {
break loop
}
if err := i.Watch(dir); err != nil {
errCh <- fmt.Errorf("Can't create watcher: %v", err)
}
}
}
}

View File

View File

View File

View File

@@ -3,22 +3,32 @@ package walker
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os"
"path/filepath" "path/filepath"
) )
func Walk(root string, depth int) (dirCh chan string, errCh chan error, doneCh chan struct{}) { const maxInt = int(^uint(0) >> 1)
func Walk(root string, depth int, errCh chan error) (dirCh chan string, doneCh chan struct{}) {
if depth < 0 {
depth = maxInt
}
dirCh = make(chan string) dirCh = make(chan string)
errCh = make(chan error)
doneCh = make(chan struct{}) doneCh = make(chan struct{})
go func() { go func() {
descent(root, depth-1, dirCh, errCh, doneCh) descent(root, depth-1, dirCh, errCh, doneCh)
close(dirCh) close(dirCh)
}() }()
return dirCh, errCh, doneCh return dirCh, doneCh
} }
func descent(dir string, depth int, dirCh chan string, errCh chan error, doneCh chan struct{}) { func descent(dir string, depth int, dirCh chan string, errCh chan error, doneCh chan struct{}) {
_, err := os.Stat(dir)
if err != nil {
errCh <- fmt.Errorf("Can't walk directory %s: %v", dir, err)
return
}
select { select {
case dirCh <- dir: case dirCh <- dir:
case <-doneCh: case <-doneCh:

View File

@@ -0,0 +1,84 @@
package walker
import (
"reflect"
"strings"
"testing"
)
func TestWalk(t *testing.T) {
tests := []struct {
root string
depth int
errCh chan error
result []string
errs []string
}{
{root: "testdata", depth: 999, errCh: newErrCh(), result: []string{
"testdata",
"testdata/subdir",
"testdata/subdir/subsubdir",
}, errs: make([]string, 0)},
{root: "testdata", depth: -1, errCh: newErrCh(), result: []string{
"testdata",
"testdata/subdir",
"testdata/subdir/subsubdir",
}, errs: []string{}},
{root: "testdata", depth: 1, errCh: newErrCh(), result: []string{
"testdata",
"testdata/subdir",
}, errs: []string{}},
{root: "testdata", depth: 0, errCh: newErrCh(), result: []string{
"testdata",
}, errs: []string{}},
{root: "testdata/subdir", depth: 1, errCh: newErrCh(), result: []string{
"testdata/subdir",
"testdata/subdir/subsubdir",
}, errs: []string{}},
{root: "testdata/non-existing-dir", depth: 1, errCh: newErrCh(), result: []string{}, errs: []string{"Can't walk directory testdata/non-existing-dir"}},
}
for i, tt := range tests {
dirCh, doneCh := Walk(tt.root, tt.depth, tt.errCh)
dirs, errs := getAllDirsAndErrors(dirCh, tt.errCh)
if !reflect.DeepEqual(dirs, tt.result) {
t.Fatalf("[%d] Wrong number of dirs found: %+v", i, dirs)
}
if !reflect.DeepEqual(errs, tt.errs) {
t.Fatalf("[%d] Wrong number of errs found: %+v vs %+v", i, errs, tt.errs)
}
close(doneCh)
}
}
func getAllDirsAndErrors(dirCh chan string, errCh chan error) ([]string, []string) {
dirs := make([]string, 0)
errs := make([]string, 0)
doneDirsCh := make(chan struct{})
go func() {
for d := range dirCh {
dirs = append(dirs, d)
}
close(errCh)
close(doneDirsCh)
}()
doneErrsCh := make(chan struct{})
go func() {
for err := range errCh {
tokens := strings.SplitN(err.Error(), ":", 2)
errs = append(errs, tokens[0])
}
close(doneErrsCh)
}()
<-doneDirsCh
<-doneErrsCh
return dirs, errs
}
func newErrCh() chan error {
return make(chan error)
}

View File

@@ -17,7 +17,7 @@ type watcher struct {
dir string dir string
} }
func newWatcher(fd int, dir string, ping chan struct{}) (*watcher, error) { func newWatcher(fd int, dir string) (*watcher, error) {
wd, errno := unix.InotifyAddWatch(fd, dir, events) wd, errno := unix.InotifyAddWatch(fd, dir, events)
if wd == -1 { if wd == -1 {
return nil, fmt.Errorf("adding watcher on %s: %d", dir, errno) return nil, fmt.Errorf("adding watcher on %s: %d", dir, errno)
@@ -28,7 +28,7 @@ func newWatcher(fd int, dir string, ping chan struct{}) (*watcher, error) {
}, nil }, nil
} }
func WatcherLimit() (int, error) { func getLimit() (int, error) {
b, err := ioutil.ReadFile(MaximumWatchersFile) b, err := ioutil.ReadFile(MaximumWatchersFile)
if err != nil { if err != nil {
return 0, fmt.Errorf("reading from %s: %v", MaximumWatchersFile, err) return 0, fmt.Errorf("reading from %s: %v", MaximumWatchersFile, err)

View File

@@ -17,6 +17,11 @@ func NewProcfsScanner() *ProcfsScanner {
func (p *ProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) { func (p *ProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) {
psEventCh := make(chan string) psEventCh := make(chan string)
go func() {
for {
<-triggerCh
}
}()
return psEventCh, nil return psEventCh, nil
} }

View File

@@ -2,15 +2,10 @@ package pspy
import ( import (
"errors" "errors"
"fmt"
"log"
"os" "os"
"time" "time"
"github.com/dominicbreuker/pspy/internal/config" "github.com/dominicbreuker/pspy/internal/config"
"github.com/dominicbreuker/pspy/internal/inotify"
"github.com/dominicbreuker/pspy/internal/process"
"github.com/dominicbreuker/pspy/internal/walker"
) )
type Logger interface { type Logger interface {
@@ -20,7 +15,7 @@ type Logger interface {
} }
type InotifyWatcher interface { type InotifyWatcher interface {
Setup(rdirs, dirs []string) (chan struct{}, chan string, error) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error)
} }
type ProcfsScanner interface { type ProcfsScanner interface {
@@ -30,7 +25,11 @@ type ProcfsScanner interface {
func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) { func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) {
logger.Infof("Config: %+v\n", cfg) logger.Infof("Config: %+v\n", cfg)
triggerCh, fsEventCh, err := inotify.Setup(cfg.RDirs, cfg.Dirs) // log all errors
errCh := make(chan error, 10)
go logErrors(errCh, logger)
triggerCh, fsEventCh, err := inotify.Setup(cfg.RDirs, cfg.Dirs, errCh)
if err != nil { if err != nil {
logger.Errorf("Can't set up inotify watchers: %v\n", err) logger.Errorf("Can't set up inotify watchers: %v\n", err)
return nil, errors.New("inotify error") return nil, errors.New("inotify error")
@@ -41,8 +40,10 @@ func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan Procf
return nil, errors.New("procfs scanner error") return nil, errors.New("procfs scanner error")
} }
exit := make(chan struct{}) // ignore all file system events created on startup
drainChanFor(fsEventCh, 1*time.Second)
exit := make(chan struct{})
go func() { go func() {
for { for {
select { select {
@@ -63,72 +64,89 @@ func Start(cfg config.Config, logger Logger, inotify InotifyWatcher, pscan Procf
return exit, nil return exit, nil
} }
const MaxInt = int(^uint(0) >> 1) func logErrors(errCh chan error, logger Logger) {
for {
func Watch(rdirs, dirs []string, logPS, logFS bool) { err := <-errCh
maxWatchers, err := inotify.WatcherLimit() logger.Errorf("ERROR: %v\n", err)
if err != nil {
log.Printf("Can't get inotify watcher limit...: %v\n", err)
} }
log.Printf("Inotify watcher limit: %d (/proc/sys/fs/inotify/max_user_watches)\n", maxWatchers) }
ping := make(chan struct{})
in, err := inotify.NewInotify(ping, logFS)
if err != nil {
log.Fatalf("Can't init inotify: %v", err)
}
defer in.Close()
for _, dir := range rdirs {
addWatchers(dir, MaxInt, in, maxWatchers)
}
for _, dir := range dirs {
addWatchers(dir, 0, in, maxWatchers)
}
log.Printf("Inotify watchers set up: %s - watching now\n", in)
procList := process.NewProcList()
ticker := time.NewTicker(100 * time.Millisecond)
func drainChanFor(c chan string, d time.Duration) {
for { for {
select { select {
case <-ticker.C: case <-c:
refresh(in, procList, logPS) case <-time.After(d):
case <-ping: return
refresh(in, procList, logPS)
} }
} }
} }
func addWatchers(dir string, depth int, in *inotify.Inotify, maxWatchers int) { // const MaxInt = int(^uint(0) >> 1)
dirCh, errCh, doneCh := walker.Walk(dir, depth)
loop:
for {
if in.NumWatchers() >= maxWatchers {
close(doneCh)
break loop
}
select {
case dir, ok := <-dirCh:
if !ok {
break loop
}
if err := in.Watch(dir); err != nil {
fmt.Fprintf(os.Stderr, "Can't create watcher: %v\n", err)
}
case err := <-errCh:
fmt.Fprintf(os.Stderr, "Error walking filesystem: %v\n", err)
}
}
}
func refresh(in *inotify.Inotify, pl *process.ProcList, print bool) { // func Watch(rdirs, dirs []string, logPS, logFS bool) {
in.Pause() // maxWatchers, err := inotify.WatcherLimit()
if err := pl.Refresh(print); err != nil { // if err != nil {
log.Printf("ERROR refreshing process list: %v", err) // log.Printf("Can't get inotify watcher limit...: %v\n", err)
} // }
time.Sleep(5 * time.Millisecond) // log.Printf("Inotify watcher limit: %d (/proc/sys/fs/inotify/max_user_watches)\n", maxWatchers)
in.UnPause()
} // ping := make(chan struct{})
// in, err := inotify.NewInotify(ping, logFS)
// if err != nil {
// log.Fatalf("Can't init inotify: %v", err)
// }
// defer in.Close()
// for _, dir := range rdirs {
// addWatchers(dir, MaxInt, in, maxWatchers)
// }
// for _, dir := range dirs {
// addWatchers(dir, 0, in, maxWatchers)
// }
// log.Printf("Inotify watchers set up: %s - watching now\n", in)
// procList := process.NewProcList()
// ticker := time.NewTicker(100 * time.Millisecond)
// for {
// select {
// case <-ticker.C:
// refresh(in, procList, logPS)
// case <-ping:
// refresh(in, procList, logPS)
// }
// }
// }
// func addWatchers(dir string, depth int, in *inotify.Inotify, maxWatchers int) {
// dirCh, errCh, doneCh := walker.Walk(dir, depth)
// loop:
// for {
// if in.NumWatchers() >= maxWatchers {
// close(doneCh)
// break loop
// }
// select {
// case dir, ok := <-dirCh:
// if !ok {
// break loop
// }
// if err := in.Watch(dir); err != nil {
// fmt.Fprintf(os.Stderr, "Can't create watcher: %v\n", err)
// }
// case err := <-errCh:
// fmt.Fprintf(os.Stderr, "Error walking filesystem: %v\n", err)
// }
// }
// }
// func refresh(in *inotify.Inotify, pl *process.ProcList, print bool) {
// in.Pause()
// if err := pl.Refresh(print); err != nil {
// log.Printf("ERROR refreshing process list: %v", err)
// }
// time.Sleep(5 * time.Millisecond)
// in.UnPause()
// }

View File

@@ -92,6 +92,7 @@ type mockInotifyWatcher struct {
triggerCh chan struct{} triggerCh chan struct{}
fsEventCh chan string fsEventCh chan string
setupErr error setupErr error
closed bool
} }
func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher { func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher {
@@ -99,16 +100,21 @@ func newMockInotifyWatcher(setupErr error) *mockInotifyWatcher {
triggerCh: make(chan struct{}), triggerCh: make(chan struct{}),
fsEventCh: make(chan string), fsEventCh: make(chan string),
setupErr: setupErr, setupErr: setupErr,
closed: false,
} }
} }
func (i *mockInotifyWatcher) Setup(rdirs, dirs []string) (chan struct{}, chan string, error) { func (i *mockInotifyWatcher) Setup(rdirs, dirs []string, errCh chan error) (chan struct{}, chan string, error) {
if i.setupErr != nil { if i.setupErr != nil {
return nil, nil, i.setupErr return nil, nil, i.setupErr
} }
return i.triggerCh, i.fsEventCh, nil return i.triggerCh, i.fsEventCh, nil
} }
func (i mockInotifyWatcher) Close() {
i.closed = true
}
// ProcfsScanner // ProcfsScanner
type mockProcfsScanner struct { type mockProcfsScanner struct {