sorare/subscriptions/wsscheduler.go

117 lines
2.1 KiB
Go

package subscriptions
import (
"context"
"math/rand"
"time"
"github.com/pkg/errors"
)
// Tuning values for scheduling and (re)connecting websocket clients.
const (
	// minDurationSeconds and maxDurationSeconds are the bounds, in seconds,
	// that scheduleWs hands to newRandomDuration when it picks the period of
	// its rotation ticker.
	minDurationSeconds = 60 * 60 // 1 hour
	maxDurationSeconds = 90 * 60 // 1 hour 30 minutes

	// sleepDuration is the base unit of the delay getNewWsClientWithRetry
	// applies after a failed connection attempt; the delay is a multiple of
	// it that grows with the attempt number.
	sleepDuration = time.Second * 10

	// maxRetries is the attempt limit scheduleWs passes to
	// getNewWsClientWithRetry.
	maxRetries = 3
)
// scheduleWs keeps a websocket subscription running in a background goroutine
// and returns the channel on which its messages are delivered.
//
// The goroutine opens a client with getNewWsClientWithRetry (which forwards
// the client's data to the returned channel) and replaces it with a fresh one
// whenever the rotation ticker fires or the current client signals Done. The
// ticker period is drawn once per call from newRandomDuration, between
// minDurationSeconds and maxDurationSeconds. During a replacement the old and
// new clients run side by side for a short handover delay before the old one
// is stopped.
//
// The goroutine ends, closing the returned channel, when ctx is cancelled or
// when a client cannot be created. The returned error is always nil:
// connection failures happen after scheduleWs has returned and are visible to
// the caller only as the channel being closed.
func scheduleWs[T any](
	ctx context.Context,
	subscriptionName string,
	subscriptionParams string,
	debug bool,
) (chan T, error) {
	// handoverDelay is how long the old client keeps running next to its
	// replacement before it is stopped.
	const handoverDelay = 10 * time.Second

	res := make(chan T)
	go func() {
		defer close(res)

		rotation := time.NewTicker(newRandomDuration(minDurationSeconds, maxDurationSeconds))
		// Release the ticker when the goroutine exits.
		defer rotation.Stop()

		// connect opens a client that forwards into res. It refuses to start
		// when ctx is already cancelled so that shutdown is not delayed by a
		// doomed round of connection retries.
		connect := func() (*wsClient[T], error) {
			if err := ctx.Err(); err != nil {
				return nil, err
			}
			return getNewWsClientWithRetry[T](
				ctx,
				subscriptionName,
				subscriptionParams,
				res,
				maxRetries,
				debug,
			)
		}

		// handover lets old and next overlap for handoverDelay, then stops
		// old and reports true. If ctx is cancelled during the overlap it
		// stops both clients, waits for next to finish and reports false.
		handover := func(old, next *wsClient[T]) bool {
			overlap := time.NewTimer(handoverDelay)
			defer overlap.Stop()

			select {
			case <-overlap.C:
				old.Stop()
				return true
			case <-ctx.Done():
				old.Stop()
				next.Stop()
				<-next.Done
				return false
			}
		}

		ws, err := connect()
		if err != nil {
			return
		}

		for {
			select {
			case <-ctx.Done():
				ws.Stop()
				<-ws.Done
				return

			case <-rotation.C:
				next, err := connect()
				if err != nil {
					// The current client is still running; shut it down
					// instead of abandoning it.
					ws.Stop()
					<-ws.Done
					return
				}
				if !handover(ws, next) {
					return
				}
				ws = next

			case <-ws.Done:
				next, err := connect()
				if err != nil {
					// Done has already been received for this client, so
					// only Stop is called; waiting on Done again could block.
					ws.Stop()
					return
				}
				if !handover(ws, next) {
					return
				}
				ws = next
			}
		}
	}()
	return res, nil
}
// newRandomDuration returns a pseudo-random whole number of seconds in the
// half-open range [minseconds, maxseconds) as a time.Duration.
//
// When the range is empty or inverted (maxseconds <= minseconds) it returns
// exactly minseconds instead of panicking, because rand.Intn rejects a
// non-positive argument.
func newRandomDuration(minseconds int, maxseconds int) time.Duration {
	seconds := minseconds
	if span := maxseconds - minseconds; span > 0 {
		seconds += rand.Intn(span)
	}
	return time.Duration(seconds) * time.Second
}
// getNewWsClientWithRetry creates a websocket client with newWsClient, making
// up to maxRetries attempts, and starts a goroutine that copies everything the
// client publishes on its Data channel into resultChan.
//
// After a failed attempt it waits attempt-number × sleepDuration before trying
// again. No wait follows the final attempt, and a wait is abandoned as soon as
// ctx is cancelled.
//
// It returns the connected client, or a wrapped error when every attempt
// failed, when ctx was cancelled while waiting, or when maxRetries allows no
// attempt at all. The client is nil whenever the error is non-nil.
func getNewWsClientWithRetry[T any](
	ctx context.Context,
	subscriptionName string,
	subscriptionParams string,
	resultChan chan T,
	maxRetries int,
	debug bool,
) (*wsClient[T], error) {
	var ws *wsClient[T]
	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		ws, err = newWsClient[T](ctx, subscriptionName, subscriptionParams, debug)
		if err == nil {
			break
		}
		if attempt == maxRetries {
			// Nothing left to retry; report the failure without a final wait.
			break
		}

		backoff := time.NewTimer(time.Duration(attempt) * sleepDuration)
		select {
		case <-backoff.C:
		case <-ctx.Done():
			backoff.Stop()
			return nil, errors.Wrap(ctx.Err(), "creating new websocket graphQlClient")
		}
	}
	if err != nil {
		return nil, errors.Wrap(err, "creating new websocket graphQlClient")
	}
	if ws == nil {
		// maxRetries < 1: the loop never ran, so there is no client to
		// forward from and no underlying error to wrap.
		return nil, errors.New("creating new websocket graphQlClient: no connection attempt allowed")
	}

	go func() {
		for data := range ws.Data {
			if ctx.Err() != nil {
				// After cancellation nobody may be reading resultChan any
				// more; keep draining Data but drop the values.
				continue
			}
			select {
			case resultChan <- data:
			case <-ctx.Done():
				// Cancelled while waiting for the consumer; give up on this
				// value rather than block forever.
			}
		}
	}()
	return ws, nil
}