Skip to content

Commit

Permalink
include queue/duration in telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
superhawk610 committed Jun 28, 2023
1 parent dbcffbd commit 1fd7b8f
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 64 deletions.
98 changes: 66 additions & 32 deletions lib/faktory_worker/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@ defmodule FaktoryWorker.Telemetry do

require Logger

@events [:push, :beat, :fetch, :ack, :failed_ack, :job_timeout, :batch_new, :batch_open, :batch_commit]
@events [
:push,
:beat,
:fetch,
:ack,
:failed_ack,
:job_timeout,
:batch_new,
:batch_open,
:batch_commit
]

@doc false
@spec attach_default_handler :: :ok | {:error, :already_exists}
Expand Down Expand Up @@ -31,15 +41,15 @@ defmodule FaktoryWorker.Telemetry do
# Push events

defp log_event(:push, %{status: :ok}, job) do
log_info("Enqueued", job.jid, job.args, job.jobtype)
log(:info, "Enqueued", job.jid, job.args, job.jobtype, job[:queue])
end

defp log_event(:push, %{status: {:error, :not_unique}}, job) do
log_warn("NOTUNIQUE", job.jid, job.args, job.jobtype)
log(:warn, "NOTUNIQUE", job.jid, job.args, job.jobtype, job[:queue])
end

defp log_event(:push, %{status: {:error, :timeout}}, job) do
log_info("Push Timeout", job.jid, job.args, job.jobtype)
log(:info, "Push Timeout", job.jid, job.args, job.jobtype, job[:queue])
end

# Beat events
Expand All @@ -50,76 +60,100 @@ defmodule FaktoryWorker.Telemetry do
end

defp log_event(:beat, %{status: :ok}, %{wid: wid}) do
log_info("Heartbeat Succeeded", wid)
log(:info, "Heartbeat Succeeded", wid)
end

defp log_event(:beat, %{status: :error}, %{wid: wid}) do
log_warn("Heartbeat Failed", wid)
log(:warn, "Heartbeat Failed", wid)
end

# Fetch events

defp log_event(:fetch, %{status: {:error, reason}}, %{wid: wid}) do
log_info("Failed to fetch job due to '#{reason}'", wid)
log(:info, "Failed to fetch job due to '#{reason}'", wid)
end

# Acks

defp log_event(:ack, %{status: :ok}, job) do
log_info("Succeeded", job.jid, job.args, job.jobtype)
duration = format_duration(job.duration)
log(:info, "Succeeded after #{duration}", job.jid, job.args, job.jobtype, job.queue)
end

defp log_event(:ack, %{status: :error}, job) do
log_error("Failed", job.jid, job.args, job.jobtype)
duration = format_duration(job.duration)
log(:error, "Failed after #{duration}", job.jid, job.args, job.jobtype, job.queue)
end

# Failed acks

defp log_event(:failed_ack, %{status: :ok}, job) do
log_warn("Error sending 'ACK' acknowledgement to faktory", job.jid, job.args, job.jobtype)
log(
:warn,
"Error sending 'ACK' acknowledgement to faktory",
job.jid,
job.args,
job.jobtype,
job.queue
)
end

defp log_event(:failed_ack, %{status: :error}, job) do
log_error("Error sending 'FAIL' acknowledgement to faktory", job.jid, job.args, job.jobtype)
log(
:error,
"Error sending 'FAIL' acknowledgement to faktory",
job.jid,
job.args,
job.jobtype,
job.queue
)
end

# Misc events

defp log_event(:job_timeout, _, job) do
log_error("Job has reached its reservation timeout and will be failed", job.jid, job.args, job.jobtype)
log(
:error,
"Job has reached its reservation timeout and will be failed",
job.jid,
job.args,
job.jobtype,
job.queue
)
end

# Log formats

defp log_info(message) do
Logger.info("[faktory-worker] #{message}")
end
defp log(:info, message), do: Logger.info("[faktory-worker] #{message}")
defp log(:warn, message), do: Logger.warning("[faktory-worker] #{message}")
defp log(:error, message), do: Logger.error("[faktory-worker] #{message}")

defp log_info(outcome, wid) do
log_info("#{outcome} wid-#{wid}")
defp log(level, outcome, wid) do
log(level, "#{outcome} wid-#{wid}")
end

defp log_info(outcome, jid, args, worker_module) do
log_info("#{outcome} (#{worker_module}) jid-#{jid} #{inspect(args)}")
defp log(level, outcome, jid, args, worker_module, default) when default in ["default", nil] do
log(level, "#{outcome} (#{worker_module}) jid-#{jid} #{inspect(args)}")
end

