Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get rid of broadway #149

Merged
merged 12 commits into from
Feb 9, 2022
13 changes: 4 additions & 9 deletions lib/faktory_worker/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ defmodule FaktoryWorker.Batch do
all jobs in a batch have completed, Faktory will queue a callback job. This
allows building complex job workflows with dependencies.

Jobs pushed as part of a batch _must_ be pushed synchronously. This can be
done using the `skip_pipeline: true` option when calling `perform_async/2`. If
a job isn't pushed synchronously, you may encounter a race condition where the
batch is committed before all jobs have been pushed.

## Creating a batch

A batch is created using `new!/1` and must provide a description and declare
Expand All @@ -25,9 +20,9 @@ defmodule FaktoryWorker.Batch do
alias FaktoryWorker.Batch

{:ok, bid} = Batch.new!(on_success: {MyApp.EmailReportJob, [], []})
MyApp.Job.perform_async([1, 2], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.Job.perform_async([3, 4], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.Job.perform_async([5, 6], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.Job.perform_async([1, 2], custom: %{"bid" => bid})
MyApp.Job.perform_async([3, 4], custom: %{"bid" => bid})
MyApp.Job.perform_async([5, 6], custom: %{"bid" => bid})
Batch.commit(bid)
```

Expand All @@ -45,7 +40,7 @@ defmodule FaktoryWorker.Batch do
def perform(arg1, arg2, bid) do
Batch.open(bid)

MyApp.OtherJob.perform_async([1, 2], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.OtherJob.perform_async([1, 2], custom: %{"bid" => bid})

Batch.commit(bid)
end
Expand Down
58 changes: 9 additions & 49 deletions lib/faktory_worker/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ defmodule FaktoryWorker.Job do

## Synchronous job pushing

By default, jobs are pushed asynchronously to the Faktory server. To ensure a
job has been successfully submitted before continuing, jobs can be pushed
synchronously instead. To do this, pass the `:skip_pipeline` option with the
value of `true` to `perform_async/2`.

Synchronous pushing is required in certain situations to guarantee ordering,
such as when using the Faktory Enterprise batching feature.
Previous version used Broadway to send jobs and `:skip_pipeline` parameter was used to do it synchronously.
`:skip_pipeline` is not supported anymore.
Since Batch operations is a feature of Faktory Enterprise this library now sends any single job synchronously
and makes HTTP call to faktory server (see `FaktoryWorker.Batch`).

## Worker Configuration

Expand All @@ -88,7 +85,7 @@ defmodule FaktoryWorker.Job do
means only values that implement the `Jason.Encoder` protocol are valid when calling the `perform_async/2` function.
"""

alias FaktoryWorker.{ConnectionManager, Random, Pool, Sandbox, Telemetry}
alias FaktoryWorker.{ConnectionManager, Random, Pool, Telemetry}

# Look at supporting the following optional fields when pushing a job
# priority
Expand Down Expand Up @@ -128,37 +125,9 @@ defmodule FaktoryWorker.Job do

@doc false
def perform_async(payload, opts) do
case Keyword.get(opts, :skip_pipeline, false) do
false ->
opts
|> push_pipeline_name()
|> perform_async(payload, opts)

true ->
opts
|> faktory_name()
|> push(payload)
end
end

@doc false
def perform_async(_, {:error, _} = error, _), do: error

def perform_async(pipeline_name, payload, opts) do
if Sandbox.active?() do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to keep the sandbox, it is what we use to simulate the server during testing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeremyowensboggs makes sense, I overdid it with simplifications

Sandbox.enqueue_job(
String.to_existing_atom("Elixir." <> payload.jobtype),
payload.args,
opts
)
else
# TODO:: do we want to split pipelines into multiple threads???
# It was implemented with a Broadway & Producers <--> processes model.
# But Faktory is a job server and do we want to care and build one more not needed layer???
opts
|> faktory_name()
|> push(payload)
end
opts
|> faktory_name()
|> push(payload)
end

@doc false
Expand All @@ -170,6 +139,7 @@ defmodule FaktoryWorker.Job do
end

@doc false
def push(_, invalid_payload = {:error, _}), do: invalid_payload
def push(faktory_name, job) do
faktory_name
|> Pool.format_pool_name()
Expand Down Expand Up @@ -215,16 +185,6 @@ defmodule FaktoryWorker.Job do
"The field '#{Atom.to_string(field)}' has an invalid value '#{inspect(value)}'"
end

defp push_pipeline_name(opts) do
opts
|> faktory_name()
|> format_pipeline_name()
end

defp format_pipeline_name(name) when is_atom(name) do
:"#{name}_pipeline"
end

defp faktory_name(opts) do
Keyword.get(opts, :faktory_name, FaktoryWorker)
end
Expand Down
2 changes: 1 addition & 1 deletion test/faktory_worker/connection_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ defmodule FaktoryWorker.ConnectionManagerTest do
{{:ok, result}, _} = ConnectionManager.send_command(state, {:push, payload})

assert result == "halt reason"
end) =~ "[warn] [123456] Halt: halt reason"
end) |> String.match?(~r/\[warn.*(?<!ing)\]*\[123456\]*[Halt: halt reason]/)
end

test "should unset the connection when there is a socket failure" do
Expand Down
6 changes: 2 additions & 4 deletions test/faktory_worker/job/batch_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ defmodule FaktoryWorker.BatchIntegrationTest do

job_opts = [
faktory_name: faktory_name,
custom: %{"bid" => bid},
skip_pipeline: true
custom: %{"bid" => bid}
]

DefaultWorker.perform_async(["1"], job_opts)
Expand Down Expand Up @@ -72,8 +71,7 @@ defmodule FaktoryWorker.BatchIntegrationTest do

job_opts = [
faktory_name: faktory_name,
custom: %{"bid" => bid},
skip_pipeline: true
custom: %{"bid" => bid}
]

DefaultWorker.perform_async(["1"], job_opts)
Expand Down
5 changes: 3 additions & 2 deletions test/faktory_worker/job/job_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule FaktoryWorker.JobIntegrationTest do

setup :flush_faktory!

describe "perform_async/3" do
describe "perform_async/2" do
test "should send a new job to faktory" do
faktory_name = :"Test_#{Random.string()}"

Expand All @@ -21,7 +21,8 @@ defmodule FaktoryWorker.JobIntegrationTest do

job = Job.build_payload(DefaultWorker, %{hey: "there!"}, opts)

Job.perform_async(job, opts)
{:ok, job_sent} = Job.perform_async(job, opts)
assert job_sent == job

assert_queue_size("default", 1)

Expand Down
11 changes: 0 additions & 11 deletions test/faktory_worker/job/job_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,4 @@ defmodule FaktoryWorker.Job.JobTest do
end
end

describe "perform_async/3" do
test "should not send a bad payload" do
data = %{hey: "there!"}
opts = [queue: 123]
{:error, _} = payload = Job.build_payload(Test.Worker, data, opts)

{:error, error} = Job.perform_async(TestPipeline, payload, [])

assert error == "The field 'queue' has an invalid value '123'"
end
end
end