Skip to content

Commit

Permalink
YQ-3933 Fix FlagTrackDelivery / to stable (#12232)
Browse files (browse the repository at this point in the history)
  • Loading branch information
kardymonds authored Dec 3, 2024
1 parent 57b3c20 commit 036a7ce
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 21 deletions.
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/common/retry_queue.h
Original file line number | Diff line number | Diff line change
Expand Up @@ -89,7 +89,7 @@ class TRetryEventsQueue {
void Send(THolder<T> ev, ui64 cookie = 0) {
if (LocalRecipient) {
LastSentDataTime = TInstant::Now();
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), /* flags */ 0, cookie));
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), /* flags */ NActors::IEventHandle::FlagTrackDelivery, cookie));
return;
}

Expand Down
63 changes: 43 additions & 20 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -26,7 +26,7 @@
@pytest.fixture
def kikimr(request):
kikimr_conf = StreamingOverKikimrConfig(
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(3)}
)
kikimr = StreamingOverKikimr(kikimr_conf)
kikimr.compute_plane.fq_config['row_dispatcher']['enabled'] = True
Expand Down Expand Up @@ -686,39 +686,41 @@ def test_restart_compute_node(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_restart_compute_node")
self.init_topics("test_restart_compute_node", partitions_count=4)

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));'''

query_id = start_yds_query(kikimr, client, sql)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 4)

data = ['{"time": 101, "data": "hello1"}', '{"time": 102, "data": "hello2"}']
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(100, 102)], "partition_key1")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(102, 104)], "partition_key2")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(104, 106)], "partition_key3")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(106, 108)], "partition_key4")

self.write_stream(data)
expected = ['101', '102']
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
expected = [Rf'''{c}''' for c in range(100, 108)]
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
)

wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)

node_index = 2
logging.debug("Restart compute node {}".format(node_index))
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
kikimr.compute_plane.wait_bootstrap(node_index)

data = ['{"time": 103, "data": "hello3"}', '{"time": 104, "data": "hello4"}']
self.write_stream(data)
expected = ['103', '104']
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(108, 110)], "partition_key1")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(110, 112)], "partition_key2")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(112, 114)], "partition_key3")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(114, 116)], "partition_key4")

expected = [Rf'''{c}''' for c in range(108, 116)]
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
)
Expand All @@ -729,10 +731,31 @@ def test_restart_compute_node(self, kikimr, client):
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
kikimr.compute_plane.wait_bootstrap(node_index)

data = ['{"time": 105, "data": "hello5"}', '{"time": 106, "data": "hello6"}']
self.write_stream(data)
expected = ['105', '106']
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(116, 118)], "partition_key1")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(118, 120)], "partition_key2")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(120, 122)], "partition_key3")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(122, 124)], "partition_key4")

expected = [Rf'''{c}''' for c in range(116, 124)]
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)

node_index = 3
logging.debug("Restart compute node {}".format(node_index))
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
kikimr.compute_plane.wait_bootstrap(node_index)

write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(124, 126)], "partition_key1")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(126, 128)], "partition_key2")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(128, 130)], "partition_key3")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(130, 132)], "partition_key4")

expected = [Rf'''{c}''' for c in range(124, 132)]
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected

stop_yds_query(client, query_id)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
Expand Down

0 comments on commit 036a7ce

Please sign in to comment.