Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent streams #58

Merged
merged 4 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.axoniq</groupId>
<artifactId>axon-server-api</artifactId>
<version>2023.0.2-SNAPSHOT</version>
<version>2024.0.0-SNAPSHOT</version>
<name>Axon Server API</name>
<description>Public API for communication with AxonServer</description>

Expand Down
125 changes: 125 additions & 0 deletions src/main/proto/persistent-streams.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
syntax = "proto3";
package io.axoniq.axonserver.grpc.streams;
import "event.proto";
import "google/protobuf/empty.proto";
option java_multiple_files = true;

/* Service providing operations for persistent event streams, event streams where Axon Server keeps
track of the progress. All operations require a header (AxonIQ-Context) to be passed with each request to define
the context.
*/
service PersistentStreamService {
/* Open a persistent event stream connection from a client. Creates the stream if it does not exist. */
rpc OpenStream(stream StreamRequest) returns (stream StreamSignal) {

}

/* Deletes a persistent event stream. All existing connections to the stream are closed. */
rpc DeleteStream(DeleteStreamRequest) returns (stream google.protobuf.Empty) {

}

/* Change properties of a persistent event stream. */
rpc UpdateStream(UpdateStreamRequest) returns (stream google.protobuf.Empty) {

}

/* Returns a list of all persistent event streams defined (for the context). For each event stream it returns
the progress per segment. */
rpc ListStreams(google.protobuf.Empty) returns (stream StreamStatus) {

}

/* Returns the clients connected to all persistent event streams defined (for the context). For each client it returns
the segments that are received by the client. */
rpc ListConnections(google.protobuf.Empty) returns (stream StreamConnections) {

}
}

/* Contains the current status of a persistent stream */
message StreamStatus {
string stream_id = 1; /* the unique identification of the persistent stream */
string stream_name = 2; /* a name for the persistent stream */
SequencingPolicy sequencing_policy = 3; /* the policy used to distribute events across segments. */
string filter = 4; /* an expression to filter events, same syntax as used for ad-hoc queries on the event store */
repeated SegmentPosition segments = 5; /* the last confirmed position per segment */
}

/* Contains the position per segment */
message SegmentPosition {
int32 segment = 1; /* the segment number */
int64 position = 2; /* the last confirmed position */
}

/* Contains the current connections for a persistent stream */
message StreamConnections {
string stream_id = 1; /* the unique identification of the stream */
string stream_name = 2; /* a name for the stream */
Comment on lines +57 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
string stream_id = 1; /* the unique identification of the stream */
string stream_name = 2; /* a name for the stream */
string stream_id = 1; /* the unique identification of the persistent stream */
string stream_name = 2; /* the name for the persistent stream */

repeated StreamConnection segments_per_connection = 3; /* the segments held per client */
}

/* Contains the segments per client */
message StreamConnection {
string client_id = 1; /* the unique identification of the client */
repeated int32 segments = 2; /* a list of segment numbers */
}

/* Request to set up a connection to a stream. Clients should first submit an OpenRequest on this connection
to connect to a stream and can then submit Acknowledgement messages to report progress. */
message StreamRequest {
oneof request {
Open open = 1; /* the initial message to connect to a stream */
ProgressAcknowledgement acknowledgeProgress = 2; /* sends progress in processing events to Axon Server */
}
}

/* Request to delete a persistent stream */
message DeleteStreamRequest {
string stream_id = 1; /* the unique identification of the stream */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
string stream_id = 1; /* the unique identification of the stream */
string stream_id = 1; /* the unique identification of the persistent stream */

}

/* Request to update the properties of a persistent stream */
message UpdateStreamRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the fact both the segments and stream_name parameters reside here mean both need to be given for any update persistent stream request? Or are they nullable, for example?

string stream_id = 1; /* the unique identification of the stream */
int32 segments = 2; /* Request to change the number of segments */
string stream_name = 3; /* Request to change the name of the stream */
Comment on lines +84 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
string stream_id = 1; /* the unique identification of the stream */
int32 segments = 2; /* Request to change the number of segments */
string stream_name = 3; /* Request to change the name of the stream */
string stream_id = 1; /* the unique identification of the persistent stream */
int32 segments = 2; /* Request to change the number of segments */
string stream_name = 3; /* Request to change the name of the persistent stream */

}


/* Request to open a connection to a persistent stream */
message Open {
string stream_id = 1; /* the unique identification of the stream */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
string stream_id = 1; /* the unique identification of the stream */
string stream_id = 1; /* the unique identification of the persistent stream */

string client_id = 2; /* the unique identification of the client */
InitializationProperties initialization_properties = 3; /* properties to create the stream if it does not exist */
}

/* Properties to create the stream if it does not exist */
message InitializationProperties {
int32 segments = 1; /* the initial number of segments */
int64 initial_position = 2; /* the position in the event store to start reading from */
SequencingPolicy sequencing_policy = 3; /* the sequencing policy */
string filter = 4; /* an expression to filter events, same syntax as used for ad-hoc queries on the event store */
string stream_name = 5; /* a name for the persistent stream */
}

/* Defines the policy used to distribute events across segments. The policy name must be known on the server. */
message SequencingPolicy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the stream request fail if the policy_name doesn't exist on the implementers side of this API?

string policy_name = 1; /* the name of the sequencing policy */
repeated string parameter = 2; /* optional list of parameters used by the sequencing policy */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the stream request fail if any of the parameters don't exist or are mistyped?

}

/* Message to report progress of event processing for a specific segment in a stream */
message ProgressAcknowledgement {
int32 segment = 1; /* the segment number */
int64 position = 2; /* the position of the last processed event */
}

/* Message sent by Axon Server to the client stream connection */
message StreamSignal {
int32 segment = 1; /* the segment number */
oneof type {
io.axoniq.axonserver.grpc.event.EventWithToken event = 2; /* an event to process in the client */
bool closed = 3; /* indicates that the segment is closed by Axon Server */
}
}