package arlo import ( "bytes" "context" "encoding/json" "fmt" "net/http" "sync" "github.com/r3labs/sse" log "github.com/sirupsen/logrus" ) type eventStream struct { SSEClient *sse.Client Events chan *sse.Event Error chan error Verbose bool DisconnectedChan chan interface{} once *sync.Once subscribers map[string]chan *EventStreamResponse subscribersMutex sync.RWMutex errorsubsribers map[string]chan error errorMutex sync.RWMutex } func newEventStream(url string, client *http.Client) *eventStream { e := &eventStream{ Events: make(chan *sse.Event), subscribers: make(map[string]chan *EventStreamResponse), subscribersMutex: sync.RWMutex{}, errorsubsribers: make(map[string]chan error), errorMutex: sync.RWMutex{}, DisconnectedChan: make(chan interface{}), once: new(sync.Once), } SSEClient := sse.NewClient(url) SSEClient.Connection = client SSEClient.OnDisconnect(func(c *sse.Client) { e.disconnect() }) e.SSEClient = SSEClient return e } func (e *eventStream) disconnect() { e.once.Do(func() { close(e.DisconnectedChan) }) } func (e *eventStream) listen(ctx context.Context) (chan bool, error) { connectedChan := make(chan bool) err := e.SSEClient.SubscribeChanRaw(e.Events) if err != nil { return nil, fmt.Errorf("failed to subscribe to seeclient") } go func() { for { select { case <-ctx.Done(): e.disconnect() return case event, ok := <-e.Events: if !ok { return } if event == nil || event.Data == nil { log.Warn("EventStream > nil event or nil data in event") continue } fmt.Printf("DATA : %s\n", event.Data) var notifyResponse EventStreamResponse err := json.NewDecoder(bytes.NewBuffer(event.Data)).Decode(¬ifyResponse) if err != nil { log.Warnf("EventStream > failed to decode event: %s", event.Data) continue } bytesProperties, err := json.Marshal(notifyResponse.EventStreamPayload.Properties) if err != nil { log.Warnf("EventStream > failed to marshal raw properties: %s", event.Data) continue } notifyResponse.RawProperties = bytesProperties if notifyResponse.Status == "connected" { connectedChan <- true continue } if notifyResponse.Status == "disconnected" { e.disconnect() continue } if notifyResponse.Action == "logout" { log.Warn("EventStream > logged out") e.disconnect() continue } e.subscribersMutex.RLock() subscriber, ok := e.subscribers[notifyResponse.TransId] e.subscribersMutex.RUnlock() if ok { subscriber <- ¬ifyResponse } case <-e.DisconnectedChan: connectedChan <- false return } } }() return connectedChan, nil } func (e *eventStream) unsubscribe(transId string) { e.subscribersMutex.Lock() if c, ok := e.subscribers[transId]; ok { close(c) delete(e.subscribers, transId) } e.subscribersMutex.Unlock() e.errorMutex.Lock() if c, ok := e.errorsubsribers[transId]; ok { close(c) delete(e.errorsubsribers, transId) } e.errorMutex.Unlock() } func (e *eventStream) subscribe(transId string, subscriber chan *EventStreamResponse, errorChan chan error) { e.subscribersMutex.Lock() e.subscribers[transId] = subscriber e.subscribersMutex.Unlock() e.errorMutex.Lock() e.errorsubsribers[transId] = errorChan e.errorMutex.Unlock() }