
ConfigsBuilder properties (LeaseManagementConfig, CoordinatorConfig .....) can't be set or overridden in KclMessageDrivenChannelAdapter using KinesisClientLibConfiguration #224

Closed
amitchidrewar1301 opened this issue Sep 24, 2024 · 14 comments

@amitchidrewar1301

amitchidrewar1301 commented Sep 24, 2024

  • In version 2.0.1, we could override the Kinesis KCL configuration in KclMessageDrivenChannelAdapter using KinesisClientLibConfiguration as described in the documentation.

  • However, in the latest version (4.0.3), we need to set properties such as maxLeasesForWorker, maxLeasesToStealAtOneTime, and other lease management configurations in the KclMessageDrivenChannelAdapter scheduler (via LeaseManagementConfig). Additionally, we are looking for a method to override other configurations using ConfigsBuilder, especially for properties that are not directly exposed in KinesisConsumerProperties.

Could you please add support for this, or provide guidance on how to configure the ConfigsBuilder to set these lower-level configurations?

@artembilan artembilan added this to the 4.0.4 milestone Sep 24, 2024
@artembilan
Member

That doc looks outdated and has to be fixed accordingly.
Unfortunately, the ConfigsBuilder is not a self-contained entity, e.g. lifecycleConfig() does this:

    public LifecycleConfig lifecycleConfig() {
        return new LifecycleConfig();
    }

and so on.
That info is not stored in the builder itself, so the builder cannot expose a hook for configuring the whole ConfigsBuilder.
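To illustrate why mutating the result of such a factory method has no lasting effect, here is a minimal sketch. The types `LifecycleSettings` and `Builder` are hypothetical stand-ins, not the real KCL classes, and the default value of 500 is arbitrary; the only behavior taken from the thread is that each call returns a new object.

```java
// Stand-in types only: these are NOT the real KCL classes.
final class FreshInstanceDemo {

    /** Hypothetical stand-in for a KCL *Config object with one mutable setting. */
    static final class LifecycleSettings {
        long taskBackoffTimeMillis = 500L; // arbitrary default for the sketch
    }

    /** Hypothetical stand-in for ConfigsBuilder: every call builds a new object. */
    static final class Builder {
        LifecycleSettings lifecycleConfig() {
            return new LifecycleSettings();
        }
    }

    /** Mutates the result of one call, then reads the value from a second call. */
    static long backoffSeenBySecondCall(long customBackoff) {
        Builder builder = new Builder();
        builder.lifecycleConfig().taskBackoffTimeMillis = customBackoff; // changes a throwaway instance
        return builder.lifecycleConfig().taskBackoffTimeMillis;          // fresh instance, default value
    }

    public static void main(String[] args) {
        System.out.println(backoffSeenBySecondCall(2_000L)); // prints 500
    }
}
```

The custom value is lost because nothing keeps a reference to the first instance, which is why a hook on the builder as a whole would not help.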
We can probably expose a Consumer<> configurer for each of the individual pieces that are passed in here:

    public Scheduler(@NonNull CheckpointConfig checkpointConfig,
            @NonNull CoordinatorConfig coordinatorConfig,
            @NonNull LeaseManagementConfig leaseManagementConfig,
            @NonNull LifecycleConfig lifecycleConfig,
            @NonNull MetricsConfig metricsConfig,
            @NonNull ProcessorConfig processorConfig,
            @NonNull RetrievalConfig retrievalConfig) {

Then a ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter> from Spring Cloud Stream could be used to call those hooks.
Another option would be to expose all the properties of those *Config parts, but that is too much and probably would not always fit into configuration properties binding.

Will be fixed in Spring Integration AWS soon enough.

Thanks

@amitchidrewar1301
Author

amitchidrewar1301 commented Sep 24, 2024

Thanks, @artembilan, for the acknowledgement and the quick response.

As I understand it, we can expect a configurer that lets us override the individual pieces of config.

@artembilan
Member

That's correct.
Please, follow the linked issue in Spring Integration AWS.

@amitchidrewar1301
Author

Also, if we go with the ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter> approach, we need to make sure that the config set in the customizer's configure method is not overridden later when the actual scheduler is built.

As far as I can see, the onInit() and doStart() methods in KclMessageDrivenChannelAdapter are responsible for setting up the ConfigsBuilder.

Currently, this step runs after the customizer, so it can end up overriding the configuration made in the customizer.

@artembilan
Member

Well, that's not true.
The logic there is like this:

			consumerEndpoint = createConsumerEndpoint(destination, group, properties);
			consumerEndpoint.setOutputChannel(inputChannel);
			this.consumerCustomizer.configure(consumerEndpoint, name, group);
			if (consumerEndpoint instanceof InitializingBean initializingConsumerEndpoint) {
				initializingConsumerEndpoint.afterPropertiesSet();
			}
			if (properties.isAutoStartup() && consumerEndpoint instanceof Lifecycle consumerEndpointWithLifecycle) {
				consumerEndpointWithLifecycle.start();
			}

So the customizer is called first, and only then the mentioned onInit() and doStart().
Therefore, everything looks fine as far as I can see.
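The ordering argument can be sketched with plain Java. This is only an illustration under assumptions: `Endpoint`, `settingsCustomizer`, and `bind` are hypothetical stand-ins for the adapter, its customizer hook, and the binder logic quoted above. The point is that a hook registered during the customizer step is still in place when start runs later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in types only: these are NOT the real Spring or KCL classes.
final class BindingOrderDemo {

    /** Hypothetical stand-in for the channel adapter; it records lifecycle calls. */
    static final class Endpoint {
        final List<String> calls = new ArrayList<>();
        Consumer<StringBuilder> settingsCustomizer = settings -> { }; // no-op by default

        void afterPropertiesSet() {
            calls.add("init");
        }

        void start() {
            StringBuilder settings = new StringBuilder("defaults"); // built at start time
            settingsCustomizer.accept(settings);                    // hook applied to the fresh object
            calls.add("start:" + settings);
        }
    }

    /** Mirrors the order in the quoted binder code: customize, then init, then start. */
    static List<String> bind(Consumer<Endpoint> customizer) {
        Endpoint endpoint = new Endpoint();
        customizer.accept(endpoint);   // 1. user customizer registers its hook
        endpoint.calls.add("customized");
        endpoint.afterPropertiesSet(); // 2. initialization
        endpoint.start();              // 3. start applies the hook registered in step 1
        return endpoint.calls;
    }

    public static void main(String[] args) {
        // prints [customized, init, start:defaults+custom]
        System.out.println(bind(e -> e.settingsCustomizer = s -> s.append("+custom")));
    }
}
```

Because the customizer stores a callback rather than a finished config object, it does not matter that the config objects themselves are only created during start.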

@artembilan
Member

In the KclMessageDrivenChannelAdapter I'm going to call it like this:

MetricsConfig metricsConfig = this.config.metricsConfig();
metricsConfig.metricsLevel(this.metricsLevel);
if (MetricsLevel.NONE.equals(this.metricsLevel)) {
	metricsConfig.metricsFactory(new NullMetricsFactory());
}
this.metricsConfigCustomizer.accept(metricsConfig);

So, whatever could be set by the KclMessageDrivenChannelAdapter internally can be overridden by the provided Consumer<MetricsConfig> metricsConfigCustomizer.

@amitchidrewar1301
Author

Yeah, that makes sense, and this way all the configs can be managed 👍. So the customizer always takes priority over KinesisConsumerProperties, doesn't it?

@artembilan
Member

That's correct. If you set a customizer for a specific config, it wins over the configuration properties.
The two can also be merged: any method of the particular config that you don't call keeps the adapter's value. In the sample above, if you don't call metricsFactory() from the Consumer<MetricsConfig>, whatever the KclMessageDrivenChannelAdapter has set is still going to be there.
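A small sketch of the override-and-merge behavior described here, assuming a customizer that runs after the adapter's own settings. `MetricsSettings` and `build` are hypothetical stand-ins with string-valued fields, not the real MetricsConfig API.

```java
import java.util.function.Consumer;

// Stand-in types only: these are NOT the real KCL classes.
final class CustomizerMergeDemo {

    /** Hypothetical stand-in for MetricsConfig with fluent setters. */
    static final class MetricsSettings {
        String level = "DETAILED";
        String factory = "cloudwatch";

        MetricsSettings level(String level) { this.level = level; return this; }
        MetricsSettings factory(String factory) { this.factory = factory; return this; }

        @Override
        public String toString() { return level + "/" + factory; }
    }

    /** Applies the adapter-level settings first, then the user's customizer. */
    static MetricsSettings build(String adapterLevel, Consumer<MetricsSettings> customizer) {
        MetricsSettings settings = new MetricsSettings().level(adapterLevel);
        if ("NONE".equals(adapterLevel)) {
            settings.factory("no-op"); // adapter default for the NONE level
        }
        customizer.accept(settings);   // runs last, so its calls win
        return settings;
    }

    public static void main(String[] args) {
        System.out.println(build("NONE", s -> s.level("SUMMARY")));  // prints SUMMARY/no-op
        System.out.println(build("NONE", s -> s.factory("custom"))); // prints NONE/custom
    }
}
```

In the first call only the level is overridden, so the adapter's factory choice survives; in the second call the reverse holds.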

@amitchidrewar1301
Author

To be honest, I tried to override the adapter's configuration from the customizer using reflection, but it turned out that the config and scheduler instance variables get overwritten by the assignments shown below, so I am not sure about the order.

@Override
	protected void onInit() {
		super.onInit();
		this.config =
				new ConfigsBuilder(buildStreamTracker(),
						this.consumerGroup,
						this.kinesisClient,
						this.dynamoDBClient,
						this.cloudWatchClient,
						this.workerId,
						this.recordProcessorFactory);
	}


@Override
	protected void doStart() {
		super.doStart();

		if (ListenerMode.batch.equals(this.listenerMode) && CheckpointMode.record.equals(this.checkpointMode)) {
			this.checkpointMode = CheckpointMode.batch;
			logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] "
					+ "because it does not make sense in case of [ListenerMode.batch].");
		}

		LifecycleConfig lifecycleConfig =  this.config.lifecycleConfig();
		lifecycleConfig.taskBackoffTimeMillis(this.consumerBackoff);

		RetrievalSpecificConfig retrievalSpecificConfig;
		String singleStreamName = this.streams.length == 1 ? this.streams[0] : null;
		if (this.fanOut) {
			retrievalSpecificConfig =
					new FanOutConfig(this.kinesisClient)
							.applicationName(this.consumerGroup)
							.streamName(singleStreamName);
		}
		else {
			retrievalSpecificConfig =
					new PollingConfig(this.kinesisClient)
							.streamName(singleStreamName);
		}

		RetrievalConfig retrievalConfig = this.config.retrievalConfig()
				.glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer)
				.retrievalSpecificConfig(retrievalSpecificConfig);

		MetricsConfig metricsConfig = this.config.metricsConfig();
		metricsConfig.metricsLevel(this.metricsLevel);

		this.scheduler =
				new Scheduler(
						this.config.checkpointConfig(),
						this.config.coordinatorConfig(),
						this.config.leaseManagementConfig(),
						lifecycleConfig,
						metricsConfig,
						this.config.processorConfig(),
						retrievalConfig);

		this.executor.execute(this.scheduler);
	}

@artembilan
Member

Right. That's what I meant by "not a self-contained entity".
The this.config.leaseManagementConfig() call does this:

    public LeaseManagementConfig leaseManagementConfig() {
        return new LeaseManagementConfig(tableName(), dynamoDBClient(), kinesisClient(), workerIdentifier());
    }

So we have to call this first, then customize the result, and only after that pass it down to the Scheduler:

		LeaseManagementConfig leaseManagementConfig = this.config.leaseManagementConfig();
		this.leaseManagementConfigCustomizer.accept(leaseManagementConfig);
		
		this.scheduler =
				new Scheduler(
						this.config.checkpointConfig(),
						coordinatorConfig,
						leaseManagementConfig,
						lifecycleConfig,
						metricsConfig,
						this.config.processorConfig(),
						retrievalConfig);
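The create, customize, pass-down sequence can be sketched in isolation as follows. `LeaseSettings`, `Builder`, and `Worker` are hypothetical stand-ins for LeaseManagementConfig, ConfigsBuilder, and Scheduler; only the three-step order is taken from the snippet above.

```java
import java.util.function.Consumer;

// Stand-in types only: these are NOT the real KCL classes.
final class CustomizeThenBuildDemo {

    /** Hypothetical stand-in for LeaseManagementConfig. */
    static final class LeaseSettings {
        int maxLeasesForWorker = Integer.MAX_VALUE;
    }

    /** Hypothetical stand-in for ConfigsBuilder: returns a new object per call. */
    static final class Builder {
        LeaseSettings leaseManagementConfig() {
            return new LeaseSettings();
        }
    }

    /** Hypothetical stand-in for Scheduler: keeps the settings it was given. */
    static final class Worker {
        final LeaseSettings leaseSettings;

        Worker(LeaseSettings leaseSettings) {
            this.leaseSettings = leaseSettings;
        }
    }

    static Worker createWorker(Builder builder, Consumer<LeaseSettings> customizer) {
        LeaseSettings leaseSettings = builder.leaseManagementConfig(); // 1. create once
        customizer.accept(leaseSettings);                               // 2. customize that instance
        return new Worker(leaseSettings);                               // 3. pass the same instance down
    }

    public static void main(String[] args) {
        Worker worker = createWorker(new Builder(), s -> s.maxLeasesForWorker = 10);
        System.out.println(worker.leaseSettings.maxLeasesForWorker); // prints 10
    }
}
```

Holding the instance in a local variable is what makes the customization stick; calling the factory method again in step 3 would hand over an uncustomized object.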

@amitchidrewar1301
Author

Thanks, @artembilan, for explaining the changes and sorting out the issue. I am looking forward to the changes in the next release.

@amitchidrewar1301
Author

@artembilan

spring-projects/spring-integration-aws@c113391#r147137622

  • Can you also please add support for retrievalConfig and the other configs?
  • We have use cases where we need to update the configs below:
        retrievalConfig.retrievalSpecificConfig(
            new PollingConfig(this.kinesisClient).maxRecords(...));

        retrievalConfig.initialPositionInStreamExtended(
            InitialPositionInStreamExtended.newInitialPosition(..));
  • Also, is there any way to override the lease table name?

@artembilan
Member

I have answered you over there for now.
The initialPositionInStreamExtended is deprecated:

    /**
     * The location in the shard from which the KinesisClientLibrary will start fetching records from
     * when the application starts for the first time and there is no checkpoint for the shard.
     *
     * <p>
     * Default value: {@link InitialPositionInStream#LATEST}
     * </p>
     *
     * @deprecated Initial stream position is now handled by {@link StreamTracker}.
     * @see StreamTracker#orphanedStreamInitialPositionInStream()
     * @see StreamTracker#createStreamConfig(StreamIdentifier)
     */
    @Deprecated
    private InitialPositionInStreamExtended initialPositionInStreamExtended = InitialPositionInStreamExtended
            .newInitialPosition(InitialPositionInStream.LATEST);

Use KclMessageDrivenChannelAdapter.setStreamInitialSequence(InitialPositionInStreamExtended streamInitialSequence) instead.

KCL really recommends using fan-out mode rather than polling.
Therefore maxRecords, which is set on PollingConfig, does not make sense.

@artembilan
Member

The fix is in.
Please consider testing your project with the latest Kinesis Binder 4.0.4-SNAPSHOT.
Once I get a 👍 from you, I'll be free to release it as soon as possible.
Thank you for all your valuable feedback!
