summaryrefslogtreecommitdiffstats
path: root/lib/agent
diff options
context:
space:
mode:
Diffstat (limited to 'lib/agent')
-rw-r--r--lib/agent/agent.go132
-rw-r--r--lib/agent/apiv1-browse.go28
-rw-r--r--lib/agent/apiv1-config.go108
-rw-r--r--lib/agent/apiv1-events.go73
-rw-r--r--lib/agent/apiv1-exec.go99
-rw-r--r--lib/agent/apiv1-projects.go72
-rw-r--r--lib/agent/apiv1-version.go45
-rw-r--r--lib/agent/apiv1.go129
-rw-r--r--lib/agent/events.go131
-rw-r--r--lib/agent/project-interface.go47
-rw-r--r--lib/agent/project-pathmap.go79
-rw-r--r--lib/agent/project-st.go93
-rw-r--r--lib/agent/projects.go254
-rw-r--r--lib/agent/session.go224
-rw-r--r--lib/agent/webserver.go246
-rw-r--r--lib/agent/xdsserver.go472
16 files changed, 2227 insertions, 5 deletions
diff --git a/lib/agent/agent.go b/lib/agent/agent.go
index 74872f7..29b0622 100644
--- a/lib/agent/agent.go
+++ b/lib/agent/agent.go
@@ -2,18 +2,22 @@ package agent
import (
"fmt"
+ "log"
"os"
"os/exec"
"os/signal"
+ "path/filepath"
"syscall"
+ "time"
"github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/iotbzh/xds-agent/lib/syncthing"
"github.com/iotbzh/xds-agent/lib/xdsconfig"
- "github.com/iotbzh/xds-agent/lib/webserver"
)
+const cookieMaxAge = "3600"
+
// Context holds the Agent context structure
type Context struct {
ProgName string
@@ -22,8 +26,14 @@ type Context struct {
SThg *st.SyncThing
SThgCmd *exec.Cmd
SThgInotCmd *exec.Cmd
- WWWServer *webserver.ServerService
- Exit chan os.Signal
+
+ webServer *WebServer
+ xdsServers map[string]*XdsServer
+ sessions *Sessions
+ events *Events
+ projects *Projects
+
+ Exit chan os.Signal
}
// NewAgent Create a new instance of Agent
@@ -48,6 +58,10 @@ func NewAgent(cliCtx *cli.Context) *Context {
ProgName: cliCtx.App.Name,
Log: log,
Exit: make(chan os.Signal, 1),
+
+ webServer: nil,
+ xdsServers: make(map[string]*XdsServer),
+ events: nil,
}
// register handler on SIGTERM / exit
@@ -57,6 +71,114 @@ func NewAgent(cliCtx *cli.Context) *Context {
return &ctx
}
+// Run Main function called to run agent
+func (ctx *Context) Run() (int, error) {
+ var err error
+
+ // Logs redirected into a file when logfile option or logsDir config is set
+ ctx.Config.LogVerboseOut = os.Stderr
+ if ctx.Config.FileConf.LogsDir != "" {
+ if ctx.Config.Options.LogFile != "stdout" {
+ logFile := ctx.Config.Options.LogFile
+
+ fdL, err := os.OpenFile(logFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
+ if err != nil {
+ msgErr := fmt.Errorf("Cannot create log file %s", logFile)
+ return int(syscall.EPERM), msgErr
+ }
+ ctx.Log.Out = fdL
+
+ ctx._logPrint("Logging file: %s\n", logFile)
+ }
+
+ logFileHTTPReq := filepath.Join(ctx.Config.FileConf.LogsDir, "xds-agent-verbose.log")
+ fdLH, err := os.OpenFile(logFileHTTPReq, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
+ if err != nil {
+ msgErr := fmt.Errorf("Cannot create log file %s", logFileHTTPReq)
+ return int(syscall.EPERM), msgErr
+ }
+ ctx.Config.LogVerboseOut = fdLH
+
+ ctx._logPrint("Logging file for HTTP requests: %s\n", logFileHTTPReq)
+ }
+
+ // Create syncthing instance when section "syncthing" is present in config.json
+ if ctx.Config.FileConf.SThgConf != nil {
+ ctx.SThg = st.NewSyncThing(ctx.Config, ctx.Log)
+ }
+
+ // Start local instance of Syncthing and Syncthing-notify
+ if ctx.SThg != nil {
+ ctx.Log.Infof("Starting Syncthing...")
+ ctx.SThgCmd, err = ctx.SThg.Start()
+ if err != nil {
+ return 2, err
+ }
+ fmt.Printf("Syncthing started (PID %d)\n", ctx.SThgCmd.Process.Pid)
+
+ ctx.Log.Infof("Starting Syncthing-inotify...")
+ ctx.SThgInotCmd, err = ctx.SThg.StartInotify()
+ if err != nil {
+ return 2, err
+ }
+ fmt.Printf("Syncthing-inotify started (PID %d)\n", ctx.SThgInotCmd.Process.Pid)
+
+ // Establish connection with local Syncthing (retry if connection fail)
+ time.Sleep(3 * time.Second)
+ maxRetry := 30
+ retry := maxRetry
+ for retry > 0 {
+ if err := ctx.SThg.Connect(); err == nil {
+ break
+ }
+ ctx.Log.Infof("Establishing connection to Syncthing (retry %d/%d)", retry, maxRetry)
+ time.Sleep(time.Second)
+ retry--
+ }
+ if err != nil || retry == 0 {
+ return 2, err
+ }
+
+ // Retrieve Syncthing config
+ id, err := ctx.SThg.IDGet()
+ if err != nil {
+ return 2, err
+ }
+ ctx.Log.Infof("Local Syncthing ID: %s", id)
+
+ } else {
+ ctx.Log.Infof("Cloud Sync / Syncthing not supported")
+ }
+
+ // Create Web Server
+ ctx.webServer = NewWebServer(ctx)
+
+ // Sessions manager
+ ctx.sessions = NewClientSessions(ctx, cookieMaxAge)
+
+ // Create events management
+ ctx.events = NewEvents(ctx)
+
+ // Create projects management
+ ctx.projects = NewProjects(ctx, ctx.SThg)
+
+ // Run Web Server until exit requested (blocking call)
+ if err = ctx.webServer.Serve(); err != nil {
+ log.Println(err)
+ return 3, err
+ }
+
+ return 4, fmt.Errorf("Program exited")
+}
+
+// Helper function to log message on both stdout and logger
+func (ctx *Context) _logPrint(format string, args ...interface{}) {
+ fmt.Printf(format, args...)
+ if ctx.Log.Out != os.Stdout {
+ ctx.Log.Infof(format, args...)
+ }
+}
+
// Handle exit and properly stop/close all stuff
func handlerSigTerm(ctx *Context) {
<-ctx.Exit
@@ -68,9 +190,9 @@ func handlerSigTerm(ctx *Context) {
ctx.SThg.Stop()
ctx.SThg.StopInotify()
}
- if ctx.WWWServer != nil {
+ if ctx.webServer != nil {
ctx.Log.Infof("Stoping Web server...")
- ctx.WWWServer.Stop()
+ ctx.webServer.Stop()
}
os.Exit(1)
}
diff --git a/lib/agent/apiv1-browse.go b/lib/agent/apiv1-browse.go
new file mode 100644
index 0000000..1701a2e
--- /dev/null
+++ b/lib/agent/apiv1-browse.go
@@ -0,0 +1,28 @@
+package agent
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+)
+
+type directory struct {
+ Name string `json:"name"`
+ Fullpath string `json:"fullpath"`
+}
+
+type apiDirectory struct {
+ Dir []directory `json:"dir"`
+}
+
+// browseFS used to browse local file system
+func (s *APIService) browseFS(c *gin.Context) {
+
+ response := apiDirectory{
+ Dir: []directory{
+ directory{Name: "TODO SEB"},
+ },
+ }
+
+ c.JSON(http.StatusOK, response)
+}
diff --git a/lib/agent/apiv1-config.go b/lib/agent/apiv1-config.go
new file mode 100644
index 0000000..31d8de6
--- /dev/null
+++ b/lib/agent/apiv1-config.go
@@ -0,0 +1,108 @@
+package agent
+
+import (
+ "net/http"
+ "sync"
+
+ "github.com/gin-gonic/gin"
+ "github.com/iotbzh/xds-agent/lib/xdsconfig"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+var confMut sync.Mutex
+
+// APIConfig parameters (json format) of /config command
+type APIConfig struct {
+ Servers []ServerCfg `json:"servers"`
+
+ // Not exposed outside in JSON
+ Version string `json:"-"`
+ APIVersion string `json:"-"`
+ VersionGitTag string `json:"-"`
+}
+
+// ServerCfg .
+type ServerCfg struct {
+ ID string `json:"id"`
+ URL string `json:"url"`
+ APIURL string `json:"apiUrl"`
+ PartialURL string `json:"partialUrl"`
+ ConnRetry int `json:"connRetry"`
+ Connected bool `json:"connected"`
+ Disabled bool `json:"disabled"`
+}
+
+// GetConfig returns the configuration
+func (s *APIService) getConfig(c *gin.Context) {
+ confMut.Lock()
+ defer confMut.Unlock()
+
+ cfg := s._getConfig()
+
+ c.JSON(http.StatusOK, cfg)
+}
+
+// SetConfig sets configuration
+func (s *APIService) setConfig(c *gin.Context) {
+ var cfgArg APIConfig
+ if c.BindJSON(&cfgArg) != nil {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+
+ confMut.Lock()
+ defer confMut.Unlock()
+
+ s.Log.Debugln("SET config: ", cfgArg)
+
+ // First delete/disable XDS Server that are no longer listed
+ for _, svr := range s.xdsServers {
+ found := false
+ for _, svrArg := range cfgArg.Servers {
+ if svr.ID == svrArg.ID {
+ found = true
+ break
+ }
+ }
+ if !found {
+ s.DelXdsServer(svr.ID)
+ }
+ }
+
+ // Add new XDS Server
+ for _, svr := range cfgArg.Servers {
+ cfg := xdsconfig.XDSServerConf{
+ ID: svr.ID,
+ URL: svr.URL,
+ ConnRetry: svr.ConnRetry,
+ }
+ if _, err := s.AddXdsServer(cfg); err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+ }
+
+ c.JSON(http.StatusOK, s._getConfig())
+}
+
+func (s *APIService) _getConfig() APIConfig {
+ cfg := APIConfig{
+ Version: s.Config.Version,
+ APIVersion: s.Config.APIVersion,
+ VersionGitTag: s.Config.VersionGitTag,
+ Servers: []ServerCfg{},
+ }
+
+ for _, svr := range s.xdsServers {
+ cfg.Servers = append(cfg.Servers, ServerCfg{
+ ID: svr.ID,
+ URL: svr.BaseURL,
+ APIURL: svr.APIURL,
+ PartialURL: svr.PartialURL,
+ ConnRetry: svr.ConnRetry,
+ Connected: svr.Connected,
+ Disabled: svr.Disabled,
+ })
+ }
+ return cfg
+}
diff --git a/lib/agent/apiv1-events.go b/lib/agent/apiv1-events.go
new file mode 100644
index 0000000..8aad18a
--- /dev/null
+++ b/lib/agent/apiv1-events.go
@@ -0,0 +1,73 @@
+package agent
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+// EventRegisterArgs is the parameters (json format) of /events/register command
+type EventRegisterArgs struct {
+ Name string `json:"name"`
+ ProjectID string `json:"filterProjectID"`
+}
+
+// EventUnRegisterArgs is the parameters (json format) of /events/unregister command
+type EventUnRegisterArgs struct {
+ Name string `json:"name"`
+ ID int `json:"id"`
+}
+
+// eventsList Registering for events that will be send over a WS
+func (s *APIService) eventsList(c *gin.Context) {
+ c.JSON(http.StatusOK, s.events.GetList())
+}
+
+// eventsRegister Registering for events that will be send over a WS
+func (s *APIService) eventsRegister(c *gin.Context) {
+ var args EventRegisterArgs
+
+ if c.BindJSON(&args) != nil || args.Name == "" {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+
+ sess := s.webServer.sessions.Get(c)
+ if sess == nil {
+ common.APIError(c, "Unknown sessions")
+ return
+ }
+
+ // Register to all or to a specific events
+ if err := s.events.Register(args.Name, sess.ID); err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, gin.H{"status": "OK"})
+}
+
+// eventsRegister Registering for events that will be send over a WS
+func (s *APIService) eventsUnRegister(c *gin.Context) {
+ var args EventUnRegisterArgs
+
+ if c.BindJSON(&args) != nil || args.Name == "" {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+
+ sess := s.webServer.sessions.Get(c)
+ if sess == nil {
+ common.APIError(c, "Unknown sessions")
+ return
+ }
+
+ // Register to all or to a specific events
+ if err := s.events.UnRegister(args.Name, sess.ID); err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, gin.H{"status": "OK"})
+}
diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go
new file mode 100644
index 0000000..83ec7aa
--- /dev/null
+++ b/lib/agent/apiv1-exec.go
@@ -0,0 +1,99 @@
+package agent
+
+import (
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+// Only define useful fields
+type ExecArgs struct {
+ ID string `json:"id" binding:"required"`
+}
+
+// ExecCmd executes remotely a command
+func (s *APIService) execCmd(c *gin.Context) {
+ s._execRequest("/exec", c)
+}
+
+// execSignalCmd executes remotely a command
+func (s *APIService) execSignalCmd(c *gin.Context) {
+ s._execRequest("/signal", c)
+}
+
+func (s *APIService) _execRequest(url string, c *gin.Context) {
+ data, err := c.GetRawData()
+ if err != nil {
+ common.APIError(c, err.Error())
+ }
+
+ // First get Project ID to retrieve Server ID and send command to right server
+ id := c.Param("id")
+ if id == "" {
+ args := ExecArgs{}
+ // XXX - we cannot use c.BindJSON, so directly unmarshall it
+ // (see https://github.com/gin-gonic/gin/issues/1078)
+ if err := json.Unmarshal(data, &args); err != nil {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+ id = args.ID
+ }
+ prj := s.projects.Get(id)
+ if prj == nil {
+ common.APIError(c, "Unknown id")
+ return
+ }
+
+ svr := (*prj).GetServer()
+ if svr == nil {
+ common.APIError(c, "Cannot identify XDS Server")
+ return
+ }
+
+ // Retrieve session info
+ sess := s.sessions.Get(c)
+ if sess == nil {
+ common.APIError(c, "Unknown sessions")
+ return
+ }
+ sock := sess.IOSocket
+ if sock == nil {
+ common.APIError(c, "Websocket not established")
+ return
+ }
+
+ // Forward XDSServer WS events to client WS
+ // TODO removed static event name list and get it from XDSServer
+ for _, evName := range []string{
+ "exec:input",
+ "exec:output",
+ "exec:exit",
+ "exec:inferior-input",
+ "exec:inferior-output",
+ } {
+ evN := evName
+ svr.EventOn(evN, func(evData interface{}) {
+ (*sock).Emit(evN, evData)
+ })
+ }
+
+ // Forward back command to right server
+ response, err := svr.HTTPPostBody(url, string(data))
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ // Decode response
+ body, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ common.APIError(c, "Cannot read response body")
+ return
+ }
+ c.JSON(http.StatusOK, string(body))
+
+}
diff --git a/lib/agent/apiv1-projects.go b/lib/agent/apiv1-projects.go
new file mode 100644
index 0000000..d4b5e74
--- /dev/null
+++ b/lib/agent/apiv1-projects.go
@@ -0,0 +1,72 @@
+package agent
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+// getProjects returns all projects configuration
+func (s *APIService) getProjects(c *gin.Context) {
+ c.JSON(http.StatusOK, s.projects.GetProjectArr())
+}
+
+// getProject returns a specific project configuration
+func (s *APIService) getProject(c *gin.Context) {
+ prj := s.projects.Get(c.Param("id"))
+ if prj == nil {
+ common.APIError(c, "Invalid id")
+ return
+ }
+
+ c.JSON(http.StatusOK, (*prj).GetProject())
+}
+
+// addProject adds a new project to server config
+func (s *APIService) addProject(c *gin.Context) {
+ var cfgArg ProjectConfig
+ if c.BindJSON(&cfgArg) != nil {
+ common.APIError(c, "Invalid arguments")
+ return
+ }
+
+ s.Log.Debugln("Add project config: ", cfgArg)
+
+ newFld, err := s.projects.Add(cfgArg)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, newFld)
+}
+
+// syncProject force synchronization of project files
+func (s *APIService) syncProject(c *gin.Context) {
+ id := c.Param("id")
+
+ s.Log.Debugln("Sync project id: ", id)
+
+ err := s.projects.ForceSync(id)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, "")
+}
+
+// delProject deletes project from server config
+func (s *APIService) delProject(c *gin.Context) {
+ id := c.Param("id")
+
+ s.Log.Debugln("Delete project id ", id)
+
+ delEntry, err := s.projects.Delete(id)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+ c.JSON(http.StatusOK, delEntry)
+}
diff --git a/lib/agent/apiv1-version.go b/lib/agent/apiv1-version.go
new file mode 100644
index 0000000..c2387c1
--- /dev/null
+++ b/lib/agent/apiv1-version.go
@@ -0,0 +1,45 @@
+package agent
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ common "github.com/iotbzh/xds-common/golib"
+)
+
+type version struct {
+ ID string `json:"id"`
+ Version string `json:"version"`
+ APIVersion string `json:"apiVersion"`
+ VersionGitTag string `json:"gitTag"`
+}
+
+type apiVersion struct {
+ Client version `json:"client"`
+ Server []version `json:"servers"`
+}
+
+// getInfo : return various information about server
+func (s *APIService) getVersion(c *gin.Context) {
+ response := apiVersion{
+ Client: version{
+ ID: "",
+ Version: s.Config.Version,
+ APIVersion: s.Config.APIVersion,
+ VersionGitTag: s.Config.VersionGitTag,
+ },
+ }
+
+ svrVer := []version{}
+ for _, svr := range s.xdsServers {
+ res := version{}
+ if err := svr.HTTPGet("/version", &res); err != nil {
+ common.APIError(c, "Cannot retrieve version of XDS server ID %s : %v", svr.ID, err.Error())
+ return
+ }
+ svrVer = append(svrVer, res)
+ }
+ response.Server = svrVer
+
+ c.JSON(http.StatusOK, response)
+}
diff --git a/lib/agent/apiv1.go b/lib/agent/apiv1.go
new file mode 100644
index 0000000..77b05ba
--- /dev/null
+++ b/lib/agent/apiv1.go
@@ -0,0 +1,129 @@
+package agent
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/gin-gonic/gin"
+ "github.com/iotbzh/xds-agent/lib/xdsconfig"
+)
+
+const apiBaseUrl = "/api/v1"
+
+// APIService .
+type APIService struct {
+ *Context
+ apiRouter *gin.RouterGroup
+ serverIndex int
+}
+
+// NewAPIV1 creates a new instance of API service
+func NewAPIV1(ctx *Context) *APIService {
+ s := &APIService{
+ Context: ctx,
+ apiRouter: ctx.webServer.router.Group(apiBaseUrl),
+ serverIndex: 0,
+ }
+
+ s.apiRouter.GET("/version", s.getVersion)
+
+ s.apiRouter.GET("/config", s.getConfig)
+ s.apiRouter.POST("/config", s.setConfig)
+
+ s.apiRouter.GET("/browse", s.browseFS)
+
+ s.apiRouter.GET("/projects", s.getProjects)
+ s.apiRouter.GET("/project/:id", s.getProject)
+ s.apiRouter.POST("/project", s.addProject)
+ s.apiRouter.POST("/project/sync/:id", s.syncProject)
+ s.apiRouter.DELETE("/project/:id", s.delProject)
+
+ s.apiRouter.POST("/exec", s.execCmd)
+ s.apiRouter.POST("/exec/:id", s.execCmd)
+ s.apiRouter.POST("/signal", s.execSignalCmd)
+
+ s.apiRouter.GET("/events", s.eventsList)
+ s.apiRouter.POST("/events/register", s.eventsRegister)
+ s.apiRouter.POST("/events/unregister", s.eventsUnRegister)
+
+ return s
+}
+
+// Stop Used to stop/close created services
+func (s *APIService) Stop() {
+ for _, svr := range s.xdsServers {
+ svr.Close()
+ }
+}
+
+// AddXdsServer Add a new XDS Server to the list of a server
+func (s *APIService) AddXdsServer(cfg xdsconfig.XDSServerConf) (*XdsServer, error) {
+ var svr *XdsServer
+ var exist, tempoID bool
+ tempoID = false
+
+ // First check if not already exist and update it
+ if svr, exist = s.xdsServers[cfg.ID]; exist {
+
+ // Update: Found, so just update some settings
+ svr.ConnRetry = cfg.ConnRetry
+
+ tempoID = svr.IsTempoID()
+ if svr.Connected && !svr.Disabled && svr.BaseURL == cfg.URL && tempoID {
+ return svr, nil
+ }
+
+ // URL differ or not connected, so need to reconnect
+ svr.BaseURL = cfg.URL
+
+ } else {
+
+ // Create a new server object
+ if cfg.APIBaseURL == "" {
+ cfg.APIBaseURL = apiBaseUrl
+ }
+ if cfg.APIPartialURL == "" {
+ cfg.APIPartialURL = "/server/" + strconv.Itoa(s.serverIndex)
+ s.serverIndex = s.serverIndex + 1
+ }
+
+ // Create a new XDS Server
+ svr = NewXdsServer(s.Context, cfg)
+
+ svr.SetLoggerOutput(s.Config.LogVerboseOut)
+
+ // Passthrough routes (handle by XDS Server)
+ grp := s.apiRouter.Group(svr.PartialURL)
+ svr.SetAPIRouterGroup(grp)
+ svr.PassthroughGet("/sdks")
+ svr.PassthroughGet("/sdk/:id")
+ }
+
+ // Established connection
+ err := svr.Connect()
+
+ // Delete temporary ID with it has been replaced by right Server ID
+ if tempoID && !svr.IsTempoID() {
+ delete(s.xdsServers, cfg.ID)
+ }
+
+ // Add to map
+ s.xdsServers[svr.ID] = svr
+
+ // Load projects
+ if err == nil && svr.Connected {
+ err = s.projects.Init(svr)
+ }
+
+ return svr, err
+}
+
+// DelXdsServer Delete an XDS Server from the list of a server
+func (s *APIService) DelXdsServer(id string) error {
+ if _, exist := s.xdsServers[id]; !exist {
+ return fmt.Errorf("Unknown Server ID %s", id)
+ }
+ // Don't really delete, just disable it
+ s.xdsServers[id].Close()
+ return nil
+}
diff --git a/lib/agent/events.go b/lib/agent/events.go
new file mode 100644
index 0000000..24efc5a
--- /dev/null
+++ b/lib/agent/events.go
@@ -0,0 +1,131 @@
+package agent
+
+import (
+ "fmt"
+ "time"
+)
+
+// Events constants
+const (
+ // EventTypePrefix Used as event prefix
+ EventTypePrefix = "event:" // following by event type
+
+ // Supported Events type
+ EVTAll = "all"
+ EVTServerConfig = "server-config" // data type ServerCfg
+ EVTProjectAdd = "project-add" // data type ProjectConfig
+ EVTProjectDelete = "project-delete" // data type ProjectConfig
+ EVTProjectChange = "project-state-change" // data type ProjectConfig
+)
+
+var EVTAllList = []string{
+ EVTServerConfig,
+ EVTProjectAdd,
+ EVTProjectDelete,
+ EVTProjectChange,
+}
+
+// EventMsg Message send
+type EventMsg struct {
+ Time string `json:"time"`
+ Type string `json:"type"`
+ Data interface{} `json:"data"`
+}
+
+type EventDef struct {
+ // SEB cbs []EventsCB
+ sids map[string]int
+}
+
+type Events struct {
+ *Context
+ eventsMap map[string]*EventDef
+}
+
+// NewEvents creates an instance of Events
+func NewEvents(ctx *Context) *Events {
+ evMap := make(map[string]*EventDef)
+ for _, ev := range EVTAllList {
+ evMap[ev] = &EventDef{
+ sids: make(map[string]int),
+ }
+ }
+ return &Events{
+ Context: ctx,
+ eventsMap: evMap,
+ }
+}
+
+// GetList returns the list of all supported events
+func (e *Events) GetList() []string {
+ return EVTAllList
+}
+
+// Register Used by a client/session to register to a specific (or all) event(s)
+func (e *Events) Register(evName, sessionID string) error {
+ evs := EVTAllList
+ if evName != EVTAll {
+ if _, ok := e.eventsMap[evName]; !ok {
+ return fmt.Errorf("Unsupported event type name")
+ }
+ evs = []string{evName}
+ }
+ for _, ev := range evs {
+ e.eventsMap[ev].sids[sessionID]++
+ }
+ return nil
+}
+
+// UnRegister Used by a client/session to unregister event(s)
+func (e *Events) UnRegister(evName, sessionID string) error {
+ evs := EVTAllList
+ if evName != EVTAll {
+ if _, ok := e.eventsMap[evName]; !ok {
+ return fmt.Errorf("Unsupported event type name")
+ }
+ evs = []string{evName}
+ }
+ for _, ev := range evs {
+ if _, exist := e.eventsMap[ev].sids[sessionID]; exist {
+ delete(e.eventsMap[ev].sids, sessionID)
+ break
+ }
+ }
+ return nil
+}
+
+// Emit Used to manually emit an event
+func (e *Events) Emit(evName string, data interface{}) error {
+ var firstErr error
+
+ if _, ok := e.eventsMap[evName]; !ok {
+ return fmt.Errorf("Unsupported event type")
+ }
+
+ e.Log.Debugf("Emit Event %s: %v", evName, data)
+
+ firstErr = nil
+ evm := e.eventsMap[evName]
+ for sid := range evm.sids {
+ so := e.webServer.sessions.IOSocketGet(sid)
+ if so == nil {
+ if firstErr == nil {
+ firstErr = fmt.Errorf("IOSocketGet return nil")
+ }
+ continue
+ }
+ msg := EventMsg{
+ Time: time.Now().String(),
+ Type: evName,
+ Data: data,
+ }
+ if err := (*so).Emit(EventTypePrefix+evName, msg); err != nil {
+ e.Log.Errorf("WS Emit %v error : %v", EventTypePrefix+evName, err)
+ if firstErr == nil {
+ firstErr = err
+ }
+ }
+ }
+
+ return firstErr
+}
diff --git a/lib/agent/project-interface.go b/lib/agent/project-interface.go
new file mode 100644
index 0000000..031e1d9
--- /dev/null
+++ b/lib/agent/project-interface.go
@@ -0,0 +1,47 @@
+package agent
+
+// ProjectType definition
+type ProjectType string
+
+const (
+ TypePathMap = "PathMap"
+ TypeCloudSync = "CloudSync"
+ TypeCifsSmb = "CIFS"
+)
+
+// Project Status definition
+const (
+ StatusErrorConfig = "ErrorConfig"
+ StatusDisable = "Disable"
+ StatusEnable = "Enable"
+ StatusPause = "Pause"
+ StatusSyncing = "Syncing"
+)
+
+type EventCBData map[string]interface{}
+type EventCB func(cfg *ProjectConfig, data *EventCBData)
+
+// IPROJECT Project interface
+type IPROJECT interface {
+ Add(cfg ProjectConfig) (*ProjectConfig, error) // Add a new project
+ Delete() error // Delete a project
+ GetProject() *ProjectConfig // Get project public configuration
+ SetProject(prj ProjectConfig) *ProjectConfig // Set project configuration
+ GetServer() *XdsServer // Get XdsServer that holds this project
+ GetFullPath(dir string) string // Get project full path
+ Sync() error // Force project files synchronization
+ IsInSync() (bool, error) // Check if project files are in-sync
+}
+
+// ProjectConfig is the config for one project
+type ProjectConfig struct {
+ ID string `json:"id"`
+ ServerID string `json:"serverId"`
+ Label string `json:"label"`
+ ClientPath string `json:"clientPath"`
+ ServerPath string `json:"serverPath"`
+ Type ProjectType `json:"type"`
+ Status string `json:"status"`
+ IsInSync bool `json:"isInSync"`
+ DefaultSdk string `json:"defaultSdk"`
+}
diff --git a/lib/agent/project-pathmap.go b/lib/agent/project-pathmap.go
new file mode 100644
index 0000000..1de8e11
--- /dev/null
+++ b/lib/agent/project-pathmap.go
@@ -0,0 +1,79 @@
+package agent
+
+import (
+ "path/filepath"
+)
+
+// IPROJECT interface implementation for native/path mapping projects
+
+// PathMap .
+type PathMap struct {
+ *Context
+ server *XdsServer
+ folder *FolderConfig
+}
+
+// NewProjectPathMap Create a new instance of PathMap
+func NewProjectPathMap(ctx *Context, svr *XdsServer) *PathMap {
+ p := PathMap{
+ Context: ctx,
+ server: svr,
+ folder: &FolderConfig{},
+ }
+ return &p
+}
+
+// Add a new project
+func (p *PathMap) Add(cfg ProjectConfig) (*ProjectConfig, error) {
+ var err error
+
+ // SEB TODO: check local/server directory access
+
+ err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder)
+ if err != nil {
+ return nil, err
+ }
+
+ return p.GetProject(), nil
+}
+
+// Delete a project
+func (p *PathMap) Delete() error {
+ return p.server.FolderDelete(p.folder.ID)
+}
+
+// GetProject Get public part of project config
+func (p *PathMap) GetProject() *ProjectConfig {
+ prj := p.server.FolderToProject(*p.folder)
+ prj.ServerID = p.server.ID
+ return &prj
+}
+
+// SetProject Set project config
+func (p *PathMap) SetProject(prj ProjectConfig) *ProjectConfig {
+ p.folder = p.server.ProjectToFolder(prj)
+ return p.GetProject()
+}
+
+// GetServer Get the XdsServer that holds this project
+func (p *PathMap) GetServer() *XdsServer {
+ return p.server
+}
+
+// GetFullPath returns the full path of a directory (from server POV)
+func (p *PathMap) GetFullPath(dir string) string {
+ if &dir == nil {
+ return p.folder.DataPathMap.ServerPath
+ }
+ return filepath.Join(p.folder.DataPathMap.ServerPath, dir)
+}
+
+// Sync Force project files synchronization
+func (p *PathMap) Sync() error {
+ return nil
+}
+
+// IsInSync Check if project files are in-sync
+func (p *PathMap) IsInSync() (bool, error) {
+ return true, nil
+}
diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go
new file mode 100644
index 0000000..28a287c
--- /dev/null
+++ b/lib/agent/project-st.go
@@ -0,0 +1,93 @@
+package agent
+
+import "github.com/iotbzh/xds-agent/lib/syncthing"
+
+// SEB TODO
+
+// IPROJECT interface implementation for syncthing projects
+
+// STProject .
+type STProject struct {
+ *Context
+ server *XdsServer
+ folder *FolderConfig
+}
+
+// NewProjectST Create a new instance of STProject
+func NewProjectST(ctx *Context, svr *XdsServer) *STProject {
+ p := STProject{
+ Context: ctx,
+ server: svr,
+ folder: &FolderConfig{},
+ }
+ return &p
+}
+
+// Add a new project
+func (p *STProject) Add(cfg ProjectConfig) (*ProjectConfig, error) {
+ var err error
+
+ err = p.server.FolderAdd(p.server.ProjectToFolder(cfg), p.folder)
+ if err != nil {
+ return nil, err
+ }
+ svrPrj := p.GetProject()
+
+ // Declare project into local Syncthing
+ p.SThg.FolderChange(st.FolderChangeArg{
+ ID: cfg.ID,
+ Label: cfg.Label,
+ RelativePath: cfg.ClientPath,
+ SyncThingID: p.server.ServerConfig.Builder.SyncThingID,
+ })
+
+ return svrPrj, nil
+}
+
+// Delete a project
+func (p *STProject) Delete() error {
+ return p.server.FolderDelete(p.folder.ID)
+}
+
+// GetProject Get public part of project config
+func (p *STProject) GetProject() *ProjectConfig {
+ prj := p.server.FolderToProject(*p.folder)
+ prj.ServerID = p.server.ID
+ return &prj
+}
+
+// SetProject Set project config
+func (p *STProject) SetProject(prj ProjectConfig) *ProjectConfig {
+ // SEB TODO
+ p.folder = p.server.ProjectToFolder(prj)
+ return p.GetProject()
+}
+
+// GetServer Get the XdsServer that holds this project
+func (p *STProject) GetServer() *XdsServer {
+ // SEB TODO
+ return p.server
+}
+
+// GetFullPath returns the full path of a directory (from server POV)
+func (p *STProject) GetFullPath(dir string) string {
+ /* SEB
+ if &dir == nil {
+ return p.folder.DataSTProject.ServerPath
+ }
+ return filepath.Join(p.folder.DataSTProject.ServerPath, dir)
+ */
+ return "SEB TODO"
+}
+
+// Sync Force project files synchronization
+func (p *STProject) Sync() error {
+ // SEB TODO
+ return nil
+}
+
+// IsInSync Check if project files are in-sync
+func (p *STProject) IsInSync() (bool, error) {
+ // SEB TODO
+ return false, nil
+}
diff --git a/lib/agent/projects.go b/lib/agent/projects.go
new file mode 100644
index 0000000..39c120f
--- /dev/null
+++ b/lib/agent/projects.go
@@ -0,0 +1,254 @@
+package agent
+
+import (
+ "fmt"
+ "log"
+ "time"
+
+ "github.com/iotbzh/xds-agent/lib/syncthing"
+ "github.com/syncthing/syncthing/lib/sync"
+)
+
+// Projects Represent a an XDS Projects
+type Projects struct {
+ *Context
+ SThg *st.SyncThing
+ projects map[string]*IPROJECT
+ //SEB registerCB []RegisteredCB
+}
+
+/* SEB
+type RegisteredCB struct {
+ cb *EventCB
+ data *EventCBData
+}
+*/
+
+// Mutex to make add/delete atomic
+var pjMutex = sync.NewMutex()
+
+// NewProjects Create a new instance of Project Model
+func NewProjects(ctx *Context, st *st.SyncThing) *Projects {
+ return &Projects{
+ Context: ctx,
+ SThg: st,
+ projects: make(map[string]*IPROJECT),
+ //registerCB: []RegisteredCB{},
+ }
+}
+
+// Init Load Projects configuration
+func (p *Projects) Init(server *XdsServer) error {
+ svrList := make(map[string]*XdsServer)
+ // If server not set, load for all servers
+ if server == nil {
+ svrList = p.xdsServers
+ } else {
+ svrList[server.ID] = server
+ }
+ errMsg := ""
+ for _, svr := range svrList {
+ if svr.Disabled {
+ continue
+ }
+ xFlds := []FolderConfig{}
+ if err := svr.HTTPGet("/folders", &xFlds); err != nil {
+ errMsg += fmt.Sprintf("Cannot retrieve folders config of XDS server ID %s : %v \n", svr.ID, err.Error())
+ continue
+ }
+ p.Log.Debugf("Server %s, %d projects detected", svr.ID[:8], len(xFlds))
+ for _, prj := range xFlds {
+ newP := svr.FolderToProject(prj)
+ if /*nPrj*/ _, err := p.createUpdate(newP, false, true); err != nil {
+ errMsg += "Error while creating project id " + prj.ID + ": " + err.Error() + "\n "
+ continue
+ }
+
+ /* FIXME emit EVTProjectChange event ?
+ if err := p.events.Emit(EVTProjectChange, *nPrj); err != nil {
+ p.Log.Warningf("Cannot notify project change: %v", err)
+ }
+ */
+ }
+
+ }
+
+ p.Log.Infof("Number of loaded Projects: %d", len(p.projects))
+
+ if errMsg != "" {
+ return fmt.Errorf(errMsg)
+ }
+ return nil
+}
+
+// Get returns the folder config or nil if not existing
+func (p *Projects) Get(id string) *IPROJECT {
+ if id == "" {
+ return nil
+ }
+ fc, exist := p.projects[id]
+ if !exist {
+ return nil
+ }
+ return fc
+}
+
+// GetProjectArr returns the config of all folders as an array
+func (p *Projects) GetProjectArr() []ProjectConfig {
+ pjMutex.Lock()
+ defer pjMutex.Unlock()
+
+ return p.GetProjectArrUnsafe()
+}
+
+// GetProjectArrUnsafe Same as GetProjectArr without mutex protection
+func (p *Projects) GetProjectArrUnsafe() []ProjectConfig {
+ conf := []ProjectConfig{}
+ for _, v := range p.projects {
+ prj := (*v).GetProject()
+ conf = append(conf, *prj)
+ }
+ return conf
+}
+
+// Add adds a new folder
+func (p *Projects) Add(newF ProjectConfig) (*ProjectConfig, error) {
+ prj, err := p.createUpdate(newF, true, false)
+ if err != nil {
+ return prj, err
+ }
+
+ // Notify client with event
+ if err := p.events.Emit(EVTProjectAdd, *prj); err != nil {
+ p.Log.Warningf("Cannot notify project deletion: %v", err)
+ }
+
+ return prj, err
+}
+
+// CreateUpdate creates or update a folder
+func (p *Projects) createUpdate(newF ProjectConfig, create bool, initial bool) (*ProjectConfig, error) {
+ var err error
+
+ pjMutex.Lock()
+ defer pjMutex.Unlock()
+
+ // Sanity check
+ if _, exist := p.projects[newF.ID]; create && exist {
+ return nil, fmt.Errorf("ID already exists")
+ }
+ if newF.ClientPath == "" {
+ return nil, fmt.Errorf("ClientPath must be set")
+ }
+ if newF.ServerID == "" {
+ return nil, fmt.Errorf("Server ID must be set")
+ }
+ var svr *XdsServer
+ var exist bool
+ if svr, exist = p.xdsServers[newF.ServerID]; !exist {
+ return nil, fmt.Errorf("Unknown Server ID %s", newF.ServerID)
+ }
+
+ // Check type supported
+ b, exist := svr.ServerConfig.SupportedSharing[string(newF.Type)]
+ if !exist || !b {
+ return nil, fmt.Errorf("Server doesn't support project type %s", newF.Type)
+ }
+
+ // Create a new folder object
+ var fld IPROJECT
+ switch newF.Type {
+ // SYNCTHING
+ case TypeCloudSync:
+ if p.SThg != nil {
+ /*SEB fld = f.SThg.NewFolderST(f.Conf)*/
+ fld = NewProjectST(p.Context, svr)
+ } else {
+ return nil, fmt.Errorf("Cloud Sync project not supported")
+ }
+
+ // PATH MAP
+ case TypePathMap:
+ fld = NewProjectPathMap(p.Context, svr)
+ default:
+ return nil, fmt.Errorf("Unsupported folder type")
+ }
+
+ var newPrj *ProjectConfig
+ if create {
+ // Add project on server
+ if newPrj, err = fld.Add(newF); err != nil {
+ newF.Status = StatusErrorConfig
+ log.Printf("ERROR Adding folder: %v\n", err)
+ return newPrj, err
+ }
+ } else {
+ // Just update project config
+ newPrj = fld.SetProject(newF)
+ }
+
+ // Sanity check
+ if newPrj.ID == "" {
+ log.Printf("ERROR project ID empty: %v", newF)
+ return newPrj, fmt.Errorf("Project ID empty")
+ }
+
+ // Add to folders list
+ p.projects[newPrj.ID] = &fld
+
+ // Force sync after creation
+ // (need to defer to be sure that WS events will arrive after HTTP creation reply)
+ go func() {
+ time.Sleep(time.Millisecond * 500)
+ fld.Sync()
+ }()
+
+ return newPrj, nil
+}
+
+// Delete deletes a specific folder
+func (p *Projects) Delete(id string) (ProjectConfig, error) {
+ var err error
+
+ pjMutex.Lock()
+ defer pjMutex.Unlock()
+
+ fld := ProjectConfig{}
+ fc, exist := p.projects[id]
+ if !exist {
+ return fld, fmt.Errorf("unknown id")
+ }
+
+ prj := (*fc).GetProject()
+
+ if err = (*fc).Delete(); err != nil {
+ return *prj, err
+ }
+
+ delete(p.projects, id)
+
+ // Notify client with event
+ if err := p.events.Emit(EVTProjectDelete, *prj); err != nil {
+ p.Log.Warningf("Cannot notify project deletion: %v", err)
+ }
+
+ return *prj, err
+}
+
+// ForceSync Force the synchronization of a folder
+func (p *Projects) ForceSync(id string) error {
+ fc := p.Get(id)
+ if fc == nil {
+ return fmt.Errorf("Unknown id")
+ }
+ return (*fc).Sync()
+}
+
+// IsProjectInSync Returns true when folder is in sync
+func (p *Projects) IsProjectInSync(id string) (bool, error) {
+ fc := p.Get(id)
+ if fc == nil {
+ return false, fmt.Errorf("Unknown id")
+ }
+ return (*fc).IsInSync()
+}
diff --git a/lib/agent/session.go b/lib/agent/session.go
new file mode 100644
index 0000000..e50abe1
--- /dev/null
+++ b/lib/agent/session.go
@@ -0,0 +1,224 @@
+package agent
+
+import (
+ "encoding/base64"
+ "strconv"
+ "time"
+
+ "github.com/gin-gonic/gin"
+ "github.com/googollee/go-socket.io"
+ uuid "github.com/satori/go.uuid"
+ "github.com/syncthing/syncthing/lib/sync"
+)
+
+const sessionCookieName = "xds-agent-sid"
+const sessionHeaderName = "XDS-AGENT-SID"
+
+const sessionMonitorTime = 10 // Time (in seconds) to schedule monitoring session tasks
+
+const initSessionMaxAge = 10 // Initial session max age in seconds
+const maxSessions = 100000 // Maximum number of sessions in sessMap map
+
+const secureCookie = false // TODO: see https://github.com/astaxie/beego/blob/master/session/session.go#L218
+
+// ClientSession contains the info of a user/client session
+type ClientSession struct {
+ ID string
+ WSID string // only one WebSocket per client/session
+ MaxAge int64
+ IOSocket *socketio.Socket
+
+ // private
+ expireAt time.Time
+ useCount int64
+}
+
+// Sessions holds client sessions
+type Sessions struct {
+ *Context
+ cookieMaxAge int64
+ sessMap map[string]ClientSession
+ mutex sync.Mutex
+ stop chan struct{} // signals intentional stop
+}
+
+// NewClientSessions .
+func NewClientSessions(ctx *Context, cookieMaxAge string) *Sessions {
+ ckMaxAge, err := strconv.ParseInt(cookieMaxAge, 10, 0)
+ if err != nil {
+ ckMaxAge = 0
+ }
+ s := Sessions{
+ Context: ctx,
+ cookieMaxAge: ckMaxAge,
+ sessMap: make(map[string]ClientSession),
+ mutex: sync.NewMutex(),
+ stop: make(chan struct{}),
+ }
+ s.webServer.router.Use(s.Middleware())
+
+ // Start monitoring of sessions Map (use to manage expiration and cleanup)
+ go s.monitorSessMap()
+
+ return &s
+}
+
+// Stop sessions management
+func (s *Sessions) Stop() {
+ close(s.stop)
+}
+
+// Middleware is used to managed session
+func (s *Sessions) Middleware() gin.HandlerFunc {
+ return func(c *gin.Context) {
+ // FIXME Add CSRF management
+
+ // Get session
+ sess := s.Get(c)
+ if sess == nil {
+ // Allocate a new session key and put in cookie
+ sess = s.newSession("")
+ } else {
+ s.refresh(sess.ID)
+ }
+
+ // Set session in cookie and in header
+ // Do not set Domain to localhost (http://stackoverflow.com/questions/1134290/cookies-on-localhost-with-explicit-domain)
+ c.SetCookie(sessionCookieName, sess.ID, int(sess.MaxAge), "/", "",
+ secureCookie, false)
+ c.Header(sessionHeaderName, sess.ID)
+
+ // Save session id in gin metadata
+ c.Set(sessionCookieName, sess.ID)
+
+ c.Next()
+ }
+}
+
+// Get returns the client session for a specific ID
+func (s *Sessions) Get(c *gin.Context) *ClientSession {
+ var sid string
+
+ // First get from gin metadata
+ v, exist := c.Get(sessionCookieName)
+ if v != nil {
+ sid = v.(string)
+ }
+
+ // Then look in cookie
+ if !exist || sid == "" {
+ sid, _ = c.Cookie(sessionCookieName)
+ }
+
+ // Then look in Header
+ if sid == "" {
+ sid = c.Request.Header.Get(sessionCookieName)
+ }
+ if sid != "" {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ if key, ok := s.sessMap[sid]; ok {
+ // TODO: return a copy ???
+ return &key
+ }
+ }
+ return nil
+}
+
+// IOSocketGet Get socketio definition from sid
+func (s *Sessions) IOSocketGet(sid string) *socketio.Socket {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ sess, ok := s.sessMap[sid]
+ if ok {
+ return sess.IOSocket
+ }
+ return nil
+}
+
+// UpdateIOSocket updates the IO Socket definition for of a session
+func (s *Sessions) UpdateIOSocket(sid string, so *socketio.Socket) error {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ if _, ok := s.sessMap[sid]; ok {
+ sess := s.sessMap[sid]
+ if so == nil {
+ // Could be the case when socketio is closed/disconnected
+ sess.WSID = ""
+ } else {
+ sess.WSID = (*so).Id()
+ }
+ sess.IOSocket = so
+ s.sessMap[sid] = sess
+ }
+ return nil
+}
+
+// nesSession Allocate a new client session
+func (s *Sessions) newSession(prefix string) *ClientSession {
+ uuid := prefix + uuid.NewV4().String()
+ id := base64.URLEncoding.EncodeToString([]byte(uuid))
+ se := ClientSession{
+ ID: id,
+ WSID: "",
+ MaxAge: initSessionMaxAge,
+ IOSocket: nil,
+ expireAt: time.Now().Add(time.Duration(initSessionMaxAge) * time.Second),
+ useCount: 0,
+ }
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ s.sessMap[se.ID] = se
+
+ s.Log.Debugf("NEW session (%d): %s", len(s.sessMap), id)
+ return &se
+}
+
+// refresh Move this session ID to the head of the list
+func (s *Sessions) refresh(sid string) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ sess := s.sessMap[sid]
+ sess.useCount++
+ if sess.MaxAge < s.cookieMaxAge && sess.useCount > 1 {
+ sess.MaxAge = s.cookieMaxAge
+ sess.expireAt = time.Now().Add(time.Duration(sess.MaxAge) * time.Second)
+ }
+
+ // TODO - Add flood detection (like limit_req of nginx)
+ // (delayed request when to much requests in a short period of time)
+
+ s.sessMap[sid] = sess
+}
+
+func (s *Sessions) monitorSessMap() {
+ const dbgFullTrace = false // for debugging
+
+ for {
+ select {
+ case <-s.stop:
+ s.Log.Debugln("Stop monitorSessMap")
+ return
+ case <-time.After(sessionMonitorTime * time.Second):
+ if dbgFullTrace {
+ s.Log.Debugf("Sessions Map size: %d", len(s.sessMap))
+ s.Log.Debugf("Sessions Map : %v", s.sessMap)
+ }
+
+ if len(s.sessMap) > maxSessions {
+ s.Log.Errorln("TOO MUCH sessions, cleanup old ones !")
+ }
+
+ s.mutex.Lock()
+ for _, ss := range s.sessMap {
+ if ss.expireAt.Sub(time.Now()) < 0 {
+ //SEB DEBUG s.Log.Debugf("Delete expired session id: %s", ss.ID)
+ delete(s.sessMap, ss.ID)
+ }
+ }
+ s.mutex.Unlock()
+ }
+ }
+}
diff --git a/lib/agent/webserver.go b/lib/agent/webserver.go
new file mode 100644
index 0000000..ead06d1
--- /dev/null
+++ b/lib/agent/webserver.go
@@ -0,0 +1,246 @@
+package agent
+
+import (
+ "fmt"
+ "log"
+ "net/http"
+ "os"
+ "path"
+
+ "github.com/Sirupsen/logrus"
+ "github.com/gin-contrib/static"
+ "github.com/gin-gonic/gin"
+ "github.com/googollee/go-socket.io"
+)
+
+// WebServer .
+type WebServer struct {
+ *Context
+ router *gin.Engine
+ api *APIService
+ sIOServer *socketio.Server
+ webApp *gin.RouterGroup
+ stop chan struct{} // signals intentional stop
+}
+
+const indexFilename = "index.html"
+
+// NewWebServer creates an instance of WebServer
+func NewWebServer(ctx *Context) *WebServer {
+
+ // Setup logging for gin router
+ if ctx.Log.Level == logrus.DebugLevel {
+ gin.SetMode(gin.DebugMode)
+ } else {
+ gin.SetMode(gin.ReleaseMode)
+ }
+
+ // Redirect gin logs into another logger (LogVerboseOut may be stderr or a file)
+ gin.DefaultWriter = ctx.Config.LogVerboseOut
+ gin.DefaultErrorWriter = ctx.Config.LogVerboseOut
+ log.SetOutput(ctx.Config.LogVerboseOut)
+
+ // Creates gin router
+ r := gin.New()
+
+ svr := &WebServer{
+ Context: ctx,
+ router: r,
+ api: nil,
+ sIOServer: nil,
+ webApp: nil,
+ stop: make(chan struct{}),
+ }
+
+ return svr
+}
+
+// Serve starts a new instance of the Web Server
+func (s *WebServer) Serve() error {
+ var err error
+
+ // Setup middlewares
+ s.router.Use(gin.Logger())
+ s.router.Use(gin.Recovery())
+ s.router.Use(s.middlewareCORS())
+ s.router.Use(s.middlewareXDSDetails())
+ s.router.Use(s.middlewareCSRF())
+
+ // Create REST API
+ s.api = NewAPIV1(s.Context)
+
+ // Create connections to XDS Servers
+ // XXX - not sure there is no side effect to do it in background !
+ go func() {
+ for _, svrCfg := range s.Config.FileConf.ServersConf {
+ if svr, err := s.api.AddXdsServer(svrCfg); err != nil {
+ // Just log error, don't consider as critical
+ s.Log.Infof("Cannot connect to XDS Server url=%s: %v", svr.BaseURL, err.Error())
+ }
+ }
+ }()
+
+ // Websocket routes
+ s.sIOServer, err = socketio.NewServer(nil)
+ if err != nil {
+ s.Log.Fatalln(err)
+ }
+
+ s.router.GET("/socket.io/", s.socketHandler)
+ s.router.POST("/socket.io/", s.socketHandler)
+ /* TODO: do we want to support ws://... ?
+ s.router.Handle("WS", "/socket.io/", s.socketHandler)
+ s.router.Handle("WSS", "/socket.io/", s.socketHandler)
+ */
+
+ // Web Application (serve on / )
+ idxFile := path.Join(s.Config.FileConf.WebAppDir, indexFilename)
+ if _, err := os.Stat(idxFile); err != nil {
+ s.Log.Fatalln("Web app directory not found, check/use webAppDir setting in config file: ", idxFile)
+ }
+ s.Log.Infof("Serve WEB app dir: %s", s.Config.FileConf.WebAppDir)
+ s.router.Use(static.Serve("/", static.LocalFile(s.Config.FileConf.WebAppDir, true)))
+ s.webApp = s.router.Group("/", s.serveIndexFile)
+ {
+ s.webApp.GET("/")
+ }
+
+ // Serve in the background
+ serveError := make(chan error, 1)
+ go func() {
+ fmt.Printf("Web Server running on localhost:%s ...\n", s.Config.FileConf.HTTPPort)
+ serveError <- http.ListenAndServe(":"+s.Config.FileConf.HTTPPort, s.router)
+ }()
+
+ fmt.Printf("XDS agent running...\n")
+
+ // Wait for stop, restart or error signals
+ select {
+ case <-s.stop:
+ // Shutting down permanently
+ s.sessions.Stop()
+ s.Log.Infoln("shutting down (stop)")
+ case err = <-serveError:
+ // Error due to listen/serve failure
+ s.Log.Errorln(err)
+ }
+
+ return nil
+}
+
+// Stop web server
+func (s *WebServer) Stop() {
+ s.api.Stop()
+ close(s.stop)
+}
+
+// serveIndexFile provides initial file (eg. index.html) of webapp
+func (s *WebServer) serveIndexFile(c *gin.Context) {
+ c.HTML(200, indexFilename, gin.H{})
+}
+
+// Add details in Header
+func (s *WebServer) middlewareXDSDetails() gin.HandlerFunc {
+ return func(c *gin.Context) {
+ c.Header("XDS-Agent-Version", s.Config.Version)
+ c.Header("XDS-API-Version", s.Config.APIVersion)
+ c.Next()
+ }
+}
+
+/* SEB
+func (s *WebServer) isValidAPIKey(key string) bool {
+ return (key == s.Config.FileConf.XDSAPIKey && key != "")
+}
+*/
+
+func (s *WebServer) middlewareCSRF() gin.HandlerFunc {
+ return func(c *gin.Context) {
+ // XXX - not used for now
+ c.Next()
+ return
+ /*
+ // Allow requests carrying a valid API key
+ if s.isValidAPIKey(c.Request.Header.Get("X-API-Key")) {
+ // Set the access-control-allow-origin header for CORS requests
+ // since a valid API key has been provided
+ c.Header("Access-Control-Allow-Origin", "*")
+ c.Next()
+ return
+ }
+
+ // Allow io.socket request
+ if strings.HasPrefix(c.Request.URL.Path, "/socket.io") {
+ c.Next()
+ return
+ }
+
+ // FIXME Add really CSRF support
+
+ // Allow requests for anything not under the protected path prefix,
+ // and set a CSRF cookie if there isn't already a valid one.
+ //if !strings.HasPrefix(c.Request.URL.Path, prefix) {
+ // cookie, err := c.Cookie("CSRF-Token-" + unique)
+ // if err != nil || !validCsrfToken(cookie.Value) {
+ // s.Log.Debugln("new CSRF cookie in response to request for", c.Request.URL)
+ // c.SetCookie("CSRF-Token-"+unique, newCsrfToken(), 600, "/", "", false, false)
+ // }
+ // c.Next()
+ // return
+ //}
+
+ // Verify the CSRF token
+ //token := c.Request.Header.Get("X-CSRF-Token-" + unique)
+ //if !validCsrfToken(token) {
+ // c.AbortWithError(403, "CSRF Error")
+ // return
+ //}
+
+ //c.Next()
+
+ c.AbortWithError(403, fmt.Errorf("Not valid API key"))
+ */
+ }
+}
+
+// CORS middleware
+func (s *WebServer) middlewareCORS() gin.HandlerFunc {
+ return func(c *gin.Context) {
+ if c.Request.Method == "OPTIONS" {
+ c.Header("Access-Control-Allow-Origin", "*")
+ c.Header("Access-Control-Allow-Headers", "Content-Type, X-API-Key")
+ c.Header("Access-Control-Allow-Methods", "GET, POST, DELETE")
+ c.Header("Access-Control-Max-Age", cookieMaxAge)
+ c.AbortWithStatus(204)
+ return
+ }
+ c.Next()
+ }
+}
+
+// socketHandler is the handler for the "main" websocket connection
+func (s *WebServer) socketHandler(c *gin.Context) {
+
+ // Retrieve user session
+ sess := s.sessions.Get(c)
+ if sess == nil {
+ c.JSON(500, gin.H{"error": "Cannot retrieve session"})
+ return
+ }
+
+ s.sIOServer.On("connection", func(so socketio.Socket) {
+ s.Log.Debugf("WS Connected (SID=%v)", so.Id())
+ s.sessions.UpdateIOSocket(sess.ID, &so)
+
+ so.On("disconnection", func() {
+ s.Log.Debugf("WS disconnected (SID=%v)", so.Id())
+ s.sessions.UpdateIOSocket(sess.ID, nil)
+ })
+ })
+
+ s.sIOServer.On("error", func(so socketio.Socket, err error) {
+ s.Log.Errorf("WS SID=%v Error : %v", so.Id(), err.Error())
+ })
+
+ s.sIOServer.ServeHTTP(c.Writer, c.Request)
+}
diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go
new file mode 100644
index 0000000..014415f
--- /dev/null
+++ b/lib/agent/xdsserver.go
@@ -0,0 +1,472 @@
+package agent
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/gin-gonic/gin"
+ "github.com/iotbzh/xds-agent/lib/xdsconfig"
+ common "github.com/iotbzh/xds-common/golib"
+ uuid "github.com/satori/go.uuid"
+ sio_client "github.com/zhouhui8915/go-socket.io-client"
+)
+
+// Server .
+type XdsServer struct {
+ *Context
+ ID string
+ BaseURL string
+ APIURL string
+ PartialURL string
+ ConnRetry int
+ Connected bool
+ Disabled bool
+ ServerConfig *xdsServerConfig
+
+ // callbacks
+ CBOnError func(error)
+ CBOnDisconnect func(error)
+
+ // Private fields
+ client *common.HTTPClient
+ ioSock *sio_client.Client
+ logOut io.Writer
+ apiRouter *gin.RouterGroup
+}
+
+// xdsServerConfig Data return by GET /config
+type xdsServerConfig struct {
+ ID string `json:"id"`
+ Version string `json:"version"`
+ APIVersion string `json:"apiVersion"`
+ VersionGitTag string `json:"gitTag"`
+ SupportedSharing map[string]bool `json:"supportedSharing"`
+ Builder xdsBuilderConfig `json:"builder"`
+}
+
+// xdsBuilderConfig represents the builder container configuration
+type xdsBuilderConfig struct {
+ IP string `json:"ip"`
+ Port string `json:"port"`
+ SyncThingID string `json:"syncThingID"`
+}
+
+// FolderType XdsServer folder type
+type FolderType string
+
+const (
+ XdsTypePathMap = "PathMap"
+ XdsTypeCloudSync = "CloudSync"
+ XdsTypeCifsSmb = "CIFS"
+)
+
+// FolderConfig XdsServer folder config
+type FolderConfig struct {
+ ID string `json:"id"`
+ Label string `json:"label"`
+ ClientPath string `json:"path"`
+ Type FolderType `json:"type"`
+ Status string `json:"status"`
+ IsInSync bool `json:"isInSync"`
+ DefaultSdk string `json:"defaultSdk"`
+ // Specific data depending on which Type is used
+ DataPathMap PathMapConfig `json:"dataPathMap,omitempty"`
+ DataCloudSync CloudSyncConfig `json:"dataCloudSync,omitempty"`
+}
+
+// PathMapConfig Path mapping specific data
+type PathMapConfig struct {
+ ServerPath string `json:"serverPath"`
+}
+
+// CloudSyncConfig CloudSync (AKA Syncthing) specific data
+type CloudSyncConfig struct {
+ SyncThingID string `json:"syncThingID"`
+}
+
+const _IDTempoPrefix = "tempo-"
+
+// NewXdsServer creates an instance of XdsServer
+func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
+ return &XdsServer{
+ Context: ctx,
+ ID: _IDTempoPrefix + uuid.NewV1().String(),
+ BaseURL: conf.URL,
+ APIURL: conf.APIBaseURL + conf.APIPartialURL,
+ PartialURL: conf.APIPartialURL,
+ ConnRetry: conf.ConnRetry,
+ Connected: false,
+ Disabled: false,
+
+ logOut: ctx.Log.Out,
+ }
+}
+
+// Close Free and close XDS Server connection
+func (xs *XdsServer) Close() error {
+ xs.Connected = false
+ xs.Disabled = true
+ xs.ioSock = nil
+ xs._NotifyState()
+ return nil
+}
+
+// Connect Establish HTTP connection with XDS Server
+func (xs *XdsServer) Connect() error {
+ var err error
+ var retry int
+
+ xs.Disabled = false
+ xs.Connected = false
+
+ err = nil
+ for retry = xs.ConnRetry; retry > 0; retry-- {
+ if err = xs._CreateConnectHTTP(); err == nil {
+ break
+ }
+ if retry == xs.ConnRetry {
+ // Notify only on the first conn error
+ // doing that avoid 2 notifs (conn false; conn true) on startup
+ xs._NotifyState()
+ }
+ xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
+ time.Sleep(time.Second)
+ }
+ if retry == 0 {
+ // FIXME SEB: re-use _reconnect to wait longer in background
+ return fmt.Errorf("Connection to XDS Server failure")
+ }
+ if err != nil {
+ return err
+ }
+
+ // Check HTTP connection and establish WS connection
+ err = xs._connect(false)
+
+ return err
+}
+
+// IsTempoID returns true when server as a temporary id
+func (xs *XdsServer) IsTempoID() bool {
+ return strings.HasPrefix(xs.ID, _IDTempoPrefix)
+}
+
+// SetLoggerOutput Set logger ou
+func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
+ xs.logOut = out
+}
+
+// FolderAdd Send POST request to add a folder
+func (xs *XdsServer) FolderAdd(prj *FolderConfig, res interface{}) error {
+ response, err := xs.HTTPPost("/folder", prj)
+ if err != nil {
+ return err
+ }
+ if response.StatusCode != 200 {
+ return fmt.Errorf("FolderAdd error status=%s", response.Status)
+ }
+ // Result is a FolderConfig that is equivalent to ProjectConfig
+ err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
+
+ return err
+}
+
+// FolderDelete Send DELETE request to delete a folder
+func (xs *XdsServer) FolderDelete(id string) error {
+ return xs.client.HTTPDelete("/folder/" + id)
+}
+
+// HTTPGet .
+func (xs *XdsServer) HTTPGet(url string, data interface{}) error {
+ var dd []byte
+ if err := xs.client.HTTPGet(url, &dd); err != nil {
+ return err
+ }
+ return json.Unmarshal(dd, &data)
+}
+
+// HTTPPost .
+func (xs *XdsServer) HTTPPost(url string, data interface{}) (*http.Response, error) {
+ body, err := json.Marshal(data)
+ if err != nil {
+ return nil, err
+ }
+ return xs.HTTPPostBody(url, string(body))
+}
+
+// HTTPPostBody .
+func (xs *XdsServer) HTTPPostBody(url string, body string) (*http.Response, error) {
+ return xs.client.HTTPPostWithRes(url, body)
+}
+
+// SetAPIRouterGroup .
+func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
+ xs.apiRouter = r
+}
+
+// PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
+func (xs *XdsServer) PassthroughGet(url string) {
+ if xs.apiRouter == nil {
+ xs.Log.Errorf("apiRouter not set !")
+ return
+ }
+
+ xs.apiRouter.GET(url, func(c *gin.Context) {
+ var data interface{}
+ if err := xs.HTTPGet(url, &data); err != nil {
+ if strings.Contains(err.Error(), "connection refused") {
+ xs.Connected = false
+ xs._NotifyState()
+ }
+ common.APIError(c, err.Error())
+ return
+ }
+
+ c.JSON(http.StatusOK, data)
+ })
+}
+
+// PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
+func (xs *XdsServer) PassthroughPost(url string) {
+ if xs.apiRouter == nil {
+ xs.Log.Errorf("apiRouter not set !")
+ return
+ }
+
+ xs.apiRouter.POST(url, func(c *gin.Context) {
+ bodyReq := []byte{}
+ n, err := c.Request.Body.Read(bodyReq)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+
+ response, err := xs.HTTPPostBody(url, string(bodyReq[:n]))
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
+ }
+ bodyRes, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ common.APIError(c, "Cannot read response body")
+ return
+ }
+ c.JSON(http.StatusOK, string(bodyRes))
+ })
+}
+
+// EventOn Register a callback on events reception
+func (xs *XdsServer) EventOn(message string, f interface{}) (err error) {
+ if xs.ioSock == nil {
+ return fmt.Errorf("Io.Socket not initialized")
+ }
+ // FIXME SEB: support chain / multiple listeners
+ /* sockEvents map[string][]*caller
+ xs.sockEventsLock.Lock()
+ xs.sockEvents[message] = append(xs.sockEvents[message], f)
+ xs.sockEventsLock.Unlock()
+ xs.ioSock.On(message, func(ev) {
+
+ })
+ */
+ return xs.ioSock.On(message, f)
+}
+
+// ProjectToFolder
+func (xs *XdsServer) ProjectToFolder(pPrj ProjectConfig) *FolderConfig {
+ stID := ""
+ if pPrj.Type == XdsTypeCloudSync {
+ stID, _ = xs.SThg.IDGet()
+ }
+ fPrj := FolderConfig{
+ ID: pPrj.ID,
+ Label: pPrj.Label,
+ ClientPath: pPrj.ClientPath,
+ Type: FolderType(pPrj.Type),
+ Status: pPrj.Status,
+ IsInSync: pPrj.IsInSync,
+ DefaultSdk: pPrj.DefaultSdk,
+ DataPathMap: PathMapConfig{
+ ServerPath: pPrj.ServerPath,
+ },
+ DataCloudSync: CloudSyncConfig{
+ SyncThingID: stID,
+ },
+ }
+ return &fPrj
+}
+
+// FolderToProject
+func (xs *XdsServer) FolderToProject(fPrj FolderConfig) ProjectConfig {
+ pPrj := ProjectConfig{
+ ID: fPrj.ID,
+ ServerID: xs.ID,
+ Label: fPrj.Label,
+ ClientPath: fPrj.ClientPath,
+ ServerPath: fPrj.DataPathMap.ServerPath,
+ Type: ProjectType(fPrj.Type),
+ Status: fPrj.Status,
+ IsInSync: fPrj.IsInSync,
+ DefaultSdk: fPrj.DefaultSdk,
+ }
+ return pPrj
+}
+
+/***
+** Private functions
+***/
+
+// Create HTTP client
+func (xs *XdsServer) _CreateConnectHTTP() error {
+ var err error
+ xs.client, err = common.HTTPNewClient(xs.BaseURL,
+ common.HTTPClientConfig{
+ URLPrefix: "/api/v1",
+ HeaderClientKeyName: "Xds-Sid",
+ CsrfDisable: true,
+ LogOut: xs.logOut,
+ LogPrefix: "XDSSERVER: ",
+ LogLevel: common.HTTPLogLevelWarning,
+ })
+
+ xs.client.SetLogLevel(xs.Log.Level.String())
+
+ if err != nil {
+ msg := ": " + err.Error()
+ if strings.Contains(err.Error(), "connection refused") {
+ msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
+ }
+ return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
+ }
+ if xs.client == nil {
+ return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
+ }
+
+ return nil
+}
+
+// Re-established connection
+func (xs *XdsServer) _reconnect() error {
+ err := xs._connect(true)
+ if err == nil {
+ // Reload projects list for this server
+ err = xs.projects.Init(xs)
+ }
+ return err
+}
+
+// Established HTTP and WS connection and retrieve XDSServer config
+func (xs *XdsServer) _connect(reConn bool) error {
+
+ xdsCfg := xdsServerConfig{}
+ if err := xs.HTTPGet("/config", &xdsCfg); err != nil {
+ xs.Connected = false
+ if !reConn {
+ xs._NotifyState()
+ }
+ return err
+ }
+
+ if reConn && xs.ID != xdsCfg.ID {
+ xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
+ }
+
+ // Update local XDS config
+ xs.ID = xdsCfg.ID
+ xs.ServerConfig = &xdsCfg
+
+ // Establish WS connection and register listen
+ if err := xs._SocketConnect(); err != nil {
+ xs.Connected = false
+ xs._NotifyState()
+ return err
+ }
+
+ xs.Connected = true
+ xs._NotifyState()
+ return nil
+}
+
+// Create WebSocket (io.socket) connection
+func (xs *XdsServer) _SocketConnect() error {
+
+ xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
+
+ opts := &sio_client.Options{
+ Transport: "websocket",
+ Header: make(map[string][]string),
+ }
+ opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
+
+ iosk, err := sio_client.NewClient(xs.BaseURL, opts)
+ if err != nil {
+ return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
+ }
+ xs.ioSock = iosk
+
+ // Register some listeners
+
+ iosk.On("error", func(err error) {
+ xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
+ if xs.CBOnError != nil {
+ xs.CBOnError(err)
+ }
+ })
+
+ iosk.On("disconnection", func(err error) {
+ xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
+ if xs.CBOnDisconnect != nil {
+ xs.CBOnDisconnect(err)
+ }
+ xs.Connected = false
+ xs._NotifyState()
+
+ // Try to reconnect during 15min (or at least while not disabled)
+ go func() {
+ count := 0
+ waitTime := 1
+ for !xs.Disabled && !xs.Connected {
+ count++
+ if count%60 == 0 {
+ waitTime *= 5
+ }
+ if waitTime > 15*60 {
+ xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
+ return
+ }
+ time.Sleep(time.Second * time.Duration(waitTime))
+ xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
+
+ xs._reconnect()
+ }
+ }()
+ })
+
+ // XXX - There is no connection event generated so, just consider that
+ // we are connected when NewClient return successfully
+ /* iosk.On("connection", func() { ... }) */
+ xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
+
+ return nil
+}
+
+// Send event to notify changes
+func (xs *XdsServer) _NotifyState() {
+
+ evSts := ServerCfg{
+ ID: xs.ID,
+ URL: xs.BaseURL,
+ APIURL: xs.APIURL,
+ PartialURL: xs.PartialURL,
+ ConnRetry: xs.ConnRetry,
+ Connected: xs.Connected,
+ }
+ if err := xs.events.Emit(EVTServerConfig, evSts); err != nil {
+ xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)
+ }
+}