diff options
author | Sebastien Douheret <sebastien.douheret@iot.bzh> | 2017-12-05 11:02:56 +0100 |
---|---|---|
committer | Sebastien Douheret <sebastien.douheret@iot.bzh> | 2017-12-05 11:02:56 +0100 |
commit | d0d64817590d38d182faeb0e040861d3d3cb9f3b (patch) | |
tree | d96a612511e10375aaa4c6ed8552c6c813b71548 | |
parent | f75d24ca885690289c16adeac0e5c5e7bb56e36c (diff) |
Fixed XDS Server connection lost and reconnection.
-rw-r--r-- | lib/agent/apiv1-config.go | 3 | ||||
-rw-r--r-- | lib/agent/projects.go | 10 | ||||
-rw-r--r-- | lib/agent/xdsserver.go | 54 |
3 files changed, 43 insertions, 24 deletions
diff --git a/lib/agent/apiv1-config.go b/lib/agent/apiv1-config.go index bd4da33..0b1e200 100644 --- a/lib/agent/apiv1-config.go +++ b/lib/agent/apiv1-config.go @@ -68,6 +68,9 @@ func (s *APIService) setConfig(c *gin.Context) { // Add new XDS Server for _, svr := range cfgArg.Servers { + if svr.Connected && svr.ID != "" { + continue + } cfg := xdsconfig.XDSServerConf{ ID: svr.ID, URL: svr.URL, diff --git a/lib/agent/projects.go b/lib/agent/projects.go index a0b641a..a2d8fe1 100644 --- a/lib/agent/projects.go +++ b/lib/agent/projects.go @@ -248,10 +248,12 @@ func (p *Projects) createUpdate(newF xaapiv1.ProjectConfig, create bool, initial // Force sync after creation // (need to defer to be sure that WS events will arrive after HTTP creation reply) - go func() { - time.Sleep(time.Millisecond * 500) - fld.Sync() - }() + if create { + go func() { + time.Sleep(time.Millisecond * 500) + fld.Sync() + }() + } return newPrj, nil } diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 98f7649..7020ef0 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -96,11 +96,9 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { // Close Free and close XDS Server connection func (xs *XdsServer) Close() error { - xs.Connected = false + err := xs._Disconnected() xs.Disabled = true - xs.ioSock = nil - xs._NotifyState() - return nil + return err } // Connect Establish HTTP connection with XDS Server @@ -125,7 +123,7 @@ func (xs *XdsServer) Connect() error { time.Sleep(time.Second) } if retry == 0 { - // FIXME: re-use _reconnect to wait longer in background + // FIXME: re-use _Reconnect to wait longer in background return fmt.Errorf("Connection to XDS Server failure") } if err != nil { @@ -133,7 +131,7 @@ func (xs *XdsServer) Connect() error { } // Check HTTP connection and establish WS connection - err = xs._connect(false) + err = xs._Connect(false) return err } @@ -223,8 +221,7 @@ func (xs *XdsServer) PassthroughGet(url string) { // Send Get request if err := xs.client.Get(nURL, &data); err != nil { if strings.Contains(err.Error(), "connection refused") { - xs.Connected = false - xs._NotifyState() + xs._Disconnected() } common.APIError(c, err.Error()) return @@ -348,6 +345,7 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu } xs.sockEvents[evName] = append(xs.sockEvents[evName], c) + xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName])) return c.id, nil } @@ -369,6 +367,7 @@ func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error { } } } + xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName])) return nil } @@ -502,9 +501,9 @@ func (xs *XdsServer) _CreateConnectHTTP() error { return nil } -// Re-established connection -func (xs *XdsServer) _reconnect() error { - err := xs._connect(true) +// _Reconnect Re-established connection +func (xs *XdsServer) _Reconnect() error { + err := xs._Connect(true) if err == nil { // Reload projects list for this server err = xs.projects.Init(xs) @@ -512,8 +511,8 @@ func (xs *XdsServer) _reconnect() error { return err } -// Established HTTP and WS connection and retrieve XDSServer config -func (xs *XdsServer) _connect(reConn bool) error { +// _Connect Established HTTP and WS connection and retrieve XDSServer config +func (xs *XdsServer) _Connect(reConn bool) error { xdsCfg := xsapiv1.APIConfig{} if err := xs.client.Get("/config", &xdsCfg); err != nil { @@ -534,8 +533,7 @@ func (xs *XdsServer) _connect(reConn bool) error { // Establish WS connection and register listen if err := xs._SocketConnect(); err != nil { - xs.Connected = false - xs._NotifyState() + xs._Disconnected() return err } @@ -544,7 +542,7 @@ func (xs *XdsServer) _connect(reConn bool) error { return nil } -// Create WebSocket (io.socket) connection +// _SocketConnect Create WebSocket (io.socket) connection func (xs *XdsServer) _SocketConnect() error { xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL) @@ -575,8 +573,7 @@ func (xs *XdsServer) _SocketConnect() error { if xs.CBOnDisconnect != nil { xs.CBOnDisconnect(err) } - xs.Connected = false - xs._NotifyState() + xs._Disconnected() // Try to reconnect during 15min (or at least while not disabled) go func() { @@ -594,7 +591,12 @@ func (xs *XdsServer) _SocketConnect() error { time.Sleep(time.Second * time.Duration(waitTime)) xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count) - xs._reconnect() + err := xs._Reconnect() + if err != nil && + !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) { + xs.Log.Errorf("ERROR while reconnecting: %v", err.Error()) + } + } }() }) @@ -607,7 +609,19 @@ func (xs *XdsServer) _SocketConnect() error { return nil } -// Send event to notify changes +// _Disconnected Set XDS Server as disconnected +func (xs *XdsServer) _Disconnected() error { + // Clear all register events as socket is closed + for k := range xs.sockEvents { + delete(xs.sockEvents, k) + } + xs.Connected = false + xs.ioSock = nil + xs._NotifyState() + return nil +} + +// _NotifyState Send event to notify changes func (xs *XdsServer) _NotifyState() { evSts := xaapiv1.ServerCfg{ |