Skip to content

Commit

Permalink
Feature/native dd perf (#42)
Browse files Browse the repository at this point in the history
* Added additional performance tests that use native DD API

* Disable tests by default
  • Loading branch information
samuelraeburn authored Jul 22, 2020
1 parent 3fa6d03 commit a389e3a
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions test/python/test_rticonnextdds_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
sys.path.append(os.path.dirname(os.path.realpath(__file__))+ "/../../")
import rticonnextdds_connector as rti
from test_utils import *
import ctypes

# iterations is configured by passing `--iterations <>` on the command line.
# By default, pytest captures the stdout. Supply -s to view the results.
Expand Down Expand Up @@ -103,3 +104,99 @@ def test_get_sequence(self, one_use_connector, iterations):
average_time = total_time / iterations
print("Average time to get sequence as a list: " + str(average_time))
# Note to self Average time: 0.20733366489410401

# Use the workaround of calling into the native DynamicData APIs directly.
# We do not run this test on Windows as we would need an entire Connext
# DDS Pro installation due to the symbols not being exported
@pytest.mark.xfail(sys.platform.startswith("win"), reason="symbols not exported")
def test_get_sequence_native(self, one_use_connector, iterations):
# Get the input and output which communicate using the performance test type
the_input = one_use_connector.get_input("MySubscriber::PerformanceTestReader")
the_output = one_use_connector.get_output("MyPublisher::PerformanceTestWriter")
# Wait for discovery between the entities
the_input.wait_for_publications(5000)
the_output.wait_for_subscriptions(5000)

# We need to use the following native API:
# DDS_ReturnCode_t DDS_DynamicData_set_octet_array(
# DDS_DynamicData *,
# const char *,
# DDS_DynamicDataMemberId,
# DDS_UnisgnedLong
# const DDS_Octet *)
DDS_DynamicData_set_octet_array = rti.connector_binding.library.DDS_DynamicData_set_octet_array
DDS_DynamicData_set_octet_array.restype = ctypes.c_int
DDS_DynamicData_set_octet_array.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_long, ctypes.c_ulong, ctypes.c_void_p]

# Create Python list for what we are going to set
myOctSeq = [b'f'] * 600000
# Get native handle
native_dynamic_data = the_output.instance.native
# Get length as ctype
array_length = ctypes.c_ulong(len(myOctSeq))
# Define ctype
in_array_type = ctypes.c_char * len(myOctSeq)
# Create instance of new type
in_array = in_array_type(*myOctSeq)
total_time = 0
for i in range (0, iterations):
start_time = time.time()
DDS_DynamicData_set_octet_array(
ctypes.cast(native_dynamic_data, ctypes.c_void_p),
'myOctSeq'.encode("utf8"),
0,
array_length,
ctypes.byref(in_array))
total_time += (time.time() - start_time)
average_time = total_time / iterations

print("Average time to set a sequence using native Dynamic Data APIs: " + str(average_time))


# Use the workaround of calling into the native DynamicData APIs directly.
# We do not run this test on Windows as we would need an entire Connext
# DDS Pro installation due to the symbols not being exported
@pytest.mark.xfail(sys.platform.startswith("win"), reason="symbols not exported")
def test_set_sequence_native(self, one_use_connector, iterations):
# Get the input and output which communicate using the performance test type
the_input = one_use_connector.get_input("MySubscriber::PerformanceTestReader")
the_output = one_use_connector.get_output("MyPublisher::PerformanceTestWriter")
# Wait for discovery between the entities
the_input.wait_for_publications(5000)
the_output.wait_for_subscriptions(5000)
# Set the sample on the writer (the performance of this operation is tested
# in the test_set_sequence test)
the_output.instance['myOctSeq[599999]'] = 2
# Write the sample and receive it on the input
sample = send_data(the_output, the_input)

# We need to use the following native API:
# DDS_ReturnCode_t DDS_DynamicData_get_octet_array(
# DDS_DynamicData * self,
# DDS_Octet *array,
# DDS_UnsignedLong *length,
# const char *name,
# DDS_DynamicDataMemberId member_id)
DDS_DynamicData_get_octet_array = rti.connector_binding.library.DDS_DynamicData_get_octet_array
DDS_DynamicData_get_octet_array.restype = ctypes.c_int
DDS_DynamicData_get_octet_array.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_ulong]

# First creates an unsigned long for the sequence length
array_length = ctypes.c_ulong(600000)
# Now define a type that we will use to store the result
in_array_type = ctypes.c_char * 600000
# Create an instance of that type
in_array = in_array_type()
# Call the Native API
total_time = 0
for i in range (0, iterations):
start_time = time.time()
DDS_DynamicData_get_octet_array(
ctypes.cast(sample.native, ctypes.c_void_p),
ctypes.byref(in_array),
ctypes.byref(array_length),
'myOctSeq'.encode("utf8"),
0)
total_time += (time.time() - start_time)
average_time = total_time / iterations
print("Average time to obtain a sequence using native Dynamic Data APIs: " + str(average_time))

0 comments on commit a389e3a

Please sign in to comment.