Added basic support for unsubscribing from the event stream. Also refactored the eventstream code to make it more dry and robust.

This commit is contained in:
Jeff Walter 2018-09-19 16:35:05 -05:00
parent 01c4316571
commit e4006df88a
3 changed files with 130 additions and 66 deletions

14
arlo.go
View File

@ -94,17 +94,19 @@ func (a *Arlo) GetDevices() (Devices, error) {
deviceResponse.Data[i].arlo = a
}
// Unsubscribe all of the basestations to the EventStream.
for i := range a.Basestations {
if err := a.Basestations[i].Unsubscribe(); err != nil {
return nil, errors.WithMessage(err, "failed to get devices")
}
}
// Cache the devices as their respective types.
a.Cameras = deviceResponse.Data.GetCameras()
a.Basestations = deviceResponse.Data.GetBasestations()
// Connect each basestation to the EventStream.
// Subscribe each basestation to the EventStream.
for i := range a.Basestations {
if err := a.Basestations[i].Unsubscribe(); err != nil {
return nil, errors.WithMessage(err, "failed to get devices")
}
if err := a.Basestations[i].Subscribe(); err != nil {
return nil, errors.WithMessage(err, "failed to get devices")
}

View File

@ -2,6 +2,8 @@ package arlo
import (
"fmt"
"github.com/pkg/errors"
)
type BaseStationMetadata struct {
@ -40,67 +42,120 @@ type Basestations []Basestation
func (b *Basestation) Subscribe() error {
b.eventStream = NewEventStream(BaseUrl+fmt.Sprintf(SubscribeUri, b.arlo.Account.Token), b.arlo.client.HttpClient)
b.eventStream.Listen()
connected := b.eventStream.Listen()
body := Payload{
outoffor:
for {
// TODO: Need to add a timeout here.
// We blocking here because we can't really do anything with the event stream until we're connected.
// Once we have confirmation that we're connected to the event stream, we will "subscribe" to events.
select {
case b.eventStream.Connected = <-connected:
if b.eventStream.Connected {
break outoffor
} else {
// TODO: What do we do if Connected is false? Probably need retry logic here.
break
}
case <-b.eventStream.Close:
return errors.New("failed to subscribe to the event stream")
}
}
// This is a crude (temporary?) way to monitor the connection. It's late and I'm tired, so this will probably go away.
go func() {
outoffor:
for {
select {
case b.eventStream.Connected = <-connected:
// TODO: What do we do if Connected is false? Probably need retry logic here.
break outoffor
case <-b.eventStream.Close:
// TODO: Figure out what to do here if the eventStream is closed. (Panic?)
return
}
}
}()
payload := Payload{
Action: "set",
Resource: fmt.Sprintf("subscriptions/%s_%s", b.UserId, TransIdPrefix),
PublishResponse: false,
Properties: map[string][1]string{"devices": {b.DeviceId}},
TransId: genTransId(),
From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix),
To: b.DeviceId,
}
resp, err := b.arlo.post(fmt.Sprintf(NotifyUri, b.DeviceId), b.XCloudId, body, nil)
return checkRequest(*resp, err, "failed to subscribe to the event stream")
if _, err := b.makeEventStreamRequest(payload, "failed to subscribe to the event stream"); err != nil {
return err
}
return nil
}
func (b *Basestation) Unsubscribe() error {
// TODO: Close channel to stop EventStream.
//return errors.New("not implemented")
if b.eventStream != nil {
close(b.eventStream.Close)
}
return nil
}
func (b *Basestation) IsConnected() error {
if !b.eventStream.Connected {
return errors.New("basestation not connected to event stream")
}
return nil
}
func (b *Basestation) GetState() (*EventStreamResponse, error) {
transId := genTransId()
b.eventStream.Subscriptions[transId] = make(chan *EventStreamResponse)
body := Payload{
payload := Payload{
Action: "get",
Resource: "basestation",
PublishResponse: false,
TransId: transId,
From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix),
To: b.DeviceId,
}
resp, err := b.arlo.post(fmt.Sprintf(NotifyUri, b.DeviceId), b.XCloudId, body, nil)
if err := checkRequest(*resp, err, "failed to get basestation state"); err != nil {
return nil, err
}
return <-b.eventStream.Subscriptions[transId], nil
return b.makeEventStreamRequest(payload, "failed to get basestation state")
}
func (b *Basestation) GetAssociatedCamerasState() (*EventStreamResponse, error) {
transId := genTransId()
b.eventStream.Subscriptions[transId] = make(chan *EventStreamResponse)
body := Payload{
payload := Payload{
Action: "get",
Resource: "cameras",
PublishResponse: false,
TransId: transId,
From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix),
To: b.DeviceId,
}
resp, err := b.arlo.post(fmt.Sprintf(NotifyUri, b.DeviceId), b.XCloudId, body, nil)
if err := checkRequest(*resp, err, "failed to get camera state"); err != nil {
return b.makeEventStreamRequest(payload, "failed to get associated cameras state")
}
func (b *Basestation) makeEventStreamRequest(payload Payload, msg string) (*EventStreamResponse, error) {
transId := genTransId()
payload.TransId = transId
if err := b.IsConnected(); err != nil {
return nil, errors.WithMessage(err, msg)
}
b.eventStream.Subscriptions[transId] = make(chan *EventStreamResponse)
defer close(b.eventStream.Subscriptions[transId])
resp, err := b.arlo.post(fmt.Sprintf(NotifyUri, b.DeviceId), b.XCloudId, payload, nil)
if err := checkRequest(*resp, err, msg); err != nil {
return nil, err
}
return <-b.eventStream.Subscriptions[transId], nil
select {
case eventStreamResponse := <-b.eventStream.Subscriptions[transId]:
return eventStreamResponse, nil
case err = <-b.eventStream.Error:
return nil, errors.Wrap(err, "failed to get basestation")
case <-b.eventStream.Close:
return nil, errors.New("event stream was closed before response was read")
}
}

View File

@ -13,17 +13,17 @@ import (
)
var (
FAILED_TO_PUBLISH = errors.New("Failed to publish")
FAILED_TO_DECODE_JSON = errors.New("Failed to decode JSON")
FAILED_TO_SUBSCRIBE = errors.New("Failed to subscribe to SSEClient")
FAILED_TO_PUBLISH = errors.New("failed to publish")
FAILED_TO_DECODE_JSON = errors.New("failed to decode json")
FAILED_TO_SUBSCRIBE = errors.New("failed to subscribe to seeclient")
)
type EventStream struct {
SSEClient *sse.Client
Subscriptions map[string]chan *EventStreamResponse
Events chan *sse.Event
ErrorChan chan error
Registered bool
Error chan error
Close chan interface{}
Connected bool
Verbose bool
@ -39,51 +39,58 @@ func NewEventStream(url string, client *http.Client) *EventStream {
SSEClient: SSEClient,
Events: make(chan *sse.Event),
Subscriptions: make(map[string]chan *EventStreamResponse),
ErrorChan: make(chan error, 1),
Error: make(chan error),
Close: make(chan interface{}),
}
}
func (e *EventStream) Listen() {
func (e *EventStream) Listen() (connected chan bool) {
connected = make(chan bool)
go func() {
err := e.SSEClient.SubscribeChanRaw(e.Events)
if err != nil {
fmt.Println(FAILED_TO_SUBSCRIBE)
e.ErrorChan <- FAILED_TO_SUBSCRIBE
e.Error <- FAILED_TO_SUBSCRIBE
}
}()
go func() {
for event := range e.Events {
/*
fmt.Println("Got event message.")
fmt.Printf("EVENT: %s\n", event.Event)
fmt.Printf("DATA: %s\n", event.Data)
*/
for {
select {
case event := <-e.Events:
/*
fmt.Println("Got event message.")
fmt.Printf("EVENT: %s\n", event.Event)
fmt.Printf("DATA: %s\n", event.Data)
*/
if event.Data != nil {
notifyResponse := &EventStreamResponse{}
b := bytes.NewBuffer(event.Data)
err := json.NewDecoder(b).Decode(notifyResponse)
if err != nil {
e.ErrorChan <- FAILED_TO_DECODE_JSON
break
}
if event.Data != nil {
notifyResponse := &EventStreamResponse{}
b := bytes.NewBuffer(event.Data)
err := json.NewDecoder(b).Decode(notifyResponse)
if err != nil {
e.Error <- FAILED_TO_DECODE_JSON
break
}
if notifyResponse.Status == "connected" {
e.Connected = true
} else if notifyResponse.Status == "disconnected" {
e.Connected = false
} else {
if subscriber, ok := e.Subscriptions[notifyResponse.TransId]; ok {
e.Lock()
subscriber <- notifyResponse
close(subscriber)
delete(e.Subscriptions, notifyResponse.TransId)
e.Unlock()
if notifyResponse.Status == "connected" {
connected <- true
} else if notifyResponse.Status == "disconnected" {
connected <- false
} else {
if subscriber, ok := e.Subscriptions[notifyResponse.TransId]; ok {
e.Lock()
subscriber <- notifyResponse
e.Unlock()
}
}
}
case <-e.Close:
connected <- false
return
}
}
}()
return connected
}