Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10288][Agent] Update the Oracle Source #10746

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_MONGO_SSL_ENABLE = "task.mongoTask.sslEnabled";
public static final String TASK_MONGO_POLL_INTERVAL = "task.mongoTask.pollIntervalInMs";

// Oracle task
// Profile keys for the Oracle source. OracleSource#initSource reads these from the
// instance profile and maps them onto Debezium Oracle connector properties.

// Connection endpoint and credentials.
public static final String TASK_ORACLE_HOSTNAME = "task.oracleTask.hostname";
public static final String TASK_ORACLE_PORT = "task.oracleTask.port";
public static final String TASK_ORACLE_USER = "task.oracleTask.user";
public static final String TASK_ORACLE_PASSWORD = "task.oracleTask.password";
// Database name handed to the connector.
public static final String TASK_ORACLE_DBNAME = "task.oracleTask.dbname";
// NOTE(review): OracleSource#initSource derives the connector server name from its thread name
// and does not read this key -- confirm whether another consumer still needs it.
public static final String TASK_ORACLE_SERVERNAME = "task.oracleTask.serverName";
// Schema and table to capture; OracleSource joins them as "<schema>.<table>" for the table filter.
public static final String TASK_ORACLE_SCHEMA_INCLUDE_LIST = "task.oracleTask.schemaIncludeList";
public static final String TASK_ORACLE_TABLE_INCLUDE_LIST = "task.oracleTask.tableIncludeList";
// Snapshot mode; OracleSource falls back to "initial" when the key is absent.
public static final String TASK_ORACLE_SNAPSHOT_MODE = "task.oracleTask.snapshotMode";

