From 5fbeebd3db4a9d30ec0c5232a3e7b0a718ac85bf Mon Sep 17 00:00:00 2001 From: Trevor Date: Wed, 8 May 2024 18:15:38 -0700 Subject: [PATCH] Fix cron distributor and add debugging to listDevices pagination (#882) --- .../access/ClearBladeIotAccessProvider.java | 62 ++++++++++--------- .../bos/udmi/service/core/CronProcessor.java | 5 +- 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ClearBladeIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ClearBladeIotAccessProvider.java index 53d2eca9c4..7e04385090 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ClearBladeIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ClearBladeIotAccessProvider.java @@ -346,10 +346,31 @@ private HashMap fetchDevices(String deviceRegistryId, String .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); collect.putAll(responseMap); pageToken = response.getNextPageToken(); + debug(format("fetchDevices %s found %d total %d more %s", deviceRegistryId, + responseMap.size(), collect.size(), pageToken != null)); } while (pageToken != null); return collect; } + private List findGateways(String registryId, Device proxyDevice) { + CloudModel cloudModel = listDevices(registryId); + List gateways = + cloudModel.device_ids.entrySet().stream().filter(entry -> + entry.getValue().resource_type == GATEWAY).map(Entry::getKey).toList(); + return gateways; + } + + private CloudModel findUnbindAndDelete(String registryId, Device device) { + List allGateways = findGateways(registryId, device); + if (allGateways.isEmpty()) { + throw new RuntimeException("Was expecting at least one bound gateway!"); + } + String deviceId = device.toBuilder().getId(); + info("Unbinding %s/%s from gateways: " + registryId, deviceId, CSV_JOINER.join(allGateways)); + allGateways.forEach(gatewayId -> unbindDevice(registryId, gatewayId, deviceId)); + return unbindAndDeleteCore(registryId, device); + } + private String getDeviceName(String registryId, String deviceId) { return DeviceName.of(projectId, getRegistryLocation(registryId), registryId, deviceId) .toString(); @@ -436,6 +457,17 @@ private CloudModel modelRegistry(String registryId, String deviceId, CloudModel } } + private CloudModel modifyDevice(String registryId, Device device) { + Device.Builder builder = device.toBuilder(); + String deviceId = builder.getId(); + CloudModel model = fetchDevice(registryId, deviceId); + model.metadata.putAll(builder.getMetadata()); + builder.setMetadata(model.metadata); + CloudModel cloudModel = updateDevice(registryId, builder.build(), METADATA_FIELD_MASK); + cloudModel.operation = Operation.MODIFY; + return cloudModel; + } + private StateNotificationConfig stateNotificationConfig() { String topicName = getScopedTopic(UDMI_STATE_TOPIC); return StateNotificationConfig.newBuilder().setPubsubTopicName(topicName).build(); @@ -485,25 +517,6 @@ private void unbindDevice(String registryId, String gatewayId, String proxyId) { } } - private CloudModel findUnbindAndDelete(String registryId, Device device) { - List allGateways = findGateways(registryId, device); - if (allGateways.isEmpty()) { - throw new RuntimeException("Was expecting at least one bound gateway!"); - } - String deviceId = device.toBuilder().getId(); - info("Unbinding %s/%s from gateways: " + registryId, deviceId, CSV_JOINER.join(allGateways)); - allGateways.forEach(gatewayId -> unbindDevice(registryId, gatewayId, deviceId)); - return unbindAndDeleteCore(registryId, device); - } - - private List findGateways(String registryId, Device proxyDevice) { - CloudModel cloudModel = listDevices(registryId); - List gateways = - cloudModel.device_ids.entrySet().stream().filter(entry -> - entry.getValue().resource_type == GATEWAY).map(Entry::getKey).toList(); - return gateways; - } - private void unbindGatewayDevices(String registryId, Device gatewayDevice) { String gatewayId = gatewayDevice.toBuilder().getId(); CloudModel cloudModel = listRegistryDevices(registryId, gatewayId); @@ -515,17 +528,6 @@ private void unbindGatewayDevices(String registryId, Device gatewayDevice) { } } - private CloudModel modifyDevice(String registryId, Device device) { - Device.Builder builder = device.toBuilder(); - String deviceId = builder.getId(); - CloudModel model = fetchDevice(registryId, deviceId); - model.metadata.putAll(builder.getMetadata()); - builder.setMetadata(model.metadata); - CloudModel cloudModel = updateDevice(registryId, builder.build(), METADATA_FIELD_MASK); - cloudModel.operation = Operation.MODIFY; - return cloudModel; - } - private CloudModel updateDevice(String registryId, Device device) { CloudModel cloudModel = updateDevice(registryId, device, UPDATE_FIELD_MASK); cloudModel.operation = Operation.UPDATE; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java b/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java index 366cf4ade4..37414bd2af 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java @@ -1,5 +1,6 @@ package com.google.bos.udmi.service.core; +import static com.google.common.base.Preconditions.checkState; import static com.google.udmi.util.GeneralUtils.CSV_JOINER; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.GeneralUtils.ifTrueGet; @@ -72,7 +73,9 @@ public CronProcessor(EndpointConfiguration config) { } private static String getContainerId(Envelope envelope) { - return envelope.gatewayId.split(PATH_SEPARATOR, 2)[0]; + String[] split = envelope.gatewayId.split(ID_SEPARATOR, 2); + checkState(split.length == 2, "malformed container id"); + return split[0]; } @Override