package arlo

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/go-resty/resty/v2"
	log "github.com/sirupsen/logrus"
)

// Arlo is a client for the Arlo API. It holds the HTTP client used for REST
// calls, the current event stream, and the account, basestations and cameras
// populated by Login.
type Arlo struct {
	client       *resty.Client
	eventStream  *eventStream
	Account      Account
	Basestations []*Basestation
	Cameras      []*Camera
}

// NewArlo returns an Arlo client whose HTTP client targets BaseUrl and uses a
// 30 second request timeout. Call Login before using any other method.
func NewArlo() (arlo *Arlo) {
	c := resty.New().
		SetHostURL(BaseUrl).
		SetTimeout(30 * time.Second)

	return &Arlo{
		client: c,
	}
}

// getBaseRequest builds a request carrying body, decoding successful responses
// into result and error responses into errResult. The "xcloudId" header is set
// only when xcloudId is non-empty.
func (a *Arlo) getBaseRequest(body interface{}, result interface{}, errResult interface{}, xcloudId string) *resty.Request {
	r := a.client.R().
		SetBody(body).
		SetResult(result).
		SetError(errResult)

	if xcloudId != "" {
		r.SetHeader("xcloudId", xcloudId)
	}

	return r
}

// responseError converts an HTTP error response into an error. The reason is
// passed as an argument rather than as the format string, so a '%' in the
// server-supplied text cannot garble the message. When the server gave no
// reason the HTTP status is reported instead of an empty error.
func responseError(resp *resty.Response, reason string) error {
	if reason == "" {
		return fmt.Errorf("request failed with status %s", resp.Status())
	}
	return fmt.Errorf("%s", reason)
}

// get issues a GET on url and decodes a successful response into result.
func (a *Arlo) get(url string, result interface{}) error {
	var errorResponse ErrorResponse
	resp, err := a.getBaseRequest(nil, result, &errorResponse, "").Get(url)
	if err != nil {
		return err
	}
	if resp.IsError() {
		return responseError(resp, errorResponse.Reason)
	}
	return nil
}

// post issues a POST on url with body and decodes a successful response into
// result. xcloudId, when non-empty, is sent as the "xcloudId" header.
func (a *Arlo) post(url string, body interface{}, result interface{}, xcloudId string) error {
	var errorResponse ErrorResponse
	resp, err := a.getBaseRequest(body, result, &errorResponse, xcloudId).Post(url)
	if err != nil {
		return err
	}
	if resp.IsError() {
		return responseError(resp, errorResponse.Reason)
	}
	return nil
}

// put issues a PUT on url without a body and decodes a successful response
// into result. xcloudId, when non-empty, is sent as the "xcloudId" header.
func (a *Arlo) put(url string, result interface{}, xcloudId string) error {
	var errorResponse ErrorResponse
	resp, err := a.getBaseRequest(nil, result, &errorResponse, xcloudId).Put(url)
	if err != nil {
		return err
	}
	if resp.IsError() {
		return responseError(resp, errorResponse.Reason)
	}
	return nil
}

// Login authenticates with user and pass, stores the returned account and its
// token as the Authorization header, loads the account's basestations and
// cameras, subscribes to the event stream and starts a background goroutine
// that pings the event stream every pingTime until ctx is cancelled or a ping
// fails.
func (a *Arlo) Login(ctx context.Context, user string, pass string) error {
	var loginResponse LoginResponse
	err := a.post(LoginV2Uri, map[string]string{
		"email":    user,
		"password": pass,
	}, &loginResponse, "")
	if err != nil {
		return fmt.Errorf("posting login request: %w", err)
	}
	if !loginResponse.Success {
		return fmt.Errorf("no success but no error")
	}

	a.Account = loginResponse.Data
	a.client.SetHeader("Authorization", a.Account.Token)

	// The devices URI is formatted with today's date as YYYYMMDD.
	var response DeviceResponse
	err = a.get(fmt.Sprintf(DevicesUri, time.Now().Format("20060102")), &response)
	if err != nil {
		return err
	}
	if !response.Success {
		return fmt.Errorf("no success but no error")
	}
	if len(response.Data) == 0 {
		return fmt.Errorf("no device found")
	}

	// Rebuild the device lists from scratch so a repeated Login does not
	// accumulate duplicates.
	a.Basestations = nil
	a.Cameras = nil
	for _, device := range response.Data {
		if device.IsBasestation() {
			a.Basestations = append(a.Basestations, NewBaseStation(a, device))
			continue
		}
		if device.IsCamera() {
			a.Cameras = append(a.Cameras, NewCamera(a, device))
		}
	}

	if err := a.Subscribe(ctx); err != nil {
		return fmt.Errorf("subscribing to event stream: %w", err)
	}

	go func(ctx context.Context) {
		ticker := time.NewTicker(pingTime)
		// Release the ticker when the loop exits.
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := a.Ping(ctx); err != nil {
					log.Errorf("Pingloop > error while pinging: %v > disconnect event stream", err)
					// Disconnect always returns nil.
					_ = a.Disconnect()
					return
				}
			}
		}
	}(ctx)

	return nil
}

