117 lines
2.1 KiB
Go
117 lines
2.1 KiB
Go
package subscriptions
|
|
|
|
import (
|
|
"context"
|
|
"math/rand"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
const (
|
|
minDurationSeconds = 3600
|
|
maxDurationSeconds = 5400
|
|
sleepDuration = 10 * time.Second
|
|
maxRetries = 3
|
|
)
|
|
|
|
func scheduleWs[T any](
|
|
ctx context.Context,
|
|
subscriptionName string,
|
|
subscriptionParams string,
|
|
debug bool,
|
|
) (chan T, error) {
|
|
res := make(chan T)
|
|
go func() {
|
|
defer close(res)
|
|
duration := newRandomDuration(minDurationSeconds, maxDurationSeconds)
|
|
t := time.NewTicker(duration)
|
|
var ws *wsClient[T]
|
|
ws, err := getNewWsClientWithRetry[T](
|
|
ctx,
|
|
subscriptionName,
|
|
subscriptionParams,
|
|
res,
|
|
maxRetries,
|
|
debug,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
ws.Stop()
|
|
<-ws.Done
|
|
return
|
|
|
|
case <-t.C:
|
|
|
|
newws, err := getNewWsClientWithRetry[T](
|
|
ctx,
|
|
subscriptionName,
|
|
subscriptionParams,
|
|
res,
|
|
maxRetries,
|
|
debug,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Second)
|
|
ws.Stop()
|
|
ws = newws
|
|
case <-ws.Done:
|
|
|
|
newws, err := getNewWsClientWithRetry[T](
|
|
ctx,
|
|
subscriptionName,
|
|
subscriptionParams,
|
|
res,
|
|
maxRetries,
|
|
debug,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Second)
|
|
ws.Stop()
|
|
ws = newws
|
|
}
|
|
}
|
|
}()
|
|
return res, nil
|
|
}
|
|
|
|
func newRandomDuration(minseconds int, maxseconds int) time.Duration {
|
|
return time.Duration(rand.Intn(maxseconds-minseconds)+minseconds) * time.Second
|
|
}
|
|
|
|
func getNewWsClientWithRetry[T any](
|
|
ctx context.Context,
|
|
subscriptionName string,
|
|
subscriptionParams string,
|
|
resultChan chan T,
|
|
maxRetries int,
|
|
debug bool,
|
|
) (*wsClient[T], error) {
|
|
var ws *wsClient[T]
|
|
var err error
|
|
for i := 0; i < maxRetries; i++ {
|
|
ws, err = newWsClient[T](ctx, subscriptionName, subscriptionParams, debug)
|
|
if err == nil {
|
|
break
|
|
}
|
|
time.Sleep(time.Duration(i+1) * sleepDuration)
|
|
}
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "creating new websocket graphQlClient")
|
|
}
|
|
go func() {
|
|
for data := range ws.Data {
|
|
resultChan <- data
|
|
}
|
|
}()
|
|
return ws, nil
|
|
}
|