aboutsummaryrefslogtreecommitdiffstats
path: root/lib/common/execPipeWs.go
blob: 9bb45174f6ec2f62d5c61f9c4686682b6c7bc6d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package common

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"syscall"

	"github.com/Sirupsen/logrus"
	"github.com/googollee/go-socket.io"
)

// EmitOutputCB is the function callback used to emit data
type EmitOutputCB func(sid string, cmdID int, stdout, stderr string, data *map[string]interface{})

// EmitExitCB is the function callback used to emit exit proc code
type EmitExitCB func(sid string, cmdID int, code int, err error, data *map[string]interface{})

// Inspired by :
// https://github.com/gorilla/websocket/blob/master/examples/command/main.go

// ExecPipeWs executes a command and redirect stdout/stderr into a WebSocket
func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmdID int,
	cmdExecTimeout int, log *logrus.Logger, eoCB EmitOutputCB, eeCB EmitExitCB, data *map[string]interface{}) error {

	outr, outw, err := os.Pipe()
	if err != nil {
		return fmt.Errorf("Pipe stdout error: " + err.Error())
	}

	// XXX - do we need to pipe stdin one day ?
	inr, inw, err := os.Pipe()
	if err != nil {
		outr.Close()
		outw.Close()
		return fmt.Errorf("Pipe stdin error: " + err.Error())
	}

	bashArgs := []string{"/bin/bash", "-c", strings.Join(cmd, " ")}
	proc, err := os.StartProcess("/bin/bash", bashArgs, &os.ProcAttr{
		Files: []*os.File{inr, outw, outw},
		Env:   append(os.Environ(), env...),
	})
	if err != nil {
		outr.Close()
		outw.Close()
		inr.Close()
		inw.Close()
		return fmt.Errorf("Process start error: " + err.Error())
	}

	go func() {
		defer outr.Close()
		defer outw.Close()
		defer inr.Close()
		defer inw.Close()

		stdoutDone := make(chan struct{})
		go cmdPumpStdout(so, outr, stdoutDone, sid, cmdID, log, eoCB, data)

		// Blocking function that poll input or wait for end of process
		cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB, data)

		// Some commands will exit when stdin is closed.
		inw.Close()

		defer outr.Close()

		if status, err := proc.Wait(); err == nil {
			// Other commands need a bonk on the head.
			if !status.Exited() {
				if err := proc.Signal(os.Interrupt); err != nil {
					log.Errorln("Proc interrupt:", err)
				}

				select {
				case <-stdoutDone:
				case <-time.After(time.Second):
					// A bigger bonk on the head.
					if err := proc.Signal(os.Kill); err != nil {
						log.Errorln("Proc term:", err)
					}
					<-stdoutDone
				}
			}
		}
	}()

	return nil
}

func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
	sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB,
	data *map[string]interface{}) {
	/* XXX - code to add to support stdin through WS
	for {
		_, message, err := so. ?? ReadMessage()
		if err != nil {
			break
		}
		message = append(message, '\n')
		if _, err := w.Write(message); err != nil {
			break
		}
	}
	*/

	// Monitor process exit
	type DoneChan struct {
		status int
		err    error
	}
	done := make(chan DoneChan, 1)
	go func() {
		status := 0
		sts, err := proc.Wait()
		if !sts.Success() {
			s := sts.Sys().(syscall.WaitStatus)
			status = s.ExitStatus()
		}
		done <- DoneChan{status, err}
	}()

	// Wait cmd complete
	select {
	case dC := <-done:
		exitFuncCB(sid, cmdID, dC.status, dC.err, data)
	case <-time.After(time.Duration(tmo) * time.Second):
		exitFuncCB(sid, cmdID, -99,
			fmt.Errorf("Exit Timeout for command ID %v", cmdID), data)
	}
}

func cmdPumpStdout(so *socketio.Socket, r io.Reader, done chan struct{},
	sid string, cmdID int, log *logrus.Logger, emitFuncCB EmitOutputCB, data *map[string]interface{}) {
	defer func() {
	}()

	sc := bufio.NewScanner(r)
	for sc.Scan() {
		emitFuncCB(sid, cmdID, string(sc.Bytes()), "", data)
	}
	if sc.Err() != nil {
		log.Errorln("scan:", sc.Err())
	}
	close(done)
}