Fixed race conditions, refactored the basestation and eventstream code to make it safer.

This commit is contained in:
Jeff Walter 2018-09-22 09:15:22 -05:00
parent 049f3c5652
commit 1d2ce6546c
8 changed files with 139 additions and 75 deletions

View File

@ -26,8 +26,8 @@
name = "github.com/mitchellh/mapstructure"
[[constraint]]
branch = "master"
name = "github.com/pkg/errors"
version = "0.8.0"
[[constraint]]
branch = "master"

11
arlo.go
View File

@ -1,6 +1,8 @@
package arlo
import (
"sync"
"github.com/jeffreydwalter/arlo-golang/internal/request"
"github.com/pkg/errors"
@ -13,6 +15,7 @@ type Arlo struct {
Account Account
Basestations Basestations
Cameras Cameras
rwmutex sync.RWMutex
}
func newArlo(user string, pass string) (arlo *Arlo) {
@ -96,18 +99,20 @@ func (a *Arlo) GetDevices() (devices Devices, err error) {
deviceResponse.Data[i].arlo = a
}
// Unsubscribe all of the basestations from the EventStream.
// disconnect all of the basestations from the EventStream.
for i := range a.Basestations {
if err := a.Basestations[i].Unsubscribe(); err != nil {
if err := a.Basestations[i].Disconnect(); err != nil {
return nil, errors.WithMessage(err, "failed to get devices")
}
}
a.rwmutex.Lock()
// Cache the devices as their respective types.
a.Cameras = deviceResponse.Data.GetCameras()
a.Basestations = deviceResponse.Data.GetBasestations()
a.rwmutex.Unlock()
// Subscribe each basestation to the EventStream.
// subscribe each basestation to the EventStream.
for i := range a.Basestations {
if err := a.Basestations[i].Subscribe(); err != nil {
return nil, errors.WithMessage(err, "failed to get devices")

View File

@ -8,49 +8,18 @@ import (
)
const eventStreamTimeout = 10 * time.Second
const pingTime = 30 * time.Second
// A Basestation is a Device that's not type "camera" (basestation, arloq, arloqs, etc.).
// This type is here just for semantics. Some methods explicitly require a device of a certain type.
type Basestation struct {
Device
eventStream *EventStream
eventStream *eventStream
}
// Basestations is an array of Basestation objects.
type Basestations []Basestation
func (b *Basestation) makeEventStreamRequest(payload EventStreamPayload, msg string) (response *EventStreamResponse, err error) {
transId := genTransId()
payload.TransId = transId
if err := b.IsConnected(); err != nil {
return nil, errors.WithMessage(err, msg)
}
b.eventStream.Subscriptions[transId] = make(chan *EventStreamResponse)
defer close(b.eventStream.Subscriptions[transId])
if err := b.NotifyEventStream(payload, msg); err != nil {
return nil, err
}
timer := time.NewTimer(eventStreamTimeout)
defer timer.Stop()
select {
case <-timer.C:
err = fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds())
return nil, errors.WithMessage(err, msg)
case response := <-b.eventStream.Subscriptions[transId]:
return response, nil
case err = <-b.eventStream.Error:
return nil, errors.Wrap(err, msg)
case <-b.eventStream.Close:
err = errors.New("event stream was closed before response was read")
return nil, errors.WithMessage(err, msg)
}
}
// Find returns a basestation with the device id passed in.
func (bs *Basestations) Find(deviceId string) *Basestation {
for _, b := range *bs {
@ -62,30 +31,75 @@ func (bs *Basestations) Find(deviceId string) *Basestation {
return nil
}
func (b *Basestation) IsConnected() error {
if !b.eventStream.Connected {
return errors.New("basestation not connected to event stream")
// makeEventStreamRequest is a helper function sets up a response channel, sends a message to the event stream, and blocks waiting for the response.
func (b *Basestation) makeEventStreamRequest(payload EventStreamPayload, msg string) (response *EventStreamResponse, err error) {
transId := genTransId()
payload.TransId = transId
if err := b.IsConnected(); err != nil {
return nil, errors.WithMessage(err, msg)
}
subscriber := make(subscriber)
// Add the response channel to the event stream queue so the response can be written to it.
b.eventStream.subscribe(transId, subscriber)
// Make sure we close and remove the response channel before returning.
defer b.eventStream.unsubscribe(transId)
// Send the payload to the event stream.
if err := b.NotifyEventStream(payload, msg); err != nil {
return nil, err
}
timer := time.NewTimer(eventStreamTimeout)
defer timer.Stop()
// Wait for the response to come back from the event stream on the response channel.
select {
// If we get a response, return it to the caller.
case response := <-subscriber:
return response, nil
case err = <-b.eventStream.Error:
return nil, errors.Wrap(err, msg)
// If the event stream is closed, return an error about it.
case <-b.eventStream.Disconnected:
err = errors.New("event stream was closed before response was read")
return nil, errors.WithMessage(err, msg)
// If we timeout, return an error about it.
case <-timer.C:
err = fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds())
return nil, errors.WithMessage(err, msg)
}
}
func (b *Basestation) IsConnected() error {
// If the event stream is closed, return an error about it.
select {
case <-b.eventStream.Disconnected:
return errors.New("basestation not connected to event stream")
default:
return nil
}
return nil
}
func (b *Basestation) Subscribe() error {
b.eventStream = NewEventStream(BaseUrl+fmt.Sprintf(SubscribeUri, b.arlo.Account.Token), b.arlo.client.HttpClient)
connected := b.eventStream.Listen()
b.eventStream = newEventStream(BaseUrl+fmt.Sprintf(SubscribeUri, b.arlo.Account.Token), b.arlo.client.HttpClient)
forLoop:
for {
// We blocking here because we can't really do anything with the event stream until we're connected.
// Once we have confirmation that we're connected to the event stream, we will "subscribe" to events.
select {
case b.eventStream.Connected = <-connected:
if b.eventStream.Connected {
case connected := <-b.eventStream.listen():
if connected {
break forLoop
} else {
return errors.New("failed to subscribe to the event stream")
}
case <-b.eventStream.Close:
return errors.New("failed to subscribe to the event stream")
case <-b.eventStream.Disconnected:
err := errors.New("event stream was closed")
return errors.WithMessage(err, "failed to subscribe to the event stream")
}
}
@ -96,9 +110,9 @@ forLoop:
// The Arlo event stream requires a "ping" every 30s.
go func() {
for {
time.Sleep(30 * time.Second)
time.Sleep(pingTime)
if err := b.Ping(); err != nil {
b.Unsubscribe()
b.Disconnect()
break
}
}
@ -107,10 +121,10 @@ forLoop:
return nil
}
func (b *Basestation) Unsubscribe() error {
// Close channel to stop EventStream.
func (b *Basestation) Disconnect() error {
// disconnect channel to stop event stream.
if b.eventStream != nil {
close(b.eventStream.Close)
b.eventStream.disconnect()
}
return nil
}

View File

@ -282,6 +282,14 @@ func (c *Camera) DisableAudioAlerts(sensitivity int) (response *EventStreamRespo
return b.makeEventStreamRequest(payload, msg)
}
// PushToTalk starts a push-to-talk session.
// FIXME: This feature requires more API calls to make it actually work, and I haven't figure out how to fully implement it.
// It appears that the audio stream is Real-Time Transport Protocol (RTP), which requires a player (ffmpeg?) to consume the stream.
func (c *Camera) PushToTalk() error {
resp, err := c.arlo.get(fmt.Sprintf(PushToTalkUri, c.UniqueId), c.XCloudId, nil)
return checkRequest(resp, err, "failed to enable push to talk")
}
// action: disabled OR recordSnapshot OR recordVideo
func (c *Camera) SetAlertNotificationMethods(action string, email, push bool) (response *EventStreamResponse, err error) {
payload := EventStreamPayload{

View File

@ -1,8 +1,6 @@
package arlo
const (
TransIdPrefix = "web"
BaseUrl = "https://arlo.netgear.com/hmsweb"
LoginUri = "/login/v2"
LogoutUri = "/logout"
@ -13,6 +11,7 @@ const (
ServiceLevelUri = "/users/serviceLevel"
OffersUri = "/users/payment/offers"
UserProfileUri = "/users/profile"
PushToTalkUri = "/users/devices/%s/pushtotalk"
UserChangePasswordUri = "/users/changePassword"
UserSessionUri = "/users/session"
UserFriendsUri = "/users/friends"
@ -32,4 +31,5 @@ const (
DeviceTypeBasestation = "basestation"
DeviceTypeCamera = "camera"
DeviceTypeArloQ = "arloq"
TransIdPrefix = "web"
)

View File

@ -17,33 +17,48 @@ var (
FAILED_TO_SUBSCRIBE = errors.New("failed to subscribe to seeclient")
)
type EventStream struct {
SSEClient *sse.Client
Subscriptions map[string]chan *EventStreamResponse
Events chan *sse.Event
Error chan error
Close chan interface{}
Connected bool
Verbose bool
type subscriber chan *EventStreamResponse
sync.Mutex
type subscribers map[string]subscriber
type subscriptions struct {
subscribers
rwmutex sync.RWMutex
}
func NewEventStream(url string, client *http.Client) *EventStream {
type eventStream struct {
SSEClient *sse.Client
Events chan *sse.Event
Error chan error
Verbose bool
Disconnected chan interface{}
once *sync.Once
subscriptions
}
func newEventStream(url string, client *http.Client) *eventStream {
SSEClient := sse.NewClient(url)
SSEClient.Connection = client
return &EventStream{
return &eventStream{
SSEClient: SSEClient,
Events: make(chan *sse.Event),
Subscriptions: make(map[string]chan *EventStreamResponse),
subscriptions: subscriptions{make(map[string]subscriber), sync.RWMutex{}},
Error: make(chan error),
Close: make(chan interface{}),
Disconnected: make(chan interface{}),
once: new(sync.Once),
}
}
func (e *EventStream) Listen() (connected chan bool) {
func (e *eventStream) disconnect() {
e.once.Do(func() {
close(e.Disconnected)
})
}
func (e *eventStream) listen() (connected chan bool) {
connected = make(chan bool)
@ -74,16 +89,17 @@ func (e *EventStream) Listen() (connected chan bool) {
if notifyResponse.Status == "connected" {
connected <- true
} else if notifyResponse.Status == "disconnected" {
connected <- false
e.disconnect()
} else {
if subscriber, ok := e.Subscriptions[notifyResponse.TransId]; ok {
e.Lock()
e.subscriptions.rwmutex.RLock()
subscriber, ok := e.subscribers[notifyResponse.TransId]
e.subscriptions.rwmutex.RUnlock()
if ok {
subscriber <- notifyResponse
e.Unlock()
}
}
}
case <-e.Close:
case <-e.Disconnected:
connected <- false
return
}
@ -92,3 +108,19 @@ func (e *EventStream) Listen() (connected chan bool) {
return connected
}
func (s *subscriptions) unsubscribe(transId string) {
s.rwmutex.Lock()
defer s.rwmutex.Unlock()
if _, ok := s.subscribers[transId]; ok {
close(s.subscribers[transId])
delete(s.subscribers, transId)
}
}
func (s *subscriptions) subscribe(transId string, subscriber subscriber) {
s.rwmutex.Lock()
s.subscribers[transId] = subscriber
s.rwmutex.Unlock()
}

View File

@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"io"
"log"
"net/http"
"net/http/cookiejar"
"net/url"
@ -81,7 +80,7 @@ func (c *Client) newRequest(method string, uri string, body interface{}, header
return nil, errors.Wrap(err, "failed to create request object")
}
}
log.Printf("\n\nBODY (%s): %s\n\n", uri, buf)
// log.Printf("\n\nBODY (%s): %s\n\n", uri, buf)
u := c.BaseURL.String() + uri
req, err := http.NewRequest(method, u, buf)

View File

@ -60,7 +60,9 @@ func genTransId() string {
func (a *Arlo) get(uri, xCloudId string, header http.Header) (*request.Response, error) {
if len(xCloudId) > 0 {
a.rwmutex.Lock()
a.client.BaseHttpHeader.Set("xcloudId", xCloudId)
a.rwmutex.Unlock()
}
return a.client.Get(uri, header)
@ -68,7 +70,9 @@ func (a *Arlo) get(uri, xCloudId string, header http.Header) (*request.Response,
func (a *Arlo) put(uri, xCloudId string, body interface{}, header http.Header) (*request.Response, error) {
if len(xCloudId) > 0 {
a.rwmutex.Lock()
a.client.BaseHttpHeader.Set("xcloudId", xCloudId)
a.rwmutex.Unlock()
}
return a.client.Put(uri, body, header)
@ -76,7 +80,9 @@ func (a *Arlo) put(uri, xCloudId string, body interface{}, header http.Header) (
func (a *Arlo) post(uri, xCloudId string, body interface{}, header http.Header) (*request.Response, error) {
if len(xCloudId) > 0 {
a.rwmutex.Lock()
a.client.BaseHttpHeader.Set("xcloudId", xCloudId)
a.rwmutex.Unlock()
}
return a.client.Post(uri, body, header)