diff --git a/Gopkg.toml b/Gopkg.toml index 22a96b7..f0df04d 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -26,8 +26,8 @@ name = "github.com/mitchellh/mapstructure" [[constraint]] + branch = "master" name = "github.com/pkg/errors" - version = "0.8.0" [[constraint]] branch = "master" diff --git a/arlo.go b/arlo.go index c8184d6..2e996f8 100644 --- a/arlo.go +++ b/arlo.go @@ -1,6 +1,8 @@ package arlo import ( + "sync" + "github.com/jeffreydwalter/arlo-golang/internal/request" "github.com/pkg/errors" @@ -13,6 +15,7 @@ type Arlo struct { Account Account Basestations Basestations Cameras Cameras + rwmutex sync.RWMutex } func newArlo(user string, pass string) (arlo *Arlo) { @@ -96,18 +99,20 @@ func (a *Arlo) GetDevices() (devices Devices, err error) { deviceResponse.Data[i].arlo = a } - // Unsubscribe all of the basestations from the EventStream. + // disconnect all of the basestations from the EventStream. for i := range a.Basestations { - if err := a.Basestations[i].Unsubscribe(); err != nil { + if err := a.Basestations[i].Disconnect(); err != nil { return nil, errors.WithMessage(err, "failed to get devices") } } + a.rwmutex.Lock() // Cache the devices as their respective types. a.Cameras = deviceResponse.Data.GetCameras() a.Basestations = deviceResponse.Data.GetBasestations() + a.rwmutex.Unlock() - // Subscribe each basestation to the EventStream. + // subscribe each basestation to the EventStream. for i := range a.Basestations { if err := a.Basestations[i].Subscribe(); err != nil { return nil, errors.WithMessage(err, "failed to get devices") diff --git a/basestation.go b/basestation.go index 685833e..622753b 100644 --- a/basestation.go +++ b/basestation.go @@ -8,49 +8,18 @@ import ( ) const eventStreamTimeout = 10 * time.Second +const pingTime = 30 * time.Second // A Basestation is a Device that's not type "camera" (basestation, arloq, arloqs, etc.). // This type is here just for semantics. Some methods explicitly require a device of a certain type. type Basestation struct { Device - eventStream *EventStream + eventStream *eventStream } // Basestations is an array of Basestation objects. type Basestations []Basestation -func (b *Basestation) makeEventStreamRequest(payload EventStreamPayload, msg string) (response *EventStreamResponse, err error) { - transId := genTransId() - payload.TransId = transId - - if err := b.IsConnected(); err != nil { - return nil, errors.WithMessage(err, msg) - } - - b.eventStream.Subscriptions[transId] = make(chan *EventStreamResponse) - defer close(b.eventStream.Subscriptions[transId]) - - if err := b.NotifyEventStream(payload, msg); err != nil { - return nil, err - } - - timer := time.NewTimer(eventStreamTimeout) - defer timer.Stop() - - select { - case <-timer.C: - err = fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds()) - return nil, errors.WithMessage(err, msg) - case response := <-b.eventStream.Subscriptions[transId]: - return response, nil - case err = <-b.eventStream.Error: - return nil, errors.Wrap(err, msg) - case <-b.eventStream.Close: - err = errors.New("event stream was closed before response was read") - return nil, errors.WithMessage(err, msg) - } -} - // Find returns a basestation with the device id passed in. func (bs *Basestations) Find(deviceId string) *Basestation { for _, b := range *bs { @@ -62,30 +31,75 @@ func (bs *Basestations) Find(deviceId string) *Basestation { return nil } -func (b *Basestation) IsConnected() error { - if !b.eventStream.Connected { - return errors.New("basestation not connected to event stream") +// makeEventStreamRequest is a helper function sets up a response channel, sends a message to the event stream, and blocks waiting for the response. +func (b *Basestation) makeEventStreamRequest(payload EventStreamPayload, msg string) (response *EventStreamResponse, err error) { + transId := genTransId() + payload.TransId = transId + + if err := b.IsConnected(); err != nil { + return nil, errors.WithMessage(err, msg) + } + + subscriber := make(subscriber) + + // Add the response channel to the event stream queue so the response can be written to it. + b.eventStream.subscribe(transId, subscriber) + // Make sure we close and remove the response channel before returning. + defer b.eventStream.unsubscribe(transId) + + // Send the payload to the event stream. + if err := b.NotifyEventStream(payload, msg); err != nil { + return nil, err + } + + timer := time.NewTimer(eventStreamTimeout) + defer timer.Stop() + + // Wait for the response to come back from the event stream on the response channel. + select { + // If we get a response, return it to the caller. + case response := <-subscriber: + return response, nil + case err = <-b.eventStream.Error: + return nil, errors.Wrap(err, msg) + // If the event stream is closed, return an error about it. + case <-b.eventStream.Disconnected: + err = errors.New("event stream was closed before response was read") + return nil, errors.WithMessage(err, msg) + // If we timeout, return an error about it. + case <-timer.C: + err = fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds()) + return nil, errors.WithMessage(err, msg) + } +} + +func (b *Basestation) IsConnected() error { + // If the event stream is closed, return an error about it. + select { + case <-b.eventStream.Disconnected: + return errors.New("basestation not connected to event stream") + default: + return nil } - return nil } func (b *Basestation) Subscribe() error { - b.eventStream = NewEventStream(BaseUrl+fmt.Sprintf(SubscribeUri, b.arlo.Account.Token), b.arlo.client.HttpClient) - connected := b.eventStream.Listen() + b.eventStream = newEventStream(BaseUrl+fmt.Sprintf(SubscribeUri, b.arlo.Account.Token), b.arlo.client.HttpClient) forLoop: for { // We blocking here because we can't really do anything with the event stream until we're connected. // Once we have confirmation that we're connected to the event stream, we will "subscribe" to events. select { - case b.eventStream.Connected = <-connected: - if b.eventStream.Connected { + case connected := <-b.eventStream.listen(): + if connected { break forLoop } else { return errors.New("failed to subscribe to the event stream") } - case <-b.eventStream.Close: - return errors.New("failed to subscribe to the event stream") + case <-b.eventStream.Disconnected: + err := errors.New("event stream was closed") + return errors.WithMessage(err, "failed to subscribe to the event stream") } } @@ -96,9 +110,9 @@ forLoop: // The Arlo event stream requires a "ping" every 30s. go func() { for { - time.Sleep(30 * time.Second) + time.Sleep(pingTime) if err := b.Ping(); err != nil { - b.Unsubscribe() + b.Disconnect() break } } @@ -107,10 +121,10 @@ forLoop: return nil } -func (b *Basestation) Unsubscribe() error { - // Close channel to stop EventStream. +func (b *Basestation) Disconnect() error { + // disconnect channel to stop event stream. if b.eventStream != nil { - close(b.eventStream.Close) + b.eventStream.disconnect() } return nil } diff --git a/camera.go b/camera.go index d79da48..7eab0bd 100644 --- a/camera.go +++ b/camera.go @@ -282,6 +282,14 @@ func (c *Camera) DisableAudioAlerts(sensitivity int) (response *EventStreamRespo return b.makeEventStreamRequest(payload, msg) } +// PushToTalk starts a push-to-talk session. +// FIXME: This feature requires more API calls to make it actually work, and I haven't figure out how to fully implement it. +// It appears that the audio stream is Real-Time Transport Protocol (RTP), which requires a player (ffmpeg?) to consume the stream. +func (c *Camera) PushToTalk() error { + resp, err := c.arlo.get(fmt.Sprintf(PushToTalkUri, c.UniqueId), c.XCloudId, nil) + return checkRequest(resp, err, "failed to enable push to talk") +} + // action: disabled OR recordSnapshot OR recordVideo func (c *Camera) SetAlertNotificationMethods(action string, email, push bool) (response *EventStreamResponse, err error) { payload := EventStreamPayload{ diff --git a/const.go b/const.go index 8f4f6a8..8d2c860 100644 --- a/const.go +++ b/const.go @@ -1,8 +1,6 @@ package arlo const ( - TransIdPrefix = "web" - BaseUrl = "https://arlo.netgear.com/hmsweb" LoginUri = "/login/v2" LogoutUri = "/logout" @@ -13,6 +11,7 @@ const ( ServiceLevelUri = "/users/serviceLevel" OffersUri = "/users/payment/offers" UserProfileUri = "/users/profile" + PushToTalkUri = "/users/devices/%s/pushtotalk" UserChangePasswordUri = "/users/changePassword" UserSessionUri = "/users/session" UserFriendsUri = "/users/friends" @@ -32,4 +31,5 @@ const ( DeviceTypeBasestation = "basestation" DeviceTypeCamera = "camera" DeviceTypeArloQ = "arloq" + TransIdPrefix = "web" ) diff --git a/events_stream.go b/events_stream.go index b8119dc..6d85cc6 100644 --- a/events_stream.go +++ b/events_stream.go @@ -17,33 +17,48 @@ var ( FAILED_TO_SUBSCRIBE = errors.New("failed to subscribe to seeclient") ) -type EventStream struct { - SSEClient *sse.Client - Subscriptions map[string]chan *EventStreamResponse - Events chan *sse.Event - Error chan error - Close chan interface{} - Connected bool - Verbose bool +type subscriber chan *EventStreamResponse - sync.Mutex +type subscribers map[string]subscriber + +type subscriptions struct { + subscribers + rwmutex sync.RWMutex } -func NewEventStream(url string, client *http.Client) *EventStream { +type eventStream struct { + SSEClient *sse.Client + Events chan *sse.Event + Error chan error + Verbose bool + Disconnected chan interface{} + once *sync.Once + + subscriptions +} + +func newEventStream(url string, client *http.Client) *eventStream { SSEClient := sse.NewClient(url) SSEClient.Connection = client - return &EventStream{ + return &eventStream{ SSEClient: SSEClient, Events: make(chan *sse.Event), - Subscriptions: make(map[string]chan *EventStreamResponse), + subscriptions: subscriptions{make(map[string]subscriber), sync.RWMutex{}}, Error: make(chan error), - Close: make(chan interface{}), + Disconnected: make(chan interface{}), + once: new(sync.Once), } } -func (e *EventStream) Listen() (connected chan bool) { +func (e *eventStream) disconnect() { + e.once.Do(func() { + close(e.Disconnected) + }) +} + +func (e *eventStream) listen() (connected chan bool) { connected = make(chan bool) @@ -74,16 +89,17 @@ func (e *EventStream) Listen() (connected chan bool) { if notifyResponse.Status == "connected" { connected <- true } else if notifyResponse.Status == "disconnected" { - connected <- false + e.disconnect() } else { - if subscriber, ok := e.Subscriptions[notifyResponse.TransId]; ok { - e.Lock() + e.subscriptions.rwmutex.RLock() + subscriber, ok := e.subscribers[notifyResponse.TransId] + e.subscriptions.rwmutex.RUnlock() + if ok { subscriber <- notifyResponse - e.Unlock() } } } - case <-e.Close: + case <-e.Disconnected: connected <- false return } @@ -92,3 +108,19 @@ func (e *EventStream) Listen() (connected chan bool) { return connected } + +func (s *subscriptions) unsubscribe(transId string) { + s.rwmutex.Lock() + defer s.rwmutex.Unlock() + if _, ok := s.subscribers[transId]; ok { + close(s.subscribers[transId]) + delete(s.subscribers, transId) + } + +} + +func (s *subscriptions) subscribe(transId string, subscriber subscriber) { + s.rwmutex.Lock() + s.subscribers[transId] = subscriber + s.rwmutex.Unlock() +} diff --git a/internal/request/client.go b/internal/request/client.go index 87c2806..de3b48a 100644 --- a/internal/request/client.go +++ b/internal/request/client.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "io" - "log" "net/http" "net/http/cookiejar" "net/url" @@ -81,7 +80,7 @@ func (c *Client) newRequest(method string, uri string, body interface{}, header return nil, errors.Wrap(err, "failed to create request object") } } - log.Printf("\n\nBODY (%s): %s\n\n", uri, buf) + // log.Printf("\n\nBODY (%s): %s\n\n", uri, buf) u := c.BaseURL.String() + uri req, err := http.NewRequest(method, u, buf) diff --git a/util.go b/util.go index 2c0ac40..b10b027 100644 --- a/util.go +++ b/util.go @@ -60,7 +60,9 @@ func genTransId() string { func (a *Arlo) get(uri, xCloudId string, header http.Header) (*request.Response, error) { if len(xCloudId) > 0 { + a.rwmutex.Lock() a.client.BaseHttpHeader.Set("xcloudId", xCloudId) + a.rwmutex.Unlock() } return a.client.Get(uri, header) @@ -68,7 +70,9 @@ func (a *Arlo) get(uri, xCloudId string, header http.Header) (*request.Response, func (a *Arlo) put(uri, xCloudId string, body interface{}, header http.Header) (*request.Response, error) { if len(xCloudId) > 0 { + a.rwmutex.Lock() a.client.BaseHttpHeader.Set("xcloudId", xCloudId) + a.rwmutex.Unlock() } return a.client.Put(uri, body, header) @@ -76,7 +80,9 @@ func (a *Arlo) put(uri, xCloudId string, body interface{}, header http.Header) ( func (a *Arlo) post(uri, xCloudId string, body interface{}, header http.Header) (*request.Response, error) { if len(xCloudId) > 0 { + a.rwmutex.Lock() a.client.BaseHttpHeader.Set("xcloudId", xCloudId) + a.rwmutex.Unlock() } return a.client.Post(uri, body, header)