aboutsummaryrefslogtreecommitdiffstats
path: root/lib/agent/xdsserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'lib/agent/xdsserver.go')
-rw-r--r--lib/agent/xdsserver.go285
1 files changed, 208 insertions, 77 deletions
diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go
index 518c68b..c900c9e 100644
--- a/lib/agent/xdsserver.go
+++ b/lib/agent/xdsserver.go
@@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"strings"
+ "sync"
"time"
"github.com/gin-gonic/gin"
@@ -16,7 +17,7 @@ import (
sio_client "github.com/sebd71/go-socket.io-client"
)
-// Server .
+// XdsServer .
type XdsServer struct {
*Context
ID string
@@ -26,11 +27,13 @@ type XdsServer struct {
ConnRetry int
Connected bool
Disabled bool
- ServerConfig *xdsServerConfig
+ ServerConfig *XdsServerConfig
- // callbacks
+ // Events management
CBOnError func(error)
CBOnDisconnect func(error)
+ sockEvents map[string][]*caller
+ sockEventsLock *sync.Mutex
// Private fields
client *common.HTTPClient
@@ -39,25 +42,25 @@ type XdsServer struct {
apiRouter *gin.RouterGroup
}
-// xdsServerConfig Data return by GET /config
-type xdsServerConfig struct {
+// XdsServerConfig Data return by GET /config
+type XdsServerConfig struct {
ID string `json:"id"`
Version string `json:"version"`
APIVersion string `json:"apiVersion"`
VersionGitTag string `json:"gitTag"`
SupportedSharing map[string]bool `json:"supportedSharing"`
- Builder xdsBuilderConfig `json:"builder"`
+ Builder XdsBuilderConfig `json:"builder"`
}
-// xdsBuilderConfig represents the builder container configuration
-type xdsBuilderConfig struct {
+// XdsBuilderConfig represents the builder container configuration
+type XdsBuilderConfig struct {
IP string `json:"ip"`
Port string `json:"port"`
SyncThingID string `json:"syncThingID"`
}
-// FolderType XdsServer folder type
-type FolderType string
+// XdsFolderType XdsServer folder type
+type XdsFolderType string
const (
XdsTypePathMap = "PathMap"
@@ -65,28 +68,52 @@ const (
XdsTypeCifsSmb = "CIFS"
)
-// FolderConfig XdsServer folder config
-type FolderConfig struct {
- ID string `json:"id"`
- Label string `json:"label"`
- ClientPath string `json:"path"`
- Type FolderType `json:"type"`
- Status string `json:"status"`
- IsInSync bool `json:"isInSync"`
- DefaultSdk string `json:"defaultSdk"`
+// XdsFolderConfig XdsServer folder config
+type XdsFolderConfig struct {
+ ID string `json:"id"`
+ Label string `json:"label"`
+ ClientPath string `json:"path"`
+ Type XdsFolderType `json:"type"`
+ Status string `json:"status"`
+ IsInSync bool `json:"isInSync"`
+ DefaultSdk string `json:"defaultSdk"`
// Specific data depending on which Type is used
- DataPathMap PathMapConfig `json:"dataPathMap,omitempty"`
- DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"`
+ DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"`
+ DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
}
-// PathMapConfig Path mapping specific data
-type PathMapConfig struct {
+// XdsPathMapConfig Path mapping specific data
+type XdsPathMapConfig struct {
ServerPath string `json:"serverPath"`
}
-// CloudSyncConfig CloudSync (AKA Syncthing) specific data
-type CloudSyncConfig struct {
- SyncThingID string `json:"syncThingID"`
+// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
+type XdsCloudSyncConfig struct {
+ SyncThingID string `json:"syncThingID"`
+ STSvrStatus string `json:"-"`
+ STSvrIsInSync bool `json:"-"`
+ STLocStatus string `json:"-"`
+ STLocIsInSync bool `json:"-"`
+}
+
+// XdsEventRegisterArgs arguments used to register to XDS server events
+type XdsEventRegisterArgs struct {
+ Name string `json:"name"`
+ ProjectID string `json:"filterProjectID"`
+}
+
+// XdsEventFolderChange Folder change event structure
+type XdsEventFolderChange struct {
+ Time string `json:"time"`
+ Type string `json:"type"`
+ Folder XdsFolderConfig `json:"folder"`
+}
+
+// caller Used to chain event listeners
+type caller struct {
+ id uuid.UUID
+ EventName string
+ Func func(interface{})
}
const _IDTempoPrefix = "tempo-"
@@ -103,7 +130,9 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
Connected: false,
Disabled: false,
- logOut: ctx.Log.Out,
+ sockEvents: make(map[string][]*caller),
+ sockEventsLock: &sync.Mutex{},
+ logOut: ctx.Log.Out,
}
}
@@ -138,7 +167,7 @@ func (xs *XdsServer) Connect() error {
time.Sleep(time.Second)
}
if retry == 0 {
- // FIXME SEB: re-use _reconnect to wait longer in background
+ // FIXME: re-use _reconnect to wait longer in background
return fmt.Errorf("Connection to XDS Server failure")
}
if err != nil {
@@ -161,16 +190,35 @@ func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
xs.logOut = out
}
+// SendCommand Send a command to XDS Server
+func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
+ url := cmd
+ if !strings.HasPrefix("/", cmd) {
+ url = "/" + cmd
+ }
+ return xs.client.HTTPPostWithRes(url, string(body))
+}
+
+// GetVersion Send Get request to retrieve XDS Server version
+func (xs *XdsServer) GetVersion(res interface{}) error {
+ return xs._HTTPGet("/version", &res)
+}
+
+// GetFolders Send GET request to get current folder configuration
+func (xs *XdsServer) GetFolders(prjs *[]XdsFolderConfig) error {
+ return xs._HTTPGet("/folders", prjs)
+}
+
// FolderAdd Send POST request to add a folder
-func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error {
- response, err := xs.HTTPPost("/folder", prj)
+func (xs *XdsServer) FolderAdd(prj *XdsFolderConfig, res interface{}) error {
+ response, err := xs._HTTPPost("/folder", prj)
if err != nil {
return err
}
if response.StatusCode != 200 {
return fmt.Errorf("FolderAdd error status=%s", response.Status)
}
- // Result is a FolderConfig that is equivalent to ProjectConfig
+ // Result is a XdsFolderConfig that is equivalent to ProjectConfig
err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
return err
@@ -181,27 +229,9 @@ func (xs *XdsServer) FolderDelete(id string) error {
return xs.client.HTTPDelete("/folder/" + id)
}
-// HTTPGet .
-func (xs *XdsServer) HTTPGet(url string, data interface{}) error {
- var dd []byte
- if err := xs.client.HTTPGet(url, &dd); err != nil {
- return err
- }
- return json.Unmarshal(dd, &data)
-}
-
-// HTTPPost .
-func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) {
- body, err := json.Marshal(data)
- if err != nil {
- return nil, err
- }
- return xs.HTTPPostBody(url, string(body))
-}
-
-// HTTPPostBody .
-func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) {
- return xs.client.HTTPPostWithRes(url, body)
+// FolderSync Send POST request to force synchronization of a folder
+func (xs *XdsServer) FolderSync(id string) error {
+ return xs.client.HTTPPost("/folder/sync/"+id, "")
}
// SetAPIRouterGroup .
@@ -218,7 +248,7 @@ func (xs *XdsServer) PassthroughGet(url string) {
xs.apiRouter.GET(url, func(c *gin.Context) {
var data interface{}
- if err := xs.HTTPGet(url, &data); err != nil {
+ if err := xs._HTTPGet(url, &data); err != nil {
if strings.Contains(err.Error(), "connection refused") {
xs.Connected = false
xs._NotifyState()
@@ -246,7 +276,7 @@ func (xs *XdsServer) PassthroughPost(url string) {
return
}
- response, err := xs.HTTPPostBody(url, string(bodyReq[:n]))
+ response, err := xs._HTTPPost(url, bodyReq[:n])
if err != nil {
common.APIError(c, err.Error())
return
@@ -260,49 +290,132 @@ func (xs *XdsServer) PassthroughPost(url string) {
})
}
+// EventRegister Post a request to register to an XdsServer event
+func (xs *XdsServer) EventRegister(evName string, id string) error {
+ var err error
+ _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
+ Name: evName,
+ ProjectID: id,
+ })
+ return err
+}
+
// EventOn Register a callback on events reception
-func (xs *XdsServer) EventOn(message string, f interface{}) (err error) {
+func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
if xs.ioSock == nil {
- return fmt.Errorf("Io.Socket not initialized")
+ return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
}
- // FIXME SEB: support chain / multiple listeners
- /* sockEvents map[string][]*caller
+
xs.sockEventsLock.Lock()
- xs.sockEvents[message] = append(xs.sockEvents[message], f)
- xs.sockEventsLock.Unlock()
- xs.ioSock.On(message, func(ev) {
+ defer xs.sockEventsLock.Unlock()
+
+ if _, exist := xs.sockEvents[evName]; !exist {
+ // Register listener only the first time
+ evn := evName
+
+ // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
+ var err error
+ if evName == "event:FolderStateChanged" {
+ err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
+ xs.sockEventsLock.Lock()
+ defer xs.sockEventsLock.Unlock()
+ for _, c := range xs.sockEvents[evn] {
+ c.Func(data)
+ }
+ return nil
+ })
+ } else {
+ err = xs.ioSock.On(evn, f)
+ }
+ if err != nil {
+ return uuid.Nil, err
+ }
+ }
- })
- */
- return xs.ioSock.On(message, f)
+ c := &caller{
+ id: uuid.NewV1(),
+ EventName: evName,
+ Func: f,
+ }
+
+ xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
+ return c.id, nil
+}
+
+// EventOff Un-register a (or all) callbacks associated to an event
+func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
+ xs.sockEventsLock.Lock()
+ defer xs.sockEventsLock.Unlock()
+ if _, exist := xs.sockEvents[evName]; exist {
+ if id == uuid.Nil {
+ // Un-register all
+ xs.sockEvents[evName] = []*caller{}
+ } else {
+ // Un-register only the specified callback
+ for i, ff := range xs.sockEvents[evName] {
+ if uuid.Equal(ff.id, id) {
+ xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
+ break
+ }
+ }
+ }
+ }
+ return nil
}
-// ProjectToFolder
-func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig {
+// ProjectToFolder Convert Project structure to Folder structure
+func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig {
stID := ""
if pPrj.Type == XdsTypeCloudSync {
stID, _ = xs.SThg.IDGet()
}
- fPrj := FolderConfig{
+ fPrj := XdsFolderConfig{
ID: pPrj.ID,
Label: pPrj.Label,
ClientPath: pPrj.ClientPath,
- Type: FolderType(pPrj.Type),
+ Type: XdsFolderType(pPrj.Type),
Status: pPrj.Status,
IsInSync: pPrj.IsInSync,
DefaultSdk: pPrj.DefaultSdk,
- DataPathMap: PathMapConfig{
+ DataPathMap: XdsPathMapConfig{
ServerPath: pPrj.ServerPath,
},
- DataCloudSync: CloudSyncConfig{
- SyncThingID: stID,
+ DataCloudSync: XdsCloudSyncConfig{
+ SyncThingID: stID,
+ STLocIsInSync: pPrj.IsInSync,
+ STLocStatus: pPrj.Status,
+ STSvrIsInSync: pPrj.IsInSync,
+ STSvrStatus: pPrj.Status,
},
}
+
return &fPrj
}
-// FolderToProject
-func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig {
+// FolderToProject Convert Folder structure to Project structure
+func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) ProjectConfig {
+ inSync := fPrj.IsInSync
+ sts := fPrj.Status
+
+ if fPrj.Type == XdsTypeCloudSync {
+ inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
+
+ sts = fPrj.DataCloudSync.STSvrStatus
+ switch fPrj.DataCloudSync.STLocStatus {
+ case StatusErrorConfig, StatusDisable, StatusPause:
+ sts = fPrj.DataCloudSync.STLocStatus
+ break
+ case StatusSyncing:
+ if sts != StatusErrorConfig && sts != StatusDisable && sts != StatusPause {
+ sts = StatusSyncing
+ }
+ break
+ case StatusEnable:
+ // keep STSvrStatus
+ break
+ }
+ }
+
pPrj := ProjectConfig{
ID: fPrj.ID,
ServerID: xs.ID,
@@ -310,8 +423,8 @@ func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig {
ClientPath: fPrj.ClientPath,
ServerPath: fPrj.DataPathMap.ServerPath,
Type: ProjectType(fPrj.Type),
- Status: fPrj.Status,
- IsInSync: fPrj.IsInSync,
+ Status: sts,
+ IsInSync: inSync,
DefaultSdk: fPrj.DefaultSdk,
}
return pPrj
@@ -350,6 +463,24 @@ func (xs *XdsServer) _CreateConnectHTTP() error {
return nil
}
+// _HTTPGet .
+func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
+ var dd []byte
+ if err := xs.client.HTTPGet(url, &dd); err != nil {
+ return err
+ }
+ return json.Unmarshal(dd, &data)
+}
+
+// _HTTPPost .
+func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
+ body, err := json.Marshal(data)
+ if err != nil {
+ return nil, err
+ }
+ return xs.client.HTTPPostWithRes(url, string(body))
+}
+
// Re-established connection
func (xs *XdsServer) _reconnect() error {
err := xs._connect(true)
@@ -363,8 +494,8 @@ func (xs *XdsServer) _reconnect() error {
// Established HTTP and WS connection and retrieve XDSServer config
func (xs *XdsServer) _connect(reConn bool) error {
- xdsCfg := xdsServerConfig{}
- if err := xs.HTTPGet("/config", &xdsCfg); err != nil {
+ xdsCfg := XdsServerConfig{}
+ if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
xs.Connected = false
if !reConn {
xs._NotifyState()