diff --git a/arlo.go b/arlo.go index 83df32d..a72e0d1 100644 --- a/arlo.go +++ b/arlo.go @@ -2,21 +2,23 @@ package arlo import ( "context" + "encoding/json" "fmt" - "sync" + "net/http" "time" + log "github.com/sirupsen/logrus" + "github.com/go-resty/resty/v2" ) type Arlo struct { - user string - pass string - client *resty.Client + client *resty.Client + eventStream *eventStream + Account Account - Basestations Basestations - Cameras Cameras - rwmutex sync.RWMutex + Basestations []*Basestation + Cameras []*Camera } func NewArlo() (arlo *Arlo) { @@ -30,11 +32,20 @@ func NewArlo() (arlo *Arlo) { } } +func (a *Arlo) getBaseRequest(body interface{}, result interface{}, errResult interface{}, xcloudId string) *resty.Request { + r := a.client.R(). + SetBody(body). + SetResult(result). + SetError(errResult) + if xcloudId != "" { + r.SetHeader("xcloudId", xcloudId) + } + return r +} + func (a *Arlo) get(url string, result interface{}) error { var errorResponse ErrorResponse - resp, err := a.client.R(). - SetResult(result). - SetError(&errorResponse). + resp, err := a.getBaseRequest(nil, result, &errorResponse, ""). Get(url) if err != nil { return err @@ -47,14 +58,9 @@ func (a *Arlo) get(url string, result interface{}) error { func (a *Arlo) post(url string, body interface{}, result interface{}, xcloudId string) error { var errorResponse ErrorResponse - request := a.client.R(). - SetBody(body). - SetResult(result). - SetError(&errorResponse) - if xcloudId != "" { - request.SetHeader("xcloudId", xcloudId) - } - resp, err := request.Post(url) + request := a.getBaseRequest(body, result, &errorResponse, xcloudId) + resp, err := request. + Post(url) if err != nil { return err } @@ -66,13 +72,8 @@ func (a *Arlo) post(url string, body interface{}, result interface{}, xcloudId s func (a *Arlo) put(url string, result interface{}, xcloudId string) error { var errorResponse ErrorResponse - request := a.client.R(). - SetResult(result). - SetError(&errorResponse) - if xcloudId != "" { - request.SetHeader("xcloudId", xcloudId) - } - resp, err := request.Put(url) + resp, err := a.getBaseRequest(nil, result, &errorResponse, xcloudId). + Put(url) if err != nil { return err } @@ -94,11 +95,53 @@ func (a *Arlo) Login(ctx context.Context, user string, pass string) error { if !loginResponse.Success { return fmt.Errorf("no success but no error") } - a.client.SetHeader("Authorization", loginResponse.Data.Token) a.Account = loginResponse.Data - if _, err := a.GetDevices(ctx); err != nil { - return fmt.Errorf("getting devices: %v", err) + a.client.SetHeader("Authorization", a.Account.Token) + + var response DeviceResponse + err = a.get(fmt.Sprintf(DevicesUri, time.Now().Format("20060102")), &response) + if err != nil { + return err } + if !response.Success { + return fmt.Errorf("no success but no error") + } + if len(response.Data) == 0 { + return fmt.Errorf("no device found") + } + + a.Basestations = nil + a.Cameras = nil + for _, device := range response.Data { + if device.IsBasestation() { + a.Basestations = append(a.Basestations, NewBaseStation(a, device)) + continue + } + if device.IsCamera() { + a.Cameras = append(a.Cameras, NewCamera(a, device)) + } + } + + if err := a.Subscribe(ctx); err != nil { + return fmt.Errorf("subscribing to event stream: %v", err) + } + + go func(ctx context.Context) { + ticker := time.NewTicker(pingTime) + for { + select { + case <-ctx.Done(): + return + case _ = <-ticker.C: + if err := a.Ping(ctx); err != nil { + log.Errorf("Pingloop > error while pinging: %v > disconnect event stream", err) + _ = a.Disconnect() + return + } + } + } + }(ctx) + return nil } @@ -115,56 +158,16 @@ func (a *Arlo) Logout() error { return nil } -func (a *Arlo) GetSession() (*Session, error) { +func (a *Arlo) GetSession() (Session, error) { var response SessionResponse err := a.get(SessionUri, &response) if err != nil { - return nil, err + return Session{}, err } if !response.Success { - return nil, fmt.Errorf("no success but no error") + return Session{}, fmt.Errorf("no success but no error") } - return &response.Data, nil -} - -func (a *Arlo) GetDevices(ctx context.Context) (*Devices, error) { - var response DeviceResponse - err := a.get(fmt.Sprintf(DevicesUri, time.Now().Format("20060102")), &response) - if err != nil { - return nil, err - } - if !response.Success { - return nil, fmt.Errorf("no success but no error") - } - if len(response.Data) == 0 { - return nil, fmt.Errorf("no device found") - } - - // Cache a pointer to the arlo object with each device. - for i := range response.Data { - response.Data[i].arlo = a - } - - // Disconnect all of the basestations from the EventStream. - for _, basestation := range a.Basestations { - if err := basestation.Disconnect(); err != nil { - return nil, fmt.Errorf("disconnecting device %s: %v", basestation.DeviceName, err) - } - } - - a.rwmutex.Lock() - // Cache the devices as their respective types. - a.Cameras = response.Data.GetCameras() - a.Basestations = response.Data.GetBasestations() - a.rwmutex.Unlock() - - // subscribe each basestation to the EventStream. - for _, basestation := range a.Basestations { - if err := basestation.Subscribe(ctx); err != nil { - return nil, fmt.Errorf("subscribing device %s: %v", basestation.DeviceName, err) - } - } - return &response.Data, nil + return response.Data, nil } // GetProfile returns the user profile for the currently logged in user. @@ -179,3 +182,171 @@ func (a *Arlo) GetProfile() (*UserProfile, error) { } return &response.Data, nil } + +func (a *Arlo) GetBaseStation(deviceId string) *Basestation { + for _, basestation := range a.Basestations { + if basestation.DeviceId == deviceId { + return basestation + } + } + return nil +} + +func (a *Arlo) makeEventStreamRequest(ctx context.Context, payload EventStreamPayload, xcloudid string) (*EventStreamResponse, error) { + + if !a.IsConnected() { + log.Infof("event stream not connected: reconnecting") + err := a.Subscribe(ctx) + if err != nil { + return nil, fmt.Errorf("reconnecting to event stream: %v", err) + } + } + + transId := genTransId() + payload.TransId = transId + + responseChan := a.eventStream.subscribeTransaction(transId) + + // Send the payload to the event stream. + if err := a.NotifyEventStream(payload, xcloudid); err != nil { + return nil, fmt.Errorf("notifying event stream: %v", err) + } + timer := time.NewTimer(eventStreamTimeout) + + select { + case response := <-responseChan: + return response, nil + case err := <-a.eventStream.Error: + return nil, fmt.Errorf("event stream error: %v", err) + case <-a.eventStream.disconnectedChan: + log.Warn("event stream was closed before response was read") + return a.makeEventStreamRequest(ctx, payload, xcloudid) + case <-timer.C: + return nil, fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds()) + } +} + +func (a *Arlo) IsConnected() bool { + select { + case <-a.eventStream.disconnectedChan: + return false + default: + return true + } +} + +func (a *Arlo) Subscribe(ctx context.Context) error { + a.eventStream = newEventStream( + BaseUrl+fmt.Sprintf(NotifyResponsesPushServiceUri), + &http.Client{Jar: a.client.GetClient().Jar}, + a.client.Header.Get("Authorization"), + ) + + err := a.eventStream.listen(ctx) + if err != nil { + return fmt.Errorf("setting up event stream: %v", err) + } + +connectedloop: + for { + select { + case <-ctx.Done(): + return fmt.Errorf("failed to subscribe to the event stream: requesting shutdown") + case <-a.eventStream.disconnectedChan: + return fmt.Errorf("failed to subscribe to the event stream: disconnected") + default: + if a.eventStream.GetIsConnected() { + break connectedloop + } + time.Sleep(100 * time.Millisecond) + } + } + + if err := a.Ping(ctx); err != nil { + _ = a.Disconnect() + return fmt.Errorf("pingloop > error while pinging: %v > disconnect event stream", err) + } + + for _, camera := range a.Cameras { + camera.subscribeToStateUpdate() + } + + return nil +} + +func (a *Arlo) Unsubscribe(xcloudid string) error { + var response BaseResponse + err := a.put(UnsubscribeUri, &response, xcloudid) + if err != nil { + return err + } + if !response.Success { + return fmt.Errorf("no success but no error") + } + return nil +} + +func (a *Arlo) Disconnect() error { + // disconnect channel to stop event stream. + if a.eventStream != nil { + a.eventStream.disconnect() + } + return nil +} + +// Ping makes a call to the subscriptions endpoint. The Arlo event stream requires this message to be sent every 30s. + +func (a *Arlo) NotifyEventStream(payload EventStreamPayload, xcloudId string) error { + var response ErrorResponse + err := a.post(fmt.Sprintf(NotifyUri, payload.To), payload, &response, xcloudId) + if err != nil { + return err + } + if !response.Success { + if response.Reason != "" { + return fmt.Errorf(response.Reason) + } else { + return fmt.Errorf("no success but no error") + } + } + return nil +} + +func (a *Arlo) makeRequest(ctx context.Context, deviceId string, xcloudid string, action string, resource string, publishResponse bool, properties interface{}, result interface{}) error { + payload := EventStreamPayload{ + Action: action, + Resource: resource, + PublishResponse: publishResponse, + Properties: properties, + From: fmt.Sprintf("%s_%s", a.Account.UserId, TransIdPrefix), + To: deviceId, + } + resp, err := a.makeEventStreamRequest(ctx, payload, xcloudid) + if err != nil { + return fmt.Errorf("making event stream request: %v", err) + } + if result != nil { + err = json.Unmarshal(resp.RawProperties, result) + if err != nil { + return fmt.Errorf("unmarshalling properties: %v", err) + } + } + return nil +} + +func (a *Arlo) Ping(ctx context.Context) error { + var devices []string + var xcloudid string + for _, basestation := range a.Basestations { + devices = append(devices, basestation.DeviceId) + xcloudid = basestation.XCloudId + } + if len(devices) == 0 { + return fmt.Errorf("no basestation registered") + } + err := a.makeRequest(ctx, devices[0], xcloudid, "set", fmt.Sprintf("subscriptions/%s_%s", a.Account.UserId, TransIdPrefix), false, map[string][]string{"devices": devices}, nil) + if err != nil { + return err + } + return nil +} diff --git a/basestation.go b/basestation.go index 0da5da9..2498dd5 100644 --- a/basestation.go +++ b/basestation.go @@ -2,12 +2,8 @@ package arlo import ( "context" - "encoding/json" "fmt" - "net/http" "time" - - log "github.com/sirupsen/logrus" ) const eventStreamTimeout = 30 * time.Second @@ -16,45 +12,8 @@ const pingTime = 30 * time.Second // A Basestation is a Device that's not type "camera" (basestation, arloq, arloqs, etc.). // This type is here just for semantics. Some methods explicitly require a device of a certain type. type Basestation struct { + arlo *Arlo Device - eventStream *eventStream -} - -type BaseStationState struct { - InterfaceVersion int `json:"interfaceVersion"` - APIVersion int `json:"apiVersion"` - State string `json:"state"` - SwVersion string `json:"swVersion"` - HwVersion string `json:"hwVersion"` - ModelID string `json:"modelId"` - Capabilities []string `json:"capabilities"` - McsEnabled bool `json:"mcsEnabled"` - AutoUpdateEnabled bool `json:"autoUpdateEnabled"` - UpdateAvailable interface{} `json:"updateAvailable"` - TimeZone string `json:"timeZone"` - OlsonTimeZone string `json:"olsonTimeZone"` - UploadBandwidthSaturated bool `json:"uploadBandwidthSaturated"` - AntiFlicker struct { - Mode int `json:"mode"` - AutoDefault int `json:"autoDefault"` - } `json:"antiFlicker"` - LowBatteryAlert struct { - Enabled bool `json:"enabled"` - } `json:"lowBatteryAlert"` - LowSignalAlert struct { - Enabled bool `json:"enabled"` - } `json:"lowSignalAlert"` - Claimed bool `json:"claimed"` - TimeSyncState string `json:"timeSyncState"` - Connectivity []struct { - Type string `json:"type"` - Connected bool `json:"connected"` - } `json:"connectivity"` - Groups []interface{} `json:"groups"` - LocalCert struct { - OwnCert string `json:"ownCert"` - PeerCerts []string `json:"peerCerts"` - } `json:"localCert"` } type GetModesResponse struct { @@ -100,177 +59,18 @@ type CalendarMode struct { } `json:"schedule"` } -// Basestations is a slice of Basestation objects. -type Basestations []*Basestation - -// Find returns a basestation with the device id passed in. -func (bs *Basestations) Find(deviceId string) *Basestation { - for _, b := range *bs { - if b.DeviceId == deviceId { - return b - } +func NewBaseStation(arlo *Arlo, device Device) *Basestation { + return &Basestation{ + arlo: arlo, + Device: device, } - return nil } // makeEventStreamRequest is a helper function sets up a response channel, sends a message to the event stream, and blocks waiting for the response. -func (b *Basestation) makeEventStreamRequest(ctx context.Context, payload EventStreamPayload) (*EventStreamResponse, error) { - - if !b.IsConnected() { - log.Infof("event stream not connected: reconnecting") - err := b.Subscribe(ctx) - if err != nil { - return nil, fmt.Errorf("reconnecting to event stream: %v", err) - } - } - - transId := genTransId() - payload.TransId = transId - - responseChan := b.eventStream.subscribeTransaction(transId) - - // Send the payload to the event stream. - if err := b.NotifyEventStream(payload); err != nil { - return nil, fmt.Errorf("notifying event stream: %v", err) - } - timer := time.NewTimer(eventStreamTimeout) - - select { - case response := <-responseChan: - return response, nil - case err := <-b.eventStream.Error: - return nil, fmt.Errorf("event stream error: %v", err) - case <-b.eventStream.disconnectedChan: - log.Warn("event stream was closed before response was read") - return b.makeEventStreamRequest(ctx, payload) - case <-timer.C: - return nil, fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds()) - } -} - -func (b *Basestation) IsConnected() bool { - select { - case <-b.eventStream.disconnectedChan: - return false - default: - return true - } -} - -func (b *Basestation) Subscribe(ctx context.Context) error { - b.eventStream = newEventStream( - BaseUrl+fmt.Sprintf(NotifyResponsesPushServiceUri, b.arlo.Account.Token), - &http.Client{Jar: b.arlo.client.GetClient().Jar}) - - connectedChan, err := b.eventStream.listen(ctx) - if err != nil { - return fmt.Errorf("setting up event stream: %v", err) - } - select { - case <-ctx.Done(): - return fmt.Errorf("failed to subscribe to the event stream: requesting shutdown") - case connected := <-connectedChan: - if !connected { - return fmt.Errorf("failed to subscribe to the event stream") - } - } - - if err := b.Ping(ctx); err != nil { - _ = b.Disconnect() - return fmt.Errorf("Pingloop > error while pinging: %v > disconnect event stream", err) - } - - // The Arlo event stream requires a "ping" every 30s. - go func(ctx context.Context) { - ticker := time.NewTicker(pingTime) - for { - select { - case <-ctx.Done(): - return - case _ = <-ticker.C: - if err := b.Ping(ctx); err != nil { - log.Errorf("Pingloop > error while pinging: %v > disconnect event stream", err) - _ = b.Disconnect() - return - } - } - } - }(ctx) - - return nil -} - -func (b *Basestation) Unsubscribe() error { - var response BaseResponse - err := b.arlo.put(UnsubscribeUri, &response, b.XCloudId) - if err != nil { - return err - } - if !response.Success { - return fmt.Errorf("no success but no error") - } - return nil -} - -func (b *Basestation) Disconnect() error { - // disconnect channel to stop event stream. - if b.eventStream != nil { - b.eventStream.disconnect() - } - return nil -} - -// Ping makes a call to the subscriptions endpoint. The Arlo event stream requires this message to be sent every 30s. - -func (b *Basestation) NotifyEventStream(payload EventStreamPayload) error { - var response ErrorResponse - err := b.arlo.post(fmt.Sprintf(NotifyUri, b.DeviceId), payload, &response, b.XCloudId) - if err != nil { - return err - } - if !response.Success { - if response.Reason != "" { - return fmt.Errorf(response.Reason) - } else { - return fmt.Errorf("no success but no error") - } - } - return nil -} - -func (b *Basestation) makeRequest(ctx context.Context, action string, resource string, publishResponse bool, properties interface{}, result interface{}) error { - payload := EventStreamPayload{ - Action: action, - Resource: resource, - PublishResponse: publishResponse, - Properties: properties, - From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix), - To: b.DeviceId, - } - resp, err := b.makeEventStreamRequest(ctx, payload) - if err != nil { - return fmt.Errorf("making event stream request: %v", err) - } - if result != nil { - err = json.Unmarshal(resp.RawProperties, result) - if err != nil { - return fmt.Errorf("unmarshalling properties: %v", err) - } - } - return nil -} - -func (b *Basestation) Ping(ctx context.Context) error { - err := b.makeRequest(ctx, "set", fmt.Sprintf("subscriptions/%s_%s", b.UserId, TransIdPrefix), false, map[string][1]string{"devices": {b.DeviceId}}, nil) - if err != nil { - return err - } - return nil -} func (b *Basestation) GetState(ctx context.Context) (*BaseStationState, error) { var state BaseStationState - err := b.makeRequest(ctx, "get", "basestation", false, nil, &state) + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "get", "basestation", false, nil, &state) if err != nil { return nil, fmt.Errorf("getting basestation %s state: %v", b.DeviceName, err) } @@ -279,7 +79,7 @@ func (b *Basestation) GetState(ctx context.Context) (*BaseStationState, error) { func (b *Basestation) GetAllCameraState(ctx context.Context) ([]CameraState, error) { var states []CameraState - err := b.makeRequest(ctx, "get", "cameras", false, nil, &states) + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "get", "cameras", false, nil, &states) if err != nil { return nil, fmt.Errorf("getting associated cameras state: %v", err) } @@ -288,7 +88,7 @@ func (b *Basestation) GetAllCameraState(ctx context.Context) ([]CameraState, err func (b *Basestation) GetRules(ctx context.Context) ([]Rule, error) { var resp GetRulesResponse - err := b.makeRequest(ctx, "get", "rules", false, nil, &resp) + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "get", "rules", false, nil, &resp) if err != nil { return nil, fmt.Errorf("getting rules: %v", err) } @@ -297,7 +97,7 @@ func (b *Basestation) GetRules(ctx context.Context) ([]Rule, error) { func (b *Basestation) GetCalendarMode(ctx context.Context) (*CalendarMode, error) { var calendarMode CalendarMode - err := b.makeRequest(ctx, "get", "schedule", false, nil, &calendarMode) + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "get", "schedule", false, nil, &calendarMode) if err != nil { return nil, fmt.Errorf("getting calendar mode: %v", err) } @@ -309,7 +109,7 @@ func (b *Basestation) GetCalendarMode(ctx context.Context) (*CalendarMode, error // You should probably do the same, although, the UI reflects the switch from calendar mode to say armed mode without explicitly setting calendar mode to inactive. func (b *Basestation) SetCalendarMode(ctx context.Context, active bool) error { resp := make(map[string]bool) - err := b.makeRequest(ctx, "set", "schedule", true, struct { + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "set", "schedule", true, struct { Active bool `json:"active"` }{ Active: active, @@ -329,7 +129,7 @@ func (b *Basestation) SetCalendarMode(ctx context.Context, active bool) error { func (b *Basestation) GetModes(ctx context.Context) (*GetModesResponse, error) { var resp GetModesResponse - err := b.makeRequest(ctx, "get", "modes", false, nil, &resp) + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "get", "modes", false, nil, &resp) if err != nil { return nil, fmt.Errorf("getting modes: %v", err) } @@ -338,7 +138,7 @@ func (b *Basestation) GetModes(ctx context.Context) (*GetModesResponse, error) { func (b *Basestation) SetCustomMode(ctx context.Context, mode string) error { resp := make(map[string]string) - err := b.makeRequest(ctx, "set", "modes", true, struct { + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "set", "modes", true, struct { Active string `json:"active"` }{ Active: mode, @@ -357,7 +157,7 @@ func (b *Basestation) SetCustomMode(ctx context.Context, mode string) error { } func (b *Basestation) DeleteMode(ctx context.Context, mode string) error { - err := b.makeRequest(ctx, "delete", fmt.Sprintf("modes/%s", mode), true, nil, nil) + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "delete", fmt.Sprintf("modes/%s", mode), true, nil, nil) if err != nil { return fmt.Errorf("deleting mode %s: %v", mode, err) } @@ -389,7 +189,7 @@ type SetSirenResponse struct { func (b *Basestation) SirenOn(ctx context.Context) error { var response SetSirenResponse - err := b.makeRequest(ctx, "set", "siren", true, SirenProperties{ + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "set", "siren", true, SirenProperties{ SirenState: "on", Duration: 300, Volume: 8, @@ -406,7 +206,7 @@ func (b *Basestation) SirenOn(ctx context.Context) error { func (b *Basestation) SirenOff(ctx context.Context) error { var response SetSirenResponse - err := b.makeRequest(ctx, "set", "siren", true, SirenProperties{ + err := b.arlo.makeRequest(ctx, b.DeviceId, b.XCloudId, "set", "siren", true, SirenProperties{ SirenState: "off", Duration: 300, Volume: 8, diff --git a/basestation_state.go b/basestation_state.go new file mode 100644 index 0000000..103ae79 --- /dev/null +++ b/basestation_state.go @@ -0,0 +1,38 @@ +package arlo + +type BaseStationState struct { + InterfaceVersion int `json:"interfaceVersion"` + APIVersion int `json:"apiVersion"` + State string `json:"state"` + SwVersion string `json:"swVersion"` + HwVersion string `json:"hwVersion"` + ModelID string `json:"modelId"` + Capabilities []string `json:"capabilities"` + McsEnabled bool `json:"mcsEnabled"` + AutoUpdateEnabled bool `json:"autoUpdateEnabled"` + UpdateAvailable interface{} `json:"updateAvailable"` + TimeZone string `json:"timeZone"` + OlsonTimeZone string `json:"olsonTimeZone"` + UploadBandwidthSaturated bool `json:"uploadBandwidthSaturated"` + AntiFlicker struct { + Mode int `json:"mode"` + AutoDefault int `json:"autoDefault"` + } `json:"antiFlicker"` + LowBatteryAlert struct { + Enabled bool `json:"enabled"` + } `json:"lowBatteryAlert"` + LowSignalAlert struct { + Enabled bool `json:"enabled"` + } `json:"lowSignalAlert"` + Claimed bool `json:"claimed"` + TimeSyncState string `json:"timeSyncState"` + Connectivity []struct { + Type string `json:"type"` + Connected bool `json:"connected"` + } `json:"connectivity"` + Groups []interface{} `json:"groups"` + LocalCert struct { + OwnCert string `json:"ownCert"` + PeerCerts []string `json:"peerCerts"` + } `json:"localCert"` +} diff --git a/camera.go b/camera.go index 194c112..dbc8484 100644 --- a/camera.go +++ b/camera.go @@ -4,13 +4,22 @@ import ( "context" "encoding/json" "fmt" + "sync" log "github.com/sirupsen/logrus" ) // A Camera is a Device of type "camera". // This type is here just for semantics. Some methods explicitly require a device of a certain type. -type Camera Device +type Camera struct { + arlo *Arlo + Device + CameraState + stateMutex sync.RWMutex + + motionSubscribers []chan bool + motionSubscribersMutex sync.RWMutex +} type CameraState struct { InterfaceVersion int `json:"interfaceVersion"` @@ -80,27 +89,15 @@ type CameraState struct { BestLocalLiveStreaming string `json:"bestLocalLiveStreaming"` } -// Cameras is a slice of Camera objects. -type Cameras []*Camera - -// Find returns a camera with the device id passed in. -func (cs *Cameras) Find(deviceId string) *Camera { - for _, c := range *cs { - if c.DeviceId == deviceId { - return c - } +func NewCamera(arlo *Arlo, device Device) *Camera { + return &Camera{ + arlo: arlo, + Device: device, } - - return nil } func (c *Camera) On(ctx context.Context) error { - b := c.arlo.Basestations.Find(c.ParentId) - if b == nil { - return fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) - } - - err := b.makeRequest(ctx, "set", fmt.Sprintf("cameras/%s", c.DeviceId), true, CameraProperties{ + err := c.arlo.makeRequest(ctx, c.DeviceId, c.XCloudId, "set", fmt.Sprintf("cameras/%s", c.DeviceId), true, CameraProperties{ PrivacyActive: false, }, nil) if err != nil { @@ -119,14 +116,9 @@ func (c *Camera) Off(ctx context.Context) (response *EventStreamResponse, err er PrivacyActive: true, }, From: fmt.Sprintf("%s_%s", c.UserId, TransIdPrefix), - To: c.ParentId, + To: c.DeviceId, } - - b := c.arlo.Basestations.Find(c.ParentId) - if b == nil { - return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) - } - return b.makeEventStreamRequest(ctx, payload) + return c.arlo.makeEventStreamRequest(ctx, payload, c.XCloudId) } // SetBrightness sets the camera brightness. @@ -148,21 +140,14 @@ func (c *Camera) SetBrightness(ctx context.Context, brightness int) (response *E Brightness: brightness, }, From: fmt.Sprintf("%s_%s", c.UserId, TransIdPrefix), - To: c.ParentId, + To: c.DeviceId, } - b := c.arlo.Basestations.Find(c.ParentId) - if b == nil { - return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) - } - return b.makeEventStreamRequest(ctx, payload) + + return c.arlo.makeEventStreamRequest(ctx, payload, c.XCloudId) } func (c *Camera) EnableMotionAlerts(ctx context.Context, sensitivity int, zones []string) error { - b := c.arlo.Basestations.Find(c.ParentId) - if b == nil { - return fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) - } - err := b.makeRequest(ctx, "set", fmt.Sprintf("cameras/%s", c.DeviceId), true, MotionDetectionProperties{ + err := c.arlo.makeRequest(ctx, c.DeviceId, c.XCloudId, "set", fmt.Sprintf("cameras/%s", c.DeviceId), true, MotionDetectionProperties{ BaseDetectionProperties: BaseDetectionProperties{ Armed: true, Sensitivity: sensitivity, @@ -189,14 +174,10 @@ func (c *Camera) DisableMotionAlerts(ctx context.Context, sensitivity int, zones }, }, From: fmt.Sprintf("%s_%s", c.UserId, TransIdPrefix), - To: c.ParentId, + To: c.DeviceId, } - b := c.arlo.Basestations.Find(c.ParentId) - if b == nil { - return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) - } - return b.makeEventStreamRequest(ctx, payload) + return c.arlo.makeEventStreamRequest(ctx, payload, c.XCloudId) } func (c *Camera) EnableAudioAlerts(ctx context.Context, sensitivity int) (response *EventStreamResponse, err error) { @@ -213,12 +194,7 @@ func (c *Camera) EnableAudioAlerts(ctx context.Context, sensitivity int) (respon From: fmt.Sprintf("%s_%s", c.UserId, TransIdPrefix), To: c.ParentId, } - - b := c.arlo.Basestations.Find(c.ParentId) - if b == nil { - return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) - } - return b.makeEventStreamRequest(ctx, payload) + return c.arlo.makeEventStreamRequest(ctx, payload, c.XCloudId) } func (c *Camera) DisableAudioAlerts(ctx context.Context, sensitivity int) (response *EventStreamResponse, err error) { @@ -233,14 +209,10 @@ func (c *Camera) DisableAudioAlerts(ctx context.Context, sensitivity int) (respo }, }, From: fmt.Sprintf("%s_%s", c.UserId, TransIdPrefix), - To: c.ParentId, + To: c.DeviceId, } - b := c.arlo.Basestations.Find(c.ParentId) - if b == nil { - return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) - } - return b.makeEventStreamRequest(ctx, payload) + return c.arlo.makeEventStreamRequest(ctx, payload, c.XCloudId) } // action: disabled OR recordSnapshot OR recordVideo @@ -262,34 +234,53 @@ func (c *Camera) SetAlertNotificationMethods(ctx context.Context, action string, }, }, From: fmt.Sprintf("%s_%s", c.UserId, TransIdPrefix), - To: c.ParentId, + To: c.DeviceId, } - b := c.arlo.Basestations.Find(c.ParentId) - if b == nil { - return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) - } - return b.makeEventStreamRequest(ctx, payload) + return c.arlo.makeEventStreamRequest(ctx, payload, c.XCloudId) } -func (c *Camera) SubscribeToMotionDetection(ctx context.Context) (chan bool, error) { - b := c.arlo.Basestations.Find(c.ParentId) - if b == nil { - return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) - } - out := make(chan bool) - respChan := b.eventStream.subscribeResource(fmt.Sprintf("cameras/%s", c.DeviceId)) +func (c *Camera) subscribeToStateUpdate() { go func() { + respChan := c.arlo.eventStream.subscribeResource(fmt.Sprintf("cameras/%s", c.DeviceId)) for msg := range respChan { - var state CameraState - err := json.Unmarshal(msg.RawProperties, &state) + c.stateMutex.Lock() + old := *c + err := json.Unmarshal(msg.RawProperties, &c) if err != nil { - log.Errorf("unmarshalling properties: %v", err) + log.Errorf("camera state update > unmarshalling properties: %v", err) continue } - out <- state.MotionDetected + c.stateMutex.Unlock() + c.processStateChanges(old, *c) } }() - - return out, nil +} + +func (c *Camera) RefreshState(ctx context.Context) error { + old := *c + err := c.arlo.makeRequest(ctx, c.DeviceId, c.XCloudId, "get", fmt.Sprintf("cameras/%s", c.DeviceId), false, nil, &c) + if err != nil { + return err + } + c.processStateChanges(old, *c) + return nil +} + +func (c *Camera) processStateChanges(old Camera, new Camera) { + if old.MotionDetected != new.MotionDetected { + c.motionSubscribersMutex.RLock() + for _, subscriber := range c.motionSubscribers { + subscriber <- new.MotionDetected + } + c.motionSubscribersMutex.RUnlock() + } +} + +func (c *Camera) SubscribeToMotion() chan bool { + c.motionSubscribersMutex.Lock() + defer c.motionSubscribersMutex.Unlock() + out := make(chan bool) + c.motionSubscribers = append(c.motionSubscribers, out) + return out } diff --git a/cmd/main.go b/cmd/main.go index 2ae037d..34fa697 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -15,14 +15,16 @@ func main() { log.Errorf("login: %v", err) return } + for _, device := range a.Cameras { + err := device.RefreshState(ctx) + if err != nil { + log.Errorf("%v", err) + } + } for _, device := range a.Cameras { if device.DeviceName == "Salon" { - motionChan, err := device.SubscribeToMotionDetection(ctx) - if err != nil { - log.Errorf("subscribing to motion: %v", err) - return - } + motionChan := device.SubscribeToMotion() for b := range motionChan { log.Infof("motion salon %t", b) @@ -30,6 +32,4 @@ func main() { } } - select {} - } diff --git a/const.go b/const.go index 716d390..954410d 100644 --- a/const.go +++ b/const.go @@ -75,7 +75,7 @@ const ( MigrateZonesUri = "/users/devices/%uniqueId/activityzones/migrate" MobileOffersUri = "/users/payment/offers/dataplans/v5" ModifyBillingUri = "/users/payment/billing/%paymentId" - NotifyResponsesPushServiceUri = "/client/subscribe?token=%s" + NotifyResponsesPushServiceUri = "/client/subscribe" NotifyUri = "/users/devices/notify/%s" OffersDetailsUri = "/users/payment/offersdetail" OffersDvrChangeUri = "/users/payment/offers/arloq/html/v5/change" diff --git a/devices.go b/devices.go index 0658631..9ad11da 100644 --- a/devices.go +++ b/devices.go @@ -2,7 +2,6 @@ package arlo // A Device is the device data, this can be a camera, basestation, arloq, etc. type Device struct { - arlo *Arlo // Let's hold a reference to the parent arlo object since it holds the http.Client object and references to all devices. AnalyticsEnabled bool `json:"analyticsEnabled"` ArloMobilePlan bool `json:"arloMobilePlan"` ArloMobilePlanId string `json:"arloMobilePlanId"` @@ -42,45 +41,6 @@ type Device struct { XCloudId string `json:"xCloudId"` } -// Devices is a slice of Device objects. -type Devices []*Device - -// A DeviceOrder holds a map of device ids and a numeric index. The numeric index is the device order. -// Device order is mainly used by the UI to determine which order to show the devices. -/* -{ - "devices":{ - "XXXXXXXXXXXXX":1, - "XXXXXXXXXXXXX":2, - "XXXXXXXXXXXXX":3 -} -*/ -type DeviceOrder struct { - Devices map[string]int `json:"devices"` -} - -// Find returns a device with the device id passed in. -func (ds Devices) Find(deviceId string) *Device { - for _, d := range ds { - if d.DeviceId == deviceId { - return d - } - } - - return nil -} - -func (ds Devices) FindCameras(basestationId string) Cameras { - cs := Cameras{} - for _, d := range ds { - if d.ParentId == basestationId { - cam := Camera(*d) - cs = append(cs, &cam) - } - } - return cs -} - func (d Device) IsBasestation() bool { return d.DeviceType == DeviceTypeBasestation } @@ -106,39 +66,3 @@ func (d Device) IsLight() bool { func (d Device) IsSiren() bool { return d.DeviceType == DeviceTypeSiren } - -// GetBasestations returns a Basestations object containing all devices that are NOT type "camera". -// I did this because some device types, like arloq, don't have a basestation. -// So, when interacting with them you must treat them like a basestation and a camera. -// Cameras also includes devices of this type, so you can get the same data there or cast. -func (ds Devices) GetBasestations() Basestations { - var basestations Basestations - for _, d := range ds { - if d.IsBasestation() { - basestations = append(basestations, &Basestation{Device: *d}) - } - } - return basestations -} - -// GetCameras returns a Cameras object containing all devices that are of type "camera". -// I did this because some device types, like arloq, don't have a basestation. -// So, when interacting with them you must treat them like a basestation and a camera. -// Basestations also includes devices of this type, so you can get the same data there or cast. -func (ds Devices) GetCameras() Cameras { - var cameras Cameras - for _, d := range ds { - if d.IsCamera() { - cam := Camera(*d) - cameras = append(cameras, &cam) - } - } - return cameras -} - -// UpdateDeviceName sets the name of the given device to the name argument. -//func (d *Device) UpdateDeviceName(name string) error { -// body := map[string]string{"deviceId": d.DeviceId, "deviceName": name, "parentId": d.ParentId} -// resp, err := d.arlo.put(RenameDeviceUri, d.XCloudId, body, nil) -// return checkRequest(resp, err, "failed to update device name") -//} diff --git a/events_stream.go b/event_stream.go similarity index 83% rename from events_stream.go rename to event_stream.go index 9aee1cd..1f4c1c7 100644 --- a/events_stream.go +++ b/event_stream.go @@ -20,6 +20,9 @@ type eventStream struct { disconnectedChan chan interface{} once *sync.Once + isConnected bool + isConnectedMutex sync.RWMutex + transactionSubscribers map[string]chan *EventStreamResponse transactionSubscribersMutex sync.RWMutex @@ -27,7 +30,7 @@ type eventStream struct { resourceSubscribersMutex sync.RWMutex } -func newEventStream(url string, client *http.Client) *eventStream { +func newEventStream(url string, client *http.Client, authHeader string) *eventStream { e := &eventStream{ Events: make(chan *sse.Event), transactionSubscribers: make(map[string]chan *EventStreamResponse), @@ -36,10 +39,13 @@ func newEventStream(url string, client *http.Client) *eventStream { resourceSubscribersMutex: sync.RWMutex{}, disconnectedChan: make(chan interface{}), once: new(sync.Once), + isConnected: false, + isConnectedMutex: sync.RWMutex{}, } SSEClient := sse.NewClient(url) SSEClient.Connection = client + SSEClient.Headers["Authorization"] = authHeader SSEClient.OnDisconnect(func(c *sse.Client) { e.disconnect() }) @@ -49,23 +55,24 @@ func newEventStream(url string, client *http.Client) *eventStream { func (e *eventStream) disconnect() { e.once.Do(func() { + log.Info("disconnect > close disconnectedChan") close(e.disconnectedChan) }) } -func (e *eventStream) listen(ctx context.Context) (chan bool, error) { +func (e *eventStream) listen(ctx context.Context) error { + log.Info("eventStream.listen > start") err := e.SSEClient.SubscribeChanRaw(e.Events) if err != nil { - return nil, fmt.Errorf("failed to subscribe to seeclient") + return fmt.Errorf("failed to subscribe to seeclient") } - connectedChan := make(chan bool) - go func() { defer func() { e.clearResourceSubscriptions() e.clearTransactionSubscriptions() + log.Info("eventStream.listen > stop") }() for { @@ -99,7 +106,7 @@ func (e *eventStream) listen(ctx context.Context) (chan bool, error) { notifyResponse.RawProperties = bytesProperties if notifyResponse.Status == "connected" { - connectedChan <- true + e.setIsConnected(true) continue } if notifyResponse.Status == "disconnected" { @@ -129,13 +136,26 @@ func (e *eventStream) listen(ctx context.Context) (chan bool, error) { } case <-e.disconnectedChan: - connectedChan <- false + log.Info("listen loop <- disconnectedchan") + e.setIsConnected(false) return } } }() - return connectedChan, nil + return nil +} + +func (e *eventStream) GetIsConnected() bool { + e.isConnectedMutex.RLock() + defer e.isConnectedMutex.RUnlock() + return e.isConnected +} + +func (e *eventStream) setIsConnected(isConnected bool) { + e.isConnectedMutex.Lock() + defer e.isConnectedMutex.Unlock() + e.isConnected = isConnected } func (e *eventStream) subscribeTransaction(transId string) chan *EventStreamResponse { diff --git a/responses.go b/responses.go index 52354bb..3ce4b77 100644 --- a/responses.go +++ b/responses.go @@ -32,7 +32,7 @@ type UserProfileResponse struct { type DeviceResponse struct { BaseResponse - Data Devices `json:"data"` + Data []Device `json:"data"` } // LibraryMetaDataResponse is an intermediate struct used when parsing data from the GetLibraryMetaData() call. diff --git a/types.go b/types.go index 96edbaf..b7b1d86 100644 --- a/types.go +++ b/types.go @@ -68,13 +68,12 @@ type UserProfile struct { // Friend is the account data for non-primary account holders designated as friends. type Friend struct { - FirstName string `json:"firstName"` - LastName string `json:"lastName"` - Devices DeviceOrder `json:"devices"` - LastModified int64 `json:"lastModified"` - AdminUser bool `json:"adminUser"` - Email string `json:"email"` - Id string `json:"id"` + FirstName string `json:"firstName"` + LastName string `json:"lastName"` + LastModified int64 `json:"lastModified"` + AdminUser bool `json:"adminUser"` + Email string `json:"email"` + Id string `json:"id"` } // Connectivity is part of the Device data.