-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: move command topic deserialization to CommandRunner and introduce DEGRADED CommandRunnerStatus #6012
feat: move command topic deserialization to CommandRunner and introduce DEGRADED CommandRunnerStatus #6012
Conversation
fd27224
to
02ce7ab
Compare
f03080e
to
69c4c55
Compare
… CommandRunnerStatus
69c4c55
to
09356e8
Compare
* @apiNote this method may block | ||
*/ | ||
List<QueuedCommand> getNewCommands(Duration timeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't agree with the decision to pull QueuedCommand out of this interface and replace it with Pair<ConsumerRecord, CommandStatusFuture>> since it breaks this abstraction unnecessarily. The idea behind this interface was to provide an api to a message queue where a caller could enqueue/dequeue, and also register to be notified when a given message is consumed/processed, or when a given offset is consumed/processed. It's not important that the backing store is Kafka, and that shouldn't really leak into the interface (I know the txnl producer is in there, but that can be fixed). It's also not that important that the messages have key/value types CommandID/Command (I know, bad naming 😁 ). Therefore I think it would be better to organize this in the following way:
Leave this interface mostly the same, except change references to Command and CommandID to byte[]
Change QueuedCommand's members to be:
class QueuedCommand {
byte[] commandId;
byte[] command;
....
}
That said, it is useful to have an explicit type for Command to improve safety (so for example, a caller can't accidentally mix them up, or CommandStore can't accidentally use the Command as the key to the status map). So, an additional improvement we could make would be to leave the command
parameters as type Command (I'd still use byte[] for the command id and just serialize/deserialize that from the caller). Then, in QueuedCommand, change getCommand
to getAndDeserializeCommand(Deserializer<Command> deserializer)
so that CommandRunner is the entity that actually does the deserialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used your suggestion as a guideline and have updated the PR. (No changes to CommandQueue)
09c358c
to
04d7121
Compare
04d7121
to
4f5ce5f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good. Couple more thoughts inline.
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/CommandTopicUtil.java
Outdated
Show resolved
Hide resolved
@@ -55,6 +59,7 @@ | |||
// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling | |||
public class CommandStore implements CommandQueue, Closeable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple of thoughts about design here:
It would be nice to try to structure this so that the implementation of CommandStore cannot deserialize Command (at least not without making assumptions about its format, which it shouldn't do). This would require the following tweaks:
- make the CommandID serializer/deserializer, and the Command serializer only (not deserializer) parameters to CommandStore passed in the constructor. Don't have it accept a deserializer.
- make the deserializer a parameter to QueuedCommand.getAndDeserializeCommand()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -117,8 +123,8 @@ void handleStatement(final QueuedCommand queuedCommand) { | |||
throwIfNotConfigured(); | |||
|
|||
handleStatementWithTerminatedQueries( | |||
queuedCommand.getCommand(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd just pass command, commandId, status, and offset in as parameters rather than deserializing from this method
…ce DEGRADED CommandRunnerStatus (confluentinc#6012) * feat: move deserialization to CommandRunner and introduce DEGRADED to CommandRunnerStatus * rohan comment * fix test * more feedback * pass deserializer to queuedcommand for command
Description
Moves deserialization of the ConsumerRecords from the CommandTopic to the CommandRunner. This allows for deserialization of individual records and better detection of
SerializationException
that can occur when a server processes an incompatible command.Introduces a
DEGRADED
CommandRunnerStatus
which indicates that the CommandRunner has encountered an incompatible process and the thread has been terminated. The only way to get out of this state currently is to start up a server version that can process the command that put old server version into theDEGRADED
stateNext step is to add a check in DistributingExecutor that makes sure the CommandRunner isn't in a DEGRADED state.
Testing done
Updated Unit tests and added new ones
Manual test:
Reviewer checklist