Skip to content

Commit 030c818

Browse files
wangpeixwangpeix
and
wangpeix
authoredApr 12, 2023
[INLONG-7783][Agent] Support sink data to Kafka (#7808)
Co-authored-by: wangpeix <luckywangpei@didiglobal.com>

File tree

9 files changed

+555
-5
lines changed

9 files changed

+555
-5
lines changed
 

‎inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java

+14
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,26 @@ public class AgentConstants {
171171
public static final String DEFAULT_COMPRESSION_TYPE = "NONE";
172172

173173
public static final String PULSAR_CLIENT_PRODUCER_NUM = "agent.sink.pulsar.producer.num";
174+
public static final String KAFKA_SINK_PRODUCER_NUM = "agent.sink.kafka.producer.num";
174175
public static final int DEFAULT_PRODUCER_NUM = 3;
175176

176177
public static final String PULSAR_CLIENT_ENABLE_ASYNC_SEND = "agent.sink.pulsar.enbale.async.send";
178+
public static final String KAFKA_PRODUCER_ENABLE_ASYNC_SEND = "agent.sink.kafka.enbale.async.send";
177179
public static final boolean DEFAULT_ENABLE_ASYNC_SEND = true;
178180

179181
public static final String PULSAR_SINK_SEND_QUEUE_SIZE = "agent.sink.pulsar.send.queue.size";
182+
public static final String KAFKA_SINK_SEND_QUEUE_SIZE = "agent.sink.kafka.send.queue.size";
180183
public static final int DEFAULT_SEND_QUEUE_SIZE = 20000;
181184

185+
public static final String DEFAULT_KAFKA_SINK_SEND_ACKS = "1";
186+
public static final long DEFAULT_KAFKA_SINK_SYNC_SEND_TIMEOUT_MS = 3000;
187+
188+
public static final String DEFAULT_KAFKA_SINK_SEND_COMPRESSION_TYPE = "none";
189+
190+
public static final String DEFAULT_KAFKA_SINK_SEND_KEY_SERIALIZER =
191+
"org.apache.kafka.common.serialization.StringSerializer";
192+
193+
public static final String DEFAULT_KAFKA_SINK_SEND_VALUE_SERIALIZER =
194+
"org.apache.kafka.common.serialization.ByteArraySerializer";
195+
182196
}

‎inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java

+3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class JobProfileDto {
3939
public static final String MANAGER_JOB = "MANAGER_JOB";
4040
public static final String DEFAULT_DATAPROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
4141
public static final String PULSAR_SINK = "org.apache.inlong.agent.plugin.sinks.PulsarSink";
42+
public static final String KAFKA_SINK = "org.apache.inlong.agent.plugin.sinks.KafkaSink";
4243

4344
/**
4445
* file source
@@ -412,6 +413,8 @@ public static TriggerProfile convertToTriggerProfile(DataConfig dataConfig) {
412413
job.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo()));
413414
if (mqType.equals(MQType.PULSAR)) {
414415
job.setSink(PULSAR_SINK);
416+
} else if (mqType.equals(MQType.KAFKA)) {
417+
job.setSink(KAFKA_SINK);
415418
} else {
416419
throw new IllegalArgumentException("input dataConfig" + dataConfig + "is invalid please check");
417420
}

‎inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java

+442
Large diffs are not rendered by default.

‎inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,10 @@ public void write(Message message) {
8282
});
8383
// increment the count of successful sinks
8484
sinkMetric.sinkSuccessCount.incrementAndGet();
85-
} else {
86-
// increment the count of failed sinks
87-
sinkMetric.sinkFailCount.incrementAndGet();
8885
}
8986
}
9087
} catch (Exception e) {
88+
sinkMetric.sinkFailCount.incrementAndGet();
9189
LOGGER.error("write message to Proxy sink error", e);
9290
} catch (Throwable t) {
9391
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.agent.plugin.sinks;
19+
20+
import org.apache.inlong.agent.conf.JobProfile;
21+
import org.apache.inlong.agent.message.ProxyMessage;
22+
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
23+
import org.apache.inlong.agent.plugin.MiniAgent;
24+
import org.junit.BeforeClass;
25+
import org.junit.Test;
26+
27+
import java.nio.charset.StandardCharsets;
28+
import java.util.HashMap;
29+
import java.util.Map;
30+
31+
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
32+
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
33+
import static org.junit.Assert.assertEquals;
34+
35+
public class KafkaSinkTest {
36+
37+
private static MockSink kafkaSink;
38+
private static JobProfile jobProfile;
39+
private static AgentBaseTestsHelper helper;
40+
private static MiniAgent agent;
41+
42+
@BeforeClass
43+
public static void setUp() throws Exception {
44+
helper = new AgentBaseTestsHelper(KafkaSinkTest.class.getName()).setupAgentHome();
45+
agent = new MiniAgent();
46+
jobProfile = JobProfile.parseJsonFile("kafkaSinkJob.json");
47+
jobProfile.set("job.mqClusters",
48+
"[{\"url\":\"mqurl\",\"token\":\"token\",\"mqType\":\"KAFKA\",\"params\":{}}]");
49+
jobProfile.set("job.topicInfo", "{\"topic\":\"topic\",\"inlongGroupId\":\"groupId\"}");
50+
System.out.println(jobProfile.toJsonStr());
51+
kafkaSink = new MockSink();
52+
kafkaSink.init(jobProfile);
53+
}
54+
55+
@Test
56+
public void testWrite() {
57+
String body = "testMesage";
58+
Map<String, String> attr = new HashMap<>();
59+
attr.put(PROXY_KEY_GROUP_ID, "groupId");
60+
attr.put(PROXY_KEY_STREAM_ID, "streamId");
61+
long count = 5;
62+
for (long i = 0; i < 5; i++) {
63+
kafkaSink.write(new ProxyMessage(body.getBytes(StandardCharsets.UTF_8), attr));
64+
}
65+
assertEquals(kafkaSink.sinkMetric.sinkSuccessCount.get(), count);
66+
}
67+
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"job": {
3+
"id": 1,
4+
"instance.id": "job_1",
5+
"source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
6+
"sink": "org.apache.inlong.agent.plugin.sinks.KafkaSink",
7+
"channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
8+
"groupId": "groupId",
9+
"streamId": "streamId"
10+
},
11+
"proxy": {
12+
"inlongGroupId": "groupId",
13+
"inlongStreamId": "streamId"
14+
}
15+
}

‎inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.inlong.common.pojo.dataproxy;
1919

20+
import lombok.ToString;
2021
import org.apache.commons.lang3.StringUtils;
2122

2223
import java.util.HashMap;
@@ -25,6 +26,7 @@
2526
/**
2627
* MQ cluster info.
2728
*/
29+
@ToString
2830
public class MQClusterInfo {
2931

3032
private String url;

‎inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class InlongConstants {
4040
*/
4141
public static final String COMMA = ",";
4242

43+
public static final String DOT = ".";
44+
4345
public static final String BLANK = " ";
4446

4547
public static final String SLASH = "/";

‎inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@
8484
import java.util.stream.Collectors;
8585
import java.util.stream.Stream;
8686

87+
import static org.apache.inlong.manager.common.consts.InlongConstants.DOT;
88+
8789
/**
8890
* Agent service layer implementation
8991
*/
@@ -476,9 +478,8 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
476478
// add mq cluster setting
477479
List<MQClusterInfo> mqSet = new ArrayList<>();
478480
List<String> clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag());
479-
List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR);
480481
ClusterPageRequest pageRequest = ClusterPageRequest.builder()
481-
.typeList(typeList)
482+
.type(groupEntity.getMqType())
482483
.clusterTagList(clusterTagList)
483484
.build();
484485
List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(pageRequest);
@@ -518,6 +519,11 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
518519
topicConfig.setInlongGroupId(groupId);
519520
topicConfig.setTopic(mqResource);
520521
dataConfig.setTopicInfo(topicConfig);
522+
} else if (MQType.KAFKA.equals(mqType)) {
523+
DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
524+
topicConfig.setInlongGroupId(groupId);
525+
topicConfig.setTopic(groupEntity.getMqResource() + DOT + streamEntity.getMqResource());
526+
dataConfig.setTopicInfo(topicConfig);
521527
}
522528
} else {
523529
LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);

0 commit comments

Comments
 (0)
Please sign in to comment.