diff --git a/event_stream.go b/event_stream.go index 1ba84e3..b4442ac 100644 --- a/event_stream.go +++ b/event_stream.go @@ -23,6 +23,9 @@ type eventStream struct { isConnected bool isConnectedMutex sync.RWMutex + lastId string + lastIdMutex sync.RWMutex + transactionSubscribers map[string]chan *EventStreamResponse transactionSubscribersMutex sync.RWMutex @@ -30,7 +33,7 @@ type eventStream struct { resourceSubscribersMutex sync.RWMutex } -func newEventStream(url string, client *http.Client, authHeader string) *eventStream { +func newEventStream(url string, client *http.Client, authHeader string, lastId string) *eventStream { e := &eventStream{ Events: make(chan *sse.Event), transactionSubscribers: make(map[string]chan *EventStreamResponse), @@ -41,11 +44,16 @@ func newEventStream(url string, client *http.Client, authHeader string) *eventSt once: new(sync.Once), isConnected: false, isConnectedMutex: sync.RWMutex{}, + lastId: "", + lastIdMutex: sync.RWMutex{}, } SSEClient := sse.NewClient(url) SSEClient.Connection = client SSEClient.Headers["Authorization"] = authHeader + if lastId != "" { + SSEClient.Headers["Last-Event-ID"] = lastId + } SSEClient.OnDisconnect(func(c *sse.Client) { e.disconnect() }) @@ -87,6 +95,10 @@ func (e *eventStream) listen(ctx context.Context) error { continue } + if event.ID != nil { + e.setLastId(string(event.ID)) + } + var notifyResponse EventStreamResponse err := json.NewDecoder(bytes.NewBuffer(event.Data)).Decode(¬ifyResponse) if err != nil { @@ -198,3 +210,15 @@ func (e *eventStream) clearTransactionSubscriptions() { } e.transactionSubscribers = make(map[string]chan *EventStreamResponse) } + +func (e *eventStream) GetLastId() string { + e.lastIdMutex.RLock() + defer e.lastIdMutex.RUnlock() + return e.lastId +} + +func (e *eventStream) setLastId(value string) { + e.lastIdMutex.Lock() + defer e.lastIdMutex.Unlock() + e.lastId = value +}