Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get rid of broadway #149

Merged
merged 12 commits into from
Feb 9, 2022
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ erl_crash.dump
faktory_worker-*.tar

# ignore elixir language server files
/.elixir_ls
/.elixir_ls

.idea/
.DS_Store
.vscode
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ $ docker-compose up -d
Creating faktory_worker_test ... done
Creating faktory_worker_test_tls ... done
Creating faktory_worker_password_test ... done
```

$ mix test

Faktory have free open-source solution and enterprise edition.

If you don't have enterprise license then tests will fail on enterprise features (batching operations etc). In this case you can exclude them by tag `:enterprise`
```sh
$ mix test --exclude enterprise
```

If you are enterprise user all tests should pass
```sh
$ mix test
```
4 changes: 2 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use Mix.Config
import Config

if Mix.env() == :test do
config :logger, backends: []
config :logger, backends: [:console]

# Set the worker startup delay to 1ms to ensure tests are speedy
config :faktory_worker, worker_startup_delay: 1
Expand Down
1 change: 0 additions & 1 deletion lib/faktory_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ defmodule FaktoryWorker do
[
{FaktoryWorker.QueueManager, opts},
{FaktoryWorker.Pool, opts},
{FaktoryWorker.PushPipeline, opts},
{FaktoryWorker.JobSupervisor, opts},
{FaktoryWorker.WorkerSupervisor, opts}
]
Expand Down
13 changes: 4 additions & 9 deletions lib/faktory_worker/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ defmodule FaktoryWorker.Batch do
all jobs in a batch have completed, Faktory will queue a callback job. This
allows building complex job workflows with dependencies.

Jobs pushed as part of a batch _must_ be pushed synchronously. This can be
done using the `skip_pipeline: true` option when calling `perform_async/2`. If
a job isn't pushed synchronously, you may encounter a race condition where the
batch is committed before all jobs have been pushed.

## Creating a batch

A batch is created using `new!/1` and must provide a description and declare
Expand All @@ -25,9 +20,9 @@ defmodule FaktoryWorker.Batch do
alias FaktoryWorker.Batch

{:ok, bid} = Batch.new!(on_success: {MyApp.EmailReportJob, [], []})
MyApp.Job.perform_async([1, 2], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.Job.perform_async([3, 4], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.Job.perform_async([5, 6], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.Job.perform_async([1, 2], custom: %{"bid" => bid})
MyApp.Job.perform_async([3, 4], custom: %{"bid" => bid})
MyApp.Job.perform_async([5, 6], custom: %{"bid" => bid})
Batch.commit(bid)
```

Expand All @@ -45,7 +40,7 @@ defmodule FaktoryWorker.Batch do
def perform(arg1, arg2, bid) do
Batch.open(bid)

