diff --git a/org.lflang.tests/src/org/lflang/tests/TestBase.java b/org.lflang.tests/src/org/lflang/tests/TestBase.java index 32a0d11073..60ea708098 100644 --- a/org.lflang.tests/src/org/lflang/tests/TestBase.java +++ b/org.lflang.tests/src/org/lflang/tests/TestBase.java @@ -490,8 +490,15 @@ private ProcessBuilder getExecCommand(LFTest test) { } } case Python: { + var binPath = test.fileConfig.binPath; + var binaryName = nameOnly; + var fullPath = binPath.resolve(binaryName); + if (Files.exists(fullPath)) { + // If execution script exists, run it. + return new ProcessBuilder(fullPath.toString()).directory(binPath.toFile()); + } var srcGen = test.fileConfig.getSrcGenPath(); - var fullPath = srcGen.resolve(nameOnly + ".py"); + fullPath = srcGen.resolve(nameOnly + ".py"); if (Files.exists(fullPath)) { return new ProcessBuilder("python3", fullPath.getFileName().toString()) .directory(srcGen.toFile()); diff --git a/org.lflang/src/lib/py/reactor-c-py b/org.lflang/src/lib/py/reactor-c-py index 7bc14d22a2..54363be857 160000 --- a/org.lflang/src/lib/py/reactor-c-py +++ b/org.lflang/src/lib/py/reactor-c-py @@ -1 +1 @@ -Subproject commit 7bc14d22a20f73ed2cc4a5700207edb3d0da71d6 +Subproject commit 54363be857f8edb0a07c552875e45b468d689ea8 diff --git a/org.lflang/src/org/lflang/federated/FedASTUtils.java b/org.lflang/src/org/lflang/federated/FedASTUtils.java index b044c58444..7b40c2ae56 100644 --- a/org.lflang/src/org/lflang/federated/FedASTUtils.java +++ b/org.lflang/src/org/lflang/federated/FedASTUtils.java @@ -1,5 +1,6 @@ /************* * Copyright (c) 2021, The University of California at Berkeley. + * Copyright (c) 2021, The University of Texas at Dallas. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -38,6 +39,7 @@ import org.lflang.InferredType; import org.lflang.JavaAstUtils; import org.lflang.TargetProperty.CoordinationType; +import org.lflang.federated.serialization.SupportedSerializers; import org.lflang.TimeValue; import org.lflang.generator.GeneratorBase; import org.lflang.generator.PortInstance; @@ -62,6 +64,7 @@ * execution. * * @author Soroush Bateni {soroush@utdallas.edu} + * @author Edward A. Lee {eal@berkeley.edu} * */ public class FedASTUtils { @@ -698,9 +701,9 @@ public static void makeCommunication( serializer = SupportedSerializers.valueOf( connection.getSerializer().getType().toUpperCase() ); - // Add it to the list of enabled serializers - generator.enabledSerializers.add(serializer); } + // Add it to the list of enabled serializers + generator.enabledSerializers.add(serializer); // Add the sender reaction. addNetworkSenderReaction( diff --git a/org.lflang/src/org/lflang/federated/PythonGeneratorExtension.java b/org.lflang/src/org/lflang/federated/PythonGeneratorExtension.java new file mode 100644 index 0000000000..01a0461d44 --- /dev/null +++ b/org.lflang/src/org/lflang/federated/PythonGeneratorExtension.java @@ -0,0 +1,209 @@ +/************* + * Copyright (c) 2021, The University of Texas at Dallas. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + ***************/ + +package org.lflang.federated; + +import org.lflang.InferredType; +import org.lflang.TargetProperty.CoordinationType; +import org.lflang.federated.serialization.FedNativePythonSerialization; +import org.lflang.federated.serialization.FedSerialization; +import org.lflang.federated.serialization.SupportedSerializers; +import org.lflang.generator.python.PythonGenerator; +import org.lflang.lf.Action; +import org.lflang.lf.Delay; +import org.lflang.lf.VarRef; + +/** + * An extension class to the PythonGenerator that enables certain federated + * functionalities. + * + * @author Soroush Bateni {soroush@utdallas.edu} + * + */ +public class PythonGeneratorExtension { + /** + * Generate code for the body of a reaction that handles an output + * that is to be sent over the network. + * @param sendingPort The output port providing the data to send. + * @param receivingPort The variable reference to the destination port. + * @param receivingPortID The ID of the destination port. + * @param sendingFed The sending federate. + * @param sendingBankIndex The bank index of the sending federate, if it is a bank. + * @param sendingChannelIndex The channel index of the sending port, if it is a multiport. + * @param receivingFed The destination federate. + * @param type The type. + * @param isPhysical Indicates whether the connection is physical or not + * @param delay The delay value imposed on the connection using after + * @param serializer The serializer used on the connection. + * @param generator The instance of the PythonGenerator. + */ + public static String generateNetworkSenderBody( + VarRef sendingPort, + VarRef receivingPort, + int receivingPortID, + FederateInstance sendingFed, + int sendingBankIndex, + int sendingChannelIndex, + FederateInstance receivingFed, + InferredType type, + boolean isPhysical, + Delay delay, + SupportedSerializers serializer, + PythonGenerator generator + ) { + String sendRef = generator.generatePortRef(sendingPort, sendingBankIndex, sendingChannelIndex); + String receiveRef = generator.generateVarRef(receivingPort); // Used for comments only, so no need for bank/multiport index. + StringBuilder result = new StringBuilder(); + result.append("// Sending from " + sendRef + + " in federate " + sendingFed.name + " to " + receiveRef + " in federate " + receivingFed.name + "\n"); + // If the connection is physical and the receiving federate is remote, send it directly on a socket. + // If the connection is logical and the coordination mode is centralized, send via RTI. + // If the connection is logical and the coordination mode is decentralized, send directly + String messageType; + // Name of the next immediate destination of this message + String next_destination_name = "\"federate " + receivingFed.id + "\""; + + // Get the delay literal + String additionalDelayString = + CGeneratorExtension.getNetworkDelayLiteral( + delay, + generator + ); + + if (isPhysical) { + messageType = "MSG_TYPE_P2P_MESSAGE"; + } else if (generator.getTargetConfig().coordination == CoordinationType.DECENTRALIZED) { + messageType = "MSG_TYPE_P2P_TAGGED_MESSAGE"; + } else { + // Logical connection + // Send the message via rti + messageType = "MSG_TYPE_TAGGED_MESSAGE"; + next_destination_name = "\"federate " + receivingFed.id + " via the RTI\""; + } + + + String sendingFunction = "send_timed_message"; + String commonArgs = additionalDelayString + ", " + + messageType + ", " + + receivingPortID + ", " + + receivingFed.id + ", " + + next_destination_name + ", " + + "message_length"; + if (isPhysical) { + // Messages going on a physical connection do not + // carry a timestamp or require the delay; + sendingFunction = "send_message"; + commonArgs = messageType + ", " + receivingPortID + ", " + receivingFed.id + ", " + + next_destination_name + ", message_length"; + } + + var lengthExpression = ""; + var pointerExpression = ""; + switch (serializer) { + case NATIVE: { + var variableToSerialize = sendRef+"->value"; + FedNativePythonSerialization pickler = new FedNativePythonSerialization(); + lengthExpression = pickler.serializedBufferLength(); + pointerExpression = pickler.seializedBufferVar(); + result.append( + pickler.generateNetworkSerializerCode(variableToSerialize, null) + ); + result.append("size_t message_length = "+lengthExpression+";\n"); + result.append("info_print(\"Message length is %d\", message_length);\n"); + result.append(sendingFunction+"("+commonArgs+", "+pointerExpression+");\n"); + break; + } + case PROTO: { + throw new UnsupportedOperationException("Protbuf serialization is not supported yet."); + } + case ROS2: { + throw new UnsupportedOperationException("ROS2 serialization is not supported yet."); + } + + } + return result.toString(); + } + + /** + * Generate code for the body of a reaction that handles the + * action that is triggered by receiving a message from a remote + * federate. + * @param action The action. + * @param sendingPort The output port providing the data to send. + * @param receivingPort The ID of the destination port. + * @param receivingPortID The ID of the destination port. + * @param sendingFed The sending federate. + * @param receivingFed The destination federate. + * @param receivingBankIndex The receiving federate's bank index, if it is in a bank. + * @param receivingChannelIndex The receiving federate's channel index, if it is a multiport. + * @param type The type. + * @param isPhysical Indicates whether or not the connection is physical + * @param serializer The serializer used on the connection. + * @param generator The instance of the PythonGenerator. + */ + public static String generateNetworkReceiverBody( + Action action, + VarRef sendingPort, + VarRef receivingPort, + int receivingPortID, + FederateInstance sendingFed, + FederateInstance receivingFed, + int receivingBankIndex, + int receivingChannelIndex, + InferredType type, + boolean isPhysical, + SupportedSerializers serializer, + PythonGenerator generator + ) { + + String receiveRef = generator.generatePortRef(receivingPort, receivingBankIndex, receivingChannelIndex); + StringBuilder result = new StringBuilder(); + + // Transfer the physical time of arrival from the action to the port + result.append(receiveRef+"->physical_time_of_arrival = self->_lf__"+action.getName()+".physical_time_of_arrival;\n"); + + + String value = ""; + switch (serializer) { + case NATIVE: { + value = action.getName(); + FedNativePythonSerialization pickler = new FedNativePythonSerialization(); + result.append(pickler.generateNetworkDeserializerCode(value, null)); + result.append("SET("+receiveRef+", "+FedSerialization.deserializedVarName+");\n"); + break; + } + case PROTO: { + throw new UnsupportedOperationException("Protbuf serialization is not supported yet."); + } + case ROS2: { + throw new UnsupportedOperationException("ROS2 serialization is not supported yet."); + } + + } + + return result.toString(); + } +} diff --git a/org.lflang/src/org/lflang/federated/FedCLauncher.java b/org.lflang/src/org/lflang/federated/launcher/FedCLauncher.java similarity index 97% rename from org.lflang/src/org/lflang/federated/FedCLauncher.java rename to org.lflang/src/org/lflang/federated/launcher/FedCLauncher.java index 2c265ab286..db0a8675a8 100644 --- a/org.lflang/src/org/lflang/federated/FedCLauncher.java +++ b/org.lflang/src/org/lflang/federated/launcher/FedCLauncher.java @@ -23,7 +23,7 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ***************/ -package org.lflang.federated; +package org.lflang.federated.launcher; import java.io.File; import java.io.IOException; @@ -31,6 +31,8 @@ import org.lflang.ErrorReporter; import org.lflang.FileConfig; import org.lflang.TargetConfig; +import org.lflang.federated.FedFileConfig; +import org.lflang.federated.FederateInstance; import org.lflang.generator.c.CCompiler; /** diff --git a/org.lflang/src/org/lflang/federated/FedLauncher.xtend b/org.lflang/src/org/lflang/federated/launcher/FedLauncher.xtend similarity index 91% rename from org.lflang/src/org/lflang/federated/FedLauncher.xtend rename to org.lflang/src/org/lflang/federated/launcher/FedLauncher.xtend index 11494159ff..77d451525f 100644 --- a/org.lflang/src/org/lflang/federated/FedLauncher.xtend +++ b/org.lflang/src/org/lflang/federated/launcher/FedLauncher.xtend @@ -23,7 +23,7 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ***************/ -package org.lflang.federated +package org.lflang.federated.launcher import java.io.FileOutputStream import java.util.ArrayList @@ -41,17 +41,17 @@ import org.lflang.TargetProperty.ClockSyncMode * @author Soroush Bateni */ package class FedLauncher { - + protected var TargetConfig targetConfig; protected var FileConfig fileConfig; protected var ErrorReporter errorReporter; - + /** * @param targetConfig The current target configuration. * @param fileConfig The current file configuration. * @param errorReporter A error reporter for reporting any errors or warnings during the code generation */ - new ( + new( TargetConfig targetConfig, FileConfig fileConfig, ErrorReporter errorReporter @@ -60,16 +60,16 @@ package class FedLauncher { this.fileConfig = fileConfig; this.errorReporter = errorReporter; } - + /** * Return the compile command for a federate. * * @param federate The federate to compile. */ - protected def String compileCommandForFederate(FederateInstance federate) { + protected def String compileCommandForFederate(org.lflang.federated.FederateInstance federate) { throw new UnsupportedOperationException("Don't know how to compile the federates."); } - + /** * Return the command that will execute a remote federate, assuming that the current * directory is the top-level project folder. This is used to create a launcher script @@ -77,7 +77,7 @@ package class FedLauncher { * * @param federate The federate to execute. */ - protected def String executeCommandForRemoteFederate(FederateInstance federate) { + protected def String executeCommandForRemoteFederate(org.lflang.federated.FederateInstance federate) { throw new UnsupportedOperationException("Don't know how to execute the federates."); } @@ -85,13 +85,14 @@ package class FedLauncher { * Return the command that will execute a local federate, assuming that the current * directory is the top-level project folder. This is used to create a launcher script * for federates. - * + * * @param federate The federate to execute. */ - protected def String executeCommandForLocalFederate(FileConfig fileConfig, FederateInstance federate) { + protected def String executeCommandForLocalFederate(FileConfig fileConfig, + org.lflang.federated.FederateInstance federate) { throw new UnsupportedOperationException("Don't know how to execute the federates."); } - + /** * Create the launcher shell scripts. This will create one or two files * in the output path (bin directory). The first has name equal to @@ -136,9 +137,8 @@ package class FedLauncher { */ def createLauncher( ArrayList coreFiles, - List federates, + List federates, LinkedHashMap federationRTIProperties - ) { // NOTE: It might be good to use screen when invoking the RTI // or federates remotely so you can detach and the process keeps running. @@ -151,9 +151,7 @@ package class FedLauncher { // on the machine that runs the RTI. The command I tried // to get screen to work looks like this: // ssh -t «target» cd «path»; screen -S «filename»_«federate.name» -L bin/«filename»_«federate.name» 2>&1 - - //var outPath = binGenPath - + // var outPath = binGenPath val shCode = new StringBuilder() val distCode = new StringBuilder() shCode.append(''' @@ -187,7 +185,7 @@ package class FedLauncher { trap 'cleanup_err $LINENO' ERR trap 'cleanup_sigint $LINENO' SIGINT - + # Create a random 48-byte text ID for this federation. # The likelihood of two federations having the same ID is 1/16,777,216 (1/2^24). FEDERATION_ID=`openssl rand -hex 24` @@ -210,16 +208,16 @@ package class FedLauncher { if (user !== null) { target = user + '@' + host } - + var RTILaunchString = ''' - RTI -i ${FEDERATION_ID} \ - -n «federates.size» \ - -c «targetConfig.clockSync.toString()» «IF targetConfig.clockSync == ClockSyncMode.ON» \ - period «targetConfig.clockSyncOptions.period.toNanoSeconds» «ENDIF» \ - exchanges-per-interval «targetConfig.clockSyncOptions.trials» \ - & - ''' - + RTI -i ${FEDERATION_ID} \ + -n «federates.size» \ + -c «targetConfig.clockSync.toString()» «IF targetConfig.clockSync == ClockSyncMode.ON» \ + period «targetConfig.clockSyncOptions.period.toNanoSeconds» «ENDIF» \ + exchanges-per-interval «targetConfig.clockSyncOptions.trials» \ + & + ''' + // Launch the RTI in the foreground. if (host == 'localhost' || host == '0.0.0.0') { // FIXME: the paths below will not work on Windows @@ -249,8 +247,8 @@ package class FedLauncher { // Start the RTI on the remote machine. // FIXME: Should $FEDERATION_ID be used to ensure unique directories, executables, on the remote host? // Copy the source code onto the remote machine and compile it there. - if (distCode.length === 0) distCode.append(distHeader+"\n"); - + if(distCode.length === 0) distCode.append(distHeader + "\n"); + val logFileName = '''log/«fileConfig.name»_RTI.log''' // Launch the RTI on the remote machine using ssh and screen. @@ -270,11 +268,11 @@ package class FedLauncher { RTI -i '${FEDERATION_ID}' \ -n «federates.size» \ -c «targetConfig.clockSync.toString()» «IF targetConfig.clockSync == ClockSyncMode.ON» \ - period «targetConfig.clockSyncOptions.period.toNanoSeconds» «ENDIF» \ - exchanges-per-interval «targetConfig.clockSyncOptions.trials» \ - & + period «targetConfig.clockSyncOptions.period.toNanoSeconds» «ENDIF» \ + exchanges-per-interval «targetConfig.clockSyncOptions.trials» \ + & ''' - + shCode.append( ''' echo "#### Launching the runtime infrastructure (RTI) on remote host «host»." # FIXME: Killing this ssh does not kill the remote process. @@ -301,17 +299,17 @@ package class FedLauncher { sleep 1 ''') } - + // Index used for storing pids of federates var federateIndex = 0 for (federate : federates) { if (federate.isRemote) { - val fedFileConfig = new FedFileConfig(fileConfig, federate.name); + val fedFileConfig = new org.lflang.federated.FedFileConfig(fileConfig, federate.name); val fedRelSrcGenPath = fedFileConfig.srcGenBasePath.relativize(fedFileConfig.srcGenPath); - if(distCode.length === 0) distCode.append(distHeader+"\n"); + if(distCode.length === 0) distCode.append(distHeader + "\n"); val logFileName = '''log/«fedFileConfig.name»_«federate.name».log''' val compileCommand = compileCommandForFederate(federate); - //'''«targetConfig.compiler» src-gen/«topLevelName»_«federate.name».c -o bin/«topLevelName»_«federate.name» -pthread «targetConfig.compilerFlags.join(" ")»''' + // '''«targetConfig.compiler» src-gen/«topLevelName»_«federate.name».c -o bin/«topLevelName»_«federate.name» -pthread «targetConfig.compilerFlags.join(" ")»''' // FIXME: Should $FEDERATION_ID be used to ensure unique directories, executables, on the remote host? distCode.append( ''' echo "Making directory «path» and subdirectories src-gen, bin, and log on host «federate.user»@«federate.host»" @@ -345,14 +343,14 @@ package class FedLauncher { echo "In «path», executing: «executeCommand»" 2>&1 | tee -a «logFileName»; \ «executeCommand» 2>&1 | tee -a «logFileName»' & pids[«federateIndex++»]=$! - ''') + ''') } else { val executeCommand = executeCommandForLocalFederate(fileConfig, federate); shCode.append( ''' echo "#### Launching the federate «federate.name»." «executeCommand» & pids[«federateIndex++»]=$! - ''') + ''') } } if (host == 'localhost' || host == '0.0.0.0') { @@ -380,14 +378,14 @@ package class FedLauncher { if (file.exists) { file.delete } - + var fOut = new FileOutputStream(file) fOut.write(shCode.toString().getBytes()) fOut.close() if (!file.setExecutable(true, false)) { errorReporter.reportWarning("Unable to make launcher script executable.") } - + // Write the distributor file. // Delete the file even if it does not get generated. file = fileConfig.binPath.resolve(fileConfig.name + '_distribute.sh').toFile diff --git a/org.lflang/src/org/lflang/federated/launcher/FedPyLauncher.java b/org.lflang/src/org/lflang/federated/launcher/FedPyLauncher.java new file mode 100644 index 0000000000..d649b47365 --- /dev/null +++ b/org.lflang/src/org/lflang/federated/launcher/FedPyLauncher.java @@ -0,0 +1,83 @@ +/************* + * Copyright (c) 2021, The University of California at Berkeley. + * Copyright (c) 2021, The University of Texas at Dallas. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + ***************/ + +package org.lflang.federated.launcher; + +import org.lflang.ErrorReporter; +import org.lflang.FileConfig; +import org.lflang.TargetConfig; +import org.lflang.federated.FederateInstance; + +/** + * Utility class that can be used to create a launcher for federated LF programs + * that are written in Python. + * + * @author Soroush Bateni + * + */ +public class FedPyLauncher extends FedLauncher { + /** + * Create an instance of FedPyLauncher. + * + * @param targetConfig The current target configuration. + * @param fileConfig The current file configuration. + * @param errorReporter A error reporter for reporting any errors or warnings during the code generation + */ + public FedPyLauncher( + TargetConfig targetConfig, + FileConfig fileConfig, + ErrorReporter errorReporter + ) { + super(targetConfig, fileConfig, errorReporter); + } + + /** + * Return the command that will execute a remote federate, assuming that the current + * directory is the top-level project folder. This is used to create a launcher script + * for federates. + * + * @param federate The federate to execute. + */ + @Override + protected + String executeCommandForRemoteFederate(FederateInstance federate) { + return "python3 src-gen/"+fileConfig.name+"/"+federate.name+"/"+fileConfig.name+"_"+federate.name+" -i '$FEDERATION_ID'"; + } + + /** + * Return the command that will execute a local federate, assuming that the current + * directory is the top-level project folder. This is used to create a launcher script + * for federates. + * + * @param federate The federate to execute. + */ + @Override + protected + String executeCommandForLocalFederate(FileConfig fileConfig, FederateInstance federate) { + return "python3 " + fileConfig.getSrcGenPath() + "/" + federate.name + "/" + fileConfig.name+"_"+federate.name+".py -i $FEDERATION_ID"; + } +} diff --git a/org.lflang/src/org/lflang/federated/FedTSLauncher.java b/org.lflang/src/org/lflang/federated/launcher/FedTSLauncher.java similarity index 97% rename from org.lflang/src/org/lflang/federated/FedTSLauncher.java rename to org.lflang/src/org/lflang/federated/launcher/FedTSLauncher.java index a1d717739c..52b36371cd 100644 --- a/org.lflang/src/org/lflang/federated/FedTSLauncher.java +++ b/org.lflang/src/org/lflang/federated/launcher/FedTSLauncher.java @@ -23,11 +23,12 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ***************/ -package org.lflang.federated; +package org.lflang.federated.launcher; import org.lflang.ErrorReporter; import org.lflang.FileConfig; import org.lflang.TargetConfig; +import org.lflang.federated.FederateInstance; /** * Utility class that can be used to create a launcher for federated LF programs diff --git a/org.lflang/src/org/lflang/federated/serialization/FedNativePythonSerialization.java b/org.lflang/src/org/lflang/federated/serialization/FedNativePythonSerialization.java new file mode 100644 index 0000000000..3815e9dd88 --- /dev/null +++ b/org.lflang/src/org/lflang/federated/serialization/FedNativePythonSerialization.java @@ -0,0 +1,122 @@ +/************* + * Copyright (c) 2021, The University of California at Berkeley. + * Copyright (c) 2021, The University of Texas at Dallas. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + ***************/ + +package org.lflang.federated.serialization; + +import org.lflang.Target; +import org.lflang.generator.GeneratorBase; + +/** + * Enables support for Python pickle serialization. + * + * @author Soroush Bateni + * + */ +public class FedNativePythonSerialization implements FedSerialization { + + @Override + public boolean isCompatible(GeneratorBase generator) { + if (generator.getTarget() != Target.Python ) { + throw new UnsupportedOperationException("This class only support Python serialization."); + } + return true; + } + + @Override + public String serializedBufferLength() { + return serializedVarName+".len"; + } + + @Override + public String seializedBufferVar() { + return serializedVarName+".buf"; + } + + @Override + public StringBuilder generateNetworkSerializerCode(String varName, + String originalType) { + StringBuilder serializerCode = new StringBuilder(); + + // Check that global_pickler is not null + serializerCode.append("if (global_pickler == NULL) error_print_and_exit(\"The pickle module is not loaded.\");\n"); + // Define the serialized PyObject + serializerCode.append("PyObject* serialized_pyobject = PyObject_CallMethod(global_pickler, \"dumps\", \"O\", "+varName+");\n"); + + // Error check + serializerCode.append("if (serialized_pyobject == NULL) {\n"); + serializerCode.append(" if (PyErr_Occurred()) PyErr_Print();\n"); + serializerCode.append(" error_print_and_exit(\"Could not serialize serialized_pyobject.\");\n"); + serializerCode.append("}\n"); + + serializerCode.append("Py_buffer "+serializedVarName+";\n"); + serializerCode.append("int returnValue = PyBytes_AsStringAndSize(serialized_pyobject, &"+serializedVarName+".buf, &"+serializedVarName+".len);\n"); + // Error check + serializerCode.append("if (returnValue == -1) {\n"); + serializerCode.append(" if (PyErr_Occurred()) PyErr_Print();\n"); + serializerCode.append(" error_print_and_exit(\"Could not serialize "+serializedVarName+".\");\n"); + serializerCode.append("}\n"); + + + + return serializerCode; + } + + @Override + public StringBuilder generateNetworkDeserializerCode(String varName, + String targetType) { + StringBuilder deserializerCode = new StringBuilder(); + + // Convert the network message to a Python ByteArray + deserializerCode.append("PyObject* message_byte_array = "+ + "PyBytes_FromStringAndSize((char*)"+varName+"->token->value, "+varName+"->token->length);\n"); + // Deserialize using Pickle + deserializerCode.append("Py_XINCREF(message_byte_array);\n"); + deserializerCode.append("PyObject* "+deserializedVarName+ + " = PyObject_CallMethod(global_pickler, \"loads\", \"O\", message_byte_array);\n"); + // Error check + deserializerCode.append("if ("+deserializedVarName+" == NULL) {\n"); + deserializerCode.append(" if (PyErr_Occurred()) PyErr_Print();\n"); + deserializerCode.append(" error_print_and_exit(\"Could not deserialize "+deserializedVarName+".\");\n"); + deserializerCode.append("}\n"); + + // Decrment the reference count + deserializerCode.append("Py_XDECREF(message_byte_array);\n"); + + return deserializerCode; + } + + @Override + public StringBuilder generatePreambleForSupport() { + return new StringBuilder(""); + } + + @Override + public StringBuilder generateCompilerExtensionForSupport() { + return new StringBuilder(""); + } + +} diff --git a/org.lflang/src/org/lflang/federated/FedROS2CPPSerialization.java b/org.lflang/src/org/lflang/federated/serialization/FedROS2CPPSerialization.java similarity index 82% rename from org.lflang/src/org/lflang/federated/FedROS2CPPSerialization.java rename to org.lflang/src/org/lflang/federated/serialization/FedROS2CPPSerialization.java index b732e4eec3..b2d22a89c2 100644 --- a/org.lflang/src/org/lflang/federated/FedROS2CPPSerialization.java +++ b/org.lflang/src/org/lflang/federated/serialization/FedROS2CPPSerialization.java @@ -1,4 +1,31 @@ -package org.lflang.federated; +/************* + * Copyright (c) 2021, The University of California at Berkeley. + * Copyright (c) 2021, The University of Texas at Dallas. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + ***************/ + +package org.lflang.federated.serialization; import org.lflang.Target; import org.lflang.generator.GeneratorBase; diff --git a/org.lflang/src/org/lflang/federated/FedSerialization.java b/org.lflang/src/org/lflang/federated/serialization/FedSerialization.java similarity index 98% rename from org.lflang/src/org/lflang/federated/FedSerialization.java rename to org.lflang/src/org/lflang/federated/serialization/FedSerialization.java index 3426926604..da044e7686 100644 --- a/org.lflang/src/org/lflang/federated/FedSerialization.java +++ b/org.lflang/src/org/lflang/federated/serialization/FedSerialization.java @@ -1,4 +1,4 @@ -package org.lflang.federated; +package org.lflang.federated.serialization; import org.lflang.generator.GeneratorBase; diff --git a/org.lflang/src/org/lflang/federated/SupportedSerializers.java b/org.lflang/src/org/lflang/federated/serialization/SupportedSerializers.java similarity index 92% rename from org.lflang/src/org/lflang/federated/SupportedSerializers.java rename to org.lflang/src/org/lflang/federated/serialization/SupportedSerializers.java index e72ae4b55a..332ab6152c 100644 --- a/org.lflang/src/org/lflang/federated/SupportedSerializers.java +++ b/org.lflang/src/org/lflang/federated/serialization/SupportedSerializers.java @@ -1,4 +1,4 @@ -package org.lflang.federated; +package org.lflang.federated.serialization; /** * The supported serializers. diff --git a/org.lflang/src/org/lflang/generator/GeneratorBase.xtend b/org.lflang/src/org/lflang/generator/GeneratorBase.xtend index 54b23ac815..27a1cc3735 100644 --- a/org.lflang/src/org/lflang/generator/GeneratorBase.xtend +++ b/org.lflang/src/org/lflang/generator/GeneratorBase.xtend @@ -62,7 +62,7 @@ import org.lflang.TargetProperty.CoordinationType import org.lflang.TimeValue import org.lflang.federated.FedASTUtils import org.lflang.federated.FederateInstance -import org.lflang.federated.SupportedSerializers +import org.lflang.federated.serialization.SupportedSerializers import org.lflang.graph.InstantiationGraph import org.lflang.lf.Action import org.lflang.lf.ActionOrigin diff --git a/org.lflang/src/org/lflang/generator/LFGenerator.java b/org.lflang/src/org/lflang/generator/LFGenerator.java index 3a7efc08f8..735ebe4878 100644 --- a/org.lflang/src/org/lflang/generator/LFGenerator.java +++ b/org.lflang/src/org/lflang/generator/LFGenerator.java @@ -17,6 +17,7 @@ import org.lflang.TargetConfig.Mode; import org.lflang.Target; import org.lflang.generator.c.CGenerator; +import org.lflang.generator.python.PythonGenerator; import org.lflang.scoping.LFGlobalScopeProvider; import com.google.inject.Inject; diff --git a/org.lflang/src/org/lflang/generator/c/CGenerator.xtend b/org.lflang/src/org/lflang/generator/c/CGenerator.xtend index afa4a15234..c1cccd513a 100644 --- a/org.lflang/src/org/lflang/generator/c/CGenerator.xtend +++ b/org.lflang/src/org/lflang/generator/c/CGenerator.xtend @@ -49,17 +49,18 @@ import org.lflang.ErrorReporter import org.lflang.FileConfig import org.lflang.InferredType import org.lflang.Target +import org.lflang.TargetConfig import org.lflang.TargetProperty import org.lflang.TargetProperty.ClockSyncMode import org.lflang.TargetProperty.CoordinationType import org.lflang.TargetProperty.LogLevel import org.lflang.TimeValue import org.lflang.federated.CGeneratorExtension -import org.lflang.federated.FedCLauncher import org.lflang.federated.FedFileConfig -import org.lflang.federated.FedROS2CPPSerialization import org.lflang.federated.FederateInstance -import org.lflang.federated.SupportedSerializers +import org.lflang.federated.launcher.FedCLauncher +import org.lflang.federated.serialization.FedROS2CPPSerialization +import org.lflang.federated.serialization.SupportedSerializers import org.lflang.generator.ActionInstance import org.lflang.generator.GeneratorBase import org.lflang.generator.InvalidSourceException @@ -93,7 +94,6 @@ import org.lflang.util.XtendUtil import static extension org.lflang.ASTUtils.* import static extension org.lflang.JavaAstUtils.* -import org.lflang.TargetConfig /** * Generator for C target. This class generates C code definining each reactor @@ -550,14 +550,16 @@ class CGenerator extends GeneratorBase { "federated/clock-sync.c" ); createFederatedLauncher(coreFiles); - - var rtiPath = fileConfig.getSrcGenBasePath().resolve("RTI") - var rtiDir = rtiPath.toFile() - if (!rtiDir.exists()) { - rtiDir.mkdirs() + + if (targetConfig.dockerOptions !== null) { + var rtiPath = fileConfig.getSrcGenBasePath().resolve("RTI") + var rtiDir = rtiPath.toFile() + if (!rtiDir.exists()) { + rtiDir.mkdirs() + } + writeRTIDockerFile(rtiPath, rtiDir) + copyRtiFiles(rtiDir, coreFiles) } - writeRTIDockerFile(rtiPath, rtiDir) - copyRtiFiles(rtiDir, coreFiles) } // Perform distinct code generation into distinct files for each federate. @@ -1323,7 +1325,6 @@ class CGenerator extends GeneratorBase { def writeRTIDockerFile(Path rtiPath, File rtiDir) { val dockerFileName = 'rti.Dockerfile' val dockerFile = rtiDir + File.separator + dockerFileName - var srcGenPath = fileConfig.getSrcGenPath // If a dockerfile exists, remove it. var file = new File(dockerFile) if (file.exists) { diff --git a/org.lflang/src/org/lflang/generator/PythonGenerator.xtend b/org.lflang/src/org/lflang/generator/python/PythonGenerator.xtend similarity index 87% rename from org.lflang/src/org/lflang/generator/PythonGenerator.xtend rename to org.lflang/src/org/lflang/generator/python/PythonGenerator.xtend index 65da0cb42e..b4b217e9d7 100644 --- a/org.lflang/src/org/lflang/generator/PythonGenerator.xtend +++ b/org.lflang/src/org/lflang/generator/python/PythonGenerator.xtend @@ -24,7 +24,7 @@ STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ***************/ -package org.lflang.generator +package org.lflang.generator.python import java.io.File import java.util.ArrayList @@ -39,10 +39,20 @@ import org.lflang.ErrorReporter import org.lflang.FileConfig import org.lflang.InferredType import org.lflang.Target +import org.lflang.TargetConfig +import org.lflang.TargetProperty.CoordinationType +import org.lflang.federated.FedFileConfig import org.lflang.federated.FederateInstance -import org.lflang.federated.SupportedSerializers +import org.lflang.federated.PythonGeneratorExtension +import org.lflang.federated.launcher.FedPyLauncher +import org.lflang.federated.serialization.FedNativePythonSerialization +import org.lflang.federated.serialization.SupportedSerializers +import org.lflang.generator.ParameterInstance +import org.lflang.generator.ReactorInstance +import org.lflang.generator.c.CCompiler import org.lflang.generator.c.CGenerator import org.lflang.lf.Action +import org.lflang.lf.Delay import org.lflang.lf.Input import org.lflang.lf.Instantiation import org.lflang.lf.Model @@ -59,8 +69,6 @@ import org.lflang.lf.VarRef import static extension org.lflang.ASTUtils.* import static extension org.lflang.JavaAstUtils.* -import org.lflang.TargetConfig -import org.lflang.generator.c.CCompiler /** * Generator for Python target. This class generates Python code defining each reactor @@ -91,36 +99,53 @@ class PythonGenerator extends CGenerator { } /** - * Template struct for ports with primitive types and + * Generic struct for ports with primitive types and * statically allocated arrays in Lingua Franca. * This template is defined as - * template - * struct template_input_output_port_struct { - * T value; - * bool is_present; - * int num_destinations; - * }; + * typedef struct { + * PyObject* value; + * bool is_present; + * int num_destinations; + * FEDERATED_CAPSULE_EXTENSION + * } generic_port_instance_struct; * - * @see xtext/org.lflang.linguafranca/src/lib/CCpp/ccpptarget.h + * @see reactor-c-py/lib/pythontarget.h */ val generic_port_type = "generic_port_instance_struct" /** - * Special template struct for ports with dynamically allocated + * Generic struct for ports with dynamically allocated * array types (a.k.a. token types) in Lingua Franca. * This template is defined as - * template - * struct template_input_output_port_struct { - * T value; - * bool is_present; - * int num_destinations; - * lf_token_t* token; - * int length; - * }; + * typedef struct { + * PyObject_HEAD + * PyObject* value; + * bool is_present; + * int num_destinations; + * lf_token_t* token; + * int length; + * FEDERATED_CAPSULE_EXTENSION + * } generic_port_instance_with_token_struct; * - * @see xtext/org.lflang.linguafranca/src/lib/CCpp/ccpptarget.h + * @see reactor-c-py/lib/pythontarget.h */ val generic_port_type_with_token = "generic_port_instance_with_token_struct" + + /** + * Generic struct for actions. + * This template is defined as + * typedef struct { + * trigger_t* trigger; + * PyObject* value; + * bool is_present; + * bool has_value; + * lf_token_t* token; + * FEDERATED_CAPSULE_EXTENSION + * } generic_action_instance_struct; + * + * @see reactor-c-py/lib/pythontarget.h + */ + val generic_action_type = "generic_action_instance_struct" override getTargetUndefinedType() '''PyObject*''' @@ -172,6 +197,21 @@ class PythonGenerator extends CGenerator { '''); } + /** + * Print information on how to execute the generated federation. + */ + def printFedRunInfo() { + println(''' + + ##################################### + To run the generated program, run: + + bash «fileConfig.binPath»/«fileConfig.name» + + ##################################### + '''); + } + //////////////////////////////////////////// //// Protected methods @@ -724,14 +764,7 @@ class PythonGenerator extends CGenerator { «generatePythonReactorClasses(federate)» - # The main function - def main(): - start() - - # As is customary in Python programs, the main() function - # should only be executed if the main module is active. - if __name__=="__main__": - main() + «PythonMainGenerator.generateCode()» ''' /** @@ -767,8 +800,9 @@ class PythonGenerator extends CGenerator { file.delete } // Create the necessary directories - if (!file.getParentFile().exists()) + if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); + } writeSourceCodeToFile(generatePythonCode(federate).toString.bytes, file.absolutePath) val setupPath = fileConfig.getSrcGenPath.resolve("setup.py") @@ -843,6 +877,40 @@ class PythonGenerator extends CGenerator { } pr(CGenerator.defineLogLevel(this)) + + if (isFederated) { + // FIXME: Instead of checking + // #ifdef FEDERATED, we could + // use #if (NUMBER_OF_FEDERATES > 1) + // To me, the former is more accurate. + pr(''' + #define FEDERATED + ''') + if (targetConfig.coordination === CoordinationType.CENTRALIZED) { + // The coordination is centralized. + pr(''' + #define FEDERATED_CENTRALIZED + ''') + } else if (targetConfig.coordination === CoordinationType.DECENTRALIZED) { + // The coordination is decentralized + pr(''' + #define FEDERATED_DECENTRALIZED + ''') + } + } + + // Handle target parameters. + // First, if there are federates, then ensure that threading is enabled. + if (isFederated) { + for (federate : federates) { + // The number of threads needs to be at least one larger than the input ports + // to allow the federate to wait on all input ports while allowing an additional + // worker thread to process incoming messages. + if (targetConfig.threads < federate.networkMessageActions.size + 1) { + targetConfig.threads = federate.networkMessageActions.size + 1; + } + } + } includeTargetLanguageHeaders() @@ -874,7 +942,8 @@ class PythonGenerator extends CGenerator { for (serialization : enabledSerializers) { switch (serialization) { case NATIVE: { - // No need to do anything at this point. + val pickler = new FedNativePythonSerialization(); + pr(pickler.generatePreambleForSupport.toString); } case PROTO: { // Handle .proto files. @@ -885,7 +954,8 @@ class PythonGenerator extends CGenerator { if (dotIndex > 0) { rootFilename = name.substring(0, dotIndex) } - pythonPreamble.append('''import «rootFilename»_pb2 as «rootFilename» + pythonPreamble.append(''' + import «rootFilename»_pb2 as «rootFilename» ''') } } @@ -921,6 +991,134 @@ class PythonGenerator extends CGenerator { } } + /** + * Generate code for the body of a reaction that handles the + * action that is triggered by receiving a message from a remote + * federate. + * @param action The action. + * @param sendingPort The output port providing the data to send. + * @param receivingPort The ID of the destination port. + * @param receivingPortID The ID of the destination port. + * @param sendingFed The sending federate. + * @param receivingFed The destination federate. + * @param receivingBankIndex The receiving federate's bank index, if it is in a bank. + * @param receivingChannelIndex The receiving federate's channel index, if it is a multiport. + * @param type The type. + * @param isPhysical Indicates whether or not the connection is physical + * @param serializer The serializer used on the connection. + */ + override generateNetworkReceiverBody( + Action action, + VarRef sendingPort, + VarRef receivingPort, + int receivingPortID, + FederateInstance sendingFed, + FederateInstance receivingFed, + int receivingBankIndex, + int receivingChannelIndex, + InferredType type, + boolean isPhysical, + SupportedSerializers serializer + ) { + var result = new StringBuilder(); + result.append(''' + // Acquire the GIL (Global Interpreter Lock) to be able to call Python APIs. + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + ''') + result.append(PythonGeneratorExtension.generateNetworkReceiverBody( + action, sendingPort, receivingPort, + receivingPortID, + sendingFed, + receivingFed, + receivingBankIndex, + receivingChannelIndex, + type, + isPhysical, + serializer, + this + )); + result.append(''' + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); + '''); + return result.toString(); + } + + /** + * Generate code for the body of a reaction that handles an output + * that is to be sent over the network. + * @param sendingPort The output port providing the data to send. + * @param receivingPort The variable reference to the destination port. + * @param receivingPortID The ID of the destination port. + * @param sendingFed The sending federate. + * @param sendingBankIndex The bank index of the sending federate, if it is a bank. + * @param sendingChannelIndex The channel index of the sending port, if it is a multiport. + * @param receivingFed The destination federate. + * @param type The type. + * @param isPhysical Indicates whether the connection is physical or not + * @param delay The delay value imposed on the connection using after + * @param serializer The serializer used on the connection. + */ + override generateNetworkSenderBody( + VarRef sendingPort, + VarRef receivingPort, + int receivingPortID, + FederateInstance sendingFed, + int sendingBankIndex, + int sendingChannelIndex, + FederateInstance receivingFed, + InferredType type, + boolean isPhysical, + Delay delay, + SupportedSerializers serializer + ) { + var result = new StringBuilder(); + result.append(''' + // Acquire the GIL (Global Interpreter Lock) to be able to call Python APIs. + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + ''') + result.append(PythonGeneratorExtension.generateNetworkSenderBody( + sendingPort, + receivingPort, + receivingPortID, + sendingFed, + sendingBankIndex, + sendingChannelIndex, + receivingFed, + type, + isPhysical, + delay, + serializer, + this + )); + result.append(''' + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); + '''); + return result.toString(); + } + + /** + * Create a launcher script that executes all the federates and the RTI. + * + * @param coreFiles The files from the core directory that must be + * copied to the remote machines. + */ + override createFederatedLauncher(ArrayList coreFiles) { + val launcher = new FedPyLauncher( + targetConfig, + fileConfig, + errorReporter + ); + launcher.createLauncher( + coreFiles, + federates, + federationRTIProperties + ); + } + /** * Generate the aliases for inputs, outputs, and struct type definitions for * actions of the specified reactor in the specified federate. @@ -928,70 +1126,60 @@ class PythonGenerator extends CGenerator { * @param federate A federate name, or null to unconditionally generate. */ override generateAuxiliaryStructs( - ReactorDecl decl, FederateInstance federate + ReactorDecl decl, + FederateInstance federate ) { val reactor = decl.toDefinition // First, handle inputs. for (input : reactor.allInputs) { - if (input.inferredType.isTokenType) { - pr(input, code, ''' - typedef «generic_port_type_with_token» «variableStructType(input, decl)»; - ''') - } - else - { - pr(input, code, ''' - typedef «generic_port_type» «variableStructType(input, decl)»; - ''') + if (federate === null || federate.containsPort(input as Port)) { + if (input.inferredType.isTokenType) { + pr(input, code, ''' + typedef «generic_port_type_with_token» «variableStructType(input, decl)»; + ''') + } else { + pr(input, code, ''' + typedef «generic_port_type» «variableStructType(input, decl)»; + ''') + } + } - + } // Next, handle outputs. for (output : reactor.allOutputs) { - if (output.inferredType.isTokenType) { - pr(output, code, ''' - typedef «generic_port_type_with_token» «variableStructType(output, decl)»; - ''') - } - else - { - pr(output, code, ''' - typedef «generic_port_type» «variableStructType(output, decl)»; - ''') + if (federate === null || federate.containsPort(output as Port)) { + if (output.inferredType.isTokenType) { + pr(output, code, ''' + typedef «generic_port_type_with_token» «variableStructType(output, decl)»; + ''') + } else { + pr(output, code, ''' + typedef «generic_port_type» «variableStructType(output, decl)»; + ''') + } + } } // Finally, handle actions. - // The very first item on this struct needs to be - // a trigger_t* because the struct will be cast to (trigger_t*) - // by the schedule() functions to get to the trigger. for (action : reactor.allActions) { - pr(action, code, ''' - typedef struct { - trigger_t* trigger; - «action.valueDeclaration» - bool is_present; - bool has_value; - lf_token_t* token; - } «variableStructType(action, reactor)»; - ''') + if (federate === null || federate.containsAction(action)) { + pr(action, code, ''' + typedef «generic_action_type» «variableStructType(action, decl)»; + ''') + } + } } /** * For the specified action, return a declaration for action struct to - * contain the value of the action. An action of - * type int[10], for example, will result in this: - * ``` - * int* value; - * ``` + * contain the value of the action. * This will return an empty string for an action with no type. * @param action The action. * @return A string providing the value field of the action struct. */ override valueDeclaration(Action action) { - if (action.type === null) { - return '' - } return "PyObject* value;" } @@ -1000,7 +1188,6 @@ class PythonGenerator extends CGenerator { * uniformly across all target languages. */ override includeTargetLanguageHeaders() { - pr('''#define MODULE_NAME LinguaFranca«topLevelName»''') pr('''#define _LF_GARBAGE_COLLECTED''') if (targetConfig.tracing !== null) { var filename = ""; @@ -1061,38 +1248,32 @@ class PythonGenerator extends CGenerator { if (errorsOccurred) return; var baseFileName = topLevelName + // Keep a separate file config for each federate + val oldFileConfig = fileConfig; for (federate : federates) { if (isFederated) { topLevelName = baseFileName + '_' + federate.name + fileConfig = new FedFileConfig(fileConfig, federate.name); } // Don't generate code if there is no main reactor if (this.main !== null) { generatePythonFiles(fsa, federate) - // The C generator produces all the .c files in a single central folder. - // However, we need to create a setup.py for each federate and run - // "pip install ." individually to compile and install each module - // Here, we move the necessary C files into each federate's folder - if (isFederated) { -// val srcDir = directory + File.separator + "src-gen" + File.separator + baseFileName -// val dstDir = directory + File.separator + "src-gen" + File.separator + filename - var filesToCopy = newArrayList('''«topLevelName».c''', "pythontarget.c", "pythontarget.h", - "ctarget.h", "core") - - fileConfig.copyFilesFromClassPath(fileConfig.srcPath.toString, fileConfig.getSrcGenPath.toString, filesToCopy); - - // Do not compile the Python code here. They will be compiled on remote machines + if (targetConfig.noCompile !== true) { + // If there are no federates, compile and install the generated code + pythonCompileCode(context) } else { - if (targetConfig.noCompile !== true) { - // If there are no federates, compile and install the generated code - pythonCompileCode(context) - } else { - printSetupInfo(); - } - + printSetupInfo(); + } + + if (!isFederated) { printRunInfo(); } } + fileConfig = oldFileConfig; } + if (isFederated) { + printFedRunInfo(); + } // Restore filename topLevelName = baseFileName } diff --git a/org.lflang/src/org/lflang/generator/python/PythonMainGenerator.java b/org.lflang/src/org/lflang/generator/python/PythonMainGenerator.java new file mode 100644 index 0000000000..e0184bf4f9 --- /dev/null +++ b/org.lflang/src/org/lflang/generator/python/PythonMainGenerator.java @@ -0,0 +1,54 @@ +/************* + * Copyright (c) 2021, The University of Texas at Dallas. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + ***************/ +package org.lflang.generator.python; + +/** + * Responsible for creating the main function for + * the generated Python target programs. + * + * @author Soroush Bateni + * + */ +public final class PythonMainGenerator { + + /* + * Generate the main function code + */ + public static String generateCode() { + StringBuilder code = new StringBuilder(); + code.append( + "# The main function\n" + + "def main(argv):\n" + + " start(argv)\n" + + "\n" + + "# As is customary in Python programs, the main() function\n" + + "# should only be executed if the main module is active.\n" + + "if __name__==\"__main__\":\n" + + " main(sys.argv)\n" + ); + return code.toString(); + } +} diff --git a/org.lflang/src/org/lflang/generator/ts/TSGenerator.kt b/org.lflang/src/org/lflang/generator/ts/TSGenerator.kt index 78ab01d6f0..0ac8160e91 100644 --- a/org.lflang/src/org/lflang/generator/ts/TSGenerator.kt +++ b/org.lflang/src/org/lflang/generator/ts/TSGenerator.kt @@ -31,7 +31,7 @@ import org.eclipse.xtext.generator.IGeneratorContext import org.lflang.* import org.lflang.ASTUtils.isInitialized import org.lflang.Target -import org.lflang.federated.FedTSLauncher +import org.lflang.federated.launcher.FedTSLauncher import org.lflang.federated.FederateInstance import org.lflang.generator.GeneratorBase import org.lflang.generator.PrependOperator @@ -39,7 +39,7 @@ import org.lflang.lf.* import org.lflang.scoping.LFGlobalScopeProvider import java.nio.file.Files import java.util.* -import org.lflang.federated.SupportedSerializers +import org.lflang.federated.serialization.SupportedSerializers /** * Generator for TypeScript target. diff --git a/org.lflang/src/org/lflang/validation/LFValidator.xtend b/org.lflang/src/org/lflang/validation/LFValidator.xtend index 993f673961..8d926c5ed6 100644 --- a/org.lflang/src/org/lflang/validation/LFValidator.xtend +++ b/org.lflang/src/org/lflang/validation/LFValidator.xtend @@ -83,7 +83,7 @@ import org.lflang.lf.WidthSpec import static extension org.lflang.ASTUtils.* import static extension org.lflang.JavaAstUtils.* -import org.lflang.federated.SupportedSerializers +import org.lflang.federated.serialization.SupportedSerializers /** * Custom validation checks for Lingua Franca programs. diff --git a/test/Python/src/federated/BroadcastFeedback.lf b/test/Python/src/federated/BroadcastFeedback.lf new file mode 100644 index 0000000000..224d71eb74 --- /dev/null +++ b/test/Python/src/federated/BroadcastFeedback.lf @@ -0,0 +1,32 @@ +/** + * This tests an output that is broadcast back to a multiport input of a bank. + */ +target Python { + timeout: 1 sec +}; +reactor SenderAndReceiver { + preamble {= + import sys + =} + output out; + input[2] inp; + state received(False); + + reaction(startup) -> out {= + out.set(42) + =} + reaction(inp) {= + if inp[0].is_present and inp[1].is_present and inp[0].value == 42 and inp[1].value == 42: + print("SUCCESS") + self.received = True + =} + reaction(shutdown) {= + if not self.received: + print("Failed to receive broadcast") + self.sys.exit(1) + =} +} +federated reactor { + s = new[2] SenderAndReceiver(); + (s.out)+ -> s.inp; +} diff --git a/test/Python/src/federated/BroadcastFeedbackWithHierarchy.lf b/test/Python/src/federated/BroadcastFeedbackWithHierarchy.lf new file mode 100644 index 0000000000..b28e42b5a9 --- /dev/null +++ b/test/Python/src/federated/BroadcastFeedbackWithHierarchy.lf @@ -0,0 +1,40 @@ +/** + * This tests an output that is broadcast back to a multiport input of a bank. + */ +target Python { + timeout: 1 sec +}; +reactor SenderAndReceiver { + output out; + input[2] in_; + state received(False); + + reaction(startup) -> out {= + out.set(42) + =} + + r = new Receiver(); + in_ -> r.in_; +} +reactor Receiver { + preamble {= + import sys + =} + input[2] in_; + state received(False); + + reaction(in_) {= + if in_[0].is_present and in_[1].is_present and in_[0].value == 42 and in_[1].value == 42: + print("SUCCESS") + self.received = True + =} + reaction(shutdown) {= + if not self.received: + print("Failed to receive broadcast") + self.sys.exit(1) + =} +} +federated reactor { + s = new[2] SenderAndReceiver(); + (s.out)+ -> s.in_; +} diff --git a/test/Python/src/federated/ChainWithDelay.lf b/test/Python/src/federated/ChainWithDelay.lf new file mode 100644 index 0000000000..187fc7cf32 --- /dev/null +++ b/test/Python/src/federated/ChainWithDelay.lf @@ -0,0 +1,19 @@ +/** + * Demonstration that monotonic NET hypothesis is invalid. + * + * @author Edward A. Lee + */ + target Python { + timeout: 3 msec +} +import Count from "../lib/Count.lf"; +import InternalDelay from "../lib/InternalDelay.lf"; +import TestCount from "../lib/TestCount.lf"; + +federated reactor { + c = new Count(period = 1 msec); + i = new InternalDelay(delay = 500 usec); + t = new TestCount(num_inputs = 3); + c.out -> i.in_; + i.out -> t.in_; +} diff --git a/test/Python/src/federated/CycleDetection.lf b/test/Python/src/federated/CycleDetection.lf new file mode 100644 index 0000000000..c5768e960f --- /dev/null +++ b/test/Python/src/federated/CycleDetection.lf @@ -0,0 +1,63 @@ +/** + * Check whether the internally generated network and control reactions + * introduce a cycle or not. The failure for this test is not being compiled. + * @author Edward A. Lee + */ +target Python; + +reactor CAReplica { + input local_update; + input remote_update; + input query; + + state balance(0); + + output response; + + reaction(local_update, remote_update) {= + if local_update.is_present: + self.balance += local_update.value + if remote_update.is_present: + self.balance += remote_update.value + =} + + reaction(query) -> response {= + response.set(self.balance) + =} +} +reactor UserInput { + preamble {= + import sys + =} + input balance; + output deposit; + + reaction(startup) -> deposit {= + deposit.set(100) + =} + reaction(balance) {= + if balance.value != 200: + self.sys.stderr.write("Did not receive the expected balance. Expected: 200. Got: {}.\n".format(balance.value)) + self.sys.exit(1) + print("Balance: {}".format(balance.value)) + request_stop() + =} + + reaction(shutdown) {= + print("Test passed!") + =} +} + +federated reactor { + u1 = new UserInput(); + r1 = new CAReplica(); + u2 = new UserInput(); + r2 = new CAReplica(); + (u1.deposit)+ -> r1.query, r1.local_update; + r1.response -> u1.balance; + u1.deposit -> r2.remote_update; + + (u2.deposit)+ -> r2.query, r2.local_update; + r2.response -> u2.balance; + u2.deposit -> r1.remote_update; +} \ No newline at end of file diff --git a/test/Python/src/federated/DecentralizedP2PUnbalancedTimeoutPhysical.lf b/test/Python/src/federated/DecentralizedP2PUnbalancedTimeoutPhysical.lf new file mode 100644 index 0000000000..9c13df4df8 --- /dev/null +++ b/test/Python/src/federated/DecentralizedP2PUnbalancedTimeoutPhysical.lf @@ -0,0 +1,47 @@ +/** + * Test a source-destination scenario where the source falls behind real-time, and reaches the + * timeout much later than the destination. In this test, the destination closes the connection + * early, causing the transmission to fail. Warnings will be printed. + * + * The test fails if the federation does not exit amenably. + * This variant has a physical connection between source and destination. + */ +target Python { + timeout: 1 msec, + coordination: decentralized +} + +reactor Clock(offset(0), period(1 sec)) { + output y; + timer t(offset, period); + state count(0); + reaction(t) -> y {= + self.count += 1 + y.set(self.count) + =} + reaction(shutdown) {= + print("SUCCESS: the source exited successfully.") + =} +} +reactor Destination { + preamble {= + import sys + =} + input x + state s(1) + reaction(x) {= + if x.value != self.s: + self.sys.stderr.write("Expected {} and got {}.".format(self.s, x.value)) + self.sys.exit(1) + self.s += 1 + =} + reaction(shutdown) {= + print("**** shutdown reaction invoked.") + print("Approx. time per reaction: {}ns".format(get_elapsed_physical_time()//(self.s+1))) + =} +} +federated reactor (period(10 usec)) { + c = new Clock(period = period); + d = new Destination(); + c.y ~> d.x; +} \ No newline at end of file diff --git a/test/Python/src/federated/DistributedBank.lf b/test/Python/src/federated/DistributedBank.lf new file mode 100644 index 0000000000..fecc4dea09 --- /dev/null +++ b/test/Python/src/federated/DistributedBank.lf @@ -0,0 +1,26 @@ +// Check bank of federates. +target Python { + timeout: 1 sec, + coordination: centralized +}; + +reactor Node { + preamble {= + import sys + =} + timer t(0, 100 msec); + state count(0) + reaction(t) {= + print("Hello world {}.".format(self.count)) + self.count += 1 + =} + reaction(shutdown) {= + if self.count == 0: + self.sys.stderr.write("Timer reactions did not execute.\n") + self.sys.exit(1) + =} +} + +federated reactor DistributedBank { + n = new[2] Node(); +} \ No newline at end of file diff --git a/test/Python/src/federated/DistributedBankToMultiport.lf b/test/Python/src/federated/DistributedBankToMultiport.lf new file mode 100644 index 0000000000..b684973fb4 --- /dev/null +++ b/test/Python/src/federated/DistributedBankToMultiport.lf @@ -0,0 +1,33 @@ +// Check multiport to bank connections between federates. +target Python { + timeout: 3 sec +}; + +import Count from "../lib/Count.lf"; + +reactor Destination { + preamble {= + import sys + =} + input[2] in_; + state count(1); + reaction(in_) {= + for i in range(len(in_)): + print("Received {}.".format(in_[i].value)) + if self.count != in_[i].value: + self.sys.stderr.write("Expected {}.\n".format(self.count)) + self.sys.exit(1) + self.count += 1 + =} + reaction(shutdown) {= + if self.count == 0: + self.sys.stderr.write("No data received.\n") + self.sys.exit(1) + =} +} + +federated reactor { + s = new[2] Count(); + d = new Destination(); + s.out -> d.in_; +} \ No newline at end of file diff --git a/test/Python/src/federated/DistributedCount.lf b/test/Python/src/federated/DistributedCount.lf new file mode 100644 index 0000000000..d705e61a43 --- /dev/null +++ b/test/Python/src/federated/DistributedCount.lf @@ -0,0 +1,43 @@ +/** Test a particularly simple form of a distributed deterministic system + * where a federation that receives timestamped messages has only those + * messages as triggers. Therefore, no additional coordination of the + * advancement of time (HLA or Ptides) is needed. + * @author Edward A. Lee + */ + target Python { + timeout: 5 sec, + logging: DEBUG, + coordination: centralized +}; + +import Count from "../lib/Count.lf"; + +reactor Print { + preamble {= + import sys + =} + input in_; + state c(1); + reaction(in_) {= + elapsed_time = get_elapsed_logical_time() + print("At time {}, received {}".format(elapsed_time, in_.value)) + if in_.value != self.c: + print("Expected to receive {}.".format(self.c)) + self.sys.exit(1) + if elapsed_time != MSEC(200) + SEC(1) * (self.c - 1): + print("Expected received time to be {}.".format(MSEC(200) * self.c)) + self.sys.exit(1) + self.c += 1 + =} + reaction(shutdown) {= + if self.c != 6: + print("Expected to receive 5 items.") + self.sys.exit(1) + =} +} + +federated reactor DistributedCount(offset(200 msec)) { + c = new Count(); + p = new Print(); + c.out -> p.in_ after offset; +} diff --git a/test/Python/src/federated/DistributedCountPhysical.lf b/test/Python/src/federated/DistributedCountPhysical.lf new file mode 100644 index 0000000000..96ee96620b --- /dev/null +++ b/test/Python/src/federated/DistributedCountPhysical.lf @@ -0,0 +1,51 @@ +/** + * Test a particularly simple form of a distributed deterministic system + * where a federation that receives timestamped messages only over connections + * that are marked 'physical' (using the ~> arrow). + * Therefore, no additional coordination of the + * advancement of time (HLA or Ptides) is needed. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ +target Python { + timeout: 1 sec +}; +reactor Count { + timer t(200 msec, 1 sec); + state s(0); + output out; + reaction(t) -> out {= + out.set(self.s) + self.s += 1; + =} +} +reactor Print { + preamble {= + import sys + =} + input in_; + state c(0); + reaction(in_) {= + elapsed_time = get_elapsed_logical_time() + print("At time {}, received {}.".format(elapsed_time, in_.value)) + if in_.value != self.c: + self.sys.stderr.write("ERROR: Expected to receive {}.\n".format(self.c)) + self.sys.exit(1) + if not (elapsed_time > (SEC(1) * self.c) + MSEC(200)): + self.sys.stderr.write("ERROR: Expected received time to be strictly greater than {}. Got {}.\n".format(MSEC(200) * self.c, elapsed_time)) + self.sys.exit(3) + self.c += 1 + =} + reaction(shutdown) {= + if (self.c != 1): + self.sys.stderr.write("ERROR: Expected to receive 1 item. Received {}.\n".format(self.c)) + self.sys.exit(2) + print("SUCCESS: Successfully received 1 item."); + =} +} +federated reactor at localhost { + c = new Count(); + p = new Print(); + c.out ~> p.in_; // Indicating a 'physical' connection. +} diff --git a/test/Python/src/federated/DistributedCountPhysicalAfterDelay.lf b/test/Python/src/federated/DistributedCountPhysicalAfterDelay.lf new file mode 100644 index 0000000000..638e16f66c --- /dev/null +++ b/test/Python/src/federated/DistributedCountPhysicalAfterDelay.lf @@ -0,0 +1,51 @@ +/** + * Test a particularly simple form of a distributed deterministic system + * where a federation that receives timestamped messages only over connections + * that are marked 'physical' (using the ~> arrow). + * Therefore, no additional coordination of the + * advancement of time (HLA or Ptides) is needed. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ +target Python { + timeout: 1 sec +}; +reactor Count { + timer t(200 msec, 1 sec); + state s(0); + output out; + reaction(t) -> out {= + out.set(self.s) + self.s += 1; + =} +} +reactor Print { + preamble {= + import sys + =} + input in_; + state c(0); + reaction(in_) {= + elapsed_time = get_elapsed_logical_time() + print("At time {}, received {}.".format(elapsed_time, in_.value)) + if in_.value != self.c: + self.sys.stderr.write("ERROR: Expected to receive {}.\n".format(self.c)) + self.sys.exit(1) + if not (elapsed_time > (SEC(1) * self.c) + MSEC(200)): + self.sys.stderr.write("ERROR: Expected received time to be strictly greater than {}. Got {}.\n".format(MSEC(200) * self.c, elapsed_time)) + self.sys.exit(3) + self.c += 1 + =} + reaction(shutdown) {= + if (self.c != 1): + self.sys.stderr.write("ERROR: Expected to receive 1 item. Received {}.\n".format(self.c)) + self.sys.exit(2) + print("SUCCESS: Successfully received 1 item."); + =} +} +federated reactor at localhost { + c = new Count(); + p = new Print(); + c.out ~> p.in_ after 400 msec; // Indicating a 'physical' connection. +} diff --git a/test/Python/src/federated/DistributedCountPhysicalDecentralized.lf b/test/Python/src/federated/DistributedCountPhysicalDecentralized.lf new file mode 100644 index 0000000000..16704b1471 --- /dev/null +++ b/test/Python/src/federated/DistributedCountPhysicalDecentralized.lf @@ -0,0 +1,53 @@ +/** + * Test a particularly simple form of a distributed deterministic system + * where a federation that receives timestamped messages only over connections + * that are marked 'physical' (using the ~> arrow). + * Therefore, no additional coordination of the + * advancement of time (HLA or Ptides) is needed. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ +target Python { + timeout: 1 sec, + coordination: decentralized +}; + +reactor Count { + timer t(200 msec, 1 sec); + state s(0); + output out; + reaction(t) -> out {= + out.set(self.s) + self.s += 1 + =} +} +reactor Print { + preamble {= + import sys + =} + input in_; + state c(0); + reaction(in_) {= + elapsed_time = get_elapsed_logical_time() + print("At time {}, received {}.".format(elapsed_time, in_.value)) + if in_.value != self.c: + self.sys.stderr.write("ERROR: Expected to receive {}.\n".format(self.c)) + self.sys.exit(1) + if not (elapsed_time > (SEC(1) * self.c) + MSEC(200)): + self.sys.stderr.write("ERROR: Expected received time to be strictly greater than {}.\n".format(MSEC(200) * self.c)) + self.sys.exit(3) + self.c += 1 + =} + reaction(shutdown) {= + if self.c != 1: + self.sys.stderr.write("ERROR: Expected to receive 1 item. Received {}.\n".format(self.c)) + self.sys.exit(2) + print("SUCCESS: Successfully received 1 item.") + =} +} +federated reactor at localhost { + c = new Count(); + p = new Print(); + c.out ~> p.in_; // Indicating a 'physical' connection. +} diff --git a/test/Python/src/federated/DistributedMultiport.lf b/test/Python/src/federated/DistributedMultiport.lf new file mode 100644 index 0000000000..7b98f291f2 --- /dev/null +++ b/test/Python/src/federated/DistributedMultiport.lf @@ -0,0 +1,44 @@ +// Check multiport connections between federates. +target Python { + timeout: 1 sec, + coordination: centralized +}; + +reactor Source { + output[4] out; + timer t(0, 100 msec); + state count(0); + reaction(t) -> out {= + for i in range(len(out)): + out[i].set(self.count) + self.count += 1 + =} +} + +reactor Destination { + preamble {= + import sys + =} + input[4] in_; + state count(0); + reaction(in_) {= + for i in range(len(in_)): + if in_[i].is_present: + print("Received {}.".format(in_[i].value)) + if in_[i].value != self.count: + self.sys.stderr.write("Expected {}.\n".format(self.count)) + self.sys.exit(1) + self.count += 1 + =} + reaction(shutdown) {= + if self.count == 0: + self.sys.stderr.write("No data received.") + self.sys.exit(1) + =} +} + +federated reactor DistributedMultiport { + s = new Source(); + d = new Destination(); + s.out -> d.in_; +} \ No newline at end of file diff --git a/test/Python/src/federated/DistributedMultiportToBank.lf b/test/Python/src/federated/DistributedMultiportToBank.lf new file mode 100644 index 0000000000..5aab4cf758 --- /dev/null +++ b/test/Python/src/federated/DistributedMultiportToBank.lf @@ -0,0 +1,41 @@ +// Check multiport to bank connections between federates. +target Python { + timeout: 1 sec +}; + +reactor Source { + output[2] out; + timer t(0, 100 msec); + state count(0); + reaction(t) -> out {= + for i in range(len(out)): + out[i].set(self.count) + self.count += 1 + =} +} + +reactor Destination { + preamble {= + import sys + =} + input in_; + state count(0); + reaction(in_) {= + print("Received {}.".format(in_.value)) + if self.count != in_.value: + self.sys.stderr.write("Expected {}.\n".format(self.count)) + self.sys.exit(1) + self.count += 1 + =} + reaction(shutdown) {= + if self.count == 0: + self.sys.stderr.write("No data received.") + self.sys.exit(1) + =} +} + +federated reactor DistributedMultiportToBank { + s = new Source(); + d = new[2] Destination(); + s.out -> d.in_; +} \ No newline at end of file diff --git a/test/Python/src/federated/DistributedMultiportToken.lf b/test/Python/src/federated/DistributedMultiportToken.lf new file mode 100644 index 0000000000..4eac60522f --- /dev/null +++ b/test/Python/src/federated/DistributedMultiportToken.lf @@ -0,0 +1,37 @@ +// Check multiport connections between federates where the message is +// carried by a Token (in this case, with an array of char). +target Python { + timeout: 1 sec, + coordination: centralized +}; + +reactor Source { + output[4] out; + timer t(0, 200 msec); + state count(0); + reaction(t) -> out {= + for i in range(len(out)): + self.count += 1 + out[i].set("Hello {}".format(self.count)) + print("MessageGenerator: At time {}, send message: {}.".format( + get_elapsed_logical_time(), + out[i].value + ) + ) + =} +} + +reactor Destination { + input[4] in_; + reaction(in_) {= + for i in range(len(in_)): + if in_[i].is_present: + print("Received {}.".format(in_[i].value)) + =} +} + +federated reactor DistributedMultiportToken { + s = new Source(); + d = new Destination(); + s.out -> d.in_; +} \ No newline at end of file diff --git a/test/Python/src/federated/HelloDistributed.lf b/test/Python/src/federated/HelloDistributed.lf new file mode 100644 index 0000000000..0e5ab9fe0b --- /dev/null +++ b/test/Python/src/federated/HelloDistributed.lf @@ -0,0 +1,45 @@ +/** Test a particularly simple form of a distributed deterministic system + * where a federation that receives timestamped messages has only those + * messages as triggers. Therefore, no additional coordination of the + * advancement of time (HLA or Ptides) is needed. + * @author Edward A. Lee + */ +target Python; + +reactor Source { + output out; + reaction(startup) -> out {= + print("Sending 'Hello World!' message from source federate."); + out.set("Hello World!") + request_stop() + =} +} +reactor Destination { + input _in; + state received(false); + reaction(startup) {= + print("Destination started.") + =} + reaction(_in) {= + print(f"At logical time {get_elapsed_logical_time()}, destination received {_in.value}") + if _in.value != "Hello World!": + sys.stderr.write("ERROR: Expected to receive 'Hello World!'\n"); + exit(1) + self.received = True + =} + reaction(shutdown) {= + print("Shutdown invoked.") + if self.received is not True: + sys.stderr.write("ERROR: Destination did not receive the message.") + exit(1) + =} +} + +federated reactor HelloDistributed at localhost { +// reaction(startup) {= +// print("Printing something in top-level federated reactor.") +// =} + s = new Source(); // Reactor s is in federate Source + d = new Destination(); // Reactor d is in federate Destination + s.out -> d._in; // This version preserves the timestamp. +} diff --git a/test/Python/src/federated/ParallelDestinations.lf b/test/Python/src/federated/ParallelDestinations.lf new file mode 100644 index 0000000000..413b281eb0 --- /dev/null +++ b/test/Python/src/federated/ParallelDestinations.lf @@ -0,0 +1,25 @@ +/** + * Test parallel connections for federated execution. + */ +target Python { + timeout: 2 sec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Source { + output[2] out; + c1 = new Count(); + c2 = new Count(); + + c1.out, c2.out -> out; +} + +federated reactor { + s = new Source(); + t1 = new TestCount(num_inputs = 3); + t2 = new TestCount(num_inputs = 3); + + s.out -> t1.in_, t2.in_; +} \ No newline at end of file diff --git a/test/Python/src/federated/ParallelSources.lf b/test/Python/src/federated/ParallelSources.lf new file mode 100644 index 0000000000..983a04328f --- /dev/null +++ b/test/Python/src/federated/ParallelSources.lf @@ -0,0 +1,26 @@ +/** + * Test parallel connections for federated execution. + */ +target Python { + timeout: 2 sec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Destination { + input[2] in_; + + t1 = new TestCount(num_inputs = 3); + t2 = new TestCount(num_inputs = 3); + + in_ -> t1.in_, t2.in_; +} + +federated reactor { + c1 = new Count(); + c2 = new Count(); + d = new Destination(); + + c1.out, c2.out -> d.in_; +} \ No newline at end of file diff --git a/test/Python/src/federated/ParallelSourcesMultiport.lf b/test/Python/src/federated/ParallelSourcesMultiport.lf new file mode 100644 index 0000000000..deec689643 --- /dev/null +++ b/test/Python/src/federated/ParallelSourcesMultiport.lf @@ -0,0 +1,36 @@ +/** + * Test parallel connections for federated execution. + */ +target Python { + timeout: 2 sec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Source { + output[2] out; + c1 = new Count(); + c2 = new Count(); + + c1.out, c2.out -> out; +} + +reactor Destination1 { + input[3] in_; + + t1 = new TestCount(num_inputs = 3); + t2 = new TestCount(num_inputs = 3); + t3 = new TestCount(num_inputs = 3); + + in_ -> t1.in_, t2.in_, t3.in_; +} + +federated reactor { + s1 = new Source(); + s2 = new Source(); + d1 = new Destination1(); + t4 = new TestCount(num_inputs = 3); + + s1.out, s2.out -> d1.in_, t4.in_; +} \ No newline at end of file diff --git a/test/Python/src/federated/PingPongDistributed.lf b/test/Python/src/federated/PingPongDistributed.lf new file mode 100644 index 0000000000..a4519b09a2 --- /dev/null +++ b/test/Python/src/federated/PingPongDistributed.lf @@ -0,0 +1,71 @@ + /** + * Basic benchmark from the Savina benchmark suite that is + * intended to measure message-passing overhead. + * This is based on https://www.scala-lang.org/old/node/54 + * See https://shamsimam.github.io/papers/2014-agere-savina.pdf. + * + * This is a distributed version, where Ping and Pong run in + * separate programs and can be run on different machines. + * + * To get a sense, some (informal) results for 1,000,000 ping-pongs + * on my Mac: + * + * Unthreaded: 97 msec + * Threaded: 265 msec + * Distributed: 53 seconds + * + * There is no parallelism in this application, so it does not benefit from being + * being distributed. + * + * These measurements are total execution time, including startup and shutdown, of + * all three programs. + * + * @author Edward A. Lee + */ +target Python; + +reactor Ping(count(10)) { + input receive; + output send; + state pingsLeft(count); + logical action serve; + reaction (startup, serve) -> send {= + print("At logical time {}, Ping sending {}.\n".format(get_elapsed_logical_time(), self.pingsLeft)) + send.set(self.pingsLeft) + self.pingsLeft -= 1 + =} + reaction (receive) -> serve {= + if self.pingsLeft > 0: + serve.schedule(0) + else: + request_stop() + =} +} +reactor Pong(expected(10)) { + preamble {= + import sys + =} + + input receive; + output send; + state count(0); + reaction(receive) -> send {= + self.count += 1 + print("At logical time {}, Pong received {}.\n".format(get_elapsed_logical_time(), receive.value)) + send.set(receive.value) + if self.count == self.expected: + request_stop() + =} + reaction(shutdown) {= + if self.count != self.expected: + print("Pong expected to receive {} inputs, but it received {}.\n".format(self.expected, self.count), file=self.sys.stderr) + self.sys.exit(1) + print("Pong received {} pings.\n".format(self.count)) + =} +} +federated reactor (count(10)) { + ping = new Ping(count = count); + pong = new Pong(expected = count); + ping.send ~> pong.receive; + pong.send ~> ping.receive; +} diff --git a/test/Python/src/federated/StopAtShutdown.lf b/test/Python/src/federated/StopAtShutdown.lf new file mode 100644 index 0000000000..6da7060e0c --- /dev/null +++ b/test/Python/src/federated/StopAtShutdown.lf @@ -0,0 +1,43 @@ +/** + * Check that request_stop() doesn't cause + * any issues at the shutdown tag. + * + * Original bug discovered by Steven Wong + * + * @author Steven Wong + */ +target Python { + timeout: 2 sec +} + +reactor A { + input in_; + reaction(startup) {= + print("Hello World!") + =} + + reaction(in_) {= + print("Got it") + =} + + reaction(shutdown) {= + request_stop() + =} +} + +reactor B { + output out; + timer t(1 sec); + reaction(t) -> out {= + out.set(1) + =} + reaction(shutdown) {= + request_stop() + =} +} + +federated reactor { + a = new A(); + b = new B(); + b.out -> a.in_; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/ClockSync.lf b/test/Python/src/federated/failing/ClockSync.lf new file mode 100644 index 0000000000..10fd0b474b --- /dev/null +++ b/test/Python/src/federated/failing/ClockSync.lf @@ -0,0 +1,81 @@ +/** + * This program tests clock synchronization. + * It checks the clock synchronization error and fails + * if it exceeds a threshold. Note that failures could + * occur here intermittently because clock synchronization + * accuracy depends on many conditions. But the threshold + * is quite high, so failures should be rare. + * @author Edward A. Lee + */ + + // reason for failing: clock-sync and clock-sync-options not supported in the python target + + +target Python { + coordination: decentralized, + timeout: 10 sec, + clock-sync: on, // Turn on runtime clock synchronization. + clock-sync-options: { + local-federates-on: true, // Forces all federates to perform clock sync. + collect-stats: true, // Collect useful statistics like average network delay + // and the standard deviation for the network delay over + // one clock synchronization cycle. Generates a warning + // if the standard deviation is higher than the clock sync + // guard. + test-offset: 200 msec, // Artificially offsets clocks by multiples of 200 msec. + period: 5 msec, // Period with which runtime clock sync is performed. + trials: 10, // Number of messages exchanged to perform clock sync. + attenuation: 10 // Attenuation applied to runtime clock sync adjustments. + } +}; + +/** + * Reactor that outputs periodically. + */ +reactor Ticker(period:time(1600 msec)) { + output out:int; + + timer tick(0, period); + + reaction(tick) -> out {= + SET(out, 42); + =} +} + +/** + * Print a message when an input arrives. + */ +reactor Printer { + input in:int; + + reaction(startup) {= + interval_t offset = _lf_global_physical_clock_offset + _lf_global_test_physical_clock_offset; + info_print("Clock sync error at startup is %lld ns.", offset); + =} + + reaction(in) {= + info_print("Received %d.", in->value); + =} + + reaction(shutdown) {= + interval_t offset = _lf_global_physical_clock_offset + _lf_global_test_physical_clock_offset; + info_print("Clock sync error at shutdown is %lld ns.", offset); + // Error out if the offset is bigger than 100 msec. + if (offset > MSEC(100)) { + error_print("Offset exceeds test threshold of 100 msec."); + exit(1); + } + =} +} + +reactor Federate { + source = new Ticker(); + play = new Printer(); + source.out -> play.in; +} + +federated reactor ClockSync { + fed1 = new Federate(); + fed2 = new Federate(); +} + diff --git a/test/Python/src/federated/failing/DecentralizedP2PComm.lf b/test/Python/src/federated/failing/DecentralizedP2PComm.lf new file mode 100644 index 0000000000..7daaf98447 --- /dev/null +++ b/test/Python/src/federated/failing/DecentralizedP2PComm.lf @@ -0,0 +1,47 @@ +target Python { + timeout: 1 sec, + tracing: true, + clock-sync: off, + coordination: decentralized +} + +// reason for failing: get_current_tag() not supported by the python target + +reactor Platform(start(0), expected_start(0), stp_offset_param(0)) { + preamble {= + import sys + =} + input in_; + output out; + timer t(0, 100 msec); + state count(start); + state expected(expected_start); + reaction(t) -> out {= + out.set(self.count) + self.count += 1 + =} + reaction(in_) {= + print("Received {}.".format(in_.value)) + if in_.value != self.expected_start: + self.sys.stderr.write("Expected {} but got {}.\n".format(self.expected_start, in_.value)) + self.sys.exit(1) + self.expected_start += 1 + =} STP (stp_offset_param) {= + print("Received {} late.".format(in_.value)) + current_tag = get_current_tag() + self.expected_star += 1 + self.sys.stderr.write("STP offset was violated by ({}, {}).".format(current_tag.time - in_.intended_tag.time, current_tag.microstep - in_.intended_tag.microstep)) + =} + reaction(shutdown) {= + print("Shutdown invoked.") + if self.expected == self.expected_start: + self.sys.stderr.write("Did not receive anything.\n") + self.sys.exit(1) + =} +} +federated reactor DecentralizedP2PComm { + a = new Platform(expected_start = 100, stp_offset_param = 10 msec); + b = new Platform(start = 100, stp_offset_param = 10 msec); + a.out -> b.in_; + b.out -> a.in_; +} diff --git a/test/Python/src/federated/failing/DecentralizedP2PUnbalancedTimeout.lf b/test/Python/src/federated/failing/DecentralizedP2PUnbalancedTimeout.lf new file mode 100644 index 0000000000..2977689fd6 --- /dev/null +++ b/test/Python/src/federated/failing/DecentralizedP2PUnbalancedTimeout.lf @@ -0,0 +1,50 @@ +/** + * Test a source-destination scenario where the source falls behind real-time, and reaches the + * timeout much later than the destination. In this test, the destination closes the connection + * early, causing the transmission to fail. Warnings will be printed about lost messages. + * + * The test fails if the federation does not exit. + */ +target Python { + timeout: 1 msec, + coordination: decentralized +} + +// reason for failing: get_current_tag() not supported by the python target + +reactor Clock(offset:time(0), period:time(1 sec)) { + output y:int; + timer t(offset, period); + state count:int(0); + reaction(t) -> y {= + (self->count)++; + info_print("Sending %d.", self->count); + SET(y, self->count); + =} + reaction(shutdown) {= + info_print("SUCCESS: the source exited successfully."); + =} +} +reactor Destination { + input x:int; + state s:int(1); + reaction(x) {= + info_print("Received %d", x->value); + tag_t current_tag = get_current_tag(); + if (x->value != self->s) { + error_print_and_exit("At tag (%lld, %u) expected %d and got %d.", + current_tag.time - start_time, current_tag.microstep, self->s, x->value + ); + } + self->s++; + =} + reaction(shutdown) {= + info_print("**** shutdown reaction invoked."); + info_print("Approx. time per reaction: %lldns", get_elapsed_physical_time()/(self->s+1)); + =} +} +federated reactor (period:time(10 usec)) { + c = new Clock(period = period); + d = new Destination(); + c.y -> d.x; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/DistributedCountDecentralized.lf b/test/Python/src/federated/failing/DistributedCountDecentralized.lf new file mode 100644 index 0000000000..b6dbbcbea4 --- /dev/null +++ b/test/Python/src/federated/failing/DistributedCountDecentralized.lf @@ -0,0 +1,53 @@ +/** Test a particularly simple form of a distributed deterministic system + * where a federation that receives timestamped messages has only those + * messages as triggers. Therefore, no additional coordination of the + * advancement of time (HLA or Ptides) is needed. + * @author Edward A. Lee + */ + +// reason for failing: get_microstop() and in_.intended_tag are not supported in python target + +target Python { + timeout: 5 sec, + coordination: decentralized +}; + +import Count from "../lib/Count.lf"; + +reactor Print { + preamble {= + import sys + =} + input in_ + state c(1) + reaction(in_) {= + elapsed_time = get_elapsed_logical_time() + print("At tag ({}, {}), received %d. The original intended tag of the message was ({}, {}).".format( + elapsed_time, + get_microstep(), + in_.value, + in_.intended_tag.time - get_start_time(), + in_.intended_tag.microstep + )) + if in_.value != self.c: + print("Expected to receive {}.".format(self.c)) + self.sys.exit(1) + if elapsed_time != MSEC(200) + SEC(1) * (self.c - 1): + print("Expected received time to be {}.".format(MSEC(200) * self.c)) + self.sys.exit(3) + self.c += 1 + =} + reaction(shutdown) {= + if self.c != 6: + self.sys.stderr.write("Expected to receive 5 items.\n") + self.sys.exit(2) + print("SUCCESS: Successfully received 5 items.") + =} +} + +federated reactor DistributedCountDecentralized { + c = new Count(); + p = new Print(); + c.out -> p.in_ after 200 msec; // Indicating a 'logical' connection + // with a large enough delay. +} diff --git a/test/Python/src/federated/failing/DistributedCountDecentralizedLate.lf b/test/Python/src/federated/failing/DistributedCountDecentralizedLate.lf new file mode 100644 index 0000000000..c3335bb91d --- /dev/null +++ b/test/Python/src/federated/failing/DistributedCountDecentralizedLate.lf @@ -0,0 +1,71 @@ + /** + * Test a form of a distributed deterministic system + * where a federate that receives timestamped messages has a timer in addition to the messages + * as triggers. Therefore, careful coordination of the advancement of time using Ptides is needed. + * @author Edward A. Lee + * @author Soroush Bateni + */ +target Python { + timeout: 4900 msec, + coordination: decentralized +}; + +import Count from "../lib/Count.lf"; + +// reason for failing: get_current_tag(), get_microstop(), compare_tags() and in_.intended_tag are not supported in python target + +reactor Print { + preamble {= + import sys + =} + input in_ // STP () + // STP(in, 30 msec); + state success(0) + state success_stp_violation(0) + timer t(0, 10 usec) // Force a timer to be invoke periodically + // to ensure logical time will advance in the + // absence of incoming messages. + state c(0) + reaction(in_) {= + current_tag = get_current_tag() + print("At tag ({}, {}) received {}. Intended tag is ({}, {}).".format( + get_elapsed_logical_time(), + get_microstep(), + in_.value, + in_.intended_tag.time - get_start_time(), + in_.intended_tag.microstep + ) + ) + if (compare_tags((tag_t){.time=current_tag.time - get_start_time(), .microstep=current_tag.microstep}, + (tag_t){.time=SEC(1) * self->c, .microstep=0}) == 0): + self.success += 1 // Message was on-time + self.c += 1 + =} STP (0) {= + current_tag = get_current_tag() + print("At tag ({}, {}), message has violated the STP offset by ({}, {}).".format( + current_tag.time - start_time, current_tag.microstep, + current_tag.time - in_.intended_tag.time, + current_tag.microstep - in_.intended_tag.microstep + ) + ) + self.success_stp_violation += 1 + self.c += 1 + =} + reaction(t) {= + // Do nothing. + =} + + reaction(shutdown) {= + if self.success + self.success_stp_violation != 5: + self.sys.stderr.write("Failed to detect STP violations in messages.\n") + self.sys.exit(1) + else: + print("Successfully detected STP violation ({} violations, {} on-time).".format(self.success_stp_violation, self.success)) + =} +} + +federated reactor { + c = new Count(); + p = new Print(); + c.out -> p.in_; // Indicating a 'logical' connection. +} diff --git a/test/Python/src/federated/failing/DistributedCountDecentralizedLateDownstream.lf b/test/Python/src/federated/failing/DistributedCountDecentralizedLateDownstream.lf new file mode 100644 index 0000000000..716807961e --- /dev/null +++ b/test/Python/src/federated/failing/DistributedCountDecentralizedLateDownstream.lf @@ -0,0 +1,111 @@ +/** + * Test a form of a distributed deterministic system + * where a federate that receives timestamped messages has a timer in addition to the messages + * as triggers. Therefore, careful coordination of the advancement of time using Ptides is needed. + * In addition, this test shows that the STP violation is passed down the hierarchy until it is handled. + * + * An STP violation occurs if when a message with intended tag g1 arrives + * on a port p after the receiving federate has progressed far enough that + * it cannot process an event with tag g1 on the port p. + * This test has a fast timer (10 usec period) in the receiving federate + * so that the receiving federate is continually advancing its current tag, + * and hence an STP violation is more likely to occur. + * Furthermore, this test sets the STP threshold to 0, which makes the + * violation extremely likely to occur. It could still not occur, however, + * if the message arrives between ticks of the 10 usec timer. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ + +// reason for failing: get_current_tag(), get_microstop(), compare_tags() and in_.intended_tag are not supported in python target + +target Python { + timeout: 1900 msec, // 9 msec headroom for the last (probably tardy) message to arrive. + coordination: decentralized +}; + +import Count from "../lib/Count.lf"; + +reactor ImportantActuator { + input in:int; + state success:int(0); // Count messages that arrive without STP violation. + state success_stp_violation:int(0); + timer t(0, 10 usec); // Force a timer to be invoke periodically + // to ensure logical time will advance in the + // absence of incoming messages. + state c:int(0); + reaction(in) {= + tag_t current_tag = get_current_tag(); + info_print("ImportantActuator: At tag (%lld, %u) received %d. Intended tag is (%lld, %u).", + get_elapsed_logical_time(), + get_microstep(), + in->value, + in->intended_tag.time - get_start_time(), + in->intended_tag.microstep); + if (compare_tags((tag_t){.time=current_tag.time - get_start_time(), .microstep=current_tag.microstep}, + (tag_t){.time=SEC(1) * self->c, .microstep=0}) == 0) { + self->success++; // Message was on-time + } else { + error_print_and_exit("Normal reaction was invoked, but current tag doesn't match expected tag."); + } + self->c++; + =} STP (0) {= + tag_t current_tag = get_current_tag(); + info_print("ImportantActuator: At tag (%lld, %u), message has violated the STP offset by (%lld, %u).", + current_tag.time - start_time, current_tag.microstep, + current_tag.time - in->intended_tag.time, + current_tag.microstep - in->intended_tag.microstep); + self->success_stp_violation++; + self->c++; + =} + reaction(t) {= + // Do nothing. + =} + + reaction(shutdown) {= + if ((self->success + self->success_stp_violation) != 2) { + error_print_and_exit("Failed to detect STP violation in messages."); + } else { + info_print("Successfully detected STP violations (%d violations, %d on-time).", self->success_stp_violation, self->success); + } + =} +} + +reactor Print { + input in:int; + reaction(in) {= + tag_t current_tag = get_current_tag(); + info_print("Print reactor: at tag (%lld, %u) received %d. Intended tag is (%lld, %u).", + current_tag.time - get_start_time(), + current_tag.microstep, + in->value, + in->intended_tag.time - get_start_time(), + in->intended_tag.microstep); + =} + +} + +reactor Receiver { + input in:int; + timer t(0, 10 msec); // Force a timer to be invoke periodically + // to ensure logical time will advance in the + // absence of incoming messages. + state c:int(0); + p = new Print(); + a = new ImportantActuator(); + reaction(in) -> p.in, a.in {= + SET(p.in, in->value + 1); + SET(a.in, in->value + 1); + =} + + reaction(t) {= + // Do nothing. + =} +} + +federated reactor { + c = new Count(period = 1 sec); + r = new Receiver(); + c.out -> r.in; // Indicating a 'logical' connection. +} diff --git a/test/Python/src/federated/failing/DistributedCountDecentralizedLateHierarchy.lf b/test/Python/src/federated/failing/DistributedCountDecentralizedLateHierarchy.lf new file mode 100644 index 0000000000..13a0d1702b --- /dev/null +++ b/test/Python/src/federated/failing/DistributedCountDecentralizedLateHierarchy.lf @@ -0,0 +1,98 @@ +/** + * Test a form of a distributed deterministic system + * where a federate that receives timestamped messages has a timer in addition to the messages + * as triggers. Therefore, careful coordination of the advancement of time using Ptides is needed. + * In addition, this test shows that the STP violation of the reaction + * is passed down the hierarchy until it is handled. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ + +// reason for failing: get_current_tag(), get_microstop(), compare_tags() and in_.intended_tag are not supported in python target + +target Python { + timeout: 4900 msec, + coordination: decentralized +}; + +import Count from "../lib/Count.lf"; + +reactor ImportantActuator { + input in:int; + state success:int(0); + state success_stp_violation:int(0); + timer t(0, 10 usec); // Force a timer to be invoke periodically + // to ensure logical time will advance in the + // absence of incoming messages. + state c:int(0); + reaction(in) {= + tag_t current_tag = get_current_tag(); + printf("At tag (%lld, %u) received %d. Intended tag is (%lld, %u).\n", + get_elapsed_logical_time(), + get_microstep(), + in->value, + in->intended_tag.time - get_start_time(), + in->intended_tag.microstep); + if (compare_tags((tag_t){.time=current_tag.time - get_start_time(), .microstep=current_tag.microstep}, + (tag_t){.time=SEC(1) * self->c, .microstep=0}) == 0) { + self->success++; // Message was on-time + } + self->c++; + =} STP (0) {= + tag_t current_tag = get_current_tag(); + printf("Message violated STP offset by (%lld, %u).\n", + current_tag.time - in->intended_tag.time, + current_tag.microstep - in->intended_tag.microstep); + self->success_stp_violation++; + self->c++; + =} + reaction(t) {= + // Do nothing. + =} + + reaction(shutdown) {= + if ((self->success + self->success_stp_violation) != 5) { + fprintf(stderr, "Failed to detect STP violations in messages.\n"); + exit(1); + } else { + printf("Successfully detected STP violations (%d violations, %d on-time).\n", self->success_stp_violation, self->success); + } + =} +} + +reactor Print { + input in:int; + reaction(in) {= + tag_t current_tag = get_current_tag(); + printf("At tag (%lld, %u) received %d. Intended tag is (%lld, %u).\n", + current_tag.time - get_start_time(), + current_tag.microstep, + in->value, + in->intended_tag.time - get_start_time(), + in->intended_tag.microstep); + =} + +} + +reactor Receiver { + input in:int; + timer t(0, 10 msec); // Force a timer to be invoke periodically + // to ensure logical time will advance in the + // absence of incoming messages. + state c:int(0); + p = new Print(); + a = new ImportantActuator(); + in -> p.in; + in -> a.in; + + reaction(t) {= + // Do nothing. + =} +} + +federated reactor { + c = new Count(); + r = new Receiver(); + c.out -> r.in; // Indicating a 'logical' connection. +} diff --git a/test/Python/src/federated/failing/DistributedDoublePort.lf b/test/Python/src/federated/failing/DistributedDoublePort.lf new file mode 100644 index 0000000000..8d03a342de --- /dev/null +++ b/test/Python/src/federated/failing/DistributedDoublePort.lf @@ -0,0 +1,57 @@ +/** + * Test the case for when two upstream federates + * send messages to a downstream federte on two + * different ports. One message should carry a + * microstep delay relative to the other + * message. + * + * @author Soroush Bateni + */ + +// reason for failing: get_microstop() is not supported in python target + +target Python { + timeout: 900 msec, + logging: DEBUG, + coordination: centralized +}; + +import Count from "../lib/Count.lf"; + +reactor CountMicrostep { + state count:int(1); + output out:int; + logical action act:int; + timer t(0, 1 sec); + reaction(t) -> act {= + schedule_int(act, 0, self->count++); + =} + + reaction(act) -> out {= + SET(out, act->value); + =} +} + +reactor Print { + input in:int; + input in2:int; + reaction(in, in2) {= + interval_t elapsed_time = get_elapsed_logical_time(); + info_print("At tag (%lld, %u), received in = %d and in2 = %d.", elapsed_time, get_microstep(), in->value, in2->value); + if (in->is_present && in2->is_present) { + error_print_and_exit("ERROR: invalid logical simultaneity."); + } + =} + + reaction(shutdown) {= + info_print("SUCCESS: messages were at least one microstep apart."); + =} +} + +federated reactor DistributedDoublePort { + c = new Count(); + cm = new CountMicrostep(); + p = new Print(); + c.out -> p.in; // Indicating a 'logical' connection. + cm.out -> p.in2; +} diff --git a/test/Python/src/federated/failing/DistributedLoopedAction.lf b/test/Python/src/federated/failing/DistributedLoopedAction.lf new file mode 100644 index 0000000000..63bab8033f --- /dev/null +++ b/test/Python/src/federated/failing/DistributedLoopedAction.lf @@ -0,0 +1,70 @@ +/** + * Test a sender-receiver network system that + * relies on microsteps being taken into account. + * + * @author Soroush Bateni + */ + +target Python { + timeout: 1 sec +}; + +// reason for failing: get_microstop() is not supported in python target + +import Sender from "../lib/LoopedActionSender.lf" + +reactor Receiver(take_a_break_after(10), break_interval(400 msec)) { + input in_; + state received_messages(0); + state total_received_messages(0); + state breaks(0); + timer t(0, 1 msec); // This will impact the performance + // but forces the logical time to advance + // Comment this line for a more sensible + // log output. + reaction(in_) {= + print("At tag ({}, {}) received value {}.".format( + get_elapsed_logical_time(), + get_microstep(), + in_.value + ) + ) + self->total_received_messages++; + if (in->value != self->received_messages++) { + fprintf(stderr,"ERROR: received messages out of order.\n"); + // exit(1); + } + if (get_elapsed_logical_time() != self->breaks * self->break_interval) { + fprintf(stderr,"ERROR: received messages at an incorrect time: %lld.\n", get_elapsed_logical_time()); + // exit(2); + } + + if (self->received_messages == self->take_a_break_after) { + // Sender is taking a break; + self->breaks++; + self->received_messages = 0; + } + =} + + reaction(t) {= + // Do nothing + =} + + reaction(shutdown) {= + if (self->breaks != 3 || + (self->total_received_messages != ((SEC(1)/self->break_interval)+1) * self->take_a_break_after) + ) { + fprintf(stderr,"ERROR: test failed.\n"); + exit(4); + } + printf("SUCCESS: Successfully received all messages from the sender.\n"); + =} +} + + +federated reactor DistributedLoopedAction { + sender = new Sender(); + receiver = new Receiver(); + + sender.out -> receiver.in; +} diff --git a/test/Python/src/federated/failing/DistributedLoopedActionDecentralized.lf b/test/Python/src/federated/failing/DistributedLoopedActionDecentralized.lf new file mode 100644 index 0000000000..1ddb49062a --- /dev/null +++ b/test/Python/src/federated/failing/DistributedLoopedActionDecentralized.lf @@ -0,0 +1,125 @@ +/** + * Test a sender-receiver network system that + * relies on microsteps being taken into account. + * The purpose of this test is to check whether the functionalities + * pertinent to dynamic STP offset adjustments are present and + * functioning to a degree. + * + * This version of the test does not use a centralized + * coordinator to advance tag. Therefore, + * the receiver will rely on an STP offset (initially + * zero) to wait long enough for messages to arrive + * before advancing its tag. In this test, + * the STP offset is initially zero and gradually + * raised every time an STP violation is perceived until + * no STP violation is observed. Therefore, the exact + * outcome of the test will depend on actual runtime + * timing. + * + * + * @author Soroush Bateni + */ + +// reason for failing: get_current_tag(), get_microstop(), compare_tags() and in_.intended_tag are not supported in python target + +target Python { + timeout: 1 sec, + coordination: decentralized +}; + + +import Sender from "../lib/LoopedActionSender.lf" + +reactor Receiver(take_a_break_after:int(10), break_interval:time(400 msec)) { + input in:int; + state received_messages:int(0); + state total_received_messages:int(0); + state breaks:int(0); + reaction(in) {= + tag_t current_tag = get_current_tag(); + info_print("At tag (%lld, %u) received value %d with STP violation (%lld, %u).", + current_tag.time - get_start_time(), + current_tag.microstep, + in->value, + current_tag.time - in->intended_tag.time, + current_tag.microstep - in->intended_tag.microstep + ); + self->total_received_messages++; + if (in->value != get_microstep()) { + warning_print("Received incorrect value %d. Expected %d.", in->value, get_microstep()); + // exit(1); // The receiver should tolerate this type of error + // in this test because messages on the network can + // arrive late. Note that with an accurate STP offset, + // this type of error should be extremely rare. + + } + if (in->value != self->received_messages) { + warning_print("Skipped expected value %d. Received value %d.", self->received_messages, in->value); + self->received_messages = in->value; + // exit(1); // The receiver should tolerate this type of error + // in this test because multiple messages arriving + // at a given tag (t, m) can overwrite each other. + // Because messages arrive in order, only the last + // value that is received on the port at a given tag + // can be observed. Note that with an accurate STP + // offset, this type of error should be extremely + // rare. + // FIXME: Messages should not be dropped or + // overwritten. + } + self->received_messages++; + if (self->received_messages == self->take_a_break_after) { + // Sender is taking a break; + self->breaks++; + self->received_messages = 0; + } + =} + + reaction(shutdown) {= + if (self->breaks != 3 || + (self->total_received_messages != ((SEC(1)/self->break_interval)+1) * self->take_a_break_after) + ) { + error_print_and_exit("Test failed. Breaks: %d, Messages: %d.", self->breaks, self->total_received_messages); + } + info_print("SUCCESS: Successfully received all messages from the sender. Breaks: %d, Messages: %d.", self->breaks, self->total_received_messages); + =} +} + +reactor STPReceiver(take_a_break_after:int(10), break_interval:time(400 msec), stp_offset:time(0)) { + input in:int; + state last_time_updated_stp:time(0); + receiver = new Receiver(take_a_break_after = 10, break_interval = 400 msec); + timer t (0, 1 msec); // Force advancement of logical time + + reaction (in) -> receiver.in {= + info_print("Received %d.", in->value); + SET(receiver.in, in->value); + =} STP (stp_offset) {= + info_print("Received %d late.", in->value); + tag_t current_tag = get_current_tag(); + info_print("STP violation of (%lld, %u) perceived on the input.", + current_tag.time - in->intended_tag.time, + current_tag.microstep - in->intended_tag.microstep); + SET(receiver.in, in->value); + // Only update the STP offset once per + // time step. + if (current_tag.time != self->last_time_updated_stp) { + info_print("Raising the STP offset by %lld.", MSEC(10)); + self->stp_offset += MSEC(10); + set_stp_offset(MSEC(10)); + self->last_time_updated_stp = current_tag.time; + } + =} + + reaction (t) {= + // Do nothing + =} +} + + +federated reactor DistributedLoopedActionDecentralized { + sender = new Sender(take_a_break_after = 10, break_interval = 400 msec); + stpReceiver = new STPReceiver(take_a_break_after = 10, break_interval = 400 msec); + + sender.out -> stpReceiver.in; +} diff --git a/test/Python/src/federated/failing/DistributedLoopedPhysicalAction.lf b/test/Python/src/federated/failing/DistributedLoopedPhysicalAction.lf new file mode 100644 index 0000000000..9bee8b3784 --- /dev/null +++ b/test/Python/src/federated/failing/DistributedLoopedPhysicalAction.lf @@ -0,0 +1,86 @@ +/** + * Test a sender-receiver network system that + * is similar to DistributedLoopedAction, but it uses a physical + * action rather than a logical action. + * This also demonstrates the advance-message-interval coordination option. + * This specifies the time period between Time Advance Notice (TAN) messages + * sent to the RTI (a form of null message that must be sent because of the + * physical action). The presence of this option also silences a warning + * about having a physical action that triggers an output. + * + * @author Soroush Bateni + */ + +// reason for failing: get_current_tag() is not supported in python target + +target C { + timeout: 1 sec, + coordination-options: {advance-message-interval: 10 msec} // Silences warning. +}; + +reactor Sender(take_a_break_after:int(10), break_interval:time(550 msec)) { + output out:int; + physical action act; + state sent_messages:int(0); + reaction(startup, act) -> act, out {= + // Send a message on out + SET(out, self->sent_messages); + self->sent_messages++; + if (self->sent_messages < self->take_a_break_after) { + schedule(act, 0); + } else { + // Take a break + self->sent_messages = 0; + schedule(act, self->break_interval); + } + =} +} + +reactor Receiver(take_a_break_after:int(10), break_interval:time(550 msec)) { + input in:int; + state received_messages:int(0); + state total_received_messages:int(0); + state breaks:int(0); + timer t(0, 1 msec); // This will impact the performance + // but forces the logical time to advance + // Comment this line for a more sensible + // log output. + reaction(in) {= + tag_t current_tag = get_current_tag(); + info_print("At tag (%lld, %u) received %d.", + current_tag.time - get_start_time(), + current_tag.microstep, + in->value); + self->total_received_messages++; + if (in->value != self->received_messages++) { + error_print_and_exit("Expected %d.", self->received_messages - 1); + } + + if (self->received_messages == self->take_a_break_after) { + // Sender is taking a break; + self->breaks++; + self->received_messages = 0; + } + =} + + reaction(t) {= + // Do nothing + =} + + reaction(shutdown) {= + if (self->breaks != 2 || + (self->total_received_messages != ((SEC(1)/self->break_interval)+1) * self->take_a_break_after) + ) { + error_print_and_exit("Test failed. Breaks: %d, Messages: %d.", self->breaks, self->total_received_messages); + } + info_print("SUCCESS: Successfully received all messages from the sender."); + =} +} + + +federated reactor DistributedLoopedPhysicalAction { + sender = new Sender(); + receiver = new Receiver(); + + sender.out -> receiver.in; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/DistributedLoopedPhysicalActionDecentralized.lf b/test/Python/src/federated/failing/DistributedLoopedPhysicalActionDecentralized.lf new file mode 100644 index 0000000000..cbebe75fda --- /dev/null +++ b/test/Python/src/federated/failing/DistributedLoopedPhysicalActionDecentralized.lf @@ -0,0 +1,22 @@ +/** + * Test a sender-receiver network system that + * relies on microsteps being taken into account. + * + * @author Soroush Bateni + */ + +// reason for failing: get_current_tag() is not supported in python target + +target C { + timeout: 1 sec, + coordination: decentralized +}; + +import Sender, Receiver from "DistributedLoopedPhysicalAction.lf" + +federated reactor { + sender = new Sender(); + receiver = new Receiver(); + + sender.out -> receiver.in; +} diff --git a/test/Python/src/federated/failing/DistributedNetworkOrder.lf b/test/Python/src/federated/failing/DistributedNetworkOrder.lf new file mode 100644 index 0000000000..59ede2977c --- /dev/null +++ b/test/Python/src/federated/failing/DistributedNetworkOrder.lf @@ -0,0 +1,67 @@ +/* + * This is a test for send_timed_message, + * which is an internal API. + * + * This test sends a second message at time 5 msec that has the same intended tag as + * a message that it had previously sent at time 0 msec. This results in a warning, + * but the message microstep is incremented and correctly received one microstep later. + * + * @author Soroush Bateni + */ + +// reason for failing: send_timed_message() is not not supported in python target + +target Python { + timeout: 1 sec +}; + +reactor Sender { + output out:int; + timer t(0, 1 msec); + reaction(t) {= + int payload = 1; + if (get_elapsed_logical_time() == 0LL) { + send_timed_message(MSEC(10), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int), + (unsigned char*)&payload); + } else if (get_elapsed_logical_time() == MSEC(5)) { + payload = 2; + send_timed_message(MSEC(5), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int), + (unsigned char*)&payload); + } + =} +} + +reactor Receiver { + input in:int; + state success:int(0); + + reaction(in) {= + tag_t current_tag = get_current_tag(); + if (current_tag.time == (start_time + MSEC(10))) { + if (current_tag.microstep == 0 && in->value == 1) { + self->success++; + } else if (current_tag.microstep == 1 && in->value == 2) { + self->success++; + } + } + printf("Received %d at tag (%lld, %u).\n", + in->value, + get_elapsed_logical_time(), + get_microstep()); + =} + + reaction(shutdown) {= + if (self->success != 2) { + fprintf(stderr, "ERROR: Failed to receive messages.\n"); + exit(1); + } + printf("SUCCESS.\n"); + =} +} + +federated reactor DistributedNetworkOrder { + sender = new Sender(); + receiver = new Receiver(); + + sender.out -> receiver.in; +} diff --git a/test/Python/src/federated/failing/DistributedStop.lf b/test/Python/src/federated/failing/DistributedStop.lf new file mode 100644 index 0000000000..54b6f233c2 --- /dev/null +++ b/test/Python/src/federated/failing/DistributedStop.lf @@ -0,0 +1,120 @@ + /** + * Test for request_stop() in federated execution with centralized coordination. + * + * @author Soroush Bateni + */ + + // reason for failing: get_microstep() and compare_tags() are not not supported in python target + + +target C; + +reactor Sender { + output out:int; + timer t(0, 1 usec); + logical action act; + state reaction_invoked_correctly:bool(false); + reaction(t, act) -> out, act {= + info_print("Sending 42 at (%lld, %u).", + get_elapsed_logical_time(), + get_microstep()); + SET(out, 42); + if (get_microstep() == 0) { + // Instead of having a separate reaction + // for 'act' like Stop.lf, we trigger the + // same reaction to test request_stop() being + // called multiple times + schedule(act, 0); + } + if (get_elapsed_logical_time() == USEC(1)) { + // Call request_stop() both at (1 usec, 0) and + // (1 usec, 1) + info_print("Requesting stop at (%lld, %u).", + get_elapsed_logical_time(), + get_microstep()); + request_stop(); + } + + tag_t _1usec1 = (tag_t) { .time = USEC(1) + get_start_time(), .microstep = 1u }; + if (compare_tags(get_current_tag(), _1usec1) == 0) { + // The reaction was invoked at (1 usec, 1) as expected + self->reaction_invoked_correctly = true; + } else if (compare_tags(get_current_tag(), _1usec1) > 0) { + // The reaction should not have been invoked at tags larger than (1 usec, 1) + error_print_and_exit("ERROR: Invoked reaction(t, act) at tag bigger than shutdown."); + } + =} + + reaction(shutdown) {= + if (get_elapsed_logical_time() != USEC(1) || + get_microstep() != 1) { + error_print_and_exit("ERROR: Sender failed to stop the federation in time. " + "Stopping at (%lld, %u).", + get_elapsed_logical_time(), + get_microstep()); + } else if (self->reaction_invoked_correctly == false) { + error_print_and_exit("ERROR: Sender reaction(t, act) was not invoked at (1 usec, 1). " + "Stopping at (%lld, %u).", + get_elapsed_logical_time(), + get_microstep()); + } + info_print("SUCCESS: Successfully stopped the federation at (%lld, %u).", + get_elapsed_logical_time(), + get_microstep()); + =} +} + +reactor Receiver ( + stp_offset:time(10 msec) // Used in the decentralized variant of the test +) { + input in:int; + state reaction_invoked_correctly:bool(false); + reaction(in) {= + info_print("Received %d at (%lld, %u).", + in->value, + get_elapsed_logical_time(), + get_microstep()); + if (get_elapsed_logical_time() == USEC(1)) { + info_print("Requesting stop at (%lld, %u).", + get_elapsed_logical_time(), + get_microstep()); + request_stop(); + // The receiver should receive a message at tag + // (1 usec, 1) and trigger this reaction + self->reaction_invoked_correctly = true; + } + + tag_t _1usec1 = (tag_t) { .time = USEC(1) + get_start_time(), .microstep = 1u }; + if (compare_tags(get_current_tag(), _1usec1) > 0) { + self->reaction_invoked_correctly = false; + } + =} + + reaction(shutdown) {= + // Sender should have requested stop earlier than the receiver. + // Therefore, the shutdown events must occur at (1000, 0) on the + // receiver. + if (get_elapsed_logical_time() != USEC(1) || + get_microstep() != 1) { + error_print_and_exit("Receiver failed to stop the federation at the right time. " + "Stopping at (%lld, %u).", + get_elapsed_logical_time(), + get_microstep()); + } else if (self->reaction_invoked_correctly == false) { + error_print_and_exit("Receiver reaction(in) was not invoked the correct number of times. " + "Stopping at (%lld, %u).", + get_elapsed_logical_time(), + get_microstep()); + } + info_print("SUCCESS: Successfully stopped the federation at (%lld, %u).", + get_elapsed_logical_time(), + get_microstep()); + =} +} + +federated reactor DistributedStop { + sender = new Sender(); + receiver = new Receiver(); + + sender.out -> receiver.in; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/DistributedStopDecentralized.lf b/test/Python/src/federated/failing/DistributedStopDecentralized.lf new file mode 100644 index 0000000000..50e622cdbb --- /dev/null +++ b/test/Python/src/federated/failing/DistributedStopDecentralized.lf @@ -0,0 +1,20 @@ + /** + * Test for request_stop() in federated execution with decentralized coordination. + * + * @author Soroush Bateni + */ + + // reason for failing: get_microstep() and compare_tags() are not not supported in python target + +target C { + coordination: decentralized +}; + +import Sender, Receiver from "DistributedStop.lf" + +federated reactor DistributedStopDecentralized { + sender = new Sender(); + receiver = new Receiver(); + + sender.out -> receiver.in; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/DistributedStopZero.lf b/test/Python/src/federated/failing/DistributedStopZero.lf new file mode 100644 index 0000000000..6af951fbc9 --- /dev/null +++ b/test/Python/src/federated/failing/DistributedStopZero.lf @@ -0,0 +1,86 @@ +/** + * Test for request_stop() in federated execution with centralized coordination + * at tag (0,0). + * + * @author Soroush Bateni + */ + + // reason for failing: get_microstep() and compare_tags() are not not supported in python target + +target C; + +reactor Sender { + output out:int; + timer t(0, 1 usec); + reaction(t) -> out{= + printf("Sending 42 at (%lld, %u).\n", + get_elapsed_logical_time(), + get_microstep()); + SET(out, 42); + + tag_t zero = (tag_t) { .time = get_start_time(), .microstep = 0u }; + if (compare_tags(get_current_tag(), zero) == 0) { + // Request stop at (0,0) + printf("Requesting stop at (%lld, %u).\n", + get_elapsed_logical_time(), + get_microstep()); + request_stop(); + } + =} + + reaction(shutdown) {= + if (get_elapsed_logical_time() != USEC(0) || + get_microstep() != 1) { + fprintf(stderr, "ERROR: Sender failed to stop the federation in time. " + "Stopping at (%lld, %u).\n", + get_elapsed_logical_time(), + get_microstep()); + exit(1); + } + printf("SUCCESS: Successfully stopped the federation at (%lld, %u).\n", + get_elapsed_logical_time(), + get_microstep()); + =} +} + +reactor Receiver { + input in:int; + reaction(in) {= + printf("Received %d at (%lld, %u).\n", + in->value, + get_elapsed_logical_time(), + get_microstep()); + tag_t zero = (tag_t) { .time = get_start_time(), .microstep = 0u }; + if (compare_tags(get_current_tag(), zero) == 0) { + // Request stop at (0,0) + printf("Requesting stop at (%lld, %u).\n", + get_elapsed_logical_time(), + get_microstep()); + request_stop(); + } + =} + + reaction(shutdown) {= + // Sender should have requested stop earlier than the receiver. + // Therefore, the shutdown events must occur at (1000, 0) on the + // receiver. + if (get_elapsed_logical_time() != USEC(0) || + get_microstep() != 1) { + fprintf(stderr, "ERROR: Receiver failed to stop the federation in time. " + "Stopping at (%lld, %u).\n", + get_elapsed_logical_time(), + get_microstep()); + exit(1); + } + printf("SUCCESS: Successfully stopped the federation at (%lld, %u).\n", + get_elapsed_logical_time(), + get_microstep()); + =} +} + +federated reactor { + sender = new Sender(); + receiver = new Receiver(); + + sender.out -> receiver.in; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/DistributedToken.lf b/test/Python/src/federated/failing/DistributedToken.lf new file mode 100644 index 0000000000..44c9ef7ef3 --- /dev/null +++ b/test/Python/src/federated/failing/DistributedToken.lf @@ -0,0 +1,106 @@ +/** + * Distributed LF program where a MessageGenerator creates a string + * message that is sent via the RTI (runtime infrastructure) to a + * receiver that prints the message. The type is char*, so this + * tests the transport of token-encapsulated messages. + * Three executable programs are generated, Distributed, + * Distributed_Sender, and Distributed_Receiver. + * The RTI is realized in the first of these and is identified + * as a "launcher," so it launches the other two programs. + * + * This program uses a 'logical' connection -> with a STP violation handler, + * decentralized coordination, and an 'after' that is sufficiently + * large to get deterministic timestamps. Hence, it realizes a + * 'poor man's Ptides' that does not require clock synchronization + * nor HLA-style centralized control over the advancement of time. + * + * @author Edward A. Lee + */ + + // reason for failing: get_microstep() is not supported in the python target + + +target Python { + timeout: 5 secs, + coordination: decentralized +}; + +/** + * Reactor that generates a sequence of messages, one per second. + * The message will be a string consisting of a root string followed + * by a count. + * @param root The root string. + * @output message The message. + */ +reactor MessageGenerator(root:string("")) { + // Output type char* instead of string is used for dynamically + // allocated character arrays (as opposed to static constant strings). + output message:char*; + state count:int(1); + // Send first message after 1 sec so that the startup reactions + // do not factor into the transport time measurement on the first message. + timer t(1 sec, 1 sec); + reaction(t) -> message {= + // With NULL, 0 arguments, snprintf tells us how many bytes are needed. + // Add one for the null terminator. + int length = snprintf(NULL, 0, "%s %d", self->root, self->count) + 1; + // Dynamically allocate memory for the output. + SET_NEW_ARRAY(message, length); + // Populate the output string and increment the count. + snprintf(message->value, length, "%s %d", self->root, self->count++); + printf("MessageGenerator: At time %lld, send message: %s\n", + get_elapsed_logical_time(), + message->value + ); + =} +} + +/** + * Reactor that prints an incoming string. + * @param prefix A prefix for the message. + * @input message The message. + */ +reactor PrintMessage { + input message:char*; + state count:int(0); + reaction(message) {= + printf("PrintMessage: At (elapsed) logical time %lld, receiver receives: %s\n", + get_elapsed_logical_time(), + message->value + ); + // Check the trailing number only of the message. + self->count++; + int trailing_number = atoi(&message->value[12]); + if (trailing_number != self->count) { + printf("ERROR: Expected message to be 'Hello World %d'.\n", self->count); + exit(1); + } + =} STP (0) {= + printf("PrintMessage: At (elapsed) tag (%lld, %u), receiver receives: %s\n" + "Original intended tag was (%lld, %u).\n", + get_elapsed_logical_time(), + get_microstep(), + message->value, + message->intended_tag.time - get_start_time(), + message->intended_tag.microstep); + // Check the trailing number only of the message. + self->count++; + int trailing_number = atoi(&message->value[12]); + if (trailing_number != self->count) { + printf("ERROR: Expected message to be 'Hello World %d'.\n", self->count); + exit(1); + } + =} + reaction(shutdown) {= + if (self->count == 0) { + printf("ERROR: No messages received.\n"); + exit(2); + } + =} +} + +federated reactor DistributedToken { + msg = new MessageGenerator(root = "Hello World"); + dsp = new PrintMessage(); + msg.message -> dsp.message after 40 msec; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/LoopDistributedCentralized.lf b/test/Python/src/federated/failing/LoopDistributedCentralized.lf new file mode 100644 index 0000000000..9b1a665d33 --- /dev/null +++ b/test/Python/src/federated/failing/LoopDistributedCentralized.lf @@ -0,0 +1,66 @@ +/** + * This tests a feedback loop with physical actions and + * centralized coordination. + * + * @author Edward A. Lee + */ + + // reason for failing: lf_comma_separated_time() not supported in the python target + +target Python { + flags: "-Wall", + coordination: centralized, + coordination-options: {advance-message-interval: 100 msec}, + threads: 2, + timeout: 5 sec +} +preamble {= + #include // Defines sleep() + bool stop = false; + // Thread to trigger an action once every second. + void* ping(void* actionref) { + while(!stop) { + info_print("Scheduling action."); + schedule(actionref, 0); + sleep(1); + } + return NULL; + } +=} + +reactor Looper(incr:int(1), delay:time(0 msec)) { + input in:int; + output out:int; + physical action a(delay); + state count:int(0); + reaction(startup) -> a {= + // Start the thread that listens for Enter or Return. + lf_thread_t thread_id; + info_print("Starting thread."); + lf_thread_create(&thread_id, &ping, a); + =} + reaction(a) -> out {= + SET(out, self->count); + self->count += self->incr; + =} + reaction(in) {= + instant_t time_lag = get_physical_time() - get_logical_time(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + info_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + reaction(shutdown) {= + info_print("******* Shutdown invoked."); + // Stop the thread that is scheduling actions. + stop = true; + if (self->count != 5 * self->incr) { + error_print_and_exit("Failed to receive all five expected inputs."); + } + =} +} +federated reactor LoopDistributedCentralized(delay:time(0)) { + left = new Looper(); + right = new Looper(incr = -1); + left.out -> right.in; + right.out -> left.in; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/LoopDistributedCentralizedPrecedence.lf b/test/Python/src/federated/failing/LoopDistributedCentralizedPrecedence.lf new file mode 100644 index 0000000000..80fe08dcee --- /dev/null +++ b/test/Python/src/federated/failing/LoopDistributedCentralizedPrecedence.lf @@ -0,0 +1,53 @@ +/** + * This tests that the precedence order of reaction invocation is kept + * when a feedback loop is present in centralized coordination. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ + + // reason for failing: lf_comma_separated_time() not supported in the python target + +target Python { + flags: "-Wall", + coordination: centralized, + coordination-options: {advance-message-interval: 100 msec}, + threads: 2, + timeout: 5 sec +} + +reactor Looper(incr:int(1), delay:time(0 msec)) { + input in:int; + output out:int; + state count:int(0); + state received_count:int(0); + timer t(0, 1 sec); + reaction(t) -> out {= + SET(out, self->count); + self->count += self->incr; + =} + reaction(in) {= + instant_t time_lag = get_physical_time() - get_logical_time(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + info_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + self->received_count = self->count; + =} + reaction(t) {= + if (self->received_count != self->count) { + error_print_and_exit("reaction(t) was invoked before reaction(in). Precedence order was not kept."); + } + =} + reaction(shutdown) {= + info_print("******* Shutdown invoked."); + if (self->count != 6 * self->incr) { + error_print_and_exit("Failed to receive all six expected inputs."); + } + =} +} +federated reactor (delay:time(0)) { + left = new Looper(); + right = new Looper(incr = -1); + left.out -> right.in; + right.out -> left.in; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/LoopDistributedCentralizedPrecedenceHierarchy.lf b/test/Python/src/federated/failing/LoopDistributedCentralizedPrecedenceHierarchy.lf new file mode 100644 index 0000000000..e435c658fe --- /dev/null +++ b/test/Python/src/federated/failing/LoopDistributedCentralizedPrecedenceHierarchy.lf @@ -0,0 +1,70 @@ +/** + * This tests that the precedence order of reaction invocation is kept + * in the hierarchy of reactors when a feedback loop is present in centralized coordination. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ + + // reason for failing: lf_comma_separated_time() not supported in the python target + + +target Python { + flags: "-Wall", + coordination: centralized, + coordination-options: {advance-message-interval: 100 msec}, + threads: 2, + timeout: 5 sec +} + +reactor Contained (incr:int(1)) { + timer t(0, 1 sec); + input in:int; + state count:int(0); + state received_count:int(0); + reaction(t) {= + self->count += self->incr; + =} + reaction(in) {= + self->received_count = self->count; + =} + reaction(t) {= + if (self->received_count != self->count) { + error_print_and_exit("reaction(t) was invoked before reaction(in). Precedence order was not kept."); + } + =} +} + +reactor Looper(incr:int(1), delay:time(0 msec)) { + input in:int; + output out:int; + state count:int(0); + timer t(0, 1 sec); + + c = new Contained(incr = incr); + + reaction(t) -> out {= + info_print("Sending network output %d", self->count); + SET(out, self->count); + self->count += self->incr; + =} + reaction(in) {= + instant_t time_lag = get_physical_time() - get_logical_time(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + info_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + in->c.in; + reaction(shutdown) {= + info_print("******* Shutdown invoked."); + if (self->count != 6 * self->incr) { + error_print_and_exit("Failed to receive all six expected inputs."); + } + =} +} +federated reactor (delay:time(0)) { + left = new Looper(); + right = new Looper(incr = -1); + left.out -> right.in; + right.out -> left.in; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/LoopDistributedDecentralized.lf b/test/Python/src/federated/failing/LoopDistributedDecentralized.lf new file mode 100644 index 0000000000..0900857866 --- /dev/null +++ b/test/Python/src/federated/failing/LoopDistributedDecentralized.lf @@ -0,0 +1,75 @@ +/** + * This tests a feedback loop with physical actions and + * decentralized coordination. + * + * @author Edward A. Lee + */ + + // reason for failing: lf_comma_separated_time() not supported in the python target + +target Python { + coordination: decentralized, + threads: 3, + timeout: 5 sec +} +preamble {= + #include // Defines sleep() + bool stop = false; + // Thread to trigger an action once every second. + void* ping(void* actionref) { + while(!stop) { + info_print("Scheduling action."); + schedule(actionref, 0); + sleep(1); + } + return NULL; + } +=} + +reactor Looper(incr:int(1), delay:time(0 msec), stp_offset:time(0)) { + input in:int; + output out:int; + physical action a(stp_offset); + state count:int(0); + reaction(startup) -> a {= + // Start the thread that listens for Enter or Return. + lf_thread_t thread_id; + info_print("Starting thread."); + lf_thread_create(&thread_id, &ping, a); + =} + reaction(a) -> out {= + info_print("Setting out."); + SET(out, self->count); + self->count += self->incr; + =} + reaction(in) {= + instant_t time_lag = get_physical_time() - get_logical_time(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + info_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} STP (stp_offset) {= + instant_t time_lag = get_physical_time() - get_logical_time(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + info_print("STP offset was violated. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} deadline (10 msec) {= + instant_t time_lag = get_physical_time() - get_logical_time(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + info_print("Deadline miss. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + reaction(shutdown) {= + info_print("******* Shutdown invoked."); + // Stop the thread that is scheduling actions. + stop = true; + if (self->count != 5 * self->incr) { + error_print_and_exit("Failed to receive all five expected inputs."); + } + =} +} +federated reactor LoopDistributedDecentralized(delay:time(0)) { + left = new Looper(stp_offset = 900 usec); + right = new Looper(incr = -1, stp_offset = 2400 usec); + left.out -> right.in; + right.out -> left.in; +} diff --git a/test/Python/src/federated/failing/LoopDistributedDouble.lf b/test/Python/src/federated/failing/LoopDistributedDouble.lf new file mode 100644 index 0000000000..491be98ba8 --- /dev/null +++ b/test/Python/src/federated/failing/LoopDistributedDouble.lf @@ -0,0 +1,86 @@ +/** + * This tests a feedback loop with physical actions and + * centralized coordination. + * + * @author Edward A. Lee + */ + +// reason for failing: current tag struct not supported in the python target + +target Python { + flags: "-Wall", + coordination: centralized, + coordination-options: {advance-message-interval: 100 msec}, + threads: 2, + timeout: 5 sec +} +preamble {= + #include // Defines sleep() + bool stop = false; + // Thread to trigger an action once every second. + void* ping(void* actionref) { + while(!stop) { + info_print("Scheduling action."); + schedule(actionref, 0); + sleep(1); + } + return NULL; + } +=} + +reactor Looper(incr:int(1), delay:time(0 msec)) { + input in:int; + input in2:int; + output out:int; + output out2:int; + physical action a(delay); + state count:int(0); + timer t(0, 1 sec); + reaction(startup) -> a {= + // Start the thread that listens for Enter or Return. + lf_thread_t thread_id; + info_print("Starting thread."); + lf_thread_create(&thread_id, &ping, a); + =} + reaction(a) -> out, out2 {= + if (self->count%2 == 0) { + SET(out, self->count); + } else { + SET(out2, self->count); + } + self->count += self->incr; + =} + reaction(in) {= + info_print("Received %d at logical time (%lld, %d).", + in->value, + current_tag.time - start_time, current_tag.microstep + ); + =} + reaction(in2) {= + info_print("Received %d on in2 at logical time (%lld, %d).", + in2->value, + current_tag.time - start_time, current_tag.microstep + ); + =} + reaction(t) {= + info_print("Timer triggered at logical time (%lld, %d).", + current_tag.time - start_time, current_tag.microstep + ); + =} + reaction(shutdown) {= + info_print("******* Shutdown invoked."); + // Stop the thread that is scheduling actions. + stop = true; + if (self->count != 5 * self->incr) { + error_print_and_exit("Failed to receive all five expected inputs."); + } + =} +} +federated reactor (delay:time(0)) { + left = new Looper(); + right = new Looper(incr = -1); + left.out -> right.in; + right.out -> left.in; + right.out2 -> left.in2; + left.out2 -> right.in2; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/PhysicalSTP.lf b/test/Python/src/federated/failing/PhysicalSTP.lf new file mode 100644 index 0000000000..5b1232db50 --- /dev/null +++ b/test/Python/src/federated/failing/PhysicalSTP.lf @@ -0,0 +1,49 @@ +/** + * This is a test that detects STP violations according to the + * physical time of message arrival. + */ +target Python { + timeout: 1900 msec, + coordination: decentralized +}; + +import Count from "../lib/Count.lf"; + +reactor Print (STP_offset_param(0)) { + preamble {= + import sys + =} + input in_; + state c(1); + reaction(in_) {= + elapsed_time = get_elapsed_logical_time(); + print("At time {}, received {}".format(elapsed_time, in_.value)) + if in_.value != self.c: + self.sys.stderr.write("Expected to receive {}.\n".format(self.c)) + self.sys.exit(1) + STP_discrepency = get_logical_time() + self.STP_offset_param - in_.physical_time_of_arrival + if STP_discrepency < 0: + print("The message has violated the STP offset by {} in physical time.".format(-1 * STP_discrepency)) + self.c += 1 + else: + self.sys.stderr.write("Message arrived {} early.\n".format(STP_discrepency)) + self.sys.exit(1) + =} STP (STP_offset_param) {= + // This STP handler should never be invoked because the only source of event + // for Print is the Count reactor. + self.sys.stderr.write("Logical STP violation was detected. Only physical STP violations are possible.\n") + self.sys.exit(1) + =} + reaction(shutdown) {= + if self.c != 3: + self.sys.stderr.write("Expected to receive 2 items but got {}.\n".format(self.c)) + self.sys.exit(1) + =} +} + +federated reactor { + c = new Count(offset = 1 msec, period = 1 sec); + p = new Print(STP_offset_param = 1 usec); + + c.out -> p.in_; +} \ No newline at end of file diff --git a/test/Python/src/federated/failing/TopLevelArtifacts.lf b/test/Python/src/federated/failing/TopLevelArtifacts.lf new file mode 100644 index 0000000000..cfd1dec321 --- /dev/null +++ b/test/Python/src/federated/failing/TopLevelArtifacts.lf @@ -0,0 +1,56 @@ +/** + * Test whether top-level reactions, actions, and ports are handled appropriately. + * + * Currently, these artifacts are replicated on all federates. + * + * @note This just tests for the correctness of the code generation. These top-level + * artifacts might be disallowed in the future. + */ + + // reason for failing: strange error during compile time. lfc seeems to treat this file as C target. + + target Python { + timeout: 1 msec +}; + + import Count from "../lib/Count.lf"; + import TestCount from "../lib/TestCount.lf"; + + federated reactor { + preamble {= + import sys + =} + input in_; + output out; + state successes(0); + reaction (startup) {= + self.successes += 1; + =} + timer t(0, 1 sec); + reaction (t) -> act {= + self.successes += 1; + act.schedule(0); + =} + logical action act(0); + reaction (act) in_ -> out {= + self.successes += 1; + if in_.is_present: + self.sys.stderr.write("Input is present in the top-level reactor!\n"); + self.sys.exit(1); + out.set(1); + if out.value != 1: + self.sys.stderr.write("Ouput has unexpected value {}!\n".format(out.value)); + self.sys.exit(1); + =} + + c = new Count(); + tc = new TestCount(); + c.out -> tc.in_; + + reaction (shutdown) {= + if self->successes != 3: + self.sys.stderr.write("Failed to properly execute top-level reactions\n"); + self.sys.exit(1); + print("SUCCESS!"); + =} +} \ No newline at end of file diff --git a/test/Python/src/lib/Count.lf b/test/Python/src/lib/Count.lf index ce79ce9151..19f6608ce2 100644 --- a/test/Python/src/lib/Count.lf +++ b/test/Python/src/lib/Count.lf @@ -1,14 +1,10 @@ target Python; -reactor Count { +reactor Count(offset(0), period(1 sec)) { state count(1); output out; - timer t(0, 1 sec); + timer t(offset, period); reaction(t) -> out {= out.set(self.count) self.count += 1 =} -} - -reactor Foo { - -} +} \ No newline at end of file diff --git a/test/Python/src/lib/InternalDelay.lf b/test/Python/src/lib/InternalDelay.lf new file mode 100644 index 0000000000..77d12f612b --- /dev/null +++ b/test/Python/src/lib/InternalDelay.lf @@ -0,0 +1,14 @@ +target Python; +reactor InternalDelay ( + delay(10 msec) +) { + input in_; + output out; + logical action d; + reaction(in_) -> d {= + d.schedule(self.delay, in_.value) + =} + reaction(d) -> out {= + out.set(d.value) + =} +} \ No newline at end of file diff --git a/test/Python/src/lib/LoopedActionSender.lf b/test/Python/src/lib/LoopedActionSender.lf new file mode 100644 index 0000000000..a99b760a14 --- /dev/null +++ b/test/Python/src/lib/LoopedActionSender.lf @@ -0,0 +1,31 @@ +/** + * A sender reactor that outputs integers + * in superdense time. + * + * @author Soroush Bateni + */ +target Python; + +/** + * @param take_a_break_after: Indicates how many messages are sent + * in consecutive superdense time + * @param break_interval: Determines how long the reactor should take + * a break after sending take_a_break_after messages. + */ +reactor Sender(take_a_break_after(10), break_interval(400 msec)) { + output out; + logical action act; + state sent_messages(0); + reaction(startup, act) -> act, out {= + // Send a message on out + out.set(self.sent_messages) + self.sent_messages += 1 + if self.sent_messages < self.take_a_break_after: + act.schedule(0) + else: + // Take a break + self.sent_messages=0; + act.schedule(self.break_interval) + =} +} + \ No newline at end of file diff --git a/test/Python/src/lib/TestCount.lf b/test/Python/src/lib/TestCount.lf new file mode 100644 index 0000000000..314384cca9 --- /dev/null +++ b/test/Python/src/lib/TestCount.lf @@ -0,0 +1,32 @@ +/** + * Test that a counting sequence of inputs starts with the specified start + * parameter value, increments by the specified stride, and receives the + * specified number of inputs. + * + * @param start The starting value for the expected inputs. Default is 1. + * @param stride The increment for the inputs. Default is 1. + * @param num_inputs The number of inputs expected. Default is 1. + */ +target Python; +reactor TestCount(start(1), stride(1), num_inputs(1)) { + preamble {= + import sys + =} + state count(start); + state inputs_received(0); + input in_; + reaction(in_) {= + print("Received {}.".format(in_.value)) + if in_.value != self.count: + print("Expected {}.".format(self.count)) + self.sys.exit(1) + self.count += self.stride; + self.inputs_received += 1; + =} + reaction(shutdown) {= + print("Shutdown invoked.") + if self.inputs_received != self.num_inputs: + print("Expected to receive {} inputs, but got {}.".format(self.num_inputs, self.inputs_received)) + self.sys.exit(1) + =} +} \ No newline at end of file diff --git a/test/Python/src/lib/TestCountMultiport.lf b/test/Python/src/lib/TestCountMultiport.lf new file mode 100644 index 0000000000..d1bb8a29ba --- /dev/null +++ b/test/Python/src/lib/TestCountMultiport.lf @@ -0,0 +1,41 @@ +/** + * Test that a counting sequence of inputs starts with the specified start + * parameter value, increments by the specified stride, and receives the + * specified number of inputs. This version has a multiport input, and + * each input is expected to be present and incremented over the previous + * input. + * + * @param start The starting value for the expected inputs. Default is 1. + * @param stride The increment for the inputs. Default is 1. + * @param num_inputs The number of inputs expected on each channel. Default is 1. + */ +target Python; +reactor TestCountMultiport(start(1), stride(1), num_inputs(1), width(2)) { + preamble {= + import sys + =} + state count(start); + state inputs_received(0); + input[width] inp; + reaction(inp) {= + for i in range(in_width): + if not inp[i].is_present: + print("No input on channel {}.".format(i)) + self.sys.exit(1) + print("Received {} on channel {}.".format(inp[i].value, i)) + if inp[i].value != self.count: + print("Expected {}.".format(self.count)) + self.sys.exit(1) + self.count += self.stride + self.inputs_received += 1 + =} + reaction(shutdown) {= + print("Shutdown invoked.") + if self.inputs_received != self.num_inputs: + print("Expected to receive {} inputs, but only got {}.".format( + self.num_inputs, + self.inputs_received + )) + self.sys.exit(1) + =} +} \ No newline at end of file