Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): refactor runner channels into abstract queues #2971

Merged
merged 45 commits into from
Jul 27, 2023
Merged
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
5e12879
replace pg driver
schoren Jul 18, 2023
e435f93
fix test
schoren Jul 18, 2023
74cc661
--wip-- [skip ci]
schoren Jul 18, 2023
63b152f
WIP: add pipeline structure
mathnogueira Jul 19, 2023
98ec189
update assertion runner
schoren Jul 19, 2023
cca4bdf
implement linter runner
schoren Jul 19, 2023
ed4d1cb
WIP: convert other workers
mathnogueira Jul 19, 2023
d571b57
wip
mathnogueira Jul 19, 2023
86440e8
implement runner
schoren Jul 20, 2023
d4baa2a
fix rerun
schoren Jul 20, 2023
5893a9e
cleanup
schoren Jul 21, 2023
a88c4b2
fix test
schoren Jul 21, 2023
e3cea98
fix config test
schoren Jul 21, 2023
0b5fa81
clenaup test
schoren Jul 24, 2023
2d80e77
--wip-- [skip ci]
schoren Jul 24, 2023
c012f93
fix transactions
schoren Jul 24, 2023
b55841d
fix poller test
schoren Jul 24, 2023
10ec3ab
fixes
schoren Jul 24, 2023
ab31f99
fix queries
schoren Jul 24, 2023
e52736f
setup context propagation
schoren Jul 24, 2023
b0b17fc
fix tests
schoren Jul 25, 2023
93bfd02
fix: fixes header propagation
mathnogueira Jul 25, 2023
8cbbf64
fix
schoren Jul 25, 2023
782e0f2
--wip-- [skip ci]
schoren Jul 26, 2023
25d62c7
--wip-- [skip ci]
schoren Jul 26, 2023
945b5f9
remove race condiion
schoren Jul 26, 2023
6d93f17
fix: weird polling bug
mathnogueira Jul 26, 2023
15d7823
--wip-- [skip ci]
schoren Jul 26, 2023
63050c6
fix test
schoren Jul 26, 2023
211deba
update test
schoren Jul 26, 2023
ceda8eb
implement stop
schoren Jul 26, 2023
f8cc155
remove debug logs
schoren Jul 26, 2023
c9caa5b
remove debug logs
schoren Jul 26, 2023
84430ea
rollback change
schoren Jul 27, 2023
0dd4ecd
revert change
schoren Jul 27, 2023
a3b7acd
organize code
schoren Jul 27, 2023
5dc4e28
server/openapi
schoren Jul 27, 2023
65ad8f8
fix
schoren Jul 27, 2023
7269452
fix pp
schoren Jul 27, 2023
a81e761
Update server/pkg/id/generator.go
schoren Jul 27, 2023
66add09
name magic constant
schoren Jul 27, 2023
796aeec
handle empty queues
schoren Jul 27, 2023
8a20d1d
name magic constant
schoren Jul 27, 2023
f36fbbf
fix rerun state
schoren Jul 27, 2023
d910771
fix
schoren Jul 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
setup context propagation
  • Loading branch information
schoren committed Jul 26, 2023
commit e52736fdecaa5b8b9363ae66ae82638729290034
12 changes: 8 additions & 4 deletions server/executor/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/kubeshop/tracetest/server/test"
"github.com/kubeshop/tracetest/server/transaction"
"go.opentelemetry.io/otel/propagation"
)

const (
Expand Down Expand Up @@ -198,11 +199,14 @@ type QueueDriver interface {
}

func (q Queue) Enqueue(ctx context.Context, job Job) {
// TODO: carry context propagation
job = job.increaseEnqueueCount()

headers := map[string]string{}
propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
propagator.Inject(ctx, propagation.MapCarrier(headers))

q.driver.Enqueue(Job{
ctxHeaders: job.ctxHeaders,
ctxHeaders: headers,

Test: test.Test{ID: job.Test.ID},
Run: test.Run{ID: job.Run.ID},
Expand All @@ -217,8 +221,8 @@ func (q Queue) Enqueue(ctx context.Context, job Job) {

func (q Queue) Listen(job Job) {
// this is called when a new job is put in the queue and we need to process it
ctx := context.Background()
// TODO - carry over headers
propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
ctx := propagator.Extract(context.Background(), propagation.MapCarrier(job.ctxHeaders))

newJob := Job{
ctxHeaders: job.ctxHeaders,
Expand Down