Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent streams #58

Merged
merged 4 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches:
- master
- axon-server-api-*.*.x
- persisted-streams
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this in the PR, right? Or will the persisted-streams branch live forever?

workflow_dispatch:

jobs:
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.axoniq</groupId>
<artifactId>axon-server-api</artifactId>
<version>2023.0.2-SNAPSHOT</version>
<version>2024.0.0-SNAPSHOT</version>
<name>Axon Server API</name>
<description>Public API for communication with AxonServer</description>

Expand Down
125 changes: 125 additions & 0 deletions src/main/proto/streams.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
syntax = "proto3";
package io.axoniq.axonserver.grpc.streams;
import "event.proto";
import "google/protobuf/empty.proto";
option java_multiple_files = true;

/* Service providing operations for persistent event streams, event streams where Axon Server keeps
track of the progress. All operations require a header (AxonIQ-Context) to be passed with each request to define
the context.
*/
service EventStreamService {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: rename to PersistentStreamsService
I also suggest to change the name of the proto file to persistent-streams.proto

/* Open a persistent event stream connection from a client. Creates the stream if it does not exist. */
rpc OpenStream(stream StreamCommand) returns (stream StreamSignal) {

}

/* Deletes a persistent event stream. All existing connections to the stream are closed. */
rpc DeleteStream(DeleteStreamRequest) returns (stream google.protobuf.Empty) {

}

/* Change properties of a persistent event stream. */
rpc UpdateStream(UpdateStreamRequest) returns (stream google.protobuf.Empty) {

}

/* Returns a list of all persistent event streams defined (for the context). For each event stream it returns
the progress per segment. */
rpc ListStreams( google.protobuf.Empty) returns (stream StreamStatus) {

}

/* Returns the clients connected to all persistent event streams defined (for the context). For each client it returns
the segments that are received by the client. */
rpc ListConnections( google.protobuf.Empty) returns (stream StreamConnections) {

}
}

/* Contains the current status of a persistent stream */
message StreamStatus {
string stream_id = 1; /* the unique identification of the stream */
string stream_name = 2; /* a name for the stream */
SequencingPolicy sequencing_policy = 3; /* the policy used to distribute events across segments. */
string filter = 4; /* an expression to filter events */
repeated SegmentPosition segments = 5; /* the last confirmed position per segment */
}

/* Contains the position per segment */
message SegmentPosition {
int32 segment = 1; /* the segment number */
int64 position = 2; /* the last confirmed position */
}

/* Contains the current connections for a persistent stream */
message StreamConnections {
string stream_id = 1; /* the unique identification of the stream */
string stream_name = 2; /* a name for the stream */
repeated StreamConnection segments_per_connection = 3; /* the segments held per client */
}

/* Contains the segments per client */
message StreamConnection {
string client_id = 1; /* the unique identification of the client */
repeated int32 segments = 2; /* a list of segment numbers */
}

/* Request to set up a connection to a stream. Clients should first submit an OpenRequest on this connection
to connect to a stream and can then submit ProgressRequest messages to report progress. */
message StreamCommand {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid confusion with Command (Messages), I suggest renaming to StreamInstruction or StreamRequest

oneof request {
OpenRequest open = 1; /* the initial message to connect to a stream */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the outer message is StreamRequest, this one could just be called "Open" or "OpenStream". Or since the description says "the initial message to connect to a stream", perhaps "Connect"?

ProgressRequest progress = 2; /* sends progress in processing events to Axon Server */
}
}

/* Request to delete a persistent stream */
message DeleteStreamRequest {
string stream_id = 1; /* the unique identification of the stream */
}

/* Request to update the properties of a persistent stream */
message UpdateStreamRequest {
string stream_id = 1; /* the unique identification of the stream */
int32 segments = 2; /* Request to change the number of segments */
string stream_name = 3; /* Request to change the name of the stream */
}


/* Request to open a connection to a persistent stream */
message OpenRequest {
string stream_id = 1; /* the unique identification of the stream */
string client_id = 2; /* the unique identification of the client */
InitializationProperties initialization_properties = 3; /* properties to create the stream if it does not exist */
}

/* Properties to create the stream if it does not exist */
message InitializationProperties {
int32 segments = 1; /* the initial number of segments */
int64 initial_position = 2; /* the position in the event store to start reading from */
SequencingPolicy sequencing_policy = 3; /* the sequencing policy */
string filter = 4; /* an expression to filter events */
string stream_name = 5; /* a name for the stream */
}

/* Defines the policy used to distribute events across segments */
message SequencingPolicy {
string policy_name = 1; /* the name of the sequencing policy */
repeated string parameter = 2; /* optional list of parameters used by the sequencing policy */
}

/* Message to report progress of event processing for a specific segment in a stream */
message ProgressRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other messaging systems typically call these message Acknowledgements or ACKs. Might be easier for people to understand if we use the same terminology. Wdyt?

int32 segment = 1; /* the segment number */
int64 position = 2; /* the position of the last processed event */
}

/* Message sent by Axon Server to the client stream connection */
message StreamSignal {
int32 segment = 1; /* the segment number */
oneof type {
io.axoniq.axonserver.grpc.event.EventWithToken event = 2; /* an event to process in the client */
bool closed = 3; /* indicates that the segment is closed by Axon Server */
}
}
Loading