Библиотека реализует событийную архитектуру приложений (Event-Driven Architecture). Работает с фреймворком Phalcon 3.x, но при желании можно легко адаптировать под другие фреймворки.
Рабочий пример можно посмотреть вот здесь: https://github.com/chocofamilyme/pubsub/tree/master/examples
- Транзакционное сохранение моделей ORM и публикация события
- Публикация событий без транзакции
- Подписка на события
- Повторная отправка события в ту же очередь при необходимости
- Сохранение в общую очередь всех не обработанных и истекших сообщений. Из этой очереди потом можно сохранить куда-то в БД и обработать индивидуально
- Phalcon 3.x+
- PHP 7.0+
composer require chocofamilyme/pubsub
На данный момент библиотека работает только с RabbitMQ, при желаении можно добавить другие.
return [
'eventsource' => [
'default' => 'rabbitmq',
'drivers' => [
'rabbitmq' => [
'adapter' => 'RabbitMQ',
'hosts' => [
[
'host' => 'host',
'port' => 5672,
'user' => 'user',
'password' => 'password',
'vhost' => '/',
],
],
// Не объязательные параметры
'heartbeat' => 60,
'read_write_timeout' => 60,
'connection_timeout' => 60,
'wait_timeout' => 0,
],
],
],
];
Полный список смотрите - https://github.com/php-amqplib/php-amqplib
$di = \Phalcon\Di::getDefault();
$config = $di->get('config')->eventsource;
$config = $config->drivers[$config->default];
$serviceName = $di->get('config')->domain;
$cache = $di->get('cache');
$di->setShared('eventsource',
function () use ($config, $serviceName, $cache) {
$adapter = $config->adapter;
$config = array_merge($config->toArray(), ['app_id' => $serviceName]);
$class = 'Chocofamily\PubSub\Provider\\'.$adapter;
$repeater = new Repeater($cache);
return $class::getInstance($config, $repeater);
}
);
Здесь $cache
объект реализующий интерефейс Phalcon\Cache\BackendInterface
. Кэш используется для подсчета количества повторной обработки определенного сообщения.
Ключ | Значение | Описание |
---|---|---|
connection | По умолчанию PhpAmqpLib\Connection\AMQPLazyConnection::class | php-amqplib |
connection_timeout | По умолчанию 3.0 (сек) | Максимальное время на соединение с сервером Rabbitmq |
read_write_timeout | По умолчанию 3.0 (сек) | Максимальное время на получение |
heartbeat | По умолчанию 60 (сек) | RabbitMQ Doc |
keepalive | По умолчанию false | RabbitMQ Doc |
context | По умолчанию null | RabbitMQ Doc |
prefetch_count | По умолчанию 1 | RabbitMQ Doc |
no_ack | По умолчанию false | RabbitMQ Doc |
durable | По умолчанию true | RabbitMQ Doc |
exclusive | По умолчанию false | RabbitMQ Doc |
queue | По умолчанию [] | RabbitMQ Doc |
basic_consume_exclusive | По умолчанию false | RabbitMQ Doc |
wait_allowed_methods | По умолчанию null | php-amqplib |
wait_non_blocking | По умолчанию true | php-amqplib |
wait_timeout | По умолчанию 0 | Максимальное время ожидания до получения первого сообщения |
exchange_type | По умолчанию topic | RabbitMQ Doc) |
app_id | По умолчанию '' | Индификатор приложения которое создает сообщение |
sleep | По умолчанию 0.1 секунды | Если wait_non_blocking равно true, то если нету задачи ждать время sleep |
Публиковать сообщения можно используя класс Chocofamily\PubSub\Publisher
. Минимальный рабочий пример:
$publisher = new Publisher($di->getShared('eventsource'));
$payload = [
'event_id' => 11995,
'name' => 'docx',
'age' => 25
];
$routeKey = 'order.created';
$publisher->send($payload, $routeKey);
Для RabbitMQ переменная $routeKey
должна состоять минимум из двух частей разделенных точкой .
. Пример order.created
. Имя Exchange будет содержать первый блок, т.е. order
. После этого если зайдете в админку rabbitmq должен создаться exchange с именем order
.
Обновленно: начиная с версии 2.* можно указать exchange
, которому привяжется маршрут `$routeKey, пример:
$publisher = new Publisher($di->getShared('eventsource'));
$payload = [
'event_id' => 11995,
'name' => 'docx',
'age' => 25
];
$exchange = 'order';
$routeKey = 'order.created';
$publisher->send($payload, $routeKey, $exchange);
Для подписки на события используется класс Chocofamily\PubSub\Subscriber
. Минимальный рабочий пример:
$params = [
'queue_name' => 'restapi_orderx',
];
$taskName = 'your_task_name';
$subscriber = new Subscriber($di->getShared('eventsource'), 'order.created.*', $params, $taskName);
$subscriber->subscribe(function ($headers, $body) {
echo print_r($headers, 1). PHP_EOL;
echo print_r($body, 1). PHP_EOL;
});
Обновленно: начиная с версии 2.* можно указать exchange
и связать с ним маршрут. Теперь можно указать массив
маршрутов. Пример:
$params = [
'queue_name' => 'restapi_orderx',
];
$taskName = 'your_task_name';
$routeKeys = [
'order.created',
'order.payed',
];
$exchange = 'order';
$subscriber = new Subscriber($di->getShared('eventsource'), $routeKeys, $params, $taskName, $exchange);
$subscriber->subscribe(function ($headers, $body) {
echo print_r($headers, 1). PHP_EOL;
echo print_r($body, 1). PHP_EOL;
});
Чтобы обратно отправить сообщение в очередь необходимо в кэлбэк функции кинуть исключение Chocofamily\PubSub\Exceptions\RetryException
. Сообщение может максимум 5 раз обработаться повторно, после этого он попадает в очередь мертвых сообщений (exchange = DLX).
В подписчик можно передавать следующие настройки:
durable: bool — сохранять на диск данные
queue: array — настройки самой очереди
prefetch_count: int — количество предзагрузки сообщений
no_ack: — требуется ли подтверждение сообщений
app_id — уникальный ID приложения. Можно использовать для идентификации откуда собите пошло изначально
Этот способ необходим для атомарности сохранения сущности в БД и публикования события. Следующая картинка хорошо иллюстрирует как это работает:
Для этого необходимо создать таблицу events:
create table events
(
id serial not null
constraint events_pkey
primary key,
type smallint not null,
model_id int not null,
model_type varchar(100) not null,
exchange varchar(100) not null,
routing_key varchar(100) not null,
payload json not null,
status smallint not null,
created_at timestamp default now() not null,
updated_at timestamp
);
Пример использования:
use Chocofamily\PubSub\Services\EventPrepare;
...
$order = new Order([
'user_id' => 11166541,
'status' => 0,
'total' => 5852,
]);
$eventSource = $di->get('eventsource');
$event = new EventPrepare($order, new OrderSerialize(['name' => 'docx']), 1);
$event->up($eventSource, 'order.created.-5');
Модель Order должна реализовывать итерфейс ModelInterface.
Обновленно: начиная с версии 2.* можно указать exchange
и связпть с ним маршрут. Привер:
use Chocofamily\PubSub\Services\EventPrepare;
...
$order = new Order([
'user_id' => 11166541,
'status' => 0,
'total' => 5852,
]);
$eventSource = $di->get('eventsource');
$routeKey = 'order.created.-5';
$exchange = 'order';
$event = new EventPrepare($order, new OrderSerialize(['name' => 'docx']), 1);
$event->up($eventSource, $routeKey, $exchange);
Метод up
работает так
- db transaction start
- order->save();
- eventModel->save()
- db transaction commit
- event publish
Для повторной отправке событие используется класс Chocofamily\PubSub\Services\EventRepeater
. Рабочий пример:
use Chocofamily\PubSub\Services\EventRepeater;
...
$dateStart = \DateTime::createFromFormat('Y-m-d', '2018-01-01');
$eventDataProvider = new Chocofamily\PubSub\Provider\Event($di->get('eventsource'), $dateStart);
try {
$event = new EventRepeater($eventDataProvider);
$event->retry();
} catch (\Exception $e) {
$message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
$di->get('logger')->error($message);
}
Для очистки событий используется класс Chocofamily\PubSub\Services\EventCleaner
с методом clean
.
Рабочий пример:
use Chocofamily\PubSub\Services\EventCleaner;
...
try {
$event = new EventCleaner($di->get('modelsManager'));
$event->clean();
} catch (ModelException $e) {
$message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
$di->get('logger')->error($message);
}
По умолчанию удаляетя событие больше 1 месяца. Если передать дату как второй параметр в конструкторе, то будет удалятся все событие до указонной даты:
use Chocofamily\PubSub\Services\EventCleaner;
...
$dateTime = new \DateTime();
$dateTime = $dateTime->modify('-1 day');
try {
$event = new EventCleaner($di->get('modelsManager'), $dateTime);
$event->clean();
} catch (ModelException $e) {
$message = sprintf('%d %s in %s:%s', $e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
$di->get('logger')->error($message);
}
@todo
- Написать интерфейс для транзакций и убрать зависимость от фреймворка