summaryrefslogtreecommitdiffstats
path: root/lib/agent
diff options
context:
space:
mode:
Diffstat (limited to 'lib/agent')
-rw-r--r--lib/agent/agent.go36
-rw-r--r--lib/agent/apiv1-exec.go66
-rw-r--r--lib/agent/apiv1-version.go2
-rw-r--r--lib/agent/events.go15
-rw-r--r--lib/agent/project-interface.go18
-rw-r--r--lib/agent/project-pathmap.go26
-rw-r--r--lib/agent/project-st.go155
-rw-r--r--lib/agent/projects.go33
-rw-r--r--lib/agent/session.go8
-rw-r--r--lib/agent/webserver.go4
-rw-r--r--lib/agent/xdsserver.go285
11 files changed, 441 insertions, 207 deletions
diff --git a/lib/agent/agent.go b/lib/agent/agent.go
index 29b0622..3bdd89f 100644
--- a/lib/agent/agent.go
+++ b/lib/agent/agent.go
@@ -20,18 +20,19 @@ const cookieMaxAge = "3600"
// Context holds the Agent context structure
type Context struct {
- ProgName string
- Config *xdsconfig.Config
- Log *logrus.Logger
- SThg *st.SyncThing
- SThgCmd *exec.Cmd
- SThgInotCmd *exec.Cmd
-
- webServer *WebServer
+ ProgName string
+ Config *xdsconfig.Config
+ Log *logrus.Logger
+ LogLevelSilly bool
+ SThg *st.SyncThing
+ SThgCmd *exec.Cmd
+ SThgInotCmd *exec.Cmd
+
+ webServer *WebServer
xdsServers map[string]*XdsServer
- sessions *Sessions
- events *Events
- projects *Projects
+ sessions *Sessions
+ events *Events
+ projects *Projects
Exit chan os.Signal
}
@@ -53,15 +54,18 @@ func NewAgent(cliCtx *cli.Context) *Context {
}
log.Formatter = &logrus.TextFormatter{}
+ sillyVal, sillyLog := os.LookupEnv("XDS_LOG_SILLY")
+
// Define default configuration
ctx := Context{
- ProgName: cliCtx.App.Name,
- Log: log,
- Exit: make(chan os.Signal, 1),
+ ProgName: cliCtx.App.Name,
+ Log: log,
+ LogLevelSilly: (sillyLog && sillyVal == "1"),
+ Exit: make(chan os.Signal, 1),
- webServer: nil,
+ webServer: nil,
xdsServers: make(map[string]*XdsServer),
- events: nil,
+ events: nil,
}
// register handler on SIGTERM / exit
diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go
index 83ec7aa..37070f7 100644
--- a/lib/agent/apiv1-exec.go
+++ b/lib/agent/apiv1-exec.go
@@ -7,13 +7,17 @@ import (
"github.com/gin-gonic/gin"
common "github.com/iotbzh/xds-common/golib"
+ uuid "github.com/satori/go.uuid"
)
-// Only define useful fields
+// ExecArgs Only define used fields
type ExecArgs struct {
- ID string `json:"id" binding:"required"`
+ ID string `json:"id" binding:"required"`
+ CmdID string `json:"cmdID"` // command unique ID
}
+var execCmdID = 1
+
// ExecCmd executes remotely a command
func (s *APIService) execCmd(c *gin.Context) {
s._execRequest("/exec", c)
@@ -24,24 +28,26 @@ func (s *APIService) execSignalCmd(c *gin.Context) {
s._execRequest("/signal", c)
}
-func (s *APIService) _execRequest(url string, c *gin.Context) {
+func (s *APIService) _execRequest(cmd string, c *gin.Context) {
data, err := c.GetRawData()
if err != nil {
common.APIError(c, err.Error())
}
+ args := ExecArgs{}
+ // XXX - we cannot use c.BindJSON, so directly unmarshall it
+ // (see https://github.com/gin-gonic/gin/issues/1078)
+ if err := json.Unmarshal(data, &args); err != nil {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+
// First get Project ID to retrieve Server ID and send command to right server
id := c.Param("id")
if id == "" {
- args := ExecArgs{}
- // XXX - we cannot use c.BindJSON, so directly unmarshall it
- // (see https://github.com/gin-gonic/gin/issues/1078)
- if err := json.Unmarshal(data, &args); err != nil {
- common.APIError(c, "Invalid arguments")
- return
- }
id = args.ID
}
+
prj := s.projects.Get(id)
if prj == nil {
common.APIError(c, "Unknown id")
@@ -68,21 +74,47 @@ func (s *APIService) _execRequest(url string, c *gin.Context) {
// Forward XDSServer WS events to client WS
// TODO removed static event name list and get it from XDSServer
- for _, evName := range []string{
+ evtList := []string{
"exec:input",
"exec:output",
- "exec:exit",
"exec:inferior-input",
"exec:inferior-output",
- } {
+ }
+ so := *sock
+ fwdFuncID := []uuid.UUID{}
+ for _, evName := range evtList {
evN := evName
- svr.EventOn(evN, func(evData interface{}) {
- (*sock).Emit(evN, evData)
- })
+ fwdFunc := func(evData interface{}) {
+ // Forward event to Client/Dashboard
+ so.Emit(evN, evData)
+ }
+ id, err := svr.EventOn(evN, fwdFunc)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+ fwdFuncID = append(fwdFuncID, id)
+ }
+
+ // Handle Exit event separately to cleanup registered listener
+ var exitFuncID uuid.UUID
+ exitFunc := func(evData interface{}) {
+ so.Emit("exec:exit", evData)
+
+ // cleanup listener
+ for i, evName := range evtList {
+ svr.EventOff(evName, fwdFuncID[i])
+ }
+ svr.EventOff("exec:exit", exitFuncID)
+ }
+ exitFuncID, err = svr.EventOn("exec:exit", exitFunc)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
}
// Forward back command to right server
- response, err := svr.HTTPPostBody(url, string(data))
+ response, err := svr.SendCommand(cmd, data)
if err != nil {
common.APIError(c, err.Error())
return
diff --git a/lib/agent/apiv1-version.go b/lib/agent/apiv1-version.go
index c2387c1..6b4923f 100644
--- a/lib/agent/apiv1-version.go
+++ b/lib/agent/apiv1-version.go
@@ -33,7 +33,7 @@ func (s *APIService) getVersion(c *gin.Context) {
svrVer := []version{}
for _, svr := range s.xdsServers {
res := version{}
- if err := svr.HTTPGet("/version", &res); err != nil {
+ if err := svr.GetVersion(&res); err != nil {
common.APIError(c, "Cannot retrieve version of XDS server ID %s : %v", svr.ID, err.Error())
return
}
diff --git a/lib/agent/events.go b/lib/agent/events.go
index 24efc5a..e66f758 100644
--- a/lib/agent/events.go
+++ b/lib/agent/events.go
@@ -18,7 +18,7 @@ const (
EVTProjectChange = "project-state-change" // data type ProjectConfig
)
-var EVTAllList = []string{
+var _EVTAllList = []string{
EVTServerConfig,
EVTProjectAdd,
EVTProjectDelete,
@@ -33,7 +33,6 @@ type EventMsg struct {
}
type EventDef struct {
- // SEB cbs []EventsCB
sids map[string]int
}
@@ -45,7 +44,7 @@ type Events struct {
// NewEvents creates an instance of Events
func NewEvents(ctx *Context) *Events {
evMap := make(map[string]*EventDef)
- for _, ev := range EVTAllList {
+ for _, ev := range _EVTAllList {
evMap[ev] = &EventDef{
sids: make(map[string]int),
}
@@ -58,12 +57,12 @@ func NewEvents(ctx *Context) *Events {
// GetList returns the list of all supported events
func (e *Events) GetList() []string {
- return EVTAllList
+ return _EVTAllList
}
// Register Used by a client/session to register to a specific (or all) event(s)
func (e *Events) Register(evName, sessionID string) error {
- evs := EVTAllList
+ evs := _EVTAllList
if evName != EVTAll {
if _, ok := e.eventsMap[evName]; !ok {
return fmt.Errorf("Unsupported event type name")
@@ -78,7 +77,7 @@ func (e *Events) Register(evName, sessionID string) error {
// UnRegister Used by a client/session to unregister event(s)
func (e *Events) UnRegister(evName, sessionID string) error {
- evs := EVTAllList
+ evs := _EVTAllList
if evName != EVTAll {
if _, ok := e.eventsMap[evName]; !ok {
return fmt.Errorf("Unsupported event type name")
@@ -102,7 +101,9 @@ func (e *Events) Emit(evName string, data interface{}) error {
return fmt.Errorf("Unsupported event type")
}
- e.Log.Debugf("Emit Event %s: %v", evName, data)
+ if e.LogLevelSilly {
+ e.Log.Debugf("Emit Event %s: %v", evName, data)
+ }
firstErr = nil
evm := e.eventsMap[evName]
diff --git a/lib/agent/project-interface.go b/lib/agent/project-interface.go
index 031e1d9..0a4a17e 100644
--- a/lib/agent/project-interface.go
+++ b/lib/agent/project-interface.go
@@ -18,19 +18,15 @@ const (
StatusSyncing = "Syncing"
)
-type EventCBData map[string]interface{}
-type EventCB func(cfg *ProjectConfig, data *EventCBData)
-
// IPROJECT Project interface
type IPROJECT interface {
- Add(cfg ProjectConfig) (*ProjectConfig, error) // Add a new project
- Delete() error // Delete a project
- GetProject() *ProjectConfig // Get project public configuration
- SetProject(prj ProjectConfig) *ProjectConfig // Set project configuration
- GetServer() *XdsServer // Get XdsServer that holds this project
- GetFullPath(dir string) string // Get project full path
- Sync() error // Force project files synchronization
- IsInSync() (bool, error) // Check if project files are in-sync
+ Add(cfg ProjectConfig) (*ProjectConfig, error) // Add a new project
+ Delete() error // Delete a project
+ GetProject() *ProjectConfig // Get project public configuration
+ UpdateProject(prj ProjectConfig) (*ProjectConfig, error) // Update project configuration
+ GetServer() *XdsServer // Get XdsServer that holds this project
+ Sync() error // Force project files synchronization
+ IsInSync() (bool, error) // Check if project files are in-sync
}
// ProjectConfig is the config for one project
diff --git a/lib/agent/project-pathmap.go b/lib/agent/project-pathmap.go
index 1de8e11..6c49d6a 100644
--- a/lib/agent/project-pathmap.go
+++ b/lib/agent/project-pathmap.go
@@ -1,16 +1,12 @@
package agent
-import (
- "path/filepath"
-)
-
// IPROJECT interface implementation for native/path mapping projects
// PathMap .
type PathMap struct {
*Context
server *XdsServer
- folder *FolderConfig
+ folder *XdsFolderConfig
}
// NewProjectPathMap Create a new instance of PathMap
@@ -18,7 +14,7 @@ func NewProjectPathMap(ctx *Context, svr *XdsServer) *PathMap {
p := PathMap{
Context: ctx,
server: svr,
- folder: &FolderConfig{},
+ folder: &XdsFolderConfig{},
}
return &p
}
@@ -49,10 +45,14 @@ func (p *PathMap) GetProject() *ProjectConfig {
return &prj
}
-// SetProject Set project config
-func (p *PathMap) SetProject(prj ProjectConfig) *ProjectConfig {
+// UpdateProject Set project config
+func (p *PathMap) UpdateProject(prj ProjectConfig) (*ProjectConfig, error) {
p.folder = p.server.ProjectToFolder(prj)
- return p.GetProject()
+ np := p.GetProject()
+ if err := p.events.Emit(EVTProjectChange, np); err != nil {
+ return np, err
+ }
+ return np, nil
}
// GetServer Get the XdsServer that holds this project
@@ -60,14 +60,6 @@ func (p *PathMap) GetServer() *XdsServer {
return p.server
}
-// GetFullPath returns the full path of a directory (from server POV)
-func (p *PathMap) GetFullPath(dir string) string {
- if &dir == nil {
- return p.folder.DataPathMap.ServerPath
- }
- return filepath.Join(p.folder.DataPathMap.ServerPath, dir)
-}
-
// Sync Force project files synchronization
func (p *PathMap) Sync() error {
return nil
diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go
index 28a287c..c0d2550 100644
--- a/lib/agent/project-st.go
+++ b/lib/agent/project-st.go
@@ -1,16 +1,17 @@
package agent
-import "github.com/iotbzh/xds-agent/lib/syncthing"
-
-// SEB TODO
+import (
+ st "github.com/iotbzh/xds-agent/lib/syncthing"
+)
// IPROJECT interface implementation for syncthing projects
// STProject .
type STProject struct {
*Context
- server *XdsServer
- folder *FolderConfig
+ server *XdsServer
+ folder *XdsFolderConfig
+ eventIDs []int
}
// NewProjectST Create a new instance of STProject
@@ -18,7 +19,7 @@ func NewProjectST(ctx *Context, svr *XdsServer) *STProject {
p := STProject{
Context: ctx,
server: svr,
- folder: &FolderConfig{},
+ folder: &XdsFolderConfig{},
}
return &p
}
@@ -27,6 +28,7 @@ func NewProjectST(ctx *Context, svr *XdsServer) *STProject {
func (p *STProject) Add(cfg ProjectConfig) (*ProjectConfig, error) {
var err error
+ // Add project/folder into XDS Server
err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder)
if err != nil {
return nil, err
@@ -34,19 +36,37 @@ func (p *STProject) Add(cfg ProjectConfig) (*ProjectConfig, error) {
svrPrj := p.GetProject()
// Declare project into local Syncthing
- p.SThg.FolderChange(st.FolderChangeArg{
- ID: cfg.ID,
- Label: cfg.Label,
+ id, err := p.SThg.FolderChange(st.FolderChangeArg{
+ ID: svrPrj.ID,
+ Label: svrPrj.Label,
RelativePath: cfg.ClientPath,
SyncThingID: p.server.ServerConfig.Builder.SyncThingID,
})
+ if err != nil {
+ return nil, err
+ }
- return svrPrj, nil
+ locPrj, err := p.SThg.FolderConfigGet(id)
+ if err != nil {
+ svrPrj.Status = StatusErrorConfig
+ return nil, err
+ }
+ if svrPrj.ID != locPrj.ID {
+ p.Log.Errorf("Project ID in XDSServer and local ST differ: %s != %s", svrPrj.ID, locPrj.ID)
+ }
+
+ // Use Update function to setup remains fields
+ return p.UpdateProject(*svrPrj)
}
// Delete a project
func (p *STProject) Delete() error {
- return p.server.FolderDelete(p.folder.ID)
+ errSvr := p.server.FolderDelete(p.folder.ID)
+ errLoc := p.SThg.FolderDelete(p.folder.ID)
+ if errSvr != nil {
+ return errSvr
+ }
+ return errLoc
}
// GetProject Get public part of project config
@@ -56,38 +76,111 @@ func (p *STProject) GetProject() *ProjectConfig {
return &prj
}
-// SetProject Set project config
-func (p *STProject) SetProject(prj ProjectConfig) *ProjectConfig {
- // SEB TODO
+// UpdateProject Update project config
+func (p *STProject) UpdateProject(prj ProjectConfig) (*ProjectConfig, error) {
+ // Update folder
p.folder = p.server.ProjectToFolder(prj)
- return p.GetProject()
+ svrPrj := p.GetProject()
+
+ // Register events to update folder status
+ // Register to XDS Server events
+ p.server.EventOn("event:FolderStateChanged", p._cbServerFolderChanged)
+ if err := p.server.EventRegister("FolderStateChanged", svrPrj.ID); err != nil {
+ p.Log.Warningf("XDS Server EventRegister failed: %v", err)
+ return svrPrj, err
+ }
+
+ // Register to Local Syncthing events
+ for _, evName := range []string{st.EventStateChanged, st.EventFolderPaused} {
+ evID, err := p.SThg.Events.Register(evName, p._cbLocalSTEvents, svrPrj.ID, nil)
+ if err != nil {
+ return nil, err
+ }
+ p.eventIDs = append(p.eventIDs, evID)
+ }
+
+ return svrPrj, nil
}
// GetServer Get the XdsServer that holds this project
func (p *STProject) GetServer() *XdsServer {
- // SEB TODO
return p.server
}
-// GetFullPath returns the full path of a directory (from server POV)
-func (p *STProject) GetFullPath(dir string) string {
- /* SEB
- if &dir == nil {
- return p.folder.DataSTProject.ServerPath
- }
- return filepath.Join(p.folder.DataSTProject.ServerPath, dir)
- */
- return "SEB TODO"
-}
-
// Sync Force project files synchronization
func (p *STProject) Sync() error {
- // SEB TODO
- return nil
+ if err := p.server.FolderSync(p.folder.ID); err != nil {
+ return err
+ }
+ return p.SThg.FolderScan(p.folder.ID, "")
}
// IsInSync Check if project files are in-sync
func (p *STProject) IsInSync() (bool, error) {
- // SEB TODO
- return false, nil
+ // Should be up-to-date by callbacks (see below)
+ return p.folder.IsInSync, nil
+}
+
+/**
+** Private functions
+***/
+
+// callback use to update (XDS Server) folder IsInSync status
+
+func (p *STProject) _cbServerFolderChanged(data interface{}) {
+ evt := data.(XdsEventFolderChange)
+
+ // Only process event that concerns this project/folder ID
+ if p.folder.ID != evt.Folder.ID {
+ return
+ }
+
+ if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync ||
+ evt.Folder.Status != p.folder.DataCloudSync.STSvrStatus {
+
+ p.folder.DataCloudSync.STSvrIsInSync = evt.Folder.IsInSync
+ p.folder.DataCloudSync.STSvrStatus = evt.Folder.Status
+
+ if err := p.events.Emit(EVTProjectChange, p.server.FolderToProject(*p.folder)); err != nil {
+ p.Log.Warningf("Cannot notify project change: %v", err)
+ }
+ }
+}
+
+// callback use to update IsInSync status
+func (p *STProject) _cbLocalSTEvents(ev st.Event, data *st.EventsCBData) {
+
+ inSync := p.folder.DataCloudSync.STLocIsInSync
+ sts := p.folder.DataCloudSync.STLocStatus
+ prevSync := inSync
+ prevStatus := sts
+
+ switch ev.Type {
+
+ case st.EventStateChanged:
+ to := ev.Data["to"]
+ switch to {
+ case "scanning", "syncing":
+ sts = StatusSyncing
+ case "idle":
+ sts = StatusEnable
+ }
+ inSync = (to == "idle")
+
+ case st.EventFolderPaused:
+ if sts == StatusEnable {
+ sts = StatusPause
+ }
+ inSync = false
+ }
+
+ if prevSync != inSync || prevStatus != sts {
+
+ p.folder.DataCloudSync.STLocIsInSync = inSync
+ p.folder.DataCloudSync.STLocStatus = sts
+
+ if err := p.events.Emit(EVTProjectChange, p.server.FolderToProject(*p.folder)); err != nil {
+ p.Log.Warningf("Cannot notify project change: %v", err)
+ }
+ }
}
diff --git a/lib/agent/projects.go b/lib/agent/projects.go
index 39c120f..5e20395 100644
--- a/lib/agent/projects.go
+++ b/lib/agent/projects.go
@@ -14,16 +14,8 @@ type Projects struct {
*Context
SThg *st.SyncThing
projects map[string]*IPROJECT
- //SEB registerCB []RegisteredCB
}
-/* SEB
-type RegisteredCB struct {
- cb *EventCB
- data *EventCBData
-}
-*/
-
// Mutex to make add/delete atomic
var pjMutex = sync.NewMutex()
@@ -33,7 +25,6 @@ func NewProjects(ctx *Context, st *st.SyncThing) *Projects {
Context: ctx,
SThg: st,
projects: make(map[string]*IPROJECT),
- //registerCB: []RegisteredCB{},
}
}
@@ -51,26 +42,19 @@ func (p *Projects) Init(server *XdsServer) error {
if svr.Disabled {
continue
}
- xFlds := []FolderConfig{}
- if err := svr.HTTPGet("/folders", &xFlds); err != nil {
+ xFlds := []XdsFolderConfig{}
+ if err := svr.GetFolders(&xFlds); err != nil {
errMsg += fmt.Sprintf("Cannot retrieve folders config of XDS server ID %s : %v \n", svr.ID, err.Error())
continue
}
- p.Log.Debugf("Server %s, %d projects detected", svr.ID[:8], len(xFlds))
+ p.Log.Debugf("Connected to XDS Server %s, %d projects detected", svr.ID, len(xFlds))
for _, prj := range xFlds {
newP := svr.FolderToProject(prj)
- if /*nPrj*/ _, err := p.createUpdate(newP, false, true); err != nil {
+ if _, err := p.createUpdate(newP, false, true); err != nil {
errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n "
continue
}
-
- /* FIXME emit EVTProjectChange event ?
- if err := p.events.Emit(EVTProjectChange, *nPrj); err != nil {
- p.Log.Warningf("Cannot notify project change: %v", err)
- }
- */
}
-
}
p.Log.Infof("Number of loaded Projects: %d", len(p.projects))
@@ -161,7 +145,6 @@ func (p *Projects) createUpdate(newF ProjectConfig, create bool, initial bool) (
// SYNCTHING
case TypeCloudSync:
if p.SThg != nil {
- /*SEB fld = f.SThg.NewFolderST(f.Conf)*/
fld = NewProjectST(p.Context, svr)
} else {
return nil, fmt.Errorf("Cloud Sync project not supported")
@@ -179,12 +162,16 @@ func (p *Projects) createUpdate(newF ProjectConfig, create bool, initial bool) (
// Add project on server
if newPrj, err = fld.Add(newF); err != nil {
newF.Status = StatusErrorConfig
- log.Printf("ERROR Adding folder: %v\n", err)
+ log.Printf("ERROR Adding project: %v\n", err)
return newPrj, err
}
} else {
// Just update project config
- newPrj = fld.SetProject(newF)
+ if newPrj, err = fld.UpdateProject(newF); err != nil {
+ newF.Status = StatusErrorConfig
+ log.Printf("ERROR Updating project: %v\n", err)
+ return newPrj, err
+ }
}
// Sanity check
diff --git a/lib/agent/session.go b/lib/agent/session.go
index e50abe1..06789d5 100644
--- a/lib/agent/session.go
+++ b/lib/agent/session.go
@@ -194,15 +194,13 @@ func (s *Sessions) refresh(sid string) {
}
func (s *Sessions) monitorSessMap() {
- const dbgFullTrace = false // for debugging
-
for {
select {
case <-s.stop:
s.Log.Debugln("Stop monitorSessMap")
return
case <-time.After(sessionMonitorTime * time.Second):
- if dbgFullTrace {
+ if s.LogLevelSilly {
s.Log.Debugf("Sessions Map size: %d", len(s.sessMap))
s.Log.Debugf("Sessions Map : %v", s.sessMap)
}
@@ -214,7 +212,9 @@ func (s *Sessions) monitorSessMap() {
s.mutex.Lock()
for _, ss := range s.sessMap {
if ss.expireAt.Sub(time.Now()) < 0 {
- //SEB DEBUG s.Log.Debugf("Delete expired session id: %s", ss.ID)
+ if s.LogLevelSilly {
+ s.Log.Debugf("Delete expired session id: %s", ss.ID)
+ }
delete(s.sessMap, ss.ID)
}
}
diff --git a/lib/agent/webserver.go b/lib/agent/webserver.go
index ead06d1..4b2e024 100644
--- a/lib/agent/webserver.go
+++ b/lib/agent/webserver.go
@@ -148,11 +148,9 @@ func (s *WebServer) middlewareXDSDetails() gin.HandlerFunc {
}
}
-/* SEB
func (s *WebServer) isValidAPIKey(key string) bool {
- return (key == s.Config.FileConf.XDSAPIKey && key != "")
+ return (s.Config.FileConf.XDSAPIKey != "" && key == s.Config.FileConf.XDSAPIKey)
}
-*/
func (s *WebServer) middlewareCSRF() gin.HandlerFunc {
return func(c *gin.Context) {
diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go
index 518c68b..c900c9e 100644
--- a/lib/agent/xdsserver.go
+++ b/lib/agent/xdsserver.go
@@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"strings"
+ "sync"
"time"
"github.com/gin-gonic/gin"
@@ -16,7 +17,7 @@ import (
sio_client "github.com/sebd71/go-socket.io-client"
)
-// Server .
+// XdsServer .
type XdsServer struct {
*Context
ID string
@@ -26,11 +27,13 @@ type XdsServer struct {
ConnRetry int
Connected bool
Disabled bool
- ServerConfig *xdsServerConfig
+ ServerConfig *XdsServerConfig
- // callbacks
+ // Events management
CBOnError func(error)
CBOnDisconnect func(error)
+ sockEvents map[string][]*caller
+ sockEventsLock *sync.Mutex
// Private fields
client *common.HTTPClient
@@ -39,25 +42,25 @@ type XdsServer struct {
apiRouter *gin.RouterGroup
}
-// xdsServerConfig Data return by GET /config
-type xdsServerConfig struct {
+// XdsServerConfig Data return by GET /config
+type XdsServerConfig struct {
ID string `json:"id"`
Version string `json:"version"`
APIVersion string `json:"apiVersion"`
VersionGitTag string `json:"gitTag"`
SupportedSharing map[string]bool `json:"supportedSharing"`
- Builder xdsBuilderConfig `json:"builder"`
+ Builder XdsBuilderConfig `json:"builder"`
}
-// xdsBuilderConfig represents the builder container configuration
-type xdsBuilderConfig struct {
+// XdsBuilderConfig represents the builder container configuration
+type XdsBuilderConfig struct {
IP string `json:"ip"`
Port string `json:"port"`
SyncThingID string `json:"syncThingID"`
}
-// FolderType XdsServer folder type
-type FolderType string
+// XdsFolderType XdsServer folder type
+type XdsFolderType string
const (
XdsTypePathMap = "PathMap"
@@ -65,28 +68,52 @@ const (
XdsTypeCifsSmb = "CIFS"
)
-// FolderConfig XdsServer folder config
-type FolderConfig struct {
- ID string `json:"id"`
- Label string `json:"label"`
- ClientPath string `json:"path"`
- Type FolderType `json:"type"`
- Status string `json:"status"`
- IsInSync bool `json:"isInSync"`
- DefaultSdk string `json:"defaultSdk"`
+// XdsFolderConfig XdsServer folder config
+type XdsFolderConfig struct {
+ ID string `json:"id"`
+ Label string `json:"label"`
+ ClientPath string `json:"path"`
+ Type XdsFolderType `json:"type"`
+ Status string `json:"status"`
+ IsInSync bool `json:"isInSync"`
+ DefaultSdk string `json:"defaultSdk"`
// Specific data depending on which Type is used
- DataPathMap PathMapConfig `json:"dataPathMap,omitempty"`
- DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"`
+ DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"`
+ DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
}
-// PathMapConfig Path mapping specific data
-type PathMapConfig struct {
+// XdsPathMapConfig Path mapping specific data
+type XdsPathMapConfig struct {
ServerPath string `json:"serverPath"`
}
-// CloudSyncConfig CloudSync (AKA Syncthing) specific data
-type CloudSyncConfig struct {
- SyncThingID string `json:"syncThingID"`
+// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
+type XdsCloudSyncConfig struct {
+ SyncThingID string `json:"syncThingID"`
+ STSvrStatus string `json:"-"`
+ STSvrIsInSync bool `json:"-"`
+ STLocStatus string `json:"-"`
+ STLocIsInSync bool `json:"-"`
+}
+
+// XdsEventRegisterArgs arguments used to register to XDS server events
+type XdsEventRegisterArgs struct {
+ Name string `json:"name"`
+ ProjectID string `json:"filterProjectID"`
+}
+
+// XdsEventFolderChange Folder change event structure
+type XdsEventFolderChange struct {
+ Time string `json:"time"`
+ Type string `json:"type"`
+ Folder XdsFolderConfig `json:"folder"`
+}
+
+// caller Used to chain event listeners
+type caller struct {
+ id uuid.UUID
+ EventName string
+ Func func(interface{})
}
const _IDTempoPrefix = "tempo-"
@@ -103,7 +130,9 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
Connected: false,
Disabled: false,
- logOut: ctx.Log.Out,
+ sockEvents: make(map[string][]*caller),
+ sockEventsLock: &sync.Mutex{},
+ logOut: ctx.Log.Out,
}
}
@@ -138,7 +167,7 @@ func (xs *XdsServer) Connect() error {
time.Sleep(time.Second)
}
if retry == 0 {
- // FIXME SEB: re-use _reconnect to wait longer in background
+ // FIXME: re-use _reconnect to wait longer in background
return fmt.Errorf("Connection to XDS Server failure")
}
if err != nil {
@@ -161,16 +190,35 @@ func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
xs.logOut = out
}
+// SendCommand Send a command to XDS Server
+func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
+ url := cmd
+ if !strings.HasPrefix("/", cmd) {
+ url = "/" + cmd
+ }
+ return xs.client.HTTPPostWithRes(url, string(body))
+}
+
+// GetVersion Send Get request to retrieve XDS Server version
+func (xs *XdsServer) GetVersion(res interface{}) error {
+ return xs._HTTPGet("/version", &res)
+}
+
+// GetFolders Send GET request to get current folder configuration
+func (xs *XdsServer) GetFolders(prjs *[]XdsFolderConfig) error {
+ return xs._HTTPGet("/folders", prjs)
+}
+
// FolderAdd Send POST request to add a folder
-func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error {
- response, err := xs.HTTPPost("/folder", prj)
+func (xs *XdsServer) FolderAdd(prj *XdsFolderConfig, res interface{}) error {
+ response, err := xs._HTTPPost("/folder", prj)
if err != nil {
return err
}
if response.StatusCode != 200 {
return fmt.Errorf("FolderAdd error status=%s", response.Status)
}
- // Result is a FolderConfig that is equivalent to ProjectConfig
+ // Result is a XdsFolderConfig that is equivalent to ProjectConfig
err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
return err
@@ -181,27 +229,9 @@ func (xs *XdsServer) FolderDelete(id string) error {
return xs.client.HTTPDelete("/folder/" + id)
}
-// HTTPGet .
-func (xs *XdsServer) HTTPGet(url string, data interface{}) error {
- var dd []byte
- if err := xs.client.HTTPGet(url, &dd); err != nil {
- return err
- }
- return json.Unmarshal(dd, &data)
-}
-
-// HTTPPost .
-func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) {
- body, err := json.Marshal(data)
- if err != nil {
- return nil, err
- }
- return xs.HTTPPostBody(url, string(body))
-}
-
-// HTTPPostBody .
-func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) {
- return xs.client.HTTPPostWithRes(url, body)
+// FolderSync Send POST request to force synchronization of a folder
+func (xs *XdsServer) FolderSync(id string) error {
+ return xs.client.HTTPPost("/folder/sync/"+id, "")
}
// SetAPIRouterGroup .
@@ -218,7 +248,7 @@ func (xs *XdsServer) PassthroughGet(url string) {
xs.apiRouter.GET(url, func(c *gin.Context) {
var data interface{}
- if err := xs.HTTPGet(url, &data); err != nil {
+ if err := xs._HTTPGet(url, &data); err != nil {
if strings.Contains(err.Error(), "connection refused") {
xs.Connected = false
xs._NotifyState()
@@ -246,7 +276,7 @@ func (xs *XdsServer) PassthroughPost(url string) {
return
}
- response, err := xs.HTTPPostBody(url, string(bodyReq[:n]))
+ response, err := xs._HTTPPost(url, bodyReq[:n])
if err != nil {
common.APIError(c, err.Error())
return
@@ -260,49 +290,132 @@ func (xs *XdsServer) PassthroughPost(url string) {
})
}
+// EventRegister Post a request to register to an XdsServer event
+func (xs *XdsServer) EventRegister(evName string, id string) error {
+ var err error
+ _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
+ Name: evName,
+ ProjectID: id,
+ })
+ return err
+}
+
// EventOn Register a callback on events reception
-func (xs *XdsServer) EventOn(message string, f interface{}) (err error) {
+func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
if xs.ioSock == nil {
- return fmt.Errorf("Io.Socket not initialized")
+ return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
}
- // FIXME SEB: support chain / multiple listeners
- /* sockEvents map[string][]*caller
+
xs.sockEventsLock.Lock()
- xs.sockEvents[message] = append(xs.sockEvents[message], f)
- xs.sockEventsLock.Unlock()
- xs.ioSock.On(message, func(ev) {
+ defer xs.sockEventsLock.Unlock()
+
+ if _, exist := xs.sockEvents[evName]; !exist {
+ // Register listener only the first time
+ evn := evName
+
+ // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
+ var err error
+ if evName == "event:FolderStateChanged" {
+ err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
+ xs.sockEventsLock.Lock()
+ defer xs.sockEventsLock.Unlock()
+ for _, c := range xs.sockEvents[evn] {
+ c.Func(data)
+ }
+ return nil
+ })
+ } else {
+ err = xs.ioSock.On(evn, f)
+ }
+ if err != nil {
+ return uuid.Nil, err
+ }
+ }
- })
- */
- return xs.ioSock.On(message, f)
+ c := &caller{
+ id: uuid.NewV1(),
+ EventName: evName,
+ Func: f,
+ }
+
+ xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
+ return c.id, nil
+}
+
+// EventOff Un-register a (or all) callbacks associated to an event
+func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
+ xs.sockEventsLock.Lock()
+ defer xs.sockEventsLock.Unlock()
+ if _, exist := xs.sockEvents[evName]; exist {
+ if id == uuid.Nil {
+ // Un-register all
+ xs.sockEvents[evName] = []*caller{}
+ } else {
+ // Un-register only the specified callback
+ for i, ff := range xs.sockEvents[evName] {
+ if uuid.Equal(ff.id, id) {
+ xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
+ break
+ }
+ }
+ }
+ }
+ return nil
}
-// ProjectToFolder
-func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig {
+// ProjectToFolder Convert Project structure to Folder structure
+func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig {
stID := ""
if pPrj.Type == XdsTypeCloudSync {
stID, _ = xs.SThg.IDGet()
}
- fPrj := FolderConfig{
+ fPrj := XdsFolderConfig{
ID: pPrj.ID,
Label: pPrj.Label,
ClientPath: pPrj.ClientPath,
- Type: FolderType(pPrj.Type),
+ Type: XdsFolderType(pPrj.Type),
Status: pPrj.Status,
IsInSync: pPrj.IsInSync,
DefaultSdk: pPrj.DefaultSdk,
- DataPathMap: PathMapConfig{
+ DataPathMap: XdsPathMapConfig{
ServerPath: pPrj.ServerPath,
},
- DataCloudSync: CloudSyncConfig{
- SyncThingID: stID,
+ DataCloudSync: XdsCloudSyncConfig{
+ SyncThingID: stID,
+ STLocIsInSync: pPrj.IsInSync,
+ STLocStatus: pPrj.Status,
+ STSvrIsInSync: pPrj.IsInSync,
+ STSvrStatus: pPrj.Status,
},
}
+
return &fPrj
}
-// FolderToProject
-func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig {
+// FolderToProject Convert Folder structure to Project structure
+func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) ProjectConfig {
+ inSync := fPrj.IsInSync
+ sts := fPrj.Status
+
+ if fPrj.Type == XdsTypeCloudSync {
+ inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
+
+ sts = fPrj.DataCloudSync.STSvrStatus
+ switch fPrj.DataCloudSync.STLocStatus {
+ case StatusErrorConfig, StatusDisable, StatusPause:
+ sts = fPrj.DataCloudSync.STLocStatus
+ break
+ case StatusSyncing:
+ if sts != StatusErrorConfig && sts != StatusDisable && sts != StatusPause {
+ sts = StatusSyncing
+ }
+ break
+ case StatusEnable:
+ // keep STSvrStatus
+ break
+ }
+ }
+
pPrj := ProjectConfig{
ID: fPrj.ID,
ServerID: xs.ID,
@@ -310,8 +423,8 @@ func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig {
ClientPath: fPrj.ClientPath,
ServerPath: fPrj.DataPathMap.ServerPath,
Type: ProjectType(fPrj.Type),
- Status: fPrj.Status,
- IsInSync: fPrj.IsInSync,
+ Status: sts,
+ IsInSync: inSync,
DefaultSdk: fPrj.DefaultSdk,
}
return pPrj
@@ -350,6 +463,24 @@ func (xs *XdsServer) _CreateConnectHTTP() error {
return nil
}
+// _HTTPGet .
+func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
+ var dd []byte
+ if err := xs.client.HTTPGet(url, &dd); err != nil {
+ return err
+ }
+ return json.Unmarshal(dd, &data)
+}
+
+// _HTTPPost .
+func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
+ body, err := json.Marshal(data)
+ if err != nil {
+ return nil, err
+ }
+ return xs.client.HTTPPostWithRes(url, string(body))
+}
+
// Re-established connection
func (xs *XdsServer) _reconnect() error {
err := xs._connect(true)
@@ -363,8 +494,8 @@ func (xs *XdsServer) _reconnect() error {
// Established HTTP and WS connection and retrieve XDSServer config
func (xs *XdsServer) _connect(reConn bool) error {
- xdsCfg := xdsServerConfig{}
- if err := xs.HTTPGet("/config", &xdsCfg); err != nil {
+ xdsCfg := XdsServerConfig{}
+ if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
xs.Connected = false
if !reConn {
xs._NotifyState()