// Logout calls the logout endpoint and reports an unsuccessful response as an
// error.
func (a *Arlo) Logout() error {
	var response BaseResponse
	if err := a.put(LogoutUri, &response, ""); err != nil {
		return err
	}
	if !response.Success {
		return fmt.Errorf("no success but no error")
	}
	return nil
}

// GetSession fetches the session data from the session endpoint.
func (a *Arlo) GetSession() (Session, error) {
	var response SessionResponse
	if err := a.get(SessionUri, &response); err != nil {
		return Session{}, err
	}
	if !response.Success {
		return Session{}, fmt.Errorf("no success but no error")
	}
	return response.Data, nil
}

// GetProfile returns the user profile for the currently logged in user.
func (a *Arlo) GetProfile() (*UserProfile, error) {
	var response UserProfileResponse
	if err := a.get(ProfileUri, &response); err != nil {
		return nil, err
	}
	if !response.Success {
		return nil, fmt.Errorf("no success but no error")
	}
	return &response.Data, nil
}

// GetBaseStation returns the basestation whose DeviceId equals deviceId, or nil
// when none matches.
func (a *Arlo) GetBaseStation(deviceId string) *Basestation {
	for _, basestation := range a.Basestations {
		if basestation.DeviceId == deviceId {
			return basestation
		}
	}
	return nil
}

// makeEventStreamRequest sends payload through the notify endpoint and waits
// for the matching response on the event stream. It (re)subscribes first when
// the event stream is not connected, and retries the whole request when the
// stream is disconnected while waiting. It fails when the stream reports an
// error, when ctx is cancelled, or when no response arrives within
// eventStreamTimeout.
func (a *Arlo) makeEventStreamRequest(ctx context.Context, payload EventStreamPayload, xcloudid string) (*EventStreamResponse, error) {
	if !a.IsConnected() {
		log.Infof("event stream not connected: reconnecting")
		if err := a.Subscribe(ctx); err != nil {
			return nil, fmt.Errorf("reconnecting to event stream: %w", err)
		}
	}

	// Register for the response before sending so it cannot be missed.
	transId := genTransId()
	payload.TransId = transId
	responseChan := a.eventStream.subscribeTransaction(transId)

	// Send the payload to the event stream.
	if err := a.NotifyEventStream(payload, xcloudid); err != nil {
		return nil, fmt.Errorf("notifying event stream: %w", err)
	}

	timer := time.NewTimer(eventStreamTimeout)
	defer timer.Stop()

	select {
	case response := <-responseChan:
		return response, nil
	case err := <-a.eventStream.Error:
		return nil, fmt.Errorf("event stream error: %v", err)
	case <-a.eventStream.disconnectedChan:
		log.Warn("event stream was closed before response was read")
		return a.makeEventStreamRequest(ctx, payload, xcloudid)
	case <-ctx.Done():
		return nil, fmt.Errorf("waiting for event stream response: %w", ctx.Err())
	case <-timer.C:
		return nil, fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds())
	}
}

// IsConnected reports whether an event stream exists and its disconnected
// channel has not fired.
func (a *Arlo) IsConnected() bool {
	// No stream exists before the first Subscribe.
	if a.eventStream == nil {
		return false
	}

	select {
	case <-a.eventStream.disconnectedChan:
		return false
	default:
		return true
	}
}

