From 97ca1f277dc8b6973d6fa67add5593a9c395ce60 Mon Sep 17 00:00:00 2001 From: Sebastien Douheret Date: Mon, 25 Sep 2017 14:15:16 +0200 Subject: Added webapp Dashboard + logic to interact with server. Signed-off-by: Sebastien Douheret --- lib/agent/agent.go | 132 +++++++++++- lib/agent/apiv1-browse.go | 28 +++ lib/agent/apiv1-config.go | 108 ++++++++++ lib/agent/apiv1-events.go | 73 +++++++ lib/agent/apiv1-exec.go | 99 +++++++++ lib/agent/apiv1-projects.go | 72 +++++++ lib/agent/apiv1-version.go | 45 ++++ lib/agent/apiv1.go | 129 +++++++++++ lib/agent/events.go | 131 ++++++++++++ lib/agent/project-interface.go | 47 ++++ lib/agent/project-pathmap.go | 79 +++++++ lib/agent/project-st.go | 93 ++++++++ lib/agent/projects.go | 254 ++++++++++++++++++++++ lib/agent/session.go | 224 +++++++++++++++++++ lib/agent/webserver.go | 246 +++++++++++++++++++++ lib/agent/xdsserver.go | 472 +++++++++++++++++++++++++++++++++++++++++ lib/apiv1/apiv1.go | 36 ---- lib/apiv1/config.go | 45 ---- lib/apiv1/version.go | 24 --- lib/session/session.go | 227 -------------------- lib/syncthing/st.go | 95 +++++++-- lib/syncthing/stEvent.go | 265 +++++++++++++++++++++++ lib/syncthing/stfolder.go | 103 +++++++-- lib/webserver/server.go | 226 -------------------- lib/xdsconfig/config.go | 78 +++++-- lib/xdsconfig/configfile.go | 112 ++++++++++ lib/xdsconfig/fileconfig.go | 111 ---------- 27 files changed, 2837 insertions(+), 717 deletions(-) create mode 100644 lib/agent/apiv1-browse.go create mode 100644 lib/agent/apiv1-config.go create mode 100644 lib/agent/apiv1-events.go create mode 100644 lib/agent/apiv1-exec.go create mode 100644 lib/agent/apiv1-projects.go create mode 100644 lib/agent/apiv1-version.go create mode 100644 lib/agent/apiv1.go create mode 100644 lib/agent/events.go create mode 100644 lib/agent/project-interface.go create mode 100644 lib/agent/project-pathmap.go create mode 100644 lib/agent/project-st.go create mode 100644 lib/agent/projects.go create mode 100644 lib/agent/session.go create mode 100644 lib/agent/webserver.go create mode 100644 lib/agent/xdsserver.go delete mode 100644 lib/apiv1/apiv1.go delete mode 100644 lib/apiv1/config.go delete mode 100644 lib/apiv1/version.go delete mode 100644 lib/session/session.go create mode 100644 lib/syncthing/stEvent.go delete mode 100644 lib/webserver/server.go create mode 100644 lib/xdsconfig/configfile.go delete mode 100644 lib/xdsconfig/fileconfig.go (limited to 'lib') diff --git a/lib/agent/agent.go b/lib/agent/agent.go index 74872f7..29b0622 100644 --- a/lib/agent/agent.go +++ b/lib/agent/agent.go @@ -2,18 +2,22 @@ package agent import ( "fmt" + "log" "os" "os/exec" "os/signal" + "path/filepath" "syscall" + "time" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" "github.com/iotbzh/xds-agent/lib/syncthing" "github.com/iotbzh/xds-agent/lib/xdsconfig" - "github.com/iotbzh/xds-agent/lib/webserver" ) +const cookieMaxAge = "3600" + // Context holds the Agent context structure type Context struct { ProgName string @@ -22,8 +26,14 @@ type Context struct { SThg *st.SyncThing SThgCmd *exec.Cmd SThgInotCmd *exec.Cmd - WWWServer *webserver.ServerService - Exit chan os.Signal + + webServer *WebServer + xdsServers map[string]*XdsServer + sessions *Sessions + events *Events + projects *Projects + + Exit chan os.Signal } // NewAgent Create a new instance of Agent @@ -48,6 +58,10 @@ func NewAgent(cliCtx *cli.Context) *Context { ProgName: cliCtx.App.Name, Log: log, Exit: make(chan os.Signal, 1), + + webServer: nil, + xdsServers: make(map[string]*XdsServer), + events: nil, } // register handler on SIGTERM / exit @@ -57,6 +71,114 @@ func NewAgent(cliCtx *cli.Context) *Context { return &ctx } +// Run Main function called to run agent +func (ctx *Context) Run() (int, error) { + var err error + + // Logs redirected into a file when logfile option or logsDir config is set + ctx.Config.LogVerboseOut = os.Stderr + if ctx.Config.FileConf.LogsDir != "" { + if ctx.Config.Options.LogFile != "stdout" { + logFile := ctx.Config.Options.LogFile + + fdL, err := os.OpenFile(logFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666) + if err != nil { + msgErr := fmt.Errorf("Cannot create log file %s", logFile) + return int(syscall.EPERM), msgErr + } + ctx.Log.Out = fdL + + ctx._logPrint("Logging file: %s\n", logFile) + } + + logFileHTTPReq := filepath.Join(ctx.Config.FileConf.LogsDir, "xds-agent-verbose.log") + fdLH, err := os.OpenFile(logFileHTTPReq, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666) + if err != nil { + msgErr := fmt.Errorf("Cannot create log file %s", logFileHTTPReq) + return int(syscall.EPERM), msgErr + } + ctx.Config.LogVerboseOut = fdLH + + ctx._logPrint("Logging file for HTTP requests: %s\n", logFileHTTPReq) + } + + // Create syncthing instance when section "syncthing" is present in config.json + if ctx.Config.FileConf.SThgConf != nil { + ctx.SThg = st.NewSyncThing(ctx.Config, ctx.Log) + } + + // Start local instance of Syncthing and Syncthing-notify + if ctx.SThg != nil { + ctx.Log.Infof("Starting Syncthing...") + ctx.SThgCmd, err = ctx.SThg.Start() + if err != nil { + return 2, err + } + fmt.Printf("Syncthing started (PID %d)\n", ctx.SThgCmd.Process.Pid) + + ctx.Log.Infof("Starting Syncthing-inotify...") + ctx.SThgInotCmd, err = ctx.SThg.StartInotify() + if err != nil { + return 2, err + } + fmt.Printf("Syncthing-inotify started (PID %d)\n", ctx.SThgInotCmd.Process.Pid) + + // Establish connection with local Syncthing (retry if connection fail) + time.Sleep(3 * time.Second) + maxRetry := 30 + retry := maxRetry + for retry > 0 { + if err := ctx.SThg.Connect(); err == nil { + break + } + ctx.Log.Infof("Establishing connection to Syncthing (retry %d/%d)", retry, maxRetry) + time.Sleep(time.Second) + retry-- + } + if err != nil || retry == 0 { + return 2, err + } + + // Retrieve Syncthing config + id, err := ctx.SThg.IDGet() + if err != nil { + return 2, err + } + ctx.Log.Infof("Local Syncthing ID: %s", id) + + } else { + ctx.Log.Infof("Cloud Sync / Syncthing not supported") + } + + // Create Web Server + ctx.webServer = NewWebServer(ctx) + + // Sessions manager + ctx.sessions = NewClientSessions(ctx, cookieMaxAge) + + // Create events management + ctx.events = NewEvents(ctx) + + // Create projects management + ctx.projects = NewProjects(ctx, ctx.SThg) + + // Run Web Server until exit requested (blocking call) + if err = ctx.webServer.Serve(); err != nil { + log.Println(err) + return 3, err + } + + return 4, fmt.Errorf("Program exited") +} + +// Helper function to log message on both stdout and logger +func (ctx *Context) _logPrint(format string, args ...interface{}) { + fmt.Printf(format, args...) + if ctx.Log.Out != os.Stdout { + ctx.Log.Infof(format, args...) + } +} + // Handle exit and properly stop/close all stuff func handlerSigTerm(ctx *Context) { <-ctx.Exit @@ -68,9 +190,9 @@ func handlerSigTerm(ctx *Context) { ctx.SThg.Stop() ctx.SThg.StopInotify() } - if ctx.WWWServer != nil { + if ctx.webServer != nil { ctx.Log.Infof("Stoping Web server...") - ctx.WWWServer.Stop() + ctx.webServer.Stop() } os.Exit(1) } diff --git a/lib/agent/apiv1-browse.go b/lib/agent/apiv1-browse.go new file mode 100644 index 0000000..1701a2e --- /dev/null +++ b/lib/agent/apiv1-browse.go @@ -0,0 +1,28 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" +) + +type directory struct { + Name string `json:"name"` + Fullpath string `json:"fullpath"` +} + +type apiDirectory struct { + Dir []directory `json:"dir"` +} + +// browseFS used to browse local file system +func (s *APIService) browseFS(c *gin.Context) { + + response := apiDirectory{ + Dir: []directory{ + directory{Name: "TODO SEB"}, + }, + } + + c.JSON(http.StatusOK, response) +} diff --git a/lib/agent/apiv1-config.go b/lib/agent/apiv1-config.go new file mode 100644 index 0000000..31d8de6 --- /dev/null +++ b/lib/agent/apiv1-config.go @@ -0,0 +1,108 @@ +package agent + +import ( + "net/http" + "sync" + + "github.com/gin-gonic/gin" + "github.com/iotbzh/xds-agent/lib/xdsconfig" + common "github.com/iotbzh/xds-common/golib" +) + +var confMut sync.Mutex + +// APIConfig parameters (json format) of /config command +type APIConfig struct { + Servers []ServerCfg `json:"servers"` + + // Not exposed outside in JSON + Version string `json:"-"` + APIVersion string `json:"-"` + VersionGitTag string `json:"-"` +} + +// ServerCfg . +type ServerCfg struct { + ID string `json:"id"` + URL string `json:"url"` + APIURL string `json:"apiUrl"` + PartialURL string `json:"partialUrl"` + ConnRetry int `json:"connRetry"` + Connected bool `json:"connected"` + Disabled bool `json:"disabled"` +} + +// GetConfig returns the configuration +func (s *APIService) getConfig(c *gin.Context) { + confMut.Lock() + defer confMut.Unlock() + + cfg := s._getConfig() + + c.JSON(http.StatusOK, cfg) +} + +// SetConfig sets configuration +func (s *APIService) setConfig(c *gin.Context) { + var cfgArg APIConfig + if c.BindJSON(&cfgArg) != nil { + common.APIError(c, "Invalid arguments") + return + } + + confMut.Lock() + defer confMut.Unlock() + + s.Log.Debugln("SET config: ", cfgArg) + + // First delete/disable XDS Server that are no longer listed + for _, svr := range s.xdsServers { + found := false + for _, svrArg := range cfgArg.Servers { + if svr.ID == svrArg.ID { + found = true + break + } + } + if !found { + s.DelXdsServer(svr.ID) + } + } + + // Add new XDS Server + for _, svr := range cfgArg.Servers { + cfg := xdsconfig.XDSServerConf{ + ID: svr.ID, + URL: svr.URL, + ConnRetry: svr.ConnRetry, + } + if _, err := s.AddXdsServer(cfg); err != nil { + common.APIError(c, err.Error()) + return + } + } + + c.JSON(http.StatusOK, s._getConfig()) +} + +func (s *APIService) _getConfig() APIConfig { + cfg := APIConfig{ + Version: s.Config.Version, + APIVersion: s.Config.APIVersion, + VersionGitTag: s.Config.VersionGitTag, + Servers: []ServerCfg{}, + } + + for _, svr := range s.xdsServers { + cfg.Servers = append(cfg.Servers, ServerCfg{ + ID: svr.ID, + URL: svr.BaseURL, + APIURL: svr.APIURL, + PartialURL: svr.PartialURL, + ConnRetry: svr.ConnRetry, + Connected: svr.Connected, + Disabled: svr.Disabled, + }) + } + return cfg +} diff --git a/lib/agent/apiv1-events.go b/lib/agent/apiv1-events.go new file mode 100644 index 0000000..8aad18a --- /dev/null +++ b/lib/agent/apiv1-events.go @@ -0,0 +1,73 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +// EventRegisterArgs is the parameters (json format) of /events/register command +type EventRegisterArgs struct { + Name string `json:"name"` + ProjectID string `json:"filterProjectID"` +} + +// EventUnRegisterArgs is the parameters (json format) of /events/unregister command +type EventUnRegisterArgs struct { + Name string `json:"name"` + ID int `json:"id"` +} + +// eventsList Registering for events that will be send over a WS +func (s *APIService) eventsList(c *gin.Context) { + c.JSON(http.StatusOK, s.events.GetList()) +} + +// eventsRegister Registering for events that will be send over a WS +func (s *APIService) eventsRegister(c *gin.Context) { + var args EventRegisterArgs + + if c.BindJSON(&args) != nil || args.Name == "" { + common.APIError(c, "Invalid arguments") + return + } + + sess := s.webServer.sessions.Get(c) + if sess == nil { + common.APIError(c, "Unknown sessions") + return + } + + // Register to all or to a specific events + if err := s.events.Register(args.Name, sess.ID); err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{"status": "OK"}) +} + +// eventsRegister Registering for events that will be send over a WS +func (s *APIService) eventsUnRegister(c *gin.Context) { + var args EventUnRegisterArgs + + if c.BindJSON(&args) != nil || args.Name == "" { + common.APIError(c, "Invalid arguments") + return + } + + sess := s.webServer.sessions.Get(c) + if sess == nil { + common.APIError(c, "Unknown sessions") + return + } + + // Register to all or to a specific events + if err := s.events.UnRegister(args.Name, sess.ID); err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{"status": "OK"}) +} diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go new file mode 100644 index 0000000..83ec7aa --- /dev/null +++ b/lib/agent/apiv1-exec.go @@ -0,0 +1,99 @@ +package agent + +import ( + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +// Only define useful fields +type ExecArgs struct { + ID string `json:"id" binding:"required"` +} + +// ExecCmd executes remotely a command +func (s *APIService) execCmd(c *gin.Context) { + s._execRequest("/exec", c) +} + +// execSignalCmd executes remotely a command +func (s *APIService) execSignalCmd(c *gin.Context) { + s._execRequest("/signal", c) +} + +func (s *APIService) _execRequest(url string, c *gin.Context) { + data, err := c.GetRawData() + if err != nil { + common.APIError(c, err.Error()) + } + + // First get Project ID to retrieve Server ID and send command to right server + id := c.Param("id") + if id == "" { + args := ExecArgs{} + // XXX - we cannot use c.BindJSON, so directly unmarshall it + // (see https://github.com/gin-gonic/gin/issues/1078) + if err := json.Unmarshal(data, &args); err != nil { + common.APIError(c, "Invalid arguments") + return + } + id = args.ID + } + prj := s.projects.Get(id) + if prj == nil { + common.APIError(c, "Unknown id") + return + } + + svr := (*prj).GetServer() + if svr == nil { + common.APIError(c, "Cannot identify XDS Server") + return + } + + // Retrieve session info + sess := s.sessions.Get(c) + if sess == nil { + common.APIError(c, "Unknown sessions") + return + } + sock := sess.IOSocket + if sock == nil { + common.APIError(c, "Websocket not established") + return + } + + // Forward XDSServer WS events to client WS + // TODO removed static event name list and get it from XDSServer + for _, evName := range []string{ + "exec:input", + "exec:output", + "exec:exit", + "exec:inferior-input", + "exec:inferior-output", + } { + evN := evName + svr.EventOn(evN, func(evData interface{}) { + (*sock).Emit(evN, evData) + }) + } + + // Forward back command to right server + response, err := svr.HTTPPostBody(url, string(data)) + if err != nil { + common.APIError(c, err.Error()) + return + } + + // Decode response + body, err := ioutil.ReadAll(response.Body) + if err != nil { + common.APIError(c, "Cannot read response body") + return + } + c.JSON(http.StatusOK, string(body)) + +} diff --git a/lib/agent/apiv1-projects.go b/lib/agent/apiv1-projects.go new file mode 100644 index 0000000..d4b5e74 --- /dev/null +++ b/lib/agent/apiv1-projects.go @@ -0,0 +1,72 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +// getProjects returns all projects configuration +func (s *APIService) getProjects(c *gin.Context) { + c.JSON(http.StatusOK, s.projects.GetProjectArr()) +} + +// getProject returns a specific project configuration +func (s *APIService) getProject(c *gin.Context) { + prj := s.projects.Get(c.Param("id")) + if prj == nil { + common.APIError(c, "Invalid id") + return + } + + c.JSON(http.StatusOK, (*prj).GetProject()) +} + +// addProject adds a new project to server config +func (s *APIService) addProject(c *gin.Context) { + var cfgArg ProjectConfig + if c.BindJSON(&cfgArg) != nil { + common.APIError(c, "Invalid arguments") + return + } + + s.Log.Debugln("Add project config: ", cfgArg) + + newFld, err := s.projects.Add(cfgArg) + if err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, newFld) +} + +// syncProject force synchronization of project files +func (s *APIService) syncProject(c *gin.Context) { + id := c.Param("id") + + s.Log.Debugln("Sync project id: ", id) + + err := s.projects.ForceSync(id) + if err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, "") +} + +// delProject deletes project from server config +func (s *APIService) delProject(c *gin.Context) { + id := c.Param("id") + + s.Log.Debugln("Delete project id ", id) + + delEntry, err := s.projects.Delete(id) + if err != nil { + common.APIError(c, err.Error()) + return + } + c.JSON(http.StatusOK, delEntry) +} diff --git a/lib/agent/apiv1-version.go b/lib/agent/apiv1-version.go new file mode 100644 index 0000000..c2387c1 --- /dev/null +++ b/lib/agent/apiv1-version.go @@ -0,0 +1,45 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +type version struct { + ID string `json:"id"` + Version string `json:"version"` + APIVersion string `json:"apiVersion"` + VersionGitTag string `json:"gitTag"` +} + +type apiVersion struct { + Client version `json:"client"` + Server []version `json:"servers"` +} + +// getInfo : return various information about server +func (s *APIService) getVersion(c *gin.Context) { + response := apiVersion{ + Client: version{ + ID: "", + Version: s.Config.Version, + APIVersion: s.Config.APIVersion, + VersionGitTag: s.Config.VersionGitTag, + }, + } + + svrVer := []version{} + for _, svr := range s.xdsServers { + res := version{} + if err := svr.HTTPGet("/version", &res); err != nil { + common.APIError(c, "Cannot retrieve version of XDS server ID %s : %v", svr.ID, err.Error()) + return + } + svrVer = append(svrVer, res) + } + response.Server = svrVer + + c.JSON(http.StatusOK, response) +} diff --git a/lib/agent/apiv1.go b/lib/agent/apiv1.go new file mode 100644 index 0000000..77b05ba --- /dev/null +++ b/lib/agent/apiv1.go @@ -0,0 +1,129 @@ +package agent + +import ( + "fmt" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/iotbzh/xds-agent/lib/xdsconfig" +) + +const apiBaseUrl = "/api/v1" + +// APIService . +type APIService struct { + *Context + apiRouter *gin.RouterGroup + serverIndex int +} + +// NewAPIV1 creates a new instance of API service +func NewAPIV1(ctx *Context) *APIService { + s := &APIService{ + Context: ctx, + apiRouter: ctx.webServer.router.Group(apiBaseUrl), + serverIndex: 0, + } + + s.apiRouter.GET("/version", s.getVersion) + + s.apiRouter.GET("/config", s.getConfig) + s.apiRouter.POST("/config", s.setConfig) + + s.apiRouter.GET("/browse", s.browseFS) + + s.apiRouter.GET("/projects", s.getProjects) + s.apiRouter.GET("/project/:id", s.getProject) + s.apiRouter.POST("/project", s.addProject) + s.apiRouter.POST("/project/sync/:id", s.syncProject) + s.apiRouter.DELETE("/project/:id", s.delProject) + + s.apiRouter.POST("/exec", s.execCmd) + s.apiRouter.POST("/exec/:id", s.execCmd) + s.apiRouter.POST("/signal", s.execSignalCmd) + + s.apiRouter.GET("/events", s.eventsList) + s.apiRouter.POST("/events/register", s.eventsRegister) + s.apiRouter.POST("/events/unregister", s.eventsUnRegister) + + return s +} + +// Stop Used to stop/close created services +func (s *APIService) Stop() { + for _, svr := range s.xdsServers { + svr.Close() + } +} + +// AddXdsServer Add a new XDS Server to the list of a server +func (s *APIService) AddXdsServer(cfg xdsconfig.XDSServerConf) (*XdsServer, error) { + var svr *XdsServer + var exist, tempoID bool + tempoID = false + + // First check if not already exist and update it + if svr, exist = s.xdsServers[cfg.ID]; exist { + + // Update: Found, so just update some settings + svr.ConnRetry = cfg.ConnRetry + + tempoID = svr.IsTempoID() + if svr.Connected && !svr.Disabled && svr.BaseURL == cfg.URL && tempoID { + return svr, nil + } + + // URL differ or not connected, so need to reconnect + svr.BaseURL = cfg.URL + + } else { + + // Create a new server object + if cfg.APIBaseURL == "" { + cfg.APIBaseURL = apiBaseUrl + } + if cfg.APIPartialURL == "" { + cfg.APIPartialURL = "/server/" + strconv.Itoa(s.serverIndex) + s.serverIndex = s.serverIndex + 1 + } + + // Create a new XDS Server + svr = NewXdsServer(s.Context, cfg) + + svr.SetLoggerOutput(s.Config.LogVerboseOut) + + // Passthrough routes (handle by XDS Server) + grp := s.apiRouter.Group(svr.PartialURL) + svr.SetAPIRouterGroup(grp) + svr.PassthroughGet("/sdks") + svr.PassthroughGet("/sdk/:id") + } + + // Established connection + err := svr.Connect() + + // Delete temporary ID with it has been replaced by right Server ID + if tempoID && !svr.IsTempoID() { + delete(s.xdsServers, cfg.ID) + } + + // Add to map + s.xdsServers[svr.ID] = svr + + // Load projects + if err == nil && svr.Connected { + err = s.projects.Init(svr) + } + + return svr, err +} + +// DelXdsServer Delete an XDS Server from the list of a server +func (s *APIService) DelXdsServer(id string) error { + if _, exist := s.xdsServers[id]; !exist { + return fmt.Errorf("Unknown Server ID %s", id) + } + // Don't really delete, just disable it + s.xdsServers[id].Close() + return nil +} diff --git a/lib/agent/events.go b/lib/agent/events.go new file mode 100644 index 0000000..24efc5a --- /dev/null +++ b/lib/agent/events.go @@ -0,0 +1,131 @@ +package agent + +import ( + "fmt" + "time" +) + +// Events constants +const ( + // EventTypePrefix Used as event prefix + EventTypePrefix = "event:" // following by event type + + // Supported Events type + EVTAll = "all" + EVTServerConfig = "server-config" // data type ServerCfg + EVTProjectAdd = "project-add" // data type ProjectConfig + EVTProjectDelete = "project-delete" // data type ProjectConfig + EVTProjectChange = "project-state-change" // data type ProjectConfig +) + +var EVTAllList = []string{ + EVTServerConfig, + EVTProjectAdd, + EVTProjectDelete, + EVTProjectChange, +} + +// EventMsg Message send +type EventMsg struct { + Time string `json:"time"` + Type string `json:"type"` + Data interface{} `json:"data"` +} + +type EventDef struct { + // SEB cbs []EventsCB + sids map[string]int +} + +type Events struct { + *Context + eventsMap map[string]*EventDef +} + +// NewEvents creates an instance of Events +func NewEvents(ctx *Context) *Events { + evMap := make(map[string]*EventDef) + for _, ev := range EVTAllList { + evMap[ev] = &EventDef{ + sids: make(map[string]int), + } + } + return &Events{ + Context: ctx, + eventsMap: evMap, + } +} + +// GetList returns the list of all supported events +func (e *Events) GetList() []string { + return EVTAllList +} + +// Register Used by a client/session to register to a specific (or all) event(s) +func (e *Events) Register(evName, sessionID string) error { + evs := EVTAllList + if evName != EVTAll { + if _, ok := e.eventsMap[evName]; !ok { + return fmt.Errorf("Unsupported event type name") + } + evs = []string{evName} + } + for _, ev := range evs { + e.eventsMap[ev].sids[sessionID]++ + } + return nil +} + +// UnRegister Used by a client/session to unregister event(s) +func (e *Events) UnRegister(evName, sessionID string) error { + evs := EVTAllList + if evName != EVTAll { + if _, ok := e.eventsMap[evName]; !ok { + return fmt.Errorf("Unsupported event type name") + } + evs = []string{evName} + } + for _, ev := range evs { + if _, exist := e.eventsMap[ev].sids[sessionID]; exist { + delete(e.eventsMap[ev].sids, sessionID) + break + } + } + return nil +} + +// Emit Used to manually emit an event +func (e *Events) Emit(evName string, data interface{}) error { + var firstErr error + + if _, ok := e.eventsMap[evName]; !ok { + return fmt.Errorf("Unsupported event type") + } + + e.Log.Debugf("Emit Event %s: %v", evName, data) + + firstErr = nil + evm := e.eventsMap[evName] + for sid := range evm.sids { + so := e.webServer.sessions.IOSocketGet(sid) + if so == nil { + if firstErr == nil { + firstErr = fmt.Errorf("IOSocketGet return nil") + } + continue + } + msg := EventMsg{ + Time: time.Now().String(), + Type: evName, + Data: data, + } + if err := (*so).Emit(EventTypePrefix+evName, msg); err != nil { + e.Log.Errorf("WS Emit %v error : %v", EventTypePrefix+evName, err) + if firstErr == nil { + firstErr = err + } + } + } + + return firstErr +} diff --git a/lib/agent/project-interface.go b/lib/agent/project-interface.go new file mode 100644 index 0000000..031e1d9 --- /dev/null +++ b/lib/agent/project-interface.go @@ -0,0 +1,47 @@ +package agent + +// ProjectType definition +type ProjectType string + +const ( + TypePathMap = "PathMap" + TypeCloudSync = "CloudSync" + TypeCifsSmb = "CIFS" +) + +// Project Status definition +const ( + StatusErrorConfig = "ErrorConfig" + StatusDisable = "Disable" + StatusEnable = "Enable" + StatusPause = "Pause" + StatusSyncing = "Syncing" +) + +type EventCBData map[string]interface{} +type EventCB func(cfg *ProjectConfig, data *EventCBData) + +// IPROJECT Project interface +type IPROJECT interface { + Add(cfg ProjectConfig) (*ProjectConfig, error) // Add a new project + Delete() error // Delete a project + GetProject() *ProjectConfig // Get project public configuration + SetProject(prj ProjectConfig) *ProjectConfig // Set project configuration + GetServer() *XdsServer // Get XdsServer that holds this project + GetFullPath(dir string) string // Get project full path + Sync() error // Force project files synchronization + IsInSync() (bool, error) // Check if project files are in-sync +} + +// ProjectConfig is the config for one project +type ProjectConfig struct { + ID string `json:"id"` + ServerID string `json:"serverId"` + Label string `json:"label"` + ClientPath string `json:"clientPath"` + ServerPath string `json:"serverPath"` + Type ProjectType `json:"type"` + Status string `json:"status"` + IsInSync bool `json:"isInSync"` + DefaultSdk string `json:"defaultSdk"` +} diff --git a/lib/agent/project-pathmap.go b/lib/agent/project-pathmap.go new file mode 100644 index 0000000..1de8e11 --- /dev/null +++ b/lib/agent/project-pathmap.go @@ -0,0 +1,79 @@ +package agent + +import ( + "path/filepath" +) + +// IPROJECT interface implementation for native/path mapping projects + +// PathMap . +type PathMap struct { + *Context + server *XdsServer + folder *FolderConfig +} + +// NewProjectPathMap Create a new instance of PathMap +func NewProjectPathMap(ctx *Context, svr *XdsServer) *PathMap { + p := PathMap{ + Context: ctx, + server: svr, + folder: &FolderConfig{}, + } + return &p +} + +// Add a new project +func (p *PathMap) Add(cfg ProjectConfig) (*ProjectConfig, error) { + var err error + + // SEB TODO: check local/server directory access + + err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder) + if err != nil { + return nil, err + } + + return p.GetProject(), nil +} + +// Delete a project +func (p *PathMap) Delete() error { + return p.server.FolderDelete(p.folder.ID) +} + +// GetProject Get public part of project config +func (p *PathMap) GetProject() *ProjectConfig { + prj := p.server.FolderToProject(*p.folder) + prj.ServerID = p.server.ID + return &prj +} + +// SetProject Set project config +func (p *PathMap) SetProject(prj ProjectConfig) *ProjectConfig { + p.folder = p.server.ProjectToFolder(prj) + return p.GetProject() +} + +// GetServer Get the XdsServer that holds this project +func (p *PathMap) GetServer() *XdsServer { + return p.server +} + +// GetFullPath returns the full path of a directory (from server POV) +func (p *PathMap) GetFullPath(dir string) string { + if &dir == nil { + return p.folder.DataPathMap.ServerPath + } + return filepath.Join(p.folder.DataPathMap.ServerPath, dir) +} + +// Sync Force project files synchronization +func (p *PathMap) Sync() error { + return nil +} + +// IsInSync Check if project files are in-sync +func (p *PathMap) IsInSync() (bool, error) { + return true, nil +} diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go new file mode 100644 index 0000000..28a287c --- /dev/null +++ b/lib/agent/project-st.go @@ -0,0 +1,93 @@ +package agent + +import "github.com/iotbzh/xds-agent/lib/syncthing" + +// SEB TODO + +// IPROJECT interface implementation for syncthing projects + +// STProject . +type STProject struct { + *Context + server *XdsServer + folder *FolderConfig +} + +// NewProjectST Create a new instance of STProject +func NewProjectST(ctx *Context, svr *XdsServer) *STProject { + p := STProject{ + Context: ctx, + server: svr, + folder: &FolderConfig{}, + } + return &p +} + +// Add a new project +func (p *STProject) Add(cfg ProjectConfig) (*ProjectConfig, error) { + var err error + + err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder) + if err != nil { + return nil, err + } + svrPrj := p.GetProject() + + // Declare project into local Syncthing + p.SThg.FolderChange(st.FolderChangeArg{ + ID: cfg.ID, + Label: cfg.Label, + RelativePath: cfg.ClientPath, + SyncThingID: p.server.ServerConfig.Builder.SyncThingID, + }) + + return svrPrj, nil +} + +// Delete a project +func (p *STProject) Delete() error { + return p.server.FolderDelete(p.folder.ID) +} + +// GetProject Get public part of project config +func (p *STProject) GetProject() *ProjectConfig { + prj := p.server.FolderToProject(*p.folder) + prj.ServerID = p.server.ID + return &prj +} + +// SetProject Set project config +func (p *STProject) SetProject(prj ProjectConfig) *ProjectConfig { + // SEB TODO + p.folder = p.server.ProjectToFolder(prj) + return p.GetProject() +} + +// GetServer Get the XdsServer that holds this project +func (p *STProject) GetServer() *XdsServer { + // SEB TODO + return p.server +} + +// GetFullPath returns the full path of a directory (from server POV) +func (p *STProject) GetFullPath(dir string) string { + /* SEB + if &dir == nil { + return p.folder.DataSTProject.ServerPath + } + return filepath.Join(p.folder.DataSTProject.ServerPath, dir) + */ + return "SEB TODO" +} + +// Sync Force project files synchronization +func (p *STProject) Sync() error { + // SEB TODO + return nil +} + +// IsInSync Check if project files are in-sync +func (p *STProject) IsInSync() (bool, error) { + // SEB TODO + return false, nil +} diff --git a/lib/agent/projects.go b/lib/agent/projects.go new file mode 100644 index 0000000..39c120f --- /dev/null +++ b/lib/agent/projects.go @@ -0,0 +1,254 @@ +package agent + +import ( + "fmt" + "log" + "time" + + "github.com/iotbzh/xds-agent/lib/syncthing" + "github.com/syncthing/syncthing/lib/sync" +) + +// Projects Represent a an XDS Projects +type Projects struct { + *Context + SThg *st.SyncThing + projects map[string]*IPROJECT + //SEB registerCB []RegisteredCB +} + +/* SEB +type RegisteredCB struct { + cb *EventCB + data *EventCBData +} +*/ + +// Mutex to make add/delete atomic +var pjMutex = sync.NewMutex() + +// NewProjects Create a new instance of Project Model +func NewProjects(ctx *Context, st *st.SyncThing) *Projects { + return &Projects{ + Context: ctx, + SThg: st, + projects: make(map[string]*IPROJECT), + //registerCB: []RegisteredCB{}, + } +} + +// Init Load Projects configuration +func (p *Projects) Init(server *XdsServer) error { + svrList := make(map[string]*XdsServer) + // If server not set, load for all servers + if server == nil { + svrList = p.xdsServers + } else { + svrList[server.ID] = server + } + errMsg := "" + for _, svr := range svrList { + if svr.Disabled { + continue + } + xFlds := []FolderConfig{} + if err := svr.HTTPGet("/folders", &xFlds); err != nil { + errMsg += fmt.Sprintf("Cannot retrieve folders config of XDS server ID %s : %v \n", svr.ID, err.Error()) + continue + } + p.Log.Debugf("Server %s, %d projects detected", svr.ID[:8], len(xFlds)) + for _, prj := range xFlds { + newP := svr.FolderToProject(prj) + if /*nPrj*/ _, err := p.createUpdate(newP, false, true); err != nil { + errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n " + continue + } + + /* FIXME emit EVTProjectChange event ? + if err := p.events.Emit(EVTProjectChange, *nPrj); err != nil { + p.Log.Warningf("Cannot notify project change: %v", err) + } + */ + } + + } + + p.Log.Infof("Number of loaded Projects: %d", len(p.projects)) + + if errMsg != "" { + return fmt.Errorf(errMsg) + } + return nil +} + +// Get returns the folder config or nil if not existing +func (p *Projects) Get(id string) *IPROJECT { + if id == "" { + return nil + } + fc, exist := p.projects[id] + if !exist { + return nil + } + return fc +} + +// GetProjectArr returns the config of all folders as an array +func (p *Projects) GetProjectArr() []ProjectConfig { + pjMutex.Lock() + defer pjMutex.Unlock() + + return p.GetProjectArrUnsafe() +} + +// GetProjectArrUnsafe Same as GetProjectArr without mutex protection +func (p *Projects) GetProjectArrUnsafe() []ProjectConfig { + conf := []ProjectConfig{} + for _, v := range p.projects { + prj := (*v).GetProject() + conf = append(conf, *prj) + } + return conf +} + +// Add adds a new folder +func (p *Projects) Add(newF ProjectConfig) (*ProjectConfig, error) { + prj, err := p.createUpdate(newF, true, false) + if err != nil { + return prj, err + } + + // Notify client with event + if err := p.events.Emit(EVTProjectAdd, *prj); err != nil { + p.Log.Warningf("Cannot notify project deletion: %v", err) + } + + return prj, err +} + +// CreateUpdate creates or update a folder +func (p *Projects) createUpdate(newF ProjectConfig, create bool, initial bool) (*ProjectConfig, error) { + var err error + + pjMutex.Lock() + defer pjMutex.Unlock() + + // Sanity check + if _, exist := p.projects[newF.ID]; create && exist { + return nil, fmt.Errorf("ID already exists") + } + if newF.ClientPath == "" { + return nil, fmt.Errorf("ClientPath must be set") + } + if newF.ServerID == "" { + return nil, fmt.Errorf("Server ID must be set") + } + var svr *XdsServer + var exist bool + if svr, exist = p.xdsServers[newF.ServerID]; !exist { + return nil, fmt.Errorf("Unknown Server ID %s", newF.ServerID) + } + + // Check type supported + b, exist := svr.ServerConfig.SupportedSharing[string(newF.Type)] + if !exist || !b { + return nil, fmt.Errorf("Server doesn't support project type %s", newF.Type) + } + + // Create a new folder object + var fld IPROJECT + switch newF.Type { + // SYNCTHING + case TypeCloudSync: + if p.SThg != nil { + /*SEB fld = f.SThg.NewFolderST(f.Conf)*/ + fld = NewProjectST(p.Context, svr) + } else { + return nil, fmt.Errorf("Cloud Sync project not supported") + } + + // PATH MAP + case TypePathMap: + fld = NewProjectPathMap(p.Context, svr) + default: + return nil, fmt.Errorf("Unsupported folder type") + } + + var newPrj *ProjectConfig + if create { + // Add project on server + if newPrj, err = fld.Add(newF); err != nil { + newF.Status = StatusErrorConfig + log.Printf("ERROR Adding folder: %v\n", err) + return newPrj, err + } + } else { + // Just update project config + newPrj = fld.SetProject(newF) + } + + // Sanity check + if newPrj.ID == "" { + log.Printf("ERROR project ID empty: %v", newF) + return newPrj, fmt.Errorf("Project ID empty") + } + + // Add to folders list + p.projects[newPrj.ID] = &fld + + // Force sync after creation + // (need to defer to be sure that WS events will arrive after HTTP creation reply) + go func() { + time.Sleep(time.Millisecond * 500) + fld.Sync() + }() + + return newPrj, nil +} + +// Delete deletes a specific folder +func (p *Projects) Delete(id string) (ProjectConfig, error) { + var err error + + pjMutex.Lock() + defer pjMutex.Unlock() + + fld := ProjectConfig{} + fc, exist := p.projects[id] + if !exist { + return fld, fmt.Errorf("unknown id") + } + + prj := (*fc).GetProject() + + if err = (*fc).Delete(); err != nil { + return *prj, err + } + + delete(p.projects, id) + + // Notify client with event + if err := p.events.Emit(EVTProjectDelete, *prj); err != nil { + p.Log.Warningf("Cannot notify project deletion: %v", err) + } + + return *prj, err +} + +// ForceSync Force the synchronization of a folder +func (p *Projects) ForceSync(id string) error { + fc := p.Get(id) + if fc == nil { + return fmt.Errorf("Unknown id") + } + return (*fc).Sync() +} + +// IsProjectInSync Returns true when folder is in sync +func (p *Projects) IsProjectInSync(id string) (bool, error) { + fc := p.Get(id) + if fc == nil { + return false, fmt.Errorf("Unknown id") + } + return (*fc).IsInSync() +} diff --git a/lib/agent/session.go b/lib/agent/session.go new file mode 100644 index 0000000..e50abe1 --- /dev/null +++ b/lib/agent/session.go @@ -0,0 +1,224 @@ +package agent + +import ( + "encoding/base64" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/googollee/go-socket.io" + uuid "github.com/satori/go.uuid" + "github.com/syncthing/syncthing/lib/sync" +) + +const sessionCookieName = "xds-agent-sid" +const sessionHeaderName = "XDS-AGENT-SID" + +const sessionMonitorTime = 10 // Time (in seconds) to schedule monitoring session tasks + +const initSessionMaxAge = 10 // Initial session max age in seconds +const maxSessions = 100000 // Maximum number of sessions in sessMap map + +const secureCookie = false // TODO: see https://github.com/astaxie/beego/blob/master/session/session.go#L218 + +// ClientSession contains the info of a user/client session +type ClientSession struct { + ID string + WSID string // only one WebSocket per client/session + MaxAge int64 + IOSocket *socketio.Socket + + // private + expireAt time.Time + useCount int64 +} + +// Sessions holds client sessions +type Sessions struct { + *Context + cookieMaxAge int64 + sessMap map[string]ClientSession + mutex sync.Mutex + stop chan struct{} // signals intentional stop +} + +// NewClientSessions . +func NewClientSessions(ctx *Context, cookieMaxAge string) *Sessions { + ckMaxAge, err := strconv.ParseInt(cookieMaxAge, 10, 0) + if err != nil { + ckMaxAge = 0 + } + s := Sessions{ + Context: ctx, + cookieMaxAge: ckMaxAge, + sessMap: make(map[string]ClientSession), + mutex: sync.NewMutex(), + stop: make(chan struct{}), + } + s.webServer.router.Use(s.Middleware()) + + // Start monitoring of sessions Map (use to manage expiration and cleanup) + go s.monitorSessMap() + + return &s +} + +// Stop sessions management +func (s *Sessions) Stop() { + close(s.stop) +} + +// Middleware is used to managed session +func (s *Sessions) Middleware() gin.HandlerFunc { + return func(c *gin.Context) { + // FIXME Add CSRF management + + // Get session + sess := s.Get(c) + if sess == nil { + // Allocate a new session key and put in cookie + sess = s.newSession("") + } else { + s.refresh(sess.ID) + } + + // Set session in cookie and in header + // Do not set Domain to localhost (http://stackoverflow.com/questions/1134290/cookies-on-localhost-with-explicit-domain) + c.SetCookie(sessionCookieName, sess.ID, int(sess.MaxAge), "/", "", + secureCookie, false) + c.Header(sessionHeaderName, sess.ID) + + // Save session id in gin metadata + c.Set(sessionCookieName, sess.ID) + + c.Next() + } +} + +// Get returns the client session for a specific ID +func (s *Sessions) Get(c *gin.Context) *ClientSession { + var sid string + + // First get from gin metadata + v, exist := c.Get(sessionCookieName) + if v != nil { + sid = v.(string) + } + + // Then look in cookie + if !exist || sid == "" { + sid, _ = c.Cookie(sessionCookieName) + } + + // Then look in Header + if sid == "" { + sid = c.Request.Header.Get(sessionCookieName) + } + if sid != "" { + s.mutex.Lock() + defer s.mutex.Unlock() + if key, ok := s.sessMap[sid]; ok { + // TODO: return a copy ??? + return &key + } + } + return nil +} + +// IOSocketGet Get socketio definition from sid +func (s *Sessions) IOSocketGet(sid string) *socketio.Socket { + s.mutex.Lock() + defer s.mutex.Unlock() + sess, ok := s.sessMap[sid] + if ok { + return sess.IOSocket + } + return nil +} + +// UpdateIOSocket updates the IO Socket definition for of a session +func (s *Sessions) UpdateIOSocket(sid string, so *socketio.Socket) error { + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.sessMap[sid]; ok { + sess := s.sessMap[sid] + if so == nil { + // Could be the case when socketio is closed/disconnected + sess.WSID = "" + } else { + sess.WSID = (*so).Id() + } + sess.IOSocket = so + s.sessMap[sid] = sess + } + return nil +} + +// nesSession Allocate a new client session +func (s *Sessions) newSession(prefix string) *ClientSession { + uuid := prefix + uuid.NewV4().String() + id := base64.URLEncoding.EncodeToString([]byte(uuid)) + se := ClientSession{ + ID: id, + WSID: "", + MaxAge: initSessionMaxAge, + IOSocket: nil, + expireAt: time.Now().Add(time.Duration(initSessionMaxAge) * time.Second), + useCount: 0, + } + s.mutex.Lock() + defer s.mutex.Unlock() + + s.sessMap[se.ID] = se + + s.Log.Debugf("NEW session (%d): %s", len(s.sessMap), id) + return &se +} + +// refresh Move this session ID to the head of the list +func (s *Sessions) refresh(sid string) { + s.mutex.Lock() + defer s.mutex.Unlock() + + sess := s.sessMap[sid] + sess.useCount++ + if sess.MaxAge < s.cookieMaxAge && sess.useCount > 1 { + sess.MaxAge = s.cookieMaxAge + sess.expireAt = time.Now().Add(time.Duration(sess.MaxAge) * time.Second) + } + + // TODO - Add flood detection (like limit_req of nginx) + // (delayed request when to much requests in a short period of time) + + s.sessMap[sid] = sess +} + +func (s *Sessions) monitorSessMap() { + const dbgFullTrace = false // for debugging + + for { + select { + case <-s.stop: + s.Log.Debugln("Stop monitorSessMap") + return + case <-time.After(sessionMonitorTime * time.Second): + if dbgFullTrace { + s.Log.Debugf("Sessions Map size: %d", len(s.sessMap)) + s.Log.Debugf("Sessions Map : %v", s.sessMap) + } + + if len(s.sessMap) > maxSessions { + s.Log.Errorln("TOO MUCH sessions, cleanup old ones !") + } + + s.mutex.Lock() + for _, ss := range s.sessMap { + if ss.expireAt.Sub(time.Now()) < 0 { + //SEB DEBUG s.Log.Debugf("Delete expired session id: %s", ss.ID) + delete(s.sessMap, ss.ID) + } + } + s.mutex.Unlock() + } + } +} diff --git a/lib/agent/webserver.go b/lib/agent/webserver.go new file mode 100644 index 0000000..ead06d1 --- /dev/null +++ b/lib/agent/webserver.go @@ -0,0 +1,246 @@ +package agent + +import ( + "fmt" + "log" + "net/http" + "os" + "path" + + "github.com/Sirupsen/logrus" + "github.com/gin-contrib/static" + "github.com/gin-gonic/gin" + "github.com/googollee/go-socket.io" +) + +// WebServer . +type WebServer struct { + *Context + router *gin.Engine + api *APIService + sIOServer *socketio.Server + webApp *gin.RouterGroup + stop chan struct{} // signals intentional stop +} + +const indexFilename = "index.html" + +// NewWebServer creates an instance of WebServer +func NewWebServer(ctx *Context) *WebServer { + + // Setup logging for gin router + if ctx.Log.Level == logrus.DebugLevel { + gin.SetMode(gin.DebugMode) + } else { + gin.SetMode(gin.ReleaseMode) + } + + // Redirect gin logs into another logger (LogVerboseOut may be stderr or a file) + gin.DefaultWriter = ctx.Config.LogVerboseOut + gin.DefaultErrorWriter = ctx.Config.LogVerboseOut + log.SetOutput(ctx.Config.LogVerboseOut) + + // Creates gin router + r := gin.New() + + svr := &WebServer{ + Context: ctx, + router: r, + api: nil, + sIOServer: nil, + webApp: nil, + stop: make(chan struct{}), + } + + return svr +} + +// Serve starts a new instance of the Web Server +func (s *WebServer) Serve() error { + var err error + + // Setup middlewares + s.router.Use(gin.Logger()) + s.router.Use(gin.Recovery()) + s.router.Use(s.middlewareCORS()) + s.router.Use(s.middlewareXDSDetails()) + s.router.Use(s.middlewareCSRF()) + + // Create REST API + s.api = NewAPIV1(s.Context) + + // Create connections to XDS Servers + // XXX - not sure there is no side effect to do it in background ! + go func() { + for _, svrCfg := range s.Config.FileConf.ServersConf { + if svr, err := s.api.AddXdsServer(svrCfg); err != nil { + // Just log error, don't consider as critical + s.Log.Infof("Cannot connect to XDS Server url=%s: %v", svr.BaseURL, err.Error()) + } + } + }() + + // Websocket routes + s.sIOServer, err = socketio.NewServer(nil) + if err != nil { + s.Log.Fatalln(err) + } + + s.router.GET("/socket.io/", s.socketHandler) + s.router.POST("/socket.io/", s.socketHandler) + /* TODO: do we want to support ws://... ? + s.router.Handle("WS", "/socket.io/", s.socketHandler) + s.router.Handle("WSS", "/socket.io/", s.socketHandler) + */ + + // Web Application (serve on / ) + idxFile := path.Join(s.Config.FileConf.WebAppDir, indexFilename) + if _, err := os.Stat(idxFile); err != nil { + s.Log.Fatalln("Web app directory not found, check/use webAppDir setting in config file: ", idxFile) + } + s.Log.Infof("Serve WEB app dir: %s", s.Config.FileConf.WebAppDir) + s.router.Use(static.Serve("/", static.LocalFile(s.Config.FileConf.WebAppDir, true))) + s.webApp = s.router.Group("/", s.serveIndexFile) + { + s.webApp.GET("/") + } + + // Serve in the background + serveError := make(chan error, 1) + go func() { + fmt.Printf("Web Server running on localhost:%s ...\n", s.Config.FileConf.HTTPPort) + serveError <- http.ListenAndServe(":"+s.Config.FileConf.HTTPPort, s.router) + }() + + fmt.Printf("XDS agent running...\n") + + // Wait for stop, restart or error signals + select { + case <-s.stop: + // Shutting down permanently + s.sessions.Stop() + s.Log.Infoln("shutting down (stop)") + case err = <-serveError: + // Error due to listen/serve failure + s.Log.Errorln(err) + } + + return nil +} + +// Stop web server +func (s *WebServer) Stop() { + s.api.Stop() + close(s.stop) +} + +// serveIndexFile provides initial file (eg. index.html) of webapp +func (s *WebServer) serveIndexFile(c *gin.Context) { + c.HTML(200, indexFilename, gin.H{}) +} + +// Add details in Header +func (s *WebServer) middlewareXDSDetails() gin.HandlerFunc { + return func(c *gin.Context) { + c.Header("XDS-Agent-Version", s.Config.Version) + c.Header("XDS-API-Version", s.Config.APIVersion) + c.Next() + } +} + +/* SEB +func (s *WebServer) isValidAPIKey(key string) bool { + return (key == s.Config.FileConf.XDSAPIKey && key != "") +} +*/ + +func (s *WebServer) middlewareCSRF() gin.HandlerFunc { + return func(c *gin.Context) { + // XXX - not used for now + c.Next() + return + /* + // Allow requests carrying a valid API key + if s.isValidAPIKey(c.Request.Header.Get("X-API-Key")) { + // Set the access-control-allow-origin header for CORS requests + // since a valid API key has been provided + c.Header("Access-Control-Allow-Origin", "*") + c.Next() + return + } + + // Allow io.socket request + if strings.HasPrefix(c.Request.URL.Path, "/socket.io") { + c.Next() + return + } + + // FIXME Add really CSRF support + + // Allow requests for anything not under the protected path prefix, + // and set a CSRF cookie if there isn't already a valid one. + //if !strings.HasPrefix(c.Request.URL.Path, prefix) { + // cookie, err := c.Cookie("CSRF-Token-" + unique) + // if err != nil || !validCsrfToken(cookie.Value) { + // s.Log.Debugln("new CSRF cookie in response to request for", c.Request.URL) + // c.SetCookie("CSRF-Token-"+unique, newCsrfToken(), 600, "/", "", false, false) + // } + // c.Next() + // return + //} + + // Verify the CSRF token + //token := c.Request.Header.Get("X-CSRF-Token-" + unique) + //if !validCsrfToken(token) { + // c.AbortWithError(403, "CSRF Error") + // return + //} + + //c.Next() + + c.AbortWithError(403, fmt.Errorf("Not valid API key")) + */ + } +} + +// CORS middleware +func (s *WebServer) middlewareCORS() gin.HandlerFunc { + return func(c *gin.Context) { + if c.Request.Method == "OPTIONS" { + c.Header("Access-Control-Allow-Origin", "*") + c.Header("Access-Control-Allow-Headers", "Content-Type, X-API-Key") + c.Header("Access-Control-Allow-Methods", "GET, POST, DELETE") + c.Header("Access-Control-Max-Age", cookieMaxAge) + c.AbortWithStatus(204) + return + } + c.Next() + } +} + +// socketHandler is the handler for the "main" websocket connection +func (s *WebServer) socketHandler(c *gin.Context) { + + // Retrieve user session + sess := s.sessions.Get(c) + if sess == nil { + c.JSON(500, gin.H{"error": "Cannot retrieve session"}) + return + } + + s.sIOServer.On("connection", func(so socketio.Socket) { + s.Log.Debugf("WS Connected (SID=%v)", so.Id()) + s.sessions.UpdateIOSocket(sess.ID, &so) + + so.On("disconnection", func() { + s.Log.Debugf("WS disconnected (SID=%v)", so.Id()) + s.sessions.UpdateIOSocket(sess.ID, nil) + }) + }) + + s.sIOServer.On("error", func(so socketio.Socket, err error) { + s.Log.Errorf("WS SID=%v Error : %v", so.Id(), err.Error()) + }) + + s.sIOServer.ServeHTTP(c.Writer, c.Request) +} diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go new file mode 100644 index 0000000..014415f --- /dev/null +++ b/lib/agent/xdsserver.go @@ -0,0 +1,472 @@ +package agent + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" + "time" + + "github.com/gin-gonic/gin" + "github.com/iotbzh/xds-agent/lib/xdsconfig" + common "github.com/iotbzh/xds-common/golib" + uuid "github.com/satori/go.uuid" + sio_client "github.com/zhouhui8915/go-socket.io-client" +) + +// Server . +type XdsServer struct { + *Context + ID string + BaseURL string + APIURL string + PartialURL string + ConnRetry int + Connected bool + Disabled bool + ServerConfig *xdsServerConfig + + // callbacks + CBOnError func(error) + CBOnDisconnect func(error) + + // Private fields + client *common.HTTPClient + ioSock *sio_client.Client + logOut io.Writer + apiRouter *gin.RouterGroup +} + +// xdsServerConfig Data return by GET /config +type xdsServerConfig struct { + ID string `json:"id"` + Version string `json:"version"` + APIVersion string `json:"apiVersion"` + VersionGitTag string `json:"gitTag"` + SupportedSharing map[string]bool `json:"supportedSharing"` + Builder xdsBuilderConfig `json:"builder"` +} + +// xdsBuilderConfig represents the builder container configuration +type xdsBuilderConfig struct { + IP string `json:"ip"` + Port string `json:"port"` + SyncThingID string `json:"syncThingID"` +} + +// FolderType XdsServer folder type +type FolderType string + +const ( + XdsTypePathMap = "PathMap" + XdsTypeCloudSync = "CloudSync" + XdsTypeCifsSmb = "CIFS" +) + +// FolderConfig XdsServer folder config +type FolderConfig struct { + ID string `json:"id"` + Label string `json:"label"` + ClientPath string `json:"path"` + Type FolderType `json:"type"` + Status string `json:"status"` + IsInSync bool `json:"isInSync"` + DefaultSdk string `json:"defaultSdk"` + // Specific data depending on which Type is used + DataPathMap PathMapConfig `json:"dataPathMap,omitempty"` + DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"` +} + +// PathMapConfig Path mapping specific data +type PathMapConfig struct { + ServerPath string `json:"serverPath"` +} + +// CloudSyncConfig CloudSync (AKA Syncthing) specific data +type CloudSyncConfig struct { + SyncThingID string `json:"syncThingID"` +} + +const _IDTempoPrefix = "tempo-" + +// NewXdsServer creates an instance of XdsServer +func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { + return &XdsServer{ + Context: ctx, + ID: _IDTempoPrefix + uuid.NewV1().String(), + BaseURL: conf.URL, + APIURL: conf.APIBaseURL + conf.APIPartialURL, + PartialURL: conf.APIPartialURL, + ConnRetry: conf.ConnRetry, + Connected: false, + Disabled: false, + + logOut: ctx.Log.Out, + } +} + +// Close Free and close XDS Server connection +func (xs *XdsServer) Close() error { + xs.Connected = false + xs.Disabled = true + xs.ioSock = nil + xs._NotifyState() + return nil +} + +// Connect Establish HTTP connection with XDS Server +func (xs *XdsServer) Connect() error { + var err error + var retry int + + xs.Disabled = false + xs.Connected = false + + err = nil + for retry = xs.ConnRetry; retry > 0; retry-- { + if err = xs._CreateConnectHTTP(); err == nil { + break + } + if retry == xs.ConnRetry { + // Notify only on the first conn error + // doing that avoid 2 notifs (conn false; conn true) on startup + xs._NotifyState() + } + xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry) + time.Sleep(time.Second) + } + if retry == 0 { + // FIXME SEB: re-use _reconnect to wait longer in background + return fmt.Errorf("Connection to XDS Server failure") + } + if err != nil { + return err + } + + // Check HTTP connection and establish WS connection + err = xs._connect(false) + + return err +} + +// IsTempoID returns true when server as a temporary id +func (xs *XdsServer) IsTempoID() bool { + return strings.HasPrefix(xs.ID, _IDTempoPrefix) +} + +// SetLoggerOutput Set logger ou +func (xs *XdsServer) SetLoggerOutput(out io.Writer) { + xs.logOut = out +} + +// FolderAdd Send POST request to add a folder +func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error { + response, err := xs.HTTPPost("/folder", prj) + if err != nil { + return err + } + if response.StatusCode != 200 { + return fmt.Errorf("FolderAdd error status=%s", response.Status) + } + // Result is a FolderConfig that is equivalent to ProjectConfig + err = json.Unmarshal(xs.client.ResponseToBArray(response), res) + + return err +} + +// FolderDelete Send DELETE request to delete a folder +func (xs *XdsServer) FolderDelete(id string) error { + return xs.client.HTTPDelete("/folder/" + id) +} + +// HTTPGet . +func (xs *XdsServer) HTTPGet(url string, data interface{}) error { + var dd []byte + if err := xs.client.HTTPGet(url, &dd); err != nil { + return err + } + return json.Unmarshal(dd, &data) +} + +// HTTPPost . +func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) { + body, err := json.Marshal(data) + if err != nil { + return nil, err + } + return xs.HTTPPostBody(url, string(body)) +} + +// HTTPPostBody . +func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) { + return xs.client.HTTPPostWithRes(url, body) +} + +// SetAPIRouterGroup . +func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) { + xs.apiRouter = r +} + +// PassthroughGet Used to declare a route that sends directly a GET request to XDS Server +func (xs *XdsServer) PassthroughGet(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.GET(url, func(c *gin.Context) { + var data interface{} + if err := xs.HTTPGet(url, &data); err != nil { + if strings.Contains(err.Error(), "connection refused") { + xs.Connected = false + xs._NotifyState() + } + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, data) + }) +} + +// PassthroughPost Used to declare a route that sends directly a POST request to XDS Server +func (xs *XdsServer) PassthroughPost(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.POST(url, func(c *gin.Context) { + bodyReq := []byte{} + n, err := c.Request.Body.Read(bodyReq) + if err != nil { + common.APIError(c, err.Error()) + return + } + + response, err := xs.HTTPPostBody(url, string(bodyReq[:n])) + if err != nil { + common.APIError(c, err.Error()) + return + } + bodyRes, err := ioutil.ReadAll(response.Body) + if err != nil { + common.APIError(c, "Cannot read response body") + return + } + c.JSON(http.StatusOK, string(bodyRes)) + }) +} + +// EventOn Register a callback on events reception +func (xs *XdsServer) EventOn(message string, f interface{}) (err error) { + if xs.ioSock == nil { + return fmt.Errorf("Io.Socket not initialized") + } + // FIXME SEB: support chain / multiple listeners + /* sockEvents map[string][]*caller + xs.sockEventsLock.Lock() + xs.sockEvents[message] = append(xs.sockEvents[message], f) + xs.sockEventsLock.Unlock() + xs.ioSock.On(message, func(ev) { + + }) + */ + return xs.ioSock.On(message, f) +} + +// ProjectToFolder +func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig { + stID := "" + if pPrj.Type == XdsTypeCloudSync { + stID, _ = xs.SThg.IDGet() + } + fPrj := FolderConfig{ + ID: pPrj.ID, + Label: pPrj.Label, + ClientPath: pPrj.ClientPath, + Type: FolderType(pPrj.Type), + Status: pPrj.Status, + IsInSync: pPrj.IsInSync, + DefaultSdk: pPrj.DefaultSdk, + DataPathMap: PathMapConfig{ + ServerPath: pPrj.ServerPath, + }, + DataCloudSync: CloudSyncConfig{ + SyncThingID: stID, + }, + } + return &fPrj +} + +// FolderToProject +func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig { + pPrj := ProjectConfig{ + ID: fPrj.ID, + ServerID: xs.ID, + Label: fPrj.Label, + ClientPath: fPrj.ClientPath, + ServerPath: fPrj.DataPathMap.ServerPath, + Type: ProjectType(fPrj.Type), + Status: fPrj.Status, + IsInSync: fPrj.IsInSync, + DefaultSdk: fPrj.DefaultSdk, + } + return pPrj +} + +/*** +** Private functions +***/ + +// Create HTTP client +func (xs *XdsServer) _CreateConnectHTTP() error { + var err error + xs.client, err = common.HTTPNewClient(xs.BaseURL, + common.HTTPClientConfig{ + URLPrefix: "/api/v1", + HeaderClientKeyName: "Xds-Sid", + CsrfDisable: true, + LogOut: xs.logOut, + LogPrefix: "XDSSERVER: ", + LogLevel: common.HTTPLogLevelWarning, + }) + + xs.client.SetLogLevel(xs.Log.Level.String()) + + if err != nil { + msg := ": " + err.Error() + if strings.Contains(err.Error(), "connection refused") { + msg = fmt.Sprintf("(url: %s)", xs.BaseURL) + } + return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg) + } + if xs.client == nil { + return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)") + } + + return nil +} + +// Re-established connection +func (xs *XdsServer) _reconnect() error { + err := xs._connect(true) + if err == nil { + // Reload projects list for this server + err = xs.projects.Init(xs) + } + return err +} + +// Established HTTP and WS connection and retrieve XDSServer config +func (xs *XdsServer) _connect(reConn bool) error { + + xdsCfg := xdsServerConfig{} + if err := xs.HTTPGet("/config", &xdsCfg); err != nil { + xs.Connected = false + if !reConn { + xs._NotifyState() + } + return err + } + + if reConn && xs.ID != xdsCfg.ID { + xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID) + } + + // Update local XDS config + xs.ID = xdsCfg.ID + xs.ServerConfig = &xdsCfg + + // Establish WS connection and register listen + if err := xs._SocketConnect(); err != nil { + xs.Connected = false + xs._NotifyState() + return err + } + + xs.Connected = true + xs._NotifyState() + return nil +} + +// Create WebSocket (io.socket) connection +func (xs *XdsServer) _SocketConnect() error { + + xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL) + + opts := &sio_client.Options{ + Transport: "websocket", + Header: make(map[string][]string), + } + opts.Header["XDS-SID"] = []string{xs.client.GetClientID()} + + iosk, err := sio_client.NewClient(xs.BaseURL, opts) + if err != nil { + return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err) + } + xs.ioSock = iosk + + // Register some listeners + + iosk.On("error", func(err error) { + xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err) + if xs.CBOnError != nil { + xs.CBOnError(err) + } + }) + + iosk.On("disconnection", func(err error) { + xs.Log.Infof("IO.socket disconnection server %s", xs.ID) + if xs.CBOnDisconnect != nil { + xs.CBOnDisconnect(err) + } + xs.Connected = false + xs._NotifyState() + + // Try to reconnect during 15min (or at least while not disabled) + go func() { + count := 0 + waitTime := 1 + for !xs.Disabled && !xs.Connected { + count++ + if count%60 == 0 { + waitTime *= 5 + } + if waitTime > 15*60 { + xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID) + return + } + time.Sleep(time.Second * time.Duration(waitTime)) + xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count) + + xs._reconnect() + } + }() + }) + + // XXX - There is no connection event generated so, just consider that + // we are connected when NewClient return successfully + /* iosk.On("connection", func() { ... }) */ + xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID) + + return nil +} + +// Send event to notify changes +func (xs *XdsServer) _NotifyState() { + + evSts := ServerCfg{ + ID: xs.ID, + URL: xs.BaseURL, + APIURL: xs.APIURL, + PartialURL: xs.PartialURL, + ConnRetry: xs.ConnRetry, + Connected: xs.Connected, + } + if err := xs.events.Emit(EVTServerConfig, evSts); err != nil { + xs.Log.Warningf("Cannot notify XdsServer state change: %v", err) + } +} diff --git a/lib/apiv1/apiv1.go b/lib/apiv1/apiv1.go deleted file mode 100644 index 734929b..0000000 --- a/lib/apiv1/apiv1.go +++ /dev/null @@ -1,36 +0,0 @@ -package apiv1 - -import ( - "github.com/Sirupsen/logrus" - "github.com/gin-gonic/gin" - - "github.com/iotbzh/xds-agent/lib/session" - "github.com/iotbzh/xds-agent/lib/xdsconfig" -) - -// APIService . -type APIService struct { - router *gin.Engine - apiRouter *gin.RouterGroup - sessions *session.Sessions - cfg *xdsconfig.Config - log *logrus.Logger -} - -// New creates a new instance of API service -func New(sess *session.Sessions, conf *xdsconfig.Config, log *logrus.Logger, r *gin.Engine) *APIService { - s := &APIService{ - router: r, - sessions: sess, - apiRouter: r.Group("/api/v1"), - cfg: conf, - log: log, - } - - s.apiRouter.GET("/version", s.getVersion) - - s.apiRouter.GET("/config", s.getConfig) - s.apiRouter.POST("/config", s.setConfig) - - return s -} diff --git a/lib/apiv1/config.go b/lib/apiv1/config.go deleted file mode 100644 index 47155ed..0000000 --- a/lib/apiv1/config.go +++ /dev/null @@ -1,45 +0,0 @@ -package apiv1 - -import ( - "net/http" - "sync" - - "github.com/gin-gonic/gin" - "github.com/iotbzh/xds-agent/lib/xdsconfig" - common "github.com/iotbzh/xds-common/golib" -) - -var confMut sync.Mutex - -// GetConfig returns the configuration -func (s *APIService) getConfig(c *gin.Context) { - confMut.Lock() - defer confMut.Unlock() - - c.JSON(http.StatusOK, s.cfg) -} - -// SetConfig sets configuration -func (s *APIService) setConfig(c *gin.Context) { - // FIXME - must be tested - c.JSON(http.StatusNotImplemented, "Not implemented") - - var cfgArg xdsconfig.Config - - if c.BindJSON(&cfgArg) != nil { - common.APIError(c, "Invalid arguments") - return - } - - confMut.Lock() - defer confMut.Unlock() - - s.log.Debugln("SET config: ", cfgArg) - - if err := s.cfg.UpdateAll(cfgArg); err != nil { - common.APIError(c, err.Error()) - return - } - - c.JSON(http.StatusOK, s.cfg) -} diff --git a/lib/apiv1/version.go b/lib/apiv1/version.go deleted file mode 100644 index e022441..0000000 --- a/lib/apiv1/version.go +++ /dev/null @@ -1,24 +0,0 @@ -package apiv1 - -import ( - "net/http" - - "github.com/gin-gonic/gin" -) - -type version struct { - Version string `json:"version"` - APIVersion string `json:"apiVersion"` - VersionGitTag string `json:"gitTag"` -} - -// getInfo : return various information about server -func (s *APIService) getVersion(c *gin.Context) { - response := version{ - Version: s.cfg.Version, - APIVersion: s.cfg.APIVersion, - VersionGitTag: s.cfg.VersionGitTag, - } - - c.JSON(http.StatusOK, response) -} diff --git a/lib/session/session.go b/lib/session/session.go deleted file mode 100644 index b56f9ff..0000000 --- a/lib/session/session.go +++ /dev/null @@ -1,227 +0,0 @@ -package session - -import ( - "encoding/base64" - "strconv" - "time" - - "github.com/Sirupsen/logrus" - "github.com/gin-gonic/gin" - "github.com/googollee/go-socket.io" - uuid "github.com/satori/go.uuid" - "github.com/syncthing/syncthing/lib/sync" -) - -const sessionCookieName = "xds-agent-sid" -const sessionHeaderName = "XDS-AGENT-SID" - -const sessionMonitorTime = 10 // Time (in seconds) to schedule monitoring session tasks - -const initSessionMaxAge = 10 // Initial session max age in seconds -const maxSessions = 100000 // Maximum number of sessions in sessMap map - -const secureCookie = false // TODO: see https://github.com/astaxie/beego/blob/master/session/session.go#L218 - -// ClientSession contains the info of a user/client session -type ClientSession struct { - ID string - WSID string // only one WebSocket per client/session - MaxAge int64 - IOSocket *socketio.Socket - - // private - expireAt time.Time - useCount int64 -} - -// Sessions holds client sessions -type Sessions struct { - router *gin.Engine - cookieMaxAge int64 - sessMap map[string]ClientSession - mutex sync.Mutex - log *logrus.Logger - stop chan struct{} // signals intentional stop -} - -// NewClientSessions . -func NewClientSessions(router *gin.Engine, log *logrus.Logger, cookieMaxAge string) *Sessions { - ckMaxAge, err := strconv.ParseInt(cookieMaxAge, 10, 0) - if err != nil { - ckMaxAge = 0 - } - s := Sessions{ - router: router, - cookieMaxAge: ckMaxAge, - sessMap: make(map[string]ClientSession), - mutex: sync.NewMutex(), - log: log, - stop: make(chan struct{}), - } - s.router.Use(s.Middleware()) - - // Start monitoring of sessions Map (use to manage expiration and cleanup) - go s.monitorSessMap() - - return &s -} - -// Stop sessions management -func (s *Sessions) Stop() { - close(s.stop) -} - -// Middleware is used to managed session -func (s *Sessions) Middleware() gin.HandlerFunc { - return func(c *gin.Context) { - // FIXME Add CSRF management - - // Get session - sess := s.Get(c) - if sess == nil { - // Allocate a new session key and put in cookie - sess = s.newSession("") - } else { - s.refresh(sess.ID) - } - - // Set session in cookie and in header - // Do not set Domain to localhost (http://stackoverflow.com/questions/1134290/cookies-on-localhost-with-explicit-domain) - c.SetCookie(sessionCookieName, sess.ID, int(sess.MaxAge), "/", "", - secureCookie, false) - c.Header(sessionHeaderName, sess.ID) - - // Save session id in gin metadata - c.Set(sessionCookieName, sess.ID) - - c.Next() - } -} - -// Get returns the client session for a specific ID -func (s *Sessions) Get(c *gin.Context) *ClientSession { - var sid string - - // First get from gin metadata - v, exist := c.Get(sessionCookieName) - if v != nil { - sid = v.(string) - } - - // Then look in cookie - if !exist || sid == "" { - sid, _ = c.Cookie(sessionCookieName) - } - - // Then look in Header - if sid == "" { - sid = c.Request.Header.Get(sessionCookieName) - } - if sid != "" { - s.mutex.Lock() - defer s.mutex.Unlock() - if key, ok := s.sessMap[sid]; ok { - // TODO: return a copy ??? - return &key - } - } - return nil -} - -// IOSocketGet Get socketio definition from sid -func (s *Sessions) IOSocketGet(sid string) *socketio.Socket { - s.mutex.Lock() - defer s.mutex.Unlock() - sess, ok := s.sessMap[sid] - if ok { - return sess.IOSocket - } - return nil -} - -// UpdateIOSocket updates the IO Socket definition for of a session -func (s *Sessions) UpdateIOSocket(sid string, so *socketio.Socket) error { - s.mutex.Lock() - defer s.mutex.Unlock() - if _, ok := s.sessMap[sid]; ok { - sess := s.sessMap[sid] - if so == nil { - // Could be the case when socketio is closed/disconnected - sess.WSID = "" - } else { - sess.WSID = (*so).Id() - } - sess.IOSocket = so - s.sessMap[sid] = sess - } - return nil -} - -// nesSession Allocate a new client session -func (s *Sessions) newSession(prefix string) *ClientSession { - uuid := prefix + uuid.NewV4().String() - id := base64.URLEncoding.EncodeToString([]byte(uuid)) - se := ClientSession{ - ID: id, - WSID: "", - MaxAge: initSessionMaxAge, - IOSocket: nil, - expireAt: time.Now().Add(time.Duration(initSessionMaxAge) * time.Second), - useCount: 0, - } - s.mutex.Lock() - defer s.mutex.Unlock() - - s.sessMap[se.ID] = se - - s.log.Debugf("NEW session (%d): %s", len(s.sessMap), id) - return &se -} - -// refresh Move this session ID to the head of the list -func (s *Sessions) refresh(sid string) { - s.mutex.Lock() - defer s.mutex.Unlock() - - sess := s.sessMap[sid] - sess.useCount++ - if sess.MaxAge < s.cookieMaxAge && sess.useCount > 1 { - sess.MaxAge = s.cookieMaxAge - sess.expireAt = time.Now().Add(time.Duration(sess.MaxAge) * time.Second) - } - - // TODO - Add flood detection (like limit_req of nginx) - // (delayed request when to much requests in a short period of time) - - s.sessMap[sid] = sess -} - -func (s *Sessions) monitorSessMap() { - const dbgFullTrace = false // for debugging - - for { - select { - case <-s.stop: - s.log.Debugln("Stop monitorSessMap") - return - case <-time.After(sessionMonitorTime * time.Second): - if dbgFullTrace { - s.log.Debugf("Sessions Map size: %d", len(s.sessMap)) - s.log.Debugf("Sessions Map : %v", s.sessMap) - } - - if len(s.sessMap) > maxSessions { - s.log.Errorln("TOO MUCH sessions, cleanup old ones !") - } - - s.mutex.Lock() - for _, ss := range s.sessMap { - if ss.expireAt.Sub(time.Now()) < 0 { - s.log.Debugf("Delete expired session id: %s", ss.ID) - delete(s.sessMap, ss.ID) - } - } - s.mutex.Unlock() - } - } -} diff --git a/lib/syncthing/st.go b/lib/syncthing/st.go index 660738d..bc3b101 100644 --- a/lib/syncthing/st.go +++ b/lib/syncthing/st.go @@ -24,11 +24,14 @@ import ( // SyncThing . type SyncThing struct { - BaseURL string - APIKey string - Home string - STCmd *exec.Cmd - STICmd *exec.Cmd + BaseURL string + APIKey string + Home string + STCmd *exec.Cmd + STICmd *exec.Cmd + MyID string + Connected bool + Events *Events // Private fields binDir string @@ -37,6 +40,7 @@ type SyncThing struct { exitSTIChan chan ExitChan client *common.HTTPClient log *logrus.Logger + conf *xdsconfig.Config } // ExitChan Channel used for process exit @@ -45,6 +49,42 @@ type ExitChan struct { err error } +// ConfigInSync Check whether if Syncthing configuration is in sync +type configInSync struct { + ConfigInSync bool `json:"configInSync"` +} + +// FolderStatus Information about the current status of a folder. +type FolderStatus struct { + GlobalFiles int `json:"globalFiles"` + GlobalDirectories int `json:"globalDirectories"` + GlobalSymlinks int `json:"globalSymlinks"` + GlobalDeleted int `json:"globalDeleted"` + GlobalBytes int64 `json:"globalBytes"` + + LocalFiles int `json:"localFiles"` + LocalDirectories int `json:"localDirectories"` + LocalSymlinks int `json:"localSymlinks"` + LocalDeleted int `json:"localDeleted"` + LocalBytes int64 `json:"localBytes"` + + NeedFiles int `json:"needFiles"` + NeedDirectories int `json:"needDirectories"` + NeedSymlinks int `json:"needSymlinks"` + NeedDeletes int `json:"needDeletes"` + NeedBytes int64 `json:"needBytes"` + + InSyncFiles int `json:"inSyncFiles"` + InSyncBytes int64 `json:"inSyncBytes"` + + State string `json:"state"` + StateChanged time.Time `json:"stateChanged"` + + Sequence int64 `json:"sequence"` + + IgnorePatterns bool `json:"ignorePatterns"` +} + // NewSyncThing creates a new instance of Syncthing func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing { var url, apiKey, home, binDir string @@ -75,8 +115,12 @@ func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing { binDir: binDir, logsDir: conf.FileConf.LogsDir, log: log, + conf: conf, } + // Create Events monitoring + // SEB TO TEST s.Events = s.NewEventListener() + return &s } @@ -182,6 +226,8 @@ func (s *SyncThing) Start() (*exec.Cmd, error) { "STNOUPGRADE=1", } + /* SEB STILL NEEDED, if not SUP code + // XXX - temporary hack because -gui-apikey seems to correctly handle by // syncthing the early first time stConfigFile := filepath.Join(s.Home, "config.xml") @@ -211,12 +257,12 @@ func (s *SyncThing) Start() (*exec.Cmd, error) { return nil, fmt.Errorf("Cannot write Syncthing config file to set apikey") } } - + */ s.STCmd, err = s.startProc("syncthing", args, env, &s.exitSTChan) // Use autogenerated apikey if not set by config.json - if s.APIKey == "" { - if fd, err := os.Open(stConfigFile); err == nil { + if err == nil && s.APIKey == "" { + if fd, err := os.Open(filepath.Join(s.Home, "config.xml")); err == nil { defer fd.Close() if b, err := ioutil.ReadAll(fd); err == nil { re := regexp.MustCompile("(.*)") @@ -294,11 +340,17 @@ func (s *SyncThing) StopInotify() { // Connect Establish HTTP connection with Syncthing func (s *SyncThing) Connect() error { var err error + s.Connected = false s.client, err = common.HTTPNewClient(s.BaseURL, common.HTTPClientConfig{ URLPrefix: "/rest", HeaderClientKeyName: "X-Syncthing-ID", + LogOut: s.conf.LogVerboseOut, + LogPrefix: "SYNCTHING: ", + LogLevel: common.HTTPLogLevelWarning, }) + s.client.SetLogLevel(s.log.Level.String()) + if err != nil { msg := ": " + err.Error() if strings.Contains(err.Error(), "connection refused") { @@ -310,11 +362,17 @@ func (s *SyncThing) Connect() error { return fmt.Errorf("ERROR: cannot connect to Syncthing (null client)") } - s.client.SetLogLevel(s.log.Level.String()) - s.client.LoggerPrefix = "SYNCTHING: " - s.client.LoggerOut = s.log.Out + s.MyID, err = s.IDGet() + if err != nil { + return fmt.Errorf("ERROR: cannot retrieve ID") + } + + s.Connected = true - return nil + // Start events monitoring + //SEB TODO err = s.Events.Start() + + return err } // IDGet returns the Syncthing ID of Syncthing instance running locally @@ -347,3 +405,16 @@ func (s *SyncThing) ConfigSet(cfg config.Configuration) error { } return s.client.HTTPPost("system/config", string(body)) } + +// IsConfigInSync Returns true if configuration is in sync +func (s *SyncThing) IsConfigInSync() (bool, error) { + var data []byte + var d configInSync + if err := s.client.HTTPGet("system/config/insync", &data); err != nil { + return false, err + } + if err := json.Unmarshal(data, &d); err != nil { + return false, err + } + return d.ConfigInSync, nil +} diff --git a/lib/syncthing/stEvent.go b/lib/syncthing/stEvent.go new file mode 100644 index 0000000..9ca8b78 --- /dev/null +++ b/lib/syncthing/stEvent.go @@ -0,0 +1,265 @@ +package st + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + "time" + + "github.com/Sirupsen/logrus" +) + +// Events . +type Events struct { + MonitorTime time.Duration + Debug bool + + stop chan bool + st *SyncThing + log *logrus.Logger + cbArr map[string][]cbMap +} + +type Event struct { + Type string `json:"type"` + Time time.Time `json:"time"` + Data map[string]string `json:"data"` +} + +type EventsCBData map[string]interface{} +type EventsCB func(ev Event, cbData *EventsCBData) + +const ( + EventFolderCompletion string = "FolderCompletion" + EventFolderSummary string = "FolderSummary" + EventFolderPaused string = "FolderPaused" + EventFolderResumed string = "FolderResumed" + EventFolderErrors string = "FolderErrors" + EventStateChanged string = "StateChanged" +) + +var EventsAll string = EventFolderCompletion + "|" + + EventFolderSummary + "|" + + EventFolderPaused + "|" + + EventFolderResumed + "|" + + EventFolderErrors + "|" + + EventStateChanged + +type STEvent struct { + // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API + SubscriptionID int `json:"id"` + // Global ID of the event across all subscriptions + GlobalID int `json:"globalID"` + Time time.Time `json:"time"` + Type string `json:"type"` + Data map[string]interface{} `json:"data"` +} + +type cbMap struct { + id int + cb EventsCB + filterID string + data *EventsCBData +} + +// NewEventListener Create a new instance of Event listener +func (s *SyncThing) NewEventListener() *Events { + _, dbg := os.LookupEnv("XDS_DEBUG_STEVENTS") // set to add more debug log + return &Events{ + MonitorTime: 100, // in Milliseconds + Debug: dbg, + stop: make(chan bool, 1), + st: s, + log: s.log, + cbArr: make(map[string][]cbMap), + } +} + +// Start starts event monitoring loop +func (e *Events) Start() error { + go e.monitorLoop() + return nil +} + +// Stop stops event monitoring loop +func (e *Events) Stop() { + e.stop <- true +} + +// Register Add a listener on an event +func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) { + if evName == "" || !strings.Contains(EventsAll, evName) { + return -1, fmt.Errorf("Unknown event name") + } + if data == nil { + data = &EventsCBData{} + } + + cbList := []cbMap{} + if _, ok := e.cbArr[evName]; ok { + cbList = e.cbArr[evName] + } + + id := len(cbList) + (*data)["id"] = strconv.Itoa(id) + + e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data}) + + return id, nil +} + +// UnRegister Remove a listener event +func (e *Events) UnRegister(evName string, id int) error { + cbKey, ok := e.cbArr[evName] + if !ok { + return fmt.Errorf("No event registered to such name") + } + + // FIXME - NOT TESTED + if id >= len(cbKey) { + return fmt.Errorf("Invalid id") + } else if id == len(cbKey) { + e.cbArr[evName] = cbKey[:id-1] + } else { + e.cbArr[evName] = cbKey[id : id+1] + } + + return nil +} + +// GetEvents returns the Syncthing events +func (e *Events) getEvents(since int) ([]STEvent, error) { + var data []byte + ev := []STEvent{} + url := "events" + if since != -1 { + url += "?since=" + strconv.Itoa(since) + } + if err := e.st.client.HTTPGet(url, &data); err != nil { + return ev, err + } + err := json.Unmarshal(data, &ev) + return ev, err +} + +// Loop to monitor Syncthing events +func (e *Events) monitorLoop() { + e.log.Infof("Event monitoring running...") + since := 0 + cntErrConn := 0 + cntErrRetry := 1 + for { + select { + case <-e.stop: + e.log.Infof("Event monitoring exited") + return + + case <-time.After(e.MonitorTime * time.Millisecond): + + if !e.st.Connected { + cntErrConn++ + time.Sleep(time.Second) + if cntErrConn > cntErrRetry { + e.log.Error("ST Event monitor: ST connection down") + cntErrConn = 0 + cntErrRetry *= 2 + if _, err := e.getEvents(since); err == nil { + e.st.Connected = true + cntErrRetry = 1 + // XXX - should we reset since value ? + goto readEvent + } + } + continue + } + + readEvent: + stEvArr, err := e.getEvents(since) + if err != nil { + e.log.Errorf("Syncthing Get Events: %v", err) + e.st.Connected = false + continue + } + + // Process events + for _, stEv := range stEvArr { + since = stEv.SubscriptionID + if e.Debug { + e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv) + } + + cbKey, ok := e.cbArr[stEv.Type] + if !ok { + continue + } + + evData := Event{ + Type: stEv.Type, + Time: stEv.Time, + } + + // Decode Events + // FIXME: re-define data struct for each events + // instead of map of string and use JSON marshing/unmarshing + fID := "" + evData.Data = make(map[string]string) + switch stEv.Type { + + case EventFolderCompletion: + fID = convString(stEv.Data["folder"]) + evData.Data["completion"] = convFloat64(stEv.Data["completion"]) + + case EventFolderSummary: + fID = convString(stEv.Data["folder"]) + evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"]) + evData.Data["state"] = convString(stEv.Data["state"]) + + case EventFolderPaused, EventFolderResumed: + fID = convString(stEv.Data["id"]) + evData.Data["label"] = convString(stEv.Data["label"]) + + case EventFolderErrors: + fID = convString(stEv.Data["folder"]) + // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"]) + + case EventStateChanged: + fID = convString(stEv.Data["folder"]) + evData.Data["from"] = convString(stEv.Data["from"]) + evData.Data["to"] = convString(stEv.Data["to"]) + + default: + e.log.Warnf("Unsupported event type") + } + + if fID != "" { + evData.Data["id"] = fID + } + + // Call all registered callbacks + for _, c := range cbKey { + if e.Debug { + e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID) + } + // Call when filterID is not set or when it matches + if c.filterID == "" || (fID != "" && fID == c.filterID) { + c.cb(evData, c.data) + } + } + } + } + } +} + +func convString(d interface{}) string { + return d.(string) +} + +func convFloat64(d interface{}) string { + return strconv.FormatFloat(d.(float64), 'f', -1, 64) +} + +func convInt64(d interface{}) string { + return strconv.FormatInt(d.(int64), 10) +} diff --git a/lib/syncthing/stfolder.go b/lib/syncthing/stfolder.go index d79e579..a5312eb 100644 --- a/lib/syncthing/stfolder.go +++ b/lib/syncthing/stfolder.go @@ -1,10 +1,12 @@ package st import ( - "path/filepath" + "encoding/json" + "fmt" "strings" - "github.com/syncthing/syncthing/lib/config" + common "github.com/iotbzh/xds-common/golib" + stconfig "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/protocol" ) @@ -18,23 +20,23 @@ type FolderChangeArg struct { } // FolderChange is called when configuration has changed -func (s *SyncThing) FolderChange(f FolderChangeArg) error { +func (s *SyncThing) FolderChange(f FolderChangeArg) (string, error) { // Get current config stCfg, err := s.ConfigGet() if err != nil { s.log.Errorln(err) - return err + return "", err } // Add new Device if needed var devID protocol.DeviceID if err := devID.UnmarshalText([]byte(f.SyncThingID)); err != nil { - s.log.Errorf("not a valid device id (err %v)\n", err) - return err + s.log.Errorf("not a valid device id (err %v)", err) + return "", err } - newDevice := config.DeviceConfiguration{ + newDevice := stconfig.DeviceConfiguration{ DeviceID: devID, Name: f.SyncThingID, Addresses: []string{"dynamic"}, @@ -60,18 +62,33 @@ func (s *SyncThing) FolderChange(f FolderChangeArg) error { id = f.SyncThingID[0:15] + "_" + label } - folder := config.FolderConfiguration{ - ID: id, - Label: label, - RawPath: filepath.Join(f.ShareRootDir, f.RelativePath), + // Resolve local path + pathCli, err := common.ResolveEnvVar(f.RelativePath) + if err != nil { + pathCli = f.RelativePath + } + // SEB still need ShareRootDir ? a sup + // pathCli := filepath.Join(f.ShareRootDir, f.RelativePath) + + folder := stconfig.FolderConfiguration{ + ID: id, + Label: label, + RawPath: pathCli, + AutoNormalize: true, } - folder.Devices = append(folder.Devices, config.FolderDeviceConfiguration{ + /* TODO - add it ? + if s.conf.FileConf.SThgConf.RescanIntervalS > 0 { + folder.RescanIntervalS = s.conf.FileConf.SThgConf.RescanIntervalS + } + */ + + folder.Devices = append(folder.Devices, stconfig.FolderDeviceConfiguration{ DeviceID: newDevice.DeviceID, }) found = false - var fld config.FolderConfiguration + var fld stconfig.FolderConfiguration for _, fld = range stCfg.Folders { if folder.ID == fld.ID { fld = folder @@ -89,7 +106,7 @@ func (s *SyncThing) FolderChange(f FolderChangeArg) error { s.log.Errorln(err) } - return nil + return id, nil } // FolderDelete is called to delete a folder config @@ -114,3 +131,61 @@ func (s *SyncThing) FolderDelete(id string) error { return nil } + +// FolderConfigGet Returns the configuration of a specific folder +func (s *SyncThing) FolderConfigGet(folderID string) (stconfig.FolderConfiguration, error) { + fc := stconfig.FolderConfiguration{} + if folderID == "" { + return fc, fmt.Errorf("folderID not set") + } + cfg, err := s.ConfigGet() + if err != nil { + return fc, err + } + for _, f := range cfg.Folders { + if f.ID == folderID { + fc = f + return fc, nil + } + } + return fc, fmt.Errorf("id not found") +} + +// FolderStatus Returns all information about the current +func (s *SyncThing) FolderStatus(folderID string) (*FolderStatus, error) { + var data []byte + var res FolderStatus + if folderID == "" { + return nil, fmt.Errorf("folderID not set") + } + if err := s.client.HTTPGet("db/status?folder="+folderID, &data); err != nil { + return nil, err + } + if err := json.Unmarshal(data, &res); err != nil { + return nil, err + } + return &res, nil +} + +// IsFolderInSync Returns true when folder is in sync +func (s *SyncThing) IsFolderInSync(folderID string) (bool, error) { + sts, err := s.FolderStatus(folderID) + if err != nil { + return false, err + } + return sts.NeedBytes == 0 && sts.State == "idle", nil +} + +// FolderScan Request immediate folder scan. +// Scan all folders if folderID param is empty +func (s *SyncThing) FolderScan(folderID string, subpath string) error { + url := "db/scan" + if folderID != "" { + url += "?folder=" + folderID + + if subpath != "" { + url += "&sub=" + subpath + } + } + return s.client.HTTPPost(url, "") +} diff --git a/lib/webserver/server.go b/lib/webserver/server.go deleted file mode 100644 index b835a65..0000000 --- a/lib/webserver/server.go +++ /dev/null @@ -1,226 +0,0 @@ -package webserver - -import ( - "fmt" - "net/http" - "strings" - - "github.com/Sirupsen/logrus" - "github.com/gin-gonic/gin" - "github.com/googollee/go-socket.io" - "github.com/iotbzh/xds-agent/lib/apiv1" - "github.com/iotbzh/xds-agent/lib/session" - "github.com/iotbzh/xds-agent/lib/xdsconfig" -) - -// ServerService . -type ServerService struct { - router *gin.Engine - api *apiv1.APIService - sIOServer *socketio.Server - webApp *gin.RouterGroup - cfg *xdsconfig.Config - sessions *session.Sessions - log *logrus.Logger - stop chan struct{} // signals intentional stop -} - -const indexFilename = "index.html" -const cookieMaxAge = "3600" - -// New creates an instance of ServerService -func New(conf *xdsconfig.Config, log *logrus.Logger) *ServerService { - - // Setup logging for gin router - if log.Level == logrus.DebugLevel { - gin.SetMode(gin.DebugMode) - } else { - gin.SetMode(gin.ReleaseMode) - } - - // TODO - // - try to bind gin DefaultWriter & DefaultErrorWriter to logrus logger - // - try to fix pb about isTerminal=false when out is in VSC Debug Console - //gin.DefaultWriter = ?? - //gin.DefaultErrorWriter = ?? - - // Creates gin router - r := gin.New() - - svr := &ServerService{ - router: r, - api: nil, - sIOServer: nil, - webApp: nil, - cfg: conf, - log: log, - sessions: nil, - stop: make(chan struct{}), - } - - return svr -} - -// Serve starts a new instance of the Web Server -func (s *ServerService) Serve() error { - var err error - - // Setup middlewares - s.router.Use(gin.Logger()) - s.router.Use(gin.Recovery()) - s.router.Use(s.middlewareCORS()) - s.router.Use(s.middlewareXDSDetails()) - s.router.Use(s.middlewareCSRF()) - - // Sessions manager - s.sessions = session.NewClientSessions(s.router, s.log, cookieMaxAge) - - s.router.GET("", s.slashHandler) - - // Create REST API - s.api = apiv1.New(s.sessions, s.cfg, s.log, s.router) - - // Websocket routes - s.sIOServer, err = socketio.NewServer(nil) - if err != nil { - s.log.Fatalln(err) - } - - s.router.GET("/socket.io/", s.socketHandler) - s.router.POST("/socket.io/", s.socketHandler) - /* TODO: do we want to support ws://... ? - s.router.Handle("WS", "/socket.io/", s.socketHandler) - s.router.Handle("WSS", "/socket.io/", s.socketHandler) - */ - - // Serve in the background - serveError := make(chan error, 1) - go func() { - fmt.Printf("Web Server running on localhost:%s ...\n", s.cfg.HTTPPort) - serveError <- http.ListenAndServe(":"+s.cfg.HTTPPort, s.router) - }() - - fmt.Printf("XDS agent running...\n") - - // Wait for stop, restart or error signals - select { - case <-s.stop: - // Shutting down permanently - s.sessions.Stop() - s.log.Infoln("shutting down (stop)") - case err = <-serveError: - // Error due to listen/serve failure - s.log.Errorln(err) - } - - return nil -} - -// Stop web server -func (s *ServerService) Stop() { - close(s.stop) -} - -// serveSlash provides response to GET "/" -func (s *ServerService) slashHandler(c *gin.Context) { - c.String(200, "Hello from XDS agent!") -} - -// Add details in Header -func (s *ServerService) middlewareXDSDetails() gin.HandlerFunc { - return func(c *gin.Context) { - c.Header("XDS-Agent-Version", s.cfg.Version) - c.Header("XDS-API-Version", s.cfg.APIVersion) - c.Next() - } -} - -func (s *ServerService) isValidAPIKey(key string) bool { - return (key == s.cfg.FileConf.XDSAPIKey && key != "") -} - -func (s *ServerService) middlewareCSRF() gin.HandlerFunc { - return func(c *gin.Context) { - // Allow requests carrying a valid API key - if s.isValidAPIKey(c.Request.Header.Get("X-API-Key")) { - // Set the access-control-allow-origin header for CORS requests - // since a valid API key has been provided - c.Header("Access-Control-Allow-Origin", "*") - c.Next() - return - } - - // Allow io.socket request - if strings.HasPrefix(c.Request.URL.Path, "/socket.io") { - c.Next() - return - } - - /* FIXME Add really CSRF support - - // Allow requests for anything not under the protected path prefix, - // and set a CSRF cookie if there isn't already a valid one. - if !strings.HasPrefix(c.Request.URL.Path, prefix) { - cookie, err := c.Cookie("CSRF-Token-" + unique) - if err != nil || !validCsrfToken(cookie.Value) { - s.log.Debugln("new CSRF cookie in response to request for", c.Request.URL) - c.SetCookie("CSRF-Token-"+unique, newCsrfToken(), 600, "/", "", false, false) - } - c.Next() - return - } - - // Verify the CSRF token - token := c.Request.Header.Get("X-CSRF-Token-" + unique) - if !validCsrfToken(token) { - c.AbortWithError(403, "CSRF Error") - return - } - - c.Next() - */ - c.AbortWithError(403, fmt.Errorf("Not valid API key")) - } -} - -// CORS middleware -func (s *ServerService) middlewareCORS() gin.HandlerFunc { - return func(c *gin.Context) { - if c.Request.Method == "OPTIONS" { - c.Header("Access-Control-Allow-Origin", "*") - c.Header("Access-Control-Allow-Headers", "Content-Type, X-API-Key") - c.Header("Access-Control-Allow-Methods", "GET, POST, DELETE") - c.Header("Access-Control-Max-Age", cookieMaxAge) - c.AbortWithStatus(204) - return - } - c.Next() - } -} - -// socketHandler is the handler for the "main" websocket connection -func (s *ServerService) socketHandler(c *gin.Context) { - - // Retrieve user session - sess := s.sessions.Get(c) - if sess == nil { - c.JSON(500, gin.H{"error": "Cannot retrieve session"}) - return - } - - s.sIOServer.On("connection", func(so socketio.Socket) { - s.log.Debugf("WS Connected (SID=%v)", so.Id()) - s.sessions.UpdateIOSocket(sess.ID, &so) - - so.On("disconnection", func() { - s.log.Debugf("WS disconnected (SID=%v)", so.Id()) - s.sessions.UpdateIOSocket(sess.ID, nil) - }) - }) - - s.sIOServer.On("error", func(so socketio.Socket, err error) { - s.log.Errorf("WS SID=%v Error : %v", so.Id(), err.Error()) - }) - - s.sIOServer.ServeHTTP(c.Writer, c.Request) -} diff --git a/lib/xdsconfig/config.go b/lib/xdsconfig/config.go index 854d383..9cff862 100644 --- a/lib/xdsconfig/config.go +++ b/lib/xdsconfig/config.go @@ -2,6 +2,8 @@ package xdsconfig import ( "fmt" + "io" + "path/filepath" "os" @@ -12,14 +14,20 @@ import ( // Config parameters (json format) of /config command type Config struct { - Version string `json:"version"` - APIVersion string `json:"apiVersion"` - VersionGitTag string `json:"gitTag"` + Version string + APIVersion string + VersionGitTag string + Options Options + FileConf FileConfig + Log *logrus.Logger + LogVerboseOut io.Writer +} - // Private / un-exported fields - HTTPPort string `json:"-"` - FileConf *FileConfig `json:"-"` - Log *logrus.Logger `json:"-"` +// Options set at the command line +type Options struct { + ConfigFile string + LogLevel string + LogFile string } // Config default values @@ -32,39 +40,75 @@ const ( func Init(ctx *cli.Context, log *logrus.Logger) (*Config, error) { var err error + defaultWebAppDir := "${EXEPATH}/www" + defaultSTHomeDir := "${HOME}/.xds/agent/syncthing-config" + // Define default configuration c := Config{ Version: ctx.App.Metadata["version"].(string), APIVersion: DefaultAPIVersion, VersionGitTag: ctx.App.Metadata["git-tag"].(string), - HTTPPort: "8010", - FileConf: &FileConfig{ - LogsDir: "/tmp/logs", + Options: Options{ + ConfigFile: ctx.GlobalString("config"), + LogLevel: ctx.GlobalString("log"), + LogFile: ctx.GlobalString("logfile"), + }, + + FileConf: FileConfig{ + HTTPPort: "8800", + WebAppDir: defaultWebAppDir, + LogsDir: "/tmp/logs", + // SEB XDSAPIKey: "1234abcezam", + ServersConf: []XDSServerConf{ + XDSServerConf{ + URL: "http://localhost:8000", + ConnRetry: 10, + }, + }, SThgConf: &SyncThingConf{ - Home: "${HOME}/.xds/agent/syncthing-config", + Home: defaultSTHomeDir, }, }, Log: log, } // config file settings overwrite default config - c.FileConf, err = updateConfigFromFile(&c, ctx.GlobalString("config")) + err = readGlobalConfig(&c, c.Options.ConfigFile) if err != nil { return nil, err } + // Handle where Logs are redirected: + // default 'stdout' (logfile option default value) + // else use file (or filepath) set by --logfile option + // that may be overwritten by LogsDir field of config file + logF := c.Options.LogFile + logD := c.FileConf.LogsDir + if logF != "stdout" { + if logD != "" { + lf := filepath.Base(logF) + if lf == "" || lf == "." { + lf = "xds-agent.log" + } + logF = filepath.Join(logD, lf) + } else { + logD = filepath.Dir(logF) + } + } + if logD == "" || logD == "." { + logD = "/tmp/xds/logs" + } + c.Options.LogFile = logF + c.FileConf.LogsDir = logD + if c.FileConf.LogsDir != "" && !common.Exists(c.FileConf.LogsDir) { if err := os.MkdirAll(c.FileConf.LogsDir, 0770); err != nil { return nil, fmt.Errorf("Cannot create logs dir: %v", err) } } + c.Log.Infoln("Logs file: ", c.Options.LogFile) c.Log.Infoln("Logs directory: ", c.FileConf.LogsDir) return &c, nil } - -// UpdateAll Update the current configuration -func (c *Config) UpdateAll(newCfg Config) error { - return fmt.Errorf("Not Supported") -} diff --git a/lib/xdsconfig/configfile.go b/lib/xdsconfig/configfile.go new file mode 100644 index 0000000..a47038b --- /dev/null +++ b/lib/xdsconfig/configfile.go @@ -0,0 +1,112 @@ +package xdsconfig + +import ( + "encoding/json" + "os" + "path" + + common "github.com/iotbzh/xds-common/golib" +) + +type SyncThingConf struct { + BinDir string `json:"binDir"` + Home string `json:"home"` + GuiAddress string `json:"gui-address"` + GuiAPIKey string `json:"gui-apikey"` +} + +type XDSServerConf struct { + URL string `json:"url"` + ConnRetry int `json:"connRetry"` + + // private/not exported fields + ID string `json:"-"` + APIBaseURL string `json:"-"` + APIPartialURL string `json:"-"` +} + +type FileConfig struct { + HTTPPort string `json:"httpPort"` + WebAppDir string `json:"webAppDir"` + LogsDir string `json:"logsDir"` + // SEB A SUP ? XDSAPIKey string `json:"xds-apikey"` + ServersConf []XDSServerConf `json:"xdsServers"` + SThgConf *SyncThingConf `json:"syncthing"` +} + +// readGlobalConfig reads configuration from a config file. +// Order to determine which config file is used: +// 1/ from command line option: "--config myConfig.json" +// 2/ $HOME/.xds/agent/agent-config.json file +// 3/ /agent-config.json file +// 4/ /agent-config.json file + +func readGlobalConfig(c *Config, confFile string) error { + + searchIn := make([]string, 0, 3) + if confFile != "" { + searchIn = append(searchIn, confFile) + } + if homeDir := common.GetUserHome(); homeDir != "" { + searchIn = append(searchIn, path.Join(homeDir, ".xds", "agent", "agent-config.json")) + } + + searchIn = append(searchIn, "/etc/xds-agent/agent-config.json") + + searchIn = append(searchIn, path.Join(common.GetExePath(), "agent-config.json")) + + var cFile *string + for _, p := range searchIn { + if _, err := os.Stat(p); err == nil { + cFile = &p + break + } + } + if cFile == nil { + c.Log.Infof("No config file found") + return nil + } + + c.Log.Infof("Use config file: %s", *cFile) + + // TODO move on viper package to support comments in JSON and also + // bind with flags (command line options) + // see https://github.com/spf13/viper#working-with-flags + + fd, _ := os.Open(*cFile) + defer fd.Close() + + // Decode config file content and save it in a first variable + fCfg := FileConfig{} + if err := json.NewDecoder(fd).Decode(&fCfg); err != nil { + return err + } + + // Decode config file content and overwrite default settings + fd.Seek(0, 0) + json.NewDecoder(fd).Decode(&c.FileConf) + + // Disable Syncthing support when there is no syncthing field in config + if fCfg.SThgConf == nil { + c.FileConf.SThgConf = nil + } + + // Support environment variables (IOW ${MY_ENV_VAR} syntax) in agent-config.json + vars := []*string{ + &c.FileConf.LogsDir, + &c.FileConf.WebAppDir, + } + if c.FileConf.SThgConf != nil { + vars = append(vars, &c.FileConf.SThgConf.Home, + &c.FileConf.SThgConf.BinDir) + } + for _, field := range vars { + var err error + *field, err = common.ResolveEnvVar(*field) + if err != nil { + return err + } + } + + return nil +} diff --git a/lib/xdsconfig/fileconfig.go b/lib/xdsconfig/fileconfig.go deleted file mode 100644 index efe94bf..0000000 --- a/lib/xdsconfig/fileconfig.go +++ /dev/null @@ -1,111 +0,0 @@ -package xdsconfig - -import ( - "encoding/json" - "os" - "os/user" - "path" - "path/filepath" - - common "github.com/iotbzh/xds-common/golib" -) - -type SyncThingConf struct { - BinDir string `json:"binDir"` - Home string `json:"home"` - GuiAddress string `json:"gui-address"` - GuiAPIKey string `json:"gui-apikey"` -} - -type FileConfig struct { - HTTPPort string `json:"httpPort"` - LogsDir string `json:"logsDir"` - XDSAPIKey string `json:"xds-apikey"` - SThgConf *SyncThingConf `json:"syncthing"` -} - -// getConfigFromFile reads configuration from a config file. -// Order to determine which config file is used: -// 1/ from command line option: "--config myConfig.json" -// 2/ $HOME/.xds/agent/agent-config.json file -// 3/ /agent-config.json file -// 4/ /agent-config.json file - -func updateConfigFromFile(c *Config, confFile string) (*FileConfig, error) { - - searchIn := make([]string, 0, 3) - if confFile != "" { - searchIn = append(searchIn, confFile) - } - if usr, err := user.Current(); err == nil { - searchIn = append(searchIn, path.Join(usr.HomeDir, ".xds", "agent", "agent-config.json")) - } - - searchIn = append(searchIn, "/etc/xds-agent/agent-config.json") - - exePath := os.Args[0] - ee, _ := os.Executable() - exeAbsPath, err := filepath.Abs(ee) - if err == nil { - exePath, err = filepath.EvalSymlinks(exeAbsPath) - if err == nil { - exePath = filepath.Dir(ee) - } else { - exePath = filepath.Dir(exeAbsPath) - } - } - searchIn = append(searchIn, path.Join(exePath, "agent-config.json")) - - var cFile *string - for _, p := range searchIn { - if _, err := os.Stat(p); err == nil { - cFile = &p - break - } - } - // Use default settings - fCfg := *c.FileConf - - // Read config file when existing - if cFile != nil { - c.Log.Infof("Use config file: %s", *cFile) - - // TODO move on viper package to support comments in JSON and also - // bind with flags (command line options) - // see https://github.com/spf13/viper#working-with-flags - - fd, _ := os.Open(*cFile) - defer fd.Close() - if err := json.NewDecoder(fd).Decode(&fCfg); err != nil { - return nil, err - } - } - - // Support environment variables (IOW ${MY_ENV_VAR} syntax) in agent-config.json - vars := []*string{ - &fCfg.LogsDir, - } - if fCfg.SThgConf != nil { - vars = append(vars, &fCfg.SThgConf.Home, &fCfg.SThgConf.BinDir) - } - for _, field := range vars { - var err error - *field, err = common.ResolveEnvVar(*field) - if err != nil { - return nil, err - } - } - - // Config file settings overwrite default config - if fCfg.HTTPPort != "" { - c.HTTPPort = fCfg.HTTPPort - } - - // Set default apikey - // FIXME - rework with dynamic key - if fCfg.XDSAPIKey == "" { - fCfg.XDSAPIKey = "1234abcezam" - } - - return &fCfg, nil -} -- cgit 1.2.3-korg