From cff61b11024387f1a84d8f48656206e0f8d14e24 Mon Sep 17 00:00:00 2001 From: Dominic Breuker Date: Fri, 2 Mar 2018 13:59:41 +0100 Subject: [PATCH] refactors psscanner --- cmd/root.go | 23 +++-- internal/fswatcher/event.go | 20 ----- internal/fswatcher/fswatcher.go | 4 +- internal/fswatcher/observer.go | 20 ----- internal/pspy/pspy.go | 89 ++++++++++++------- .../process.go => psscanner/proclist.go} | 49 +++++----- internal/psscanner/proclist_test.go | 40 +++++++++ internal/psscanner/psscanner.go | 21 +++++ 8 files changed, 150 insertions(+), 116 deletions(-) delete mode 100644 internal/fswatcher/event.go delete mode 100644 internal/fswatcher/observer.go rename internal/{process/process.go => psscanner/proclist.go} (66%) create mode 100644 internal/psscanner/proclist_test.go create mode 100644 internal/psscanner/psscanner.go diff --git a/cmd/root.go b/cmd/root.go index 7ff9989..d21178d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -11,8 +11,8 @@ import ( "github.com/dominicbreuker/pspy/internal/config" "github.com/dominicbreuker/pspy/internal/fswatcher" "github.com/dominicbreuker/pspy/internal/logging" - "github.com/dominicbreuker/pspy/internal/process" "github.com/dominicbreuker/pspy/internal/pspy" + "github.com/dominicbreuker/pspy/internal/psscanner" "github.com/spf13/cobra" ) @@ -68,27 +68,26 @@ func init() { func root(cmd *cobra.Command, args []string) { logger := logging.NewLogger() - cfg := config.Config{ + cfg := &config.Config{ RDirs: rDirs, Dirs: dirs, LogPS: logPS, LogFS: logFS, } - iw, err := fswatcher.NewFSWatcher() - if err != nil { - logger.Errorf("Can't initialize fswatcher: %v", err) - os.Exit(1) - } - defer iw.Close() - pscan := process.NewProcfsScanner() + fsw := fswatcher.NewFSWatcher() + defer fsw.Close() + + pss := psscanner.NewPSScanner() sigCh := make(chan os.Signal) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - exit, err := pspy.Start(cfg, logger, iw, pscan, sigCh) - if err != nil { - os.Exit(1) + b := &pspy.Bindings{ + Logger: logger, + FSW: fsw, + PSS: pss, } + exit := pspy.Start(cfg, b, sigCh) <-exit os.Exit(0) } diff --git a/internal/fswatcher/event.go b/internal/fswatcher/event.go deleted file mode 100644 index 19a9285..0000000 --- a/internal/fswatcher/event.go +++ /dev/null @@ -1,20 +0,0 @@ -package fswatcher - -import ( - "fmt" -) - -func parseEvents(i Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) { - for buf := range dataCh { - var ptr uint32 - for len(buf[ptr:]) > 0 { - event, size, err := i.ParseNextEvent(buf[ptr:]) - ptr += size - if err != nil { - errCh <- fmt.Errorf("parsing events: %v", err) - continue - } - eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name) - } - } -} diff --git a/internal/fswatcher/fswatcher.go b/internal/fswatcher/fswatcher.go index f577b44..74c137d 100644 --- a/internal/fswatcher/fswatcher.go +++ b/internal/fswatcher/fswatcher.go @@ -27,13 +27,13 @@ type FSWatcher struct { eventSize int } -func NewFSWatcher() (*FSWatcher, error) { +func NewFSWatcher() *FSWatcher { return &FSWatcher{ i: inotify.NewInotify(), w: walker.NewWalker(), maxWatchers: inotify.MaxWatchers, eventSize: inotify.EventSize, - }, nil + } } func (fs *FSWatcher) Close() { diff --git a/internal/fswatcher/observer.go b/internal/fswatcher/observer.go deleted file mode 100644 index 15c5d19..0000000 --- a/internal/fswatcher/observer.go +++ /dev/null @@ -1,20 +0,0 @@ -package fswatcher - -import ( - "golang.org/x/sys/unix" -) - -func Observe(i Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) { - buf := make([]byte, 5*unix.SizeofInotifyEvent) - - for { - n, err := i.Read(buf) - if err != nil { - errCh <- err - } - triggerCh <- struct{}{} - bufCopy := make([]byte, n) - copy(bufCopy, buf) - dataCh <- bufCopy - } -} diff --git a/internal/pspy/pspy.go b/internal/pspy/pspy.go index 9657602..b0ba1a9 100644 --- a/internal/pspy/pspy.go +++ b/internal/pspy/pspy.go @@ -1,13 +1,18 @@ package pspy import ( - "errors" "os" "time" "github.com/dominicbreuker/pspy/internal/config" ) +type Bindings struct { + Logger Logger + FSW FSWatcher + PSS PSScanner +} + type Logger interface { Infof(format string, v ...interface{}) Errorf(format string, v ...interface{}) @@ -19,56 +24,73 @@ type FSWatcher interface { Run() (chan struct{}, chan string, chan error) } -type ProcfsScanner interface { - Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) +type PSScanner interface { + Run(triggerCh chan struct{}) (chan string, chan error) } -func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) { - logger.Infof("Config: %+v\n", cfg) +func Start(cfg *config.Config, b *Bindings, sigCh chan os.Signal) chan struct{} { + b.Logger.Infof("Config: %+v\n", cfg) - errCh, doneCh := inotify.Init(cfg.RDirs, cfg.Dirs) -initloop: - for { - select { - case <-doneCh: - break initloop - case err := <-errCh: - logger.Errorf("initializing fs watcher: %v", err) + initFSW(b.FSW, cfg.RDirs, cfg.Dirs, b.Logger) + triggerCh, fsEventCh := startFSW(b.FSW, b.Logger) + + psEventCh := startPSS(b.PSS, b.Logger, triggerCh) + + go func() { + for { + <-time.After(100 * time.Millisecond) + triggerCh <- struct{}{} } - } - triggerCh, fsEventCh, errCh := inotify.Run() - go logErrors(errCh, logger) - - psEventCh, err := pscan.Setup(triggerCh, 100*time.Millisecond) - if err != nil { - logger.Errorf("Can't set up procfs scanner: %+v\n", err) - return nil, errors.New("procfs scanner error") - } - - // ignore all file system events created on startup - logger.Infof("Draining file system events due to startup...") - drainChanFor(fsEventCh, 1*time.Second) - logger.Infof("done") + }() exit := make(chan struct{}) go func() { for { select { case se := <-sigCh: - logger.Infof("Exiting program... (%s)\n", se) + b.Logger.Infof("Exiting program... (%s)\n", se) exit <- struct{}{} case fe := <-fsEventCh: if cfg.LogFS { - logger.Eventf("FS: %+v\n", fe) + b.Logger.Eventf("FS: %+v\n", fe) } case pe := <-psEventCh: if cfg.LogPS { - logger.Eventf("CMD: %+v\n", pe) + b.Logger.Eventf("CMD: %+v\n", pe) } } } }() - return exit, nil + return exit +} + +func initFSW(fsw FSWatcher, rdirs, dirs []string, logger Logger) { + errCh, doneCh := fsw.Init(rdirs, dirs) + for { + select { + case <-doneCh: + return + case err := <-errCh: + logger.Errorf("initializing fs watcher: %v", err) + } + } +} + +func startFSW(fsw FSWatcher, logger Logger) (triggerCh chan struct{}, fsEventCh chan string) { + triggerCh, fsEventCh, errCh := fsw.Run() + go logErrors(errCh, logger) + + // ignore all file system events created on startup + logger.Infof("Draining file system events due to startup...") + drainEventsFor(triggerCh, fsEventCh, 1*time.Second) + logger.Infof("done") + return triggerCh, fsEventCh +} + +func startPSS(pss PSScanner, logger Logger, triggerCh chan struct{}) (psEventCh chan string) { + psEventCh, errCh := pss.Run(triggerCh) + go logErrors(errCh, logger) + return psEventCh } func logErrors(errCh chan error, logger Logger) { @@ -78,10 +100,11 @@ func logErrors(errCh chan error, logger Logger) { } } -func drainChanFor(c chan string, d time.Duration) { +func drainEventsFor(triggerCh chan struct{}, eventCh chan string, d time.Duration) { for { select { - case <-c: + case <-triggerCh: + case <-eventCh: case <-time.After(d): return } diff --git a/internal/process/process.go b/internal/psscanner/proclist.go similarity index 66% rename from internal/process/process.go rename to internal/psscanner/proclist.go index 2759de1..150a735 100644 --- a/internal/process/process.go +++ b/internal/psscanner/proclist.go @@ -1,38 +1,30 @@ -package process +package psscanner import ( "fmt" "io/ioutil" - "log" + "os" "strconv" "strings" - "time" ) -type ProcfsScanner struct{} - -func NewProcfsScanner() *ProcfsScanner { - return &ProcfsScanner{} +var procDirReader = func() ([]os.FileInfo, error) { + return ioutil.ReadDir("/proc") } -func (p *ProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) { - psEventCh := make(chan string) - go func() { - for { - <-triggerCh - } - }() - return psEventCh, nil +var procStatusReader = func(pid int) ([]byte, error) { + statPath := fmt.Sprintf("/proc/%d/status", pid) + return ioutil.ReadFile(statPath) } -type ProcList map[int]string - -func NewProcList() *ProcList { - pl := make(ProcList) - return &pl +var cmdLineReader = func(pid int) ([]byte, error) { + cmdPath := fmt.Sprintf("/proc/%d/cmdline", pid) + return ioutil.ReadFile(cmdPath) } -func (pl ProcList) Refresh(print bool) error { +type procList map[int]string + +func (pl procList) refresh(eventCh chan string) error { pids, err := getPIDs() if err != nil { return err @@ -50,9 +42,10 @@ func (pl ProcList) Refresh(print bool) error { if err != nil { uid = "???" } - if print { - log.Printf("\x1b[31;1mCMD: UID=%-4s PID=%-6d | %s\x1b[0m\n", uid, pid, cmd) - } + eventCh <- fmt.Sprintf("CMD: UID=%-4s PID=%-6d | %s", uid, pid, cmd) + // if print { + // log.Printf("\x1b[31;1mCMD: UID=%-4s PID=%-6d | %s\x1b[0m\n", uid, pid, cmd) + // } pl[pid] = cmd } } @@ -61,7 +54,7 @@ func (pl ProcList) Refresh(print bool) error { } func getPIDs() ([]int, error) { - proc, err := ioutil.ReadDir("/proc") + proc, err := procDirReader() if err != nil { return nil, fmt.Errorf("opening proc dir: %v", err) } @@ -81,8 +74,7 @@ func getPIDs() ([]int, error) { } func getCmd(pid int) (string, error) { - cmdPath := fmt.Sprintf("/proc/%d/cmdline", pid) - cmd, err := ioutil.ReadFile(cmdPath) + cmd, err := cmdLineReader(pid) if err != nil { return "", err } @@ -95,8 +87,7 @@ func getCmd(pid int) (string, error) { } func getUID(pid int) (string, error) { - statPath := fmt.Sprintf("/proc/%d/status", pid) - stat, err := ioutil.ReadFile(statPath) + stat, err := procStatusReader(pid) if err != nil { return "", err } diff --git a/internal/psscanner/proclist_test.go b/internal/psscanner/proclist_test.go new file mode 100644 index 0000000..d59716f --- /dev/null +++ b/internal/psscanner/proclist_test.go @@ -0,0 +1,40 @@ +package psscanner + +import ( + "fmt" + "testing" +) + +func TestGetCmd(t *testing.T) { + tests := []struct { + pid int + cmdLine []byte + cmdErr error + cmd string + err string + }{ + {pid: 1, cmdLine: []byte("abc"), cmdErr: nil, cmd: "abc", err: ""}, + } + + for _, tt := range tests { + restore := mockCmdLineReader(tt.cmdLine, tt.cmdErr) + cmd, err := getCmd(tt.pid) + if cmd != tt.cmd { + t.Errorf("Wrong cmd line returned: got %s but want %s", cmd, tt.cmd) + } + if (err != nil || tt.err != "") && fmt.Sprintf("%v", err) != tt.err { + t.Errorf("Wrong error returned: got %v but want %s", err, tt.err) + } + restore() + } +} + +func mockCmdLineReader(cmdLine []byte, err error) (restore func()) { + oldFunc := cmdLineReader + cmdLineReader = func(pid int) ([]byte, error) { + return cmdLine, err + } + return func() { + cmdLineReader = oldFunc + } +} diff --git a/internal/psscanner/psscanner.go b/internal/psscanner/psscanner.go new file mode 100644 index 0000000..21ad0ed --- /dev/null +++ b/internal/psscanner/psscanner.go @@ -0,0 +1,21 @@ +package psscanner + +type PSScanner struct{} + +func NewPSScanner() *PSScanner { + return &PSScanner{} +} + +func (p *PSScanner) Run(triggerCh chan struct{}) (chan string, chan error) { + eventCh := make(chan string, 100) + errCh := make(chan error) + pl := make(procList) + + go func() { + for { + <-triggerCh + pl.refresh(eventCh) + } + }() + return eventCh, errCh +}