Skip to content

Commit

Permalink
Merge pull request #2600 from ControlSystemStudio/alarm_config_stblztn
Browse files Browse the repository at this point in the history
Alarm server  -connect_secs and -stable_secs settings
  • Loading branch information
kasemir authored Mar 17, 2023
2 parents 5d57d5f + a2d8282 commit b20ccda
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 112 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018 Oak Ridge National Laboratory.
* Copyright (c) 2018-2023 Oak Ridge National Laboratory.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand All @@ -24,26 +24,37 @@
* Eventually, if there are no more resets,
* it will time out.
*
* Timeout can be adjusted on each 'reset'
*
* @author Kay Kasemir
*/
@SuppressWarnings("nls")
public class ResettableTimeout
{
private final long timeout_secs;
private long timeout_secs;

private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResettableTimeout"));
private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResettableTimeout"));
private final CountDownLatch no_more_messages = new CountDownLatch(1);
private final Runnable signal_no_more_messages = () -> no_more_messages.countDown();
private final AtomicReference<ScheduledFuture<?>> timeout = new AtomicReference<>();

/** @param timeout_secs Seconds after which we time out */
public ResettableTimeout(final long timeout_secs)
{
this.timeout_secs = timeout_secs;
reset();
}
{
this.timeout_secs = timeout_secs;
reset();
}

/** Reset the timer. As long as this is called within the timeout, we keep running
* @param timeout_secs New timeout in seconds
*/
public void reset(final long timeout_secs)
{
this.timeout_secs = timeout_secs;
reset();
}

/** Reset the timer. As long as this is called within the timeout, we keep running */
/** Reset the timer. As long as this is called within the timeout, we keep running */
public void reset()
{
final ScheduledFuture<?> previous = timeout.getAndSet(timer.schedule(signal_no_more_messages, timeout_secs, TimeUnit.SECONDS));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018 Oak Ridge National Laboratory.
* Copyright (c) 2018-2023 Oak Ridge National Laboratory.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand All @@ -24,7 +24,12 @@
* Not receiving any alarm client updates for a while
* likely means that we have a stable configuration.
*
* <p>This helper awaits such a pause in updates.
* <p>This helper first waits for an initial config message,
* allowing for the connection to take some time.
* Based on past experience, we then receive a flurry of
* config messages.
* By then awaiting a pause in configuration updates,
* we assume that a complete configuration snapshot has been received.
*
* @author Kay Kasemir
* @author Evan Smith
Expand All @@ -34,9 +39,11 @@ public class AlarmConfigMonitor
{
private final AlarmClient client;
private final ResettableTimeout timer;
private final long idle_secs;
private final AtomicInteger updates = new AtomicInteger();

private final AlarmClientListener updateListener = new AlarmClientListener()
/** Listener to messages, resetting timer on config messages */
private final AlarmClientListener config_listener = new AlarmClientListener()
{
@Override
public void serverStateChanged(final boolean alive)
Expand All @@ -50,7 +57,7 @@ public void serverModeChanged(boolean maintenance_mode)
//NOP
}

@Override
@Override
public void serverDisableNotifyChanged(boolean disable_notify)
{
//NOP
Expand All @@ -59,16 +66,16 @@ public void serverDisableNotifyChanged(boolean disable_notify)
@Override
public void itemAdded(final AlarmTreeItem<?> item)
{
// Reset the timer when receiving update
timer.reset();
// Reset the timer when receiving config update
timer.reset(idle_secs);
updates.incrementAndGet();
}

@Override
public void itemRemoved(final AlarmTreeItem<?> item)
{
// Reset the timer when receiving update
timer.reset();
// Reset the timer when receiving config update
timer.reset(idle_secs);
updates.incrementAndGet();
}

Expand All @@ -79,13 +86,15 @@ public void itemUpdated(final AlarmTreeItem<?> item)
}
};

/** @param idle_secs Seconds after which we decide that there's a pause in configuration updates
/** @param initial_secs Seconds to wait for the initial config message (a 'connection' timeout)
* @param idle_secs Seconds after which we decide that there's a pause in configuration updates (assuming we received complete config snapshot)
* @param client AlarmClient to check for a pause in updates
*/
public AlarmConfigMonitor(final long idle_secs, final AlarmClient client)
public AlarmConfigMonitor(final long initial_secs, final long idle_secs, final AlarmClient client)
{
this.client = client;
timer = new ResettableTimeout(idle_secs);
this.idle_secs = idle_secs;
timer = new ResettableTimeout(initial_secs);
}

/** Wait for a pause in configuration updates
Expand All @@ -94,7 +103,7 @@ public AlarmConfigMonitor(final long idle_secs, final AlarmClient client)
*/
public void waitForPauseInUpdates(final long timeout) throws Exception
{
client.addListener(updateListener);
client.addListener(config_listener);
if (! timer.awaitTimeout(timeout))
throw new Exception(timeout + " seconds have passed, I give up waiting for updates to subside.");
// Reset the counter to count any updates received after we decide to continue.
Expand All @@ -110,7 +119,7 @@ public int getCount()
/** Call when no longer interested in checking updates */
public void dispose()
{
client.removeListener(updateListener);
client.removeListener(config_listener);
timer.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018 Oak Ridge National Laboratory.
* Copyright (c) 2018-2023 Oak Ridge National Laboratory.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand All @@ -20,25 +20,25 @@
@SuppressWarnings("nls")
public class AlarmModelSnapshotDemo
{
@Test
public void testAlarmModelWriter() throws Exception
{
// Get alarm configuration
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT, AlarmDemoSettings.KAFKA_PROPERTIES_FILE);

System.out.println("Wait for stable configuration, i.e. no changes for 4 seconds...");

// Wait until we have a snapshot, i.e. no more changes for 4 seconds
final AlarmConfigMonitor monitor = new AlarmConfigMonitor(4, client);
monitor.waitForPauseInUpdates(30);

System.out.println("Alarm configuration:");


final ByteArrayOutputStream buf = new ByteArrayOutputStream();
final XmlModelWriter xmlWriter = new XmlModelWriter(buf);
xmlWriter.write(client.getRoot());
xmlWriter.close();
@Test
public void testAlarmModelWriter() throws Exception
{
// Get alarm configuration
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT, AlarmDemoSettings.KAFKA_PROPERTIES_FILE);
client.start();
System.out.println("Wait 10 secs for connection, then for stable configuration, i.e. no changes for 4 seconds...");
final long start = System.currentTimeMillis();

final AlarmConfigMonitor monitor = new AlarmConfigMonitor(10, 4, client);
monitor.waitForPauseInUpdates(30);
final double secs = (System.currentTimeMillis() - start) / 1000.0;
System.out.format("Alarm configuration after %.3f seconds:\n\n", secs);

final ByteArrayOutputStream buf = new ByteArrayOutputStream();
final XmlModelWriter xmlWriter = new XmlModelWriter(buf);
xmlWriter.write(client.getRoot());
xmlWriter.close();
final String xml = buf.toString();
System.out.println(xml);

Expand All @@ -47,5 +47,5 @@ public void testAlarmModelWriter() throws Exception
System.out.println("Bummer, there were " + changes + " updates to the configuration, might have to try this again...");
monitor.dispose();
client.shutdown();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2012-2018 Oak Ridge National Laboratory.
* Copyright (c) 2012-2023 Oak Ridge National Laboratory.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -61,4 +61,23 @@ public void testReset()
assertThat(timer.awaitTimeout(6), equalTo(true));
timer.shutdown();
}

@Test
public void testChangingTimeout()
{
ResettableTimeout timer = new ResettableTimeout(4);
System.out.println("Should time out in 4 secs");
assertThat(timer.awaitTimeout(8), equalTo(true));
timer.shutdown();


timer = new ResettableTimeout(4);
System.out.println("Now resetting after just 1 second to a 1 second timeout...");
assertThat(timer.awaitTimeout(1), equalTo(false));
timer.reset(1);

System.out.println(".. and expecting time out in 1 second...");
assertThat(timer.awaitTimeout(4), equalTo(true));
timer.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018 Oak Ridge National Laboratory.
* Copyright (c) 2018-2023 Oak Ridge National Laboratory.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand Down Expand Up @@ -28,15 +28,16 @@
@SuppressWarnings("nls")
public class AlarmConfigTool
{
/** Timeout for receiving the first config update, a quasi connection timeout. Default is 10 seconds. */
public static long CONNECTION_SECS = 10;

/** Time the model must be stable for. Unit is seconds. Default is 4 seconds. */
private long STABILIZATION_SECS = 4;
public static long STABILIZATION_SECS = 4;

// Export an alarm system model to an xml file.
public void exportModel(String filename, String server, String config, String kafka_properties_file, long wait) throws Exception
{
// Export an alarm system model to an xml file.
public void exportModel(String filename, String server, String config, String kafka_properties_file) throws Exception
{
final XmlModelWriter xmlWriter;

if (wait > STABILIZATION_SECS) STABILIZATION_SECS = wait;

// Write to stdout or to file.
if (filename.equals("stdout"))
Expand All @@ -56,10 +57,11 @@ public void exportModel(String filename, String server, String config, String ka
final AlarmClient client = new AlarmClient(server, config, kafka_properties_file);
client.start();

System.out.printf("Writing file after model is stable for %d seconds:\n", STABILIZATION_SECS);
System.out.printf("Writing file after %d second connection timeout, then waiting until model is stable for %d seconds:\n",
CONNECTION_SECS, STABILIZATION_SECS);
System.out.println("Monitoring changes...");

final AlarmConfigMonitor updateMonitor = new AlarmConfigMonitor(STABILIZATION_SECS, client);
final AlarmConfigMonitor updateMonitor = new AlarmConfigMonitor(CONNECTION_SECS, STABILIZATION_SECS, client);
updateMonitor.waitForPauseInUpdates(30);

System.out.printf("Received no more updates for %d seconds, I think I have a stable configuration\n", STABILIZATION_SECS);
Expand All @@ -77,18 +79,18 @@ public void exportModel(String filename, String server, String config, String ka
updateMonitor.dispose();

client.shutdown();
}
}

// Import an alarm system model from an xml file.
public void importModel(final String filename, final String server, final String config, String kafka_properties_file) throws InterruptedException, Exception
{
System.out.println("Reading new configuration from " + filename);
final long start = System.currentTimeMillis();
final File file = new File(filename);
final FileInputStream fileInputStream = new FileInputStream(file);
// Import an alarm system model from an xml file.
public void importModel(final String filename, final String server, final String config, String kafka_properties_file) throws InterruptedException, Exception
{
System.out.println("Reading new configuration from " + filename);
final long start = System.currentTimeMillis();
final File file = new File(filename);
final FileInputStream fileInputStream = new FileInputStream(file);

final XmlModelReader xmlModelReader = new XmlModelReader();
xmlModelReader.load(fileInputStream);
final XmlModelReader xmlModelReader = new XmlModelReader();
xmlModelReader.load(fileInputStream);

final AlarmClientNode new_root = xmlModelReader.getRoot();
// Check that the configs match.
Expand All @@ -99,13 +101,14 @@ public void importModel(final String filename, final String server, final String
}
final long got_xml = System.currentTimeMillis();

// Connect to the server.
final AlarmClient client = new AlarmClient(server, config, kafka_properties_file);
// Connect to the server.
final AlarmClient client = new AlarmClient(server, config, kafka_properties_file);
client.start();
try
{
System.out.println("Fetching existing alarm configuration for \"" + config + "\", then waiting for it to remain stable for " + STABILIZATION_SECS + " seconds...");
final AlarmConfigMonitor updateMonitor = new AlarmConfigMonitor(STABILIZATION_SECS, client);
System.out.println("Fetching existing alarm configuration for \"" + config + "\" with " + CONNECTION_SECS +
" connection timeout, then waiting for it to remain stable for " + STABILIZATION_SECS + " seconds...");
final AlarmConfigMonitor updateMonitor = new AlarmConfigMonitor(CONNECTION_SECS, STABILIZATION_SECS, client);
updateMonitor.waitForPauseInUpdates(30);
updateMonitor.dispose();
final long got_old_config = System.currentTimeMillis();
Expand All @@ -118,15 +121,15 @@ public void importModel(final String filename, final String server, final String
// Delete the old model. Leave the root node.
final List<AlarmTreeItem<?>> root_children = root.getChildren();
for (final AlarmTreeItem<?> child : root_children)
client.removeComponent(child);
client.removeComponent(child);
final long deleted_old = System.currentTimeMillis();

System.out.println("Loading new " + new_root.getName() + " ...");

// For every child of the new root, add them and their descendants to the old root.
final List<AlarmTreeItem<?>> new_root_children = new_root.getChildren();
for (final AlarmTreeItem<?> child : new_root_children)
addNodes(client, root, child);
addNodes(client, root, child);
final long loaded_new = System.currentTimeMillis();

System.out.println("Time to read XML : " + SecondsParser.formatSeconds((got_xml - start) / 1000.0));
Expand All @@ -140,16 +143,16 @@ public void importModel(final String filename, final String server, final String
{
client.shutdown();
}
}

private void addNodes(final AlarmClient client, final AlarmTreeItem<?> parent, final AlarmTreeItem<?> tree_item) throws Exception
{
// Send the configuration for the newly created node.
client.sendItemConfigurationUpdate(tree_item.getPathName(), tree_item);

// Recurse over children.
final List<AlarmTreeItem<?>> children = tree_item.getChildren();
for (final AlarmTreeItem<?> child : children)
addNodes(client, tree_item, child);
}
}

private void addNodes(final AlarmClient client, final AlarmTreeItem<?> parent, final AlarmTreeItem<?> tree_item) throws Exception
{
// Send the configuration for the newly created node.
client.sendItemConfigurationUpdate(tree_item.getPathName(), tree_item);

// Recurse over children.
final List<AlarmTreeItem<?>> children = tree_item.getChildren();
for (final AlarmTreeItem<?> child : children)
addNodes(client, tree_item, child);
}
}
Loading

0 comments on commit b20ccda

Please sign in to comment.