From 32791ffed5bdfaa698e90f9c067dc6e8ababbfc3 Mon Sep 17 00:00:00 2001 From: Sebastien Douheret Date: Wed, 29 Nov 2017 11:06:15 +0100 Subject: Fixed /exec input stream and /signal. Signed-off-by: Sebastien Douheret --- lib/agent/apiv1-exec.go | 159 ++++++++++++++++++++++++++++++++++++------------ lib/agent/apiv1.go | 2 +- lib/agent/xdsserver.go | 48 +++++++++++++++ lib/xaapiv1/exec.go | 12 ++++ lib/xdsconfig/config.go | 2 +- 5 files changed, 181 insertions(+), 42 deletions(-) (limited to 'lib') diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go index 3cb4d23..5fdfb9d 100644 --- a/lib/agent/apiv1-exec.go +++ b/lib/agent/apiv1-exec.go @@ -1,37 +1,22 @@ package agent import ( - "encoding/json" - "io/ioutil" "net/http" "github.com/franciscocpg/reflectme" "github.com/gin-gonic/gin" - "github.com/iotbzh/xds-agent/lib/apiv1" + "github.com/iotbzh/xds-agent/lib/xaapiv1" common "github.com/iotbzh/xds-common/golib" + "github.com/iotbzh/xds-server/lib/xsapiv1" uuid "github.com/satori/go.uuid" ) // ExecCmd executes remotely a command func (s *APIService) execCmd(c *gin.Context) { - s._execRequest("/exec", c) -} - -// execSignalCmd executes remotely a command -func (s *APIService) execSignalCmd(c *gin.Context) { - s._execRequest("/signal", c) -} - -func (s *APIService) _execRequest(cmd string, c *gin.Context) { - data, err := c.GetRawData() - if err != nil { - common.APIError(c, err.Error()) - } - args := apiv1.ExecArgs{} - // XXX - we cannot use c.BindJSON, so directly unmarshall it - // (see https://github.com/gin-gonic/gin/issues/1078) - if err := json.Unmarshal(data, &args); err != nil { + args := xaapiv1.ExecArgs{} + if err := c.BindJSON(&args); err != nil { + s.Log.Warningf("/exec invalid args, err=%v", err) common.APIError(c, "Invalid arguments") return } @@ -70,17 +55,36 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) { return } - // Forward XDSServer WS events to client WS - // TODO removed static event name list and get it from XDSServer - evtList := []string{ - apiv1.ExecInEvent, - apiv1.ExecOutEvent, - apiv1.ExecInferiorInEvent, - apiv1.ExecInferiorOutEvent, + // Forward input events from client to XDSServer through WS + // TODO use XDSServer events names definition + evtInList := []string{ + xaapiv1.ExecInEvent, + xaapiv1.ExecInferiorInEvent, + } + for _, evName := range evtInList { + evN := evName + err := (*sock).On(evN, func(stdin string) { + if s.LogLevelSilly { + s.Log.Debugf("EXEC EVENT IN (%s) <<%v>>", evN, stdin) + } + svr.EventEmit(evN, stdin) + }) + if err != nil { + msgErr := "Error while registering WS for " + evN + " event" + s.Log.Errorf(msgErr, ", err: %v", err) + common.APIError(c, msgErr) + return + } } + // Forward output events from XDSServer to client through WS + // TODO use XDSServer events names definition var fwdFuncID []uuid.UUID - for _, evName := range evtList { + evtOutList := []string{ + xaapiv1.ExecOutEvent, + xaapiv1.ExecInferiorOutEvent, + } + for _, evName := range evtOutList { evN := evName fwdFunc := func(pData interface{}, evData interface{}) error { sid := pData.(string) @@ -94,6 +98,10 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) { // Add sessionID to event Data reflectme.SetField(evData, "sessionID", sid) + if s.LogLevelSilly { + s.Log.Debugf("EXEC EVENT OUT (%s) <<%v>>", evN, evData) + } + // Forward event to Client/Dashboard (*so).Emit(evN, evData) return nil @@ -108,9 +116,12 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) { // Handle Exit event separately to cleanup registered listener var exitFuncID uuid.UUID - exitFunc := func(pData interface{}, evData interface{}) error { - evN := apiv1.ExecExitEvent - sid := pData.(string) + exitFunc := func(privD interface{}, evData interface{}) error { + evN := xaapiv1.ExecExitEvent + + pData := privD.(map[string]string) + sid := pData["sessID"] + prjID := pData["prjID"] // Add sessionID to event Data reflectme.SetField(evData, "sessionID", sid) @@ -123,32 +134,100 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) { s.Log.Infof("%s not emitted: WS closed (sid:%s)", evN, sid) } + prj := s.projects.Get(prjID) + if prj != nil { + evD := evData.(map[string]interface{}) + cmdIDData, cmdIDExist := evD["cmdID"] + svr := (*prj).GetServer() + if svr != nil && cmdIDExist { + svr.CommandDelete(cmdIDData.(string)) + } else { + s.Log.Infof("%s: cannot retrieve server for sid=%s, prjID=%s, evD=%v", evN, sid, prjID, evD) + } + } else { + s.Log.Infof("%s: cannot retrieve project for sid=%s, prjID=%s", evN, sid, prjID) + } + // cleanup listener - for i, evName := range evtList { + for i, evName := range evtOutList { svr.EventOff(evName, fwdFuncID[i]) } svr.EventOff(evN, exitFuncID) return nil } - exitFuncID, err = svr.EventOn(apiv1.ExecExitEvent, sess.ID, exitFunc) + + prjCfg := (*prj).GetProject() + privData := map[string]string{"sessID": sess.ID, "prjID": prjCfg.ID} + exitFuncID, err = svr.EventOn(xaapiv1.ExecExitEvent, privData, exitFunc) if err != nil { common.APIError(c, err.Error()) return } // Forward back command to right server - response, err := svr.SendCommand(cmd, data) - if err != nil { + res := xsapiv1.ExecResult{} + xsArgs := &xsapiv1.ExecArgs{ + ID: args.ID, + SdkID: args.SdkID, + CmdID: args.CmdID, + Cmd: args.Cmd, + Args: args.Args, + Env: args.Env, + RPath: args.RPath, + TTY: args.TTY, + TTYGdbserverFix: args.TTYGdbserverFix, + ExitImmediate: args.ExitImmediate, + CmdTimeout: args.CmdTimeout, + } + if err := svr.CommandExec(xsArgs, &res); err != nil { common.APIError(c, err.Error()) return } - // Decode response - body, err := ioutil.ReadAll(response.Body) - if err != nil { - common.APIError(c, "Cannot read response body") + // Add command to running commands list + if err := svr.CommandAdd(res.CmdID, xsArgs); err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, xaapiv1.ExecResult{Status: res.Status, CmdID: res.CmdID}) +} + +// execSignalCmd executes remotely the signal command +func (s *APIService) execSignalCmd(c *gin.Context) { + + args := xaapiv1.ExecSignalArgs{} + if err := c.BindJSON(&args); err != nil { + s.Log.Warningf("/signal invalid args, err=%v", err) + common.APIError(c, "Invalid arguments") return } - c.JSON(http.StatusOK, string(body)) + + // Retrieve on which xds-server the command is running + var svr *XdsServer + var dataCmd interface{} + for _, svr = range s.xdsServers { + dataCmd = svr.CommandGet(args.CmdID) + if dataCmd != nil { + break + } + } + if dataCmd == nil { + common.APIError(c, "Cannot retrieve XDS Server for this cmdID") + return + } + + // Forward back command to right server + res := xsapiv1.ExecSigResult{} + xsArgs := &xsapiv1.ExecSignalArgs{ + CmdID: args.CmdID, + Signal: args.Signal, + } + if err := svr.CommandSignal(xsArgs, &res); err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, xaapiv1.ExecSignalResult{Status: res.Status, CmdID: res.CmdID}) } diff --git a/lib/agent/apiv1.go b/lib/agent/apiv1.go index 36e5a54..1051f2a 100644 --- a/lib/agent/apiv1.go +++ b/lib/agent/apiv1.go @@ -84,7 +84,7 @@ func (s *APIService) AddXdsServer(cfg xdsconfig.XDSServerConf) (*XdsServer, erro cfg.APIBaseURL = apiBaseURL } if cfg.APIPartialURL == "" { - cfg.APIPartialURL = "/server/" + strconv.Itoa(s.serverIndex) + cfg.APIPartialURL = "/servers/" + strconv.Itoa(s.serverIndex) s.serverIndex = s.serverIndex + 1 } diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 620bae9..bca4b66 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -42,6 +42,7 @@ type XdsServer struct { ioSock *sio_client.Client logOut io.Writer apiRouter *gin.RouterGroup + cmdList map[string]interface{} } // EventCB Event emitter callback @@ -72,6 +73,7 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { sockEvents: make(map[string][]*caller), sockEventsLock: &sync.Mutex{}, logOut: ctx.Log.Out, + cmdList: make(map[string]interface{}), } } @@ -172,6 +174,16 @@ func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.Fol return xs.client.Put("/folders/"+fld.ID, fld, resFld) } +// CommandExec Send POST request to execute a command +func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error { + return xs.client.Post("/exec", args, res) +} + +// CommandSignal Send POST request to send a signal to a command +func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error { + return xs.client.Post("/signal", args, res) +} + // SetAPIRouterGroup . func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) { xs.apiRouter = r @@ -259,6 +271,15 @@ func (xs *XdsServer) EventRegister(evName string, id string) error { nil) } +// EventEmit Emit a event to XDS Server through WS +func (xs *XdsServer) EventEmit(message string, args ...interface{}) error { + if xs.ioSock == nil { + return fmt.Errorf("Io.Socket not initialized") + } + + return xs.ioSock.Emit(message, args...) +} + // EventOn Register a callback on events reception func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) { if xs.ioSock == nil { @@ -404,6 +425,33 @@ func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectC return pPrj } +// CommandAdd Add a new command to the list of running commands +func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error { + if xs.CommandGet(cmdID) != nil { + return fmt.Errorf("command id already exist") + } + xs.cmdList[cmdID] = data + return nil +} + +// CommandDelete Delete a command from the command list +func (xs *XdsServer) CommandDelete(cmdID string) error { + if xs.CommandGet(cmdID) == nil { + return fmt.Errorf("unknown command id") + } + delete(xs.cmdList, cmdID) + return nil +} + +// CommandGet Retrieve a command data +func (xs *XdsServer) CommandGet(cmdID string) interface{} { + d, exist := xs.cmdList[cmdID] + if exist { + return d + } + return nil +} + /*** ** Private functions ***/ diff --git a/lib/xaapiv1/exec.go b/lib/xaapiv1/exec.go index b96be79..8438e7f 100644 --- a/lib/xaapiv1/exec.go +++ b/lib/xaapiv1/exec.go @@ -16,6 +16,18 @@ type ( CmdTimeout int `json:"timeout"` // command completion timeout in Second } + // ExecResult JSON result of /exec command + ExecResult struct { + Status string `json:"status"` // status OK + CmdID string `json:"cmdID"` // command unique ID + } + + // ExecSignalResult JSON result of /signal command + ExecSignalResult struct { + Status string `json:"status"` // status OK + CmdID string `json:"cmdID"` // command unique ID + } + // ExecInMsg Message used to received input characters (stdin) ExecInMsg struct { CmdID string `json:"cmdID"` diff --git a/lib/xdsconfig/config.go b/lib/xdsconfig/config.go index 56a9cf9..1cc48c5 100644 --- a/lib/xdsconfig/config.go +++ b/lib/xdsconfig/config.go @@ -8,9 +8,9 @@ import ( "os" "github.com/Sirupsen/logrus" - "github.com/urfave/cli" common "github.com/iotbzh/xds-common/golib" uuid "github.com/satori/go.uuid" + "github.com/urfave/cli" ) // Config parameters (json format) of /config command -- cgit 1.2.3-korg