Skip to content

Commit

Permalink
Let PartitionProcessorManager only log on changes
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Oct 17, 2024
1 parent 63d58fe commit db5fa3a
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl ProcessorState {

fn step_down(&mut self) -> Result<(), Error> {
if self.planned_mode != RunMode::Follower {
debug!("Asked by cluster-controller to demote partition to follower");
self.handle.step_down()?;
}

Expand Down Expand Up @@ -189,6 +190,7 @@ impl ProcessorState {
// todo alternative could be to let the CC decide the leader epoch
let leader_epoch =
Self::obtain_next_epoch(metadata_store_client, self.partition_id, node_id).await?;
debug!(%leader_epoch, "Asked by cluster-controller to promote partition to leader");
self.running_for_leadership_with_epoch = Some(leader_epoch);
self.handle.run_for_leader(leader_epoch)?;
}
Expand Down Expand Up @@ -643,9 +645,9 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {

match control_processor.command {
ProcessorCommand::Stop => {
debug!("Asked by cluster-controller to stop partition");
if let Some(processor) = self.running_partition_processors.remove(&partition_id) {
if let Some(handle) = self.task_center.cancel_task(processor.task_id) {
debug!(%partition_id, "Asked by cluster-controller to stop partition");
if let Err(err) = handle.await {
warn!("Partition processor crashed while shutting down: {err}");
}
Expand All @@ -655,7 +657,6 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {
}
}
ProcessorCommand::Follower => {
debug!("Asked by cluster-controller to demote partition to follower");
if let Some(state) = self.running_partition_processors.get_mut(&partition_id) {
// if we error here, then the system is shutting down
state.step_down()?;
Expand All @@ -674,7 +675,6 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {
}
}
ProcessorCommand::Leader => {
debug!("Asked by cluster-controller to promote partition to leader");
if let Some(state) = self.running_partition_processors.get_mut(&partition_id) {
state
.run_for_leader(
Expand Down Expand Up @@ -736,6 +736,7 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {
key_range: &RangeInclusive<PartitionKey>,
mode: RunMode,
) -> Result<(), Error> {
debug!("Start new partition processor.");
let mut state = self.spawn_partition_processor(partition_id, key_range.clone())?;

if RunMode::Leader == mode {
Expand Down

0 comments on commit db5fa3a

Please sign in to comment.