From 011447ef60b093876d6a7a9659b917008e357e7d Mon Sep 17 00:00:00 2001 From: Jeff Walter Date: Tue, 18 Sep 2018 13:02:21 -0500 Subject: [PATCH] EventSteam working, still needs cleanup. --- Gopkg.lock | 57 ++++++++++++++++++++++++++-- Gopkg.toml | 4 ++ account.go | 17 +++++---- basestation.go | 67 +++++++++++++++++++++------------ devices.go | 12 ++++-- events_stream.go | 73 +++++++----------------------------- internal/request/client.go | 8 ++-- internal/request/response.go | 1 - types.go | 2 +- 9 files changed, 137 insertions(+), 104 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index dded0ae..b3e1d27 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,20 +2,71 @@ [[projects]] + digest = "1:7365acd48986e205ccb8652cc746f09c8b7876030d53710ea6ef7d0bd0dcd7ca" name = "github.com/pkg/errors" packages = ["."] + pruneopts = "" revision = "645ef00459ed84a119197bfb8d8205042c6df63d" version = "v0.8.0" [[projects]] + branch = "master" + digest = "1:63eb8e7863bcc41b162d76c3e1b5dd4991fd01f29585017b3a8bab7a5928edd3" name = "github.com/r3labs/sse" packages = ["."] - revision = "ab73c814bbdece537f16e92302cd99d1618d0e0d" - version = "1.0.1" + pruneopts = "" + revision = "ad82e5b42970ce737f0ce61490a1607331299496" + +[[projects]] + branch = "master" + digest = "1:fbdbb6cf8db3278412c9425ad78b26bb8eb788181f26a3ffb3e4f216b314f86a" + name = "golang.org/x/net" + packages = [ + "context", + "idna", + "publicsuffix", + ] + pruneopts = "" + revision = "26e67e76b6c3f6ce91f7c52def5af501b4e0f3a2" + +[[projects]] + digest = "1:5acd3512b047305d49e8763eef7ba423901e85d5dd2fd1e71778a0ea8de10bd4" + name = "golang.org/x/text" + packages = [ + "collate", + "collate/build", + "internal/colltab", + "internal/gen", + "internal/tag", + "internal/triegen", + "internal/ucd", + "language", + "secure/bidirule", + "transform", + "unicode/bidi", + "unicode/cldr", + "unicode/norm", + "unicode/rangetable", + ] + pruneopts = "" + revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" + version = "v0.3.0" + +[[projects]] + digest = "1:1cf61f0228e64d1bfddaa42bfd1f5047ed3626925b9bea69bbcb9168472dae23" + name = "gopkg.in/cenkalti/backoff.v1" + packages = ["."] + pruneopts = "" + revision = "61153c768f31ee5f130071d08fc82b85208528de" + version = "v1.1.0" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "559855ebed7f1c0bf0bea2b6f750822d2eb67595d23a2f9006a8302a22b74e47" + input-imports = [ + "github.com/pkg/errors", + "github.com/r3labs/sse", + "golang.org/x/net/publicsuffix", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index acda3d6..22a96b7 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -28,3 +28,7 @@ [[constraint]] name = "github.com/pkg/errors" version = "0.8.0" + +[[constraint]] + branch = "master" + name = "github.com/r3labs/sse" diff --git a/account.go b/account.go index 85cd6b7..0adf1d9 100644 --- a/account.go +++ b/account.go @@ -2,7 +2,6 @@ package arlo_golang import ( "fmt" - "log" "math" "math/rand" "strconv" @@ -58,7 +57,6 @@ func Login(user string, pass string) (*Arlo, error) { body := map[string]string{"email": a.user, "password": a.pass} resp, err := a.client.Post(LoginUri, body, nil) - if err != nil { return nil, errors.WithMessage(err, "login request failed") } @@ -72,6 +70,12 @@ func Login(user string, pass string) (*Arlo, error) { // Cache the auth token. a.client.BaseHttpHeader.Add("Authorization", loginResponse.Data.Token) + // Add other important headers. + a.client.BaseHttpHeader.Add("DNT", "1") + a.client.BaseHttpHeader.Add("schemaVersion", "1") + a.client.BaseHttpHeader.Add("Host", "arlo.netgear.com") + a.client.BaseHttpHeader.Add("Referer", "https://arlo.netgear.com/") + // Save the account info with the Arlo struct. a.Account = loginResponse.Data @@ -83,17 +87,16 @@ func Login(user string, pass string) (*Arlo, error) { } // Set the XCloudId header for future requests. You can override this on a per-request basis if needed. - a.client.BaseHttpHeader.Add("xCloudId", deviceResponse.Data[0].XCloudId) + a.client.BaseHttpHeader.Add("xcloudId", deviceResponse.Data[0].XCloudId) // Cache the devices as their respective types. a.Cameras = deviceResponse.Data.GetCameras() a.Basestations = deviceResponse.Data.GetBasestations() // Connect each basestation to the EventStream. for i := range a.Basestations { - a.Basestations[i].connect(a) + a.Basestations[i].arlo = a + a.Basestations[i].Subscribe() } - - log.Printf("HERE: %v", util.PrettyPrint(a.Basestations)) } } else { return nil, errors.New("failed to login") @@ -105,7 +108,6 @@ func Login(user string, pass string) (*Arlo, error) { func (a *Arlo) Logout() (*Status, error) { resp, err := a.client.Put(LogoutUri, nil, nil) - if err != nil { return nil, errors.WithMessage(err, "logout request failed") } @@ -123,7 +125,6 @@ func (a *Arlo) UpdateProfile(firstName, lastName string) (*Status, error) { body := map[string]string{"firstName": firstName, "lastName": lastName} resp, err := a.client.Put(UserProfileUri, body, nil) - if err != nil { return nil, errors.WithMessage(err, "failed to update profile") } diff --git a/basestation.go b/basestation.go index 50b2452..5eb0ab5 100644 --- a/basestation.go +++ b/basestation.go @@ -1,9 +1,7 @@ package arlo_golang import ( - "encoding/json" "fmt" - "time" "github.com/jeffreydwalter/arlo-golang/internal/util" "github.com/pkg/errors" @@ -38,14 +36,39 @@ type BaseStationMetadata struct { type Basestation struct { Device eventStream *EventStream + arlo *Arlo } // Basestations is an array of Basestation objects. type Basestations []Basestation -func (b *Basestation) connect(a *Arlo) { - b.eventStream = NewEventStream(BaseUrl+fmt.Sprintf(SubscribeUri, a.Account.Token), util.HeaderToMap(*a.client.BaseHttpHeader)) +func (b *Basestation) Subscribe() (*Status, error) { + b.eventStream = NewEventStream(BaseUrl+fmt.Sprintf(SubscribeUri, b.arlo.Account.Token), b.arlo.client.HttpClient, util.HeaderToMap(*b.arlo.client.BaseHttpHeader)) b.eventStream.Listen() + + transId := GenTransId() + + body := NotifyPayload{ + Action: "set", + Resource: fmt.Sprintf("subscriptions/%s_%s", b.UserId, "web"), + PublishResponse: false, + Properties: map[string][]string{"devices": []string{b.DeviceId}}, + TransId: transId, + From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix), + To: b.DeviceId, + } + + resp, err := b.arlo.client.Post(fmt.Sprintf(NotifyUri, b.DeviceId), body, nil) + if err != nil { + return nil, errors.WithMessage(err, "failed to subscribe to the event stream") + } + + var status Status + if err := resp.Decode(&status); err != nil { + return nil, err + } + + return &status, nil } /* @@ -64,7 +87,7 @@ func (b *Basestation) connect(a *Arlo) { "id":"XXX-XXXXXXX" } */ -func (a *Arlo) GetBasestationState(b Basestation) (*NotifyResponse, error) { +func (b *Basestation) GetState() (*NotifyResponse, error) { transId := GenTransId() @@ -72,37 +95,31 @@ func (a *Arlo) GetBasestationState(b Basestation) (*NotifyResponse, error) { Action: "get", Resource: "basestation", PublishResponse: false, - Properties: map[string]string{}, TransId: transId, From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix), To: b.DeviceId, } + //fmt.Printf("BODY: %+v\n", body) + //fmt.Printf("HEADERS: %+v\n", a.client.BaseHttpHeader) + + fmt.Println("Subscribing to the eventstream.") b.eventStream.Subscriptions[transId] = new(Subscriber) - for b.eventStream.Connected == false { - fmt.Println("Not connected yet.") - time.Sleep(1000 * time.Millisecond) - } - fmt.Println("Connected now.") - - resp, err := a.client.Post(fmt.Sprintf(NotifyUri, b.DeviceId), body, nil) + resp, err := b.arlo.client.Post(fmt.Sprintf(NotifyUri, b.DeviceId), body, nil) if err != nil { - return nil, errors.WithMessage(err, "failed to start stream") + return nil, errors.WithMessage(err, "failed to get basestation state") } - ep := &NotifyResponse{} - err = json.NewDecoder(resp.Body).Decode(ep) - if err != nil { - return nil, errors.WithMessage(err, "failed to decode body") + var status Status + if err := resp.Decode(&status); err != nil { + return nil, err } - for { - fmt.Println("Subscribing to the eventstream.") - select { - case notifyResponse := <-*b.eventStream.Subscriptions[transId]: - fmt.Println("Recieved a response from the subscription.") - return ¬ifyResponse, nil - } + if !status.Success { + return nil, errors.New("failed to get basestation status") } + + notifyResponse := <-*b.eventStream.Subscriptions[transId] + return ¬ifyResponse, nil } diff --git a/devices.go b/devices.go index a4aa532..62fdc8f 100644 --- a/devices.go +++ b/devices.go @@ -111,10 +111,10 @@ func (ds *Devices) GetCameras() Cameras { func (a *Arlo) GetDevices() (*DeviceResponse, error) { resp, err := a.client.Get(DevicesUri, nil) - if err != nil { return nil, errors.WithMessage(err, "failed to get devices") } + defer resp.Body.Close() var deviceResponse DeviceResponse if err := resp.Decode(&deviceResponse); err != nil { @@ -132,11 +132,12 @@ func (a *Arlo) GetDevices() (*DeviceResponse, error) { func (a *Arlo) UpdateDeviceName(d Device, name string) (*Status, error) { body := map[string]string{"deviceId": d.DeviceId, "deviceName": name, "parentId": d.ParentId} - resp, err := a.client.Put(DeviceRenameUri, body, nil) + resp, err := a.client.Put(DeviceRenameUri, body, nil) if err != nil { return nil, errors.WithMessage(err, "failed to update device name") } + defer resp.Body.Close() var status Status if err := resp.Decode(&status); err != nil { @@ -155,6 +156,7 @@ func (a *Arlo) UpdateDisplayOrder(d DeviceOrder) (*Status, error) { if err != nil { return nil, errors.WithMessage(err, "failed to update display order") } + defer resp.Body.Close() var status Status if err := resp.Decode(&status); err != nil { @@ -185,10 +187,10 @@ func (a *Arlo) StartStream(c Camera) (*StreamResponse, error) { } resp, err := a.client.Post(DeviceStartStreamUri, body, nil) - if err != nil { return nil, errors.WithMessage(err, "failed to start stream") } + defer resp.Body.Close() var streamResponse StreamResponse if err := resp.Decode(&streamResponse); err != nil { @@ -209,10 +211,12 @@ func (a *Arlo) TakeSnapshot(c Camera) (*StreamResponse, error) { } body := map[string]string{"deviceId": c.DeviceId, "parentId": c.ParentId, "xcloudId": c.XCloudId, "olsonTimeZone": c.Properties.OlsonTimeZone} + resp, err := a.client.Post(DeviceTakeSnapshotUri, body, nil) if err != nil { return nil, errors.WithMessage(err, "failed to take snapshot") } + defer resp.Body.Close() var status Status if err := resp.Decode(&status); err != nil { @@ -233,10 +237,12 @@ func (a *Arlo) StartRecording(c Camera) (*StreamResponse, error) { } body := map[string]string{"deviceId": c.DeviceId, "parentId": c.ParentId, "xcloudId": c.XCloudId, "olsonTimeZone": c.Properties.OlsonTimeZone} + resp, err := a.client.Post(DeviceStartRecordUri, body, nil) if err != nil { return nil, errors.WithMessage(err, "failed to start recording") } + defer resp.Body.Close() var status Status if err := resp.Decode(&status); err != nil { diff --git a/events_stream.go b/events_stream.go index 4e52047..f002ee5 100644 --- a/events_stream.go +++ b/events_stream.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/json" "fmt" - "log" + "net/http" "sync" "github.com/pkg/errors" @@ -12,11 +12,11 @@ import ( "github.com/r3labs/sse" ) -var FAILED_TO_PUBLISH = errors.New("Failed to publish") - -var FAILED_TO_DECODE_JSON = errors.New("Failed to decode JSON") - -var FAILED_TO_SUBSCRIBE = errors.New("Failed to subscribe to SSEClient") +var ( + FAILED_TO_PUBLISH = errors.New("Failed to publish") + FAILED_TO_DECODE_JSON = errors.New("Failed to decode JSON") + FAILED_TO_SUBSCRIBE = errors.New("Failed to subscribe to SSEClient") +) type Subscriber chan NotifyResponse @@ -33,9 +33,10 @@ type EventStream struct { sync.Mutex } -func NewEventStream(url string, headers map[string]string) *EventStream { +func NewEventStream(url string, client *http.Client, headers map[string]string) *EventStream { SSEClient := sse.NewClient(url) + SSEClient.Connection = client SSEClient.Headers = headers return &EventStream{ @@ -49,35 +50,13 @@ func NewEventStream(url string, headers map[string]string) *EventStream { func (e *EventStream) Listen() { go func() { - err := e.SSEClient.SubscribeChan("", e.Events) + err := e.SSEClient.SubscribeChanRaw(e.Events) if err != nil { fmt.Println(FAILED_TO_SUBSCRIBE) e.ErrorChan <- FAILED_TO_SUBSCRIBE } }() - for event := range e.Events { - fmt.Println("Got event message here.") - fmt.Printf("EVENT: %s\n", event.Event) - fmt.Printf("DATA: %s\n", event.Data) - - if event.Data != nil { - notifyResponse := &NotifyResponse{} - b := bytes.NewBuffer(event.Data) - err := json.NewDecoder(b).Decode(notifyResponse) - if err != nil { - e.ErrorChan <- errors.WithMessage(err, "failed to decode JSON") - break - } - - if notifyResponse.Status == "connected" { - e.Connected = true - fmt.Println("Connected.") - break - } - } - } - go func() { for event := range e.Events { fmt.Println("Got event message.") @@ -93,6 +72,7 @@ func (e *EventStream) Listen() { break } + fmt.Printf("%s\n", notifyResponse) if notifyResponse.Status == "connected" { fmt.Println("Connected.") e.Connected = true @@ -100,7 +80,7 @@ func (e *EventStream) Listen() { fmt.Println("Disconnected.") e.Connected = false } else { - fmt.Printf("Message for transId: %s", notifyResponse.TransId) + fmt.Printf("Message for transId: %s\n", notifyResponse.TransId) if subscriber, ok := e.Subscriptions[notifyResponse.TransId]; ok { e.Lock() *subscriber <- *notifyResponse @@ -112,36 +92,9 @@ func (e *EventStream) Listen() { fmt.Println("Throwing away message.") } } + } else { + fmt.Printf("Event data was nil.\n") } } }() - /* - go func() { - - fmt.Println("go func to recieve a subscription.") - for { - fmt.Println("go func for loop to recieve a subscription.") - select { - case s := <-e.Subscriptions: - if resp, ok := e.Responses[s.transId]; ok { - fmt.Println("Recieved a subscription, sending response.") - s.ResponseChan <- resp - e.Lock() - delete(e.Responses, s.transId) - e.Unlock() - } else { - fmt.Println("Recieved a subscription error, sending error response.") - e.ErrorChan <- FAILED_TO_PUBLISH - break - } - } - } - }() - */ -} - -func (e *EventStream) verbose(params ...interface{}) { - if e.Verbose { - log.Println(params...) - } } diff --git a/internal/request/client.go b/internal/request/client.go index 20446a4..7bd5605 100644 --- a/internal/request/client.go +++ b/internal/request/client.go @@ -9,6 +9,8 @@ import ( "net/url" "github.com/pkg/errors" + + "golang.org/x/net/publicsuffix" ) type Client struct { @@ -21,7 +23,7 @@ func NewClient(baseurl string) (*Client, error) { var err error var jar *cookiejar.Jar - options := cookiejar.Options{} + options := cookiejar.Options{PublicSuffixList: publicsuffix.List} if jar, err = cookiejar.New(&options); err != nil { return nil, errors.Wrap(err, "failed to create client object") @@ -33,7 +35,7 @@ func NewClient(baseurl string) (*Client, error) { } header := make(http.Header) - header.Add("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36") + header.Add("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 11_1_2 like Mac OS X) AppleWebKit/604.3.5 (KHTML, like Gecko) Mobile/15B202 NETGEAR/v1 (iOS Vuezone)") header.Add("Content-Type", "application/json") header.Add("Accept", "application/json") @@ -78,7 +80,7 @@ func (c *Client) newRequest(method string, uri string, body interface{}, header return nil, errors.Wrap(err, "failed to create request object") } } - // log.Printf("JSON: %v", buf) + u := c.BaseURL.String() + uri req, err := http.NewRequest(method, u, buf) if err != nil { diff --git a/internal/request/response.go b/internal/request/response.go index 7ed3c25..35a807f 100644 --- a/internal/request/response.go +++ b/internal/request/response.go @@ -19,7 +19,6 @@ type Response struct { func (resp *Response) GetContentType() (string, error) { mediaType, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) - if err != nil { return "", errors.Wrap(err, "failed to get content type") } diff --git a/types.go b/types.go index bcf1478..4805aea 100644 --- a/types.go +++ b/types.go @@ -53,7 +53,7 @@ type StreamUrl struct { type NotifyPayload struct { Action string `json:"action,omitempty"` Resource string `json:"resource,omitempty"` - PublishResponse bool `json:"publishResponse,omitempty"` + PublishResponse bool `json:"publishResponse"` Properties interface{} `json:"properties,omitempty"` TransId string `json:"transId"` From string `json:"from"`