package arlo import ( "bytes" "context" "encoding/json" "fmt" "net/http" "sync" "github.com/r3labs/sse" log "github.com/sirupsen/logrus" ) type eventStream struct { SSEClient *sse.Client Events chan *sse.Event Error chan error Verbose bool disconnectedChan chan interface{} once *sync.Once isConnected bool isConnectedMutex sync.RWMutex lastId string lastIdMutex sync.RWMutex transactionSubscribers map[string]chan *EventStreamResponse transactionSubscribersMutex sync.RWMutex resourceSubscribers map[string][]chan *EventStreamResponse resourceSubscribersMutex sync.RWMutex } func newEventStream(url string, client *http.Client, authHeader string, lastId string) *eventStream { e := &eventStream{ Events: make(chan *sse.Event), transactionSubscribers: make(map[string]chan *EventStreamResponse), transactionSubscribersMutex: sync.RWMutex{}, resourceSubscribers: make(map[string][]chan *EventStreamResponse), resourceSubscribersMutex: sync.RWMutex{}, disconnectedChan: make(chan interface{}), once: new(sync.Once), isConnected: false, isConnectedMutex: sync.RWMutex{}, lastId: "", lastIdMutex: sync.RWMutex{}, } SSEClient := sse.NewClient(url) SSEClient.Connection = client SSEClient.Headers["Authorization"] = authHeader if lastId != "" { SSEClient.Headers["Last-Event-ID"] = lastId } SSEClient.OnDisconnect(func(c *sse.Client) { e.disconnect() }) e.SSEClient = SSEClient return e } func (e *eventStream) disconnect() { e.once.Do(func() { log.Info("disconnect > close disconnectedChan") close(e.disconnectedChan) }) } func (e *eventStream) listen(ctx context.Context) error { err := e.SSEClient.SubscribeChanRaw(e.Events) if err != nil { return fmt.Errorf("failed to subscribe to seeclient") } go func() { defer func() { e.clearResourceSubscriptions() e.clearTransactionSubscriptions() }() for { select { case <-ctx.Done(): e.disconnect() return case event, ok := <-e.Events: if !ok { e.disconnect() return } if event == nil || event.Data == nil { log.Warn("EventStream > nil event or nil data in event") e.disconnect() continue } if event.ID != nil { e.setLastId(string(event.ID)) } log.Tracef("Data > %s", event.Data) var notifyResponse EventStreamResponse err := json.NewDecoder(bytes.NewBuffer(event.Data)).Decode(¬ifyResponse) if err != nil { log.Warnf("EventStream > failed to decode event: %s", event.Data) continue } bytesProperties, err := json.Marshal(notifyResponse.EventStreamPayload.Properties) if err != nil { log.Warnf("EventStream > failed to marshal raw properties: %s", event.Data) continue } notifyResponse.RawProperties = bytesProperties if notifyResponse.Status == "connected" { e.setIsConnected(true) continue } if notifyResponse.Status == "disconnected" { e.disconnect() continue } if notifyResponse.Action == "logout" { log.Warn("EventStream > logged out") e.disconnect() continue } e.transactionSubscribersMutex.RLock() subscriber, ok := e.transactionSubscribers[notifyResponse.TransId] e.transactionSubscribersMutex.RUnlock() if ok { subscriber <- ¬ifyResponse e.removeTransactionSubscription(notifyResponse.TransId) } e.resourceSubscribersMutex.RLock() subscribers, ok := e.resourceSubscribers[notifyResponse.Resource] e.resourceSubscribersMutex.RUnlock() if ok { for _, s := range subscribers { s <- ¬ifyResponse } } case <-e.disconnectedChan: log.Info("listen loop <- disconnectedchan") e.setIsConnected(false) return } } }() return nil } func (e *eventStream) GetIsConnected() bool { e.isConnectedMutex.RLock() defer e.isConnectedMutex.RUnlock() return e.isConnected } func (e *eventStream) setIsConnected(isConnected bool) { e.isConnectedMutex.Lock() defer e.isConnectedMutex.Unlock() e.isConnected = isConnected } func (e *eventStream) subscribeTransaction(transId string) chan *EventStreamResponse { msgchan := make(chan *EventStreamResponse) e.transactionSubscribersMutex.Lock() e.transactionSubscribers[transId] = msgchan e.transactionSubscribersMutex.Unlock() return msgchan } func (e *eventStream) subscribeResource(resource string) chan *EventStreamResponse { msgchan := make(chan *EventStreamResponse) e.resourceSubscribersMutex.Lock() e.resourceSubscribers[resource] = append(e.resourceSubscribers[resource], msgchan) e.resourceSubscribersMutex.Unlock() return msgchan } func (e *eventStream) removeTransactionSubscription(transId string) { e.transactionSubscribersMutex.Lock() defer e.transactionSubscribersMutex.Unlock() subscriber, ok := e.transactionSubscribers[transId] if ok { close(subscriber) delete(e.transactionSubscribers, transId) } } func (e *eventStream) clearResourceSubscriptions() { e.resourceSubscribersMutex.Lock() defer e.resourceSubscribersMutex.Unlock() for _, subscribers := range e.resourceSubscribers { for _, subscriber := range subscribers { close(subscriber) } } e.resourceSubscribers = make(map[string][]chan *EventStreamResponse) } func (e *eventStream) clearTransactionSubscriptions() { e.transactionSubscribersMutex.Lock() defer e.transactionSubscribersMutex.Unlock() for _, subscriber := range e.transactionSubscribers { close(subscriber) } e.transactionSubscribers = make(map[string]chan *EventStreamResponse) } func (e *eventStream) GetLastId() string { e.lastIdMutex.RLock() defer e.lastIdMutex.RUnlock() return e.lastId } func (e *eventStream) setLastId(value string) { e.lastIdMutex.Lock() defer e.lastIdMutex.Unlock() e.lastId = value }