\n
\n
Altinity Sink Connector for ClickHouse
\n
For more information, visit ",
"mode": "html"
},
- "pluginVersion": "9.4.7",
+ "pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
@@ -116,7 +425,7 @@
"h": 5,
"w": 3,
"x": 2,
- "y": 1
+ "y": 14
},
"id": 4,
"options": {
@@ -124,6 +433,7 @@
"graphMode": "none",
"justifyMode": "auto",
"orientation": "auto",
+ "percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
@@ -131,10 +441,12 @@
"fields": "",
"values": false
},
+ "showPercentChange": false,
"text": {},
- "textMode": "value_and_name"
+ "textMode": "value_and_name",
+ "wideLayout": true
},
- "pluginVersion": "9.4.7",
+ "pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
@@ -180,7 +492,7 @@
"h": 5,
"w": 2,
"x": 5,
- "y": 1
+ "y": 14
},
"id": 6,
"options": {
@@ -188,6 +500,7 @@
"graphMode": "none",
"justifyMode": "auto",
"orientation": "auto",
+ "percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
@@ -195,10 +508,12 @@
"fields": "",
"values": false
},
+ "showPercentChange": false,
"text": {},
- "textMode": "auto"
+ "textMode": "auto",
+ "wideLayout": true
},
- "pluginVersion": "9.4.7",
+ "pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
@@ -247,7 +562,7 @@
"h": 5,
"w": 2,
"x": 7,
- "y": 1
+ "y": 14
},
"id": 50,
"options": {
@@ -255,6 +570,7 @@
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
+ "percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
@@ -262,9 +578,11 @@
"fields": "",
"values": false
},
- "textMode": "auto"
+ "showPercentChange": false,
+ "textMode": "auto",
+ "wideLayout": true
},
- "pluginVersion": "9.4.7",
+ "pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
@@ -279,223 +597,17 @@
}
],
"title": "Errors",
- "type": "stat"
- },
- {
- "datasource": {
- "type": "prometheus",
- "uid": "P1809F7CD0C75ACF3"
- },
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "thresholds"
- },
- "mappings": [],
- "thresholds": {
- "mode": "absolute",
- "steps": [
- {
- "color": "green",
- "value": null
- },
- {
- "color": "red",
- "value": 80
- }
- ]
- }
- },
- "overrides": []
- },
- "gridPos": {
- "h": 5,
- "w": 3,
- "x": 9,
- "y": 1
- },
- "id": 60,
- "options": {
- "colorMode": "value",
- "graphMode": "area",
- "justifyMode": "auto",
- "orientation": "auto",
- "reduceOptions": {
- "calcs": [
- "lastNotNull"
- ],
- "fields": "",
- "values": false
- },
- "textMode": "auto"
- },
- "pluginVersion": "9.4.7",
- "targets": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "P1809F7CD0C75ACF3"
- },
- "editorMode": "code",
- "expr": "clickhouse_sink_connector_uptime/1000",
- "legendFormat": "__auto",
- "range": true,
- "refId": "A"
- }
- ],
- "title": "UpTime(secs)",
- "type": "stat"
- },
- {
- "datasource": {
- "type": "prometheus",
- "uid": "P1809F7CD0C75ACF3"
- },
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "thresholds"
- },
- "custom": {
- "align": "auto",
- "cellOptions": {
- "type": "auto"
- },
- "filterable": false,
- "inspect": false
- },
- "mappings": [],
- "thresholds": {
- "mode": "absolute",
- "steps": [
- {
- "color": "green",
- "value": null
- },
- {
- "color": "red",
- "value": 80
- }
- ]
- }
- },
- "overrides": [
- {
- "matcher": {
- "id": "byName",
- "options": "topic"
- },
- "properties": [
- {
- "id": "custom.width",
- "value": 324
- }
- ]
- },
- {
- "matcher": {
- "id": "byName",
- "options": "job"
- },
- "properties": [
- {
- "id": "custom.width",
- "value": 72
- }
- ]
- },
- {
- "matcher": {
- "id": "byName",
- "options": "instance"
- },
- "properties": [
- {
- "id": "custom.width",
- "value": 109
- }
- ]
- }
- ]
- },
- "gridPos": {
- "h": 5,
- "w": 12,
- "x": 12,
- "y": 1
- },
- "id": 8,
- "options": {
- "footer": {
- "countRows": false,
- "fields": "",
- "reducer": [
- "sum"
- ],
- "show": false
- },
- "frameIndex": 0,
- "showHeader": true,
- "sortBy": []
- },
- "pluginVersion": "9.4.7",
- "targets": [
- {
- "datasource": {
- "uid": "prometheus"
- },
- "editorMode": "code",
- "exemplar": true,
- "expr": "rate(clickhouse_sink_topics_num_records_total[$__rate_interval])",
- "format": "table",
- "instant": true,
- "interval": "",
- "legendFormat": "",
- "refId": "A"
- }
- ],
- "title": "Records (Topic)",
- "type": "table"
- },
- {
- "datasource": {
- "type": "prometheus",
- "uid": "P1809F7CD0C75ACF3"
- },
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "palette-classic"
- },
- "custom": {
- "axisCenteredZero": false,
- "axisColorMode": "text",
- "axisLabel": "",
- "axisPlacement": "auto",
- "barAlignment": -1,
- "drawStyle": "bars",
- "fillOpacity": 97,
- "gradientMode": "none",
- "hideFrom": {
- "legend": false,
- "tooltip": false,
- "viz": false
- },
- "lineInterpolation": "linear",
- "lineWidth": 2,
- "pointSize": 5,
- "scaleDistribution": {
- "type": "linear"
- },
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
- "thresholdsStyle": {
- "mode": "off"
- }
+ "type": "stat"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "P1809F7CD0C75ACF3"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "thresholds"
},
"mappings": [],
"thresholds": {
@@ -515,24 +627,30 @@
"overrides": []
},
"gridPos": {
- "h": 8,
- "w": 12,
- "x": 0,
- "y": 6
+ "h": 5,
+ "w": 3,
+ "x": 9,
+ "y": 14
},
- "id": 16,
+ "id": 60,
"options": {
- "legend": {
- "calcs": [],
- "displayMode": "list",
- "placement": "bottom",
- "showLegend": true
+ "colorMode": "value",
+ "graphMode": "area",
+ "justifyMode": "auto",
+ "orientation": "auto",
+ "percentChangeColorMode": "standard",
+ "reduceOptions": {
+ "calcs": [
+ "lastNotNull"
+ ],
+ "fields": "",
+ "values": false
},
- "tooltip": {
- "mode": "single",
- "sort": "none"
- }
+ "showPercentChange": false,
+ "textMode": "auto",
+ "wideLayout": true
},
+ "pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
@@ -540,13 +658,14 @@
"uid": "P1809F7CD0C75ACF3"
},
"editorMode": "code",
- "expr": "clickhouse_sink_debezium_lag/1000",
+ "expr": "clickhouse_sink_connector_uptime/1000",
+ "legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
- "title": "Debezium -> CH Lag (secs)",
- "type": "timeseries"
+ "title": "UpTime(secs)",
+ "type": "stat"
},
{
"datasource": {
@@ -560,31 +679,22 @@
"mode": "palette-classic"
},
"custom": {
+ "axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
+ "fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
- "lineInterpolation": "linear",
"lineWidth": 1,
- "pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
"thresholdsStyle": {
"mode": "off"
}
@@ -610,20 +720,29 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 6
+ "y": 14
},
"id": 10,
"options": {
+ "barRadius": 0,
+ "barWidth": 0.97,
+ "fullHighlight": false,
+ "groupWidth": 0.7,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
+ "orientation": "auto",
+ "showValue": "auto",
+ "stacking": "none",
"tooltip": {
"mode": "single",
"sort": "none"
- }
+ },
+ "xTickLabelRotation": 0,
+ "xTickLabelSpacing": 0
},
"targets": [
{
@@ -640,45 +759,35 @@
}
],
"title": "Rate(Total Records)",
- "type": "timeseries"
+ "type": "barchart"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
- "description": "Lag calculated from time records are inserted to CH and the timestamp recorded by debezium connector",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
+ "axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
+ "fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
- "lineInterpolation": "linear",
"lineWidth": 1,
- "pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
"thresholdsStyle": {
"mode": "off"
}
@@ -704,36 +813,44 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 14
+ "y": 19
},
- "id": 14,
+ "id": 16,
"options": {
+ "barRadius": 0,
+ "barWidth": 0.97,
+ "fullHighlight": false,
+ "groupWidth": 0.7,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
+ "orientation": "auto",
+ "showValue": "auto",
+ "stacking": "none",
"tooltip": {
"mode": "single",
"sort": "none"
- }
+ },
+ "xTickLabelRotation": 0,
+ "xTickLabelSpacing": 0
},
"targets": [
{
"datasource": {
"type": "prometheus",
- "uid": "PBFA97CFB590B2093"
+ "uid": "P1809F7CD0C75ACF3"
},
"editorMode": "code",
- "expr": "clickhouse_sink_db_lag/1000",
- "legendFormat": "__auto",
+ "expr": "clickhouse_sink_debezium_lag/1000",
"range": true,
"refId": "A"
}
],
- "title": "Source DB -> CH Lag (secs)",
- "type": "timeseries"
+ "title": "Debezium -> CH Lag (secs)",
+ "type": "barchart"
},
{
"datasource": {
@@ -774,10 +891,11 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 14
+ "y": 22
},
"id": 62,
"options": {
+ "cellHeight": "sm",
"footer": {
"countRows": false,
"fields": "",
@@ -788,7 +906,7 @@
},
"showHeader": true
},
- "pluginVersion": "9.4.7",
+ "pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
@@ -836,40 +954,32 @@
},
{
"datasource": {
- "type": "vertamedia-clickhouse-datasource",
- "uid": "PDF61E9E97939C7ED"
+ "type": "prometheus",
+ "uid": "P1809F7CD0C75ACF3"
},
+ "description": "Lag calculated from time records are inserted to CH and the timestamp recorded by debezium connector",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
+ "axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "bars",
- "fillOpacity": 49,
- "gradientMode": "opacity",
+ "fillOpacity": 80,
+ "gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
- "lineInterpolation": "linear",
"lineWidth": 1,
- "pointSize": 3,
"scaleDistribution": {
"type": "linear"
},
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "normal"
- },
"thresholdsStyle": {
"mode": "off"
}
@@ -895,20 +1005,122 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 22
+ "y": 27
},
- "id": 64,
+ "id": 14,
"options": {
+ "barRadius": 0,
+ "barWidth": 0.97,
+ "fullHighlight": false,
+ "groupWidth": 0.7,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
+ "orientation": "auto",
+ "showValue": "auto",
+ "stacking": "none",
"tooltip": {
"mode": "single",
"sort": "none"
+ },
+ "xTickLabelRotation": 0,
+ "xTickLabelSpacing": 0
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "editorMode": "code",
+ "expr": "clickhouse_sink_db_lag/1000",
+ "legendFormat": "__auto",
+ "range": true,
+ "refId": "A"
}
+ ],
+ "title": "Source DB -> CH Lag (secs)",
+ "type": "barchart"
+ },
+ {
+ "datasource": {
+ "type": "vertamedia-clickhouse-datasource",
+ "uid": "PDF61E9E97939C7ED"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "thresholds"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "fillOpacity": 80,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineWidth": 1,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 35
+ },
+ "id": 64,
+ "options": {
+ "barRadius": 0,
+ "barWidth": 0.97,
+ "fullHighlight": true,
+ "groupWidth": 0.7,
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "orientation": "auto",
+ "showValue": "always",
+ "stacking": "none",
+ "tooltip": {
+ "mode": "multi",
+ "sort": "none"
+ },
+ "xField": "Time",
+ "xTickLabelRotation": 0,
+ "xTickLabelSpacing": -200
},
"pluginVersion": "9.4.7",
"targets": [
@@ -922,17 +1134,16 @@
"format": "time_series",
"formattedQuery": "SELECT $timeSeries as t, count() FROM $table WHERE $timeFilter GROUP BY t ORDER BY t",
"hide": false,
- "interval": "",
"intervalFactor": 1,
- "query": "SELECT\n toUnixTimestamp(toStartOfMinute(event_time)) * 1000 AS t,\n sum(rows)/60 as rows,\n table\nFROM system.part_log\n\nWHERE\n (event_type = 'NewPart')\n AND event_time >= toDateTime($from)\n and event_time < toDateTime($to)\nGROUP BY\n t,\n table\nORDER BY\n t,\n table",
- "rawQuery": "SELECT\n toUnixTimestamp(toStartOfMinute(event_time)) * 1000 AS t,\n sum(rows)/60 as rows,\n table\nFROM system.part_log\n\nWHERE\n (event_type = 'NewPart')\n AND event_time >= toDateTime(1684726913)\n and event_time < toDateTime(1684727213)\nGROUP BY\n t,\n table\nORDER BY\n t,\n table",
+ "query": "SELECT toStartOfInterval(event_time, toIntervalSecond(1)) AS _time_dec,\nsum(rows) as rows, table\nFROM system.part_log\nWHERE (event_type = 'NewPart') AND event_time >= toDateTime($from) and event_time < toDateTime($to)\nGROUP BY _time_dec, table",
+ "rawQuery": "SELECT toStartOfInterval(event_time, toIntervalSecond(1)) AS _time_dec,\nsum(rows) as rows, table\nFROM system.part_log\nWHERE (event_type = 'NewPart') AND event_time >= toDateTime(1681314113) and event_time < toDateTime(1681335713)\nGROUP BY _time_dec, table",
"refId": "A",
- "round": "1s",
+ "round": "0s",
"skip_comments": true
}
],
"title": "Rows inserted ",
- "type": "timeseries"
+ "type": "barchart"
},
{
"collapsed": false,
@@ -940,7 +1151,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 30
+ "y": 43
},
"id": 28,
"panels": [],
@@ -978,7 +1189,7 @@
"h": 8,
"w": 4,
"x": 0,
- "y": 31
+ "y": 44
},
"id": 30,
"options": {
@@ -986,6 +1197,7 @@
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
+ "percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
@@ -993,9 +1205,11 @@
"fields": "",
"values": false
},
- "textMode": "auto"
+ "showPercentChange": false,
+ "textMode": "auto",
+ "wideLayout": true
},
- "pluginVersion": "9.4.7",
+ "pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
@@ -1023,6 +1237,7 @@
"mode": "palette-classic"
},
"custom": {
+ "axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
@@ -1036,6 +1251,7 @@
"tooltip": false,
"viz": false
},
+ "insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
@@ -1073,7 +1289,7 @@
"h": 8,
"w": 10,
"x": 4,
- "y": 31
+ "y": 44
},
"id": 32,
"options": {
@@ -1115,6 +1331,7 @@
"mode": "palette-classic"
},
"custom": {
+ "axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
@@ -1128,6 +1345,7 @@
"tooltip": false,
"viz": false
},
+ "insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
@@ -1165,7 +1383,7 @@
"h": 8,
"w": 10,
"x": 14,
- "y": 31
+ "y": 44
},
"id": 34,
"options": {
@@ -1242,8 +1460,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -1258,7 +1475,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 39
+ "y": 52
},
"id": 48,
"options": {
@@ -1295,7 +1512,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 47
+ "y": 60
},
"id": 22,
"panels": [],
@@ -1317,27 +1534,17 @@
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
+ "fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
- "lineInterpolation": "linear",
"lineWidth": 1,
- "pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
"thresholdsStyle": {
"mode": "off"
}
@@ -1347,8 +1554,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -1363,20 +1569,29 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 48
+ "y": 61
},
"id": 24,
"options": {
+ "barRadius": 0,
+ "barWidth": 0.97,
+ "fullHighlight": false,
+ "groupWidth": 0.7,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
+ "orientation": "auto",
+ "showValue": "auto",
+ "stacking": "none",
"tooltip": {
"mode": "single",
"sort": "none"
- }
+ },
+ "xTickLabelRotation": 0,
+ "xTickLabelSpacing": 0
},
"targets": [
{
@@ -1392,7 +1607,7 @@
}
],
"title": "Memory Used(MB)",
- "type": "timeseries"
+ "type": "barchart"
},
{
"datasource": {
@@ -1409,27 +1624,17 @@
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
+ "fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
- "lineInterpolation": "linear",
"lineWidth": 1,
- "pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
"thresholdsStyle": {
"mode": "off"
}
@@ -1439,8 +1644,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -1455,20 +1659,29 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 48
+ "y": 61
},
"id": 18,
"options": {
+ "barRadius": 0,
+ "barWidth": 0.97,
+ "fullHighlight": false,
+ "groupWidth": 0.7,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
+ "orientation": "auto",
+ "showValue": "auto",
+ "stacking": "none",
"tooltip": {
"mode": "single",
"sort": "none"
- }
+ },
+ "xTickLabelRotation": 0,
+ "xTickLabelSpacing": 0
},
"targets": [
{
@@ -1483,7 +1696,7 @@
}
],
"title": "Memory Max (MB)",
- "type": "timeseries"
+ "type": "barchart"
},
{
"collapsed": false,
@@ -1491,7 +1704,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 56
+ "y": 69
},
"id": 36,
"panels": [],
@@ -1513,8 +1726,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -1529,7 +1741,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 57
+ "y": 70
},
"id": 38,
"options": {
@@ -1546,7 +1758,7 @@
},
"textMode": "auto"
},
- "pluginVersion": "9.4.7",
+ "pluginVersion": "9.1.1",
"targets": [
{
"datasource": {
@@ -1578,8 +1790,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -1594,7 +1805,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 57
+ "y": 70
},
"id": 40,
"options": {
@@ -1611,7 +1822,7 @@
},
"textMode": "auto"
},
- "pluginVersion": "9.4.7",
+ "pluginVersion": "9.1.1",
"targets": [
{
"datasource": {
@@ -1634,7 +1845,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 65
+ "y": 78
},
"id": 42,
"panels": [],
@@ -1656,8 +1867,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -1672,7 +1882,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 66
+ "y": 79
},
"id": 44,
"options": {
@@ -1689,7 +1899,7 @@
},
"textMode": "auto"
},
- "pluginVersion": "9.4.7",
+ "pluginVersion": "9.1.1",
"targets": [
{
"datasource": {
@@ -1721,29 +1931,16 @@
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
+ "fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
- "lineInterpolation": "linear",
"lineWidth": 1,
- "pointSize": 5,
"scaleDistribution": {
"type": "linear"
- },
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
- "thresholdsStyle": {
- "mode": "off"
}
},
"mappings": [],
@@ -1751,8 +1948,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -1767,20 +1963,28 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 66
+ "y": 79
},
"id": 46,
"options": {
+ "barRadius": 0,
+ "barWidth": 0.97,
+ "groupWidth": 0.7,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
+ "orientation": "auto",
+ "showValue": "auto",
+ "stacking": "none",
"tooltip": {
"mode": "single",
"sort": "none"
- }
+ },
+ "xTickLabelRotation": 0,
+ "xTickLabelSpacing": 0
},
"targets": [
{
@@ -1796,25 +2000,24 @@
}
],
"title": "Live Thread States",
- "type": "timeseries"
+ "type": "barchart"
}
],
- "refresh": "",
+ "refresh": false,
"revision": 1,
- "schemaVersion": 38,
- "style": "dark",
+ "schemaVersion": 39,
"tags": [],
"templating": {
"list": []
},
"time": {
- "from": "now-15m",
+ "from": "now-5m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "Altinity Sink Connector for ClickHouse",
- "uid": "c4cKtgk4p",
- "version": 2,
+ "uid": "c4cKtgk4k",
+ "version": 1,
"weekStart": ""
-}
+}
\ No newline at end of file
diff --git a/sink-connector/pom.xml b/sink-connector/pom.xml
index 597bb9865..146cb2413 100644
--- a/sink-connector/pom.xml
+++ b/sink-connector/pom.xml
@@ -313,7 +313,13 @@
debezium-core
${version.debezium}
-
+
+ io.debezium
+ debezium-storage-jdbc
+ ${version.debezium}
+
+
org.projectlombok
lombok
@@ -333,6 +339,11 @@
maven-artifact
3.9.1
+
+ com.zaxxer
+ HikariCP
+ 6.0.0
+
com.clickhouse
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java
index 11a9a0bd7..b4e0989a9 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java
@@ -474,7 +474,46 @@ static ConfigDef newConfigDef() {
6,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.REPLICA_STATUS_VIEW.toString())
-
+ .define(
+ ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MAX_SIZE.toString(),
+ Type.INT,
+ 500,
+ Importance.HIGH,
+ "The maximum size of the connection pool",
+ CONFIG_GROUP_CONNECTOR_CONFIG,
+ 6,
+ ConfigDef.Width.NONE,
+ ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MAX_SIZE.toString())
+ .define(
+ ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_TIMEOUT.toString(),
+ Type.LONG,
+ 50000,
+ Importance.HIGH,
+ "The timeout for the connection pool",
+ CONFIG_GROUP_CONNECTOR_CONFIG,
+ 6,
+ ConfigDef.Width.NONE,
+ ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_TIMEOUT.toString())
+ .define(
+ ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MIN_IDLE.toString(),
+ Type.INT,
+ 10,
+ Importance.HIGH,
+ "The minimum number of idle connections in the connection pool",
+ CONFIG_GROUP_CONNECTOR_CONFIG,
+ 6,
+ ConfigDef.Width.NONE,
+ ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MIN_IDLE.toString())
+ .define(
+ ClickHouseSinkConnectorConfigVariables.OFFSET_STORAGE_TABLE_NAME.toString(),
+ Type.STRING,
+ "altinity_sink_connector",
+ Importance.HIGH,
+ "The name of the offset storage table",
+ CONFIG_GROUP_CONNECTOR_CONFIG,
+ 7,
+ ConfigDef.Width.NONE,
+ ClickHouseSinkConnectorConfigVariables.OFFSET_STORAGE_TABLE_NAME.toString())
;
}
}
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
index b4fddc77c..7632bc869 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java
@@ -75,7 +75,13 @@ public enum ClickHouseSinkConnectorConfigVariables {
REPLICA_STATUS_VIEW("replica.status.view"),
MAX_QUEUE_SIZE("sink.connector.max.queue.size"),
- SINGLE_THREADED("single.threaded");
+ SINGLE_THREADED("single.threaded"),
+
+ CONNECTION_POOL_MAX_SIZE("connection.pool.max.size"),
+ CONNECTION_POOL_TIMEOUT("connection.pool.timeout"),
+ CONNECTION_POOL_MIN_IDLE("connection.pool.min.idle"),
+
+ OFFSET_STORAGE_TABLE_NAME("offset.storage.jdbc.offset.table.name");
private String label;
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Metrics.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Metrics.java
index 5dd3da440..7c64de41e 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Metrics.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/common/Metrics.java
@@ -81,17 +81,6 @@ public static void initialize(String enableFlag, String metricsPort) {
connectorStartTimeMs = System.currentTimeMillis();
- // Register reporters here.
-// reporter = ConsoleReporter.forRegistry(registry)
-// .convertRatesTo(TimeUnit.SECONDS)
-// .convertDurationsTo(TimeUnit.SECONDS)
-// .build();
-// reporter.start(1, TimeUnit.MINUTES);
-
- // registry = new MetricRegistry();
-// registry.register("memory", new MemoryUsageGaugeSet());
-// registry.register("jvm.thread-states",new ThreadStatesGaugeSet());
-// registry.register("jvm.garbage-collector",new GarbageCollectorMetricSet());
parseConfiguration(enableFlag, metricsPort);
if(enableMetrics) {
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriter.java
index 8043611a8..be65142f9 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriter.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriter.java
@@ -2,9 +2,10 @@
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
-import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDataSource;
+
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;
@@ -12,12 +13,16 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
+
+import com.zaxxer.hikari.HikariDataSource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class BaseDbWriter {
- protected ClickHouseConnection conn;
+ public static final String DATABASE_CLIENT_NAME = "Sink_Connector";
+ public static final String SYSTEM_DB = "system";
+ protected Connection conn;
private String hostName;
private Integer port;
@@ -38,7 +43,7 @@ public BaseDbWriter(
String userName,
String password,
ClickHouseSinkConnectorConfig config,
- ClickHouseConnection conn
+ Connection conn
) {
this.hostName = hostName;
@@ -72,7 +77,8 @@ public static Properties splitJdbcProperties(String jdbcProperties) {
return properties;
}
- public ClickHouseConnection getConnection() {
+ public Connection getConnection() {
+ HikariDbSource.printConnectionInfo();
return this.conn;
}
public static String getConnectionString(String hostName, Integer port, String database) {
@@ -87,11 +93,12 @@ public static String getConnectionString(String hostName, Integer port, String d
* @param userName UserName
* @param password Password
*/
- public static ClickHouseConnection createConnection(String url, String clientName, String userName, String password,
- ClickHouseSinkConnectorConfig config) {
+ public static Connection createConnection(String url, String clientName, String userName,
+ String password, String databaseName
+ , ClickHouseSinkConnectorConfig config) {
String jdbcParams = "";
- ClickHouseConnection conn = null;
+ Connection conn = null;
if(config != null) {
config.getString(ClickHouseSinkConnectorConfigVariables.JDBC_PARAMETERS.toString());
}
@@ -106,8 +113,16 @@ public static ClickHouseConnection createConnection(String url, String clientNam
Properties userProps = splitJdbcProperties(jdbcParams);
properties.putAll(userProps);
}
+ // Add username/password to the url.
+ url = url + "?user=" + userName + "&password=" + password;
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
- conn = dataSource.getConnection(userName, password);
+ // Get connection from the pool.
+ HikariDataSource hikariDbSource = HikariDbSource.getInstance(dataSource, databaseName, config);
+ // Create a new ClickHouseConnection object with the connection from the pool.
+ // Convert Connection to ClickHouseConnection.
+
+ conn = hikariDbSource.getConnection();
+ //conn = dataSource.getConnection(userName, password);
} catch (Exception e) {
log.error("Error creating ClickHouse connection" + e);
}
@@ -123,12 +138,12 @@ public static ClickHouseConnection createConnection(String url, String clientNam
* @return
* @throws SQLException
*/
- public String executeQuery(String sql) throws SQLException {
+ public String executeSystemQuery(String sql) throws SQLException {
String result = null;
if(this.conn == null) {
String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port,
database);
- conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", userName, password, config);
+ conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password, BaseDbWriter.SYSTEM_DB, config);
}
ResultSet rs = this.conn.prepareStatement(sql).executeQuery();
if(rs != null) {
@@ -137,6 +152,7 @@ public String executeQuery(String sql) throws SQLException {
}
}
+ //conn.close();
return result;
}
@@ -162,7 +178,7 @@ public ResultSet executeQueryWithResultSet(String sql) throws SQLException {
* @throws SQLException
*/
public String getClickHouseVersion() throws SQLException {
- return this.executeQuery("SELECT VERSION()");
+ return this.executeSystemQuery("SELECT VERSION()");
}
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DBMetadata.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DBMetadata.java
index e24977722..0da20d8ed 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DBMetadata.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DBMetadata.java
@@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -49,7 +50,7 @@ public String getEngine() {
* @param tableName
* @return
*/
- public MutablePair getTableEngine(ClickHouseConnection conn, String databaseName, String tableName) {
+ public MutablePair getTableEngine(Connection conn, String databaseName, String tableName) {
MutablePair result;
result = getTableEngineUsingSystemTables(conn, databaseName, tableName);
@@ -67,7 +68,7 @@ public MutablePair getTableEngine(ClickHouseConnection con
* @param databaseName
* @return
*/
- public boolean checkIfDatabaseExists(ClickHouseConnection conn, String databaseName) throws SQLException {
+ public boolean checkIfDatabaseExists(Connection conn, String databaseName) throws SQLException {
boolean result = false;
try (Statement stmt = conn.createStatement()) {
@@ -98,7 +99,7 @@ public boolean checkIfDatabaseExists(ClickHouseConnection conn, String databaseN
* @param tableName
* @return
*/
- public MutablePair getTableEngineUsingShowTable(ClickHouseConnection conn, String databaseName,
+ public MutablePair getTableEngineUsingShowTable(Connection conn, String databaseName,
String tableName) {
MutablePair result = new MutablePair<>();
@@ -197,7 +198,7 @@ else if(createDML.contains(TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine())) {
* @param tableName Table Name.
* @return TABLE_ENGINE type
*/
- public MutablePair getTableEngineUsingSystemTables(final ClickHouseConnection conn, final String database,
+ public MutablePair getTableEngineUsingSystemTables(final Connection conn, final String database,
final String tableName) {
MutablePair result = new MutablePair<>();
@@ -277,7 +278,7 @@ public boolean checkIfNewReplacingMergeTree(String currentClickHouseVersion) thr
* Function to get the column name and isNullable as key/value pair.
*/
public Map getColumnsIsNullableForTable(String tableName,
- ClickHouseConnection conn,
+ Connection conn,
String database) throws SQLException {
Map columnsIsNullable = new HashMap<>();
@@ -301,7 +302,7 @@ public Map getColumnsIsNullableForTable(String tableName,
* to get the column name and column data type as key/value pair.
*/
public Map getColumnsDataTypesForTable(String tableName,
- ClickHouseConnection conn,
+ Connection conn,
String database,
ClickHouseSinkConnectorConfig config) {
@@ -351,15 +352,22 @@ public Map getColumnsDataTypesForTable(String tableName,
/**
* Function to get the ClickHouse server timezone(Defaults to UTC)
*/
- public ZoneId getServerTimeZone(ClickHouseConnection conn) {
+ public ZoneId getServerTimeZone(Connection conn) {
ZoneId result = ZoneId.of("UTC");
if(conn != null) {
- TimeZone serverTimeZone = conn.getServerTimeZone();
- if(serverTimeZone != null) {
- result = serverTimeZone.toZoneId();
- }
+ try {
+ // Perform a query to get the server timezone
+ ResultSet rs = conn.prepareStatement("SELECT timezone()").executeQuery();
+ if (rs.next()) {
+ String serverTimeZone = rs.getString(1);
+ result = ZoneId.of(serverTimeZone);
+ }
+ rs.close();
+ } catch (Exception e) {
+ log.error("Error retrieving server timezone", e);
}
+ }
return result;
}
@@ -368,7 +376,7 @@ public ZoneId getServerTimeZone(ClickHouseConnection conn) {
* @return
*/
public Set getAliasAndMaterializedColumnsForTableAndDatabase(String tableName, String databaseName,
- ClickHouseConnection conn) throws SQLException {
+ Connection conn) throws SQLException {
Set aliasColumns = new HashSet<>();
String query = "SELECT name FROM system.columns WHERE (table = '%s') AND (database = '%s') and " +
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriter.java
index 98b9dc5c7..8d99b166b 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriter.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriter.java
@@ -7,10 +7,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
import java.util.HashMap;
import java.util.Map;
@@ -30,7 +27,7 @@ public DbKafkaOffsetWriter(
String userName,
String password,
ClickHouseSinkConnectorConfig config,
- ClickHouseConnection connection
+ Connection connection
) {
super(hostName, port, database, userName, password, config, connection);
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
index 457532268..3ab550151 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
@@ -5,7 +5,7 @@
import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAutoCreateTable;
import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseCreateDatabase;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
-import com.clickhouse.jdbc.ClickHouseConnection;
+import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.tuple.MutablePair;
@@ -13,10 +13,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
+import static io.debezium.storage.jdbc.JdbcCommonConfig.CONFIGURATION_FIELD_PREFIX_STRING;
+import static io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX;
+
/**
* Class that abstracts all functionality
* related to interacting with Clickhouse DB.
@@ -71,7 +75,7 @@ public DbWriter(
String password,
ClickHouseSinkConnectorConfig config,
ClickHouseStruct record,
- ClickHouseConnection connection
+ Connection connection
) {
// Base class initiates connection using JDBC.
super(hostName, port, database, userName, password, config, connection);
@@ -87,7 +91,12 @@ public DbWriter(
}
DBMetadata metadata = new DBMetadata();
- createOffsetSchemaHistoryDatabase();
+ String offsetStorageDatabaseName = getOffsetStorageDatabaseName();
+ if(offsetStorageDatabaseName != null) {
+ createDestinationDatabase(offsetStorageDatabaseName);
+ }
+ // ToDO: create destination database if not exists
+ createDestinationDatabase(database);
MutablePair response = metadata.getTableEngine(this.conn, database, tableName);
this.engine = response.getLeft();
@@ -154,36 +163,59 @@ public DbWriter(
}
}
+ public String getOffsetStorageDatabaseName() {
+
+ String offsetSchemaHistoryTable = null;
+ try {
+ offsetSchemaHistoryTable = config.getString(OFFSET_STORAGE_PREFIX + JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
+ } catch(Exception e) {
+ log.error("***** Error retrieving offset store configuration ****", e);
+ }
+ if(offsetSchemaHistoryTable == null || offsetSchemaHistoryTable.isEmpty() == true) {
+ log.warn("Skipping creating offset schema history table as the query was not provided in configuration");
+ return null;
+ }
+ String offsetStorageDatabaseNameArray[] = offsetSchemaHistoryTable.split("\\.");
+ if(offsetStorageDatabaseNameArray.length <= 2) {
+ log.warn("Skipping creating offset schema history table as the query was not provided in configuration");
+ return null;
+ }
+ String offsetStorageDatabaseName = offsetStorageDatabaseNameArray[0];
+ String offsetStorageTableName = offsetStorageDatabaseNameArray[1];
+
+ return offsetStorageDatabaseName;
+ }
// Create offset/schema history storage database.
- public void createOffsetSchemaHistoryDatabase() {
+ public void createDestinationDatabase(String databaseName) {
+
DBMetadata metadata = new DBMetadata();
try {
- if (false == metadata.checkIfDatabaseExists(this.conn, database)) {
- new ClickHouseCreateDatabase().createNewDatabase(this.conn, database);
+ if (false == metadata.checkIfDatabaseExists(this.conn, databaseName)) {
+ new ClickHouseCreateDatabase().createNewDatabase(this.conn, databaseName);
}
} catch(Exception e) {
int maxRetries = 0;
final int MAX_RETRIES = 5;
- log.error("Error creating Database: " + database);
+ log.error("Error creating Database: " + databaseName);
// Keep retrying to createNewDatabase until Max number of retries is reached.
boolean createDatabaseFailed = false;
while(maxRetries++ > MAX_RETRIES) {
try {
Thread.sleep(maxRetries * 5000);
- if (false == metadata.checkIfDatabaseExists(this.conn, database)) {
- new ClickHouseCreateDatabase().createNewDatabase(this.conn, database);
+ if (false == metadata.checkIfDatabaseExists(this.conn, databaseName)) {
+ new ClickHouseCreateDatabase().createNewDatabase(this.conn, databaseName);
createDatabaseFailed = true;
break;
}
} catch (Exception ex) {
- log.error("Retry Number: " + maxRetries + "of" + MAX_RETRIES + " Error creating Database: " + database);
+ log.error("Retry Number: " + maxRetries + "of" + MAX_RETRIES + " Error creating Database: " + databaseName);
}
}
// if maxRetries exceeded, throw runtime exception.
if(createDatabaseFailed == false) {
- throw new RuntimeException("Error creating Database: " + database);
+ throw new RuntimeException("Error creating Database: " + databaseName);
}
}
}
@@ -203,27 +235,6 @@ public boolean wasTableMetaDataRetrieved() {
return result;
}
-
-
-
- /**
- * Function to check if the column is of DateTime64
- * from the column type(string name)
- *
- * @param columnType
- * @return true if its DateTime64, false otherwise.
- */
- public static boolean isColumnDateTime64(String columnType) {
- //ClickHouseDataType dt = ClickHouseDataType.of(columnType);
- //ToDo: Figure out a way to get the ClickHouseDataType
- // from column name.
- boolean result = false;
- if (columnType.contains("DateTime64")) {
- result = true;
- }
- return result;
- }
-
public Map getColumnNameToDataTypeMap() {
return this.columnNameToDataTypeMap;
}
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/HikariDbSource.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/HikariDbSource.java
new file mode 100644
index 000000000..fb4272356
--- /dev/null
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/HikariDbSource.java
@@ -0,0 +1,108 @@
+package com.altinity.clickhouse.sink.connector.db;
+
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
+import com.altinity.clickhouse.sink.connector.common.Metrics;
+import com.clickhouse.jdbc.ClickHouseDataSource;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.sql.Connection;
+import java.util.HashMap;
+import java.util.Map;
+
+// Singleton class(one per database)
+public class HikariDbSource {
+ private static Map instance = new HashMap<>();
+
+ private static Map connectionPool = new HashMap<>();
+ //private static HikariDbSource instance;
+
+ private static final Logger log = LogManager.getLogger(HikariDbSource.class);
+ //private HikariDataSource dataSource;
+ private String databaseName;
+
+ // private constructor
+ private HikariDbSource(ClickHouseDataSource dataSource, String databaseName) {
+ // this.createConnectionPool(dataSource, databaseName);
+ }
+
+
+ public static HikariDataSource getInstance(ClickHouseDataSource dataSource, String databaseName,
+ ClickHouseSinkConnectorConfig config) {
+
+ if(instance.containsKey(databaseName)) {
+ return instance.get(databaseName);
+ } else {
+ HikariDataSource hikariDataSource = createConnectionPool(dataSource, databaseName, config);
+ instance.put(databaseName, hikariDataSource);
+ }
+ return instance.get(databaseName);
+ }
+
+ private static HikariDataSource createConnectionPool(ClickHouseDataSource chDataSource,
+ String databaseName, ClickHouseSinkConnectorConfig config) {
+ // pass the clickhouse config to create the datasource
+
+ int maxPoolSize = config.getInt(ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MAX_SIZE.toString());
+ long poolConnectionTimeout = config.getLong(ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_TIMEOUT.toString());
+ int minIdle = config.getInt(ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MIN_IDLE.toString());
+
+ HikariConfig poolConfig = new HikariConfig();
+ poolConfig.setPoolName("clickhouse" + "-" + databaseName);
+ String jdbcUrl = String.format("jdbc:ch:{hostname}:{port}/%s?insert_quorum=auto&server_time_zone&server_version=22.13.1.24495", databaseName);
+ poolConfig.setJdbcUrl(jdbcUrl);
+ poolConfig.setDriverClassName("com.clickhouse.jdbc.ClickHouseDriver"); // Ensure driver is set
+ // poolConfig.setUsername(dataSource.getConnection().getCurrentUser()); // Optional, if already in JDBC URL
+ // poolConfig.setPassword(dataSource.getConnection().()); // Optional, if already in JDBC URL
+ poolConfig.setConnectionTimeout(poolConnectionTimeout);
+ poolConfig.setMaximumPoolSize(maxPoolSize);
+ //poolConfig.setMinimumIdle(minIdle);
+ poolConfig.setIdleTimeout(2_000L);
+ poolConfig.setMaxLifetime(300_000L);
+ poolConfig.setDataSource(chDataSource);
+
+ HikariDataSource dataSource = new HikariDataSource(poolConfig);
+
+ PrometheusMeterRegistry meterRegistry = Metrics.meterRegistry();
+
+ if(meterRegistry != null) {
+ dataSource.setMetricRegistry(meterRegistry);
+ }
+ return dataSource;
+ }
+
+ public static void close() {
+
+ if(instance != null) {
+ for(HikariDataSource hikariDataSource: instance.values()) {
+ try {
+ hikariDataSource.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ instance.clear();
+ }
+ }
+
+ public static void closeDatabaseConnection(String databaseName) {
+ if(instance.containsKey(databaseName)) {
+ try {
+ instance.get(databaseName).close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("Error closing database connection pool", e);
+ }
+ }
+ }
+
+ public static void printConnectionInfo() {
+ for(HikariDataSource hikariDataSource: instance.values()) {
+ log.debug("Connection Pool Info: " + hikariDataSource.getPoolName() + " Max Size: " + hikariDataSource.getMaximumPoolSize() + " Active Connections: " + hikariDataSource.getHikariPoolMXBean().getActiveConnections());
+ }
+ }
+}
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/GroupInsertQueryWithBatchRecords.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/GroupInsertQueryWithBatchRecords.java
index 390804463..ef78926fe 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/GroupInsertQueryWithBatchRecords.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/GroupInsertQueryWithBatchRecords.java
@@ -15,6 +15,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.sql.Connection;
import java.util.*;
import static com.altinity.clickhouse.sink.connector.db.batch.CdcOperation.getCdcSectionBasedOnOperation;
@@ -37,7 +38,7 @@ public boolean groupQueryWithRecords(List records,
queryToRecordsMap,
Map partitionToOffsetMap,
ClickHouseSinkConnectorConfig config,
- String tableName, String databaseName, ClickHouseConnection connection,
+ String tableName, String databaseName, Connection connection,
Map columnNameToDataTypeMap) {
boolean result = false;
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java
index a7b321d08..bb611e760 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
@@ -69,7 +70,7 @@ public PreparedStatementExecutor(String replacingMergeTreeDeleteColumn,
public boolean addToPreparedStatementBatch(String topicName, Map>,
List> queryToRecordsMap, BlockMetaData bmd,
ClickHouseSinkConnectorConfig config,
- ClickHouseConnection conn,
+ Connection conn,
String tableName,
Map columnToDataTypeMap,
DBMetadata.TABLE_ENGINE engine) throws RuntimeException {
@@ -106,7 +107,7 @@ public boolean addToPreparedStatementBatch(String topicName, Map>, List> entry,
BlockMetaData bmd, ClickHouseSinkConnectorConfig config,
- ClickHouseConnection conn, String tableName, Map columnToDataTypeMap,
+ Connection conn, String tableName, Map columnToDataTypeMap,
DBMetadata.TABLE_ENGINE engine) throws RuntimeException {
AtomicBoolean result = new AtomicBoolean(false);
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java
index 0e8224173..b3dfb588f 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java
@@ -6,6 +6,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -53,7 +54,7 @@ public String createAlterTableSyntax(String tableName, Map colNa
* @para
* m modifiedFields
*/
- public void alterTable(List modifiedFields, String tableName, ClickHouseConnection connection, Map columnNameToDataTypeMap) {
+ public void alterTable(List modifiedFields, String tableName, Connection connection, Map columnNameToDataTypeMap) {
List missingFieldsInCH = new ArrayList();
// Identify the columns that need to be added/removed in ClickHouse.
for(Field f: modifiedFields) {
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java
index 0d8b906ad..b3b50003d 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTable.java
@@ -7,6 +7,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Map;
@@ -25,7 +26,7 @@ public class ClickHouseAutoCreateTable extends ClickHouseTableOperationsBase{
private static final Logger log = LogManager.getLogger(ClickHouseAutoCreateTable.class.getName());
public void createNewTable(ArrayList primaryKey, String tableName, String databaseName, Field[] fields,
- ClickHouseConnection connection, boolean isNewReplacingMergeTree,
+ Connection connection, boolean isNewReplacingMergeTree,
boolean useReplicatedReplacingMergeTree, String rmtDeleteColumn) throws SQLException {
Map colNameToDataTypeMap = this.getColumnNameToCHDataTypeMapping(fields);
String createTableQuery = this.createTableSyntax(primaryKey, tableName, databaseName, fields, colNameToDataTypeMap,
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabase.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabase.java
index 3e277b94d..c89f47bb2 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabase.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabase.java
@@ -2,10 +2,11 @@
import com.clickhouse.jdbc.ClickHouseConnection;
+import java.sql.Connection;
import java.sql.SQLException;
public class ClickHouseCreateDatabase extends ClickHouseTableOperationsBase {
- public void createNewDatabase(ClickHouseConnection conn, String dbName) throws SQLException {
+ public void createNewDatabase(Connection conn, String dbName) throws SQLException {
String query = String.format("USE system; CREATE DATABASE IF NOT EXISTS %s; USE %s", dbName, dbName);
this.runQuery(query, conn);
}
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java
index cefac1c24..faace9e42 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseTableOperationsBase.java
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
@@ -100,7 +101,7 @@ public Map getColumnNameToCHDataTypeMapping(Field[] fields) {
* @param query
* @param conn
*/
- public void runQuery(String query, ClickHouseConnection conn) throws SQLException {
+ public void runQuery(String query, Connection conn) throws SQLException {
if(conn == null) {
log.error("ClickHouse connection not created");
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchExecutor.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchExecutor.java
index a44787805..abac9e4d3 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchExecutor.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchExecutor.java
@@ -2,10 +2,34 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
public class ClickHouseBatchExecutor extends ScheduledThreadPoolExecutor {
+ boolean isPaused = false;
public ClickHouseBatchExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
+
+
+ }
+
+ public void pause() {
+ isPaused = true;
+ }
+
+ @Override
+ public void beforeExecute(Thread t, Runnable r) {
+ while (isPaused) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100); // Polling
+ } catch (InterruptedException ie) {
+ t.interrupt();
+ }
+ }
+
+ }
+ public void resume() {
+ isPaused = false;
}
}
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
index 7d199b2fc..993ef7280 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
@@ -4,10 +4,7 @@
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.common.Metrics;
import com.altinity.clickhouse.sink.connector.common.Utils;
-import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
-import com.altinity.clickhouse.sink.connector.db.DBMetadata;
-import com.altinity.clickhouse.sink.connector.db.DbKafkaOffsetWriter;
-import com.altinity.clickhouse.sink.connector.db.DbWriter;
+import com.altinity.clickhouse.sink.connector.db.*;
import com.altinity.clickhouse.sink.connector.db.batch.GroupInsertQueryWithBatchRecords;
import com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor;
import com.altinity.clickhouse.sink.connector.model.BlockMetaData;
@@ -19,6 +16,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.sql.Connection;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -41,11 +39,11 @@ public class ClickHouseBatchRunnable implements Runnable {
// Connection that will be used to create
// the debezium storage database.
- private ClickHouseConnection systemConnection;
+ private Connection systemConnection;
// For insert batch the database connection has to be the same.
// Create a map of database name to ClickHouseConnection.
- private Map databaseToConnectionMap = new HashMap<>();
+ private Map databaseToConnectionMap = new HashMap<>();
/**
* Data structures with state
*/
@@ -80,7 +78,8 @@ public ClickHouseBatchRunnable(LinkedBlockingQueue> recor
//this.topicToRecordsMap = new HashMap<>();
this.dbCredentials = parseDBConfiguration();
- this.systemConnection = createConnection();
+
+ this.systemConnection = createConnection(BaseDbWriter.SYSTEM_DB);
try {
@@ -91,17 +90,17 @@ public ClickHouseBatchRunnable(LinkedBlockingQueue> recor
}
}
- private ClickHouseConnection createConnection() {
+ private Connection createConnection(String databaseName) {
String jdbcUrl = BaseDbWriter.getConnectionString(this.dbCredentials.getHostName(),
this.dbCredentials.getPort(), "system");
- return BaseDbWriter.createConnection(jdbcUrl, "Sink Connector Lightweight", this.dbCredentials.getUserName(),
- this.dbCredentials.getPassword(), config);
+ return BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, this.dbCredentials.getUserName(),
+ this.dbCredentials.getPassword(), databaseName, config);
}
// Function to check if we have already stored a ClickHouseConnection
// in the databaseToConnectionMap.
- private ClickHouseConnection getClickHouseConnection(String databaseName) {
+ private Connection getClickHouseConnection(String databaseName) {
if (this.databaseToConnectionMap.containsKey(databaseName)) {
return this.databaseToConnectionMap.get(databaseName);
}
@@ -109,8 +108,8 @@ private ClickHouseConnection getClickHouseConnection(String databaseName) {
String jdbcUrl = BaseDbWriter.getConnectionString(this.dbCredentials.getHostName(),
this.dbCredentials.getPort(), databaseName);
- ClickHouseConnection conn = BaseDbWriter.createConnection(jdbcUrl, "Sink Connector Lightweight",
- this.dbCredentials.getUserName(), this.dbCredentials.getPassword(), config);
+ Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME,
+ this.dbCredentials.getUserName(), this.dbCredentials.getPassword(), databaseName, config);
this.databaseToConnectionMap.put(databaseName, conn);
return conn;
@@ -238,7 +237,7 @@ public String getTableFromTopic(String topicName) {
}
public DbWriter getDbWriterForTable(String topicName, String tableName, String databaseName,
- ClickHouseStruct record, ClickHouseConnection connection) {
+ ClickHouseStruct record, Connection connection) {
DbWriter writer = null;
if (this.topicToDbWriterMap.containsKey(topicName)) {
@@ -296,7 +295,7 @@ private boolean processRecordsByTopic(String topicName, List r
if(this.databaseOverrideMap.containsKey(firstRecord.getDatabase()))
databaseName = this.databaseOverrideMap.get(firstRecord.getDatabase());
- ClickHouseConnection databaseConn = getClickHouseConnection(databaseName);
+ Connection databaseConn = getClickHouseConnection(databaseName);
DbWriter writer = getDbWriterForTable(topicName, tableName, databaseName, firstRecord, databaseConn);
PreparedStatementExecutor preparedStatementExecutor = new
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java
index ce2f163e5..12bc3d362 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchWriter.java
@@ -19,6 +19,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.sql.Connection;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -34,11 +35,11 @@ public class ClickHouseBatchWriter {
// Connection that will be used to create
// the debezium storage database.
- private ClickHouseConnection systemConnection;
+ private Connection systemConnection;
// For insert batch the database connection has to be the same.
// Create a map of database name to ClickHouseConnection.
- private Map databaseToConnectionMap = new HashMap<>();
+ private Map databaseToConnectionMap = new HashMap<>();
private static final Logger log = LogManager.getLogger(ClickHouseBatchWriter.class);
@@ -67,7 +68,7 @@ public ClickHouseBatchWriter(
//this.topicToRecordsMap = new HashMap<>();
this.dbCredentials = parseDBConfiguration();
- this.systemConnection = createConnection();
+ this.systemConnection = createConnection(BaseDbWriter.SYSTEM_DB);
try {
@@ -78,17 +79,17 @@ public ClickHouseBatchWriter(
}
}
- private ClickHouseConnection createConnection() {
+ private Connection createConnection(String database) {
String jdbcUrl = BaseDbWriter.getConnectionString(this.dbCredentials.getHostName(),
this.dbCredentials.getPort(), "system");
return BaseDbWriter.createConnection(jdbcUrl, "Sink Connector Lightweight", this.dbCredentials.getUserName(),
- this.dbCredentials.getPassword(), config);
+ this.dbCredentials.getPassword(), database,config);
}
// Function to check if we have already stored a ClickHouseConnection
// in the databaseToConnectionMap.
- private ClickHouseConnection getClickHouseConnection(String databaseName) {
+ private Connection getClickHouseConnection(String databaseName) {
if (this.databaseToConnectionMap.containsKey(databaseName)) {
return this.databaseToConnectionMap.get(databaseName);
}
@@ -96,8 +97,8 @@ private ClickHouseConnection getClickHouseConnection(String databaseName) {
String jdbcUrl = BaseDbWriter.getConnectionString(this.dbCredentials.getHostName(),
this.dbCredentials.getPort(), databaseName);
- ClickHouseConnection conn = BaseDbWriter.createConnection(jdbcUrl, "Sink Connector Lightweight",
- this.dbCredentials.getUserName(), this.dbCredentials.getPassword(), config);
+ Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME,
+ this.dbCredentials.getUserName(), this.dbCredentials.getPassword(), databaseName, config);
this.databaseToConnectionMap.put(databaseName, conn);
return conn;
@@ -196,7 +197,7 @@ public String getTableFromTopic(String topicName) {
}
public DbWriter getDbWriterForTable(String topicName, String tableName, String databaseName,
- ClickHouseStruct record, ClickHouseConnection connection) {
+ ClickHouseStruct record, Connection connection) {
DbWriter writer = null;
if (this.topicToDbWriterMap.containsKey(topicName)) {
@@ -254,7 +255,7 @@ private boolean processRecordsByTopic(String topicName, List r
if(this.databaseOverrideMap.containsKey(firstRecord.getDatabase()))
databaseName = this.databaseOverrideMap.get(firstRecord.getDatabase());
- ClickHouseConnection databaseConn = getClickHouseConnection(databaseName);
+ Connection databaseConn = getClickHouseConnection(databaseName);
DbWriter writer = getDbWriterForTable(topicName, tableName, databaseName, firstRecord, databaseConn);
PreparedStatementExecutor preparedStatementExecutor = new
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapperTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapperTest.java
index 88c30c43d..05b803aed 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapperTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseDataTypeMapperTest.java
@@ -14,10 +14,7 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
import java.time.ZoneId;
import java.util.HashMap;
@@ -66,7 +63,8 @@ public void convert() throws SQLException {
String password = clickHouseContainer.getPassword();
String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
- ClickHouseConnection conn = BaseDbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>()));
BaseDbWriter dbWriter = new BaseDbWriter(dbHostName, port,
database, userName, password, null, conn);
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/DebeziumConverterTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/DebeziumConverterTest.java
index 336ece636..be649928d 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/DebeziumConverterTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/converters/DebeziumConverterTest.java
@@ -1,6 +1,7 @@
package com.altinity.clickhouse.sink.connector.converters;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.altinity.clickhouse.sink.connector.db.DbWriter;
import com.altinity.clickhouse.sink.connector.metadata.DataTypeRange;
import com.clickhouse.data.ClickHouseDataType;
@@ -12,6 +13,7 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.*;
import java.time.temporal.ChronoUnit;
@@ -251,14 +253,15 @@ public void testBatchArrays() {
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
String jdbcUrl = DbWriter.getConnectionString(hostName, port, database);
- ClickHouseConnection conn1 = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config);
+ Connection conn1 = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, config);
DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn1);
String url = dbWriter.getConnectionString(hostName, port, database);
String insertQueryTemplate = "insert into test_ch_jdbc_complex_2(col1, col2, col3, col4, col5, col6) values(?, ?, ?, ?, ?, ?)";
try {
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
- ClickHouseConnection conn = dataSource.getConnection(userName, password);
+ Connection conn = dataSource.getConnection(userName, password);
PreparedStatement ps = conn.prepareStatement(insertQueryTemplate);
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriterTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriterTest.java
index 4341a27fa..fbf0b2dbd 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriterTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/BaseDbWriterTest.java
@@ -9,6 +9,7 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.annotation.JsonAppend;
+import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -21,8 +22,8 @@ public void testSplitJdbcProperties() {
props.put(ClickHouseSinkConnectorConfigVariables.JDBC_PARAMETERS.toString(), jdbcProperties);
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(props);
- ClickHouseConnection conn = BaseDbWriter.createConnection(
- "localhost", "client_1","default", "", config);
+ Connection conn = BaseDbWriter.createConnection(
+ "localhost", BaseDbWriter.DATABASE_CLIENT_NAME,"default", "",BaseDbWriter.SYSTEM_DB, config);
Properties properties = new BaseDbWriter(
"localhost",
8123,
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java
index 2a463596a..67ec28056 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DBMetadataTest.java
@@ -13,6 +13,7 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.testcontainers.utility.MountableFile;
+import java.sql.Connection;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.HashMap;
@@ -66,13 +67,14 @@ public void testCheckIfDatabaseExists() throws SQLException {
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
- String database = "default";
+ String database = "system";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";
String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>()));
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn);
@@ -141,13 +143,14 @@ public void testIsRMTVersionSupported(String clickhouseVersion, boolean result)
public void getTestGetServerTimeZone() {
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
- String database = "default";
+ String database = "system";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";
String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB,new ClickHouseSinkConnectorConfig(new HashMap<>()));
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn);
ZoneId serverTimeZone = new DBMetadata().getServerTimeZone(writer.getConnection());
@@ -166,7 +169,8 @@ public void getAliasAndMaterializedColumnsList() throws SQLException {
String tableName = "employees";
String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>()));
Set aliasColumns = new DBMetadata().getAliasAndMaterializedColumnsForTableAndDatabase("people", "employees2", conn);
Assert.assertTrue(aliasColumns.size() == 2);
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriterTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriterTest.java
index 4ce0d055e..10ddb2bd0 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriterTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriterTest.java
@@ -13,6 +13,7 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
@@ -33,13 +34,14 @@ public void testInsertTopicOffsetMetadata() throws SQLException {
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
- String database = "default";
+ String database = "system";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";
String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB,new ClickHouseSinkConnectorConfig(new HashMap<>()));
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn);
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java
index 757453ff1..0ae256317 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java
@@ -22,6 +22,7 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.ZoneId;
import java.util.*;
@@ -49,8 +50,8 @@ public static void init() {
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password,
- config);
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, config);
writer = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn);
}
@@ -87,10 +88,6 @@ public void testGetConnectionUrl() {
Assert.assertEquals(connectionUrl, "jdbc:clickhouse://remoteClickHouse:8123/employees");
}
- @Test
- public void testIsColumnTypeDate64() {
- boolean result = DbWriter.isColumnDateTime64("Nullable(DateTime64(3))");
- }
@Test
@Tag("IntegrationTest")
public void testGetColumnsDataTypesForTable() {
@@ -103,7 +100,8 @@ public void testGetColumnsDataTypesForTable() {
String tableName = "employees";
String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>()));
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn);
@@ -114,7 +112,8 @@ public void testGetColumnsDataTypesForTable() {
String database2 = "employees2";
String jdbcUrl2 = BaseDbWriter.getConnectionString(dbHostName, port, database2);
- ClickHouseConnection conn2 = DbWriter.createConnection(jdbcUrl2, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn2 = DbWriter.createConnection(jdbcUrl2, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>()));
DbWriter writer2 = new DbWriter(dbHostName, port, database2, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn2);
Map columnDataTypesMap2 = writer2.getColumnsDataTypesForTable("employees");
@@ -135,11 +134,11 @@ public void testGetEngineType() {
String tableName = "employees";
String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password,
- new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>()));
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn);
- MutablePair result = new DBMetadata().getTableEngineUsingShowTable(writer.getConnection(), "default", "employees");
+ MutablePair result = new DBMetadata().getTableEngineUsingShowTable(writer.getConnection(), "system", "employees");
Assert.assertTrue(result.getLeft() == DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE);
Assert.assertTrue(result.getRight().equalsIgnoreCase("_version"));
@@ -151,7 +150,7 @@ public void testGetEngineType() {
Assert.assertTrue(result_employees.getLeft() == DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE);
Assert.assertTrue(result_employees.getRight().equalsIgnoreCase("_version_employees"));
- MutablePair resultProducts = new DBMetadata().getTableEngineUsingShowTable(writer.getConnection(), "default", "products");
+ MutablePair resultProducts = new DBMetadata().getTableEngineUsingShowTable(writer.getConnection(), "system", "products");
Assert.assertTrue(resultProducts.getLeft() == DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE);
Assert.assertTrue(resultProducts.getRight().equalsIgnoreCase("sign"));
}
@@ -161,14 +160,14 @@ public void testGetEngineType() {
public void testGetEngineTypeUsingSystemTables() {
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
- String database = "default";
+ String database = "system";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";
String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password,
- new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>()));
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn);
MutablePair< DBMetadata.TABLE_ENGINE, String> result = new DBMetadata().getTableEngineUsingSystemTables(writer.getConnection(),
@@ -237,7 +236,8 @@ public void testGroupRecords() {
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
//String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(connectionUrl, "client_1", userName, password, config);
+ Connection conn = DbWriter.createConnection(connectionUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, config);
DbWriter dbWriter = new DbWriter(dbHostName, port, database, tableName, userName, password, config, null, conn);
Map>, List> queryToRecordsMap = new HashMap<>();
@@ -281,7 +281,8 @@ public void testGetClickHouseDataType() {
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config);
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, config);
DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn);
PreparedStatementExecutor preparedStatementExecutor = new PreparedStatementExecutor(null,
false, null, null, database, ZoneId.of("UTC"));
@@ -312,7 +313,8 @@ public void testBatchArrays() {
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database);
- ClickHouseConnection conn2 = DbWriter.createConnection(jdbcUrl, "client_1", userName, "", config);
+ Connection conn2 = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, "",
+ BaseDbWriter.SYSTEM_DB, config);
DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, "", config,
null, conn2);
String url = dbWriter.getConnectionString(hostName, port, database);
@@ -336,7 +338,7 @@ public void testBatchArrays() {
try {
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
- ClickHouseConnection conn = dataSource.getConnection(userName, "");
+ Connection conn = dataSource.getConnection(userName, "");
PreparedStatement ps = conn.prepareStatement(insertQueryTemplate);
@@ -377,7 +379,8 @@ public void testBatchInsert() {
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database);
- ClickHouseConnection conn2 = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config);
+ Connection conn2 = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, config);
DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config,
null, conn2);
String url = dbWriter.getConnectionString(hostName, port, database);
@@ -385,7 +388,7 @@ public void testBatchInsert() {
String insertQueryTemplate = "insert into employees values(?,?,?,?,?,?)";
try {
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
- ClickHouseConnection conn = dataSource.getConnection(userName, password);
+ Connection conn = dataSource.getConnection(userName, password);
PreparedStatement ps = conn.prepareStatement(insertQueryTemplate);
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java
index e8bbe1e9c..34a130ea9 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAutoCreateTableTest.java
@@ -18,6 +18,7 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -29,7 +30,7 @@ public class ClickHouseAutoCreateTableTest {
static Map columnToDataTypesMap;
- static ClickHouseConnection conn;
+ static Connection conn;
@Container
private ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
@@ -55,7 +56,8 @@ static void initialize() {
String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config);
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, config);
DbWriter writer = new DbWriter(hostName, port, database, tableName, userName, password, config, null, conn);
conn = writer.getConnection();
@@ -163,14 +165,15 @@ public void testCreateTableMultiplePrimaryKeys() {
public void testCreateNewTable() {
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
- String database = "default";
+ String database = "system";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";
String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, new ClickHouseSinkConnectorConfig(new HashMap<>()));
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>()));
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null, conn);
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabaseTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabaseTest.java
index 8673118b4..20b6c84cf 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabaseTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseCreateDatabaseTest.java
@@ -14,6 +14,7 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.clickhouse.ClickHouseContainer;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -43,21 +44,22 @@ static void initialize() {
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, systemDb);
- ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password, config);
- dbWriter = new DbWriter(hostName, port, dbName, null, userName, password, config, null, conn);
- maintenanceDbWriter = new DbWriter(hostName, port, systemDb, null, userName, password, config, null, conn);
+ Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
+ DbWriter.SYSTEM_DB, config);
+ dbWriter = new DbWriter(hostName, port, dbName, "employees", userName, password, config, null, conn);
+ maintenanceDbWriter = new DbWriter(hostName, port, systemDb, "employees", userName, password, config, null, conn);
}
@BeforeEach
void dropTestDatabase() throws SQLException {
- Statement drop = maintenanceDbWriter.getConnection().createStatement();
- drop.executeQuery(String.format("DROP DATABASE IF EXISTS %s", dbName));
+// Statement drop = maintenanceDbWriter.getConnection().createStatement();
+// drop.executeQuery(String.format("DROP DATABASE IF EXISTS %s", dbName));
}
@Test
public void testCreateNewDatabase() throws SQLException {
ClickHouseCreateDatabase act = new ClickHouseCreateDatabase();
- ClickHouseConnection conn = dbWriter.getConnection();
+ Connection conn = dbWriter.getConnection();
try {
act.createNewDatabase(conn, dbName);
} catch(SQLException se) {