aboutsummaryrefslogtreecommitdiffstats
path: root/lib/agent
diff options
context:
space:
mode:
Diffstat (limited to 'lib/agent')
-rw-r--r--lib/agent/apiv1-sdks.go74
-rw-r--r--lib/agent/apiv1.go11
-rw-r--r--lib/agent/events.go6
-rw-r--r--lib/agent/project-st.go5
-rw-r--r--lib/agent/projects.go5
-rw-r--r--lib/agent/xdsserver.go104
6 files changed, 173 insertions, 32 deletions
diff --git a/lib/agent/apiv1-sdks.go b/lib/agent/apiv1-sdks.go
index 7a445ce..7d6342b 100644
--- a/lib/agent/apiv1-sdks.go
+++ b/lib/agent/apiv1-sdks.go
@@ -17,10 +17,84 @@
package agent
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/iotbzh/xds-agent/lib/xaapiv1"
+ "github.com/iotbzh/xds-server/lib/xsapiv1"
+)
+
// sdksPassthroughInit Declare passthrough routes for sdks
func (s *APIService) sdksPassthroughInit(svr *XdsServer) error {
svr.PassthroughGet("/sdks")
svr.PassthroughGet("/sdks/:id")
+ svr.PassthroughPost("/sdks")
+ svr.PassthroughPost("/sdks/abortinstall")
+ svr.PassthroughDelete("/sdks/:id")
+
+ return nil
+}
+
+// sdksEventsForwardInit Register events forwarder for sdks
+func (s *APIService) sdksEventsForwardInit(svr *XdsServer) error {
+
+ if !svr.Connected {
+ return fmt.Errorf("Cannot register events: XDS Server %v not connected", svr.ID)
+ }
+
+ // Forward SDK events from XDS-server to client
+ if _, err := svr.EventOn(xsapiv1.EVTSDKInstall, "", s._sdkEventInstallCB); err != nil {
+ s.Log.Errorf("XDS Server EventOn '%s' failed: %v", xsapiv1.EVTSDKInstall, err)
+ return err
+ }
+
+ if _, err := svr.EventOn(xsapiv1.EVTSDKRemove, "", s._sdkEventRemoveCB); err != nil {
+ s.Log.Errorf("XDS Server EventOn '%s' failed: %v", xsapiv1.EVTSDKRemove, err)
+ return err
+ }
+
+ return nil
+}
+
+func (s *APIService) _sdkEventInstallCB(privD interface{}, data interface{}) error {
+ // assume that xsapiv1.SDKManagementMsg == xaapiv1.SDKManagementMsg
+ evt := xaapiv1.SDKManagementMsg{}
+ evtName := xaapiv1.EVTSDKInstall
+ d, err := json.Marshal(data)
+ if err != nil {
+ s.Log.Errorf("Cannot marshal XDS Server %s: err=%v", evtName, err)
+ return err
+ }
+ if err = json.Unmarshal(d, &evt); err != nil {
+ s.Log.Errorf("Cannot unmarshal XDS Server %s: err=%v", evtName, err)
+ return err
+ }
+
+ if err := s.events.Emit(evtName, evt, ""); err != nil {
+ s.Log.Warningf("Cannot notify %s (from server): %v", evtName, err)
+ return err
+ }
+ return nil
+}
+
+func (s *APIService) _sdkEventRemoveCB(privD interface{}, data interface{}) error {
+ // assume that xsapiv1.SDKManagementMsg == xaapiv1.SDKManagementMsg
+ evt := xaapiv1.SDKManagementMsg{}
+ evtName := xaapiv1.EVTSDKRemove
+ d, err := json.Marshal(data)
+ if err != nil {
+ s.Log.Errorf("Cannot marshal XDS Server %s: err=%v", evtName, err)
+ return err
+ }
+ if err = json.Unmarshal(d, &evt); err != nil {
+ s.Log.Errorf("Cannot unmarshal XDS Server %s: err=%v", evtName, err)
+ return err
+ }
+ if err := s.events.Emit(evtName, evt, ""); err != nil {
+ s.Log.Warningf("Cannot notify %s (from server): %v", evtName, err)
+ return err
+ }
return nil
}
diff --git a/lib/agent/apiv1.go b/lib/agent/apiv1.go
index 8ec26d2..d0e5a1c 100644
--- a/lib/agent/apiv1.go
+++ b/lib/agent/apiv1.go
@@ -23,6 +23,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/iotbzh/xds-agent/lib/xdsconfig"
+ "github.com/iotbzh/xds-server/lib/xsapiv1"
)
const apiBaseURL = "/api/v1"
@@ -129,11 +130,21 @@ func (s *APIService) AddXdsServer(cfg xdsconfig.XDSServerConf) (*XdsServer, erro
// Add to map
s.xdsServers[svr.ID] = svr
+ // Register event forwarder
+ s.sdksEventsForwardInit(svr)
+
// Load projects
if err == nil && svr.Connected {
err = s.projects.Init(svr)
}
+ // Registered to all events
+ if err == nil && svr.Connected {
+ if err = svr.EventRegister(xsapiv1.EVTAll, ""); err != nil {
+ s.Log.Errorf("XDS Server %v - register all events error: %v", svr.ID, err)
+ }
+ }
+
return svr, err
}
diff --git a/lib/agent/events.go b/lib/agent/events.go
index df7015a..678f116 100644
--- a/lib/agent/events.go
+++ b/lib/agent/events.go
@@ -88,14 +88,14 @@ func (e *Events) UnRegister(evName, sessionID string) error {
}
// Emit Used to manually emit an event
-func (e *Events) Emit(evName string, data interface{},fromSid string) error {
+func (e *Events) Emit(evName string, data interface{}, fromSid string) error {
var firstErr error
if _, ok := e.eventsMap[evName]; !ok {
return fmt.Errorf("Unsupported event type")
}
- e.LogSillyf("Emit Event %s: %v", evName, data)
+ e.LogSillyf("Emit Event %s: %v", evName, data)
firstErr = nil
evm := e.eventsMap[evName]
@@ -113,7 +113,7 @@ func (e *Events) Emit(evName string, data interface{},fromSid string) error {
Type: evName,
Data: data,
}
- e.Log.Debugf("Emit Event %s: %v", evName, sid)
+
if err := (*so).Emit(evName, msg); err != nil {
e.Log.Errorf("WS Emit %v error : %v", evName, err)
if firstErr == nil {
diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go
index 8ac5f36..c92f5d4 100644
--- a/lib/agent/project-st.go
+++ b/lib/agent/project-st.go
@@ -106,9 +106,8 @@ func (p *STProject) Setup(prj xaapiv1.ProjectConfig) (*xaapiv1.ProjectConfig, er
// Register events to update folder status
// Register to XDS Server events
- p.server.EventOn(xsapiv1.EVTFolderStateChange, "", p._cbServerFolderChanged)
- if err := p.server.EventRegister(xsapiv1.EVTFolderStateChange, svrPrj.ID); err != nil {
- p.Log.Warningf("XDS Server EventRegister failed: %v", err)
+ if _, err := p.server.EventOn(xsapiv1.EVTFolderStateChange, "", p._cbServerFolderChanged); err != nil {
+ p.Log.Errorf("XDS Server EventOn '%s' failed: %v", xsapiv1.EVTFolderStateChange, err)
return svrPrj, err
}
diff --git a/lib/agent/projects.go b/lib/agent/projects.go
index d6268fa..7393364 100644
--- a/lib/agent/projects.go
+++ b/lib/agent/projects.go
@@ -75,6 +75,11 @@ func (p *Projects) Init(server *XdsServer) error {
for _, prj := range xFlds {
newP := svr.FolderToProject(prj)
if _, err := p.createUpdate(newP, false, true); err != nil {
+ // Don't consider that as an error (allow support config without CloudSync support)
+ if p.Context.SThg == nil && strings.Contains(err.Error(), "Server doesn't support project type CloudSync") {
+ continue
+ }
+
errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n "
continue
}
diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go
index 32656cf..3ec6123 100644
--- a/lib/agent/xdsserver.go
+++ b/lib/agent/xdsserver.go
@@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"io"
- "io/ioutil"
"net/http"
"strings"
"sync"
@@ -239,8 +238,11 @@ func (xs *XdsServer) PassthroughPost(url string) {
}
xs.apiRouter.POST(url, func(c *gin.Context) {
- bodyReq := []byte{}
- n, err := c.Request.Body.Read(bodyReq)
+ var err error
+ var data interface{}
+
+ // Get raw body
+ body, err := c.GetRawData()
if err != nil {
common.APIError(c, err.Error())
return
@@ -253,34 +255,80 @@ func (xs *XdsServer) PassthroughPost(url string) {
}
// Send Post request
- body, err := json.Marshal(bodyReq[:n])
+ response, err := xs.client.HTTPPostWithRes(nURL, string(body))
if err != nil {
- common.APIError(c, err.Error())
+ goto httpError
+ }
+ if response.StatusCode != 200 {
+ err = fmt.Errorf(response.Status)
return
}
-
- response, err := xs.client.HTTPPostWithRes(nURL, string(body))
+ err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
if err != nil {
- common.APIError(c, err.Error())
- return
+ goto httpError
}
- bodyRes, err := ioutil.ReadAll(response.Body)
+ c.JSON(http.StatusOK, data)
+ return
+
+ /* Handle error case */
+ httpError:
+ if strings.Contains(err.Error(), "connection refused") {
+ xs._Disconnected()
+ }
+ common.APIError(c, err.Error())
+ })
+}
+
+// PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server
+func (xs *XdsServer) PassthroughDelete(url string) {
+ if xs.apiRouter == nil {
+ xs.Log.Errorf("apiRouter not set !")
+ return
+ }
+
+ xs.apiRouter.DELETE(url, func(c *gin.Context) {
+ var err error
+ var data interface{}
+
+ // Take care of param (eg. id in /projects/:id)
+ nURL := url
+ if strings.Contains(url, ":") {
+ nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
+ }
+
+ // Send Post request
+ response, err := xs.client.HTTPDeleteWithRes(nURL)
if err != nil {
- common.APIError(c, "Cannot read response body")
+ goto httpError
+ }
+ if response.StatusCode != 200 {
+ err = fmt.Errorf(response.Status)
return
}
- c.JSON(http.StatusOK, string(bodyRes))
+ err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
+ if err != nil {
+ goto httpError
+ }
+
+ c.JSON(http.StatusOK, data)
+ return
+
+ /* Handle error case */
+ httpError:
+ if strings.Contains(err.Error(), "connection refused") {
+ xs._Disconnected()
+ }
+ common.APIError(c, err.Error())
})
}
// EventRegister Post a request to register to an XdsServer event
-func (xs *XdsServer) EventRegister(evName string, id string) error {
- return xs.client.Post(
- "/events/register",
+func (xs *XdsServer) EventRegister(evName string, filter string) error {
+ return xs.client.Post("/events/register",
xsapiv1.EventRegisterArgs{
- Name: evName,
- ProjectID: id,
+ Name: evName,
+ Filter: filter,
},
nil)
}
@@ -308,15 +356,15 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu
evn := evName
err := xs.ioSock.On(evn, func(data interface{}) error {
- xs.sockEventsLock.Lock()
- sEvts := make([]*caller, len(xs.sockEvents[evn]))
- copy(sEvts, xs.sockEvents[evn])
- xs.sockEventsLock.Unlock()
- for _, c := range sEvts {
- c.Func(c.PrivateData, data)
- }
- return nil
- })
+ xs.sockEventsLock.Lock()
+ sEvts := make([]*caller, len(xs.sockEvents[evn]))
+ copy(sEvts, xs.sockEvents[evn])
+ xs.sockEventsLock.Unlock()
+ for _, c := range sEvts {
+ c.Func(c.PrivateData, data)
+ }
+ return nil
+ })
if err != nil {
return uuid.Nil, err
}
@@ -493,6 +541,10 @@ func (xs *XdsServer) _Reconnect() error {
// Reload projects list for this server
err = xs.projects.Init(xs)
}
+ if err == nil {
+ // Register again to all events
+ err = xs.EventRegister(xsapiv1.EVTAll, "")
+ }
return err
}