From 8f44cc7217ce48f3f94c8ea3f037cdf011c4493b Mon Sep 17 00:00:00 2001 From: Sebastien Douheret Date: Fri, 18 Aug 2017 01:04:02 +0200 Subject: Add folder synchronization status. Also add ability to force re-synchronization. --- lib/syncthing/folder-st.go | 83 +++++++++++++++- lib/syncthing/st.go | 10 ++ lib/syncthing/stEvent.go | 242 +++++++++++++++++++++++++++++++++++++++++++++ lib/syncthing/stfolder.go | 4 +- 4 files changed, 331 insertions(+), 8 deletions(-) create mode 100644 lib/syncthing/stEvent.go (limited to 'lib/syncthing') diff --git a/lib/syncthing/folder-st.go b/lib/syncthing/folder-st.go index ffcd284..da27062 100644 --- a/lib/syncthing/folder-st.go +++ b/lib/syncthing/folder-st.go @@ -6,6 +6,7 @@ import ( "github.com/iotbzh/xds-server/lib/folder" "github.com/iotbzh/xds-server/lib/xdsconfig" + uuid "github.com/satori/go.uuid" "github.com/syncthing/syncthing/lib/config" ) @@ -13,10 +14,13 @@ import ( // STFolder . type STFolder struct { - globalConfig *xdsconfig.Config - st *SyncThing - fConfig folder.FolderConfig - stfConfig config.FolderConfiguration + globalConfig *xdsconfig.Config + st *SyncThing + fConfig folder.FolderConfig + stfConfig config.FolderConfiguration + eventIDs []int + eventChangeCB *folder.EventCB + eventChangeCBData *folder.EventCBData } // NewFolderST Create a new instance of STFolder @@ -27,6 +31,15 @@ func (s *SyncThing) NewFolderST(gc *xdsconfig.Config) *STFolder { } } +// NewUID Get a UUID +func (f *STFolder) NewUID(suffix string) string { + i := len(f.st.MyID) + if i > 15 { + i = 15 + } + return uuid.NewV1().String()[:14] + f.st.MyID[:i] + "_" + suffix +} + // Add a new folder func (f *STFolder) Add(cfg folder.FolderConfig) (*folder.FolderConfig, error) { @@ -59,6 +72,16 @@ func (f *STFolder) Add(cfg folder.FolderConfig) (*folder.FolderConfig, error) { return nil, err } + // Register to events to update folder status + for _, evName := range []string{EventStateChanged, EventFolderPaused} { + evID, err := f.st.Events.Register(evName, f.cbEventState, id, nil) + if err != nil { + return nil, err + } + f.eventIDs = append(f.eventIDs, evID) + } + + f.fConfig.IsInSync = false // will be updated later by events f.fConfig.Status = folder.StatusEnable } @@ -86,6 +109,20 @@ func (f *STFolder) Remove() error { return f.st.FolderDelete(f.stfConfig.ID) } +// RegisterEventChange requests registration for folder event change +func (f *STFolder) RegisterEventChange(cb *folder.EventCB, data *folder.EventCBData) error { + f.eventChangeCB = cb + f.eventChangeCBData = data + return nil +} + +// UnRegisterEventChange remove registered callback +func (f *STFolder) UnRegisterEventChange() error { + f.eventChangeCB = nil + f.eventChangeCBData = nil + return nil +} + // Sync Force folder files synchronization func (f *STFolder) Sync() error { return f.st.FolderScan(f.stfConfig.ID, "") @@ -93,5 +130,41 @@ func (f *STFolder) Sync() error { // IsInSync Check if folder files are in-sync func (f *STFolder) IsInSync() (bool, error) { - return f.st.IsFolderInSync(f.stfConfig.ID) + sts, err := f.st.IsFolderInSync(f.stfConfig.ID) + if err != nil { + return false, err + } + f.fConfig.IsInSync = sts + return sts, nil +} + +// callback use to update IsInSync status +func (f *STFolder) cbEventState(ev Event, data *EventsCBData) { + prevSync := f.fConfig.IsInSync + prevStatus := f.fConfig.Status + + switch ev.Type { + + case EventStateChanged: + to := ev.Data["to"] + switch to { + case "scanning", "syncing": + f.fConfig.Status = folder.StatusSyncing + case "idle": + f.fConfig.Status = folder.StatusEnable + } + f.fConfig.IsInSync = (to == "idle") + + case EventFolderPaused: + if f.fConfig.Status == folder.StatusEnable { + f.fConfig.Status = folder.StatusPause + } + f.fConfig.IsInSync = false + } + + if f.eventChangeCB != nil && + (prevSync != f.fConfig.IsInSync || prevStatus != f.fConfig.Status) { + cpConf := f.fConfig + (*f.eventChangeCB)(&cpConf, f.eventChangeCBData) + } } diff --git a/lib/syncthing/st.go b/lib/syncthing/st.go index 9bdb48f..10210a4 100644 --- a/lib/syncthing/st.go +++ b/lib/syncthing/st.go @@ -42,6 +42,7 @@ type SyncThing struct { conf *xdsconfig.Config client *common.HTTPClient log *logrus.Logger + Events *Events } // ExitChan Channel used for process exit @@ -126,6 +127,9 @@ func NewSyncThing(conf *xdsconfig.Config, log *logrus.Logger) *SyncThing { conf: conf, } + // Create Events monitoring + s.Events = s.NewEventListener() + return &s } @@ -316,6 +320,12 @@ func (s *SyncThing) Connect() error { s.client.SetLogger(s.log) s.MyID, err = s.IDGet() + if err != nil { + return fmt.Errorf("ERROR: cannot retrieve ID") + } + + // Start events monitoring + err = s.Events.Start() return err } diff --git a/lib/syncthing/stEvent.go b/lib/syncthing/stEvent.go new file mode 100644 index 0000000..bf2a809 --- /dev/null +++ b/lib/syncthing/stEvent.go @@ -0,0 +1,242 @@ +package st + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + "time" + + "github.com/Sirupsen/logrus" +) + +// Events . +type Events struct { + MonitorTime time.Duration + Debug bool + + stop chan bool + st *SyncThing + log *logrus.Logger + cbArr map[string][]cbMap +} + +type Event struct { + Type string `json:"type"` + Time time.Time `json:"time"` + Data map[string]string `json:"data"` +} + +type EventsCBData map[string]interface{} +type EventsCB func(ev Event, cbData *EventsCBData) + +const ( + EventFolderCompletion string = "FolderCompletion" + EventFolderSummary string = "FolderSummary" + EventFolderPaused string = "FolderPaused" + EventFolderResumed string = "FolderResumed" + EventFolderErrors string = "FolderErrors" + EventStateChanged string = "StateChanged" +) + +var EventsAll string = EventFolderCompletion + "|" + + EventFolderSummary + "|" + + EventFolderPaused + "|" + + EventFolderResumed + "|" + + EventFolderErrors + "|" + + EventStateChanged + +type STEvent struct { + // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API + SubscriptionID int `json:"id"` + // Global ID of the event across all subscriptions + GlobalID int `json:"globalID"` + Time time.Time `json:"time"` + Type string `json:"type"` + Data map[string]interface{} `json:"data"` +} + +type cbMap struct { + id int + cb EventsCB + filterID string + data *EventsCBData +} + +// NewEventListener Create a new instance of Event listener +func (s *SyncThing) NewEventListener() *Events { + _, dbg := os.LookupEnv("XDS_DEBUG_STEVENTS") // set to add more debug log + return &Events{ + MonitorTime: 100, // in Milliseconds + Debug: dbg, + stop: make(chan bool, 1), + st: s, + log: s.log, + cbArr: make(map[string][]cbMap), + } +} + +// Start starts event monitoring loop +func (e *Events) Start() error { + go e.monitorLoop() + return nil +} + +// Stop stops event monitoring loop +func (e *Events) Stop() { + e.stop <- true +} + +// Register Add a listener on an event +func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) { + if evName == "" || !strings.Contains(EventsAll, evName) { + return -1, fmt.Errorf("Unknown event name") + } + if data == nil { + data = &EventsCBData{} + } + + cbList := []cbMap{} + if _, ok := e.cbArr[evName]; ok { + cbList = e.cbArr[evName] + } + + id := len(cbList) + (*data)["id"] = strconv.Itoa(id) + + e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data}) + + return id, nil +} + +// UnRegister Remove a listener event +func (e *Events) UnRegister(evName string, id int) error { + cbKey, ok := e.cbArr[evName] + if !ok { + return fmt.Errorf("No event registered to such name") + } + + // FIXME - NOT TESTED + if id >= len(cbKey) { + return fmt.Errorf("Invalid id") + } else if id == len(cbKey) { + e.cbArr[evName] = cbKey[:id-1] + } else { + e.cbArr[evName] = cbKey[id : id+1] + } + + return nil +} + +// GetEvents returns the Syncthing events +func (e *Events) getEvents(since int) ([]STEvent, error) { + var data []byte + ev := []STEvent{} + url := "events" + if since != -1 { + url += "?since=" + strconv.Itoa(since) + } + if err := e.st.client.HTTPGet(url, &data); err != nil { + return ev, err + } + err := json.Unmarshal(data, &ev) + return ev, err +} + +// Loop to monitor Syncthing events +func (e *Events) monitorLoop() { + e.log.Infof("Event monitoring running...") + since := 0 + for { + select { + case <-e.stop: + e.log.Infof("Event monitoring exited") + return + + case <-time.After(e.MonitorTime * time.Millisecond): + stEvArr, err := e.getEvents(since) + if err != nil { + e.log.Errorf("Syncthing Get Events: %v", err) + continue + } + // Process events + for _, stEv := range stEvArr { + since = stEv.SubscriptionID + if e.Debug { + e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv) + } + + cbKey, ok := e.cbArr[stEv.Type] + if !ok { + continue + } + + evData := Event{ + Type: stEv.Type, + Time: stEv.Time, + } + + // Decode Events + // FIXME: re-define data struct for each events + // instead of map of string and use JSON marshing/unmarshing + fID := "" + evData.Data = make(map[string]string) + switch stEv.Type { + + case EventFolderCompletion: + fID = convString(stEv.Data["folder"]) + evData.Data["completion"] = convFloat64(stEv.Data["completion"]) + + case EventFolderSummary: + fID = convString(stEv.Data["folder"]) + evData.Data["needBytes"] = convInt64(stEv.Data["needBytes"]) + evData.Data["state"] = convString(stEv.Data["state"]) + + case EventFolderPaused, EventFolderResumed: + fID = convString(stEv.Data["id"]) + evData.Data["label"] = convString(stEv.Data["label"]) + + case EventFolderErrors: + fID = convString(stEv.Data["folder"]) + // TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"]) + + case EventStateChanged: + fID = convString(stEv.Data["folder"]) + evData.Data["from"] = convString(stEv.Data["from"]) + evData.Data["to"] = convString(stEv.Data["to"]) + + default: + e.log.Warnf("Unsupported event type") + } + + if fID != "" { + evData.Data["id"] = fID + } + + // Call all registered callbacks + for _, c := range cbKey { + if e.Debug { + e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID) + } + // Call when filterID is not set or when it matches + if c.filterID == "" || (fID != "" && fID == c.filterID) { + c.cb(evData, c.data) + } + } + } + } + } +} + +func convString(d interface{}) string { + return d.(string) +} + +func convFloat64(d interface{}) string { + return strconv.FormatFloat(d.(float64), 'f', -1, 64) +} + +func convInt64(d interface{}) string { + return strconv.FormatInt(d.(int64), 10) +} diff --git a/lib/syncthing/stfolder.go b/lib/syncthing/stfolder.go index bbdcc43..70ac70a 100644 --- a/lib/syncthing/stfolder.go +++ b/lib/syncthing/stfolder.go @@ -191,13 +191,11 @@ func (s *SyncThing) FolderStatus(folderID string) (*FolderStatus, error) { // IsFolderInSync Returns true when folder is in sync func (s *SyncThing) IsFolderInSync(folderID string) (bool, error) { - // FIXME better to detected FolderCompletion event (/rest/events) - // See https://docs.syncthing.net/dev/events.html sts, err := s.FolderStatus(folderID) if err != nil { return false, err } - return sts.NeedBytes == 0, nil + return sts.NeedBytes == 0 && sts.State == "idle", nil } // FolderScan Request immediate folder scan. -- cgit 1.2.3-korg