add some hacky experiments for inotify event parsing

This commit is contained in:
Dominic Breuker
2018-02-11 22:15:10 +01:00
parent 38c5d42bb4
commit 9bc66835a6
8 changed files with 214 additions and 31 deletions

View File

@@ -1,9 +1,15 @@
FROM golang:1.9-stretch
RUN useradd -ms /bin/bash myuser
RUN apt-get update && apt-get -y install cron sudo
COPY docker/etc/cron.d /etc/cron.d
COPY docker/scripts /scripts
RUN useradd -ms /bin/bash myuser && \
adduser myuser sudo && \
echo 'myuser ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
USER myuser
WORKDIR /go/src/github.com/dominicbreuker

1
docker/etc/cron.d/myjob Normal file
View File

@@ -0,0 +1 @@
* * * * * root echo 'this is some text' >> /tmp/myjob.log

1
docker/etc/cron.d/print Normal file
View File

@@ -0,0 +1 @@
* * * * * root python /scripts/print_stuff.py >> /tmp/print.log

View File

@@ -0,0 +1,9 @@
#!/usr/bin/python
user = "myusername"
password = "thepw"
for i in range(100):
print("a"*i)
print("done")

View File

@@ -6,6 +6,7 @@ import (
"github.com/dominicbreuker/pspy/internal/inotify"
"github.com/dominicbreuker/pspy/internal/process"
"github.com/dominicbreuker/pspy/internal/walker"
)
type Process struct {
@@ -19,32 +20,36 @@ type Process struct {
}
func Monitor() {
// procList := make(map[int]string)
watch()
// for {
// refresh(procList)
// }
}
func watch() {
maxWatchers, err := inotify.WatcherLimit()
if err != nil {
log.Printf("Can't get inotify watcher limit...: %v\n", err)
}
log.Printf("Watcher limit: %d\n", maxWatchers)
ping := make(chan struct{})
in, err := inotify.NewInotify(ping)
if err != nil {
log.Fatalf("Can't init inotify: %v", err)
}
defer in.Close()
dirs := []string{
"/proc",
"/var/log",
"/home",
"/tmp",
}
for _, dir := range dirs {
if err := in.Watch(dir); err != nil {
log.Fatalf("Can't create watcher: %v", err)
dirCh, errCh := walker.Walk("/tmp")
loop:
for {
select {
case dir, ok := <-dirCh:
if !ok {
break loop
}
if err := in.Watch(dir); err != nil {
log.Printf("Can't create watcher: %v", err)
}
case err := <-errCh:
log.Printf("Error walking filesystem: %v", err)
}
}
@@ -52,14 +57,13 @@ func watch() {
procList := process.NewProcList()
ticker := time.NewTicker(50 * time.Millisecond).C
ticker := time.NewTicker(100 * time.Millisecond).C
for {
select {
case <-ticker:
refresh(in, procList)
case <-ping:
log.Printf("PING")
refresh(in, procList)
}
}
@@ -70,6 +74,6 @@ func refresh(in *inotify.Inotify, pl *process.ProcList) {
if err := pl.Refresh(); err != nil {
log.Printf("ERROR refreshing process list: %v", err)
}
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
in.UnPause()
}

View File

@@ -2,13 +2,16 @@ package inotify
import (
"fmt"
"log"
"strings"
"unsafe"
"golang.org/x/sys/unix"
)
type Inotify struct {
fd int
watchers []*watcher
watchers map[int]*watcher
ping chan struct{}
paused bool
}
@@ -20,9 +23,10 @@ func NewInotify(ping chan struct{}) (*Inotify, error) {
}
i := &Inotify{
fd: fd,
ping: ping,
paused: false,
fd: fd,
watchers: make(map[int]*watcher),
ping: ping,
paused: false,
}
go watch(i)
@@ -34,7 +38,14 @@ func (i *Inotify) Watch(dir string) error {
if err != nil {
return fmt.Errorf("creating watcher: %v", err)
}
i.watchers = append(i.watchers, w)
i.watchers[w.wd] = w
return nil
}
func (i *Inotify) Close() error {
if err := unix.Close(i.fd); err != nil {
return fmt.Errorf("closing inotify file descriptor: %v", err)
}
return nil
}
@@ -47,19 +58,104 @@ func (i *Inotify) UnPause() {
}
func (i *Inotify) String() string {
dirs := make([]string, 0)
for _, w := range i.watchers {
dirs = append(dirs, w.dir)
if len(i.watchers) < 20 {
dirs := make([]string, 0)
for _, w := range i.watchers {
dirs = append(dirs, w.dir)
}
return fmt.Sprintf("Watching: %v", dirs)
} else {
return fmt.Sprintf("Watching %d directories", len(i.watchers))
}
return fmt.Sprintf("Watching: %v", dirs)
}
type bufRead struct {
n int
buf []byte
}
func watch(i *Inotify) {
buf := make([]byte, 1024)
buf := make([]byte, 20*unix.SizeofInotifyEvent)
buffers := make(chan bufRead)
go verboseWatcher(i, buffers)
for {
_, _ = unix.Read(i.fd, buf)
n, _ := unix.Read(i.fd, buf)
if !i.paused {
i.ping <- struct{}{}
}
buffers <- bufRead{
n: n,
buf: buf,
}
}
}
func verboseWatcher(i *Inotify, buffers chan bufRead) {
for bf := range buffers {
n := bf.n
buf := bf.buf
if n < unix.SizeofInotifyEvent {
if n == 0 {
// If EOF is received. This should really never happen.
panic(fmt.Sprintf("No bytes read from fd"))
} else if n < 0 {
// If an error occurred while reading.
log.Printf("ERROR: reading from inotify: %d", n)
} else {
// Read was too short.
log.Printf("ERROR: Short read")
}
continue
}
var offset uint32
for offset <= uint32(n-unix.SizeofInotifyEvent) {
raw := (*unix.InotifyEvent)(unsafe.Pointer(&buf[offset]))
mask := uint32(raw.Mask)
nameLen := uint32(raw.Len)
name := i.watchers[int(raw.Wd)].dir
if nameLen > 0 {
bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&buf[offset+unix.SizeofInotifyEvent]))
if uint32(len(bytes)) > nameLen {
name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000")
}
}
ev := newEvent(name, mask)
log.Printf("### %+v", ev)
offset += unix.SizeofInotifyEvent + nameLen
}
}
}
type Event struct {
name string
op string
}
func (e Event) String() string {
return fmt.Sprintf("%10s | %s", e.op, e.name)
}
func newEvent(name string, mask uint32) Event {
e := Event{name: name}
if mask&unix.IN_CREATE == unix.IN_CREATE || mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO {
e.op = "CREATE"
}
if mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF || mask&unix.IN_DELETE == unix.IN_DELETE {
e.op = "REMOVE"
}
if mask&unix.IN_MODIFY == unix.IN_MODIFY {
e.op = "WRITE"
}
if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM {
e.op = "RENAME"
}
if mask&unix.IN_ATTRIB == unix.IN_ATTRIB {
e.op = "CHMOD"
}
return e
}

View File

@@ -2,11 +2,15 @@ package inotify
import (
"fmt"
"io/ioutil"
"strconv"
"strings"
"golang.org/x/sys/unix"
)
const events = unix.IN_ALL_EVENTS
const MaximumWatchersFile = "/proc/sys/fs/inotify/max_user_watches"
type watcher struct {
wd int
@@ -23,3 +27,18 @@ func newWatcher(fd int, dir string, ping chan struct{}) (*watcher, error) {
dir: dir,
}, nil
}
func WatcherLimit() (int, error) {
b, err := ioutil.ReadFile(MaximumWatchersFile)
if err != nil {
return 0, fmt.Errorf("reading from %s: %v", MaximumWatchersFile, err)
}
s := strings.TrimSpace(string(b))
m, err := strconv.Atoi(s)
if err != nil {
return 0, fmt.Errorf("converting to integer: %v", err)
}
return m, nil
}

47
internal/walker/walker.go Normal file
View File

@@ -0,0 +1,47 @@
package walker
import (
"fmt"
"io/ioutil"
)
func Walk(root string) (dirCh chan string, errCh chan error) {
dirCh = make(chan string)
errCh = make(chan error)
dirs := make([]string, 1)
dirs[0] = root
go func() {
dirCh <- root
}()
go func() {
for {
if len(dirs) == 0 {
break
}
dirs = descent(dirs, dirCh, errCh)
}
close(dirCh)
close(errCh)
}()
return dirCh, errCh
}
func descent(dirs []string, dirCh chan string, errCh chan error) []string {
next := make([]string, 0)
for _, dir := range dirs {
ls, err := ioutil.ReadDir(dir)
if err != nil {
errCh <- fmt.Errorf("opening dir %s: %v", dir, err)
}
for _, e := range ls {
if e.IsDir() {
newDir := dir + e.Name() + "/"
dirCh <- newDir
next = append(next, newDir)
}
}
}
return next
}