package arlo

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"

	"github.com/r3labs/sse"
	log "github.com/sirupsen/logrus"
)

// eventStream wraps an SSE client and fans decoded events out to subscribers
// registered either by transaction id or by resource name.
type eventStream struct {
	SSEClient *sse.Client
	// Events receives the raw SSE events from SSEClient once listen is called.
	Events chan *sse.Event
	// Error is not initialised or used by the code in this file.
	Error chan error
	// Verbose enables dumping of raw event payloads to stdout in listen.
	Verbose bool

	// disconnectedChan is closed exactly once (guarded by once) when the
	// stream is considered disconnected.
	disconnectedChan chan interface{}
	once             *sync.Once

	transactionSubscribers      map[string]chan *EventStreamResponse
	transactionSubscribersMutex sync.RWMutex

	resourceSubscribers      map[string][]chan *EventStreamResponse
	resourceSubscribersMutex sync.RWMutex
}

// newEventStream builds an eventStream whose SSE client targets url and uses
// client as its HTTP connection. The SSE client's disconnect callback is wired
// to eventStream.disconnect.
func newEventStream(url string, client *http.Client) *eventStream {
	// The RWMutex fields are intentionally left at their zero value, which is
	// a ready-to-use unlocked mutex.
	e := &eventStream{
		Events:                 make(chan *sse.Event),
		transactionSubscribers: make(map[string]chan *EventStreamResponse),
		resourceSubscribers:    make(map[string][]chan *EventStreamResponse),
		disconnectedChan:       make(chan interface{}),
		once:                   new(sync.Once),
	}

	sseClient := sse.NewClient(url)
	sseClient.Connection = client
	sseClient.OnDisconnect(func(c *sse.Client) {
		e.disconnect()
	})
	e.SSEClient = sseClient

	return e
}

// disconnect closes disconnectedChan. It is safe to call multiple times; only
// the first call has an effect.
func (e *eventStream) disconnect() {
	e.once.Do(func() {
		close(e.disconnectedChan)
	})
}

// listen subscribes to the SSE stream and starts a goroutine that decodes
// incoming events and dispatches them to subscribers.
//
// The returned channel receives true when an event with status "connected"
// arrives and false when the stream is disconnected. The goroutine exits when
// ctx is cancelled, when Events is closed, or when disconnectedChan is closed;
// on exit every resource and transaction subscriber channel is closed.
//
// An error is returned if the underlying SSE subscription fails.
func (e *eventStream) listen(ctx context.Context) (chan bool, error) {
	if err := e.SSEClient.SubscribeChanRaw(e.Events); err != nil {
		return nil, fmt.Errorf("failed to subscribe to sseclient: %w", err)
	}

	// Buffered by one so that a single status notification never blocks the
	// dispatch goroutine when the caller is not currently receiving.
	connectedChan := make(chan bool, 1)

	go func() {
		defer func() {
			e.clearResourceSubscriptions()
			e.clearTransactionSubscriptions()
		}()

		for {
			select {
			case <-ctx.Done():
				e.disconnect()
				return
			case event, ok := <-e.Events:
				if !ok {
					return
				}
				if !e.handleEvent(ctx, event, connectedChan) {
					// ctx was cancelled while delivering; shut down the same
					// way as the ctx.Done case above.
					e.disconnect()
					return
				}
			case <-e.disconnectedChan:
				// Never block forever on a caller that stopped listening.
				select {
				case connectedChan <- false:
				case <-ctx.Done():
				}
				return
			}
		}
	}()

	return connectedChan, nil
}

