summaryrefslogtreecommitdiffstats
path: root/eows
diff options
context:
space:
mode:
Diffstat (limited to 'eows')
-rw-r--r--eows/eows-in.go59
-rw-r--r--eows/eows-out.go122
-rw-r--r--eows/eows-signal.go48
-rw-r--r--eows/eows-signal_windows.go43
-rw-r--r--eows/eows.go287
5 files changed, 559 insertions, 0 deletions
diff --git a/eows/eows-in.go b/eows/eows-in.go
new file mode 100644
index 0000000..5e74c76
--- /dev/null
+++ b/eows/eows-in.go
@@ -0,0 +1,59 @@
+package eows
+
+import (
+ "fmt"
+ "os"
+ "syscall"
+ "time"
+)
+
+// DoneChan Channel used to propagate status+error on command exit
+type DoneChan struct {
+ status int
+ err error
+}
+
+// pumpStdin is in charge of receive characters and send them to stdin
+func (e *ExecOverWS) pumpStdin(inw *os.File) {
+
+ done := make(chan DoneChan, 1)
+
+ if e.InputEvent != "" && e.InputCB != nil {
+
+ err := (*e.SocketIO).On(e.InputEvent, func(stdin []byte) {
+ in, err := e.InputCB(e, stdin)
+ if err != nil {
+ e.logDebug("Error stdin: %s", err.Error())
+ inw.Close()
+ return
+ }
+ if _, err := inw.Write(in); err != nil {
+ e.logError("Error while writing to stdin: %s", err.Error())
+ }
+ })
+ if err != nil {
+ e.logError("Error stdin on event: %s", err.Error())
+ }
+ }
+
+ // Monitor process exit
+ go func() {
+ status := 0
+ sts, err := e.proc.Wait()
+ if !sts.Success() {
+ s := sts.Sys().(syscall.WaitStatus)
+ status = s.ExitStatus()
+ }
+ e.procExited = true
+
+ done <- DoneChan{status, err}
+ }()
+
+ // Wait cmd complete
+ select {
+ case dC := <-done:
+ e.ExitCB(e, dC.status, dC.err)
+ case <-time.After(time.Duration(e.CmdExecTimeout) * time.Second):
+ e.ExitCB(e, -999, fmt.Errorf("Exit Timeout for command ID %v", e.CmdID))
+ }
+}
diff --git a/eows/eows-out.go b/eows/eows-out.go
new file mode 100644
index 0000000..3163b2f
--- /dev/null
+++ b/eows/eows-out.go
@@ -0,0 +1,122 @@
+package eows
+
+import (
+ "bufio"
+ "io"
+ "strings"
+ "time"
+)
+
+// scanChars - gain character by character (or as soon as one or more characters are available)
+func scanChars(data []byte, atEOF bool) (advance int, token []byte, err error) {
+ if atEOF && len(data) == 0 {
+ return 0, nil, nil
+ }
+ return len(data), data, nil
+}
+
+// _pumper is in charge to collect
+func (e *ExecOverWS) _pumper(sc *bufio.Scanner, fctCB func(s string)) {
+
+ // Select split function (default sc.ScanLines)
+ if e.OutSplit == SplitChar || e.OutSplit == SplitLineTime || e.OutSplit == SplitTime {
+ sc.Split(scanChars)
+ }
+
+ // Scan method according to split type
+ if e.OutSplit == SplitLineTime || e.OutSplit == SplitTime {
+ t0 := time.Now()
+ buf := ""
+ for sc.Scan() {
+ buf += sc.Text()
+ if time.Since(t0).Nanoseconds() > e.LineTimeSpan ||
+ (e.OutSplit == SplitLineTime && strings.Contains(buf, "\n")) {
+ fctCB(buf)
+ buf = ""
+ t0 = time.Now()
+ }
+ if e.procExited {
+ break
+ }
+ }
+ // Send remaining characters
+ if len(buf) > 0 {
+ fctCB(buf)
+ }
+
+ } else {
+
+ for sc.Scan() {
+ fctCB(sc.Text())
+ if e.procExited {
+ break
+ }
+ }
+ }
+
+}
+
+// pipePumpStdout is in charge to forward stdout in websocket
+func (e *ExecOverWS) pipePumpStdout(r io.Reader, done chan struct{}) {
+
+ sc := bufio.NewScanner(r)
+
+ e._pumper(sc, func(b string) {
+ if e.OutputCB != nil {
+ e.OutputCB(e, []byte(b), []byte{})
+ }
+ })
+
+ e.logDebug("STDOUT pump exit")
+
+ if sc.Err() != nil && !strings.Contains(sc.Err().Error(), "file already closed") {
+ e.logError("stdout scan: %v", sc.Err())
+ }
+
+ close(done)
+}
+
+// pipePumpStderr is in charge to forward stderr in websocket
+func (e *ExecOverWS) pipePumpStderr(r io.Reader) {
+
+ sc := bufio.NewScanner(r)
+
+ e._pumper(sc, func(b string) {
+ if e.OutputCB != nil {
+ e.OutputCB(e, []byte{}, []byte(b))
+ }
+ })
+
+ e.logDebug("STDERR pump exit")
+
+ if sc.Err() != nil && !strings.Contains(sc.Err().Error(), "file already closed") {
+ e.logError("stderr scan: %v", sc.Err())
+ }
+}
+
+// ptsPumpStdout is in charge to forward stdout in websocket
+// (only used when PtyMode is set)
+func (e *ExecOverWS) ptsPumpStdout(r io.Reader, done chan struct{}) {
+
+ buffer := make([]byte, 1024)
+ for {
+ n, err := r.Read(buffer)
+ if err != nil {
+ if err != io.EOF &&
+ !strings.Contains(err.Error(), "file already closed") {
+ e.logError("Error stdout read: %v", err)
+ }
+ break
+ }
+ if n == 0 {
+ continue
+ }
+ if e.OutputCB != nil {
+ e.OutputCB(e, buffer[:n], []byte{})
+ }
+ }
+
+ close(done)
+
+ e.logDebug("Eows stdout pump exited")
+}
diff --git a/eows/eows-signal.go b/eows/eows-signal.go
new file mode 100644
index 0000000..f48279a
--- /dev/null
+++ b/eows/eows-signal.go
@@ -0,0 +1,48 @@
+// +build !windows
+
+// Package eows is used to Execute commands Over WebSocket
+package eows
+
+import (
+ "fmt"
+ "os"
+ "syscall"
+)
+
+// Signal sends a signal to the running command / process
+func (e *ExecOverWS) Signal(signal string) error {
+ var sig os.Signal
+ switch signal {
+ case "quit", "SIGQUIT":
+ sig = syscall.SIGQUIT
+ case "terminated", "SIGTERM":
+ sig = syscall.SIGTERM
+ case "interrupt", "SIGINT":
+ sig = syscall.SIGINT
+ case "aborted", "SIGABRT":
+ sig = syscall.SIGABRT
+ case "continued", "SIGCONT":
+ sig = syscall.SIGCONT
+ case "hangup", "SIGHUP":
+ sig = syscall.SIGHUP
+ case "killed", "SIGKILL":
+ sig = syscall.SIGKILL
+ case "stopped (signal)", "SIGSTOP":
+ sig = syscall.SIGSTOP
+ case "stopped", "SIGTSTP":
+ sig = syscall.SIGTSTP
+ case "user defined signal 1", "SIGUSR1":
+ sig = syscall.SIGUSR1
+ case "user defined signal 2", "SIGUSR2":
+ sig = syscall.SIGUSR2
+ default:
+ return fmt.Errorf("Unsupported signal")
+ }
+
+ if e.proc == nil {
+ return fmt.Errorf("Cannot retrieve process")
+ }
+
+ e.logDebug("SEND signal %v to proc %v", sig, e.proc.Pid)
+ return e.proc.Signal(sig)
+}
diff --git a/eows/eows-signal_windows.go b/eows/eows-signal_windows.go
new file mode 100644
index 0000000..23ad924
--- /dev/null
+++ b/eows/eows-signal_windows.go
@@ -0,0 +1,43 @@
+package eows
+
+// +build windows
+
+// Signal sends a signal to the running command / process
+func (e *ExecOverWS) Signal(signal string) error {
+ panic("FIXME: Not implemented")
+ /*
+ var sig os.Signal
+ switch signal {
+ case "quit", "SIGQUIT":
+ sig = syscall.SIGQUIT
+ case "terminated", "SIGTERM":
+ sig = syscall.SIGTERM
+ case "interrupt", "SIGINT":
+ sig = syscall.SIGINT
+ case "aborted", "SIGABRT":
+ sig = syscall.SIGABRT
+ case "continued", "SIGCONT":
+ sig = syscall.SIGCONT
+ case "hangup", "SIGHUP":
+ sig = syscall.SIGHUP
+ case "killed", "SIGKILL":
+ sig = syscall.SIGKILL
+ case "stopped (signal)", "SIGSTOP":
+ sig = syscall.SIGSTOP
+ case "stopped", "SIGTSTP":
+ sig = syscall.SIGTSTP
+ case "user defined signal 1", "SIGUSR1":
+ sig = syscall.SIGUSR1
+ case "user defined signal 2", "SIGUSR2":
+ sig = syscall.SIGUSR2
+ default:
+ return fmt.Errorf("Unsupported signal")
+ }
+
+ if e.proc == nil {
+ return fmt.Errorf("Cannot retrieve process")
+ }
+ fmt.Printf("SEND signal %v to proc %v\n", sig, e.proc.Pid)
+ return e.proc.Signal(sig)
+ */
+}
diff --git a/eows/eows.go b/eows/eows.go
new file mode 100644
index 0000000..9d0b520
--- /dev/null
+++ b/eows/eows.go
@@ -0,0 +1,287 @@
+// Package eows is used to Execute commands Over WebSocket
+package eows
+
+import (
+ "fmt"
+ "os"
+ "os/exec"
+ "strings"
+ "syscall"
+ "time"
+ "unsafe"
+
+ "github.com/Sirupsen/logrus"
+ "github.com/googollee/go-socket.io"
+ "github.com/kr/pty"
+)
+
+// OnInputCB is the function callback used to receive data
+type OnInputCB func(e *ExecOverWS, stdin []byte) ([]byte, error)
+
+// EmitOutputCB is the function callback used to emit data
+type EmitOutputCB func(e *ExecOverWS, stdout, stderr []byte)
+
+// EmitExitCB is the function callback used to emit exit proc code
+type EmitExitCB func(e *ExecOverWS, code int, err error)
+
+// SplitType Type of spliting method to tokenize stdout/stderr
+type SplitType uint8
+
+const (
+ // SplitLine Split line by line
+ SplitLine SplitType = iota
+ // SplitChar Split character by character
+ SplitChar
+ // SplitLineTime Split by line or until a timeout has passed
+ SplitLineTime
+ // SplitTime Split until a timeout has passed
+ SplitTime
+)
+
+// Inspired by :
+// https://github.com/gorilla/websocket/blob/master/examples/command/main.go
+
+// ExecOverWS .
+type ExecOverWS struct {
+ Cmd string // command name to execute
+ Args []string // command arguments
+ SocketIO *socketio.Socket // websocket
+ Sid string // websocket ID
+ CmdID string // command ID
+
+ // Optional fields
+ Env []string // command environment variables
+ CmdExecTimeout int // command execution time timeout
+ Log *logrus.Logger // logger (nil if disabled)
+ InputEvent string // websocket input event name
+ InputCB OnInputCB // stdin callback
+ OutputCB EmitOutputCB // stdout/stderr callback
+ ExitCB EmitExitCB // exit proc callback
+ UserData *map[string]interface{} // user data passed to callbacks
+ OutSplit SplitType // split method to tokenize stdout/stderr
+ LineTimeSpan int64 // time span (only used with SplitTime or SplitLineTime)
+ PtyMode bool // Allocate a pseudo-terminal (allow to execute screen-based program)
+ PtyTermEcho bool // Turn on/off terminal echo
+
+ // Private fields
+
+ proc *os.Process
+ command *exec.Cmd
+ ptmx *os.File
+ procExited bool
+}
+
+var cmdIDMap = make(map[string]*ExecOverWS)
+
+// New creates a new instace of eows
+func New(cmd string, args []string, so *socketio.Socket, soID, cmdID string) *ExecOverWS {
+
+ e := &ExecOverWS{
+ Cmd: cmd,
+ Args: args,
+ SocketIO: so,
+ Sid: soID,
+ CmdID: cmdID,
+ CmdExecTimeout: -1, // default no timeout
+ OutSplit: SplitLineTime, // default split by line with time
+ LineTimeSpan: 500 * time.Millisecond.Nanoseconds(),
+ PtyMode: false,
+ PtyTermEcho: true,
+ }
+
+ cmdIDMap[cmdID] = e
+
+ return e
+}
+
+// GetEows gets ExecOverWS object from command ID
+func GetEows(cmdID string) *ExecOverWS {
+ if _, ok := cmdIDMap[cmdID]; !ok {
+ return nil
+ }
+ return cmdIDMap[cmdID]
+}
+
+// Start executes the command and redirect stdout/stderr into a WebSocket
+func (e *ExecOverWS) Start() error {
+ var err error
+ var outr, outw, errr, errw, inr, inw *os.File
+
+ bashArgs := []string{"/bin/bash", "-c", e.Cmd + " " + strings.Join(e.Args, " ")}
+
+ // no timeout == 1 year
+ if e.CmdExecTimeout == -1 {
+ e.CmdExecTimeout = 365 * 24 * 60 * 60
+ }
+
+ e.procExited = false
+
+ if e.PtyMode {
+
+ e.command = exec.Command(bashArgs[0], bashArgs[1:]...)
+ e.command.Env = append(os.Environ(), e.Env...)
+ e.ptmx, err = pty.Start(e.command)
+ if err != nil {
+ err = fmt.Errorf("Process start error: " + err.Error())
+ goto exitErr
+ }
+ e.proc = e.command.Process
+
+ // Turn off terminal echo
+ if !e.PtyTermEcho {
+ e.terminalEcho(e.ptmx, false)
+ }
+
+ } else {
+
+ // Create pipes
+ outr, outw, err = os.Pipe()
+ if err != nil {
+ err = fmt.Errorf("Pipe stdout error: " + err.Error())
+ goto exitErr
+ }
+
+ errr, errw, err = os.Pipe()
+ if err != nil {
+ err = fmt.Errorf("Pipe stderr error: " + err.Error())
+ goto exitErr
+ }
+
+ inr, inw, err = os.Pipe()
+ if err != nil {
+ err = fmt.Errorf("Pipe stdin error: " + err.Error())
+ goto exitErr
+ }
+
+ e.proc, err = os.StartProcess(bashArgs[0], bashArgs, &os.ProcAttr{
+ Files: []*os.File{inr, outw, errw},
+ Env: append(os.Environ(), e.Env...),
+ })
+ if err != nil {
+ err = fmt.Errorf("Process start error: " + err.Error())
+ goto exitErr
+ }
+ }
+
+ go func() {
+ stdoutDone := make(chan struct{})
+
+ if e.PtyMode {
+ // Make sure to close the pty at the end.
+ defer e.ptmx.Close()
+
+ // Handle both stdout mixed with stderr
+ go e.ptsPumpStdout(e.ptmx, stdoutDone)
+
+ // Blocking function that poll input or wait for end of process
+ e.pumpStdin(e.ptmx)
+
+ } else {
+ // Make sure to close all pipes
+ defer outr.Close()
+ defer outw.Close()
+ defer errr.Close()
+ defer errw.Close()
+ defer inr.Close()
+ defer inw.Close()
+
+ // Handle stdout + stderr
+ go e.pipePumpStdout(outr, stdoutDone)
+ go e.pipePumpStderr(errr)
+
+ // Blocking function that poll input or wait for end of process
+ e.pumpStdin(inw)
+ }
+
+ if status, err := e.proc.Wait(); err == nil {
+ // Other commands need a bonk on the head.
+ if !status.Exited() {
+ if err := e.proc.Signal(os.Interrupt); err != nil {
+ e.logError("Proc interrupt:", err)
+ }
+
+ select {
+ case <-stdoutDone:
+ case <-time.After(time.Second):
+ // A bigger bonk on the head.
+ if err := e.proc.Signal(os.Kill); err != nil {
+ e.logError("Proc term:", err)
+ }
+ <-stdoutDone
+ }
+ }
+ }
+
+ delete(cmdIDMap, e.CmdID)
+ }()
+
+ return nil
+
+exitErr:
+ for _, pf := range []*os.File{outr, outw, errr, errw, inr, inw} {
+ pf.Close()
+ }
+ return err
+}
+
+// TerminalSetSize Set terminal size
+func (e *ExecOverWS) TerminalSetSize(rows, cols uint16) error {
+ if !e.PtyMode || e.ptmx == nil {
+ return fmt.Errorf("PtyMode not set")
+ }
+ w, err := pty.GetsizeFull(e.ptmx)
+ if err != nil {
+ return err
+ }
+ return e.TerminalSetSizePos(rows, cols, w.X, w.Y)
+}
+
+// TerminalSetSizePos Set terminal size and position
+func (e *ExecOverWS) TerminalSetSizePos(rows, cols, x, y uint16) error {
+ if !e.PtyMode || e.ptmx == nil {
+ return fmt.Errorf("PtyMode not set")
+ }
+ winSz := pty.Winsize{Rows: rows, Cols: cols, X: x, Y: y}
+ return pty.Setsize(e.ptmx, &winSz)
+}
+
+/**
+ * Private functions
+ **/
+
+// terminalEcho Enable or disable echoing terminal input.
+// This is useful specifically for when users enter passwords.
+func (e *ExecOverWS) terminalEcho(ff *os.File, show bool) {
+ var termios = &syscall.Termios{}
+
+ fd := ff.Fd()
+
+ if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd,
+ syscall.TCGETS, uintptr(unsafe.Pointer(termios))); err != 0 {
+ return
+ }
+
+ if show {
+ termios.Lflag |= syscall.ECHO
+ } else {
+ termios.Lflag &^= syscall.ECHO
+ }
+
+ if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd,
+ uintptr(syscall.TCSETS),
+ uintptr(unsafe.Pointer(termios))); err != 0 {
+ return
+ }
+}
+
+func (e *ExecOverWS) logDebug(format string, a ...interface{}) {
+ if e.Log != nil {
+ e.Log.Debugf(format, a...)
+ }
+}
+
+func (e *ExecOverWS) logError(format string, a ...interface{}) {
+ if e.Log != nil {
+ e.Log.Errorf(format, a...)
+ }
+}