// Subscribe creates a new event stream, starts listening on it, waits until it
// reports connected, sends an initial Ping and subscribes every camera to
// state updates. It returns an error when ctx is cancelled or the stream
// disconnects before the connection is established, or when the initial Ping
// fails (in which case the stream is disconnected again).
func (a *Arlo) Subscribe(ctx context.Context) error {
	a.eventStream = newEventStream(
		BaseUrl+NotifyResponsesPushServiceUri,
		// No client timeout here (unlike the REST client); the cookie jar is
		// shared with the REST client.
		&http.Client{Jar: a.client.GetClient().Jar},
		a.client.Header.Get("Authorization"),
	)

	if err := a.eventStream.listen(ctx); err != nil {
		return fmt.Errorf("setting up event stream: %w", err)
	}

	// Poll every 100ms until the stream reports connected.
connectedloop:
	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("failed to subscribe to the event stream: requesting shutdown")
		case <-a.eventStream.disconnectedChan:
			return fmt.Errorf("failed to subscribe to the event stream: disconnected")
		default:
			if a.eventStream.GetIsConnected() {
				break connectedloop
			}
			time.Sleep(100 * time.Millisecond)
		}
	}

	if err := a.Ping(ctx); err != nil {
		// Disconnect always returns nil.
		_ = a.Disconnect()
		return fmt.Errorf("pingloop > error while pinging: %v > disconnect event stream", err)
	}

	for _, camera := range a.Cameras {
		camera.subscribeToStateUpdate()
	}

	return nil
}

// Unsubscribe calls the unsubscribe endpoint, sending xcloudid as the
// "xcloudId" header, and reports an unsuccessful response as an error.
func (a *Arlo) Unsubscribe(xcloudid string) error {
	var response BaseResponse
	if err := a.put(UnsubscribeUri, &response, xcloudid); err != nil {
		return err
	}
	if !response.Success {
		return fmt.Errorf("no success but no error")
	}
	return nil
}

// Disconnect stops the event stream if one exists. It always returns nil.
func (a *Arlo) Disconnect() error {
	// disconnect channel to stop event stream.
	if a.eventStream != nil {
		a.eventStream.disconnect()
	}
	return nil
}

// NotifyEventStream posts payload to the notify endpoint of the device named in
// payload.To and reports an unsuccessful response as an error. Ping relies on
// it (through makeRequest) to call the subscriptions endpoint: the Arlo event
// stream requires that message to be sent every 30s.
func (a *Arlo) NotifyEventStream(payload EventStreamPayload, xcloudId string) error { var response ErrorResponse err := a.post(fmt.Sprintf(NotifyUri, payload.To), payload, &response, xcloudId) if err != nil { return err } if !response.Success { if response.Reason != "" { return fmt.Errorf(response.Reason) } else { return fmt.Errorf("no success but no error") } } return nil } func (a *Arlo) makeRequest(ctx context.Context, deviceId string, xcloudid string, action string, resource string, publishResponse bool, properties interface{}, result interface{}) error { payload := EventStreamPayload{ Action: action, Resource: resource, PublishResponse: publishResponse, Properties: properties, From: fmt.Sprintf("%s_%s", a.Account.UserId, TransIdPrefix), To: deviceId, } resp, err := a.makeEventStreamRequest(ctx, payload, xcloudid) if err != nil { return fmt.Errorf("making event stream request: %v", err) } if result != nil { err = json.Unmarshal(resp.RawProperties, result) if err != nil { return fmt.Errorf("unmarshalling properties: %v", err) } } return nil } func (a *Arlo) Ping(ctx context.Context) error { var devices []string var xcloudid string for _, basestation := range a.Basestations { devices = append(devices, basestation.DeviceId) xcloudid = basestation.XCloudId } if len(devices) == 0 { return fmt.Errorf("no basestation registered") } err := a.makeRequest(ctx, devices[0], xcloudid, "set", fmt.Sprintf("subscriptions/%s_%s", a.Account.UserId, TransIdPrefix), false, map[string][]string{"devices": devices}, nil) if err != nil { return err } return nil }