// handleEvent decodes a single SSE event and routes it to connectedChan or to
// the matching subscribers. It returns false only when ctx was cancelled while
// a delivery was pending, signalling that the caller should stop; malformed
// events are logged and skipped (returning true).
func (e *eventStream) handleEvent(ctx context.Context, event *sse.Event, connectedChan chan<- bool) bool {
	if event == nil || event.Data == nil {
		log.Warn("EventStream > nil event or nil data in event")
		return true
	}

	// Raw payload dumps are opt-in; printing them unconditionally floods
	// stdout for every consumer of the package.
	if e.Verbose {
		fmt.Printf("DATA : %s\n", event.Data)
	}

	var notifyResponse EventStreamResponse
	if err := json.NewDecoder(bytes.NewReader(event.Data)).Decode(&notifyResponse); err != nil {
		log.Warnf("EventStream > failed to decode event: %s", event.Data)
		return true
	}

	bytesProperties, err := json.Marshal(notifyResponse.EventStreamPayload.Properties)
	if err != nil {
		log.Warnf("EventStream > failed to marshal raw properties: %s", event.Data)
		return true
	}
	notifyResponse.RawProperties = bytesProperties

	// Control messages are consumed here and never forwarded to subscribers.
	switch {
	case notifyResponse.Status == "connected":
		select {
		case connectedChan <- true:
			return true
		case <-ctx.Done():
			return false
		}
	case notifyResponse.Status == "disconnected":
		e.disconnect()
		return true
	case notifyResponse.Action == "logout":
		log.Warn("EventStream > logged out")
		e.disconnect()
		return true
	}

	// A transaction subscriber receives exactly one response and is then
	// removed (which also closes its channel).
	e.transactionSubscribersMutex.RLock()
	subscriber, ok := e.transactionSubscribers[notifyResponse.TransId]
	e.transactionSubscribersMutex.RUnlock()
	if ok {
		select {
		case subscriber <- &notifyResponse:
		case <-ctx.Done():
			return false
		}
		e.removeTransactionSubscription(notifyResponse.TransId)
	}

	// Resource subscribers stay registered and each receive every matching
	// event.
	e.resourceSubscribersMutex.RLock()
	subscribers, ok := e.resourceSubscribers[notifyResponse.Resource]
	e.resourceSubscribersMutex.RUnlock()
	if ok {
		for _, s := range subscribers {
			select {
			case s <- &notifyResponse:
			case <-ctx.Done():
				return false
			}
		}
	}

	return true
}

// subscribeTransaction registers a new unbuffered channel under transId,
// replacing any channel previously stored for that id, and returns it.
func (e *eventStream) subscribeTransaction(transId string) chan *EventStreamResponse {
	msgchan := make(chan *EventStreamResponse)

	e.transactionSubscribersMutex.Lock()
	e.transactionSubscribers[transId] = msgchan
	e.transactionSubscribersMutex.Unlock()

	return msgchan
}

// subscribeResource appends a new unbuffered channel to the subscriber list
// for resource and returns it.
func (e *eventStream) subscribeResource(resource string) chan *EventStreamResponse {
	msgchan := make(chan *EventStreamResponse)

	e.resourceSubscribersMutex.Lock()
	e.resourceSubscribers[resource] = append(e.resourceSubscribers[resource], msgchan)
	e.resourceSubscribersMutex.Unlock()

	return msgchan
}

// removeTransactionSubscription closes and deletes the subscriber channel
// registered under transId, if there is one.
func (e *eventStream) removeTransactionSubscription(transId string) {
	e.transactionSubscribersMutex.Lock()
	defer e.transactionSubscribersMutex.Unlock()

	if subscriber, ok := e.transactionSubscribers[transId]; ok {
		close(subscriber)
		delete(e.transactionSubscribers, transId)
	}
}

// clearResourceSubscriptions closes every resource subscriber channel and
// resets the resource subscriber map to empty.
func (e *eventStream) clearResourceSubscriptions() {
	e.resourceSubscribersMutex.Lock()
	defer e.resourceSubscribersMutex.Unlock()

	for _, subscribers := range e.resourceSubscribers {
		for _, subscriber := range subscribers {
			close(subscriber)
		}
	}
	e.resourceSubscribers = make(map[string][]chan *EventStreamResponse)
}

// clearTransactionSubscriptions closes every transaction subscriber channel
// and resets the transaction subscriber map to empty.
func (e *eventStream) clearTransactionSubscriptions() {
	e.transactionSubscribersMutex.Lock()
	defer e.transactionSubscribersMutex.Unlock()

	for _, subscriber := range e.transactionSubscribers {
		close(subscriber)
	}
	e.transactionSubscribers = make(map[string]chan *EventStreamResponse)
}