diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/PostgreSQLConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/PostgreSQLConstants.java new file mode 100644 index 00000000000..7f114a92d27 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/PostgreSQLConstants.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Snapshot-mode values used by the PostgreSQL job fetcher.
 *
 * <p>Each constant holds the literal string that is placed in the job configuration and handed to
 * the capture engine as its snapshot mode. This is a constants holder only: it cannot be
 * instantiated or subclassed.
 */
public final class PostgreSQLConstants {

    /** Snapshot mode value {@code "initial"}. */
    public static final String INITIAL = "initial";

    /**
     * Snapshot mode value {@code "never"}.
     * NOTE(review): the constant name suggests "start from the earliest offset without a
     * snapshot"; confirm this matches the connector's meaning of {@code never}.
     */
    public static final String EARLIEST_OFFSET = "never";

    /** Snapshot mode value {@code "always"}. */
    public static final String ALWAYS = "always";

    /** Snapshot mode value {@code "exported"}. */
    public static final String EXPORTED = "exported";

    /** Snapshot mode value {@code "initial_only"}. */
    public static final String INITIAL_ONLY = "initial_only";

    /** Snapshot mode value {@code "custom"}. */
    public static final String CUSTOM = "custom";

    private PostgreSQLConstants() {
        // constants holder, not meant to be instantiated
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources; + +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.plugin.Reader; +import org.apache.inlong.agent.plugin.sources.reader.PostgreSQLReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + +/** + * PostgreSQL source, split PostgreSQL source job into multi readers + */ +public class PostgreSQLSource extends AbstractSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSource.class); + + public PostgreSQLSource() { + + } + + @Override + public List split(JobProfile conf) { + super.init(conf); + Reader postgreSQLReader = new PostgreSQLReader(); + List readerList = new ArrayList<>(); + readerList.add(postgreSQLReader); + sourceMetric.sourceSuccessCount.incrementAndGet(); + return readerList; + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java new file mode 100644 index 00000000000..5d3e9207c2d --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources.reader; + +import com.google.common.base.Preconditions; +import com.google.gson.Gson; +import io.debezium.connector.postgresql.PostgresConnector; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.PostgreSQLConstants; +import org.apache.inlong.agent.message.DefaultMessage; +import org.apache.inlong.agent.metrics.AgentMetricItem; +import org.apache.inlong.agent.metrics.audit.AuditUtils; +import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase; +import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore; +import org.apache.inlong.agent.pojo.DebeziumFormat; +import org.apache.inlong.agent.pojo.DebeziumOffset; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DebeziumOffsetSerializer; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; + +/** + * Read postgreSQL data + */ +public class PostgreSQLReader extends AbstractReader { + + public static final String COMPONENT_NAME = "PostgreSQLReader"; + public static final String JOB_POSTGRESQL_USER = "job.postgreSQLJob.user"; + public static final String JOB_DATABASE_PASSWORD = "job.postgreSQLJob.password"; + public static final String JOB_DATABASE_HOSTNAME = "job.postgreSQLJob.hostname"; + public static final String JOB_DATABASE_PORT = "job.postgreSQLJob.port"; + public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.postgreSQLjob.offset.intervalMs"; + public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.postgreSQLjob.history.filename"; + public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.postgreSQLjob.snapshot.mode"; + public static final String JOB_DATABASE_QUEUE_SIZE = "job.postgreSQLjob.queueSize"; + public static final String JOB_DATABASE_OFFSETS = "job.postgreSQLjob.offsets"; + public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.postgreSQLjob.offset.specificOffsetFile"; + public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.postgreSQLjob.offset.specificOffsetPos"; + public static final String JOB_DATABASE_DBNAME = "job.postgreSQLjob.dbname"; + public static final String JOB_DATABASE_SERVER_NAME = "job.postgreSQLjob.servername"; + public static final String JOB_DATABASE_PLUGIN_NAME = "job.postgreSQLjob.pluginname"; + private static final Gson GSON = new Gson(); + private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLReader.class); + private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); + protected AgentMetricItem readerMetric; + private String userName; + private String password; + private String hostName; + private String port; + private String offsetFlushIntervalMs; + 
private String offsetStoreFileName; + private String snapshotMode; + private String instanceId; + private String offset; + private String specificOffsetFile; + private String specificOffsetPos; + private String dbName; + private String pluginName; + private String serverName; + private PostgreSQLSnapshotBase postgreSQLSnapshot; + private boolean finished = false; + private ExecutorService executor; + /** + * pair.left : table name + * pair.right : actual data + */ + private LinkedBlockingQueue> postgreSQLMessageQueue; + private JobProfile jobProfile; + private boolean destroyed = false; + + public PostgreSQLReader() { + } + + @Override + public Message read() { + if (!postgreSQLMessageQueue.isEmpty()) { + readerMetric.pluginReadCount.incrementAndGet(); + return getPostgreSQLMessage(); + } else { + return null; + } + } + + private DefaultMessage getPostgreSQLMessage() { + Pair message = postgreSQLMessageQueue.poll(); + Map header = new HashMap<>(DEFAULT_MAP_CAPACITY); + header.put(PROXY_KEY_DATA, message.getKey()); + return new DefaultMessage(message.getValue().getBytes(StandardCharsets.UTF_8), header); + } + + @Override + public void init(JobProfile jobConf) { + super.init(jobConf); + jobProfile = jobConf; + LOGGER.info("init PostgreSQL reader with jobConf {}", jobConf.toJsonStr()); + userName = jobConf.get(JOB_POSTGRESQL_USER); + password = jobConf.get(JOB_DATABASE_PASSWORD); + hostName = jobConf.get(JOB_DATABASE_HOSTNAME); + port = jobConf.get(JOB_DATABASE_PORT); + dbName = jobConf.get(JOB_DATABASE_DBNAME); + serverName = jobConf.get(JOB_DATABASE_SERVER_NAME); + pluginName = jobConf.get(JOB_DATABASE_PLUGIN_NAME, "pgoutput"); + instanceId = jobConf.getInstanceId(); + offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000"); + offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME, + tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId; + snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, ""); + 
postgreSQLMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000)); + finished = false; + + offset = jobConf.get(JOB_DATABASE_OFFSETS, ""); + specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, ""); + specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1"); + postgreSQLSnapshot = new PostgreSQLSnapshotBase(offsetStoreFileName); + postgreSQLSnapshot.save(offset); + + Properties props = getEngineProps(); + + DebeziumEngine> engine = DebeziumEngine.create( + io.debezium.engine.format.Json.class) + .using(props) + .notifying((records, committer) -> { + try { + for (ChangeEvent record : records) { + DebeziumFormat debeziumFormat = GSON + .fromJson(record.value(), DebeziumFormat.class); + postgreSQLMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value())); + committer.markProcessed(record); + } + committer.markBatchFinished(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, + System.currentTimeMillis(), records.size()); + readerMetric.pluginReadCount.addAndGet(records.size()); + } catch (Exception e) { + readerMetric.pluginReadFailCount.addAndGet(records.size()); + LOGGER.error("parse binlog message error", e); + } + }) + .using((success, message, error) -> { + if (!success) { + LOGGER.error("PostgreSQL job with jobConf {} has error {}", instanceId, message, error); + } + }).build(); + + executor = Executors.newSingleThreadExecutor(); + executor.execute(engine); + + LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot()); + } + + private String tryToInitAndGetHistoryPath() { + String historyPath = agentConf.get( + AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH); + String parentPath = agentConf.get( + AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); + return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath(); + } + + private Properties 
getEngineProps() { + Properties props = new Properties(); + + props.setProperty("name", "engine" + instanceId); + props.setProperty("connector.class", PostgresConnector.class.getCanonicalName()); + props.setProperty("database.server.name", serverName); + props.setProperty("plugin.name", pluginName); + props.setProperty("database.hostname", hostName); + props.setProperty("database.port", port); + props.setProperty("database.user", userName); + props.setProperty("database.dbname", dbName); + props.setProperty("database.password", password); + + props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs); + props.setProperty("database.snapshot.mode", snapshotMode); + props.setProperty("key.converter.schemas.enable", "false"); + props.setProperty("value.converter.schemas.enable", "false"); + props.setProperty("snapshot.mode", snapshotMode); + props.setProperty("offset.storage.file.filename", offsetStoreFileName); + if (PostgreSQLConstants.CUSTOM.equals(snapshotMode)) { + props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName()); + props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset()); + } else { + props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName()); + } + props.setProperty("tombstones.on.delete", "false"); + props.setProperty("converters", "datetime"); + props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter"); + props.setProperty("datetime.format.date", "yyyy-MM-dd"); + props.setProperty("datetime.format.time", "HH:mm:ss"); + props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss"); + props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss"); + + LOGGER.info("PostgreSQL job {} start with props {}", jobProfile.getInstanceId(), props); + return props; + } + + private String serializeOffset() { + Map sourceOffset = new HashMap<>(); + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, + 
JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null"); + sourceOffset.put("file", specificOffsetFile); + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null"); + sourceOffset.put("pos", specificOffsetPos); + DebeziumOffset specificOffset = new DebeziumOffset(); + specificOffset.setSourceOffset(sourceOffset); + Map sourcePartition = new HashMap<>(); + sourcePartition.put("server", instanceId); + specificOffset.setSourcePartition(sourcePartition); + byte[] serializedOffset = new byte[0]; + try { + serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); + } catch (IOException e) { + LOGGER.error("serialize offset message error", e); + } + return new String(serializedOffset, StandardCharsets.UTF_8); + } + + @Override + public void destroy() { + synchronized (this) { + if (!destroyed) { + executor.shutdownNow(); + postgreSQLSnapshot.close(); + destroyed = true; + } + } + } + + @Override + public boolean isFinished() { + return finished; + } + + @Override + public String getReadSource() { + return instanceId; + } + + @Override + public void setReadTimeout(long mill) { + return; + } + + @Override + public void setWaitMillisecond(long millis) { + return; + } + + @Override + public String getSnapshot() { + if (postgreSQLSnapshot != null) { + return postgreSQLSnapshot.getSnapshot(); + } + return ""; + } + + @Override + public void finishRead() { + finished = true; + } + + @Override + public boolean isSourceExist() { + return true; + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java new file mode 100644 index 00000000000..d27213389c0 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java 
@@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources.snapshot; + +import org.apache.inlong.agent.utils.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.Base64; + +/** + * PostgreSQL Snapshot + */ +public class PostgreSQLSnapshotBase implements SnapshotBase { + + public static final int BUFFER_SIZE = 1024; + public static final int START_OFFSET = 0; + private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSnapshotBase.class); + private final Base64.Decoder decoder = Base64.getDecoder(); + private final Base64.Encoder encoder = Base64.getEncoder(); + private File file; + private byte[] offset; + + public PostgreSQLSnapshotBase(String filePath) { + file = new File(filePath); + } + + @Override + public String getSnapshot() { + load(); + return encoder.encodeToString(offset); + } + + @Override + public void close() { + + } + + /** + * Load postgres offset from local file + */ + private void load() { + try { + if 
(!file.exists()) { + // if parentDir not exist, create first + File parentDir = file.getParentFile(); + if (parentDir == null) { + LOGGER.info("no parent dir, file:{}", file.getAbsolutePath()); + return; + } + if (!parentDir.exists()) { + boolean success = parentDir.mkdir(); + LOGGER.info("create dir {} result {}", parentDir, success); + } + file.createNewFile(); + } + FileInputStream fis = new FileInputStream(file); + BufferedInputStream inputStream = new BufferedInputStream(fis); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + int len; + byte[] buf = new byte[BUFFER_SIZE]; + while ((len = inputStream.read(buf)) != -1) { + outputStream.write(buf, START_OFFSET, len); + } + offset = outputStream.toByteArray(); + inputStream.close(); + outputStream.close(); + } catch (Throwable ex) { + LOGGER.error("load PostgreSQL WAL log error", ex); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + } + } + + /** + * Save PostgreSQL offset to local file + */ + public void save(String snapshot) { + byte[] bytes = decoder.decode(snapshot); + if (bytes.length != 0) { + offset = bytes; + try (OutputStream output = Files.newOutputStream(file.toPath())) { + output.write(bytes); + } catch (Throwable e) { + LOGGER.error("save offset to file error", e); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); + } + } + } +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java new file mode 100644 index 00000000000..63cb8ae9306 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources; + +import org.apache.commons.codec.binary.Base64; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * Test for PostgreSQL snapshot + */ +public class PostgreSQLOffsetManagerTest { + + private static AgentBaseTestsHelper helper; + + private static final String fileName = "testPostgreSQL.txt"; + + private static Path filePath; + + @BeforeClass + public static void setup() { + helper = new AgentBaseTestsHelper(PostgreSQLOffsetManagerTest.class.getName()).setupAgentHome(); + Path testDir = helper.getTestRootDir(); + filePath = Paths.get(testDir.toString(), fileName); + } + + @AfterClass + public static void teardown() { + helper.teardownAgentHome(); + } + + @Test + public void testOffset() { + PostgreSQLSnapshotBase snapshotManager = new PostgreSQLSnapshotBase(filePath.toString()); + byte[] snapshotBytes = new byte[]{-65,-14,23}; + final Base64 base64 = new Base64(); + String encodeSnapshot = base64.encodeAsString(snapshotBytes); + 
snapshotManager.save(encodeSnapshot); + Assert.assertEquals(snapshotManager.getSnapshot(),encodeSnapshot); + File file = new File(filePath.toString()); + Assert.assertEquals(file.exists(),true); + System.out.println(file.getAbsolutePath()); + } +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLReaderTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLReaderTest.java new file mode 100644 index 00000000000..812f7604a15 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLReaderTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.sources; + +import com.google.gson.Gson; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.constant.SnapshotModeConstants; +import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.plugin.sources.reader.PostgreSQLReader; +import org.apache.inlong.agent.pojo.DebeziumFormat; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; + +/** + * Test for PostgreSQL reader + */ +public class PostgreSQLReaderTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLReaderTest.class); + private static final Gson GSON = new Gson(); + + @Test + public void testDebeziumFormat() { + String debeziumJson = "{\n" + + " \"before\": null,\n" + + " \"after\": {\n" + + " \"id\": 1004,\n" + + " \"first_name\": \"name1\",\n" + + " \"last_name\": \"name2\"\n" + + " },\n" + + " \"source\": {\n" + + " \"version\": \"12\",\n" + + " \"name\": \"myserver\",\n" + + " \"ts_sec\": 0,\n" + + " \"gtid\": null,\n" + + " \"file\": \"000000010000000000000001\",\n" + + " \"row\": 0,\n" + + " \"snapshot\": true,\n" + + " \"thread\": null,\n" + + " \"db\": \"postgres\",\n" + + " \"table\": \"customers\"\n" + + " },\n" + + " \"op\": \"r\",\n" + + " \"ts_ms\": 1486500577691\n" + + " }"; + DebeziumFormat debeziumFormat = GSON.fromJson(debeziumJson, DebeziumFormat.class); + Assert.assertEquals("customers", debeziumFormat.getSource().getTable()); + Assert.assertEquals("true", debeziumFormat.getSource().getSnapshot()); + } + + /** + * this test is used for testing collect data from postgreSQL in unit test, + * and it may cause failure in compile + * thus we annotate it. 
+ */ + // @Test + public void postgresLoadTest() { + JobProfile jobProfile = new JobProfile(); + jobProfile.set(PostgreSQLReader.JOB_POSTGRESQL_USER, "postgres"); + jobProfile.set(PostgreSQLReader.JOB_DATABASE_SERVER_NAME, "postgres"); + jobProfile.set(PostgreSQLReader.JOB_DATABASE_PLUGIN_NAME, "pgoutput"); + jobProfile.set(PostgreSQLReader.JOB_DATABASE_PASSWORD, "123456"); + jobProfile.set(PostgreSQLReader.JOB_DATABASE_HOSTNAME, "localhost"); + jobProfile.set(PostgreSQLReader.JOB_DATABASE_PORT, "5432"); + jobProfile.set(PostgreSQLReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "000000010000000000000001"); + jobProfile.set(PostgreSQLReader.JOB_DATABASE_SNAPSHOT_MODE, SnapshotModeConstants.INITIAL); + jobProfile.set(PostgreSQLReader.JOB_DATABASE_DBNAME, "postgres"); + jobProfile.set("job.instance.id", "_1"); + jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid"); + jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid"); + PostgreSQLReader postgreSQLReader = new PostgreSQLReader(); + postgreSQLReader.init(jobProfile); + while (true) { + Message message = postgreSQLReader.read(); + if (message != null) { + LOGGER.info("read message is {}", message.toString()); + break; + } + } + } +} diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE index 94420705e78..dc8bd802228 100644 --- a/licenses/inlong-agent/LICENSE +++ b/licenses/inlong-agent/LICENSE @@ -367,6 +367,7 @@ The text of each license is the standard Apache 2.0 license. 
org.apache.curator:curator-recipes:2.12.0 - Curator Recipes (https://curator.apache.org/curator-recipes), (The Apache Software License, Version 2.0) io.debezium:debezium-api:1.8.0.Final - Debezium API (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) io.debezium:debezium-connector-mysql:1.8.0.Final - Debezium Connector for MySQL (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) + io.debezium:debezium-connector-postgres:1.8.0.Final - Debezium Connector for PostgreSQL (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) io.debezium:debezium-core:1.8.0.Final - Debezium Core (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) io.debezium:debezium-ddl-parser:1.8.0.Final - Debezium ANTLR DDL parsers (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) io.debezium:debezium-embedded:1.8.0.Final - Debezium Embedded (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) diff --git a/pom.xml b/pom.xml index 5dfc3ac26c7..8832579c48d 100644 --- a/pom.xml +++ b/pom.xml @@ -582,6 +582,11 @@ debezium-connector-mysql ${debezium.version} + + io.debezium + debezium-connector-postgres + ${debezium.version} + com.h2database h2