summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSebastien Douheret <sebastien.douheret@iot.bzh>2017-05-27 23:10:33 +0200
committerSebastien Douheret <sebastien.douheret@iot.bzh>2017-05-27 23:10:33 +0200
commit330cffa06c3efea42a42ca8e908b8b24db6bba3f (patch)
tree84e444d3ab66a1b8334c9c94f853a384ae34de84
parent0805255ee01103c500f2aae8845850b3dae6a0b6 (diff)
Wait folder insync before sending exit event.
By default wait folder insync. Set ExitImmediate param to true to send exit event immedialty without waiting folder synchronization.
-rw-r--r--lib/apiv1/exec.go42
-rw-r--r--lib/apiv1/make.go27
-rw-r--r--lib/common/execPipeWs.go11
-rw-r--r--lib/model/folder.go10
-rw-r--r--lib/syncthing/st.go90
5 files changed, 165 insertions, 15 deletions
diff --git a/lib/apiv1/exec.go b/lib/apiv1/exec.go
index 895807d..675f6fb 100644
--- a/lib/apiv1/exec.go
+++ b/lib/apiv1/exec.go
@@ -12,13 +12,14 @@ import (
// ExecArgs JSON parameters of /exec command
type ExecArgs struct {
- ID string `json:"id" binding:"required"`
- SdkID string `json:"sdkid"` // sdk ID to use for setting env
- Cmd string `json:"cmd" binding:"required"`
- Args []string `json:"args"`
- Env []string `json:"env"`
- RPath string `json:"rpath"` // relative path into project
- CmdTimeout int `json:"timeout"` // command completion timeout in Second
+ ID string `json:"id" binding:"required"`
+ SdkID string `json:"sdkid"` // sdk ID to use for setting env
+ Cmd string `json:"cmd" binding:"required"`
+ Args []string `json:"args"`
+ Env []string `json:"env"`
+ RPath string `json:"rpath"` // relative path into project
+ ExitImmediate bool `json:"exitImmediate"` // when true, exit event sent immediately when command exited (IOW, don't wait file synchronization)
+ CmdTimeout int `json:"timeout"` // command completion timeout in Second
}
// ExecOutMsg Message send on each output (stdout+stderr) of executed command
@@ -122,7 +123,7 @@ func (s *APIService) execCmd(c *gin.Context) {
}
// Define callback for output
- eCB := func(sid string, id int, code int, err error) {
+ eCB := func(sid string, id int, code int, err error, data *map[string]interface{}) {
s.log.Debugf("Command [Cmd ID %d] exited: code %d, error: %v", id, code, err)
// IO socket can be nil when disconnected
@@ -132,6 +133,30 @@ func (s *APIService) execCmd(c *gin.Context) {
return
}
+ // Retrieve project ID and RootPath
+ prjID := (*data)["ID"].(string)
+ exitImm := (*data)["ExitImmediate"].(bool)
+
+ // XXX - workaround to be sure that Syncthing detected all changes
+ if err := s.mfolder.ForceSync(prjID); err != nil {
+ s.log.Errorf("Error while syncing folder %s: %v", prjID, err)
+ }
+ if !exitImm {
+ // Wait end of file sync
+ // FIXME pass as argument
+ tmo := 60
+ for t := tmo; t > 0; t-- {
+ s.log.Debugf("Wait file insync for %s (%d/%d)", prjID, t, tmo)
+ if sync, err := s.mfolder.IsFolderInSync(prjID); sync || err != nil {
+ if err != nil {
+ s.log.Errorf("ERROR IsFolderInSync (%s): %v", prjID, err)
+ }
+ break
+ }
+ time.Sleep(time.Second)
+ }
+ }
+
// FIXME replace by .BroadcastTo a room
e := (*so).Emit(ExecExitEvent, ExecExitMsg{
CmdID: strconv.Itoa(id),
@@ -164,6 +189,7 @@ func (s *APIService) execCmd(c *gin.Context) {
data := make(map[string]interface{})
data["ID"] = prj.ID
data["RootPath"] = prj.RootPath
+ data["ExitImmediate"] = args.ExitImmediate
err := common.ExecPipeWs(cmd, args.Env, sop, sess.ID, cmdID, execTmo, s.log, oCB, eCB, &data)
if err != nil {
diff --git a/lib/apiv1/make.go b/lib/apiv1/make.go
index 098e41c..d015d2b 100644
--- a/lib/apiv1/make.go
+++ b/lib/apiv1/make.go
@@ -2,6 +2,7 @@ package apiv1
import (
"net/http"
+ "strings"
"time"
@@ -95,7 +96,16 @@ func (s *APIService) buildMake(c *gin.Context) {
s.log.Infof("%s not emitted: WS closed - sid: %s - msg id:%d", MakeOutEvent, sid, id)
return
}
- s.log.Debugf("%s emitted - WS sid %s - id:%d", MakeOutEvent, sid, id)
+
+ // Retrieve project ID and RootPath
+ prjID := (*data)["ID"].(string)
+ prjRootPath := (*data)["RootPath"].(string)
+
+ // Cleanup any references to internal rootpath in stdout & stderr
+ stdout = strings.Replace(stdout, prjRootPath, "", -1)
+ stderr = strings.Replace(stderr, prjRootPath, "", -1)
+
+ s.log.Debugf("%s emitted - WS sid %s - id:%d - prjID:%s", MakeOutEvent, sid, id, prjID)
// FIXME replace by .BroadcastTo a room
err := (*so).Emit(MakeOutEvent, MakeOutMsg{
@@ -110,7 +120,7 @@ func (s *APIService) buildMake(c *gin.Context) {
}
// Define callback for output
- eCB := func(sid string, id int, code int, err error) {
+ eCB := func(sid string, id int, code int, err error, data *map[string]interface{}) {
s.log.Debugf("Command [Cmd ID %d] exited: code %d, error: %v", id, code, err)
// IO socket can be nil when disconnected
@@ -120,6 +130,14 @@ func (s *APIService) buildMake(c *gin.Context) {
return
}
+ // Retrieve project ID and RootPath
+ prjID := (*data)["ID"].(string)
+
+ // XXX - workaround to be sure that Syncthing detected all changes
+ if err := s.mfolder.ForceSync(prjID); err != nil {
+ s.log.Errorf("Error while syncing folder %s: %v", prjID, err)
+ }
+
// FIXME replace by .BroadcastTo a room
e := (*so).Emit(MakeExitEvent, MakeExitMsg{
CmdID: strconv.Itoa(id),
@@ -148,6 +166,11 @@ func (s *APIService) buildMake(c *gin.Context) {
}
s.log.Debugf("Execute [Cmd ID %d]: %v", cmdID, cmd)
+
+ data := make(map[string]interface{})
+ data["ID"] = prj.ID
+ data["RootPath"] = prj.RootPath
+
err := common.ExecPipeWs(cmd, args.Env, sop, sess.ID, cmdID, execTmo, s.log, oCB, eCB, nil)
if err != nil {
common.APIError(c, err.Error())
diff --git a/lib/common/execPipeWs.go b/lib/common/execPipeWs.go
index 4994d9d..9bb4517 100644
--- a/lib/common/execPipeWs.go
+++ b/lib/common/execPipeWs.go
@@ -18,7 +18,7 @@ import (
type EmitOutputCB func(sid string, cmdID int, stdout, stderr string, data *map[string]interface{})
// EmitExitCB is the function callback used to emit exit proc code
-type EmitExitCB func(sid string, cmdID int, code int, err error)
+type EmitExitCB func(sid string, cmdID int, code int, err error, data *map[string]interface{})
// Inspired by :
// https://github.com/gorilla/websocket/blob/master/examples/command/main.go
@@ -63,7 +63,7 @@ func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmd
go cmdPumpStdout(so, outr, stdoutDone, sid, cmdID, log, eoCB, data)
// Blocking function that poll input or wait for end of process
- cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB)
+ cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB, data)
// Some commands will exit when stdin is closed.
inw.Close()
@@ -94,7 +94,8 @@ func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmd
}
func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
- sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB) {
+ sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB,
+ data *map[string]interface{}) {
/* XXX - code to add to support stdin through WS
for {
_, message, err := so. ?? ReadMessage()
@@ -127,10 +128,10 @@ func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
// Wait cmd complete
select {
case dC := <-done:
- exitFuncCB(sid, cmdID, dC.status, dC.err)
+ exitFuncCB(sid, cmdID, dC.status, dC.err, data)
case <-time.After(time.Duration(tmo) * time.Second):
exitFuncCB(sid, cmdID, -99,
- fmt.Errorf("Exit Timeout for command ID %v", cmdID))
+ fmt.Errorf("Exit Timeout for command ID %v", cmdID), data)
}
}
diff --git a/lib/model/folder.go b/lib/model/folder.go
index e461f9c..fa94409 100644
--- a/lib/model/folder.go
+++ b/lib/model/folder.go
@@ -98,3 +98,13 @@ func (c *Folder) DeleteFolder(id string) (xdsconfig.FolderConfig, error) {
return fld, err
}
+
+// ForceSync Force the synchronization of a folder
+func (c *Folder) ForceSync(id string) error {
+ return c.SThg.FolderScan(id, "")
+}
+
+// IsFolderInSync Returns true when folder is in sync
+func (c *Folder) IsFolderInSync(id string) (bool, error) {
+ return c.SThg.IsFolderInSync(id)
+}
diff --git a/lib/syncthing/st.go b/lib/syncthing/st.go
index 957dd65..75bdf80 100644
--- a/lib/syncthing/st.go
+++ b/lib/syncthing/st.go
@@ -49,6 +49,42 @@ type ExitChan struct {
err error
}
+// ConfigInSync Check whether if Syncthing configuration is in sync
+type configInSync struct {
+ ConfigInSync bool `json:"configInSync"`
+}
+
+// FolderStatus Information about the current status of a folder.
+type FolderStatus struct {
+ GlobalFiles int `json:"globalFiles"`
+ GlobalDirectories int `json:"globalDirectories"`
+ GlobalSymlinks int `json:"globalSymlinks"`
+ GlobalDeleted int `json:"globalDeleted"`
+ GlobalBytes int64 `json:"globalBytes"`
+
+ LocalFiles int `json:"localFiles"`
+ LocalDirectories int `json:"localDirectories"`
+ LocalSymlinks int `json:"localSymlinks"`
+ LocalDeleted int `json:"localDeleted"`
+ LocalBytes int64 `json:"localBytes"`
+
+ NeedFiles int `json:"needFiles"`
+ NeedDirectories int `json:"needDirectories"`
+ NeedSymlinks int `json:"needSymlinks"`
+ NeedDeletes int `json:"needDeletes"`
+ NeedBytes int64 `json:"needBytes"`
+
+ InSyncFiles int `json:"inSyncFiles"`
+ InSyncBytes int64 `json:"inSyncBytes"`
+
+ State string `json:"state"`
+ StateChanged time.Time `json:"stateChanged"`
+
+ Sequence int64 `json:"sequence"`
+
+ IgnorePatterns bool `json:"ignorePatterns"`
+}
+
// NewSyncThing creates a new instance of Syncthing
func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing {
var url, apiKey, home, binDir string
@@ -309,3 +345,57 @@ func (s *SyncThing) ConfigSet(cfg config.Configuration) error {
}
return s.client.HTTPPost("system/config", string(body))
}
+
+// IsConfigInSync Returns true if configuration is in sync
+func (s *SyncThing) IsConfigInSync() (bool, error) {
+ var data []byte
+ var d configInSync
+ if err := s.client.HTTPGet("system/config/insync", &data); err != nil {
+ return false, err
+ }
+ if err := json.Unmarshal(data, &d); err != nil {
+ return false, err
+ }
+ return d.ConfigInSync, nil
+}
+
+// FolderStatus Returns all information about the current
+func (s *SyncThing) FolderStatus(folderID string) (*FolderStatus, error) {
+ var data []byte
+ var res FolderStatus
+ if folderID == "" {
+ return nil, fmt.Errorf("folderID not set")
+ }
+ if err := s.client.HTTPGet("db/status?folder="+folderID, &data); err != nil {
+ return nil, err
+ }
+ if err := json.Unmarshal(data, &res); err != nil {
+ return nil, err
+ }
+ return &res, nil
+}
+
+// IsFolderInSync Returns true when folder is in sync
+func (s *SyncThing) IsFolderInSync(folderID string) (bool, error) {
+ // FIXME better to detected FolderCompletion event (/rest/events)
+ // See https://docs.syncthing.net/dev/events.html
+ sts, err := s.FolderStatus(folderID)
+ if err != nil {
+ return false, err
+ }
+ return sts.NeedBytes == 0, nil
+}
+
+// FolderScan Request immediate folder scan.
+// Scan all folders if folderID param is empty
+func (s *SyncThing) FolderScan(folderID string, subpath string) error {
+ url := "db/scan"
+ if folderID != "" {
+ url += "?folder=" + folderID
+
+ if subpath != "" {
+ url += "&sub=" + subpath
+ }
+ }
+ return s.client.HTTPPost(url, "")
+}