gestion du lastid

This commit is contained in:
Laurent Le Houerou 2020-06-23 09:59:40 +04:00
parent d78269f5b9
commit 7579a341d5

View File

@ -23,6 +23,9 @@ type eventStream struct {
isConnected bool
isConnectedMutex sync.RWMutex
lastId string
lastIdMutex sync.RWMutex
transactionSubscribers map[string]chan *EventStreamResponse
transactionSubscribersMutex sync.RWMutex
@ -30,7 +33,7 @@ type eventStream struct {
resourceSubscribersMutex sync.RWMutex
}
func newEventStream(url string, client *http.Client, authHeader string) *eventStream {
func newEventStream(url string, client *http.Client, authHeader string, lastId string) *eventStream {
e := &eventStream{
Events: make(chan *sse.Event),
transactionSubscribers: make(map[string]chan *EventStreamResponse),
@ -41,11 +44,16 @@ func newEventStream(url string, client *http.Client, authHeader string) *eventSt
once: new(sync.Once),
isConnected: false,
isConnectedMutex: sync.RWMutex{},
lastId: "",
lastIdMutex: sync.RWMutex{},
}
SSEClient := sse.NewClient(url)
SSEClient.Connection = client
SSEClient.Headers["Authorization"] = authHeader
if lastId != "" {
SSEClient.Headers["Last-Event-ID"] = lastId
}
SSEClient.OnDisconnect(func(c *sse.Client) {
e.disconnect()
})
@ -87,6 +95,10 @@ func (e *eventStream) listen(ctx context.Context) error {
continue
}
if event.ID != nil {
e.setLastId(string(event.ID))
}
var notifyResponse EventStreamResponse
err := json.NewDecoder(bytes.NewBuffer(event.Data)).Decode(&notifyResponse)
if err != nil {
@ -198,3 +210,15 @@ func (e *eventStream) clearTransactionSubscriptions() {
}
e.transactionSubscribers = make(map[string]chan *EventStreamResponse)
}
func (e *eventStream) GetLastId() string {
e.lastIdMutex.RLock()
defer e.lastIdMutex.RUnlock()
return e.lastId
}
func (e *eventStream) setLastId(value string) {
e.lastIdMutex.Lock()
defer e.lastIdMutex.Unlock()
e.lastId = value
}