Skip to content

Commit

Permalink
[MINOR] Fix plugin cannot load properties from classpath (#763)
Browse files Browse the repository at this point in the history
* [MINOR] Fix plugin cannot load properties from classpath

* Fix callback warning
  • Loading branch information
ruanwenjun authored Feb 14, 2022
1 parent 5ae5d13 commit 67987ab
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,6 @@ public class Constants {

public static final String MESSAGE_PROP_SEPARATOR = "99";

public static final String EVENTMESH_CONF_HOME = System.getProperty("confPath", System.getenv("confPath"));

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.rocketmq.config;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -25,7 +26,7 @@
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.io.InputStream;
import java.util.Properties;

import lombok.experimental.UtilityClass;
Expand All @@ -38,27 +39,33 @@ public class ConfigurationWrapper {
private static final Properties properties = new Properties();

static {
String configFile = getConfigFilePath();
log.info("loading config: {}", configFile);
try {
properties.load(new BufferedReader(new FileReader(configFile)));
} catch (IOException e) {
throw new IllegalArgumentException(
String.format("Cannot load RocketMQ configuration file from :%s", configFile));
}
loadProperties();
}

public String getProp(String key) {
return StringUtils.isEmpty(key) ? null : properties.getProperty(key, null);
}

private static String getConfigFilePath() {
// get from classpath
URL resource = ConfigurationWrapper.class.getClassLoader().getResource(EventMeshConstants.EVENTMESH_CONF_FILE);
if (resource != null && new File(resource.getPath()).exists()) {
return resource.getPath();
/**
* Load rocketmq properties file from classpath and conf home.
* The properties defined in conf home will override classpath.
*/
private void loadProperties() {
try (InputStream resourceAsStream = ConfigurationWrapper.class.getResourceAsStream(
File.separator + EventMeshConstants.EVENTMESH_CONF_FILE)) {
if (resourceAsStream != null) {
properties.load(resourceAsStream);
}
} catch (IOException e) {
throw new RuntimeException(String.format("Load %s.properties file from classpath error", EventMeshConstants.EVENTMESH_CONF_FILE));
}
try {
String configPath = Constants.EVENTMESH_CONF_HOME + File.separator + EventMeshConstants.EVENTMESH_CONF_FILE;
if (new File(configPath).exists()) {
properties.load(new BufferedReader(new FileReader(configPath)));
}
} catch (IOException e) {
throw new IllegalArgumentException(String.format("Cannot load %s file from conf", EventMeshConstants.EVENTMESH_CONF_FILE));
}
// get from config home
return EventMeshConstants.EVENTMESH_CONF_HOME + File.separator + EventMeshConstants.EVENTMESH_CONF_FILE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.apache.eventmesh.metrics.opentelemetry.config;

import org.apache.eventmesh.common.Constants;

import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Properties;

import lombok.experimental.UtilityClass;
Expand Down Expand Up @@ -56,25 +56,25 @@ private void initializeConfig() {
}
}

/**
* Load properties file from classpath and conf home.
* The properties defined in conf home will override classpath.
*/
private void loadProperties() {
URL resource = OpenTelemetryConfiguration.class.getClassLoader().getResource(CONFIG_FILE);
if (resource != null) {
try (InputStream inputStream = resource.openStream()) {
if (inputStream.available() > 0) {
properties.load(new BufferedReader(new InputStreamReader(inputStream)));
}
} catch (IOException e) {
throw new RuntimeException("Load opentelemetry.properties file from classpath error");
try (InputStream resourceAsStream = OpenTelemetryConfiguration.class.getResourceAsStream(File.separator + CONFIG_FILE)) {
if (resourceAsStream != null) {
properties.load(resourceAsStream);
}
} catch (IOException e) {
throw new RuntimeException(String.format("Load %s file from classpath error", CONFIG_FILE));
}
// get from config home
try {
String configPath = System.getProperty("confPath", System.getenv("confPath")) + File.separator + CONFIG_FILE;
String configPath = Constants.EVENTMESH_CONF_HOME + File.separator + CONFIG_FILE;
if (new File(configPath).exists()) {
properties.load(new BufferedReader(new FileReader(configPath)));
}
} catch (IOException e) {
throw new IllegalArgumentException("Cannot load opentelemetry.properties file from conf");
throw new IllegalArgumentException(String.format("Cannot load %s file from conf", CONFIG_FILE));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
# limitations under the License.
#

#prometheusPort
# You can get the export metrics by this port
eventMesh.metrics.prometheus.port=19090
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.metrics.opentelemetry.config;

import org.junit.Assert;
import org.junit.Test;

public class OpenTelemetryConfigurationTest {

@Test
public void getEventMeshPrometheusPort() {
int eventMeshPrometheusPort = OpenTelemetryConfiguration.getEventMeshPrometheusPort();
Assert.assertEquals(19091, eventMeshPrometheusPort);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

eventMesh.metrics.prometheus.port=19091
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ public RRCallbackResponseHandlerAdapter(ProtocolMessage protocolMessage, RRCallb
long timeout) {
Preconditions.checkNotNull(rrCallback, "rrCallback invalid");
Preconditions.checkNotNull(protocolMessage, "message invalid");
if (!(protocolMessage instanceof EventMeshMessage) && !(protocolMessage instanceof CloudEvent)
&& !(protocolMessage instanceof Message)) {
if (!(protocolMessage instanceof EventMeshMessage)
&& !(protocolMessage instanceof CloudEvent)
&& !(protocolMessage instanceof Message)) {
throw new IllegalArgumentException(String.format("ProtocolMessage: %s is not supported", protocolMessage));
}
this.protocolMessage = protocolMessage;
Expand Down Expand Up @@ -95,15 +96,18 @@ public String handleResponse(HttpResponse response) throws IOException {
return protocolMessage.toString();
}

@SuppressWarnings("unchecked")
private ProtocolMessage transformToProtocolMessage(EventMeshRetObj ret) {
// todo: constructor other protocol message, can judge by protocol type in properties
SendMessageResponseBody.ReplyMessage replyMessage = JsonUtils.deserialize(ret.getRetMsg(),
SendMessageResponseBody.ReplyMessage.class);
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
SendMessageResponseBody.ReplyMessage replyMessage = JsonUtils.deserialize(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
if (protocolMessage instanceof EventMeshMessage) {
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.content(replyMessage.body)
.prop(replyMessage.properties)
.topic(replyMessage.topic)
.build();
return (ProtocolMessage) eventMeshMessage;
return (ProtocolMessage) eventMeshMessage;
}
// todo: constructor other protocol message
throw new RuntimeException("Unsupported callback message type");
}
}

0 comments on commit 67987ab

Please sign in to comment.