def log_warn(message) do
Logger.warn("[faktory-worker] #{message}")
defp log(level, outcome, jid, args, worker_module, queue) do
log(level, "#{outcome} (#{worker_module}) [#{queue}] jid-#{jid} #{inspect(args)}")
end

defp log_warn(outcome, wid) do
log_warn("#{outcome} wid-#{wid}")
end

defp log_warn(outcome, jid, args, worker_module) do
log_warn("#{outcome} (#{worker_module}) jid-#{jid} #{inspect(args)}")
end
defp format_duration(ms) when ms < 1_000, do: "#{ms}ms"
defp format_duration(ms) when ms < 60_000, do: "#{ms / 1_000}s"

def log_error(message) do
Logger.error("[faktory-worker] #{message}")
defp format_duration(ms) when ms < 3_600_000 do
minutes = ms / 1_000 / 60
m = floor(minutes)
rest = round((minutes - m) * 60 * 1_000)
"#{m}m #{format_duration(rest)}"
end

defp log_error(outcome, jid, args, worker_module) do
log_error("#{outcome} (#{worker_module}) jid-#{jid} #{inspect(args)}")
defp format_duration(ms) do
hours = ms / 1_000 / 60 / 60
h = floor(hours)
rest = round((hours - h) * 60 * 60 * 1_000)
"#{h}h #{format_duration(rest)}"
end
end
16 changes: 13 additions & 3 deletions lib/faktory_worker/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule FaktoryWorker.Worker do
:job_id,
:job,
:job_timeout_ref,
:job_start,
:retry_interval
]

Expand Down Expand Up @@ -87,7 +88,8 @@ defmodule FaktoryWorker.Worker do
Telemetry.execute(:job_timeout, {:error, :job_timeout}, %{
jid: state.job_id,
args: state.job["args"],
jobtype: state.job["jobtype"]
jobtype: state.job["jobtype"],
queue: state.job["queue"]
})

state
Expand Down Expand Up @@ -135,6 +137,7 @@ defmodule FaktoryWorker.Worker do
|> String.split(".")
|> Module.safe_concat()

job_start = System.monotonic_time(:millisecond)
job_ref = JobSupervisor.async_nolink(job_supervisor, job_module, job["args"])

reserve_for_seconds = Map.get(job, "reserve_for", @faktory_default_reserve_for)
Expand All @@ -145,6 +148,7 @@ defmodule FaktoryWorker.Worker do
state
| worker_state: :running_job,
job_timeout_ref: timeout_ref,
job_start: job_start,
job_ref: job_ref,
job_id: job["jid"],
job: job
Expand Down Expand Up @@ -187,7 +191,9 @@ defmodule FaktoryWorker.Worker do
Telemetry.execute(:ack, ack_type, %{
jid: state.job_id,
args: state.job["args"],
jobtype: state.job["jobtype"]
jobtype: state.job["jobtype"],
queue: state.job["queue"],
duration: System.monotonic_time(:millisecond) - state.job_start
})

cancel_timer(state.job_timeout_ref)
Expand All @@ -197,6 +203,7 @@ defmodule FaktoryWorker.Worker do
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
Expand All @@ -207,7 +214,9 @@ defmodule FaktoryWorker.Worker do
Telemetry.execute(:failed_ack, ack_type, %{
jid: state.job_id,
args: state.job["args"],
jobtype: state.job["jobtype"]
jobtype: state.job["jobtype"],
queue: state.job["queue"],
duration: System.monotonic_time(:millisecond) - state.job_start
})

cancel_timer(state.job_timeout_ref)
Expand All @@ -217,6 +226,7 @@ defmodule FaktoryWorker.Worker do
| worker_state: :ok,
queues: nil,
job_timeout_ref: nil,
job_start: nil,
job_ref: nil,
job_id: nil,
job: nil
Expand Down
69 changes: 40 additions & 29 deletions test/faktory_worker/telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ defmodule FaktoryWorker.TelemetryTest do
end)

assert log_message =~
"[faktory-worker] Enqueued (#{metadata.jobtype}) jid-#{metadata.jid} #{
inspect(metadata.args)
}"
"[faktory-worker] Enqueued (#{metadata.jobtype}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end

test "should log a not unique job event" do
Expand All @@ -90,9 +88,7 @@ defmodule FaktoryWorker.TelemetryTest do
end)

assert log_message =~
"[faktory-worker] NOTUNIQUE (#{metadata.jobtype}) jid-#{metadata.jid} #{
inspect(metadata.args)
}"
"[faktory-worker] NOTUNIQUE (#{metadata.jobtype}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end

