aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSebastien Douheret <sebastien.douheret@iot.bzh>2017-12-05 11:02:56 +0100
committerSebastien Douheret <sebastien.douheret@iot.bzh>2017-12-05 11:02:56 +0100
commitd0d64817590d38d182faeb0e040861d3d3cb9f3b (patch)
treed96a612511e10375aaa4c6ed8552c6c813b71548
parentf75d24ca885690289c16adeac0e5c5e7bb56e36c (diff)
Fixed XDS Server connection lost and reconnection.
-rw-r--r--lib/agent/apiv1-config.go3
-rw-r--r--lib/agent/projects.go10
-rw-r--r--lib/agent/xdsserver.go54
3 files changed, 43 insertions, 24 deletions
diff --git a/lib/agent/apiv1-config.go b/lib/agent/apiv1-config.go
index bd4da33..0b1e200 100644
--- a/lib/agent/apiv1-config.go
+++ b/lib/agent/apiv1-config.go
@@ -68,6 +68,9 @@ func (s *APIService) setConfig(c *gin.Context) {
// Add new XDS Server
for _, svr := range cfgArg.Servers {
+ if svr.Connected && svr.ID != "" {
+ continue
+ }
cfg := xdsconfig.XDSServerConf{
ID: svr.ID,
URL: svr.URL,
diff --git a/lib/agent/projects.go b/lib/agent/projects.go
index a0b641a..a2d8fe1 100644
--- a/lib/agent/projects.go
+++ b/lib/agent/projects.go
@@ -248,10 +248,12 @@ func (p *Projects) createUpdate(newF xaapiv1.ProjectConfig, create bool, initial
// Force sync after creation
// (need to defer to be sure that WS events will arrive after HTTP creation reply)
- go func() {
- time.Sleep(time.Millisecond * 500)
- fld.Sync()
- }()
+ if create {
+ go func() {
+ time.Sleep(time.Millisecond * 500)
+ fld.Sync()
+ }()
+ }
return newPrj, nil
}
diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go
index 98f7649..7020ef0 100644
--- a/lib/agent/xdsserver.go
+++ b/lib/agent/xdsserver.go
@@ -96,11 +96,9 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
// Close Free and close XDS Server connection
func (xs *XdsServer) Close() error {
- xs.Connected = false
+ err := xs._Disconnected()
xs.Disabled = true
- xs.ioSock = nil
- xs._NotifyState()
- return nil
+ return err
}
// Connect Establish HTTP connection with XDS Server
@@ -125,7 +123,7 @@ func (xs *XdsServer) Connect() error {
time.Sleep(time.Second)
}
if retry == 0 {
- // FIXME: re-use _reconnect to wait longer in background
+ // FIXME: re-use _Reconnect to wait longer in background
return fmt.Errorf("Connection to XDS Server failure")
}
if err != nil {
@@ -133,7 +131,7 @@ func (xs *XdsServer) Connect() error {
}
// Check HTTP connection and establish WS connection
- err = xs._connect(false)
+ err = xs._Connect(false)
return err
}
@@ -223,8 +221,7 @@ func (xs *XdsServer) PassthroughGet(url string) {
// Send Get request
if err := xs.client.Get(nURL, &data); err != nil {
if strings.Contains(err.Error(), "connection refused") {
- xs.Connected = false
- xs._NotifyState()
+ xs._Disconnected()
}
common.APIError(c, err.Error())
return
@@ -348,6 +345,7 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu
}
xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
+ xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
return c.id, nil
}
@@ -369,6 +367,7 @@ func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
}
}
}
+ xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
return nil
}
@@ -502,9 +501,9 @@ func (xs *XdsServer) _CreateConnectHTTP() error {
return nil
}
-// Re-established connection
-func (xs *XdsServer) _reconnect() error {
- err := xs._connect(true)
+// _Reconnect Re-established connection
+func (xs *XdsServer) _Reconnect() error {
+ err := xs._Connect(true)
if err == nil {
// Reload projects list for this server
err = xs.projects.Init(xs)
@@ -512,8 +511,8 @@ func (xs *XdsServer) _reconnect() error {
return err
}
-// Established HTTP and WS connection and retrieve XDSServer config
-func (xs *XdsServer) _connect(reConn bool) error {
+// _Connect Established HTTP and WS connection and retrieve XDSServer config
+func (xs *XdsServer) _Connect(reConn bool) error {
xdsCfg := xsapiv1.APIConfig{}
if err := xs.client.Get("/config", &xdsCfg); err != nil {
@@ -534,8 +533,7 @@ func (xs *XdsServer) _connect(reConn bool) error {
// Establish WS connection and register listen
if err := xs._SocketConnect(); err != nil {
- xs.Connected = false
- xs._NotifyState()
+ xs._Disconnected()
return err
}
@@ -544,7 +542,7 @@ func (xs *XdsServer) _connect(reConn bool) error {
return nil
}
-// Create WebSocket (io.socket) connection
+// _SocketConnect Create WebSocket (io.socket) connection
func (xs *XdsServer) _SocketConnect() error {
xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
@@ -575,8 +573,7 @@ func (xs *XdsServer) _SocketConnect() error {
if xs.CBOnDisconnect != nil {
xs.CBOnDisconnect(err)
}
- xs.Connected = false
- xs._NotifyState()
+ xs._Disconnected()
// Try to reconnect during 15min (or at least while not disabled)
go func() {
@@ -594,7 +591,12 @@ func (xs *XdsServer) _SocketConnect() error {
time.Sleep(time.Second * time.Duration(waitTime))
xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
- xs._reconnect()
+ err := xs._Reconnect()
+ if err != nil &&
+ !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
+ xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
+ }
+
}
}()
})
@@ -607,7 +609,19 @@ func (xs *XdsServer) _SocketConnect() error {
return nil
}
-// Send event to notify changes
+// _Disconnected Set XDS Server as disconnected
+func (xs *XdsServer) _Disconnected() error {
+ // Clear all register events as socket is closed
+ for k := range xs.sockEvents {
+ delete(xs.sockEvents, k)
+ }
+ xs.Connected = false
+ xs.ioSock = nil
+ xs._NotifyState()
+ return nil
+}
+
+// _NotifyState Send event to notify changes
func (xs *XdsServer) _NotifyState() {
evSts := xaapiv1.ServerCfg{