change country upsert to batchexec

This commit is contained in:
Laurent Le Houerou 2024-03-22 11:49:07 +00:00
parent 3affe1f2e3
commit fea1e96085
5 changed files with 133 additions and 60 deletions

View File

@ -91,3 +91,83 @@ func (b *CreateOrUpdateCompetitionsBatchResults) Close() error {
b.closed = true
return b.br.Close()
}
const createOrUpdateCountries = `-- name: CreateOrUpdateCountries :batchexec
INSERT INTO countries (
slug,
code,
display_name,
three_letter_code,
flag_flat_64_url,
flag_flat_32_url,
flag_round_64_url,
flag_round_32_url
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (slug)
DO UPDATE
SET code = EXCLUDED.code,
display_name = EXCLUDED.display_name,
three_letter_code = EXCLUDED.three_letter_code,
flag_flat_64_url = EXCLUDED.flag_flat_64_url,
flag_flat_32_url = EXCLUDED.flag_flat_32_url,
flag_round_64_url = EXCLUDED.flag_round_64_url,
flag_round_32_url = EXCLUDED.flag_round_32_url
`
type CreateOrUpdateCountriesBatchResults struct {
br pgx.BatchResults
tot int
closed bool
}
type CreateOrUpdateCountriesParams struct {
Slug string
Code string
DisplayName string
ThreeLetterCode string
FlagFlat64Url string
FlagFlat32Url string
FlagRound64Url string
FlagRound32Url string
}
func (q *Queries) CreateOrUpdateCountries(ctx context.Context, arg []CreateOrUpdateCountriesParams) *CreateOrUpdateCountriesBatchResults {
batch := &pgx.Batch{}
for _, a := range arg {
vals := []interface{}{
a.Slug,
a.Code,
a.DisplayName,
a.ThreeLetterCode,
a.FlagFlat64Url,
a.FlagFlat32Url,
a.FlagRound64Url,
a.FlagRound32Url,
}
batch.Queue(createOrUpdateCountries, vals...)
}
br := q.db.SendBatch(ctx, batch)
return &CreateOrUpdateCountriesBatchResults{br, len(arg), false}
}
func (b *CreateOrUpdateCountriesBatchResults) Exec(f func(int, error)) {
defer b.br.Close()
for t := 0; t < b.tot; t++ {
if b.closed {
if f != nil {
f(t, ErrBatchAlreadyClosed)
}
continue
}
_, err := b.br.Exec()
if f != nil {
f(t, err)
}
}
}
func (b *CreateOrUpdateCountriesBatchResults) Close() error {
b.closed = true
return b.br.Close()
}

View File

