refactors psscanner

This commit is contained in:
Dominic Breuker
2018-03-02 13:59:41 +01:00
parent 9670b85f43
commit 644d65be7b
8 changed files with 150 additions and 116 deletions

View File

@@ -11,8 +11,8 @@ import (
"github.com/dominicbreuker/pspy/internal/config"
"github.com/dominicbreuker/pspy/internal/fswatcher"
"github.com/dominicbreuker/pspy/internal/logging"
"github.com/dominicbreuker/pspy/internal/process"
"github.com/dominicbreuker/pspy/internal/pspy"
"github.com/dominicbreuker/pspy/internal/psscanner"
"github.com/spf13/cobra"
)
@@ -68,27 +68,26 @@ func init() {
func root(cmd *cobra.Command, args []string) {
logger := logging.NewLogger()
cfg := config.Config{
cfg := &config.Config{
RDirs: rDirs,
Dirs: dirs,
LogPS: logPS,
LogFS: logFS,
}
iw, err := fswatcher.NewFSWatcher()
if err != nil {
logger.Errorf("Can't initialize fswatcher: %v", err)
os.Exit(1)
}
defer iw.Close()
pscan := process.NewProcfsScanner()
fsw := fswatcher.NewFSWatcher()
defer fsw.Close()
pss := psscanner.NewPSScanner()
sigCh := make(chan os.Signal)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
exit, err := pspy.Start(cfg, logger, iw, pscan, sigCh)
if err != nil {
os.Exit(1)
b := &pspy.Bindings{
Logger: logger,
FSW: fsw,
PSS: pss,
}
exit := pspy.Start(cfg, b, sigCh)
<-exit
os.Exit(0)
}

View File

@@ -1,20 +0,0 @@
package fswatcher
import (
"fmt"
)
func parseEvents(i Inotify, dataCh chan []byte, eventCh chan string, errCh chan error) {
for buf := range dataCh {
var ptr uint32
for len(buf[ptr:]) > 0 {
event, size, err := i.ParseNextEvent(buf[ptr:])
ptr += size
if err != nil {
errCh <- fmt.Errorf("parsing events: %v", err)
continue
}
eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name)
}
}
}

View File

@@ -27,13 +27,13 @@ type FSWatcher struct {
eventSize int
}
func NewFSWatcher() (*FSWatcher, error) {
func NewFSWatcher() *FSWatcher {
return &FSWatcher{
i: inotify.NewInotify(),
w: walker.NewWalker(),
maxWatchers: inotify.MaxWatchers,
eventSize: inotify.EventSize,
}, nil
}
}
func (fs *FSWatcher) Close() {

View File

@@ -1,20 +0,0 @@
package fswatcher
import (
"golang.org/x/sys/unix"
)
func Observe(i Inotify, triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
buf := make([]byte, 5*unix.SizeofInotifyEvent)
for {
n, err := i.Read(buf)
if err != nil {
errCh <- err
}
triggerCh <- struct{}{}
bufCopy := make([]byte, n)
copy(bufCopy, buf)
dataCh <- bufCopy
}
}

View File

@@ -1,13 +1,18 @@
package pspy
import (
"errors"
"os"
"time"
"github.com/dominicbreuker/pspy/internal/config"
)
type Bindings struct {
Logger Logger
FSW FSWatcher
PSS PSScanner
}
type Logger interface {
Infof(format string, v ...interface{})
Errorf(format string, v ...interface{})
@@ -19,56 +24,73 @@ type FSWatcher interface {
Run() (chan struct{}, chan string, chan error)
}
type ProcfsScanner interface {
Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error)
type PSScanner interface {
Run(triggerCh chan struct{}) (chan string, chan error)
}
func Start(cfg config.Config, logger Logger, inotify FSWatcher, pscan ProcfsScanner, sigCh chan os.Signal) (chan struct{}, error) {
logger.Infof("Config: %+v\n", cfg)
func Start(cfg *config.Config, b *Bindings, sigCh chan os.Signal) chan struct{} {
b.Logger.Infof("Config: %+v\n", cfg)
errCh, doneCh := inotify.Init(cfg.RDirs, cfg.Dirs)
initloop:
initFSW(b.FSW, cfg.RDirs, cfg.Dirs, b.Logger)
triggerCh, fsEventCh := startFSW(b.FSW, b.Logger)
psEventCh := startPSS(b.PSS, b.Logger, triggerCh)
go func() {
for {
select {
case <-doneCh:
break initloop
case err := <-errCh:
logger.Errorf("initializing fs watcher: %v", err)
<-time.After(100 * time.Millisecond)
triggerCh <- struct{}{}
}
}
triggerCh, fsEventCh, errCh := inotify.Run()
go logErrors(errCh, logger)
psEventCh, err := pscan.Setup(triggerCh, 100*time.Millisecond)
if err != nil {
logger.Errorf("Can't set up procfs scanner: %+v\n", err)
return nil, errors.New("procfs scanner error")
}
// ignore all file system events created on startup
logger.Infof("Draining file system events due to startup...")
drainChanFor(fsEventCh, 1*time.Second)
logger.Infof("done")
}()
exit := make(chan struct{})
go func() {
for {
select {
case se := <-sigCh:
logger.Infof("Exiting program... (%s)\n", se)
b.Logger.Infof("Exiting program... (%s)\n", se)
exit <- struct{}{}
case fe := <-fsEventCh:
if cfg.LogFS {
logger.Eventf("FS: %+v\n", fe)
b.Logger.Eventf("FS: %+v\n", fe)
}
case pe := <-psEventCh:
if cfg.LogPS {
logger.Eventf("CMD: %+v\n", pe)
b.Logger.Eventf("CMD: %+v\n", pe)
}
}
}
}()
return exit, nil
return exit
}
func initFSW(fsw FSWatcher, rdirs, dirs []string, logger Logger) {
errCh, doneCh := fsw.Init(rdirs, dirs)
for {
select {
case <-doneCh:
return
case err := <-errCh:
logger.Errorf("initializing fs watcher: %v", err)
}
}
}
func startFSW(fsw FSWatcher, logger Logger) (triggerCh chan struct{}, fsEventCh chan string) {
triggerCh, fsEventCh, errCh := fsw.Run()
go logErrors(errCh, logger)
// ignore all file system events created on startup
logger.Infof("Draining file system events due to startup...")
drainEventsFor(triggerCh, fsEventCh, 1*time.Second)
logger.Infof("done")
return triggerCh, fsEventCh
}
func startPSS(pss PSScanner, logger Logger, triggerCh chan struct{}) (psEventCh chan string) {
psEventCh, errCh := pss.Run(triggerCh)
go logErrors(errCh, logger)
return psEventCh
}
func logErrors(errCh chan error, logger Logger) {
@@ -78,10 +100,11 @@ func logErrors(errCh chan error, logger Logger) {
}
}
func drainChanFor(c chan string, d time.Duration) {
func drainEventsFor(triggerCh chan struct{}, eventCh chan string, d time.Duration) {
for {
select {
case <-c:
case <-triggerCh:
case <-eventCh:
case <-time.After(d):
return
}

View File

@@ -1,38 +1,30 @@
package process
package psscanner
import (
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"time"
)
type ProcfsScanner struct{}
func NewProcfsScanner() *ProcfsScanner {
return &ProcfsScanner{}
var procDirReader = func() ([]os.FileInfo, error) {
return ioutil.ReadDir("/proc")
}
func (p *ProcfsScanner) Setup(triggerCh chan struct{}, interval time.Duration) (chan string, error) {
psEventCh := make(chan string)
go func() {
for {
<-triggerCh
}
}()
return psEventCh, nil
var procStatusReader = func(pid int) ([]byte, error) {
statPath := fmt.Sprintf("/proc/%d/status", pid)
return ioutil.ReadFile(statPath)
}
type ProcList map[int]string
func NewProcList() *ProcList {
pl := make(ProcList)
return &pl
var cmdLineReader = func(pid int) ([]byte, error) {
cmdPath := fmt.Sprintf("/proc/%d/cmdline", pid)
return ioutil.ReadFile(cmdPath)
}
func (pl ProcList) Refresh(print bool) error {
type procList map[int]string
func (pl procList) refresh(eventCh chan string) error {
pids, err := getPIDs()
if err != nil {
return err
@@ -50,9 +42,10 @@ func (pl ProcList) Refresh(print bool) error {
if err != nil {
uid = "???"
}
if print {
log.Printf("\x1b[31;1mCMD: UID=%-4s PID=%-6d | %s\x1b[0m\n", uid, pid, cmd)
}
eventCh <- fmt.Sprintf("CMD: UID=%-4s PID=%-6d | %s", uid, pid, cmd)
// if print {
// log.Printf("\x1b[31;1mCMD: UID=%-4s PID=%-6d | %s\x1b[0m\n", uid, pid, cmd)
// }
pl[pid] = cmd
}
}
@@ -61,7 +54,7 @@ func (pl ProcList) Refresh(print bool) error {
}
func getPIDs() ([]int, error) {
proc, err := ioutil.ReadDir("/proc")
proc, err := procDirReader()
if err != nil {
return nil, fmt.Errorf("opening proc dir: %v", err)
}
@@ -81,8 +74,7 @@ func getPIDs() ([]int, error) {
}
func getCmd(pid int) (string, error) {
cmdPath := fmt.Sprintf("/proc/%d/cmdline", pid)
cmd, err := ioutil.ReadFile(cmdPath)
cmd, err := cmdLineReader(pid)
if err != nil {
return "", err
}
@@ -95,8 +87,7 @@ func getCmd(pid int) (string, error) {
}
func getUID(pid int) (string, error) {
statPath := fmt.Sprintf("/proc/%d/status", pid)
stat, err := ioutil.ReadFile(statPath)
stat, err := procStatusReader(pid)
if err != nil {
return "", err
}

View File

@@ -0,0 +1,40 @@
package psscanner
import (
"fmt"
"testing"
)
func TestGetCmd(t *testing.T) {
tests := []struct {
pid int
cmdLine []byte
cmdErr error
cmd string
err string
}{
{pid: 1, cmdLine: []byte("abc"), cmdErr: nil, cmd: "abc", err: ""},
}
for _, tt := range tests {
restore := mockCmdLineReader(tt.cmdLine, tt.cmdErr)
cmd, err := getCmd(tt.pid)
if cmd != tt.cmd {
t.Errorf("Wrong cmd line returned: got %s but want %s", cmd, tt.cmd)
}
if (err != nil || tt.err != "") && fmt.Sprintf("%v", err) != tt.err {
t.Errorf("Wrong error returned: got %v but want %s", err, tt.err)
}
restore()
}
}
func mockCmdLineReader(cmdLine []byte, err error) (restore func()) {
oldFunc := cmdLineReader
cmdLineReader = func(pid int) ([]byte, error) {
return cmdLine, err
}
return func() {
cmdLineReader = oldFunc
}
}

View File

@@ -0,0 +1,21 @@
package psscanner
type PSScanner struct{}
func NewPSScanner() *PSScanner {
return &PSScanner{}
}
func (p *PSScanner) Run(triggerCh chan struct{}) (chan string, chan error) {
eventCh := make(chan string, 100)
errCh := make(chan error)
pl := make(procList)
go func() {
for {
<-triggerCh
pl.refresh(eventCh)
}
}()
return eventCh, errCh
}