Commit

workflow UI (#99)
* tons of improvements to the workflow UI

* default to true in workflow feature flag
bgentry authored Jul 25, 2024
1 parent e640773 commit 8b5f177
Showing 29 changed files with 1,747 additions and 151 deletions.
159 changes: 158 additions & 1 deletion api_handler.go
@@ -603,6 +603,82 @@ func (a *workflowGetEndpoint) Execute(ctx context.Context, req *workflowGetReque
}, nil
}

//
// workflowListEndpoint
//

type workflowListEndpoint struct {
apiBundle
apiendpoint.Endpoint[workflowListRequest, listResponse[workflowListItem]]
}

func (*workflowListEndpoint) Meta() *apiendpoint.EndpointMeta {
return &apiendpoint.EndpointMeta{
Pattern: "GET /api/workflows",
StatusCode: http.StatusOK,
}
}

type workflowListRequest struct {
After *string `json:"-" validate:"omitempty"` // from ExtractRaw
Limit *int `json:"-" validate:"omitempty,min=0,max=1000"` // from ExtractRaw
State string `json:"-" validate:"omitempty,oneof=active inactive"` // from ExtractRaw
}

func (req *workflowListRequest) ExtractRaw(r *http.Request) error {
if after := r.URL.Query().Get("after"); after != "" {
req.After = &after
}

if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
limit, err := strconv.Atoi(limitStr)
if err != nil {
return apierror.NewBadRequest("Couldn't convert `limit` to integer: %s.", err)
}

req.Limit = &limit
}

if state := r.URL.Query().Get("state"); state != "" {
req.State = state
}

return nil
}

func (a *workflowListEndpoint) Execute(ctx context.Context, req *workflowListRequest) (*listResponse[workflowListItem], error) {
switch req.State {
case "active":
workflows, err := dbsqlc.New().WorkflowListActive(ctx, a.dbPool, &dbsqlc.WorkflowListActiveParams{
After: ptrutil.ValOrDefault(req.After, ""),
PaginationLimit: int32(ptrutil.ValOrDefault(req.Limit, 100)),
})
if err != nil {
return nil, fmt.Errorf("error listing workflows: %w", err)
}

return listResponseFrom(sliceutil.Map(workflows, internalWorkflowListActiveToSerializableWorkflow)), nil
case "inactive":
workflows, err := dbsqlc.New().WorkflowListInactive(ctx, a.dbPool, &dbsqlc.WorkflowListInactiveParams{
After: ptrutil.ValOrDefault(req.After, ""),
PaginationLimit: int32(ptrutil.ValOrDefault(req.Limit, 100)),
})
if err != nil {
return nil, fmt.Errorf("error listing workflows: %w", err)
}
return listResponseFrom(sliceutil.Map(workflows, internalWorkflowListInactiveToSerializableWorkflow)), nil
default:
workflows, err := dbsqlc.New().WorkflowListAll(ctx, a.dbPool, &dbsqlc.WorkflowListAllParams{
After: ptrutil.ValOrDefault(req.After, ""),
PaginationLimit: int32(ptrutil.ValOrDefault(req.Limit, 100)),
})
if err != nil {
return nil, fmt.Errorf("error listing workflows: %w", err)
}
return listResponseFrom(sliceutil.Map(workflows, internalWorkflowListAllToSerializableWorkflow)), nil
}
}

type RiverJob struct {
ID int64 `json:"id"`
Args json.RawMessage `json:"args"`
@@ -642,7 +718,7 @@ func internalJobToSerializableJob(internal *dbsqlc.RiverJob) *RiverJob {
Attempt: int(internal.Attempt),
AttemptedAt: timePtr(internal.AttemptedAt),
AttemptedBy: attemptedBy,
CreatedAt: internal.CreatedAt.Time,
CreatedAt: internal.CreatedAt.Time.UTC(),
Errors: errs,
FinalizedAt: timePtr(internal.FinalizedAt),
Kind: internal.Kind,
@@ -721,6 +797,87 @@ func riverQueuesToSerializableQueues(internal []*rivertype.Queue, counts []*dbsq
return listResponseFrom(queues)
}

type workflowListItem struct {
CountAvailable int `json:"count_available"`
CountCancelled int `json:"count_cancelled"`
CountCompleted int `json:"count_completed"`
CountDiscarded int `json:"count_discarded"`
CountFailedDeps int `json:"count_failed_deps"`
CountPending int `json:"count_pending"`
CountRetryable int `json:"count_retryable"`
CountRunning int `json:"count_running"`
CountScheduled int `json:"count_scheduled"`
CreatedAt time.Time `json:"created_at"`
ID string `json:"id"`
Name *string `json:"name"`
}

func internalWorkflowListActiveToSerializableWorkflow(internal *dbsqlc.WorkflowListActiveRow) *workflowListItem {
var name *string
if internal.WorkflowName != "" {
name = &internal.WorkflowName
}

return &workflowListItem{
CountAvailable: int(internal.CountAvailable),
CountCancelled: int(internal.CountCancelled),
CountCompleted: int(internal.CountCompleted),
CountDiscarded: int(internal.CountDiscarded),
CountFailedDeps: int(internal.CountFailedDeps),
CountPending: int(internal.CountPending),
CountRetryable: int(internal.CountRetryable),
CountRunning: int(internal.CountRunning),
CountScheduled: int(internal.CountScheduled),
CreatedAt: internal.EarliestCreatedAt.Time.UTC(),
ID: internal.WorkflowID,
Name: name,
}
}

func internalWorkflowListAllToSerializableWorkflow(internal *dbsqlc.WorkflowListAllRow) *workflowListItem {
var name *string
if internal.WorkflowName != "" {
name = &internal.WorkflowName
}

return &workflowListItem{
CountAvailable: int(internal.CountAvailable),
CountCancelled: int(internal.CountCancelled),
CountCompleted: int(internal.CountCompleted),
CountDiscarded: int(internal.CountDiscarded),
CountFailedDeps: int(internal.CountFailedDeps),
CountPending: int(internal.CountPending),
CountRetryable: int(internal.CountRetryable),
CountRunning: int(internal.CountRunning),
CountScheduled: int(internal.CountScheduled),
CreatedAt: internal.EarliestCreatedAt.Time.UTC(),
ID: internal.WorkflowID,
Name: name,
}
}

func internalWorkflowListInactiveToSerializableWorkflow(internal *dbsqlc.WorkflowListInactiveRow) *workflowListItem {
var name *string
if internal.WorkflowName != "" {
name = &internal.WorkflowName
}

return &workflowListItem{
CountAvailable: int(internal.CountAvailable),
CountCancelled: int(internal.CountCancelled),
CountCompleted: int(internal.CountCompleted),
CountDiscarded: int(internal.CountDiscarded),
CountFailedDeps: int(internal.CountFailedDeps),
CountPending: int(internal.CountPending),
CountRetryable: int(internal.CountRetryable),
CountRunning: int(internal.CountRunning),
CountScheduled: int(internal.CountScheduled),
CreatedAt: internal.EarliestCreatedAt.Time.UTC(),
ID: internal.WorkflowID,
Name: name,
}
}

func timePtr(t pgtype.Timestamptz) *time.Time {
if !t.Valid {
return nil
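The new endpoint can be exercised directly once the handler is mounted. Below is a minimal client sketch, assuming the UI server listens on localhost:8080 and that listResponse serializes its items under a top-level "data" key (both are assumptions, not part of this commit); the query parameters are the ones parsed by workflowListRequest.ExtractRaw above.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// Mirrors a subset of the workflowListItem JSON fields defined in api_handler.go.
type workflowListItem struct {
	ID             string  `json:"id"`
	Name           *string `json:"name"`
	CountPending   int     `json:"count_pending"`
	CountCompleted int     `json:"count_completed"`
}

func main() {
	// Query the new endpoint; state, limit, and after mirror ExtractRaw.
	resp, err := http.Get("http://localhost:8080/api/workflows?state=active&limit=50")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The "data" wrapper key is an assumption about listResponse's JSON shape.
	var payload struct {
		Data []workflowListItem `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		panic(err)
	}

	for _, wf := range payload.Data {
		name := "(unnamed)"
		if wf.Name != nil {
			name = *wf.Name
		}
		fmt.Printf("workflow %s %s: pending=%d completed=%d\n", wf.ID, name, wf.CountPending, wf.CountCompleted)
	}
}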
97 changes: 97 additions & 0 deletions api_handler_test.go
@@ -531,3 +531,100 @@ func TestAPIHandlerWorkflowGet(t *testing.T) {
requireAPIError(t, apierror.NewNotFoundWorkflow(workflowID.String()), err)
})
}

func TestAPIHandlerWorkflowList(t *testing.T) {
t.Parallel()

ctx := context.Background()

t.Run("Success", func(t *testing.T) {
t.Parallel()

endpoint, bundle := setupEndpoint[workflowListEndpoint](ctx, t)

job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
Metadata: []byte(`{"workflow_id":"1", "workflow_name":"first_wf", "task":"a"}`),
State: ptrutil.Ptr(rivertype.JobStatePending),
})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
FinalizedAt: ptrutil.Ptr(time.Now()),
Metadata: []byte(`{"workflow_id":"2", "task":"b"}`),
State: ptrutil.Ptr(rivertype.JobStateCompleted),
})
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
FinalizedAt: ptrutil.Ptr(time.Now()),
Metadata: []byte(`{"workflow_id":"2", "task":"c", "workflow_deps_failed_at":"2024-01-01T00:00:00Z"}`),
State: ptrutil.Ptr(rivertype.JobStateCancelled),
})

t.Run("All", func(t *testing.T) {
resp, err := endpoint.Execute(ctx, &workflowListRequest{})
require.NoError(t, err)
require.Len(t, resp.Data, 2)
require.Equal(t, 1, resp.Data[0].CountCancelled)
require.Equal(t, 1, resp.Data[0].CountCompleted)
t.Logf("resp0: %+v", resp.Data[0])
require.Equal(t, 1, resp.Data[0].CountFailedDeps)
require.Nil(t, resp.Data[0].Name)

require.Equal(t, 0, resp.Data[1].CountAvailable)
require.Equal(t, 0, resp.Data[1].CountCancelled)
require.Equal(t, 0, resp.Data[1].CountCompleted)
require.Equal(t, 0, resp.Data[1].CountDiscarded)
require.Equal(t, 0, resp.Data[1].CountFailedDeps)
require.Equal(t, 1, resp.Data[1].CountPending)
require.Equal(t, 0, resp.Data[1].CountRetryable)
require.Equal(t, 0, resp.Data[1].CountRunning)
require.Equal(t, 0, resp.Data[1].CountScheduled)
require.Equal(t, "first_wf", *resp.Data[1].Name)
})

t.Run("Active", func(t *testing.T) {
resp, err := endpoint.Execute(ctx, &workflowListRequest{State: "active"})
require.NoError(t, err)
require.Len(t, resp.Data, 1)
require.Equal(t, 0, resp.Data[0].CountAvailable)
require.Equal(t, 0, resp.Data[0].CountCancelled)
require.Equal(t, 0, resp.Data[0].CountCompleted)
require.Equal(t, 0, resp.Data[0].CountDiscarded)
require.Equal(t, 0, resp.Data[0].CountFailedDeps)
require.Equal(t, 1, resp.Data[0].CountPending)
require.Equal(t, 0, resp.Data[0].CountRetryable)
require.Equal(t, 0, resp.Data[0].CountRunning)
require.Equal(t, 0, resp.Data[0].CountScheduled)
require.Equal(t, "first_wf", *resp.Data[0].Name)
require.Equal(t, job1.CreatedAt.UTC(), resp.Data[0].CreatedAt)
})

t.Run("Inactive", func(t *testing.T) {
resp, err := endpoint.Execute(ctx, &workflowListRequest{State: "inactive"})
require.NoError(t, err)
require.Len(t, resp.Data, 1)
require.Equal(t, 1, resp.Data[0].CountCompleted)
require.Equal(t, 1, resp.Data[0].CountFailedDeps)
require.Nil(t, resp.Data[0].Name)
require.Equal(t, job2.CreatedAt.UTC(), resp.Data[0].CreatedAt)
})
})

t.Run("Limit", func(t *testing.T) {
t.Parallel()

endpoint, bundle := setupEndpoint[workflowListEndpoint](ctx, t)

_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
Metadata: []byte(`{"workflow_id":"1", "workflow_name":"first_wf", "task":"a"}`),
State: ptrutil.Ptr(rivertype.JobStatePending),
})
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
Metadata: []byte(`{"workflow_id":"2", "task":"b"}`),
ScheduledAt: ptrutil.Ptr(time.Now().Add(time.Hour)),
State: ptrutil.Ptr(rivertype.JobStateScheduled),
})

resp, err := endpoint.Execute(ctx, &workflowListRequest{Limit: ptrutil.Ptr(1)})
require.NoError(t, err)
require.Len(t, resp.Data, 1)
require.Equal(t, "2", resp.Data[0].ID) // DESC order means last one gets returned
})
}
1 change: 1 addition & 0 deletions handler.go
@@ -100,6 +100,7 @@ func NewHandler(opts *HandlerOpts) (http.Handler, error) {
apiendpoint.Mount(mux, opts.Logger, &queueResumeEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &stateAndCountGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &workflowGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &workflowListEndpoint{apiBundle: apiBundle})

if err := mountStaticFiles(opts.Logger, mux); err != nil {
return nil, err
1 change: 1 addition & 0 deletions handler_test.go
@@ -112,6 +112,7 @@ func TestNewHandlerIntegration(t *testing.T) {
makeAPICall(t, "QueueResume", http.MethodPut, makeURL("/api/queues/%s/resume", queuePaused.Name), nil)
makeAPICall(t, "StateAndCountGet", http.MethodGet, makeURL("/api/states"), nil)
makeAPICall(t, "WorkflowGet", http.MethodGet, makeURL("/api/workflows/%s", workflowID), nil)
makeAPICall(t, "WorkflowList", http.MethodGet, makeURL("/api/workflows"), nil)

//
// Static files
4 changes: 3 additions & 1 deletion internal/dbsqlc/sqlc.yaml
@@ -1,7 +1,9 @@
version: "2"
sql:
- engine: "postgresql"
queries: "query.sql"
queries:
- "query.sql"
- "workflow.sql"
schema: "migrations"
gen:
go:
85 changes: 85 additions & 0 deletions internal/dbsqlc/workflow.sql
@@ -0,0 +1,85 @@
-- name: WorkflowListActive :many
WITH workflow_ids AS (
SELECT DISTINCT ON (workflow_id) metadata->>'workflow_id' AS workflow_id
FROM river_job
WHERE state IN ('available', 'pending', 'retryable', 'running', 'scheduled')
AND metadata ? 'workflow_id'
AND metadata->>'workflow_id' > @after::text
ORDER BY workflow_id DESC
LIMIT @pagination_limit::integer
)

SELECT
(river_job.metadata->>'workflow_id')::text AS workflow_id,
coalesce(river_job.metadata->>'workflow_name', '')::text AS workflow_name,
MIN(river_job.created_at)::timestamptz AS earliest_created_at,
COUNT(*) FILTER (WHERE river_job.metadata ? 'workflow_deps_failed_at') AS count_failed_deps,
COUNT(*) FILTER (WHERE river_job.state = 'available') AS count_available,
COUNT(*) FILTER (WHERE river_job.state = 'cancelled') AS count_cancelled,
COUNT(*) FILTER (WHERE river_job.state = 'completed') AS count_completed,
COUNT(*) FILTER (WHERE river_job.state = 'discarded') AS count_discarded,
COUNT(*) FILTER (WHERE river_job.state = 'pending') AS count_pending,
COUNT(*) FILTER (WHERE river_job.state = 'retryable') AS count_retryable,
COUNT(*) FILTER (WHERE river_job.state = 'running') AS count_running,
COUNT(*) FILTER (WHERE river_job.state = 'scheduled') AS count_scheduled
FROM river_job
INNER JOIN workflow_ids
ON river_job.metadata->>'workflow_id' = workflow_ids.workflow_id
WHERE
river_job.metadata ? 'workflow_id'
GROUP BY river_job.metadata->>'workflow_id', metadata->>'workflow_name'
ORDER BY river_job.metadata->>'workflow_id' DESC;

-- name: WorkflowListAll :many
SELECT
(river_job.metadata->>'workflow_id')::text AS workflow_id,
coalesce(river_job.metadata->>'workflow_name', '')::text AS workflow_name,
MIN(river_job.created_at)::timestamptz AS earliest_created_at,
COUNT(*) FILTER (WHERE river_job.metadata ? 'workflow_deps_failed_at') AS count_failed_deps,
COUNT(*) FILTER (WHERE river_job.state = 'available') AS count_available,
COUNT(*) FILTER (WHERE river_job.state = 'cancelled') AS count_cancelled,
COUNT(*) FILTER (WHERE river_job.state = 'completed') AS count_completed,
COUNT(*) FILTER (WHERE river_job.state = 'discarded') AS count_discarded,
COUNT(*) FILTER (WHERE river_job.state = 'pending') AS count_pending,
COUNT(*) FILTER (WHERE river_job.state = 'retryable') AS count_retryable,
COUNT(*) FILTER (WHERE river_job.state = 'running') AS count_running,
COUNT(*) FILTER (WHERE river_job.state = 'scheduled') AS count_scheduled
FROM river_job
WHERE
river_job.metadata ? 'workflow_id'
AND river_job.metadata->>'workflow_id' > @after::text
GROUP BY river_job.metadata->>'workflow_id', metadata->>'workflow_name'
ORDER BY river_job.metadata->>'workflow_id' DESC
LIMIT @pagination_limit::integer;

-- name: WorkflowListInactive :many
WITH active_workflow_ids AS (
SELECT DISTINCT ON (workflow_id) metadata->>'workflow_id' AS workflow_id
FROM river_job
WHERE state IN ('available', 'pending', 'retryable', 'running', 'scheduled')
AND metadata->>'workflow_id' > @after::text
AND metadata ? 'workflow_id'
ORDER BY workflow_id DESC
)

SELECT
(river_job.metadata->>'workflow_id')::text AS workflow_id,
coalesce(river_job.metadata->>'workflow_name', '')::text AS workflow_name,
MIN(river_job.created_at)::timestamptz AS earliest_created_at,
COUNT(*) FILTER (WHERE river_job.metadata ? 'workflow_deps_failed_at') AS count_failed_deps,
COUNT(*) FILTER (WHERE river_job.state = 'available') AS count_available,
COUNT(*) FILTER (WHERE river_job.state = 'cancelled') AS count_cancelled,
COUNT(*) FILTER (WHERE river_job.state = 'completed') AS count_completed,
COUNT(*) FILTER (WHERE river_job.state = 'discarded') AS count_discarded,
COUNT(*) FILTER (WHERE river_job.state = 'pending') AS count_pending,
COUNT(*) FILTER (WHERE river_job.state = 'retryable') AS count_retryable,
COUNT(*) FILTER (WHERE river_job.state = 'running') AS count_running,
COUNT(*) FILTER (WHERE river_job.state = 'scheduled') AS count_scheduled
FROM river_job
WHERE
state IN ('completed', 'cancelled', 'discarded')
AND metadata ? 'workflow_id'
AND metadata ->> 'workflow_id' NOT IN (SELECT workflow_id FROM active_workflow_ids)
GROUP BY metadata->>'workflow_id', metadata->>'workflow_name'
ORDER BY workflow_id DESC
LIMIT @pagination_limit::integer;
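
For readers less familiar with these PostgreSQL features: the queries above rely on the jsonb ? key-existence operator and on COUNT(*) FILTER aggregates to produce per-state counts in a single pass. A standalone sketch with hypothetical values (not River's schema):

-- Illustration only; the demo CTE below is hypothetical and not part of River.
WITH demo(state, metadata) AS (
    VALUES
        ('pending',   '{"workflow_id":"1"}'::jsonb),
        ('completed', '{"workflow_id":"1"}'::jsonb),
        ('completed', '{}'::jsonb)  -- no workflow_id key, excluded by WHERE
)
SELECT
    metadata->>'workflow_id' AS workflow_id,
    COUNT(*) FILTER (WHERE state = 'pending')   AS count_pending,
    COUNT(*) FILTER (WHERE state = 'completed') AS count_completed
FROM demo
WHERE metadata ? 'workflow_id'
GROUP BY metadata->>'workflow_id';
-- Returns one row: workflow_id = 1, count_pending = 1, count_completed = 1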