Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-4993][Manager] Return details when querying a list of StreamSources #4994

Merged
merged 4 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;

import java.io.IOException;
Expand Down Expand Up @@ -174,8 +174,8 @@ private static class DescribeSource extends AbstractCommandRunner {
@Override
void run() {
try {
List<SourceListResponse> sourceListResponses = managerClient.listSources(group, stream, type);
sourceListResponses.forEach(PrintUtils::printJson);
List<StreamSource> sources = managerClient.listSources(group, stream, type);
sources.forEach(PrintUtils::printJson);
} catch (Exception e) {
System.out.println(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;

import java.io.IOException;
Expand Down Expand Up @@ -180,7 +180,7 @@ private static class ListSource extends AbstractCommandRunner {
@Parameter(names = {"-g", "--group"}, required = true, description = "inlong group id")
private String group;

@Parameter(names = {"-t", "--type"}, description = "sink type")
@Parameter(names = {"-t", "--type"}, description = "source type")
private String type;

ListSource(InnerInlongManagerClient managerClient) {
Expand All @@ -190,7 +190,7 @@ private static class ListSource extends AbstractCommandRunner {
@Override
void run() {
try {
List<SourceListResponse> sourceListResponses = managerClient.listSources(group, stream, type);
List<StreamSource> sourceListResponses = managerClient.listSources(group, stream, type);
PrintUtils.print(sourceListResponses, SourceInfo.class);
} catch (Exception e) {
System.out.println(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
Expand Down Expand Up @@ -202,12 +201,12 @@ private void initOrUpdateSource() {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
List<StreamSource> streamSources = managerClient.listSources(groupId, streamId);
List<String> updateSourceNames = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(sourceListResponses)) {
for (SourceListResponse sourceListResponse : sourceListResponses) {
final String sourceName = sourceListResponse.getSourceName();
final int id = sourceListResponse.getId();
if (CollectionUtils.isNotEmpty(streamSources)) {
for (StreamSource source : streamSources) {
final String sourceName = source.getSourceName();
final int id = source.getId();
if (sourceRequests.get(sourceName) == null) {
boolean isDelete = managerClient.deleteSource(id);
if (!isDelete) {
Expand All @@ -222,7 +221,7 @@ private void initOrUpdateSource() {
updateState.getValue()));
}
updateSourceNames.add(sourceName);
sourceRequest.setId(sourceListResponse.getId());
sourceRequest.setId(source.getId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.util.HttpUtils;

import java.util.List;
Expand Down Expand Up @@ -111,8 +111,8 @@ public Map<String, SimpleGroupStatus> listGroupStatus(List<String> groupIds) {
groupListResponses.forEach(response -> {
String groupId = response.getInlongGroupId();
SimpleGroupStatus groupStatus = SimpleGroupStatus.parseStatusByCode(response.getStatus());
List<SourceListResponse> sourceListResponses = response.getSourceResponses();
groupStatus = recheckGroupStatus(groupStatus, sourceListResponses);
List<StreamSource> sources = response.getStreamSources();
groupStatus = recheckGroupStatus(groupStatus, sources);
groupStatusMap.put(groupId, groupStatus);
});
}
Expand All @@ -129,10 +129,9 @@ public InlongGroup getGroup(String groupId) {
return new InlongGroupImpl(groupInfo, this);
}

private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus groupStatus,
List<SourceListResponse> sourceListResponses) {
Map<SimpleSourceStatus, List<SourceListResponse>> statusListMap = Maps.newHashMap();
sourceListResponses.forEach(source -> {
private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus groupStatus, List<StreamSource> sources) {
Map<SimpleSourceStatus, List<StreamSource>> statusListMap = Maps.newHashMap();
sources.forEach(source -> {
SimpleSourceStatus status = SimpleSourceStatus.parseByStatus(source.getStatus());
statusListMap.computeIfAbsent(status, k -> Lists.newArrayList()).add(source);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
Expand Down Expand Up @@ -341,15 +340,15 @@ private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
}

private void initOrUpdateSource(InlongStreamInfo streamInfo) {
List<SourceListResponse> sourceListResponses = managerClient.listSources(inlongGroupId, inlongStreamId);
List<StreamSource> streamSources = managerClient.listSources(inlongGroupId, inlongStreamId);
List<String> updateSourceNames = Lists.newArrayList();
for (SourceListResponse sourceListResponse : sourceListResponses) {
final String sourceName = sourceListResponse.getSourceName();
final int id = sourceListResponse.getId();
for (StreamSource source : streamSources) {
final String sourceName = source.getSourceName();
final int id = source.getId();
if (this.streamSources.get(sourceName) == null) {
boolean isDelete = managerClient.deleteSource(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
throw new RuntimeException(String.format("Delete source=%s failed", source));
}
} else {
StreamSource streamSource = this.streamSources.get(sourceName);
Expand All @@ -364,7 +363,7 @@ private void initOrUpdateSource(InlongStreamInfo streamInfo) {
updateSourceNames.add(sourceName);
}
}
for (Map.Entry<String, StreamSource> sourceEntry : streamSources.entrySet()) {
for (Map.Entry<String, StreamSource> sourceEntry : this.streamSources.entrySet()) {
String sourceName = sourceEntry.getKey();
if (updateSourceNames.contains(sourceName)) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
Expand Down Expand Up @@ -326,15 +326,15 @@ public Integer createSource(SourceRequest request) {
/**
* Get information of sources.
*/
public List<SourceListResponse> listSources(String groupId, String streamId) {
public List<StreamSource> listSources(String groupId, String streamId) {
return listSources(groupId, streamId, null);
}

/**
* List information of sources by the specified source type.
*/
public List<SourceListResponse> listSources(String groupId, String streamId, String sourceType) {
Response<PageInfo<SourceListResponse>> response = executeHttpCall(
public List<StreamSource> listSources(String groupId, String streamId, String sourceType) {
Response<PageInfo<StreamSource>> response = executeHttpCall(
streamSourceApi.listSources(groupId, streamId, sourceType));
assertRespSuccess(response);
return response.getData().getList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import com.github.pagehelper.PageInfo;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import retrofit2.Call;
import retrofit2.http.Body;
import retrofit2.http.DELETE;
Expand All @@ -38,7 +38,7 @@ public interface StreamSourceApi {
Call<Response<Boolean>> updateSource(@Body SourceRequest request);

@GET("source/list")
Call<Response<PageInfo<SourceListResponse>>> listSources(@Query("inlongGroupId") String groupId,
Call<Response<PageInfo<StreamSource>>> listSources(@Query("inlongGroupId") String groupId,
@Query("inlongStreamId") String streamId, @Query("sourceType") String sourceType);

@DELETE("source/delete/{id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSink;
Expand All @@ -48,16 +49,11 @@
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSource;
import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.file.FileSource;
import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSourceListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
Expand Down Expand Up @@ -154,9 +150,9 @@ void testListGroup4AutoPushSource() {
.id(1)
.inlongGroupId("1")
.name("name")
.sourceResponses(
.streamSources(
Lists.newArrayList(
AutoPushSourceListResponse.builder()
AutoPushSource.builder()
.id(22)
.inlongGroupId("1")
.inlongStreamId("2")
Expand Down Expand Up @@ -186,9 +182,9 @@ void testListGroup4BinlogSource() {
.id(1)
.inlongGroupId("1")
.name("name")
.sourceResponses(
.streamSources(
Lists.newArrayList(
MySQLBinlogSourceListResponse.builder()
MySQLBinlogSource.builder()
.id(22)
.inlongGroupId("1")
.inlongStreamId("2")
Expand Down Expand Up @@ -226,9 +222,9 @@ void testListGroup4FileSource() {
.status(1)
.createTime(new Date())
.modifyTime(new Date())
.sourceResponses(
.streamSources(
Lists.newArrayList(
FileSourceListResponse.builder()
FileSource.builder()
.id(22)
.inlongGroupId("1")
.inlongStreamId("2")
Expand Down Expand Up @@ -259,9 +255,9 @@ void testListGroup4KafkaSource() {
InlongGroupListResponse.builder()
.id(1)
.inlongGroupId("1")
.sourceResponses(
.streamSources(
Lists.newArrayList(
KafkaSourceListResponse.builder()
KafkaSource.builder()
.id(22)
.inlongGroupId("1")
.inlongStreamId("2")
Expand Down Expand Up @@ -295,16 +291,16 @@ void testListGroup4KafkaSource() {

@Test
void testListGroup4AllSource() {
ArrayList<SourceListResponse> sourceListResponses = Lists.newArrayList(
AutoPushSourceListResponse.builder()
ArrayList<StreamSource> streamSources = Lists.newArrayList(
AutoPushSource.builder()
.id(22)
.inlongGroupId("1")
.inlongStreamId("2")
.sourceType("AUTO_PUSH")
.version(1)
.build(),

MySQLBinlogSourceListResponse.builder()
MySQLBinlogSource.builder()
.id(22)
.inlongGroupId("1")
.inlongStreamId("2")
Expand All @@ -317,7 +313,7 @@ void testListGroup4AllSource() {
.tableWhiteList("")
.build(),

FileSourceListResponse.builder()
FileSource.builder()
.id(22)
.inlongGroupId("1")
.inlongStreamId("2")
Expand All @@ -327,15 +323,14 @@ void testListGroup4AllSource() {
.timeOffset("timeOffset")
.build(),

KafkaSourceListResponse.builder()
KafkaSource.builder()
.id(22)
.inlongGroupId("1")
.inlongStreamId("2")
.sourceType("KAFKA")
.sourceName("source name")
.serializationType("csv")
.dataNodeName("dataNodeName")

.topic("topic")
.groupId("111")
.bootstrapServers("bootstrapServers")
Expand All @@ -349,7 +344,7 @@ void testListGroup4AllSource() {
.inlongGroupId("1")
.name("name")
.inCharges("admin")
.sourceResponses(sourceListResponses)
.streamSources(streamSources)
.build()
);

Expand Down Expand Up @@ -390,7 +385,7 @@ void testCreateGroup() {
)
);

String groupId = innerInlongManagerClient.createGroup(new InlongGroupRequest());
String groupId = innerInlongManagerClient.createGroup(new InlongPulsarRequest());
Assertions.assertEquals("1111", groupId);
}

Expand All @@ -403,7 +398,7 @@ void testUpdateGroup() {
)
);

Pair<String, String> updateGroup = innerInlongManagerClient.updateGroup(new InlongGroupRequest());
Pair<String, String> updateGroup = innerInlongManagerClient.updateGroup(new InlongPulsarRequest());
Assertions.assertEquals("1111", updateGroup.getKey());
Assertions.assertTrue(StringUtils.isBlank(updateGroup.getValue()));
}
Expand Down Expand Up @@ -676,9 +671,8 @@ void testSaveCluster() {
)
)
);
ClusterRequest request = new ClusterRequest();
ClusterRequest request = new PulsarClusterRequest();
request.setName("pulsar");
request.setType("PULSAR");
request.setClusterTags("test_cluster");
Integer clusterIndex = innerInlongManagerClient.saveCluster(request);
Assertions.assertEquals(1, (int) clusterIndex);
Expand Down
Loading