Skip to content

Commit

Permalink
eventmesh-admin-rocketmq submodule and createTopic REST API (#530)
Browse files Browse the repository at this point in the history
* eventmesh-admin-rocketmq submodule and createTopic draft API

* add license header

* change to /topicmanage

* fix build error

* fix some code check style issues

close #435 #346
  • Loading branch information
yzhao244 authored Nov 29, 2021
1 parent d6b4d21 commit 93ebbe8
Show file tree
Hide file tree
Showing 15 changed files with 645 additions and 14 deletions.
12 changes: 6 additions & 6 deletions eventmesh-admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ EventMesh Administration Module for EventMesh. It manages Admin Service, Configu

## Administration Client Manager APIs

### POST /clientmanage/topics/
### POST /topicmanage
- Create a new topic if does not exist
- Exposed POST endpoint to create a new topic if it does not exist.
* Url - http://localhost:8081/clientmanage/topics/
* Url - http://localhost:8081/topicmanage
* sample request payload
```json
{
Expand All @@ -23,12 +23,12 @@ EventMesh Administration Module for EventMesh. It manages Admin Service, Configu
"created_time": "2021-09-03",
}
```
### DELETE /clientmanage/topics/(string: topic)/
### DELETE /topicmanage/(string: topic)
- Delete a specific topic.
- Exposed DELETE endpoint to remove a specific topic
* URL -
```url
http://localhost:8081/clientmanage/topics/mytopic1
http://localhost:8081/topicmanage/mytopic1
```

* Response -
Expand All @@ -39,12 +39,12 @@ EventMesh Administration Module for EventMesh. It manages Admin Service, Configu
}
```

### GET /clientmanage/topics
### GET /topicmanage
- Retrieve a list of topics
- Exposed GET endpoint to retrieve all topics
* URL -
```url
http://localhost:8081/clientmanage/topics
http://localhost:8081/topicmanage
```
* Response

Expand Down
19 changes: 18 additions & 1 deletion eventmesh-admin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,22 @@
* limitations under the License.
*/

dependencies {
task copyEventMeshAdmin(dependsOn: ['jar']) {
doFirst {
new File(projectDir, '../eventmesh-admin/dist/apps').mkdir()
new File(projectDir, '../dist/admin/').mkdirs()
}
doLast {
copy {
into('../eventmesh-admin/dist/apps/')
from project.jar.getArchivePath()
exclude {
"eventmesh-admin-${version}.jar"
}
}
copy {
into '../dist/admin'
from "../eventmesh-admin/dist/apps/eventmesh-admin-rocketmq-${version}.jar"
}
}
}
26 changes: 26 additions & 0 deletions eventmesh-admin/eventmesh-admin-rocketmq/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


dependencies {
compileOnly project(":eventmesh-common")

implementation "org.apache.httpcomponents:httpclient"
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")

testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
}
17 changes: 17 additions & 0 deletions eventmesh-admin/eventmesh-admin-rocketmq/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

rocketmq_version=4.7.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.admin.rocketmq.controller;

import org.apache.eventmesh.admin.rocketmq.handler.TopicsHandler;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.net.httpserver.HttpServer;

public class AdminController {

private static final Logger logger = LoggerFactory.getLogger(AdminController.class);

public AdminController() {
}

public void run(HttpServer server) throws IOException {

server.createContext("/topicmanage", new TopicsHandler());

logger.info("EventMesh-Admin Controller server context created successfully");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.admin.rocketmq.handler;

import org.apache.eventmesh.admin.rocketmq.request.TopicCreateRequest;
import org.apache.eventmesh.admin.rocketmq.response.TopicResponse;
import org.apache.eventmesh.admin.rocketmq.util.JsonUtils;
import org.apache.eventmesh.admin.rocketmq.util.NetUtils;
import org.apache.eventmesh.admin.rocketmq.util.RequestMapping;

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.OutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

public class TopicsHandler implements HttpHandler {
private static final Logger logger = LoggerFactory.getLogger(TopicsHandler.class);

@Override
public void handle(HttpExchange httpExchange) throws IOException {

// create a new topic
if (RequestMapping.postMapping("/topicmanage", httpExchange)) {
createTopicHandler(httpExchange);
return;
}

OutputStream out = httpExchange.getResponseBody();
httpExchange.sendResponseHeaders(500, 0);
String result = String.format("Please check your request url");
logger.error(result);
out.write(result.getBytes());
return;
}

public void createTopicHandler(HttpExchange httpExchange) throws IOException {
String result = "";
OutputStream out = httpExchange.getResponseBody();
try {
String params = NetUtils.parsePostBody(httpExchange);
TopicCreateRequest topicCreateRequest =
JsonUtils.toObject(params, TopicCreateRequest.class);
String topic = topicCreateRequest.getName();

if (StringUtils.isBlank(topic)) {
result = "Create topic failed. Parameter topic not found.";
logger.error(result);
out.write(result.getBytes());
return;
}

//TBD: A new rocketmq service will be implemented for creating topics
TopicResponse topicResponse = null;
if (topicResponse != null) {
logger.info("create a new topic: {}", topic);
httpExchange.getResponseHeaders().add("Content-Type", "appication/json");
httpExchange.sendResponseHeaders(200, 0);
result = JsonUtils.toJson(topicResponse);
logger.info(result);
out.write(result.getBytes());
return;
} else {
httpExchange.sendResponseHeaders(500, 0);
result = String.format("create topic failed! Server side error");
logger.error(result);
out.write(result.getBytes());
return;
}
} catch (Exception e) {
httpExchange.getResponseHeaders().add("Content-Type", "appication/json");
httpExchange.sendResponseHeaders(500, 0);
result = String.format("create topic failed! Server side error");
logger.error(result);
out.write(result.getBytes());
return;
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.warn("out close failed...", e);
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.admin.rocketmq.request;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonIgnoreProperties(ignoreUnknown = true)
public class TopicCreateRequest {

private String name;

@JsonCreator
public TopicCreateRequest(@JsonProperty("name") String topic) {
super();
this.name = topic;
}

@JsonProperty("name")
public String getName() {
return this.name;
}

@JsonProperty("name")
public void setName(String name) {
this.name = name;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.admin.rocketmq.response;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class TopicResponse {

private String topic;
private String createdTime;

@JsonCreator
public TopicResponse(@JsonProperty("topic") String topic,
@JsonProperty("created_time") String createdTime) {
super();
this.topic = topic;
this.createdTime = createdTime;
}

@JsonProperty("topic")
public String getTopic() {
return this.topic;
}

@JsonProperty("topic")
public void setTopic(String topic) {
this.topic = topic;
}

@JsonProperty("created_time")
public String getCreatedTime() {
return createdTime;
}

@JsonProperty("created_time")
public void setCreatedTime(String createdTime) {
this.createdTime = createdTime;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("TopicResponse {topic=" + this.topic + ",");
sb.append("created_time=" + this.createdTime + "}");
return sb.toString();
}

}
Loading

0 comments on commit 93ebbe8

Please sign in to comment.