From 4695555e178bcabe54c5bf82117c9c4cef5440b5 Mon Sep 17 00:00:00 2001 From: Sebastien Douheret Date: Wed, 11 Oct 2017 00:24:02 +0200 Subject: Fixed Syncthing folder status events and exec command. --- lib/agent/agent.go | 36 +++--- lib/agent/apiv1-exec.go | 66 +++++++--- lib/agent/apiv1-version.go | 2 +- lib/agent/events.go | 15 ++- lib/agent/project-interface.go | 18 +-- lib/agent/project-pathmap.go | 26 ++-- lib/agent/project-st.go | 155 +++++++++++++++++----- lib/agent/projects.go | 33 ++--- lib/agent/session.go | 8 +- lib/agent/webserver.go | 4 +- lib/agent/xdsserver.go | 285 ++++++++++++++++++++++++++++++----------- 11 files changed, 441 insertions(+), 207 deletions(-) (limited to 'lib/agent') diff --git a/lib/agent/agent.go b/lib/agent/agent.go index 29b0622..3bdd89f 100644 --- a/lib/agent/agent.go +++ b/lib/agent/agent.go @@ -20,18 +20,19 @@ const cookieMaxAge = "3600" // Context holds the Agent context structure type Context struct { - ProgName string - Config *xdsconfig.Config - Log *logrus.Logger - SThg *st.SyncThing - SThgCmd *exec.Cmd - SThgInotCmd *exec.Cmd - - webServer *WebServer + ProgName string + Config *xdsconfig.Config + Log *logrus.Logger + LogLevelSilly bool + SThg *st.SyncThing + SThgCmd *exec.Cmd + SThgInotCmd *exec.Cmd + + webServer *WebServer xdsServers map[string]*XdsServer - sessions *Sessions - events *Events - projects *Projects + sessions *Sessions + events *Events + projects *Projects Exit chan os.Signal } @@ -53,15 +54,18 @@ func NewAgent(cliCtx *cli.Context) *Context { } log.Formatter = &logrus.TextFormatter{} + sillyVal, sillyLog := os.LookupEnv("XDS_LOG_SILLY") + // Define default configuration ctx := Context{ - ProgName: cliCtx.App.Name, - Log: log, - Exit: make(chan os.Signal, 1), + ProgName: cliCtx.App.Name, + Log: log, + LogLevelSilly: (sillyLog && sillyVal == "1"), + Exit: make(chan os.Signal, 1), - webServer: nil, + webServer: nil, xdsServers: make(map[string]*XdsServer), - events: nil, + events: nil, } // register handler on SIGTERM / exit diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go index 83ec7aa..37070f7 100644 --- a/lib/agent/apiv1-exec.go +++ b/lib/agent/apiv1-exec.go @@ -7,13 +7,17 @@ import ( "github.com/gin-gonic/gin" common "github.com/iotbzh/xds-common/golib" + uuid "github.com/satori/go.uuid" ) -// Only define useful fields +// ExecArgs Only define used fields type ExecArgs struct { - ID string `json:"id" binding:"required"` + ID string `json:"id" binding:"required"` + CmdID string `json:"cmdID"` // command unique ID } +var execCmdID = 1 + // ExecCmd executes remotely a command func (s *APIService) execCmd(c *gin.Context) { s._execRequest("/exec", c) @@ -24,24 +28,26 @@ func (s *APIService) execSignalCmd(c *gin.Context) { s._execRequest("/signal", c) } -func (s *APIService) _execRequest(url string, c *gin.Context) { +func (s *APIService) _execRequest(cmd string, c *gin.Context) { data, err := c.GetRawData() if err != nil { common.APIError(c, err.Error()) } + args := ExecArgs{} + // XXX - we cannot use c.BindJSON, so directly unmarshall it + // (see https://github.com/gin-gonic/gin/issues/1078) + if err := json.Unmarshal(data, &args); err != nil { + common.APIError(c, "Invalid arguments") + return + } + // First get Project ID to retrieve Server ID and send command to right server id := c.Param("id") if id == "" { - args := ExecArgs{} - // XXX - we cannot use c.BindJSON, so directly unmarshall it - // (see https://github.com/gin-gonic/gin/issues/1078) - if err := json.Unmarshal(data, &args); err != nil { - common.APIError(c, "Invalid arguments") - return - } id = args.ID } + prj := s.projects.Get(id) if prj == nil { common.APIError(c, "Unknown id") @@ -68,21 +74,47 @@ func (s *APIService) _execRequest(url string, c *gin.Context) { // Forward XDSServer WS events to client WS // TODO removed static event name list and get it from XDSServer - for _, evName := range []string{ + evtList := []string{ "exec:input", "exec:output", - "exec:exit", "exec:inferior-input", "exec:inferior-output", - } { + } + so := *sock + fwdFuncID := []uuid.UUID{} + for _, evName := range evtList { evN := evName - svr.EventOn(evN, func(evData interface{}) { - (*sock).Emit(evN, evData) - }) + fwdFunc := func(evData interface{}) { + // Forward event to Client/Dashboard + so.Emit(evN, evData) + } + id, err := svr.EventOn(evN, fwdFunc) + if err != nil { + common.APIError(c, err.Error()) + return + } + fwdFuncID = append(fwdFuncID, id) + } + + // Handle Exit event separately to cleanup registered listener + var exitFuncID uuid.UUID + exitFunc := func(evData interface{}) { + so.Emit("exec:exit", evData) + + // cleanup listener + for i, evName := range evtList { + svr.EventOff(evName, fwdFuncID[i]) + } + svr.EventOff("exec:exit", exitFuncID) + } + exitFuncID, err = svr.EventOn("exec:exit", exitFunc) + if err != nil { + common.APIError(c, err.Error()) + return } // Forward back command to right server - response, err := svr.HTTPPostBody(url, string(data)) + response, err := svr.SendCommand(cmd, data) if err != nil { common.APIError(c, err.Error()) return diff --git a/lib/agent/apiv1-version.go b/lib/agent/apiv1-version.go index c2387c1..6b4923f 100644 --- a/lib/agent/apiv1-version.go +++ b/lib/agent/apiv1-version.go @@ -33,7 +33,7 @@ func (s *APIService) getVersion(c *gin.Context) { svrVer := []version{} for _, svr := range s.xdsServers { res := version{} - if err := svr.HTTPGet("/version", &res); err != nil { + if err := svr.GetVersion(&res); err != nil { common.APIError(c, "Cannot retrieve version of XDS server ID %s : %v", svr.ID, err.Error()) return } diff --git a/lib/agent/events.go b/lib/agent/events.go index 24efc5a..e66f758 100644 --- a/lib/agent/events.go +++ b/lib/agent/events.go @@ -18,7 +18,7 @@ const ( EVTProjectChange = "project-state-change" // data type ProjectConfig ) -var EVTAllList = []string{ +var _EVTAllList = []string{ EVTServerConfig, EVTProjectAdd, EVTProjectDelete, @@ -33,7 +33,6 @@ type EventMsg struct { } type EventDef struct { - // SEB cbs []EventsCB sids map[string]int } @@ -45,7 +44,7 @@ type Events struct { // NewEvents creates an instance of Events func NewEvents(ctx *Context) *Events { evMap := make(map[string]*EventDef) - for _, ev := range EVTAllList { + for _, ev := range _EVTAllList { evMap[ev] = &EventDef{ sids: make(map[string]int), } @@ -58,12 +57,12 @@ func NewEvents(ctx *Context) *Events { // GetList returns the list of all supported events func (e *Events) GetList() []string { - return EVTAllList + return _EVTAllList } // Register Used by a client/session to register to a specific (or all) event(s) func (e *Events) Register(evName, sessionID string) error { - evs := EVTAllList + evs := _EVTAllList if evName != EVTAll { if _, ok := e.eventsMap[evName]; !ok { return fmt.Errorf("Unsupported event type name") @@ -78,7 +77,7 @@ func (e *Events) Register(evName, sessionID string) error { // UnRegister Used by a client/session to unregister event(s) func (e *Events) UnRegister(evName, sessionID string) error { - evs := EVTAllList + evs := _EVTAllList if evName != EVTAll { if _, ok := e.eventsMap[evName]; !ok { return fmt.Errorf("Unsupported event type name") @@ -102,7 +101,9 @@ func (e *Events) Emit(evName string, data interface{}) error { return fmt.Errorf("Unsupported event type") } - e.Log.Debugf("Emit Event %s: %v", evName, data) + if e.LogLevelSilly { + e.Log.Debugf("Emit Event %s: %v", evName, data) + } firstErr = nil evm := e.eventsMap[evName] diff --git a/lib/agent/project-interface.go b/lib/agent/project-interface.go index 031e1d9..0a4a17e 100644 --- a/lib/agent/project-interface.go +++ b/lib/agent/project-interface.go @@ -18,19 +18,15 @@ const ( StatusSyncing = "Syncing" ) -type EventCBData map[string]interface{} -type EventCB func(cfg *ProjectConfig, data *EventCBData) - // IPROJECT Project interface type IPROJECT interface { - Add(cfg ProjectConfig) (*ProjectConfig, error) // Add a new project - Delete() error // Delete a project - GetProject() *ProjectConfig // Get project public configuration - SetProject(prj ProjectConfig) *ProjectConfig // Set project configuration - GetServer() *XdsServer // Get XdsServer that holds this project - GetFullPath(dir string) string // Get project full path - Sync() error // Force project files synchronization - IsInSync() (bool, error) // Check if project files are in-sync + Add(cfg ProjectConfig) (*ProjectConfig, error) // Add a new project + Delete() error // Delete a project + GetProject() *ProjectConfig // Get project public configuration + UpdateProject(prj ProjectConfig) (*ProjectConfig, error) // Update project configuration + GetServer() *XdsServer // Get XdsServer that holds this project + Sync() error // Force project files synchronization + IsInSync() (bool, error) // Check if project files are in-sync } // ProjectConfig is the config for one project diff --git a/lib/agent/project-pathmap.go b/lib/agent/project-pathmap.go index 1de8e11..6c49d6a 100644 --- a/lib/agent/project-pathmap.go +++ b/lib/agent/project-pathmap.go @@ -1,16 +1,12 @@ package agent -import ( - "path/filepath" -) - // IPROJECT interface implementation for native/path mapping projects // PathMap . type PathMap struct { *Context server *XdsServer - folder *FolderConfig + folder *XdsFolderConfig } // NewProjectPathMap Create a new instance of PathMap @@ -18,7 +14,7 @@ func NewProjectPathMap(ctx *Context, svr *XdsServer) *PathMap { p := PathMap{ Context: ctx, server: svr, - folder: &FolderConfig{}, + folder: &XdsFolderConfig{}, } return &p } @@ -49,10 +45,14 @@ func (p *PathMap) GetProject() *ProjectConfig { return &prj } -// SetProject Set project config -func (p *PathMap) SetProject(prj ProjectConfig) *ProjectConfig { +// UpdateProject Set project config +func (p *PathMap) UpdateProject(prj ProjectConfig) (*ProjectConfig, error) { p.folder = p.server.ProjectToFolder(prj) - return p.GetProject() + np := p.GetProject() + if err := p.events.Emit(EVTProjectChange, np); err != nil { + return np, err + } + return np, nil } // GetServer Get the XdsServer that holds this project @@ -60,14 +60,6 @@ func (p *PathMap) GetServer() *XdsServer { return p.server } -// GetFullPath returns the full path of a directory (from server POV) -func (p *PathMap) GetFullPath(dir string) string { - if &dir == nil { - return p.folder.DataPathMap.ServerPath - } - return filepath.Join(p.folder.DataPathMap.ServerPath, dir) -} - // Sync Force project files synchronization func (p *PathMap) Sync() error { return nil diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go index 28a287c..c0d2550 100644 --- a/lib/agent/project-st.go +++ b/lib/agent/project-st.go @@ -1,16 +1,17 @@ package agent -import "github.com/iotbzh/xds-agent/lib/syncthing" - -// SEB TODO +import ( + st "github.com/iotbzh/xds-agent/lib/syncthing" +) // IPROJECT interface implementation for syncthing projects // STProject . type STProject struct { *Context - server *XdsServer - folder *FolderConfig + server *XdsServer + folder *XdsFolderConfig + eventIDs []int } // NewProjectST Create a new instance of STProject @@ -18,7 +19,7 @@ func NewProjectST(ctx *Context, svr *XdsServer) *STProject { p := STProject{ Context: ctx, server: svr, - folder: &FolderConfig{}, + folder: &XdsFolderConfig{}, } return &p } @@ -27,6 +28,7 @@ func NewProjectST(ctx *Context, svr *XdsServer) *STProject { func (p *STProject) Add(cfg ProjectConfig) (*ProjectConfig, error) { var err error + // Add project/folder into XDS Server err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder) if err != nil { return nil, err @@ -34,19 +36,37 @@ func (p *STProject) Add(cfg ProjectConfig) (*ProjectConfig, error) { svrPrj := p.GetProject() // Declare project into local Syncthing - p.SThg.FolderChange(st.FolderChangeArg{ - ID: cfg.ID, - Label: cfg.Label, + id, err := p.SThg.FolderChange(st.FolderChangeArg{ + ID: svrPrj.ID, + Label: svrPrj.Label, RelativePath: cfg.ClientPath, SyncThingID: p.server.ServerConfig.Builder.SyncThingID, }) + if err != nil { + return nil, err + } - return svrPrj, nil + locPrj, err := p.SThg.FolderConfigGet(id) + if err != nil { + svrPrj.Status = StatusErrorConfig + return nil, err + } + if svrPrj.ID != locPrj.ID { + p.Log.Errorf("Project ID in XDSServer and local ST differ: %s != %s", svrPrj.ID, locPrj.ID) + } + + // Use Update function to setup remains fields + return p.UpdateProject(*svrPrj) } // Delete a project func (p *STProject) Delete() error { - return p.server.FolderDelete(p.folder.ID) + errSvr := p.server.FolderDelete(p.folder.ID) + errLoc := p.SThg.FolderDelete(p.folder.ID) + if errSvr != nil { + return errSvr + } + return errLoc } // GetProject Get public part of project config @@ -56,38 +76,111 @@ func (p *STProject) GetProject() *ProjectConfig { return &prj } -// SetProject Set project config -func (p *STProject) SetProject(prj ProjectConfig) *ProjectConfig { - // SEB TODO +// UpdateProject Update project config +func (p *STProject) UpdateProject(prj ProjectConfig) (*ProjectConfig, error) { + // Update folder p.folder = p.server.ProjectToFolder(prj) - return p.GetProject() + svrPrj := p.GetProject() + + // Register events to update folder status + // Register to XDS Server events + p.server.EventOn("event:FolderStateChanged", p._cbServerFolderChanged) + if err := p.server.EventRegister("FolderStateChanged", svrPrj.ID); err != nil { + p.Log.Warningf("XDS Server EventRegister failed: %v", err) + return svrPrj, err + } + + // Register to Local Syncthing events + for _, evName := range []string{st.EventStateChanged, st.EventFolderPaused} { + evID, err := p.SThg.Events.Register(evName, p._cbLocalSTEvents, svrPrj.ID, nil) + if err != nil { + return nil, err + } + p.eventIDs = append(p.eventIDs, evID) + } + + return svrPrj, nil } // GetServer Get the XdsServer that holds this project func (p *STProject) GetServer() *XdsServer { - // SEB TODO return p.server } -// GetFullPath returns the full path of a directory (from server POV) -func (p *STProject) GetFullPath(dir string) string { - /* SEB - if &dir == nil { - return p.folder.DataSTProject.ServerPath - } - return filepath.Join(p.folder.DataSTProject.ServerPath, dir) - */ - return "SEB TODO" -} - // Sync Force project files synchronization func (p *STProject) Sync() error { - // SEB TODO - return nil + if err := p.server.FolderSync(p.folder.ID); err != nil { + return err + } + return p.SThg.FolderScan(p.folder.ID, "") } // IsInSync Check if project files are in-sync func (p *STProject) IsInSync() (bool, error) { - // SEB TODO - return false, nil + // Should be up-to-date by callbacks (see below) + return p.folder.IsInSync, nil +} + +/** +** Private functions +***/ + +// callback use to update (XDS Server) folder IsInSync status + +func (p *STProject) _cbServerFolderChanged(data interface{}) { + evt := data.(XdsEventFolderChange) + + // Only process event that concerns this project/folder ID + if p.folder.ID != evt.Folder.ID { + return + } + + if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync || + evt.Folder.Status != p.folder.DataCloudSync.STSvrStatus { + + p.folder.DataCloudSync.STSvrIsInSync = evt.Folder.IsInSync + p.folder.DataCloudSync.STSvrStatus = evt.Folder.Status + + if err := p.events.Emit(EVTProjectChange, p.server.FolderToProject(*p.folder)); err != nil { + p.Log.Warningf("Cannot notify project change: %v", err) + } + } +} + +// callback use to update IsInSync status +func (p *STProject) _cbLocalSTEvents(ev st.Event, data *st.EventsCBData) { + + inSync := p.folder.DataCloudSync.STLocIsInSync + sts := p.folder.DataCloudSync.STLocStatus + prevSync := inSync + prevStatus := sts + + switch ev.Type { + + case st.EventStateChanged: + to := ev.Data["to"] + switch to { + case "scanning", "syncing": + sts = StatusSyncing + case "idle": + sts = StatusEnable + } + inSync = (to == "idle") + + case st.EventFolderPaused: + if sts == StatusEnable { + sts = StatusPause + } + inSync = false + } + + if prevSync != inSync || prevStatus != sts { + + p.folder.DataCloudSync.STLocIsInSync = inSync + p.folder.DataCloudSync.STLocStatus = sts + + if err := p.events.Emit(EVTProjectChange, p.server.FolderToProject(*p.folder)); err != nil { + p.Log.Warningf("Cannot notify project change: %v", err) + } + } } diff --git a/lib/agent/projects.go b/lib/agent/projects.go index 39c120f..5e20395 100644 --- a/lib/agent/projects.go +++ b/lib/agent/projects.go @@ -14,16 +14,8 @@ type Projects struct { *Context SThg *st.SyncThing projects map[string]*IPROJECT - //SEB registerCB []RegisteredCB } -/* SEB -type RegisteredCB struct { - cb *EventCB - data *EventCBData -} -*/ - // Mutex to make add/delete atomic var pjMutex = sync.NewMutex() @@ -33,7 +25,6 @@ func NewProjects(ctx *Context, st *st.SyncThing) *Projects { Context: ctx, SThg: st, projects: make(map[string]*IPROJECT), - //registerCB: []RegisteredCB{}, } } @@ -51,26 +42,19 @@ func (p *Projects) Init(server *XdsServer) error { if svr.Disabled { continue } - xFlds := []FolderConfig{} - if err := svr.HTTPGet("/folders", &xFlds); err != nil { + xFlds := []XdsFolderConfig{} + if err := svr.GetFolders(&xFlds); err != nil { errMsg += fmt.Sprintf("Cannot retrieve folders config of XDS server ID %s : %v \n", svr.ID, err.Error()) continue } - p.Log.Debugf("Server %s, %d projects detected", svr.ID[:8], len(xFlds)) + p.Log.Debugf("Connected to XDS Server %s, %d projects detected", svr.ID, len(xFlds)) for _, prj := range xFlds { newP := svr.FolderToProject(prj) - if /*nPrj*/ _, err := p.createUpdate(newP, false, true); err != nil { + if _, err := p.createUpdate(newP, false, true); err != nil { errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n " continue } - - /* FIXME emit EVTProjectChange event ? - if err := p.events.Emit(EVTProjectChange, *nPrj); err != nil { - p.Log.Warningf("Cannot notify project change: %v", err) - } - */ } - } p.Log.Infof("Number of loaded Projects: %d", len(p.projects)) @@ -161,7 +145,6 @@ func (p *Projects) createUpdate(newF ProjectConfig, create bool, initial bool) ( // SYNCTHING case TypeCloudSync: if p.SThg != nil { - /*SEB fld = f.SThg.NewFolderST(f.Conf)*/ fld = NewProjectST(p.Context, svr) } else { return nil, fmt.Errorf("Cloud Sync project not supported") @@ -179,12 +162,16 @@ func (p *Projects) createUpdate(newF ProjectConfig, create bool, initial bool) ( // Add project on server if newPrj, err = fld.Add(newF); err != nil { newF.Status = StatusErrorConfig - log.Printf("ERROR Adding folder: %v\n", err) + log.Printf("ERROR Adding project: %v\n", err) return newPrj, err } } else { // Just update project config - newPrj = fld.SetProject(newF) + if newPrj, err = fld.UpdateProject(newF); err != nil { + newF.Status = StatusErrorConfig + log.Printf("ERROR Updating project: %v\n", err) + return newPrj, err + } } // Sanity check diff --git a/lib/agent/session.go b/lib/agent/session.go index e50abe1..06789d5 100644 --- a/lib/agent/session.go +++ b/lib/agent/session.go @@ -194,15 +194,13 @@ func (s *Sessions) refresh(sid string) { } func (s *Sessions) monitorSessMap() { - const dbgFullTrace = false // for debugging - for { select { case <-s.stop: s.Log.Debugln("Stop monitorSessMap") return case <-time.After(sessionMonitorTime * time.Second): - if dbgFullTrace { + if s.LogLevelSilly { s.Log.Debugf("Sessions Map size: %d", len(s.sessMap)) s.Log.Debugf("Sessions Map : %v", s.sessMap) } @@ -214,7 +212,9 @@ func (s *Sessions) monitorSessMap() { s.mutex.Lock() for _, ss := range s.sessMap { if ss.expireAt.Sub(time.Now()) < 0 { - //SEB DEBUG s.Log.Debugf("Delete expired session id: %s", ss.ID) + if s.LogLevelSilly { + s.Log.Debugf("Delete expired session id: %s", ss.ID) + } delete(s.sessMap, ss.ID) } } diff --git a/lib/agent/webserver.go b/lib/agent/webserver.go index ead06d1..4b2e024 100644 --- a/lib/agent/webserver.go +++ b/lib/agent/webserver.go @@ -148,11 +148,9 @@ func (s *WebServer) middlewareXDSDetails() gin.HandlerFunc { } } -/* SEB func (s *WebServer) isValidAPIKey(key string) bool { - return (key == s.Config.FileConf.XDSAPIKey && key != "") + return (s.Config.FileConf.XDSAPIKey != "" && key == s.Config.FileConf.XDSAPIKey) } -*/ func (s *WebServer) middlewareCSRF() gin.HandlerFunc { return func(c *gin.Context) { diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 518c68b..c900c9e 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net/http" "strings" + "sync" "time" "github.com/gin-gonic/gin" @@ -16,7 +17,7 @@ import ( sio_client "github.com/sebd71/go-socket.io-client" ) -// Server . +// XdsServer . type XdsServer struct { *Context ID string @@ -26,11 +27,13 @@ type XdsServer struct { ConnRetry int Connected bool Disabled bool - ServerConfig *xdsServerConfig + ServerConfig *XdsServerConfig - // callbacks + // Events management CBOnError func(error) CBOnDisconnect func(error) + sockEvents map[string][]*caller + sockEventsLock *sync.Mutex // Private fields client *common.HTTPClient @@ -39,25 +42,25 @@ type XdsServer struct { apiRouter *gin.RouterGroup } -// xdsServerConfig Data return by GET /config -type xdsServerConfig struct { +// XdsServerConfig Data return by GET /config +type XdsServerConfig struct { ID string `json:"id"` Version string `json:"version"` APIVersion string `json:"apiVersion"` VersionGitTag string `json:"gitTag"` SupportedSharing map[string]bool `json:"supportedSharing"` - Builder xdsBuilderConfig `json:"builder"` + Builder XdsBuilderConfig `json:"builder"` } -// xdsBuilderConfig represents the builder container configuration -type xdsBuilderConfig struct { +// XdsBuilderConfig represents the builder container configuration +type XdsBuilderConfig struct { IP string `json:"ip"` Port string `json:"port"` SyncThingID string `json:"syncThingID"` } -// FolderType XdsServer folder type -type FolderType string +// XdsFolderType XdsServer folder type +type XdsFolderType string const ( XdsTypePathMap = "PathMap" @@ -65,28 +68,52 @@ const ( XdsTypeCifsSmb = "CIFS" ) -// FolderConfig XdsServer folder config -type FolderConfig struct { - ID string `json:"id"` - Label string `json:"label"` - ClientPath string `json:"path"` - Type FolderType `json:"type"` - Status string `json:"status"` - IsInSync bool `json:"isInSync"` - DefaultSdk string `json:"defaultSdk"` +// XdsFolderConfig XdsServer folder config +type XdsFolderConfig struct { + ID string `json:"id"` + Label string `json:"label"` + ClientPath string `json:"path"` + Type XdsFolderType `json:"type"` + Status string `json:"status"` + IsInSync bool `json:"isInSync"` + DefaultSdk string `json:"defaultSdk"` // Specific data depending on which Type is used - DataPathMap PathMapConfig `json:"dataPathMap,omitempty"` - DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"` + DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"` + DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"` } -// PathMapConfig Path mapping specific data -type PathMapConfig struct { +// XdsPathMapConfig Path mapping specific data +type XdsPathMapConfig struct { ServerPath string `json:"serverPath"` } -// CloudSyncConfig CloudSync (AKA Syncthing) specific data -type CloudSyncConfig struct { - SyncThingID string `json:"syncThingID"` +// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data +type XdsCloudSyncConfig struct { + SyncThingID string `json:"syncThingID"` + STSvrStatus string `json:"-"` + STSvrIsInSync bool `json:"-"` + STLocStatus string `json:"-"` + STLocIsInSync bool `json:"-"` +} + +// XdsEventRegisterArgs arguments used to register to XDS server events +type XdsEventRegisterArgs struct { + Name string `json:"name"` + ProjectID string `json:"filterProjectID"` +} + +// XdsEventFolderChange Folder change event structure +type XdsEventFolderChange struct { + Time string `json:"time"` + Type string `json:"type"` + Folder XdsFolderConfig `json:"folder"` +} + +// caller Used to chain event listeners +type caller struct { + id uuid.UUID + EventName string + Func func(interface{}) } const _IDTempoPrefix = "tempo-" @@ -103,7 +130,9 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { Connected: false, Disabled: false, - logOut: ctx.Log.Out, + sockEvents: make(map[string][]*caller), + sockEventsLock: &sync.Mutex{}, + logOut: ctx.Log.Out, } } @@ -138,7 +167,7 @@ func (xs *XdsServer) Connect() error { time.Sleep(time.Second) } if retry == 0 { - // FIXME SEB: re-use _reconnect to wait longer in background + // FIXME: re-use _reconnect to wait longer in background return fmt.Errorf("Connection to XDS Server failure") } if err != nil { @@ -161,16 +190,35 @@ func (xs *XdsServer) SetLoggerOutput(out io.Writer) { xs.logOut = out } +// SendCommand Send a command to XDS Server +func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) { + url := cmd + if !strings.HasPrefix("/", cmd) { + url = "/" + cmd + } + return xs.client.HTTPPostWithRes(url, string(body)) +} + +// GetVersion Send Get request to retrieve XDS Server version +func (xs *XdsServer) GetVersion(res interface{}) error { + return xs._HTTPGet("/version", &res) +} + +// GetFolders Send GET request to get current folder configuration +func (xs *XdsServer) GetFolders(prjs *[]XdsFolderConfig) error { + return xs._HTTPGet("/folders", prjs) +} + // FolderAdd Send POST request to add a folder -func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error { - response, err := xs.HTTPPost("/folder", prj) +func (xs *XdsServer) FolderAdd(prj *XdsFolderConfig, res interface{}) error { + response, err := xs._HTTPPost("/folder", prj) if err != nil { return err } if response.StatusCode != 200 { return fmt.Errorf("FolderAdd error status=%s", response.Status) } - // Result is a FolderConfig that is equivalent to ProjectConfig + // Result is a XdsFolderConfig that is equivalent to ProjectConfig err = json.Unmarshal(xs.client.ResponseToBArray(response), res) return err @@ -181,27 +229,9 @@ func (xs *XdsServer) FolderDelete(id string) error { return xs.client.HTTPDelete("/folder/" + id) } -// HTTPGet . -func (xs *XdsServer) HTTPGet(url string, data interface{}) error { - var dd []byte - if err := xs.client.HTTPGet(url, &dd); err != nil { - return err - } - return json.Unmarshal(dd, &data) -} - -// HTTPPost . -func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) { - body, err := json.Marshal(data) - if err != nil { - return nil, err - } - return xs.HTTPPostBody(url, string(body)) -} - -// HTTPPostBody . -func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) { - return xs.client.HTTPPostWithRes(url, body) +// FolderSync Send POST request to force synchronization of a folder +func (xs *XdsServer) FolderSync(id string) error { + return xs.client.HTTPPost("/folder/sync/"+id, "") } // SetAPIRouterGroup . @@ -218,7 +248,7 @@ func (xs *XdsServer) PassthroughGet(url string) { xs.apiRouter.GET(url, func(c *gin.Context) { var data interface{} - if err := xs.HTTPGet(url, &data); err != nil { + if err := xs._HTTPGet(url, &data); err != nil { if strings.Contains(err.Error(), "connection refused") { xs.Connected = false xs._NotifyState() @@ -246,7 +276,7 @@ func (xs *XdsServer) PassthroughPost(url string) { return } - response, err := xs.HTTPPostBody(url, string(bodyReq[:n])) + response, err := xs._HTTPPost(url, bodyReq[:n]) if err != nil { common.APIError(c, err.Error()) return @@ -260,49 +290,132 @@ func (xs *XdsServer) PassthroughPost(url string) { }) } +// EventRegister Post a request to register to an XdsServer event +func (xs *XdsServer) EventRegister(evName string, id string) error { + var err error + _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{ + Name: evName, + ProjectID: id, + }) + return err +} + // EventOn Register a callback on events reception -func (xs *XdsServer) EventOn(message string, f interface{}) (err error) { +func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) { if xs.ioSock == nil { - return fmt.Errorf("Io.Socket not initialized") + return uuid.Nil, fmt.Errorf("Io.Socket not initialized") } - // FIXME SEB: support chain / multiple listeners - /* sockEvents map[string][]*caller + xs.sockEventsLock.Lock() - xs.sockEvents[message] = append(xs.sockEvents[message], f) - xs.sockEventsLock.Unlock() - xs.ioSock.On(message, func(ev) { + defer xs.sockEventsLock.Unlock() + + if _, exist := xs.sockEvents[evName]; !exist { + // Register listener only the first time + evn := evName + + // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange + var err error + if evName == "event:FolderStateChanged" { + err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error { + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + for _, c := range xs.sockEvents[evn] { + c.Func(data) + } + return nil + }) + } else { + err = xs.ioSock.On(evn, f) + } + if err != nil { + return uuid.Nil, err + } + } - }) - */ - return xs.ioSock.On(message, f) + c := &caller{ + id: uuid.NewV1(), + EventName: evName, + Func: f, + } + + xs.sockEvents[evName] = append(xs.sockEvents[evName], c) + return c.id, nil +} + +// EventOff Un-register a (or all) callbacks associated to an event +func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error { + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + if _, exist := xs.sockEvents[evName]; exist { + if id == uuid.Nil { + // Un-register all + xs.sockEvents[evName] = []*caller{} + } else { + // Un-register only the specified callback + for i, ff := range xs.sockEvents[evName] { + if uuid.Equal(ff.id, id) { + xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...) + break + } + } + } + } + return nil } -// ProjectToFolder -func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig { +// ProjectToFolder Convert Project structure to Folder structure +func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig { stID := "" if pPrj.Type == XdsTypeCloudSync { stID, _ = xs.SThg.IDGet() } - fPrj := FolderConfig{ + fPrj := XdsFolderConfig{ ID: pPrj.ID, Label: pPrj.Label, ClientPath: pPrj.ClientPath, - Type: FolderType(pPrj.Type), + Type: XdsFolderType(pPrj.Type), Status: pPrj.Status, IsInSync: pPrj.IsInSync, DefaultSdk: pPrj.DefaultSdk, - DataPathMap: PathMapConfig{ + DataPathMap: XdsPathMapConfig{ ServerPath: pPrj.ServerPath, }, - DataCloudSync: CloudSyncConfig{ - SyncThingID: stID, + DataCloudSync: XdsCloudSyncConfig{ + SyncThingID: stID, + STLocIsInSync: pPrj.IsInSync, + STLocStatus: pPrj.Status, + STSvrIsInSync: pPrj.IsInSync, + STSvrStatus: pPrj.Status, }, } + return &fPrj } -// FolderToProject -func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig { +// FolderToProject Convert Folder structure to Project structure +func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) ProjectConfig { + inSync := fPrj.IsInSync + sts := fPrj.Status + + if fPrj.Type == XdsTypeCloudSync { + inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync + + sts = fPrj.DataCloudSync.STSvrStatus + switch fPrj.DataCloudSync.STLocStatus { + case StatusErrorConfig, StatusDisable, StatusPause: + sts = fPrj.DataCloudSync.STLocStatus + break + case StatusSyncing: + if sts != StatusErrorConfig && sts != StatusDisable && sts != StatusPause { + sts = StatusSyncing + } + break + case StatusEnable: + // keep STSvrStatus + break + } + } + pPrj := ProjectConfig{ ID: fPrj.ID, ServerID: xs.ID, @@ -310,8 +423,8 @@ func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig { ClientPath: fPrj.ClientPath, ServerPath: fPrj.DataPathMap.ServerPath, Type: ProjectType(fPrj.Type), - Status: fPrj.Status, - IsInSync: fPrj.IsInSync, + Status: sts, + IsInSync: inSync, DefaultSdk: fPrj.DefaultSdk, } return pPrj @@ -350,6 +463,24 @@ func (xs *XdsServer) _CreateConnectHTTP() error { return nil } +// _HTTPGet . +func (xs *XdsServer) _HTTPGet(url string, data interface{}) error { + var dd []byte + if err := xs.client.HTTPGet(url, &dd); err != nil { + return err + } + return json.Unmarshal(dd, &data) +} + +// _HTTPPost . +func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) { + body, err := json.Marshal(data) + if err != nil { + return nil, err + } + return xs.client.HTTPPostWithRes(url, string(body)) +} + // Re-established connection func (xs *XdsServer) _reconnect() error { err := xs._connect(true) @@ -363,8 +494,8 @@ func (xs *XdsServer) _reconnect() error { // Established HTTP and WS connection and retrieve XDSServer config func (xs *XdsServer) _connect(reConn bool) error { - xdsCfg := xdsServerConfig{} - if err := xs.HTTPGet("/config", &xdsCfg); err != nil { + xdsCfg := XdsServerConfig{} + if err := xs._HTTPGet("/config", &xdsCfg); err != nil { xs.Connected = false if !reConn { xs._NotifyState() -- cgit 1.2.3-korg