@ -173,45 +173,6 @@ func (q *Queries) BatchInsertTeams(ctx context.Context, arg []BatchInsertTeamsPa
return q.db.CopyFrom(ctx, []string{"teams"}, []string{"slug", "display_name", "country_slug", "domestic_league_slug", "short_name", "picture_url", "team_type"}, &iteratorForBatchInsertTeams{rows: arg})
}
// iteratorForCreateCountries implements pgx.CopyFromSource.
type iteratorForCreateCountries struct {
rows []CreateCountriesParams
skippedFirstNextCall bool
}
func (r *iteratorForCreateCountries) Next() bool {
if len(r.rows) == 0 {
return false
}
if !r.skippedFirstNextCall {
r.skippedFirstNextCall = true
return true
}
r.rows = r.rows[1:]
return len(r.rows) > 0
}
func (r iteratorForCreateCountries) Values() ([]interface{}, error) {
return []interface{}{
r.rows[0].Slug,
r.rows[0].Code,
r.rows[0].DisplayName,
r.rows[0].ThreeLetterCode,
r.rows[0].FlagFlat64Url,
r.rows[0].FlagFlat32Url,
r.rows[0].FlagRound64Url,
r.rows[0].FlagRound32Url,
}, nil
}
func (r iteratorForCreateCountries) Err() error {
return nil
}
func (q *Queries) CreateCountries(ctx context.Context, arg []CreateCountriesParams) (int64, error) {
return q.db.CopyFrom(ctx, []string{"countries"}, []string{"slug", "code", "display_name", "three_letter_code", "flag_flat_64_url", "flag_flat_32_url", "flag_round_64_url", "flag_round_32_url"}, &iteratorForCreateCountries{rows: arg})
}
// iteratorForCreateFixtures implements pgx.CopyFromSource.
type iteratorForCreateFixtures struct {
rows []CreateFixturesParams

View File

@ -9,17 +9,6 @@ import (
"context"
)
type CreateCountriesParams struct {
Slug string
Code string
DisplayName string
ThreeLetterCode string
FlagFlat64Url string
FlagFlat32Url string
FlagRound64Url string
FlagRound32Url string
}
const createOrUpdateCountry = `-- name: CreateOrUpdateCountry :exec
INSERT INTO countries (
slug,
@ -67,3 +56,25 @@ func (q *Queries) CreateOrUpdateCountry(ctx context.Context, arg CreateOrUpdateC
)
return err
}
const getCountryBySlug = `-- name: GetCountryBySlug :one
SELECT slug, code, display_name, three_letter_code, flag_flat_64_url, flag_flat_32_url, flag_round_64_url, flag_round_32_url
FROM countries
WHERE slug = $1
`
func (q *Queries) GetCountryBySlug(ctx context.Context, slug string) (Country, error) {
row := q.db.QueryRow(ctx, getCountryBySlug, slug)
var i Country
err := row.Scan(
&i.Slug,
&i.Code,
&i.DisplayName,
&i.ThreeLetterCode,
&i.FlagFlat64Url,
&i.FlagFlat32Url,
&i.FlagRound64Url,
&i.FlagRound32Url,
)
return i, err
}

View File

@ -1,4 +1,4 @@
-- name: CreateCountries :copyfrom
-- name: CreateOrUpdateCountries :batchexec
INSERT INTO countries (
slug,
code,
@ -9,7 +9,16 @@ INSERT INTO countries (
flag_round_64_url,
flag_round_32_url
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8);
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (slug)
DO UPDATE
SET code = EXCLUDED.code,
display_name = EXCLUDED.display_name,
three_letter_code = EXCLUDED.three_letter_code,
flag_flat_64_url = EXCLUDED.flag_flat_64_url,
flag_flat_32_url = EXCLUDED.flag_flat_32_url,
flag_round_64_url = EXCLUDED.flag_round_64_url,
flag_round_32_url = EXCLUDED.flag_round_32_url;
-- name: CreateOrUpdateCountry :exec
INSERT INTO countries (
@ -32,3 +41,9 @@ INSERT INTO countries (
flag_flat_32_url = EXCLUDED.flag_flat_32_url,
flag_round_64_url = EXCLUDED.flag_round_64_url,
flag_round_32_url = EXCLUDED.flag_round_32_url;
-- name: GetCountryBySlug :one
SELECT *
FROM countries
WHERE slug = $1;

View File

@ -163,10 +163,10 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error {
log.Debug().Msgf("found %d countries", len(countries))
log.Debug().Msg("inserting countries into db...")
cnt, err = u.db.CreateCountries(
batchCountries := u.db.CreateOrUpdateCountries(
ctx,
lo.Map(countries, func(country sorare.Country, index int) model.CreateCountriesParams {
return model.CreateCountriesParams{
lo.Map(countries, func(country sorare.Country, index int) model.CreateOrUpdateCountriesParams {
return model.CreateOrUpdateCountriesParams{
Slug: country.Slug,
Code: country.Code,
DisplayName: country.Name,
@ -178,15 +178,21 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error {
}
}),
)
if err != nil {
var batcherr error
batchCountries.Exec(func(_ int, err error) {
if err != nil {
batcherr = err
batchCountries.Close()
}
})
if batcherr != nil {
return err
}
log.Debug().Msgf("%d countries inserted", cnt)
log.Debug().Msg("inserting competitions into db...")
var batcherr error
batch := u.db.CreateOrUpdateCompetitions(
batchCompetitions := u.db.CreateOrUpdateCompetitions(
ctx,
lo.Map(competitions, func(competition football.Competition, index int) model.CreateOrUpdateCompetitionsParams {
return model.CreateOrUpdateCompetitionsParams{
@ -200,10 +206,10 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error {
}
}),
)
batch.Exec(func(_ int, err error) {
batchCompetitions.Exec(func(_ int, err error) {
if err != nil {
batcherr = err
batch.Close()
batchCompetitions.Close()
}
})
if batcherr != nil {