From 7496dbabaf710a9e0f3b599c83163adddfcb8870 Mon Sep 17 00:00:00 2001 From: Sebastien Douheret Date: Mon, 7 Aug 2017 18:08:27 +0200 Subject: Added eows (Exec Over WebSocket) package. --- .vscode/settings.json | 12 +++ glide.yaml | 12 +++ golib/eows/eows-in.go | 56 ++++++++++++ golib/eows/eows-out.go | 47 ++++++++++ golib/eows/eows-signal.go | 48 +++++++++++ golib/eows/eows-signal_windows.go | 47 ++++++++++ golib/eows/eows.go | 176 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 398 insertions(+) create mode 100644 .vscode/settings.json create mode 100644 glide.yaml create mode 100644 golib/eows/eows-in.go create mode 100644 golib/eows/eows-out.go create mode 100644 golib/eows/eows-signal.go create mode 100644 golib/eows/eows-signal_windows.go create mode 100644 golib/eows/eows.go diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..093e927 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,12 @@ +// Place your settings in this file to overwrite default and user settings. +{ + // Pick 'gofmt', 'goimports' or 'goreturns' to run on format. + "go.formatTool": "gofmt", + + // Words to add to dictionary for a workspace. + "cSpell.words": [ + "eows", + "socketio" + ] + +} diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..4add0ff --- /dev/null +++ b/glide.yaml @@ -0,0 +1,12 @@ +package: github.com/iotbzh/xds-common +license: Apache-2 +owners: +- name: Sebastien Douheret + email: sebastien@iot.bzh +import: +- package: github.com/gin-gonic/gin + version: ^1.1.4 +- package: github.com/Sirupsen/logrus + version: ^0.11.5 +- package: github.com/googollee/go-socket.io +- package: github.com/zhouhui8915/go-socket.io-client diff --git a/golib/eows/eows-in.go b/golib/eows/eows-in.go new file mode 100644 index 0000000..1ecd2a1 --- /dev/null +++ b/golib/eows/eows-in.go @@ -0,0 +1,56 @@ +package eows + +import ( + "fmt" + "os" + "syscall" + "time" +) + +type DoneChan struct { + status int + err error +} + +// cmdPumpStdin is in charge of receive characters and send them to stdin +func (e *ExecOverWS) cmdPumpStdin(inw *os.File) { + + done := make(chan DoneChan, 1) + + if e.InputEvent != "" && e.InputCB != nil { + + err := (*e.SocketIO).On(e.InputEvent, func(stdin string) { + in, err := e.InputCB(e, string(stdin)) + if err != nil { + e.logDebug("Error stdin: %s", err.Error()) + inw.Close() + return + } + if _, err := inw.Write([]byte(in)); err != nil { + e.logError("Error while writing to stdin: %s", err.Error()) + } + }) + if err != nil { + e.logError("Error stdin on event: %s", err.Error()) + } + } + + // Monitor process exit + go func() { + status := 0 + sts, err := e.proc.Wait() + if !sts.Success() { + s := sts.Sys().(syscall.WaitStatus) + status = s.ExitStatus() + } + done <- DoneChan{status, err} + }() + + // Wait cmd complete + select { + case dC := <-done: + e.ExitCB(e, dC.status, dC.err) + case <-time.After(time.Duration(e.CmdExecTimeout) * time.Second): + e.ExitCB(e, -999, fmt.Errorf("Exit Timeout for command ID %v", e.CmdID)) + } +} diff --git a/golib/eows/eows-out.go b/golib/eows/eows-out.go new file mode 100644 index 0000000..d624618 --- /dev/null +++ b/golib/eows/eows-out.go @@ -0,0 +1,47 @@ +package eows + +import ( + "bufio" + "io" +) + +// scanBlocks +func scanBlocks(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + return len(data), data, nil +} + +// cmdPumpStdout is in charge to forward stdout in websocket +func (e *ExecOverWS) cmdPumpStdout(r io.Reader, done chan struct{}) { + + defer func() { + }() + + sc := bufio.NewScanner(r) + sc.Split(scanBlocks) + for sc.Scan() { + e.OutputCB(e, sc.Text(), "") + } + if sc.Err() != nil { + e.logError("stdout scan:", sc.Err()) + } + + close(done) +} + +// cmdPumpStderr is in charge to forward stderr in websocket +func (e *ExecOverWS) cmdPumpStderr(r io.Reader) { + + defer func() { + }() + sc := bufio.NewScanner(r) + sc.Split(scanBlocks) + for sc.Scan() { + e.OutputCB(e, "", sc.Text()) + } + if sc.Err() != nil { + e.logError("stderr scan:", sc.Err()) + } +} diff --git a/golib/eows/eows-signal.go b/golib/eows/eows-signal.go new file mode 100644 index 0000000..5db5366 --- /dev/null +++ b/golib/eows/eows-signal.go @@ -0,0 +1,48 @@ +// +build !windows + +// Package eows is used to Execute commands Over WebSocket +package eows + +import ( + "fmt" + "os" + "syscall" +) + +// Signal sends a signal to the running command / process +func (e *ExecOverWS) Signal(signal string) error { + var sig os.Signal + switch signal { + case "quit", "SIGQUIT": + sig = syscall.SIGQUIT + case "terminated", "SIGTERM": + sig = syscall.SIGTERM + case "interrupt", "SIGINT": + sig = syscall.SIGINT + case "aborted", "SIGABRT": + sig = syscall.SIGABRT + case "continued", "SIGCONT": + sig = syscall.SIGCONT + case "hangup", "SIGHUP": + sig = syscall.SIGHUP + case "killed", "SIGKILL": + sig = syscall.SIGKILL + case "stopped (signal)", "SIGSTOP": + sig = syscall.SIGSTOP + case "stopped", "SIGTSTP": + sig = syscall.SIGTSTP + case "user defined signal 1", "SIGUSR1": + sig = syscall.SIGUSR1 + case "user defined signal 2", "SIGUSR2": + sig = syscall.SIGUSR2 + default: + return fmt.Errorf("Unsupported signal") + } + + if e.proc == nil { + return fmt.Errorf("Cannot retrieve process") + } + + fmt.Printf("SEND signal %v to proc %v\n", sig, e.proc.Pid) + return e.proc.Signal(sig) +} diff --git a/golib/eows/eows-signal_windows.go b/golib/eows/eows-signal_windows.go new file mode 100644 index 0000000..dff44f2 --- /dev/null +++ b/golib/eows/eows-signal_windows.go @@ -0,0 +1,47 @@ +package eows + +// +build windows + +import ( + "fmt" +) + +// Signal sends a signal to the running command / process +func Signal(signal string) error { + panic("FIXME: Not implemented") + /* + var sig os.Signal + switch signal { + case "quit", "SIGQUIT": + sig = syscall.SIGQUIT + case "terminated", "SIGTERM": + sig = syscall.SIGTERM + case "interrupt", "SIGINT": + sig = syscall.SIGINT + case "aborted", "SIGABRT": + sig = syscall.SIGABRT + case "continued", "SIGCONT": + sig = syscall.SIGCONT + case "hangup", "SIGHUP": + sig = syscall.SIGHUP + case "killed", "SIGKILL": + sig = syscall.SIGKILL + case "stopped (signal)", "SIGSTOP": + sig = syscall.SIGSTOP + case "stopped", "SIGTSTP": + sig = syscall.SIGTSTP + case "user defined signal 1", "SIGUSR1": + sig = syscall.SIGUSR1 + case "user defined signal 2", "SIGUSR2": + sig = syscall.SIGUSR2 + default: + return fmt.Errorf("Unsupported signal") + } + + if e.proc == nil { + return fmt.Errorf("Cannot retrieve process") + } + fmt.Printf("SEND signal %v to proc %v\n", sig, e.proc.Pid) + return e.proc.Signal(sig) + */ +} diff --git a/golib/eows/eows.go b/golib/eows/eows.go new file mode 100644 index 0000000..6fc3550 --- /dev/null +++ b/golib/eows/eows.go @@ -0,0 +1,176 @@ +// Package eows is used to Execute commands Over WebSocket +package eows + +import ( + "fmt" + "os" + "strings" + "time" + + "github.com/Sirupsen/logrus" + "github.com/googollee/go-socket.io" +) + +// OnInputCB is the function callback used to receive data +type OnInputCB func(e *ExecOverWS, stdin string) (string, error) + +// EmitOutputCB is the function callback used to emit data +type EmitOutputCB func(e *ExecOverWS, stdout, stderr string) + +// EmitExitCB is the function callback used to emit exit proc code +type EmitExitCB func(e *ExecOverWS, code int, err error) + +// Inspired by : +// https://github.com/gorilla/websocket/blob/master/examples/command/main.go + +// ExecOverWS . +type ExecOverWS struct { + Cmd string // command name to execute + Args []string // command arguments + SocketIO *socketio.Socket // websocket + Sid string // websocket ID + CmdID string // command ID + + // Optional fields + Env []string // command environment variables + CmdExecTimeout int // command execution time timeout + Log *logrus.Logger // logger (nil if disabled) + InputEvent string // websocket input event name + InputCB OnInputCB // stdin callback + OutputCB EmitOutputCB // stdout/stderr callback + ExitCB EmitExitCB // exit proc callback + UserData *map[string]interface{} // user data passed to callbacks + + // Private fields + proc *os.Process +} + +var cmdIDMap = make(map[string]*ExecOverWS) + +// New creates a new instace of eows +func New(cmd string, args []string, so *socketio.Socket, soID, cmdID string) *ExecOverWS { + + e := &ExecOverWS{ + Cmd: cmd, + Args: args, + SocketIO: so, + Sid: soID, + CmdID: cmdID, + CmdExecTimeout: -1, // default no timeout + } + + cmdIDMap[cmdID] = e + + return e +} + +// GetEows gets ExecOverWS object from command ID +func GetEows(cmdID string) *ExecOverWS { + if _, ok := cmdIDMap[cmdID]; !ok { + return nil + } + return cmdIDMap[cmdID] +} + +// Start executes the command and redirect stdout/stderr into a WebSocket +func (e *ExecOverWS) Start() error { + var err error + var outr, outw, errr, errw, inr, inw *os.File + + bashArgs := []string{"/bin/bash", "-c", e.Cmd + " " + strings.Join(e.Args, " ")} + + // no timeout == 1 year + if e.CmdExecTimeout == -1 { + e.CmdExecTimeout = 365 * 24 * 60 * 60 + } + + // Create pipes + outr, outw, err = os.Pipe() + if err != nil { + err = fmt.Errorf("Pipe stdout error: " + err.Error()) + goto exitErr + } + + errr, errw, err = os.Pipe() + if err != nil { + err = fmt.Errorf("Pipe stderr error: " + err.Error()) + goto exitErr + } + + inr, inw, err = os.Pipe() + if err != nil { + err = fmt.Errorf("Pipe stdin error: " + err.Error()) + goto exitErr + } + + e.proc, err = os.StartProcess("/bin/bash", bashArgs, &os.ProcAttr{ + Files: []*os.File{inr, outw, errw}, + Env: append(os.Environ(), e.Env...), + }) + if err != nil { + err = fmt.Errorf("Process start error: " + err.Error()) + goto exitErr + } + + go func() { + defer outr.Close() + defer outw.Close() + defer errr.Close() + defer errw.Close() + defer inr.Close() + defer inw.Close() + + stdoutDone := make(chan struct{}) + go e.cmdPumpStdout(outr, stdoutDone) + go e.cmdPumpStderr(errr) + + // Blocking function that poll input or wait for end of process + e.cmdPumpStdin(inw) + + // Some commands will exit when stdin is closed. + inw.Close() + + defer outr.Close() + + if status, err := e.proc.Wait(); err == nil { + // Other commands need a bonk on the head. + if !status.Exited() { + if err := e.proc.Signal(os.Interrupt); err != nil { + e.logError("Proc interrupt:", err) + } + + select { + case <-stdoutDone: + case <-time.After(time.Second): + // A bigger bonk on the head. + if err := e.proc.Signal(os.Kill); err != nil { + e.logError("Proc term:", err) + } + <-stdoutDone + } + } + } + + delete(cmdIDMap, e.CmdID) + }() + + return nil + +exitErr: + for _, pf := range []*os.File{outr, outw, errr, errw, inr, inw} { + pf.Close() + } + return err +} + +func (e *ExecOverWS) logDebug(format string, a ...interface{}) { + if e.Log != nil { + e.Log.Debugf(format, a) + } +} + +func (e *ExecOverWS) logError(format string, a ...interface{}) { + if e.Log != nil { + e.Log.Errorf(format, a) + } +} -- cgit 1.2.3-korg