// PostgreSQL task
public static final String TASK_POSTGRES_HOSTNAME = "task.postgreSQLTask.hostname";
public static final String TASK_POSTGRES_PORT = "task.postgreSQLTask.port";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class OracleTask {
private String port;
private String serverName;
private String dbname;
private String tableIncludeList;
private String schemaIncludeList;

private OracleTask.Snapshot snapshot;
private OracleTask.Offset offset;
Expand Down Expand Up @@ -58,13 +60,15 @@ public static class History {
public static class OracleTaskConfig {

private String hostname;
private String user;
private String username;
private String password;
private String port;
private String dbname;
private String serverName;
private String database;
private String schemaName;
private String tableName;
private String primaryKey;

private String snapshotMode;
private String scanStartupMode;
private String intervalMs;
private String offsetFilename;
private String historyFilename;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class TaskProfileDto {
public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_ORACLE_TASK = "org.apache.inlong.agent.plugin.task.OracleTask";
public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
Expand Down Expand Up @@ -308,12 +309,14 @@ private static OracleTask getOracleTask(DataConfig dataConfigs) {
OracleTaskConfig config = GSON.fromJson(dataConfigs.getExtParams(),
OracleTaskConfig.class);
OracleTask oracleTask = new OracleTask();
oracleTask.setUser(config.getUser());

oracleTask.setHostname(config.getHostname());
oracleTask.setPassword(config.getPassword());
oracleTask.setPort(config.getPort());
oracleTask.setServerName(config.getServerName());
oracleTask.setDbname(config.getDbname());
oracleTask.setUser(config.getUsername());
oracleTask.setPassword(config.getPassword());
oracleTask.setSchemaIncludeList(config.getSchemaName());
oracleTask.setDbname(config.getDatabase());
oracleTask.setTableIncludeList(config.getTableName());

OracleTask.Offset offset = new OracleTask.Offset();
offset.setFilename(config.getOffsetFilename());
Expand All @@ -322,7 +325,7 @@ private static OracleTask getOracleTask(DataConfig dataConfigs) {
oracleTask.setOffset(offset);

OracleTask.Snapshot snapshot = new OracleTask.Snapshot();
snapshot.setMode(config.getSnapshotMode());
snapshot.setMode(config.getScanStartupMode());
oracleTask.setSnapshot(snapshot);

OracleTask.History history = new OracleTask.History();
Expand Down Expand Up @@ -488,6 +491,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
profileDto.setTask(task);
break;
case ORACLE:
task.setTaskClass(DEFAULT_ORACLE_TASK);
OracleTask oracleTask = getOracleTask(dataConfig);
task.setOracleTask(oracleTask);
task.setSource(ORACLE_SOURCE);
Expand Down
6 changes: 6 additions & 0 deletions inlong-agent/agent-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@
<properties>
<inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
<debezium.version>1.8.0.Final</debezium.version>
<ojdbc.version>19.3.0.0</ojdbc.version>
<darwinsys.version>1.5.1</darwinsys.version>
</properties>

<dependencies>
<dependency>
<groupId>com.oracle.ojdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>agent-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.instance;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;

import java.io.IOException;

/**
 * Instance implementation used for Oracle collection tasks.
 */
public class OracleInstance extends CommonInstance {

    /** Value stored under {@link TaskConstants#INODE_INFO}; this instance type always writes an empty string. */
    private static final String EMPTY_INODE_INFO = "";

    /**
     * Writes an empty inode-info entry into the given profile.
     *
     * @param profile instance profile to update
     * @throws IOException part of the overridden signature; this implementation does not throw it
     */
    @Override
    public void setInodeInfo(InstanceProfile profile) throws IOException {
        profile.set(TaskConstants.INODE_INFO, EMPTY_INODE_INFO);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,56 @@

package org.apache.inlong.agent.plugin.sources;

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.reader.OracleReader;

import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.relational.history.FileDatabaseHistory;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.TaskConstants.*;

/**
* Oracle SQL source
*/
public class OracleSource extends AbstractSource {

// NOTE(review): `logger` and `LOGGER` wrap the same SLF4J logger; the methods visible in this
// class only use LOGGER -- confirm whether the lower-case one is still referenced anywhere.
private static final Logger logger = LoggerFactory.getLogger(OracleSource.class);
private static final Logger LOGGER = LoggerFactory.getLogger(OracleSource.class);
// Capacity of debeziumQueue (see initSource).
private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
// Single-thread executor that runs the Debezium engine; created in initSource and shut down in
// releaseSource.
private ExecutorService executor;
// NOTE(review): not assigned in the code visible here; initSource receives its own `profile`
// parameter, which shadows this field.
public InstanceProfile profile;
// Bounded hand-off queue: filled by handleConsumerEvent, drained by readFromSource.
private BlockingQueue<SourceData> debeziumQueue;
// Debezium engine/connector configuration, populated in initSource.
private Properties props = new Properties();

// Settings read from the instance profile in initSource.
// Snapshot mode; defaults to "initial" when the profile has no value.
private String snapshotMode;
private String dbName;
// Table name; combined with `schema` as "<schema>.<table>" for the connector's table filter.
private String tableName;
private String schema;

/** Creates an unconfigured source; all configuration happens in initSource. */
public OracleSource() {
}
Expand All @@ -51,32 +82,120 @@ public List<Reader> split(TaskProfile conf) {

/**
 * Builds the name used for this source's worker thread.
 *
 * <p>{@code initSource} also reuses this value as the Debezium server name and as the name of
 * the per-source data directory.
 *
 * @return {@code "oracle-source-<taskId>-<instanceId>"}
 */
@Override
protected String getThreadName() {
    // The superseded `return null;` that preceded this statement made it unreachable.
    return "oracle-source-" + taskId + "-" + instanceId;
}

/**
 * Configures the Debezium Oracle connector from the instance profile and launches the engine.
 *
 * <p>Steps: create the bounded hand-off queue, read the database/schema/table/snapshot settings,
 * fill {@code props} with engine, storage and connector options, then submit the engine
 * runnable to a new single-thread executor.
 *
 * @param profile instance profile providing the {@code task.oracleTask.*} settings
 * @throws FileException if any step fails; {@code stopRunning()} is invoked before rethrowing
 */
@Override
protected void initSource(InstanceProfile profile) {
    try {
        LOGGER.info("OracleSource init: {}", profile.toJsonStr());
        debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);

        snapshotMode = profile.get(TASK_ORACLE_SNAPSHOT_MODE, "initial");
        schema = profile.get(TASK_ORACLE_SCHEMA_INCLUDE_LIST);
        tableName = profile.get(TASK_ORACLE_TABLE_INCLUDE_LIST);
        dbName = profile.get(TASK_ORACLE_DBNAME);

        // The thread name doubles as the Debezium server name and the data sub-directory.
        final String sourceName = getThreadName();

        props.setProperty("name", "Oracle-" + instanceId);
        props.setProperty("connector.class", OracleConnector.class.getName());

        // Unified storage in "[agentPath]/data/"
        final String agentHome = AgentConfiguration.getAgentConf()
                .get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
        final String dataDir = agentHome + "/data/" + sourceName + "/";
        props.setProperty("offset.storage", FileOffsetBackingStore.class.getName());
        props.setProperty("offset.storage.file.filename", dataDir + "offset.dat");
        props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
        props.setProperty("database.history.file.filename", dataDir + "history.dat");

        putConnectorProperty(OracleConnectorConfig.HOSTNAME, profile.get(TASK_ORACLE_HOSTNAME));
        putConnectorProperty(OracleConnectorConfig.PORT, profile.get(TASK_ORACLE_PORT));
        putConnectorProperty(OracleConnectorConfig.USER, profile.get(TASK_ORACLE_USER));
        putConnectorProperty(OracleConnectorConfig.PASSWORD, profile.get(TASK_ORACLE_PASSWORD));
        putConnectorProperty(OracleConnectorConfig.TABLE_INCLUDE_LIST, schema + "." + tableName);
        putConnectorProperty(OracleConnectorConfig.SERVER_NAME, sourceName);
        putConnectorProperty(OracleConnectorConfig.DATABASE_NAME, dbName);
        putConnectorProperty(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, schema);
        putConnectorProperty(OracleConnectorConfig.SNAPSHOT_MODE, snapshotMode);

        // Prevent Base64 encoding of Oracle NUMBER type fields
        putConnectorProperty(OracleConnectorConfig.DECIMAL_HANDLING_MODE, "string");

        props.setProperty("key.converter.schemas.enable", "false");
        props.setProperty("value.converter.schemas.enable", "false");

        executor = Executors.newSingleThreadExecutor();
        executor.execute(startDebeziumEngine());
    } catch (Exception ex) {
        stopRunning();
        throw new FileException("error init stream for " + instanceId, ex);
    }
}

/** Stores {@code value} in {@code props} under the string form of the given connector config key. */
private void putConnectorProperty(Object key, String value) {
    props.setProperty(String.valueOf(key), value);
}

/**
 * Builds the task that runs the embedded Debezium engine.
 *
 * <p>Despite the name, nothing is started here: the returned runnable names the current thread,
 * creates a JSON-format engine from {@code props}, routes every batch to
 * {@link #handleConsumerEvent}, and blocks in {@code run()}. The engine is closed by the
 * try-with-resources block when {@code run()} returns or fails.
 *
 * @return runnable to be submitted to the source's executor
 */
private Runnable startDebeziumEngine() {
    return () -> {
        AgentThreadFactory.nameThread(getThreadName() + "debezium");
        try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = DebeziumEngine.create(Json.class)
                .using(props)
                .using(OffsetCommitPolicy.always())
                .notifying(this::handleConsumerEvent)
                .build()) {
            debeziumEngine.run();
        } catch (Throwable e) {
            // Message previously said "postgres", which was misleading in the Oracle source.
            LOGGER.error("do run error in oracle debezium: ", e);
        }
    };
}

/**
 * Debezium batch callback: copies each change event into {@code debeziumQueue} and acknowledges it.
 *
 * <p>Each event value is UTF-8 encoded and wrapped in a {@code SourceData}. The offer is retried
 * in one-second steps while the source is runnable, so a full queue back-pressures the engine.
 * Events with a {@code null} value (for example the tombstone that follows a delete) carry no
 * payload; they are acknowledged without being queued instead of failing with a
 * {@code NullPointerException}.
 *
 * @param records batch of change events delivered by the engine
 * @param committer used to mark each record, and then the batch, as processed
 * @throws InterruptedException if interrupted while waiting for queue space
 */
private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
        DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
    for (ChangeEvent<String, String> record : records) {
        String value = record.value();
        if (value != null) {
            SourceData sourceData = new SourceData(value.getBytes(StandardCharsets.UTF_8), "0L");
            boolean offerSuc = false;
            while (isRunnable() && !offerSuc) {
                offerSuc = debeziumQueue.offer(sourceData, 1, TimeUnit.SECONDS);
            }
        }
        committer.markProcessed(record);
    }
    committer.markBatchFinished();
}

/** Logs the table name this source was configured with. */
@Override
protected void printCurrentState() {
    LOGGER.info("oracle table is {}", tableName);
}

/**
 * Reports whether the source is ready for the next read.
 *
 * @return always {@code true}; this source has no per-read preparation step
 */
@Override
protected boolean doPrepareToRead() {
    // The superseded `return false;` that preceded this statement made it unreachable.
    return true;
}

/**
 * Drains queued change events into one batch.
 *
 * <p>Events are taken from {@code debeziumQueue} until the accumulated payload size reaches
 * {@code BATCH_READ_LINE_TOTAL_LEN} or a one-second poll finds the queue empty.
 *
 * @return the events collected so far; empty (never {@code null}) when nothing was available
 *         or the wait was interrupted before the first event
 */
@Override
protected List<SourceData> readFromSource() {
    List<SourceData> dataList = new ArrayList<>();
    try {
        int size = 0;
        while (size < BATCH_READ_LINE_TOTAL_LEN) {
            SourceData sourceData = debeziumQueue.poll(1, TimeUnit.SECONDS);
            if (sourceData == null) {
                // Queue stayed empty for the whole poll interval: return what we have.
                break;
            }
            int length = sourceData.getData().length;
            // Log only the size at debug level; dumping the raw byte[] of every record at
            // INFO is unreadable and floods the log.
            LOGGER.debug("readFromSource: {} bytes", length);
            size += length;
            dataList.add(sourceData);
        }
    } catch (InterruptedException e) {
        LOGGER.error("poll {} data from debezium queue interrupted.", instanceId);
        // Preserve the interrupt so the calling loop can observe the cancellation.
        Thread.currentThread().interrupt();
    }
    return dataList;
}

/**
 * Reads the next message by delegating to the superclass implementation.
 *
 * @return whatever {@code super.read()} returns
 */
@Override
public Message read() {
    // The superseded `return null;` that preceded this statement made it unreachable.
    return super.read();
}

@Override
Expand All @@ -86,16 +205,17 @@ protected boolean isRunnable() {

/**
 * Releases the resources held by this source by stopping the Debezium executor.
 *
 * <p>The executor is only created at the end of {@code initSource}; if initialisation failed
 * earlier it is still {@code null}, so it is checked before shutdown.
 */
@Override
protected void releaseSource() {
    LOGGER.info("release oracle source");
    if (executor != null) {
        executor.shutdownNow();
    }
}

/**
 * Reports whether this source has finished, as decided by the superclass.
 *
 * @return whatever {@code super.sourceFinish()} returns
 */
@Override
public boolean sourceFinish() {
    // The superseded `return false;` that preceded this statement made it unreachable.
    return super.sourceFinish();
}

/**
 * Reports whether the underlying source exists.
 *
 * @return always {@code true}; no existence check is performed here
 */
@Override
public boolean sourceExist() {
    // The superseded `return false;` that preceded this statement made it unreachable.
    return true;
}
}
Loading
Loading