Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(webhook): Break down SendWebhookJob into 2 jobs (one creates the models, the other sends HTTP) #1993

Merged
merged 4 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions app/jobs/send_http_webhook_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class SendHttpWebhookJob < ApplicationJob
def perform(webhook)
Webhooks::SendHttpService.call(webhook:)
end
end
9 changes: 8 additions & 1 deletion app/jobs/send_webhook_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ class SendWebhookJob < ApplicationJob
def perform(webhook_type, object, options = {}, webhook_id = nil)
raise(NotImplementedError) unless WEBHOOK_SERVICES.include?(webhook_type)

WEBHOOK_SERVICES.fetch(webhook_type).new(object:, options:, webhook_id:).call
# NOTE: This condition is only temporary to handle enqueued jobs
# TODO: Remove this condition after queued jobs are processed
if webhook_id
SendHttpWebhookJob.perform_later(Webhook.find(webhook_id))
return
end

WEBHOOK_SERVICES.fetch(webhook_type).new(object:, options:).call
end
end
36 changes: 36 additions & 0 deletions app/models/webhook.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,47 @@ class Webhook < ApplicationRecord
belongs_to :webhook_endpoint
belongs_to :object, polymorphic: true, optional: true

# TODO: Use relation to be able to eager load
delegate :organization, to: :webhook_endpoint

enum status: STATUS

def self.ransackable_attributes(_auth_object = nil)
%w[id webhook_type]
end

def generate_headers
signature = case webhook_endpoint.signature_algo&.to_sym
when :jwt
jwt_signature
when :hmac
hmac_signature
end

{
'X-Lago-Signature' => signature,
'X-Lago-Signature-Algorithm' => webhook_endpoint.signature_algo.to_s,
'X-Lago-Unique-Key' => id
}
end

def jwt_signature
JWT.encode(
{
data: payload.to_json,
iss: issuer
},
RsaPrivateKey,
'RS256',
)
end

def hmac_signature
hmac = OpenSSL::HMAC.digest('sha-256', organization.api_key, payload.to_json)
Base64.strict_encode64(hmac)
end

def issuer
ENV['LAGO_API_URL']
end
end
113 changes: 9 additions & 104 deletions app/services/webhooks/base_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
module Webhooks
# NOTE: Abstract Service, should not be used directly
class BaseService
def initialize(object:, options: {}, webhook_id: nil)
def initialize(object:, options: {})
@object = object
@options = options&.with_indifferent_access
@webhook_id = webhook_id
end

def call
return resend if webhook_id.present?
return if current_organization.webhook_endpoints.none?

payload = {
Expand All @@ -21,27 +19,17 @@ def call
object_type => object_serializer.serialize
}

# TODO: Wrap in transaction so we create all webhook models or none
# Ensure the http jobs are dispatched after the transaction is committed
current_organization.webhook_endpoints.each do |webhook_endpoint|
webhook = initialize_webhook(webhook_endpoint, payload)
send_webhook(webhook, webhook_endpoint, payload)
webhook = create_webhook(webhook_endpoint, payload)
SendHttpWebhookJob.perform_later(webhook)
end
end

def resend
webhook = Webhook.find_by(id: webhook_id)
return if webhook.blank?

webhook.retries += 1 if webhook.failed?
webhook.last_retried_at = Time.zone.now if webhook.retries.positive?
webhook.endpoint = webhook.webhook_endpoint.webhook_url

payload = JSON.parse(webhook.payload)
send_webhook(webhook, webhook.webhook_endpoint, payload)
end

private

attr_reader :object, :options, :webhook_id
attr_reader :object, :options

def object_serializer
# Empty
Expand All @@ -59,99 +47,16 @@ def object_type
# Empty
end

def send_webhook(webhook, webhook_endpoint, payload)
http_client = LagoHttpClient::Client.new(webhook_endpoint.webhook_url)
headers = generate_headers(webhook.id, webhook_endpoint, payload)
response = http_client.post_with_response(payload, headers)

succeed_webhook(webhook, response)
rescue LagoHttpClient::HttpError,
Net::OpenTimeout,
Net::ReadTimeout,
Net::HTTPBadResponse,
Errno::ECONNRESET,
Errno::ECONNREFUSED,
Errno::EPIPE,
OpenSSL::SSL::SSLError,
SocketError,
EOFError => e
fail_webhook(webhook, e)

# NOTE: By default, Lago is retrying 3 times a webhook
return if webhook.retries >= ENV.fetch('LAGO_WEBHOOK_ATTEMPTS', 3).to_i

