diff --git a/README.md b/README.md index e7a98a154e..c3e3d94cfc 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ The protocol of eventmesh is easier and convenient, you can read more [here](doc | v1.0.0 |Support java-sdk , tcp pub/sub, http pub| | v1.1.0 |Support RocketMQ as eventstore| | v1.1.1 |Support https| -| v1.2.0 |Support Plug-in architecture, support http sub| +| v1.2.0 |Support Plug-in architecture, support http sub, support cloud native deploy| | V1.3.0 |Support CloudEvents protocol| | |Support Event transaction| | |Support Event filter| diff --git a/README.zh-CN.md b/README.zh-CN.md index 4a9139b54f..51a79ef2e5 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -45,7 +45,7 @@ eventmesh的通信协议更加简洁方便,详细内容,阅读更多[这里] | v1.0.0 | Support java-sdk , tcp pub/sub, http pub | | v1.1.0 | Support RocketMQ as eventstore | | v1.1.1 | Support https | -| v1.2.0 | Support Plug-in architecture, support http sub | +| v1.2.0 | Support Plug-in architecture, support http sub, support cloud native deploy | | V1.3.0 | Support CloudEvents protocol | | | Support Event transaction | | | Support Event filter | diff --git a/build.gradle b/build.gradle index bddbceba1f..19a9b26d21 100644 --- a/build.gradle +++ b/build.gradle @@ -44,6 +44,7 @@ allprojects { clean.doFirst { delete 'build' delete 'dist' + delete 'out' } if (project.findProperty("snapshot") instanceof String) { @@ -235,6 +236,10 @@ subprojects { copy { into('../dist/apps/') from project.jar.getArchivePath() + exclude 'eventmesh-common*.jar' + exclude 'eventmesh-connector-api*.jar' + exclude 'eventmesh-registry*.jar' + exclude 'eventmesh-starter*.jar' } copy { into '../dist/lib' @@ -248,19 +253,20 @@ subprojects { exclude '**/*log4j2.xml*' exclude '**/spring-boot-devtools*.jar' exclude '**/mumble-sdk-test*.jar' - exclude '**/defibus*.jar' - exclude '*log4j*.jar' + exclude '*connector-rocketmq*.jar' + exclude 'eventmesh-runtime*.jar' +// exclude '*log4j*.jar' exclude 'commons-collections-3.2.2.jar' } copy { into '../dist/bin' - from '../script' + from '../eventmesh-runtime/bin' } copy { into '../dist/conf' - from '../conf/' + from '../eventmesh-runtime/conf' } } } diff --git a/docker/eventmesh-defibus/Dockerfile b/docker/eventmesh-defibus/Dockerfile deleted file mode 100644 index 1077ff07e8..0000000000 --- a/docker/eventmesh-defibus/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM centos7-jdk8:latest - -MAINTAINER mikexue mike_xwm@126.com - -WORKDIR /data -RUN mkdir /data/app -ADD eventmesh-runtime_1.2.0-SNAPSHOT.tar.gz /data/app/eventmesh -COPY eventmesh-connector-defibus-1.2.0-SNAPSHOT.jar /data/app/eventmesh/apps -WORKDIR /data/app/eventmesh/bin - -EXPOSE 10000 -EXPOSE 10105 - -ENV DOCKER true -CMD sh start.sh diff --git a/docker/eventmesh-rocketmq/Dockerfile b/docker/eventmesh-rocketmq/Dockerfile index f18309d88c..326cee133d 100644 --- a/docker/eventmesh-rocketmq/Dockerfile +++ b/docker/eventmesh-rocketmq/Dockerfile @@ -4,8 +4,7 @@ MAINTAINER mikexue mike_xwm@126.com WORKDIR /data RUN mkdir /data/app -ADD eventmesh-runtime_1.2.0-SNAPSHOT.tar.gz /data/app/eventmesh -COPY eventmesh-connector-rocketmq-1.2.0.jar /data/app/eventmesh/apps +ADD EventMesh_1.2.0-SNAPSHOT.tar.gz /data/app/eventmesh WORKDIR /data/app/eventmesh/bin EXPOSE 10000 @@ -14,3 +13,4 @@ EXPOSE 10105 ENV DOCKER true CMD sh start.sh + diff --git a/docs/README.md b/docs/README.md index 2eac06289a..41f65639b8 100644 --- a/docs/README.md +++ b/docs/README.md @@ -37,7 +37,7 @@ Key components: | ---- | ---- | | v1.0.0 |Support pub/sub, http api, java-sdk| | v1.1.0 |Support rocketmq as eventstore| -| v1.2.0 |Support Plug-in architecture, support http sub| +| v1.2.0 |Support Plug-in architecture, support http sub,support cloud native deploy| | V1.3.0 |Support CloudEvents protocol| | |Support transaction event| | |Support Event Sourcing| diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md index 6899a70dc5..b4f5c4e338 100644 --- a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md +++ b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md @@ -95,3 +95,175 @@ dependencies { 运行com.webank.eventmesh.starter.StartUp的主要方法 ``` + + +## 3 Docker 运行 + +### 3.1 拉取镜像 + +执行 `docker pull eventmesh/eventmesh-rocketmq:v1.2.0` , 你将会获取到EventMesh的镜像,如下图所示: + +![image-20210309155255510](D:\Ideaworkspace\workspace1\xwm1992\1.2.0\EventMesh\docs\images\docker\docker-image.png) + +### 3.2 配置 + +> **预先准备** : 你可能需要从github上下载源代码,并参考这两个文件(proxy.properties 和 rocketmq-client.properties)的内容来做下面的操作 + +**3.2.1 需要配置的文件** + +在运行容器之前,你需要配置如下文件: + +**proxy.properties** + +| 配置项 | 默认值 | 备注 | +| ---------------------- | ------ | ----------------------- | +| proxy.server.http.port | 10105 | EventMesh http 服务端口 | +| proxy.server.tcp.port | 10000 | EventMesh tcp 服务端口 | + +**rocketmq-client.properties** + +| 配置项 | 默认值 | 备注 | +| --------------------------------- | ----------------------------- | --------------------- | +| proxy.server.rocketmq.namesrvAddr | 127.0.0.1:9876;127.0.0.1:9876 | RocketMQ namesrv 地址 | + +拉取了EventMesh镜像到你的宿主机后,你可以执行下面的命令来完成**proxy.properties**和**rocketmq-client.properties** 文件的配置 + +**3.2.2 创建文件** + +```shell +mkdir -p /data/eventmesh/rocketmq/conf +cd /data/eventmesh/rocketmq/conf +vi proxy.properties +vi rocketmq-client.properties +``` + +这两个文件内容可以参考 [proxy.properties](https://github.com/WeBankFinTech/EventMesh/blob/develop/eventmesh-runtime/conf/proxy.properties) 和 [rocketmq-client.properties](https://github.com/WeBankFinTech/EventMesh/blob/develop/eventmesh-runtime/conf/rocketmq-client.properties) + +### 3.3 运行 + +**3.3.1 运行** + +执行下面的命令来运行容器 + +```shell +docker run -d -p 10000:10000 -p 10105:10105 -v /data/eventmesh/rocketmq/conf/proxy.properties:/data/app/eventmesh/conf/proxy.properties -v /data/eventmesh/rocketmq/conf/rocketmq-client.properties:/data/app/eventmesh/conf/rocketmq-client.properties docker.io/eventmesh/eventmesh-rocketmq:v1.2.0 +``` + +> -p : 将容器内端口与宿主机端口绑定,容器的端口应与配置文件中的端口一致 +> +> -v : 将容器内的配置文件挂载到宿主机下,需注意配置文件的路径 + +**3.3.2 检查容器的运行状况** + +执行 `docker ps` 来检查容器的运行状况 + +![image-20210309155917269](D:\Ideaworkspace\workspace1\xwm1992\1.2.0\EventMesh\docs\images\docker\docker-ps.png) + +执行 `docker logs [container id]` 可以得到如下结果 + +![image-20210309195623836](D:\Ideaworkspace\workspace1\xwm1992\1.2.0\EventMesh\docs\images\docker\docker-logs.png) + +执行 `docker exec -it [container id] /bin/bash` 可以进入到容器中并查看详细信息 + +![image-20210309200327627](D:\Ideaworkspace\workspace1\xwm1992\1.2.0\EventMesh\docs\images\docker\docker-exec.png) + +### 3.4 测试 + +**预先准备** :RocketMQ Namesrv & Broker + +你可以通过[这里](https://github.com/apache/rocketmq-docker)来构建rocketmq镜像或者从 docker hub上获取rocketmq镜像. + +```shell +#获取namesrv镜像 +docker pull rocketmqinc/rocketmq-namesrv:4.5.0-alpine +#获取broker镜像 +docker pull rocketmqinc/rocketmq-broker:4.5.0-alpine + +#运行namerv容器 +docker run -d -p 9876:9876 -v `pwd` /data/namesrv/logs:/root/logs -v `pwd`/data/namesrv/store:/root/store --name rmqnamesrv rocketmqinc/rocketmq-namesrv:4.5.0-alpine sh mqnamesrv + +#运行broker容器 +docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/data/broker/logs:/root/logs -v `pwd`/data/broker/store:/root/store --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" rocketmqinc/rocketmq-broker:4.5.0-alpine sh mqbroker -c ../conf/broker.conf +``` + +这里 **rocketmq-broker ip** 是 **pod ip**, 如果你想修改这个ip, 可以通过挂载容器中 **broker.conf** 文件的方式并修改文件中的 **brokerIP1** 配置项为自定义值 + +**3.4.1 运行示例** + +Windows + +- Windows系统下运行示例可以参考[这里](https://github.com/WeBankFinTech/EventMesh/blob/develop/docs/en/instructions/eventmesh-sdk-java-quickstart.md) + +Linux + +- **获取 eventmesh-test_1.2.0-SNAPSHOT.tar.gz** + + 你可以从我们的 **releases** 获取或者**通过源码的方式进行构建** + + **通过源码的方式进行构建**: + + ```shell + cd /* Your Deploy Path */EventMesh/eventmesh-test + gradle clean testdist testtar -x test` + ``` + + 可以在 `/eventmesh-test/build` 目录下获得 **eventmesh-test_1.2.0-SNAPSHOT.tar.gz** + +- **修改配置文件** + + ```shell + #上传 + upload eventmesh-test_1.2.0-SNAPSHOT.tar.gz + #解压 + tar -zxvf eventmesh-test_1.2.0-SNAPSHOT.tar.gz + #配置 + cd conf + config your application.properties + ``` + +- **运行** + + TCP Sub + + ```shell + cd bin + sh tcp_sub.sh + ``` + + TCP Pub + + ```shell + cd bin + sh tcp_pub.sh + ``` + + TCP Sub Broadcast + + ```shell + cd bin + sh tcp_sub_broadcast.sh + ``` + + TCP Pub Broadcast + + ```shell + cd bin + sh tcp_pub_broadcast.sh + ``` + + HTTP Sub + + ```shell + cd bin + sh http_sub.sh + ``` + + HTTP Pub + + ```shell + cd bin + sh http_pub.sh + ``` + + 之后 , 你可以在 `/logs` 目录下面看到不同模式的运行日志 + diff --git a/docs/cn/instructions/eventmesh-sdk-java-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-sdk-java-quickstart.zh-CN.md index b84f0e5b92..a49db89fc1 100644 --- a/docs/cn/instructions/eventmesh-sdk-java-quickstart.zh-CN.md +++ b/docs/cn/instructions/eventmesh-sdk-java-quickstart.zh-CN.md @@ -2,50 +2,21 @@ > Eventmesh-sdk-java作为客户端,与eventmesh-runtime通信,用于完成消息的发送和接收。 > -> Eventmesh-sdk-java支持同步消息,异步消息和广播消息。同步消息表示生产者发送消息,需要消费者提供响应消息;异步消息表示生产者只发送消息,不关心回复消息。广播消息表示生产者发送一次消息,所有订阅广播主题的消费者都将收到消息 +> Eventmesh-sdk-java支持异步消息和广播消息。异步消息表示生产者只发送消息,不关心回复消息。广播消息表示生产者发送一次消息,所有订阅广播主题的消费者都将收到消息 > > Eventmesh-sdk-java支持HTTP和TCP协议。 +TCP 和 Http 示例都在**eventmesh-test**模块下 -### 1. TCP DEMO - -

同步消息

- -- 创建主题 - -``` -sh runadmin.sh updateTopic -c $ {ClusterName} -t $ {topic} -n $ {namesrvAddr} -``` - - - -- 启动消费者,订阅上一步骤已经创建的Topic - - -``` -运行com.webank.eventmesh.client.tcp.demo.SyncResponse的主要方法 -``` - - - -- 启动发送端,发送消息 - - -``` -运行com.webank.eventmesh.client.tcp.demo.SyncRequest的主要方法 -``` +**注意:**下载了源代码后,需要将`/conf/application.properties` 和 `/conf/log4j2.xml` 复制到 `resources` 目录下 +![image-20210311193048090](..\..\images\eventmesh-test-structure.png) +### 1. TCP DEMO

异步消息

-- 创建主题 - -``` -sh runadmin.sh updateTopic -c $ {ClusterName} -t $ {topic} -n $ {namesrvAddr} -``` - - +- 创建主题FT0-e-80010000-01-1,可以通过rocketmq-console或者rocketmq tools 命令 - 启动消费者,订阅上一步骤已经创建的Topic @@ -53,8 +24,6 @@ sh runadmin.sh updateTopic -c $ {ClusterName} -t $ {topic} -n $ {namesrvAddr} 运行com.webank.eventmesh.client.tcp.demo.AsyncSubscribe的主要方法 ``` - - - 启动发送端,发送消息 @@ -62,17 +31,9 @@ sh runadmin.sh updateTopic -c $ {ClusterName} -t $ {topic} -n $ {namesrvAddr} 运行com.webank.eventmesh.client.tcp.demo.AsyncPublish的主要方法 ``` - -

广播消息

-- 创建主题 - -``` -sh runadmin.sh updateTopic -c $ {ClusterName} -t $ {topic} -n $ {namesrvAddr} -``` - - +- 创建主题FT0-e-80030000-01-3,可以通过rocketmq-console或者rocketmq tools 命令 - 启动消费端,订阅上一步骤已经创建的Topic @@ -80,8 +41,6 @@ sh runadmin.sh updateTopic -c $ {ClusterName} -t $ {topic} -n $ {namesrvAddr} 运行com.webank.eventmesh.client.tcp.demo.AsyncSubscribeBroadcast的主要方法 ``` - - - 启动发送端,发送广播消息 ``` @@ -90,37 +49,17 @@ sh runadmin.sh updateTopic -c $ {ClusterName} -t $ {topic} -n $ {namesrvAddr} ### 2. HTTP演示 ->对于http,eventmesh-sdk-java对同步消息实现了msg的发送;对于异步事件实现了发送与订阅 +>对于http,eventmesh-sdk-java对对于异步事件实现了发送与订阅 > >在演示中,Java类`LiteMessage`的`content`字段表示一个特殊的协议,因此,如果您要使用eventmesh-sdk-java的http-client,则只需设计协议的内容并在同一时间提供消费者的应用程序。 - - -

同步消息

- ->发送消息,生产者需要等到收到用户的响应消息 - -``` -运行com.webank.eventmesh.client.http.demo.SyncRequestInstance的主要方法 -``` - - - ->发送消息,生产者在回调中处理响应消息 - -``` -运行com.webank.eventmesh.client.http.demo.AsyncSyncRequestInstance的主要方法 -``` - - -

异步事件

> 生产者将事件发送给下游即可,无需等待响应 - 启动消费端,订阅Topic -异步事件消费端为spring boot demo,运行demo即可启动服务并完成Topic订阅 + 异步事件消费端为spring boot demo,运行demo即可启动服务并完成Topic订阅 ``` 运行com.webank.eventmesh.client.http.demo.sub.SpringBootDemoApplication的主要方法 diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md index 4327c5e4b5..7c44827d21 100644 --- a/docs/en/instructions/eventmesh-runtime-quickstart.md +++ b/docs/en/instructions/eventmesh-runtime-quickstart.md @@ -11,27 +11,33 @@ Gradle at least 5.6, eg 5.6.* ``` ### 1.2 download sources + download source code from [https://github.com/WeBankFinTech/EventMesh](https://github.com/WeBankFinTech/EventMesh) You will get **EventMesh-master.zip** ### 1.3 build sources + ```$xslt unzip EventMesh-master.zip cd /*YOUR DEPLOY PATH*/EventMesh-master/eventmesh-runtime gradle clean tar -x test ``` -You will get **eventmesh-runtime_1.0.0.tar.gz** in directory /* YOUR DEPLOY PATH */EventMesh-master/eventmesh-runtime/dist + +You will get **EventMesh_1.2.0.tar.gz** in directory /* YOUR DEPLOY PATH */EventMesh-master/build ### 1.4 Deployment + - deploy eventmesh-runtime -```$xslt -upload eventmesh-runtime_1.0.0.tar.gz -tar -zxvf eventmesh-runtime_1.0.0.tar.gz + +```shell +upload Eventmesh_1.2.0.tar.gz +tar -zxvf Eventmesh_1.2.0.tar.gz cd conf config your proxy.properties cd ../bin sh start.sh ``` + If you see "ProxyTCPServer[port=10000] started....", you setup runtime successfully. @@ -50,7 +56,7 @@ Same with 1.2 **2.3.1 Project structure:** -![](../../images/project-structure.png) +![](..\..\images\project-structure.png) - eventmesh-common : eventmesh common classes and method module - eventmesh-connector-api : eventmesh connector api definition module @@ -88,3 +94,170 @@ dependencies { running `com.webank.eventmesh.starter.StartUp` main method + + +## 3 Run with Docker + +### 3.1 Pull + +execute `docker pull eventmesh/eventmesh-rocketmq:v1.2.0` , you will get EventMesh image like below + +![image-20210309155255510](../../images/docker/docker-image.png) + +### 3.2 Config + +> **prerequisite** : may be you need download the source code from git first and use the contents of these files(proxy.properties and rocketmq-client.properties) as a reference for the following actions. + +**3.2.1 Files to configure** + +Before run the container you should configure some files. + +**proxy.properties** + +| Configuration Key | Default Value | Remarks | +| ---------------------- | ------------- | -------------------------- | +| proxy.server.http.port | 10105 | EventMesh http server port | +| proxy.server.tcp.port | 10000 | EventMesh tcp server port | + +**rocketmq-client.properties** + +| Configuration Key | Default Value | Remarks | +| --------------------------------- | ----------------------------- | -------------------------------- | +| proxy.server.rocketmq.namesrvAddr | 127.0.0.1:9876;127.0.0.1:9876 | RocketMQ namesrv default address | + +After pull the EventMesh image to your host machine, you can execute command below to configure **proxy.properties** and **rocketmq-client.properties** + +**3.2.2 Create Files** + +```shell +mkdir -p /data/eventmesh/rocketmq/conf +cd /data/eventmesh/rocketmq/conf +vi proxy.properties +vi rocketmq-client.properties +``` + +The contents of these files can reference from [proxy.properties](https://github.com/WeBankFinTech/EventMesh/blob/develop/eventmesh-runtime/conf/proxy.properties) and [rocketmq-client.properties](https://github.com/WeBankFinTech/EventMesh/blob/develop/eventmesh-runtime/conf/rocketmq-client.properties) + +### 3.3 Run + +**3.3.1 run** + +execute command below to run container + +``` +docker run -d -p 10000:10000 -p 10105:10105 -v /data/eventmesh/rocketmq/conf/proxy.properties:/data/app/eventmesh/conf/proxy.properties -v /data/eventmesh/rocketmq/conf/rocketmq-client.properties:/data/app/eventmesh/conf/rocketmq-client.properties docker.io/eventmesh/eventmesh-rocketmq:v1.2.0 +``` + +> -p : binding the container port with host machine port +> +> -v : mount the container configuration files with host machine files + +**3.3.2 check container** + +execute `docker ps` to check the container health + +![image-20210309155917269](..\..\images\docker\docker-ps.png) + +execute `docker logs [container id]` you will get following result: + +![image-20210309195623836](..\..\images\docker\docker-logs.png) + +execute `docker exec -it [container id] /bin/bash` you will go into the container and see the details: + +![image-20210309200327627](..\..\images\docker\docker-exec.png) + +### 3.4 Test + +**Prerequisite** :RocketMQ Namesrv & Broker + +you can build the rocketmq image following [here](https://github.com/apache/rocketmq-docker) or get the rocketmq image from docker hub. + +``` +docker pull rocketmqinc/rocketmq-namesrv:4.5.0-alpine +docker pull rocketmqinc/rocketmq-broker:4.5.0-alpine + +#namesrv +docker run -d -p 9876:9876 -v `pwd` /data/namesrv/logs:/root/logs -v `pwd`/data/namesrv/store:/root/store --name rmqnamesrv rocketmqinc/rocketmq-namesrv:4.5.0-alpine sh mqnamesrv + +#broker +docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/data/broker/logs:/root/logs -v `pwd`/data/broker/store:/root/store --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" rocketmqinc/rocketmq-broker:4.5.0-alpine sh mqbroker -c ../conf/broker.conf +``` + +When we get this point, **rocketmq-broker ip** is the **pod ip**, if you want to change the ip, you can mount the **broker.conf** file in container and modify **brokerIP1** configuration in this file to your custom values. + +**3.4.1 Run Demo** + +Windows + +- For demos running under the Windows , you can refer [here](https://github.com/WeBankFinTech/EventMesh/blob/develop/docs/en/instructions/eventmesh-sdk-java-quickstart.md) + +Linux + +- **Get eventmesh-test_1.2.0-SNAPSHOT.tar.gz** + + you can get this package from **our releases** or **build with source code**. + + **build with source code**: + + ```shell + cd /* Your Deploy Path */EventMesh/eventmesh-test + gradle clean testdist testtar -x test` + ``` + + you will get **eventmesh-test_1.2.0-SNAPSHOT.tar.gz** under the /eventmesh-test/build + +- **Modify configuration files** + + ```shell + upload eventmesh-test_1.2.0-SNAPSHOT.tar.gz + tar -zxvf eventmesh-test_1.2.0-SNAPSHOT.tar.gz + cd conf + config your application.properties + ``` + +- **Run demo** + + TCP Sub + + ```shell + cd bin + sh tcp_sub.sh + ``` + + TCP Pub + + ```shell + cd bin + sh tcp_pub.sh + ``` + + TCP Sub Broadcast + + ```shell + cd bin + sh tcp_sub_broadcast.sh + ``` + + TCP Pub Broadcast + + ```shell + cd bin + sh tcp_pub_broadcast.sh + ``` + + HTTP Sub + + ```shell + cd bin + sh http_sub.sh + ``` + + HTTP Pub + + ```shell + cd bin + sh http_pub.sh + ``` + + After this , you can see result of different mode in logs file under `/logs` directory + diff --git a/docs/en/instructions/eventmesh-sdk-java-quickstart.md b/docs/en/instructions/eventmesh-sdk-java-quickstart.md index 2e5f8baa1a..92d8951f6a 100644 --- a/docs/en/instructions/eventmesh-sdk-java-quickstart.md +++ b/docs/en/instructions/eventmesh-sdk-java-quickstart.md @@ -2,48 +2,21 @@ > Eventmesh-sdk-java , as the client, communicated with eventmesh-runtime, used to complete the sending and receiving of message. > -> Eventmesh-sdk-java supports sync msg, async msg and broadcast msg. Sync msg means the producer sends msg which need the consumer supplies the response msg, Async msg means the producer just sends msg and does not care reply msg.Broadcast msg means the producer send msg once and all the consumer subscribed the broadcast topic will receive the msg. +> Eventmesh-sdk-java supports async msg and broadcast msg. Async msg means the producer just sends msg and does not care reply msg.Broadcast msg means the producer send msg once and all the consumer subscribed the broadcast topic will receive the msg. > > Eventmesh-sdk-java supports the protocol of HTTP and TCP. +TCP demos and Http demos are both under the **eventmesh-test** module. -### 1. TCP DEMO - -#### Sync msg - -- create topic - -``` -sh runadmin.sh updateTopic -c ${ClusterName} -t ${topic} -n ${namesrvAddr} -``` - - - -* start consumer ,subscribe topic in previous step. - -``` -Run the main method of com.webank.eventmesh.client.tcp.demo.SyncResponse -``` - - - -* start producer, send message - -``` -Run the main method of com.webank.eventmesh.client.tcp.demo.SyncRequest -``` +**prerequisite**:after download the source code you should copy `/conf/application.properties` and `/conf/log4j2.xml` to the `resources` directory +![image-20210311193048090](..\..\images\eventmesh-test-structure.png) +### 1. TCP DEMO #### Async msg -- create topic - -``` -sh runadmin.sh updateTopic -c ${ClusterName} -t ${topic} -n ${namesrvAddr} -``` - - +- create topic FT0-e-80010000-01-1 on rocketmq-console - start consumer ,subscribe topic in previous step. @@ -51,34 +24,27 @@ sh runadmin.sh updateTopic -c ${ClusterName} -t ${topic} -n ${namesrvAddr} Run the main method of com.webank.eventmesh.client.tcp.demo.AsyncSubscribe ``` +- start producer, send message -start producer, send message - ``` Run the main method of com.webank.eventmesh.client.tcp.demo.AsyncPublish ``` - - #### Broadcast msg -- create topic +- create topic FT0-e-80030000-01-3 on rocketmq-console ``` sh runadmin.sh updateTopic -c ${ClusterName} -t ${topic} -n ${namesrvAddr} ``` - - - start consumer ,subscribe topic in previous step. ``` Run the main method of com.webank.eventmesh.client.tcp.demo.AsyncSubscribeBroadcast ``` - - * start producer, send broadcast message ``` @@ -87,37 +53,17 @@ Run the main method of com.webank.eventmesh.client.tcp.demo.AsyncPublishBroadcas ### 2. HTTP DEMO -> As to http, eventmesh-sdk-java implements the sending of sync msg. For async event it implements the pub and sub. +> As to http, eventmesh-sdk-java implements the pub and sub for async event . > > In the demo ,the field of `content` of the java class `LiteMessage` represents a special protocal, so if you want to use http-client of eventmesh-sdk-java, you just need to design the content of protocal and supply the consumer appliacation at the same time. - - -#### Sync msg - -> send msg ,producer need waiting until receive the response msg of consumer - -``` -Run the main method of com.webank.eventmesh.client.http.demo.SyncRequestInstance -``` - - - -> send msg,producer handles the reponse msg in callback - -``` -Run the main method of com.webank.eventmesh.client.http.demo.AsyncSyncRequestInstance -``` - - - #### Async event -> producer send the event to consumer and don't need waiting response msg of consumer +> producer send the event to consumer and don't need waiting response msg from consumer - start consumer, subscribe topic -Async consumer demo is a spring boot application demo, you can easily run this demo to start service and subscribe the topic. + Async consumer demo is a spring boot application demo, you can easily run this demo to start service and subscribe the topic. ``` Run the main method of com.webank.eventmesh.client.http.demo.sub.SpringBootDemoApplication diff --git a/docs/images/docker/docker-exec.png b/docs/images/docker/docker-exec.png new file mode 100644 index 0000000000..f605921156 Binary files /dev/null and b/docs/images/docker/docker-exec.png differ diff --git a/docs/images/docker/docker-image.png b/docs/images/docker/docker-image.png new file mode 100644 index 0000000000..5ca80021d4 Binary files /dev/null and b/docs/images/docker/docker-image.png differ diff --git a/docs/images/docker/docker-logs.png b/docs/images/docker/docker-logs.png new file mode 100644 index 0000000000..8cd3c4df5b Binary files /dev/null and b/docs/images/docker/docker-logs.png differ diff --git a/docs/images/docker/docker-ps.png b/docs/images/docker/docker-ps.png new file mode 100644 index 0000000000..9ae1c52b2c Binary files /dev/null and b/docs/images/docker/docker-ps.png differ diff --git a/docs/images/eventmesh-test-structure.png b/docs/images/eventmesh-test-structure.png new file mode 100644 index 0000000000..b783f1edb0 Binary files /dev/null and b/docs/images/eventmesh-test-structure.png differ diff --git a/docs/images/project-structure.png b/docs/images/project-structure.png index 53fddfd881..252a9536c6 100644 Binary files a/docs/images/project-structure.png and b/docs/images/project-structure.png differ diff --git a/eventmesh-sdk-java/build.gradle b/eventmesh-sdk-java/build.gradle index e411e4cb64..fe66fd0d53 100644 --- a/eventmesh-sdk-java/build.gradle +++ b/eventmesh-sdk-java/build.gradle @@ -15,16 +15,17 @@ * limitations under the License. */ -plugins { - id 'org.springframework.boot' version '2.1.6.RELEASE' -} + +//plugins { +// id 'org.springframework.boot' version '2.1.6.RELEASE' +//} + apply plugin: 'java' apply plugin: "maven" apply plugin: "eclipse" apply plugin: "idea" -apply plugin: "io.spring.dependency-management" if (project.findProperty("jdk") == "1.7") { sourceCompatibility = 1.7 @@ -66,14 +67,13 @@ dependencies { testCompile log4j2, sl4j compile project(":eventmesh-common") testCompile project(":eventmesh-common") - //Spring模块 - compile("org.springframework.boot:spring-boot-starter-web") - //Spring依赖的第三方模块(2.1.6依赖的是3.8.1) - compile("org.apache.commons:commons-lang3") - //Spring依赖的第三方模块(2.1.6依赖的是2.6.2)指定依赖版本为2.6.1 - compile('org.apache.commons:commons-pool2') - //未被依赖的库需要手动:我司开源的 轻代码开发框架核心包,文末源码链接 - compile('com.diboot:diboot-core:2.0.0') + +// //Spring依赖的第三方模块(2.1.6依赖的是3.8.1) +// compile("org.apache.commons:commons-lang3") +// //Spring依赖的第三方模块(2.1.6依赖的是2.6.2)指定依赖版本为2.6.1 +// compile('org.apache.commons:commons-pool2') +// //未被依赖的库需要手动:我司开源的 轻代码开发框架核心包,文末源码链接 +// compile('com.diboot:diboot-core:2.0.0') // implementation('org.springframework.boot:spring-boot-starter-web') { // exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging' @@ -86,21 +86,20 @@ dependencies { configurations.all{ exclude group: "ch.qos.logback" - exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging' } -uploadArchives { - repositories { - mavenDeployer { -// snapshotRepository(url: 'Your target repo address') { -// authentication(userName: 'Your user name', password: 'Your password') -// } -// repository(url: 'Your target repo address') { -// authentication(userName: 'Your user name', password: 'Your password') -// } - } - } -} +//uploadArchives { +// repositories { +// mavenDeployer { +//// snapshotRepository(url: 'Your target repo address') { +//// authentication(userName: 'Your user name', password: 'Your password') +//// } +//// repository(url: 'Your target repo address') { +//// authentication(userName: 'Your user name', password: 'Your password') +//// } +// } +// } +//} task zip(type: Zip, dependsOn: ['clean', 'jar']) { extension = 'zip' diff --git a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/AsyncPublishInstance.java b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/AsyncPublishInstance.java index abe414c424..86a94c0931 100644 --- a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/AsyncPublishInstance.java +++ b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/AsyncPublishInstance.java @@ -38,18 +38,19 @@ public static void main(String[] args) throws Exception { liteProducer = new LiteProducer(weMQProxyClientConfig); liteProducer.start(); - - LiteMessage liteMessage = new LiteMessage(); - liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) + for(int i = 0; i < 1; i++) { + LiteMessage liteMessage = new LiteMessage(); + liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) // .setContent("contentStr with special protocal") - .setContent("testPublishMessage") - .setTopic(topic) - .setUniqueId(RandomStringUtils.randomNumeric(30)) - .addProp(Constants.PROXY_MESSAGE_CONST_TTL, String.valueOf(4 * 3600 * 1000)); + .setContent("testPublishMessage") + .setTopic(topic) + .setUniqueId(RandomStringUtils.randomNumeric(30)) + .addProp(Constants.PROXY_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)); - boolean flag = liteProducer.publish(liteMessage); - Thread.sleep(1000); - logger.info("publish result , {}", flag); + boolean flag = liteProducer.publish(liteMessage); + Thread.sleep(1000); + logger.info("publish result , {}", flag); + } }catch (Exception e){ logger.warn("publish msg failed", e); } diff --git a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/common/AccessTestCaseTopicSet.java b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/common/AccessTestCaseTopicSet.java index 694719c610..0f0e17caee 100644 --- a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/common/AccessTestCaseTopicSet.java +++ b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/common/AccessTestCaseTopicSet.java @@ -6,7 +6,7 @@ public class AccessTestCaseTopicSet { // public static final String TOPIC_PRX_WQ2ClientBroadCast = "topic-broadcast-test"; - public static final String TOPIC_PRX_WQ2ClientBroadCast = "FT0-e-80030000-01-3"; + public static final String TOPIC_PRX_WQ2ClientBroadCast = "FT0-e-80030001-01-3"; // public static final String TOPIC_PRX_SyncSubscribeTest = "topic-sync-test"; public static final String TOPIC_PRX_SyncSubscribeTest = "FT0-s-80000000-01-0"; diff --git a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncPublish.java b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncPublish.java index 25f2875564..2752586924 100644 --- a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncPublish.java +++ b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncPublish.java @@ -20,16 +20,16 @@ public class AsyncPublish{ public static void main(String[] agrs)throws Exception{ try{ UserAgent userAgent = AccessTestUtils.generateClient1(); - client = new DefaultWemqAccessClient("127.0.0.1",10000,userAgent); + client = new DefaultWemqAccessClient("127.0.0.1",10002,userAgent); client.init(); client.heartbeat(); - for(int i=0; i < 10; i++) { + for(int i=0; i < 5; i++) { Package asyncMsg = AccessTestUtils.asyncMessage(); logger.info("begin send async msg[{}]==================={}", i, asyncMsg); client.publish(asyncMsg, WemqAccessCommon.DEFAULT_TIME_OUT_MILLS); - Thread.sleep(10); + Thread.sleep(1000); } Thread.sleep(2000); diff --git a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java index 754f39c8e6..71a8b75897 100644 --- a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java +++ b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java @@ -18,7 +18,7 @@ public class AsyncPublishBroadcast { public static void main(String[] agrs)throws Exception{ try{ UserAgent userAgent = AccessTestUtils.generateClient1(); - client = new DefaultWemqAccessClient("127.0.0.1",10000,userAgent); + client = new DefaultWemqAccessClient("127.0.0.1",10002,userAgent); client.init(); client.heartbeat(); diff --git a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribe.java b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribe.java index d1f328210a..103d504af2 100644 --- a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribe.java +++ b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribe.java @@ -22,7 +22,7 @@ public class AsyncSubscribe implements ReceiveMsgHook { public static void main(String[] agrs)throws Exception{ try{ UserAgent userAgent = AccessTestUtils.generateClient2(); - client = new DefaultWemqAccessClient("127.0.0.1",10000,userAgent); + client = new DefaultWemqAccessClient("127.0.0.1",10002,userAgent); client.init(); client.heartbeat(); diff --git a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java index f4aecd8448..364613c5b5 100644 --- a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java +++ b/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java @@ -22,11 +22,11 @@ public class AsyncSubscribeBroadcast implements ReceiveMsgHook { public static void main(String[] agrs)throws Exception{ try{ UserAgent userAgent = AccessTestUtils.generateClient2(); - client = new DefaultWemqAccessClient("127.0.0.1",10000,userAgent); + client = new DefaultWemqAccessClient("127.0.0.1",10002,userAgent); client.init(); client.heartbeat(); - client.subscribe("FT0-e-80030000-01-3"); + client.subscribe("FT0-e-80030001-01-3"); client.registerSubBusiHandler(handler); client.listen(); diff --git a/eventmesh-test/bin/http_pub.sh b/eventmesh-test/bin/http_pub.sh new file mode 100644 index 0000000000..9042645a8e --- /dev/null +++ b/eventmesh-test/bin/http_pub.sh @@ -0,0 +1,135 @@ +#!/bin/sh + + +# Copyright (C) @2017 Webank Group Holding Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# + +#=========================================================================================== +# Java Environment Setting +#=========================================================================================== +set -e +#服务器配置可能不一致,增加这些配置避免乱码问题 +export LANG=en_US.UTF-8 +export LC_CTYPE=en_US.UTF-8 +export LC_ALL=en_US.UTF-8 + +TMP_JAVA_HOME="/nemo/jdk1.8.0_152" + +#detect operating system. +OS=$(uname -o) + +function is_java8 { + local _java="$1" + [[ -x "$_java" ]] || return 1 + [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' ]] || return 2 + return 0 +} + + +function get_pid { + local ppid="" + if [ -f ${DEMO_HOME}/bin/pid_http_pub.file ]; then + ppid=$(cat ${DEMO_HOME}/bin/pid_http_pub.file) + else + if [[ $OS =~ Msys ]]; then + # 在Msys上存在可能无法kill识别出的进程的BUG + ppid=`jps -v | grep -i "com.webank.eventmesh.client.http.demo.AsyncPublishInstance" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # 已知问题:grep java 可能无法精确识别java进程 + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "com.webank.eventmesh.client.http.demo.AsyncPublishInstance" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + #在Linux服务器上要求尽可能精确识别进程 + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $PROXY_HOME | grep -i "com.webank.eventmesh.client.http.demo.AsyncPublishInstance" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + fi + fi + echo "$ppid"; +} + + +if [[ -d "$TMP_JAVA_HOME" ]] && is_java8 "$TMP_JAVA_HOME/bin/java"; then + JAVA="$TMP_JAVA_HOME/bin/java" +elif [[ -d "$JAVA_HOME" ]] && is_java8 "$JAVA_HOME/bin/java"; then + JAVA="$JAVA_HOME/bin/java" +elif is_java8 "/nemo/jdk8/bin/java"; then + JAVA="/nemo/jdk8/bin/java"; +elif is_java8 "/nemo/jdk1.8/bin/java"; then + JAVA="/nemo/jdk1.8/bin/java"; +elif is_java8 "/nemo/jdk/bin/java"; then + JAVA="/nemo/jdk/bin/java"; +elif is_java8 "$(which java)"; then + JAVA="$(which java)" +else + echo -e "ERROR\t java(1.8) not found, operation abort." + exit 9; +fi + +echo "http_pub_demo use java location= "$JAVA + +DEMO_HOME=`cd "./.." && pwd` + +export DEMO_HOME + +export DEMO_LOG_HOME=${DEMO_HOME}/logs + +echo "DEMO_HOME : ${DEMO_HOME}, DEMO_LOG_HOME : ${DEMO_LOG_HOME}" + +function make_logs_dir { + if [ ! -e "${DEMO_LOG_HOME}" ]; then mkdir -p "${DEMO_LOG_HOME}"; fi +} + +error_exit () +{ + echo "ERROR: $1 !!" + exit 1 +} + +export JAVA_HOME + +JAVA_OPT=`cat ${DEMO_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" +JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${DEMO_HOME}/logs/demo_http_pub_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" +JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${DEMO_HOME}/logs -XX:ErrorFile=${PROXY_HOME}/logs/hs_err_%p.log" +JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" +JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" +JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" +JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G" +JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" +JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced" +JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled" +JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom" +JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${DEMO_HOME}/conf/log4j2.xml" +#JAVA_OPT="${JAVA_OPT} -Dproxy.log.home=${DEMO_LOG_HOME}" +JAVA_OPT="${JAVA_OPT} -DconfPath=${DEMO_HOME}/conf" +JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard" +JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" + +pid=$(get_pid) +if [ -n "$pid" ];then + echo -e "ERROR\t the server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9; +fi + +make_logs_dir + +echo "using jdk[$JAVA]" >> ${DEMO_LOG_HOME}/demo_http_pub.out + + +DEMO_MAIN=com.webank.eventmesh.client.http.demo.AsyncPublishInstance +if [ $DOCKER ] +then + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_http_pub.out +else + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_http_pub.out 2>&1 & +echo $!>pid_http_pub.file +fi +exit 0 diff --git a/eventmesh-test/bin/http_sub.sh b/eventmesh-test/bin/http_sub.sh new file mode 100644 index 0000000000..c37ced41e5 --- /dev/null +++ b/eventmesh-test/bin/http_sub.sh @@ -0,0 +1,135 @@ +#!/bin/sh + + +# Copyright (C) @2017 Webank Group Holding Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# + +#=========================================================================================== +# Java Environment Setting +#=========================================================================================== +set -e +#服务器配置可能不一致,增加这些配置避免乱码问题 +export LANG=en_US.UTF-8 +export LC_CTYPE=en_US.UTF-8 +export LC_ALL=en_US.UTF-8 + +TMP_JAVA_HOME="/nemo/jdk1.8.0_152" + +#detect operating system. +OS=$(uname -o) + +function is_java8 { + local _java="$1" + [[ -x "$_java" ]] || return 1 + [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' ]] || return 2 + return 0 +} + + +function get_pid { + local ppid="" + if [ -f ${DEMO_HOME}/bin/pid_http_sub.file ]; then + ppid=$(cat ${DEMO_HOME}/bin/pid_http_sub.file) + else + if [[ $OS =~ Msys ]]; then + # 在Msys上存在可能无法kill识别出的进程的BUG + ppid=`jps -v | grep -i "com.webank.eventmesh.client.http.demo.sub.SpringBootDemoApplication" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # 已知问题:grep java 可能无法精确识别java进程 + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "com.webank.eventmesh.client.http.demo.sub.SpringBootDemoApplication" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + #在Linux服务器上要求尽可能精确识别进程 + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $PROXY_HOME | grep -i "com.webank.eventmesh.client.http.demo.sub.SpringBootDemoApplication" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + fi + fi + echo "$ppid"; +} + + +if [[ -d "$TMP_JAVA_HOME" ]] && is_java8 "$TMP_JAVA_HOME/bin/java"; then + JAVA="$TMP_JAVA_HOME/bin/java" +elif [[ -d "$JAVA_HOME" ]] && is_java8 "$JAVA_HOME/bin/java"; then + JAVA="$JAVA_HOME/bin/java" +elif is_java8 "/nemo/jdk8/bin/java"; then + JAVA="/nemo/jdk8/bin/java"; +elif is_java8 "/nemo/jdk1.8/bin/java"; then + JAVA="/nemo/jdk1.8/bin/java"; +elif is_java8 "/nemo/jdk/bin/java"; then + JAVA="/nemo/jdk/bin/java"; +elif is_java8 "$(which java)"; then + JAVA="$(which java)" +else + echo -e "ERROR\t java(1.8) not found, operation abort." + exit 9; +fi + +echo "http_sub_demo use java location= "$JAVA + +DEMO_HOME=`cd "./.." && pwd` + +export DEMO_HOME + +export DEMO_LOG_HOME=${DEMO_HOME}/logs + +echo "DEMO_HOME : ${DEMO_HOME}, DEMO_LOG_HOME : ${DEMO_LOG_HOME}" + +function make_logs_dir { + if [ ! -e "${DEMO_LOG_HOME}" ]; then mkdir -p "${DEMO_LOG_HOME}"; fi +} + +error_exit () +{ + echo "ERROR: $1 !!" + exit 1 +} + +export JAVA_HOME + +JAVA_OPT=`cat ${DEMO_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" +JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${DEMO_HOME}/logs/demo_http_sub_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" +JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${DEMO_HOME}/logs -XX:ErrorFile=${PROXY_HOME}/logs/hs_err_%p.log" +JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" +JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" +JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" +JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G" +JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" +JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced" +JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled" +JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom" +JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${DEMO_HOME}/conf/log4j2.xml" +#JAVA_OPT="${JAVA_OPT} -Dproxy.log.home=${DEMO_LOG_HOME}" +JAVA_OPT="${JAVA_OPT} -DconfPath=${DEMO_HOME}/conf" +JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard" +JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" + +pid=$(get_pid) +if [ -n "$pid" ];then + echo -e "ERROR\t the server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9; +fi + +make_logs_dir + +echo "using jdk[$JAVA]" >> ${DEMO_LOG_HOME}/demo_http_sub.out + + +DEMO_MAIN=com.webank.eventmesh.client.http.demo.sub.SpringBootDemoApplication +if [ $DOCKER ] +then + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_http_sub.out +else + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_http_sub.out 2>&1 & +echo $!>pid_http_sub.file +fi +exit 0 diff --git a/eventmesh-test/bin/tcp_pub.sh b/eventmesh-test/bin/tcp_pub.sh new file mode 100644 index 0000000000..b3fd74d919 --- /dev/null +++ b/eventmesh-test/bin/tcp_pub.sh @@ -0,0 +1,135 @@ +#!/bin/sh + + +# Copyright (C) @2017 Webank Group Holding Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# + +#=========================================================================================== +# Java Environment Setting +#=========================================================================================== +set -e +#服务器配置可能不一致,增加这些配置避免乱码问题 +export LANG=en_US.UTF-8 +export LC_CTYPE=en_US.UTF-8 +export LC_ALL=en_US.UTF-8 + +TMP_JAVA_HOME="/nemo/jdk1.8.0_152" + +#detect operating system. +OS=$(uname -o) + +function is_java8 { + local _java="$1" + [[ -x "$_java" ]] || return 1 + [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' ]] || return 2 + return 0 +} + + +function get_pid { + local ppid="" + if [ -f ${DEMO_HOME}/bin/pid_tcp_pub.file ]; then + ppid=$(cat ${DEMO_HOME}/bin/pid_tcp_pub.file) + else + if [[ $OS =~ Msys ]]; then + # 在Msys上存在可能无法kill识别出的进程的BUG + ppid=`jps -v | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncPublish" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # 已知问题:grep java 可能无法精确识别java进程 + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncPublish" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + #在Linux服务器上要求尽可能精确识别进程 + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $PROXY_HOME | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncPublish" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + fi + fi + echo "ppid"; +} + + +if [[ -d "$TMP_JAVA_HOME" ]] && is_java8 "$TMP_JAVA_HOME/bin/java"; then + JAVA="$TMP_JAVA_HOME/bin/java" +elif [[ -d "$JAVA_HOME" ]] && is_java8 "$JAVA_HOME/bin/java"; then + JAVA="$JAVA_HOME/bin/java" +elif is_java8 "/nemo/jdk8/bin/java"; then + JAVA="/nemo/jdk8/bin/java"; +elif is_java8 "/nemo/jdk1.8/bin/java"; then + JAVA="/nemo/jdk1.8/bin/java"; +elif is_java8 "/nemo/jdk/bin/java"; then + JAVA="/nemo/jdk/bin/java"; +elif is_java8 "$(which java)"; then + JAVA="$(which java)" +else + echo -e "ERROR\t java(1.8) not found, operation abort." + exit 9; +fi + +echo "tcp_pub_demo use java location= "$JAVA + +DEMO_HOME=`cd "./.." && pwd` + +export DEMO_HOME + +export DEMO_LOG_HOME=${DEMO_HOME}/logs + +echo "DEMO_HOME : ${DEMO_HOME}, DEMO_LOG_HOME : ${DEMO_LOG_HOME}" + +function make_logs_dir { + if [ ! -e "${DEMO_LOG_HOME}" ]; then mkdir -p "${DEMO_LOG_HOME}"; fi +} + +error_exit () +{ + echo "ERROR: $1 !!" + exit 1 +} + +export JAVA_HOME + +JAVA_OPT=`cat ${DEMO_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" +JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${DEMO_HOME}/logs/demo_tcp_pub_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" +JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${DEMO_HOME}/logs -XX:ErrorFile=${PROXY_HOME}/logs/hs_err_%p.log" +JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" +JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" +JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" +JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G" +JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" +JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced" +JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled" +JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom" +JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${DEMO_HOME}/conf/log4j2.xml" +#JAVA_OPT="${JAVA_OPT} -Dproxy.log.home=${DEMO_LOG_HOME}" +JAVA_OPT="${JAVA_OPT} -DconfPath=${DEMO_HOME}/conf" +JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard" +JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" + +pid=$(get_pid) +if [ -n "$pid" ];then + echo -e "ERROR\t the server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9; +fi + +make_logs_dir + +echo "using jdk[$JAVA]" >> ${DEMO_LOG_HOME}/demo_tcp_pub.out + + +DEMO_MAIN=com.webank.eventmesh.client.tcp.demo.AsyncPublish +if [ $DOCKER ] +then + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_tcp_pub.out +else + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_tcp_pub.out 2>&1 & +echo $!>pid_tcp_pub.file +fi +exit 0 diff --git a/eventmesh-test/bin/tcp_pub_broadcast.sh b/eventmesh-test/bin/tcp_pub_broadcast.sh new file mode 100644 index 0000000000..3814e3aa51 --- /dev/null +++ b/eventmesh-test/bin/tcp_pub_broadcast.sh @@ -0,0 +1,135 @@ +#!/bin/sh + + +# Copyright (C) @2017 Webank Group Holding Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# + +#=========================================================================================== +# Java Environment Setting +#=========================================================================================== +set -e +#服务器配置可能不一致,增加这些配置避免乱码问题 +export LANG=en_US.UTF-8 +export LC_CTYPE=en_US.UTF-8 +export LC_ALL=en_US.UTF-8 + +TMP_JAVA_HOME="/nemo/jdk1.8.0_152" + +#detect operating system. +OS=$(uname -o) + +function is_java8 { + local _java="$1" + [[ -x "$_java" ]] || return 1 + [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' ]] || return 2 + return 0 +} + + +function get_pid { + local ppid="" + if [ -f ${DEMO_HOME}/bin/pid_tcp_pub_broadcast.file ]; then + ppid=$(cat ${DEMO_HOME}/bin/pid_tcp_pub_broadcast.file) + else + if [[ $OS =~ Msys ]]; then + # 在Msys上存在可能无法kill识别出的进程的BUG + ppid=`jps -v | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncPublishBroadcast" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # 已知问题:grep java 可能无法精确识别java进程 + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncPublishBroadcast" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + #在Linux服务器上要求尽可能精确识别进程 + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $DEMO_HOME | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncPublishBroadcast" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + fi + fi + echo "$ppid"; +} + + +if [[ -d "$TMP_JAVA_HOME" ]] && is_java8 "$TMP_JAVA_HOME/bin/java"; then + JAVA="$TMP_JAVA_HOME/bin/java" +elif [[ -d "$JAVA_HOME" ]] && is_java8 "$JAVA_HOME/bin/java"; then + JAVA="$JAVA_HOME/bin/java" +elif is_java8 "/nemo/jdk8/bin/java"; then + JAVA="/nemo/jdk8/bin/java"; +elif is_java8 "/nemo/jdk1.8/bin/java"; then + JAVA="/nemo/jdk1.8/bin/java"; +elif is_java8 "/nemo/jdk/bin/java"; then + JAVA="/nemo/jdk/bin/java"; +elif is_java8 "$(which java)"; then + JAVA="$(which java)" +else + echo -e "ERROR\t java(1.8) not found, operation abort." + exit 9; +fi + +echo "tcp_pub_demo use java location= "$JAVA + +DEMO_HOME=`cd "./.." && pwd` + +export DEMO_HOME + +export DEMO_LOG_HOME=${DEMO_HOME}/logs + +echo "DEMO_HOME : ${DEMO_HOME}, DEMO_LOG_HOME : ${DEMO_LOG_HOME}" + +function make_logs_dir { + if [ ! -e "${DEMO_LOG_HOME}" ]; then mkdir -p "${DEMO_LOG_HOME}"; fi +} + +error_exit () +{ + echo "ERROR: $1 !!" + exit 1 +} + +export JAVA_HOME + +JAVA_OPT=`cat ${DEMO_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" +JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${DEMO_HOME}/logs/demo_tcp_pub_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" +JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${DEMO_HOME}/logs -XX:ErrorFile=${DEMO_HOME}/logs/hs_err_%p.log" +JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" +JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" +JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" +JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G" +JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" +JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced" +JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled" +JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom" +JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${DEMO_HOME}/conf/log4j2.xml" +#JAVA_OPT="${JAVA_OPT} -Dproxy.log.home=${DEMO_LOG_HOME}" +JAVA_OPT="${JAVA_OPT} -DconfPath=${DEMO_HOME}/conf" +JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard" +JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" + +pid=$(get_pid) +if [ -n "$pid" ];then + echo -e "ERROR\t the server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9; +fi + +make_logs_dir + +echo "using jdk[$JAVA]" >> ${DEMO_LOG_HOME}/demo_tcp_pub_broadcast.out + + +DEMO_MAIN=com.webank.eventmesh.client.tcp.demo.AsyncPublishBroadcast +if [ $DOCKER ] +then + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_tcp_pub_broadcast.out +else + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_tcp_pub_broadcast.out 2>&1 & +echo $!>pid_tcp_pub_broadcast.file +fi +exit 0 diff --git a/eventmesh-test/bin/tcp_sub.sh b/eventmesh-test/bin/tcp_sub.sh new file mode 100644 index 0000000000..0454abfe68 --- /dev/null +++ b/eventmesh-test/bin/tcp_sub.sh @@ -0,0 +1,135 @@ +#!/bin/sh + + +# Copyright (C) @2017 Webank Group Holding Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# + +#=========================================================================================== +# Java Environment Setting +#=========================================================================================== +set -e +#服务器配置可能不一致,增加这些配置避免乱码问题 +export LANG=en_US.UTF-8 +export LC_CTYPE=en_US.UTF-8 +export LC_ALL=en_US.UTF-8 + +TMP_JAVA_HOME="/nemo/jdk1.8.0_152" + +#detect operating system. +OS=$(uname -o) + +function is_java8 { + local _java="$1" + [[ -x "$_java" ]] || return 1 + [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' ]] || return 2 + return 0 +} + + +function get_pid { + local ppid="" + if [ -f ${DEMO_HOME}/bin/pid_tcp_sub.file ]; then + ppid=$(cat ${DEMO_HOME}/bin/pid_tcp_sub.file) + else + if [[ $OS =~ Msys ]]; then + # 在Msys上存在可能无法kill识别出的进程的BUG + ppid=`jps -v | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncSubscribe" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # 已知问题:grep java 可能无法精确识别java进程 + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncSubscribe" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + #在Linux服务器上要求尽可能精确识别进程 + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $DEMO_HOME | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncSubscribe" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + fi + fi + echo "$ppid"; +} + + +if [[ -d "$TMP_JAVA_HOME" ]] && is_java8 "$TMP_JAVA_HOME/bin/java"; then + JAVA="$TMP_JAVA_HOME/bin/java" +elif [[ -d "$JAVA_HOME" ]] && is_java8 "$JAVA_HOME/bin/java"; then + JAVA="$JAVA_HOME/bin/java" +elif is_java8 "/nemo/jdk8/bin/java"; then + JAVA="/nemo/jdk8/bin/java"; +elif is_java8 "/nemo/jdk1.8/bin/java"; then + JAVA="/nemo/jdk1.8/bin/java"; +elif is_java8 "/nemo/jdk/bin/java"; then + JAVA="/nemo/jdk/bin/java"; +elif is_java8 "$(which java)"; then + JAVA="$(which java)" +else + echo -e "ERROR\t java(1.8) not found, operation abort." + exit 9; +fi + +echo "tcp_sub_demo use java location= "$JAVA + +DEMO_HOME=`cd "./.." && pwd` + +export DEMO_HOME + +export DEMO_LOG_HOME=${DEMO_HOME}/logs + +echo "DEMO_HOME : ${DEMO_HOME}, DEMO_LOG_HOME : ${DEMO_LOG_HOME}" + +function make_logs_dir { + if [ ! -e "${DEMO_LOG_HOME}" ]; then mkdir -p "${DEMO_LOG_HOME}"; fi +} + +error_exit () +{ + echo "ERROR: $1 !!" + exit 1 +} + +export JAVA_HOME + +JAVA_OPT=`cat ${DEMO_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" +JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${DEMO_HOME}/logs/demo_tcp_sub_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" +JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${DEMO_HOME}/logs -XX:ErrorFile=${DEMO_HOME}/logs/hs_err_%p.log" +JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" +JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" +JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" +JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G" +JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" +JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced" +JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled" +JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom" +JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${DEMO_HOME}/conf/log4j2.xml" +#JAVA_OPT="${JAVA_OPT} -Dproxy.log.home=${DEMO_LOG_HOME}" +JAVA_OPT="${JAVA_OPT} -DconfPath=${DEMO_HOME}/conf" +JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard" +JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" + +pid=$(get_pid) +if [ -n "$pid" ];then + echo -e "ERROR\t the server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9; +fi + +make_logs_dir + +echo "using jdk[$JAVA]" >> ${DEMO_LOG_HOME}/demo_tcp_sub.out + + +DEMO_MAIN=com.webank.eventmesh.client.tcp.demo.AsyncSubscribe +if [ $DOCKER ] +then + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_tcp_sub.out +else + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_tcp_sub.out 2>&1 & +echo $!>pid_tcp_sub.file +fi +exit 0 diff --git a/eventmesh-test/bin/tcp_sub_broadcast.sh b/eventmesh-test/bin/tcp_sub_broadcast.sh new file mode 100644 index 0000000000..9d460b0920 --- /dev/null +++ b/eventmesh-test/bin/tcp_sub_broadcast.sh @@ -0,0 +1,135 @@ +#!/bin/sh + + +# Copyright (C) @2017 Webank Group Holding Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# + +#=========================================================================================== +# Java Environment Setting +#=========================================================================================== +set -e +#服务器配置可能不一致,增加这些配置避免乱码问题 +export LANG=en_US.UTF-8 +export LC_CTYPE=en_US.UTF-8 +export LC_ALL=en_US.UTF-8 + +TMP_JAVA_HOME="/nemo/jdk1.8.0_152" + +#detect operating system. +OS=$(uname -o) + +function is_java8 { + local _java="$1" + [[ -x "$_java" ]] || return 1 + [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' ]] || return 2 + return 0 +} + + +function get_pid { + local ppid="" + if [ -f ${PROXY_HOME}/bin/pid_tcp_sub_broadcast.file ]; then + ppid=$(cat ${PROXY_HOME}/bin/pid_tcp_sub_broadcast.file) + else + if [[ $OS =~ Msys ]]; then + # 在Msys上存在可能无法kill识别出的进程的BUG + ppid=`jps -v | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncSubscribeBroadcast" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # 已知问题:grep java 可能无法精确识别java进程 + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncSubscribeBroadcast" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + #在Linux服务器上要求尽可能精确识别进程 + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $PROXY_HOME | grep -i "com.webank.eventmesh.client.tcp.demo.AsyncSubscribeBroadcast" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + fi + fi + echo "$ppid"; +} + + +if [[ -d "$TMP_JAVA_HOME" ]] && is_java8 "$TMP_JAVA_HOME/bin/java"; then + JAVA="$TMP_JAVA_HOME/bin/java" +elif [[ -d "$JAVA_HOME" ]] && is_java8 "$JAVA_HOME/bin/java"; then + JAVA="$JAVA_HOME/bin/java" +elif is_java8 "/nemo/jdk8/bin/java"; then + JAVA="/nemo/jdk8/bin/java"; +elif is_java8 "/nemo/jdk1.8/bin/java"; then + JAVA="/nemo/jdk1.8/bin/java"; +elif is_java8 "/nemo/jdk/bin/java"; then + JAVA="/nemo/jdk/bin/java"; +elif is_java8 "$(which java)"; then + JAVA="$(which java)" +else + echo -e "ERROR\t java(1.8) not found, operation abort." + exit 9; +fi + +echo "tcp_sub_demo use java location= "$JAVA + +DEMO_HOME=`cd "./.." && pwd` + +export DEMO_HOME + +export DEMO_LOG_HOME=${DEMO_HOME}/logs + +echo "DEMO_HOME : ${DEMO_HOME}, DEMO_LOG_HOME : ${DEMO_LOG_HOME}" + +function make_logs_dir { + if [ ! -e "${DEMO_LOG_HOME}" ]; then mkdir -p "${DEMO_LOG_HOME}"; fi +} + +error_exit () +{ + echo "ERROR: $1 !!" + exit 1 +} + +export JAVA_HOME + +JAVA_OPT=`cat ${DEMO_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" +JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${DEMO_HOME}/logs/demo_tcp_sub_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" +JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${DEMO_HOME}/logs -XX:ErrorFile=${DEMO_HOME}/logs/hs_err_%p.log" +JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" +JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" +JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" +JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G" +JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" +JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced" +JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled" +JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom" +JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${DEMO_HOME}/conf/log4j2.xml" +#JAVA_OPT="${JAVA_OPT} -Dproxy.log.home=${DEMO_LOG_HOME}" +JAVA_OPT="${JAVA_OPT} -DconfPath=${DEMO_HOME}/conf" +JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard" +JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" + +pid=$(get_pid) +if [ -n "$pid" ];then + echo -e "ERROR\t the server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9; +fi + +make_logs_dir + +echo "using jdk[$JAVA]" >> ${DEMO_LOG_HOME}/demo_tcp_sub_broadcast.out + + +DEMO_MAIN=com.webank.eventmesh.client.tcp.demo.AsyncSubscribeBroadcast +if [ $DOCKER ] +then + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_tcp_sub_broadcast.out +else + $JAVA $JAVA_OPT -classpath ${DEMO_HOME}/conf:${DEMO_HOME}/apps/*:${DEMO_HOME}/lib/* $DEMO_MAIN >> ${DEMO_LOG_HOME}/demo_tcp_sub_broadcast.out 2>&1 & +echo $!>pid_tcp_sub_broadcast.file +fi +exit 0 diff --git a/eventmesh-test/build.gradle b/eventmesh-test/build.gradle new file mode 100644 index 0000000000..8a72fb60a8 --- /dev/null +++ b/eventmesh-test/build.gradle @@ -0,0 +1,111 @@ +buildscript { + repositories { + // 优先使用国内源 + maven { url 'https://maven.aliyun.com/repository/public' } + mavenCentral() + } +// dependencies { +// // 让spring-boot支持gradle +// classpath("org.springframework.boot:spring-boot-gradle-plugin:2.1.6.RELEASE") +// } +} + +//apply plugin: 'org.springframework.boot' +//apply plugin: "io.spring.dependency-management" +apply plugin: 'java' +apply plugin: "maven" +apply plugin: "eclipse" +apply plugin: "idea" + +group 'com.webank.eventmesh' +version '1.2.0-SNAPSHOT' + +sourceCompatibility = 1.8 + +repositories { + mavenCentral() +} + +//List log4j2 = [ +// "org.apache.logging.log4j:log4j-api:2.13.3", +// "org.apache.logging.log4j:log4j-core:2.13.3", +// "org.apache.logging.log4j:log4j-slf4j-impl:2.13.3", +// "org.apache.logging.log4j:log4j-web:2.13.3", +// "com.lmax:disruptor:3.4.2" +//] +// +//List sl4j = [ +// "org.slf4j:slf4j-api:1.7.30" +//] + +dependencies { +// compile log4j2, sl4j +// testCompile log4j2, sl4j + compile project(":eventmesh-sdk-java") + //Spring模块 + compile("org.springframework.boot:spring-boot-starter-web:2.1.6.RELEASE") + testCompile group: 'junit', name: 'junit', version: '4.12' + +} + +configurations.all{ + exclude group: "ch.qos.logback" + exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging' +} + +clean.doFirst { + delete 'build' + delete 'dist' +} + +task testdist(dependsOn: ['jar']) { + + doFirst { + new File(projectDir.getPath() , 'dist/bin').mkdirs() + new File(projectDir.getPath() , 'dist/apps').mkdirs() + new File(projectDir.getPath() , 'dist/conf').mkdirs() + new File(projectDir.getPath() , 'dist/lib').mkdirs() + } + + doLast { + copy { + into('dist/apps/') + from project.jar.getArchivePath() + } + copy { + into 'dist/lib' + from project.configurations.runtime + } + + copy { + into 'dist/bin' + from '../eventmesh-test/bin' + } + + copy { + into 'dist/conf' + from 'conf' + } + } +} + +task testtar(type: Tar) { + extension = 'tar.gz' + compression = Compression.GZIP + archiveName = project.name + '_' + project.version + '.' + extension + destinationDir = new File(projectDir.getPath(), 'build') + into('/') { + from 'dist' + } +} + +task packageTestSources(type: Jar) { + from project.sourceSets.main.allSource + classifier = 'sources' // either here or in artifacts block +} + +artifacts { + archives jar + archives packageJavadoc + archives packageSources +} \ No newline at end of file diff --git a/eventmesh-test/conf/application.properties b/eventmesh-test/conf/application.properties new file mode 100644 index 0000000000..2886a8c218 --- /dev/null +++ b/eventmesh-test/conf/application.properties @@ -0,0 +1,4 @@ +server.port=8088 +eventmesh.ip=127.0.0.1 +eventmesh.http.port=10105 +eventmesh.tcp.port=10000 \ No newline at end of file diff --git a/eventmesh-test/conf/log4j2.xml b/eventmesh-test/conf/log4j2.xml new file mode 100644 index 0000000000..99f79985fb --- /dev/null +++ b/eventmesh-test/conf/log4j2.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/eventmesh-test/conf/server.env b/eventmesh-test/conf/server.env new file mode 100644 index 0000000000..77e68cefeb --- /dev/null +++ b/eventmesh-test/conf/server.env @@ -0,0 +1 @@ +APP_START_JVM_OPTION:::-server -Xms64M -Xmx128M -Xmn64m -XX:SurvivorRatio=4 -Duser.language=zh diff --git a/eventmesh-test/gradle.properties b/eventmesh-test/gradle.properties new file mode 100644 index 0000000000..e7cdcc01d2 --- /dev/null +++ b/eventmesh-test/gradle.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +group=com.webank.eventmesh +version=1.2.0-SNAPSHOT +mavenUserName= +mavenPassword= diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/AsyncPublishInstance.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/AsyncPublishInstance.java new file mode 100644 index 0000000000..88a9d050f9 --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/AsyncPublishInstance.java @@ -0,0 +1,73 @@ +package com.webank.eventmesh.client.http.demo; +import com.webank.eventmesh.client.http.conf.LiteClientConfig; +import com.webank.eventmesh.client.http.producer.LiteProducer; +import com.webank.eventmesh.common.Constants; +import com.webank.eventmesh.common.IPUtil; +import com.webank.eventmesh.common.LiteMessage; +import com.webank.eventmesh.common.ThreadUtil; +import com.webank.eventmesh.util.Utils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class AsyncPublishInstance { + + public static Logger logger = LoggerFactory.getLogger(AsyncPublishInstance.class); + + public static void main(String[] args) throws Exception { + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final String eventMeshHttpPort = properties.getProperty("eventmesh.http.port"); + + LiteProducer liteProducer = null; + try{ +// String proxyIPPort = args[0]; + String proxyIPPort = eventMeshIp + ":" + eventMeshHttpPort; +// final String topic = args[1]; + final String topic = "FT0-e-80010000-01-1"; + if (StringUtils.isBlank(proxyIPPort)) { + // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 + proxyIPPort = "127.0.0.1:10105"; + } + + LiteClientConfig weMQProxyClientConfig = new LiteClientConfig(); + weMQProxyClientConfig.setLiteProxyAddr(proxyIPPort) + .setEnv("env") + .setIdc("idc") + .setDcn("dcn") + .setIp(IPUtil.getLocalAddress()) + .setSys("1234") + .setPid(String.valueOf(ThreadUtil.getPID())); + + liteProducer = new LiteProducer(weMQProxyClientConfig); + liteProducer.start(); + for(int i = 0; i < 5; i++) { + LiteMessage liteMessage = new LiteMessage(); + liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) +// .setContent("contentStr with special protocal") + .setContent("testPublishMessage") + .setTopic(topic) + .setUniqueId(RandomStringUtils.randomNumeric(30)) + .addProp(Constants.PROXY_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)); + + boolean flag = liteProducer.publish(liteMessage); + Thread.sleep(1000); + logger.info("publish result , {}", flag); + } + }catch (Exception e){ + logger.warn("publish msg failed", e); + } + + try{ + Thread.sleep(30000); + if(liteProducer != null){ + liteProducer.shutdown(); + } + }catch (Exception e1){ + logger.warn("producer shutdown exception", e1); + } + } +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/AsyncSyncRequestInstance.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/AsyncSyncRequestInstance.java new file mode 100644 index 0000000000..69ebef2afa --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/AsyncSyncRequestInstance.java @@ -0,0 +1,81 @@ +package com.webank.eventmesh.client.http.demo; +import com.webank.eventmesh.client.http.conf.LiteClientConfig; +import com.webank.eventmesh.client.http.producer.LiteProducer; +import com.webank.eventmesh.client.http.producer.RRCallback; +import com.webank.eventmesh.common.IPUtil; +import com.webank.eventmesh.common.LiteMessage; +import com.webank.eventmesh.common.ThreadUtil; +import com.webank.eventmesh.util.Utils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class AsyncSyncRequestInstance { + + public static Logger logger = LoggerFactory.getLogger(AsyncSyncRequestInstance.class); + + public static void main(String[] args) throws Exception { + + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final String eventMeshHttpPort = properties.getProperty("eventmesh.http.port"); + + LiteProducer liteProducer = null; + try { +// String proxyIPPort = args[0]; + String proxyIPPort = eventMeshIp + ":" + eventMeshHttpPort; +// final String topic = args[1]; + final String topic = "FT0-e-80010000-01-1"; + if (StringUtils.isBlank(proxyIPPort)) { + // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 + proxyIPPort = "127.0.0.1:10105"; + } + + LiteClientConfig weMQProxyClientConfig = new LiteClientConfig(); + weMQProxyClientConfig.setLiteProxyAddr(proxyIPPort) + .setEnv("env") + .setIdc("idc") + .setDcn("dcn") + .setIp(IPUtil.getLocalAddress()) + .setSys("1234") + .setPid(String.valueOf(ThreadUtil.getPID())); + + liteProducer = new LiteProducer(weMQProxyClientConfig); + + final long startTime = System.currentTimeMillis(); + final LiteMessage liteMessage = new LiteMessage(); + liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) + .setContent("testAsyncMessage") + .setTopic(topic) + .setUniqueId(RandomStringUtils.randomNumeric(30)); + + liteProducer.request(liteMessage, new RRCallback() { + @Override + public void onSuccess(LiteMessage o) { + logger.debug("sendmsg : {}, return : {}, cost:{}ms", liteMessage.getContent(), o.getContent(), System.currentTimeMillis() - startTime); + } + + @Override + public void onException(Throwable e) { + logger.debug("sendmsg failed", e); + } + }, 3000); + + Thread.sleep(2000); + } catch (Exception e) { + logger.warn("async send msg failed", e); + } + + try{ + Thread.sleep(30000); + if(liteProducer != null){ + liteProducer.shutdown(); + } + }catch (Exception e1){ + logger.warn("producer shutdown exception", e1); + } + } +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/SyncRequestInstance.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/SyncRequestInstance.java new file mode 100644 index 0000000000..cea7dc20e2 --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/SyncRequestInstance.java @@ -0,0 +1,73 @@ +package com.webank.eventmesh.client.http.demo; +import com.webank.eventmesh.client.http.conf.LiteClientConfig; +import com.webank.eventmesh.client.http.producer.LiteProducer; +import com.webank.eventmesh.common.IPUtil; +import com.webank.eventmesh.common.LiteMessage; +import com.webank.eventmesh.common.ThreadPoolFactory; +import com.webank.eventmesh.common.ThreadUtil; +import com.webank.eventmesh.util.Utils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class SyncRequestInstance { + + public static Logger logger = LoggerFactory.getLogger(SyncRequestInstance.class); + + public static void main(String[] args) throws Exception { + + LiteProducer liteProducer = null; + try { + String proxyIPPort = args[0]; + + final String topic = args[1]; + + if (StringUtils.isBlank(proxyIPPort)) { + // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 + proxyIPPort = "127.0.0.1:10105"; + } + + LiteClientConfig weMQProxyClientConfig = new LiteClientConfig(); + weMQProxyClientConfig.setLiteProxyAddr(proxyIPPort) + .setEnv("env") + .setIdc("idc") + .setDcn("dcn") + .setIp(IPUtil.getLocalAddress()) + .setSys("1234") + .setPid(String.valueOf(ThreadUtil.getPID())); + + liteProducer = new LiteProducer(weMQProxyClientConfig); + liteProducer.start(); + + long startTime = System.currentTimeMillis(); + LiteMessage liteMessage = new LiteMessage(); + liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) + .setContent("contentStr with special protocal") + .setTopic(topic) + .setUniqueId(RandomStringUtils.randomNumeric(30)); + + LiteMessage rsp = liteProducer.request(liteMessage, 10000); + if (logger.isDebugEnabled()) { + logger.debug("sendmsg : {}, return : {}, cost:{}ms", liteMessage.getContent(), rsp.getContent(), System.currentTimeMillis() - startTime); + } + } catch (Exception e) { + logger.warn("send msg failed", e); + } + + try{ + Thread.sleep(30000); + if(liteProducer != null){ + liteProducer.shutdown(); + } + }catch (Exception e1){ + logger.warn("producer shutdown exception", e1); + } + } +} diff --git a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/sub/SpringBootDemoApplication.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/sub/SpringBootDemoApplication.java similarity index 100% rename from eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/sub/SpringBootDemoApplication.java rename to eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/sub/SpringBootDemoApplication.java diff --git a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/sub/controller/SubController.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/sub/controller/SubController.java similarity index 100% rename from eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/sub/controller/SubController.java rename to eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/sub/controller/SubController.java diff --git a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/sub/service/SubService.java similarity index 76% rename from eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/sub/service/SubService.java rename to eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/sub/service/SubService.java index 4941ec0f77..2d77150760 100644 --- a/eventmesh-sdk-java/src/test/java/com/webank/eventmesh/client/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/http/demo/sub/service/SubService.java @@ -1,20 +1,25 @@ package com.webank.eventmesh.client.http.demo.sub.service; +import com.webank.eventmesh.client.http.RemotingServer; import com.webank.eventmesh.client.http.conf.LiteClientConfig; import com.webank.eventmesh.client.http.consumer.LiteConsumer; import com.webank.eventmesh.common.IPUtil; import com.webank.eventmesh.common.ProxyException; import com.webank.eventmesh.common.ThreadUtil; +import com.webank.eventmesh.util.Utils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.SystemUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; +import org.springframework.remoting.support.RemotingSupport; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Properties; import java.util.concurrent.TimeUnit; @Component @@ -26,8 +31,14 @@ public class SubService implements InitializingBean { private String proxyIPPort = ""; + final Properties properties = Utils.readPropertiesFile("application.properties"); + final List topicList = Arrays.asList("FT0-e-80010000-01-1"); - final String url = "http://127.0.0.1:8088/sub/test"; + final String localIp = IPUtil.getLocalAddress(); + final String localPort = properties.getProperty("server.port"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final String eventMeshHttpPort = properties.getProperty("eventmesh.http.port"); + final String url = "http://" + localIp + ":" + localPort + "/sub/test"; final String env = "P"; final String idc = "FT"; final String dcn = "FT0"; @@ -38,7 +49,7 @@ public void afterPropertiesSet() throws Exception { if (StringUtils.isBlank(proxyIPPort)) { // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 - proxyIPPort = "127.0.0.1:10105"; + proxyIPPort = eventMeshIp + ":" + eventMeshHttpPort; } LiteClientConfig weMQProxyClientConfig = new LiteClientConfig(); weMQProxyClientConfig.setLiteProxyAddr(proxyIPPort) @@ -76,6 +87,6 @@ public void afterPropertiesSet() throws Exception { Thread.sleep(5 * 60 * 1000); - stopThread.start(); +// stopThread.start(); } } diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/common/AccessTestCaseTopicSet.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/common/AccessTestCaseTopicSet.java new file mode 100644 index 0000000000..694719c610 --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/common/AccessTestCaseTopicSet.java @@ -0,0 +1,17 @@ +package com.webank.eventmesh.client.tcp.common; + +/** + * Testcase set + */ +public class AccessTestCaseTopicSet { + +// public static final String TOPIC_PRX_WQ2ClientBroadCast = "topic-broadcast-test"; + public static final String TOPIC_PRX_WQ2ClientBroadCast = "FT0-e-80030000-01-3"; + +// public static final String TOPIC_PRX_SyncSubscribeTest = "topic-sync-test"; + public static final String TOPIC_PRX_SyncSubscribeTest = "FT0-s-80000000-01-0"; + +// public static final String TOPIC_PRX_WQ2ClientUniCast = "topic-async-test"; + public static final String TOPIC_PRX_WQ2ClientUniCast = "FT0-e-80010000-01-1"; + +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/common/AccessTestUtils.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/common/AccessTestUtils.java new file mode 100644 index 0000000000..54ca3f8277 --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/common/AccessTestUtils.java @@ -0,0 +1,130 @@ +package com.webank.eventmesh.client.tcp.common; + +import com.webank.eventmesh.common.protocol.tcp.AccessMessage; +import com.webank.eventmesh.common.protocol.tcp.Command; +import com.webank.eventmesh.common.protocol.tcp.Header; +import com.webank.eventmesh.common.protocol.tcp.Package; +import com.webank.eventmesh.common.protocol.tcp.UserAgent; + +import java.util.concurrent.ThreadLocalRandom; + +import static com.webank.eventmesh.client.tcp.common.AccessTestCaseTopicSet.*; +import static com.webank.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER; + +public class AccessTestUtils { + private static final int seqLength = 10; + + public static UserAgent generateClient1() { + UserAgent user = new UserAgent(); + user.setDcn("AC0"); + user.setHost("127.0.0.1"); + user.setPassword(generateRandomString(8)); + user.setUsername("PU4283"); + user.setPath("/data/app/umg_proxy"); + user.setPort(8362); + user.setSubsystem("5023"); + user.setPid(32893); + user.setVersion("2.0.11"); + user.setIdc("FT"); + return user; + } + + public static UserAgent generateClient2() { + UserAgent user = new UserAgent(); + user.setDcn("FT0"); + user.setHost("127.0.0.1"); + user.setPassword(generateRandomString(8)); + user.setUsername("PU4283"); + user.setPath("/data/app/umg_proxy"); + user.setPort(9362); + user.setSubsystem("5017"); + user.setPid(42893); + user.setVersion("2.0.11"); + user.setIdc("FT"); + return user; + } + + public static Package syncRR() { + Package msg = new Package(); + msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, generateRandomString(seqLength))); + msg.setBody(generateSyncRRWemqMsg()); + return msg; + } + + public static Package asyncRR() { + Package msg = new Package(); + msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, generateRandomString(seqLength))); + msg.setBody(generateAsyncRRWemqMsg()); + return msg; + } + + public static Package asyncMessage() { + Package msg = new Package(); + msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0, null, generateRandomString(seqLength))); + msg.setBody(generateAsyncEventWemqMsg()); + return msg; + } + + public static Package broadcastMessage() { + Package msg = new Package(); + msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_SERVER, 0, null, generateRandomString(seqLength))); + msg.setBody(generateBroadcastWemqMsg()); + return msg; + } + + public static Package rrResponse(Package request) { + Package msg = new Package(); + msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null, generateRandomString(seqLength))); + msg.setBody(request.getBody()); + return msg; + } + + private static AccessMessage generateSyncRRWemqMsg() { + AccessMessage wemqMsg = new AccessMessage(); + wemqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest); + wemqMsg.getProperties().put("msgType", "persistent"); + wemqMsg.getProperties().put("TTL", "300000"); + wemqMsg.getProperties().put("KEYS", generateRandomString(16)); + wemqMsg.setBody("testSyncRR"); + return wemqMsg; + } + + + private static AccessMessage generateAsyncRRWemqMsg() { + AccessMessage wemqMsg = new AccessMessage(); + wemqMsg.setTopic(TOPIC_PRX_SyncSubscribeTest); + wemqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); + wemqMsg.getProperties().put("TTL", "300000"); + wemqMsg.getProperties().put("PROPERTY_MESSAGE_REPLY_TO", "notnull"); + wemqMsg.setBody("testAsyncRR"); + return wemqMsg; + } + + private static AccessMessage generateAsyncEventWemqMsg() { + AccessMessage wemqMsg = new AccessMessage(); + wemqMsg.setTopic(TOPIC_PRX_WQ2ClientUniCast); + wemqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); + wemqMsg.getProperties().put("TTL", "30000"); + wemqMsg.getProperties().put("PROPERTY_MESSAGE_REPLY_TO", "notnull"); + wemqMsg.setBody("testAsyncMessage"); + return wemqMsg; + } + + private static AccessMessage generateBroadcastWemqMsg() { + AccessMessage wemqMsg = new AccessMessage(); + wemqMsg.setTopic(TOPIC_PRX_WQ2ClientBroadCast); + wemqMsg.getProperties().put("REPLY_TO", "10.36.0.109@ProducerGroup-producerPool-9-access#V1_4_0#CI"); + wemqMsg.getProperties().put("TTL", "30000"); + wemqMsg.getProperties().put("PROPERTY_MESSAGE_REPLY_TO", "notnull"); + wemqMsg.setBody("testAsyncMessage"); + return wemqMsg; + } + + private static String generateRandomString(int length) { + StringBuilder builder = new StringBuilder(length); + for (int i = 0; i < length; i++) { + builder.append((char) ThreadLocalRandom.current().nextInt(48, 57)); + } + return builder.toString(); + } +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncPublish.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncPublish.java new file mode 100644 index 0000000000..5c41901d3f --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncPublish.java @@ -0,0 +1,49 @@ +package com.webank.eventmesh.client.tcp.demo; + +import com.webank.eventmesh.client.tcp.WemqAccessClient; +import com.webank.eventmesh.client.tcp.common.AccessTestUtils; +import com.webank.eventmesh.client.tcp.common.WemqAccessCommon; +import com.webank.eventmesh.client.tcp.impl.DefaultWemqAccessClient; +import com.webank.eventmesh.common.protocol.tcp.Package; +import com.webank.eventmesh.common.protocol.tcp.UserAgent; +import com.webank.eventmesh.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class AsyncPublish{ + + public static Logger logger = LoggerFactory.getLogger(AsyncPublish.class); + + private static WemqAccessClient client; + + public static AsyncPublish handler = new AsyncPublish(); + + + public static void main(String[] agrs)throws Exception{ + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); + try{ + UserAgent userAgent = AccessTestUtils.generateClient1(); + client = new DefaultWemqAccessClient(eventMeshIp,eventMeshTcpPort,userAgent); + client.init(); + client.heartbeat(); + + for(int i=0; i < 5; i++) { + Package asyncMsg = AccessTestUtils.asyncMessage(); + logger.info("begin send async msg[{}]==================={}", i, asyncMsg); + client.publish(asyncMsg, WemqAccessCommon.DEFAULT_TIME_OUT_MILLS); + + Thread.sleep(1000); + } + + Thread.sleep(2000); + //退出,销毁资源 +// client.close(); + }catch (Exception e){ + logger.warn("AsyncPublish failed", e); + } + } +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java new file mode 100644 index 0000000000..8ddd896fd1 --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncPublishBroadcast.java @@ -0,0 +1,42 @@ +package com.webank.eventmesh.client.tcp.demo; + +import com.webank.eventmesh.client.tcp.WemqAccessClient; +import com.webank.eventmesh.client.tcp.common.AccessTestUtils; +import com.webank.eventmesh.client.tcp.common.WemqAccessCommon; +import com.webank.eventmesh.client.tcp.impl.DefaultWemqAccessClient; +import com.webank.eventmesh.common.protocol.tcp.Package; +import com.webank.eventmesh.common.protocol.tcp.UserAgent; +import com.webank.eventmesh.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class AsyncPublishBroadcast { + + public static Logger logger = LoggerFactory.getLogger(AsyncPublishBroadcast.class); + + private static WemqAccessClient client; + + public static void main(String[] agrs)throws Exception{ + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); + try{ + UserAgent userAgent = AccessTestUtils.generateClient1(); + client = new DefaultWemqAccessClient(eventMeshIp,eventMeshTcpPort,userAgent); + client.init(); + client.heartbeat(); + + Package broadcastMsg = AccessTestUtils.broadcastMessage(); + logger.info("begin send broadcast msg============={}", broadcastMsg); + client.broadcast(broadcastMsg, WemqAccessCommon.DEFAULT_TIME_OUT_MILLS); + + Thread.sleep(2000); + //退出,销毁资源 +// client.close(); + }catch (Exception e){ + logger.warn("AsyncPublishBroadcast failed", e); + } + } +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribe.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribe.java new file mode 100644 index 0000000000..580e884c1a --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribe.java @@ -0,0 +1,54 @@ +package com.webank.eventmesh.client.tcp.demo; + +import com.webank.eventmesh.client.tcp.WemqAccessClient; +import com.webank.eventmesh.client.tcp.common.AccessTestUtils; +import com.webank.eventmesh.client.tcp.common.ReceiveMsgHook; +import com.webank.eventmesh.client.tcp.impl.DefaultWemqAccessClient; +import com.webank.eventmesh.common.protocol.tcp.AccessMessage; +import com.webank.eventmesh.common.protocol.tcp.Package; +import com.webank.eventmesh.common.protocol.tcp.UserAgent; +import com.webank.eventmesh.util.Utils; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class AsyncSubscribe implements ReceiveMsgHook { + + public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class); + + private static WemqAccessClient client; + + public static AsyncSubscribe handler = new AsyncSubscribe(); + + public static void main(String[] agrs)throws Exception{ + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); + try{ + UserAgent userAgent = AccessTestUtils.generateClient2(); + client = new DefaultWemqAccessClient(eventMeshIp,eventMeshTcpPort,userAgent); + client.init(); + client.heartbeat(); + + client.subscribe("FT0-e-80010000-01-1"); + client.registerSubBusiHandler(handler); + + client.listen(); + + //client.unsubscribe(); + + //退出,销毁资源 +// client.close(); + }catch (Exception e){ + logger.warn("AsyncSubscribe failed", e); + } + } + + @Override + public void handle(Package msg, ChannelHandlerContext ctx) { + AccessMessage accessMessage = (AccessMessage)msg.getBody(); + logger.info("receive async msg====================={}", accessMessage); + } +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java new file mode 100644 index 0000000000..23b2425b22 --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java @@ -0,0 +1,54 @@ +package com.webank.eventmesh.client.tcp.demo; + +import com.webank.eventmesh.client.tcp.WemqAccessClient; +import com.webank.eventmesh.client.tcp.common.AccessTestUtils; +import com.webank.eventmesh.client.tcp.common.ReceiveMsgHook; +import com.webank.eventmesh.client.tcp.impl.DefaultWemqAccessClient; +import com.webank.eventmesh.common.protocol.tcp.AccessMessage; +import com.webank.eventmesh.common.protocol.tcp.Package; +import com.webank.eventmesh.common.protocol.tcp.UserAgent; +import com.webank.eventmesh.util.Utils; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class AsyncSubscribeBroadcast implements ReceiveMsgHook { + + public static Logger logger = LoggerFactory.getLogger(AsyncSubscribeBroadcast.class); + + private static WemqAccessClient client; + + public static AsyncSubscribeBroadcast handler = new AsyncSubscribeBroadcast(); + + public static void main(String[] agrs)throws Exception{ + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final int eventMeshTcpPort = Integer.parseInt(properties.getProperty("eventmesh.tcp.port")); + try{ + UserAgent userAgent = AccessTestUtils.generateClient2(); + client = new DefaultWemqAccessClient(eventMeshIp,eventMeshTcpPort,userAgent); + client.init(); + client.heartbeat(); + + client.subscribe("FT0-e-80030001-01-3"); + client.registerSubBusiHandler(handler); + + client.listen(); + + //client.unsubscribe(); + + //退出,销毁资源 +// client.close(); + }catch (Exception e){ + logger.warn("AsyncSubscribeBroadcast failed", e); + } + } + + @Override + public void handle(Package msg, ChannelHandlerContext ctx) { + AccessMessage accessMessage = (AccessMessage)msg.getBody(); + logger.info("receive broadcast msg==============={}", accessMessage); + } +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/SyncRequest.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/SyncRequest.java new file mode 100644 index 0000000000..943fb0fe12 --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/SyncRequest.java @@ -0,0 +1,36 @@ +package com.webank.eventmesh.client.tcp.demo; + +import com.webank.eventmesh.client.tcp.WemqAccessClient; +import com.webank.eventmesh.client.tcp.common.AccessTestUtils; +import com.webank.eventmesh.client.tcp.common.WemqAccessCommon; +import com.webank.eventmesh.client.tcp.impl.DefaultWemqAccessClient; +import com.webank.eventmesh.common.protocol.tcp.Package; +import com.webank.eventmesh.common.protocol.tcp.UserAgent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SyncRequest { + + public static Logger logger = LoggerFactory.getLogger(SyncRequest.class); + + private static WemqAccessClient client; + + public static void main(String[] agrs)throws Exception{ + try{ + UserAgent userAgent = AccessTestUtils.generateClient1(); + client = new DefaultWemqAccessClient("127.0.0.1",10000,userAgent); + client.init(); + client.heartbeat(); + + Package rrMsg = AccessTestUtils.syncRR(); + logger.info("begin send rr msg=================={}",rrMsg); + Package response = client.rr(rrMsg, WemqAccessCommon.DEFAULT_TIME_OUT_MILLS); + logger.info("receive rr reply==================={}",response); + + //退出,销毁资源 +// client.close(); + }catch (Exception e){ + logger.warn("SyncRequest failed", e); + } + } +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/SyncResponse.java b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/SyncResponse.java new file mode 100644 index 0000000000..9bece8e11b --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/client/tcp/demo/SyncResponse.java @@ -0,0 +1,49 @@ +package com.webank.eventmesh.client.tcp.demo; + +import com.webank.eventmesh.client.tcp.WemqAccessClient; +import com.webank.eventmesh.client.tcp.common.AccessTestUtils; +import com.webank.eventmesh.client.tcp.common.ReceiveMsgHook; +import com.webank.eventmesh.client.tcp.impl.DefaultWemqAccessClient; +import com.webank.eventmesh.common.protocol.tcp.Package; +import com.webank.eventmesh.common.protocol.tcp.UserAgent; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SyncResponse implements ReceiveMsgHook { + + public static Logger logger = LoggerFactory.getLogger(SyncResponse.class); + + private static WemqAccessClient client; + + public static SyncResponse handler = new SyncResponse(); + + public static void main(String[] agrs)throws Exception{ + try{ + UserAgent userAgent = AccessTestUtils.generateClient2(); + client = new DefaultWemqAccessClient("127.0.0.1",10000,userAgent); + client.init(); + client.heartbeat(); + + client.subscribe("FT0-s-80000000-01-0"); + //同步RR消息 + client.registerSubBusiHandler(handler); + + client.listen(); + + //client.unsubscribe(); + + //退出,销毁资源 +// client.close(); + }catch (Exception e){ + logger.warn("SyncResponse failed", e); + } + } + + @Override + public void handle(Package msg, ChannelHandlerContext ctx) { + logger.info("receive sync rr msg================{}", msg); + Package pkg = AccessTestUtils.rrResponse(msg); + ctx.writeAndFlush(pkg); + } +} diff --git a/eventmesh-test/src/main/java/com/webank/eventmesh/util/Utils.java b/eventmesh-test/src/main/java/com/webank/eventmesh/util/Utils.java new file mode 100644 index 0000000000..1e4450ede1 --- /dev/null +++ b/eventmesh-test/src/main/java/com/webank/eventmesh/util/Utils.java @@ -0,0 +1,91 @@ +package com.webank.eventmesh.util; + +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.PropertiesLoaderUtils; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Enumeration; +import java.util.Properties; + +public class Utils { + + /** + * 获取本地IP地址 + * + * @throws SocketException + */ + public static String getLocalIP() throws UnknownHostException, SocketException { + if (isWindowsOS()) { + return InetAddress.getLocalHost().getHostAddress(); + } else { + return getLinuxLocalIp(); + } + } + + /** + * 判断操作系统是否是Windows + * + * @return + */ + public static boolean isWindowsOS() { + boolean isWindowsOS = false; + // JRE (runtime)system,not OS + String osName = System.getProperty("os.name"); + if (osName.toLowerCase().contains("windows")) { + isWindowsOS = true; + } + return isWindowsOS; + } + + /** + * 获取Linux下的IP地址 + * + * @return IP地址 + * @throws SocketException + */ + private static String getLinuxLocalIp() throws SocketException { + String ip = ""; + try { + for (Enumeration en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) { + NetworkInterface intf = en.nextElement(); + String name = intf.getName(); + if (!name.contains("docker") && !name.contains("lo")) { + for (Enumeration enumIpAddr = intf.getInetAddresses(); enumIpAddr.hasMoreElements();) { + InetAddress inetAddress = enumIpAddr.nextElement(); + if (!inetAddress.isLoopbackAddress()) { + String ipaddress = inetAddress.getHostAddress().toString(); + if (!ipaddress.contains("::") && !ipaddress.contains("0:0:") + && !ipaddress.contains("fe80")) { + ip = ipaddress; + } + } + } + } + } + } catch (SocketException ex) { + ip = "127.0.0.1"; + ex.printStackTrace(); + } + return ip; + } + + /** + * @param fileName + * @return + */ + public static Properties readPropertiesFile(String fileName) { + try { + Resource resource = new ClassPathResource(fileName); + Properties props = PropertiesLoaderUtils.loadProperties(resource); + return props; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + +} diff --git a/settings.gradle b/settings.gradle index 648bb30264..3505e33f62 100644 --- a/settings.gradle +++ b/settings.gradle @@ -24,4 +24,5 @@ include 'eventmesh-sdk-java' include 'eventmesh-common' include 'eventmesh-connector-api' include 'eventmesh-starter' +include 'eventmesh-test'