test "should log a successful beat when the previous beat failed" do
Expand Down Expand Up @@ -156,9 +152,7 @@ defmodule FaktoryWorker.TelemetryTest do
end)

assert log_message =~
"[faktory-worker] Failed to fetch job due to 'Shutdown in progress' wid-#{
metadata.wid
}"
"[faktory-worker] Failed to fetch job due to 'Shutdown in progress' wid-#{metadata.wid}"
end

test "should log a successful job ack event" do
Expand All @@ -167,7 +161,9 @@ defmodule FaktoryWorker.TelemetryTest do
metadata = %{
jid: Random.job_id(),
args: %{hey: "there!"},
jobtype: "TestQueueWorker"
jobtype: "TestQueueWorker",
duration: 32_400,
queue: "default"
}

log_message =
Expand All @@ -176,9 +172,7 @@ defmodule FaktoryWorker.TelemetryTest do
end)

assert log_message =~
"[faktory-worker] Succeeded (#{metadata.jobtype}) jid-#{metadata.jid} #{
inspect(metadata.args)
}"
"[faktory-worker] Succeeded after 32.4s (#{metadata.jobtype}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end

test "should log a failed job ack event" do
Expand All @@ -187,7 +181,9 @@ defmodule FaktoryWorker.TelemetryTest do
metadata = %{
jid: Random.job_id(),
args: %{hey: "there!"},
jobtype: "TestQueueWorker"
jobtype: "TestQueueWorker",
duration: 1_000,
queue: "default"
}

log_message =
Expand All @@ -196,9 +192,7 @@ defmodule FaktoryWorker.TelemetryTest do
end)

assert log_message =~
"[faktory-worker] Failed (#{metadata.jobtype}) jid-#{metadata.jid} #{
inspect(metadata.args)
}"
"[faktory-worker] Failed after 1.0s (#{metadata.jobtype}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end

test "should log a failed 'ACK' ack event" do
Expand All @@ -207,7 +201,8 @@ defmodule FaktoryWorker.TelemetryTest do
metadata = %{
jid: Random.job_id(),
args: %{hey: "there!"},
jobtype: "TestQueueWorker"
jobtype: "TestQueueWorker",
queue: "default"
}

log_message =
Expand All @@ -216,9 +211,7 @@ defmodule FaktoryWorker.TelemetryTest do
end)

assert log_message =~
"[faktory-worker] Error sending 'ACK' acknowledgement to faktory (#{
metadata.jobtype
}) jid-#{metadata.jid} #{inspect(metadata.args)}"
"[faktory-worker] Error sending 'ACK' acknowledgement to faktory (#{metadata.jobtype}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end

test "should log a failed 'FAIL' ack event" do
Expand All @@ -227,7 +220,8 @@ defmodule FaktoryWorker.TelemetryTest do
metadata = %{
jid: Random.job_id(),
args: %{hey: "there!"},
jobtype: "TestQueueWorker"
jobtype: "TestQueueWorker",
queue: "default"
}

log_message =
Expand All @@ -236,16 +230,35 @@ defmodule FaktoryWorker.TelemetryTest do
end)

assert log_message =~
"[faktory-worker] Error sending 'FAIL' acknowledgement to faktory (#{
metadata.jobtype
}) jid-#{metadata.jid} #{inspect(metadata.args)}"
"[faktory-worker] Error sending 'FAIL' acknowledgement to faktory (#{metadata.jobtype}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end

test "should include the queue name when not `default`" do
outcome = %{status: :ok}

metadata = %{
jid: Random.job_id(),
args: %{hey: "there!"},
jobtype: "TestQueueWorker",
duration: 2_300,
queue: "queue"
}

log_message =
capture_log(fn ->
Telemetry.handle_event([:faktory_worker, :ack], outcome, metadata, [])
end)

assert log_message =~
"[faktory-worker] Succeeded after 2.3s (#{metadata.jobtype}) [#{metadata.queue}] jid-#{metadata.jid} #{inspect(metadata.args)}"
end

test "should log job timeouts" do
metadata = %{
jid: Random.job_id(),
args: %{hey: "there!"},
jobtype: "TestQueueWorker"
jobtype: "TestQueueWorker",
queue: "default"
}

log_message =
Expand All @@ -254,9 +267,7 @@ defmodule FaktoryWorker.TelemetryTest do
end)

assert log_message =~
"[faktory-worker] Job has reached its reservation timeout and will be failed (#{
metadata.jobtype
}) jid-#{metadata.jid} #{inspect(metadata.args)}"
"[faktory-worker] Job has reached its reservation timeout and will be failed (#{metadata.jobtype}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end
end
end

0 comments on commit 1fd7b8f

Please sign in to comment.