This commit is contained in:
Laurent Le Houerou 2020-05-27 23:03:26 +04:00
parent b58279176c
commit 36735458f8
5 changed files with 160 additions and 149 deletions

View File

@ -114,50 +114,48 @@ func (bs *Basestations) Find(deviceId string) *Basestation {
} }
// makeEventStreamRequest is a helper function sets up a response channel, sends a message to the event stream, and blocks waiting for the response. // makeEventStreamRequest is a helper function sets up a response channel, sends a message to the event stream, and blocks waiting for the response.
func (b *Basestation) makeEventStreamRequest(payload EventStreamPayload) (*EventStreamResponse, error) { func (b *Basestation) makeEventStreamRequest(ctx context.Context, payload EventStreamPayload) (*EventStreamResponse, error) {
if !b.IsConnected() {
log.Infof("event stream not connected: reconnecting")
err := b.Subscribe(ctx)
if err != nil {
return nil, fmt.Errorf("reconnecting to event stream: %v", err)
}
}
transId := genTransId() transId := genTransId()
payload.TransId = transId payload.TransId = transId
if err := b.IsConnected(); err != nil {
return nil, fmt.Errorf("event stream not connected")
}
responseChan := make(chan *EventStreamResponse) responseChan := make(chan *EventStreamResponse)
errorChan := make(chan error) b.eventStream.subscribe(transId, responseChan)
b.eventStream.subscribe(transId, responseChan, errorChan)
defer b.eventStream.unsubscribe(transId) defer b.eventStream.unsubscribe(transId)
// Send the payload to the event stream. // Send the payload to the event stream.
if err := b.NotifyEventStream(payload); err != nil { if err := b.NotifyEventStream(payload); err != nil {
return nil, fmt.Errorf("notifying event stream: %v", err) return nil, fmt.Errorf("notifying event stream: %v", err)
} }
timer := time.NewTimer(eventStreamTimeout) timer := time.NewTimer(eventStreamTimeout)
defer timer.Stop()
// Wait for the response to come back from the event stream on the response channel.
select { select {
// If we get a response, return it to the caller.
case response := <-responseChan: case response := <-responseChan:
return response, nil return response, nil
case err := <-b.eventStream.Error: case err := <-b.eventStream.Error:
return nil, fmt.Errorf("event stream error: %v", err) return nil, fmt.Errorf("event stream error: %v", err)
// If the event stream is closed, return an error about it. case <-b.eventStream.disconnectedChan:
case <-b.eventStream.DisconnectedChan: log.Warn("event stream was closed before response was read")
return nil, fmt.Errorf("event stream was closed before response was read") return b.makeEventStreamRequest(ctx, payload)
// If we timeout, return an error about it.
case <-timer.C: case <-timer.C:
return nil, fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds()) return nil, fmt.Errorf("event stream response timed out after %.0f second", eventStreamTimeout.Seconds())
} }
} }
func (b *Basestation) IsConnected() error { func (b *Basestation) IsConnected() bool {
// If the event stream is closed, return an error about it.
select { select {
case <-b.eventStream.DisconnectedChan: case <-b.eventStream.disconnectedChan:
return fmt.Errorf("basestation not connected to event stream") return false
default: default:
return nil return true
} }
} }
@ -178,12 +176,10 @@ forLoop:
return fmt.Errorf("failed to subscribe to the event stream") return fmt.Errorf("failed to subscribe to the event stream")
} }
break forLoop break forLoop
case <-b.eventStream.DisconnectedChan:
return fmt.Errorf("failed to subscribe to the event stream: event stream was closed")
} }
} }
if err := b.Ping(); err != nil { if err := b.Ping(ctx); err != nil {
_ = b.Disconnect() _ = b.Disconnect()
return fmt.Errorf("Pingloop > error while pinging: %v > disconnect event stream", err) return fmt.Errorf("Pingloop > error while pinging: %v > disconnect event stream", err)
} }
@ -196,7 +192,7 @@ forLoop:
case <-ctx.Done(): case <-ctx.Done():
return return
case _ = <-ticker.C: case _ = <-ticker.C:
if err := b.Ping(); err != nil { if err := b.Ping(ctx); err != nil {
log.Errorf("Pingloop > error while pinging: %v > disconnect event stream", err) log.Errorf("Pingloop > error while pinging: %v > disconnect event stream", err)
_ = b.Disconnect() _ = b.Disconnect()
return return
@ -246,7 +242,7 @@ func (b *Basestation) NotifyEventStream(payload EventStreamPayload) error {
return nil return nil
} }
func (b *Basestation) makeRequest(action string, resource string, publishResponse bool, properties interface{}, result interface{}) error { func (b *Basestation) makeRequest(ctx context.Context, action string, resource string, publishResponse bool, properties interface{}, result interface{}) error {
payload := EventStreamPayload{ payload := EventStreamPayload{
Action: action, Action: action,
Resource: resource, Resource: resource,
@ -255,7 +251,7 @@ func (b *Basestation) makeRequest(action string, resource string, publishRespons
From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix), From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix),
To: b.DeviceId, To: b.DeviceId,
} }
resp, err := b.makeEventStreamRequest(payload) resp, err := b.makeEventStreamRequest(ctx, payload)
if err != nil { if err != nil {
return fmt.Errorf("making event stream request: %v", err) return fmt.Errorf("making event stream request: %v", err)
} }
@ -268,44 +264,44 @@ func (b *Basestation) makeRequest(action string, resource string, publishRespons
return nil return nil
} }
func (b *Basestation) Ping() error { func (b *Basestation) Ping(ctx context.Context) error {
err := b.makeRequest("set", fmt.Sprintf("subscriptions/%s_%s", b.UserId, TransIdPrefix), false, map[string][1]string{"devices": {b.DeviceId}}, nil) err := b.makeRequest(ctx, "set", fmt.Sprintf("subscriptions/%s_%s", b.UserId, TransIdPrefix), false, map[string][1]string{"devices": {b.DeviceId}}, nil)
if err != nil { if err != nil {
return fmt.Errorf("getting basestation %s state: %v", b.DeviceName, err) return err
} }
return nil return nil
} }
func (b *Basestation) GetState() (*BaseStationState, error) { func (b *Basestation) GetState(ctx context.Context) (*BaseStationState, error) {
var state BaseStationState var state BaseStationState
err := b.makeRequest("get", "basestation", false, nil, &state) err := b.makeRequest(ctx, "get", "basestation", false, nil, &state)
if err != nil { if err != nil {
return nil, fmt.Errorf("getting basestation %s state: %v", b.DeviceName, err) return nil, fmt.Errorf("getting basestation %s state: %v", b.DeviceName, err)
} }
return &state, nil return &state, nil
} }
func (b *Basestation) GetAllCameraState() ([]CameraState, error) { func (b *Basestation) GetAllCameraState(ctx context.Context) ([]CameraState, error) {
var states []CameraState var states []CameraState
err := b.makeRequest("get", "cameras", false, nil, &states) err := b.makeRequest(ctx, "get", "cameras", false, nil, &states)
if err != nil { if err != nil {
return nil, fmt.Errorf("getting associated cameras state: %v", err) return nil, fmt.Errorf("getting associated cameras state: %v", err)
} }
return states, nil return states, nil
} }
func (b *Basestation) GetRules() ([]Rule, error) { func (b *Basestation) GetRules(ctx context.Context) ([]Rule, error) {
var resp GetRulesResponse var resp GetRulesResponse
err := b.makeRequest("get", "rules", false, nil, &resp) err := b.makeRequest(ctx, "get", "rules", false, nil, &resp)
if err != nil { if err != nil {
return nil, fmt.Errorf("getting rules: %v", err) return nil, fmt.Errorf("getting rules: %v", err)
} }
return resp.Rules, nil return resp.Rules, nil
} }
func (b *Basestation) GetCalendarMode() (*CalendarMode, error) { func (b *Basestation) GetCalendarMode(ctx context.Context) (*CalendarMode, error) {
var calendarMode CalendarMode var calendarMode CalendarMode
err := b.makeRequest("get", "schedule", false, nil, &calendarMode) err := b.makeRequest(ctx, "get", "schedule", false, nil, &calendarMode)
if err != nil { if err != nil {
return nil, fmt.Errorf("getting calendar mode: %v", err) return nil, fmt.Errorf("getting calendar mode: %v", err)
} }
@ -315,9 +311,9 @@ func (b *Basestation) GetCalendarMode() (*CalendarMode, error) {
// SetCalendarMode toggles calendar mode. // SetCalendarMode toggles calendar mode.
// NOTE: The Arlo API seems to disable calendar mode when switching to other modes, if it's enabled. // NOTE: The Arlo API seems to disable calendar mode when switching to other modes, if it's enabled.
// You should probably do the same, although, the UI reflects the switch from calendar mode to say armed mode without explicitly setting calendar mode to inactive. // You should probably do the same, although, the UI reflects the switch from calendar mode to say armed mode without explicitly setting calendar mode to inactive.
func (b *Basestation) SetCalendarMode(active bool) error { func (b *Basestation) SetCalendarMode(ctx context.Context, active bool) error {
resp := make(map[string]bool) resp := make(map[string]bool)
err := b.makeRequest("set", "schedule", true, struct { err := b.makeRequest(ctx, "set", "schedule", true, struct {
Active bool `json:"active"` Active bool `json:"active"`
}{ }{
Active: active, Active: active,
@ -335,18 +331,18 @@ func (b *Basestation) SetCalendarMode(active bool) error {
return nil return nil
} }
func (b *Basestation) GetModes() (*GetModesResponse, error) { func (b *Basestation) GetModes(ctx context.Context) (*GetModesResponse, error) {
var resp GetModesResponse var resp GetModesResponse
err := b.makeRequest("get", "modes", false, nil, &resp) err := b.makeRequest(ctx, "get", "modes", false, nil, &resp)
if err != nil { if err != nil {
return nil, fmt.Errorf("getting modes: %v", err) return nil, fmt.Errorf("getting modes: %v", err)
} }
return &resp, nil return &resp, nil
} }
func (b *Basestation) SetCustomMode(mode string) error { func (b *Basestation) SetCustomMode(ctx context.Context, mode string) error {
resp := make(map[string]string) resp := make(map[string]string)
err := b.makeRequest("set", "modes", true, struct { err := b.makeRequest(ctx, "set", "modes", true, struct {
Active string `json:"active"` Active string `json:"active"`
}{ }{
Active: mode, Active: mode,
@ -364,62 +360,67 @@ func (b *Basestation) SetCustomMode(mode string) error {
return nil return nil
} }
func (b *Basestation) DeleteMode(mode string) error { func (b *Basestation) DeleteMode(ctx context.Context, mode string) error {
err := b.makeRequest("delete", fmt.Sprintf("modes/%s", mode), true, nil, nil) err := b.makeRequest(ctx, "delete", fmt.Sprintf("modes/%s", mode), true, nil, nil)
if err != nil { if err != nil {
return fmt.Errorf("deleting mode %s: %v", mode, err) return fmt.Errorf("deleting mode %s: %v", mode, err)
} }
return nil return nil
} }
func (b *Basestation) Arm() error { func (b *Basestation) Arm(ctx context.Context) error {
err := b.SetCustomMode("mode1") err := b.SetCustomMode(ctx, "mode1")
if err != nil { if err != nil {
return fmt.Errorf("arming (mode1): %v", err) return fmt.Errorf("arming (mode1): %v", err)
} }
return nil return nil
} }
func (b *Basestation) Disarm() error { func (b *Basestation) Disarm(ctx context.Context) error {
err := b.SetCustomMode("mode0") err := b.SetCustomMode(ctx, "mode0")
if err != nil { if err != nil {
return fmt.Errorf("disarming (mode0): %v", err) return fmt.Errorf("disarming (mode0): %v", err)
} }
return nil return nil
} }
func (b *Basestation) SirenOn() (response *EventStreamResponse, err error) { type SetSirenResponse struct {
payload := EventStreamPayload{ SirenState string `json:"sirenState"`
Action: "set", SirenTrigger string `json:"sirenTrigger"`
Resource: "siren", Duration int `json:"duration"`
PublishResponse: true, Timestamp int64 `json:"timestamp"`
Properties: SirenProperties{ }
func (b *Basestation) SirenOn(ctx context.Context) error {
var response SetSirenResponse
err := b.makeRequest(ctx, "set", "siren", true, SirenProperties{
SirenState: "on", SirenState: "on",
Duration: 300, Duration: 300,
Volume: 8, Volume: 8,
Pattern: "alarm", Pattern: "alarm",
}, }, &response)
From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix), if err != nil {
To: b.DeviceId, return fmt.Errorf("making request: %v", err)
} }
if response.SirenState != "on" {
return b.makeEventStreamRequest(payload) return fmt.Errorf("siren not on in response")
}
return nil
} }
func (b *Basestation) SirenOff() (response *EventStreamResponse, err error) { func (b *Basestation) SirenOff(ctx context.Context) error {
payload := EventStreamPayload{ var response SetSirenResponse
Action: "set", err := b.makeRequest(ctx, "set", "siren", true, SirenProperties{
Resource: "siren",
PublishResponse: true,
Properties: SirenProperties{
SirenState: "off", SirenState: "off",
Duration: 300, Duration: 300,
Volume: 8, Volume: 8,
Pattern: "alarm", Pattern: "alarm",
}, }, &response)
From: fmt.Sprintf("%s_%s", b.UserId, TransIdPrefix), if err != nil {
To: b.DeviceId, return fmt.Errorf("making request: %v", err)
} }
if response.SirenState != "off" {
return b.makeEventStreamRequest(payload) return fmt.Errorf("siren not off in response")
}
return nil
} }

View File

@ -1,6 +1,7 @@
package arlo package arlo
import ( import (
"context"
"fmt" "fmt"
) )
@ -90,28 +91,23 @@ func (cs *Cameras) Find(deviceId string) *Camera {
return nil return nil
} }
// On turns a camera on; meaning it will detect and record events. func (c *Camera) On(ctx context.Context) error {
func (c *Camera) On() (response *EventStreamResponse, err error) {
payload := EventStreamPayload{
Action: "set",
Resource: fmt.Sprintf("cameras/%s", c.DeviceId),
PublishResponse: true,
Properties: CameraProperties{
PrivacyActive: false,
},
From: fmt.Sprintf("%s_%s", c.UserId, TransIdPrefix),
To: c.ParentId,
}
b := c.arlo.Basestations.Find(c.ParentId) b := c.arlo.Basestations.Find(c.ParentId)
if b == nil { if b == nil {
return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) return fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId)
} }
return b.makeEventStreamRequest(payload)
err := b.makeRequest(ctx, "set", fmt.Sprintf("cameras/%s", c.DeviceId), true, CameraProperties{
PrivacyActive: false,
}, nil)
if err != nil {
return err
}
return nil
} }
// On turns a camera off; meaning it won't detect and record events. // On turns a camera off; meaning it won't detect and record events.
func (c *Camera) Off() (response *EventStreamResponse, err error) { func (c *Camera) Off(ctx context.Context) (response *EventStreamResponse, err error) {
payload := EventStreamPayload{ payload := EventStreamPayload{
Action: "set", Action: "set",
Resource: fmt.Sprintf("cameras/%s", c.DeviceId), Resource: fmt.Sprintf("cameras/%s", c.DeviceId),
@ -127,13 +123,13 @@ func (c *Camera) Off() (response *EventStreamResponse, err error) {
if b == nil { if b == nil {
return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId)
} }
return b.makeEventStreamRequest(payload) return b.makeEventStreamRequest(ctx, payload)
} }
// SetBrightness sets the camera brightness. // SetBrightness sets the camera brightness.
// NOTE: Brightness is between -2 and 2 in increments of 1 (-2, -1, 0, 1, 2). // NOTE: Brightness is between -2 and 2 in increments of 1 (-2, -1, 0, 1, 2).
// Setting it to an invalid value has no effect. // Setting it to an invalid value has no effect.
func (c *Camera) SetBrightness(brightness int) (response *EventStreamResponse, err error) { func (c *Camera) SetBrightness(ctx context.Context, brightness int) (response *EventStreamResponse, err error) {
// Sanity check; if the values are above or below the allowed limits, set them to their limit. // Sanity check; if the values are above or below the allowed limits, set them to their limit.
if brightness < -2 { if brightness < -2 {
brightness = -2 brightness = -2
@ -155,10 +151,10 @@ func (c *Camera) SetBrightness(brightness int) (response *EventStreamResponse, e
if b == nil { if b == nil {
return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId)
} }
return b.makeEventStreamRequest(payload) return b.makeEventStreamRequest(ctx, payload)
} }
func (c *Camera) EnableMotionAlerts(sensitivity int, zones []string) (response *EventStreamResponse, err error) { func (c *Camera) EnableMotionAlerts(ctx context.Context, sensitivity int, zones []string) (response *EventStreamResponse, err error) {
payload := EventStreamPayload{ payload := EventStreamPayload{
Action: "set", Action: "set",
Resource: fmt.Sprintf("cameras/%s", c.DeviceId), Resource: fmt.Sprintf("cameras/%s", c.DeviceId),
@ -177,10 +173,10 @@ func (c *Camera) EnableMotionAlerts(sensitivity int, zones []string) (response *
if b == nil { if b == nil {
return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId)
} }
return b.makeEventStreamRequest(payload) return b.makeEventStreamRequest(ctx, payload)
} }
func (c *Camera) DisableMotionAlerts(sensitivity int, zones []string) (response *EventStreamResponse, err error) { func (c *Camera) DisableMotionAlerts(ctx context.Context, sensitivity int, zones []string) (response *EventStreamResponse, err error) {
payload := EventStreamPayload{ payload := EventStreamPayload{
Action: "set", Action: "set",
Resource: fmt.Sprintf("cameras/%s", c.DeviceId), Resource: fmt.Sprintf("cameras/%s", c.DeviceId),
@ -200,10 +196,10 @@ func (c *Camera) DisableMotionAlerts(sensitivity int, zones []string) (response
if b == nil { if b == nil {
return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId)
} }
return b.makeEventStreamRequest(payload) return b.makeEventStreamRequest(ctx, payload)
} }
func (c *Camera) EnableAudioAlerts(sensitivity int) (response *EventStreamResponse, err error) { func (c *Camera) EnableAudioAlerts(ctx context.Context, sensitivity int) (response *EventStreamResponse, err error) {
payload := EventStreamPayload{ payload := EventStreamPayload{
Action: "set", Action: "set",
Resource: fmt.Sprintf("cameras/%s", c.DeviceId), Resource: fmt.Sprintf("cameras/%s", c.DeviceId),
@ -222,10 +218,10 @@ func (c *Camera) EnableAudioAlerts(sensitivity int) (response *EventStreamRespon
if b == nil { if b == nil {
return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId)
} }
return b.makeEventStreamRequest(payload) return b.makeEventStreamRequest(ctx, payload)
} }
func (c *Camera) DisableAudioAlerts(sensitivity int) (response *EventStreamResponse, err error) { func (c *Camera) DisableAudioAlerts(ctx context.Context, sensitivity int) (response *EventStreamResponse, err error) {
payload := EventStreamPayload{ payload := EventStreamPayload{
Action: "set", Action: "set",
Resource: fmt.Sprintf("cameras/%s", c.DeviceId), Resource: fmt.Sprintf("cameras/%s", c.DeviceId),
@ -244,11 +240,11 @@ func (c *Camera) DisableAudioAlerts(sensitivity int) (response *EventStreamRespo
if b == nil { if b == nil {
return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId)
} }
return b.makeEventStreamRequest(payload) return b.makeEventStreamRequest(ctx, payload)
} }
// action: disabled OR recordSnapshot OR recordVideo // action: disabled OR recordSnapshot OR recordVideo
func (c *Camera) SetAlertNotificationMethods(action string, email, push bool) (response *EventStreamResponse, err error) { func (c *Camera) SetAlertNotificationMethods(ctx context.Context, action string, email, push bool) (response *EventStreamResponse, err error) {
payload := EventStreamPayload{ payload := EventStreamPayload{
Action: "set", Action: "set",
Resource: fmt.Sprintf("cameras/%s", c.DeviceId), Resource: fmt.Sprintf("cameras/%s", c.DeviceId),
@ -273,5 +269,5 @@ func (c *Camera) SetAlertNotificationMethods(action string, email, push bool) (r
if b == nil { if b == nil {
return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId)
} }
return b.makeEventStreamRequest(payload) return b.makeEventStreamRequest(ctx, payload)
} }

29
cmd/main.go Normal file
View File

@ -0,0 +1,29 @@
package main
import (
"context"
"git.lehouerou.net/laurent/arlo-go"
log "github.com/sirupsen/logrus"
)
func main() {
ctx := context.Background()
a := arlo.NewArlo()
err := a.Login(ctx, "hass@lehouerou.net", "TiPXMVLUeZfUg6RrmwzK")
if err != nil {
log.Errorf("login: %v", err)
return
}
for _, device := range a.Cameras {
log.Infof("%s", device.DeviceName)
err := device.On(ctx)
if err != nil {
log.Errorf("setting camera %s on: %v", device.DeviceName, err)
}
}
select {}
}

View File

@ -9,7 +9,7 @@ const (
DeviceTypeSiren = "siren" DeviceTypeSiren = "siren"
TransIdPrefix = "web" TransIdPrefix = "web"
BaseUrl = "https://my.arlo.com/hmsweb" BaseUrl = "https://myapi.arlo.com/hmsweb"
// TODO: Implement all of the following urls. There are many here I don't have devices for. :/ // TODO: Implement all of the following urls. There are many here I don't have devices for. :/
ActiveAutomationUri = "/users/devices/automation/active" ActiveAutomationUri = "/users/devices/automation/active"

View File

@ -17,24 +17,19 @@ type eventStream struct {
Events chan *sse.Event Events chan *sse.Event
Error chan error Error chan error
Verbose bool Verbose bool
DisconnectedChan chan interface{} disconnectedChan chan interface{}
once *sync.Once once *sync.Once
subscribers map[string]chan *EventStreamResponse transactionSubscribers map[string]chan *EventStreamResponse
subscribersMutex sync.RWMutex transactionSubscribersMutex sync.RWMutex
errorsubsribers map[string]chan error
errorMutex sync.RWMutex
} }
func newEventStream(url string, client *http.Client) *eventStream { func newEventStream(url string, client *http.Client) *eventStream {
e := &eventStream{ e := &eventStream{
Events: make(chan *sse.Event), Events: make(chan *sse.Event),
subscribers: make(map[string]chan *EventStreamResponse), transactionSubscribers: make(map[string]chan *EventStreamResponse),
subscribersMutex: sync.RWMutex{}, transactionSubscribersMutex: sync.RWMutex{},
errorsubsribers: make(map[string]chan error), disconnectedChan: make(chan interface{}),
errorMutex: sync.RWMutex{},
DisconnectedChan: make(chan interface{}),
once: new(sync.Once), once: new(sync.Once),
} }
@ -49,17 +44,19 @@ func newEventStream(url string, client *http.Client) *eventStream {
func (e *eventStream) disconnect() { func (e *eventStream) disconnect() {
e.once.Do(func() { e.once.Do(func() {
close(e.DisconnectedChan) close(e.disconnectedChan)
}) })
} }
func (e *eventStream) listen(ctx context.Context) (chan bool, error) { func (e *eventStream) listen(ctx context.Context) (chan bool, error) {
connectedChan := make(chan bool)
err := e.SSEClient.SubscribeChanRaw(e.Events) err := e.SSEClient.SubscribeChanRaw(e.Events)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to subscribe to seeclient") return nil, fmt.Errorf("failed to subscribe to seeclient")
} }
connectedChan := make(chan bool)
go func() { go func() {
for { for {
select { select {
@ -104,14 +101,14 @@ func (e *eventStream) listen(ctx context.Context) (chan bool, error) {
e.disconnect() e.disconnect()
continue continue
} }
e.subscribersMutex.RLock() e.transactionSubscribersMutex.RLock()
subscriber, ok := e.subscribers[notifyResponse.TransId] subscriber, ok := e.transactionSubscribers[notifyResponse.TransId]
e.subscribersMutex.RUnlock() e.transactionSubscribersMutex.RUnlock()
if ok { if ok {
subscriber <- &notifyResponse subscriber <- &notifyResponse
} }
case <-e.DisconnectedChan: case <-e.disconnectedChan:
connectedChan <- false connectedChan <- false
return return
} }
@ -122,28 +119,16 @@ func (e *eventStream) listen(ctx context.Context) (chan bool, error) {
} }
func (e *eventStream) unsubscribe(transId string) { func (e *eventStream) unsubscribe(transId string) {
e.subscribersMutex.Lock() e.transactionSubscribersMutex.Lock()
if c, ok := e.subscribers[transId]; ok { if c, ok := e.transactionSubscribers[transId]; ok {
close(c) close(c)
delete(e.subscribers, transId) delete(e.transactionSubscribers, transId)
} }
e.subscribersMutex.Unlock() e.transactionSubscribersMutex.Unlock()
e.errorMutex.Lock()
if c, ok := e.errorsubsribers[transId]; ok {
close(c)
delete(e.errorsubsribers, transId)
}
e.errorMutex.Unlock()
} }
func (e *eventStream) subscribe(transId string, subscriber chan *EventStreamResponse, errorChan chan error) { func (e *eventStream) subscribe(transId string, subscriber chan *EventStreamResponse) {
e.subscribersMutex.Lock() e.transactionSubscribersMutex.Lock()
e.subscribers[transId] = subscriber e.transactionSubscribers[transId] = subscriber
e.subscribersMutex.Unlock() e.transactionSubscribersMutex.Unlock()
e.errorMutex.Lock()
e.errorsubsribers[transId] = errorChan
e.errorMutex.Unlock()
} }