aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSebastien Douheret <sebastien.douheret@iot.bzh>2017-11-08 10:19:54 +0100
committerSebastien Douheret <sebastien.douheret@iot.bzh>2017-11-08 10:19:54 +0100
commit02aec942b44eecd2ea9b311bb4ba2d60cce21e9a (patch)
treed5243e01ffaba8f23c86c9660eb1f7c0765d4dd8
parentd96e5b59d352f1c9eaf73a13d5c4bc25c6a4ebc4 (diff)
Fixed events definition and callback processing.
-rw-r--r--lib/agent/apiv1-exec.go51
-rw-r--r--lib/agent/events.go5
-rw-r--r--lib/agent/project-st.go7
-rw-r--r--lib/agent/xdsserver.go38
-rw-r--r--lib/apiv1/events.go10
5 files changed, 77 insertions, 34 deletions
diff --git a/lib/agent/apiv1-exec.go b/lib/agent/apiv1-exec.go
index 9c65bc2..c199267 100644
--- a/lib/agent/apiv1-exec.go
+++ b/lib/agent/apiv1-exec.go
@@ -12,6 +12,7 @@ import (
)
var execCmdID = 1
+var fwdFuncID []uuid.UUID
// ExecCmd executes remotely a command
func (s *APIService) execCmd(c *gin.Context) {
@@ -38,11 +39,15 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) {
}
// First get Project ID to retrieve Server ID and send command to right server
- id := c.Param("id")
- if id == "" {
- id = args.ID
+ iid := c.Param("id")
+ if iid == "" {
+ iid = args.ID
+ }
+ id, err := s.projects.ResolveID(iid)
+ if err != nil {
+ common.APIError(c, err.Error())
+ return
}
-
prj := s.projects.Get(id)
if prj == nil {
common.APIError(c, "Unknown id")
@@ -75,15 +80,23 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) {
apiv1.ExecInferiorInEvent,
apiv1.ExecInferiorOutEvent,
}
- so := *sock
- fwdFuncID := []uuid.UUID{}
+
for _, evName := range evtList {
evN := evName
- fwdFunc := func(evData interface{}) {
+ fwdFunc := func(pData interface{}, evData interface{}) error {
+ sid := pData.(string)
+ // IO socket can be nil when disconnected
+ so := s.sessions.IOSocketGet(sid)
+ if so == nil {
+ s.Log.Infof("%s not emitted: WS closed (sid:%s)", evN, sid)
+ return nil
+ }
+
// Forward event to Client/Dashboard
- so.Emit(evN, evData)
+ (*so).Emit(evN, evData)
+ return nil
}
- id, err := svr.EventOn(evN, fwdFunc)
+ id, err := svr.EventOn(evN, sess.ID, fwdFunc)
if err != nil {
common.APIError(c, err.Error())
return
@@ -93,16 +106,28 @@ func (s *APIService) _execRequest(cmd string, c *gin.Context) {
// Handle Exit event separately to cleanup registered listener
var exitFuncID uuid.UUID
- exitFunc := func(evData interface{}) {
- so.Emit(apiv1.ExecExitEvent, evData)
+ exitFunc := func(pData interface{}, evData interface{}) error {
+ evN := apiv1.ExecExitEvent
+ sid := pData.(string)
+
+ // IO socket can be nil when disconnected
+ so := s.sessions.IOSocketGet(sid)
+ if so == nil {
+ s.Log.Infof("%s not emitted: WS closed (sid:%s)", evN, sid)
+ return nil
+ }
+
+ (*so).Emit(evN, evData)
// cleanup listener
for i, evName := range evtList {
svr.EventOff(evName, fwdFuncID[i])
}
- svr.EventOff(apiv1.ExecExitEvent, exitFuncID)
+ svr.EventOff(evN, exitFuncID)
+
+ return nil
}
- exitFuncID, err = svr.EventOn(apiv1.ExecExitEvent, exitFunc)
+ exitFuncID, err = svr.EventOn(apiv1.ExecExitEvent, sess.ID, exitFunc)
if err != nil {
common.APIError(c, err.Error())
return
diff --git a/lib/agent/events.go b/lib/agent/events.go
index 046c377..2684ff5 100644
--- a/lib/agent/events.go
+++ b/lib/agent/events.go
@@ -104,8 +104,9 @@ func (e *Events) Emit(evName string, data interface{}) error {
Type: evName,
Data: data,
}
- if err := (*so).Emit(apiv1.EventTypePrefix+evName, msg); err != nil {
- e.Log.Errorf("WS Emit %v error : %v", apiv1.EventTypePrefix+evName, err)
+ e.Log.Debugf("Emit Event %s: %v", evName, sid)
+ if err := (*so).Emit(evName, msg); err != nil {
+ e.Log.Errorf("WS Emit %v error : %v", evName, err)
if firstErr == nil {
firstErr = err
}
diff --git a/lib/agent/project-st.go b/lib/agent/project-st.go
index dba5978..cd55656 100644
--- a/lib/agent/project-st.go
+++ b/lib/agent/project-st.go
@@ -85,7 +85,7 @@ func (p *STProject) UpdateProject(prj apiv1.ProjectConfig) (*apiv1.ProjectConfig
// Register events to update folder status
// Register to XDS Server events
- p.server.EventOn("event:FolderStateChanged", p._cbServerFolderChanged)
+ p.server.EventOn("event:FolderStateChanged", "", p._cbServerFolderChanged)
if err := p.server.EventRegister("FolderStateChanged", svrPrj.ID); err != nil {
p.Log.Warningf("XDS Server EventRegister failed: %v", err)
return svrPrj, err
@@ -128,12 +128,12 @@ func (p *STProject) IsInSync() (bool, error) {
// callback use to update (XDS Server) folder IsInSync status
-func (p *STProject) _cbServerFolderChanged(data interface{}) {
+func (p *STProject) _cbServerFolderChanged(pData interface{}, data interface{}) error {
evt := data.(XdsEventFolderChange)
// Only process event that concerns this project/folder ID
if p.folder.ID != evt.Folder.ID {
- return
+ return nil
}
if evt.Folder.IsInSync != p.folder.DataCloudSync.STSvrIsInSync ||
@@ -146,6 +146,7 @@ func (p *STProject) _cbServerFolderChanged(data interface{}) {
p.Log.Warningf("Cannot notify project change: %v", err)
}
}
+ return nil
}
// callback use to update IsInSync status
diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go
index 2037094..73a5bd9 100644
--- a/lib/agent/xdsserver.go
+++ b/lib/agent/xdsserver.go
@@ -112,11 +112,15 @@ type XdsEventFolderChange struct {
Folder XdsFolderConfig `json:"folder"`
}
+// Event emitter callback
+type EventCB func(privData interface{}, evtData interface{}) error
+
// caller Used to chain event listeners
type caller struct {
- id uuid.UUID
- EventName string
- Func func(interface{})
+ id uuid.UUID
+ EventName string
+ Func EventCB
+ PrivateData interface{}
}
const _IDTempoPrefix = "tempo-"
@@ -316,7 +320,7 @@ func (xs *XdsServer) EventRegister(evName string, id string) error {
}
// EventOn Register a callback on events reception
-func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
+func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
if xs.ioSock == nil {
return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
}
@@ -333,14 +337,25 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err
if evName == "event:FolderStateChanged" {
err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
xs.sockEventsLock.Lock()
- defer xs.sockEventsLock.Unlock()
- for _, c := range xs.sockEvents[evn] {
- c.Func(data)
+ sEvts := make([]*caller, len(xs.sockEvents[evn]))
+ copy(sEvts, xs.sockEvents[evn])
+ xs.sockEventsLock.Unlock()
+ for _, c := range sEvts {
+ c.Func(c.PrivateData, data)
}
return nil
})
} else {
- err = xs.ioSock.On(evn, f)
+ err = xs.ioSock.On(evn, func(data interface{}) error {
+ xs.sockEventsLock.Lock()
+ sEvts := make([]*caller, len(xs.sockEvents[evn]))
+ copy(sEvts, xs.sockEvents[evn])
+ xs.sockEventsLock.Unlock()
+ for _, c := range sEvts {
+ c.Func(c.PrivateData, data)
+ }
+ return nil
+ })
}
if err != nil {
return uuid.Nil, err
@@ -348,9 +363,10 @@ func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, err
}
c := &caller{
- id: uuid.NewV1(),
- EventName: evName,
- Func: f,
+ id: uuid.NewV1(),
+ EventName: evName,
+ Func: f,
+ PrivateData: privData,
}
xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
diff --git a/lib/apiv1/events.go b/lib/apiv1/events.go
index 8bad394..da9a2af 100644
--- a/lib/apiv1/events.go
+++ b/lib/apiv1/events.go
@@ -18,11 +18,11 @@ const (
EventTypePrefix = "event:" // following by event type
// Supported Events type
- EVTAll = "all"
- EVTServerConfig = "server-config" // data type apiv1.ServerCfg
- EVTProjectAdd = "project-add" // data type apiv1.ProjectConfig
- EVTProjectDelete = "project-delete" // data type apiv1.ProjectConfig
- EVTProjectChange = "project-state-change" // data type apiv1.ProjectConfig
+ EVTAll = EventTypePrefix + "all"
+ EVTServerConfig = EventTypePrefix + "server-config" // data type apiv1.ServerCfg
+ EVTProjectAdd = EventTypePrefix + "project-add" // data type apiv1.ProjectConfig
+ EVTProjectDelete = EventTypePrefix + "project-delete" // data type apiv1.ProjectConfig
+ EVTProjectChange = EventTypePrefix + "project-state-change" // data type apiv1.ProjectConfig
)
// EventMsg Message send