change fixture copyfrom to batchexec

This commit is contained in:
Laurent Le Houerou 2024-03-22 18:50:22 +00:00
parent fea1e96085
commit 9494bbe760
5 changed files with 79 additions and 91 deletions

View File

@ -10,6 +10,7 @@ import (
"errors" "errors"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
) )
var ( var (
@ -171,3 +172,63 @@ func (b *CreateOrUpdateCountriesBatchResults) Close() error {
b.closed = true b.closed = true
return b.br.Close() return b.br.Close()
} }
const createOrUpdateFixtures = `-- name: CreateOrUpdateFixtures :batchexec
INSERT INTO fixtures (slug, display_name, state, start_date, end_date, game_week)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (slug) DO UPDATE
SET display_name = $2, state = $3, start_date = $4, end_date = $5, game_week = $6
`
type CreateOrUpdateFixturesBatchResults struct {
br pgx.BatchResults
tot int
closed bool
}
type CreateOrUpdateFixturesParams struct {
Slug string
DisplayName string
State string
StartDate pgtype.Timestamptz
EndDate pgtype.Timestamptz
GameWeek int32
}
func (q *Queries) CreateOrUpdateFixtures(ctx context.Context, arg []CreateOrUpdateFixturesParams) *CreateOrUpdateFixturesBatchResults {
batch := &pgx.Batch{}
for _, a := range arg {
vals := []interface{}{
a.Slug,
a.DisplayName,
a.State,
a.StartDate,
a.EndDate,
a.GameWeek,
}
batch.Queue(createOrUpdateFixtures, vals...)
}
br := q.db.SendBatch(ctx, batch)
return &CreateOrUpdateFixturesBatchResults{br, len(arg), false}
}
func (b *CreateOrUpdateFixturesBatchResults) Exec(f func(int, error)) {
defer b.br.Close()
for t := 0; t < b.tot; t++ {
if b.closed {
if f != nil {
f(t, ErrBatchAlreadyClosed)
}
continue
}
_, err := b.br.Exec()
if f != nil {
f(t, err)
}
}
}
func (b *CreateOrUpdateFixturesBatchResults) Close() error {
b.closed = true
return b.br.Close()
}

View File

@ -172,41 +172,3 @@ func (r iteratorForBatchInsertTeams) Err() error {
func (q *Queries) BatchInsertTeams(ctx context.Context, arg []BatchInsertTeamsParams) (int64, error) { func (q *Queries) BatchInsertTeams(ctx context.Context, arg []BatchInsertTeamsParams) (int64, error) {
return q.db.CopyFrom(ctx, []string{"teams"}, []string{"slug", "display_name", "country_slug", "domestic_league_slug", "short_name", "picture_url", "team_type"}, &iteratorForBatchInsertTeams{rows: arg}) return q.db.CopyFrom(ctx, []string{"teams"}, []string{"slug", "display_name", "country_slug", "domestic_league_slug", "short_name", "picture_url", "team_type"}, &iteratorForBatchInsertTeams{rows: arg})
} }
// iteratorForCreateFixtures implements pgx.CopyFromSource.
type iteratorForCreateFixtures struct {
rows []CreateFixturesParams
skippedFirstNextCall bool
}
func (r *iteratorForCreateFixtures) Next() bool {
if len(r.rows) == 0 {
return false
}
if !r.skippedFirstNextCall {
r.skippedFirstNextCall = true
return true
}
r.rows = r.rows[1:]
return len(r.rows) > 0
}
func (r iteratorForCreateFixtures) Values() ([]interface{}, error) {
return []interface{}{
r.rows[0].Slug,
r.rows[0].DisplayName,
r.rows[0].State,
r.rows[0].StartDate,
r.rows[0].EndDate,
r.rows[0].GameWeek,
}, nil
}
func (r iteratorForCreateFixtures) Err() error {
return nil
}
// Active: 1709890109198@@192.168.1.250@5436@sorare
func (q *Queries) CreateFixtures(ctx context.Context, arg []CreateFixturesParams) (int64, error) {
return q.db.CopyFrom(ctx, []string{"fixtures"}, []string{"slug", "display_name", "state", "start_date", "end_date", "game_week"}, &iteratorForCreateFixtures{rows: arg})
}

View File

@ -7,47 +7,8 @@ package model
import ( import (
"context" "context"
"github.com/jackc/pgx/v5/pgtype"
) )
type CreateFixturesParams struct {
Slug string
DisplayName string
State string
StartDate pgtype.Timestamptz
EndDate pgtype.Timestamptz
GameWeek int32
}
const createOrUpdateFixture = `-- name: CreateOrUpdateFixture :exec
INSERT INTO fixtures (slug, display_name, state, start_date, end_date, game_week)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (slug) DO UPDATE
SET display_name = $2, state = $3, start_date = $4, end_date = $5, game_week = $6
`
type CreateOrUpdateFixtureParams struct {
Slug string
DisplayName string
State string
StartDate pgtype.Timestamptz
EndDate pgtype.Timestamptz
GameWeek int32
}
func (q *Queries) CreateOrUpdateFixture(ctx context.Context, arg CreateOrUpdateFixtureParams) error {
_, err := q.db.Exec(ctx, createOrUpdateFixture,
arg.Slug,
arg.DisplayName,
arg.State,
arg.StartDate,
arg.EndDate,
arg.GameWeek,
)
return err
}
const getAllFixtures = `-- name: GetAllFixtures :many const getAllFixtures = `-- name: GetAllFixtures :many
SELECT slug, display_name, state, start_date, end_date, game_week FROM fixtures SELECT slug, display_name, state, start_date, end_date, game_week FROM fixtures
` `

View File

@ -1,9 +1,4 @@
-- Active: 1709890109198@@192.168.1.250@5436@sorare -- name: CreateOrUpdateFixtures :batchexec
-- name: CreateFixtures :copyfrom
INSERT INTO fixtures (slug, display_name, state, start_date, end_date, game_week)
VALUES ($1, $2, $3, $4, $5, $6);
-- name: CreateOrUpdateFixture :exec
INSERT INTO fixtures (slug, display_name, state, start_date, end_date, game_week) INSERT INTO fixtures (slug, display_name, state, start_date, end_date, game_week)
VALUES ($1, $2, $3, $4, $5, $6) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (slug) DO UPDATE ON CONFLICT (slug) DO UPDATE

View File

@ -34,10 +34,10 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error {
} }
log.Debug().Msgf("fixtures: %v", sfixtures) log.Debug().Msgf("fixtures: %v", sfixtures)
cnt, err := u.db.CreateFixtures( batchFixtures := u.db.CreateOrUpdateFixtures(
ctx, ctx,
lo.Map(sfixtures, func(fixture football.So5Fixture, index int) model.CreateFixturesParams { lo.Map(sfixtures, func(fixture football.So5Fixture, index int) model.CreateOrUpdateFixturesParams {
return model.CreateFixturesParams{ return model.CreateOrUpdateFixturesParams{
Slug: fixture.Slug, Slug: fixture.Slug,
DisplayName: fixture.DisplayName, DisplayName: fixture.DisplayName,
State: fixture.AasmState, State: fixture.AasmState,
@ -47,11 +47,19 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error {
} }
}), }),
) )
if err != nil { var batcherr error
batchFixtures.Exec(func(_ int, err error) {
if err != nil {
batcherr = err
batchFixtures.Close()
}
})
if batcherr != nil {
return err return err
} }
log.Debug().Msgf("created %d fixtures", cnt) log.Debug().Msgf("created %d fixtures", len(sfixtures))
fixtures, err := u.db.GetAllFixtures(ctx) fixtures, err := u.db.GetAllFixtures(ctx)
if err != nil { if err != nil {
@ -178,7 +186,7 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error {
} }
}), }),
) )
var batcherr error batcherr = nil
batchCountries.Exec(func(_ int, err error) { batchCountries.Exec(func(_ int, err error) {
if err != nil { if err != nil {
batcherr = err batcherr = err
@ -188,7 +196,7 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error {
if batcherr != nil { if batcherr != nil {
return err return err
} }
log.Debug().Msgf("%d countries inserted", cnt) log.Debug().Msgf("%d countries inserted", len(countries))
log.Debug().Msg("inserting competitions into db...") log.Debug().Msg("inserting competitions into db...")
@ -206,6 +214,7 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error {
} }
}), }),
) )
batcherr = nil
batchCompetitions.Exec(func(_ int, err error) { batchCompetitions.Exec(func(_ int, err error) {
if err != nil { if err != nil {
batcherr = err batcherr = err
@ -219,7 +228,7 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error {
log.Debug().Msgf("%d competitions inserted", len(competitions)) log.Debug().Msgf("%d competitions inserted", len(competitions))
log.Debug().Msg("inserting teams into db...") log.Debug().Msg("inserting teams into db...")
cnt, err = u.db.BatchInsertTeams(ctx, lo.Union( cnt, err := u.db.BatchInsertTeams(ctx, lo.Union(
lo.Map(clubs, func(club football.Club, index int) model.BatchInsertTeamsParams { lo.Map(clubs, func(club football.Club, index int) model.BatchInsertTeamsParams {
return model.BatchInsertTeamsParams{ return model.BatchInsertTeamsParams{
Slug: club.Slug, Slug: club.Slug,