From 22df28ae1dd0da8771c5e79326490bc06e644b17 Mon Sep 17 00:00:00 2001 From: Dominic Breuker Date: Sun, 11 Feb 2018 22:15:10 +0100 Subject: [PATCH] add some hacky experiments for inotify event parsing --- docker/Dockerfile | 8 ++- docker/etc/cron.d/myjob | 1 + docker/etc/cron.d/print | 1 + docker/scripts/print_stuff.py | 9 +++ internal/dev/plist.go | 42 ++++++------ internal/inotify/inotify.go | 118 ++++++++++++++++++++++++++++++---- internal/inotify/watcher.go | 19 ++++++ internal/walker/walker.go | 47 ++++++++++++++ 8 files changed, 214 insertions(+), 31 deletions(-) create mode 100644 docker/etc/cron.d/myjob create mode 100644 docker/etc/cron.d/print create mode 100644 docker/scripts/print_stuff.py create mode 100644 internal/walker/walker.go diff --git a/docker/Dockerfile b/docker/Dockerfile index 8c30c67..e91a0e4 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,9 +1,15 @@ FROM golang:1.9-stretch -RUN useradd -ms /bin/bash myuser +RUN apt-get update && apt-get -y install cron sudo +COPY docker/etc/cron.d /etc/cron.d +COPY docker/scripts /scripts +RUN useradd -ms /bin/bash myuser && \ + adduser myuser sudo && \ + echo 'myuser ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers USER myuser + WORKDIR /go/src/github.com/dominicbreuker diff --git a/docker/etc/cron.d/myjob b/docker/etc/cron.d/myjob new file mode 100644 index 0000000..a718b9b --- /dev/null +++ b/docker/etc/cron.d/myjob @@ -0,0 +1 @@ +* * * * * root echo 'this is some text' >> /tmp/myjob.log diff --git a/docker/etc/cron.d/print b/docker/etc/cron.d/print new file mode 100644 index 0000000..2682ce3 --- /dev/null +++ b/docker/etc/cron.d/print @@ -0,0 +1 @@ +* * * * * root python /scripts/print_stuff.py >> /tmp/print.log diff --git a/docker/scripts/print_stuff.py b/docker/scripts/print_stuff.py new file mode 100644 index 0000000..38d1440 --- /dev/null +++ b/docker/scripts/print_stuff.py @@ -0,0 +1,9 @@ +#!/usr/bin/python + +user = "myusername" +password = "thepw" + +for i in range(100): + print("a"*i) + +print("done") diff --git a/internal/dev/plist.go b/internal/dev/plist.go index c97a925..71e5c5e 100644 --- a/internal/dev/plist.go +++ b/internal/dev/plist.go @@ -6,6 +6,7 @@ import ( "github.com/dominicbreuker/pspy/internal/inotify" "github.com/dominicbreuker/pspy/internal/process" + "github.com/dominicbreuker/pspy/internal/walker" ) type Process struct { @@ -19,32 +20,36 @@ type Process struct { } func Monitor() { - // procList := make(map[int]string) - watch() - - // for { - // refresh(procList) - // } } func watch() { + maxWatchers, err := inotify.WatcherLimit() + if err != nil { + log.Printf("Can't get inotify watcher limit...: %v\n", err) + } + log.Printf("Watcher limit: %d\n", maxWatchers) + ping := make(chan struct{}) in, err := inotify.NewInotify(ping) if err != nil { log.Fatalf("Can't init inotify: %v", err) } + defer in.Close() - dirs := []string{ - "/proc", - "/var/log", - "/home", - "/tmp", - } - - for _, dir := range dirs { - if err := in.Watch(dir); err != nil { - log.Fatalf("Can't create watcher: %v", err) + dirCh, errCh := walker.Walk("/tmp") +loop: + for { + select { + case dir, ok := <-dirCh: + if !ok { + break loop + } + if err := in.Watch(dir); err != nil { + log.Printf("Can't create watcher: %v", err) + } + case err := <-errCh: + log.Printf("Error walking filesystem: %v", err) } } @@ -52,14 +57,13 @@ func watch() { procList := process.NewProcList() - ticker := time.NewTicker(50 * time.Millisecond).C + ticker := time.NewTicker(100 * time.Millisecond).C for { select { case <-ticker: refresh(in, procList) case <-ping: - log.Printf("PING") refresh(in, procList) } } @@ -70,6 +74,6 @@ func refresh(in *inotify.Inotify, pl *process.ProcList) { if err := pl.Refresh(); err != nil { log.Printf("ERROR refreshing process list: %v", err) } - time.Sleep(50 * time.Millisecond) + time.Sleep(10 * time.Millisecond) in.UnPause() } diff --git a/internal/inotify/inotify.go b/internal/inotify/inotify.go index c3c1a5e..f6431bb 100644 --- a/internal/inotify/inotify.go +++ b/internal/inotify/inotify.go @@ -2,13 +2,16 @@ package inotify import ( "fmt" + "log" + "strings" + "unsafe" "golang.org/x/sys/unix" ) type Inotify struct { fd int - watchers []*watcher + watchers map[int]*watcher ping chan struct{} paused bool } @@ -20,9 +23,10 @@ func NewInotify(ping chan struct{}) (*Inotify, error) { } i := &Inotify{ - fd: fd, - ping: ping, - paused: false, + fd: fd, + watchers: make(map[int]*watcher), + ping: ping, + paused: false, } go watch(i) @@ -34,7 +38,14 @@ func (i *Inotify) Watch(dir string) error { if err != nil { return fmt.Errorf("creating watcher: %v", err) } - i.watchers = append(i.watchers, w) + i.watchers[w.wd] = w + return nil +} + +func (i *Inotify) Close() error { + if err := unix.Close(i.fd); err != nil { + return fmt.Errorf("closing inotify file descriptor: %v", err) + } return nil } @@ -47,19 +58,104 @@ func (i *Inotify) UnPause() { } func (i *Inotify) String() string { - dirs := make([]string, 0) - for _, w := range i.watchers { - dirs = append(dirs, w.dir) + if len(i.watchers) < 20 { + dirs := make([]string, 0) + for _, w := range i.watchers { + dirs = append(dirs, w.dir) + } + return fmt.Sprintf("Watching: %v", dirs) + } else { + return fmt.Sprintf("Watching %d directories", len(i.watchers)) } - return fmt.Sprintf("Watching: %v", dirs) +} + +type bufRead struct { + n int + buf []byte } func watch(i *Inotify) { - buf := make([]byte, 1024) + buf := make([]byte, 20*unix.SizeofInotifyEvent) + buffers := make(chan bufRead) + go verboseWatcher(i, buffers) for { - _, _ = unix.Read(i.fd, buf) + n, _ := unix.Read(i.fd, buf) if !i.paused { i.ping <- struct{}{} } + buffers <- bufRead{ + n: n, + buf: buf, + } } } + +func verboseWatcher(i *Inotify, buffers chan bufRead) { + for bf := range buffers { + n := bf.n + buf := bf.buf + + if n < unix.SizeofInotifyEvent { + if n == 0 { + // If EOF is received. This should really never happen. + panic(fmt.Sprintf("No bytes read from fd")) + } else if n < 0 { + // If an error occurred while reading. + log.Printf("ERROR: reading from inotify: %d", n) + } else { + // Read was too short. + log.Printf("ERROR: Short read") + } + continue + } + + var offset uint32 + for offset <= uint32(n-unix.SizeofInotifyEvent) { + raw := (*unix.InotifyEvent)(unsafe.Pointer(&buf[offset])) + + mask := uint32(raw.Mask) + nameLen := uint32(raw.Len) + + name := i.watchers[int(raw.Wd)].dir + if nameLen > 0 { + bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&buf[offset+unix.SizeofInotifyEvent])) + if uint32(len(bytes)) > nameLen { + name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000") + } + } + ev := newEvent(name, mask) + log.Printf("### %+v", ev) + + offset += unix.SizeofInotifyEvent + nameLen + } + } +} + +type Event struct { + name string + op string +} + +func (e Event) String() string { + return fmt.Sprintf("%10s | %s", e.op, e.name) +} + +func newEvent(name string, mask uint32) Event { + e := Event{name: name} + if mask&unix.IN_CREATE == unix.IN_CREATE || mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO { + e.op = "CREATE" + } + if mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF || mask&unix.IN_DELETE == unix.IN_DELETE { + e.op = "REMOVE" + } + if mask&unix.IN_MODIFY == unix.IN_MODIFY { + e.op = "WRITE" + } + if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM { + e.op = "RENAME" + } + if mask&unix.IN_ATTRIB == unix.IN_ATTRIB { + e.op = "CHMOD" + } + return e +} diff --git a/internal/inotify/watcher.go b/internal/inotify/watcher.go index 64fb81d..98fe61b 100644 --- a/internal/inotify/watcher.go +++ b/internal/inotify/watcher.go @@ -2,11 +2,15 @@ package inotify import ( "fmt" + "io/ioutil" + "strconv" + "strings" "golang.org/x/sys/unix" ) const events = unix.IN_ALL_EVENTS +const MaximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches" type watcher struct { wd int @@ -23,3 +27,18 @@ func newWatcher(fd int, dir string, ping chan struct{}) (*watcher, error) { dir: dir, }, nil } + +func WatcherLimit() (int, error) { + b, err := ioutil.ReadFile(MaximumWatchersFile) + if err != nil { + return 0, fmt.Errorf("reading from %s: %v", MaximumWatchersFile, err) + } + + s := strings.TrimSpace(string(b)) + m, err := strconv.Atoi(s) + if err != nil { + return 0, fmt.Errorf("converting to integer: %v", err) + } + + return m, nil +} diff --git a/internal/walker/walker.go b/internal/walker/walker.go new file mode 100644 index 0000000..c1f5f3b --- /dev/null +++ b/internal/walker/walker.go @@ -0,0 +1,47 @@ +package walker + +import ( + "fmt" + "io/ioutil" +) + +func Walk(root string) (dirCh chan string, errCh chan error) { + dirCh = make(chan string) + errCh = make(chan error) + dirs := make([]string, 1) + dirs[0] = root + + go func() { + dirCh <- root + }() + go func() { + for { + if len(dirs) == 0 { + break + } + dirs = descent(dirs, dirCh, errCh) + } + close(dirCh) + close(errCh) + }() + return dirCh, errCh +} + +func descent(dirs []string, dirCh chan string, errCh chan error) []string { + next := make([]string, 0) + for _, dir := range dirs { + ls, err := ioutil.ReadDir(dir) + if err != nil { + errCh <- fmt.Errorf("opening dir %s: %v", dir, err) + } + + for _, e := range ls { + if e.IsDir() { + newDir := dir + e.Name() + "/" + dirCh <- newDir + next = append(next, newDir) + } + } + } + return next +}