summaryrefslogtreecommitdiffstats
path: root/conf.d/autobuild
ModeNameSize
d---------agl37logstatsplain
d---------linux37logstatsplain
href='#n75'>75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
package common

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"syscall"

	"github.com/Sirupsen/logrus"
	"github.com/googollee/go-socket.io"
)

// EmitOutputCB is the function callback used to emit data
type EmitOutputCB func(sid string, cmdID int, stdout, stderr string, data *map[string]interface{})

// EmitExitCB is the function callback used to emit exit proc code
type EmitExitCB func(sid string, cmdID int, code int, err error)

// Inspired by :
// https://github.com/gorilla/websocket/blob/master/examples/command/main.go

// ExecPipeWs executes a command and redirect stdout/stderr into a WebSocket
func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmdID int,
	cmdExecTimeout int, log *logrus.Logger, eoCB EmitOutputCB, eeCB EmitExitCB, data *map[string]interface{}) error {

	outr, outw, err := os.Pipe()
	if err != nil {
		return fmt.Errorf("Pipe stdout error: " + err.Error())
	}

	// XXX - do we need to pipe stdin one day ?
	inr, inw, err := os.Pipe()
	if err != nil {
		outr.Close()
		outw.Close()
		return fmt.Errorf("Pipe stdin error: " + err.Error())
	}

	bashArgs := []string{"/bin/bash", "-c", strings.Join(cmd, " ")}
	proc, err := os.StartProcess("/bin/bash", bashArgs, &os.ProcAttr{
		Files: []*os.File{inr, outw, outw},
		Env:   append(os.Environ(), env...),
	})
	if err != nil {
		outr.Close()
		outw.Close()
		inr.Close()
		inw.Close()
		return fmt.Errorf("Process start error: " + err.Error())
	}

	go func() {
		defer outr.Close()
		defer outw.Close()
		defer inr.Close()
		defer inw.Close()

		stdoutDone := make(chan struct{})
		go cmdPumpStdout(so, outr, stdoutDone, sid, cmdID, log, eoCB, data)

		// Blocking function that poll input or wait for end of process
		cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB)

		// Some commands will exit when stdin is closed.
		inw.Close()

		defer outr.Close()

		if status, err := proc.Wait(); err == nil {
			// Other commands need a bonk on the head.
			if !status.Exited() {
				if err := proc.Signal(os.Interrupt); err != nil {
					log.Errorln("Proc interrupt:", err)
				}

				select {
				case <-stdoutDone:
				case <-time.After(time.Second):
					// A bigger bonk on the head.
					if err := proc.Signal(os.Kill); err != nil {
						log.Errorln("Proc term:", err)
					}
					<-stdoutDone
				}
			}
		}
	}()

	return nil
}

func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
	sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB) {
	/* XXX - code to add to support stdin through WS
	for {
		_, message, err := so. ?? ReadMessage()
		if err != nil {
			break
		}
		message = append(message, '\n')
		if _, err := w.Write(message); err != nil {
			break
		}
	}
	*/

	// Monitor process exit
	type DoneChan struct {
		status int
		err    error
	}
	done := make(chan DoneChan, 1)
	go func() {
		status := 0
		sts, err := proc.Wait()
		if !sts.Success() {
			s := sts.Sys().(syscall.WaitStatus)
			status = s.ExitStatus()
		}
		done <- DoneChan{status, err}
	}()

	// Wait cmd complete
	select {
	case dC := <-done:
		exitFuncCB(sid, cmdID, dC.status, dC.err)
	case <-time.After(time.Duration(tmo) * time.Second):
		exitFuncCB(sid, cmdID, -99,
			fmt.Errorf("Exit Timeout for command ID %v", cmdID))
	}
}

func cmdPumpStdout(so *socketio.Socket, r io.Reader, done chan struct{},
	sid string, cmdID int, log *logrus.Logger, emitFuncCB EmitOutputCB, data *map[string]interface{}) {
	defer func() {
	}()

	sc := bufio.NewScanner(r)
	for sc.Scan() {
		emitFuncCB(sid, cmdID, string(sc.Bytes()), "", data)
	}
	if sc.Err() != nil {
		log.Errorln("scan:", sc.Err())
	}
	close(done)
}