InLong Component
InLong Sort
Motivation
In the existing data synchronization pipeline, snapshot data and incremental data are sent to Kafka first and then streamed into Iceberg by Flink. Direct consumption of the snapshot data leads to problems such as high throughput and serious disorder (records land in partitions randomly), which cause write performance degradation and throughput glitches. The job often crashes because of the memory limit.
Proposed Changes
Here we can see that serious disorder and high throughput degrade write performance and put pressure on memory.
Rate limit
In this case, the write.rate.limit option can be turned on to ensure smooth writing, at the cost of lower throughput.
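As a rough illustration of how such a limit could be enforced, the sketch below uses Guava's RateLimiter inside a Flink RichMapFunction placed in front of the writer; the global budget is split evenly across subtasks so that their sum equals write.rate.limit. The class name and wiring are hypothetical, not the actual implementation:

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical throttling operator: all subtasks together consume at most
// 'globalRateLimit' records per second.
public class RateLimitedPassThrough<T> extends RichMapFunction<T, T> {
    private final double globalRateLimit;   // records/sec for the whole job
    private transient RateLimiter limiter;

    public RateLimitedPassThrough(double globalRateLimit) {
        this.globalRateLimit = globalRateLimit;
    }

    @Override
    public void open(Configuration parameters) {
        // Split the global budget evenly so the sum over subtasks equals the limit.
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        this.limiter = RateLimiter.create(globalRateLimit / parallelism);
    }

    @Override
    public T map(T record) {
        limiter.acquire();  // blocks until a permit is available, smoothing the write rate
        return record;
    }
}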
Memory and disk map
In the scenario above, we found that insertRowsMap occupies around 30% of the memory. It is an on-heap map; if we replace it with a map that spills from memory to disk, we can reduce memory pressure.
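A minimal sketch of what the disk-backed replacement could look like, assuming RocksDB's Java API (the class name is hypothetical, and serializing the row keys and values to byte arrays is omitted):

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Hypothetical map that keeps entries on disk via RocksDB instead of on the heap.
public class DiskBackedMap implements AutoCloseable {
    private final RocksDB db;

    public DiskBackedMap(String path) throws RocksDBException {
        RocksDB.loadLibrary();
        Options options = new Options().setCreateIfMissing(true);
        this.db = RocksDB.open(options, path);
    }

    public void put(byte[] key, byte[] value) throws RocksDBException {
        db.put(key, value);  // entry lives in RocksDB's SST files, not the heap
    }

    public byte[] get(byte[] key) throws RocksDBException {
        return db.get(key);  // returns null when the key is absent
    }

    @Override
    public void close() {
        db.close();
    }
}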
Example Usage
The rate limit parameter can be put into either table options or table parameters.
For example, you can create an Iceberg table in the Flink SQL Client's in-memory catalog:
CREATE TABLE inlong_iceberg13 (
id bigint,
name string,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector'='iceberg-inlong',
'catalog-name'='hive_prod',
'catalog-database'='inlong_db',
'catalog-table'='inlong_table',
'uri'='thrift://localhost:9083',
'warehouse'='hdfs://localhost:8020/hive/warehouse',
'write.rate.limit' = '1000'
);
Here 'write.rate.limit' = '1000' means that when sinking into this table, at most 1000 records per second are consumed across all subtasks in total.
Alternatively, you can create the table in a Hive catalog and set write.rate.limit as a table parameter.
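For example, the parameter could also be set on an existing table from a Flink job; this is a sketch, and the table identifier is illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical usage: adjust write.rate.limit on an existing Iceberg table.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
    "ALTER TABLE hive_prod.inlong_db.inlong_table SET ('write.rate.limit' = '1000')");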
Error Handling
Case 1:
Q: Will checkpoint triggering be affected when the flow is rate-limited? Could the operators downstream of the checkpoint barrier be unable to process it for a long time, causing the checkpoint to time out?
A: There will be some impact, but it is not serious.
a. When downstream data processing is rate-limited, buffers back up and back pressure builds, so the upstream source will delay or even suspend pulling data from its splits.
b. The barrier is inserted synchronously into all downstream channels when the CheckpointEvent is executed in the mailbox.
c. The barrier is sent as an event, so it is not suspended by insufficient buffer memory, but it still queues behind the data already buffered.
d. That is to say, once a checkpoint is triggered, the downstream must process at least "backlogged network buffer bytes / average bytes per record = total backlogged records" before the barrier arrives; as long as that backlog can be drained within the timeout, the checkpoint completes.
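A back-of-the-envelope example of item d, with illustrative numbers (all values below are assumptions, not measurements):

// Hypothetical sizing check: can the backlog drain before the checkpoint timeout?
long backlogBytes   = 256L * 1024 * 1024;  // backlogged network buffers: 256 MB (assumed)
long avgRecordBytes = 1024;                // average record size: 1 KB (assumed)
long rateLimit      = 1000;                // write.rate.limit in records/sec
long backlogRecords = backlogBytes / avgRecordBytes;  // 262,144 records queued ahead of the barrier
long drainSeconds   = backlogRecords / rateLimit;     // ~262 s to drain at the limited rate
// The checkpoint timeout must exceed drainSeconds, otherwise the barrier queued
// behind the backlog is not processed in time and the checkpoint times out.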
Rollout/Adoption Plan
1. Implement rate limiting.
2. Support dynamic adjustment of the rate limit (see the sketch after this list).
3. Replace insertMap (which determines, within each checkpoint, whether a record needs to be written as a position-delete record) from an on-heap map with a RocksDB-backed map.
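For item 2, Guava's RateLimiter already allows the rate to be changed at runtime via setRate, so dynamic adjustment could be as simple as pushing a new value into the running limiter; how the new value is delivered (e.g. an updated table option) is left open here:

// Hypothetical hook: apply a new global limit to a subtask's limiter at runtime.
static void updateRateLimit(RateLimiter limiter, double newGlobalLimit, int parallelism) {
    limiter.setRate(newGlobalLimit / parallelism);  // affects subsequent acquire() calls
}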