SendWebhookJob.set(wait: wait_value(webhook))
.perform_later(webhook_type, object, options, webhook.id)
end

def generate_headers(webhook_id, webhook_endpoint, payload)
signature = case webhook_endpoint.signature_algo&.to_sym
when :jwt
jwt_signature(payload)
when :hmac
hmac_signature(payload)
end

{
'X-Lago-Signature' => signature,
'X-Lago-Signature-Algorithm' => webhook_endpoint.signature_algo.to_s,
'X-Lago-Unique-Key' => webhook_id
}
end

def jwt_signature(payload)
JWT.encode(
{
data: payload.to_json,
iss: issuer
},
RsaPrivateKey,
'RS256',
)
end

def hmac_signature(payload)
hmac = OpenSSL::HMAC.digest('sha-256', current_organization.api_key, payload.to_json)
Base64.strict_encode64(hmac)
end

def issuer
ENV['LAGO_API_URL']
end

def initialize_webhook(webhook_endpoint, payload)
def create_webhook(webhook_endpoint, payload)
webhook = Webhook.new(webhook_endpoint:)
webhook.webhook_type = webhook_type
webhook.endpoint = webhook_endpoint.webhook_url
# Question: When can this be a hash?
webhook.object_id = object.is_a?(Hash) ? object.fetch(:id, nil) : object&.id
webhook.object_type = object.is_a?(Hash) ? object.fetch(:class, nil) : object&.class&.to_s
webhook.payload = payload.to_json
webhook.retries += 1 if webhook.failed?
webhook.last_retried_at = Time.zone.now if webhook.retries.positive?
webhook.payload = payload
webhook.pending!
webhook
end

def succeed_webhook(webhook, response)
webhook.http_status = response&.code&.to_i
webhook.response = response&.body.presence || {}
webhook.succeeded!
end

def fail_webhook(webhook, error)
if error.is_a?(LagoHttpClient::HttpError)
webhook.http_status = error.error_code
webhook.response = error.error_body
else
webhook.response = error.message
end
webhook.failed!
end

def wait_value(webhook)
# NOTE: This is based on the Rails Active Job wait algorithm
executions = webhook.retries
((executions**4) + (Kernel.rand * (executions**4) * 0.15)) + 2
end
end
end
7 changes: 1 addition & 6 deletions app/services/webhooks/retry_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@ def call
return result.not_found_failure!(resource: 'webhook') unless webhook
return result.not_allowed_failure!(code: 'is_succeeded') if webhook.succeeded?

SendWebhookJob.perform_later(
webhook.webhook_type,
webhook.object,
{},
webhook.id,
)
SendHttpWebhookJob.perform_later(webhook)

result.webhook = webhook
result
Expand Down
66 changes: 66 additions & 0 deletions app/services/webhooks/send_http_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# frozen_string_literal: true

module Webhooks
class SendHttpService < ::BaseService
def initialize(webhook:)
@webhook = webhook

super
end

def call
webhook.endpoint = webhook.webhook_endpoint.webhook_url

http_client = LagoHttpClient::Client.new(webhook.webhook_endpoint.webhook_url)
response = http_client.post_with_response(webhook.payload, webhook.generate_headers)

mark_webhook_as_succeeded(response)
rescue LagoHttpClient::HttpError,
Net::OpenTimeout,
Net::ReadTimeout,
Net::HTTPBadResponse,
Errno::ECONNRESET,
Errno::ECONNREFUSED,
Errno::EPIPE,
OpenSSL::SSL::SSLError,
SocketError,
EOFError => e
mark_webhook_as_failed(e)

# NOTE: By default, Lago is retrying 3 times a webhook
return if webhook.retries >= ENV.fetch('LAGO_WEBHOOK_ATTEMPTS', 3).to_i

SendHttpWebhookJob.set(wait: wait_value).perform_later(webhook)
end

private

attr_reader :webhook

def mark_webhook_as_succeeded(response)
webhook.http_status = response&.code&.to_i
webhook.response = response&.body.presence || {}
webhook.status = :succeeded
webhook.save!
end

def mark_webhook_as_failed(error)
if error.is_a?(LagoHttpClient::HttpError)
webhook.http_status = error.error_code
webhook.response = error.error_body
else
webhook.response = error.message
end
webhook.retries += 1
webhook.last_retried_at = Time.zone.now
webhook.status = :failed
webhook.save!
end

def wait_value
# NOTE: This is based on the Rails Active Job wait algorithm
executions = webhook.retries
((executions**4) + (Kernel.rand * (executions**4) * 0.15)) + 2
end
end
end
Loading
Loading