1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
|
package st
import (
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/Sirupsen/logrus"
)
// Events polls a Syncthing instance for events and dispatches them to the
// callbacks registered through Register.
type Events struct {
// MonitorTime is the delay between two polls. monitorLoop multiplies it by
// time.Millisecond, so the value is interpreted as a count of milliseconds.
MonitorTime time.Duration
// Debug enables extra logging of received events and callback dispatch.
Debug bool
// stop carries the request to terminate the monitoring loop.
stop chan bool
// st is the Syncthing instance whose client is used to fetch events.
st *SyncThing
// log is the logger used by the monitoring loop.
log *logrus.Logger
// cbArr maps an event name to the callbacks registered for it.
cbArr map[string][]cbMap
}
// Event is the decoded event handed to registered callbacks.
type Event struct {
// Type is the Syncthing event type name.
Type string `json:"type"`
// Time is the timestamp copied from the Syncthing event.
Time time.Time `json:"time"`
// Data holds the decoded event fields as strings; the "id" key carries the
// folder ID when the event provided one.
Data map[string]string `json:"data"`
}
// EventsCBData is the user data attached to a callback at registration time
// and passed back on every invocation. Register stores the listener ID,
// formatted as a decimal string, under the "id" key.
type EventsCBData map[string]interface{}
// EventsCB is the signature of an event callback: it receives the decoded
// event and the data pointer that was supplied to (or created by) Register.
type EventsCB func(ev Event, cbData *EventsCBData)
// Names of the Syncthing event types handled by the event listener. They are
// the values accepted by Register and compared against STEvent.Type when
// events are decoded.
const (
EventFolderCompletion string = "FolderCompletion"
EventFolderSummary string = "FolderSummary"
EventFolderPaused string = "FolderPaused"
EventFolderResumed string = "FolderResumed"
EventFolderErrors string = "FolderErrors"
EventStateChanged string = "StateChanged"
)
// EventsAll is the "|"-separated list of all supported event names. Register
// checks the requested event name against this string.
var EventsAll string = EventFolderCompletion + "|" +
EventFolderSummary + "|" +
EventFolderPaused + "|" +
EventFolderResumed + "|" +
EventFolderErrors + "|" +
EventStateChanged
// STEvent is one raw event as decoded from the JSON array returned by the
// Syncthing "events" endpoint.
type STEvent struct {
// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
SubscriptionID int `json:"id"`
// Global ID of the event across all subscriptions
GlobalID int `json:"globalID"`
// Time is the event timestamp.
Time time.Time `json:"time"`
// Type is the event type name (see the Event* constants).
Type string `json:"type"`
// Data is the event payload, left undecoded; its keys depend on Type.
Data map[string]interface{} `json:"data"`
}
// cbMap describes one registered callback.
type cbMap struct {
// id is the listener ID returned by Register.
id int
// cb is the function invoked when a matching event is received.
cb EventsCB
// filterID restricts the callback to events of one folder ID; an empty
// string means the callback receives every event of the registered type.
filterID string
// data is the user data passed back to cb on each invocation.
data *EventsCBData
}
// NewEventListener builds an event listener bound to this Syncthing instance.
//
// The listener is returned idle; call Start to begin polling. Extra debug
// logging is switched on when the XDS_DEBUG_STEVENTS environment variable is
// present, whatever its value.
func (s *SyncThing) NewEventListener() *Events {
	// Only the presence of the variable matters, not its content.
	_, debugEnabled := os.LookupEnv("XDS_DEBUG_STEVENTS")

	listener := &Events{
		st:          s,
		log:         s.log,
		Debug:       debugEnabled,
		MonitorTime: 100, // multiplied by time.Millisecond in monitorLoop
		stop:        make(chan bool, 1),
		cbArr:       make(map[string][]cbMap),
	}
	return listener
}
// Start launches the event monitoring loop in a new goroutine and returns
// immediately. The returned error is always nil.
func (e *Events) Start() error {
go e.monitorLoop()
return nil
}
// Stop asks the event monitoring loop to exit by sending on the stop channel.
// It does not wait for the loop to terminate. NewEventListener creates that
// channel with a buffer of one, so the send blocks only while an earlier stop
// request is still pending.
func (e *Events) Stop() {
e.stop <- true
}
// Register adds a listener for the event named evName.
//
// evName must be exactly one of the names listed in EventsAll. cb is the
// function to invoke and must not be nil. filterID, when not empty, restricts
// the callback to events whose folder ID equals it. data is passed back to cb
// on every call; when nil (or when it points to a nil map) a fresh map is
// allocated. The listener ID is stored in (*data)["id"] as a decimal string.
//
// It returns the listener ID to hand to UnRegister, or -1 and an error when
// evName is unknown or cb is nil.
//
// NOTE(review): e.cbArr is also read by the monitoring goroutine and no
// locking is visible here; confirm callers register before calling Start.
func (e *Events) Register(evName string, cb EventsCB, filterID string, data *EventsCBData) (int, error) {
	// Compare against whole names: a substring search on EventsAll would
	// accept fragments such as "Folder" or "|".
	known := false
	if evName != "" {
		for _, name := range strings.Split(EventsAll, "|") {
			if name == evName {
				known = true
				break
			}
		}
	}
	if !known {
		return -1, fmt.Errorf("Unknown event name")
	}
	if cb == nil {
		// A nil callback would only fail later, when the event is dispatched.
		return -1, fmt.Errorf("Invalid callback")
	}

	if data == nil {
		data = &EventsCBData{}
	} else if *data == nil {
		// Writing to a nil map panics; give the caller's pointer a usable map.
		*data = EventsCBData{}
	}

	cbList := e.cbArr[evName]

	// Use one past the highest ID in use rather than len(cbList), so an ID is
	// never handed out twice after a listener has been removed.
	id := 0
	for _, c := range cbList {
		if c.id >= id {
			id = c.id + 1
		}
	}

	(*data)["id"] = strconv.Itoa(id)
	e.cbArr[evName] = append(cbList, cbMap{id: id, cb: cb, filterID: filterID, data: data})
	return id, nil
}
// UnRegister removes the listener identified by id from the event evName.
//
// id is the value previously returned by Register. It returns an error when
// no listener list exists for evName or when no listener carries that id.
//
// NOTE(review): e.cbArr is also read by the monitoring goroutine and no
// locking is visible here; confirm how callers serialize access.
func (e *Events) UnRegister(evName string, id int) error {
	cbList, ok := e.cbArr[evName]
	if !ok {
		return fmt.Errorf("No event registered to such name")
	}

	// Match on the stored listener ID rather than on the slice position:
	// positions shift after a removal while the IDs handed out do not.
	for i, c := range cbList {
		if c.id != id {
			continue
		}
		// Copy into a fresh slice so the previous backing array, which may
		// still be ranged over elsewhere, is left untouched.
		remaining := make([]cbMap, 0, len(cbList)-1)
		remaining = append(remaining, cbList[:i]...)
		remaining = append(remaining, cbList[i+1:]...)
		e.cbArr[evName] = remaining
		return nil
	}
	return fmt.Errorf("Invalid id")
}
// getEvents fetches events from the Syncthing "events" endpoint and decodes
// the JSON response into a slice of STEvent.
//
// When since is not -1 it is sent as the "since" query parameter (presumably
// so that only events newer than that ID are returned; confirm against the
// Syncthing REST documentation).
//
// On a request failure the returned slice is empty but non-nil. On a decoding
// failure the slice holds whatever json.Unmarshal managed to decode.
func (e *Events) getEvents(since int) ([]STEvent, error) {
	endpoint := "events"
	if since != -1 {
		endpoint = endpoint + "?since=" + strconv.Itoa(since)
	}

	var body []byte
	events := []STEvent{}
	if err := e.st.client.HTTPGet(endpoint, &body); err != nil {
		return events, err
	}

	if err := json.Unmarshal(body, &events); err != nil {
		return events, err
	}
	return events, nil
}
// monitorLoop polls Syncthing for new events and dispatches them to the
// registered callbacks until a value is received on e.stop.
//
// Each iteration waits e.MonitorTime milliseconds, then fetches the events
// that follow the last subscription ID seen. A fetch error is logged and the
// loop retries on the next iteration.
func (e *Events) monitorLoop() {
	e.log.Infof("Event monitoring running...")

	since := 0
	for {
		select {
		case <-e.stop:
			e.log.Infof("Event monitoring exited")
			return

		case <-time.After(e.MonitorTime * time.Millisecond):
			stEvArr, err := e.getEvents(since)
			if err != nil {
				e.log.Errorf("Syncthing Get Events: %v", err)
				continue
			}

			for _, stEv := range stEvArr {
				// Remember the last ID seen; it is sent back as "since" on
				// the next poll.
				since = stEv.SubscriptionID
				e.dispatchSTEvent(stEv)
			}
		}
	}
}

