ksql-migrations and sequential dependencies and timing issues #8752
Bumping this up as this impacts SD.
@colinhicks I believe this problem is not specific to migrations. If you run a CREATE STREAM statement (with a key/value schema), followed by a CREATE CONNECTOR statement on the CLI, this will still break the connector once it starts producing records to that topic. The connector failure would be because the existing schema for that topic (registered by ksqlDB on the CREATE STREAM statement) is not compatible with the records it's trying to produce (specifically, the connector might have certain REQUIRED columns while ksqlDB would have registered all columns as OPTIONAL). This will not be an issue if the connector produces data to that topic before the ksqlDB stream is created, because ksqlDB's schema is backward compatible with the one registered by Connect (required fields can be upgraded to optional). Overall, it's a race condition between the connector and ksqlDB trying to register schemas on the same subject in SR. The connector-followed-by-ksqlDB order works fine, but ksqlDB-followed-by-Connect will fail for the above reason. I'm wondering if this means ksqlDB should support the complete Connect schema spec. Right now, Connect and ksqlDB speak slightly different "schemas", and that seems to create friction in ways like this.
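To illustrate the incompatibility described above (a hypothetical sketch; the record and field names are made up), suppose ksqlDB registers the value subject first, with every column as a nullable union:

```json
{"type": "record", "name": "SensorValue", "fields": [
    {"name": "id", "type": ["null", "string"], "default": null}
]}
```

If the connector later tries to register a version of the schema where `id` is required:

```json
{"type": "record", "name": "SensorValue", "fields": [
    {"name": "id", "type": "string"}
]}
```

Schema Registry's default BACKWARD compatibility check rejects the second registration, since a reader with a required `id` cannot decode older records where `id` was null. In the opposite order the check passes: a nullable field can read data written with a required one, which is why connector-first works.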
I had a quick chat with @wicknicks about this issue today offline. Aggregating the ksql team's own discussions about this: …
Thanks @guozhangwang and @wicknicks!
Above all, the behavior should be deterministic, and we should systematically prevent timing issues that yield processing-time problems. Ideally, ksql returns immediate feedback when there's a problem: if I submit problematic statements, I should get an error in the response rather than having to look at either the processing log or connector task errors. Statement ordering is indeed very important with regard to CREATE SOURCE CONNECTOR and CREATE STREAM where a common topic is shared. But I am not sure we should enforce only a single ordering. Both of these workflows seem valid, for example:

1. Create the source connector first, then create a stream over the topic the connector writes to.
2. Create the stream (and the rest of an ETL pipeline) first, then create the connector that produces to its topic.

Even though our instinct may be that the first example is more common in practice, I'd like to strongly consider options that leave the relevant statement ordering up to the user. Another way to look at this: rather than leaving enforcement up to the SR and/or Connect services, what if we introduced some additional semantics and made enforcement the user's responsibility? Here's a sketch of new syntax I imagine might support the first example above:

```sql
CREATE SOURCE CONNECTOR my_source WITH ('kafka.topic'='source-input', 'output.format'='avro', ...);

ASSERT EXISTS TOPIC 'source-input' TIMEOUT 3 MINUTES;
ASSERT EXISTS SCHEMA 'source-input-value' TIMEOUT 3 MINUTES;

CREATE STREAM source_input WITH (kafka_topic='source-input', format='avro');
```

And the second example:

```sql
CREATE STREAM source_input (`col1` VARCHAR, `col2` BIGINT)
    WITH (kafka_topic='source-input', format='avro', partitions=6);

-- other statements that build out an ETL pipeline ...

ASSERT EXISTS TOPIC 'source-input' TIMEOUT 15 SECONDS;
ASSERT EXISTS SCHEMA 'source-input-value' TIMEOUT 15 SECONDS;

CREATE SOURCE CONNECTOR my_source WITH ('kafka.topic'='source-input', 'output.format'='avro', ...);
```

The idea is that the `ASSERT EXISTS` statements block until the asserted object exists, failing the script if the timeout elapses first.

Would love to get some feedback on this sketch. At the expense of introducing new syntax, it seems to offer more expressibility to users while helping maintain determinism. It would also appear to require few, if any, interdependencies with SR and Connect.
@colinhicks The …
Thanks for the feedback, @davidwmartines2! I wonder if you have a take on this as well: in the context of migrations, I wonder if these assert statements also offer a pattern for establishing a baseline state for an environment. I'm imagining a migration file containing only assertions:

```sql
-- migrations/V000001__check-prerequisites.sql

-- ensure expected topics already exist in Kafka;
-- leave out the TIMEOUT clause to use a default duration
ASSERT EXISTS TOPIC 'my-topic-1';
ASSERT EXISTS TOPIC 'my-topic-2';
-- <other topic assertions> ...
ASSERT EXISTS TOPIC 'my-topic-5';

-- we expect that a schema is already registered for my-topic-5's key and value
ASSERT EXISTS SCHEMA 'my-topic-5-key';
ASSERT EXISTS SCHEMA 'my-topic-5-value';
```

```sql
-- migrations/V000002__create-streams.sql

CREATE STREAM my_stream_1 (foo VARCHAR) WITH (kafka_topic='my-topic-1', value_format='json');
-- <other CS statements> ...
CREATE STREAM my_stream_5 WITH (kafka_topic='my-topic-5', format='protobuf');
```

An alternative pattern would be to include the specific assertion and relevant DDL statement in individual migrations:

```sql
-- migrations/V000001__create-stream-1.sql

ASSERT EXISTS TOPIC 'my-topic-1';
CREATE STREAM my_stream_1 (foo VARCHAR) WITH (kafka_topic='my-topic-1', value_format='json');
```

```sql
-- migrations/V000005__create-stream-5.sql

ASSERT EXISTS TOPIC 'my-topic-5';
ASSERT EXISTS SCHEMA 'my-topic-5-key';
ASSERT EXISTS SCHEMA 'my-topic-5-value';
CREATE STREAM my_stream_5 WITH (kafka_topic='my-topic-5', format='protobuf');
```
@colinhicks Another alternative is to use an approach similar to Snowflake's `wait_for_completion`, implemented as a property on CREATE CONNECTOR. The CREATE SOURCE CONNECTOR would then block until all topics and SR schemas are created. We could add a TIMEOUT to the CREATE CONNECTOR in case something fails. Seems simpler than adding several ASSERT statements. What do you think?
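A rough sketch of what this proposal could look like (this is proposed syntax, not anything ksqlDB supports; the property names follow the Snowflake parameter mentioned above and are assumptions):

```sql
CREATE SOURCE CONNECTOR my_source WITH (
    'connector.class'     = '...',          -- connector-specific
    'kafka.topic'         = 'source-input',
    'wait_for_completion' = 'true',         -- block until topics and schemas exist
    'timeout'             = '3 MINUTES'     -- fail if they never appear
);
```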
@colinhicks I like the explicitness of the ASSERTs. That is a nice pattern, and it makes it clear what the expected baseline state is. I think this has potential for usage outside of the connector scenario. It does require that the user know what to assert; in the case of schemas, it might not be clear what their names are. @spena It seems the wait_for_completion would do the assertion logic behind the scenes, which is a little more user-friendly. Would it be able to determine the objects and names to check for?
I like the explicitness of the `EXISTS TOPIC|STREAM|TABLE` statements. BTW, for SCHEMA, are we referring to a subject in Schema Registry or a schema entry in the ksql metastore? @spena
I also like the …
Just want to add some inputs. …
@danilomr12 The names of the topics that a source connector may produce to are not known at connector creation time. This completely depends on how a connector is implemented. The final topics may also change if certain SMTs are used (for example, a RegexRouter may add prefixes or suffixes to topic names, effectively writing records to a completely different topic than the connector intended). In your example, would it be possible to run two connectors instead of one to start seeing data in topics quickly?
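For reference, Kafka Connect's built-in `RegexRouter` SMT rewrites topic names before records are written, so the topic a stream should attach to may not match anything in the connector config (the prefix and regex here are illustrative):

```properties
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
# route every record to a prefixed copy of its original topic name
transforms.route.regex=(.*)
transforms.route.replacement=prod-$1
```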
@wicknicks You're right, I use some SMTs too. That's one option I have; it won't completely solve the topic-creation delay, but it will be much faster. While this solution is not ready, I'll test some workarounds.
@jzaralim, can we close this issue as resolved?
Yup! The …
ksql-migrations is a great tool for deploying objects to ksqlDB in a repeatable and predictable way. However, there are some cases where the desired end state (a set of connectors, streams, tables, etc.) cannot be created in a single application of a set of ksql migration scripts.
Consider this scenario:

1. Migration 1 creates a source connector; the connector creates its output topic asynchronously, some time after the statement returns.
2. Migration 2 creates a stream over that topic and fails, because the topic does not exist yet when the statement runs.

Waiting a few seconds and re-running `migrations apply` will give the connector time to create the topic, and migration 2 will then succeed.
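Sketched as migration files (the file names, topic name, and connector class here are hypothetical), the failure mode looks like this:

```sql
-- migrations/V000001__create-connector.sql
-- the connector creates its output topic asynchronously,
-- some time after the statement returns
CREATE SOURCE CONNECTOR my_source WITH (
    'connector.class' = '...',
    'kafka.topic'     = 'source-input'
);
```

```sql
-- migrations/V000002__create-stream.sql
-- fails on the first apply if the connector has not created the topic yet
CREATE STREAM source_input (col1 VARCHAR)
    WITH (kafka_topic='source-input', value_format='json');
```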
Ideally, we wouldn't need any failure/retry logic in the automated ksql-migrations process.
This is a request for guidance and/or more detailed documentation on how to handle this type of situation.