refactors psscanner

This commit is contained in:
Dominic Breuker
2018-03-02 13:59:41 +01:00
parent 26c67a6e5c
commit cff61b1102
8 changed files with 150 additions and 116 deletions

View File

@@ -11,8 +11,8 @@ import (
"github.com/dominicbreuker/pspy/internal/config" "github.com/dominicbreuker/pspy/internal/config"
"github.com/dominicbreuker/pspy/internal/fswatcher" "github.com/dominicbreuker/pspy/internal/fswatcher"
"github.com/dominicbreuker/pspy/internal/logging" "github.com/dominicbreuker/pspy/internal/logging"
"github.com/dominicbreuker/pspy/internal/process"
"github.com/dominicbreuker/pspy/internal/pspy" "github.com/dominicbreuker/pspy/internal/pspy"
"github.com/dominicbreuker/pspy/internal/psscanner"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@@ -68,27 +68,26 @@ func init() {
func root(cmd *cobra.Command, args []string) { func root(cmd *cobra.Command, args []string) {
logger := logging.NewLogger() logger := logging.NewLogger()
cfg := config.Config{ cfg := &config.Config{
RDirs: rDirs, RDirs: rDirs,
Dirs: dirs, Dirs: dirs,
LogPS: logPS, LogPS: logPS,
LogFS: logFS, LogFS: logFS,
} }
iw, err := fswatcher.NewFSWatcher() fsw := fswatcher.NewFSWatcher()
if err != nil { defer fsw.Close()
logger.Errorf("Can't initialize fswatcher: %v", err)
os.Exit(1) pss := psscanner.NewPSScanner()
}
defer iw.Close()
pscan := process.NewProcfsScanner()
sigCh := make(chan os.Signal) sigCh := make(chan os.Signal)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
exit, err := pspy.Start(cfg, logger, iw, pscan, sigCh) b := &pspy.Bindings{
if err != nil { Logger: logger,
os.Exit(1) FSW: fsw,
PSS: pss,
} }
exit := pspy.Start(cfg, b, sigCh)
<-exit <-exit
os.Exit(0) os.Exit(0)
} }

View File

@@ -1,20 +0,0 @@
package fswatcher
import (
"fmt"
)
func parseEvents(i Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) {
for buf := range dataCh {
var ptr uint32
for len(buf[ptr:]) > 0 {
event, size, err := i.ParseNextEvent(buf[ptr:])
ptr += size
if err != nil {
errCh <- fmt.Errorf("parsing events: %v", err)
continue
}
eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name)
}
}
}

View File

@@ -27,13 +27,13 @@ type FSWatcher struct {
eventSize int eventSize int
} }
func NewFSWatcher() (*FSWatcher, error) { func NewFSWatcher() *FSWatcher {
return &FSWatcher{ return &FSWatcher{
i: inotify.NewInotify(), i: inotify.NewInotify(),
w: walker.NewWalker(), w: walker.NewWalker(),
maxWatchers: inotify.MaxWatchers, maxWatchers: inotify.MaxWatchers,
eventSize: inotify.EventSize, eventSize: inotify.EventSize,
}, nil }
} }
func (fs *FSWatcher) Close() { func (fs *FSWatcher) Close() {

View File

@@ -1,20 +0,0 @@
package fswatcher
import (
"golang.org/x/sys/unix"
)
func Observe(i Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
buf := make([]byte, 5*unix.SizeofInotifyEvent)
for {
n, err := i.Read(buf)
if err != nil {
errCh <- err
}
triggerCh <- struct{}{}
bufCopy := make([]byte, n)
copy(bufCopy, buf)
dataCh <- bufCopy
}
}

View File

@@ -1,13 +1,18 @@
package pspy package pspy
import ( import (
"errors"
"os" "os"
"time" "time"
"github.com/dominicbreuker/pspy/internal/config" "github.com/dominicbreuker/pspy/internal/config"
) )
type Bindings struct {
Logger Logger
FSW FSWatcher
PSS PSScanner
}
type Logger interface { type Logger interface {
Infof(format string, v ...interface{}) Infof(format string, v ...interface{})
Errorf(format string, v ...interface{}) Errorf(format string, v ...interface{})
@@ -19,56 +24,73 @@ type FSWatcher interface {
Run() (chan struct{}, chan string, chan error) Run() (chan struct{}, chan string, chan error)
} }
type ProcfsScanner interface { type PSScanner interface {
Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) Run(triggerCh chan struct{}) (chan string, chan error)
} }
func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) { func Start(cfg *config.Config, b *Bindings, sigCh chan os.Signal) chan struct{} {
logger.Infof("Config: %+v\n", cfg) b.Logger.Infof("Config: %+v\n", cfg)
errCh, doneCh := inotify.Init(cfg.RDirs, cfg.Dirs) initFSW(b.FSW, cfg.RDirs, cfg.Dirs, b.Logger)
initloop: triggerCh, fsEventCh := startFSW(b.FSW, b.Logger)
psEventCh := startPSS(b.PSS, b.Logger, triggerCh)
go func() {
for { for {
select { <-time.After(100 * time.Millisecond)
case <-doneCh: triggerCh <- struct{}{}
break initloop
case err := <-errCh:
logger.Errorf("initializing fs watcher: %v", err)
} }
} }()
triggerCh, fsEventCh, errCh := inotify.Run()
go logErrors(errCh, logger)
psEventCh, err := pscan.Setup(triggerCh, 100*time.Millisecond)
if err != nil {
logger.Errorf("Can't set up procfs scanner: %+v\n", err)
return nil, errors.New("procfs scanner error")
}
// ignore all file system events created on startup
logger.Infof("Draining file system events due to startup...")
drainChanFor(fsEventCh, 1*time.Second)
logger.Infof("done")
exit := make(chan struct{}) exit := make(chan struct{})
go func() { go func() {
for { for {
select { select {
case se := <-sigCh: case se := <-sigCh:
logger.Infof("Exiting program... (%s)\n", se) b.Logger.Infof("Exiting program... (%s)\n", se)
exit <- struct{}{} exit <- struct{}{}
case fe := <-fsEventCh: case fe := <-fsEventCh:
if cfg.LogFS { if cfg.LogFS {
logger.Eventf("FS: %+v\n", fe) b.Logger.Eventf("FS: %+v\n", fe)
} }
case pe := <-psEventCh: case pe := <-psEventCh:
if cfg.LogPS { if cfg.LogPS {
logger.Eventf("CMD: %+v\n", pe) b.Logger.Eventf("CMD: %+v\n", pe)
} }
} }
} }
}() }()
return exit, nil return exit
}
func initFSW(fsw FSWatcher, rdirs, dirs []string, logger Logger) {
errCh, doneCh := fsw.Init(rdirs, dirs)
for {
select {
case <-doneCh:
return
case err := <-errCh:
logger.Errorf("initializing fs watcher: %v", err)
}
}
}
func startFSW(fsw FSWatcher, logger Logger) (triggerCh chan struct{}, fsEventCh chan string) {
triggerCh, fsEventCh, errCh := fsw.Run()
go logErrors(errCh, logger)
// ignore all file system events created on startup
logger.Infof("Draining file system events due to startup...")
drainEventsFor(triggerCh, fsEventCh, 1*time.Second)
logger.Infof("done")
return triggerCh, fsEventCh
}
func startPSS(pss PSScanner, logger Logger, triggerCh chan struct{}) (psEventCh chan string) {
psEventCh, errCh := pss.Run(triggerCh)
go logErrors(errCh, logger)
return psEventCh
} }
func logErrors(errCh chan error, logger Logger) { func logErrors(errCh chan error, logger Logger) {
@@ -78,10 +100,11 @@ func logErrors(errCh chan error, logger Logger) {
} }
} }
func drainChanFor(c chan string, d time.Duration) { func drainEventsFor(triggerCh chan struct{}, eventCh chan string, d time.Duration) {
for { for {
select { select {
case <-c: case <-triggerCh:
case <-eventCh:
case <-time.After(d): case <-time.After(d):
return return
} }

View File

@@ -1,38 +1,30 @@
package process package psscanner
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "os"
"strconv" "strconv"
"strings" "strings"
"time"
) )
type ProcfsScanner struct{} var procDirReader = func() ([]os.FileInfo, error) {
return ioutil.ReadDir("/proc")
func NewProcfsScanner() *ProcfsScanner {
return &ProcfsScanner{}
} }
func (p *ProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) { var procStatusReader = func(pid int) ([]byte, error) {
psEventCh := make(chan string) statPath := fmt.Sprintf("/proc/%d/status", pid)
go func() { return ioutil.ReadFile(statPath)
for {
<-triggerCh
}
}()
return psEventCh, nil
} }
type ProcList map[int]string var cmdLineReader = func(pid int) ([]byte, error) {
cmdPath := fmt.Sprintf("/proc/%d/cmdline", pid)
func NewProcList() *ProcList { return ioutil.ReadFile(cmdPath)
pl := make(ProcList)
return &pl
} }
func (pl ProcList) Refresh(print bool) error { type procList map[int]string
func (pl procList) refresh(eventCh chan string) error {
pids, err := getPIDs() pids, err := getPIDs()
if err != nil { if err != nil {
return err return err
@@ -50,9 +42,10 @@ func (pl ProcList) Refresh(print bool) error {
if err != nil { if err != nil {
uid = "???" uid = "???"
} }
if print { eventCh <- fmt.Sprintf("CMD: UID=%-4s PID=%-6d | %s", uid, pid, cmd)
log.Printf("\x1b[31;1mCMD: UID=%-4s PID=%-6d | %s\x1b[0m\n", uid, pid, cmd) // if print {
} // log.Printf("\x1b[31;1mCMD: UID=%-4s PID=%-6d | %s\x1b[0m\n", uid, pid, cmd)
// }
pl[pid] = cmd pl[pid] = cmd
} }
} }
@@ -61,7 +54,7 @@ func (pl ProcList) Refresh(print bool) error {
} }
func getPIDs() ([]int, error) { func getPIDs() ([]int, error) {
proc, err := ioutil.ReadDir("/proc") proc, err := procDirReader()
if err != nil { if err != nil {
return nil, fmt.Errorf("opening proc dir: %v", err) return nil, fmt.Errorf("opening proc dir: %v", err)
} }
@@ -81,8 +74,7 @@ func getPIDs() ([]int, error) {
} }
func getCmd(pid int) (string, error) { func getCmd(pid int) (string, error) {
cmdPath := fmt.Sprintf("/proc/%d/cmdline", pid) cmd, err := cmdLineReader(pid)
cmd, err := ioutil.ReadFile(cmdPath)
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -95,8 +87,7 @@ func getCmd(pid int) (string, error) {
} }
func getUID(pid int) (string, error) { func getUID(pid int) (string, error) {
statPath := fmt.Sprintf("/proc/%d/status", pid) stat, err := procStatusReader(pid)
stat, err := ioutil.ReadFile(statPath)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@@ -0,0 +1,40 @@
package psscanner
import (
"fmt"
"testing"
)
func TestGetCmd(t *testing.T) {
tests := []struct {
pid int
cmdLine []byte
cmdErr error
cmd string
err string
}{
{pid: 1, cmdLine: []byte("abc"), cmdErr: nil, cmd: "abc", err: ""},
}
for _, tt := range tests {
restore := mockCmdLineReader(tt.cmdLine, tt.cmdErr)
cmd, err := getCmd(tt.pid)
if cmd != tt.cmd {
t.Errorf("Wrong cmd line returned: got %s but want %s", cmd, tt.cmd)
}
if (err != nil || tt.err != "") && fmt.Sprintf("%v", err) != tt.err {
t.Errorf("Wrong error returned: got %v but want %s", err, tt.err)
}
restore()
}
}
func mockCmdLineReader(cmdLine []byte, err error) (restore func()) {
oldFunc := cmdLineReader
cmdLineReader = func(pid int) ([]byte, error) {
return cmdLine, err
}
return func() {
cmdLineReader = oldFunc
}
}

View File

@@ -0,0 +1,21 @@
package psscanner
type PSScanner struct{}
func NewPSScanner() *PSScanner {
return &PSScanner{}
}
func (p *PSScanner) Run(triggerCh chan struct{}) (chan string, chan error) {
eventCh := make(chan string, 100)
errCh := make(chan error)
pl := make(procList)
go func() {
for {
<-triggerCh
pl.refresh(eventCh)
}
}()
return eventCh, errCh
}