Skip to content

Commit

Permalink
Add policy-based event routing in DataUpdaterPlant
Browse files Browse the repository at this point in the history
Signed-off-by: Arnaldo Cesco <arnaldo.cesco@ispirata.com>
  • Loading branch information
Annopaolo committed May 26, 2021
1 parent b4cd47d commit a006473
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 13 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [1.1] - Unreleased
### Added
- [astarte_trigger_engine] Add a customizable HTTP payload redelivery mechanism via
trigger policies (see [#554](https://github.com/astarte-platform/astarte/issues/554)).

## [1.0.0-beta.2] - 2021-03-24
### Fixed
- [astarte_e2e] Fix alerting mechanism preventing "unknown" failures to be raised or linked.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
#
# This file is part of Astarte.
#
# Copyright 2021 Ispirata Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

defmodule Astarte.DataUpdaterPlant.Triggers.PolicyRetriever do
use GenServer

require Logger
# alias Astarte.DataAccess.Database
# alias CQEx.Query, as: DatabaseQuery
# alias CQEx.Result, as: DatabaseResult

# API
def start_link(args \\ []) do
Expand All @@ -29,7 +44,7 @@ defmodule Astarte.DataUpdaterPlant.Triggers.PolicyRetriever do
{:reply, policy_name, Map.put(state, {realm_id, trigger_id}, policy_name)}
else
{:error, _error} ->
Logger.warn(
Logger.info(
"Policy name for trigger #{trigger_id} not found, reverting to default policy"
)

Expand All @@ -42,13 +57,33 @@ defmodule Astarte.DataUpdaterPlant.Triggers.PolicyRetriever do
end

defp retrieve_policy_name(realm_id, trigger_id) do
with {:ok, policy_name} <-
Xandra.Cluster.run(:xandra, fn conn ->
do_retrieve_policy_name(conn, realm_id, trigger_id)
end) do
{:ok, policy_name}
else
{:error, reason} ->
_ =
Logger.warn(
"Error while fetching policy for trigger #{trigger_id} in realm #{realm_id}: #{
inspect(reason)
}.",
tag: "retrieve_policy_name_failed"
)

{:error, reason}
end
end

defp do_retrieve_policy_name(conn, realm_name, trigger_id) do
retrieve_statement =
"SELECT value FROM #{realm_id}.kv_store WHERE group='trigger_to_policy' AND key=:trigger_id;"
"SELECT value FROM #{realm_name}.kv_store WHERE group='trigger_to_policy' AND key=:trigger_id;"

with {:ok, prepared} <-
Xandra.Cluster.prepare(:xandra, retrieve_statement),
Xandra.prepare(conn, retrieve_statement),
{:ok, %Xandra.Page{} = page} <-
Xandra.Cluster.execute(:xandra, prepared, %{"trigger_id" => trigger_id}),
Xandra.execute(conn, prepared, %{"trigger_id" => trigger_id}),
[%{"value" => policy_name}] <- Enum.to_list(page) do
{:ok, policy_name}
else
Expand All @@ -57,7 +92,7 @@ defmodule Astarte.DataUpdaterPlant.Triggers.PolicyRetriever do
{:error, :database_connection_error}

error ->
Logger.warn("Error while fetching policy for trigger #{trigger_id}: #{inspect(error)}")
Logger.warn("Error while fetching policy, #{inspect(error)}")
{:error, :event_processing_error}
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandler do
use Bitwise, only_operators: true
require Logger
alias Astarte.DataUpdaterPlant.Config
alias Astarte.DataUpdaterPlant.Triggers.PolicyRetriever

@moduledoc """
This module handles the triggers by generating the events requested
Expand All @@ -34,6 +33,7 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandler do

alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget
alias Astarte.DataUpdaterPlant.AMQPEventsProducer
alias Astarte.DataUpdaterPlant.Triggers.PolicyRetriever

def register_target(%AMQPTriggerTarget{exchange: nil} = _target) do
# Default exchange, no need to declare it
Expand Down Expand Up @@ -485,18 +485,18 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandler do
|> :uuid.uuid_to_string()
|> to_string()

policy_name_str = PolicyRetriever.get_policy_name(simple_event.realm, parent_trigger_id_str)

headers = [
{"x_astarte_realm", simple_event.realm},
{"x_astarte_device_id", simple_event.device_id},
{"x_astarte_simple_trigger_id", simple_trigger_id_str},
{"x_astarte_parent_trigger_id", parent_trigger_id_str},
{"x_astarte_event_type", to_string(event_type)},
{"x_astarte_trigger_policy", policy_name_str}
{"x_astarte_event_type", to_string(event_type)}
| static_headers
]

{routing_key, headers} =
update_if_http_trigger(routing_key, headers, simple_event.realm, parent_trigger_id_str)

opts_with_nil = [
expiration: message_expiration_ms && to_string(message_expiration_ms),
priority: message_priority,
Expand All @@ -511,6 +511,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandler do

result = wait_ok_publish(exchange, routing_key, payload, [{:headers, headers} | opts])

Logger.debug("headers: #{inspect(headers)}, routing key: #{inspect(routing_key)}")

:telemetry.execute(
[:astarte, :data_updater_plant, :triggers_handler, :published_event],
%{},
Expand All @@ -531,4 +533,35 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandler do

"#{realm_trunc}-#{device_id_trunc}-#{timestamp_hex_str}-#{rnd}"
end

defp update_if_http_trigger(
"trigger_engine",
headers,
realm,
parent_trigger_id_str
) do
policy_name = PolicyRetriever.get_policy_name(realm, parent_trigger_id_str)

headers = [
{"x_astarte_trigger_policy", policy_name}
| headers
]

routing_key = generate_routing_key(realm, policy_name)

{routing_key, headers}
end

defp update_if_http_trigger(
routing_key,
headers,
_realm,
_parent_trigger_id_str
) do
{routing_key, headers}
end

defp generate_routing_key(realm, policy) do
"#{realm}_#{policy}"
end
end
1 change: 1 addition & 0 deletions apps/astarte_data_updater_plant/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mime": {:hex, :mime, "1.5.0", "203ef35ef3389aae6d361918bf3f952fa17a09e8e43b5aa592b93eba05d0fb8d", [:mix], [], "hexpm", "55a94c0f552249fc1a3dd9cd2d3ab9de9d3c89b559c2bd01121f824834f24746"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mox": {:hex, :mox, "0.5.2", "55a0a5ba9ccc671518d068c8dddd20eeb436909ea79d1799e2209df7eaa98b6c", [:mix], [], "hexpm", "df4310628cd628ee181df93f50ddfd07be3e5ecc30232d3b6aadf30bdfe6092b"},
"observer_cli": {:hex, :observer_cli, "1.6.1", "d176f967c978ab8b8a29c35c12524f78b7bb36fd4e9b8276dd75c9cb56e07e42", [:mix, :rebar3], [{:recon, "~>2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "3418e319764b9dff1f469e43cbdffd7fd54ea47cbf765027c557abd146a19fb3"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#
# This file is part of Astarte.
#
# Copyright 2021 Ispirata Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

defmodule Astarte.DataUpdaterPlant.HTTPTriggersHandlerTest do
use ExUnit.Case

use Astarte.Core.Triggers.SimpleEvents

alias AMQP.Channel
alias AMQP.Connection
alias AMQP.Queue
alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget
alias Astarte.DataUpdaterPlant.Config
alias Astarte.DataUpdaterPlant.TriggersHandler
alias Astarte.DataUpdaterPlant.DatabaseTestHelper

@introspection "com.My.Interface:1:0;com.Another.Interface:1:2"
@realm "autotestrealm"
@policy1 "@default"
@queue_name1 "#{@realm}_#{@policy1}_queue"
@routing_key1 "#{@realm}_#{@policy1}"
@device_id :crypto.strong_rand_bytes(16) |> Base.url_encode64(padding: false)
@interface "com.Test.Interface"
@major_version 1
@minor_version 1
@path "/some/path"
@bson_value %{v: "testvalue"} |> Cyanide.encode!()
@ip_address "2.3.4.5"

setup_all do
{:ok, conn} = Connection.open(Config.amqp_producer_options!())
{:ok, chan} = Channel.open(conn)
{:ok, _queue} = Queue.declare(chan, @queue_name1)

:ok =
Queue.bind(chan, @queue_name1, Config.events_exchange_name!(), routing_key: @routing_key1)

on_exit(fn ->
Channel.close(chan)
Connection.close(conn)
end)

[chan: chan]
end

test "HTTP event with no trigger policy handling", %{chan: chan} do
test_pid = self()

{:ok, consumer_tag} =
AMQP.Queue.subscribe(chan, @queue_name1, fn payload, meta ->
send(test_pid, {:event, payload, meta})
end)

simple_trigger_id = :uuid.get_v4()
parent_trigger_id = :uuid.get_v4()
static_header_key = "important_metadata_value_change_applied"
static_header_value = "test_meta_value_change_applied"
static_headers = [{static_header_key, static_header_value}]
old_bson_value = %{v: 41} |> Cyanide.encode!()
new_bson_value = %{v: 42} |> Cyanide.encode!()
timestamp = get_timestamp()

target = %AMQPTriggerTarget{
simple_trigger_id: simple_trigger_id,
parent_trigger_id: parent_trigger_id,
static_headers: static_headers,
routing_key: "trigger_engine"
}

TriggersHandler.value_change_applied(
target,
@realm,
@device_id,
@interface,
@path,
old_bson_value,
new_bson_value,
timestamp
)

assert_receive {:event, payload, meta}

assert %SimpleEvent{
device_id: @device_id,
parent_trigger_id: ^parent_trigger_id,
simple_trigger_id: ^simple_trigger_id,
realm: @realm,
timestamp: ^timestamp,
event: {:value_change_applied_event, value_change_applied_event}
} = SimpleEvent.decode(payload)

assert %ValueChangeAppliedEvent{
interface: @interface,
path: @path,
old_bson_value: ^old_bson_value,
new_bson_value: ^new_bson_value
} = value_change_applied_event

headers_map = amqp_headers_to_map(meta.headers)

assert Map.get(headers_map, "x_astarte_realm") == @realm
assert Map.get(headers_map, "x_astarte_trigger_policy") == @policy1
assert Map.get(headers_map, "x_astarte_device_id") == @device_id
assert Map.get(headers_map, "x_astarte_event_type") == "value_change_applied_event"

assert Map.get(headers_map, "x_astarte_simple_trigger_id") |> :uuid.string_to_uuid() ==
simple_trigger_id

assert Map.get(headers_map, "x_astarte_parent_trigger_id") |> :uuid.string_to_uuid() ==
parent_trigger_id

assert Map.get(headers_map, static_header_key) == static_header_value

AMQP.Queue.unsubscribe(chan, consumer_tag)
end

defp amqp_headers_to_map(headers) do
Enum.reduce(headers, %{}, fn {key, _type, value}, acc ->
Map.put(acc, key, value)
end)
end

defp get_timestamp do
DateTime.utc_now()
|> DateTime.to_unix(:microsecond)
end
end

0 comments on commit a006473

Please sign in to comment.