
Introduce origin event tracking mechanism #424

Open · wants to merge 10 commits into main

Conversation

agavra (Contributor) commented Feb 17, 2025:

This patch has a few changes:

  • It introduces the mechanism for counting origin events via OriginEventRecorder, which inserts headers as records are consumed if (and only if) they do not already contain the origin-event marker.
  • It adds a UsageBasedV1 LicenseInfo that acts as a placeholder for our usage-based license until we introduce it. This is necessary so that existing licenses don't suddenly start reporting origin events / adding the headers.
  • It adds reporting of origin events to a server, a TestLicenseServer that imitates a license server, and integration tests to make sure the mechanism works with complicated subtopologies.

These are done in three separate commits if you want to review one at a time.

@@ -97,8 +88,6 @@

public class ResponsiveKafkaStreams extends KafkaStreams {

  private static final String SIGNING_KEYS_PATH = "/responsive-license-keys/license-keys.json";
Contributor Author:

All of this License stuff was moved to LicenseUtils

Comment on lines 95 to 100
  final var header = record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY);
  if (header == null) {
    record.headers().add(ORIGIN_EVENT_HEADER_KEY, ORIGIN_EVENT_MARK);
    inc(new TopicPartition(record.topic(), record.partition()));
  }
}
Contributor Author:

this is the main part of the PR: it records a new origin event and marks the record with the origin-event header

for (final ConsumerRecord<K, V> record : records) {
  final var header = record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY);
  if (header == null) {
    record.headers().add(ORIGIN_EVENT_HEADER_KEY, ORIGIN_EVENT_MARK);
Contributor:

do these headers actually get propagated down to the producer when it writes? I wonder if it would make more sense to just add the headers from the producer itself to prevent the header from being removed/messed with by streams

Contributor:

On a related note, don't we need to filter out changelog records? Though I suppose they are only read by the restore consumer.

agavra (Contributor Author) commented Feb 21, 2025:

They do get propagated (I have a test for most of the DSL operators to confirm) but I think you're right we might as well just add the headers from the producers anyway.

That would also fix the changelog record problem since we'll produce changelog records with the header anyway.
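
For illustration, one way the producer-side marking could look, sketched here with Kafka's standard ProducerInterceptor rather than whatever producer wrapper the PR actually uses. The header constants are placeholders standing in for the ones defined alongside OriginEventRecorder:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class OriginEventProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

  // placeholder values; the real constants live alongside OriginEventRecorder
  private static final String ORIGIN_EVENT_HEADER_KEY = "origin-event";
  private static final byte[] ORIGIN_EVENT_MARK = new byte[] {1};

  @Override
  public ProducerRecord<K, V> onSend(final ProducerRecord<K, V> record) {
    // stamp only records that don't already carry the marker, so repartition
    // and changelog writes downstream of a marked record aren't re-counted
    if (record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY) == null) {
      record.headers().add(ORIGIN_EVENT_HEADER_KEY, ORIGIN_EVENT_MARK);
    }
    return record;
  }

  @Override
  public void onAcknowledgement(final RecordMetadata metadata, final Exception exception) {
    // no-op: marking happens entirely on send
  }

  @Override
  public void close() {}

  @Override
  public void configure(final Map<String, ?> configs) {}
}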

final var header = record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY);
if (header == null) {
  record.headers().add(ORIGIN_EVENT_HEADER_KEY, ORIGIN_EVENT_MARK);
  inc(new TopicPartition(record.topic(), record.partition()));
Contributor:

I think if a task gets restarted within a single stream thread without the task being unassigned, then we'll double-count origin events. I'm not sure of the exact circumstances where that happens, but I'm pretty sure there are a few paths that close/suspend a task and then revive it - one I know of is when an EOS task is corrupted. One option here would be to track the last observed offset for each partition, and only bump the count if the polled record is a new offset.
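
For illustration, a minimal sketch of that option (names are hypothetical; `inc` is the counter from the snippet under review, and the PR ultimately solved this differently with the offset tracker discussed below):

// remember the highest offset polled per partition and only count records
// that advance it, so a revived task replaying records doesn't double-count
private final Map<TopicPartition, Long> lastPolledOffset = new HashMap<>();

private void maybeCountOriginEvent(final ConsumerRecord<?, ?> record) {
  final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
  final long prev = lastPolledOffset.getOrDefault(tp, -1L);
  if (record.offset() > prev) {
    lastPolledOffset.put(tp, record.offset());
    inc(tp); // only count offsets we haven't polled before
  }
}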

Contributor Author:

This is fixed with the new offset tracker


@Override
public void onCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
  final var now = System.currentTimeMillis();
Contributor:

The offsets being committed aren't necessarily what's been polled so far. Streams puts the polled records into its own buffer, and can commit at any time, even if the internal buffer isn't drained. So technically we should only be reporting origin events up to the offsets being committed. How to do that is tricky. Need to think about it some more.

Contributor Author:

OK, I fixed this (and I believe the previous one as well) by tracking origin events in a bitset instead of just a long, and then counting that bitset on commit. It's a bit heavyweight, but I think it should be fine: even if we track 100K offsets per commit we only need ~1.5K longs, which is hardly any memory overhead, and ~1.5K bitwise ops to count the number of 1s shouldn't take much either. PTAL
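
For concreteness, a minimal sketch of that bitset scheme, built from the `mark`/`countAndShift` names quoted later in this thread. The class shape here is hypothetical; the PR's actual tracker also logs and validates more carefully:

import java.util.BitSet;

// offsets are stored relative to a base offset so the BitSet stays small
public class OriginEventOffsetTracker {
  private final BitSet originEvents = new BitSet();
  private long baseOffset;

  public OriginEventOffsetTracker(final long baseOffset) {
    this.baseOffset = baseOffset;
  }

  // flag a polled offset as an origin event
  public void mark(final long offset) {
    if (offset < baseOffset) {
      throw new IllegalArgumentException("offset " + offset + " precedes base " + baseOffset);
    }
    originEvents.set(Math.toIntExact(offset - baseOffset));
  }

  // count origin events below the committed offset, then re-base the bitset
  // so marks for polled-but-uncommitted records are carried forward
  // (assumes committedOffset >= baseOffset, i.e. commits advance monotonically)
  public int countAndShift(final long committedOffset) {
    final int upTo = Math.toIntExact(committedOffset - baseOffset);
    final int count = originEvents.get(0, upTo).cardinality();
    final BitSet remaining = originEvents.get(upTo, Math.max(upTo, originEvents.length()));
    originEvents.clear();
    originEvents.or(remaining);
    baseOffset = committedOffset;
    return count;
  }
}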

public void checkLicense(final LicenseInfo licenseInfo) {
  if (licenseInfo instanceof TimedTrialV1) {
    verifyTimedTrialV1((TimedTrialV1) licenseInfo);
    LOG.info("Checked and confirmed valid Time Trial license");
  } else if (licenseInfo instanceof UsageBasedV1) {
    LOG.info("Checked and confirmed valid Usage Based license");
Contributor:

is this where we will check that the license is valid eventually?

Contributor Author:

yeah, this is where we could issue the remote check if we wanted to

@@ -17,5 +17,6 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.dev.responsive.kafka.api.async=DEBUG
#log4j.logger.dev.responsive.kafka.internal.clients.OriginEventRecorder=DEBUG
Contributor:

did you mean to include this?

Contributor Author:

Yeah I did, I was using it earlier and figured I'd probably be turning it off and on. I can remove it though if it bothers our perfectionism 😆


public void mark(final long offset) {
  if (offset < baseOffset) {
    throw new IllegalArgumentException(
ableegoldman (Contributor) commented Feb 22, 2025:

nit: can we log an error before throwing that also includes the offset values (here and below)?
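
Something like this, sketched against the quoted snippet (assuming the class has an slf4j LOG like the license code above):

// hedged sketch of the nit: surface the offending values before throwing
if (offset < baseOffset) {
  LOG.error("Cannot mark offset {} below the current base offset {}", offset, baseOffset);
  throw new IllegalArgumentException(
      "offset " + offset + " is below base offset " + baseOffset);
}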

Comment on lines +17 to +21
/**
 * This class allows us to efficiently count the number of events
 * between two offsets that match a certain condition. This is somewhat
 * memory efficient in that we can track 100K offsets with ~1.5K longs
 * (64 bits per long), or roughly 12KB.
Contributor:

I'm a little confused by the point of this class. This is just counting the number of origin events since the last commit, right? It doesn't seem like we ever expose what the actual origin event offsets are, so why do we need to save them in this BitSet scheme -- ie why not just use an int that increments in #mark and resets/returns in #countAndShift?

Lmk if you'd rather sync online because I'm sure I'm missing something here 🙂

Contributor Author:

This class should definitely have javadoc explaining it; originally I had implemented what you're saying, but Rohan pointed out these two concerns: #424 (comment) and #424 (comment), both of which are fixed by this bitset tracker.
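
To tie the thread together, a hedged sketch of how the commit hook might drain such a tracker per partition; `trackers` and `report` are hypothetical stand-ins for the PR's actual plumbing:

// hypothetical glue: one tracker per input partition, drained on commit
private final Map<TopicPartition, OriginEventOffsetTracker> trackers = new HashMap<>();

@Override
public void onCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
  offsets.forEach((tp, committed) -> {
    final OriginEventOffsetTracker tracker = trackers.get(tp);
    if (tracker != null) {
      // count only origin events below the committed offset, per the
      // polled-vs-committed discussion earlier in this thread
      report(tp, tracker.countAndShift(committed.offset()));
    }
  });
}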