// dispatchSTEvent decodes one raw Syncthing event and invokes every callback
// registered for its type whose folder filter matches.
func (e *Events) dispatchSTEvent(stEv STEvent) {
	if e.Debug {
		e.log.Warnf("ST EVENT: %d %s\n %v", stEv.GlobalID, stEv.Type, stEv)
	}

	cbList, ok := e.cbArr[stEv.Type]
	if !ok {
		// Nobody listens to this event type: nothing to decode.
		return
	}

	// FIXME: re-define a data struct for each event type instead of a map of
	// strings, and use JSON marshalling/unmarshalling.
	evData := Event{
		Type: stEv.Type,
		Time: stEv.Time,
		Data: make(map[string]string),
	}

	fID := ""
	switch stEv.Type {
	case EventFolderCompletion:
		fID = convString(stEv.Data["folder"])
		evData.Data["completion"] = convFloat64(stEv.Data["completion"])

	case EventFolderSummary:
		fID = convString(stEv.Data["folder"])
		// Syncthing nests the folder statistics under the "summary" key.
		// Fall back to the top level when that key is absent so payloads
		// shaped the way this code originally expected still decode.
		fields := stEv.Data
		if summary, ok := stEv.Data["summary"].(map[string]interface{}); ok {
			fields = summary
		}
		evData.Data["needBytes"] = convInt64(fields["needBytes"])
		evData.Data["state"] = convString(fields["state"])

	case EventFolderPaused, EventFolderResumed:
		fID = convString(stEv.Data["id"])
		evData.Data["label"] = convString(stEv.Data["label"])

	case EventFolderErrors:
		fID = convString(stEv.Data["folder"])
		// TODO decode array evData.Data["errors"] = convString(stEv.Data["errors"])

	case EventStateChanged:
		fID = convString(stEv.Data["folder"])
		evData.Data["from"] = convString(stEv.Data["from"])
		evData.Data["to"] = convString(stEv.Data["to"])

	default:
		e.log.Warnf("Unsupported event type")
	}

	if fID != "" {
		evData.Data["id"] = fID
	}

	// Call all registered callbacks.
	for _, c := range cbList {
		if e.Debug {
			e.log.Warnf("EVENT CB fID=%s, filterID=%s", fID, c.filterID)
		}
		// Call when filterID is not set or when it matches.
		if c.filterID == "" || (fID != "" && fID == c.filterID) {
			c.cb(evData, c.data)
		}
	}
}
// convString returns d as a string. It returns "" when d is nil or holds a
// value of another type, instead of panicking on a missing event field.
func convString(d interface{}) string {
	s, _ := d.(string)
	return s
}
// convFloat64 formats d, expected to hold a float64, as a decimal string with
// the smallest number of digits that represents the value exactly. It returns
// "" when d is nil or holds a value of another type, instead of panicking.
func convFloat64(d interface{}) string {
	f, ok := d.(float64)
	if !ok {
		return ""
	}
	return strconv.FormatFloat(f, 'f', -1, 64)
}
// convInt64 formats an integer value held in d as a base-10 string.
//
// encoding/json stores every JSON number as a float64 when decoding into an
// interface{} value, so float64 is accepted (and truncated toward zero) in
// addition to int64 and int. It returns "" when d is nil or holds a value of
// another type, instead of panicking.
func convInt64(d interface{}) string {
	switch v := d.(type) {
	case int64:
		return strconv.FormatInt(v, 10)
	case int:
		return strconv.FormatInt(int64(v), 10)
	case float64:
		return strconv.FormatInt(int64(v), 10)
	default:
		return ""
	}
}
|