/* * Copyright (c) 2018 Jeffrey Walter * * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to the following conditions: * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the * Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ package arlo import ( "bytes" "context" "encoding/json" "fmt" "net/http" "sync" "github.com/r3labs/sse" log "github.com/sirupsen/logrus" ) type eventStream struct { SSEClient *sse.Client Events chan *sse.Event Error chan error Verbose bool DisconnectedChan chan interface{} once *sync.Once subscribers map[string]chan *EventStreamResponse subscribersMutex sync.RWMutex errorsubsribers map[string]chan error errorMutex sync.RWMutex } func newEventStream(url string, client *http.Client) *eventStream { e := &eventStream{ Events: make(chan *sse.Event), subscribers: make(map[string]chan *EventStreamResponse), subscribersMutex: sync.RWMutex{}, errorsubsribers: make(map[string]chan error), errorMutex: sync.RWMutex{}, DisconnectedChan: make(chan interface{}), once: new(sync.Once), } SSEClient := sse.NewClient(url) SSEClient.Connection = client SSEClient.OnDisconnect(func(c *sse.Client) { e.disconnect() }) e.SSEClient = SSEClient return e } func (e *eventStream) disconnect() { e.once.Do(func() { close(e.DisconnectedChan) }) } func (e *eventStream) listen(ctx context.Context) (chan bool, error) { connectedChan := make(chan bool) err := e.SSEClient.SubscribeChanRaw(e.Events) if err != nil { return nil, fmt.Errorf("failed to subscribe to seeclient") } go func() { for { select { case <-ctx.Done(): e.disconnect() return case event, ok := <-e.Events: if !ok { return } if event == nil || event.Data == nil { log.Warn("EventStream > nil event or nil data in event") continue } fmt.Printf("DATA : %s\n", event.Data) var notifyResponse EventStreamResponse err := json.NewDecoder(bytes.NewBuffer(event.Data)).Decode(¬ifyResponse) if err != nil { log.Warnf("EventStream > failed to decode event: %s", event.Data) continue } bytesProperties, err := json.Marshal(notifyResponse.EventStreamPayload.Properties) if err != nil { log.Warnf("EventStream > failed to marshal raw properties: %s", event.Data) continue } notifyResponse.RawProperties = bytesProperties if notifyResponse.Status == "connected" { connectedChan <- true continue } if notifyResponse.Status == "disconnected" { e.disconnect() continue } if notifyResponse.Action == "logout" { log.Warn("EventStream > logged out") e.disconnect() continue } e.subscribersMutex.RLock() subscriber, ok := e.subscribers[notifyResponse.TransId] e.subscribersMutex.RUnlock() if ok { subscriber <- ¬ifyResponse } case <-e.DisconnectedChan: connectedChan <- false return } } }() return connectedChan, nil } func (e *eventStream) unsubscribe(transId string) { e.subscribersMutex.Lock() if c, ok := e.subscribers[transId]; ok { close(c) delete(e.subscribers, transId) } e.subscribersMutex.Unlock() e.errorMutex.Lock() if c, ok := e.errorsubsribers[transId]; ok { close(c) delete(e.errorsubsribers, transId) } e.errorMutex.Unlock() } func (e *eventStream) subscribe(transId string, subscriber chan *EventStreamResponse, errorChan chan error) { e.subscribersMutex.Lock() e.subscribers[transId] = subscriber e.subscribersMutex.Unlock() e.errorMutex.Lock() e.errorsubsribers[transId] = errorChan e.errorMutex.Unlock() }