[INLONG-5046][Agent] Support collect data from PostgreSQL (apache#5367)
iamsee123 authored and rhizoma-atractylodis committed Aug 17, 2022
1 parent 45618bc commit b67a0af
Showing 9 changed files with 672 additions and 0 deletions.
36 changes: 36 additions & 0 deletions .../org/apache/inlong/agent/constant/PostgreSQLConstants.java
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.constant;

/**
 * Constants for the PostgreSQL snapshot modes used by the job fetcher
 */
public class PostgreSQLConstants {

/** Snapshot existing data once on first startup, then stream changes. */
public static final String INITIAL = "initial";

/** Skip the snapshot and stream from the earliest available offset. */
public static final String EARLIEST_OFFSET = "never";

/** Take a new snapshot on every connector startup. */
public static final String ALWAYS = "always";

/** Snapshot from the position at which the replication slot was created. */
public static final String EXPORTED = "exported";

/** Take the initial snapshot only, without streaming subsequent changes. */
public static final String INITIAL_ONLY = "initial_only";

/** Delegate snapshot behaviour to a custom snapshotter implementation. */
public static final String CUSTOM = "custom";
}
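
For context: these constants mirror Debezium's PostgreSQL snapshot.mode option values. Below is a minimal sketch of how one of them reaches an engine configuration — illustrative only; the SnapshotModeSketch class is hypothetical and not part of this commit, and the real wiring lives in PostgreSQLReader.getEngineProps() further down.

import org.apache.inlong.agent.constant.PostgreSQLConstants;

import java.util.Properties;

public class SnapshotModeSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // "initial": snapshot existing rows once, then stream subsequent WAL changes
        props.setProperty("snapshot.mode", PostgreSQLConstants.INITIAL);
        System.out.println(props.getProperty("snapshot.mode")); // prints "initial"
    }
}
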
4 changes: 4 additions & 0 deletions inlong-agent/agent-plugins/pom.xml
@@ -88,6 +88,10 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
</dependency>
<dependency>
<artifactId>awaitility</artifactId>
<groupId>org.awaitility</groupId>
48 changes: 48 additions & 0 deletions .../org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.sources;

import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.reader.PostgreSQLReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;

/**
 * PostgreSQL source: splits a PostgreSQL source job into multiple readers
 */
public class PostgreSQLSource extends AbstractSource {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSource.class);

public PostgreSQLSource() {

}

@Override
public List<Reader> split(JobProfile conf) {
super.init(conf);
Reader postgreSQLReader = new PostgreSQLReader();
List<Reader> readerList = new ArrayList<>();
readerList.add(postgreSQLReader);
sourceMetric.sourceSuccessCount.incrementAndGet();
return readerList;
}
}
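
A hypothetical caller-side sketch of the split contract. This is a sketch under assumptions: the no-arg JobProfile constructor and its set(key, value) mutator are assumed from the agent's configuration API, and the job.postgreSQLJob.* keys match those consumed by PostgreSQLReader below.

import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.PostgreSQLSource;

import java.util.List;

public class PostgreSQLSourceSketch {

    public static void main(String[] args) {
        JobProfile conf = new JobProfile(); // assumed public no-arg constructor
        conf.set("job.postgreSQLJob.user", "postgres"); // assumed set(key, value) mutator
        conf.set("job.postgreSQLJob.password", "secret");
        conf.set("job.postgreSQLJob.hostname", "127.0.0.1");
        conf.set("job.postgreSQLJob.port", "5432");
        conf.set("job.postgreSQLjob.dbname", "inventory");
        conf.set("job.postgreSQLjob.servername", "server1");

        List<Reader> readers = new PostgreSQLSource().split(conf);
        System.out.println("readers: " + readers.size()); // this source always yields one reader
    }
}
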
300 changes: 300 additions & 0 deletions .../org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -0,0 +1,300 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.sources.reader;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.PostgreSQLConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase;
import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
import org.apache.inlong.agent.pojo.DebeziumFormat;
import org.apache.inlong.agent.pojo.DebeziumOffset;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;

/**
 * Reads PostgreSQL change data via an embedded Debezium engine
 */
public class PostgreSQLReader extends AbstractReader {

public static final String COMPONENT_NAME = "PostgreSQLReader";
public static final String JOB_POSTGRESQL_USER = "job.postgreSQLJob.user";
public static final String JOB_DATABASE_PASSWORD = "job.postgreSQLJob.password";
public static final String JOB_DATABASE_HOSTNAME = "job.postgreSQLJob.hostname";
public static final String JOB_DATABASE_PORT = "job.postgreSQLJob.port";
public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.postgreSQLjob.offset.intervalMs";
public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.postgreSQLjob.history.filename";
public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.postgreSQLjob.snapshot.mode";
public static final String JOB_DATABASE_QUEUE_SIZE = "job.postgreSQLjob.queueSize";
public static final String JOB_DATABASE_OFFSETS = "job.postgreSQLjob.offsets";
public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.postgreSQLjob.offset.specificOffsetFile";
public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.postgreSQLjob.offset.specificOffsetPos";
public static final String JOB_DATABASE_DBNAME = "job.postgreSQLjob.dbname";
public static final String JOB_DATABASE_SERVER_NAME = "job.postgreSQLjob.servername";
public static final String JOB_DATABASE_PLUGIN_NAME = "job.postgreSQLjob.pluginname";
private static final Gson GSON = new Gson();
private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLReader.class);
private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
protected AgentMetricItem readerMetric;
private String userName;
private String password;
private String hostName;
private String port;
private String offsetFlushIntervalMs;
private String offsetStoreFileName;
private String snapshotMode;
private String instanceId;
private String offset;
private String specificOffsetFile;
private String specificOffsetPos;
private String dbName;
private String pluginName;
private String serverName;
private PostgreSQLSnapshotBase postgreSQLSnapshot;
private boolean finished = false;
private ExecutorService executor;
/**
* pair.left : table name
* pair.right : actual data
*/
private LinkedBlockingQueue<Pair<String, String>> postgreSQLMessageQueue;
private JobProfile jobProfile;
private boolean destroyed = false;

public PostgreSQLReader() {
}

@Override
public Message read() {
if (!postgreSQLMessageQueue.isEmpty()) {
readerMetric.pluginReadCount.incrementAndGet();
return getPostgreSQLMessage();
} else {
return null;
}
}

private DefaultMessage getPostgreSQLMessage() {
Pair<String, String> message = postgreSQLMessageQueue.poll();
Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
header.put(PROXY_KEY_DATA, message.getKey());
return new DefaultMessage(message.getValue().getBytes(StandardCharsets.UTF_8), header);
}

@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
jobProfile = jobConf;
LOGGER.info("init PostgreSQL reader with jobConf {}", jobConf.toJsonStr());
userName = jobConf.get(JOB_POSTGRESQL_USER);
password = jobConf.get(JOB_DATABASE_PASSWORD);
hostName = jobConf.get(JOB_DATABASE_HOSTNAME);
port = jobConf.get(JOB_DATABASE_PORT);
dbName = jobConf.get(JOB_DATABASE_DBNAME);
serverName = jobConf.get(JOB_DATABASE_SERVER_NAME);
pluginName = jobConf.get(JOB_DATABASE_PLUGIN_NAME, "pgoutput");
instanceId = jobConf.getInstanceId();
offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000");
offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME,
tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId;
snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, "");
postgreSQLMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000));
finished = false;

offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
postgreSQLSnapshot = new PostgreSQLSnapshotBase(offsetStoreFileName);
postgreSQLSnapshot.save(offset);

Properties props = getEngineProps();

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
io.debezium.engine.format.Json.class)
.using(props)
.notifying((records, committer) -> {
try {
for (ChangeEvent<String, String> record : records) {
DebeziumFormat debeziumFormat = GSON
.fromJson(record.value(), DebeziumFormat.class);
postgreSQLMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value()));
committer.markProcessed(record);
}
committer.markBatchFinished();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
System.currentTimeMillis(), records.size());
readerMetric.pluginReadCount.addAndGet(records.size());
} catch (Exception e) {
readerMetric.pluginReadFailCount.addAndGet(records.size());
LOGGER.error("parse binlog message error", e);
}
})
.using((success, message, error) -> {
if (!success) {
LOGGER.error("PostgreSQL job with jobConf {} has error {}", instanceId, message, error);
}
}).build();

executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot());
}

private String tryToInitAndGetHistoryPath() {
String historyPath = agentConf.get(
AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
String parentPath = agentConf.get(
AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
}

private Properties getEngineProps() {
Properties props = new Properties();

props.setProperty("name", "engine" + instanceId);
props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
props.setProperty("database.server.name", serverName);
props.setProperty("plugin.name", pluginName);
props.setProperty("database.hostname", hostName);
props.setProperty("database.port", port);
props.setProperty("database.user", userName);
props.setProperty("database.dbname", dbName);
props.setProperty("database.password", password);

props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
props.setProperty("database.snapshot.mode", snapshotMode);
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");
props.setProperty("snapshot.mode", snapshotMode);
props.setProperty("offset.storage.file.filename", offsetStoreFileName);
if (PostgreSQLConstants.CUSTOM.equals(snapshotMode)) {
props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
} else {
props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
}
props.setProperty("tombstones.on.delete", "false");
props.setProperty("converters", "datetime");
props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
props.setProperty("datetime.format.date", "yyyy-MM-dd");
props.setProperty("datetime.format.time", "HH:mm:ss");
props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");

LOGGER.info("PostgreSQL job {} start with props {}", jobProfile.getInstanceId(), props);
return props;
}

private String serializeOffset() {
Map<String, Object> sourceOffset = new HashMap<>();
Preconditions.checkNotNull(specificOffsetFile,
JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
sourceOffset.put("file", specificOffsetFile);
Preconditions.checkNotNull(specificOffsetPos,
JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
sourceOffset.put("pos", specificOffsetPos);
DebeziumOffset specificOffset = new DebeziumOffset();
specificOffset.setSourceOffset(sourceOffset);
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("server", instanceId);
specificOffset.setSourcePartition(sourcePartition);
byte[] serializedOffset = new byte[0];
try {
serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
} catch (IOException e) {
LOGGER.error("serialize offset message error", e);
}
return new String(serializedOffset, StandardCharsets.UTF_8);
}

@Override
public void destroy() {
synchronized (this) {
if (!destroyed) {
executor.shutdownNow();
postgreSQLSnapshot.close();
destroyed = true;
}
}
}

@Override
public boolean isFinished() {
return finished;
}

@Override
public String getReadSource() {
return instanceId;
}

@Override
public void setReadTimeout(long mill) {
// no-op: this reader does not use a read timeout
}

@Override
public void setWaitMillisecond(long millis) {
// no-op: this reader does not apply a wait interval
}

@Override
public String getSnapshot() {
if (postgreSQLSnapshot != null) {
return postgreSQLSnapshot.getSnapshot();
}
return "";
}

@Override
public void finishRead() {
finished = true;
}

@Override
public boolean isSourceExist() {
return true;
}
}
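
Finally, a minimal polling sketch for the reader's pull-style API, under the same configuration assumptions as the source sketch above. Message.getBody() is assumed from the agent's Message interface; in practice the agent's task runner, not a main method, drives this loop.

import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.sources.reader.PostgreSQLReader;

import java.nio.charset.StandardCharsets;

public class PostgreSQLReaderSketch {

    public static void main(String[] args) throws InterruptedException {
        JobProfile conf = new JobProfile(); // populate job.postgreSQLJob.* keys as in the source sketch
        PostgreSQLReader reader = new PostgreSQLReader();
        reader.init(conf); // starts the embedded Debezium engine on a background thread

        while (!reader.isFinished()) {
            Message message = reader.read(); // returns null when the internal queue is empty
            if (message != null) {
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
            } else {
                Thread.sleep(100L); // back off instead of busy-waiting
            }
        }
        reader.destroy();
    }
}
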