353 lines
8.4 KiB
Go
353 lines
8.4 KiB
Go
package arlo
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/go-resty/resty/v2"
|
|
)
|
|
|
|
type Arlo struct {
|
|
client *resty.Client
|
|
eventStream *eventStream
|
|
|
|
Account Account
|
|
Basestations []*Basestation
|
|
Cameras []*Camera
|
|
}
|
|
|
|
func NewArlo() (arlo *Arlo) {
|
|
|
|
c := resty.New().
|
|
SetHostURL(BaseUrl).
|
|
SetTimeout(30 * time.Second)
|
|
|
|
return &Arlo{
|
|
client: c,
|
|
}
|
|
}
|
|
|
|
func (a *Arlo) getBaseRequest(body interface{}, result interface{}, errResult interface{}, xcloudId string) *resty.Request {
|
|
r := a.client.R().
|
|
SetBody(body).
|
|
SetResult(result).
|
|
SetError(errResult)
|
|
if xcloudId != "" {
|
|
r.SetHeader("xcloudId", xcloudId)
|
|
}
|
|
return r
|
|
}
|
|
|
|
func (a *Arlo) get(url string, result interface{}) error {
|
|
var errorResponse ErrorResponse
|
|
resp, err := a.getBaseRequest(nil, result, &errorResponse, "").
|
|
Get(url)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.IsError() {
|
|
return fmt.Errorf(errorResponse.Reason)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) post(url string, body interface{}, result interface{}, xcloudId string) error {
|
|
var errorResponse ErrorResponse
|
|
request := a.getBaseRequest(body, result, &errorResponse, xcloudId)
|
|
resp, err := request.
|
|
Post(url)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.IsError() {
|
|
return fmt.Errorf(errorResponse.Reason)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) put(url string, result interface{}, xcloudId string) error {
|
|
var errorResponse ErrorResponse
|
|
resp, err := a.getBaseRequest(nil, result, &errorResponse, xcloudId).
|
|
Put(url)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.IsError() {
|
|
return fmt.Errorf(errorResponse.Reason)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) Login(ctx context.Context, user string, pass string) error {
|
|
var loginResponse LoginResponse
|
|
err := a.post(LoginV2Uri, map[string]string{
|
|
"email": user,
|
|
"password": pass,
|
|
}, &loginResponse, "")
|
|
if err != nil {
|
|
return fmt.Errorf("posting login request: %v", err)
|
|
}
|
|
if !loginResponse.Success {
|
|
return fmt.Errorf("no success but no error")
|
|
}
|
|
a.Account = loginResponse.Data
|
|
a.client.SetHeader("Authorization", a.Account.Token)
|
|
|
|
var response DeviceResponse
|
|
err = a.get(fmt.Sprintf(DevicesUri, time.Now().Format("20060102")), &response)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !response.Success {
|
|
return fmt.Errorf("no success but no error")
|
|
}
|
|
if len(response.Data) == 0 {
|
|
return fmt.Errorf("no device found")
|
|
}
|
|
|
|
a.Basestations = nil
|
|
a.Cameras = nil
|
|
for _, device := range response.Data {
|
|
if device.IsBasestation() {
|
|
a.Basestations = append(a.Basestations, NewBaseStation(a, device))
|
|
continue
|
|
}
|
|
if device.IsCamera() {
|
|
a.Cameras = append(a.Cameras, NewCamera(a, device))
|
|
}
|
|
}
|
|
|
|
if err := a.Subscribe(ctx); err != nil {
|
|
return fmt.Errorf("subscribing to event stream: %v", err)
|
|
}
|
|
|
|
go func(ctx context.Context) {
|
|
ticker := time.NewTicker(pingTime)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case _ = <-ticker.C:
|
|
if err := a.Ping(ctx); err != nil {
|
|
log.Errorf("Pingloop > error while pinging: %v > disconnect event stream", err)
|
|
_ = a.Disconnect()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}(ctx)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) Logout() error {
|
|
|
|
var response BaseResponse
|
|
err := a.put(LogoutUri, &response, "")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !response.Success {
|
|
return fmt.Errorf("no success but no error")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) GetSession() (Session, error) {
|
|
var response SessionResponse
|
|
err := a.get(SessionUri, &response)
|
|
if err != nil {
|
|
return Session{}, err
|
|
}
|
|
if !response.Success {
|
|
return Session{}, fmt.Errorf("no success but no error")
|
|
}
|
|
return response.Data, nil
|
|
}
|
|
|
|
// GetProfile returns the user profile for the currently logged in user.
|
|
func (a *Arlo) GetProfile() (*UserProfile, error) {
|
|
var response UserProfileResponse
|
|
err := a.get(ProfileUri, &response)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !response.Success {
|
|
return nil, fmt.Errorf("no success but no error")
|
|
}
|
|
return &response.Data, nil
|
|
}
|
|
|
|
func (a *Arlo) GetBaseStation(deviceId string) *Basestation {
|
|
for _, basestation := range a.Basestations {
|
|
if basestation.DeviceId == deviceId {
|
|
return basestation
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) makeEventStreamRequest(ctx context.Context, payload EventStreamPayload, xcloudid string) (*EventStreamResponse, error) {
|
|
|
|
if !a.IsConnected() {
|
|
log.Infof("event stream not connected: reconnecting")
|
|
err := a.Subscribe(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("reconnecting to event stream: %v", err)
|
|
}
|
|
}
|
|
|
|
transId := genTransId()
|
|
payload.TransId = transId
|
|
|
|
responseChan := a.eventStream.subscribeTransaction(transId)
|
|
|
|
// Send the payload to the event stream.
|
|
if err := a.NotifyEventStream(payload, xcloudid); err != nil {
|
|
return nil, fmt.Errorf("notifying event stream: %v", err)
|
|
}
|
|
timer := time.NewTimer(eventStreamTimeout)
|
|
|
|
select {
|
|
case response := <-responseChan:
|
|
return response, nil
|
|
case err := <-a.eventStream.Error:
|
|
return nil, fmt.Errorf("event stream error: %v", err)
|
|
case <-a.eventStream.disconnectedChan:
|
|
log.Warn("event stream was closed before response was read")
|
|
return a.makeEventStreamRequest(ctx, payload, xcloudid)
|
|
case <-timer.C:
|
|
return nil, fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds())
|
|
}
|
|
}
|
|
|
|
func (a *Arlo) IsConnected() bool {
|
|
select {
|
|
case <-a.eventStream.disconnectedChan:
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
func (a *Arlo) Subscribe(ctx context.Context) error {
|
|
a.eventStream = newEventStream(
|
|
BaseUrl+fmt.Sprintf(NotifyResponsesPushServiceUri),
|
|
&http.Client{Jar: a.client.GetClient().Jar},
|
|
a.client.Header.Get("Authorization"),
|
|
)
|
|
|
|
err := a.eventStream.listen(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("setting up event stream: %v", err)
|
|
}
|
|
|
|
connectedloop:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("failed to subscribe to the event stream: requesting shutdown")
|
|
case <-a.eventStream.disconnectedChan:
|
|
return fmt.Errorf("failed to subscribe to the event stream: disconnected")
|
|
default:
|
|
if a.eventStream.GetIsConnected() {
|
|
break connectedloop
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
if err := a.Ping(ctx); err != nil {
|
|
_ = a.Disconnect()
|
|
return fmt.Errorf("pingloop > error while pinging: %v > disconnect event stream", err)
|
|
}
|
|
|
|
for _, camera := range a.Cameras {
|
|
camera.subscribeToStateUpdate()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) Unsubscribe(xcloudid string) error {
|
|
var response BaseResponse
|
|
err := a.put(UnsubscribeUri, &response, xcloudid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !response.Success {
|
|
return fmt.Errorf("no success but no error")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) Disconnect() error {
|
|
// disconnect channel to stop event stream.
|
|
if a.eventStream != nil {
|
|
a.eventStream.disconnect()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Ping makes a call to the subscriptions endpoint. The Arlo event stream requires this message to be sent every 30s.
|
|
|
|
func (a *Arlo) NotifyEventStream(payload EventStreamPayload, xcloudId string) error {
|
|
var response ErrorResponse
|
|
err := a.post(fmt.Sprintf(NotifyUri, payload.To), payload, &response, xcloudId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !response.Success {
|
|
if response.Reason != "" {
|
|
return fmt.Errorf(response.Reason)
|
|
} else {
|
|
return fmt.Errorf("no success but no error")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) makeRequest(ctx context.Context, deviceId string, xcloudid string, action string, resource string, publishResponse bool, properties interface{}, result interface{}) error {
|
|
payload := EventStreamPayload{
|
|
Action: action,
|
|
Resource: resource,
|
|
PublishResponse: publishResponse,
|
|
Properties: properties,
|
|
From: fmt.Sprintf("%s_%s", a.Account.UserId, TransIdPrefix),
|
|
To: deviceId,
|
|
}
|
|
resp, err := a.makeEventStreamRequest(ctx, payload, xcloudid)
|
|
if err != nil {
|
|
return fmt.Errorf("making event stream request: %v", err)
|
|
}
|
|
if result != nil {
|
|
err = json.Unmarshal(resp.RawProperties, result)
|
|
if err != nil {
|
|
return fmt.Errorf("unmarshalling properties: %v", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *Arlo) Ping(ctx context.Context) error {
|
|
var devices []string
|
|
var xcloudid string
|
|
for _, basestation := range a.Basestations {
|
|
devices = append(devices, basestation.DeviceId)
|
|
xcloudid = basestation.XCloudId
|
|
}
|
|
if len(devices) == 0 {
|
|
return fmt.Errorf("no basestation registered")
|
|
}
|
|
err := a.makeRequest(ctx, devices[0], xcloudid, "set", fmt.Sprintf("subscriptions/%s_%s", a.Account.UserId, TransIdPrefix), false, map[string][]string{"devices": devices}, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|