MyApp.OtherJob.perform_async([1, 2], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.OtherJob.perform_async([1, 2], custom: %{"bid" => bid})

Batch.commit(bid)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/faktory_worker/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ defmodule FaktoryWorker.Connection do

defp put_worker_args(args, opts) do
process_wid = Keyword.get(opts, :process_wid)
sys_pid = System.get_pid()
sys_pid = System.pid()
{:ok, hostname} = :inet.gethostname()

worker_args = %{
Expand Down
47 changes: 10 additions & 37 deletions lib/faktory_worker/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ defmodule FaktoryWorker.Job do

## Synchronous job pushing

By default, jobs are pushed asynchronously to the Faktory server. To ensure a
job has been successfully submitted before continuing, jobs can be pushed
synchronously instead. To do this, pass the `:skip_pipeline` option with the
value of `true` to `perform_async/2`.

Synchronous pushing is required in certain situations to guarantee ordering,
such as when using the Faktory Enterprise batching feature.
Previous version used Broadway to send jobs and `:skip_pipeline` parameter was used to do it synchronously.
`:skip_pipeline` is not supported anymore.
Since Batch operations is a feature of Faktory Enterprise this library now sends any single job synchronously
and makes HTTP call to faktory server (see `FaktoryWorker.Batch`).

## Worker Configuration

Expand All @@ -88,7 +85,7 @@ defmodule FaktoryWorker.Job do
means only values that implement the `Jason.Encoder` protocol are valid when calling the `perform_async/2` function.
"""

alias FaktoryWorker.{ConnectionManager, Random, Pool, Sandbox, Telemetry}
alias FaktoryWorker.{ConnectionManager, Random, Pool, Telemetry, Sandbox}

# Look at supporting the following optional fields when pushing a job
# priority
Expand Down Expand Up @@ -128,36 +125,17 @@ defmodule FaktoryWorker.Job do

@doc false
def perform_async(payload, opts) do
case Keyword.get(opts, :skip_pipeline, false) do
false ->
opts
|> push_pipeline_name()
|> perform_async(payload, opts)

true ->
opts
|> faktory_name()
|> push(payload)
end
end

@doc false
def perform_async(_, {:error, _} = error, _), do: error

def perform_async(pipeline_name, payload, opts) do
if Sandbox.active?() do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to keep the sandbox, it is what we use to simulate the server during testing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeremyowensboggs makes sense, I overdid it with simplifications

Sandbox.enqueue_job(
String.to_existing_atom("Elixir." <> payload.jobtype),
payload.args,
opts
)
{:ok, payload}
else
message = %Broadway.Message{
acknowledger: {FaktoryWorker.PushPipeline.Acknowledger, :push_message, []},
data: {pipeline_name, payload}
}

Broadway.push_messages(pipeline_name, [message])
opts
|> faktory_name()
|> push(payload)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching from Broadway.push_messages to push is going to change the return of this function. From the PR discussion, I gather we are planning on doing a version change.

  1. Are there additional changes needed to this code base to signal that change?
  2. Does module/package documentation need to be updated?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this PR, just documenting a thought - but if we can return a Job with the faktory job id here, that would be a great gateway into being able to check on the status of a job.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Broadway.push_messages and Sandbox.enqueue_job both always return :ok, so I think it should be OK to have this return {:ok, job_id} instead.

+1 for updating documentation as well

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like we don't need perform_async/3 at all and we can return same result for any call. private function handle_push_result prepares result and it returns {:ok, job}.
I am not sure is it better to change to {:ok, job_id} or leave it as is. job contains a bit more info, see sample below:

{:ok,
 %{
   args: [%{hey: "there!"}],
   jid: "cc7cb6ec723b1e1f35815ba1",
   jobtype: "FaktoryWorker.DefaultWorker"
 }}

Copy link
Contributor

@superhawk610 superhawk610 Jan 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be OK to return it as-is. Can you also update Sandbox.enqueue_job to return a similar value? You can just generate a random jid.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@superhawk610 we have changed return to {:ok, job}, but I checked our app and I see it always expect :ok.
Question is -> revert changes and return :ok as before, change app and it's tests to have support of {:ok, job). or 3rd option: add configuration option and return optionally :ok to keep back compatibilities and return by default {:ok, job}.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's change our app to accept {:ok, job}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure that this is mentioned in the upgrade guide as well, as it's a breaking change.

end
end

Expand All @@ -170,6 +148,7 @@ defmodule FaktoryWorker.Job do
end

@doc false
def push(_, invalid_payload = {:error, _}), do: invalid_payload
def push(faktory_name, job) do
faktory_name
|> Pool.format_pool_name()
Expand Down Expand Up @@ -215,12 +194,6 @@ defmodule FaktoryWorker.Job do
"The field '#{Atom.to_string(field)}' has an invalid value '#{inspect(value)}'"
end

defp push_pipeline_name(opts) do
opts
|> faktory_name()
|> FaktoryWorker.PushPipeline.format_pipeline_name()
end

defp faktory_name(opts) do
Keyword.get(opts, :faktory_name, FaktoryWorker)
end
Expand Down
45 changes: 0 additions & 45 deletions lib/faktory_worker/push_pipeline.ex

This file was deleted.

21 changes: 0 additions & 21 deletions lib/faktory_worker/push_pipeline/acknowledger.ex

This file was deleted.

21 changes: 0 additions & 21 deletions lib/faktory_worker/push_pipeline/consumer.ex

This file was deleted.

19 changes: 0 additions & 19 deletions lib/faktory_worker/push_pipeline/producer.ex

This file was deleted.

1 change: 0 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ defmodule FaktoryWorker.MixProject do

defp deps do
[
{:broadway, "~> 1.0.0"},
{:certifi, "~> 2.5"},
{:excoveralls, "~> 0.10", only: :test},
{:jason, "~> 1.1"},
Expand Down
2 changes: 1 addition & 1 deletion test/faktory_worker/connection_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ defmodule FaktoryWorker.ConnectionManagerTest do
{{:ok, result}, _} = ConnectionManager.send_command(state, {:push, payload})

assert result == "halt reason"
end) =~ "[warn] [123456] Halt: halt reason"
end) |> String.match?(~r/\[warn.*(?<!ing)\]*\[123456\]*[Halt: halt reason]/)
end

test "should unset the connection when there is a socket failure" do
Expand Down
6 changes: 2 additions & 4 deletions test/faktory_worker/job/batch_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ defmodule FaktoryWorker.BatchIntegrationTest do

job_opts = [
faktory_name: faktory_name,
custom: %{"bid" => bid},
skip_pipeline: true
custom: %{"bid" => bid}
]

DefaultWorker.perform_async(["1"], job_opts)
Expand Down Expand Up @@ -72,8 +71,7 @@ defmodule FaktoryWorker.BatchIntegrationTest do

job_opts = [
faktory_name: faktory_name,
custom: %{"bid" => bid},
skip_pipeline: true
custom: %{"bid" => bid}
]

DefaultWorker.perform_async(["1"], job_opts)
Expand Down
5 changes: 3 additions & 2 deletions test/faktory_worker/job/job_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule FaktoryWorker.JobIntegrationTest do

setup :flush_faktory!

describe "perform_async/3" do
describe "perform_async/2" do
test "should send a new job to faktory" do
faktory_name = :"Test_#{Random.string()}"

Expand All @@ -21,7 +21,8 @@ defmodule FaktoryWorker.JobIntegrationTest do

job = Job.build_payload(DefaultWorker, %{hey: "there!"}, opts)

Job.perform_async(job, opts)
{:ok, job_sent} = Job.perform_async(job, opts)
assert job_sent == job

assert_queue_size("default", 1)

Expand Down
11 changes: 0 additions & 11 deletions test/faktory_worker/job/job_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,4 @@ defmodule FaktoryWorker.Job.JobTest do
end
end

describe "perform_async/3" do
test "should not send a bad payload" do
data = %{hey: "there!"}
opts = [queue: 123]
{:error, _} = payload = Job.build_payload(Test.Worker, data, opts)

{:error, error} = Job.perform_async(TestPipeline, payload, [])

assert error == "The field 'queue' has an invalid value '123'"
end
end
end
Loading