diff options
author | Sebastien Douheret <sebastien.douheret@iot.bzh> | 2019-04-04 23:45:56 +0200 |
---|---|---|
committer | Sebastien Douheret <sebastien.douheret@iot.bzh> | 2019-04-04 23:45:56 +0200 |
commit | 89ea6ebd3671e6ebbf6101525a5416427806f318 (patch) | |
tree | 5db52146365a9c2c439b77485f938cc8c2e3a727 /eows/eows-out.go | |
parent | ee147062c3bebed83e34bf5ce71019c95f62b96f (diff) |
Fixed package tree and go mod filev0.5.0
Change-Id: I1047094d5b80d0622e2c2ce674979f18207b8c0f
Signed-off-by: Sebastien Douheret <sebastien.douheret@iot.bzh>
Diffstat (limited to 'eows/eows-out.go')
-rw-r--r-- | eows/eows-out.go | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/eows/eows-out.go b/eows/eows-out.go new file mode 100644 index 0000000..3163b2f --- /dev/null +++ b/eows/eows-out.go @@ -0,0 +1,122 @@ +package eows + +import ( + "bufio" + "io" + "strings" + "time" +) + +// scanChars - gain character by character (or as soon as one or more characters are available) +func scanChars(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + return len(data), data, nil +} + +// _pumper is in charge to collect +func (e *ExecOverWS) _pumper(sc *bufio.Scanner, fctCB func(s string)) { + + // Select split function (default sc.ScanLines) + if e.OutSplit == SplitChar || e.OutSplit == SplitLineTime || e.OutSplit == SplitTime { + sc.Split(scanChars) + } + + // Scan method according to split type + if e.OutSplit == SplitLineTime || e.OutSplit == SplitTime { + t0 := time.Now() + buf := "" + for sc.Scan() { + buf += sc.Text() + if time.Since(t0).Nanoseconds() > e.LineTimeSpan || + (e.OutSplit == SplitLineTime && strings.Contains(buf, "\n")) { + fctCB(buf) + buf = "" + t0 = time.Now() + } + if e.procExited { + break + } + } + // Send remaining characters + if len(buf) > 0 { + fctCB(buf) + } + + } else { + + for sc.Scan() { + fctCB(sc.Text()) + if e.procExited { + break + } + } + } + +} + +// pipePumpStdout is in charge to forward stdout in websocket +func (e *ExecOverWS) pipePumpStdout(r io.Reader, done chan struct{}) { + + sc := bufio.NewScanner(r) + + e._pumper(sc, func(b string) { + if e.OutputCB != nil { + e.OutputCB(e, []byte(b), []byte{}) + } + }) + + e.logDebug("STDOUT pump exit") + + if sc.Err() != nil && !strings.Contains(sc.Err().Error(), "file already closed") { + e.logError("stdout scan: %v", sc.Err()) + } + + close(done) +} + +// pipePumpStderr is in charge to forward stderr in websocket +func (e *ExecOverWS) pipePumpStderr(r io.Reader) { + + sc := bufio.NewScanner(r) + + e._pumper(sc, func(b string) { + if e.OutputCB != nil { + e.OutputCB(e, []byte{}, []byte(b)) + } + }) + + e.logDebug("STDERR pump exit") + + if sc.Err() != nil && !strings.Contains(sc.Err().Error(), "file already closed") { + e.logError("stderr scan: %v", sc.Err()) + } +} + +// ptsPumpStdout is in charge to forward stdout in websocket +// (only used when PtyMode is set) +func (e *ExecOverWS) ptsPumpStdout(r io.Reader, done chan struct{}) { + + buffer := make([]byte, 1024) + for { + n, err := r.Read(buffer) + if err != nil { + if err != io.EOF && + !strings.Contains(err.Error(), "file already closed") { + e.logError("Error stdout read: %v", err) + } + break + } + if n == 0 { + continue + } + if e.OutputCB != nil { + e.OutputCB(e, buffer[:n], []byte{}) + } + } + + close(done) + + e.logDebug("Eows stdout pump exited") +} |