From fea1e96085e7fbb8f2a8a117941f63f0f76652bc Mon Sep 17 00:00:00 2001 From: Laurent Le Houerou Date: Fri, 22 Mar 2024 11:49:07 +0000 Subject: [PATCH] change country upsert to batchexec --- model/batch.go | 80 ++++++++++++++++++++++++++++++++++ model/copyfrom.go | 39 ----------------- model/country.sql.go | 33 +++++++++----- model/sql/country.sql | 19 +++++++- sorare_utils/update_service.go | 22 ++++++---- 5 files changed, 133 insertions(+), 60 deletions(-) diff --git a/model/batch.go b/model/batch.go index e36144f..fd64a35 100644 --- a/model/batch.go +++ b/model/batch.go @@ -91,3 +91,83 @@ func (b *CreateOrUpdateCompetitionsBatchResults) Close() error { b.closed = true return b.br.Close() } + +const createOrUpdateCountries = `-- name: CreateOrUpdateCountries :batchexec +INSERT INTO countries ( + slug, + code, + display_name, + three_letter_code, + flag_flat_64_url, + flag_flat_32_url, + flag_round_64_url, + flag_round_32_url + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (slug) + DO UPDATE + SET code = EXCLUDED.code, + display_name = EXCLUDED.display_name, + three_letter_code = EXCLUDED.three_letter_code, + flag_flat_64_url = EXCLUDED.flag_flat_64_url, + flag_flat_32_url = EXCLUDED.flag_flat_32_url, + flag_round_64_url = EXCLUDED.flag_round_64_url, + flag_round_32_url = EXCLUDED.flag_round_32_url +` + +type CreateOrUpdateCountriesBatchResults struct { + br pgx.BatchResults + tot int + closed bool +} + +type CreateOrUpdateCountriesParams struct { + Slug string + Code string + DisplayName string + ThreeLetterCode string + FlagFlat64Url string + FlagFlat32Url string + FlagRound64Url string + FlagRound32Url string +} + +func (q *Queries) CreateOrUpdateCountries(ctx context.Context, arg []CreateOrUpdateCountriesParams) *CreateOrUpdateCountriesBatchResults { + batch := &pgx.Batch{} + for _, a := range arg { + vals := []interface{}{ + a.Slug, + a.Code, + a.DisplayName, + a.ThreeLetterCode, + a.FlagFlat64Url, + a.FlagFlat32Url, + a.FlagRound64Url, + a.FlagRound32Url, + } + batch.Queue(createOrUpdateCountries, vals...) + } + br := q.db.SendBatch(ctx, batch) + return &CreateOrUpdateCountriesBatchResults{br, len(arg), false} +} + +func (b *CreateOrUpdateCountriesBatchResults) Exec(f func(int, error)) { + defer b.br.Close() + for t := 0; t < b.tot; t++ { + if b.closed { + if f != nil { + f(t, ErrBatchAlreadyClosed) + } + continue + } + _, err := b.br.Exec() + if f != nil { + f(t, err) + } + } +} + +func (b *CreateOrUpdateCountriesBatchResults) Close() error { + b.closed = true + return b.br.Close() +} diff --git a/model/copyfrom.go b/model/copyfrom.go index 1e12cc0..2cc2fbf 100644 --- a/model/copyfrom.go +++ b/model/copyfrom.go @@ -173,45 +173,6 @@ func (q *Queries) BatchInsertTeams(ctx context.Context, arg []BatchInsertTeamsPa return q.db.CopyFrom(ctx, []string{"teams"}, []string{"slug", "display_name", "country_slug", "domestic_league_slug", "short_name", "picture_url", "team_type"}, &iteratorForBatchInsertTeams{rows: arg}) } -// iteratorForCreateCountries implements pgx.CopyFromSource. -type iteratorForCreateCountries struct { - rows []CreateCountriesParams - skippedFirstNextCall bool -} - -func (r *iteratorForCreateCountries) Next() bool { - if len(r.rows) == 0 { - return false - } - if !r.skippedFirstNextCall { - r.skippedFirstNextCall = true - return true - } - r.rows = r.rows[1:] - return len(r.rows) > 0 -} - -func (r iteratorForCreateCountries) Values() ([]interface{}, error) { - return []interface{}{ - r.rows[0].Slug, - r.rows[0].Code, - r.rows[0].DisplayName, - r.rows[0].ThreeLetterCode, - r.rows[0].FlagFlat64Url, - r.rows[0].FlagFlat32Url, - r.rows[0].FlagRound64Url, - r.rows[0].FlagRound32Url, - }, nil -} - -func (r iteratorForCreateCountries) Err() error { - return nil -} - -func (q *Queries) CreateCountries(ctx context.Context, arg []CreateCountriesParams) (int64, error) { - return q.db.CopyFrom(ctx, []string{"countries"}, []string{"slug", "code", "display_name", "three_letter_code", "flag_flat_64_url", "flag_flat_32_url", "flag_round_64_url", "flag_round_32_url"}, &iteratorForCreateCountries{rows: arg}) -} - // iteratorForCreateFixtures implements pgx.CopyFromSource. type iteratorForCreateFixtures struct { rows []CreateFixturesParams diff --git a/model/country.sql.go b/model/country.sql.go index a7fcd2f..c4ebf14 100644 --- a/model/country.sql.go +++ b/model/country.sql.go @@ -9,17 +9,6 @@ import ( "context" ) -type CreateCountriesParams struct { - Slug string - Code string - DisplayName string - ThreeLetterCode string - FlagFlat64Url string - FlagFlat32Url string - FlagRound64Url string - FlagRound32Url string -} - const createOrUpdateCountry = `-- name: CreateOrUpdateCountry :exec INSERT INTO countries ( slug, @@ -67,3 +56,25 @@ func (q *Queries) CreateOrUpdateCountry(ctx context.Context, arg CreateOrUpdateC ) return err } + +const getCountryBySlug = `-- name: GetCountryBySlug :one + SELECT slug, code, display_name, three_letter_code, flag_flat_64_url, flag_flat_32_url, flag_round_64_url, flag_round_32_url + FROM countries + WHERE slug = $1 +` + +func (q *Queries) GetCountryBySlug(ctx context.Context, slug string) (Country, error) { + row := q.db.QueryRow(ctx, getCountryBySlug, slug) + var i Country + err := row.Scan( + &i.Slug, + &i.Code, + &i.DisplayName, + &i.ThreeLetterCode, + &i.FlagFlat64Url, + &i.FlagFlat32Url, + &i.FlagRound64Url, + &i.FlagRound32Url, + ) + return i, err +} diff --git a/model/sql/country.sql b/model/sql/country.sql index 77514b8..5a3186e 100644 --- a/model/sql/country.sql +++ b/model/sql/country.sql @@ -1,4 +1,4 @@ --- name: CreateCountries :copyfrom +-- name: CreateOrUpdateCountries :batchexec INSERT INTO countries ( slug, code, @@ -9,7 +9,16 @@ INSERT INTO countries ( flag_round_64_url, flag_round_32_url ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8); + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (slug) + DO UPDATE + SET code = EXCLUDED.code, + display_name = EXCLUDED.display_name, + three_letter_code = EXCLUDED.three_letter_code, + flag_flat_64_url = EXCLUDED.flag_flat_64_url, + flag_flat_32_url = EXCLUDED.flag_flat_32_url, + flag_round_64_url = EXCLUDED.flag_round_64_url, + flag_round_32_url = EXCLUDED.flag_round_32_url; -- name: CreateOrUpdateCountry :exec INSERT INTO countries ( @@ -32,3 +41,9 @@ INSERT INTO countries ( flag_flat_32_url = EXCLUDED.flag_flat_32_url, flag_round_64_url = EXCLUDED.flag_round_64_url, flag_round_32_url = EXCLUDED.flag_round_32_url; + + +-- name: GetCountryBySlug :one + SELECT * + FROM countries + WHERE slug = $1; diff --git a/sorare_utils/update_service.go b/sorare_utils/update_service.go index 0471028..1a392ef 100644 --- a/sorare_utils/update_service.go +++ b/sorare_utils/update_service.go @@ -163,10 +163,10 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error { log.Debug().Msgf("found %d countries", len(countries)) log.Debug().Msg("inserting countries into db...") - cnt, err = u.db.CreateCountries( + batchCountries := u.db.CreateOrUpdateCountries( ctx, - lo.Map(countries, func(country sorare.Country, index int) model.CreateCountriesParams { - return model.CreateCountriesParams{ + lo.Map(countries, func(country sorare.Country, index int) model.CreateOrUpdateCountriesParams { + return model.CreateOrUpdateCountriesParams{ Slug: country.Slug, Code: country.Code, DisplayName: country.Name, @@ -178,15 +178,21 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error { } }), ) - if err != nil { + var batcherr error + batchCountries.Exec(func(_ int, err error) { + if err != nil { + batcherr = err + batchCountries.Close() + } + }) + if batcherr != nil { return err } log.Debug().Msgf("%d countries inserted", cnt) log.Debug().Msg("inserting competitions into db...") - var batcherr error - batch := u.db.CreateOrUpdateCompetitions( + batchCompetitions := u.db.CreateOrUpdateCompetitions( ctx, lo.Map(competitions, func(competition football.Competition, index int) model.CreateOrUpdateCompetitionsParams { return model.CreateOrUpdateCompetitionsParams{ @@ -200,10 +206,10 @@ func (u *UpdateService) InitSyncDatabase(ctx context.Context) error { } }), ) - batch.Exec(func(_ int, err error) { + batchCompetitions.Exec(func(_ int, err error) { if err != nil { batcherr = err - batch.Close() + batchCompetitions.Close() } }) if batcherr != nil {