diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/agent/agent.go | 154 | ||||
-rw-r--r-- | lib/agent/apiv1-browse.go | 28 | ||||
-rw-r--r-- | lib/agent/apiv1-config.go | 108 | ||||
-rw-r--r-- | lib/agent/apiv1-events.go | 73 | ||||
-rw-r--r-- | lib/agent/apiv1-exec.go | 131 | ||||
-rw-r--r-- | lib/agent/apiv1-projects.go | 72 | ||||
-rw-r--r-- | lib/agent/apiv1-version.go | 45 | ||||
-rw-r--r-- | lib/agent/apiv1.go | 129 | ||||
-rw-r--r-- | lib/agent/events.go | 132 | ||||
-rw-r--r-- | lib/agent/project-interface.go | 43 | ||||
-rw-r--r-- | lib/agent/project-pathmap.go | 123 | ||||
-rw-r--r-- | lib/agent/project-st.go | 186 | ||||
-rw-r--r-- | lib/agent/projects.go | 241 | ||||
-rw-r--r-- | lib/agent/session.go (renamed from lib/session/session.go) | 31 | ||||
-rw-r--r-- | lib/agent/webserver.go | 244 | ||||
-rw-r--r-- | lib/agent/xdsserver.go | 605 | ||||
-rw-r--r-- | lib/apiv1/apiv1.go | 36 | ||||
-rw-r--r-- | lib/apiv1/config.go | 45 | ||||
-rw-r--r-- | lib/apiv1/version.go | 24 | ||||
-rw-r--r-- | lib/syncthing/st.go | 100 | ||||
-rw-r--r-- | lib/syncthing/stEvent.go | 265 | ||||
-rw-r--r-- | lib/syncthing/stfolder.go | 152 | ||||
-rw-r--r-- | lib/webserver/server.go | 226 | ||||
-rw-r--r-- | lib/xdsconfig/config.go | 88 | ||||
-rw-r--r-- | lib/xdsconfig/configfile.go | 109 | ||||
-rw-r--r-- | lib/xdsconfig/fileconfig.go | 96 |
26 files changed, 2974 insertions, 512 deletions
diff --git a/lib/agent/agent.go b/lib/agent/agent.go index 74872f7..3bdd89f 100644 --- a/lib/agent/agent.go +++ b/lib/agent/agent.go @@ -2,28 +2,39 @@ package agent import ( "fmt" + "log" "os" "os/exec" "os/signal" + "path/filepath" "syscall" + "time" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" "github.com/iotbzh/xds-agent/lib/syncthing" "github.com/iotbzh/xds-agent/lib/xdsconfig" - "github.com/iotbzh/xds-agent/lib/webserver" ) +const cookieMaxAge = "3600" + // Context holds the Agent context structure type Context struct { - ProgName string - Config *xdsconfig.Config - Log *logrus.Logger - SThg *st.SyncThing - SThgCmd *exec.Cmd - SThgInotCmd *exec.Cmd - WWWServer *webserver.ServerService - Exit chan os.Signal + ProgName string + Config *xdsconfig.Config + Log *logrus.Logger + LogLevelSilly bool + SThg *st.SyncThing + SThgCmd *exec.Cmd + SThgInotCmd *exec.Cmd + + webServer *WebServer + xdsServers map[string]*XdsServer + sessions *Sessions + events *Events + projects *Projects + + Exit chan os.Signal } // NewAgent Create a new instance of Agent @@ -43,11 +54,18 @@ func NewAgent(cliCtx *cli.Context) *Context { } log.Formatter = &logrus.TextFormatter{} + sillyVal, sillyLog := os.LookupEnv("XDS_LOG_SILLY") + // Define default configuration ctx := Context{ - ProgName: cliCtx.App.Name, - Log: log, - Exit: make(chan os.Signal, 1), + ProgName: cliCtx.App.Name, + Log: log, + LogLevelSilly: (sillyLog && sillyVal == "1"), + Exit: make(chan os.Signal, 1), + + webServer: nil, + xdsServers: make(map[string]*XdsServer), + events: nil, } // register handler on SIGTERM / exit @@ -57,6 +75,114 @@ func NewAgent(cliCtx *cli.Context) *Context { return &ctx } +// Run Main function called to run agent +func (ctx *Context) Run() (int, error) { + var err error + + // Logs redirected into a file when logfile option or logsDir config is set + ctx.Config.LogVerboseOut = os.Stderr + if ctx.Config.FileConf.LogsDir != "" { + if ctx.Config.Options.LogFile != "stdout" { + logFile := ctx.Config.Options.LogFile + + fdL, err := os.OpenFile(logFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666) + if err != nil { + msgErr := fmt.Errorf("Cannot create log file %s", logFile) + return int(syscall.EPERM), msgErr + } + ctx.Log.Out = fdL + + ctx._logPrint("Logging file: %s\n", logFile) + } + + logFileHTTPReq := filepath.Join(ctx.Config.FileConf.LogsDir, "xds-agent-verbose.log") + fdLH, err := os.OpenFile(logFileHTTPReq, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666) + if err != nil { + msgErr := fmt.Errorf("Cannot create log file %s", logFileHTTPReq) + return int(syscall.EPERM), msgErr + } + ctx.Config.LogVerboseOut = fdLH + + ctx._logPrint("Logging file for HTTP requests: %s\n", logFileHTTPReq) + } + + // Create syncthing instance when section "syncthing" is present in config.json + if ctx.Config.FileConf.SThgConf != nil { + ctx.SThg = st.NewSyncThing(ctx.Config, ctx.Log) + } + + // Start local instance of Syncthing and Syncthing-notify + if ctx.SThg != nil { + ctx.Log.Infof("Starting Syncthing...") + ctx.SThgCmd, err = ctx.SThg.Start() + if err != nil { + return 2, err + } + fmt.Printf("Syncthing started (PID %d)\n", ctx.SThgCmd.Process.Pid) + + ctx.Log.Infof("Starting Syncthing-inotify...") + ctx.SThgInotCmd, err = ctx.SThg.StartInotify() + if err != nil { + return 2, err + } + fmt.Printf("Syncthing-inotify started (PID %d)\n", ctx.SThgInotCmd.Process.Pid) + + // Establish connection with local Syncthing (retry if connection fail) + time.Sleep(3 * time.Second) + maxRetry := 30 + retry := maxRetry + for retry > 0 { + if err := ctx.SThg.Connect(); err == nil { + break + } + ctx.Log.Infof("Establishing connection to Syncthing (retry %d/%d)", retry, maxRetry) + time.Sleep(time.Second) + retry-- + } + if err != nil || retry == 0 { + return 2, err + } + + // Retrieve Syncthing config + id, err := ctx.SThg.IDGet() + if err != nil { + return 2, err + } + ctx.Log.Infof("Local Syncthing ID: %s", id) + + } else { + ctx.Log.Infof("Cloud Sync / Syncthing not supported") + } + + // Create Web Server + ctx.webServer = NewWebServer(ctx) + + // Sessions manager + ctx.sessions = NewClientSessions(ctx, cookieMaxAge) + + // Create events management + ctx.events = NewEvents(ctx) + + // Create projects management + ctx.projects = NewProjects(ctx, ctx.SThg) + + // Run Web Server until exit requested (blocking call) + if err = ctx.webServer.Serve(); err != nil { + log.Println(err) + return 3, err + } + + return 4, fmt.Errorf("Program exited") +} + +// Helper function to log message on both stdout and logger +func (ctx *Context) _logPrint(format string, args ...interface{}) { + fmt.Printf(format, args...) + if ctx.Log.Out != os.Stdout { + ctx.Log.Infof(format, args...) + } +} + // Handle exit and properly stop/close all stuff func handlerSigTerm(ctx *Context) { <-ctx.Exit @@ -68,9 +194,9 @@ func handlerSigTerm(ctx *Context) { ctx.SThg.Stop() ctx.SThg.StopInotify() } - if ctx.WWWServer != nil { + if ctx.webServer != nil { ctx.Log.Infof("Stoping Web server...") - ctx.WWWServer.Stop() + ctx.webServer.Stop() } os.Exit(1) } diff --git a/lib/agent/apiv1-browse.go b/lib/agent/apiv1-browse.go new file mode 100644 index 0000000..1701a2e --- /dev/null +++ b/lib/agent/apiv1-browse.go @@ -0,0 +1,28 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" +) + +type directory struct { + Name string `json:"name"` + Fullpath string `json:"fullpath"` +} + +type apiDirectory struct { + Dir []directory `json:"dir"` +} + +// browseFS used to browse local file system +func (s *APIService) browseFS(c *gin.Context) { + + response := apiDirectory{ + Dir: []directory{ + directory{Name: "TODO SEB"}, + }, + } + + c.JSON(http.StatusOK, response) +} diff --git a/lib/agent/apiv1-config.go b/lib/agent/apiv1-config.go new file mode 100644 index 0000000..31d8de6 --- /dev/null +++ b/lib/agent/apiv1-config.go @@ -0,0 +1,108 @@ +package agent + +import ( + "net/http" + "sync" + + "github.com/gin-gonic/gin" + "github.com/iotbzh/xds-agent/lib/xdsconfig" + common "github.com/iotbzh/xds-common/golib" +) + +var confMut sync.Mutex + +// APIConfig parameters (json format) of /config command +type APIConfig struct { + Servers []ServerCfg `json:"servers"` + + // Not exposed outside in JSON + Version string `json:"-"` + APIVersion string `json:"-"` + VersionGitTag string `json:"-"` +} + +// ServerCfg . +type ServerCfg struct { + ID string `json:"id"` + URL string `json:"url"` + APIURL string `json:"apiUrl"` + PartialURL string `json:"partialUrl"` + ConnRetry int `json:"connRetry"` + Connected bool `json:"connected"` + Disabled bool `json:"disabled"` +} + +// GetConfig returns the configuration +func (s *APIService) getConfig(c *gin.Context) { + confMut.Lock() + defer confMut.Unlock() + + cfg := s._getConfig() + + c.JSON(http.StatusOK, cfg) +} + +// SetConfig sets configuration +func (s *APIService) setConfig(c *gin.Context) { + var cfgArg APIConfig + if c.BindJSON(&cfgArg) != nil { + common.APIError(c, "Invalid arguments") + return + } + + confMut.Lock() + defer confMut.Unlock() + + s.Log.Debugln("SET config: ", cfgArg) + + // First delete/disable XDS Server that are no longer listed + for _, svr := range s.xdsServers { + found := false + for _, svrArg := range cfgArg.Servers { + if svr.ID == svrArg.ID { + found = true + break + } + } + if !found { + s.DelXdsServer(svr.ID) + } + } + + // Add new XDS Server + for _, svr := range cfgArg.Servers { + cfg := xdsconfig.XDSServerConf{ + ID: svr.ID, + URL: svr.URL, + ConnRetry: svr.ConnRetry, + } + if _, err := s.AddXdsServer(cfg); err != nil { + common.APIError(c, err.Error()) + return + } + } + + c.JSON(http.StatusOK, s._getConfig()) +} + +func (s *APIService) _getConfig() APIConfig { + cfg := APIConfig{ + Version: s.Config.Version, + APIVersion: s.Config.APIVersion, + VersionGitTag: s.Config.VersionGitTag, + Servers: []ServerCfg{}, + } + + for _, svr := range s.xdsServers { + cfg.Servers = append(cfg.Servers, ServerCfg{ + ID: svr.ID, + URL: svr.BaseURL, + APIURL: svr.APIURL, + PartialURL: svr.PartialURL, + ConnRetry: svr.ConnRetry, + Connected: svr.Connected, + Disabled: svr.Disabled, + }) + } + return cfg +} diff --git a/lib/agent/apiv1-events.go b/lib/agent/apiv1-events.go new file mode 100644 index 0000000..8aad18a --- /dev/null +++ b/lib/agent/apiv1-events.go @@ -0,0 +1,73 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +// EventRegisterArgs is the parameters (json format) of /events/register command +type EventRegisterArgs struct { + Name string `json:"name"` + ProjectID string `json:"filterProjectID"` +} + +// EventUnRegisterArgs is the parameters (json format) of /events/unregister command +type EventUnRegisterArgs struct { + Name string `json:"name"` + ID int `json:"id"` +} + +// eventsList Registering for events that will be send over a WS +func (s *APIService) eventsList(c *gin.Context) { + c.JSON(http.StatusOK, s.events.GetList()) +} + +// eventsRegister Registering for events that will be send over a WS +func (s *APIService) eventsRegister(c *gin.Context) { + var args EventRegisterArgs + + if c.BindJSON(&args) != nil || args.Name == "" { + common.APIError(c, "Invalid arguments") + return + } + + sess := s.webServer.sessions.Get(c) + if sess == nil { + common.APIError(c, "Unknown sessions") + return + } + + // Register to all or to a specific events + if err := s.events.Register(args.Name, sess.ID); err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{"status": "OK"}) +} + +// eventsRegister Registering for events that will be send over a WS +func (s *APIService) eventsUnRegister(c *gin.Context) { + var args EventUnRegisterArgs + + if c.BindJSON(&args) != nil || args.Name == "" { + common.APIError(c, "Invalid arguments") + return + } + + sess := s.webServer.sessions.Get(c) + if sess == nil { + common.APIError(c, "Unknown sessions") + return + } + + // Register to all or to a specific events + if err := s.events.UnRegister(args.Name, sess.ID); err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{"status": "OK"}) +} diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go new file mode 100644 index 0000000..37070f7 --- /dev/null +++ b/lib/agent/apiv1-exec.go @@ -0,0 +1,131 @@ +package agent + +import ( + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" + uuid "github.com/satori/go.uuid" +) + +// ExecArgs Only define used fields +type ExecArgs struct { + ID string `json:"id" binding:"required"` + CmdID string `json:"cmdID"` // command unique ID +} + +var execCmdID = 1 + +// ExecCmd executes remotely a command +func (s *APIService) execCmd(c *gin.Context) { + s._execRequest("/exec", c) +} + +// execSignalCmd executes remotely a command +func (s *APIService) execSignalCmd(c *gin.Context) { + s._execRequest("/signal", c) +} + +func (s *APIService) _execRequest(cmd string, c *gin.Context) { + data, err := c.GetRawData() + if err != nil { + common.APIError(c, err.Error()) + } + + args := ExecArgs{} + // XXX - we cannot use c.BindJSON, so directly unmarshall it + // (see https://github.com/gin-gonic/gin/issues/1078) + if err := json.Unmarshal(data, &args); err != nil { + common.APIError(c, "Invalid arguments") + return + } + + // First get Project ID to retrieve Server ID and send command to right server + id := c.Param("id") + if id == "" { + id = args.ID + } + + prj := s.projects.Get(id) + if prj == nil { + common.APIError(c, "Unknown id") + return + } + + svr := (*prj).GetServer() + if svr == nil { + common.APIError(c, "Cannot identify XDS Server") + return + } + + // Retrieve session info + sess := s.sessions.Get(c) + if sess == nil { + common.APIError(c, "Unknown sessions") + return + } + sock := sess.IOSocket + if sock == nil { + common.APIError(c, "Websocket not established") + return + } + + // Forward XDSServer WS events to client WS + // TODO removed static event name list and get it from XDSServer + evtList := []string{ + "exec:input", + "exec:output", + "exec:inferior-input", + "exec:inferior-output", + } + so := *sock + fwdFuncID := []uuid.UUID{} + for _, evName := range evtList { + evN := evName + fwdFunc := func(evData interface{}) { + // Forward event to Client/Dashboard + so.Emit(evN, evData) + } + id, err := svr.EventOn(evN, fwdFunc) + if err != nil { + common.APIError(c, err.Error()) + return + } + fwdFuncID = append(fwdFuncID, id) + } + + // Handle Exit event separately to cleanup registered listener + var exitFuncID uuid.UUID + exitFunc := func(evData interface{}) { + so.Emit("exec:exit", evData) + + // cleanup listener + for i, evName := range evtList { + svr.EventOff(evName, fwdFuncID[i]) + } + svr.EventOff("exec:exit", exitFuncID) + } + exitFuncID, err = svr.EventOn("exec:exit", exitFunc) + if err != nil { + common.APIError(c, err.Error()) + return + } + + // Forward back command to right server + response, err := svr.SendCommand(cmd, data) + if err != nil { + common.APIError(c, err.Error()) + return + } + + // Decode response + body, err := ioutil.ReadAll(response.Body) + if err != nil { + common.APIError(c, "Cannot read response body") + return + } + c.JSON(http.StatusOK, string(body)) + +} diff --git a/lib/agent/apiv1-projects.go b/lib/agent/apiv1-projects.go new file mode 100644 index 0000000..d4b5e74 --- /dev/null +++ b/lib/agent/apiv1-projects.go @@ -0,0 +1,72 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +// getProjects returns all projects configuration +func (s *APIService) getProjects(c *gin.Context) { + c.JSON(http.StatusOK, s.projects.GetProjectArr()) +} + +// getProject returns a specific project configuration +func (s *APIService) getProject(c *gin.Context) { + prj := s.projects.Get(c.Param("id")) + if prj == nil { + common.APIError(c, "Invalid id") + return + } + + c.JSON(http.StatusOK, (*prj).GetProject()) +} + +// addProject adds a new project to server config +func (s *APIService) addProject(c *gin.Context) { + var cfgArg ProjectConfig + if c.BindJSON(&cfgArg) != nil { + common.APIError(c, "Invalid arguments") + return + } + + s.Log.Debugln("Add project config: ", cfgArg) + + newFld, err := s.projects.Add(cfgArg) + if err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, newFld) +} + +// syncProject force synchronization of project files +func (s *APIService) syncProject(c *gin.Context) { + id := c.Param("id") + + s.Log.Debugln("Sync project id: ", id) + + err := s.projects.ForceSync(id) + if err != nil { + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, "") +} + +// delProject deletes project from server config +func (s *APIService) delProject(c *gin.Context) { + id := c.Param("id") + + s.Log.Debugln("Delete project id ", id) + + delEntry, err := s.projects.Delete(id) + if err != nil { + common.APIError(c, err.Error()) + return + } + c.JSON(http.StatusOK, delEntry) +} diff --git a/lib/agent/apiv1-version.go b/lib/agent/apiv1-version.go new file mode 100644 index 0000000..6b4923f --- /dev/null +++ b/lib/agent/apiv1-version.go @@ -0,0 +1,45 @@ +package agent + +import ( + "net/http" + + "github.com/gin-gonic/gin" + common "github.com/iotbzh/xds-common/golib" +) + +type version struct { + ID string `json:"id"` + Version string `json:"version"` + APIVersion string `json:"apiVersion"` + VersionGitTag string `json:"gitTag"` +} + +type apiVersion struct { + Client version `json:"client"` + Server []version `json:"servers"` +} + +// getInfo : return various information about server +func (s *APIService) getVersion(c *gin.Context) { + response := apiVersion{ + Client: version{ + ID: "", + Version: s.Config.Version, + APIVersion: s.Config.APIVersion, + VersionGitTag: s.Config.VersionGitTag, + }, + } + + svrVer := []version{} + for _, svr := range s.xdsServers { + res := version{} + if err := svr.GetVersion(&res); err != nil { + common.APIError(c, "Cannot retrieve version of XDS server ID %s : %v", svr.ID, err.Error()) + return + } + svrVer = append(svrVer, res) + } + response.Server = svrVer + + c.JSON(http.StatusOK, response) +} diff --git a/lib/agent/apiv1.go b/lib/agent/apiv1.go new file mode 100644 index 0000000..77b05ba --- /dev/null +++ b/lib/agent/apiv1.go @@ -0,0 +1,129 @@ +package agent + +import ( + "fmt" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/iotbzh/xds-agent/lib/xdsconfig" +) + +const apiBaseUrl = "/api/v1" + +// APIService . +type APIService struct { + *Context + apiRouter *gin.RouterGroup + serverIndex int +} + +// NewAPIV1 creates a new instance of API service +func NewAPIV1(ctx *Context) *APIService { + s := &APIService{ + Context: ctx, + apiRouter: ctx.webServer.router.Group(apiBaseUrl), + serverIndex: 0, + } + + s.apiRouter.GET("/version", s.getVersion) + + s.apiRouter.GET("/config", s.getConfig) + s.apiRouter.POST("/config", s.setConfig) + + s.apiRouter.GET("/browse", s.browseFS) + + s.apiRouter.GET("/projects", s.getProjects) + s.apiRouter.GET("/project/:id", s.getProject) + s.apiRouter.POST("/project", s.addProject) + s.apiRouter.POST("/project/sync/:id", s.syncProject) + s.apiRouter.DELETE("/project/:id", s.delProject) + + s.apiRouter.POST("/exec", s.execCmd) + s.apiRouter.POST("/exec/:id", s.execCmd) + s.apiRouter.POST("/signal", s.execSignalCmd) + + s.apiRouter.GET("/events", s.eventsList) + s.apiRouter.POST("/events/register", s.eventsRegister) + s.apiRouter.POST("/events/unregister", s.eventsUnRegister) + + return s +} + +// Stop Used to stop/close created services +func (s *APIService) Stop() { + for _, svr := range s.xdsServers { + svr.Close() + } +} + +// AddXdsServer Add a new XDS Server to the list of a server +func (s *APIService) AddXdsServer(cfg xdsconfig.XDSServerConf) (*XdsServer, error) { + var svr *XdsServer + var exist, tempoID bool + tempoID = false + + // First check if not already exist and update it + if svr, exist = s.xdsServers[cfg.ID]; exist { + + // Update: Found, so just update some settings + svr.ConnRetry = cfg.ConnRetry + + tempoID = svr.IsTempoID() + if svr.Connected && !svr.Disabled && svr.BaseURL == cfg.URL && tempoID { + return svr, nil + } + + // URL differ or not connected, so need to reconnect + svr.BaseURL = cfg.URL + + } else { + + // Create a new server object + if cfg.APIBaseURL == "" { + cfg.APIBaseURL = apiBaseUrl + } + if cfg.APIPartialURL == "" { + cfg.APIPartialURL = "/server/" + strconv.Itoa(s.serverIndex) + s.serverIndex = s.serverIndex + 1 + } + + // Create a new XDS Server + svr = NewXdsServer(s.Context, cfg) + + svr.SetLoggerOutput(s.Config.LogVerboseOut) + + // Passthrough routes (handle by XDS Server) + grp := s.apiRouter.Group(svr.PartialURL) + svr.SetAPIRouterGroup(grp) + svr.PassthroughGet("/sdks") + svr.PassthroughGet("/sdk/:id") + } + + // Established connection + err := svr.Connect() + + // Delete temporary ID with it has been replaced by right Server ID + if tempoID && !svr.IsTempoID() { + delete(s.xdsServers, cfg.ID) + } + + // Add to map + s.xdsServers[svr.ID] = svr + + // Load projects + if err == nil && svr.Connected { + err = s.projects.Init(svr) + } + + return svr, err +} + +// DelXdsServer Delete an XDS Server from the list of a server +func (s *APIService) DelXdsServer(id string) error { + if _, exist := s.xdsServers[id]; !exist { + return fmt.Errorf("Unknown Server ID %s", id) + } + // Don't really delete, just disable it + s.xdsServers[id].Close() + return nil +} diff --git a/lib/agent/events.go b/lib/agent/events.go new file mode 100644 index 0000000..e66f758 --- /dev/null +++ b/lib/agent/events.go @@ -0,0 +1,132 @@ +package agent + +import ( + "fmt" + "time" +) + +// Events constants +const ( + // EventTypePrefix Used as event prefix + EventTypePrefix = "event:" // following by event type + + // Supported Events type + EVTAll = "all" + EVTServerConfig = "server-config" // data type ServerCfg + EVTProjectAdd = "project-add" // data type ProjectConfig + EVTProjectDelete = "project-delete" // data type ProjectConfig + EVTProjectChange = "project-state-change" // data type ProjectConfig +) + +var _EVTAllList = []string{ + EVTServerConfig, + EVTProjectAdd, + EVTProjectDelete, + EVTProjectChange, +} + +// EventMsg Message send +type EventMsg struct { + Time string `json:"time"` + Type string `json:"type"` + Data interface{} `json:"data"` +} + +type EventDef struct { + sids map[string]int +} + +type Events struct { + *Context + eventsMap map[string]*EventDef +} + +// NewEvents creates an instance of Events +func NewEvents(ctx *Context) *Events { + evMap := make(map[string]*EventDef) + for _, ev := range _EVTAllList { + evMap[ev] = &EventDef{ + sids: make(map[string]int), + } + } + return &Events{ + Context: ctx, + eventsMap: evMap, + } +} + +// GetList returns the list of all supported events +func (e *Events) GetList() []string { + return _EVTAllList +} + +// Register Used by a client/session to register to a specific (or all) event(s) +func (e *Events) Register(evName, sessionID string) error { + evs := _EVTAllList + if evName != EVTAll { + if _, ok := e.eventsMap[evName]; !ok { + return fmt.Errorf("Unsupported event type name") + } + evs = []string{evName} + } + for _, ev := range evs { + e.eventsMap[ev].sids[sessionID]++ + } + return nil +} + +// UnRegister Used by a client/session to unregister event(s) +func (e *Events) UnRegister(evName, sessionID string) error { + evs := _EVTAllList + if evName != EVTAll { + if _, ok := e.eventsMap[evName]; !ok { + return fmt.Errorf("Unsupported event type name") + } + evs = []string{evName} + } + for _, ev := range evs { + if _, exist := e.eventsMap[ev].sids[sessionID]; exist { + delete(e.eventsMap[ev].sids, sessionID) + break + } + } + return nil +} + +// Emit Used to manually emit an event +func (e *Events) Emit(evName string, data interface{}) error { + var firstErr error + + if _, ok := e.eventsMap[evName]; !ok { + return fmt.Errorf("Unsupported event type") + } + + if e.LogLevelSilly { + e.Log.Debugf("Emit Event %s: %v", evName, data) + } + + firstErr = nil + evm := e.eventsMap[evName] + for sid := range evm.sids { + so := e.webServer.sessions.IOSocketGet(sid) + if so == nil { + if firstErr == nil { + firstErr = fmt.Errorf("IOSocketGet return nil") + } + continue + } + msg := EventMsg{ + Time: time.Now().String(), + Type: evName, + Data: data, + } + if err := (*so).Emit(EventTypePrefix+evName, msg); err != nil { + e.Log.Errorf("WS Emit %v error : %v", EventTypePrefix+evName, err) + if firstErr == nil { + firstErr = err + } + } + } + + return firstErr +} diff --git a/lib/agent/project-interface.go b/lib/agent/project-interface.go new file mode 100644 index 0000000..0a4a17e --- /dev/null +++ b/lib/agent/project-interface.go @@ -0,0 +1,43 @@ +package agent + +// ProjectType definition +type ProjectType string + +const ( + TypePathMap = "PathMap" + TypeCloudSync = "CloudSync" + TypeCifsSmb = "CIFS" +) + +// Project Status definition +const ( + StatusErrorConfig = "ErrorConfig" + StatusDisable = "Disable" + StatusEnable = "Enable" + StatusPause = "Pause" + StatusSyncing = "Syncing" +) + +// IPROJECT Project interface +type IPROJECT interface { + Add(cfg ProjectConfig) (*ProjectConfig, error) // Add a new project + Delete() error // Delete a project + GetProject() *ProjectConfig // Get project public configuration + UpdateProject(prj ProjectConfig) (*ProjectConfig, error) // Update project configuration + GetServer() *XdsServer // Get XdsServer that holds this project + Sync() error // Force project files synchronization + IsInSync() (bool, error) // Check if project files are in-sync +} + +// ProjectConfig is the config for one project +type ProjectConfig struct { + ID string `json:"id"` + ServerID string `json:"serverId"` + Label string `json:"label"` + ClientPath string `json:"clientPath"` + ServerPath string `json:"serverPath"` + Type ProjectType `json:"type"` + Status string `json:"status"` + IsInSync bool `json:"isInSync"` + DefaultSdk string `json:"defaultSdk"` +} diff --git a/lib/agent/project-pathmap.go b/lib/agent/project-pathmap.go new file mode 100644 index 0000000..aacbd1f --- /dev/null +++ b/lib/agent/project-pathmap.go @@ -0,0 +1,123 @@ +package agent + +import ( + "fmt" + "io/ioutil" + "os" + "strings" + + common "github.com/iotbzh/xds-common/golib" +) + +// IPROJECT interface implementation for native/path mapping projects + +// PathMap . +type PathMap struct { + *Context + server *XdsServer + folder *XdsFolderConfig +} + +// NewProjectPathMap Create a new instance of PathMap +func NewProjectPathMap(ctx *Context, svr *XdsServer) *PathMap { + p := PathMap{ + Context: ctx, + server: svr, + folder: &XdsFolderConfig{}, + } + return &p +} + +// Add a new project +func (p *PathMap) Add(cfg ProjectConfig) (*ProjectConfig, error) { + var err error + var file *os.File + errMsg := "ClientPath sanity check error (%d): %v" + + // Sanity check to verify that we have RW permission and path-mapping is correct + dir := cfg.ClientPath + if !common.Exists(dir) { + // try to create if not existing + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("Cannot create ClientPath directory: %s", dir) + } + } + if !common.Exists(dir) { + return nil, fmt.Errorf("ClientPath directory is not accessible: %s", dir) + } + if file, err = ioutil.TempFile(dir, ".xds_pathmap_check"); err != nil { + return nil, fmt.Errorf(errMsg, 1, err) + } + // Write a specific message that will be check by server during folder add + msg := "Pathmap checked message written by xds-agent ID: " + p.Config.AgentUID + "\n" + if n, err := file.WriteString(msg); n != len(msg) || err != nil { + return nil, fmt.Errorf(errMsg, 2, err) + } + defer func() { + if file != nil { + os.Remove(file.Name()) + file.Close() + } + }() + + // Convert to Xds folder + fld := p.server.ProjectToFolder(cfg) + fld.DataPathMap.CheckFile = file.Name() + fld.DataPathMap.CheckContent = msg + + // Send request to create folder on XDS server side + err = p.server.FolderAdd(fld, p.folder) + if err != nil { + return nil, fmt.Errorf("Folders mapping verification failure.\n%v", err) + } + + // 2nd part of sanity checker + // check specific message added by XDS Server during folder add processing + content, err := ioutil.ReadFile(file.Name()) + if err != nil { + return nil, fmt.Errorf(errMsg, 3, err) + } + if !strings.Contains(string(content), + "Pathmap checked message written by xds-server ID") { + return nil, fmt.Errorf(errMsg, 4, "file content differ") + } + + return p.GetProject(), nil +} + +// Delete a project +func (p *PathMap) Delete() error { + return p.server.FolderDelete(p.folder.ID) +} + +// GetProject Get public part of project config +func (p *PathMap) GetProject() *ProjectConfig { + prj := p.server.FolderToProject(*p.folder) + prj.ServerID = p.server.ID + return &prj +} + +// UpdateProject Set project config +func (p *PathMap) UpdateProject(prj ProjectConfig) (*ProjectConfig, error) { + p.folder = p.server.ProjectToFolder(prj) + np := p.GetProject() + if err := p.events.Emit(EVTProjectChange, np); err != nil { + return np, err + } + return np, nil +} + +// GetServer Get the XdsServer that holds this project +func (p *PathMap) GetServer() *XdsServer { + return p.server +} + +// Sync Force project files synchronization +func (p *PathMap) Sync() error { + return nil +} + +// IsInSync Check if project files are in-sync +func (p *PathMap) IsInSync() (bool, error) { + return true, nil +} diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go new file mode 100644 index 0000000..c0d2550 --- /dev/null +++ b/lib/agent/project-st.go @@ -0,0 +1,186 @@ +package agent + +import ( + st "github.com/iotbzh/xds-agent/lib/syncthing" +) + +// IPROJECT interface implementation for syncthing projects + +// STProject . +type STProject struct { + *Context + server *XdsServer + folder *XdsFolderConfig + eventIDs []int +} + +// NewProjectST Create a new instance of STProject +func NewProjectST(ctx *Context, svr *XdsServer) *STProject { + p := STProject{ + Context: ctx, + server: svr, + folder: &XdsFolderConfig{}, + } + return &p +} + +// Add a new project +func (p *STProject) Add(cfg ProjectConfig) (*ProjectConfig, error) { + var err error + + // Add project/folder into XDS Server + err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder) + if err != nil { + return nil, err + } + svrPrj := p.GetProject() + + // Declare project into local Syncthing + id, err := p.SThg.FolderChange(st.FolderChangeArg{ + ID: svrPrj.ID, + Label: svrPrj.Label, + RelativePath: cfg.ClientPath, + SyncThingID: p.server.ServerConfig.Builder.SyncThingID, + }) + if err != nil { + return nil, err + } + + locPrj, err := p.SThg.FolderConfigGet(id) + if err != nil { + svrPrj.Status = StatusErrorConfig + return nil, err + } + if svrPrj.ID != locPrj.ID { + p.Log.Errorf("Project ID in XDSServer and local ST differ: %s != %s", svrPrj.ID, locPrj.ID) + } + + // Use Update function to setup remains fields + return p.UpdateProject(*svrPrj) +} + +// Delete a project +func (p *STProject) Delete() error { + errSvr := p.server.FolderDelete(p.folder.ID) + errLoc := p.SThg.FolderDelete(p.folder.ID) + if errSvr != nil { + return errSvr + } + return errLoc +} + +// GetProject Get public part of project config +func (p *STProject) GetProject() *ProjectConfig { + prj := p.server.FolderToProject(*p.folder) + prj.ServerID = p.server.ID + return &prj +} + +// UpdateProject Update project config +func (p *STProject) UpdateProject(prj ProjectConfig) (*ProjectConfig, error) { + // Update folder + p.folder = p.server.ProjectToFolder(prj) + svrPrj := p.GetProject() + + // Register events to update folder status + // Register to XDS Server events + p.server.EventOn("event:FolderStateChanged", p._cbServerFolderChanged) + if err := p.server.EventRegister("FolderStateChanged", svrPrj.ID); err != nil { + p.Log.Warningf("XDS Server EventRegister failed: %v", err) + return svrPrj, err + } + + // Register to Local Syncthing events + for _, evName := range []string{st.EventStateChanged, st.EventFolderPaused} { + evID, err := p.SThg.Events.Register(evName, p._cbLocalSTEvents, svrPrj.ID, nil) + if err != nil { + return nil, err + } + p.eventIDs = append(p.eventIDs, evID) + } + + return svrPrj, nil +} + +// GetServer Get the XdsServer that holds this project +func (p *STProject) GetServer() *XdsServer { + return p.server +} + +// Sync Force project files synchronization +func (p *STProject) Sync() error { + if err := p.server.FolderSync(p.folder.ID); err != nil { + return err + } + return p.SThg.FolderScan(p.folder.ID, "") +} + +// IsInSync Check if project files are in-sync +func (p *STProject) IsInSync() (bool, error) { + // Should be up-to-date by callbacks (see below) + return p.folder.IsInSync, nil +} + +/** +** Private functions +***/ + +// callback use to update (XDS Server) folder IsInSync status + +func (p *STProject) _cbServerFolderChanged(data interface{}) { + evt := data.(XdsEventFolderChange) + + // Only process event that concerns this project/folder ID + if p.folder.ID != evt.Folder.ID { + return + } + + if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync || + evt.Folder.Status != p.folder.DataCloudSync.STSvrStatus { + + p.folder.DataCloudSync.STSvrIsInSync = evt.Folder.IsInSync + p.folder.DataCloudSync.STSvrStatus = evt.Folder.Status + + if err := p.events.Emit(EVTProjectChange, p.server.FolderToProject(*p.folder)); err != nil { + p.Log.Warningf("Cannot notify project change: %v", err) + } + } +} + +// callback use to update IsInSync status +func (p *STProject) _cbLocalSTEvents(ev st.Event, data *st.EventsCBData) { + + inSync := p.folder.DataCloudSync.STLocIsInSync + sts := p.folder.DataCloudSync.STLocStatus + prevSync := inSync + prevStatus := sts + + switch ev.Type { + + case st.EventStateChanged: + to := ev.Data["to"] + switch to { + case "scanning", "syncing": + sts = StatusSyncing + case "idle": + sts = StatusEnable + } + inSync = (to == "idle") + + case st.EventFolderPaused: + if sts == StatusEnable { + sts = StatusPause + } + inSync = false + } + + if prevSync != inSync || prevStatus != sts { + + p.folder.DataCloudSync.STLocIsInSync = inSync + p.folder.DataCloudSync.STLocStatus = sts + + if err := p.events.Emit(EVTProjectChange, p.server.FolderToProject(*p.folder)); err != nil { + p.Log.Warningf("Cannot notify project change: %v", err) + } + } +} diff --git a/lib/agent/projects.go b/lib/agent/projects.go new file mode 100644 index 0000000..5e20395 --- /dev/null +++ b/lib/agent/projects.go @@ -0,0 +1,241 @@ +package agent + +import ( + "fmt" + "log" + "time" + + "github.com/iotbzh/xds-agent/lib/syncthing" + "github.com/syncthing/syncthing/lib/sync" +) + +// Projects Represent a an XDS Projects +type Projects struct { + *Context + SThg *st.SyncThing + projects map[string]*IPROJECT +} + +// Mutex to make add/delete atomic +var pjMutex = sync.NewMutex() + +// NewProjects Create a new instance of Project Model +func NewProjects(ctx *Context, st *st.SyncThing) *Projects { + return &Projects{ + Context: ctx, + SThg: st, + projects: make(map[string]*IPROJECT), + } +} + +// Init Load Projects configuration +func (p *Projects) Init(server *XdsServer) error { + svrList := make(map[string]*XdsServer) + // If server not set, load for all servers + if server == nil { + svrList = p.xdsServers + } else { + svrList[server.ID] = server + } + errMsg := "" + for _, svr := range svrList { + if svr.Disabled { + continue + } + xFlds := []XdsFolderConfig{} + if err := svr.GetFolders(&xFlds); err != nil { + errMsg += fmt.Sprintf("Cannot retrieve folders config of XDS server ID %s : %v \n", svr.ID, err.Error()) + continue + } + p.Log.Debugf("Connected to XDS Server %s, %d projects detected", svr.ID, len(xFlds)) + for _, prj := range xFlds { + newP := svr.FolderToProject(prj) + if _, err := p.createUpdate(newP, false, true); err != nil { + errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n " + continue + } + } + } + + p.Log.Infof("Number of loaded Projects: %d", len(p.projects)) + + if errMsg != "" { + return fmt.Errorf(errMsg) + } + return nil +} + +// Get returns the folder config or nil if not existing +func (p *Projects) Get(id string) *IPROJECT { + if id == "" { + return nil + } + fc, exist := p.projects[id] + if !exist { + return nil + } + return fc +} + +// GetProjectArr returns the config of all folders as an array +func (p *Projects) GetProjectArr() []ProjectConfig { + pjMutex.Lock() + defer pjMutex.Unlock() + + return p.GetProjectArrUnsafe() +} + +// GetProjectArrUnsafe Same as GetProjectArr without mutex protection +func (p *Projects) GetProjectArrUnsafe() []ProjectConfig { + conf := []ProjectConfig{} + for _, v := range p.projects { + prj := (*v).GetProject() + conf = append(conf, *prj) + } + return conf +} + +// Add adds a new folder +func (p *Projects) Add(newF ProjectConfig) (*ProjectConfig, error) { + prj, err := p.createUpdate(newF, true, false) + if err != nil { + return prj, err + } + + // Notify client with event + if err := p.events.Emit(EVTProjectAdd, *prj); err != nil { + p.Log.Warningf("Cannot notify project deletion: %v", err) + } + + return prj, err +} + +// CreateUpdate creates or update a folder +func (p *Projects) createUpdate(newF ProjectConfig, create bool, initial bool) (*ProjectConfig, error) { + var err error + + pjMutex.Lock() + defer pjMutex.Unlock() + + // Sanity check + if _, exist := p.projects[newF.ID]; create && exist { + return nil, fmt.Errorf("ID already exists") + } + if newF.ClientPath == "" { + return nil, fmt.Errorf("ClientPath must be set") + } + if newF.ServerID == "" { + return nil, fmt.Errorf("Server ID must be set") + } + var svr *XdsServer + var exist bool + if svr, exist = p.xdsServers[newF.ServerID]; !exist { + return nil, fmt.Errorf("Unknown Server ID %s", newF.ServerID) + } + + // Check type supported + b, exist := svr.ServerConfig.SupportedSharing[string(newF.Type)] + if !exist || !b { + return nil, fmt.Errorf("Server doesn't support project type %s", newF.Type) + } + + // Create a new folder object + var fld IPROJECT + switch newF.Type { + // SYNCTHING + case TypeCloudSync: + if p.SThg != nil { + fld = NewProjectST(p.Context, svr) + } else { + return nil, fmt.Errorf("Cloud Sync project not supported") + } + + // PATH MAP + case TypePathMap: + fld = NewProjectPathMap(p.Context, svr) + default: + return nil, fmt.Errorf("Unsupported folder type") + } + + var newPrj *ProjectConfig + if create { + // Add project on server + if newPrj, err = fld.Add(newF); err != nil { + newF.Status = StatusErrorConfig + log.Printf("ERROR Adding project: %v\n", err) + return newPrj, err + } + } else { + // Just update project config + if newPrj, err = fld.UpdateProject(newF); err != nil { + newF.Status = StatusErrorConfig + log.Printf("ERROR Updating project: %v\n", err) + return newPrj, err + } + } + + // Sanity check + if newPrj.ID == "" { + log.Printf("ERROR project ID empty: %v", newF) + return newPrj, fmt.Errorf("Project ID empty") + } + + // Add to folders list + p.projects[newPrj.ID] = &fld + + // Force sync after creation + // (need to defer to be sure that WS events will arrive after HTTP creation reply) + go func() { + time.Sleep(time.Millisecond * 500) + fld.Sync() + }() + + return newPrj, nil +} + +// Delete deletes a specific folder +func (p *Projects) Delete(id string) (ProjectConfig, error) { + var err error + + pjMutex.Lock() + defer pjMutex.Unlock() + + fld := ProjectConfig{} + fc, exist := p.projects[id] + if !exist { + return fld, fmt.Errorf("unknown id") + } + + prj := (*fc).GetProject() + + if err = (*fc).Delete(); err != nil { + return *prj, err + } + + delete(p.projects, id) + + // Notify client with event + if err := p.events.Emit(EVTProjectDelete, *prj); err != nil { + p.Log.Warningf("Cannot notify project deletion: %v", err) + } + + return *prj, err +} + +// ForceSync Force the synchronization of a folder +func (p *Projects) ForceSync(id string) error { + fc := p.Get(id) + if fc == nil { + return fmt.Errorf("Unknown id") + } + return (*fc).Sync() +} + +// IsProjectInSync Returns true when folder is in sync +func (p *Projects) IsProjectInSync(id string) (bool, error) { + fc := p.Get(id) + if fc == nil { + return false, fmt.Errorf("Unknown id") + } + return (*fc).IsInSync() +} diff --git a/lib/session/session.go b/lib/agent/session.go index b56f9ff..06789d5 100644 --- a/lib/session/session.go +++ b/lib/agent/session.go @@ -1,11 +1,10 @@ -package session +package agent import ( "encoding/base64" "strconv" "time" - "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" "github.com/googollee/go-socket.io" uuid "github.com/satori/go.uuid" @@ -36,29 +35,27 @@ type ClientSession struct { // Sessions holds client sessions type Sessions struct { - router *gin.Engine + *Context cookieMaxAge int64 sessMap map[string]ClientSession mutex sync.Mutex - log *logrus.Logger stop chan struct{} // signals intentional stop } // NewClientSessions . -func NewClientSessions(router *gin.Engine, log *logrus.Logger, cookieMaxAge string) *Sessions { +func NewClientSessions(ctx *Context, cookieMaxAge string) *Sessions { ckMaxAge, err := strconv.ParseInt(cookieMaxAge, 10, 0) if err != nil { ckMaxAge = 0 } s := Sessions{ - router: router, + Context: ctx, cookieMaxAge: ckMaxAge, sessMap: make(map[string]ClientSession), mutex: sync.NewMutex(), - log: log, stop: make(chan struct{}), } - s.router.Use(s.Middleware()) + s.webServer.router.Use(s.Middleware()) // Start monitoring of sessions Map (use to manage expiration and cleanup) go s.monitorSessMap() @@ -174,7 +171,7 @@ func (s *Sessions) newSession(prefix string) *ClientSession { s.sessMap[se.ID] = se - s.log.Debugf("NEW session (%d): %s", len(s.sessMap), id) + s.Log.Debugf("NEW session (%d): %s", len(s.sessMap), id) return &se } @@ -197,27 +194,27 @@ func (s *Sessions) refresh(sid string) { } func (s *Sessions) monitorSessMap() { - const dbgFullTrace = false // for debugging - for { select { case <-s.stop: - s.log.Debugln("Stop monitorSessMap") + s.Log.Debugln("Stop monitorSessMap") return case <-time.After(sessionMonitorTime * time.Second): - if dbgFullTrace { - s.log.Debugf("Sessions Map size: %d", len(s.sessMap)) - s.log.Debugf("Sessions Map : %v", s.sessMap) + if s.LogLevelSilly { + s.Log.Debugf("Sessions Map size: %d", len(s.sessMap)) + s.Log.Debugf("Sessions Map : %v", s.sessMap) } if len(s.sessMap) > maxSessions { - s.log.Errorln("TOO MUCH sessions, cleanup old ones !") + s.Log.Errorln("TOO MUCH sessions, cleanup old ones !") } s.mutex.Lock() for _, ss := range s.sessMap { if ss.expireAt.Sub(time.Now()) < 0 { - s.log.Debugf("Delete expired session id: %s", ss.ID) + if s.LogLevelSilly { + s.Log.Debugf("Delete expired session id: %s", ss.ID) + } delete(s.sessMap, ss.ID) } } diff --git a/lib/agent/webserver.go b/lib/agent/webserver.go new file mode 100644 index 0000000..4b2e024 --- /dev/null +++ b/lib/agent/webserver.go @@ -0,0 +1,244 @@ +package agent + +import ( + "fmt" + "log" + "net/http" + "os" + "path" + + "github.com/Sirupsen/logrus" + "github.com/gin-contrib/static" + "github.com/gin-gonic/gin" + "github.com/googollee/go-socket.io" +) + +// WebServer . +type WebServer struct { + *Context + router *gin.Engine + api *APIService + sIOServer *socketio.Server + webApp *gin.RouterGroup + stop chan struct{} // signals intentional stop +} + +const indexFilename = "index.html" + +// NewWebServer creates an instance of WebServer +func NewWebServer(ctx *Context) *WebServer { + + // Setup logging for gin router + if ctx.Log.Level == logrus.DebugLevel { + gin.SetMode(gin.DebugMode) + } else { + gin.SetMode(gin.ReleaseMode) + } + + // Redirect gin logs into another logger (LogVerboseOut may be stderr or a file) + gin.DefaultWriter = ctx.Config.LogVerboseOut + gin.DefaultErrorWriter = ctx.Config.LogVerboseOut + log.SetOutput(ctx.Config.LogVerboseOut) + + // Creates gin router + r := gin.New() + + svr := &WebServer{ + Context: ctx, + router: r, + api: nil, + sIOServer: nil, + webApp: nil, + stop: make(chan struct{}), + } + + return svr +} + +// Serve starts a new instance of the Web Server +func (s *WebServer) Serve() error { + var err error + + // Setup middlewares + s.router.Use(gin.Logger()) + s.router.Use(gin.Recovery()) + s.router.Use(s.middlewareCORS()) + s.router.Use(s.middlewareXDSDetails()) + s.router.Use(s.middlewareCSRF()) + + // Create REST API + s.api = NewAPIV1(s.Context) + + // Create connections to XDS Servers + // XXX - not sure there is no side effect to do it in background ! + go func() { + for _, svrCfg := range s.Config.FileConf.ServersConf { + if svr, err := s.api.AddXdsServer(svrCfg); err != nil { + // Just log error, don't consider as critical + s.Log.Infof("Cannot connect to XDS Server url=%s: %v", svr.BaseURL, err.Error()) + } + } + }() + + // Websocket routes + s.sIOServer, err = socketio.NewServer(nil) + if err != nil { + s.Log.Fatalln(err) + } + + s.router.GET("/socket.io/", s.socketHandler) + s.router.POST("/socket.io/", s.socketHandler) + /* TODO: do we want to support ws://... ? + s.router.Handle("WS", "/socket.io/", s.socketHandler) + s.router.Handle("WSS", "/socket.io/", s.socketHandler) + */ + + // Web Application (serve on / ) + idxFile := path.Join(s.Config.FileConf.WebAppDir, indexFilename) + if _, err := os.Stat(idxFile); err != nil { + s.Log.Fatalln("Web app directory not found, check/use webAppDir setting in config file: ", idxFile) + } + s.Log.Infof("Serve WEB app dir: %s", s.Config.FileConf.WebAppDir) + s.router.Use(static.Serve("/", static.LocalFile(s.Config.FileConf.WebAppDir, true))) + s.webApp = s.router.Group("/", s.serveIndexFile) + { + s.webApp.GET("/") + } + + // Serve in the background + serveError := make(chan error, 1) + go func() { + fmt.Printf("Web Server running on localhost:%s ...\n", s.Config.FileConf.HTTPPort) + serveError <- http.ListenAndServe(":"+s.Config.FileConf.HTTPPort, s.router) + }() + + fmt.Printf("XDS agent running...\n") + + // Wait for stop, restart or error signals + select { + case <-s.stop: + // Shutting down permanently + s.sessions.Stop() + s.Log.Infoln("shutting down (stop)") + case err = <-serveError: + // Error due to listen/serve failure + s.Log.Errorln(err) + } + + return nil +} + +// Stop web server +func (s *WebServer) Stop() { + s.api.Stop() + close(s.stop) +} + +// serveIndexFile provides initial file (eg. index.html) of webapp +func (s *WebServer) serveIndexFile(c *gin.Context) { + c.HTML(200, indexFilename, gin.H{}) +} + +// Add details in Header +func (s *WebServer) middlewareXDSDetails() gin.HandlerFunc { + return func(c *gin.Context) { + c.Header("XDS-Agent-Version", s.Config.Version) + c.Header("XDS-API-Version", s.Config.APIVersion) + c.Next() + } +} + +func (s *WebServer) isValidAPIKey(key string) bool { + return (s.Config.FileConf.XDSAPIKey != "" && key == s.Config.FileConf.XDSAPIKey) +} + +func (s *WebServer) middlewareCSRF() gin.HandlerFunc { + return func(c *gin.Context) { + // XXX - not used for now + c.Next() + return + /* + // Allow requests carrying a valid API key + if s.isValidAPIKey(c.Request.Header.Get("X-API-Key")) { + // Set the access-control-allow-origin header for CORS requests + // since a valid API key has been provided + c.Header("Access-Control-Allow-Origin", "*") + c.Next() + return + } + + // Allow io.socket request + if strings.HasPrefix(c.Request.URL.Path, "/socket.io") { + c.Next() + return + } + + // FIXME Add really CSRF support + + // Allow requests for anything not under the protected path prefix, + // and set a CSRF cookie if there isn't already a valid one. + //if !strings.HasPrefix(c.Request.URL.Path, prefix) { + // cookie, err := c.Cookie("CSRF-Token-" + unique) + // if err != nil || !validCsrfToken(cookie.Value) { + // s.Log.Debugln("new CSRF cookie in response to request for", c.Request.URL) + // c.SetCookie("CSRF-Token-"+unique, newCsrfToken(), 600, "/", "", false, false) + // } + // c.Next() + // return + //} + + // Verify the CSRF token + //token := c.Request.Header.Get("X-CSRF-Token-" + unique) + //if !validCsrfToken(token) { + // c.AbortWithError(403, "CSRF Error") + // return + //} + + //c.Next() + + c.AbortWithError(403, fmt.Errorf("Not valid API key")) + */ + } +} + +// CORS middleware +func (s *WebServer) middlewareCORS() gin.HandlerFunc { + return func(c *gin.Context) { + if c.Request.Method == "OPTIONS" { + c.Header("Access-Control-Allow-Origin", "*") + c.Header("Access-Control-Allow-Headers", "Content-Type, X-API-Key") + c.Header("Access-Control-Allow-Methods", "GET, POST, DELETE") + c.Header("Access-Control-Max-Age", cookieMaxAge) + c.AbortWithStatus(204) + return + } + c.Next() + } +} + +// socketHandler is the handler for the "main" websocket connection +func (s *WebServer) socketHandler(c *gin.Context) { + + // Retrieve user session + sess := s.sessions.Get(c) + if sess == nil { + c.JSON(500, gin.H{"error": "Cannot retrieve session"}) + return + } + + s.sIOServer.On("connection", func(so socketio.Socket) { + s.Log.Debugf("WS Connected (SID=%v)", so.Id()) + s.sessions.UpdateIOSocket(sess.ID, &so) + + so.On("disconnection", func() { + s.Log.Debugf("WS disconnected (SID=%v)", so.Id()) + s.sessions.UpdateIOSocket(sess.ID, nil) + }) + }) + + s.sIOServer.On("error", func(so socketio.Socket, err error) { + s.Log.Errorf("WS SID=%v Error : %v", so.Id(), err.Error()) + }) + + s.sIOServer.ServeHTTP(c.Writer, c.Request) +} diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go new file mode 100644 index 0000000..b76908c --- /dev/null +++ b/lib/agent/xdsserver.go @@ -0,0 +1,605 @@ +package agent + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" + "sync" + "time" + + "github.com/gin-gonic/gin" + "github.com/iotbzh/xds-agent/lib/xdsconfig" + common "github.com/iotbzh/xds-common/golib" + uuid "github.com/satori/go.uuid" + sio_client "github.com/sebd71/go-socket.io-client" +) + +// XdsServer . +type XdsServer struct { + *Context + ID string + BaseURL string + APIURL string + PartialURL string + ConnRetry int + Connected bool + Disabled bool + ServerConfig *XdsServerConfig + + // Events management + CBOnError func(error) + CBOnDisconnect func(error) + sockEvents map[string][]*caller + sockEventsLock *sync.Mutex + + // Private fields + client *common.HTTPClient + ioSock *sio_client.Client + logOut io.Writer + apiRouter *gin.RouterGroup +} + +// XdsServerConfig Data return by GET /config +type XdsServerConfig struct { + ID string `json:"id"` + Version string `json:"version"` + APIVersion string `json:"apiVersion"` + VersionGitTag string `json:"gitTag"` + SupportedSharing map[string]bool `json:"supportedSharing"` + Builder XdsBuilderConfig `json:"builder"` +} + +// XdsBuilderConfig represents the builder container configuration +type XdsBuilderConfig struct { + IP string `json:"ip"` + Port string `json:"port"` + SyncThingID string `json:"syncThingID"` +} + +// XdsFolderType XdsServer folder type +type XdsFolderType string + +const ( + XdsTypePathMap = "PathMap" + XdsTypeCloudSync = "CloudSync" + XdsTypeCifsSmb = "CIFS" +) + +// XdsFolderConfig XdsServer folder config +type XdsFolderConfig struct { + ID string `json:"id"` + Label string `json:"label"` + ClientPath string `json:"path"` + Type XdsFolderType `json:"type"` + Status string `json:"status"` + IsInSync bool `json:"isInSync"` + DefaultSdk string `json:"defaultSdk"` + // Specific data depending on which Type is used + DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"` + DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"` +} + +// XdsPathMapConfig Path mapping specific data +type XdsPathMapConfig struct { + ServerPath string `json:"serverPath"` + CheckFile string `json:"checkFile"` + CheckContent string `json:"checkContent"` +} + +// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data +type XdsCloudSyncConfig struct { + SyncThingID string `json:"syncThingID"` + STSvrStatus string `json:"-"` + STSvrIsInSync bool `json:"-"` + STLocStatus string `json:"-"` + STLocIsInSync bool `json:"-"` +} + +// XdsEventRegisterArgs arguments used to register to XDS server events +type XdsEventRegisterArgs struct { + Name string `json:"name"` + ProjectID string `json:"filterProjectID"` +} + +// XdsEventFolderChange Folder change event structure +type XdsEventFolderChange struct { + Time string `json:"time"` + Type string `json:"type"` + Folder XdsFolderConfig `json:"folder"` +} + +// caller Used to chain event listeners +type caller struct { + id uuid.UUID + EventName string + Func func(interface{}) +} + +const _IDTempoPrefix = "tempo-" + +// NewXdsServer creates an instance of XdsServer +func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { + return &XdsServer{ + Context: ctx, + ID: _IDTempoPrefix + uuid.NewV1().String(), + BaseURL: conf.URL, + APIURL: conf.APIBaseURL + conf.APIPartialURL, + PartialURL: conf.APIPartialURL, + ConnRetry: conf.ConnRetry, + Connected: false, + Disabled: false, + + sockEvents: make(map[string][]*caller), + sockEventsLock: &sync.Mutex{}, + logOut: ctx.Log.Out, + } +} + +// Close Free and close XDS Server connection +func (xs *XdsServer) Close() error { + xs.Connected = false + xs.Disabled = true + xs.ioSock = nil + xs._NotifyState() + return nil +} + +// Connect Establish HTTP connection with XDS Server +func (xs *XdsServer) Connect() error { + var err error + var retry int + + xs.Disabled = false + xs.Connected = false + + err = nil + for retry = xs.ConnRetry; retry > 0; retry-- { + if err = xs._CreateConnectHTTP(); err == nil { + break + } + if retry == xs.ConnRetry { + // Notify only on the first conn error + // doing that avoid 2 notifs (conn false; conn true) on startup + xs._NotifyState() + } + xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry) + time.Sleep(time.Second) + } + if retry == 0 { + // FIXME: re-use _reconnect to wait longer in background + return fmt.Errorf("Connection to XDS Server failure") + } + if err != nil { + return err + } + + // Check HTTP connection and establish WS connection + err = xs._connect(false) + + return err +} + +// IsTempoID returns true when server as a temporary id +func (xs *XdsServer) IsTempoID() bool { + return strings.HasPrefix(xs.ID, _IDTempoPrefix) +} + +// SetLoggerOutput Set logger ou +func (xs *XdsServer) SetLoggerOutput(out io.Writer) { + xs.logOut = out +} + +// SendCommand Send a command to XDS Server +func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) { + url := cmd + if !strings.HasPrefix("/", cmd) { + url = "/" + cmd + } + return xs.client.HTTPPostWithRes(url, string(body)) +} + +// GetVersion Send Get request to retrieve XDS Server version +func (xs *XdsServer) GetVersion(res interface{}) error { + return xs._HTTPGet("/version", &res) +} + +// GetFolders Send GET request to get current folder configuration +func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error { + return xs._HTTPGet("/folders", folders) +} + +// FolderAdd Send POST request to add a folder +func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error { + response, err := xs._HTTPPost("/folder", fld) + if err != nil { + return err + } + if response.StatusCode != 200 { + return fmt.Errorf("FolderAdd error status=%s", response.Status) + } + // Result is a XdsFolderConfig that is equivalent to ProjectConfig + err = json.Unmarshal(xs.client.ResponseToBArray(response), res) + + return err +} + +// FolderDelete Send DELETE request to delete a folder +func (xs *XdsServer) FolderDelete(id string) error { + return xs.client.HTTPDelete("/folder/" + id) +} + +// FolderSync Send POST request to force synchronization of a folder +func (xs *XdsServer) FolderSync(id string) error { + return xs.client.HTTPPost("/folder/sync/"+id, "") +} + +// SetAPIRouterGroup . +func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) { + xs.apiRouter = r +} + +// PassthroughGet Used to declare a route that sends directly a GET request to XDS Server +func (xs *XdsServer) PassthroughGet(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.GET(url, func(c *gin.Context) { + var data interface{} + if err := xs._HTTPGet(url, &data); err != nil { + if strings.Contains(err.Error(), "connection refused") { + xs.Connected = false + xs._NotifyState() + } + common.APIError(c, err.Error()) + return + } + + c.JSON(http.StatusOK, data) + }) +} + +// PassthroughPost Used to declare a route that sends directly a POST request to XDS Server +func (xs *XdsServer) PassthroughPost(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.POST(url, func(c *gin.Context) { + bodyReq := []byte{} + n, err := c.Request.Body.Read(bodyReq) + if err != nil { + common.APIError(c, err.Error()) + return + } + + response, err := xs._HTTPPost(url, bodyReq[:n]) + if err != nil { + common.APIError(c, err.Error()) + return + } + bodyRes, err := ioutil.ReadAll(response.Body) + if err != nil { + common.APIError(c, "Cannot read response body") + return + } + c.JSON(http.StatusOK, string(bodyRes)) + }) +} + +// EventRegister Post a request to register to an XdsServer event +func (xs *XdsServer) EventRegister(evName string, id string) error { + var err error + _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{ + Name: evName, + ProjectID: id, + }) + return err +} + +// EventOn Register a callback on events reception +func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) { + if xs.ioSock == nil { + return uuid.Nil, fmt.Errorf("Io.Socket not initialized") + } + + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + + if _, exist := xs.sockEvents[evName]; !exist { + // Register listener only the first time + evn := evName + + // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange + var err error + if evName == "event:FolderStateChanged" { + err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error { + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + for _, c := range xs.sockEvents[evn] { + c.Func(data) + } + return nil + }) + } else { + err = xs.ioSock.On(evn, f) + } + if err != nil { + return uuid.Nil, err + } + } + + c := &caller{ + id: uuid.NewV1(), + EventName: evName, + Func: f, + } + + xs.sockEvents[evName] = append(xs.sockEvents[evName], c) + return c.id, nil +} + +// EventOff Un-register a (or all) callbacks associated to an event +func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error { + xs.sockEventsLock.Lock() + defer xs.sockEventsLock.Unlock() + if _, exist := xs.sockEvents[evName]; exist { + if id == uuid.Nil { + // Un-register all + xs.sockEvents[evName] = []*caller{} + } else { + // Un-register only the specified callback + for i, ff := range xs.sockEvents[evName] { + if uuid.Equal(ff.id, id) { + xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...) + break + } + } + } + } + return nil +} + +// ProjectToFolder Convert Project structure to Folder structure +func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *XdsFolderConfig { + stID := "" + if pPrj.Type == XdsTypeCloudSync { + stID, _ = xs.SThg.IDGet() + } + fPrj := XdsFolderConfig{ + ID: pPrj.ID, + Label: pPrj.Label, + ClientPath: pPrj.ClientPath, + Type: XdsFolderType(pPrj.Type), + Status: pPrj.Status, + IsInSync: pPrj.IsInSync, + DefaultSdk: pPrj.DefaultSdk, + DataPathMap: XdsPathMapConfig{ + ServerPath: pPrj.ServerPath, + }, + DataCloudSync: XdsCloudSyncConfig{ + SyncThingID: stID, + STLocIsInSync: pPrj.IsInSync, + STLocStatus: pPrj.Status, + STSvrIsInSync: pPrj.IsInSync, + STSvrStatus: pPrj.Status, + }, + } + + return &fPrj +} + +// FolderToProject Convert Folder structure to Project structure +func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) ProjectConfig { + inSync := fPrj.IsInSync + sts := fPrj.Status + + if fPrj.Type == XdsTypeCloudSync { + inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync + + sts = fPrj.DataCloudSync.STSvrStatus + switch fPrj.DataCloudSync.STLocStatus { + case StatusErrorConfig, StatusDisable, StatusPause: + sts = fPrj.DataCloudSync.STLocStatus + break + case StatusSyncing: + if sts != StatusErrorConfig && sts != StatusDisable && sts != StatusPause { + sts = StatusSyncing + } + break + case StatusEnable: + // keep STSvrStatus + break + } + } + + pPrj := ProjectConfig{ + ID: fPrj.ID, + ServerID: xs.ID, + Label: fPrj.Label, + ClientPath: fPrj.ClientPath, + ServerPath: fPrj.DataPathMap.ServerPath, + Type: ProjectType(fPrj.Type), + Status: sts, + IsInSync: inSync, + DefaultSdk: fPrj.DefaultSdk, + } + return pPrj +} + +/*** +** Private functions +***/ + +// Create HTTP client +func (xs *XdsServer) _CreateConnectHTTP() error { + var err error + xs.client, err = common.HTTPNewClient(xs.BaseURL, + common.HTTPClientConfig{ + URLPrefix: "/api/v1", + HeaderClientKeyName: "Xds-Sid", + CsrfDisable: true, + LogOut: xs.logOut, + LogPrefix: "XDSSERVER: ", + LogLevel: common.HTTPLogLevelWarning, + }) + + xs.client.SetLogLevel(xs.Log.Level.String()) + + if err != nil { + msg := ": " + err.Error() + if strings.Contains(err.Error(), "connection refused") { + msg = fmt.Sprintf("(url: %s)", xs.BaseURL) + } + return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg) + } + if xs.client == nil { + return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)") + } + + return nil +} + +// _HTTPGet . +func (xs *XdsServer) _HTTPGet(url string, data interface{}) error { + var dd []byte + if err := xs.client.HTTPGet(url, &dd); err != nil { + return err + } + return json.Unmarshal(dd, &data) +} + +// _HTTPPost . +func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) { + body, err := json.Marshal(data) + if err != nil { + return nil, err + } + return xs.client.HTTPPostWithRes(url, string(body)) +} + +// Re-established connection +func (xs *XdsServer) _reconnect() error { + err := xs._connect(true) + if err == nil { + // Reload projects list for this server + err = xs.projects.Init(xs) + } + return err +} + +// Established HTTP and WS connection and retrieve XDSServer config +func (xs *XdsServer) _connect(reConn bool) error { + + xdsCfg := XdsServerConfig{} + if err := xs._HTTPGet("/config", &xdsCfg); err != nil { + xs.Connected = false + if !reConn { + xs._NotifyState() + } + return err + } + + if reConn && xs.ID != xdsCfg.ID { + xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID) + } + + // Update local XDS config + xs.ID = xdsCfg.ID + xs.ServerConfig = &xdsCfg + + // Establish WS connection and register listen + if err := xs._SocketConnect(); err != nil { + xs.Connected = false + xs._NotifyState() + return err + } + + xs.Connected = true + xs._NotifyState() + return nil +} + +// Create WebSocket (io.socket) connection +func (xs *XdsServer) _SocketConnect() error { + + xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL) + + opts := &sio_client.Options{ + Transport: "websocket", + Header: make(map[string][]string), + } + opts.Header["XDS-SID"] = []string{xs.client.GetClientID()} + + iosk, err := sio_client.NewClient(xs.BaseURL, opts) + if err != nil { + return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err) + } + xs.ioSock = iosk + + // Register some listeners + + iosk.On("error", func(err error) { + xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err) + if xs.CBOnError != nil { + xs.CBOnError(err) + } + }) + + iosk.On("disconnection", func(err error) { + xs.Log.Infof("IO.socket disconnection server %s", xs.ID) + if xs.CBOnDisconnect != nil { + xs.CBOnDisconnect(err) + } + xs.Connected = false + xs._NotifyState() + + // Try to reconnect during 15min (or at least while not disabled) + go func() { + count := 0 + waitTime := 1 + for !xs.Disabled && !xs.Connected { + count++ + if count%60 == 0 { + waitTime *= 5 + } + if waitTime > 15*60 { + xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID) + return + } + time.Sleep(time.Second * time.Duration(waitTime)) + xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count) + + xs._reconnect() + } + }() + }) + + // XXX - There is no connection event generated so, just consider that + // we are connected when NewClient return successfully + /* iosk.On("connection", func() { ... }) */ + xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID) + + return nil +} + +// Send event to notify changes +func (xs *XdsServer) _NotifyState() { + + evSts := ServerCfg{ + ID: xs.ID, + URL: xs.BaseURL, + APIURL: xs.APIURL, + PartialURL: xs.PartialURL, + ConnRetry: xs.ConnRetry, + Connected: xs.Connected, + } + if err := xs.events.Emit(EVTServerConfig, evSts); err != nil { + xs.Log.Warningf("Cannot notify XdsServer state change: %v", err) + } +} diff --git a/lib/apiv1/apiv1.go b/lib/apiv1/apiv1.go deleted file mode 100644 index 734929b..0000000 --- a/lib/apiv1/apiv1.go +++ /dev/null @@ -1,36 +0,0 @@ -package apiv1 - -import ( - "github.com/Sirupsen/logrus" - "github.com/gin-gonic/gin" - - "github.com/iotbzh/xds-agent/lib/session" - "github.com/iotbzh/xds-agent/lib/xdsconfig" -) - -// APIService . -type APIService struct { - router *gin.Engine - apiRouter *gin.RouterGroup - sessions *session.Sessions - cfg *xdsconfig.Config - log *logrus.Logger -} - -// New creates a new instance of API service -func New(sess *session.Sessions, conf *xdsconfig.Config, log *logrus.Logger, r *gin.Engine) *APIService { - s := &APIService{ - router: r, - sessions: sess, - apiRouter: r.Group("/api/v1"), - cfg: conf, - log: log, - } - - s.apiRouter.GET("/version", s.getVersion) - - s.apiRouter.GET("/config", s.getConfig) - s.apiRouter.POST("/config", s.setConfig) - - return s -} diff --git a/lib/apiv1/config.go b/lib/apiv1/config.go deleted file mode 100644 index 47155ed..0000000 --- a/lib/apiv1/config.go +++ /dev/null @@ -1,45 +0,0 @@ -package apiv1 - -import ( - "net/http" - "sync" - - "github.com/gin-gonic/gin" - "github.com/iotbzh/xds-agent/lib/xdsconfig" - common "github.com/iotbzh/xds-common/golib" -) - -var confMut sync.Mutex - -// GetConfig returns the configuration -func (s *APIService) getConfig(c *gin.Context) { - confMut.Lock() - defer confMut.Unlock() - - c.JSON(http.StatusOK, s.cfg) -} - -// SetConfig sets configuration -func (s *APIService) setConfig(c *gin.Context) { - // FIXME - must be tested - c.JSON(http.StatusNotImplemented, "Not implemented") - - var cfgArg xdsconfig.Config - - if c.BindJSON(&cfgArg) != nil { - common.APIError(c, "Invalid arguments") - return - } - - confMut.Lock() - defer confMut.Unlock() - - s.log.Debugln("SET config: ", cfgArg) - - if err := s.cfg.UpdateAll(cfgArg); err != nil { - common.APIError(c, err.Error()) - return - } - - c.JSON(http.StatusOK, s.cfg) -} diff --git a/lib/apiv1/version.go b/lib/apiv1/version.go deleted file mode 100644 index e022441..0000000 --- a/lib/apiv1/version.go +++ /dev/null @@ -1,24 +0,0 @@ -package apiv1 - -import ( - "net/http" - - "github.com/gin-gonic/gin" -) - -type version struct { - Version string `json:"version"` - APIVersion string `json:"apiVersion"` - VersionGitTag string `json:"gitTag"` -} - -// getInfo : return various information about server -func (s *APIService) getVersion(c *gin.Context) { - response := version{ - Version: s.cfg.Version, - APIVersion: s.cfg.APIVersion, - VersionGitTag: s.cfg.VersionGitTag, - } - - c.JSON(http.StatusOK, response) -} diff --git a/lib/syncthing/st.go b/lib/syncthing/st.go index 6199a8f..031a2ac 100644 --- a/lib/syncthing/st.go +++ b/lib/syncthing/st.go @@ -24,11 +24,14 @@ import ( // SyncThing . type SyncThing struct { - BaseURL string - APIKey string - Home string - STCmd *exec.Cmd - STICmd *exec.Cmd + BaseURL string + APIKey string + Home string + STCmd *exec.Cmd + STICmd *exec.Cmd + MyID string + Connected bool + Events *Events // Private fields binDir string @@ -37,6 +40,7 @@ type SyncThing struct { exitSTIChan chan ExitChan client *common.HTTPClient log *logrus.Logger + conf *xdsconfig.Config } // ExitChan Channel used for process exit @@ -45,6 +49,42 @@ type ExitChan struct { err error } +// ConfigInSync Check whether if Syncthing configuration is in sync +type configInSync struct { + ConfigInSync bool `json:"configInSync"` +} + +// FolderStatus Information about the current status of a folder. +type FolderStatus struct { + GlobalFiles int `json:"globalFiles"` + GlobalDirectories int `json:"globalDirectories"` + GlobalSymlinks int `json:"globalSymlinks"` + GlobalDeleted int `json:"globalDeleted"` + GlobalBytes int64 `json:"globalBytes"` + + LocalFiles int `json:"localFiles"` + LocalDirectories int `json:"localDirectories"` + LocalSymlinks int `json:"localSymlinks"` + LocalDeleted int `json:"localDeleted"` + LocalBytes int64 `json:"localBytes"` + + NeedFiles int `json:"needFiles"` + NeedDirectories int `json:"needDirectories"` + NeedSymlinks int `json:"needSymlinks"` + NeedDeletes int `json:"needDeletes"` + NeedBytes int64 `json:"needBytes"` + + InSyncFiles int `json:"inSyncFiles"` + InSyncBytes int64 `json:"inSyncBytes"` + + State string `json:"state"` + StateChanged time.Time `json:"stateChanged"` + + Sequence int64 `json:"sequence"` + + IgnorePatterns bool `json:"ignorePatterns"` +} + // NewSyncThing creates a new instance of Syncthing func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing { var url, apiKey, home, binDir string @@ -75,8 +115,12 @@ func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing { binDir: binDir, logsDir: conf.FileConf.LogsDir, log: log, + conf: conf, } + // Create Events monitoring + s.Events = s.NewEventListener() + return &s } @@ -86,8 +130,9 @@ func (s *SyncThing) startProc(exeName string, args []string, env []string, eChan var exePath string // Kill existing process (useful for debug ;-) ) - if os.Getenv("DEBUG_MODE") != "" { - exec.Command("bash", "-c", "pkill -9 "+exeName).Output() + if _, dbg := os.LookupEnv("XDS_DEBUG_MODE"); dbg { + fmt.Printf("\n!!! DEBUG_MODE set: KILL existing %s process(es) !!!\n", exeName) + exec.Command("bash", "-c", "ps -ax |grep "+exeName+" |grep "+s.BaseURL+" |cut -d' ' -f 1|xargs -I{} kill -9 {}").Output() } // When not set (or set to '.') set bin to path of xds-agent executable @@ -182,6 +227,8 @@ func (s *SyncThing) Start() (*exec.Cmd, error) { "STNOUPGRADE=1", } + /* FIXME - STILL NEEDED ?, if not SUP code + // XXX - temporary hack because -gui-apikey seems to correctly handle by // syncthing the early first time stConfigFile := filepath.Join(s.Home, "config.xml") @@ -211,12 +258,12 @@ func (s *SyncThing) Start() (*exec.Cmd, error) { return nil, fmt.Errorf("Cannot write Syncthing config file to set apikey") } } - + */ s.STCmd, err = s.startProc("syncthing", args, env, &s.exitSTChan) // Use autogenerated apikey if not set by config.json - if s.APIKey == "" { - if fd, err := os.Open(stConfigFile); err == nil { + if err == nil && s.APIKey == "" { + if fd, err := os.Open(filepath.Join(s.Home, "config.xml")); err == nil { defer fd.Close() if b, err := ioutil.ReadAll(fd); err == nil { re := regexp.MustCompile("<apikey>(.*)</apikey>") @@ -294,11 +341,17 @@ func (s *SyncThing) StopInotify() { // Connect Establish HTTP connection with Syncthing func (s *SyncThing) Connect() error { var err error + s.Connected = false s.client, err = common.HTTPNewClient(s.BaseURL, common.HTTPClientConfig{ URLPrefix: "/rest", HeaderClientKeyName: "X-Syncthing-ID", + LogOut: s.conf.LogVerboseOut, + LogPrefix: "SYNCTHING: ", + LogLevel: common.HTTPLogLevelWarning, }) + s.client.SetLogLevel(s.log.Level.String()) + if err != nil { msg := ": " + err.Error() if strings.Contains(err.Error(), "connection refused") { @@ -310,11 +363,17 @@ func (s *SyncThing) Connect() error { return fmt.Errorf("ERROR: cannot connect to Syncthing (null client)") } - s.client.SetLogLevel(s.log.Level.String()) - s.client.LoggerPrefix = "SYNCTHING: " - s.client.LoggerOut = s.log.Out + s.MyID, err = s.IDGet() + if err != nil { + return fmt.Errorf("ERROR: cannot retrieve ID") + } + + s.Connected = true - return nil + // Start events monitoring + err = s.Events.Start() + + return err } // IDGet returns the Syncthing ID of Syncthing instance running locally @@ -347,3 +406,16 @@ func (s *SyncThing) ConfigSet(cfg config.Configuration) error { } return s.client.HTTPPost("system/config", string(body)) } + +// IsConfigInSync Returns true if configuration is in sync +func (s *SyncThing) IsConfigInSync() (bool, error) { + var data []byte + var d configInSync + if err := s.client.HTTPGet("system/config/insync", &data); err != nil { + return false, err + } + if err := json.Unmarshal(data, &d); err != nil { + return false, err + } + return d.ConfigInSync, nil +} diff --git a/lib/syncthing/stEvent.go b/lib/syncthing/stEvent.go new file mode 100644 index 0000000..0017555 --- /dev/null +++ b/lib/syncthing/stEvent.go @@ -0,0 +1,265 @@ +package st + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + "time" + + "github.com/Sirupsen/logrus" +) + +// Events . +type Events struct { + MonitorTime time.Duration + Debug bool + + stop chan bool + st *SyncThing + log *logrus.Logger + cbArr map[string][]cbMap +} + +type Event struct { + Type string `json:"type"` + Time time.Time `json:"time"` + Data map[string]string `json:"data"` +} + +type EventsCBData map[string]interface{} +type EventsCB func(ev Event, cbData *EventsCBData) + +const ( + EventFolderCompletion string = "FolderCompletion" + EventFolderSummary string = "FolderSummary" + EventFolderPaused string = "FolderPaused" + EventFolderResumed string = "FolderResumed" + EventFolderErrors string = "FolderErrors" + EventStateChanged string = "StateChanged" +) + +var EventsAll string = EventFolderCompletion + "|" + + EventFolderSummary + "|" + + EventFolderPaused + "|" + + EventFolderResumed + "|" + + EventFolderErrors + "|" + + EventStateChanged + +type STEvent struct { + // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API + SubscriptionID int `json:"id"` + // Global ID of the event across all subscriptions + GlobalID int `json:"globalID"` + Time time.Time `json:"time"` + Type string `json:"type"` + Data map[string]interface{} `json:"data"` +} + +type cbMap struct { + id int + cb EventsCB + filterID string + data *EventsCBData +} + +// NewEventListener Create a new instance of Event listener +func (s *SyncThing) NewEventListener() *Events { + _, dbg := os.LookupEnv("XDS_LOG_SILLY_STEVENTS") // set to add more debug log + return &Events{ + MonitorTime: 100, // in Milliseconds + Debug: dbg, + stop: make(chan bool, 1), + st: s, + log: s.log, + cbArr: make(map[string][]cbMap), + } +} + +// Start starts event monitoring loop +func (e *Events) Start() error { + go e.monitorLoop() + return nil +} + +// Stop stops event monitoring loop +func (e *Events) Stop() { + e.stop <- true +} + +// Register Add a listener on an event +func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) { + if evName == "" || !strings.Contains(EventsAll, evName) { + return -1, fmt.Errorf("Unknown event name") + } + if data == nil { + data = &EventsCBData{} + } + + cbList := []cbMap{} + if _, ok := e.cbArr[evName]; ok { + cbList = e.cbArr[evName] + } + + id := len(cbList) + (*data)["id"] = strconv.Itoa(id) + + e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data}) + + return id, nil +} + +// UnRegister Remove a listener event +func (e *Events) UnRegister(evName string, id int) error { + cbKey, ok := e.cbArr[evName] + if !ok { + return fmt.Errorf("No event registered to such name") + } + + // FIXME - NOT TESTED + if id >= len(cbKey) { + return fmt.Errorf("Invalid id") + } else if id == len(cbKey) { + e.cbArr[evName] = cbKey[:id-1] + } else { + e.cbArr[evName] = cbKey[id : id+1] + } + + return nil +} + +// GetEvents returns the Syncthing events +func (e *Events) getEvents(since int) ([]STEvent, error) { + var data []byte + ev := []STEvent{} + url := "events" + if since != -1 { + url += "?since=" + strconv.Itoa(since) + } + if err := e.st.client.HTTPGet(url, &data); err != nil { + return ev, err + } + err := json.Unmarshal(data, &ev) + return ev, err +} + +// Loop to monitor Syncthing events +func (e *Events) monitorLoop() { + e.log.Infof("Event monitoring running...") + since := 0 + cntErrConn := 0 + cntErrRetry := 1 + for { + select { + case <-e.stop: + e.log.Infof("Event monitoring exited") + return + + case <-time.After(e.MonitorTime * time.Millisecond): + + if !e.st.Connected { + cntErrConn++ + time.Sleep(time.Second) + if cntErrConn > cntErrRetry { + e.log.Error("ST Event monitor: ST connection down") + cntErrConn = 0 + cntErrRetry *= 2 + if _, err := e.getEvents(since); err == nil { + e.st.Connected = true + cntErrRetry = 1 + // XXX - should we reset since value ? + goto readEvent + } + } + continue + } + + readEvent: + stEvArr, err := e.getEvents(since) + if err != nil { + e.log.Errorf("Syncthing Get Events: %v", err) + e.st.Connected = false + continue + } + + // Process events + for _, stEv := range stEvArr { + since = stEv.SubscriptionID + if e.Debug { + e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv) + } + + cbKey, ok := e.cbArr[stEv.Type] + if !ok { + continue + } + + evData := Event{ + Type: stEv.Type, + Time: stEv.Time, + } + + // Decode Events + // FIXME: re-define data struct for each events + // instead of map of string and use JSON marshing/unmarshing + fID := "" + evData.Data = make(map[string]string) + switch stEv.Type { + + case EventFolderCompletion: + fID = convString(stEv.Data["folder"]) + evData.Data["completion"] = convFloat64(stEv.Data["completion"]) + + case EventFolderSummary: + fID = convString(stEv.Data["folder"]) + evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"]) + evData.Data["state"] = convString(stEv.Data["state"]) + + case EventFolderPaused, EventFolderResumed: + fID = convString(stEv.Data["id"]) + evData.Data["label"] = convString(stEv.Data["label"]) + + case EventFolderErrors: + fID = convString(stEv.Data["folder"]) + // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"]) + + case EventStateChanged: + fID = convString(stEv.Data["folder"]) + evData.Data["from"] = convString(stEv.Data["from"]) + evData.Data["to"] = convString(stEv.Data["to"]) + + default: + e.log.Warnf("Unsupported event type") + } + + if fID != "" { + evData.Data["id"] = fID + } + + // Call all registered callbacks + for _, c := range cbKey { + if e.Debug { + e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID) + } + // Call when filterID is not set or when it matches + if c.filterID == "" || (fID != "" && fID == c.filterID) { + c.cb(evData, c.data) + } + } + } + } + } +} + +func convString(d interface{}) string { + return d.(string) +} + +func convFloat64(d interface{}) string { + return strconv.FormatFloat(d.(float64), 'f', -1, 64) +} + +func convInt64(d interface{}) string { + return strconv.FormatInt(d.(int64), 10) +} diff --git a/lib/syncthing/stfolder.go b/lib/syncthing/stfolder.go index d79e579..196e3c7 100644 --- a/lib/syncthing/stfolder.go +++ b/lib/syncthing/stfolder.go @@ -1,42 +1,82 @@ package st import ( - "path/filepath" + "encoding/json" + "fmt" "strings" - "github.com/syncthing/syncthing/lib/config" + common "github.com/iotbzh/xds-common/golib" + stconfig "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/protocol" ) -// FIXME remove and use an interface on xdsconfig.FolderConfig +// FolderChangeArg argument structure used by FolderChange type FolderChangeArg struct { ID string Label string RelativePath string SyncThingID string - ShareRootDir string } +// FolderLoadFromStConfig Load/Retrieve folder config from syncthing database +/* +func (s *SyncThing) FolderLoadFromStConfig(f *[]XdsFolderConfig) error { + + defaultSdk := "" // cannot know which was the default sdk + + stCfg, err := s.ConfigGet() + if err != nil { + return err + } + if len(stCfg.Devices) < 1 { + return fmt.Errorf("Cannot load syncthing config: no device defined") + } + devID := stCfg.Devices[0].DeviceID.String() + if devID == s.MyID { + if len(stCfg.Devices) < 2 { + return fmt.Errorf("Cannot load syncthing config: no valid device found") + } + devID = stCfg.Devices[1].DeviceID.String() + } + + for _, stFld := range stCfg.Folders { + *f = append(*f, XdsFolderConfig{ + ID: stFld.ID, + Label: stFld.Label, + ClientPath: strings.TrimRight(stFld.Path, "/"), + Type: XdsTypeCloudSync, + Status: StatusDisable, + DefaultSdk: defaultSdk, + RootPath: "", + DataCloudSync: XdsCloudSyncConfig{SyncThingID: devID}, + }) + } + + return nil +} +*/ + // FolderChange is called when configuration has changed -func (s *SyncThing) FolderChange(f FolderChangeArg) error { +func (s *SyncThing) FolderChange(f FolderChangeArg) (string, error) { // Get current config stCfg, err := s.ConfigGet() if err != nil { s.log.Errorln(err) - return err + return "", err } + stClientID := f.SyncThingID // Add new Device if needed var devID protocol.DeviceID - if err := devID.UnmarshalText([]byte(f.SyncThingID)); err != nil { - s.log.Errorf("not a valid device id (err %v)\n", err) - return err + if err := devID.UnmarshalText([]byte(stClientID)); err != nil { + s.log.Errorf("not a valid device id (err %v)", err) + return "", err } - newDevice := config.DeviceConfiguration{ + newDevice := stconfig.DeviceConfiguration{ DeviceID: devID, - Name: f.SyncThingID, + Name: stClientID, Addresses: []string{"dynamic"}, } @@ -57,21 +97,34 @@ func (s *SyncThing) FolderChange(f FolderChangeArg) error { label = strings.Split(id, "/")[0] } if id = f.ID; id == "" { - id = f.SyncThingID[0:15] + "_" + label + id = stClientID[0:15] + "_" + label + } + + // Resolve local path + pathCli, err := common.ResolveEnvVar(f.RelativePath) + if err != nil { + pathCli = f.RelativePath + } + + folder := stconfig.FolderConfiguration{ + ID: id, + Label: label, + Path: pathCli, + AutoNormalize: true, } - folder := config.FolderConfiguration{ - ID: id, - Label: label, - RawPath: filepath.Join(f.ShareRootDir, f.RelativePath), + /* TODO - add it ? + if s.conf.FileConf.SThgConf.RescanIntervalS > 0 { + folder.RescanIntervalS = s.conf.FileConf.SThgConf.RescanIntervalS } + */ - folder.Devices = append(folder.Devices, config.FolderDeviceConfiguration{ + folder.Devices = append(folder.Devices, stconfig.FolderDeviceConfiguration{ DeviceID: newDevice.DeviceID, }) found = false - var fld config.FolderConfiguration + var fld stconfig.FolderConfiguration for _, fld = range stCfg.Folders { if folder.ID == fld.ID { fld = folder @@ -85,11 +138,8 @@ func (s *SyncThing) FolderChange(f FolderChangeArg) error { } err = s.ConfigSet(stCfg) - if err != nil { - s.log.Errorln(err) - } - return nil + return id, err } // FolderDelete is called to delete a folder config @@ -114,3 +164,61 @@ func (s *SyncThing) FolderDelete(id string) error { return nil } + +// FolderConfigGet Returns the configuration of a specific folder +func (s *SyncThing) FolderConfigGet(folderID string) (stconfig.FolderConfiguration, error) { + fc := stconfig.FolderConfiguration{} + if folderID == "" { + return fc, fmt.Errorf("folderID not set") + } + cfg, err := s.ConfigGet() + if err != nil { + return fc, err + } + for _, f := range cfg.Folders { + if f.ID == folderID { + fc = f + return fc, nil + } + } + return fc, fmt.Errorf("id not found") +} + +// FolderStatus Returns all information about the current +func (s *SyncThing) FolderStatus(folderID string) (*FolderStatus, error) { + var data []byte + var res FolderStatus + if folderID == "" { + return nil, fmt.Errorf("folderID not set") + } + if err := s.client.HTTPGet("db/status?folder="+folderID, &data); err != nil { + return nil, err + } + if err := json.Unmarshal(data, &res); err != nil { + return nil, err + } + return &res, nil +} + +// IsFolderInSync Returns true when folder is in sync +func (s *SyncThing) IsFolderInSync(folderID string) (bool, error) { + sts, err := s.FolderStatus(folderID) + if err != nil { + return false, err + } + return sts.NeedBytes == 0 && sts.State == "idle", nil +} + +// FolderScan Request immediate folder scan. +// Scan all folders if folderID param is empty +func (s *SyncThing) FolderScan(folderID string, subpath string) error { + url := "db/scan" + if folderID != "" { + url += "?folder=" + folderID + + if subpath != "" { + url += "&sub=" + subpath + } + } + return s.client.HTTPPost(url, "") +} diff --git a/lib/webserver/server.go b/lib/webserver/server.go deleted file mode 100644 index b835a65..0000000 --- a/lib/webserver/server.go +++ /dev/null @@ -1,226 +0,0 @@ -package webserver - -import ( - "fmt" - "net/http" - "strings" - - "github.com/Sirupsen/logrus" - "github.com/gin-gonic/gin" - "github.com/googollee/go-socket.io" - "github.com/iotbzh/xds-agent/lib/apiv1" - "github.com/iotbzh/xds-agent/lib/session" - "github.com/iotbzh/xds-agent/lib/xdsconfig" -) - -// ServerService . -type ServerService struct { - router *gin.Engine - api *apiv1.APIService - sIOServer *socketio.Server - webApp *gin.RouterGroup - cfg *xdsconfig.Config - sessions *session.Sessions - log *logrus.Logger - stop chan struct{} // signals intentional stop -} - -const indexFilename = "index.html" -const cookieMaxAge = "3600" - -// New creates an instance of ServerService -func New(conf *xdsconfig.Config, log *logrus.Logger) *ServerService { - - // Setup logging for gin router - if log.Level == logrus.DebugLevel { - gin.SetMode(gin.DebugMode) - } else { - gin.SetMode(gin.ReleaseMode) - } - - // TODO - // - try to bind gin DefaultWriter & DefaultErrorWriter to logrus logger - // - try to fix pb about isTerminal=false when out is in VSC Debug Console - //gin.DefaultWriter = ?? - //gin.DefaultErrorWriter = ?? - - // Creates gin router - r := gin.New() - - svr := &ServerService{ - router: r, - api: nil, - sIOServer: nil, - webApp: nil, - cfg: conf, - log: log, - sessions: nil, - stop: make(chan struct{}), - } - - return svr -} - -// Serve starts a new instance of the Web Server -func (s *ServerService) Serve() error { - var err error - - // Setup middlewares - s.router.Use(gin.Logger()) - s.router.Use(gin.Recovery()) - s.router.Use(s.middlewareCORS()) - s.router.Use(s.middlewareXDSDetails()) - s.router.Use(s.middlewareCSRF()) - - // Sessions manager - s.sessions = session.NewClientSessions(s.router, s.log, cookieMaxAge) - - s.router.GET("", s.slashHandler) - - // Create REST API - s.api = apiv1.New(s.sessions, s.cfg, s.log, s.router) - - // Websocket routes - s.sIOServer, err = socketio.NewServer(nil) - if err != nil { - s.log.Fatalln(err) - } - - s.router.GET("/socket.io/", s.socketHandler) - s.router.POST("/socket.io/", s.socketHandler) - /* TODO: do we want to support ws://... ? - s.router.Handle("WS", "/socket.io/", s.socketHandler) - s.router.Handle("WSS", "/socket.io/", s.socketHandler) - */ - - // Serve in the background - serveError := make(chan error, 1) - go func() { - fmt.Printf("Web Server running on localhost:%s ...\n", s.cfg.HTTPPort) - serveError <- http.ListenAndServe(":"+s.cfg.HTTPPort, s.router) - }() - - fmt.Printf("XDS agent running...\n") - - // Wait for stop, restart or error signals - select { - case <-s.stop: - // Shutting down permanently - s.sessions.Stop() - s.log.Infoln("shutting down (stop)") - case err = <-serveError: - // Error due to listen/serve failure - s.log.Errorln(err) - } - - return nil -} - -// Stop web server -func (s *ServerService) Stop() { - close(s.stop) -} - -// serveSlash provides response to GET "/" -func (s *ServerService) slashHandler(c *gin.Context) { - c.String(200, "Hello from XDS agent!") -} - -// Add details in Header -func (s *ServerService) middlewareXDSDetails() gin.HandlerFunc { - return func(c *gin.Context) { - c.Header("XDS-Agent-Version", s.cfg.Version) - c.Header("XDS-API-Version", s.cfg.APIVersion) - c.Next() - } -} - -func (s *ServerService) isValidAPIKey(key string) bool { - return (key == s.cfg.FileConf.XDSAPIKey && key != "") -} - -func (s *ServerService) middlewareCSRF() gin.HandlerFunc { - return func(c *gin.Context) { - // Allow requests carrying a valid API key - if s.isValidAPIKey(c.Request.Header.Get("X-API-Key")) { - // Set the access-control-allow-origin header for CORS requests - // since a valid API key has been provided - c.Header("Access-Control-Allow-Origin", "*") - c.Next() - return - } - - // Allow io.socket request - if strings.HasPrefix(c.Request.URL.Path, "/socket.io") { - c.Next() - return - } - - /* FIXME Add really CSRF support - - // Allow requests for anything not under the protected path prefix, - // and set a CSRF cookie if there isn't already a valid one. - if !strings.HasPrefix(c.Request.URL.Path, prefix) { - cookie, err := c.Cookie("CSRF-Token-" + unique) - if err != nil || !validCsrfToken(cookie.Value) { - s.log.Debugln("new CSRF cookie in response to request for", c.Request.URL) - c.SetCookie("CSRF-Token-"+unique, newCsrfToken(), 600, "/", "", false, false) - } - c.Next() - return - } - - // Verify the CSRF token - token := c.Request.Header.Get("X-CSRF-Token-" + unique) - if !validCsrfToken(token) { - c.AbortWithError(403, "CSRF Error") - return - } - - c.Next() - */ - c.AbortWithError(403, fmt.Errorf("Not valid API key")) - } -} - -// CORS middleware -func (s *ServerService) middlewareCORS() gin.HandlerFunc { - return func(c *gin.Context) { - if c.Request.Method == "OPTIONS" { - c.Header("Access-Control-Allow-Origin", "*") - c.Header("Access-Control-Allow-Headers", "Content-Type, X-API-Key") - c.Header("Access-Control-Allow-Methods", "GET, POST, DELETE") - c.Header("Access-Control-Max-Age", cookieMaxAge) - c.AbortWithStatus(204) - return - } - c.Next() - } -} - -// socketHandler is the handler for the "main" websocket connection -func (s *ServerService) socketHandler(c *gin.Context) { - - // Retrieve user session - sess := s.sessions.Get(c) - if sess == nil { - c.JSON(500, gin.H{"error": "Cannot retrieve session"}) - return - } - - s.sIOServer.On("connection", func(so socketio.Socket) { - s.log.Debugf("WS Connected (SID=%v)", so.Id()) - s.sessions.UpdateIOSocket(sess.ID, &so) - - so.On("disconnection", func() { - s.log.Debugf("WS disconnected (SID=%v)", so.Id()) - s.sessions.UpdateIOSocket(sess.ID, nil) - }) - }) - - s.sIOServer.On("error", func(so socketio.Socket, err error) { - s.log.Errorf("WS SID=%v Error : %v", so.Id(), err.Error()) - }) - - s.sIOServer.ServeHTTP(c.Writer, c.Request) -} diff --git a/lib/xdsconfig/config.go b/lib/xdsconfig/config.go index 854d383..9e44070 100644 --- a/lib/xdsconfig/config.go +++ b/lib/xdsconfig/config.go @@ -2,24 +2,34 @@ package xdsconfig import ( "fmt" + "io" + "path/filepath" "os" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" common "github.com/iotbzh/xds-common/golib" + uuid "github.com/satori/go.uuid" ) // Config parameters (json format) of /config command type Config struct { - Version string `json:"version"` - APIVersion string `json:"apiVersion"` - VersionGitTag string `json:"gitTag"` - - // Private / un-exported fields - HTTPPort string `json:"-"` - FileConf *FileConfig `json:"-"` - Log *logrus.Logger `json:"-"` + AgentUID string + Version string + APIVersion string + VersionGitTag string + Options Options + FileConf FileConfig + Log *logrus.Logger + LogVerboseOut io.Writer +} + +// Options set at the command line +type Options struct { + ConfigFile string + LogLevel string + LogFile string } // Config default values @@ -32,39 +42,81 @@ const ( func Init(ctx *cli.Context, log *logrus.Logger) (*Config, error) { var err error + defaultWebAppDir := "${EXEPATH}/www" + defaultSTHomeDir := "${HOME}/.xds/agent/syncthing-config" + + // TODO: allocate uuid only the first time and save+reuse it later + uuid := uuid.NewV1().String() + // Define default configuration c := Config{ + AgentUID: uuid, Version: ctx.App.Metadata["version"].(string), APIVersion: DefaultAPIVersion, VersionGitTag: ctx.App.Metadata["git-tag"].(string), - HTTPPort: "8010", - FileConf: &FileConfig{ - LogsDir: "/tmp/logs", + Options: Options{ + ConfigFile: ctx.GlobalString("config"), + LogLevel: ctx.GlobalString("log"), + LogFile: ctx.GlobalString("logfile"), + }, + + FileConf: FileConfig{ + HTTPPort: "8800", + WebAppDir: defaultWebAppDir, + LogsDir: "/tmp/logs", + ServersConf: []XDSServerConf{ + XDSServerConf{ + URL: "http://localhost:8000", + ConnRetry: 10, + }, + }, SThgConf: &SyncThingConf{ - Home: "${HOME}/.xds/agent/syncthing-config", + Home: defaultSTHomeDir, }, }, Log: log, } + c.Log.Infoln("Agent UUID: ", uuid) + // config file settings overwrite default config - c.FileConf, err = updateConfigFromFile(&c, ctx.GlobalString("config")) + err = readGlobalConfig(&c, c.Options.ConfigFile) if err != nil { return nil, err } + // Handle where Logs are redirected: + // default 'stdout' (logfile option default value) + // else use file (or filepath) set by --logfile option + // that may be overwritten by LogsDir field of config file + logF := c.Options.LogFile + logD := c.FileConf.LogsDir + if logF != "stdout" { + if logD != "" { + lf := filepath.Base(logF) + if lf == "" || lf == "." { + lf = "xds-agent.log" + } + logF = filepath.Join(logD, lf) + } else { + logD = filepath.Dir(logF) + } + } + if logD == "" || logD == "." { + logD = "/tmp/xds/logs" + } + c.Options.LogFile = logF + c.FileConf.LogsDir = logD + if c.FileConf.LogsDir != "" && !common.Exists(c.FileConf.LogsDir) { if err := os.MkdirAll(c.FileConf.LogsDir, 0770); err != nil { return nil, fmt.Errorf("Cannot create logs dir: %v", err) } } + + c.Log.Infoln("Logs file: ", c.Options.LogFile) c.Log.Infoln("Logs directory: ", c.FileConf.LogsDir) return &c, nil } - -// UpdateAll Update the current configuration -func (c *Config) UpdateAll(newCfg Config) error { - return fmt.Errorf("Not Supported") -} diff --git a/lib/xdsconfig/configfile.go b/lib/xdsconfig/configfile.go new file mode 100644 index 0000000..e3737f4 --- /dev/null +++ b/lib/xdsconfig/configfile.go @@ -0,0 +1,109 @@ +package xdsconfig + +import ( + "encoding/json" + "os" + "path" + + common "github.com/iotbzh/xds-common/golib" +) + +type SyncThingConf struct { + BinDir string `json:"binDir"` + Home string `json:"home"` + GuiAddress string `json:"gui-address"` + GuiAPIKey string `json:"gui-apikey"` +} + +type XDSServerConf struct { + URL string `json:"url"` + ConnRetry int `json:"connRetry"` + + // private/not exported fields + ID string `json:"-"` + APIBaseURL string `json:"-"` + APIPartialURL string `json:"-"` +} + +type FileConfig struct { + HTTPPort string `json:"httpPort"` + WebAppDir string `json:"webAppDir"` + LogsDir string `json:"logsDir"` + XDSAPIKey string `json:"xds-apikey"` + ServersConf []XDSServerConf `json:"xdsServers"` + SThgConf *SyncThingConf `json:"syncthing"` +} + +// readGlobalConfig reads configuration from a config file. +// Order to determine which config file is used: +// 1/ from command line option: "--config myConfig.json" +// 2/ $HOME/.xds/agent/agent-config.json file +// 3/ /etc/xds-agent/config.json file + +func readGlobalConfig(c *Config, confFile string) error { + + searchIn := make([]string, 0, 3) + if confFile != "" { + searchIn = append(searchIn, confFile) + } + if homeDir := common.GetUserHome(); homeDir != "" { + searchIn = append(searchIn, path.Join(homeDir, ".xds", "agent", "agent-config.json")) + } + + searchIn = append(searchIn, "/etc/xds-agent/agent-config.json") + + var cFile *string + for _, p := range searchIn { + if _, err := os.Stat(p); err == nil { + cFile = &p + break + } + } + if cFile == nil { + c.Log.Infof("No config file found") + return nil + } + + c.Log.Infof("Use config file: %s", *cFile) + + // TODO move on viper package to support comments in JSON and also + // bind with flags (command line options) + // see https://github.com/spf13/viper#working-with-flags + + fd, _ := os.Open(*cFile) + defer fd.Close() + + // Decode config file content and save it in a first variable + fCfg := FileConfig{} + if err := json.NewDecoder(fd).Decode(&fCfg); err != nil { + return err + } + + // Decode config file content and overwrite default settings + fd.Seek(0, 0) + json.NewDecoder(fd).Decode(&c.FileConf) + + // Disable Syncthing support when there is no syncthing field in config + if fCfg.SThgConf == nil { + c.FileConf.SThgConf = nil + } + + // Support environment variables (IOW ${MY_ENV_VAR} syntax) in agent-config.json + vars := []*string{ + &c.FileConf.LogsDir, + &c.FileConf.WebAppDir, + } + if c.FileConf.SThgConf != nil { + vars = append(vars, &c.FileConf.SThgConf.Home, + &c.FileConf.SThgConf.BinDir) + } + for _, field := range vars { + var err error + *field, err = common.ResolveEnvVar(*field) + if err != nil { + return err + } + } + + return nil +} diff --git a/lib/xdsconfig/fileconfig.go b/lib/xdsconfig/fileconfig.go deleted file mode 100644 index d936bbe..0000000 --- a/lib/xdsconfig/fileconfig.go +++ /dev/null @@ -1,96 +0,0 @@ -package xdsconfig - -import ( - "encoding/json" - "os" - "os/user" - "path" - - common "github.com/iotbzh/xds-common/golib" -) - -type SyncThingConf struct { - BinDir string `json:"binDir"` - Home string `json:"home"` - GuiAddress string `json:"gui-address"` - GuiAPIKey string `json:"gui-apikey"` -} - -type FileConfig struct { - HTTPPort string `json:"httpPort"` - LogsDir string `json:"logsDir"` - XDSAPIKey string `json:"xds-apikey"` - SThgConf *SyncThingConf `json:"syncthing"` -} - -// getConfigFromFile reads configuration from a config file. -// Order to determine which config file is used: -// 1/ from command line option: "--config myConfig.json" -// 2/ $HOME/.xds/agent/agent-config.json file -// 3/ /etc/xds-agent/config.json file - -func updateConfigFromFile(c *Config, confFile string) (*FileConfig, error) { - - searchIn := make([]string, 0, 3) - if confFile != "" { - searchIn = append(searchIn, confFile) - } - if usr, err := user.Current(); err == nil { - searchIn = append(searchIn, path.Join(usr.HomeDir, ".xds", "agent", "agent-config.json")) - } - - searchIn = append(searchIn, "/etc/xds-agent/config.json") - - var cFile *string - for _, p := range searchIn { - if _, err := os.Stat(p); err == nil { - cFile = &p - break - } - } - // Use default settings - fCfg := *c.FileConf - - // Read config file when existing - if cFile != nil { - c.Log.Infof("Use config file: %s", *cFile) - - // TODO move on viper package to support comments in JSON and also - // bind with flags (command line options) - // see https://github.com/spf13/viper#working-with-flags - - fd, _ := os.Open(*cFile) - defer fd.Close() - if err := json.NewDecoder(fd).Decode(&fCfg); err != nil { - return nil, err - } - } - - // Support environment variables (IOW ${MY_ENV_VAR} syntax) in agent-config.json - vars := []*string{ - &fCfg.LogsDir, - } - if fCfg.SThgConf != nil { - vars = append(vars, &fCfg.SThgConf.Home, &fCfg.SThgConf.BinDir) - } - for _, field := range vars { - var err error - *field, err = common.ResolveEnvVar(*field) - if err != nil { - return nil, err - } - } - - // Config file settings overwrite default config - if fCfg.HTTPPort != "" { - c.HTTPPort = fCfg.HTTPPort - } - - // Set default apikey - // FIXME - rework with dynamic key - if fCfg.XDSAPIKey == "" { - fCfg.XDSAPIKey = "1234abcezam" - } - - return &fCfg, nil -} |