From 732da541fe49835d3c02a079c1acd963a949b4b9 Mon Sep 17 00:00:00 2001 From: Marc Date: Tue, 6 Feb 2024 08:49:18 +0100 Subject: [PATCH 1/4] add protobuf definition for persisted streams --- .github/workflows/maven.yml | 1 + pom.xml | 2 +- src/main/proto/streams.proto | 125 +++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 src/main/proto/streams.proto diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index b5a940f..f99cec3 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -5,6 +5,7 @@ on: branches: - master - axon-server-api-*.*.x + - persisted-streams workflow_dispatch: jobs: diff --git a/pom.xml b/pom.xml index a0662cd..fcf841f 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ 4.0.0 io.axoniq axon-server-api - 2023.0.2-SNAPSHOT + 2024.0.0-SNAPSHOT Axon Server API Public API for communication with AxonServer diff --git a/src/main/proto/streams.proto b/src/main/proto/streams.proto new file mode 100644 index 0000000..c5c3504 --- /dev/null +++ b/src/main/proto/streams.proto @@ -0,0 +1,125 @@ +syntax = "proto3"; +package io.axoniq.axonserver.grpc.streams; +import "event.proto"; +import "google/protobuf/empty.proto"; +option java_multiple_files = true; + +/* Service providing operations for persisted event streams, event streams where Axon Server keeps + track of the progress. All operations require a header (AxonIQ-Context) to be passed with each request to define + the context. + */ +service EventStreamService { + /* Open a persisted event stream connection from a client. Creates the stream if it does not exist. */ + rpc OpenStream(stream StreamCommand) returns (stream StreamSignal) { + + } + + /* Deletes a persisted event stream. All existing connections to the stream are closed. */ + rpc DeleteStream(DeleteStreamRequest) returns (stream google.protobuf.Empty) { + + } + + /* Change properties of a persisted event stream. */ + rpc UpdateStream(UpdateStreamRequest) returns (stream google.protobuf.Empty) { + + } + + /* Returns a list of all persisted event streams defined (for the context). For each event stream it returns + the progress per segment. */ + rpc ListStreams( google.protobuf.Empty) returns (stream StreamStatus) { + + } + + /* Returns the clients connected to all persisted event streams defined (for the context). For each client it returns + the segments that are received by the client. */ + rpc ListConnections( google.protobuf.Empty) returns (stream StreamConnections) { + + } +} + +/* Contains the current status of a persisted stream */ +message StreamStatus { + string stream_id = 1; /* the unique identification of the stream */ + string stream_name = 2; /* a name for the stream */ + SequencingPolicy sequencing_policy = 3; /* the policy used to distribute events across segments. */ + string filter = 4; /* an expression to filter events */ + repeated SegmentPosition segments = 5; /* the last confirmed position per segment */ +} + +/* Contains the position per segment */ +message SegmentPosition { + int32 segment = 1; /* the segment number */ + int64 position = 2; /* the last confirmed position */ +} + +/* Contains the current connections for a persisted stream */ +message StreamConnections { + string stream_id = 1; /* the unique identification of the stream */ + string stream_name = 2; /* a name for the stream */ + repeated StreamConnection segments_per_connection = 3; /* the segments held per client */ +} + +/* Contains the segments per client */ +message StreamConnection { + string client_id = 1; /* the unique identification of the client */ + repeated int32 segments = 2; /* a list of segment numbers */ +} + +/* Request to set up a connection to a stream. Clients should first submit an OpenRequest on this connection + to connect to a stream and can then submit ProgressRequest messages to report progress. */ +message StreamCommand { + oneof request { + OpenRequest open = 1; /* the initial message to connect to a stream */ + ProgressRequest progress = 2; /* sends progress in processing events to Axon Server */ + } +} + +/* Request to delete a persisted stream */ +message DeleteStreamRequest { + string stream_id = 1; /* the unique identification of the stream */ +} + +/* Request to update the properties of a persisted stream */ +message UpdateStreamRequest { + string stream_id = 1; /* the unique identification of the stream */ + int32 segments = 2; /* Request to change the number of segments */ + string stream_name = 3; /* Request to change the name of the stream */ +} + + +/* Request to open a connection to a persisted stream */ +message OpenRequest { + string stream_id = 1; /* the unique identification of the stream */ + string client_id = 2; /* the unique identification of the client */ + InitializationProperties initialization_properties = 3; /* properties to create the stream if it does not exist */ +} + +/* Properties to create the stream if it does not exist */ +message InitializationProperties { + int32 segments = 1; /* the initial number of segments */ + int64 initial_position = 2; /* the position in the event store to start reading from */ + SequencingPolicy sequencing_policy = 3; /* the sequencing policy */ + string filter = 4; /* an expression to filter events */ + string stream_name = 5; /* a name for the stream */ +} + +/* Defines the policy used to distribute events across segments */ +message SequencingPolicy { + string policy_name = 1; /* the name of the sequencing policy */ + repeated string parameter = 2; /* optional list of parameters used by the sequencing policy */ +} + +/* Message to report progress of event processing for a specific segment in a stream */ +message ProgressRequest { + int32 segment = 1; /* the segment number */ + int64 position = 2; /* the position of the last processed event */ +} + +/* Message sent by Axon Server to the client stream connection */ +message StreamSignal { + int32 segment = 1; /* the segment number */ + oneof type { + io.axoniq.axonserver.grpc.event.EventWithToken event = 2; /* an event to process in the client */ + bool closed = 3; /* indicates that the segment is closed by Axon Server */ + } +} \ No newline at end of file From 0e3ecc50da24354ebf964713f0d316e7d8a66955 Mon Sep 17 00:00:00 2001 From: Marc Date: Tue, 13 Feb 2024 11:10:45 +0100 Subject: [PATCH 2/4] rename persisted streams to persistent streams --- src/main/proto/streams.proto | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/proto/streams.proto b/src/main/proto/streams.proto index c5c3504..8c7f3be 100644 --- a/src/main/proto/streams.proto +++ b/src/main/proto/streams.proto @@ -4,40 +4,40 @@ import "event.proto"; import "google/protobuf/empty.proto"; option java_multiple_files = true; -/* Service providing operations for persisted event streams, event streams where Axon Server keeps +/* Service providing operations for persistent event streams, event streams where Axon Server keeps track of the progress. All operations require a header (AxonIQ-Context) to be passed with each request to define the context. */ service EventStreamService { - /* Open a persisted event stream connection from a client. Creates the stream if it does not exist. */ + /* Open a persistent event stream connection from a client. Creates the stream if it does not exist. */ rpc OpenStream(stream StreamCommand) returns (stream StreamSignal) { } - /* Deletes a persisted event stream. All existing connections to the stream are closed. */ + /* Deletes a persistent event stream. All existing connections to the stream are closed. */ rpc DeleteStream(DeleteStreamRequest) returns (stream google.protobuf.Empty) { } - /* Change properties of a persisted event stream. */ + /* Change properties of a persistent event stream. */ rpc UpdateStream(UpdateStreamRequest) returns (stream google.protobuf.Empty) { } - /* Returns a list of all persisted event streams defined (for the context). For each event stream it returns + /* Returns a list of all persistent event streams defined (for the context). For each event stream it returns the progress per segment. */ rpc ListStreams( google.protobuf.Empty) returns (stream StreamStatus) { } - /* Returns the clients connected to all persisted event streams defined (for the context). For each client it returns + /* Returns the clients connected to all persistent event streams defined (for the context). For each client it returns the segments that are received by the client. */ rpc ListConnections( google.protobuf.Empty) returns (stream StreamConnections) { } } -/* Contains the current status of a persisted stream */ +/* Contains the current status of a persistent stream */ message StreamStatus { string stream_id = 1; /* the unique identification of the stream */ string stream_name = 2; /* a name for the stream */ @@ -52,7 +52,7 @@ message SegmentPosition { int64 position = 2; /* the last confirmed position */ } -/* Contains the current connections for a persisted stream */ +/* Contains the current connections for a persistent stream */ message StreamConnections { string stream_id = 1; /* the unique identification of the stream */ string stream_name = 2; /* a name for the stream */ @@ -74,12 +74,12 @@ message StreamCommand { } } -/* Request to delete a persisted stream */ +/* Request to delete a persistent stream */ message DeleteStreamRequest { string stream_id = 1; /* the unique identification of the stream */ } -/* Request to update the properties of a persisted stream */ +/* Request to update the properties of a persistent stream */ message UpdateStreamRequest { string stream_id = 1; /* the unique identification of the stream */ int32 segments = 2; /* Request to change the number of segments */ @@ -87,7 +87,7 @@ message UpdateStreamRequest { } -/* Request to open a connection to a persisted stream */ +/* Request to open a connection to a persistent stream */ message OpenRequest { string stream_id = 1; /* the unique identification of the stream */ string client_id = 2; /* the unique identification of the client */ From 4cd8faa09bb517e0371d9a50421aac44f7d310ae Mon Sep 17 00:00:00 2001 From: Marc Date: Mon, 19 Feb 2024 09:20:10 +0100 Subject: [PATCH 3/4] rename service and messages in persistent-streams.proto. --- .../{streams.proto => persistent-streams.proto} | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) rename src/main/proto/{streams.proto => persistent-streams.proto} (93%) diff --git a/src/main/proto/streams.proto b/src/main/proto/persistent-streams.proto similarity index 93% rename from src/main/proto/streams.proto rename to src/main/proto/persistent-streams.proto index 8c7f3be..7bc90d9 100644 --- a/src/main/proto/streams.proto +++ b/src/main/proto/persistent-streams.proto @@ -8,9 +8,9 @@ option java_multiple_files = true; track of the progress. All operations require a header (AxonIQ-Context) to be passed with each request to define the context. */ -service EventStreamService { +service PersistentStreamService { /* Open a persistent event stream connection from a client. Creates the stream if it does not exist. */ - rpc OpenStream(stream StreamCommand) returns (stream StreamSignal) { + rpc OpenStream(stream StreamRequest) returns (stream StreamSignal) { } @@ -67,10 +67,10 @@ message StreamConnection { /* Request to set up a connection to a stream. Clients should first submit an OpenRequest on this connection to connect to a stream and can then submit ProgressRequest messages to report progress. */ -message StreamCommand { +message StreamRequest { oneof request { - OpenRequest open = 1; /* the initial message to connect to a stream */ - ProgressRequest progress = 2; /* sends progress in processing events to Axon Server */ + Open open = 1; /* the initial message to connect to a stream */ + Acknowledgement acknowledgment = 2; /* sends progress in processing events to Axon Server */ } } @@ -88,7 +88,7 @@ message UpdateStreamRequest { /* Request to open a connection to a persistent stream */ -message OpenRequest { +message Open { string stream_id = 1; /* the unique identification of the stream */ string client_id = 2; /* the unique identification of the client */ InitializationProperties initialization_properties = 3; /* properties to create the stream if it does not exist */ @@ -110,7 +110,7 @@ message SequencingPolicy { } /* Message to report progress of event processing for a specific segment in a stream */ -message ProgressRequest { +message Acknowledgement { int32 segment = 1; /* the segment number */ int64 position = 2; /* the position of the last processed event */ } From d902daedbcc6494fd72f3ddf786fd371bebf51b3 Mon Sep 17 00:00:00 2001 From: Marc Date: Tue, 27 Feb 2024 08:15:56 +0100 Subject: [PATCH 4/4] process review comments --- .github/workflows/maven.yml | 1 - src/main/proto/persistent-streams.proto | 22 +++++++++++----------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f99cec3..b5a940f 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -5,7 +5,6 @@ on: branches: - master - axon-server-api-*.*.x - - persisted-streams workflow_dispatch: jobs: diff --git a/src/main/proto/persistent-streams.proto b/src/main/proto/persistent-streams.proto index 7bc90d9..6202965 100644 --- a/src/main/proto/persistent-streams.proto +++ b/src/main/proto/persistent-streams.proto @@ -26,23 +26,23 @@ service PersistentStreamService { /* Returns a list of all persistent event streams defined (for the context). For each event stream it returns the progress per segment. */ - rpc ListStreams( google.protobuf.Empty) returns (stream StreamStatus) { + rpc ListStreams(google.protobuf.Empty) returns (stream StreamStatus) { } /* Returns the clients connected to all persistent event streams defined (for the context). For each client it returns the segments that are received by the client. */ - rpc ListConnections( google.protobuf.Empty) returns (stream StreamConnections) { + rpc ListConnections(google.protobuf.Empty) returns (stream StreamConnections) { } } /* Contains the current status of a persistent stream */ message StreamStatus { - string stream_id = 1; /* the unique identification of the stream */ - string stream_name = 2; /* a name for the stream */ + string stream_id = 1; /* the unique identification of the persistent stream */ + string stream_name = 2; /* a name for the persistent stream */ SequencingPolicy sequencing_policy = 3; /* the policy used to distribute events across segments. */ - string filter = 4; /* an expression to filter events */ + string filter = 4; /* an expression to filter events, same syntax as used for ad-hoc queries on the event store */ repeated SegmentPosition segments = 5; /* the last confirmed position per segment */ } @@ -66,11 +66,11 @@ message StreamConnection { } /* Request to set up a connection to a stream. Clients should first submit an OpenRequest on this connection - to connect to a stream and can then submit ProgressRequest messages to report progress. */ + to connect to a stream and can then submit Acknowledgement messages to report progress. */ message StreamRequest { oneof request { Open open = 1; /* the initial message to connect to a stream */ - Acknowledgement acknowledgment = 2; /* sends progress in processing events to Axon Server */ + ProgressAcknowledgement acknowledgeProgress = 2; /* sends progress in processing events to Axon Server */ } } @@ -99,18 +99,18 @@ message InitializationProperties { int32 segments = 1; /* the initial number of segments */ int64 initial_position = 2; /* the position in the event store to start reading from */ SequencingPolicy sequencing_policy = 3; /* the sequencing policy */ - string filter = 4; /* an expression to filter events */ - string stream_name = 5; /* a name for the stream */ + string filter = 4; /* an expression to filter events, same syntax as used for ad-hoc queries on the event store */ + string stream_name = 5; /* a name for the persistent stream */ } -/* Defines the policy used to distribute events across segments */ +/* Defines the policy used to distribute events across segments. The policy name must be known on the server. */ message SequencingPolicy { string policy_name = 1; /* the name of the sequencing policy */ repeated string parameter = 2; /* optional list of parameters used by the sequencing policy */ } /* Message to report progress of event processing for a specific segment in a stream */ -message Acknowledgement { +message ProgressAcknowledgement { int32 segment = 1; /* the segment number */ int64 position = 2; /* the position of the last processed event */ }