From 551716051fda6eeda6b6e888bfd9d2437235643d Mon Sep 17 00:00:00 2001 From: Laurent Le Houerou Date: Sat, 6 Jun 2020 16:23:15 +0400 Subject: [PATCH] starting resource subscribers implementation --- basestation.go | 20 +++++--------- camera.go | 31 ++++++++++------------ cmd/main.go | 2 +- events_stream.go | 69 +++++++++++++++++++++++++++++++++++++++++------- 4 files changed, 82 insertions(+), 40 deletions(-) diff --git a/basestation.go b/basestation.go index 18d8c5d..577f951 100644 --- a/basestation.go +++ b/basestation.go @@ -127,9 +127,7 @@ func (b *Basestation) makeEventStreamRequest(ctx context.Context, payload EventS transId := genTransId() payload.TransId = transId - responseChan := make(chan *EventStreamResponse) - b.eventStream.subscribe(transId, responseChan) - defer b.eventStream.unsubscribe(transId) + responseChan := b.eventStream.subscribeTransaction(transId) // Send the payload to the event stream. if err := b.NotifyEventStream(payload); err != nil { @@ -166,16 +164,12 @@ func (b *Basestation) Subscribe(ctx context.Context) error { if err != nil { return fmt.Errorf("setting up event stream: %v", err) } -forLoop: - for { - select { - case <-ctx.Done(): - return fmt.Errorf("failed to subscribe to the event stream: requesting shutdown") - case connected := <-connectedChan: - if !connected { - return fmt.Errorf("failed to subscribe to the event stream") - } - break forLoop + select { + case <-ctx.Done(): + return fmt.Errorf("failed to subscribe to the event stream: requesting shutdown") + case connected := <-connectedChan: + if !connected { + return fmt.Errorf("failed to subscribe to the event stream") } } diff --git a/camera.go b/camera.go index 06da3b7..f4fff10 100644 --- a/camera.go +++ b/camera.go @@ -154,26 +154,23 @@ func (c *Camera) SetBrightness(ctx context.Context, brightness int) (response *E return b.makeEventStreamRequest(ctx, payload) } -func (c *Camera) EnableMotionAlerts(ctx context.Context, sensitivity int, zones []string) (response *EventStreamResponse, err error) { - payload := EventStreamPayload{ - Action: "set", - Resource: fmt.Sprintf("cameras/%s", c.DeviceId), - PublishResponse: true, - Properties: MotionDetectionProperties{ - BaseDetectionProperties: BaseDetectionProperties{ - Armed: true, - Sensitivity: sensitivity, - Zones: zones, - }, - }, - From: fmt.Sprintf("%s_%s", c.UserId, TransIdPrefix), - To: c.ParentId, - } +func (c *Camera) EnableMotionAlerts(ctx context.Context, sensitivity int, zones []string) error { b := c.arlo.Basestations.Find(c.ParentId) if b == nil { - return nil, fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) + return fmt.Errorf("basestation (%s) not found for camera (%s)", c.ParentId, c.DeviceId) } - return b.makeEventStreamRequest(ctx, payload) + err := b.makeRequest(ctx, "set", fmt.Sprintf("cameras/%s", c.DeviceId), true, MotionDetectionProperties{ + BaseDetectionProperties: BaseDetectionProperties{ + Armed: true, + Sensitivity: sensitivity, + Zones: zones, + }, + }, nil) + + if err != nil { + return err + } + return nil } func (c *Camera) DisableMotionAlerts(ctx context.Context, sensitivity int, zones []string) (response *EventStreamResponse, err error) { diff --git a/cmd/main.go b/cmd/main.go index c8b1951..54a08f3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,7 +18,7 @@ func main() { for _, device := range a.Cameras { log.Infof("%s", device.DeviceName) - err := device.On(ctx) + err := device.EnableMotionAlerts(ctx, 70, nil) if err != nil { log.Errorf("setting camera %s on: %v", device.DeviceName, err) } diff --git a/events_stream.go b/events_stream.go index 334f741..9aee1cd 100644 --- a/events_stream.go +++ b/events_stream.go @@ -22,6 +22,9 @@ type eventStream struct { transactionSubscribers map[string]chan *EventStreamResponse transactionSubscribersMutex sync.RWMutex + + resourceSubscribers map[string][]chan *EventStreamResponse + resourceSubscribersMutex sync.RWMutex } func newEventStream(url string, client *http.Client) *eventStream { @@ -29,6 +32,8 @@ func newEventStream(url string, client *http.Client) *eventStream { Events: make(chan *sse.Event), transactionSubscribers: make(map[string]chan *EventStreamResponse), transactionSubscribersMutex: sync.RWMutex{}, + resourceSubscribers: make(map[string][]chan *EventStreamResponse), + resourceSubscribersMutex: sync.RWMutex{}, disconnectedChan: make(chan interface{}), once: new(sync.Once), } @@ -58,6 +63,11 @@ func (e *eventStream) listen(ctx context.Context) (chan bool, error) { connectedChan := make(chan bool) go func() { + defer func() { + e.clearResourceSubscriptions() + e.clearTransactionSubscriptions() + }() + for { select { case <-ctx.Done(): @@ -106,6 +116,16 @@ func (e *eventStream) listen(ctx context.Context) (chan bool, error) { e.transactionSubscribersMutex.RUnlock() if ok { subscriber <- ¬ifyResponse + e.removeTransactionSubscription(notifyResponse.TransId) + } + + e.resourceSubscribersMutex.RLock() + subscribers, ok := e.resourceSubscribers[notifyResponse.Resource] + e.resourceSubscribersMutex.RUnlock() + if ok { + for _, s := range subscribers { + s <- ¬ifyResponse + } } case <-e.disconnectedChan: @@ -118,17 +138,48 @@ func (e *eventStream) listen(ctx context.Context) (chan bool, error) { return connectedChan, nil } -func (e *eventStream) unsubscribe(transId string) { +func (e *eventStream) subscribeTransaction(transId string) chan *EventStreamResponse { + msgchan := make(chan *EventStreamResponse) e.transactionSubscribersMutex.Lock() - if c, ok := e.transactionSubscribers[transId]; ok { - close(c) - delete(e.transactionSubscribers, transId) - } + e.transactionSubscribers[transId] = msgchan e.transactionSubscribersMutex.Unlock() + return msgchan } -func (e *eventStream) subscribe(transId string, subscriber chan *EventStreamResponse) { - e.transactionSubscribersMutex.Lock() - e.transactionSubscribers[transId] = subscriber - e.transactionSubscribersMutex.Unlock() +func (e *eventStream) subscribeResource(resource string) chan *EventStreamResponse { + msgchan := make(chan *EventStreamResponse) + e.resourceSubscribersMutex.Lock() + e.resourceSubscribers[resource] = append(e.resourceSubscribers[resource], msgchan) + e.resourceSubscribersMutex.Unlock() + return msgchan +} + +func (e *eventStream) removeTransactionSubscription(transId string) { + e.transactionSubscribersMutex.Lock() + defer e.transactionSubscribersMutex.Unlock() + subscriber, ok := e.transactionSubscribers[transId] + if ok { + close(subscriber) + delete(e.transactionSubscribers, transId) + } +} + +func (e *eventStream) clearResourceSubscriptions() { + e.resourceSubscribersMutex.Lock() + defer e.resourceSubscribersMutex.Unlock() + for _, subscribers := range e.resourceSubscribers { + for _, subscriber := range subscribers { + close(subscriber) + } + } + e.resourceSubscribers = make(map[string][]chan *EventStreamResponse) +} + +func (e *eventStream) clearTransactionSubscriptions() { + e.transactionSubscribersMutex.Lock() + defer e.transactionSubscribersMutex.Unlock() + for _, subscriber := range e.transactionSubscribers { + close(subscriber) + } + e.transactionSubscribers = make(map[string]chan *EventStreamResponse) }