From e4006df88a868664d7f35351139b4e59785f897f Mon Sep 17 00:00:00 2001 From: Jeff Walter Date: Wed, 19 Sep 2018 16:35:05 -0500 Subject: [PATCH] Added basic support for unsubscribing from the event stream. Also refactored the eventstream code to make it more dry and robust. --- arlo.go | 14 ++++--- basestation.go | 105 ++++++++++++++++++++++++++++++++++++----------- events_stream.go | 77 ++++++++++++++++++---------------- 3 files changed, 130 insertions(+), 66 deletions(-) diff --git a/arlo.go b/arlo.go index c4adbd4..2d083b1 100644 --- a/arlo.go +++ b/arlo.go @@ -94,17 +94,19 @@ func (a *Arlo) GetDevices() (Devices, error) { deviceResponse.Data[i].arlo = a } + // Unsubscribe all of the basestations to the EventStream. + for i := range a.Basestations { + if err := a.Basestations[i].Unsubscribe(); err != nil { + return nil, errors.WithMessage(err, "failed to get devices") + } + } + // Cache the devices as their respective types. a.Cameras = deviceResponse.Data.GetCameras() a.Basestations = deviceResponse.Data.GetBasestations() - // Connect each basestation to the EventStream. + // Subscribe each basestation to the EventStream. for i := range a.Basestations { - - if err := a.Basestations[i].Unsubscribe(); err != nil { - return nil, errors.WithMessage(err, "failed to get devices") - } - if err := a.Basestations[i].Subscribe(); err != nil { return nil, errors.WithMessage(err, "failed to get devices") } diff --git a/basestation.go b/basestation.go index dd52b01..73a47cd 100644 --- a/basestation.go +++ b/basestation.go @@ -2,6 +2,8 @@ package arlo import ( "fmt" + + "github.com/pkg/errors" ) type BaseStationMetadata struct { @@ -40,67 +42,120 @@ type Basestations []Basestation func (b *Basestation) Subscribe() error { b.eventStream = NewEventStream(BaseUrl+fmt.Sprintf(SubscribeUri, b.arlo.Account.Token), b.arlo.client.HttpClient) - b.eventStream.Listen() + connected := b.eventStream.Listen() - body := Payload{ +outoffor: + for { + // TODO: Need to add a timeout here. + // We blocking here because we can't really do anything with the event stream until we're connected. + // Once we have confirmation that we're connected to the event stream, we will "subscribe" to events. + select { + case b.eventStream.Connected = <-connected: + if b.eventStream.Connected { + break outoffor + } else { + // TODO: What do we do if Connected is false? Probably need retry logic here. + break + } + case <-b.eventStream.Close: + return errors.New("failed to subscribe to the event stream") + } + } + + // This is a crude (temporary?) way to monitor the connection. It's late and I'm tired, so this will probably go away. + go func() { + outoffor: + for { + select { + case b.eventStream.Connected = <-connected: + // TODO: What do we do if Connected is false? Probably need retry logic here. + break outoffor + case <-b.eventStream.Close: + // TODO: Figure out what to do here if the eventStream is closed. (Panic?) + return + } + } + }() + + payload := Payload{ Action: "set", Resource: fmt.Sprintf("subscriptions/%s_%s", b.UserId, TransIdPrefix), PublishResponse: false, Properties: map[string][1]string{"devices": {b.DeviceId}}, - TransId: genTransId(), From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix), To: b.DeviceId, } - resp, err := b.arlo.post(fmt.Sprintf(NotifyUri, b.DeviceId), b.XCloudId, body, nil) - return checkRequest(*resp, err, "failed to subscribe to the event stream") + + if _, err := b.makeEventStreamRequest(payload, "failed to subscribe to the event stream"); err != nil { + return err + } + + return nil } func (b *Basestation) Unsubscribe() error { // TODO: Close channel to stop EventStream. //return errors.New("not implemented") + if b.eventStream != nil { + close(b.eventStream.Close) + } + return nil +} + +func (b *Basestation) IsConnected() error { + if !b.eventStream.Connected { + return errors.New("basestation not connected to event stream") + } return nil } func (b *Basestation) GetState() (*EventStreamResponse, error) { - transId := genTransId() - b.eventStream.Subscriptions[transId] = make(chan *EventStreamResponse) - - body := Payload{ + payload := Payload{ Action: "get", Resource: "basestation", PublishResponse: false, - TransId: transId, From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix), To: b.DeviceId, } - resp, err := b.arlo.post(fmt.Sprintf(NotifyUri, b.DeviceId), b.XCloudId, body, nil) - if err := checkRequest(*resp, err, "failed to get basestation state"); err != nil { - return nil, err - } - - return <-b.eventStream.Subscriptions[transId], nil + return b.makeEventStreamRequest(payload, "failed to get basestation state") } func (b *Basestation) GetAssociatedCamerasState() (*EventStreamResponse, error) { - transId := genTransId() - - b.eventStream.Subscriptions[transId] = make(chan *EventStreamResponse) - - body := Payload{ + payload := Payload{ Action: "get", Resource: "cameras", PublishResponse: false, - TransId: transId, From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix), To: b.DeviceId, } - resp, err := b.arlo.post(fmt.Sprintf(NotifyUri, b.DeviceId), b.XCloudId, body, nil) - if err := checkRequest(*resp, err, "failed to get camera state"); err != nil { + return b.makeEventStreamRequest(payload, "failed to get associated cameras state") +} + +func (b *Basestation) makeEventStreamRequest(payload Payload, msg string) (*EventStreamResponse, error) { + transId := genTransId() + payload.TransId = transId + + if err := b.IsConnected(); err != nil { + return nil, errors.WithMessage(err, msg) + } + + b.eventStream.Subscriptions[transId] = make(chan *EventStreamResponse) + defer close(b.eventStream.Subscriptions[transId]) + + resp, err := b.arlo.post(fmt.Sprintf(NotifyUri, b.DeviceId), b.XCloudId, payload, nil) + if err := checkRequest(*resp, err, msg); err != nil { return nil, err } - return <-b.eventStream.Subscriptions[transId], nil + select { + case eventStreamResponse := <-b.eventStream.Subscriptions[transId]: + return eventStreamResponse, nil + case err = <-b.eventStream.Error: + return nil, errors.Wrap(err, "failed to get basestation") + case <-b.eventStream.Close: + return nil, errors.New("event stream was closed before response was read") + } } diff --git a/events_stream.go b/events_stream.go index c4abf96..8efcc51 100644 --- a/events_stream.go +++ b/events_stream.go @@ -13,17 +13,17 @@ import ( ) var ( - FAILED_TO_PUBLISH = errors.New("Failed to publish") - FAILED_TO_DECODE_JSON = errors.New("Failed to decode JSON") - FAILED_TO_SUBSCRIBE = errors.New("Failed to subscribe to SSEClient") + FAILED_TO_PUBLISH = errors.New("failed to publish") + FAILED_TO_DECODE_JSON = errors.New("failed to decode json") + FAILED_TO_SUBSCRIBE = errors.New("failed to subscribe to seeclient") ) type EventStream struct { SSEClient *sse.Client Subscriptions map[string]chan *EventStreamResponse Events chan *sse.Event - ErrorChan chan error - Registered bool + Error chan error + Close chan interface{} Connected bool Verbose bool @@ -39,51 +39,58 @@ func NewEventStream(url string, client *http.Client) *EventStream { SSEClient: SSEClient, Events: make(chan *sse.Event), Subscriptions: make(map[string]chan *EventStreamResponse), - ErrorChan: make(chan error, 1), + Error: make(chan error), + Close: make(chan interface{}), } } -func (e *EventStream) Listen() { +func (e *EventStream) Listen() (connected chan bool) { + + connected = make(chan bool) go func() { err := e.SSEClient.SubscribeChanRaw(e.Events) if err != nil { fmt.Println(FAILED_TO_SUBSCRIBE) - e.ErrorChan <- FAILED_TO_SUBSCRIBE + e.Error <- FAILED_TO_SUBSCRIBE } - }() - go func() { - for event := range e.Events { - /* - fmt.Println("Got event message.") - fmt.Printf("EVENT: %s\n", event.Event) - fmt.Printf("DATA: %s\n", event.Data) - */ + for { + select { + case event := <-e.Events: + /* + fmt.Println("Got event message.") + fmt.Printf("EVENT: %s\n", event.Event) + fmt.Printf("DATA: %s\n", event.Data) + */ - if event.Data != nil { - notifyResponse := &EventStreamResponse{} - b := bytes.NewBuffer(event.Data) - err := json.NewDecoder(b).Decode(notifyResponse) - if err != nil { - e.ErrorChan <- FAILED_TO_DECODE_JSON - break - } + if event.Data != nil { + notifyResponse := &EventStreamResponse{} + b := bytes.NewBuffer(event.Data) + err := json.NewDecoder(b).Decode(notifyResponse) + if err != nil { + e.Error <- FAILED_TO_DECODE_JSON + break + } - if notifyResponse.Status == "connected" { - e.Connected = true - } else if notifyResponse.Status == "disconnected" { - e.Connected = false - } else { - if subscriber, ok := e.Subscriptions[notifyResponse.TransId]; ok { - e.Lock() - subscriber <- notifyResponse - close(subscriber) - delete(e.Subscriptions, notifyResponse.TransId) - e.Unlock() + if notifyResponse.Status == "connected" { + connected <- true + } else if notifyResponse.Status == "disconnected" { + connected <- false + } else { + if subscriber, ok := e.Subscriptions[notifyResponse.TransId]; ok { + e.Lock() + subscriber <- notifyResponse + e.Unlock() + } } } + case <-e.Close: + connected <- false + return } } }() + + return connected }