diff options
Diffstat (limited to 'lib/agent/xdsserver.go')
-rw-r--r-- | lib/agent/xdsserver.go | 285 |
1 files changed, 208 insertions, 77 deletions
diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 518c68b..c900c9e 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net/http" "strings" + "sync" "time" "github.com/gin-gonic/gin" @@ -16,7 +17,7 @@ import ( sio_client "github.com/sebd71/go-socket.io-client" ) -// Server . +// XdsServer . type XdsServer struct { *Context ID string @@ -26,11 +27,13 @@ type XdsServer struct { ConnRetry int Connected bool Disabled bool - ServerConfig *xdsServerConfig + ServerConfig *XdsServerConfig - // callbacks + // Events management CBOnError func(error) CBOnDisconnect func(error) + sockEvents map[string][]*caller + sockEventsLock *sync.Mutex // Private fields client *common.HTTPClient @@ -39,25 +42,25 @@ type XdsServer struct { apiRouter *gin.RouterGroup } -// xdsServerConfig Data return by GET /config -type xdsServerConfig struct { +// XdsServerConfig Data return by GET /config +type XdsServerConfig struct { ID string `json:"id"` Version string `json:"version"` APIVersion string `json:"apiVersion"` VersionGitTag string `json:"gitTag"` SupportedSharing map[string]bool `json:"supportedSharing"` - Builder xdsBuilderConfig `json:"builder"` + Builder XdsBuilderConfig `json:"builder"` } -// xdsBuilderConfig represents the builder container configuration -type xdsBuilderConfig struct { +// XdsBuilderConfig represents the builder container configuration +type XdsBuilderConfig struct { IP string `json:"ip"` Port string `json:"port"` SyncThingID string `json:"syncThingID"` } -// FolderType XdsServer folder type -type FolderType string +// XdsFolderType XdsServer folder type +type XdsFolderType string const ( XdsTypePathMap = "PathMap" @@ -65,28 +68,52 @@ const ( XdsTypeCifsSmb = "CIFS" ) -// FolderConfig XdsServer folder config -type FolderConfig struct { - ID string `json:"id"` - Label string `json:"label"` - ClientPath string `json:"path"` - Type FolderType `json:"type"` - Status string `json:"status"` - IsInSync bool `json:"isInSync"` - DefaultSdk string `json:"defaultSdk"` +// XdsFolderConfig XdsServer folder config +type XdsFolderConfig struct { + ID string `json:"id"` + Label string `json:"label"` + ClientPath string `json:"path"` + Type XdsFolderType `json:"type"` + Status string `json:"status"` + IsInSync bool `json:"isInSync"` + DefaultSdk string `json:"defaultSdk"` // Specific data depending on which Type is used - DataPathMap PathMapConfig `json:"dataPathMap,omitempty"` - DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"` + DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"` + DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"` } -// PathMapConfig Path mapping specific data -type PathMapConfig struct { +// XdsPathMapConfig Path mapping specific data +type XdsPathMapConfig struct { ServerPath string `json:"serverPath"` } -// CloudSyncConfig CloudSync (AKA Syncthing) specific data -type CloudSyncConfig struct { - SyncThingID string `json:"syncThingID"` +// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data +type XdsCloudSyncConfig struct { + SyncThingID string `json:"syncThingID"` + STSvrStatus string `json:"-"` + STSvrIsInSync bool `json:"-"` + STLocStatus string `json:"-"` + STLocIsInSync bool `json:"-"` +} + +// XdsEventRegisterArgs arguments used to register to XDS server events +type XdsEventRegisterArgs struct { + Name string `json:"name"` + ProjectID string `json:"filterProjectID"` +} + +// XdsEventFolderChange Folder change event structure +type XdsEventFolderChange struct { + Time string `json:"time"` + Type string `json:"type"` + Folder XdsFolderConfig `json:"folder"` +} + +// caller Used to chain event listeners +type caller struct { + id uuid.UUID + EventName string + Func func(interface{}) } const _IDTempoPrefix = "tempo-" @@ -103,7 +130,9 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { Connected: false, Disabled: false, - logOut: ctx.Log.Out, + sockEvents: make(map[string][]*caller), + sockEventsLock: &sync.Mutex{}, + logOut: ctx.Log.Out, } } @@ -138,7 +167,7 @@ func (xs *XdsServer) Connect() error { time.Sleep(time.Second) } if retry == 0 { - // FIXME SEB: re-use _reconnect to wait longer in background + // FIXME: re-use _reconnect to wait longer in background return fmt.Errorf("Connection to XDS Server failure") } if err != nil { @@ -161,16 +190,35 @@ func (xs *XdsServer) SetLoggerOutput(out io.Writer) { xs.logOut = out } +// SendCommand Send a command to XDS Server +func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) { + url := cmd + if !strings.HasPrefix("/", cmd) { + url = "/" + cmd + } + return xs.client.HTTPPostWithRes(url, string(body)) +} + +// GetVersion Send Get request to retrieve XDS Server version +func (xs *XdsServer) GetVersion(res interface{}) error { + return xs._HTTPGet("/version", &res) +} + +// GetFolders Send GET request to get current folder configuration +func (xs *XdsServer) GetFolders(prjs *[]XdsFolderConfig) error { + return xs._HTTPGet("/folders", prjs) +} + // FolderAdd Send POST request to add a folder -func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error { - response, err := xs.HTTPPost("/folder", prj) +func (xs *XdsServer) FolderAdd(prj *XdsFolderConfig, res interface{}) error { + response, err := xs._HTTPPost("/folder", prj) if err != nil { return err } if response.StatusCode != 200 { return fmt.Errorf("FolderAdd error status=%s", response.Status) } - // Result is a FolderConfig that is equivalent to ProjectConfig + // Result is a XdsFolderConfig that is equivalent to ProjectConfig err = json.Unmarshal(xs.client.ResponseToBArray(response), res) return err @@ -181,27 +229,9 @@ func (xs *XdsServer) FolderDelete(id string) error { return xs.client.HTTPDelete("/folder/" + id) } -// HTTPGet . -func (xs *XdsServer) HTTPGet(url string, data interface{}) error { - var dd []byte - if err := xs.client.HTTPGet(url, &dd); err != nil { - return err - } - return json.Unmarshal(dd, &data) -} - -// HTTPPost . -func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) { - body, err := json.Marshal(data) - if err != nil { - return nil, err - } - return xs.HTTPPostBody(url, string(body)) -} - -// HTTPPostBody . -func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) { - return xs.client.HTTPPostWithRes(url, body) +// FolderSync Send POST request to force synchronization of a folder +func (xs *XdsServer) FolderSync(id string) error { + return xs.client.HTTPPost("/folder/sync/"+id, "") } // SetAPIRouterGroup . @@ -218,7 +248,7 @@ func (xs *XdsServer) PassthroughGet(url string) { xs.apiRouter.GET(url, func(c *gin.Context) { var data interface{} - if err := xs.HTTPGet(url, &data); err != nil { + if err := xs._HTTPGet(url, &data); err != nil { if strings.Contains(err.Error(), "connection refused") { xs.Connected = false xs._NotifyState() @@ -246,7 +276,7 @@ func (xs *XdsServer) PassthroughPost(url string) { return } - response, err := xs.HTTPPostBody(url, string(bodyReq[:n])) + response, err := xs._HTTPPost(url, bodyReq[:n]) if err != nil { common.APIError(c, err.Error()) return @@ -260,49 +290,132 @@ func (xs *XdsServer) PassthroughPost(url string) { }) } +// EventRegister Post a request to register to an XdsServer event +func (xs *XdsServer) EventRegister(evName string, id string) error { + var err error + _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{ + Name: evName, + ProjectID: id, + }) + return err +} + // EventOn Register a callback on events reception -func (xs *XdsServer) EventOn(message string, f interface{}) (err error) { +func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) { if xs.ioSock == nil { - return fmt.Errorf("Io.Socket not initialized") + return uuid.Nil, fmt.Errorf("Io.Socket not initialized") } - // FIXME SEB: support chain / multiple listeners - /* sockEvents map[string][]*caller + xs.sockEventsLock.Lock() - xs.sockEvents[message] = append(xs.sockEvents[message], f) - xs.sockEventsLock.Unlock() - xs.ioSock.On(message, func(ev) { + defer xs.sockEventsLock.Unlock() + + if _, exist := xs.sockEvents[evName]; !exist { + // Register listener only the first time + evn := evName + + // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange + var err error + if evName == "event:FolderStateChanged" { + err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error { + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + for _, c := range xs.sockEvents[evn] { + c.Func(data) + } + return nil + }) + } else { + err = xs.ioSock.On(evn, f) + } + if err != nil { + return uuid.Nil, err + } + } - }) - */ - return xs.ioSock.On(message, f) + c := &caller{ + id: uuid.NewV1(), + EventName: evName, + Func: f, + } + + xs.sockEvents[evName] = append(xs.sockEvents[evName], c) + return c.id, nil +} + +// EventOff Un-register a (or all) callbacks associated to an event +func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error { + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + if _, exist := xs.sockEvents[evName]; exist { + if id == uuid.Nil { + // Un-register all + xs.sockEvents[evName] = []*caller{} + } else { + // Un-register only the specified callback + for i, ff := range xs.sockEvents[evName] { + if uuid.Equal(ff.id, id) { + xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...) + break + } + } + } + } + return nil } -// ProjectToFolder -func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig { +// ProjectToFolder Convert Project structure to Folder structure +func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig { stID := "" if pPrj.Type == XdsTypeCloudSync { stID, _ = xs.SThg.IDGet() } - fPrj := FolderConfig{ + fPrj := XdsFolderConfig{ ID: pPrj.ID, Label: pPrj.Label, ClientPath: pPrj.ClientPath, - Type: FolderType(pPrj.Type), + Type: XdsFolderType(pPrj.Type), Status: pPrj.Status, IsInSync: pPrj.IsInSync, DefaultSdk: pPrj.DefaultSdk, - DataPathMap: PathMapConfig{ + DataPathMap: XdsPathMapConfig{ ServerPath: pPrj.ServerPath, }, - DataCloudSync: CloudSyncConfig{ - SyncThingID: stID, + DataCloudSync: XdsCloudSyncConfig{ + SyncThingID: stID, + STLocIsInSync: pPrj.IsInSync, + STLocStatus: pPrj.Status, + STSvrIsInSync: pPrj.IsInSync, + STSvrStatus: pPrj.Status, }, } + return &fPrj } -// FolderToProject -func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig { +// FolderToProject Convert Folder structure to Project structure +func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) ProjectConfig { + inSync := fPrj.IsInSync + sts := fPrj.Status + + if fPrj.Type == XdsTypeCloudSync { + inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync + + sts = fPrj.DataCloudSync.STSvrStatus + switch fPrj.DataCloudSync.STLocStatus { + case StatusErrorConfig, StatusDisable, StatusPause: + sts = fPrj.DataCloudSync.STLocStatus + break + case StatusSyncing: + if sts != StatusErrorConfig && sts != StatusDisable && sts != StatusPause { + sts = StatusSyncing + } + break + case StatusEnable: + // keep STSvrStatus + break + } + } + pPrj := ProjectConfig{ ID: fPrj.ID, ServerID: xs.ID, @@ -310,8 +423,8 @@ func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig { ClientPath: fPrj.ClientPath, ServerPath: fPrj.DataPathMap.ServerPath, Type: ProjectType(fPrj.Type), - Status: fPrj.Status, - IsInSync: fPrj.IsInSync, + Status: sts, + IsInSync: inSync, DefaultSdk: fPrj.DefaultSdk, } return pPrj @@ -350,6 +463,24 @@ func (xs *XdsServer) _CreateConnectHTTP() error { return nil } +// _HTTPGet . +func (xs *XdsServer) _HTTPGet(url string, data interface{}) error { + var dd []byte + if err := xs.client.HTTPGet(url, &dd); err != nil { + return err + } + return json.Unmarshal(dd, &data) +} + +// _HTTPPost . +func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) { + body, err := json.Marshal(data) + if err != nil { + return nil, err + } + return xs.client.HTTPPostWithRes(url, string(body)) +} + // Re-established connection func (xs *XdsServer) _reconnect() error { err := xs._connect(true) @@ -363,8 +494,8 @@ func (xs *XdsServer) _reconnect() error { // Established HTTP and WS connection and retrieve XDSServer config func (xs *XdsServer) _connect(reConn bool) error { - xdsCfg := xdsServerConfig{} - if err := xs.HTTPGet("/config", &xdsCfg); err != nil { + xdsCfg := XdsServerConfig{} + if err := xs._HTTPGet("/config", &xdsCfg); err != nil { xs.Connected = false if !reConn { xs._NotifyState() |