aboutsummaryrefslogtreecommitdiffstats
path: root/golib/execPipeWs.go
diff options
context:
space:
mode:
Diffstat (limited to 'golib/execPipeWs.go')
-rw-r--r--golib/execPipeWs.go151
1 files changed, 151 insertions, 0 deletions
diff --git a/golib/execPipeWs.go b/golib/execPipeWs.go
new file mode 100644
index 0000000..9bb4517
--- /dev/null
+++ b/golib/execPipeWs.go
@@ -0,0 +1,151 @@
+package common
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "strings"
+ "time"
+
+ "syscall"
+
+ "github.com/Sirupsen/logrus"
+ "github.com/googollee/go-socket.io"
+)
+
+// EmitOutputCB is the function callback used to emit data
+type EmitOutputCB func(sid string, cmdID int, stdout, stderr string, data *map[string]interface{})
+
+// EmitExitCB is the function callback used to emit exit proc code
+type EmitExitCB func(sid string, cmdID int, code int, err error, data *map[string]interface{})
+
+// Inspired by :
+// https://github.com/gorilla/websocket/blob/master/examples/command/main.go
+
+// ExecPipeWs executes a command and redirect stdout/stderr into a WebSocket
+func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmdID int,
+ cmdExecTimeout int, log *logrus.Logger, eoCB EmitOutputCB, eeCB EmitExitCB, data *map[string]interface{}) error {
+
+ outr, outw, err := os.Pipe()
+ if err != nil {
+ return fmt.Errorf("Pipe stdout error: " + err.Error())
+ }
+
+ // XXX - do we need to pipe stdin one day ?
+ inr, inw, err := os.Pipe()
+ if err != nil {
+ outr.Close()
+ outw.Close()
+ return fmt.Errorf("Pipe stdin error: " + err.Error())
+ }
+
+ bashArgs := []string{"/bin/bash", "-c", strings.Join(cmd, " ")}
+ proc, err := os.StartProcess("/bin/bash", bashArgs, &os.ProcAttr{
+ Files: []*os.File{inr, outw, outw},
+ Env: append(os.Environ(), env...),
+ })
+ if err != nil {
+ outr.Close()
+ outw.Close()
+ inr.Close()
+ inw.Close()
+ return fmt.Errorf("Process start error: " + err.Error())
+ }
+
+ go func() {
+ defer outr.Close()
+ defer outw.Close()
+ defer inr.Close()
+ defer inw.Close()
+
+ stdoutDone := make(chan struct{})
+ go cmdPumpStdout(so, outr, stdoutDone, sid, cmdID, log, eoCB, data)
+
+ // Blocking function that poll input or wait for end of process
+ cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB, data)
+
+ // Some commands will exit when stdin is closed.
+ inw.Close()
+
+ defer outr.Close()
+
+ if status, err := proc.Wait(); err == nil {
+ // Other commands need a bonk on the head.
+ if !status.Exited() {
+ if err := proc.Signal(os.Interrupt); err != nil {
+ log.Errorln("Proc interrupt:", err)
+ }
+
+ select {
+ case <-stdoutDone:
+ case <-time.After(time.Second):
+ // A bigger bonk on the head.
+ if err := proc.Signal(os.Kill); err != nil {
+ log.Errorln("Proc term:", err)
+ }
+ <-stdoutDone
+ }
+ }
+ }
+ }()
+
+ return nil
+}
+
+func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
+ sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB,
+ data *map[string]interface{}) {
+ /* XXX - code to add to support stdin through WS
+ for {
+ _, message, err := so. ?? ReadMessage()
+ if err != nil {
+ break
+ }
+ message = append(message, '\n')
+ if _, err := w.Write(message); err != nil {
+ break
+ }
+ }
+ */
+
+ // Monitor process exit
+ type DoneChan struct {
+ status int
+ err error
+ }
+ done := make(chan DoneChan, 1)
+ go func() {
+ status := 0
+ sts, err := proc.Wait()
+ if !sts.Success() {
+ s := sts.Sys().(syscall.WaitStatus)
+ status = s.ExitStatus()
+ }
+ done <- DoneChan{status, err}
+ }()
+
+ // Wait cmd complete
+ select {
+ case dC := <-done:
+ exitFuncCB(sid, cmdID, dC.status, dC.err, data)
+ case <-time.After(time.Duration(tmo) * time.Second):
+ exitFuncCB(sid, cmdID, -99,
+ fmt.Errorf("Exit Timeout for command ID %v", cmdID), data)
+ }
+}
+
+func cmdPumpStdout(so *socketio.Socket, r io.Reader, done chan struct{},
+ sid string, cmdID int, log *logrus.Logger, emitFuncCB EmitOutputCB, data *map[string]interface{}) {
+ defer func() {
+ }()
+
+ sc := bufio.NewScanner(r)
+ for sc.Scan() {
+ emitFuncCB(sid, cmdID, string(sc.Bytes()), "", data)
+ }
+ if sc.Err() != nil {
+ log.Errorln("scan:", sc.Err())
+ }
+ close(done)
+}