From a5998883fffbc78e8f45c46b60cceda472136460 Mon Sep 17 00:00:00 2001 From: Marco Rizzi Date: Wed, 20 Dec 2023 17:08:51 +0100 Subject: [PATCH] Ent: IngestSources optimized with concurrently (#1595) Signed-off-by: mrizzi --- pkg/assembler/backends/ent/backend/source.go | 23 ++++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/pkg/assembler/backends/ent/backend/source.go b/pkg/assembler/backends/ent/backend/source.go index 537eb4cd1a..1a1289a094 100644 --- a/pkg/assembler/backends/ent/backend/source.go +++ b/pkg/assembler/backends/ent/backend/source.go @@ -30,6 +30,7 @@ import ( "github.com/guacsec/guac/pkg/assembler/graphql/model" "github.com/pkg/errors" "github.com/vektah/gqlparser/v2/gqlerror" + "golang.org/x/sync/errgroup" ) func (b *EntBackend) HasSourceAt(ctx context.Context, filter *model.HasSourceAtSpec) ([]*model.HasSourceAt, error) { @@ -162,12 +163,20 @@ func (b *EntBackend) Sources(ctx context.Context, filter *model.SourceSpec) ([]* func (b *EntBackend) IngestSources(ctx context.Context, sources []*model.SourceInputSpec) ([]*model.SourceIDs, error) { ids := make([]*model.SourceIDs, len(sources)) - for i, src := range sources { - s, err := b.IngestSource(ctx, *src) - if err != nil { - return nil, err - } - ids[i] = s + eg, ctx := errgroup.WithContext(ctx) + for i := range sources { + index := i + src := sources[index] + concurrently(eg, func() error { + s, err := b.IngestSource(ctx, *src) + if err == nil { + ids[index] = s + } + return err + }) + } + if err := eg.Wait(); err != nil { + return nil, err } return ids, nil } @@ -248,7 +257,7 @@ func upsertSource(ctx context.Context, client *ent.Tx, src model.SourceInputSpec ID(ctx) if err != nil { if err != stdsql.ErrNoRows { - return nil, errors.Wrap(err, "upsert package version") + return nil, errors.Wrap(err, "upsert source name") } sourceNameID, err = client.SourceName.Query().