summaryrefslogtreecommitdiffstats
path: root/golib
diff options
context:
space:
mode:
authorSebastien Douheret <sebastien.douheret@iot.bzh>2017-08-07 18:08:27 +0200
committerSebastien Douheret <sebastien.douheret@iot.bzh>2017-08-07 19:30:25 +0200
commit7496dbabaf710a9e0f3b599c83163adddfcb8870 (patch)
treed2ab9f9f89edc0f381a71a967d920cef3e150be5 /golib
parent62e2996fcbcd704653d3043046c451fbc044918b (diff)
Added eows (Exec Over WebSocket) package.
Diffstat (limited to 'golib')
-rw-r--r--golib/eows/eows-in.go56
-rw-r--r--golib/eows/eows-out.go47
-rw-r--r--golib/eows/eows-signal.go48
-rw-r--r--golib/eows/eows-signal_windows.go47
-rw-r--r--golib/eows/eows.go176
5 files changed, 374 insertions, 0 deletions
diff --git a/golib/eows/eows-in.go b/golib/eows/eows-in.go
new file mode 100644
index 0000000..1ecd2a1
--- /dev/null
+++ b/golib/eows/eows-in.go
@@ -0,0 +1,56 @@
+package eows
+
+import (
+ "fmt"
+ "os"
+ "syscall"
+ "time"
+)
+
+type DoneChan struct {
+ status int
+ err error
+}
+
+// cmdPumpStdin is in charge of receive characters and send them to stdin
+func (e *ExecOverWS) cmdPumpStdin(inw *os.File) {
+
+ done := make(chan DoneChan, 1)
+
+ if e.InputEvent != "" && e.InputCB != nil {
+
+ err := (*e.SocketIO).On(e.InputEvent, func(stdin string) {
+ in, err := e.InputCB(e, string(stdin))
+ if err != nil {
+ e.logDebug("Error stdin: %s", err.Error())
+ inw.Close()
+ return
+ }
+ if _, err := inw.Write([]byte(in)); err != nil {
+ e.logError("Error while writing to stdin: %s", err.Error())
+ }
+ })
+ if err != nil {
+ e.logError("Error stdin on event: %s", err.Error())
+ }
+ }
+
+ // Monitor process exit
+ go func() {
+ status := 0
+ sts, err := e.proc.Wait()
+ if !sts.Success() {
+ s := sts.Sys().(syscall.WaitStatus)
+ status = s.ExitStatus()
+ }
+ done <- DoneChan{status, err}
+ }()
+
+ // Wait cmd complete
+ select {
+ case dC := <-done:
+ e.ExitCB(e, dC.status, dC.err)
+ case <-time.After(time.Duration(e.CmdExecTimeout) * time.Second):
+ e.ExitCB(e, -999, fmt.Errorf("Exit Timeout for command ID %v", e.CmdID))
+ }
+}
diff --git a/golib/eows/eows-out.go b/golib/eows/eows-out.go
new file mode 100644
index 0000000..d624618
--- /dev/null
+++ b/golib/eows/eows-out.go
@@ -0,0 +1,47 @@
+package eows
+
+import (
+ "bufio"
+ "io"
+)
+
+// scanBlocks
+func scanBlocks(data []byte, atEOF bool) (advance int, token []byte, err error) {
+ if atEOF && len(data) == 0 {
+ return 0, nil, nil
+ }
+ return len(data), data, nil
+}
+
+// cmdPumpStdout is in charge to forward stdout in websocket
+func (e *ExecOverWS) cmdPumpStdout(r io.Reader, done chan struct{}) {
+
+ defer func() {
+ }()
+
+ sc := bufio.NewScanner(r)
+ sc.Split(scanBlocks)
+ for sc.Scan() {
+ e.OutputCB(e, sc.Text(), "")
+ }
+ if sc.Err() != nil {
+ e.logError("stdout scan:", sc.Err())
+ }
+
+ close(done)
+}
+
+// cmdPumpStderr is in charge to forward stderr in websocket
+func (e *ExecOverWS) cmdPumpStderr(r io.Reader) {
+
+ defer func() {
+ }()
+ sc := bufio.NewScanner(r)
+ sc.Split(scanBlocks)
+ for sc.Scan() {
+ e.OutputCB(e, "", sc.Text())
+ }
+ if sc.Err() != nil {
+ e.logError("stderr scan:", sc.Err())
+ }
+}
diff --git a/golib/eows/eows-signal.go b/golib/eows/eows-signal.go
new file mode 100644
index 0000000..5db5366
--- /dev/null
+++ b/golib/eows/eows-signal.go
@@ -0,0 +1,48 @@
+// +build !windows
+
+// Package eows is used to Execute commands Over WebSocket
+package eows
+
+import (
+ "fmt"
+ "os"
+ "syscall"
+)
+
+// Signal sends a signal to the running command / process
+func (e *ExecOverWS) Signal(signal string) error {
+ var sig os.Signal
+ switch signal {
+ case "quit", "SIGQUIT":
+ sig = syscall.SIGQUIT
+ case "terminated", "SIGTERM":
+ sig = syscall.SIGTERM
+ case "interrupt", "SIGINT":
+ sig = syscall.SIGINT
+ case "aborted", "SIGABRT":
+ sig = syscall.SIGABRT
+ case "continued", "SIGCONT":
+ sig = syscall.SIGCONT
+ case "hangup", "SIGHUP":
+ sig = syscall.SIGHUP
+ case "killed", "SIGKILL":
+ sig = syscall.SIGKILL
+ case "stopped (signal)", "SIGSTOP":
+ sig = syscall.SIGSTOP
+ case "stopped", "SIGTSTP":
+ sig = syscall.SIGTSTP
+ case "user defined signal 1", "SIGUSR1":
+ sig = syscall.SIGUSR1
+ case "user defined signal 2", "SIGUSR2":
+ sig = syscall.SIGUSR2
+ default:
+ return fmt.Errorf("Unsupported signal")
+ }
+
+ if e.proc == nil {
+ return fmt.Errorf("Cannot retrieve process")
+ }
+
+ fmt.Printf("SEND signal %v to proc %v\n", sig, e.proc.Pid)
+ return e.proc.Signal(sig)
+}
diff --git a/golib/eows/eows-signal_windows.go b/golib/eows/eows-signal_windows.go
new file mode 100644
index 0000000..dff44f2
--- /dev/null
+++ b/golib/eows/eows-signal_windows.go
@@ -0,0 +1,47 @@
+package eows
+
+// +build windows
+
+import (
+ "fmt"
+)
+
+// Signal sends a signal to the running command / process
+func Signal(signal string) error {
+ panic("FIXME: Not implemented")
+ /*
+ var sig os.Signal
+ switch signal {
+ case "quit", "SIGQUIT":
+ sig = syscall.SIGQUIT
+ case "terminated", "SIGTERM":
+ sig = syscall.SIGTERM
+ case "interrupt", "SIGINT":
+ sig = syscall.SIGINT
+ case "aborted", "SIGABRT":
+ sig = syscall.SIGABRT
+ case "continued", "SIGCONT":
+ sig = syscall.SIGCONT
+ case "hangup", "SIGHUP":
+ sig = syscall.SIGHUP
+ case "killed", "SIGKILL":
+ sig = syscall.SIGKILL
+ case "stopped (signal)", "SIGSTOP":
+ sig = syscall.SIGSTOP
+ case "stopped", "SIGTSTP":
+ sig = syscall.SIGTSTP
+ case "user defined signal 1", "SIGUSR1":
+ sig = syscall.SIGUSR1
+ case "user defined signal 2", "SIGUSR2":
+ sig = syscall.SIGUSR2
+ default:
+ return fmt.Errorf("Unsupported signal")
+ }
+
+ if e.proc == nil {
+ return fmt.Errorf("Cannot retrieve process")
+ }
+ fmt.Printf("SEND signal %v to proc %v\n", sig, e.proc.Pid)
+ return e.proc.Signal(sig)
+ */
+}
diff --git a/golib/eows/eows.go b/golib/eows/eows.go
new file mode 100644
index 0000000..6fc3550
--- /dev/null
+++ b/golib/eows/eows.go
@@ -0,0 +1,176 @@
+// Package eows is used to Execute commands Over WebSocket
+package eows
+
+import (
+ "fmt"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+ "github.com/googollee/go-socket.io"
+)
+
+// OnInputCB is the function callback used to receive data
+type OnInputCB func(e *ExecOverWS, stdin string) (string, error)
+
+// EmitOutputCB is the function callback used to emit data
+type EmitOutputCB func(e *ExecOverWS, stdout, stderr string)
+
+// EmitExitCB is the function callback used to emit exit proc code
+type EmitExitCB func(e *ExecOverWS, code int, err error)
+
+// Inspired by :
+// https://github.com/gorilla/websocket/blob/master/examples/command/main.go
+
+// ExecOverWS .
+type ExecOverWS struct {
+ Cmd string // command name to execute
+ Args []string // command arguments
+ SocketIO *socketio.Socket // websocket
+ Sid string // websocket ID
+ CmdID string // command ID
+
+ // Optional fields
+ Env []string // command environment variables
+ CmdExecTimeout int // command execution time timeout
+ Log *logrus.Logger // logger (nil if disabled)
+ InputEvent string // websocket input event name
+ InputCB OnInputCB // stdin callback
+ OutputCB EmitOutputCB // stdout/stderr callback
+ ExitCB EmitExitCB // exit proc callback
+ UserData *map[string]interface{} // user data passed to callbacks
+
+ // Private fields
+ proc *os.Process
+}
+
+var cmdIDMap = make(map[string]*ExecOverWS)
+
+// New creates a new instace of eows
+func New(cmd string, args []string, so *socketio.Socket, soID, cmdID string) *ExecOverWS {
+
+ e := &ExecOverWS{
+ Cmd: cmd,
+ Args: args,
+ SocketIO: so,
+ Sid: soID,
+ CmdID: cmdID,
+ CmdExecTimeout: -1, // default no timeout
+ }
+
+ cmdIDMap[cmdID] = e
+
+ return e
+}
+
+// GetEows gets ExecOverWS object from command ID
+func GetEows(cmdID string) *ExecOverWS {
+ if _, ok := cmdIDMap[cmdID]; !ok {
+ return nil
+ }
+ return cmdIDMap[cmdID]
+}
+
+// Start executes the command and redirect stdout/stderr into a WebSocket
+func (e *ExecOverWS) Start() error {
+ var err error
+ var outr, outw, errr, errw, inr, inw *os.File
+
+ bashArgs := []string{"/bin/bash", "-c", e.Cmd + " " + strings.Join(e.Args, " ")}
+
+ // no timeout == 1 year
+ if e.CmdExecTimeout == -1 {
+ e.CmdExecTimeout = 365 * 24 * 60 * 60
+ }
+
+ // Create pipes
+ outr, outw, err = os.Pipe()
+ if err != nil {
+ err = fmt.Errorf("Pipe stdout error: " + err.Error())
+ goto exitErr
+ }
+
+ errr, errw, err = os.Pipe()
+ if err != nil {
+ err = fmt.Errorf("Pipe stderr error: " + err.Error())
+ goto exitErr
+ }
+
+ inr, inw, err = os.Pipe()
+ if err != nil {
+ err = fmt.Errorf("Pipe stdin error: " + err.Error())
+ goto exitErr
+ }
+
+ e.proc, err = os.StartProcess("/bin/bash", bashArgs, &os.ProcAttr{
+ Files: []*os.File{inr, outw, errw},
+ Env: append(os.Environ(), e.Env...),
+ })
+ if err != nil {
+ err = fmt.Errorf("Process start error: " + err.Error())
+ goto exitErr
+ }
+
+ go func() {
+ defer outr.Close()
+ defer outw.Close()
+ defer errr.Close()
+ defer errw.Close()
+ defer inr.Close()
+ defer inw.Close()
+
+ stdoutDone := make(chan struct{})
+ go e.cmdPumpStdout(outr, stdoutDone)
+ go e.cmdPumpStderr(errr)
+
+ // Blocking function that poll input or wait for end of process
+ e.cmdPumpStdin(inw)
+
+ // Some commands will exit when stdin is closed.
+ inw.Close()
+
+ defer outr.Close()
+
+ if status, err := e.proc.Wait(); err == nil {
+ // Other commands need a bonk on the head.
+ if !status.Exited() {
+ if err := e.proc.Signal(os.Interrupt); err != nil {
+ e.logError("Proc interrupt:", err)
+ }
+
+ select {
+ case <-stdoutDone:
+ case <-time.After(time.Second):
+ // A bigger bonk on the head.
+ if err := e.proc.Signal(os.Kill); err != nil {
+ e.logError("Proc term:", err)
+ }
+ <-stdoutDone
+ }
+ }
+ }
+
+ delete(cmdIDMap, e.CmdID)
+ }()
+
+ return nil
+
+exitErr:
+ for _, pf := range []*os.File{outr, outw, errr, errw, inr, inw} {
+ pf.Close()
+ }
+ return err
+}
+
+func (e *ExecOverWS) logDebug(format string, a ...interface{}) {
+ if e.Log != nil {
+ e.Log.Debugf(format, a)
+ }
+}
+
+func (e *ExecOverWS) logError(format string, a ...interface{}) {
+ if e.Log != nil {
+ e.Log.Errorf(format, a)
+ }
+}