diff options
Diffstat (limited to 'lib/syncthing')
-rw-r--r-- | lib/syncthing/st.go | 95 | ||||
-rw-r--r-- | lib/syncthing/stEvent.go | 265 | ||||
-rw-r--r-- | lib/syncthing/stfolder.go | 103 |
3 files changed, 437 insertions, 26 deletions
diff --git a/lib/syncthing/st.go b/lib/syncthing/st.go index 660738d..bc3b101 100644 --- a/lib/syncthing/st.go +++ b/lib/syncthing/st.go @@ -24,11 +24,14 @@ import ( // SyncThing . type SyncThing struct { - BaseURL string - APIKey string - Home string - STCmd *exec.Cmd - STICmd *exec.Cmd + BaseURL string + APIKey string + Home string + STCmd *exec.Cmd + STICmd *exec.Cmd + MyID string + Connected bool + Events *Events // Private fields binDir string @@ -37,6 +40,7 @@ type SyncThing struct { exitSTIChan chan ExitChan client *common.HTTPClient log *logrus.Logger + conf *xdsconfig.Config } // ExitChan Channel used for process exit @@ -45,6 +49,42 @@ type ExitChan struct { err error } +// ConfigInSync Check whether if Syncthing configuration is in sync +type configInSync struct { + ConfigInSync bool `json:"configInSync"` +} + +// FolderStatus Information about the current status of a folder. +type FolderStatus struct { + GlobalFiles int `json:"globalFiles"` + GlobalDirectories int `json:"globalDirectories"` + GlobalSymlinks int `json:"globalSymlinks"` + GlobalDeleted int `json:"globalDeleted"` + GlobalBytes int64 `json:"globalBytes"` + + LocalFiles int `json:"localFiles"` + LocalDirectories int `json:"localDirectories"` + LocalSymlinks int `json:"localSymlinks"` + LocalDeleted int `json:"localDeleted"` + LocalBytes int64 `json:"localBytes"` + + NeedFiles int `json:"needFiles"` + NeedDirectories int `json:"needDirectories"` + NeedSymlinks int `json:"needSymlinks"` + NeedDeletes int `json:"needDeletes"` + NeedBytes int64 `json:"needBytes"` + + InSyncFiles int `json:"inSyncFiles"` + InSyncBytes int64 `json:"inSyncBytes"` + + State string `json:"state"` + StateChanged time.Time `json:"stateChanged"` + + Sequence int64 `json:"sequence"` + + IgnorePatterns bool `json:"ignorePatterns"` +} + // NewSyncThing creates a new instance of Syncthing func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing { var url, apiKey, home, binDir string @@ -75,8 +115,12 @@ func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing { binDir: binDir, logsDir: conf.FileConf.LogsDir, log: log, + conf: conf, } + // Create Events monitoring + // SEB TO TEST s.Events = s.NewEventListener() + return &s } @@ -182,6 +226,8 @@ func (s *SyncThing) Start() (*exec.Cmd, error) { "STNOUPGRADE=1", } + /* SEB STILL NEEDED, if not SUP code + // XXX - temporary hack because -gui-apikey seems to correctly handle by // syncthing the early first time stConfigFile := filepath.Join(s.Home, "config.xml") @@ -211,12 +257,12 @@ func (s *SyncThing) Start() (*exec.Cmd, error) { return nil, fmt.Errorf("Cannot write Syncthing config file to set apikey") } } - + */ s.STCmd, err = s.startProc("syncthing", args, env, &s.exitSTChan) // Use autogenerated apikey if not set by config.json - if s.APIKey == "" { - if fd, err := os.Open(stConfigFile); err == nil { + if err == nil && s.APIKey == "" { + if fd, err := os.Open(filepath.Join(s.Home, "config.xml")); err == nil { defer fd.Close() if b, err := ioutil.ReadAll(fd); err == nil { re := regexp.MustCompile("<apikey>(.*)</apikey>") @@ -294,11 +340,17 @@ func (s *SyncThing) StopInotify() { // Connect Establish HTTP connection with Syncthing func (s *SyncThing) Connect() error { var err error + s.Connected = false s.client, err = common.HTTPNewClient(s.BaseURL, common.HTTPClientConfig{ URLPrefix: "/rest", HeaderClientKeyName: "X-Syncthing-ID", + LogOut: s.conf.LogVerboseOut, + LogPrefix: "SYNCTHING: ", + LogLevel: common.HTTPLogLevelWarning, }) + s.client.SetLogLevel(s.log.Level.String()) + if err != nil { msg := ": " + err.Error() if strings.Contains(err.Error(), "connection refused") { @@ -310,11 +362,17 @@ func (s *SyncThing) Connect() error { return fmt.Errorf("ERROR: cannot connect to Syncthing (null client)") } - s.client.SetLogLevel(s.log.Level.String()) - s.client.LoggerPrefix = "SYNCTHING: " - s.client.LoggerOut = s.log.Out + s.MyID, err = s.IDGet() + if err != nil { + return fmt.Errorf("ERROR: cannot retrieve ID") + } + + s.Connected = true - return nil + // Start events monitoring + //SEB TODO err = s.Events.Start() + + return err } // IDGet returns the Syncthing ID of Syncthing instance running locally @@ -347,3 +405,16 @@ func (s *SyncThing) ConfigSet(cfg config.Configuration) error { } return s.client.HTTPPost("system/config", string(body)) } + +// IsConfigInSync Returns true if configuration is in sync +func (s *SyncThing) IsConfigInSync() (bool, error) { + var data []byte + var d configInSync + if err := s.client.HTTPGet("system/config/insync", &data); err != nil { + return false, err + } + if err := json.Unmarshal(data, &d); err != nil { + return false, err + } + return d.ConfigInSync, nil +} diff --git a/lib/syncthing/stEvent.go b/lib/syncthing/stEvent.go new file mode 100644 index 0000000..9ca8b78 --- /dev/null +++ b/lib/syncthing/stEvent.go @@ -0,0 +1,265 @@ +package st + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + "time" + + "github.com/Sirupsen/logrus" +) + +// Events . +type Events struct { + MonitorTime time.Duration + Debug bool + + stop chan bool + st *SyncThing + log *logrus.Logger + cbArr map[string][]cbMap +} + +type Event struct { + Type string `json:"type"` + Time time.Time `json:"time"` + Data map[string]string `json:"data"` +} + +type EventsCBData map[string]interface{} +type EventsCB func(ev Event, cbData *EventsCBData) + +const ( + EventFolderCompletion string = "FolderCompletion" + EventFolderSummary string = "FolderSummary" + EventFolderPaused string = "FolderPaused" + EventFolderResumed string = "FolderResumed" + EventFolderErrors string = "FolderErrors" + EventStateChanged string = "StateChanged" +) + +var EventsAll string = EventFolderCompletion + "|" + + EventFolderSummary + "|" + + EventFolderPaused + "|" + + EventFolderResumed + "|" + + EventFolderErrors + "|" + + EventStateChanged + +type STEvent struct { + // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API + SubscriptionID int `json:"id"` + // Global ID of the event across all subscriptions + GlobalID int `json:"globalID"` + Time time.Time `json:"time"` + Type string `json:"type"` + Data map[string]interface{} `json:"data"` +} + +type cbMap struct { + id int + cb EventsCB + filterID string + data *EventsCBData +} + +// NewEventListener Create a new instance of Event listener +func (s *SyncThing) NewEventListener() *Events { + _, dbg := os.LookupEnv("XDS_DEBUG_STEVENTS") // set to add more debug log + return &Events{ + MonitorTime: 100, // in Milliseconds + Debug: dbg, + stop: make(chan bool, 1), + st: s, + log: s.log, + cbArr: make(map[string][]cbMap), + } +} + +// Start starts event monitoring loop +func (e *Events) Start() error { + go e.monitorLoop() + return nil +} + +// Stop stops event monitoring loop +func (e *Events) Stop() { + e.stop <- true +} + +// Register Add a listener on an event +func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) { + if evName == "" || !strings.Contains(EventsAll, evName) { + return -1, fmt.Errorf("Unknown event name") + } + if data == nil { + data = &EventsCBData{} + } + + cbList := []cbMap{} + if _, ok := e.cbArr[evName]; ok { + cbList = e.cbArr[evName] + } + + id := len(cbList) + (*data)["id"] = strconv.Itoa(id) + + e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data}) + + return id, nil +} + +// UnRegister Remove a listener event +func (e *Events) UnRegister(evName string, id int) error { + cbKey, ok := e.cbArr[evName] + if !ok { + return fmt.Errorf("No event registered to such name") + } + + // FIXME - NOT TESTED + if id >= len(cbKey) { + return fmt.Errorf("Invalid id") + } else if id == len(cbKey) { + e.cbArr[evName] = cbKey[:id-1] + } else { + e.cbArr[evName] = cbKey[id : id+1] + } + + return nil +} + +// GetEvents returns the Syncthing events +func (e *Events) getEvents(since int) ([]STEvent, error) { + var data []byte + ev := []STEvent{} + url := "events" + if since != -1 { + url += "?since=" + strconv.Itoa(since) + } + if err := e.st.client.HTTPGet(url, &data); err != nil { + return ev, err + } + err := json.Unmarshal(data, &ev) + return ev, err +} + +// Loop to monitor Syncthing events +func (e *Events) monitorLoop() { + e.log.Infof("Event monitoring running...") + since := 0 + cntErrConn := 0 + cntErrRetry := 1 + for { + select { + case <-e.stop: + e.log.Infof("Event monitoring exited") + return + + case <-time.After(e.MonitorTime * time.Millisecond): + + if !e.st.Connected { + cntErrConn++ + time.Sleep(time.Second) + if cntErrConn > cntErrRetry { + e.log.Error("ST Event monitor: ST connection down") + cntErrConn = 0 + cntErrRetry *= 2 + if _, err := e.getEvents(since); err == nil { + e.st.Connected = true + cntErrRetry = 1 + // XXX - should we reset since value ? + goto readEvent + } + } + continue + } + + readEvent: + stEvArr, err := e.getEvents(since) + if err != nil { + e.log.Errorf("Syncthing Get Events: %v", err) + e.st.Connected = false + continue + } + + // Process events + for _, stEv := range stEvArr { + since = stEv.SubscriptionID + if e.Debug { + e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv) + } + + cbKey, ok := e.cbArr[stEv.Type] + if !ok { + continue + } + + evData := Event{ + Type: stEv.Type, + Time: stEv.Time, + } + + // Decode Events + // FIXME: re-define data struct for each events + // instead of map of string and use JSON marshing/unmarshing + fID := "" + evData.Data = make(map[string]string) + switch stEv.Type { + + case EventFolderCompletion: + fID = convString(stEv.Data["folder"]) + evData.Data["completion"] = convFloat64(stEv.Data["completion"]) + + case EventFolderSummary: + fID = convString(stEv.Data["folder"]) + evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"]) + evData.Data["state"] = convString(stEv.Data["state"]) + + case EventFolderPaused, EventFolderResumed: + fID = convString(stEv.Data["id"]) + evData.Data["label"] = convString(stEv.Data["label"]) + + case EventFolderErrors: + fID = convString(stEv.Data["folder"]) + // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"]) + + case EventStateChanged: + fID = convString(stEv.Data["folder"]) + evData.Data["from"] = convString(stEv.Data["from"]) + evData.Data["to"] = convString(stEv.Data["to"]) + + default: + e.log.Warnf("Unsupported event type") + } + + if fID != "" { + evData.Data["id"] = fID + } + + // Call all registered callbacks + for _, c := range cbKey { + if e.Debug { + e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID) + } + // Call when filterID is not set or when it matches + if c.filterID == "" || (fID != "" && fID == c.filterID) { + c.cb(evData, c.data) + } + } + } + } + } +} + +func convString(d interface{}) string { + return d.(string) +} + +func convFloat64(d interface{}) string { + return strconv.FormatFloat(d.(float64), 'f', -1, 64) +} + +func convInt64(d interface{}) string { + return strconv.FormatInt(d.(int64), 10) +} diff --git a/lib/syncthing/stfolder.go b/lib/syncthing/stfolder.go index d79e579..a5312eb 100644 --- a/lib/syncthing/stfolder.go +++ b/lib/syncthing/stfolder.go @@ -1,10 +1,12 @@ package st import ( - "path/filepath" + "encoding/json" + "fmt" "strings" - "github.com/syncthing/syncthing/lib/config" + common "github.com/iotbzh/xds-common/golib" + stconfig "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/protocol" ) @@ -18,23 +20,23 @@ type FolderChangeArg struct { } // FolderChange is called when configuration has changed -func (s *SyncThing) FolderChange(f FolderChangeArg) error { +func (s *SyncThing) FolderChange(f FolderChangeArg) (string, error) { // Get current config stCfg, err := s.ConfigGet() if err != nil { s.log.Errorln(err) - return err + return "", err } // Add new Device if needed var devID protocol.DeviceID if err := devID.UnmarshalText([]byte(f.SyncThingID)); err != nil { - s.log.Errorf("not a valid device id (err %v)\n", err) - return err + s.log.Errorf("not a valid device id (err %v)", err) + return "", err } - newDevice := config.DeviceConfiguration{ + newDevice := stconfig.DeviceConfiguration{ DeviceID: devID, Name: f.SyncThingID, Addresses: []string{"dynamic"}, @@ -60,18 +62,33 @@ func (s *SyncThing) FolderChange(f FolderChangeArg) error { id = f.SyncThingID[0:15] + "_" + label } - folder := config.FolderConfiguration{ - ID: id, - Label: label, - RawPath: filepath.Join(f.ShareRootDir, f.RelativePath), + // Resolve local path + pathCli, err := common.ResolveEnvVar(f.RelativePath) + if err != nil { + pathCli = f.RelativePath + } + // SEB still need ShareRootDir ? a sup + // pathCli := filepath.Join(f.ShareRootDir, f.RelativePath) + + folder := stconfig.FolderConfiguration{ + ID: id, + Label: label, + RawPath: pathCli, + AutoNormalize: true, } - folder.Devices = append(folder.Devices, config.FolderDeviceConfiguration{ + /* TODO - add it ? + if s.conf.FileConf.SThgConf.RescanIntervalS > 0 { + folder.RescanIntervalS = s.conf.FileConf.SThgConf.RescanIntervalS + } + */ + + folder.Devices = append(folder.Devices, stconfig.FolderDeviceConfiguration{ DeviceID: newDevice.DeviceID, }) found = false - var fld config.FolderConfiguration + var fld stconfig.FolderConfiguration for _, fld = range stCfg.Folders { if folder.ID == fld.ID { fld = folder @@ -89,7 +106,7 @@ func (s *SyncThing) FolderChange(f FolderChangeArg) error { s.log.Errorln(err) } - return nil + return id, nil } // FolderDelete is called to delete a folder config @@ -114,3 +131,61 @@ func (s *SyncThing) FolderDelete(id string) error { return nil } + +// FolderConfigGet Returns the configuration of a specific folder +func (s *SyncThing) FolderConfigGet(folderID string) (stconfig.FolderConfiguration, error) { + fc := stconfig.FolderConfiguration{} + if folderID == "" { + return fc, fmt.Errorf("folderID not set") + } + cfg, err := s.ConfigGet() + if err != nil { + return fc, err + } + for _, f := range cfg.Folders { + if f.ID == folderID { + fc = f + return fc, nil + } + } + return fc, fmt.Errorf("id not found") +} + +// FolderStatus Returns all information about the current +func (s *SyncThing) FolderStatus(folderID string) (*FolderStatus, error) { + var data []byte + var res FolderStatus + if folderID == "" { + return nil, fmt.Errorf("folderID not set") + } + if err := s.client.HTTPGet("db/status?folder="+folderID, &data); err != nil { + return nil, err + } + if err := json.Unmarshal(data, &res); err != nil { + return nil, err + } + return &res, nil +} + +// IsFolderInSync Returns true when folder is in sync +func (s *SyncThing) IsFolderInSync(folderID string) (bool, error) { + sts, err := s.FolderStatus(folderID) + if err != nil { + return false, err + } + return sts.NeedBytes == 0 && sts.State == "idle", nil +} + +// FolderScan Request immediate folder scan. +// Scan all folders if folderID param is empty +func (s *SyncThing) FolderScan(folderID string, subpath string) error { + url := "db/scan" + if folderID != "" { + url += "?folder=" + folderID + + if subpath != "" { + url += "&sub=" + subpath + } + } + return s.client.HTTPPost(url, "") +} |