[WIP] Support multi table for routine load #3

Closed · wants to merge 13 commits
142 changes: 114 additions & 28 deletions docs/en/docs/data-operate/import/import-way/routine-load-manual.md
@@ -142,40 +142,126 @@ CREATE ROUTINE LOAD example_db.test1 ON example_tbl
>
>"strict_mode" = "true"

3. Example of importing data in Json format
[3. Example of importing data in Json format](#Example_of_importing_data_in_Json_format)

Routine Load only supports the following two types of json formats
Routine Load supports only the following two JSON formats:

The first one has only one record and is a json object.
- A single record as a JSON object:

```json
{"category":"a9jadhx","author":"test","price":895}
When using **single table import** (that is, specifying the table name through ON TABLE_NAME), the json data format is as follows.
```
{"category":"a9jadhx","author":"test","price":895}
```
When using **dynamic/multi-table import** (i.e. no specific table name is given), the JSON data format is as follows.

```
table_name|{"category":"a9jadhx","author":"test","price":895}
```


Assuming we need to import data into two tables, `user_address` and `user_info`, the message format is as follows.

e.g. user_address data format:

```
user_address|{"user_id":128787321878,"address":"XXX Chaoyang Building, Chaoyang District","timestamp":1589191587}
```
e.g. user_info data format:
```
user_info|{"user_id":128787321878,"name":"Zhang San","age":18,"timestamp":1589191587}
```
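
For reference, the two target tables above might be defined in Doris roughly as follows. This is a minimal sketch only: the column types, key model, bucket count, and `replication_num` setting are assumptions for illustration, and the actual table-creation statements are given later in this document.

```sql
-- Hypothetical schemas for the two target tables; all types and properties are assumed.
CREATE TABLE example_db.user_address
(
    `user_id`   BIGINT        COMMENT "user id carried in the Kafka message",
    `address`   VARCHAR(256)  COMMENT "user address",
    `timestamp` BIGINT        COMMENT "event time in seconds"
)
DUPLICATE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 10
PROPERTIES ("replication_num" = "1");

CREATE TABLE example_db.user_info
(
    `user_id`   BIGINT       COMMENT "user id carried in the Kafka message",
    `name`      VARCHAR(64)  COMMENT "user name",
    `age`       INT          COMMENT "user age",
    `timestamp` BIGINT       COMMENT "event time in seconds"
)
DUPLICATE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 10
PROPERTIES ("replication_num" = "1");
```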

The second one is a json array, which can contain multiple records

```json
[
{
"category":"11",
"author":"4avc",
"price":895,
"timestamp":1589191587
},
{
"category":"22",
"author":"2avc",
"price":895,
"timestamp":1589191487
},
{
"category":"33",
"author":"3avc",
"price":342,
"timestamp":1589191387
}
]
- The second type is a JSON array that can contain multiple records.

When using **single table import** (that is, specifying the table name through ON TABLE_NAME), the json data format is as follows.

```json
[
{
"category":"11",
"author":"4avc",
"price":895,
"timestamp":1589191587
},
{
"category":"22",
"author":"2avc",
"price":895,
"timestamp":1589191487
},
{
"category":"33",
"author":"3avc",
"price":342,
"timestamp":1589191387
}
]
```
When using **dynamic/multi-table import** (i.e. no specific table name is given), the JSON data format is as follows.

```
table_name|[
{
"user_id":128787321878,
"address":"Los Angeles, CA, USA",
"timestamp":1589191587
},
{
"user_id":128787321878,
"address":"Los Angeles, CA, USA",
"timestamp":1589191587
},
{
"user_id":128787321878,
"address":"Los Angeles, CA, USA",
"timestamp":1589191587
}
]
```
Similarly, taking the tables `user_address` and `user_info` as examples, the message format would be as follows.

e.g. user_address data format:
```
user_address|[
{
"user_id":128787321878,
"address":"Los Angeles, CA, USA",
"timestamp":1589191587
},
{
"user_id":128787321878,
"address":"Los Angeles, CA, USA",
"timestamp":1589191487
},
{
"user_id":128787321878,
"address":"Los Angeles, CA, USA",
"timestamp":1589191387
}
]
```
e.g. user_info data format:
```
user_info|[
{
"user_id":128787321878,
"name":"Zhang San",
"age":18,
"timestamp":1589191587
},
{
"user_id":128787321878,
"name":"Zhang San",
"age":18,
"timestamp":1589191487
},
{
"user_id":128787321878,
"name":"Zhang San",
"age":18,
"timestamp":1589191387
}
]
```
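
Once the two Doris tables have been created (see the table-creation step below), a single dynamic (multi-table) routine load job can consume both kinds of messages from the same topic, because the target table is taken from the `table_name|` prefix of each record rather than from an `ON` clause. The following is only a sketch under that assumption; the job name and the broker, topic, and property values are placeholders.

```sql
-- Sketch of a dynamic multi-table routine load job: no ON clause is given,
-- so each record's target table is resolved from its "table_name|" prefix.
CREATE ROUTINE LOAD example_db.multi_table_example
PROPERTIES
(
    "format" = "json",
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```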

Create the Doris data table to be imported

@@ -34,12 +34,12 @@ CREATE ROUTINE LOAD

The Routine Load function allows users to submit a resident import task, and import data into Doris by continuously reading data from a specified data source.

Currently, only data in CSV or Json format can be imported from Kafka, either without authentication or via SSL authentication.
Currently, only data in CSV or Json format can be imported from Kafka, either without authentication or via SSL authentication. [Example of importing data in Json format](../../../../data-operate/import/import-way/routine-load-manual.md#Example_of_importing_data_in_Json_format)

Syntax:

```sql
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
CREATE ROUTINE LOAD [db.]job_name [ON tbl_name]
[merge_type]
[load_properties]
[job_properties]
@@ -53,12 +53,23 @@ FROM data_source [data_source_properties]

- `tbl_name`

Specifies the name of the table to be imported.
Specifies the name of the table to be imported. This is an optional parameter. If it is not specified, the dynamic (multi-)table method is used, which requires the data in Kafka to contain table name information. Currently, the table name can only be obtained from the Kafka value, and for JSON data it must follow the format `table_name|{"col1": "val1", "col2": "val2"}`, where `table_name` is the target table name and `|` is the delimiter between the table name and the table data. The same format applies to CSV data, e.g. `table_name|val1,val2,val3`. Note that `table_name` must exactly match a table name in Doris; otherwise the import may fail.

Tips: The `columns_mapping` parameter is not supported for dynamic tables. If your source data structure matches the table structure in Doris and there is a large number of tables to import, this method is the best choice.

- `merge_type`

Data merge type. The default is APPEND, which means that the imported data are ordinary append write operations. The MERGE and DELETE types are only available for Unique Key model tables. The MERGE type needs to be used with the [DELETE ON] statement to mark the Delete Flag column. The DELETE type means that all imported data are deleted data.

Tips: When using dynamic multiple tables, make sure this parameter is consistent with the model of each dynamic table; otherwise the import will fail.

- load_properties

Used to describe imported data. The composition is as follows:
@@ -85,31 +96,43 @@ FROM data_source [data_source_properties]

`(k1, k2, tmpk1, k3 = tmpk1 + 1)`

Tips: This parameter is not supported for dynamic multiple tables.

- `preceding_filter`

Filter raw data. For a detailed introduction to this part, you can refer to the [Column Mapping, Transformation and Filtering] document.

Tips: When using dynamic multiple tables, make sure this parameter is valid for each dynamic table; otherwise the import will fail.

- `where_predicates`

Filter imported data based on conditions. For a detailed introduction to this part, you can refer to the [Column Mapping, Transformation and Filtering] document.

`WHERE k1 > 100 and k2 = 1000`

Tips: When using dynamic multiple tables, make sure this parameter is valid for each dynamic table; otherwise the import will fail.

- `partitions`

Specify in which partitions of the import destination table. If not specified, it will be automatically imported into the corresponding partition.

`PARTITION(p1, p2, p3)`

Tips: When using dynamic multiple tables, make sure this parameter is valid for each dynamic table; otherwise the import may fail.

- `DELETE ON`

It needs to be used with the MERGE import mode, and only applies to tables of the Unique Key model. Used to specify the columns and calculated relationships in the imported data that represent the Delete Flag.

`DELETE ON v3 >100`

Tips: When using dynamic multiple tables, make sure this parameter is valid for each dynamic table; otherwise the import may fail.

- `ORDER BY`

Only for tables of the Unique Key model. Used to specify the column in the imported data that represents the Sequence Col, mainly to ensure data order during import.

Tips: When using dynamic multiple tables, make sure this parameter is valid for each dynamic table; otherwise the import may fail.

- `job_properties`

@@ -356,7 +379,31 @@ FROM data_source [data_source_properties]
);
````

2. Create a Kafka routine import task named test1 for example_tbl of example_db. Import tasks are in strict mode.
2. Create a Kafka routine load task named "test1" for "example_db" that dynamically imports into multiple tables. Specify group.id and client.id, automatically consume all partitions, and subscribe from the position where data exists (OFFSET_BEGINNING).

Suppose we need to import data from Kafka into the tables "test1" and "test2" in "example_db". We create a single routine load task named "test1", and write the data for both "test1" and "test2" to a Kafka topic named "my_topic", so that one routine load job imports the data from Kafka into both tables.

```sql
CREATE ROUTINE LOAD example_db.test1
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```

3. Create a Kafka routine import task named test1 for example_tbl of example_db. Import tasks are in strict mode.



@@ -382,7 +429,7 @@ FROM data_source [data_source_properties]
);
````

3. Import data from the Kafka cluster through SSL authentication. Also set the client.id parameter. The import task is in non-strict mode and the time zone is Africa/Abidjan
4. Import data from the Kafka cluster through SSL authentication. Also set the client.id parameter. The import task is in non-strict mode and the time zone is Africa/Abidjan



@@ -412,7 +459,7 @@ FROM data_source [data_source_properties]
);
````

4. Import data in Json format. By default, the field names in Json are used as the column name mapping. Specify importing partitions 0, 1, and 2, with all starting offsets set to 0
5. Import data in Json format. By default, the field names in Json are used as the column name mapping. Specify importing partitions 0, 1, and 2, with all starting offsets set to 0



@@ -437,7 +484,7 @@ FROM data_source [data_source_properties]
);
````

5. Import Json data, extract fields through Jsonpaths, and specify the root node of the Json document
6. Import Json data, extract fields through Jsonpaths, and specify the root node of the Json document



@@ -465,7 +512,7 @@ FROM data_source [data_source_properties]
);
````

6. Create a Kafka routine import task named test1 for example_tbl of example_db, and use conditional filtering.
7. Create a Kafka routine import task named test1 for example_tbl of example_db, and use conditional filtering.



@@ -492,7 +539,7 @@ FROM data_source [data_source_properties]
);
````

7. Import data into a Unique Key model table with a sequence column
8. Import data into a Unique Key model table with a sequence column



@@ -516,7 +563,7 @@ FROM data_source [data_source_properties]
);
````

8. Consume from a specified point in time
9. Consume from a specified point in time



@@ -49,7 +49,8 @@ Result description:
PauseTime: The last job pause time
EndTime: Job end time
DbName: corresponding database name
TableName: corresponding table name
TableName: The name of the corresponding table (for dynamic multi-table loads, the specific table names are not shown; the value is displayed uniformly as "multi-table")
IsMultiTbl: Indicates whether this is a multi-table (dynamic table) load
State: job running state
DataSourceType: Data source type: KAFKA
CurrentTaskNum: The current number of subtasks