diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/agent/apiv1-sdks.go | 74 | ||||
-rw-r--r-- | lib/agent/apiv1.go | 11 | ||||
-rw-r--r-- | lib/agent/events.go | 6 | ||||
-rw-r--r-- | lib/agent/project-st.go | 5 | ||||
-rw-r--r-- | lib/agent/projects.go | 5 | ||||
-rw-r--r-- | lib/agent/xdsserver.go | 104 | ||||
-rw-r--r-- | lib/syncthing/st.go | 2 | ||||
-rw-r--r-- | lib/xaapiv1/events.go | 21 | ||||
-rw-r--r-- | lib/xaapiv1/sdks.go | 50 |
9 files changed, 239 insertions, 39 deletions
diff --git a/lib/agent/apiv1-sdks.go b/lib/agent/apiv1-sdks.go index 7a445ce..7d6342b 100644 --- a/lib/agent/apiv1-sdks.go +++ b/lib/agent/apiv1-sdks.go @@ -17,10 +17,84 @@ package agent +import ( + "encoding/json" + "fmt" + + "github.com/iotbzh/xds-agent/lib/xaapiv1" + "github.com/iotbzh/xds-server/lib/xsapiv1" +) + // sdksPassthroughInit Declare passthrough routes for sdks func (s *APIService) sdksPassthroughInit(svr *XdsServer) error { svr.PassthroughGet("/sdks") svr.PassthroughGet("/sdks/:id") + svr.PassthroughPost("/sdks") + svr.PassthroughPost("/sdks/abortinstall") + svr.PassthroughDelete("/sdks/:id") + + return nil +} + +// sdksEventsForwardInit Register events forwarder for sdks +func (s *APIService) sdksEventsForwardInit(svr *XdsServer) error { + + if !svr.Connected { + return fmt.Errorf("Cannot register events: XDS Server %v not connected", svr.ID) + } + + // Forward SDK events from XDS-server to client + if _, err := svr.EventOn(xsapiv1.EVTSDKInstall, "", s._sdkEventInstallCB); err != nil { + s.Log.Errorf("XDS Server EventOn '%s' failed: %v", xsapiv1.EVTSDKInstall, err) + return err + } + + if _, err := svr.EventOn(xsapiv1.EVTSDKRemove, "", s._sdkEventRemoveCB); err != nil { + s.Log.Errorf("XDS Server EventOn '%s' failed: %v", xsapiv1.EVTSDKRemove, err) + return err + } + + return nil +} + +func (s *APIService) _sdkEventInstallCB(privD interface{}, data interface{}) error { + // assume that xsapiv1.SDKManagementMsg == xaapiv1.SDKManagementMsg + evt := xaapiv1.SDKManagementMsg{} + evtName := xaapiv1.EVTSDKInstall + d, err := json.Marshal(data) + if err != nil { + s.Log.Errorf("Cannot marshal XDS Server %s: err=%v", evtName, err) + return err + } + if err = json.Unmarshal(d, &evt); err != nil { + s.Log.Errorf("Cannot unmarshal XDS Server %s: err=%v", evtName, err) + return err + } + + if err := s.events.Emit(evtName, evt, ""); err != nil { + s.Log.Warningf("Cannot notify %s (from server): %v", evtName, err) + return err + } + return nil +} + +func (s *APIService) _sdkEventRemoveCB(privD interface{}, data interface{}) error { + // assume that xsapiv1.SDKManagementMsg == xaapiv1.SDKManagementMsg + evt := xaapiv1.SDKManagementMsg{} + evtName := xaapiv1.EVTSDKRemove + d, err := json.Marshal(data) + if err != nil { + s.Log.Errorf("Cannot marshal XDS Server %s: err=%v", evtName, err) + return err + } + if err = json.Unmarshal(d, &evt); err != nil { + s.Log.Errorf("Cannot unmarshal XDS Server %s: err=%v", evtName, err) + return err + } + if err := s.events.Emit(evtName, evt, ""); err != nil { + s.Log.Warningf("Cannot notify %s (from server): %v", evtName, err) + return err + } return nil } diff --git a/lib/agent/apiv1.go b/lib/agent/apiv1.go index 8ec26d2..d0e5a1c 100644 --- a/lib/agent/apiv1.go +++ b/lib/agent/apiv1.go @@ -23,6 +23,7 @@ import ( "github.com/gin-gonic/gin" "github.com/iotbzh/xds-agent/lib/xdsconfig" + "github.com/iotbzh/xds-server/lib/xsapiv1" ) const apiBaseURL = "/api/v1" @@ -129,11 +130,21 @@ func (s *APIService) AddXdsServer(cfg xdsconfig.XDSServerConf) (*XdsServer, erro // Add to map s.xdsServers[svr.ID] = svr + // Register event forwarder + s.sdksEventsForwardInit(svr) + // Load projects if err == nil && svr.Connected { err = s.projects.Init(svr) } + // Registered to all events + if err == nil && svr.Connected { + if err = svr.EventRegister(xsapiv1.EVTAll, ""); err != nil { + s.Log.Errorf("XDS Server %v - register all events error: %v", svr.ID, err) + } + } + return svr, err } diff --git a/lib/agent/events.go b/lib/agent/events.go index df7015a..678f116 100644 --- a/lib/agent/events.go +++ b/lib/agent/events.go @@ -88,14 +88,14 @@ func (e *Events) UnRegister(evName, sessionID string) error { } // Emit Used to manually emit an event -func (e *Events) Emit(evName string, data interface{},fromSid string) error { +func (e *Events) Emit(evName string, data interface{}, fromSid string) error { var firstErr error if _, ok := e.eventsMap[evName]; !ok { return fmt.Errorf("Unsupported event type") } - e.LogSillyf("Emit Event %s: %v", evName, data) + e.LogSillyf("Emit Event %s: %v", evName, data) firstErr = nil evm := e.eventsMap[evName] @@ -113,7 +113,7 @@ func (e *Events) Emit(evName string, data interface{},fromSid string) error { Type: evName, Data: data, } - e.Log.Debugf("Emit Event %s: %v", evName, sid) + if err := (*so).Emit(evName, msg); err != nil { e.Log.Errorf("WS Emit %v error : %v", evName, err) if firstErr == nil { diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go index 8ac5f36..c92f5d4 100644 --- a/lib/agent/project-st.go +++ b/lib/agent/project-st.go @@ -106,9 +106,8 @@ func (p *STProject) Setup(prj xaapiv1.ProjectConfig) (*xaapiv1.ProjectConfig, er // Register events to update folder status // Register to XDS Server events - p.server.EventOn(xsapiv1.EVTFolderStateChange, "", p._cbServerFolderChanged) - if err := p.server.EventRegister(xsapiv1.EVTFolderStateChange, svrPrj.ID); err != nil { - p.Log.Warningf("XDS Server EventRegister failed: %v", err) + if _, err := p.server.EventOn(xsapiv1.EVTFolderStateChange, "", p._cbServerFolderChanged); err != nil { + p.Log.Errorf("XDS Server EventOn '%s' failed: %v", xsapiv1.EVTFolderStateChange, err) return svrPrj, err } diff --git a/lib/agent/projects.go b/lib/agent/projects.go index d6268fa..7393364 100644 --- a/lib/agent/projects.go +++ b/lib/agent/projects.go @@ -75,6 +75,11 @@ func (p *Projects) Init(server *XdsServer) error { for _, prj := range xFlds { newP := svr.FolderToProject(prj) if _, err := p.createUpdate(newP, false, true); err != nil { + // Don't consider that as an error (allow support config without CloudSync support) + if p.Context.SThg == nil && strings.Contains(err.Error(), "Server doesn't support project type CloudSync") { + continue + } + errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n " continue } diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 32656cf..3ec6123 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "strings" "sync" @@ -239,8 +238,11 @@ func (xs *XdsServer) PassthroughPost(url string) { } xs.apiRouter.POST(url, func(c *gin.Context) { - bodyReq := []byte{} - n, err := c.Request.Body.Read(bodyReq) + var err error + var data interface{} + + // Get raw body + body, err := c.GetRawData() if err != nil { common.APIError(c, err.Error()) return @@ -253,34 +255,80 @@ func (xs *XdsServer) PassthroughPost(url string) { } // Send Post request - body, err := json.Marshal(bodyReq[:n]) + response, err := xs.client.HTTPPostWithRes(nURL, string(body)) if err != nil { - common.APIError(c, err.Error()) + goto httpError + } + if response.StatusCode != 200 { + err = fmt.Errorf(response.Status) return } - - response, err := xs.client.HTTPPostWithRes(nURL, string(body)) + err = json.Unmarshal(xs.client.ResponseToBArray(response), &data) if err != nil { - common.APIError(c, err.Error()) - return + goto httpError } - bodyRes, err := ioutil.ReadAll(response.Body) + c.JSON(http.StatusOK, data) + return + + /* Handle error case */ + httpError: + if strings.Contains(err.Error(), "connection refused") { + xs._Disconnected() + } + common.APIError(c, err.Error()) + }) +} + +// PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server +func (xs *XdsServer) PassthroughDelete(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.DELETE(url, func(c *gin.Context) { + var err error + var data interface{} + + // Take care of param (eg. id in /projects/:id) + nURL := url + if strings.Contains(url, ":") { + nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL) + } + + // Send Post request + response, err := xs.client.HTTPDeleteWithRes(nURL) if err != nil { - common.APIError(c, "Cannot read response body") + goto httpError + } + if response.StatusCode != 200 { + err = fmt.Errorf(response.Status) return } - c.JSON(http.StatusOK, string(bodyRes)) + err = json.Unmarshal(xs.client.ResponseToBArray(response), &data) + if err != nil { + goto httpError + } + + c.JSON(http.StatusOK, data) + return + + /* Handle error case */ + httpError: + if strings.Contains(err.Error(), "connection refused") { + xs._Disconnected() + } + common.APIError(c, err.Error()) }) } // EventRegister Post a request to register to an XdsServer event -func (xs *XdsServer) EventRegister(evName string, id string) error { - return xs.client.Post( - "/events/register", +func (xs *XdsServer) EventRegister(evName string, filter string) error { + return xs.client.Post("/events/register", xsapiv1.EventRegisterArgs{ - Name: evName, - ProjectID: id, + Name: evName, + Filter: filter, }, nil) } @@ -308,15 +356,15 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu evn := evName err := xs.ioSock.On(evn, func(data interface{}) error { - xs.sockEventsLock.Lock() - sEvts := make([]*caller, len(xs.sockEvents[evn])) - copy(sEvts, xs.sockEvents[evn]) - xs.sockEventsLock.Unlock() - for _, c := range sEvts { - c.Func(c.PrivateData, data) - } - return nil - }) + xs.sockEventsLock.Lock() + sEvts := make([]*caller, len(xs.sockEvents[evn])) + copy(sEvts, xs.sockEvents[evn]) + xs.sockEventsLock.Unlock() + for _, c := range sEvts { + c.Func(c.PrivateData, data) + } + return nil + }) if err != nil { return uuid.Nil, err } @@ -493,6 +541,10 @@ func (xs *XdsServer) _Reconnect() error { // Reload projects list for this server err = xs.projects.Init(xs) } + if err == nil { + // Register again to all events + err = xs.EventRegister(xsapiv1.EVTAll, "") + } return err } diff --git a/lib/syncthing/st.go b/lib/syncthing/st.go index c4b72c5..1ea3947 100644 --- a/lib/syncthing/st.go +++ b/lib/syncthing/st.go @@ -188,7 +188,7 @@ func (s *SyncThing) startProc(exeName string, args []string, env []string, eChan cmdOut, err := cmd.StdoutPipe() if err != nil { - return nil, fmt.Errorf("Pipe stdout error for : %s", err) + return nil, fmt.Errorf("Pipe stdout error for : %v", err) } go io.Copy(outfile, cmdOut) diff --git a/lib/xaapiv1/events.go b/lib/xaapiv1/events.go index 0ac08e8..16c2dd7 100644 --- a/lib/xaapiv1/events.go +++ b/lib/xaapiv1/events.go @@ -45,6 +45,8 @@ const ( EVTProjectAdd = EventTypePrefix + "project-add" // type EventMsg with Data type xaapiv1.ProjectConfig EVTProjectDelete = EventTypePrefix + "project-delete" // type EventMsg with Data type xaapiv1.ProjectConfig EVTProjectChange = EventTypePrefix + "project-state-change" // type EventMsg with Data type xaapiv1.ProjectConfig + EVTSDKInstall = EventTypePrefix + "sdk-install" // type EventMsg with Data type xaapiv1.SDKManagementMsg + EVTSDKRemove = EventTypePrefix + "sdk-remove" // type EventMsg with Data type xaapiv1.SDKManagementMsg ) // EVTAllList List of all supported events @@ -53,6 +55,8 @@ var EVTAllList = []string{ EVTProjectAdd, EVTProjectDelete, EVTProjectChange, + EVTSDKInstall, + EVTSDKRemove, } // EventMsg Event message send over Websocket, data format depend to Type (see DecodeXXX function) @@ -92,3 +96,20 @@ func (e *EventMsg) DecodeProjectConfig() (ProjectConfig, error) { } return p, err } + +// DecodeSDKMsg Helper to decode Data field type SDKManagementMsg +func (e *EventMsg) DecodeSDKMsg() (SDKManagementMsg, error) { + var err error + s := SDKManagementMsg{} + switch e.Type { + case EVTSDKInstall, EVTSDKRemove: + d := []byte{} + d, err = json.Marshal(e.Data) + if err == nil { + err = json.Unmarshal(d, &s) + } + default: + err = fmt.Errorf("Invalid type") + } + return s, err +} diff --git a/lib/xaapiv1/sdks.go b/lib/xaapiv1/sdks.go index 2dceecf..589f748 100644 --- a/lib/xaapiv1/sdks.go +++ b/lib/xaapiv1/sdks.go @@ -17,12 +17,50 @@ package xaapiv1 +// SDK status definition +const ( + SdkStatusDisable = "Disable" + SdkStatusNotInstalled = "Not Installed" + SdkStatusInstalling = "Installing" + SdkStatusUninstalling = "Un-installing" + SdkStatusInstalled = "Installed" +) + // SDK Define a cross tool chain used to build application type SDK struct { - ID string `json:"id" binding:"required"` - Name string `json:"name"` - Profile string `json:"profile"` - Version string `json:"version"` - Arch string `json:"arch"` - Path string `json:"path"` + ID string `json:"id" binding:"required"` + Name string `json:"name"` + Description string `json:"description"` + Profile string `json:"profile"` + Version string `json:"version"` + Arch string `json:"arch"` + Path string `json:"path"` + URL string `json:"url"` + Status string `json:"status"` + Date string `json:"date"` + Size string `json:"size"` + Md5sum string `json:"md5sum"` + SetupFile string `json:"setupFile"` + LastError string `json:"lastError"` +} + +// SDKInstallArgs JSON parameters of POST /sdks or /sdks/abortinstall commands +type SDKInstallArgs struct { + ID string `json:"id" binding:"required"` // install by ID (must be part of GET /sdks result) + Filename string `json:"filename"` // install by using a file + Force bool `json:"force"` // force SDK install when already existing + Timeout int `json:"timeout"` // 1800 == default 30 minutes +} + +// SDKManagementMsg Message send during SDK installation or when installation is complete +type SDKManagementMsg struct { + CmdID string `json:"cmdID"` + Timestamp string `json:"timestamp"` + Sdk SDK `json:"sdk"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + Progress int `json:"progress"` // 0 = not started to 100% = complete + Exited bool `json:"exited"` + Code int `json:"code"` + Error string `json:"error"` } |