Skip to content

Commit

Permalink
Add topic filter for event WebSocket
Browse files Browse the repository at this point in the history
Signed-off-by: Florian Hotze <dev@florianhotze.com>
  • Loading branch information
florian-h05 committed Jan 9, 2025
1 parent f00c770 commit 0ee3dd8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.lang.reflect.Type;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
Expand All @@ -29,6 +30,7 @@
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.openhab.core.events.Event;
import org.openhab.core.events.EventPublisher;
import org.openhab.core.events.TopicEventFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,6 +42,7 @@
* The {@link EventWebSocket} is the WebSocket implementation that extends the event bus
*
* @author Jan N. Klug - Initial contribution
* @author Florian Hotze - Add topic filter
*/
@WebSocket
@NonNullByDefault
Expand All @@ -49,6 +52,7 @@ public class EventWebSocket {
public static final String WEBSOCKET_TOPIC_PREFIX = "openhab/websocket/";

private static final Type STRING_LIST_TYPE = TypeToken.getParameterized(List.class, String.class).getType();
private static final Pattern TOPIC_VALIDATE_PATTERN = Pattern.compile("^(\\w*\\*?\\/?)+$");

private final Logger logger = LoggerFactory.getLogger(EventWebSocket.class);

Expand All @@ -63,6 +67,7 @@ public class EventWebSocket {

private List<String> typeFilter = List.of();
private List<String> sourceFilter = List.of();
private @Nullable TopicEventFilter topicFilter = null;

public EventWebSocket(Gson gson, EventWebSocketAdapter wsAdapter, ItemEventUtility itemEventUtility,
EventPublisher eventPublisher) {
Expand Down Expand Up @@ -148,6 +153,25 @@ public void onText(String message) {
remoteEndpoint.getInetSocketAddress(), typeFilter);
responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/source",
eventDTO.payload, null, eventDTO.eventId);
} else if ((WEBSOCKET_TOPIC_PREFIX + "filter/topic").equals(eventDTO.topic)) {
List<String> topics = Objects
.requireNonNullElse(gson.fromJson(eventDTO.payload, STRING_LIST_TYPE), List.of());
for (String topic : topics) {
if (!TOPIC_VALIDATE_PATTERN.matcher(topic).matches()) {
throw new EventProcessingException(
"Invalid topic '" + topic + "' in topic filter WebSocketEvent");
}
}
// convert to regex: replace any wildcard (*) with the regex pattern (.*)
topics = topics.stream().map(t -> t.trim().replace("*", ".*") + "$").toList();
// create topic filter if topic list not empty
if (!topics.isEmpty()) {
topicFilter = new TopicEventFilter(topics);
}
logger.debug("Setting topic filter for connection to {}: {}",
remoteEndpoint.getInetSocketAddress(), topics);
responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/topic",
eventDTO.payload, null, eventDTO.eventId);
} else {
throw new EventProcessingException("Invalid topic or payload in WebSocketEvent");
}
Expand Down Expand Up @@ -195,7 +219,8 @@ public void processEvent(Event event) {
try {
String source = event.getSource();
if ((source == null || !sourceFilter.contains(event.getSource()))
&& (typeFilter.isEmpty() || typeFilter.contains(event.getType()))) {
&& (typeFilter.isEmpty() || typeFilter.contains(event.getType()))
&& (topicFilter == null || topicFilter.apply(event))) {
sendMessage(gson.toJson(new EventDTO(event)));
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,25 @@
*/
package org.openhab.core.events;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;

/**
* The {@link TopicEventFilter} is a default openHAB {@link EventFilter} implementation that ensures filtering
* of events based on an event topic.
* of events based on a single event topic or multiple event topics.
*
* @author Stefan Bußweiler - Initial contribution
* @author Florian Hotze - Add support for filtering of events by multiple event topics
*/
@NonNullByDefault
public class TopicEventFilter implements EventFilter {

private final Pattern topicRegex;
private final @Nullable Pattern topicRegex;
private final List<Pattern> topicsRegexes = new ArrayList<>();

/**
* Constructs a new topic event filter.
Expand All @@ -38,8 +43,28 @@ public TopicEventFilter(String topicRegex) {
this.topicRegex = Pattern.compile(topicRegex);
}

/**
* Constructs a new topic event filter.
*
* @param topicsRegexes the regular expressions of multiple topics
* @see <a href="https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/regex/Pattern.html">Java
* Regex</a>
*/
public TopicEventFilter(List<String> topicsRegexes) {
this.topicRegex = null;
for (String topicRegex : topicsRegexes) {
this.topicsRegexes.add(Pattern.compile(topicRegex));
}
}

@Override
public boolean apply(Event event) {
return topicRegex.matcher(event.getTopic()).matches();
String topic = event.getTopic();
Pattern topicRegex = this.topicRegex;
if (topicRegex != null) {
return topicRegex.matcher(topic).matches();
} else {
return topicsRegexes.stream().anyMatch(p -> p.matcher(topic).matches());
}
}
}

0 comments on commit 0ee3dd8

Please sign in to comment.