aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/agent/agent.go132
-rw-r--r--lib/agent/apiv1-browse.go28
-rw-r--r--lib/agent/apiv1-config.go108
-rw-r--r--lib/agent/apiv1-events.go73
-rw-r--r--lib/agent/apiv1-exec.go99
-rw-r--r--lib/agent/apiv1-projects.go72
-rw-r--r--lib/agent/apiv1-version.go45
-rw-r--r--lib/agent/apiv1.go129
-rw-r--r--lib/agent/events.go131
-rw-r--r--lib/agent/project-interface.go47
-rw-r--r--lib/agent/project-pathmap.go79
-rw-r--r--lib/agent/project-st.go93
-rw-r--r--lib/agent/projects.go254
-rw-r--r--lib/agent/session.go (renamed from lib/session/session.go)25
-rw-r--r--lib/agent/webserver.go246
-rw-r--r--lib/agent/xdsserver.go472
-rw-r--r--lib/apiv1/apiv1.go36
-rw-r--r--lib/apiv1/config.go45
-rw-r--r--lib/apiv1/version.go24
-rw-r--r--lib/syncthing/st.go95
-rw-r--r--lib/syncthing/stEvent.go265
-rw-r--r--lib/syncthing/stfolder.go103
-rw-r--r--lib/webserver/server.go226
-rw-r--r--lib/xdsconfig/config.go78
-rw-r--r--lib/xdsconfig/configfile.go112
-rw-r--r--lib/xdsconfig/fileconfig.go111
26 files changed, 2624 insertions, 504 deletions
diff --git a/lib/agent/agent.go b/lib/agent/agent.go
index 74872f7..29b0622 100644
--- a/lib/agent/agent.go
+++ b/lib/agent/agent.go
@@ -2,18 +2,22 @@ package agent
import (
"fmt"
+ "log"
"os"
"os/exec"
"os/signal"
+ "path/filepath"
"syscall"
+ "time"
"github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/iotbzh/xds-agent/lib/syncthing"
"github.com/iotbzh/xds-agent/lib/xdsconfig"
- "github.com/iotbzh/xds-agent/lib/webserver"
)
+const cookieMaxAge = "3600"
+
// Context holds the Agent context structure
type Context struct {
ProgName string
@@ -22,8 +26,14 @@ type Context struct {
SThg *st.SyncThing
SThgCmd *exec.Cmd
SThgInotCmd *exec.Cmd
- WWWServer *webserver.ServerService
- Exit chan os.Signal
+
+ webServer *WebServer
+ xdsServers map[string]*XdsServer
+ sessions *Sessions
+ events *Events
+ projects *Projects
+
+ Exit chan os.Signal
}
// NewAgent Create a new instance of Agent
@@ -48,6 +58,10 @@ func NewAgent(cliCtx *cli.Context) *Context {
ProgName: cliCtx.App.Name,
Log: log,
Exit: make(chan os.Signal, 1),
+
+ webServer: nil,
+ xdsServers: make(map[string]*XdsServer),
+ events: nil,
}
// register handler on SIGTERM / exit
@@ -57,6 +71,114 @@ func NewAgent(cliCtx *cli.Context) *Context {
return &ctx
}
+// Run Main function called to run agent
+func (ctx *Context) Run() (int, error) {
+ var err error
+
+ // Logs redirected into a file when logfile option or logsDir config is set
+ ctx.Config.LogVerboseOut = os.Stderr
+ if ctx.Config.FileConf.LogsDir != "" {
+ if ctx.Config.Options.LogFile != "stdout" {
+ logFile := ctx.Config.Options.LogFile
+
+ fdL, err := os.OpenFile(logFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
+ if err != nil {
+ msgErr := fmt.Errorf("Cannot create log file %s", logFile)
+ return int(syscall.EPERM), msgErr
+ }
+ ctx.Log.Out = fdL
+
+ ctx._logPrint("Logging file: %s\n", logFile)
+ }
+
+ logFileHTTPReq := filepath.Join(ctx.Config.FileConf.LogsDir, "xds-agent-verbose.log")
+ fdLH, err := os.OpenFile(logFileHTTPReq, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
+ if err != nil {
+ msgErr := fmt.Errorf("Cannot create log file %s", logFileHTTPReq)
+ return int(syscall.EPERM), msgErr
+ }
+ ctx.Config.LogVerboseOut = fdLH
+
+ ctx._logPrint("Logging file for HTTP requests: %s\n", logFileHTTPReq)
+ }
+
+ // Create syncthing instance when section "syncthing" is present in config.json
+ if ctx.Config.FileConf.SThgConf != nil {
+ ctx.SThg = st.NewSyncThing(ctx.Config, ctx.Log)
+ }
+
+ // Start local instance of Syncthing and Syncthing-notify
+ if ctx.SThg != nil {
+ ctx.Log.Infof("Starting Syncthing...")
+ ctx.SThgCmd, err = ctx.SThg.Start()
+ if err != nil {
+ return 2, err
+ }
+ fmt.Printf("Syncthing started (PID %d)\n", ctx.SThgCmd.Process.Pid)
+
+ ctx.Log.Infof("Starting Syncthing-inotify...")
+ ctx.SThgInotCmd, err = ctx.SThg.StartInotify()
+ if err != nil {
+ return 2, err
+ }
+ fmt.Printf("Syncthing-inotify started (PID %d)\n", ctx.SThgInotCmd.Process.Pid)
+
+ // Establish connection with local Syncthing (retry if connection fail)
+ time.Sleep(3 * time.Second)
+ maxRetry := 30
+ retry := maxRetry
+ for retry > 0 {
+ if err := ctx.SThg.Connect(); err == nil {
+ break
+ }
+ ctx.Log.Infof("Establishing connection to Syncthing (retry %d/%d)", retry, maxRetry)
+ time.Sleep(time.Second)
+ retry--
+ }
+ if err != nil || retry == 0 {
+ return 2, err
+ }
+
+ // Retrieve Syncthing config
+ id, err := ctx.SThg.IDGet()
+ if err != nil {
+ return 2, err
+ }
+ ctx.Log.Infof("Local Syncthing ID: %s", id)
+
+ } else {
+ ctx.Log.Infof("Cloud Sync / Syncthing not supported")
+ }
+
+ // Create Web Server
+ ctx.webServer = NewWebServer(ctx)
+
+ // Sessions manager
+ ctx.sessions = NewClientSessions(ctx, cookieMaxAge)
+
+ // Create events management
+ ctx.events = NewEvents(ctx)
+
+ // Create projects management
+ ctx.projects = NewProjects(ctx, ctx.SThg)
+
+ // Run Web Server until exit requested (blocking call)
+ if err = ctx.webServer.Serve(); err != nil {
+ log.Println(err)
+ return 3, err
+ }
+
+ return 4, fmt.Errorf("Program exited")
+}
+
+// Helper function to log message on both stdout and logger
+func (ctx *Context) _logPrint(format string, args ...interface{}) {
+ fmt.Printf(format, args...)
+ if ctx.Log.Out != os.Stdout {
+ ctx.Log.Infof(format, args...)
+ }
+}
+
// Handle exit and properly stop/close all stuff
func handlerSigTerm(ctx *Context) {
<-ctx.Exit
@@ -68,9 +190,9 @@ func handlerSigTerm(ctx *Context) {
ctx.SThg.Stop()
ctx.SThg.StopInotify()
}
- if ctx.WWWServer != nil {
+ if ctx.webServer != nil {
ctx.Log.Infof("Stoping Web server...")
- ctx.WWWServer.Stop()
+ ctx.webServer.Stop()
}
os.Exit(1)
}
diff --git a/lib/agent/apiv1-browse.go b/lib/agent/apiv1-browse.go
new file mode 100644
index 0000000..1701a2e
--- /dev/null
+++ b/lib/agent/apiv1-browse.go
@@ -0,0 +1,28 @@
+package agent
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+)
+
+type directory struct {
+ Name string `json:"name"`
+ Fullpath string `json:"fullpath"`
+}
+
+type apiDirectory struct {
+ Dir []directory `json:"dir"`
+}
+
+// browseFS used to browse local file system
+func (s *APIService) browseFS(c *gin.Context) {
+
+ response := apiDirectory{
+ Dir: []directory{
+ directory{Name: "TODO SEB"},
+ },
+ }
+
+ c.JSON(http.StatusOK, response)
+}
diff --git a/lib/agent/apiv1-config.go b/lib/agent/apiv1-config.go
new file mode 100644
index 0000000..31d8de6
--- /dev/null
+++ b/lib/agent/apiv1-config.go
@@ -0,0 +1,108 @@
+package agent
+
+import (
+ "net/http"
+ "sync"
+
+ "github.com/gin-gonic/gin"
+ "github.com/iotbzh/xds-agent/lib/xdsconfig"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+var confMut sync.Mutex
+
+// APIConfig parameters (json format) of /config command
+type APIConfig struct {
+ Servers []ServerCfg `json:"servers"`
+
+ // Not exposed outside in JSON
+ Version string `json:"-"`
+ APIVersion string `json:"-"`
+ VersionGitTag string `json:"-"`
+}
+
+// ServerCfg .
+type ServerCfg struct {
+ ID string `json:"id"`
+ URL string `json:"url"`
+ APIURL string `json:"apiUrl"`
+ PartialURL string `json:"partialUrl"`
+ ConnRetry int `json:"connRetry"`
+ Connected bool `json:"connected"`
+ Disabled bool `json:"disabled"`
+}
+
+// GetConfig returns the configuration
+func (s *APIService) getConfig(c *gin.Context) {
+ confMut.Lock()
+ defer confMut.Unlock()
+
+ cfg := s._getConfig()
+
+ c.JSON(http.StatusOK, cfg)
+}
+
+// SetConfig sets configuration
+func (s *APIService) setConfig(c *gin.Context) {
+ var cfgArg APIConfig
+ if c.BindJSON(&cfgArg) != nil {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+
+ confMut.Lock()
+ defer confMut.Unlock()
+
+ s.Log.Debugln("SET config: ", cfgArg)
+
+ // First delete/disable XDS Server that are no longer listed
+ for _, svr := range s.xdsServers {
+ found := false
+ for _, svrArg := range cfgArg.Servers {
+ if svr.ID == svrArg.ID {
+ found = true
+ break
+ }
+ }
+ if !found {
+ s.DelXdsServer(svr.ID)
+ }
+ }
+
+ // Add new XDS Server
+ for _, svr := range cfgArg.Servers {
+ cfg := xdsconfig.XDSServerConf{
+ ID: svr.ID,
+ URL: svr.URL,
+ ConnRetry: svr.ConnRetry,
+ }
+ if _, err := s.AddXdsServer(cfg); err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+ }
+
+ c.JSON(http.StatusOK, s._getConfig())
+}
+
+func (s *APIService) _getConfig() APIConfig {
+ cfg := APIConfig{
+ Version: s.Config.Version,
+ APIVersion: s.Config.APIVersion,
+ VersionGitTag: s.Config.VersionGitTag,
+ Servers: []ServerCfg{},
+ }
+
+ for _, svr := range s.xdsServers {
+ cfg.Servers = append(cfg.Servers, ServerCfg{
+ ID: svr.ID,
+ URL: svr.BaseURL,
+ APIURL: svr.APIURL,
+ PartialURL: svr.PartialURL,
+ ConnRetry: svr.ConnRetry,
+ Connected: svr.Connected,
+ Disabled: svr.Disabled,
+ })
+ }
+ return cfg
+}
diff --git a/lib/agent/apiv1-events.go b/lib/agent/apiv1-events.go
new file mode 100644
index 0000000..8aad18a
--- /dev/null
+++ b/lib/agent/apiv1-events.go
@@ -0,0 +1,73 @@
+package agent
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+// EventRegisterArgs is the parameters (json format) of /events/register command
+type EventRegisterArgs struct {
+ Name string `json:"name"`
+ ProjectID string `json:"filterProjectID"`
+}
+
+// EventUnRegisterArgs is the parameters (json format) of /events/unregister command
+type EventUnRegisterArgs struct {
+ Name string `json:"name"`
+ ID int `json:"id"`
+}
+
+// eventsList Registering for events that will be send over a WS
+func (s *APIService) eventsList(c *gin.Context) {
+ c.JSON(http.StatusOK, s.events.GetList())
+}
+
+// eventsRegister Registering for events that will be send over a WS
+func (s *APIService) eventsRegister(c *gin.Context) {
+ var args EventRegisterArgs
+
+ if c.BindJSON(&args) != nil || args.Name == "" {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+
+ sess := s.webServer.sessions.Get(c)
+ if sess == nil {
+ common.APIError(c, "Unknown sessions")
+ return
+ }
+
+ // Register to all or to a specific events
+ if err := s.events.Register(args.Name, sess.ID); err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, gin.H{"status": "OK"})
+}
+
+// eventsRegister Registering for events that will be send over a WS
+func (s *APIService) eventsUnRegister(c *gin.Context) {
+ var args EventUnRegisterArgs
+
+ if c.BindJSON(&args) != nil || args.Name == "" {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+
+ sess := s.webServer.sessions.Get(c)
+ if sess == nil {
+ common.APIError(c, "Unknown sessions")
+ return
+ }
+
+ // Register to all or to a specific events
+ if err := s.events.UnRegister(args.Name, sess.ID); err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, gin.H{"status": "OK"})
+}
diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go
new file mode 100644
index 0000000..83ec7aa
--- /dev/null
+++ b/lib/agent/apiv1-exec.go
@@ -0,0 +1,99 @@
+package agent
+
+import (
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+// Only define useful fields
+type ExecArgs struct {
+ ID string `json:"id" binding:"required"`
+}
+
+// ExecCmd executes remotely a command
+func (s *APIService) execCmd(c *gin.Context) {
+ s._execRequest("/exec", c)
+}
+
+// execSignalCmd executes remotely a command
+func (s *APIService) execSignalCmd(c *gin.Context) {
+ s._execRequest("/signal", c)
+}
+
+func (s *APIService) _execRequest(url string, c *gin.Context) {
+ data, err := c.GetRawData()
+ if err != nil {
+ common.APIError(c, err.Error())
+ }
+
+ // First get Project ID to retrieve Server ID and send command to right server
+ id := c.Param("id")
+ if id == "" {
+ args := ExecArgs{}
+ // XXX - we cannot use c.BindJSON, so directly unmarshall it
+ // (see https://github.com/gin-gonic/gin/issues/1078)
+ if err := json.Unmarshal(data, &args); err != nil {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+ id = args.ID
+ }
+ prj := s.projects.Get(id)
+ if prj == nil {
+ common.APIError(c, "Unknown id")
+ return
+ }
+
+ svr := (*prj).GetServer()
+ if svr == nil {
+ common.APIError(c, "Cannot identify XDS Server")
+ return
+ }
+
+ // Retrieve session info
+ sess := s.sessions.Get(c)
+ if sess == nil {
+ common.APIError(c, "Unknown sessions")
+ return
+ }
+ sock := sess.IOSocket
+ if sock == nil {
+ common.APIError(c, "Websocket not established")
+ return
+ }
+
+ // Forward XDSServer WS events to client WS
+ // TODO removed static event name list and get it from XDSServer
+ for _, evName := range []string{
+ "exec:input",
+ "exec:output",
+ "exec:exit",
+ "exec:inferior-input",
+ "exec:inferior-output",
+ } {
+ evN := evName
+ svr.EventOn(evN, func(evData interface{}) {
+ (*sock).Emit(evN, evData)
+ })
+ }
+
+ // Forward back command to right server
+ response, err := svr.HTTPPostBody(url, string(data))
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ // Decode response
+ body, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ common.APIError(c, "Cannot read response body")
+ return
+ }
+ c.JSON(http.StatusOK, string(body))
+
+}
diff --git a/lib/agent/apiv1-projects.go b/lib/agent/apiv1-projects.go
new file mode 100644
index 0000000..d4b5e74
--- /dev/null
+++ b/lib/agent/apiv1-projects.go
@@ -0,0 +1,72 @@
+package agent
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+// getProjects returns all projects configuration
+func (s *APIService) getProjects(c *gin.Context) {
+ c.JSON(http.StatusOK, s.projects.GetProjectArr())
+}
+
+// getProject returns a specific project configuration
+func (s *APIService) getProject(c *gin.Context) {
+ prj := s.projects.Get(c.Param("id"))
+ if prj == nil {
+ common.APIError(c, "Invalid id")
+ return
+ }
+
+ c.JSON(http.StatusOK, (*prj).GetProject())
+}
+
+// addProject adds a new project to server config
+func (s *APIService) addProject(c *gin.Context) {
+ var cfgArg ProjectConfig
+ if c.BindJSON(&cfgArg) != nil {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+
+ s.Log.Debugln("Add project config: ", cfgArg)
+
+ newFld, err := s.projects.Add(cfgArg)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, newFld)
+}
+
+// syncProject force synchronization of project files
+func (s *APIService) syncProject(c *gin.Context) {
+ id := c.Param("id")
+
+ s.Log.Debugln("Sync project id: ", id)
+
+ err := s.projects.ForceSync(id)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, "")
+}
+
+// delProject deletes project from server config
+func (s *APIService) delProject(c *gin.Context) {
+ id := c.Param("id")
+
+ s.Log.Debugln("Delete project id ", id)
+
+ delEntry, err := s.projects.Delete(id)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+ c.JSON(http.StatusOK, delEntry)
+}
diff --git a/lib/agent/apiv1-version.go b/lib/agent/apiv1-version.go
new file mode 100644
index 0000000..c2387c1
--- /dev/null
+++ b/lib/agent/apiv1-version.go
@@ -0,0 +1,45 @@
+package agent
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+type version struct {
+ ID string `json:"id"`
+ Version string `json:"version"`
+ APIVersion string `json:"apiVersion"`
+ VersionGitTag string `json:"gitTag"`
+}
+
+type apiVersion struct {
+ Client version `json:"client"`
+ Server []version `json:"servers"`
+}
+
+// getInfo : return various information about server
+func (s *APIService) getVersion(c *gin.Context) {
+ response := apiVersion{
+ Client: version{
+ ID: "",
+ Version: s.Config.Version,
+ APIVersion: s.Config.APIVersion,
+ VersionGitTag: s.Config.VersionGitTag,
+ },
+ }
+
+ svrVer := []version{}
+ for _, svr := range s.xdsServers {
+ res := version{}
+ if err := svr.HTTPGet("/version", &res); err != nil {
+ common.APIError(c, "Cannot retrieve version of XDS server ID %s : %v", svr.ID, err.Error())
+ return
+ }
+ svrVer = append(svrVer, res)
+ }
+ response.Server = svrVer
+
+ c.JSON(http.StatusOK, response)
+}
diff --git a/lib/agent/apiv1.go b/lib/agent/apiv1.go
new file mode 100644
index 0000000..77b05ba
--- /dev/null
+++ b/lib/agent/apiv1.go
@@ -0,0 +1,129 @@
+package agent
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/gin-gonic/gin"
+ "github.com/iotbzh/xds-agent/lib/xdsconfig"
+)
+
+const apiBaseUrl = "/api/v1"
+
+// APIService .
+type APIService struct {
+ *Context
+ apiRouter *gin.RouterGroup
+ serverIndex int
+}
+
+// NewAPIV1 creates a new instance of API service
+func NewAPIV1(ctx *Context) *APIService {
+ s := &APIService{
+ Context: ctx,
+ apiRouter: ctx.webServer.router.Group(apiBaseUrl),
+ serverIndex: 0,
+ }
+
+ s.apiRouter.GET("/version", s.getVersion)
+
+ s.apiRouter.GET("/config", s.getConfig)
+ s.apiRouter.POST("/config", s.setConfig)
+
+ s.apiRouter.GET("/browse", s.browseFS)
+
+ s.apiRouter.GET("/projects", s.getProjects)
+ s.apiRouter.GET("/project/:id", s.getProject)
+ s.apiRouter.POST("/project", s.addProject)
+ s.apiRouter.POST("/project/sync/:id", s.syncProject)
+ s.apiRouter.DELETE("/project/:id", s.delProject)
+
+ s.apiRouter.POST("/exec", s.execCmd)
+ s.apiRouter.POST("/exec/:id", s.execCmd)
+ s.apiRouter.POST("/signal", s.execSignalCmd)
+
+ s.apiRouter.GET("/events", s.eventsList)
+ s.apiRouter.POST("/events/register", s.eventsRegister)
+ s.apiRouter.POST("/events/unregister", s.eventsUnRegister)
+
+ return s
+}
+
+// Stop Used to stop/close created services
+func (s *APIService) Stop() {
+ for _, svr := range s.xdsServers {
+ svr.Close()
+ }
+}
+
+// AddXdsServer Add a new XDS Server to the list of a server
+func (s *APIService) AddXdsServer(cfg xdsconfig.XDSServerConf) (*XdsServer, error) {
+ var svr *XdsServer
+ var exist, tempoID bool
+ tempoID = false
+
+ // First check if not already exist and update it
+ if svr, exist = s.xdsServers[cfg.ID]; exist {
+
+ // Update: Found, so just update some settings
+ svr.ConnRetry = cfg.ConnRetry
+
+ tempoID = svr.IsTempoID()
+ if svr.Connected && !svr.Disabled && svr.BaseURL == cfg.URL && tempoID {
+ return svr, nil
+ }
+
+ // URL differ or not connected, so need to reconnect
+ svr.BaseURL = cfg.URL
+
+ } else {
+
+ // Create a new server object
+ if cfg.APIBaseURL == "" {
+ cfg.APIBaseURL = apiBaseUrl
+ }
+ if cfg.APIPartialURL == "" {
+ cfg.APIPartialURL = "/server/" + strconv.Itoa(s.serverIndex)
+ s.serverIndex = s.serverIndex + 1
+ }
+
+ // Create a new XDS Server
+ svr = NewXdsServer(s.Context, cfg)
+
+ svr.SetLoggerOutput(s.Config.LogVerboseOut)
+
+ // Passthrough routes (handle by XDS Server)
+ grp := s.apiRouter.Group(svr.PartialURL)
+ svr.SetAPIRouterGroup(grp)
+ svr.PassthroughGet("/sdks")
+ svr.PassthroughGet("/sdk/:id")
+ }
+
+ // Established connection
+ err := svr.Connect()
+
+ // Delete temporary ID with it has been replaced by right Server ID
+ if tempoID && !svr.IsTempoID() {
+ delete(s.xdsServers, cfg.ID)
+ }
+
+ // Add to map
+ s.xdsServers[svr.ID] = svr
+
+ // Load projects
+ if err == nil && svr.Connected {
+ err = s.projects.Init(svr)
+ }
+
+ return svr, err
+}
+
+// DelXdsServer Delete an XDS Server from the list of a server
+func (s *APIService) DelXdsServer(id string) error {
+ if _, exist := s.xdsServers[id]; !exist {
+ return fmt.Errorf("Unknown Server ID %s", id)
+ }
+ // Don't really delete, just disable it
+ s.xdsServers[id].Close()
+ return nil
+}
diff --git a/lib/agent/events.go b/lib/agent/events.go
new file mode 100644
index 0000000..24efc5a
--- /dev/null
+++ b/lib/agent/events.go
@@ -0,0 +1,131 @@
+package agent
+
+import (
+ "fmt"
+ "time"
+)
+
+// Events constants
+const (
+ // EventTypePrefix Used as event prefix
+ EventTypePrefix = "event:" // following by event type
+
+ // Supported Events type
+ EVTAll = "all"
+ EVTServerConfig = "server-config" // data type ServerCfg
+ EVTProjectAdd = "project-add" // data type ProjectConfig
+ EVTProjectDelete = "project-delete" // data type ProjectConfig
+ EVTProjectChange = "project-state-change" // data type ProjectConfig
+)
+
+var EVTAllList = []string{
+ EVTServerConfig,
+ EVTProjectAdd,
+ EVTProjectDelete,
+ EVTProjectChange,
+}
+
+// EventMsg Message send
+type EventMsg struct {
+ Time string `json:"time"`
+ Type string `json:"type"`
+ Data interface{} `json:"data"`
+}
+
+type EventDef struct {
+ // SEB cbs []EventsCB
+ sids map[string]int
+}
+
+type Events struct {
+ *Context
+ eventsMap map[string]*EventDef
+}
+
+// NewEvents creates an instance of Events
+func NewEvents(ctx *Context) *Events {
+ evMap := make(map[string]*EventDef)
+ for _, ev := range EVTAllList {
+ evMap[ev] = &EventDef{
+ sids: make(map[string]int),
+ }
+ }
+ return &Events{
+ Context: ctx,
+ eventsMap: evMap,
+ }
+}
+
+// GetList returns the list of all supported events
+func (e *Events) GetList() []string {
+ return EVTAllList
+}
+
+// Register Used by a client/session to register to a specific (or all) event(s)
+func (e *Events) Register(evName, sessionID string) error {
+ evs := EVTAllList
+ if evName != EVTAll {
+ if _, ok := e.eventsMap[evName]; !ok {
+ return fmt.Errorf("Unsupported event type name")
+ }
+ evs = []string{evName}
+ }
+ for _, ev := range evs {
+ e.eventsMap[ev].sids[sessionID]++
+ }
+ return nil
+}
+
+// UnRegister Used by a client/session to unregister event(s)
+func (e *Events) UnRegister(evName, sessionID string) error {
+ evs := EVTAllList
+ if evName != EVTAll {
+ if _, ok := e.eventsMap[evName]; !ok {
+ return fmt.Errorf("Unsupported event type name")
+ }
+ evs = []string{evName}
+ }
+ for _, ev := range evs {
+ if _, exist := e.eventsMap[ev].sids[sessionID]; exist {
+ delete(e.eventsMap[ev].sids, sessionID)
+ break
+ }
+ }
+ return nil
+}
+
+// Emit Used to manually emit an event
+func (e *Events) Emit(evName string, data interface{}) error {
+ var firstErr error
+
+ if _, ok := e.eventsMap[evName]; !ok {
+ return fmt.Errorf("Unsupported event type")
+ }
+
+ e.Log.Debugf("Emit Event %s: %v", evName, data)
+
+ firstErr = nil
+ evm := e.eventsMap[evName]
+ for sid := range evm.sids {
+ so := e.webServer.sessions.IOSocketGet(sid)
+ if so == nil {
+ if firstErr == nil {
+ firstErr = fmt.Errorf("IOSocketGet return nil")
+ }
+ continue
+ }
+ msg := EventMsg{
+ Time: time.Now().String(),
+ Type: evName,
+ Data: data,
+ }
+ if err := (*so).Emit(EventTypePrefix+evName, msg); err != nil {
+ e.Log.Errorf("WS Emit %v error : %v", EventTypePrefix+evName, err)
+ if firstErr == nil {
+ firstErr = err
+ }
+ }
+ }
+
+ return firstErr
+}
diff --git a/lib/agent/project-interface.go b/lib/agent/project-interface.go
new file mode 100644
index 0000000..031e1d9
--- /dev/null
+++ b/lib/agent/project-interface.go
@@ -0,0 +1,47 @@
+package agent
+
+// ProjectType definition
+type ProjectType string
+
+const (
+ TypePathMap = "PathMap"
+ TypeCloudSync = "CloudSync"
+ TypeCifsSmb = "CIFS"
+)
+
+// Project Status definition
+const (
+ StatusErrorConfig = "ErrorConfig"
+ StatusDisable = "Disable"
+ StatusEnable = "Enable"
+ StatusPause = "Pause"
+ StatusSyncing = "Syncing"
+)
+
+type EventCBData map[string]interface{}
+type EventCB func(cfg *ProjectConfig, data *EventCBData)
+
+// IPROJECT Project interface
+type IPROJECT interface {
+ Add(cfg ProjectConfig) (*ProjectConfig, error) // Add a new project
+ Delete() error // Delete a project
+ GetProject() *ProjectConfig // Get project public configuration
+ SetProject(prj ProjectConfig) *ProjectConfig // Set project configuration
+ GetServer() *XdsServer // Get XdsServer that holds this project
+ GetFullPath(dir string) string // Get project full path
+ Sync() error // Force project files synchronization
+ IsInSync() (bool, error) // Check if project files are in-sync
+}
+
+// ProjectConfig is the config for one project
+type ProjectConfig struct {
+ ID string `json:"id"`
+ ServerID string `json:"serverId"`
+ Label string `json:"label"`
+ ClientPath string `json:"clientPath"`
+ ServerPath string `json:"serverPath"`
+ Type ProjectType `json:"type"`
+ Status string `json:"status"`
+ IsInSync bool `json:"isInSync"`
+ DefaultSdk string `json:"defaultSdk"`
+}
diff --git a/lib/agent/project-pathmap.go b/lib/agent/project-pathmap.go
new file mode 100644
index 0000000..1de8e11
--- /dev/null
+++ b/lib/agent/project-pathmap.go
@@ -0,0 +1,79 @@
+package agent
+
+import (
+ "path/filepath"
+)
+
+// IPROJECT interface implementation for native/path mapping projects
+
+// PathMap .
+type PathMap struct {
+ *Context
+ server *XdsServer
+ folder *FolderConfig
+}
+
+// NewProjectPathMap Create a new instance of PathMap
+func NewProjectPathMap(ctx *Context, svr *XdsServer) *PathMap {
+ p := PathMap{
+ Context: ctx,
+ server: svr,
+ folder: &FolderConfig{},
+ }
+ return &p
+}
+
+// Add a new project
+func (p *PathMap) Add(cfg ProjectConfig) (*ProjectConfig, error) {
+ var err error
+
+ // SEB TODO: check local/server directory access
+
+ err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder)
+ if err != nil {
+ return nil, err
+ }
+
+ return p.GetProject(), nil
+}
+
+// Delete a project
+func (p *PathMap) Delete() error {
+ return p.server.FolderDelete(p.folder.ID)
+}
+
+// GetProject Get public part of project config
+func (p *PathMap) GetProject() *ProjectConfig {
+ prj := p.server.FolderToProject(*p.folder)
+ prj.ServerID = p.server.ID
+ return &prj
+}
+
+// SetProject Set project config
+func (p *PathMap) SetProject(prj ProjectConfig) *ProjectConfig {
+ p.folder = p.server.ProjectToFolder(prj)
+ return p.GetProject()
+}
+
+// GetServer Get the XdsServer that holds this project
+func (p *PathMap) GetServer() *XdsServer {
+ return p.server
+}
+
+// GetFullPath returns the full path of a directory (from server POV)
+func (p *PathMap) GetFullPath(dir string) string {
+ if &dir == nil {
+ return p.folder.DataPathMap.ServerPath
+ }
+ return filepath.Join(p.folder.DataPathMap.ServerPath, dir)
+}
+
+// Sync Force project files synchronization
+func (p *PathMap) Sync() error {
+ return nil
+}
+
+// IsInSync Check if project files are in-sync
+func (p *PathMap) IsInSync() (bool, error) {
+ return true, nil
+}
diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go
new file mode 100644
index 0000000..28a287c
--- /dev/null
+++ b/lib/agent/project-st.go
@@ -0,0 +1,93 @@
+package agent
+
+import "github.com/iotbzh/xds-agent/lib/syncthing"
+
+// SEB TODO
+
+// IPROJECT interface implementation for syncthing projects
+
+// STProject .
+type STProject struct {
+ *Context
+ server *XdsServer
+ folder *FolderConfig
+}
+
+// NewProjectST Create a new instance of STProject
+func NewProjectST(ctx *Context, svr *XdsServer) *STProject {
+ p := STProject{
+ Context: ctx,
+ server: svr,
+ folder: &FolderConfig{},
+ }
+ return &p
+}
+
+// Add a new project
+func (p *STProject) Add(cfg ProjectConfig) (*ProjectConfig, error) {
+ var err error
+
+ err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder)
+ if err != nil {
+ return nil, err
+ }
+ svrPrj := p.GetProject()
+
+ // Declare project into local Syncthing
+ p.SThg.FolderChange(st.FolderChangeArg{
+ ID: cfg.ID,
+ Label: cfg.Label,
+ RelativePath: cfg.ClientPath,
+ SyncThingID: p.server.ServerConfig.Builder.SyncThingID,
+ })
+
+ return svrPrj, nil
+}
+
+// Delete a project
+func (p *STProject) Delete() error {
+ return p.server.FolderDelete(p.folder.ID)
+}
+
+// GetProject Get public part of project config
+func (p *STProject) GetProject() *ProjectConfig {
+ prj := p.server.FolderToProject(*p.folder)
+ prj.ServerID = p.server.ID
+ return &prj
+}
+
+// SetProject Set project config
+func (p *STProject) SetProject(prj ProjectConfig) *ProjectConfig {
+ // SEB TODO
+ p.folder = p.server.ProjectToFolder(prj)
+ return p.GetProject()
+}
+
+// GetServer Get the XdsServer that holds this project
+func (p *STProject) GetServer() *XdsServer {
+ // SEB TODO
+ return p.server
+}
+
+// GetFullPath returns the full path of a directory (from server POV)
+func (p *STProject) GetFullPath(dir string) string {
+ /* SEB
+ if &dir == nil {
+ return p.folder.DataSTProject.ServerPath
+ }
+ return filepath.Join(p.folder.DataSTProject.ServerPath, dir)
+ */
+ return "SEB TODO"
+}
+
+// Sync Force project files synchronization
+func (p *STProject) Sync() error {
+ // SEB TODO
+ return nil
+}
+
+// IsInSync Check if project files are in-sync
+func (p *STProject) IsInSync() (bool, error) {
+ // SEB TODO
+ return false, nil
+}
diff --git a/lib/agent/projects.go b/lib/agent/projects.go
new file mode 100644
index 0000000..39c120f
--- /dev/null
+++ b/lib/agent/projects.go
@@ -0,0 +1,254 @@
+package agent
+
+import (
+ "fmt"
+ "log"
+ "time"
+
+ "github.com/iotbzh/xds-agent/lib/syncthing"
+ "github.com/syncthing/syncthing/lib/sync"
+)
+
+// Projects Represent a an XDS Projects
+type Projects struct {
+ *Context
+ SThg *st.SyncThing
+ projects map[string]*IPROJECT
+ //SEB registerCB []RegisteredCB
+}
+
+/* SEB
+type RegisteredCB struct {
+ cb *EventCB
+ data *EventCBData
+}
+*/
+
+// Mutex to make add/delete atomic
+var pjMutex = sync.NewMutex()
+
+// NewProjects Create a new instance of Project Model
+func NewProjects(ctx *Context, st *st.SyncThing) *Projects {
+ return &Projects{
+ Context: ctx,
+ SThg: st,
+ projects: make(map[string]*IPROJECT),
+ //registerCB: []RegisteredCB{},
+ }
+}
+
+// Init Load Projects configuration
+func (p *Projects) Init(server *XdsServer) error {
+ svrList := make(map[string]*XdsServer)
+ // If server not set, load for all servers
+ if server == nil {
+ svrList = p.xdsServers
+ } else {
+ svrList[server.ID] = server
+ }
+ errMsg := ""
+ for _, svr := range svrList {
+ if svr.Disabled {
+ continue
+ }
+ xFlds := []FolderConfig{}
+ if err := svr.HTTPGet("/folders", &xFlds); err != nil {
+ errMsg += fmt.Sprintf("Cannot retrieve folders config of XDS server ID %s : %v \n", svr.ID, err.Error())
+ continue
+ }
+ p.Log.Debugf("Server %s, %d projects detected", svr.ID[:8], len(xFlds))
+ for _, prj := range xFlds {
+ newP := svr.FolderToProject(prj)
+ if /*nPrj*/ _, err := p.createUpdate(newP, false, true); err != nil {
+ errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n "
+ continue
+ }
+
+ /* FIXME emit EVTProjectChange event ?
+ if err := p.events.Emit(EVTProjectChange, *nPrj); err != nil {
+ p.Log.Warningf("Cannot notify project change: %v", err)
+ }
+ */
+ }
+
+ }
+
+ p.Log.Infof("Number of loaded Projects: %d", len(p.projects))
+
+ if errMsg != "" {
+ return fmt.Errorf(errMsg)
+ }
+ return nil
+}
+
+// Get returns the folder config or nil if not existing
+func (p *Projects) Get(id string) *IPROJECT {
+ if id == "" {
+ return nil
+ }
+ fc, exist := p.projects[id]
+ if !exist {
+ return nil
+ }
+ return fc
+}
+
+// GetProjectArr returns the config of all folders as an array
+func (p *Projects) GetProjectArr() []ProjectConfig {
+ pjMutex.Lock()
+ defer pjMutex.Unlock()
+
+ return p.GetProjectArrUnsafe()
+}
+
+// GetProjectArrUnsafe Same as GetProjectArr without mutex protection
+func (p *Projects) GetProjectArrUnsafe() []ProjectConfig {
+ conf := []ProjectConfig{}
+ for _, v := range p.projects {
+ prj := (*v).GetProject()
+ conf = append(conf, *prj)
+ }
+ return conf
+}
+
+// Add adds a new folder
+func (p *Projects) Add(newF ProjectConfig) (*ProjectConfig, error) {
+ prj, err := p.createUpdate(newF, true, false)
+ if err != nil {
+ return prj, err
+ }
+
+ // Notify client with event
+ if err := p.events.Emit(EVTProjectAdd, *prj); err != nil {
+ p.Log.Warningf("Cannot notify project deletion: %v", err)
+ }
+
+ return prj, err
+}
+
+// CreateUpdate creates or update a folder
+func (p *Projects) createUpdate(newF ProjectConfig, create bool, initial bool) (*ProjectConfig, error) {
+ var err error
+
+ pjMutex.Lock()
+ defer pjMutex.Unlock()
+
+ // Sanity check
+ if _, exist := p.projects[newF.ID]; create && exist {
+ return nil, fmt.Errorf("ID already exists")
+ }
+ if newF.ClientPath == "" {
+ return nil, fmt.Errorf("ClientPath must be set")
+ }
+ if newF.ServerID == "" {
+ return nil, fmt.Errorf("Server ID must be set")
+ }
+ var svr *XdsServer
+ var exist bool
+ if svr, exist = p.xdsServers[newF.ServerID]; !exist {
+ return nil, fmt.Errorf("Unknown Server ID %s", newF.ServerID)
+ }
+
+ // Check type supported
+ b, exist := svr.ServerConfig.SupportedSharing[string(newF.Type)]
+ if !exist || !b {
+ return nil, fmt.Errorf("Server doesn't support project type %s", newF.Type)
+ }
+
+ // Create a new folder object
+ var fld IPROJECT
+ switch newF.Type {
+ // SYNCTHING
+ case TypeCloudSync:
+ if p.SThg != nil {
+ /*SEB fld = f.SThg.NewFolderST(f.Conf)*/
+ fld = NewProjectST(p.Context, svr)
+ } else {
+ return nil, fmt.Errorf("Cloud Sync project not supported")
+ }
+
+ // PATH MAP
+ case TypePathMap:
+ fld = NewProjectPathMap(p.Context, svr)
+ default:
+ return nil, fmt.Errorf("Unsupported folder type")
+ }
+
+ var newPrj *ProjectConfig
+ if create {
+ // Add project on server
+ if newPrj, err = fld.Add(newF); err != nil {
+ newF.Status = StatusErrorConfig
+ log.Printf("ERROR Adding folder: %v\n", err)
+ return newPrj, err
+ }
+ } else {
+ // Just update project config
+ newPrj = fld.SetProject(newF)
+ }
+
+ // Sanity check
+ if newPrj.ID == "" {
+ log.Printf("ERROR project ID empty: %v", newF)
+ return newPrj, fmt.Errorf("Project ID empty")
+ }
+
+ // Add to folders list
+ p.projects[newPrj.ID] = &fld
+
+ // Force sync after creation
+ // (need to defer to be sure that WS events will arrive after HTTP creation reply)
+ go func() {
+ time.Sleep(time.Millisecond * 500)
+ fld.Sync()
+ }()
+
+ return newPrj, nil
+}
+
+// Delete deletes a specific folder
+func (p *Projects) Delete(id string) (ProjectConfig, error) {
+ var err error
+
+ pjMutex.Lock()
+ defer pjMutex.Unlock()
+
+ fld := ProjectConfig{}
+ fc, exist := p.projects[id]
+ if !exist {
+ return fld, fmt.Errorf("unknown id")
+ }
+
+ prj := (*fc).GetProject()
+
+ if err = (*fc).Delete(); err != nil {
+ return *prj, err
+ }
+
+ delete(p.projects, id)
+
+ // Notify client with event
+ if err := p.events.Emit(EVTProjectDelete, *prj); err != nil {
+ p.Log.Warningf("Cannot notify project deletion: %v", err)
+ }
+
+ return *prj, err
+}
+
+// ForceSync Force the synchronization of a folder
+func (p *Projects) ForceSync(id string) error {
+ fc := p.Get(id)
+ if fc == nil {
+ return fmt.Errorf("Unknown id")
+ }
+ return (*fc).Sync()
+}
+
+// IsProjectInSync Returns true when folder is in sync
+func (p *Projects) IsProjectInSync(id string) (bool, error) {
+ fc := p.Get(id)
+ if fc == nil {
+ return false, fmt.Errorf("Unknown id")
+ }
+ return (*fc).IsInSync()
+}
diff --git a/lib/session/session.go b/lib/agent/session.go
index b56f9ff..e50abe1 100644
--- a/lib/session/session.go
+++ b/lib/agent/session.go
@@ -1,11 +1,10 @@
-package session
+package agent
import (
"encoding/base64"
"strconv"
"time"
- "github.com/Sirupsen/logrus"
"github.com/gin-gonic/gin"
"github.com/googollee/go-socket.io"
uuid "github.com/satori/go.uuid"
@@ -36,29 +35,27 @@ type ClientSession struct {
// Sessions holds client sessions
type Sessions struct {
- router *gin.Engine
+ *Context
cookieMaxAge int64
sessMap map[string]ClientSession
mutex sync.Mutex
- log *logrus.Logger
stop chan struct{} // signals intentional stop
}
// NewClientSessions .
-func NewClientSessions(router *gin.Engine, log *logrus.Logger, cookieMaxAge string) *Sessions {
+func NewClientSessions(ctx *Context, cookieMaxAge string) *Sessions {
ckMaxAge, err := strconv.ParseInt(cookieMaxAge, 10, 0)
if err != nil {
ckMaxAge = 0
}
s := Sessions{
- router: router,
+ Context: ctx,
cookieMaxAge: ckMaxAge,
sessMap: make(map[string]ClientSession),
mutex: sync.NewMutex(),
- log: log,
stop: make(chan struct{}),
}
- s.router.Use(s.Middleware())
+ s.webServer.router.Use(s.Middleware())
// Start monitoring of sessions Map (use to manage expiration and cleanup)
go s.monitorSessMap()
@@ -174,7 +171,7 @@ func (s *Sessions) newSession(prefix string) *ClientSession {
s.sessMap[se.ID] = se
- s.log.Debugf("NEW session (%d): %s", len(s.sessMap), id)
+ s.Log.Debugf("NEW session (%d): %s", len(s.sessMap), id)
return &se
}
@@ -202,22 +199,22 @@ func (s *Sessions) monitorSessMap() {
for {
select {
case <-s.stop:
- s.log.Debugln("Stop monitorSessMap")
+ s.Log.Debugln("Stop monitorSessMap")
return
case <-time.After(sessionMonitorTime * time.Second):
if dbgFullTrace {
- s.log.Debugf("Sessions Map size: %d", len(s.sessMap))
- s.log.Debugf("Sessions Map : %v", s.sessMap)
+ s.Log.Debugf("Sessions Map size: %d", len(s.sessMap))
+ s.Log.Debugf("Sessions Map : %v", s.sessMap)
}
if len(s.sessMap) > maxSessions {
- s.log.Errorln("TOO MUCH sessions, cleanup old ones !")
+ s.Log.Errorln("TOO MUCH sessions, cleanup old ones !")
}
s.mutex.Lock()
for _, ss := range s.sessMap {
if ss.expireAt.Sub(time.Now()) < 0 {
- s.log.Debugf("Delete expired session id: %s", ss.ID)
+ //SEB DEBUG s.Log.Debugf("Delete expired session id: %s", ss.ID)
delete(s.sessMap, ss.ID)
}
}
diff --git a/lib/agent/webserver.go b/lib/agent/webserver.go
new file mode 100644
index 0000000..ead06d1
--- /dev/null
+++ b/lib/agent/webserver.go
@@ -0,0 +1,246 @@
+package agent
+
+import (
+ "fmt"
+ "log"
+ "net/http"
+ "os"
+ "path"
+
+ "github.com/Sirupsen/logrus"
+ "github.com/gin-contrib/static"
+ "github.com/gin-gonic/gin"
+ "github.com/googollee/go-socket.io"
+)
+
+// WebServer .
+type WebServer struct {
+ *Context
+ router *gin.Engine
+ api *APIService
+ sIOServer *socketio.Server
+ webApp *gin.RouterGroup
+ stop chan struct{} // signals intentional stop
+}
+
+const indexFilename = "index.html"
+
+// NewWebServer creates an instance of WebServer
+func NewWebServer(ctx *Context) *WebServer {
+
+ // Setup logging for gin router
+ if ctx.Log.Level == logrus.DebugLevel {
+ gin.SetMode(gin.DebugMode)
+ } else {
+ gin.SetMode(gin.ReleaseMode)
+ }
+
+ // Redirect gin logs into another logger (LogVerboseOut may be stderr or a file)
+ gin.DefaultWriter = ctx.Config.LogVerboseOut
+ gin.DefaultErrorWriter = ctx.Config.LogVerboseOut
+ log.SetOutput(ctx.Config.LogVerboseOut)
+
+ // Creates gin router
+ r := gin.New()
+
+ svr := &WebServer{
+ Context: ctx,
+ router: r,
+ api: nil,
+ sIOServer: nil,
+ webApp: nil,
+ stop: make(chan struct{}),
+ }
+
+ return svr
+}
+
+// Serve starts a new instance of the Web Server
+func (s *WebServer) Serve() error {
+ var err error
+
+ // Setup middlewares
+ s.router.Use(gin.Logger())
+ s.router.Use(gin.Recovery())
+ s.router.Use(s.middlewareCORS())
+ s.router.Use(s.middlewareXDSDetails())
+ s.router.Use(s.middlewareCSRF())
+
+ // Create REST API
+ s.api = NewAPIV1(s.Context)
+
+ // Create connections to XDS Servers
+ // XXX - not sure there is no side effect to do it in background !
+ go func() {
+ for _, svrCfg := range s.Config.FileConf.ServersConf {
+ if svr, err := s.api.AddXdsServer(svrCfg); err != nil {
+ // Just log error, don't consider as critical
+ s.Log.Infof("Cannot connect to XDS Server url=%s: %v", svr.BaseURL, err.Error())
+ }
+ }
+ }()
+
+ // Websocket routes
+ s.sIOServer, err = socketio.NewServer(nil)
+ if err != nil {
+ s.Log.Fatalln(err)
+ }
+
+ s.router.GET("/socket.io/", s.socketHandler)
+ s.router.POST("/socket.io/", s.socketHandler)
+ /* TODO: do we want to support ws://... ?
+ s.router.Handle("WS", "/socket.io/", s.socketHandler)
+ s.router.Handle("WSS", "/socket.io/", s.socketHandler)
+ */
+
+ // Web Application (serve on / )
+ idxFile := path.Join(s.Config.FileConf.WebAppDir, indexFilename)
+ if _, err := os.Stat(idxFile); err != nil {
+ s.Log.Fatalln("Web app directory not found, check/use webAppDir setting in config file: ", idxFile)
+ }
+ s.Log.Infof("Serve WEB app dir: %s", s.Config.FileConf.WebAppDir)
+ s.router.Use(static.Serve("/", static.LocalFile(s.Config.FileConf.WebAppDir, true)))
+ s.webApp = s.router.Group("/", s.serveIndexFile)
+ {
+ s.webApp.GET("/")
+ }
+
+ // Serve in the background
+ serveError := make(chan error, 1)
+ go func() {
+ fmt.Printf("Web Server running on localhost:%s ...\n", s.Config.FileConf.HTTPPort)
+ serveError <- http.ListenAndServe(":"+s.Config.FileConf.HTTPPort, s.router)
+ }()
+
+ fmt.Printf("XDS agent running...\n")
+
+ // Wait for stop, restart or error signals
+ select {
+ case <-s.stop:
+ // Shutting down permanently
+ s.sessions.Stop()
+ s.Log.Infoln("shutting down (stop)")
+ case err = <-serveError:
+ // Error due to listen/serve failure
+ s.Log.Errorln(err)
+ }
+
+ return nil
+}
+
+// Stop web server
+func (s *WebServer) Stop() {
+ s.api.Stop()
+ close(s.stop)
+}
+
+// serveIndexFile provides initial file (eg. index.html) of webapp
+func (s *WebServer) serveIndexFile(c *gin.Context) {
+ c.HTML(200, indexFilename, gin.H{})
+}
+
+// Add details in Header
+func (s *WebServer) middlewareXDSDetails() gin.HandlerFunc {
+ return func(c *gin.Context) {
+ c.Header("XDS-Agent-Version", s.Config.Version)
+ c.Header("XDS-API-Version", s.Config.APIVersion)
+ c.Next()
+ }
+}
+
+/* SEB
+func (s *WebServer) isValidAPIKey(key string) bool {
+ return (key == s.Config.FileConf.XDSAPIKey && key != "")
+}
+*/
+
+func (s *WebServer) middlewareCSRF() gin.HandlerFunc {
+ return func(c *gin.Context) {
+ // XXX - not used for now
+ c.Next()
+ return
+ /*
+ // Allow requests carrying a valid API key
+ if s.isValidAPIKey(c.Request.Header.Get("X-API-Key")) {
+ // Set the access-control-allow-origin header for CORS requests
+ // since a valid API key has been provided
+ c.Header("Access-Control-Allow-Origin", "*")
+ c.Next()
+ return
+ }
+
+ // Allow io.socket request
+ if strings.HasPrefix(c.Request.URL.Path, "/socket.io") {
+ c.Next()
+ return
+ }
+
+ // FIXME Add really CSRF support
+
+ // Allow requests for anything not under the protected path prefix,
+ // and set a CSRF cookie if there isn't already a valid one.
+ //if !strings.HasPrefix(c.Request.URL.Path, prefix) {
+ // cookie, err := c.Cookie("CSRF-Token-" + unique)
+ // if err != nil || !validCsrfToken(cookie.Value) {
+ // s.Log.Debugln("new CSRF cookie in response to request for", c.Request.URL)
+ // c.SetCookie("CSRF-Token-"+unique, newCsrfToken(), 600, "/", "", false, false)
+ // }
+ // c.Next()
+ // return
+ //}
+
+ // Verify the CSRF token
+ //token := c.Request.Header.Get("X-CSRF-Token-" + unique)
+ //if !validCsrfToken(token) {
+ // c.AbortWithError(403, "CSRF Error")
+ // return
+ //}
+
+ //c.Next()
+
+ c.AbortWithError(403, fmt.Errorf("Not valid API key"))
+ */
+ }
+}
+
+// CORS middleware
+func (s *WebServer) middlewareCORS() gin.HandlerFunc {
+ return func(c *gin.Context) {
+ if c.Request.Method == "OPTIONS" {
+ c.Header("Access-Control-Allow-Origin", "*")
+ c.Header("Access-Control-Allow-Headers", "Content-Type, X-API-Key")
+ c.Header("Access-Control-Allow-Methods", "GET, POST, DELETE")
+ c.Header("Access-Control-Max-Age", cookieMaxAge)
+ c.AbortWithStatus(204)
+ return
+ }
+ c.Next()
+ }
+}
+
+// socketHandler is the handler for the "main" websocket connection
+func (s *WebServer) socketHandler(c *gin.Context) {
+
+ // Retrieve user session
+ sess := s.sessions.Get(c)
+ if sess == nil {
+ c.JSON(500, gin.H{"error": "Cannot retrieve session"})
+ return
+ }
+
+ s.sIOServer.On("connection", func(so socketio.Socket) {
+ s.Log.Debugf("WS Connected (SID=%v)", so.Id())
+ s.sessions.UpdateIOSocket(sess.ID, &so)
+
+ so.On("disconnection", func() {
+ s.Log.Debugf("WS disconnected (SID=%v)", so.Id())
+ s.sessions.UpdateIOSocket(sess.ID, nil)
+ })
+ })
+
+ s.sIOServer.On("error", func(so socketio.Socket, err error) {
+ s.Log.Errorf("WS SID=%v Error : %v", so.Id(), err.Error())
+ })
+
+ s.sIOServer.ServeHTTP(c.Writer, c.Request)
+}
diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go
new file mode 100644
index 0000000..014415f
--- /dev/null
+++ b/lib/agent/xdsserver.go
@@ -0,0 +1,472 @@
+package agent
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/gin-gonic/gin"
+ "github.com/iotbzh/xds-agent/lib/xdsconfig"
+ common "github.com/iotbzh/xds-common/golib"
+ uuid "github.com/satori/go.uuid"
+ sio_client "github.com/zhouhui8915/go-socket.io-client"
+)
+
+// Server .
+type XdsServer struct {
+ *Context
+ ID string
+ BaseURL string
+ APIURL string
+ PartialURL string
+ ConnRetry int
+ Connected bool
+ Disabled bool
+ ServerConfig *xdsServerConfig
+
+ // callbacks
+ CBOnError func(error)
+ CBOnDisconnect func(error)
+
+ // Private fields
+ client *common.HTTPClient
+ ioSock *sio_client.Client
+ logOut io.Writer
+ apiRouter *gin.RouterGroup
+}
+
+// xdsServerConfig Data return by GET /config
+type xdsServerConfig struct {
+ ID string `json:"id"`
+ Version string `json:"version"`
+ APIVersion string `json:"apiVersion"`
+ VersionGitTag string `json:"gitTag"`
+ SupportedSharing map[string]bool `json:"supportedSharing"`
+ Builder xdsBuilderConfig `json:"builder"`
+}
+
+// xdsBuilderConfig represents the builder container configuration
+type xdsBuilderConfig struct {
+ IP string `json:"ip"`
+ Port string `json:"port"`
+ SyncThingID string `json:"syncThingID"`
+}
+
+// FolderType XdsServer folder type
+type FolderType string
+
+const (
+ XdsTypePathMap = "PathMap"
+ XdsTypeCloudSync = "CloudSync"
+ XdsTypeCifsSmb = "CIFS"
+)
+
+// FolderConfig XdsServer folder config
+type FolderConfig struct {
+ ID string `json:"id"`
+ Label string `json:"label"`
+ ClientPath string `json:"path"`
+ Type FolderType `json:"type"`
+ Status string `json:"status"`
+ IsInSync bool `json:"isInSync"`
+ DefaultSdk string `json:"defaultSdk"`
+ // Specific data depending on which Type is used
+ DataPathMap PathMapConfig `json:"dataPathMap,omitempty"`
+ DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"`
+}
+
+// PathMapConfig Path mapping specific data
+type PathMapConfig struct {
+ ServerPath string `json:"serverPath"`
+}
+
+// CloudSyncConfig CloudSync (AKA Syncthing) specific data
+type CloudSyncConfig struct {
+ SyncThingID string `json:"syncThingID"`
+}
+
+const _IDTempoPrefix = "tempo-"
+
+// NewXdsServer creates an instance of XdsServer
+func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
+ return &XdsServer{
+ Context: ctx,
+ ID: _IDTempoPrefix + uuid.NewV1().String(),
+ BaseURL: conf.URL,
+ APIURL: conf.APIBaseURL + conf.APIPartialURL,
+ PartialURL: conf.APIPartialURL,
+ ConnRetry: conf.ConnRetry,
+ Connected: false,
+ Disabled: false,
+
+ logOut: ctx.Log.Out,
+ }
+}
+
+// Close Free and close XDS Server connection
+func (xs *XdsServer) Close() error {
+ xs.Connected = false
+ xs.Disabled = true
+ xs.ioSock = nil
+ xs._NotifyState()
+ return nil
+}
+
+// Connect Establish HTTP connection with XDS Server
+func (xs *XdsServer) Connect() error {
+ var err error
+ var retry int
+
+ xs.Disabled = false
+ xs.Connected = false
+
+ err = nil
+ for retry = xs.ConnRetry; retry > 0; retry-- {
+ if err = xs._CreateConnectHTTP(); err == nil {
+ break
+ }
+ if retry == xs.ConnRetry {
+ // Notify only on the first conn error
+ // doing that avoid 2 notifs (conn false; conn true) on startup
+ xs._NotifyState()
+ }
+ xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
+ time.Sleep(time.Second)
+ }
+ if retry == 0 {
+ // FIXME SEB: re-use _reconnect to wait longer in background
+ return fmt.Errorf("Connection to XDS Server failure")
+ }
+ if err != nil {
+ return err
+ }
+
+ // Check HTTP connection and establish WS connection
+ err = xs._connect(false)
+
+ return err
+}
+
+// IsTempoID returns true when server as a temporary id
+func (xs *XdsServer) IsTempoID() bool {
+ return strings.HasPrefix(xs.ID, _IDTempoPrefix)
+}
+
+// SetLoggerOutput Set logger ou
+func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
+ xs.logOut = out
+}
+
+// FolderAdd Send POST request to add a folder
+func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error {
+ response, err := xs.HTTPPost("/folder", prj)
+ if err != nil {
+ return err
+ }
+ if response.StatusCode != 200 {
+ return fmt.Errorf("FolderAdd error status=%s", response.Status)
+ }
+ // Result is a FolderConfig that is equivalent to ProjectConfig
+ err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
+
+ return err
+}
+
+// FolderDelete Send DELETE request to delete a folder
+func (xs *XdsServer) FolderDelete(id string) error {
+ return xs.client.HTTPDelete("/folder/" + id)
+}
+
+// HTTPGet .
+func (xs *XdsServer) HTTPGet(url string, data interface{}) error {
+ var dd []byte
+ if err := xs.client.HTTPGet(url, &dd); err != nil {
+ return err
+ }
+ return json.Unmarshal(dd, &data)
+}
+
+// HTTPPost .
+func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) {
+ body, err := json.Marshal(data)
+ if err != nil {
+ return nil, err
+ }
+ return xs.HTTPPostBody(url, string(body))
+}
+
+// HTTPPostBody .
+func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) {
+ return xs.client.HTTPPostWithRes(url, body)
+}
+
+// SetAPIRouterGroup .
+func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
+ xs.apiRouter = r
+}
+
+// PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
+func (xs *XdsServer) PassthroughGet(url string) {
+ if xs.apiRouter == nil {
+ xs.Log.Errorf("apiRouter not set !")
+ return
+ }
+
+ xs.apiRouter.GET(url, func(c *gin.Context) {
+ var data interface{}
+ if err := xs.HTTPGet(url, &data); err != nil {
+ if strings.Contains(err.Error(), "connection refused") {
+ xs.Connected = false
+ xs._NotifyState()
+ }
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, data)
+ })
+}
+
+// PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
+func (xs *XdsServer) PassthroughPost(url string) {
+ if xs.apiRouter == nil {
+ xs.Log.Errorf("apiRouter not set !")
+ return
+ }
+
+ xs.apiRouter.POST(url, func(c *gin.Context) {
+ bodyReq := []byte{}
+ n, err := c.Request.Body.Read(bodyReq)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ response, err := xs.HTTPPostBody(url, string(bodyReq[:n]))
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+ bodyRes, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ common.APIError(c, "Cannot read response body")
+ return
+ }
+ c.JSON(http.StatusOK, string(bodyRes))
+ })
+}
+
+// EventOn Register a callback on events reception
+func (xs *XdsServer) EventOn(message string, f interface{}) (err error) {
+ if xs.ioSock == nil {
+ return fmt.Errorf("Io.Socket not initialized")
+ }
+ // FIXME SEB: support chain / multiple listeners
+ /* sockEvents map[string][]*caller
+ xs.sockEventsLock.Lock()
+ xs.sockEvents[message] = append(xs.sockEvents[message], f)
+ xs.sockEventsLock.Unlock()
+ xs.ioSock.On(message, func(ev) {
+
+ })
+ */
+ return xs.ioSock.On(message, f)
+}
+
+// ProjectToFolder
+func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig {
+ stID := ""
+ if pPrj.Type == XdsTypeCloudSync {
+ stID, _ = xs.SThg.IDGet()
+ }
+ fPrj := FolderConfig{
+ ID: pPrj.ID,
+ Label: pPrj.Label,
+ ClientPath: pPrj.ClientPath,
+ Type: FolderType(pPrj.Type),
+ Status: pPrj.Status,
+ IsInSync: pPrj.IsInSync,
+ DefaultSdk: pPrj.DefaultSdk,
+ DataPathMap: PathMapConfig{
+ ServerPath: pPrj.ServerPath,
+ },
+ DataCloudSync: CloudSyncConfig{
+ SyncThingID: stID,
+ },
+ }
+ return &fPrj
+}
+
+// FolderToProject
+func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig {
+ pPrj := ProjectConfig{
+ ID: fPrj.ID,
+ ServerID: xs.ID,
+ Label: fPrj.Label,
+ ClientPath: fPrj.ClientPath,
+ ServerPath: fPrj.DataPathMap.ServerPath,
+ Type: ProjectType(fPrj.Type),
+ Status: fPrj.Status,
+ IsInSync: fPrj.IsInSync,
+ DefaultSdk: fPrj.DefaultSdk,
+ }
+ return pPrj
+}
+
+/***
+** Private functions
+***/
+
+// Create HTTP client
+func (xs *XdsServer) _CreateConnectHTTP() error {
+ var err error
+ xs.client, err = common.HTTPNewClient(xs.BaseURL,
+ common.HTTPClientConfig{
+ URLPrefix: "/api/v1",
+ HeaderClientKeyName: "Xds-Sid",
+ CsrfDisable: true,
+ LogOut: xs.logOut,
+ LogPrefix: "XDSSERVER: ",
+ LogLevel: common.HTTPLogLevelWarning,
+ })
+
+ xs.client.SetLogLevel(xs.Log.Level.String())
+
+ if err != nil {
+ msg := ": " + err.Error()
+ if strings.Contains(err.Error(), "connection refused") {
+ msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
+ }
+ return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
+ }
+ if xs.client == nil {
+ return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
+ }
+
+ return nil
+}
+
+// Re-established connection
+func (xs *XdsServer) _reconnect() error {
+ err := xs._connect(true)
+ if err == nil {
+ // Reload projects list for this server
+ err = xs.projects.Init(xs)
+ }
+ return err
+}
+
+// Established HTTP and WS connection and retrieve XDSServer config
+func (xs *XdsServer) _connect(reConn bool) error {
+
+ xdsCfg := xdsServerConfig{}
+ if err := xs.HTTPGet("/config", &xdsCfg); err != nil {
+ xs.Connected = false
+ if !reConn {
+ xs._NotifyState()
+ }
+ return err
+ }
+
+ if reConn && xs.ID != xdsCfg.ID {
+ xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
+ }
+
+ // Update local XDS config
+ xs.ID = xdsCfg.ID
+ xs.ServerConfig = &xdsCfg
+
+ // Establish WS connection and register listen
+ if err := xs._SocketConnect(); err != nil {
+ xs.Connected = false
+ xs._NotifyState()
+ return err
+ }
+
+ xs.Connected = true
+ xs._NotifyState()
+ return nil
+}
+
+// Create WebSocket (io.socket) connection
+func (xs *XdsServer) _SocketConnect() error {
+
+ xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
+
+ opts := &sio_client.Options{
+ Transport: "websocket",
+ Header: make(map[string][]string),
+ }
+ opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
+
+ iosk, err := sio_client.NewClient(xs.BaseURL, opts)
+ if err != nil {
+ return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
+ }
+ xs.ioSock = iosk
+
+ // Register some listeners
+
+ iosk.On("error", func(err error) {
+ xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
+ if xs.CBOnError != nil {
+ xs.CBOnError(err)
+ }
+ })
+
+ iosk.On("disconnection", func(err error) {
+ xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
+ if xs.CBOnDisconnect != nil {
+ xs.CBOnDisconnect(err)
+ }
+ xs.Connected = false
+ xs._NotifyState()
+
+ // Try to reconnect during 15min (or at least while not disabled)
+ go func() {
+ count := 0
+ waitTime := 1
+ for !xs.Disabled && !xs.Connected {
+ count++
+ if count%60 == 0 {
+ waitTime *= 5
+ }
+ if waitTime > 15*60 {
+ xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
+ return
+ }
+ time.Sleep(time.Second * time.Duration(waitTime))
+ xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
+
+ xs._reconnect()
+ }
+ }()
+ })
+
+ // XXX - There is no connection event generated so, just consider that
+ // we are connected when NewClient return successfully
+ /* iosk.On("connection", func() { ... }) */
+ xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
+
+ return nil
+}
+
+// Send event to notify changes
+func (xs *XdsServer) _NotifyState() {
+
+ evSts := ServerCfg{
+ ID: xs.ID,
+ URL: xs.BaseURL,
+ APIURL: xs.APIURL,
+ PartialURL: xs.PartialURL,
+ ConnRetry: xs.ConnRetry,
+ Connected: xs.Connected,
+ }
+ if err := xs.events.Emit(EVTServerConfig, evSts); err != nil {
+ xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
+ }
+}
diff --git a/lib/apiv1/apiv1.go b/lib/apiv1/apiv1.go
deleted file mode 100644
index 734929b..0000000
--- a/lib/apiv1/apiv1.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package apiv1
-
-import (
- "github.com/Sirupsen/logrus"
- "github.com/gin-gonic/gin"
-
- "github.com/iotbzh/xds-agent/lib/session"
- "github.com/iotbzh/xds-agent/lib/xdsconfig"
-)
-
-// APIService .
-type APIService struct {
- router *gin.Engine
- apiRouter *gin.RouterGroup
- sessions *session.Sessions
- cfg *xdsconfig.Config
- log *logrus.Logger
-}
-
-// New creates a new instance of API service
-func New(sess *session.Sessions, conf *xdsconfig.Config, log *logrus.Logger, r *gin.Engine) *APIService {
- s := &APIService{
- router: r,
- sessions: sess,
- apiRouter: r.Group("/api/v1"),
- cfg: conf,
- log: log,
- }
-
- s.apiRouter.GET("/version", s.getVersion)
-
- s.apiRouter.GET("/config", s.getConfig)
- s.apiRouter.POST("/config", s.setConfig)
-
- return s
-}
diff --git a/lib/apiv1/config.go b/lib/apiv1/config.go
deleted file mode 100644
index 47155ed..0000000
--- a/lib/apiv1/config.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package apiv1
-
-import (
- "net/http"
- "sync"
-
- "github.com/gin-gonic/gin"
- "github.com/iotbzh/xds-agent/lib/xdsconfig"
- common "github.com/iotbzh/xds-common/golib"
-)
-
-var confMut sync.Mutex
-
-// GetConfig returns the configuration
-func (s *APIService) getConfig(c *gin.Context) {
- confMut.Lock()
- defer confMut.Unlock()
-
- c.JSON(http.StatusOK, s.cfg)
-}
-
-// SetConfig sets configuration
-func (s *APIService) setConfig(c *gin.Context) {
- // FIXME - must be tested
- c.JSON(http.StatusNotImplemented, "Not implemented")
-
- var cfgArg xdsconfig.Config
-
- if c.BindJSON(&cfgArg) != nil {
- common.APIError(c, "Invalid arguments")
- return
- }
-
- confMut.Lock()
- defer confMut.Unlock()
-
- s.log.Debugln("SET config: ", cfgArg)
-
- if err := s.cfg.UpdateAll(cfgArg); err != nil {
- common.APIError(c, err.Error())
- return
- }
-
- c.JSON(http.StatusOK, s.cfg)
-}
diff --git a/lib/apiv1/version.go b/lib/apiv1/version.go
deleted file mode 100644
index e022441..0000000
--- a/lib/apiv1/version.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package apiv1
-
-import (
- "net/http"
-
- "github.com/gin-gonic/gin"
-)
-
-type version struct {
- Version string `json:"version"`
- APIVersion string `json:"apiVersion"`
- VersionGitTag string `json:"gitTag"`
-}
-
-// getInfo : return various information about server
-func (s *APIService) getVersion(c *gin.Context) {
- response := version{
- Version: s.cfg.Version,
- APIVersion: s.cfg.APIVersion,
- VersionGitTag: s.cfg.VersionGitTag,
- }
-
- c.JSON(http.StatusOK, response)
-}
diff --git a/lib/syncthing/st.go b/lib/syncthing/st.go
index 660738d..bc3b101 100644
--- a/lib/syncthing/st.go
+++ b/lib/syncthing/st.go
@@ -24,11 +24,14 @@ import (
// SyncThing .
type SyncThing struct {
- BaseURL string
- APIKey string
- Home string
- STCmd *exec.Cmd
- STICmd *exec.Cmd
+ BaseURL string
+ APIKey string
+ Home string
+ STCmd *exec.Cmd
+ STICmd *exec.Cmd
+ MyID string
+ Connected bool
+ Events *Events
// Private fields
binDir string
@@ -37,6 +40,7 @@ type SyncThing struct {
exitSTIChan chan ExitChan
client *common.HTTPClient
log *logrus.Logger
+ conf *xdsconfig.Config
}
// ExitChan Channel used for process exit
@@ -45,6 +49,42 @@ type ExitChan struct {
err error
}
+// ConfigInSync Check whether if Syncthing configuration is in sync
+type configInSync struct {
+ ConfigInSync bool `json:"configInSync"`
+}
+
+// FolderStatus Information about the current status of a folder.
+type FolderStatus struct {
+ GlobalFiles int `json:"globalFiles"`
+ GlobalDirectories int `json:"globalDirectories"`
+ GlobalSymlinks int `json:"globalSymlinks"`
+ GlobalDeleted int `json:"globalDeleted"`
+ GlobalBytes int64 `json:"globalBytes"`
+
+ LocalFiles int `json:"localFiles"`
+ LocalDirectories int `json:"localDirectories"`
+ LocalSymlinks int `json:"localSymlinks"`
+ LocalDeleted int `json:"localDeleted"`
+ LocalBytes int64 `json:"localBytes"`
+
+ NeedFiles int `json:"needFiles"`
+ NeedDirectories int `json:"needDirectories"`
+ NeedSymlinks int `json:"needSymlinks"`
+ NeedDeletes int `json:"needDeletes"`
+ NeedBytes int64 `json:"needBytes"`
+
+ InSyncFiles int `json:"inSyncFiles"`
+ InSyncBytes int64 `json:"inSyncBytes"`
+
+ State string `json:"state"`
+ StateChanged time.Time `json:"stateChanged"`
+
+ Sequence int64 `json:"sequence"`
+
+ IgnorePatterns bool `json:"ignorePatterns"`
+}
+
// NewSyncThing creates a new instance of Syncthing
func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing {
var url, apiKey, home, binDir string
@@ -75,8 +115,12 @@ func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing {
binDir: binDir,
logsDir: conf.FileConf.LogsDir,
log: log,
+ conf: conf,
}
+ // Create Events monitoring
+ // SEB TO TEST s.Events = s.NewEventListener()
+
return &s
}
@@ -182,6 +226,8 @@ func (s *SyncThing) Start() (*exec.Cmd, error) {
"STNOUPGRADE=1",
}
+ /* SEB STILL NEEDED, if not SUP code
+
// XXX - temporary hack because -gui-apikey seems to correctly handle by
// syncthing the early first time
stConfigFile := filepath.Join(s.Home, "config.xml")
@@ -211,12 +257,12 @@ func (s *SyncThing) Start() (*exec.Cmd, error) {
return nil, fmt.Errorf("Cannot write Syncthing config file to set apikey")
}
}
-
+ */
s.STCmd, err = s.startProc("syncthing", args, env, &s.exitSTChan)
// Use autogenerated apikey if not set by config.json
- if s.APIKey == "" {
- if fd, err := os.Open(stConfigFile); err == nil {
+ if err == nil && s.APIKey == "" {
+ if fd, err := os.Open(filepath.Join(s.Home, "config.xml")); err == nil {
defer fd.Close()
if b, err := ioutil.ReadAll(fd); err == nil {
re := regexp.MustCompile("<apikey>(.*)</apikey>")
@@ -294,11 +340,17 @@ func (s *SyncThing) StopInotify() {
// Connect Establish HTTP connection with Syncthing
func (s *SyncThing) Connect() error {
var err error
+ s.Connected = false
s.client, err = common.HTTPNewClient(s.BaseURL,
common.HTTPClientConfig{
URLPrefix: "/rest",
HeaderClientKeyName: "X-Syncthing-ID",
+ LogOut: s.conf.LogVerboseOut,
+ LogPrefix: "SYNCTHING: ",
+ LogLevel: common.HTTPLogLevelWarning,
})
+ s.client.SetLogLevel(s.log.Level.String())
+
if err != nil {
msg := ": " + err.Error()
if strings.Contains(err.Error(), "connection refused") {
@@ -310,11 +362,17 @@ func (s *SyncThing) Connect() error {
return fmt.Errorf("ERROR: cannot connect to Syncthing (null client)")
}
- s.client.SetLogLevel(s.log.Level.String())
- s.client.LoggerPrefix = "SYNCTHING: "
- s.client.LoggerOut = s.log.Out
+ s.MyID, err = s.IDGet()
+ if err != nil {
+ return fmt.Errorf("ERROR: cannot retrieve ID")
+ }
+
+ s.Connected = true
- return nil
+ // Start events monitoring
+ //SEB TODO err = s.Events.Start()
+
+ return err
}
// IDGet returns the Syncthing ID of Syncthing instance running locally
@@ -347,3 +405,16 @@ func (s *SyncThing) ConfigSet(cfg config.Configuration) error {
}
return s.client.HTTPPost("system/config", string(body))
}
+
+// IsConfigInSync Returns true if configuration is in sync
+func (s *SyncThing) IsConfigInSync() (bool, error) {
+ var data []byte
+ var d configInSync
+ if err := s.client.HTTPGet("system/config/insync", &data); err != nil {
+ return false, err
+ }
+ if err := json.Unmarshal(data, &d); err != nil {
+ return false, err
+ }
+ return d.ConfigInSync, nil
+}
diff --git a/lib/syncthing/stEvent.go b/lib/syncthing/stEvent.go
new file mode 100644
index 0000000..9ca8b78
--- /dev/null
+++ b/lib/syncthing/stEvent.go
@@ -0,0 +1,265 @@
+package st
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+)
+
+// Events .
+type Events struct {
+ MonitorTime time.Duration
+ Debug bool
+
+ stop chan bool
+ st *SyncThing
+ log *logrus.Logger
+ cbArr map[string][]cbMap
+}
+
+type Event struct {
+ Type string `json:"type"`
+ Time time.Time `json:"time"`
+ Data map[string]string `json:"data"`
+}
+
+type EventsCBData map[string]interface{}
+type EventsCB func(ev Event, cbData *EventsCBData)
+
+const (
+ EventFolderCompletion string = "FolderCompletion"
+ EventFolderSummary string = "FolderSummary"
+ EventFolderPaused string = "FolderPaused"
+ EventFolderResumed string = "FolderResumed"
+ EventFolderErrors string = "FolderErrors"
+ EventStateChanged string = "StateChanged"
+)
+
+var EventsAll string = EventFolderCompletion + "|" +
+ EventFolderSummary + "|" +
+ EventFolderPaused + "|" +
+ EventFolderResumed + "|" +
+ EventFolderErrors + "|" +
+ EventStateChanged
+
+type STEvent struct {
+ // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
+ SubscriptionID int `json:"id"`
+ // Global ID of the event across all subscriptions
+ GlobalID int `json:"globalID"`
+ Time time.Time `json:"time"`
+ Type string `json:"type"`
+ Data map[string]interface{} `json:"data"`
+}
+
+type cbMap struct {
+ id int
+ cb EventsCB
+ filterID string
+ data *EventsCBData
+}
+
+// NewEventListener Create a new instance of Event listener
+func (s *SyncThing) NewEventListener() *Events {
+ _, dbg := os.LookupEnv("XDS_DEBUG_STEVENTS") // set to add more debug log
+ return &Events{
+ MonitorTime: 100, // in Milliseconds
+ Debug: dbg,
+ stop: make(chan bool, 1),
+ st: s,
+ log: s.log,
+ cbArr: make(map[string][]cbMap),
+ }
+}
+
+// Start starts event monitoring loop
+func (e *Events) Start() error {
+ go e.monitorLoop()
+ return nil
+}
+
+// Stop stops event monitoring loop
+func (e *Events) Stop() {
+ e.stop <- true
+}
+
+// Register Add a listener on an event
+func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) {
+ if evName == "" || !strings.Contains(EventsAll, evName) {
+ return -1, fmt.Errorf("Unknown event name")
+ }
+ if data == nil {
+ data = &EventsCBData{}
+ }
+
+ cbList := []cbMap{}
+ if _, ok := e.cbArr[evName]; ok {
+ cbList = e.cbArr[evName]
+ }
+
+ id := len(cbList)
+ (*data)["id"] = strconv.Itoa(id)
+
+ e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
+
+ return id, nil
+}
+
+// UnRegister Remove a listener event
+func (e *Events) UnRegister(evName string, id int) error {
+ cbKey, ok := e.cbArr[evName]
+ if !ok {
+ return fmt.Errorf("No event registered to such name")
+ }
+
+ // FIXME - NOT TESTED
+ if id >= len(cbKey) {
+ return fmt.Errorf("Invalid id")
+ } else if id == len(cbKey) {
+ e.cbArr[evName] = cbKey[:id-1]
+ } else {
+ e.cbArr[evName] = cbKey[id : id+1]
+ }
+
+ return nil
+}
+
+// GetEvents returns the Syncthing events
+func (e *Events) getEvents(since int) ([]STEvent, error) {
+ var data []byte
+ ev := []STEvent{}
+ url := "events"
+ if since != -1 {
+ url += "?since=" + strconv.Itoa(since)
+ }
+ if err := e.st.client.HTTPGet(url, &data); err != nil {
+ return ev, err
+ }
+ err := json.Unmarshal(data, &ev)
+ return ev, err
+}
+
+// Loop to monitor Syncthing events
+func (e *Events) monitorLoop() {
+ e.log.Infof("Event monitoring running...")
+ since := 0
+ cntErrConn := 0
+ cntErrRetry := 1
+ for {
+ select {
+ case <-e.stop:
+ e.log.Infof("Event monitoring exited")
+ return
+
+ case <-time.After(e.MonitorTime * time.Millisecond):
+
+ if !e.st.Connected {
+ cntErrConn++
+ time.Sleep(time.Second)
+ if cntErrConn > cntErrRetry {
+ e.log.Error("ST Event monitor: ST connection down")
+ cntErrConn = 0
+ cntErrRetry *= 2
+ if _, err := e.getEvents(since); err == nil {
+ e.st.Connected = true
+ cntErrRetry = 1
+ // XXX - should we reset since value ?
+ goto readEvent
+ }
+ }
+ continue
+ }
+
+ readEvent:
+ stEvArr, err := e.getEvents(since)
+ if err != nil {
+ e.log.Errorf("Syncthing Get Events: %v", err)
+ e.st.Connected = false
+ continue
+ }
+
+ // Process events
+ for _, stEv := range stEvArr {
+ since = stEv.SubscriptionID
+ if e.Debug {
+ e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv)
+ }
+
+ cbKey, ok := e.cbArr[stEv.Type]
+ if !ok {
+ continue
+ }
+
+ evData := Event{
+ Type: stEv.Type,
+ Time: stEv.Time,
+ }
+
+ // Decode Events
+ // FIXME: re-define data struct for each events
+ // instead of map of string and use JSON marshing/unmarshing
+ fID := ""
+ evData.Data = make(map[string]string)
+ switch stEv.Type {
+
+ case EventFolderCompletion:
+ fID = convString(stEv.Data["folder"])
+ evData.Data["completion"] = convFloat64(stEv.Data["completion"])
+
+ case EventFolderSummary:
+ fID = convString(stEv.Data["folder"])
+ evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"])
+ evData.Data["state"] = convString(stEv.Data["state"])
+
+ case EventFolderPaused, EventFolderResumed:
+ fID = convString(stEv.Data["id"])
+ evData.Data["label"] = convString(stEv.Data["label"])
+
+ case EventFolderErrors:
+ fID = convString(stEv.Data["folder"])
+ // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"])
+
+ case EventStateChanged:
+ fID = convString(stEv.Data["folder"])
+ evData.Data["from"] = convString(stEv.Data["from"])
+ evData.Data["to"] = convString(stEv.Data["to"])
+
+ default:
+ e.log.Warnf("Unsupported event type")
+ }
+
+ if fID != "" {
+ evData.Data["id"] = fID
+ }
+
+ // Call all registered callbacks
+ for _, c := range cbKey {
+ if e.Debug {
+ e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID)
+ }
+ // Call when filterID is not set or when it matches
+ if c.filterID == "" || (fID != "" && fID == c.filterID) {
+ c.cb(evData, c.data)
+ }
+ }
+ }
+ }
+ }
+}
+
+func convString(d interface{}) string {
+ return d.(string)
+}
+
+func convFloat64(d interface{}) string {
+ return strconv.FormatFloat(d.(float64), 'f', -1, 64)
+}
+
+func convInt64(d interface{}) string {
+ return strconv.FormatInt(d.(int64), 10)
+}
diff --git a/lib/syncthing/stfolder.go b/lib/syncthing/stfolder.go
index d79e579..a5312eb 100644
--- a/lib/syncthing/stfolder.go
+++ b/lib/syncthing/stfolder.go
@@ -1,10 +1,12 @@
package st
import (
- "path/filepath"
+ "encoding/json"
+ "fmt"
"strings"
- "github.com/syncthing/syncthing/lib/config"
+ common "github.com/iotbzh/xds-common/golib"
+ stconfig "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/protocol"
)
@@ -18,23 +20,23 @@ type FolderChangeArg struct {
}
// FolderChange is called when configuration has changed
-func (s *SyncThing) FolderChange(f FolderChangeArg) error {
+func (s *SyncThing) FolderChange(f FolderChangeArg) (string, error) {
// Get current config
stCfg, err := s.ConfigGet()
if err != nil {
s.log.Errorln(err)
- return err
+ return "", err
}
// Add new Device if needed
var devID protocol.DeviceID
if err := devID.UnmarshalText([]byte(f.SyncThingID)); err != nil {
- s.log.Errorf("not a valid device id (err %v)\n", err)
- return err
+ s.log.Errorf("not a valid device id (err %v)", err)
+ return "", err
}
- newDevice := config.DeviceConfiguration{
+ newDevice := stconfig.DeviceConfiguration{
DeviceID: devID,
Name: f.SyncThingID,
Addresses: []string{"dynamic"},
@@ -60,18 +62,33 @@ func (s *SyncThing) FolderChange(f FolderChangeArg) error {
id = f.SyncThingID[0:15] + "_" + label
}
- folder := config.FolderConfiguration{
- ID: id,
- Label: label,
- RawPath: filepath.Join(f.ShareRootDir, f.RelativePath),
+ // Resolve local path
+ pathCli, err := common.ResolveEnvVar(f.RelativePath)
+ if err != nil {
+ pathCli = f.RelativePath
+ }
+ // SEB still need ShareRootDir ? a sup
+ // pathCli := filepath.Join(f.ShareRootDir, f.RelativePath)
+
+ folder := stconfig.FolderConfiguration{
+ ID: id,
+ Label: label,
+ RawPath: pathCli,
+ AutoNormalize: true,
}
- folder.Devices = append(folder.Devices, config.FolderDeviceConfiguration{
+ /* TODO - add it ?
+ if s.conf.FileConf.SThgConf.RescanIntervalS > 0 {
+ folder.RescanIntervalS = s.conf.FileConf.SThgConf.RescanIntervalS
+ }
+ */
+
+ folder.Devices = append(folder.Devices, stconfig.FolderDeviceConfiguration{
DeviceID: newDevice.DeviceID,
})
found = false
- var fld config.FolderConfiguration
+ var fld stconfig.FolderConfiguration
for _, fld = range stCfg.Folders {
if folder.ID == fld.ID {
fld = folder
@@ -89,7 +106,7 @@ func (s *SyncThing) FolderChange(f FolderChangeArg) error {
s.log.Errorln(err)
}
- return nil
+ return id, nil
}
// FolderDelete is called to delete a folder config
@@ -114,3 +131,61 @@ func (s *SyncThing) FolderDelete(id string) error {
return nil
}
+
+// FolderConfigGet Returns the configuration of a specific folder
+func (s *SyncThing) FolderConfigGet(folderID string) (stconfig.FolderConfiguration, error) {
+ fc := stconfig.FolderConfiguration{}
+ if folderID == "" {
+ return fc, fmt.Errorf("folderID not set")
+ }
+ cfg, err := s.ConfigGet()
+ if err != nil {
+ return fc, err
+ }
+ for _, f := range cfg.Folders {
+ if f.ID == folderID {
+ fc = f
+ return fc, nil
+ }
+ }
+ return fc, fmt.Errorf("id not found")
+}
+
+// FolderStatus Returns all information about the current
+func (s *SyncThing) FolderStatus(folderID string) (*FolderStatus, error) {
+ var data []byte
+ var res FolderStatus
+ if folderID == "" {
+ return nil, fmt.Errorf("folderID not set")
+ }
+ if err := s.client.HTTPGet("db/status?folder="+folderID, &data); err != nil {
+ return nil, err
+ }
+ if err := json.Unmarshal(data, &res); err != nil {
+ return nil, err
+ }
+ return &res, nil
+}
+
+// IsFolderInSync Returns true when folder is in sync
+func (s *SyncThing) IsFolderInSync(folderID string) (bool, error) {
+ sts, err := s.FolderStatus(folderID)
+ if err != nil {
+ return false, err
+ }
+ return sts.NeedBytes == 0 && sts.State == "idle", nil
+}
+
+// FolderScan Request immediate folder scan.
+// Scan all folders if folderID param is empty
+func (s *SyncThing) FolderScan(folderID string, subpath string) error {
+ url := "db/scan"
+ if folderID != "" {
+ url += "?folder=" + folderID
+
+ if subpath != "" {
+ url += "&sub=" + subpath
+ }
+ }
+ return s.client.HTTPPost(url, "")
+}
diff --git a/lib/webserver/server.go b/lib/webserver/server.go
deleted file mode 100644
index b835a65..0000000
--- a/lib/webserver/server.go
+++ /dev/null
@@ -1,226 +0,0 @@
-package webserver
-
-import (
- "fmt"
- "net/http"
- "strings"
-
- "github.com/Sirupsen/logrus"
- "github.com/gin-gonic/gin"
- "github.com/googollee/go-socket.io"
- "github.com/iotbzh/xds-agent/lib/apiv1"
- "github.com/iotbzh/xds-agent/lib/session"
- "github.com/iotbzh/xds-agent/lib/xdsconfig"
-)
-
-// ServerService .
-type ServerService struct {
- router *gin.Engine
- api *apiv1.APIService
- sIOServer *socketio.Server
- webApp *gin.RouterGroup
- cfg *xdsconfig.Config
- sessions *session.Sessions
- log *logrus.Logger
- stop chan struct{} // signals intentional stop
-}
-
-const indexFilename = "index.html"
-const cookieMaxAge = "3600"
-
-// New creates an instance of ServerService
-func New(conf *xdsconfig.Config, log *logrus.Logger) *ServerService {
-
- // Setup logging for gin router
- if log.Level == logrus.DebugLevel {
- gin.SetMode(gin.DebugMode)
- } else {
- gin.SetMode(gin.ReleaseMode)
- }
-
- // TODO
- // - try to bind gin DefaultWriter & DefaultErrorWriter to logrus logger
- // - try to fix pb about isTerminal=false when out is in VSC Debug Console
- //gin.DefaultWriter = ??
- //gin.DefaultErrorWriter = ??
-
- // Creates gin router
- r := gin.New()
-
- svr := &ServerService{
- router: r,
- api: nil,
- sIOServer: nil,
- webApp: nil,
- cfg: conf,
- log: log,
- sessions: nil,
- stop: make(chan struct{}),
- }
-
- return svr
-}
-
-// Serve starts a new instance of the Web Server
-func (s *ServerService) Serve() error {
- var err error
-
- // Setup middlewares
- s.router.Use(gin.Logger())
- s.router.Use(gin.Recovery())
- s.router.Use(s.middlewareCORS())
- s.router.Use(s.middlewareXDSDetails())
- s.router.Use(s.middlewareCSRF())
-
- // Sessions manager
- s.sessions = session.NewClientSessions(s.router, s.log, cookieMaxAge)
-
- s.router.GET("", s.slashHandler)
-
- // Create REST API
- s.api = apiv1.New(s.sessions, s.cfg, s.log, s.router)
-
- // Websocket routes
- s.sIOServer, err = socketio.NewServer(nil)
- if err != nil {
- s.log.Fatalln(err)
- }
-
- s.router.GET("/socket.io/", s.socketHandler)
- s.router.POST("/socket.io/", s.socketHandler)
- /* TODO: do we want to support ws://... ?
- s.router.Handle("WS", "/socket.io/", s.socketHandler)
- s.router.Handle("WSS", "/socket.io/", s.socketHandler)
- */
-
- // Serve in the background
- serveError := make(chan error, 1)
- go func() {
- fmt.Printf("Web Server running on localhost:%s ...\n", s.cfg.HTTPPort)
- serveError <- http.ListenAndServe(":"+s.cfg.HTTPPort, s.router)
- }()
-
- fmt.Printf("XDS agent running...\n")
-
- // Wait for stop, restart or error signals
- select {
- case <-s.stop:
- // Shutting down permanently
- s.sessions.Stop()
- s.log.Infoln("shutting down (stop)")
- case err = <-serveError:
- // Error due to listen/serve failure
- s.log.Errorln(err)
- }
-
- return nil
-}
-
-// Stop web server
-func (s *ServerService) Stop() {
- close(s.stop)
-}
-
-// serveSlash provides response to GET "/"
-func (s *ServerService) slashHandler(c *gin.Context) {
- c.String(200, "Hello from XDS agent!")
-}
-
-// Add details in Header
-func (s *ServerService) middlewareXDSDetails() gin.HandlerFunc {
- return func(c *gin.Context) {
- c.Header("XDS-Agent-Version", s.cfg.Version)
- c.Header("XDS-API-Version", s.cfg.APIVersion)
- c.Next()
- }
-}
-
-func (s *ServerService) isValidAPIKey(key string) bool {
- return (key == s.cfg.FileConf.XDSAPIKey && key != "")
-}
-
-func (s *ServerService) middlewareCSRF() gin.HandlerFunc {
- return func(c *gin.Context) {
- // Allow requests carrying a valid API key
- if s.isValidAPIKey(c.Request.Header.Get("X-API-Key")) {
- // Set the access-control-allow-origin header for CORS requests
- // since a valid API key has been provided
- c.Header("Access-Control-Allow-Origin", "*")
- c.Next()
- return
- }
-
- // Allow io.socket request
- if strings.HasPrefix(c.Request.URL.Path, "/socket.io") {
- c.Next()
- return
- }
-
- /* FIXME Add really CSRF support
-
- // Allow requests for anything not under the protected path prefix,
- // and set a CSRF cookie if there isn't already a valid one.
- if !strings.HasPrefix(c.Request.URL.Path, prefix) {
- cookie, err := c.Cookie("CSRF-Token-" + unique)
- if err != nil || !validCsrfToken(cookie.Value) {
- s.log.Debugln("new CSRF cookie in response to request for", c.Request.URL)
- c.SetCookie("CSRF-Token-"+unique, newCsrfToken(), 600, "/", "", false, false)
- }
- c.Next()
- return
- }
-
- // Verify the CSRF token
- token := c.Request.Header.Get("X-CSRF-Token-" + unique)
- if !validCsrfToken(token) {
- c.AbortWithError(403, "CSRF Error")
- return
- }
-
- c.Next()
- */
- c.AbortWithError(403, fmt.Errorf("Not valid API key"))
- }
-}
-
-// CORS middleware
-func (s *ServerService) middlewareCORS() gin.HandlerFunc {
- return func(c *gin.Context) {
- if c.Request.Method == "OPTIONS" {
- c.Header("Access-Control-Allow-Origin", "*")
- c.Header("Access-Control-Allow-Headers", "Content-Type, X-API-Key")
- c.Header("Access-Control-Allow-Methods", "GET, POST, DELETE")
- c.Header("Access-Control-Max-Age", cookieMaxAge)
- c.AbortWithStatus(204)
- return
- }
- c.Next()
- }
-}
-
-// socketHandler is the handler for the "main" websocket connection
-func (s *ServerService) socketHandler(c *gin.Context) {
-
- // Retrieve user session
- sess := s.sessions.Get(c)
- if sess == nil {
- c.JSON(500, gin.H{"error": "Cannot retrieve session"})
- return
- }
-
- s.sIOServer.On("connection", func(so socketio.Socket) {
- s.log.Debugf("WS Connected (SID=%v)", so.Id())
- s.sessions.UpdateIOSocket(sess.ID, &so)
-
- so.On("disconnection", func() {
- s.log.Debugf("WS disconnected (SID=%v)", so.Id())
- s.sessions.UpdateIOSocket(sess.ID, nil)
- })
- })
-
- s.sIOServer.On("error", func(so socketio.Socket, err error) {
- s.log.Errorf("WS SID=%v Error : %v", so.Id(), err.Error())
- })
-
- s.sIOServer.ServeHTTP(c.Writer, c.Request)
-}
diff --git a/lib/xdsconfig/config.go b/lib/xdsconfig/config.go
index 854d383..9cff862 100644
--- a/lib/xdsconfig/config.go
+++ b/lib/xdsconfig/config.go
@@ -2,6 +2,8 @@ package xdsconfig
import (
"fmt"
+ "io"
+ "path/filepath"
"os"
@@ -12,14 +14,20 @@ import (
// Config parameters (json format) of /config command
type Config struct {
- Version string `json:"version"`
- APIVersion string `json:"apiVersion"`
- VersionGitTag string `json:"gitTag"`
+ Version string
+ APIVersion string
+ VersionGitTag string
+ Options Options
+ FileConf FileConfig
+ Log *logrus.Logger
+ LogVerboseOut io.Writer
+}
- // Private / un-exported fields
- HTTPPort string `json:"-"`
- FileConf *FileConfig `json:"-"`
- Log *logrus.Logger `json:"-"`
+// Options set at the command line
+type Options struct {
+ ConfigFile string
+ LogLevel string
+ LogFile string
}
// Config default values
@@ -32,39 +40,75 @@ const (
func Init(ctx *cli.Context, log *logrus.Logger) (*Config, error) {
var err error
+ defaultWebAppDir := "${EXEPATH}/www"
+ defaultSTHomeDir := "${HOME}/.xds/agent/syncthing-config"
+
// Define default configuration
c := Config{
Version: ctx.App.Metadata["version"].(string),
APIVersion: DefaultAPIVersion,
VersionGitTag: ctx.App.Metadata["git-tag"].(string),
- HTTPPort: "8010",
- FileConf: &FileConfig{
- LogsDir: "/tmp/logs",
+ Options: Options{
+ ConfigFile: ctx.GlobalString("config"),
+ LogLevel: ctx.GlobalString("log"),
+ LogFile: ctx.GlobalString("logfile"),
+ },
+
+ FileConf: FileConfig{
+ HTTPPort: "8800",
+ WebAppDir: defaultWebAppDir,
+ LogsDir: "/tmp/logs",
+ // SEB XDSAPIKey: "1234abcezam",
+ ServersConf: []XDSServerConf{
+ XDSServerConf{
+ URL: "http://localhost:8000",
+ ConnRetry: 10,
+ },
+ },
SThgConf: &SyncThingConf{
- Home: "${HOME}/.xds/agent/syncthing-config",
+ Home: defaultSTHomeDir,
},
},
Log: log,
}
// config file settings overwrite default config
- c.FileConf, err = updateConfigFromFile(&c, ctx.GlobalString("config"))
+ err = readGlobalConfig(&c, c.Options.ConfigFile)
if err != nil {
return nil, err
}
+ // Handle where Logs are redirected:
+ // default 'stdout' (logfile option default value)
+ // else use file (or filepath) set by --logfile option
+ // that may be overwritten by LogsDir field of config file
+ logF := c.Options.LogFile
+ logD := c.FileConf.LogsDir
+ if logF != "stdout" {
+ if logD != "" {
+ lf := filepath.Base(logF)
+ if lf == "" || lf == "." {
+ lf = "xds-agent.log"
+ }
+ logF = filepath.Join(logD, lf)
+ } else {
+ logD = filepath.Dir(logF)
+ }
+ }
+ if logD == "" || logD == "." {
+ logD = "/tmp/xds/logs"
+ }
+ c.Options.LogFile = logF
+ c.FileConf.LogsDir = logD
+
if c.FileConf.LogsDir != "" && !common.Exists(c.FileConf.LogsDir) {
if err := os.MkdirAll(c.FileConf.LogsDir, 0770); err != nil {
return nil, fmt.Errorf("Cannot create logs dir: %v", err)
}
}
+ c.Log.Infoln("Logs file: ", c.Options.LogFile)
c.Log.Infoln("Logs directory: ", c.FileConf.LogsDir)
return &c, nil
}
-
-// UpdateAll Update the current configuration
-func (c *Config) UpdateAll(newCfg Config) error {
- return fmt.Errorf("Not Supported")
-}
diff --git a/lib/xdsconfig/configfile.go b/lib/xdsconfig/configfile.go
new file mode 100644
index 0000000..a47038b
--- /dev/null
+++ b/lib/xdsconfig/configfile.go
@@ -0,0 +1,112 @@
+package xdsconfig
+
+import (
+ "encoding/json"
+ "os"
+ "path"
+
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+type SyncThingConf struct {
+ BinDir string `json:"binDir"`
+ Home string `json:"home"`
+ GuiAddress string `json:"gui-address"`
+ GuiAPIKey string `json:"gui-apikey"`
+}
+
+type XDSServerConf struct {
+ URL string `json:"url"`
+ ConnRetry int `json:"connRetry"`
+
+ // private/not exported fields
+ ID string `json:"-"`
+ APIBaseURL string `json:"-"`
+ APIPartialURL string `json:"-"`
+}
+
+type FileConfig struct {
+ HTTPPort string `json:"httpPort"`
+ WebAppDir string `json:"webAppDir"`
+ LogsDir string `json:"logsDir"`
+ // SEB A SUP ? XDSAPIKey string `json:"xds-apikey"`
+ ServersConf []XDSServerConf `json:"xdsServers"`
+ SThgConf *SyncThingConf `json:"syncthing"`
+}
+
+// readGlobalConfig reads configuration from a config file.
+// Order to determine which config file is used:
+// 1/ from command line option: "--config myConfig.json"
+// 2/ $HOME/.xds/agent/agent-config.json file
+// 3/ <current_dir>/agent-config.json file
+// 4/ <executable dir>/agent-config.json file
+
+func readGlobalConfig(c *Config, confFile string) error {
+
+ searchIn := make([]string, 0, 3)
+ if confFile != "" {
+ searchIn = append(searchIn, confFile)
+ }
+ if homeDir := common.GetUserHome(); homeDir != "" {
+ searchIn = append(searchIn, path.Join(homeDir, ".xds", "agent", "agent-config.json"))
+ }
+
+ searchIn = append(searchIn, "/etc/xds-agent/agent-config.json")
+
+ searchIn = append(searchIn, path.Join(common.GetExePath(), "agent-config.json"))
+
+ var cFile *string
+ for _, p := range searchIn {
+ if _, err := os.Stat(p); err == nil {
+ cFile = &p
+ break
+ }
+ }
+ if cFile == nil {
+ c.Log.Infof("No config file found")
+ return nil
+ }
+
+ c.Log.Infof("Use config file: %s", *cFile)
+
+ // TODO move on viper package to support comments in JSON and also
+ // bind with flags (command line options)
+ // see https://github.com/spf13/viper#working-with-flags
+
+ fd, _ := os.Open(*cFile)
+ defer fd.Close()
+
+ // Decode config file content and save it in a first variable
+ fCfg := FileConfig{}
+ if err := json.NewDecoder(fd).Decode(&fCfg); err != nil {
+ return err
+ }
+
+ // Decode config file content and overwrite default settings
+ fd.Seek(0, 0)
+ json.NewDecoder(fd).Decode(&c.FileConf)
+
+ // Disable Syncthing support when there is no syncthing field in config
+ if fCfg.SThgConf == nil {
+ c.FileConf.SThgConf = nil
+ }
+
+ // Support environment variables (IOW ${MY_ENV_VAR} syntax) in agent-config.json
+ vars := []*string{
+ &c.FileConf.LogsDir,
+ &c.FileConf.WebAppDir,
+ }
+ if c.FileConf.SThgConf != nil {
+ vars = append(vars, &c.FileConf.SThgConf.Home,
+ &c.FileConf.SThgConf.BinDir)
+ }
+ for _, field := range vars {
+ var err error
+ *field, err = common.ResolveEnvVar(*field)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/lib/xdsconfig/fileconfig.go b/lib/xdsconfig/fileconfig.go
deleted file mode 100644
index efe94bf..0000000
--- a/lib/xdsconfig/fileconfig.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package xdsconfig
-
-import (
- "encoding/json"
- "os"
- "os/user"
- "path"
- "path/filepath"
-
- common "github.com/iotbzh/xds-common/golib"
-)
-
-type SyncThingConf struct {
- BinDir string `json:"binDir"`
- Home string `json:"home"`
- GuiAddress string `json:"gui-address"`
- GuiAPIKey string `json:"gui-apikey"`
-}
-
-type FileConfig struct {
- HTTPPort string `json:"httpPort"`
- LogsDir string `json:"logsDir"`
- XDSAPIKey string `json:"xds-apikey"`
- SThgConf *SyncThingConf `json:"syncthing"`
-}
-
-// getConfigFromFile reads configuration from a config file.
-// Order to determine which config file is used:
-// 1/ from command line option: "--config myConfig.json"
-// 2/ $HOME/.xds/agent/agent-config.json file
-// 3/ <current_dir>/agent-config.json file
-// 4/ <executable dir>/agent-config.json file
-
-func updateConfigFromFile(c *Config, confFile string) (*FileConfig, error) {
-
- searchIn := make([]string, 0, 3)
- if confFile != "" {
- searchIn = append(searchIn, confFile)
- }
- if usr, err := user.Current(); err == nil {
- searchIn = append(searchIn, path.Join(usr.HomeDir, ".xds", "agent", "agent-config.json"))
- }
-
- searchIn = append(searchIn, "/etc/xds-agent/agent-config.json")
-
- exePath := os.Args[0]
- ee, _ := os.Executable()
- exeAbsPath, err := filepath.Abs(ee)
- if err == nil {
- exePath, err = filepath.EvalSymlinks(exeAbsPath)
- if err == nil {
- exePath = filepath.Dir(ee)
- } else {
- exePath = filepath.Dir(exeAbsPath)
- }
- }
- searchIn = append(searchIn, path.Join(exePath, "agent-config.json"))
-
- var cFile *string
- for _, p := range searchIn {
- if _, err := os.Stat(p); err == nil {
- cFile = &p
- break
- }
- }
- // Use default settings
- fCfg := *c.FileConf
-
- // Read config file when existing
- if cFile != nil {
- c.Log.Infof("Use config file: %s", *cFile)
-
- // TODO move on viper package to support comments in JSON and also
- // bind with flags (command line options)
- // see https://github.com/spf13/viper#working-with-flags
-
- fd, _ := os.Open(*cFile)
- defer fd.Close()
- if err := json.NewDecoder(fd).Decode(&fCfg); err != nil {
- return nil, err
- }
- }
-
- // Support environment variables (IOW ${MY_ENV_VAR} syntax) in agent-config.json
- vars := []*string{
- &fCfg.LogsDir,
- }
- if fCfg.SThgConf != nil {
- vars = append(vars, &fCfg.SThgConf.Home, &fCfg.SThgConf.BinDir)
- }
- for _, field := range vars {
- var err error
- *field, err = common.ResolveEnvVar(*field)
- if err != nil {
- return nil, err
- }
- }
-
- // Config file settings overwrite default config
- if fCfg.HTTPPort != "" {
- c.HTTPPort = fCfg.HTTPPort
- }
-
- // Set default apikey
- // FIXME - rework with dynamic key
- if fCfg.XDSAPIKey == "" {
- fCfg.XDSAPIKey = "1234abcezam"
- }
-
- return &fCfg, nil
-}