package subscriptions import ( "context" "math/rand" "time" "github.com/pkg/errors" ) const ( minDurationSeconds = 3600 maxDurationSeconds = 5400 sleepDuration = 10 * time.Second maxRetries = 3 ) func scheduleWs[T any]( ctx context.Context, subscriptionName string, subscriptionParams string, debug bool, ) (chan T, error) { res := make(chan T) go func() { defer close(res) duration := newRandomDuration(minDurationSeconds, maxDurationSeconds) t := time.NewTicker(duration) var ws *wsClient[T] ws, err := getNewWsClientWithRetry[T]( ctx, subscriptionName, subscriptionParams, res, maxRetries, debug, ) if err != nil { return } for { select { case <-ctx.Done(): ws.Stop() <-ws.Done return case <-t.C: newws, err := getNewWsClientWithRetry[T]( ctx, subscriptionName, subscriptionParams, res, maxRetries, debug, ) if err != nil { return } time.Sleep(10 * time.Second) ws.Stop() ws = newws case <-ws.Done: newws, err := getNewWsClientWithRetry[T]( ctx, subscriptionName, subscriptionParams, res, maxRetries, debug, ) if err != nil { return } time.Sleep(10 * time.Second) ws.Stop() ws = newws } } }() return res, nil } func newRandomDuration(minseconds int, maxseconds int) time.Duration { return time.Duration(rand.Intn(maxseconds-minseconds)+minseconds) * time.Second } func getNewWsClientWithRetry[T any]( ctx context.Context, subscriptionName string, subscriptionParams string, resultChan chan T, maxRetries int, debug bool, ) (*wsClient[T], error) { var ws *wsClient[T] var err error for i := 0; i < maxRetries; i++ { ws, err = newWsClient[T](ctx, subscriptionName, subscriptionParams, debug) if err == nil { break } time.Sleep(time.Duration(i+1) * sleepDuration) } if err != nil { return nil, errors.Wrap(err, "creating new websocket graphQlClient") } go func() { for data := range ws.Data { resultChan <- data } }() return ws, nil }