diff options
Diffstat (limited to 'eows')
-rw-r--r-- | eows/eows-in.go | 59 | ||||
-rw-r--r-- | eows/eows-out.go | 122 | ||||
-rw-r--r-- | eows/eows-signal.go | 48 | ||||
-rw-r--r-- | eows/eows-signal_windows.go | 43 | ||||
-rw-r--r-- | eows/eows.go | 287 |
5 files changed, 559 insertions, 0 deletions
diff --git a/eows/eows-in.go b/eows/eows-in.go new file mode 100644 index 0000000..5e74c76 --- /dev/null +++ b/eows/eows-in.go @@ -0,0 +1,59 @@ +package eows + +import ( + "fmt" + "os" + "syscall" + "time" +) + +// DoneChan Channel used to propagate status+error on command exit +type DoneChan struct { + status int + err error +} + +// pumpStdin is in charge of receive characters and send them to stdin +func (e *ExecOverWS) pumpStdin(inw *os.File) { + + done := make(chan DoneChan, 1) + + if e.InputEvent != "" && e.InputCB != nil { + + err := (*e.SocketIO).On(e.InputEvent, func(stdin []byte) { + in, err := e.InputCB(e, stdin) + if err != nil { + e.logDebug("Error stdin: %s", err.Error()) + inw.Close() + return + } + if _, err := inw.Write(in); err != nil { + e.logError("Error while writing to stdin: %s", err.Error()) + } + }) + if err != nil { + e.logError("Error stdin on event: %s", err.Error()) + } + } + + // Monitor process exit + go func() { + status := 0 + sts, err := e.proc.Wait() + if !sts.Success() { + s := sts.Sys().(syscall.WaitStatus) + status = s.ExitStatus() + } + e.procExited = true + + done <- DoneChan{status, err} + }() + + // Wait cmd complete + select { + case dC := <-done: + e.ExitCB(e, dC.status, dC.err) + case <-time.After(time.Duration(e.CmdExecTimeout) * time.Second): + e.ExitCB(e, -999, fmt.Errorf("Exit Timeout for command ID %v", e.CmdID)) + } +} diff --git a/eows/eows-out.go b/eows/eows-out.go new file mode 100644 index 0000000..3163b2f --- /dev/null +++ b/eows/eows-out.go @@ -0,0 +1,122 @@ +package eows + +import ( + "bufio" + "io" + "strings" + "time" +) + +// scanChars - gain character by character (or as soon as one or more characters are available) +func scanChars(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + return len(data), data, nil +} + +// _pumper is in charge to collect +func (e *ExecOverWS) _pumper(sc *bufio.Scanner, fctCB func(s string)) { + + // Select split function (default sc.ScanLines) + if e.OutSplit == SplitChar || e.OutSplit == SplitLineTime || e.OutSplit == SplitTime { + sc.Split(scanChars) + } + + // Scan method according to split type + if e.OutSplit == SplitLineTime || e.OutSplit == SplitTime { + t0 := time.Now() + buf := "" + for sc.Scan() { + buf += sc.Text() + if time.Since(t0).Nanoseconds() > e.LineTimeSpan || + (e.OutSplit == SplitLineTime && strings.Contains(buf, "\n")) { + fctCB(buf) + buf = "" + t0 = time.Now() + } + if e.procExited { + break + } + } + // Send remaining characters + if len(buf) > 0 { + fctCB(buf) + } + + } else { + + for sc.Scan() { + fctCB(sc.Text()) + if e.procExited { + break + } + } + } + +} + +// pipePumpStdout is in charge to forward stdout in websocket +func (e *ExecOverWS) pipePumpStdout(r io.Reader, done chan struct{}) { + + sc := bufio.NewScanner(r) + + e._pumper(sc, func(b string) { + if e.OutputCB != nil { + e.OutputCB(e, []byte(b), []byte{}) + } + }) + + e.logDebug("STDOUT pump exit") + + if sc.Err() != nil && !strings.Contains(sc.Err().Error(), "file already closed") { + e.logError("stdout scan: %v", sc.Err()) + } + + close(done) +} + +// pipePumpStderr is in charge to forward stderr in websocket +func (e *ExecOverWS) pipePumpStderr(r io.Reader) { + + sc := bufio.NewScanner(r) + + e._pumper(sc, func(b string) { + if e.OutputCB != nil { + e.OutputCB(e, []byte{}, []byte(b)) + } + }) + + e.logDebug("STDERR pump exit") + + if sc.Err() != nil && !strings.Contains(sc.Err().Error(), "file already closed") { + e.logError("stderr scan: %v", sc.Err()) + } +} + +// ptsPumpStdout is in charge to forward stdout in websocket +// (only used when PtyMode is set) +func (e *ExecOverWS) ptsPumpStdout(r io.Reader, done chan struct{}) { + + buffer := make([]byte, 1024) + for { + n, err := r.Read(buffer) + if err != nil { + if err != io.EOF && + !strings.Contains(err.Error(), "file already closed") { + e.logError("Error stdout read: %v", err) + } + break + } + if n == 0 { + continue + } + if e.OutputCB != nil { + e.OutputCB(e, buffer[:n], []byte{}) + } + } + + close(done) + + e.logDebug("Eows stdout pump exited") +} diff --git a/eows/eows-signal.go b/eows/eows-signal.go new file mode 100644 index 0000000..f48279a --- /dev/null +++ b/eows/eows-signal.go @@ -0,0 +1,48 @@ +// +build !windows + +// Package eows is used to Execute commands Over WebSocket +package eows + +import ( + "fmt" + "os" + "syscall" +) + +// Signal sends a signal to the running command / process +func (e *ExecOverWS) Signal(signal string) error { + var sig os.Signal + switch signal { + case "quit", "SIGQUIT": + sig = syscall.SIGQUIT + case "terminated", "SIGTERM": + sig = syscall.SIGTERM + case "interrupt", "SIGINT": + sig = syscall.SIGINT + case "aborted", "SIGABRT": + sig = syscall.SIGABRT + case "continued", "SIGCONT": + sig = syscall.SIGCONT + case "hangup", "SIGHUP": + sig = syscall.SIGHUP + case "killed", "SIGKILL": + sig = syscall.SIGKILL + case "stopped (signal)", "SIGSTOP": + sig = syscall.SIGSTOP + case "stopped", "SIGTSTP": + sig = syscall.SIGTSTP + case "user defined signal 1", "SIGUSR1": + sig = syscall.SIGUSR1 + case "user defined signal 2", "SIGUSR2": + sig = syscall.SIGUSR2 + default: + return fmt.Errorf("Unsupported signal") + } + + if e.proc == nil { + return fmt.Errorf("Cannot retrieve process") + } + + e.logDebug("SEND signal %v to proc %v", sig, e.proc.Pid) + return e.proc.Signal(sig) +} diff --git a/eows/eows-signal_windows.go b/eows/eows-signal_windows.go new file mode 100644 index 0000000..23ad924 --- /dev/null +++ b/eows/eows-signal_windows.go @@ -0,0 +1,43 @@ +package eows + +// +build windows + +// Signal sends a signal to the running command / process +func (e *ExecOverWS) Signal(signal string) error { + panic("FIXME: Not implemented") + /* + var sig os.Signal + switch signal { + case "quit", "SIGQUIT": + sig = syscall.SIGQUIT + case "terminated", "SIGTERM": + sig = syscall.SIGTERM + case "interrupt", "SIGINT": + sig = syscall.SIGINT + case "aborted", "SIGABRT": + sig = syscall.SIGABRT + case "continued", "SIGCONT": + sig = syscall.SIGCONT + case "hangup", "SIGHUP": + sig = syscall.SIGHUP + case "killed", "SIGKILL": + sig = syscall.SIGKILL + case "stopped (signal)", "SIGSTOP": + sig = syscall.SIGSTOP + case "stopped", "SIGTSTP": + sig = syscall.SIGTSTP + case "user defined signal 1", "SIGUSR1": + sig = syscall.SIGUSR1 + case "user defined signal 2", "SIGUSR2": + sig = syscall.SIGUSR2 + default: + return fmt.Errorf("Unsupported signal") + } + + if e.proc == nil { + return fmt.Errorf("Cannot retrieve process") + } + fmt.Printf("SEND signal %v to proc %v\n", sig, e.proc.Pid) + return e.proc.Signal(sig) + */ +} diff --git a/eows/eows.go b/eows/eows.go new file mode 100644 index 0000000..9d0b520 --- /dev/null +++ b/eows/eows.go @@ -0,0 +1,287 @@ +// Package eows is used to Execute commands Over WebSocket +package eows + +import ( + "fmt" + "os" + "os/exec" + "strings" + "syscall" + "time" + "unsafe" + + "github.com/Sirupsen/logrus" + "github.com/googollee/go-socket.io" + "github.com/kr/pty" +) + +// OnInputCB is the function callback used to receive data +type OnInputCB func(e *ExecOverWS, stdin []byte) ([]byte, error) + +// EmitOutputCB is the function callback used to emit data +type EmitOutputCB func(e *ExecOverWS, stdout, stderr []byte) + +// EmitExitCB is the function callback used to emit exit proc code +type EmitExitCB func(e *ExecOverWS, code int, err error) + +// SplitType Type of spliting method to tokenize stdout/stderr +type SplitType uint8 + +const ( + // SplitLine Split line by line + SplitLine SplitType = iota + // SplitChar Split character by character + SplitChar + // SplitLineTime Split by line or until a timeout has passed + SplitLineTime + // SplitTime Split until a timeout has passed + SplitTime +) + +// Inspired by : +// https://github.com/gorilla/websocket/blob/master/examples/command/main.go + +// ExecOverWS . +type ExecOverWS struct { + Cmd string // command name to execute + Args []string // command arguments + SocketIO *socketio.Socket // websocket + Sid string // websocket ID + CmdID string // command ID + + // Optional fields + Env []string // command environment variables + CmdExecTimeout int // command execution time timeout + Log *logrus.Logger // logger (nil if disabled) + InputEvent string // websocket input event name + InputCB OnInputCB // stdin callback + OutputCB EmitOutputCB // stdout/stderr callback + ExitCB EmitExitCB // exit proc callback + UserData *map[string]interface{} // user data passed to callbacks + OutSplit SplitType // split method to tokenize stdout/stderr + LineTimeSpan int64 // time span (only used with SplitTime or SplitLineTime) + PtyMode bool // Allocate a pseudo-terminal (allow to execute screen-based program) + PtyTermEcho bool // Turn on/off terminal echo + + // Private fields + + proc *os.Process + command *exec.Cmd + ptmx *os.File + procExited bool +} + +var cmdIDMap = make(map[string]*ExecOverWS) + +// New creates a new instace of eows +func New(cmd string, args []string, so *socketio.Socket, soID, cmdID string) *ExecOverWS { + + e := &ExecOverWS{ + Cmd: cmd, + Args: args, + SocketIO: so, + Sid: soID, + CmdID: cmdID, + CmdExecTimeout: -1, // default no timeout + OutSplit: SplitLineTime, // default split by line with time + LineTimeSpan: 500 * time.Millisecond.Nanoseconds(), + PtyMode: false, + PtyTermEcho: true, + } + + cmdIDMap[cmdID] = e + + return e +} + +// GetEows gets ExecOverWS object from command ID +func GetEows(cmdID string) *ExecOverWS { + if _, ok := cmdIDMap[cmdID]; !ok { + return nil + } + return cmdIDMap[cmdID] +} + +// Start executes the command and redirect stdout/stderr into a WebSocket +func (e *ExecOverWS) Start() error { + var err error + var outr, outw, errr, errw, inr, inw *os.File + + bashArgs := []string{"/bin/bash", "-c", e.Cmd + " " + strings.Join(e.Args, " ")} + + // no timeout == 1 year + if e.CmdExecTimeout == -1 { + e.CmdExecTimeout = 365 * 24 * 60 * 60 + } + + e.procExited = false + + if e.PtyMode { + + e.command = exec.Command(bashArgs[0], bashArgs[1:]...) + e.command.Env = append(os.Environ(), e.Env...) + e.ptmx, err = pty.Start(e.command) + if err != nil { + err = fmt.Errorf("Process start error: " + err.Error()) + goto exitErr + } + e.proc = e.command.Process + + // Turn off terminal echo + if !e.PtyTermEcho { + e.terminalEcho(e.ptmx, false) + } + + } else { + + // Create pipes + outr, outw, err = os.Pipe() + if err != nil { + err = fmt.Errorf("Pipe stdout error: " + err.Error()) + goto exitErr + } + + errr, errw, err = os.Pipe() + if err != nil { + err = fmt.Errorf("Pipe stderr error: " + err.Error()) + goto exitErr + } + + inr, inw, err = os.Pipe() + if err != nil { + err = fmt.Errorf("Pipe stdin error: " + err.Error()) + goto exitErr + } + + e.proc, err = os.StartProcess(bashArgs[0], bashArgs, &os.ProcAttr{ + Files: []*os.File{inr, outw, errw}, + Env: append(os.Environ(), e.Env...), + }) + if err != nil { + err = fmt.Errorf("Process start error: " + err.Error()) + goto exitErr + } + } + + go func() { + stdoutDone := make(chan struct{}) + + if e.PtyMode { + // Make sure to close the pty at the end. + defer e.ptmx.Close() + + // Handle both stdout mixed with stderr + go e.ptsPumpStdout(e.ptmx, stdoutDone) + + // Blocking function that poll input or wait for end of process + e.pumpStdin(e.ptmx) + + } else { + // Make sure to close all pipes + defer outr.Close() + defer outw.Close() + defer errr.Close() + defer errw.Close() + defer inr.Close() + defer inw.Close() + + // Handle stdout + stderr + go e.pipePumpStdout(outr, stdoutDone) + go e.pipePumpStderr(errr) + + // Blocking function that poll input or wait for end of process + e.pumpStdin(inw) + } + + if status, err := e.proc.Wait(); err == nil { + // Other commands need a bonk on the head. + if !status.Exited() { + if err := e.proc.Signal(os.Interrupt); err != nil { + e.logError("Proc interrupt:", err) + } + + select { + case <-stdoutDone: + case <-time.After(time.Second): + // A bigger bonk on the head. + if err := e.proc.Signal(os.Kill); err != nil { + e.logError("Proc term:", err) + } + <-stdoutDone + } + } + } + + delete(cmdIDMap, e.CmdID) + }() + + return nil + +exitErr: + for _, pf := range []*os.File{outr, outw, errr, errw, inr, inw} { + pf.Close() + } + return err +} + +// TerminalSetSize Set terminal size +func (e *ExecOverWS) TerminalSetSize(rows, cols uint16) error { + if !e.PtyMode || e.ptmx == nil { + return fmt.Errorf("PtyMode not set") + } + w, err := pty.GetsizeFull(e.ptmx) + if err != nil { + return err + } + return e.TerminalSetSizePos(rows, cols, w.X, w.Y) +} + +// TerminalSetSizePos Set terminal size and position +func (e *ExecOverWS) TerminalSetSizePos(rows, cols, x, y uint16) error { + if !e.PtyMode || e.ptmx == nil { + return fmt.Errorf("PtyMode not set") + } + winSz := pty.Winsize{Rows: rows, Cols: cols, X: x, Y: y} + return pty.Setsize(e.ptmx, &winSz) +} + +/** + * Private functions + **/ + +// terminalEcho Enable or disable echoing terminal input. +// This is useful specifically for when users enter passwords. +func (e *ExecOverWS) terminalEcho(ff *os.File, show bool) { + var termios = &syscall.Termios{} + + fd := ff.Fd() + + if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd, + syscall.TCGETS, uintptr(unsafe.Pointer(termios))); err != 0 { + return + } + + if show { + termios.Lflag |= syscall.ECHO + } else { + termios.Lflag &^= syscall.ECHO + } + + if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd, + uintptr(syscall.TCSETS), + uintptr(unsafe.Pointer(termios))); err != 0 { + return + } +} + +func (e *ExecOverWS) logDebug(format string, a ...interface{}) { + if e.Log != nil { + e.Log.Debugf(format, a...) + } +} + +func (e *ExecOverWS) logError(format string, a ...interface{}) { + if e.Log != nil { + e.Log.Errorf(format, a...) + } +} |