diff options
Diffstat (limited to 'lib/agent')
-rw-r--r-- | lib/agent/apiv1-sdks.go | 74 | ||||
-rw-r--r-- | lib/agent/apiv1.go | 11 | ||||
-rw-r--r-- | lib/agent/events.go | 6 | ||||
-rw-r--r-- | lib/agent/project-st.go | 5 | ||||
-rw-r--r-- | lib/agent/projects.go | 5 | ||||
-rw-r--r-- | lib/agent/xdsserver.go | 104 |
6 files changed, 173 insertions, 32 deletions
diff --git a/lib/agent/apiv1-sdks.go b/lib/agent/apiv1-sdks.go index 7a445ce..7d6342b 100644 --- a/lib/agent/apiv1-sdks.go +++ b/lib/agent/apiv1-sdks.go @@ -17,10 +17,84 @@ package agent +import ( + "encoding/json" + "fmt" + + "github.com/iotbzh/xds-agent/lib/xaapiv1" + "github.com/iotbzh/xds-server/lib/xsapiv1" +) + // sdksPassthroughInit Declare passthrough routes for sdks func (s *APIService) sdksPassthroughInit(svr *XdsServer) error { svr.PassthroughGet("/sdks") svr.PassthroughGet("/sdks/:id") + svr.PassthroughPost("/sdks") + svr.PassthroughPost("/sdks/abortinstall") + svr.PassthroughDelete("/sdks/:id") + + return nil +} + +// sdksEventsForwardInit Register events forwarder for sdks +func (s *APIService) sdksEventsForwardInit(svr *XdsServer) error { + + if !svr.Connected { + return fmt.Errorf("Cannot register events: XDS Server %v not connected", svr.ID) + } + + // Forward SDK events from XDS-server to client + if _, err := svr.EventOn(xsapiv1.EVTSDKInstall, "", s._sdkEventInstallCB); err != nil { + s.Log.Errorf("XDS Server EventOn '%s' failed: %v", xsapiv1.EVTSDKInstall, err) + return err + } + + if _, err := svr.EventOn(xsapiv1.EVTSDKRemove, "", s._sdkEventRemoveCB); err != nil { + s.Log.Errorf("XDS Server EventOn '%s' failed: %v", xsapiv1.EVTSDKRemove, err) + return err + } + + return nil +} + +func (s *APIService) _sdkEventInstallCB(privD interface{}, data interface{}) error { + // assume that xsapiv1.SDKManagementMsg == xaapiv1.SDKManagementMsg + evt := xaapiv1.SDKManagementMsg{} + evtName := xaapiv1.EVTSDKInstall + d, err := json.Marshal(data) + if err != nil { + s.Log.Errorf("Cannot marshal XDS Server %s: err=%v", evtName, err) + return err + } + if err = json.Unmarshal(d, &evt); err != nil { + s.Log.Errorf("Cannot unmarshal XDS Server %s: err=%v", evtName, err) + return err + } + + if err := s.events.Emit(evtName, evt, ""); err != nil { + s.Log.Warningf("Cannot notify %s (from server): %v", evtName, err) + return err + } + return nil +} + +func (s *APIService) _sdkEventRemoveCB(privD interface{}, data interface{}) error { + // assume that xsapiv1.SDKManagementMsg == xaapiv1.SDKManagementMsg + evt := xaapiv1.SDKManagementMsg{} + evtName := xaapiv1.EVTSDKRemove + d, err := json.Marshal(data) + if err != nil { + s.Log.Errorf("Cannot marshal XDS Server %s: err=%v", evtName, err) + return err + } + if err = json.Unmarshal(d, &evt); err != nil { + s.Log.Errorf("Cannot unmarshal XDS Server %s: err=%v", evtName, err) + return err + } + if err := s.events.Emit(evtName, evt, ""); err != nil { + s.Log.Warningf("Cannot notify %s (from server): %v", evtName, err) + return err + } return nil } diff --git a/lib/agent/apiv1.go b/lib/agent/apiv1.go index 8ec26d2..d0e5a1c 100644 --- a/lib/agent/apiv1.go +++ b/lib/agent/apiv1.go @@ -23,6 +23,7 @@ import ( "github.com/gin-gonic/gin" "github.com/iotbzh/xds-agent/lib/xdsconfig" + "github.com/iotbzh/xds-server/lib/xsapiv1" ) const apiBaseURL = "/api/v1" @@ -129,11 +130,21 @@ func (s *APIService) AddXdsServer(cfg xdsconfig.XDSServerConf) (*XdsServer, erro // Add to map s.xdsServers[svr.ID] = svr + // Register event forwarder + s.sdksEventsForwardInit(svr) + // Load projects if err == nil && svr.Connected { err = s.projects.Init(svr) } + // Registered to all events + if err == nil && svr.Connected { + if err = svr.EventRegister(xsapiv1.EVTAll, ""); err != nil { + s.Log.Errorf("XDS Server %v - register all events error: %v", svr.ID, err) + } + } + return svr, err } diff --git a/lib/agent/events.go b/lib/agent/events.go index df7015a..678f116 100644 --- a/lib/agent/events.go +++ b/lib/agent/events.go @@ -88,14 +88,14 @@ func (e *Events) UnRegister(evName, sessionID string) error { } // Emit Used to manually emit an event -func (e *Events) Emit(evName string, data interface{},fromSid string) error { +func (e *Events) Emit(evName string, data interface{}, fromSid string) error { var firstErr error if _, ok := e.eventsMap[evName]; !ok { return fmt.Errorf("Unsupported event type") } - e.LogSillyf("Emit Event %s: %v", evName, data) + e.LogSillyf("Emit Event %s: %v", evName, data) firstErr = nil evm := e.eventsMap[evName] @@ -113,7 +113,7 @@ func (e *Events) Emit(evName string, data interface{},fromSid string) error { Type: evName, Data: data, } - e.Log.Debugf("Emit Event %s: %v", evName, sid) + if err := (*so).Emit(evName, msg); err != nil { e.Log.Errorf("WS Emit %v error : %v", evName, err) if firstErr == nil { diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go index 8ac5f36..c92f5d4 100644 --- a/lib/agent/project-st.go +++ b/lib/agent/project-st.go @@ -106,9 +106,8 @@ func (p *STProject) Setup(prj xaapiv1.ProjectConfig) (*xaapiv1.ProjectConfig, er // Register events to update folder status // Register to XDS Server events - p.server.EventOn(xsapiv1.EVTFolderStateChange, "", p._cbServerFolderChanged) - if err := p.server.EventRegister(xsapiv1.EVTFolderStateChange, svrPrj.ID); err != nil { - p.Log.Warningf("XDS Server EventRegister failed: %v", err) + if _, err := p.server.EventOn(xsapiv1.EVTFolderStateChange, "", p._cbServerFolderChanged); err != nil { + p.Log.Errorf("XDS Server EventOn '%s' failed: %v", xsapiv1.EVTFolderStateChange, err) return svrPrj, err } diff --git a/lib/agent/projects.go b/lib/agent/projects.go index d6268fa..7393364 100644 --- a/lib/agent/projects.go +++ b/lib/agent/projects.go @@ -75,6 +75,11 @@ func (p *Projects) Init(server *XdsServer) error { for _, prj := range xFlds { newP := svr.FolderToProject(prj) if _, err := p.createUpdate(newP, false, true); err != nil { + // Don't consider that as an error (allow support config without CloudSync support) + if p.Context.SThg == nil && strings.Contains(err.Error(), "Server doesn't support project type CloudSync") { + continue + } + errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n " continue } diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 32656cf..3ec6123 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "strings" "sync" @@ -239,8 +238,11 @@ func (xs *XdsServer) PassthroughPost(url string) { } xs.apiRouter.POST(url, func(c *gin.Context) { - bodyReq := []byte{} - n, err := c.Request.Body.Read(bodyReq) + var err error + var data interface{} + + // Get raw body + body, err := c.GetRawData() if err != nil { common.APIError(c, err.Error()) return @@ -253,34 +255,80 @@ func (xs *XdsServer) PassthroughPost(url string) { } // Send Post request - body, err := json.Marshal(bodyReq[:n]) + response, err := xs.client.HTTPPostWithRes(nURL, string(body)) if err != nil { - common.APIError(c, err.Error()) + goto httpError + } + if response.StatusCode != 200 { + err = fmt.Errorf(response.Status) return } - - response, err := xs.client.HTTPPostWithRes(nURL, string(body)) + err = json.Unmarshal(xs.client.ResponseToBArray(response), &data) if err != nil { - common.APIError(c, err.Error()) - return + goto httpError } - bodyRes, err := ioutil.ReadAll(response.Body) + c.JSON(http.StatusOK, data) + return + + /* Handle error case */ + httpError: + if strings.Contains(err.Error(), "connection refused") { + xs._Disconnected() + } + common.APIError(c, err.Error()) + }) +} + +// PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server +func (xs *XdsServer) PassthroughDelete(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.DELETE(url, func(c *gin.Context) { + var err error + var data interface{} + + // Take care of param (eg. id in /projects/:id) + nURL := url + if strings.Contains(url, ":") { + nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL) + } + + // Send Post request + response, err := xs.client.HTTPDeleteWithRes(nURL) if err != nil { - common.APIError(c, "Cannot read response body") + goto httpError + } + if response.StatusCode != 200 { + err = fmt.Errorf(response.Status) return } - c.JSON(http.StatusOK, string(bodyRes)) + err = json.Unmarshal(xs.client.ResponseToBArray(response), &data) + if err != nil { + goto httpError + } + + c.JSON(http.StatusOK, data) + return + + /* Handle error case */ + httpError: + if strings.Contains(err.Error(), "connection refused") { + xs._Disconnected() + } + common.APIError(c, err.Error()) }) } // EventRegister Post a request to register to an XdsServer event -func (xs *XdsServer) EventRegister(evName string, id string) error { - return xs.client.Post( - "/events/register", +func (xs *XdsServer) EventRegister(evName string, filter string) error { + return xs.client.Post("/events/register", xsapiv1.EventRegisterArgs{ - Name: evName, - ProjectID: id, + Name: evName, + Filter: filter, }, nil) } @@ -308,15 +356,15 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu evn := evName err := xs.ioSock.On(evn, func(data interface{}) error { - xs.sockEventsLock.Lock() - sEvts := make([]*caller, len(xs.sockEvents[evn])) - copy(sEvts, xs.sockEvents[evn]) - xs.sockEventsLock.Unlock() - for _, c := range sEvts { - c.Func(c.PrivateData, data) - } - return nil - }) + xs.sockEventsLock.Lock() + sEvts := make([]*caller, len(xs.sockEvents[evn])) + copy(sEvts, xs.sockEvents[evn]) + xs.sockEventsLock.Unlock() + for _, c := range sEvts { + c.Func(c.PrivateData, data) + } + return nil + }) if err != nil { return uuid.Nil, err } @@ -493,6 +541,10 @@ func (xs *XdsServer) _Reconnect() error { // Reload projects list for this server err = xs.projects.Init(xs) } + if err == nil { + // Register again to all events + err = xs.EventRegister(xsapiv1.EVTAll, "") + } return err } |