From 89563054ea6305c7270dcc6ab6fa5b286eb5f742 Mon Sep 17 00:00:00 2001 From: Sebastien Douheret Date: Fri, 15 Dec 2017 22:48:35 +0100 Subject: Fixed xds-server folder events detection. Signed-off-by: Sebastien Douheret --- lib/agent/agent.go | 6 +++--- lib/agent/project-st.go | 31 +++++++++++++++++++++++-------- lib/agent/projects.go | 12 +++++------- lib/agent/xdsserver.go | 17 +---------------- lib/xaapiv1/events.go | 2 +- 5 files changed, 33 insertions(+), 35 deletions(-) (limited to 'lib') diff --git a/lib/agent/agent.go b/lib/agent/agent.go index a562e77..989c9a0 100644 --- a/lib/agent/agent.go +++ b/lib/agent/agent.go @@ -132,6 +132,9 @@ func (ctx *Context) Run() (int, error) { ctx._logPrint("Logging file for HTTP requests: %s\n", logFileHTTPReq) } + // Create events management + ctx.events = NewEvents(ctx) + // Create syncthing instance when section "syncthing" is present in agent-config.json if ctx.Config.FileConf.SThgConf != nil { ctx.SThg = st.NewSyncThing(ctx.Config, ctx.Log) @@ -186,9 +189,6 @@ func (ctx *Context) Run() (int, error) { // Sessions manager ctx.sessions = NewClientSessions(ctx, cookieMaxAge) - // Create events management - ctx.events = NewEvents(ctx) - // Create projects management ctx.projects = NewProjects(ctx, ctx.SThg) diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go index a68bd19..8ac5f36 100644 --- a/lib/agent/project-st.go +++ b/lib/agent/project-st.go @@ -18,6 +18,7 @@ package agent import ( + "encoding/json" "fmt" st "github.com/iotbzh/xds-agent/lib/syncthing" @@ -105,8 +106,8 @@ func (p *STProject) Setup(prj xaapiv1.ProjectConfig) (*xaapiv1.ProjectConfig, er // Register events to update folder status // Register to XDS Server events - p.server.EventOn("event:folder-state-change", "", p._cbServerFolderChanged) - if err := p.server.EventRegister("folder-state-change", svrPrj.ID); err != nil { + p.server.EventOn(xsapiv1.EVTFolderStateChange, "", p._cbServerFolderChanged) + if err := p.server.EventRegister(xsapiv1.EVTFolderStateChange, svrPrj.ID); err != nil { p.Log.Warningf("XDS Server EventRegister failed: %v", err) return svrPrj, err } @@ -164,18 +165,32 @@ func (p *STProject) IsInSync() (bool, error) { // callback use to update (XDS Server) folder IsInSync status func (p *STProject) _cbServerFolderChanged(pData interface{}, data interface{}) error { - evt := data.(xsapiv1.EventMsg) + evt := xsapiv1.EventMsg{} + d, err := json.Marshal(data) + if err != nil { + p.Log.Errorf("Cannot marshal XDS Server event folder-change err=%v", err) + return err + } + if err = json.Unmarshal(d, &evt); err != nil { + p.Log.Errorf("Cannot unmarshal XDS Server event folder-change err=%v", err) + return err + } + + fld, err := evt.DecodeFolderConfig() + if err != nil { + p.Log.Errorf("Cannot decode FolderChanged event: %v", data) + } // Only process event that concerns this project/folder ID - if p.folder.ID != evt.Folder.ID { + if p.folder.ID != fld.ID { return nil } - if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync || - evt.Folder.Status != p.folder.DataCloudSync.STSvrStatus { + if fld.IsInSync != p.folder.DataCloudSync.STSvrIsInSync || + fld.Status != p.folder.DataCloudSync.STSvrStatus { - p.folder.DataCloudSync.STSvrIsInSync = evt.Folder.IsInSync - p.folder.DataCloudSync.STSvrStatus = evt.Folder.Status + p.folder.DataCloudSync.STSvrIsInSync = fld.IsInSync + p.folder.DataCloudSync.STSvrStatus = fld.Status if err := p.events.Emit(xaapiv1.EVTProjectChange, p.server.FolderToProject(*p.folder), ""); err != nil { p.Log.Warningf("Cannot notify project change (from server): %v", err) diff --git a/lib/agent/projects.go b/lib/agent/projects.go index a2d8fe1..d6268fa 100644 --- a/lib/agent/projects.go +++ b/lib/agent/projects.go @@ -246,14 +246,12 @@ func (p *Projects) createUpdate(newF xaapiv1.ProjectConfig, create bool, initial // Add to folders list p.projects[newPrj.ID] = &fld - // Force sync after creation + // Force sync to get an initial sync status // (need to defer to be sure that WS events will arrive after HTTP creation reply) - if create { - go func() { - time.Sleep(time.Millisecond * 500) - fld.Sync() - }() - } + go func() { + time.Sleep(time.Millisecond * 500) + fld.Sync() + }() return newPrj, nil } diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 7020ef0..32656cf 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -307,10 +307,7 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu // Register listener only the first time evn := evName - // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg - var err error - if evName == "event:folder-state-change" { - err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error { + err := xs.ioSock.On(evn, func(data interface{}) error { xs.sockEventsLock.Lock() sEvts := make([]*caller, len(xs.sockEvents[evn])) copy(sEvts, xs.sockEvents[evn]) @@ -320,18 +317,6 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu } return nil }) - } else { - err = xs.ioSock.On(evn, func(data interface{}) error { - xs.sockEventsLock.Lock() - sEvts := make([]*caller, len(xs.sockEvents[evn])) - copy(sEvts, xs.sockEvents[evn]) - xs.sockEventsLock.Unlock() - for _, c := range sEvts { - c.Func(c.PrivateData, data) - } - return nil - }) - } if err != nil { return uuid.Nil, err } diff --git a/lib/xaapiv1/events.go b/lib/xaapiv1/events.go index 85dc02a..0ac08e8 100644 --- a/lib/xaapiv1/events.go +++ b/lib/xaapiv1/events.go @@ -58,7 +58,7 @@ var EVTAllList = []string{ // EventMsg Event message send over Websocket, data format depend to Type (see DecodeXXX function) type EventMsg struct { Time string `json:"time"` // Timestamp - FromSessionID string `json:"sessionID"` // Session ID of client that emits this event + FromSessionID string `json:"sessionID"` // Session ID of client who produce this event Type string `json:"type"` // Data type Data interface{} `json:"data"` // Data } -- cgit 1.2.3-korg