diff options
Diffstat (limited to 'lib/agent')
-rw-r--r-- | lib/agent/agent.go | 154 | ||||
-rw-r--r-- | lib/agent/apiv1-browse.go | 28 | ||||
-rw-r--r-- | lib/agent/apiv1-config.go | 108 | ||||
-rw-r--r-- | lib/agent/apiv1-events.go | 73 | ||||
-rw-r--r-- | lib/agent/apiv1-exec.go | 131 | ||||
-rw-r--r-- | lib/agent/apiv1-projects.go | 72 | ||||
-rw-r--r-- | lib/agent/apiv1-version.go | 45 | ||||
-rw-r--r-- | lib/agent/apiv1.go | 129 | ||||
-rw-r--r-- | lib/agent/events.go | 132 | ||||
-rw-r--r-- | lib/agent/project-interface.go | 43 | ||||
-rw-r--r-- | lib/agent/project-pathmap.go | 123 | ||||
-rw-r--r-- | lib/agent/project-st.go | 186 | ||||
-rw-r--r-- | lib/agent/projects.go | 241 | ||||
-rw-r--r-- | lib/agent/session.go | 224 | ||||
-rw-r--r-- | lib/agent/webserver.go | 244 | ||||
-rw-r--r-- | lib/agent/xdsserver.go | 605 |
16 files changed, 2524 insertions, 14 deletions
diff --git a/lib/agent/agent.go b/lib/agent/agent.go index 74872f7..3bdd89f 100644 --- a/lib/agent/agent.go +++ b/lib/agent/agent.go @@ -2,28 +2,39 @@ package agent import ( "fmt" + "log" "os" "os/exec" "os/signal" + "path/filepath" "syscall" + "time" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" "github.com/iotbzh/xds-agent/lib/syncthing" "github.com/iotbzh/xds-agent/lib/xdsconfig" - "github.com/iotbzh/xds-agent/lib/webserver" ) +const cookieMaxAge = "3600" + // Context holds the Agent context structure type Context struct { - ProgName string - Config *xdsconfig.Config - Log *logrus.Logger - SThg *st.SyncThing - SThgCmd *exec.Cmd - SThgInotCmd *exec.Cmd - WWWServer *webserver.ServerService - Exit chan os.Signal + ProgName string + Config *xdsconfig.Config + Log *logrus.Logger + LogLevelSilly bool + SThg *st.SyncThing + SThgCmd *exec.Cmd + SThgInotCmd *exec.Cmd + + webServer *WebServer + xdsServers map[string]*XdsServer + sessions *Sessions + events *Events + projects *Projects + + Exit chan os.Signal } // NewAgent Create a new instance of Agent @@ -43,11 +54,18 @@ func NewAgent(cliCtx *cli.Context) *Context { } log.Formatter = &logrus.TextFormatter{} + sillyVal, sillyLog := os.LookupEnv("XDS_LOG_SILLY") + // Define default configuration ctx := Context{ - ProgName: cliCtx.App.Name, - Log: log, - Exit: make(chan os.Signal, 1), + ProgName: cliCtx.App.Name, + Log: log, + LogLevelSilly: (sillyLog && sillyVal == "1"), + Exit: make(chan os.Signal, 1), + + webServer: nil, + xdsServers: make(map[string]*XdsServer), + events: nil, } // register handler on SIGTERM / exit @@ -57,6 +75,114 @@ func NewAgent(cliCtx *cli.Context) *Context { return &ctx } +// Run Main function called to run agent +func (ctx *Context) Run() (int, error) { + var err error + + // Logs redirected into a file when logfile option or logsDir config is set + ctx.Config.LogVerboseOut = os.Stderr + if ctx.Config.FileConf.LogsDir != "" { + if ctx.Config.Options.LogFile != "stdout" { + logFile := ctx.Config.Options.LogFile + + fdL, err := os.OpenFile(logFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666) + if err != nil { + msgErr := fmt.Errorf("Cannot create log file %s", logFile) + return int(syscall.EPERM), msgErr + } + ctx.Log.Out = fdL + + ctx._logPrint("Logging file: %s\n", logFile) + } + + logFileHTTPReq := filepath.Join(ctx.Config.FileConf.LogsDir, "xds-agent-verbose.log") + fdLH, err := os.OpenFile(logFileHTTPReq, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666) + if err != nil { + msgErr := fmt.Errorf("Cannot create log file %s", logFileHTTPReq) + return int(syscall.EPERM), msgErr + } + ctx.Config.LogVerboseOut = fdLH + + ctx._logPrint("Logging file for HTTP requests: %s\n", logFileHTTPReq) + } + + // Create syncthing instance when section "syncthing" is present in config.json + if ctx.Config.FileConf.SThgConf != nil { + ctx.SThg = st.NewSyncThing(ctx.Config, ctx.Log) + } + + // Start local instance of Syncthing and Syncthing-notify + if ctx.SThg != nil { + ctx.Log.Infof("Starting Syncthing...") + ctx.SThgCmd, err = ctx.SThg.Start() + if err != nil { + return 2, err + } + fmt.Printf("Syncthing started (PID %d)\n", ctx.SThgCmd.Process.Pid) + + ctx.Log.Infof("Starting Syncthing-inotify...") + ctx.SThgInotCmd, err = ctx.SThg.StartInotify() + if err != nil { + return 2, err + } + fmt.Printf("Syncthing-inotify started (PID %d)\n", ctx.SThgInotCmd.Process.Pid) + + // Establish connection with local Syncthing (retry if connection fail) + time.Sleep(3 * time.Second) + maxRetry := 30 + retry := maxRetry + for retry > 0 { + if err := ctx.SThg.Connect(); err == nil { + break + } + ctx.Log.Infof("Establishing connection to Syncthing (retry %d/%d)", retry, maxRetry) + time.Sleep(time.Second) + retry-- + } + if err != nil || retry == 0 { + return 2, err + } + + // Retrieve Syncthing config + id, err := ctx.SThg.IDGet() + if err != nil { + return 2, err + } + ctx.Log.Infof("Local Syncthing ID: %s", id) + + } else { + ctx.Log.Infof("Cloud Sync / Syncthing not supported") + } + + // Create Web Server + ctx.webServer = NewWebServer(ctx) + + // Sessions manager + ctx.sessions = NewClientSessions(ctx, cookieMaxAge) + + // Create events management + ctx.events = NewEvents(ctx) + + // Create projects management + ctx.projects = NewProjects(ctx, ctx.SThg) + + // Run Web Server until exit requested (blocking call) + if err = ctx.webServer.Serve(); err != nil { + log.Println(err) + return 3, err + } + + return 4, fmt.Errorf("Program exited") +} + +// Helper function to log message on both stdout and logger +func (ctx *Context) _logPrint(format string, args ...interface{}) { + fmt.Printf(format, args...) + if ctx.Log.Out != os.Stdout { + ctx.Log.Infof(format, args...) + } +} + // Handle exit and properly stop/close all stuff func handlerSigTerm(ctx *Context) { <-ctx.Exit @@ -68,9 +194,9 @@ func handlerSigTerm(ctx *Context) { ctx.SThg.Stop() ctx.SThg.StopInotify() } - if ctx.WWWServer != nil { + if ctx.webServer != nil { ctx.Log.Infof("Stoping Web server...") - ctx.WWWServer.Stop() + ctx.webServer.Stop() } os.Exit(1) } diff --git a/lib/agent/apiv1-browse.go b/lib/agent/apiv1-browse.go new file mode 100644 index 0000000..1701a2e --- /dev/null +++ b/lib/agent/apiv1-browse.go @@ -0,0 +1,28 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" +) + +type directory struct { + Name string `json:"name"` + Fullpath string `json:"fullpath"` +} + +type apiDirectory struct { + Dir []directory `json:"dir"` +} + +// browseFS used to browse local file system +func (s *APIService) browseFS(c *gin.Context) { + + response := apiDirectory{ + Dir: []directory{ + directory{Name: "TODO SEB"}, + }, + } + + c.JSON(http.StatusOK, response) +} diff --git a/lib/agent/apiv1-config.go b/lib/agent/apiv1-config.go new file mode 100644 index 0000000..31d8de6 --- /dev/null +++ b/lib/agent/apiv1-config.go @@ -0,0 +1,108 @@ +package agent + +import ( + "net/http" + "sync" + + "github.com/gin-gonic/gin" + "github.com/iotbzh/xds-agent/lib/xdsconfig" + common "github.com/iotbzh/xds-common/golib" +) + +var confMut sync.Mutex + +// APIConfig parameters (json format) of /config command +type APIConfig struct { + Servers []ServerCfg `json:"servers"` + + // Not exposed outside in JSON + Version string `json:"-"` + APIVersion string `json:"-"` + VersionGitTag string `json:"-"` +} + +// ServerCfg . +type ServerCfg struct { + ID string `json:"id"` + URL string `json:"url"` + APIURL string `json:"apiUrl"` + PartialURL string `json:"partialUrl"` + ConnRetry int `json:"connRetry"` + Connected bool `json:"connected"` + Disabled bool `json:"disabled"` +} + +// GetConfig returns the configuration +func (s *APIService) getConfig(c *gin.Context) { + confMut.Lock() + defer confMut.Unlock() + + cfg := s._getConfig() + + c.JSON(http.StatusOK, cfg) +} + +// SetConfig sets configuration +func (s *APIService) setConfig(c *gin.Context) { + var cfgArg APIConfig + if c.BindJSON(&cfgArg) != nil { + common.APIError(c, "Invalid arguments") + return + } + + confMut.Lock() + defer confMut.Unlock() + + s.Log.Debugln("SET config: ", cfgArg) + + // First delete/disable XDS Server that are no longer listed + for _, svr := range s.xdsServers { + found := false + for _, svrArg := range cfgArg.Servers { + if svr.ID == svrArg.ID { + found = true + break + } + } + if !found { + s.DelXdsServer(svr.ID) + } + } + + // Add new XDS Server + for _, svr := range cfgArg.Servers { + cfg := xdsconfig.XDSServerConf{ + ID: svr.ID, + URL: svr.URL, + ConnRetry: svr.ConnRetry, + } + if _, err := s.AddXdsServer(cfg); err != nil { + common.APIError(c, err.Error()) + return + } + } + + c.JSON(http.StatusOK, s._getConfig()) +} + +func (s *APIService) _getConfig() APIConfig { + cfg := APIConfig{ + Version: s.Config.Version, + APIVersion: s.Config.APIVersion, + VersionGitTag: s.Config.VersionGitTag, + Servers: []ServerCfg{}, + } + + for _, svr := range s.xdsServers { + cfg.Servers = append(cfg.Servers, ServerCfg{ + ID: svr.ID, + URL: svr.BaseURL, + APIURL: svr.APIURL, + PartialURL: svr.PartialURL, + ConnRetry: svr.ConnRetry, + Connected: svr.Connected, + Disabled: svr.Disabled, + }) + } + return cfg +} diff --git a/lib/agent/apiv1-events.go b/lib/agent/apiv1-events.go new file mode 100644 index 0000000..8aad18a --- /dev/null +++ b/lib/agent/apiv1-events.go @@ -0,0 +1,73 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +// EventRegisterArgs is the parameters (json format) of /events/register command +type EventRegisterArgs struct { + Name string `json:"name"` + ProjectID string `json:"filterProjectID"` +} + +// EventUnRegisterArgs is the parameters (json format) of /events/unregister command +type EventUnRegisterArgs struct { + Name string `json:"name"` + ID int `json:"id"` +} + +// eventsList Registering for events that will be send over a WS +func (s *APIService) eventsList(c *gin.Context) { + c.JSON(http.StatusOK, s.events.GetList()) +} + +// eventsRegister Registering for events that will be send over a WS +func (s *APIService) eventsRegister(c *gin.Context) { + var args EventRegisterArgs + + if c.BindJSON(&args) != nil || args.Name == "" { + common.APIError(c, "Invalid arguments") + return + } + + sess := s.webServer.sessions.Get(c) + if sess == nil { + common.APIError(c, "Unknown sessions") + return + } + + // Register to all or to a specific events + if err := s.events.Register(args.Name, sess.ID); err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{"status": "OK"}) +} + +// eventsRegister Registering for events that will be send over a WS +func (s *APIService) eventsUnRegister(c *gin.Context) { + var args EventUnRegisterArgs + + if c.BindJSON(&args) != nil || args.Name == "" { + common.APIError(c, "Invalid arguments") + return + } + + sess := s.webServer.sessions.Get(c) + if sess == nil { + common.APIError(c, "Unknown sessions") + return + } + + // Register to all or to a specific events + if err := s.events.UnRegister(args.Name, sess.ID); err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{"status": "OK"}) +} diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go new file mode 100644 index 0000000..37070f7 --- /dev/null +++ b/lib/agent/apiv1-exec.go @@ -0,0 +1,131 @@ +package agent + +import ( + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" + uuid "github.com/satori/go.uuid" +) + +// ExecArgs Only define used fields +type ExecArgs struct { + ID string `json:"id" binding:"required"` + CmdID string `json:"cmdID"` // command unique ID +} + +var execCmdID = 1 + +// ExecCmd executes remotely a command +func (s *APIService) execCmd(c *gin.Context) { + s._execRequest("/exec", c) +} + +// execSignalCmd executes remotely a command +func (s *APIService) execSignalCmd(c *gin.Context) { + s._execRequest("/signal", c) +} + +func (s *APIService) _execRequest(cmd string, c *gin.Context) { + data, err := c.GetRawData() + if err != nil { + common.APIError(c, err.Error()) + } + + args := ExecArgs{} + // XXX - we cannot use c.BindJSON, so directly unmarshall it + // (see https://github.com/gin-gonic/gin/issues/1078) + if err := json.Unmarshal(data, &args); err != nil { + common.APIError(c, "Invalid arguments") + return + } + + // First get Project ID to retrieve Server ID and send command to right server + id := c.Param("id") + if id == "" { + id = args.ID + } + + prj := s.projects.Get(id) + if prj == nil { + common.APIError(c, "Unknown id") + return + } + + svr := (*prj).GetServer() + if svr == nil { + common.APIError(c, "Cannot identify XDS Server") + return + } + + // Retrieve session info + sess := s.sessions.Get(c) + if sess == nil { + common.APIError(c, "Unknown sessions") + return + } + sock := sess.IOSocket + if sock == nil { + common.APIError(c, "Websocket not established") + return + } + + // Forward XDSServer WS events to client WS + // TODO removed static event name list and get it from XDSServer + evtList := []string{ + "exec:input", + "exec:output", + "exec:inferior-input", + "exec:inferior-output", + } + so := *sock + fwdFuncID := []uuid.UUID{} + for _, evName := range evtList { + evN := evName + fwdFunc := func(evData interface{}) { + // Forward event to Client/Dashboard + so.Emit(evN, evData) + } + id, err := svr.EventOn(evN, fwdFunc) + if err != nil { + common.APIError(c, err.Error()) + return + } + fwdFuncID = append(fwdFuncID, id) + } + + // Handle Exit event separately to cleanup registered listener + var exitFuncID uuid.UUID + exitFunc := func(evData interface{}) { + so.Emit("exec:exit", evData) + + // cleanup listener + for i, evName := range evtList { + svr.EventOff(evName, fwdFuncID[i]) + } + svr.EventOff("exec:exit", exitFuncID) + } + exitFuncID, err = svr.EventOn("exec:exit", exitFunc) + if err != nil { + common.APIError(c, err.Error()) + return + } + + // Forward back command to right server + response, err := svr.SendCommand(cmd, data) + if err != nil { + common.APIError(c, err.Error()) + return + } + + // Decode response + body, err := ioutil.ReadAll(response.Body) + if err != nil { + common.APIError(c, "Cannot read response body") + return + } + c.JSON(http.StatusOK, string(body)) + +} diff --git a/lib/agent/apiv1-projects.go b/lib/agent/apiv1-projects.go new file mode 100644 index 0000000..d4b5e74 --- /dev/null +++ b/lib/agent/apiv1-projects.go @@ -0,0 +1,72 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +// getProjects returns all projects configuration +func (s *APIService) getProjects(c *gin.Context) { + c.JSON(http.StatusOK, s.projects.GetProjectArr()) +} + +// getProject returns a specific project configuration +func (s *APIService) getProject(c *gin.Context) { + prj := s.projects.Get(c.Param("id")) + if prj == nil { + common.APIError(c, "Invalid id") + return + } + + c.JSON(http.StatusOK, (*prj).GetProject()) +} + +// addProject adds a new project to server config +func (s *APIService) addProject(c *gin.Context) { + var cfgArg ProjectConfig + if c.BindJSON(&cfgArg) != nil { + common.APIError(c, "Invalid arguments") + return + } + + s.Log.Debugln("Add project config: ", cfgArg) + + newFld, err := s.projects.Add(cfgArg) + if err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, newFld) +} + +// syncProject force synchronization of project files +func (s *APIService) syncProject(c *gin.Context) { + id := c.Param("id") + + s.Log.Debugln("Sync project id: ", id) + + err := s.projects.ForceSync(id) + if err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, "") +} + +// delProject deletes project from server config +func (s *APIService) delProject(c *gin.Context) { + id := c.Param("id") + + s.Log.Debugln("Delete project id ", id) + + delEntry, err := s.projects.Delete(id) + if err != nil { + common.APIError(c, err.Error()) + return + } + c.JSON(http.StatusOK, delEntry) +} diff --git a/lib/agent/apiv1-version.go b/lib/agent/apiv1-version.go new file mode 100644 index 0000000..6b4923f --- /dev/null +++ b/lib/agent/apiv1-version.go @@ -0,0 +1,45 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +type version struct { + ID string `json:"id"` + Version string `json:"version"` + APIVersion string `json:"apiVersion"` + VersionGitTag string `json:"gitTag"` +} + +type apiVersion struct { + Client version `json:"client"` + Server []version `json:"servers"` +} + +// getInfo : return various information about server +func (s *APIService) getVersion(c *gin.Context) { + response := apiVersion{ + Client: version{ + ID: "", + Version: s.Config.Version, + APIVersion: s.Config.APIVersion, + VersionGitTag: s.Config.VersionGitTag, + }, + } + + svrVer := []version{} + for _, svr := range s.xdsServers { + res := version{} + if err := svr.GetVersion(&res); err != nil { + common.APIError(c, "Cannot retrieve version of XDS server ID %s : %v", svr.ID, err.Error()) + return + } + svrVer = append(svrVer, res) + } + response.Server = svrVer + + c.JSON(http.StatusOK, response) +} diff --git a/lib/agent/apiv1.go b/lib/agent/apiv1.go new file mode 100644 index 0000000..77b05ba --- /dev/null +++ b/lib/agent/apiv1.go @@ -0,0 +1,129 @@ +package agent + +import ( + "fmt" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/iotbzh/xds-agent/lib/xdsconfig" +) + +const apiBaseUrl = "/api/v1" + +// APIService . +type APIService struct { + *Context + apiRouter *gin.RouterGroup + serverIndex int +} + +// NewAPIV1 creates a new instance of API service +func NewAPIV1(ctx *Context) *APIService { + s := &APIService{ + Context: ctx, + apiRouter: ctx.webServer.router.Group(apiBaseUrl), + serverIndex: 0, + } + + s.apiRouter.GET("/version", s.getVersion) + + s.apiRouter.GET("/config", s.getConfig) + s.apiRouter.POST("/config", s.setConfig) + + s.apiRouter.GET("/browse", s.browseFS) + + s.apiRouter.GET("/projects", s.getProjects) + s.apiRouter.GET("/project/:id", s.getProject) + s.apiRouter.POST("/project", s.addProject) + s.apiRouter.POST("/project/sync/:id", s.syncProject) + s.apiRouter.DELETE("/project/:id", s.delProject) + + s.apiRouter.POST("/exec", s.execCmd) + s.apiRouter.POST("/exec/:id", s.execCmd) + s.apiRouter.POST("/signal", s.execSignalCmd) + + s.apiRouter.GET("/events", s.eventsList) + s.apiRouter.POST("/events/register", s.eventsRegister) + s.apiRouter.POST("/events/unregister", s.eventsUnRegister) + + return s +} + +// Stop Used to stop/close created services +func (s *APIService) Stop() { + for _, svr := range s.xdsServers { + svr.Close() + } +} + +// AddXdsServer Add a new XDS Server to the list of a server +func (s *APIService) AddXdsServer(cfg xdsconfig.XDSServerConf) (*XdsServer, error) { + var svr *XdsServer + var exist, tempoID bool + tempoID = false + + // First check if not already exist and update it + if svr, exist = s.xdsServers[cfg.ID]; exist { + + // Update: Found, so just update some settings + svr.ConnRetry = cfg.ConnRetry + + tempoID = svr.IsTempoID() + if svr.Connected && !svr.Disabled && svr.BaseURL == cfg.URL && tempoID { + return svr, nil + } + + // URL differ or not connected, so need to reconnect + svr.BaseURL = cfg.URL + + } else { + + // Create a new server object + if cfg.APIBaseURL == "" { + cfg.APIBaseURL = apiBaseUrl + } + if cfg.APIPartialURL == "" { + cfg.APIPartialURL = "/server/" + strconv.Itoa(s.serverIndex) + s.serverIndex = s.serverIndex + 1 + } + + // Create a new XDS Server + svr = NewXdsServer(s.Context, cfg) + + svr.SetLoggerOutput(s.Config.LogVerboseOut) + + // Passthrough routes (handle by XDS Server) + grp := s.apiRouter.Group(svr.PartialURL) + svr.SetAPIRouterGroup(grp) + svr.PassthroughGet("/sdks") + svr.PassthroughGet("/sdk/:id") + } + + // Established connection + err := svr.Connect() + + // Delete temporary ID with it has been replaced by right Server ID + if tempoID && !svr.IsTempoID() { + delete(s.xdsServers, cfg.ID) + } + + // Add to map + s.xdsServers[svr.ID] = svr + + // Load projects + if err == nil && svr.Connected { + err = s.projects.Init(svr) + } + + return svr, err +} + +// DelXdsServer Delete an XDS Server from the list of a server +func (s *APIService) DelXdsServer(id string) error { + if _, exist := s.xdsServers[id]; !exist { + return fmt.Errorf("Unknown Server ID %s", id) + } + // Don't really delete, just disable it + s.xdsServers[id].Close() + return nil +} diff --git a/lib/agent/events.go b/lib/agent/events.go new file mode 100644 index 0000000..e66f758 --- /dev/null +++ b/lib/agent/events.go @@ -0,0 +1,132 @@ +package agent + +import ( + "fmt" + "time" +) + +// Events constants +const ( + // EventTypePrefix Used as event prefix + EventTypePrefix = "event:" // following by event type + + // Supported Events type + EVTAll = "all" + EVTServerConfig = "server-config" // data type ServerCfg + EVTProjectAdd = "project-add" // data type ProjectConfig + EVTProjectDelete = "project-delete" // data type ProjectConfig + EVTProjectChange = "project-state-change" // data type ProjectConfig +) + +var _EVTAllList = []string{ + EVTServerConfig, + EVTProjectAdd, + EVTProjectDelete, + EVTProjectChange, +} + +// EventMsg Message send +type EventMsg struct { + Time string `json:"time"` + Type string `json:"type"` + Data interface{} `json:"data"` +} + +type EventDef struct { + sids map[string]int +} + +type Events struct { + *Context + eventsMap map[string]*EventDef +} + +// NewEvents creates an instance of Events +func NewEvents(ctx *Context) *Events { + evMap := make(map[string]*EventDef) + for _, ev := range _EVTAllList { + evMap[ev] = &EventDef{ + sids: make(map[string]int), + } + } + return &Events{ + Context: ctx, + eventsMap: evMap, + } +} + +// GetList returns the list of all supported events +func (e *Events) GetList() []string { + return _EVTAllList +} + +// Register Used by a client/session to register to a specific (or all) event(s) +func (e *Events) Register(evName, sessionID string) error { + evs := _EVTAllList + if evName != EVTAll { + if _, ok := e.eventsMap[evName]; !ok { + return fmt.Errorf("Unsupported event type name") + } + evs = []string{evName} + } + for _, ev := range evs { + e.eventsMap[ev].sids[sessionID]++ + } + return nil +} + +// UnRegister Used by a client/session to unregister event(s) +func (e *Events) UnRegister(evName, sessionID string) error { + evs := _EVTAllList + if evName != EVTAll { + if _, ok := e.eventsMap[evName]; !ok { + return fmt.Errorf("Unsupported event type name") + } + evs = []string{evName} + } + for _, ev := range evs { + if _, exist := e.eventsMap[ev].sids[sessionID]; exist { + delete(e.eventsMap[ev].sids, sessionID) + break + } + } + return nil +} + +// Emit Used to manually emit an event +func (e *Events) Emit(evName string, data interface{}) error { + var firstErr error + + if _, ok := e.eventsMap[evName]; !ok { + return fmt.Errorf("Unsupported event type") + } + + if e.LogLevelSilly { + e.Log.Debugf("Emit Event %s: %v", evName, data) + } + + firstErr = nil + evm := e.eventsMap[evName] + for sid := range evm.sids { + so := e.webServer.sessions.IOSocketGet(sid) + if so == nil { + if firstErr == nil { + firstErr = fmt.Errorf("IOSocketGet return nil") + } + continue + } + msg := EventMsg{ + Time: time.Now().String(), + Type: evName, + Data: data, + } + if err := (*so).Emit(EventTypePrefix+evName, msg); err != nil { + e.Log.Errorf("WS Emit %v error : %v", EventTypePrefix+evName, err) + if firstErr == nil { + firstErr = err + } + } + } + + return firstErr +} diff --git a/lib/agent/project-interface.go b/lib/agent/project-interface.go new file mode 100644 index 0000000..0a4a17e --- /dev/null +++ b/lib/agent/project-interface.go @@ -0,0 +1,43 @@ +package agent + +// ProjectType definition +type ProjectType string + +const ( + TypePathMap = "PathMap" + TypeCloudSync = "CloudSync" + TypeCifsSmb = "CIFS" +) + +// Project Status definition +const ( + StatusErrorConfig = "ErrorConfig" + StatusDisable = "Disable" + StatusEnable = "Enable" + StatusPause = "Pause" + StatusSyncing = "Syncing" +) + +// IPROJECT Project interface +type IPROJECT interface { + Add(cfg ProjectConfig) (*ProjectConfig, error) // Add a new project + Delete() error // Delete a project + GetProject() *ProjectConfig // Get project public configuration + UpdateProject(prj ProjectConfig) (*ProjectConfig, error) // Update project configuration + GetServer() *XdsServer // Get XdsServer that holds this project + Sync() error // Force project files synchronization + IsInSync() (bool, error) // Check if project files are in-sync +} + +// ProjectConfig is the config for one project +type ProjectConfig struct { + ID string `json:"id"` + ServerID string `json:"serverId"` + Label string `json:"label"` + ClientPath string `json:"clientPath"` + ServerPath string `json:"serverPath"` + Type ProjectType `json:"type"` + Status string `json:"status"` + IsInSync bool `json:"isInSync"` + DefaultSdk string `json:"defaultSdk"` +} diff --git a/lib/agent/project-pathmap.go b/lib/agent/project-pathmap.go new file mode 100644 index 0000000..aacbd1f --- /dev/null +++ b/lib/agent/project-pathmap.go @@ -0,0 +1,123 @@ +package agent + +import ( + "fmt" + "io/ioutil" + "os" + "strings" + + common "github.com/iotbzh/xds-common/golib" +) + +// IPROJECT interface implementation for native/path mapping projects + +// PathMap . +type PathMap struct { + *Context + server *XdsServer + folder *XdsFolderConfig +} + +// NewProjectPathMap Create a new instance of PathMap +func NewProjectPathMap(ctx *Context, svr *XdsServer) *PathMap { + p := PathMap{ + Context: ctx, + server: svr, + folder: &XdsFolderConfig{}, + } + return &p +} + +// Add a new project +func (p *PathMap) Add(cfg ProjectConfig) (*ProjectConfig, error) { + var err error + var file *os.File + errMsg := "ClientPath sanity check error (%d): %v" + + // Sanity check to verify that we have RW permission and path-mapping is correct + dir := cfg.ClientPath + if !common.Exists(dir) { + // try to create if not existing + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("Cannot create ClientPath directory: %s", dir) + } + } + if !common.Exists(dir) { + return nil, fmt.Errorf("ClientPath directory is not accessible: %s", dir) + } + if file, err = ioutil.TempFile(dir, ".xds_pathmap_check"); err != nil { + return nil, fmt.Errorf(errMsg, 1, err) + } + // Write a specific message that will be check by server during folder add + msg := "Pathmap checked message written by xds-agent ID: " + p.Config.AgentUID + "\n" + if n, err := file.WriteString(msg); n != len(msg) || err != nil { + return nil, fmt.Errorf(errMsg, 2, err) + } + defer func() { + if file != nil { + os.Remove(file.Name()) + file.Close() + } + }() + + // Convert to Xds folder + fld := p.server.ProjectToFolder(cfg) + fld.DataPathMap.CheckFile = file.Name() + fld.DataPathMap.CheckContent = msg + + // Send request to create folder on XDS server side + err = p.server.FolderAdd(fld, p.folder) + if err != nil { + return nil, fmt.Errorf("Folders mapping verification failure.\n%v", err) + } + + // 2nd part of sanity checker + // check specific message added by XDS Server during folder add processing + content, err := ioutil.ReadFile(file.Name()) + if err != nil { + return nil, fmt.Errorf(errMsg, 3, err) + } + if !strings.Contains(string(content), + "Pathmap checked message written by xds-server ID") { + return nil, fmt.Errorf(errMsg, 4, "file content differ") + } + + return p.GetProject(), nil +} + +// Delete a project +func (p *PathMap) Delete() error { + return p.server.FolderDelete(p.folder.ID) +} + +// GetProject Get public part of project config +func (p *PathMap) GetProject() *ProjectConfig { + prj := p.server.FolderToProject(*p.folder) + prj.ServerID = p.server.ID + return &prj +} + +// UpdateProject Set project config +func (p *PathMap) UpdateProject(prj ProjectConfig) (*ProjectConfig, error) { + p.folder = p.server.ProjectToFolder(prj) + np := p.GetProject() + if err := p.events.Emit(EVTProjectChange, np); err != nil { + return np, err + } + return np, nil +} + +// GetServer Get the XdsServer that holds this project +func (p *PathMap) GetServer() *XdsServer { + return p.server +} + +// Sync Force project files synchronization +func (p *PathMap) Sync() error { + return nil +} + +// IsInSync Check if project files are in-sync +func (p *PathMap) IsInSync() (bool, error) { + return true, nil +} diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go new file mode 100644 index 0000000..c0d2550 --- /dev/null +++ b/lib/agent/project-st.go @@ -0,0 +1,186 @@ +package agent + +import ( + st "github.com/iotbzh/xds-agent/lib/syncthing" +) + +// IPROJECT interface implementation for syncthing projects + +// STProject . +type STProject struct { + *Context + server *XdsServer + folder *XdsFolderConfig + eventIDs []int +} + +// NewProjectST Create a new instance of STProject +func NewProjectST(ctx *Context, svr *XdsServer) *STProject { + p := STProject{ + Context: ctx, + server: svr, + folder: &XdsFolderConfig{}, + } + return &p +} + +// Add a new project +func (p *STProject) Add(cfg ProjectConfig) (*ProjectConfig, error) { + var err error + + // Add project/folder into XDS Server + err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder) + if err != nil { + return nil, err + } + svrPrj := p.GetProject() + + // Declare project into local Syncthing + id, err := p.SThg.FolderChange(st.FolderChangeArg{ + ID: svrPrj.ID, + Label: svrPrj.Label, + RelativePath: cfg.ClientPath, + SyncThingID: p.server.ServerConfig.Builder.SyncThingID, + }) + if err != nil { + return nil, err + } + + locPrj, err := p.SThg.FolderConfigGet(id) + if err != nil { + svrPrj.Status = StatusErrorConfig + return nil, err + } + if svrPrj.ID != locPrj.ID { + p.Log.Errorf("Project ID in XDSServer and local ST differ: %s != %s", svrPrj.ID, locPrj.ID) + } + + // Use Update function to setup remains fields + return p.UpdateProject(*svrPrj) +} + +// Delete a project +func (p *STProject) Delete() error { + errSvr := p.server.FolderDelete(p.folder.ID) + errLoc := p.SThg.FolderDelete(p.folder.ID) + if errSvr != nil { + return errSvr + } + return errLoc +} + +// GetProject Get public part of project config +func (p *STProject) GetProject() *ProjectConfig { + prj := p.server.FolderToProject(*p.folder) + prj.ServerID = p.server.ID + return &prj +} + +// UpdateProject Update project config +func (p *STProject) UpdateProject(prj ProjectConfig) (*ProjectConfig, error) { + // Update folder + p.folder = p.server.ProjectToFolder(prj) + svrPrj := p.GetProject() + + // Register events to update folder status + // Register to XDS Server events + p.server.EventOn("event:FolderStateChanged", p._cbServerFolderChanged) + if err := p.server.EventRegister("FolderStateChanged", svrPrj.ID); err != nil { + p.Log.Warningf("XDS Server EventRegister failed: %v", err) + return svrPrj, err + } + + // Register to Local Syncthing events + for _, evName := range []string{st.EventStateChanged, st.EventFolderPaused} { + evID, err := p.SThg.Events.Register(evName, p._cbLocalSTEvents, svrPrj.ID, nil) + if err != nil { + return nil, err + } + p.eventIDs = append(p.eventIDs, evID) + } + + return svrPrj, nil +} + +// GetServer Get the XdsServer that holds this project +func (p *STProject) GetServer() *XdsServer { + return p.server +} + +// Sync Force project files synchronization +func (p *STProject) Sync() error { + if err := p.server.FolderSync(p.folder.ID); err != nil { + return err + } + return p.SThg.FolderScan(p.folder.ID, "") +} + +// IsInSync Check if project files are in-sync +func (p *STProject) IsInSync() (bool, error) { + // Should be up-to-date by callbacks (see below) + return p.folder.IsInSync, nil +} + +/** +** Private functions +***/ + +// callback use to update (XDS Server) folder IsInSync status + +func (p *STProject) _cbServerFolderChanged(data interface{}) { + evt := data.(XdsEventFolderChange) + + // Only process event that concerns this project/folder ID + if p.folder.ID != evt.Folder.ID { + return + } + + if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync || + evt.Folder.Status != p.folder.DataCloudSync.STSvrStatus { + + p.folder.DataCloudSync.STSvrIsInSync = evt.Folder.IsInSync + p.folder.DataCloudSync.STSvrStatus = evt.Folder.Status + + if err := p.events.Emit(EVTProjectChange, p.server.FolderToProject(*p.folder)); err != nil { + p.Log.Warningf("Cannot notify project change: %v", err) + } + } +} + +// callback use to update IsInSync status +func (p *STProject) _cbLocalSTEvents(ev st.Event, data *st.EventsCBData) { + + inSync := p.folder.DataCloudSync.STLocIsInSync + sts := p.folder.DataCloudSync.STLocStatus + prevSync := inSync + prevStatus := sts + + switch ev.Type { + + case st.EventStateChanged: + to := ev.Data["to"] + switch to { + case "scanning", "syncing": + sts = StatusSyncing + case "idle": + sts = StatusEnable + } + inSync = (to == "idle") + + case st.EventFolderPaused: + if sts == StatusEnable { + sts = StatusPause + } + inSync = false + } + + if prevSync != inSync || prevStatus != sts { + + p.folder.DataCloudSync.STLocIsInSync = inSync + p.folder.DataCloudSync.STLocStatus = sts + + if err := p.events.Emit(EVTProjectChange, p.server.FolderToProject(*p.folder)); err != nil { + p.Log.Warningf("Cannot notify project change: %v", err) + } + } +} diff --git a/lib/agent/projects.go b/lib/agent/projects.go new file mode 100644 index 0000000..5e20395 --- /dev/null +++ b/lib/agent/projects.go @@ -0,0 +1,241 @@ +package agent + +import ( + "fmt" + "log" + "time" + + "github.com/iotbzh/xds-agent/lib/syncthing" + "github.com/syncthing/syncthing/lib/sync" +) + +// Projects Represent a an XDS Projects +type Projects struct { + *Context + SThg *st.SyncThing + projects map[string]*IPROJECT +} + +// Mutex to make add/delete atomic +var pjMutex = sync.NewMutex() + +// NewProjects Create a new instance of Project Model +func NewProjects(ctx *Context, st *st.SyncThing) *Projects { + return &Projects{ + Context: ctx, + SThg: st, + projects: make(map[string]*IPROJECT), + } +} + +// Init Load Projects configuration +func (p *Projects) Init(server *XdsServer) error { + svrList := make(map[string]*XdsServer) + // If server not set, load for all servers + if server == nil { + svrList = p.xdsServers + } else { + svrList[server.ID] = server + } + errMsg := "" + for _, svr := range svrList { + if svr.Disabled { + continue + } + xFlds := []XdsFolderConfig{} + if err := svr.GetFolders(&xFlds); err != nil { + errMsg += fmt.Sprintf("Cannot retrieve folders config of XDS server ID %s : %v \n", svr.ID, err.Error()) + continue + } + p.Log.Debugf("Connected to XDS Server %s, %d projects detected", svr.ID, len(xFlds)) + for _, prj := range xFlds { + newP := svr.FolderToProject(prj) + if _, err := p.createUpdate(newP, false, true); err != nil { + errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n " + continue + } + } + } + + p.Log.Infof("Number of loaded Projects: %d", len(p.projects)) + + if errMsg != "" { + return fmt.Errorf(errMsg) + } + return nil +} + +// Get returns the folder config or nil if not existing +func (p *Projects) Get(id string) *IPROJECT { + if id == "" { + return nil + } + fc, exist := p.projects[id] + if !exist { + return nil + } + return fc +} + +// GetProjectArr returns the config of all folders as an array +func (p *Projects) GetProjectArr() []ProjectConfig { + pjMutex.Lock() + defer pjMutex.Unlock() + + return p.GetProjectArrUnsafe() +} + +// GetProjectArrUnsafe Same as GetProjectArr without mutex protection +func (p *Projects) GetProjectArrUnsafe() []ProjectConfig { + conf := []ProjectConfig{} + for _, v := range p.projects { + prj := (*v).GetProject() + conf = append(conf, *prj) + } + return conf +} + +// Add adds a new folder +func (p *Projects) Add(newF ProjectConfig) (*ProjectConfig, error) { + prj, err := p.createUpdate(newF, true, false) + if err != nil { + return prj, err + } + + // Notify client with event + if err := p.events.Emit(EVTProjectAdd, *prj); err != nil { + p.Log.Warningf("Cannot notify project deletion: %v", err) + } + + return prj, err +} + +// CreateUpdate creates or update a folder +func (p *Projects) createUpdate(newF ProjectConfig, create bool, initial bool) (*ProjectConfig, error) { + var err error + + pjMutex.Lock() + defer pjMutex.Unlock() + + // Sanity check + if _, exist := p.projects[newF.ID]; create && exist { + return nil, fmt.Errorf("ID already exists") + } + if newF.ClientPath == "" { + return nil, fmt.Errorf("ClientPath must be set") + } + if newF.ServerID == "" { + return nil, fmt.Errorf("Server ID must be set") + } + var svr *XdsServer + var exist bool + if svr, exist = p.xdsServers[newF.ServerID]; !exist { + return nil, fmt.Errorf("Unknown Server ID %s", newF.ServerID) + } + + // Check type supported + b, exist := svr.ServerConfig.SupportedSharing[string(newF.Type)] + if !exist || !b { + return nil, fmt.Errorf("Server doesn't support project type %s", newF.Type) + } + + // Create a new folder object + var fld IPROJECT + switch newF.Type { + // SYNCTHING + case TypeCloudSync: + if p.SThg != nil { + fld = NewProjectST(p.Context, svr) + } else { + return nil, fmt.Errorf("Cloud Sync project not supported") + } + + // PATH MAP + case TypePathMap: + fld = NewProjectPathMap(p.Context, svr) + default: + return nil, fmt.Errorf("Unsupported folder type") + } + + var newPrj *ProjectConfig + if create { + // Add project on server + if newPrj, err = fld.Add(newF); err != nil { + newF.Status = StatusErrorConfig + log.Printf("ERROR Adding project: %v\n", err) + return newPrj, err + } + } else { + // Just update project config + if newPrj, err = fld.UpdateProject(newF); err != nil { + newF.Status = StatusErrorConfig + log.Printf("ERROR Updating project: %v\n", err) + return newPrj, err + } + } + + // Sanity check + if newPrj.ID == "" { + log.Printf("ERROR project ID empty: %v", newF) + return newPrj, fmt.Errorf("Project ID empty") + } + + // Add to folders list + p.projects[newPrj.ID] = &fld + + // Force sync after creation + // (need to defer to be sure that WS events will arrive after HTTP creation reply) + go func() { + time.Sleep(time.Millisecond * 500) + fld.Sync() + }() + + return newPrj, nil +} + +// Delete deletes a specific folder +func (p *Projects) Delete(id string) (ProjectConfig, error) { + var err error + + pjMutex.Lock() + defer pjMutex.Unlock() + + fld := ProjectConfig{} + fc, exist := p.projects[id] + if !exist { + return fld, fmt.Errorf("unknown id") + } + + prj := (*fc).GetProject() + + if err = (*fc).Delete(); err != nil { + return *prj, err + } + + delete(p.projects, id) + + // Notify client with event + if err := p.events.Emit(EVTProjectDelete, *prj); err != nil { + p.Log.Warningf("Cannot notify project deletion: %v", err) + } + + return *prj, err +} + +// ForceSync Force the synchronization of a folder +func (p *Projects) ForceSync(id string) error { + fc := p.Get(id) + if fc == nil { + return fmt.Errorf("Unknown id") + } + return (*fc).Sync() +} + +// IsProjectInSync Returns true when folder is in sync +func (p *Projects) IsProjectInSync(id string) (bool, error) { + fc := p.Get(id) + if fc == nil { + return false, fmt.Errorf("Unknown id") + } + return (*fc).IsInSync() +} diff --git a/lib/agent/session.go b/lib/agent/session.go new file mode 100644 index 0000000..06789d5 --- /dev/null +++ b/lib/agent/session.go @@ -0,0 +1,224 @@ +package agent + +import ( + "encoding/base64" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/googollee/go-socket.io" + uuid "github.com/satori/go.uuid" + "github.com/syncthing/syncthing/lib/sync" +) + +const sessionCookieName = "xds-agent-sid" +const sessionHeaderName = "XDS-AGENT-SID" + +const sessionMonitorTime = 10 // Time (in seconds) to schedule monitoring session tasks + +const initSessionMaxAge = 10 // Initial session max age in seconds +const maxSessions = 100000 // Maximum number of sessions in sessMap map + +const secureCookie = false // TODO: see https://github.com/astaxie/beego/blob/master/session/session.go#L218 + +// ClientSession contains the info of a user/client session +type ClientSession struct { + ID string + WSID string // only one WebSocket per client/session + MaxAge int64 + IOSocket *socketio.Socket + + // private + expireAt time.Time + useCount int64 +} + +// Sessions holds client sessions +type Sessions struct { + *Context + cookieMaxAge int64 + sessMap map[string]ClientSession + mutex sync.Mutex + stop chan struct{} // signals intentional stop +} + +// NewClientSessions . +func NewClientSessions(ctx *Context, cookieMaxAge string) *Sessions { + ckMaxAge, err := strconv.ParseInt(cookieMaxAge, 10, 0) + if err != nil { + ckMaxAge = 0 + } + s := Sessions{ + Context: ctx, + cookieMaxAge: ckMaxAge, + sessMap: make(map[string]ClientSession), + mutex: sync.NewMutex(), + stop: make(chan struct{}), + } + s.webServer.router.Use(s.Middleware()) + + // Start monitoring of sessions Map (use to manage expiration and cleanup) + go s.monitorSessMap() + + return &s +} + +// Stop sessions management +func (s *Sessions) Stop() { + close(s.stop) +} + +// Middleware is used to managed session +func (s *Sessions) Middleware() gin.HandlerFunc { + return func(c *gin.Context) { + // FIXME Add CSRF management + + // Get session + sess := s.Get(c) + if sess == nil { + // Allocate a new session key and put in cookie + sess = s.newSession("") + } else { + s.refresh(sess.ID) + } + + // Set session in cookie and in header + // Do not set Domain to localhost (http://stackoverflow.com/questions/1134290/cookies-on-localhost-with-explicit-domain) + c.SetCookie(sessionCookieName, sess.ID, int(sess.MaxAge), "/", "", + secureCookie, false) + c.Header(sessionHeaderName, sess.ID) + + // Save session id in gin metadata + c.Set(sessionCookieName, sess.ID) + + c.Next() + } +} + +// Get returns the client session for a specific ID +func (s *Sessions) Get(c *gin.Context) *ClientSession { + var sid string + + // First get from gin metadata + v, exist := c.Get(sessionCookieName) + if v != nil { + sid = v.(string) + } + + // Then look in cookie + if !exist || sid == "" { + sid, _ = c.Cookie(sessionCookieName) + } + + // Then look in Header + if sid == "" { + sid = c.Request.Header.Get(sessionCookieName) + } + if sid != "" { + s.mutex.Lock() + defer s.mutex.Unlock() + if key, ok := s.sessMap[sid]; ok { + // TODO: return a copy ??? + return &key + } + } + return nil +} + +// IOSocketGet Get socketio definition from sid +func (s *Sessions) IOSocketGet(sid string) *socketio.Socket { + s.mutex.Lock() + defer s.mutex.Unlock() + sess, ok := s.sessMap[sid] + if ok { + return sess.IOSocket + } + return nil +} + +// UpdateIOSocket updates the IO Socket definition for of a session +func (s *Sessions) UpdateIOSocket(sid string, so *socketio.Socket) error { + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.sessMap[sid]; ok { + sess := s.sessMap[sid] + if so == nil { + // Could be the case when socketio is closed/disconnected + sess.WSID = "" + } else { + sess.WSID = (*so).Id() + } + sess.IOSocket = so + s.sessMap[sid] = sess + } + return nil +} + +// nesSession Allocate a new client session +func (s *Sessions) newSession(prefix string) *ClientSession { + uuid := prefix + uuid.NewV4().String() + id := base64.URLEncoding.EncodeToString([]byte(uuid)) + se := ClientSession{ + ID: id, + WSID: "", + MaxAge: initSessionMaxAge, + IOSocket: nil, + expireAt: time.Now().Add(time.Duration(initSessionMaxAge) * time.Second), + useCount: 0, + } + s.mutex.Lock() + defer s.mutex.Unlock() + + s.sessMap[se.ID] = se + + s.Log.Debugf("NEW session (%d): %s", len(s.sessMap), id) + return &se +} + +// refresh Move this session ID to the head of the list +func (s *Sessions) refresh(sid string) { + s.mutex.Lock() + defer s.mutex.Unlock() + + sess := s.sessMap[sid] + sess.useCount++ + if sess.MaxAge < s.cookieMaxAge && sess.useCount > 1 { + sess.MaxAge = s.cookieMaxAge + sess.expireAt = time.Now().Add(time.Duration(sess.MaxAge) * time.Second) + } + + // TODO - Add flood detection (like limit_req of nginx) + // (delayed request when to much requests in a short period of time) + + s.sessMap[sid] = sess +} + +func (s *Sessions) monitorSessMap() { + for { + select { + case <-s.stop: + s.Log.Debugln("Stop monitorSessMap") + return + case <-time.After(sessionMonitorTime * time.Second): + if s.LogLevelSilly { + s.Log.Debugf("Sessions Map size: %d", len(s.sessMap)) + s.Log.Debugf("Sessions Map : %v", s.sessMap) + } + + if len(s.sessMap) > maxSessions { + s.Log.Errorln("TOO MUCH sessions, cleanup old ones !") + } + + s.mutex.Lock() + for _, ss := range s.sessMap { + if ss.expireAt.Sub(time.Now()) < 0 { + if s.LogLevelSilly { + s.Log.Debugf("Delete expired session id: %s", ss.ID) + } + delete(s.sessMap, ss.ID) + } + } + s.mutex.Unlock() + } + } +} diff --git a/lib/agent/webserver.go b/lib/agent/webserver.go new file mode 100644 index 0000000..4b2e024 --- /dev/null +++ b/lib/agent/webserver.go @@ -0,0 +1,244 @@ +package agent + +import ( + "fmt" + "log" + "net/http" + "os" + "path" + + "github.com/Sirupsen/logrus" + "github.com/gin-contrib/static" + "github.com/gin-gonic/gin" + "github.com/googollee/go-socket.io" +) + +// WebServer . +type WebServer struct { + *Context + router *gin.Engine + api *APIService + sIOServer *socketio.Server + webApp *gin.RouterGroup + stop chan struct{} // signals intentional stop +} + +const indexFilename = "index.html" + +// NewWebServer creates an instance of WebServer +func NewWebServer(ctx *Context) *WebServer { + + // Setup logging for gin router + if ctx.Log.Level == logrus.DebugLevel { + gin.SetMode(gin.DebugMode) + } else { + gin.SetMode(gin.ReleaseMode) + } + + // Redirect gin logs into another logger (LogVerboseOut may be stderr or a file) + gin.DefaultWriter = ctx.Config.LogVerboseOut + gin.DefaultErrorWriter = ctx.Config.LogVerboseOut + log.SetOutput(ctx.Config.LogVerboseOut) + + // Creates gin router + r := gin.New() + + svr := &WebServer{ + Context: ctx, + router: r, + api: nil, + sIOServer: nil, + webApp: nil, + stop: make(chan struct{}), + } + + return svr +} + +// Serve starts a new instance of the Web Server +func (s *WebServer) Serve() error { + var err error + + // Setup middlewares + s.router.Use(gin.Logger()) + s.router.Use(gin.Recovery()) + s.router.Use(s.middlewareCORS()) + s.router.Use(s.middlewareXDSDetails()) + s.router.Use(s.middlewareCSRF()) + + // Create REST API + s.api = NewAPIV1(s.Context) + + // Create connections to XDS Servers + // XXX - not sure there is no side effect to do it in background ! + go func() { + for _, svrCfg := range s.Config.FileConf.ServersConf { + if svr, err := s.api.AddXdsServer(svrCfg); err != nil { + // Just log error, don't consider as critical + s.Log.Infof("Cannot connect to XDS Server url=%s: %v", svr.BaseURL, err.Error()) + } + } + }() + + // Websocket routes + s.sIOServer, err = socketio.NewServer(nil) + if err != nil { + s.Log.Fatalln(err) + } + + s.router.GET("/socket.io/", s.socketHandler) + s.router.POST("/socket.io/", s.socketHandler) + /* TODO: do we want to support ws://... ? + s.router.Handle("WS", "/socket.io/", s.socketHandler) + s.router.Handle("WSS", "/socket.io/", s.socketHandler) + */ + + // Web Application (serve on / ) + idxFile := path.Join(s.Config.FileConf.WebAppDir, indexFilename) + if _, err := os.Stat(idxFile); err != nil { + s.Log.Fatalln("Web app directory not found, check/use webAppDir setting in config file: ", idxFile) + } + s.Log.Infof("Serve WEB app dir: %s", s.Config.FileConf.WebAppDir) + s.router.Use(static.Serve("/", static.LocalFile(s.Config.FileConf.WebAppDir, true))) + s.webApp = s.router.Group("/", s.serveIndexFile) + { + s.webApp.GET("/") + } + + // Serve in the background + serveError := make(chan error, 1) + go func() { + fmt.Printf("Web Server running on localhost:%s ...\n", s.Config.FileConf.HTTPPort) + serveError <- http.ListenAndServe(":"+s.Config.FileConf.HTTPPort, s.router) + }() + + fmt.Printf("XDS agent running...\n") + + // Wait for stop, restart or error signals + select { + case <-s.stop: + // Shutting down permanently + s.sessions.Stop() + s.Log.Infoln("shutting down (stop)") + case err = <-serveError: + // Error due to listen/serve failure + s.Log.Errorln(err) + } + + return nil +} + +// Stop web server +func (s *WebServer) Stop() { + s.api.Stop() + close(s.stop) +} + +// serveIndexFile provides initial file (eg. index.html) of webapp +func (s *WebServer) serveIndexFile(c *gin.Context) { + c.HTML(200, indexFilename, gin.H{}) +} + +// Add details in Header +func (s *WebServer) middlewareXDSDetails() gin.HandlerFunc { + return func(c *gin.Context) { + c.Header("XDS-Agent-Version", s.Config.Version) + c.Header("XDS-API-Version", s.Config.APIVersion) + c.Next() + } +} + +func (s *WebServer) isValidAPIKey(key string) bool { + return (s.Config.FileConf.XDSAPIKey != "" && key == s.Config.FileConf.XDSAPIKey) +} + +func (s *WebServer) middlewareCSRF() gin.HandlerFunc { + return func(c *gin.Context) { + // XXX - not used for now + c.Next() + return + /* + // Allow requests carrying a valid API key + if s.isValidAPIKey(c.Request.Header.Get("X-API-Key")) { + // Set the access-control-allow-origin header for CORS requests + // since a valid API key has been provided + c.Header("Access-Control-Allow-Origin", "*") + c.Next() + return + } + + // Allow io.socket request + if strings.HasPrefix(c.Request.URL.Path, "/socket.io") { + c.Next() + return + } + + // FIXME Add really CSRF support + + // Allow requests for anything not under the protected path prefix, + // and set a CSRF cookie if there isn't already a valid one. + //if !strings.HasPrefix(c.Request.URL.Path, prefix) { + // cookie, err := c.Cookie("CSRF-Token-" + unique) + // if err != nil || !validCsrfToken(cookie.Value) { + // s.Log.Debugln("new CSRF cookie in response to request for", c.Request.URL) + // c.SetCookie("CSRF-Token-"+unique, newCsrfToken(), 600, "/", "", false, false) + // } + // c.Next() + // return + //} + + // Verify the CSRF token + //token := c.Request.Header.Get("X-CSRF-Token-" + unique) + //if !validCsrfToken(token) { + // c.AbortWithError(403, "CSRF Error") + // return + //} + + //c.Next() + + c.AbortWithError(403, fmt.Errorf("Not valid API key")) + */ + } +} + +// CORS middleware +func (s *WebServer) middlewareCORS() gin.HandlerFunc { + return func(c *gin.Context) { + if c.Request.Method == "OPTIONS" { + c.Header("Access-Control-Allow-Origin", "*") + c.Header("Access-Control-Allow-Headers", "Content-Type, X-API-Key") + c.Header("Access-Control-Allow-Methods", "GET, POST, DELETE") + c.Header("Access-Control-Max-Age", cookieMaxAge) + c.AbortWithStatus(204) + return + } + c.Next() + } +} + +// socketHandler is the handler for the "main" websocket connection +func (s *WebServer) socketHandler(c *gin.Context) { + + // Retrieve user session + sess := s.sessions.Get(c) + if sess == nil { + c.JSON(500, gin.H{"error": "Cannot retrieve session"}) + return + } + + s.sIOServer.On("connection", func(so socketio.Socket) { + s.Log.Debugf("WS Connected (SID=%v)", so.Id()) + s.sessions.UpdateIOSocket(sess.ID, &so) + + so.On("disconnection", func() { + s.Log.Debugf("WS disconnected (SID=%v)", so.Id()) + s.sessions.UpdateIOSocket(sess.ID, nil) + }) + }) + + s.sIOServer.On("error", func(so socketio.Socket, err error) { + s.Log.Errorf("WS SID=%v Error : %v", so.Id(), err.Error()) + }) + + s.sIOServer.ServeHTTP(c.Writer, c.Request) +} diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go new file mode 100644 index 0000000..b76908c --- /dev/null +++ b/lib/agent/xdsserver.go @@ -0,0 +1,605 @@ +package agent + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" + "sync" + "time" + + "github.com/gin-gonic/gin" + "github.com/iotbzh/xds-agent/lib/xdsconfig" + common "github.com/iotbzh/xds-common/golib" + uuid "github.com/satori/go.uuid" + sio_client "github.com/sebd71/go-socket.io-client" +) + +// XdsServer . +type XdsServer struct { + *Context + ID string + BaseURL string + APIURL string + PartialURL string + ConnRetry int + Connected bool + Disabled bool + ServerConfig *XdsServerConfig + + // Events management + CBOnError func(error) + CBOnDisconnect func(error) + sockEvents map[string][]*caller + sockEventsLock *sync.Mutex + + // Private fields + client *common.HTTPClient + ioSock *sio_client.Client + logOut io.Writer + apiRouter *gin.RouterGroup +} + +// XdsServerConfig Data return by GET /config +type XdsServerConfig struct { + ID string `json:"id"` + Version string `json:"version"` + APIVersion string `json:"apiVersion"` + VersionGitTag string `json:"gitTag"` + SupportedSharing map[string]bool `json:"supportedSharing"` + Builder XdsBuilderConfig `json:"builder"` +} + +// XdsBuilderConfig represents the builder container configuration +type XdsBuilderConfig struct { + IP string `json:"ip"` + Port string `json:"port"` + SyncThingID string `json:"syncThingID"` +} + +// XdsFolderType XdsServer folder type +type XdsFolderType string + +const ( + XdsTypePathMap = "PathMap" + XdsTypeCloudSync = "CloudSync" + XdsTypeCifsSmb = "CIFS" +) + +// XdsFolderConfig XdsServer folder config +type XdsFolderConfig struct { + ID string `json:"id"` + Label string `json:"label"` + ClientPath string `json:"path"` + Type XdsFolderType `json:"type"` + Status string `json:"status"` + IsInSync bool `json:"isInSync"` + DefaultSdk string `json:"defaultSdk"` + // Specific data depending on which Type is used + DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"` + DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"` +} + +// XdsPathMapConfig Path mapping specific data +type XdsPathMapConfig struct { + ServerPath string `json:"serverPath"` + CheckFile string `json:"checkFile"` + CheckContent string `json:"checkContent"` +} + +// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data +type XdsCloudSyncConfig struct { + SyncThingID string `json:"syncThingID"` + STSvrStatus string `json:"-"` + STSvrIsInSync bool `json:"-"` + STLocStatus string `json:"-"` + STLocIsInSync bool `json:"-"` +} + +// XdsEventRegisterArgs arguments used to register to XDS server events +type XdsEventRegisterArgs struct { + Name string `json:"name"` + ProjectID string `json:"filterProjectID"` +} + +// XdsEventFolderChange Folder change event structure +type XdsEventFolderChange struct { + Time string `json:"time"` + Type string `json:"type"` + Folder XdsFolderConfig `json:"folder"` +} + +// caller Used to chain event listeners +type caller struct { + id uuid.UUID + EventName string + Func func(interface{}) +} + +const _IDTempoPrefix = "tempo-" + +// NewXdsServer creates an instance of XdsServer +func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { + return &XdsServer{ + Context: ctx, + ID: _IDTempoPrefix + uuid.NewV1().String(), + BaseURL: conf.URL, + APIURL: conf.APIBaseURL + conf.APIPartialURL, + PartialURL: conf.APIPartialURL, + ConnRetry: conf.ConnRetry, + Connected: false, + Disabled: false, + + sockEvents: make(map[string][]*caller), + sockEventsLock: &sync.Mutex{}, + logOut: ctx.Log.Out, + } +} + +// Close Free and close XDS Server connection +func (xs *XdsServer) Close() error { + xs.Connected = false + xs.Disabled = true + xs.ioSock = nil + xs._NotifyState() + return nil +} + +// Connect Establish HTTP connection with XDS Server +func (xs *XdsServer) Connect() error { + var err error + var retry int + + xs.Disabled = false + xs.Connected = false + + err = nil + for retry = xs.ConnRetry; retry > 0; retry-- { + if err = xs._CreateConnectHTTP(); err == nil { + break + } + if retry == xs.ConnRetry { + // Notify only on the first conn error + // doing that avoid 2 notifs (conn false; conn true) on startup + xs._NotifyState() + } + xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry) + time.Sleep(time.Second) + } + if retry == 0 { + // FIXME: re-use _reconnect to wait longer in background + return fmt.Errorf("Connection to XDS Server failure") + } + if err != nil { + return err + } + + // Check HTTP connection and establish WS connection + err = xs._connect(false) + + return err +} + +// IsTempoID returns true when server as a temporary id +func (xs *XdsServer) IsTempoID() bool { + return strings.HasPrefix(xs.ID, _IDTempoPrefix) +} + +// SetLoggerOutput Set logger ou +func (xs *XdsServer) SetLoggerOutput(out io.Writer) { + xs.logOut = out +} + +// SendCommand Send a command to XDS Server +func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) { + url := cmd + if !strings.HasPrefix("/", cmd) { + url = "/" + cmd + } + return xs.client.HTTPPostWithRes(url, string(body)) +} + +// GetVersion Send Get request to retrieve XDS Server version +func (xs *XdsServer) GetVersion(res interface{}) error { + return xs._HTTPGet("/version", &res) +} + +// GetFolders Send GET request to get current folder configuration +func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error { + return xs._HTTPGet("/folders", folders) +} + +// FolderAdd Send POST request to add a folder +func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error { + response, err := xs._HTTPPost("/folder", fld) + if err != nil { + return err + } + if response.StatusCode != 200 { + return fmt.Errorf("FolderAdd error status=%s", response.Status) + } + // Result is a XdsFolderConfig that is equivalent to ProjectConfig + err = json.Unmarshal(xs.client.ResponseToBArray(response), res) + + return err +} + +// FolderDelete Send DELETE request to delete a folder +func (xs *XdsServer) FolderDelete(id string) error { + return xs.client.HTTPDelete("/folder/" + id) +} + +// FolderSync Send POST request to force synchronization of a folder +func (xs *XdsServer) FolderSync(id string) error { + return xs.client.HTTPPost("/folder/sync/"+id, "") +} + +// SetAPIRouterGroup . +func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) { + xs.apiRouter = r +} + +// PassthroughGet Used to declare a route that sends directly a GET request to XDS Server +func (xs *XdsServer) PassthroughGet(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.GET(url, func(c *gin.Context) { + var data interface{} + if err := xs._HTTPGet(url, &data); err != nil { + if strings.Contains(err.Error(), "connection refused") { + xs.Connected = false + xs._NotifyState() + } + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, data) + }) +} + +// PassthroughPost Used to declare a route that sends directly a POST request to XDS Server +func (xs *XdsServer) PassthroughPost(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.POST(url, func(c *gin.Context) { + bodyReq := []byte{} + n, err := c.Request.Body.Read(bodyReq) + if err != nil { + common.APIError(c, err.Error()) + return + } + + response, err := xs._HTTPPost(url, bodyReq[:n]) + if err != nil { + common.APIError(c, err.Error()) + return + } + bodyRes, err := ioutil.ReadAll(response.Body) + if err != nil { + common.APIError(c, "Cannot read response body") + return + } + c.JSON(http.StatusOK, string(bodyRes)) + }) +} + +// EventRegister Post a request to register to an XdsServer event +func (xs *XdsServer) EventRegister(evName string, id string) error { + var err error + _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{ + Name: evName, + ProjectID: id, + }) + return err +} + +// EventOn Register a callback on events reception +func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) { + if xs.ioSock == nil { + return uuid.Nil, fmt.Errorf("Io.Socket not initialized") + } + + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + + if _, exist := xs.sockEvents[evName]; !exist { + // Register listener only the first time + evn := evName + + // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange + var err error + if evName == "event:FolderStateChanged" { + err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error { + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + for _, c := range xs.sockEvents[evn] { + c.Func(data) + } + return nil + }) + } else { + err = xs.ioSock.On(evn, f) + } + if err != nil { + return uuid.Nil, err + } + } + + c := &caller{ + id: uuid.NewV1(), + EventName: evName, + Func: f, + } + + xs.sockEvents[evName] = append(xs.sockEvents[evName], c) + return c.id, nil +} + +// EventOff Un-register a (or all) callbacks associated to an event +func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error { + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + if _, exist := xs.sockEvents[evName]; exist { + if id == uuid.Nil { + // Un-register all + xs.sockEvents[evName] = []*caller{} + } else { + // Un-register only the specified callback + for i, ff := range xs.sockEvents[evName] { + if uuid.Equal(ff.id, id) { + xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...) + break + } + } + } + } + return nil +} + +// ProjectToFolder Convert Project structure to Folder structure +func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig { + stID := "" + if pPrj.Type == XdsTypeCloudSync { + stID, _ = xs.SThg.IDGet() + } + fPrj := XdsFolderConfig{ + ID: pPrj.ID, + Label: pPrj.Label, + ClientPath: pPrj.ClientPath, + Type: XdsFolderType(pPrj.Type), + Status: pPrj.Status, + IsInSync: pPrj.IsInSync, + DefaultSdk: pPrj.DefaultSdk, + DataPathMap: XdsPathMapConfig{ + ServerPath: pPrj.ServerPath, + }, + DataCloudSync: XdsCloudSyncConfig{ + SyncThingID: stID, + STLocIsInSync: pPrj.IsInSync, + STLocStatus: pPrj.Status, + STSvrIsInSync: pPrj.IsInSync, + STSvrStatus: pPrj.Status, + }, + } + + return &fPrj +} + +// FolderToProject Convert Folder structure to Project structure +func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) ProjectConfig { + inSync := fPrj.IsInSync + sts := fPrj.Status + + if fPrj.Type == XdsTypeCloudSync { + inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync + + sts = fPrj.DataCloudSync.STSvrStatus + switch fPrj.DataCloudSync.STLocStatus { + case StatusErrorConfig, StatusDisable, StatusPause: + sts = fPrj.DataCloudSync.STLocStatus + break + case StatusSyncing: + if sts != StatusErrorConfig && sts != StatusDisable && sts != StatusPause { + sts = StatusSyncing + } + break + case StatusEnable: + // keep STSvrStatus + break + } + } + + pPrj := ProjectConfig{ + ID: fPrj.ID, + ServerID: xs.ID, + Label: fPrj.Label, + ClientPath: fPrj.ClientPath, + ServerPath: fPrj.DataPathMap.ServerPath, + Type: ProjectType(fPrj.Type), + Status: sts, + IsInSync: inSync, + DefaultSdk: fPrj.DefaultSdk, + } + return pPrj +} + +/*** +** Private functions +***/ + +// Create HTTP client +func (xs *XdsServer) _CreateConnectHTTP() error { + var err error + xs.client, err = common.HTTPNewClient(xs.BaseURL, + common.HTTPClientConfig{ + URLPrefix: "/api/v1", + HeaderClientKeyName: "Xds-Sid", + CsrfDisable: true, + LogOut: xs.logOut, + LogPrefix: "XDSSERVER: ", + LogLevel: common.HTTPLogLevelWarning, + }) + + xs.client.SetLogLevel(xs.Log.Level.String()) + + if err != nil { + msg := ": " + err.Error() + if strings.Contains(err.Error(), "connection refused") { + msg = fmt.Sprintf("(url: %s)", xs.BaseURL) + } + return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg) + } + if xs.client == nil { + return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)") + } + + return nil +} + +// _HTTPGet . +func (xs *XdsServer) _HTTPGet(url string, data interface{}) error { + var dd []byte + if err := xs.client.HTTPGet(url, &dd); err != nil { + return err + } + return json.Unmarshal(dd, &data) +} + +// _HTTPPost . +func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) { + body, err := json.Marshal(data) + if err != nil { + return nil, err + } + return xs.client.HTTPPostWithRes(url, string(body)) +} + +// Re-established connection +func (xs *XdsServer) _reconnect() error { + err := xs._connect(true) + if err == nil { + // Reload projects list for this server + err = xs.projects.Init(xs) + } + return err +} + +// Established HTTP and WS connection and retrieve XDSServer config +func (xs *XdsServer) _connect(reConn bool) error { + + xdsCfg := XdsServerConfig{} + if err := xs._HTTPGet("/config", &xdsCfg); err != nil { + xs.Connected = false + if !reConn { + xs._NotifyState() + } + return err + } + + if reConn && xs.ID != xdsCfg.ID { + xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID) + } + + // Update local XDS config + xs.ID = xdsCfg.ID + xs.ServerConfig = &xdsCfg + + // Establish WS connection and register listen + if err := xs._SocketConnect(); err != nil { + xs.Connected = false + xs._NotifyState() + return err + } + + xs.Connected = true + xs._NotifyState() + return nil +} + +// Create WebSocket (io.socket) connection +func (xs *XdsServer) _SocketConnect() error { + + xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL) + + opts := &sio_client.Options{ + Transport: "websocket", + Header: make(map[string][]string), + } + opts.Header["XDS-SID"] = []string{xs.client.GetClientID()} + + iosk, err := sio_client.NewClient(xs.BaseURL, opts) + if err != nil { + return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err) + } + xs.ioSock = iosk + + // Register some listeners + + iosk.On("error", func(err error) { + xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err) + if xs.CBOnError != nil { + xs.CBOnError(err) + } + }) + + iosk.On("disconnection", func(err error) { + xs.Log.Infof("IO.socket disconnection server %s", xs.ID) + if xs.CBOnDisconnect != nil { + xs.CBOnDisconnect(err) + } + xs.Connected = false + xs._NotifyState() + + // Try to reconnect during 15min (or at least while not disabled) + go func() { + count := 0 + waitTime := 1 + for !xs.Disabled && !xs.Connected { + count++ + if count%60 == 0 { + waitTime *= 5 + } + if waitTime > 15*60 { + xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID) + return + } + time.Sleep(time.Second * time.Duration(waitTime)) + xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count) + + xs._reconnect() + } + }() + }) + + // XXX - There is no connection event generated so, just consider that + // we are connected when NewClient return successfully + /* iosk.On("connection", func() { ... }) */ + xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID) + + return nil +} + +// Send event to notify changes +func (xs *XdsServer) _NotifyState() { + + evSts := ServerCfg{ + ID: xs.ID, + URL: xs.BaseURL, + APIURL: xs.APIURL, + PartialURL: xs.PartialURL, + ConnRetry: xs.ConnRetry, + Connected: xs.Connected, + } + if err := xs.events.Emit(EVTServerConfig, evSts); err != nil { + xs.Log.Warningf("Cannot notify XdsServer state change: %v", err) + } +} |