diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/TypeUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/TypeUtils.java new file mode 100644 index 0000000000..bac2365a81 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/TypeUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.utils; + +import java.util.HashSet; +import java.util.Set; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class TypeUtils { + + public static Set castSet(Object obj, Class clazz) { + Set result = new HashSet<>(); + if (obj instanceof Set) { + for (Object o : (Set) obj) { + result.add(clazz.cast(o)); + } + return result; + } + return null; + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/TypeUtilsTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/TypeUtilsTest.java new file mode 100644 index 0000000000..fcd83642ec --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/TypeUtilsTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.utils; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TypeUtilsTest { + + @Test + public void testCastSet() { + Set set = new HashSet<>(Arrays.asList("1", "2", "3")); + Set newSet = TypeUtils.castSet(set, String.class); + for (String s : set) { + Assertions.assertTrue(newSet.contains(s)); + } + Assertions.assertEquals(set.size(), newSet.size()); + } +} diff --git a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java index c4500de05a..7838753fb1 100644 --- a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java +++ b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java @@ -61,6 +61,7 @@ public static void export(final String meterName, final GrpcSummaryMetrics summa final Meter meter = GlobalMeterProvider.getMeter(meterName); paramPairs.forEach( - (metricInfo, getMetric) -> observeOfValue(meter, METRICS_GRPC_PREFIX + metricInfo[0], metricInfo[1], GRPC, summaryMetrics, getMetric)); + (metricInfo, getMetric) -> observeOfValue(meter, METRICS_GRPC_PREFIX + metricInfo[0], metricInfo[1], + GRPC, summaryMetrics, getMetric, GrpcSummaryMetrics.class)); } } diff --git a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java index d8b5181f04..bb14e321d9 100644 --- a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java +++ b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java @@ -178,7 +178,8 @@ public class PrometheusHttpExporter { public void export(String name, HttpSummaryMetrics summaryMetrics) { Meter meter = GlobalMeterProvider.getMeter(name); - paramPairs.forEach((metricInfo, getMetric) -> observeOfValue(meter, metricInfo[0], metricInfo[1], HTTP, summaryMetrics, getMetric)); + paramPairs.forEach((metricInfo, getMetric) -> observeOfValue(meter, metricInfo[0], metricInfo[1], + HTTP, summaryMetrics, getMetric, HttpSummaryMetrics.class)); } } diff --git a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java index e468552e0d..d615566e1f 100644 --- a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java +++ b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java @@ -81,6 +81,7 @@ public class PrometheusTcpExporter { public void export(final String meterName, final TcpSummaryMetrics summaryMetrics) { final Meter meter = GlobalMeterProvider.getMeter(meterName); paramPairs.forEach( - (metricInfo, getMetric) -> observeOfValue(meter, METRICS_TCP_PREFIX + metricInfo[0], metricInfo[1], TCP, summaryMetrics, getMetric)); + (metricInfo, getMetric) -> observeOfValue(meter, METRICS_TCP_PREFIX + metricInfo[0], metricInfo[1], + TCP, summaryMetrics, getMetric, TcpSummaryMetrics.class)); } } diff --git a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java index e387e0d2fd..3f43c908c2 100644 --- a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java +++ b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java @@ -43,21 +43,21 @@ public class PrometheusExporterUtils { * @param getMetric */ @SneakyThrows - public static void observeOfValue(Meter meter, String metricName, String metricDesc, String protocol, - Metric summaryMetrics, Function getMetric) { + public static void observeOfValue(Meter meter, String metricName, String metricDesc, String protocol, + Metric summaryMetrics, Function getMetric, Class clazz) { Method method = getMetric.getClass().getMethod("apply", Object.class); - Class metricType = (Class) method.getGenericReturnType(); + Class metricType = (Class) method.getGenericReturnType(); if (metricType == Long.class) { meter.longValueObserverBuilder(metricName) .setDescription(metricDesc) .setUnit(protocol) - .setUpdater(result -> result.observe((long) getMetric.apply(summaryMetrics), Labels.empty())) + .setUpdater(result -> result.observe((long) getMetric.apply(clazz.cast(summaryMetrics)), Labels.empty())) .build(); } else if (metricType == Double.class) { meter.doubleValueObserverBuilder(metricName) .setDescription(metricDesc) .setUnit(protocol) - .setUpdater(result -> result.observe((double) getMetric.apply(summaryMetrics), Labels.empty())) + .setUpdater(result -> result.observe((double) getMetric.apply(clazz.cast(summaryMetrics)), Labels.empty())) .build(); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java index edbc0ca35b..570f11c998 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/HttpRequestProcessor.java @@ -48,14 +48,14 @@ default boolean rejectRequest() { return false; } - default void completeResponse(HttpCommand req, AsyncContext asyncContext, + default void completeResponse(HttpCommand req, AsyncContext asyncContext, T respHeader, EventMeshRetCode emCode, String msg, Class clazz) { try { Method method = clazz.getMethod("buildBody", Integer.class, String.class); Object o = method.invoke(null, emCode.getRetCode(), StringUtils.isNotBlank(msg) ? msg : emCode.getErrMsg()); - HttpCommand response = req.createHttpCommandResponse(respHeader, (E) o); + HttpCommand response = req.createHttpCommandResponse(respHeader, (Body) o); asyncContext.onComplete(response); } catch (Exception e) { log.error("response failed", e); diff --git a/eventmesh-security-plugin/eventmesh-security-auth-token/src/main/java/org/apache/eventmesh/auth/token/impl/auth/AuthTokenUtils.java b/eventmesh-security-plugin/eventmesh-security-auth-token/src/main/java/org/apache/eventmesh/auth/token/impl/auth/AuthTokenUtils.java index 68dca1a2c0..8ea524acd7 100644 --- a/eventmesh-security-plugin/eventmesh-security-auth-token/src/main/java/org/apache/eventmesh/auth/token/impl/auth/AuthTokenUtils.java +++ b/eventmesh-security-plugin/eventmesh-security-auth-token/src/main/java/org/apache/eventmesh/auth/token/impl/auth/AuthTokenUtils.java @@ -21,6 +21,7 @@ import org.apache.eventmesh.api.exception.AclException; import org.apache.eventmesh.common.config.CommonConfiguration; import org.apache.eventmesh.common.utils.ConfigurationContextUtil; +import org.apache.eventmesh.common.utils.TypeUtils; import org.apache.commons.lang3.StringUtils; @@ -136,14 +137,15 @@ public static boolean authAccess(AclProperties aclProperties) { String topic = aclProperties.getTopic(); - Set groupTopics = (Set) aclProperties.getExtendedField("topics"); + Object topics = aclProperties.getExtendedField("topics"); - if (groupTopics.contains(topic)) { - return true; - } else { - return false; + if (!(topics instanceof Set)) { + throw new RuntimeException("abc"); } + Set groupTopics = TypeUtils.castSet(topics, String.class); + + return groupTopics.contains(topic); } } diff --git a/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/consumer/PushConsumerImpl.java index 8768a35421..d332a6c7b3 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/consumer/PushConsumerImpl.java +++ b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/consumer/PushConsumerImpl.java @@ -145,6 +145,7 @@ public void unsubscribe(String topic) { } } + @SuppressWarnings("deprecation") public void updateOffset(List cloudEvents, AbstractContext context) { ConsumeMessageService consumeMessageService = rocketmqPushConsumer .getDefaultMQPushConsumerImpl().getConsumeMessageService();