diff --git a/.gitignore b/.gitignore index de5f673..4897f53 100644 --- a/.gitignore +++ b/.gitignore @@ -48,9 +48,16 @@ GeneratedCode # Generated files srcCxx/shape.* +!srcCxx/shape.idl srcCxx/shapePlugin.* srcCxx/shapeSupport.* +# Generated files Connext Micro +srcCxx/shape_bounded.* +!srcCxx/shape_bounded.idl +srcCxx/shape_boundedPlugin.* +srcCxx/shape_boundedSupport.* + # VSCode default folders .vscode/ build/ diff --git a/generate_xlsx_report.py b/generate_xlsx_report.py index 9a4351e..af54edc 100644 --- a/generate_xlsx_report.py +++ b/generate_xlsx_report.py @@ -19,6 +19,22 @@ import datetime from rtps_test_utilities import log_message import test_suite +from enum import Enum + +class TestStatus(Enum): + """ + Enumeration of the test status. + PASSED: The test has passed + FAILED: The test has failed + PUB_UNSUPPORTED: The test is unsupported for the Publisher + SUB_UNSUPPORTED: The test is unsupported for the Subscriber + PUB_SUB_UNSUPPORTED: The test is unsupported for both Publisher and Subscriber + """ + PASSED = 1 + FAILED = 2 + PUB_UNSUPPORTED = 3 + SUB_UNSUPPORTED = 4 + PUB_SUB_UNSUPPORTED = 5 class XlxsReportArgumentParser: """Class that parse the arguments of the application.""" @@ -70,6 +86,8 @@ def get_company_name(product:str) -> str: def get_product_name(product:str) -> str: """Returns a beautified product name and version""" # set the beautified name and version + if 'connext' in product.lower() and 'micro' in product.lower(): + return 'Connext DDS Micro ' + re.search(r'([\d.]+)', product).group(1) if 'connext' in product.lower(): return 'Connext DDS ' + re.search(r'([\d.]+)', product).group(1) elif 'opendds' in product.lower(): @@ -88,14 +106,15 @@ def get_product_name(product:str) -> str: class JunitAggregatedData: """ - Class that contains the JUnit aggregated data as a tuple of 2 integers - [tests_passed, total_tests]. This identifies one cell in the summary - table that shows the product and the amount of tests passed and total. + Class that contains the JUnit aggregated data as a tuple of 3 integers + [tests_passed, total_tests, tests_unsupported]. This identifies one cell in + the summary table that shows the product and the amount of tests passed, + total and unsupported. """ - data: tuple[int,int] # [tests_passed, total_tests] + data: tuple[int,int, int] # [tests_passed, total_tests, tests_unsupported] - def __init__(self, passed_tests: int, total_tests: int) -> None: - self.data = [passed_tests, total_tests] + def __init__(self, passed_tests: int, total_tests: int, unsupported_tests: int) -> None: + self.data = [passed_tests, total_tests, unsupported_tests] def get_passed_tests(self): return self.data[0] @@ -103,8 +122,14 @@ def get_passed_tests(self): def get_total_tests(self): return self.data[1] + def get_unsupported_tests(self): + return self.data[2] + + def get_supported_tests(self): + return self.data[1] - self.data[2] + def __str__(self) -> str: - return f'({self.data[0]}, {self.data[1]})' + return f'({self.data[0]}, {self.data[1]}, {self.data[2]})' class JunitTestCaseAggregatedData: """ @@ -113,14 +138,13 @@ class JunitTestCaseAggregatedData: Publisher or Subscriber) and with all other products (as Subscribers or Publishers, the opposite). This tuple is composed by 2 strings that identifies the other product - (Publisher or Subscriber), the test name and whether the test was - successful or not. + (Publisher or Subscriber), the test name and the status of the test. """ - # [publisher or subscriber name, test_name, passed_tests] - data: tuple[str,str,bool] = None + # [publisher or subscriber name, test_name, status] + data: tuple[str,str,TestStatus] = None - def __init__(self, product: str, test_name: str, passed: bool) -> None: - self.data = (product, test_name, passed) + def __init__(self, product: str, test_name: str, status: TestStatus) -> None: + self.data = (product, test_name, status) def get_product_name(self): return self.data[0] @@ -128,7 +152,7 @@ def get_product_name(self): def get_test_name(self): return self.data[1] - def get_passed(self): + def get_status(self): return self.data[2] def __str__(self) -> str: @@ -166,6 +190,7 @@ def __init__(self, input: pathlib.Path): @staticmethod def xml_parser(file): """Function to parse the XML file""" + parser = lxml.etree.XMLParser(huge_tree=True) return lxml.etree.parse(file, parser) @@ -182,6 +207,7 @@ def update_value_aggregated_data_dict(self, updated_data = JunitAggregatedData( dictionary[key].get_passed_tests() + value.get_passed_tests(), dictionary[key].get_total_tests() + value.get_total_tests(), + dictionary[key].get_unsupported_tests() + value.get_unsupported_tests() ) dictionary[key] = updated_data else: @@ -218,47 +244,47 @@ def get_info(self, input: pathlib.Path = None): publisher_name = ProductUtils.get_product_name(product_names.group(1)) subscriber_name = ProductUtils.get_product_name(product_names.group(2)) - # get the value of the passed_tests and total_tests as a - # JunitAggregatedData - element = JunitAggregatedData( - suite.tests - suite.failures - suite.skipped - suite.errors, - suite.tests - ) - - # update the information of the product in the summary_dict with - # the information of the publisher and the subscriber - self.update_value_aggregated_data_dict( - self.summary_dict, publisher_name, element) - # do not add duplicated data if the publisher and subscriber names - # are the same - if publisher_name != subscriber_name: - self.update_value_aggregated_data_dict( - self.summary_dict, subscriber_name, element) - - # Get table with the summary of the test passed/total_tests for - # every product as publisher and as subscriber - product_dict_key = (publisher_name, subscriber_name) - product_test_data = JunitAggregatedData( - suite.tests - suite.failures - suite.skipped - suite.errors, - suite.tests) - self.update_value_aggregated_data_dict( - self.product_summary_dict, - product_dict_key, - product_test_data) - # for each test case in the test suite, fill out the dictionaries # that contains information about the product as publisher and # subscriber + unsupported_tests_count = 0 for case in list(iter(suite)): + is_pub_unsupported = False + is_sub_unsupported = False + status = None test_name = re.search(r'((?:Test_)[\S]+_\d+)', case.name).group(1) + # count number of unsupported tests for the summary + # result array is not empty and the message contains 'UNSUPPORTED_FEATURE' + if case.result and len(case.result) > 0: + if 'PUB_UNSUPPORTED_FEATURE' in case.result[0].message.upper(): + is_pub_unsupported = True + if 'SUB_UNSUPPORTED_FEATURE' in case.result[0].message.upper(): + is_sub_unsupported = True + + if is_pub_unsupported or is_sub_unsupported: + unsupported_tests_count += 1 + + # Get test status + if case.is_passed: + status = TestStatus.PASSED + elif is_pub_unsupported and is_sub_unsupported: + status = TestStatus.PUB_SUB_UNSUPPORTED + elif is_pub_unsupported: + status = TestStatus.PUB_UNSUPPORTED + elif is_sub_unsupported: + status = TestStatus.SUB_UNSUPPORTED + else: + status = TestStatus.FAILED + + # update the value of the publisher_name as publisher with # all products as subscribers. - # the tuple is (subscriber_name, test_name, is_passed) + # the tuple is (subscriber_name, test_name, status) publisher_test_result = JunitTestCaseAggregatedData( product=subscriber_name, test_name=test_name, - passed=case.is_passed + status=status ) # add the resulting tuple to the publisher dictionary, the key @@ -272,11 +298,11 @@ def get_info(self, input: pathlib.Path = None): # update the value of the subscriber_name as subscriber with # all products as publishers. - # the tuple is (publisher_name, test_name, is_passed) + # the tuple is (publisher_name, test_name, status) subscriber_test_result = JunitTestCaseAggregatedData( product=publisher_name, test_name=test_name, - passed=case.is_passed + status=status ) # add the resulting tuple to the subscriber dictionary, the key @@ -288,6 +314,37 @@ def get_info(self, input: pathlib.Path = None): product_dict=self.subscriber_product_dict ) + # get the value of the passed_tests, total_tests and + # unsupported_tests as a JunitAggregatedData + element = JunitAggregatedData( + suite.tests - suite.failures - suite.skipped - suite.errors, + suite.tests, + unsupported_tests_count + ) + + # update the information of the product in the summary_dict with + # the information of the publisher and the subscriber + self.update_value_aggregated_data_dict( + self.summary_dict, publisher_name, element) + # do not add duplicated data if the publisher and subscriber names + # are the same + if publisher_name != subscriber_name: + self.update_value_aggregated_data_dict( + self.summary_dict, subscriber_name, element) + + # Get table with the summary of the test + # passed/total_tests/unsupported_tests for every product as + # publisher and as subscriber + product_dict_key = (publisher_name, subscriber_name) + product_test_data = JunitAggregatedData( + suite.tests - suite.failures - suite.skipped - suite.errors, + suite.tests, + unsupported_tests_count) + self.update_value_aggregated_data_dict( + self.product_summary_dict, + product_dict_key, + product_test_data) + class ColorUtils: """Set specific colors""" GREEN = '#4EB168' @@ -422,7 +479,13 @@ def get_format_color(self, index: int, num_elements: int): Return the corresponding color format depending on the ratio of passed_tests/total_tests """ + # this might only happen for supported tests when the total supported + # scenarios is 0 + if num_elements == 0: + return self.__formats['result_red'] + ratio = index / num_elements + if ratio < 0.25: return self.__formats['result_red'] elif ratio < 0.5: @@ -434,17 +497,20 @@ def get_format_color(self, index: int, num_elements: int): else: # ratio == 1 return self.__formats['result_green'] - def get_format_color_bool(self, passed: bool): + def get_format_color_test_status(self, status: TestStatus): """ - Get the corresponding color format depending on 'passed'. - Green if passed is True, Red otherwise + Get the corresponding color format depending on 'status'. + Green if status is PASSED, Red if FAILED, Yellow if UNSUPPORTED """ - if passed: + if status == TestStatus.PASSED: # Return GREEN - return self.get_format_color(1,1) + return self.__formats['result_green'] + elif status == TestStatus.FAILED: + # Return RED + return self.__formats['result_red'] else: - # Return FALSE - return self.get_format_color(0,1) + # Return YELLOW + return self.__formats['result_yellow'] def add_static_data_test(self, worksheet: xlsxwriter.Workbook.worksheet_class, @@ -604,7 +670,7 @@ def add_product_table(self, 'Test', self.__formats['bold_w_border']) - # This column dictionary will keep the colum for the subscriber product + # This column dictionary will keep the column for the subscriber product column_dict = {} row_dict = {} # for all elements (test results), add the corresponding value to the @@ -645,13 +711,26 @@ def add_product_table(self, element.get_test_name(), self.__formats['bold_w_border']) - # set OK or ERROR if the test passed or not - str_result = 'OK' if element.get_passed() else 'ERROR' + # get status string of the test result + if element.get_status() == TestStatus.PASSED: + str_result = 'OK' + elif element.get_status() == TestStatus.FAILED: + str_result = 'ERROR' + elif element.get_status() == TestStatus.PUB_UNSUPPORTED: + str_result = 'PUB UNSUPPORTED' + elif element.get_status() == TestStatus.SUB_UNSUPPORTED: + str_result = 'SUB UNSUPPORTED' + elif element.get_status() == TestStatus.PUB_SUB_UNSUPPORTED: + str_result = 'PUB/SUB UNSUPPORTED' + else: + str_result = 'UNKNOWN' + + # write status string to the test result worksheet.write( process_row, process_column, str_result, - self.get_format_color_bool(element.get_passed())) + self.get_format_color_test_status(element.get_status())) return (current_row, current_column) def add_data_summary_worksheet(self, @@ -673,33 +752,60 @@ def add_data_summary_worksheet(self, 'Product', self.__formats['bold_w_border']) worksheet.write( current_row, current_column + 2, - 'Test Passed', self.__formats['bold_w_border']) + 'Tests Passed', self.__formats['bold_w_border']) + worksheet.write( + current_row, current_column + 3, + 'Supported Tests', self.__formats['bold_w_border']) + worksheet.write( + current_row, current_column + 4, + 'Supported Tests Passed', self.__formats['bold_w_border']) current_row += 1 # Create table with the total passed_tests/total_tests per product for product_name, value in self.__data.summary_dict.items(): + # company name worksheet.write( current_row, current_column, ProductUtils.get_company_name(product_name), self.__formats['bold_w_border']) + # product name worksheet.write( current_row, current_column + 1, product_name, self.__formats['bold_w_border']) + # test passed worksheet.write( current_row, current_column + 2, str(value.get_passed_tests()) + ' / ' + - str(value.get_total_tests()), + str(value.get_total_tests()), self.get_format_color(value.get_passed_tests(), value.get_total_tests())) + # supported tests + worksheet.write( + current_row, current_column + 3, + str(value.get_supported_tests()) + ' / ' + + str(value.get_total_tests()), + self.__formats['result_yellow'] if value.get_unsupported_tests() > 0 + else self.__formats['result_green']) + # supported tests passed + worksheet.write( + current_row, current_column + 4, + str(value.get_passed_tests()) + ' / ' + + str(value.get_supported_tests()), + self.get_format_color(value.get_passed_tests(), + value.get_supported_tests())) current_row += 1 # Add 2 rows of gap for the next table current_row += 2 worksheet.write( current_row, current_column, - 'Publisher/Subscriber', self.__formats['bold_w_border']) + 'Test Result: passed / supported / total', self.__formats['bold_w_border']) + current_row += 1 + worksheet.write( + current_row, current_column, + 'Publisher (row)/Subscriber (column)', self.__formats['bold_w_border']) # create a dictionary to store the row/column of the product name # for example, row_dict['Connext DDS 6.1.2'] = 30 means that the @@ -743,9 +849,10 @@ def add_data_summary_worksheet(self, process_column = column_dict[subscriber_name] worksheet.write(process_row, process_column, - str(value.get_passed_tests()) + ' / ' + - str(value.get_total_tests()), - self.get_format_color(value.get_passed_tests(), value.get_total_tests())) + str(value.get_passed_tests()) + ' / ' + + str(value.get_supported_tests()) + ' / ' + + str(value.get_total_tests()), + self.get_format_color(value.get_passed_tests(), value.get_supported_tests())) def add_static_data_summary_worksheet(self, worksheet: xlsxwriter.Workbook.worksheet_class, diff --git a/interoperability_report.py b/interoperability_report.py index 51c2eaa..9c1bfed 100644 --- a/interoperability_report.py +++ b/interoperability_report.py @@ -24,7 +24,7 @@ if __name__ == "__main__" and platform.system() == "Darwin": multiprocessing.set_start_method('fork') -from rtps_test_utilities import ReturnCode, log_message, no_check, remove_ansi_colors +from rtps_test_utilities import ReturnCode, log_message, basic_check, remove_ansi_colors # This parameter is used to save the samples the Publisher sends. # MAX_SAMPLES_SAVED is the maximum number of samples saved. @@ -51,16 +51,19 @@ def stop_process(child_process, timeout=30, poll_interval=0.2): else: return True # Process already exited - start_time = time.time() + return_value = True + start_time = time.time() while child_process.isalive() and (time.time() - start_time < timeout): time.sleep(poll_interval) if child_process.isalive(): child_process.terminate(force=True) - return False # Process was forcefully terminated + return_value = False # Process was forcefully terminated + + child_process.expect(pexpect.EOF, timeout=5) - return True + return return_value def run_subscriber_shape_main( name_executable: str, @@ -138,14 +141,17 @@ def run_subscriber_shape_main( index = child_sub.expect( [ 'Create topic:', # index = 0 - pexpect.TIMEOUT, # index = 1 - pexpect.EOF # index = 2 + re.compile('not supported', re.IGNORECASE), # index = 1 + pexpect.TIMEOUT, # index = 2 + pexpect.EOF # index = 3 ], timeout ) - if index == 1 or index == 2: + if index == 2 or index == 3: produced_code[produced_code_index] = ReturnCode.TOPIC_NOT_CREATED + elif index == 1: + produced_code[produced_code_index] = ReturnCode.SUB_UNSUPPORTED_FEATURE elif index == 0: # Step 3: Check if the reader is created log_message(f'Subscriber {subscriber_index}: Waiting for DataReader ' @@ -153,25 +159,33 @@ def run_subscriber_shape_main( index = child_sub.expect( [ 'Create reader for topic:', # index = 0 - pexpect.TIMEOUT, # index = 1 - 'failed to create content filtered topic' # index = 2 + 'failed to create content filtered topic', # index = 1 + re.compile('not supported', re.IGNORECASE), # index = 2 + pexpect.TIMEOUT, # index = 3 + pexpect.EOF # index = 4 + ], timeout ) - if index == 1: + if index == 3 or index == 4: produced_code[produced_code_index] = ReturnCode.READER_NOT_CREATED - elif index == 2: + elif index == 1: produced_code[produced_code_index] = ReturnCode.FILTER_NOT_CREATED + elif index == 2: + produced_code[produced_code_index] = ReturnCode.SUB_UNSUPPORTED_FEATURE elif index == 0: # Step 4: Read data or incompatible qos or deadline missed log_message(f'Subscriber {subscriber_index}: Waiting for data', verbosity) index = child_sub.expect( [ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 'on_requested_incompatible_qos()', # index = 1 'on_requested_deadline_missed()', # index = 2 - pexpect.TIMEOUT, # index = 3 + re.compile('not supported', re.IGNORECASE), # index = 3 + pexpect.TIMEOUT, # index = 4 + pexpect.EOF # index = 5 + ], timeout ) @@ -180,8 +194,10 @@ def run_subscriber_shape_main( produced_code[produced_code_index] = ReturnCode.INCOMPATIBLE_QOS elif index == 2: produced_code[produced_code_index] = ReturnCode.DEADLINE_MISSED - elif index == 3: + elif index == 4 or index == 5: produced_code[produced_code_index] = ReturnCode.DATA_NOT_RECEIVED + elif index == 3: + produced_code[produced_code_index] = ReturnCode.SUB_UNSUPPORTED_FEATURE elif index == 0: # Step 5: Receiving samples log_message(f'Subscriber {subscriber_index}: Receiving samples', @@ -277,14 +293,17 @@ def run_publisher_shape_main( index = child_pub.expect( [ 'Create topic:', # index == 0 - pexpect.TIMEOUT, # index == 1 - pexpect.EOF # index == 2 + re.compile('not supported', re.IGNORECASE), # index = 1 + pexpect.TIMEOUT, # index == 2 + pexpect.EOF # index == 3 ], timeout ) - if index == 1 or index == 2: + if index == 2 or index == 3: produced_code[produced_code_index] = ReturnCode.TOPIC_NOT_CREATED + elif index == 1: + produced_code[produced_code_index] = ReturnCode.PUB_UNSUPPORTED_FEATURE elif index == 0: # Step 3: Check if the writer is created log_message(f'Publisher {publisher_index}: Waiting for DataWriter ' @@ -292,12 +311,16 @@ def run_publisher_shape_main( index = child_pub.expect( [ 'Create writer for topic', # index = 0 - pexpect.TIMEOUT # index = 1 + re.compile('not supported', re.IGNORECASE), # index = 1 + pexpect.TIMEOUT, # index = 2 + pexpect.EOF # index == 3 ], timeout ) - if index == 1: + if index == 2 or index == 3: produced_code[produced_code_index] = ReturnCode.WRITER_NOT_CREATED + elif index == 1: + produced_code[produced_code_index] = ReturnCode.PUB_UNSUPPORTED_FEATURE elif index == 0: # Step 4: Check if the writer matches the reader log_message(f'Publisher {publisher_index}: Waiting for matching ' @@ -305,15 +328,19 @@ def run_publisher_shape_main( index = child_pub.expect( [ 'on_publication_matched()', # index = 0 - pexpect.TIMEOUT, # index = 1 - 'on_offered_incompatible_qos' # index = 2 + 'on_offered_incompatible_qos', # index = 1 + re.compile('not supported', re.IGNORECASE), # index = 2 + pexpect.TIMEOUT, # index = 3 + pexpect.EOF # index == 4 ], timeout ) - if index == 1: + if index == 3 or index == 4: produced_code[produced_code_index] = ReturnCode.READER_NOT_MATCHED - elif index == 2: + elif index == 1: produced_code[produced_code_index] = ReturnCode.INCOMPATIBLE_QOS + elif index == 2: + produced_code[produced_code_index] = ReturnCode.PUB_UNSUPPORTED_FEATURE elif index == 0: # In the case that the option -w is selected, the Publisher # saves the samples sent in order, so the Subscriber can check @@ -324,15 +351,19 @@ def run_publisher_shape_main( if '-w ' in parameters or parameters.endswith('-w'): # Step 5: Check whether the writer sends the samples index = child_pub.expect([ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 'on_offered_deadline_missed()', # index = 1 - pexpect.TIMEOUT # index = 2 + re.compile('not supported', re.IGNORECASE), # index = 2 + pexpect.TIMEOUT, # index = 3 + pexpect.EOF # index == 4 ], timeout) if index == 1: produced_code[produced_code_index] = ReturnCode.DEADLINE_MISSED - elif index == 2: + elif index == 3 or index == 4: produced_code[produced_code_index] = ReturnCode.DATA_NOT_SENT + elif index == 2: + produced_code[produced_code_index] = ReturnCode.PUB_UNSUPPORTED_FEATURE elif index == 0: produced_code[produced_code_index] = ReturnCode.OK log_message(f'Publisher {publisher_index}: Sending ' @@ -341,20 +372,24 @@ def run_publisher_shape_main( for x in range(0, MAX_SAMPLES_SAVED, 1): # At this point, at least one sample has been printed # Therefore, that sample is added to samples_sent. - pub_string = re.search('[0-9]+ [0-9]+ \[[0-9]+\]', + pub_string = re.search(r'[0-9]+ [0-9]+ \[[0-9]+\]', child_pub.before + child_pub.after) last_sample = pub_string.group(0) samples_sent.put(last_sample) index = child_pub.expect([ - '\[[0-9]+\]', # index = 0 + r'\[[0-9]+\]', # index = 0 'on_offered_deadline_missed()', # index = 1 - pexpect.TIMEOUT # index = 2 + re.compile('not supported', re.IGNORECASE), # index = 2 + pexpect.TIMEOUT # index = 3 ], timeout) if index == 1: produced_code[produced_code_index] = ReturnCode.DEADLINE_MISSED break elif index == 2: + produced_code[produced_code_index] = ReturnCode.PUB_UNSUPPORTED_FEATURE + break + elif index == 3: produced_code[produced_code_index] = ReturnCode.DATA_NOT_SENT break last_sample_saved.put(last_sample) @@ -811,7 +846,7 @@ def main(): raise RuntimeError('Cannot process function of ' f'test case: {test_case_name}') else: - check_function = no_check + check_function = basic_check assert(len(parameters) == len(expected_codes)) diff --git a/rtps_test_utilities.py b/rtps_test_utilities.py index 7e9068e..4b2e746 100644 --- a/rtps_test_utilities.py +++ b/rtps_test_utilities.py @@ -28,6 +28,8 @@ class ReturnCode(Enum): DEADLINE_MISSED : Publisher/Subscriber missed the deadline period ORDERED_ACCESS_INSTANCE : Subscriber reading with ordered access and access scope INSTANCE ORDERED_ACCESS_TOPIC : Subscriber reading with ordered access and access scope TOPIC + PUB_UNSUPPORTED_FEATURE : The test requires a feature not supported by the publisher implementation + SUB_UNSUPPORTED_FEATURE : The test requires a feature not supported by the subscriber implementation """ OK = 0 TOPIC_NOT_CREATED = 1 @@ -44,6 +46,8 @@ class ReturnCode(Enum): DEADLINE_MISSED = 14 ORDERED_ACCESS_INSTANCE = 15 ORDERED_ACCESS_TOPIC = 16 + PUB_UNSUPPORTED_FEATURE = 17 + SUB_UNSUPPORTED_FEATURE = 18 def log_message(message, verbosity): if verbosity: @@ -56,3 +60,18 @@ def remove_ansi_colors(text): def no_check(child_sub, samples_sent, last_sample_saved, timeout): return ReturnCode.OK + +def basic_check(child_sub, samples_sent, last_sample_saved, timeout): + """ Only checks that the data is well formed and size is not zero.""" + sub_string = re.search('\w\s+\w+\s+[0-9]+ [0-9]+ \[([0-9]+)\]', + child_sub.before + child_sub.after) + + if sub_string is None: + return ReturnCode.DATA_NOT_RECEIVED + + sample_size = int(sub_string.group(1)) + + if sample_size == 0: + return ReturnCode.DATA_NOT_CORRECT + + return ReturnCode.OK diff --git a/run_tests.sh b/run_tests.sh index be90102..e12659d 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -81,7 +81,7 @@ for i in $publisher; do subscriber_name=$(basename "$j" _shape_main_linux) echo "Testing Publisher $publisher_name --- Subscriber $subscriber_name" extra_args="" - if [[ "${subscriber,,}" == *opendds* && "${publisher,,}" == *connext* ]]; then + if [[ "${subscriber_name,,}" == *opendds* && "${publisher_name,,}" == *connext_dds* ]]; then extra_args="--periodic-announcement 5000" fi; if [[ -n $output ]]; then diff --git a/srcCxx/makefile_rti_connext_micro_linux b/srcCxx/makefile_rti_connext_micro_linux new file mode 100644 index 0000000..1475c32 --- /dev/null +++ b/srcCxx/makefile_rti_connext_micro_linux @@ -0,0 +1,89 @@ +###################################################################### +# To compile, type: +# make -f makefile_rti_connext_micro_linux +# To compile with the Debug option, use: +# make -f makefile_rti_connext_micro_linux DEBUG=1 +# +# This makefile assumes that your build environment is already correctly +# configured. (For example, the correct version of your compiler and +# linker should be on your PATH.) +# +# You should set the environemnt variable RTIMEHOME to point to where +# RTI Connext Micro is installed. +# +###################################################################### + +# If undefined in the environment default RTIMEHOME to install dir +ifndef RTIMEHOME +$(error RTIMEHOME not defined) +endif + +COMPILER_FLAGS = -m64 +LINKER_FLAGS = -m64 -static-libgcc + +split_path_name = $(subst /rti_, , $(RTIMEHOME)) +# from connext_dds_micro-x.y.z remove _dds to get connext_micro-x.y.z +product_name = $(notdir $(split_path_name)) +product_name := $(subst _dds,,$(product_name)) + +version_name = $(lastword $(product_name)) +common_name = "_shape_main_linux" +executable_name = $(version_name)$(common_name) + +RTIMEARCH = x64Linux4gcc7.3.0 + +ifndef COMPILER +COMPILER = g++ +endif + +ifndef LINKER +LINKER = g++ +endif + +SYSLIBS = -ldl -lnsl -lm -lpthread -lrt + +ifeq ($(DEBUG),1) +COMPILER_FLAGS += -g -O0 +LINKER_FLAGS += -g +LIBS = -L$(RTIMEHOME)/lib/$(RTIMEARCH) \ + -lrti_me_cppzd -lrti_me_netiosdmzd \ + -lrti_me_discdpdezd -lrti_me_ddsfilterzd -lrti_me_rhsmzd \ + -lrti_me_whsmzd -lrti_mezd -lrti_me_ddsxtypeszd $(SYSLIBS) +else +# This option strips the executable symbols +LINKER_FLAGS += -s +LIBS = -L$(RTIMEHOME)/lib/$(RTIMEARCH) \ + -lrti_me_cppz -lrti_me_netiosdmz \ + -lrti_me_discdpdez -lrti_me_ddsfilterz -lrti_me_rhsmz \ + -lrti_me_whsmz -lrti_mez -lrti_me_ddsxtypesz $(SYSLIBS) +endif + +DEFINES = -DRTI_UNIX -DRTI_LINUX -DRTI_CONNEXT_MICRO + +INCLUDES = -I. -I$(RTIMEHOME)/include -I$(RTIMEHOME)/include/rti_me + +OBJDIR := objs/$(RTIMEARCH)_micro + +CDRSOURCES := shape_bounded.idl +AUTOGENSOURCES := shape_boundedSupport.cxx shape_boundedPlugin.cxx shape_bounded.cxx + +EXEC := $(executable_name) +AUTOGENOBJS := $(addprefix $(OBJDIR)/, $(AUTOGENSOURCES:%.cxx=%.o)) + +$(OBJDIR)/$(EXEC) : $(AUTOGENSOURCES) $(AUTOGENOBJS) $(OBJDIR)/shape_main.o + $(LINKER) $(LINKER_FLAGS) -o $@ $(OBJDIR)/shape_main.o $(AUTOGENOBJS) $(LIBS) + +$(OBJDIR)/%.o : %.cxx + $(COMPILER) $(COMPILER_FLAGS) -o $@ $(DEFINES) $(INCLUDES) -c $< + +shape_main.cxx : shape_configurator_rti_connext_micro.h + +# Generate type-specific sources +$(AUTOGENSOURCES) : $(CDRSOURCES) + $(RTIMEHOME)/rtiddsgen/scripts/rtiddsgen $(CDRSOURCES) -replace -micro -language C++ + +$(AUTOGENOBJS): | objs/$(RTIMEARCH)_micro + +objs/$(RTIMEARCH)_micro: + echo "Making directory objs/$(RTIMEARCH)_micro"; + mkdir -p objs/$(RTIMEARCH)_micro diff --git a/srcCxx/makefile_rti_connext_micro_macos b/srcCxx/makefile_rti_connext_micro_macos new file mode 100644 index 0000000..25ca9b5 --- /dev/null +++ b/srcCxx/makefile_rti_connext_micro_macos @@ -0,0 +1,89 @@ +###################################################################### +# To compile, type: +# make -f makefile_rti_connext_micro_linux +# To compile with the Debug option, use: +# make -f makefile_rti_connext_micro_linux DEBUG=1 +# +# This makefile assumes that your build environment is already correctly +# configured. (For example, the correct version of your compiler and +# linker should be on your PATH.) +# +# You should set the environemnt variable RTIMEHOME to point to where +# RTI Connext Micro is installed. +# +###################################################################### + +# If undefined in the environment default RTIMEHOME to install dir +ifndef RTIMEHOME +$(error RTIMEHOME not defined) +endif + +COMPILER_FLAGS = -std=c++11 +LINKER_FLAGS = + +split_path_name = $(subst /rti_, , $(RTIMEHOME)) +# from connext_dds_micro-x.y.z remove _dds to get connext_micro-x.y.z +product_name = $(notdir $(split_path_name)) +product_name := $(subst _dds,,$(product_name)) + +version_name = $(lastword $(product_name)) +common_name = "_shape_main_macos" +executable_name = $(version_name)$(common_name) + +RTIMEARCH = arm64Darwin23clang15.0 + +ifndef COMPILER +COMPILER = clang++ +endif + +ifndef LINKER +LINKER = clang++ +endif + +#SYSLIBS = + +ifeq ($(DEBUG),1) +COMPILER_FLAGS += -g -O0 +LINKER_FLAGS += -g +LIBS = -L$(RTIMEHOME)/lib/$(RTIMEARCH) \ + -lrti_me_cppzd -lrti_me_netiosdmzd \ + -lrti_me_discdpdezd -lrti_me_ddsfilterzd -lrti_me_rhsmzd \ + -lrti_me_whsmzd -lrti_mezd -lrti_me_ddsxtypeszd $(SYSLIBS) +else +# This option strips the executable symbols +#LINKER_FLAGS += -s +LIBS = -L$(RTIMEHOME)/lib/$(RTIMEARCH) \ + -lrti_me_cppz -lrti_me_netiosdmz \ + -lrti_me_discdpdez -lrti_me_ddsfilterz -lrti_me_rhsmz \ + -lrti_me_whsmz -lrti_mez -lrti_me_ddsxtypesz $(SYSLIBS) +endif + +DEFINES = -DRTI_UNIX -DRTI_DARWIN -DRTI_CONNEXT_MICRO + +INCLUDES = -I. -I$(RTIMEHOME)/include -I$(RTIMEHOME)/include/rti_me + +OBJDIR := objs/$(RTIMEARCH)_micro + +CDRSOURCES := shape_bounded.idl +AUTOGENSOURCES := shape_boundedSupport.cxx shape_boundedPlugin.cxx shape_bounded.cxx + +EXEC := $(executable_name) +AUTOGENOBJS := $(addprefix $(OBJDIR)/, $(AUTOGENSOURCES:%.cxx=%.o)) + +$(OBJDIR)/$(EXEC) : $(AUTOGENSOURCES) $(AUTOGENOBJS) $(OBJDIR)/shape_main.o + $(LINKER) $(LINKER_FLAGS) -o $@ $(OBJDIR)/shape_main.o $(AUTOGENOBJS) $(LIBS) + +$(OBJDIR)/%.o : %.cxx + $(COMPILER) $(COMPILER_FLAGS) -o $@ $(DEFINES) $(INCLUDES) -c $< + +shape_main.cxx : shape_configurator_rti_connext_micro.h + +# Generate type-specific sources +$(AUTOGENSOURCES) : $(CDRSOURCES) + $(RTIMEHOME)/rtiddsgen/scripts/rtiddsgen $(CDRSOURCES) -replace -micro -language C++ + +$(AUTOGENOBJS): | objs/$(RTIMEARCH)_micro + +objs/$(RTIMEARCH)_micro: + echo "Making directory objs/$(RTIMEARCH)_micro"; + mkdir -p objs/$(RTIMEARCH)_micro diff --git a/srcCxx/shape_bounded.idl b/srcCxx/shape_bounded.idl new file mode 100644 index 0000000..3ffde7e --- /dev/null +++ b/srcCxx/shape_bounded.idl @@ -0,0 +1,9 @@ +@appendable +struct ShapeType { + @key + string<128> color; + long x; + long y; + long shapesize; + sequence additional_payload_size; +}; diff --git a/srcCxx/shape_configurator_rti_connext_dds.h b/srcCxx/shape_configurator_rti_connext_dds.h index 590cbd0..e2a905f 100644 --- a/srcCxx/shape_configurator_rti_connext_dds.h +++ b/srcCxx/shape_configurator_rti_connext_dds.h @@ -1,3 +1,5 @@ +#include + #include "shape.h" #include "shapeSupport.h" #include "ndds/ndds_namespace_cpp.h" @@ -15,14 +17,43 @@ const char *get_qos_policy_name(DDS_QosPolicyId_t policy_id) return DDS_QosPolicyId_to_string(policy_id); // not standard... } +bool configure_datafrag_size( + DDS::DomainParticipantQos &dp_qos, + size_t datafrag_size) { + bool ok = false; + if (datafrag_size == 0) { + ok = false; + } else { + DDS_PropertyQosPolicyHelper_add_property( + &dp_qos.property, + "dds.transport.UDPv4.builtin.parent.message_size_max", + std::to_string(datafrag_size).c_str(), + DDS_BOOLEAN_FALSE); + ok = true; + } + return ok; +} + void configure_participant_announcements_period( DDS::DomainParticipantQos &dp_qos, useconds_t announcement_period_us) { if (announcement_period_us == 0) { return; } + dp_qos.discovery_config.participant_liveliness_assert_period.sec = announcement_period_us / 1000000; dp_qos.discovery_config.participant_liveliness_assert_period.nanosec = (announcement_period_us % 1000000) * 1000; } + +void configure_large_data(DDS::DataWriterQos &dw_qos) { + if (DDS::PropertyQosPolicyHelper::assert_property( + dw_qos.property, + "dds.data_writer.history.memory_manager.fast_pool.pool_buffer_max_size", + "65536", + DDS_BOOLEAN_FALSE) != DDS_RETCODE_OK) { + printf("failed to set property pool_buffer_max_size\n"); + } + dw_qos.publish_mode.kind = DDS::ASYNCHRONOUS_PUBLISH_MODE_QOS; +} diff --git a/srcCxx/shape_configurator_rti_connext_micro.h b/srcCxx/shape_configurator_rti_connext_micro.h new file mode 100644 index 0000000..46dae67 --- /dev/null +++ b/srcCxx/shape_configurator_rti_connext_micro.h @@ -0,0 +1,285 @@ +#include "shape_bounded.h" +#include "shape_boundedSupport.h" + +#include "rti_me_cpp.hxx" +#include "dds_cpp/dds_cpp_netio.hxx" + +#include +#include + +#define LISTENER_STATUS_MASK_ALL (DDS_STATUS_MASK_ALL) + +#ifndef XCDR_DATA_REPRESENTATION + #define XCDR_DATA_REPRESENTATION DDS_XCDR_DATA_REPRESENTATION +#endif + +#ifndef XCDR2_DATA_REPRESENTATION + #define XCDR2_DATA_REPRESENTATION DDS_XCDR2_DATA_REPRESENTATION +#endif + +#ifndef PresentationQosPolicyAccessScopeKind + #define PresentationQosPolicyAccessScopeKind DDS_PresentationQosPolicyAccessScopeKind +#endif + +#ifndef INSTANCE_PRESENTATION_QOS + #define INSTANCE_PRESENTATION_QOS DDS_INSTANCE_PRESENTATION_QOS +#endif + +#ifndef TOPIC_PRESENTATION_QOS + #define TOPIC_PRESENTATION_QOS DDS_TOPIC_PRESENTATION_QOS +#endif + +#ifndef GROUP_PRESENTATION_QOS + #define GROUP_PRESENTATION_QOS DDS_GROUP_PRESENTATION_QOS +#endif + + +#define DataRepresentationId_t DDS_DataRepresentationId_t +#define DataRepresentationIdSeq DDS_DataRepresentationIdSeq + +typedef CDR_StringSeq StringSeq; + +const DDS::DurabilityQosPolicyKind TRANSIENT_DURABILITY_QOS = DDS_TRANSIENT_DURABILITY_QOS; +const DDS::DurabilityQosPolicyKind PERSISTENT_DURABILITY_QOS = DDS_PERSISTENT_DURABILITY_QOS; + +void StringSeq_push(StringSeq &string_seq, const char *elem) +{ + string_seq.ensure_length(string_seq.length()+1, string_seq.length()+1); + string_seq[string_seq.length()-1] = DDS_String_dup(elem); +} + + +const char* get_qos_policy_name(DDS::QosPolicyId_t policy_id) +{ + //case DDS::USERDATA_QOS_POLICY_ID) { return "USERDATA"; + if (policy_id == DDS::DURABILITY_QOS_POLICY_ID) { return "DURABILITY"; } + else if (policy_id == DDS::PRESENTATION_QOS_POLICY_ID) { return "PRESENTATION"; } + else if (policy_id == DDS::DEADLINE_QOS_POLICY_ID) { return "DEADLINE"; } + else if (policy_id == DDS::LATENCYBUDGET_QOS_POLICY_ID) { return "LATENCYBUDGET"; } + else if (policy_id == DDS::OWNERSHIP_QOS_POLICY_ID) { return "OWNERSHIP"; } + else if (policy_id == DDS::OWNERSHIPSTRENGTH_QOS_POLICY_ID) { return "OWNERSHIPSTRENGTH"; } + else if (policy_id == DDS::LIVELINESS_QOS_POLICY_ID) { return "LIVELINESS"; } + else if (policy_id == DDS::TIMEBASEDFILTER_QOS_POLICY_ID) { return "TIMEBASEDFILTER"; } + else if (policy_id == DDS::PARTITION_QOS_POLICY_ID) { return "PARTITION"; } + else if (policy_id == DDS::RELIABILITY_QOS_POLICY_ID) { return "RELIABILITY"; } + else if (policy_id == DDS::DESTINATIONORDER_QOS_POLICY_ID) { return "DESTINATIONORDER"; } + else if (policy_id == DDS::HISTORY_QOS_POLICY_ID) { return "HISTORY"; } + else if (policy_id == DDS::RESOURCELIMITS_QOS_POLICY_ID) { return "RESOURCELIMITS"; } + else if (policy_id == DDS::ENTITYFACTORY_QOS_POLICY_ID) { return "ENTITYFACTORY"; } + else if (policy_id == DDS::WRITERDATALIFECYCLE_QOS_POLICY_ID) { return "WRITERDATALIFECYCLE"; } + else if (policy_id == DDS::READERDATALIFECYCLE_QOS_POLICY_ID) { return "READERDATALIFECYCLE"; } + else if (policy_id == DDS::TOPICDATA_QOS_POLICY_ID) { return "TOPICDATA"; } + else if (policy_id == DDS::GROUPDATA_QOS_POLICY_ID) { return "GROUPDATA"; } + else if (policy_id == DDS::TRANSPORTPRIORITY_QOS_POLICY_ID) { return "TRANSPORTPRIORITY"; } + else if (policy_id == DDS::LIFESPAN_QOS_POLICY_ID) { return "LIFESPAN"; } + else if (policy_id == DDS::DURABILITYSERVICE_QOS_POLICY_ID) { return "DURABILITYSERVICE"; } + else { return "Unknown"; } +} + +static bool config_micro() +{ + bool ok = false; + RT::Registry *registry = NULL; + DPDE::DiscoveryPluginProperty *discovery_plugin_properties = NULL; + UDP_InterfaceFactoryProperty *udp_property = NULL; + + OSAPI_Log_set_verbosity(OSAPI_LOG_VERBOSITY_SILENT); + + registry = DDSTheParticipantFactory->get_registry(); + + /* Register Writer History */ + if (!registry->register_component("wh", WHSMHistoryFactory::get_interface(), NULL, NULL)) + { + printf("ERROR: unable to register writer history\n"); + goto done; + } + + /* Register Reader History */ + if (!registry->register_component("rh", RHSMHistoryFactory::get_interface(), NULL, NULL)) + { + printf("ERROR: unable to register reader history\n"); + goto done; + } + + /* Configure UDP transport's allowed interfaces */ + if (!registry->unregister(NETIO_DEFAULT_UDP_NAME, NULL, NULL)) + { + printf("ERROR: unable to unregister udp\n"); + goto done; + } + + udp_property = new UDP_InterfaceFactoryProperty(); + if (udp_property == NULL) + { + printf("ERROR: unable to allocate udp properties\n"); + goto done; + } + + udp_property->max_message_size = 64 * 1024; //64KB + + if (!registry->register_component( + NETIO_DEFAULT_UDP_NAME, + UDPInterfaceFactory::get_interface(), + &udp_property->_parent._parent, + NULL)) { + printf("ERROR: unable to register udp\n"); + goto done; + } + + discovery_plugin_properties = new DPDE::DiscoveryPluginProperty(); + + /* Configure properties */ + discovery_plugin_properties->participant_liveliness_assert_period.sec = 5; + discovery_plugin_properties->participant_liveliness_assert_period.nanosec = 0; + discovery_plugin_properties->participant_liveliness_lease_duration.sec = 30; + discovery_plugin_properties->participant_liveliness_lease_duration.nanosec = 0; + + + if (!registry->register_component( + "dpde", + DPDEDiscoveryFactory::get_interface(), + &discovery_plugin_properties->_parent, + NULL)) { + printf("ERROR: unable to register dpde\n"); + goto done; + } + + ok = true; +done: + if (!ok) { + if (udp_property != NULL) { + delete udp_property; + } + if (discovery_plugin_properties != NULL) { + delete discovery_plugin_properties; + } + } + return ok; +} + +static bool configure_datafrag_size(unsigned int datafrag_size) { + + bool ok = false; + RT::Registry *registry = NULL; + UDP_InterfaceFactoryProperty *udp_property = NULL; + + registry = DDSTheParticipantFactory->get_registry(); + + if (!registry->unregister(NETIO_DEFAULT_UDP_NAME, NULL, NULL)) { + printf("ERROR: unable to unregister udp\n"); + goto done; + } + + udp_property = new UDP_InterfaceFactoryProperty(); + if (udp_property == NULL) { + printf("ERROR: unable to allocate udp properties\n"); + goto done; + } + + udp_property->max_message_size = datafrag_size; + + if (!registry->register_component( + NETIO_DEFAULT_UDP_NAME, + UDPInterfaceFactory::get_interface(), + &udp_property->_parent._parent, + NULL)) { + printf("ERROR: unable to register udp\n"); + goto done; + } + ok = true; +done: + if (!ok) { + if (udp_property != NULL) { + delete udp_property; + } + } + return ok; +} + +static bool configure_dp_qos(DDS::DomainParticipantQos &dp_qos) +{ + if (!dp_qos.discovery.discovery.name.set_name("dpde")) + { + printf("ERROR: unable to set discovery plugin name\n"); + return false; + } + + dp_qos.discovery.initial_peers.maximum(2); + dp_qos.discovery.initial_peers.length(2); + dp_qos.discovery.initial_peers[0] = DDS_String_dup("127.0.0.1"); + dp_qos.discovery.initial_peers[1] = DDS_String_dup("_udp://239.255.0.1"); + + /* if there are more remote or local endpoints, you need to increase these limits */ + dp_qos.resource_limits.max_destination_ports = 32; + dp_qos.resource_limits.max_receive_ports = 32; + dp_qos.resource_limits.local_topic_allocation = 8; + dp_qos.resource_limits.local_type_allocation = 8; + + dp_qos.resource_limits.local_reader_allocation = 8; + dp_qos.resource_limits.local_writer_allocation = 8; + dp_qos.resource_limits.remote_participant_allocation = 16; + dp_qos.resource_limits.remote_reader_allocation = 16; + dp_qos.resource_limits.remote_writer_allocation = 16; + return true; +} + +void config_dw_qos(DDS::DataWriterQos &dw_qos) { + dw_qos.resource_limits.max_instances = 500; + dw_qos.resource_limits.max_samples = 500; + dw_qos.resource_limits.max_samples_per_instance = 500; +} + +void config_dr_qos(DDS::DataReaderQos &dr_qos) { + dr_qos.resource_limits.max_instances = 500; + dr_qos.resource_limits.max_samples = 500; + dr_qos.resource_limits.max_samples_per_instance = 500; + dr_qos.reader_resource_limits.max_remote_writers = 16; + dr_qos.reader_resource_limits.max_samples_per_remote_writer = 500; + dr_qos.reader_resource_limits.max_fragmented_samples = 64; + dr_qos.reader_resource_limits.max_fragmented_samples_per_remote_writer = 32; +} + +uint64_t DDS_UInt8Seq_get_length(DDS_OctetSeq * seq) +{ + return seq->length(); +} + +void DDS_UInt8Seq_ensure_length(DDS_OctetSeq * seq, uint64_t length, uint64_t max) +{ + seq->ensure_length(length, max); +} + +unsigned char* DDS_UInt8Seq_get_reference(DDS_OctetSeq * seq, uint64_t index) +{ + return DDS_OctetSeq_get_reference(seq, index); +} + +const unsigned char* DDS_UInt8Seq_get_reference(const DDS_OctetSeq * seq, uint64_t index) +{ + return DDS_OctetSeq_get_reference(seq, index); +} + +void set_instance_color( + std::vector>& vec, + const DDS::InstanceHandle_t handle, + const std::string& color) { + // Check if the handle already exists + for (auto& p : vec) { + if (DDS_InstanceHandle_equals(&p.first, &handle)) { + return; + } + } + // If it doesn't exist, add it + vec.push_back(std::make_pair(handle, color)); +} + +std::string get_instance_color( + const std::vector>& vec, + const DDS::InstanceHandle_t handle) { + for (const auto& p : vec) { + if (DDS_InstanceHandle_equals(&p.first, &handle)) { + return p.second; + } + } + return ""; +} \ No newline at end of file diff --git a/srcCxx/shape_main.cxx b/srcCxx/shape_main.cxx index 94162fc..88c092a 100644 --- a/srcCxx/shape_main.cxx +++ b/srcCxx/shape_main.cxx @@ -19,8 +19,12 @@ #include #include +#include + #if defined(RTI_CONNEXT_DDS) #include "shape_configurator_rti_connext_dds.h" +#elif defined(RTI_CONNEXT_MICRO) +#include "shape_configurator_rti_connext_micro.h" #elif defined(TWINOAKS_COREDX) #include "shape_configurator_toc_coredx_dds.h" #elif defined(OPENDDS) @@ -276,6 +280,7 @@ class ShapeOptions { useconds_t periodic_announcement_period_us; + unsigned int datafrag_size; char* cft_expression; int size_modulo; @@ -335,6 +340,7 @@ class ShapeOptions { periodic_announcement_period_us = 0; + datafrag_size = 0; // Default: 0 (means not set) cft_expression = NULL; size_modulo = 0; // 0 means disabled @@ -412,6 +418,8 @@ class ShapeOptions { printf(" read_next_instance()\n"); printf(" --periodic-announcement : indicates the periodic participant\n"); printf(" announcement period in ms. Default 0 (off)\n"); + printf(" --datafrag-size : set the data fragment size (default: 0, means\n"); + printf(" not set)\n"); printf(" --cft : ContentFilteredTopic filter expression (quotes\n"); printf(" required around the expression). Cannot be used with\n"); printf(" -c on subscriber applications\n"); @@ -479,10 +487,24 @@ class ShapeOptions { logger.log_message("warning: --size-modulo has no effect unless shapesize (-z) is set to 0", Verbosity::ERROR); } if (subscribe && color != NULL && cft_expression != NULL) { - logger.log_message("error: cannot specify both --cft and -c/--color for subscriber applications", Verbosity::ERROR); + logger.log_message("error: cannot specify both --cft and -c for subscriber applications", Verbosity::ERROR); return false; } +#if defined(RTI_CONNEXT_MICRO) + if (subscribe && (color != NULL || cft_expression != NULL)) { + STRING_FREE(color); + color = NULL; + STRING_FREE(cft_expression); + cft_expression = NULL; + logger.log_message("warning: content filtered topic not supported, normal topic used", Verbosity::ERROR); + } + if (subscribe && take_read_next_instance) { + take_read_next_instance = false; + logger.log_message("warning: use of take/read_next_instance() not available, using take/read()", Verbosity::ERROR); + } +#endif + return true; } @@ -509,6 +531,7 @@ class ShapeOptions { {"take-read", no_argument, NULL, 'K'}, {"time-filter", required_argument, NULL, 'i'}, {"periodic-announcement", required_argument, NULL, 'N'}, + {"datafrag-size", required_argument, NULL, 'Z'}, {"cft", required_argument, NULL, 'F'}, {"size-modulo", required_argument, NULL, 'Q'}, {NULL, 0, NULL, 0 } @@ -895,6 +918,25 @@ class ShapeOptions { periodic_announcement_period_us = (useconds_t) converted_param * 1000; break; } + case 'Z': { + unsigned int converted_param = 0; + if (sscanf(optarg, "%u", &converted_param) == 0) { + logger.log_message("unrecognized value for datafrag-size " + + std::string(1, optarg[0]), + Verbosity::ERROR); + parse_ok = false; + } + // the spec mentions that the fragment size must satisfy: + // fragment size <= 65535 bytes. + if (converted_param > 65535) { + logger.log_message("incorrect value for datafrag-size, " + "it must be <= 65535 bytes" + + std::to_string(converted_param), + Verbosity::ERROR); + parse_ok = false; + } + datafrag_size = converted_param; + } case 'F': cft_expression = strdup(optarg); break; @@ -933,7 +975,9 @@ class ShapeOptions { "\n TimeBasedFilterInterval = " + std::to_string(timebasedfilter_interval_us / 1000) + "ms" + "\n DeadlineInterval = " + std::to_string(deadline_interval_us / 1000) + "ms" + "\n Shapesize = " + std::to_string(shapesize) + - "\n Reading method = " + (use_read ? "read_next_instance" : "take_next_instance") + + "\n Reading method = " + (use_read + ? (take_read_next_instance ? "read_next_instance" : "read") + : (take_read_next_instance ? "take_next_instance" : "take")) + "\n Write period = " + std::to_string(write_period_us / 1000) + "ms" + "\n Read period = " + std::to_string(read_period_us / 1000) + "ms" + "\n Lifespan = " + std::to_string(lifespan_us / 1000) + "ms" + @@ -948,7 +992,8 @@ class ShapeOptions { "\n Final Instance State = " + (unregister ? "Unregister" : (dispose ? "Dispose" : "not specified")) + "\n Periodic Announcement Period = " - + std::to_string(periodic_announcement_period_us / 1000) + "ms", + + std::to_string(periodic_announcement_period_us / 1000) + "ms" + + "\n Data Fragmentation Size = " + std::to_string(datafrag_size) + " bytes", Verbosity::DEBUG); if (topic_name != NULL){ logger.log_message(" Topic = " + std::string(topic_name), @@ -1085,6 +1130,10 @@ class ShapeApplication { pub = NULL; sub = NULL; color = NULL; + + topics = NULL; + drs = NULL; + dws = NULL; } //------------------------------------------------------------- @@ -1113,23 +1162,26 @@ class ShapeApplication { topics[i] = NULL; } - drs = (ShapeTypeDataReader**) malloc(sizeof(ShapeTypeDataReader*) * options->num_topics); - if (drs == NULL) { - logger.log_message("Error allocating memory for DataReaders", Verbosity::ERROR); - return false; - } - for (unsigned int i = 0; i < options->num_topics; ++i) { - drs[i] = NULL; + if (options->publish) { + dws = (ShapeTypeDataWriter**) malloc(sizeof(ShapeTypeDataWriter*) * options->num_topics); + if (dws == NULL) { + logger.log_message("Error allocating memory for DataWriters", Verbosity::ERROR); + return false; + } + for (unsigned int i = 0; i < options->num_topics; ++i) { + dws[i] = NULL; + } + } else { + drs = (ShapeTypeDataReader**) malloc(sizeof(ShapeTypeDataReader*) * options->num_topics); + if (drs == NULL) { + logger.log_message("Error allocating memory for DataReaders", Verbosity::ERROR); + return false; + } + for (unsigned int i = 0; i < options->num_topics; ++i) { + drs[i] = NULL; + } } - dws = (ShapeTypeDataWriter**) malloc(sizeof(ShapeTypeDataWriter*) * options->num_topics); - if (dws == NULL) { - logger.log_message("Error allocating memory for DataWriters", Verbosity::ERROR); - return false; - } - for (unsigned int i = 0; i < options->num_topics; ++i) { - dws[i] = NULL; - } #ifndef OBTAIN_DOMAIN_PARTICIPANT_FACTORY #define OBTAIN_DOMAIN_PARTICIPANT_FACTORY DomainParticipantFactory::get_instance() @@ -1146,9 +1198,40 @@ class ShapeApplication { CONFIGURE_PARTICIPANT_FACTORY #endif - DomainParticipantQos dp_qos; +#ifdef RTI_CONNEXT_MICRO + if (!config_micro()) { + logger.log_message("Error configuring Connext Micro", Verbosity::ERROR); + return false; + } +#endif + + DDS::DomainParticipantQos dp_qos; dpf->get_default_participant_qos(dp_qos); + if (options->datafrag_size > 0) { + bool result = false; + #if defined(RTI_CONNEXT_DDS) + result = configure_datafrag_size(dp_qos, options->datafrag_size); + #elif defined(RTI_CONNEXT_MICRO) + result = configure_datafrag_size(options->datafrag_size); + #endif + + if (!result) { + logger.log_message("Error configuring Data Fragmentation Size = " + + std::to_string(options->datafrag_size), Verbosity::ERROR); + return false; + } else { + logger.log_message("Data Fragmentation Size = " + + std::to_string(options->datafrag_size), Verbosity::DEBUG); + } + } + +#ifdef RTI_CONNEXT_MICRO + if (!configure_dp_qos(dp_qos)) { + return false; + } +#endif + #ifdef RTI_CONNEXT_DDS configure_participant_announcements_period(dp_qos, options->periodic_announcement_period_us); #endif @@ -1159,6 +1242,7 @@ class ShapeApplication { return false; } logger.log_message("Participant created", Verbosity::DEBUG); + #ifndef REGISTER_TYPE #define REGISTER_TYPE ShapeTypeTypeSupport::register_type #endif @@ -1180,7 +1264,7 @@ class ShapeApplication { logger.log_message("Topics created:", Verbosity::DEBUG); for (unsigned int i = 0; i < options->num_topics; ++i) { if (logger.verbosity() == Verbosity::DEBUG) { - printf(" topic[%d]=%p\n",i,(void*)topics[i]); + printf(" topic(%d)=%p\n",i,(void*)topics[i]); } } @@ -1234,7 +1318,8 @@ class ShapeApplication { { logger.log_message(" Presentation Access Scope " + QosUtils::to_string(pub_qos.presentation.access_scope) - + std::string(" : Not supported"), Verbosity::ERROR); + + std::string(" : not supported"), Verbosity::ERROR); + return false; } #endif #if defined(INTERCOM_DDS) @@ -1242,22 +1327,31 @@ class ShapeApplication { { logger.log_message(" Coherent Access with Presentation Access Scope " + QosUtils::to_string(pub_qos.presentation.access_scope) - + std::string(" : Not supported"), Verbosity::ERROR); + + std::string(" : not supported"), Verbosity::ERROR); + return false; } #endif } - logger.log_message(" Presentation Coherent Access = " + std::string(pub_qos.presentation.coherent_access ? "true" : "false"), Verbosity::DEBUG); logger.log_message(" Presentation Ordered Access = " + std::string(pub_qos.presentation.ordered_access ? "true" : "false"), Verbosity::DEBUG); logger.log_message(" Presentation Access Scope = " + QosUtils::to_string(pub_qos.presentation.access_scope), Verbosity::DEBUG); - #else - logger.log_message(" Presentation Coherent Access = Not supported", Verbosity::ERROR); - logger.log_message(" Presentation Ordered Access = Not supported", Verbosity::ERROR); - logger.log_message(" Presentation Access Scope = Not supported", Verbosity::ERROR); + if (options->coherent_set_enabled) { + logger.log_message(" Presentation Coherent Access = not supported", Verbosity::ERROR); + return false; + } + if (options->ordered_access_enabled) { + logger.log_message(" Presentation Ordered Access = not supported", Verbosity::ERROR); + return false; + } + if ((options->coherent_set_enabled || options->ordered_access_enabled) + && (options->coherent_set_access_scope != INSTANCE_PRESENTATION_QOS)) { + logger.log_message(" Presentation Access Scope = not supported", Verbosity::ERROR); + return false; + } #endif pub = dp->create_publisher(pub_qos, NULL, LISTENER_STATUS_MASK_NONE); @@ -1268,12 +1362,26 @@ class ShapeApplication { logger.log_message("Publisher created", Verbosity::DEBUG); logger.log_message("Data Writer QoS:", Verbosity::DEBUG); pub->get_default_datawriter_qos( dw_qos ); + +#if defined (RTI_CONNEXT_MICRO) + config_dw_qos(dw_qos); +#endif + dw_qos.reliability FIELD_ACCESSOR.kind = options->reliability_kind; logger.log_message(" Reliability = " + QosUtils::to_string(dw_qos.reliability FIELD_ACCESSOR.kind), Verbosity::DEBUG); dw_qos.durability FIELD_ACCESSOR.kind = options->durability_kind; +#if defined(RTI_CONNEXT_MICRO) + if (dw_qos.durability FIELD_ACCESSOR.kind == TRANSIENT_DURABILITY_QOS) { + logger.log_message(" Durability = TRANSIENT_DURABILITY_QOS : not supported", Verbosity::ERROR); + return false; + } else if (dw_qos.durability FIELD_ACCESSOR.kind == PERSISTENT_DURABILITY_QOS) { + logger.log_message(" Durability = PERSISTENT_DURABILITY_QOS : not supported", Verbosity::ERROR); + return false; + } +#endif logger.log_message(" Durability = " + QosUtils::to_string(dw_qos.durability FIELD_ACCESSOR.kind), Verbosity::DEBUG); -#if defined(RTI_CONNEXT_DDS) +#if defined(RTI_CONNEXT_DDS) || defined (RTI_CONNEXT_MICRO) DataRepresentationIdSeq data_representation_seq; data_representation_seq.ensure_length(1,1); data_representation_seq[0] = options->data_representation; @@ -1320,9 +1428,9 @@ class ShapeApplication { dw_qos.deadline FIELD_ACCESSOR.period.SECONDS_FIELD_NAME = options->deadline_interval_us / 1000000; dw_qos.deadline FIELD_ACCESSOR.period.nanosec = (options->deadline_interval_us % 1000000) * 1000; } - logger.log_message(" DeadlinePeriod = " + std::to_string(dw_qos.deadline FIELD_ACCESSOR.period.SECONDS_FIELD_NAME) + "secs", + logger.log_message(" DeadlinePeriod = " + std::to_string(dw_qos.deadline FIELD_ACCESSOR.period.SECONDS_FIELD_NAME) + " secs", Verbosity::DEBUG); - logger.log_message(" " + std::to_string(dw_qos.deadline FIELD_ACCESSOR.period.nanosec) + "nanosecs", + logger.log_message(" " + std::to_string(dw_qos.deadline FIELD_ACCESSOR.period.nanosec) + " nanosecs", Verbosity::DEBUG); // options->history_depth < 0 means leave default value @@ -1339,38 +1447,39 @@ class ShapeApplication { } if (options->lifespan_us > 0) { -#if defined(RTI_CONNEXT_DDS) || defined(OPENDDS) || defined(TWINOAKS_COREDX) || defined(INTERCOM_DDS) +#if defined (RTI_CONNEXT_MICRO) + logger.log_message(" Lifespan = not supported", Verbosity::ERROR); + return false; +#elif defined(RTI_CONNEXT_DDS) || defined(OPENDDS) || defined(TWINOAKS_COREDX) || defined(INTERCOM_DDS) dw_qos.lifespan FIELD_ACCESSOR.duration.SECONDS_FIELD_NAME = options->lifespan_us / 1000000; dw_qos.lifespan FIELD_ACCESSOR.duration.nanosec = (options->lifespan_us % 1000000) * 1000; #elif defined(EPROSIMA_FAST_DDS) dw_qos.lifespan FIELD_ACCESSOR.duration = Duration_t(options->lifespan_us * 1e-6); #endif } - logger.log_message(" Lifespan = " + std::to_string(dw_qos.lifespan FIELD_ACCESSOR.duration.SECONDS_FIELD_NAME) + " secs", Verbosity::DEBUG); - logger.log_message(" " + std::to_string(dw_qos.lifespan FIELD_ACCESSOR.duration.nanosec) + " nanosecs", Verbosity::DEBUG); +#if !defined(RTI_CONNEXT_MICRO) + logger.log_message(" Lifespan = " + std::to_string(dw_qos.lifespan FIELD_ACCESSOR.duration.SECONDS_FIELD_NAME) + " secs", + Verbosity::DEBUG); + logger.log_message(" " + std::to_string(dw_qos.lifespan FIELD_ACCESSOR.duration.nanosec) + " nanosecs", + Verbosity::DEBUG); +#endif #if defined(RTI_CONNEXT_DDS) - // usage of large data - if (PropertyQosPolicyHelper::assert_property( - dw_qos.property, - "dds.data_writer.history.memory_manager.fast_pool.pool_buffer_max_size", - "65536", - DDS_BOOLEAN_FALSE) != DDS_RETCODE_OK) { - logger.log_message("failed to set property pool_buffer_max_size", Verbosity::ERROR); - } if (options->additional_payload_size > 64000) { - dw_qos.publish_mode.kind = ASYNCHRONOUS_PUBLISH_MODE_QOS; + configure_large_data(dw_qos); } logger.log_message(" Publish Mode kind = " + std::string(dw_qos.publish_mode.kind == ASYNCHRONOUS_PUBLISH_MODE_QOS ? "ASYNCHRONOUS_PUBLISH_MODE_QOS" : "SYNCHRONOUS_PUBLISH_MODE_QOS"), Verbosity::DEBUG); #endif +#if !defined(RTI_CONNEXT_MICRO) if (options->unregister) { dw_qos.writer_data_lifecycle FIELD_ACCESSOR .autodispose_unregistered_instances = DDS_BOOLEAN_FALSE; } logger.log_message(" Autodispose_unregistered_instances = " + std::string(dw_qos.writer_data_lifecycle FIELD_ACCESSOR .autodispose_unregistered_instances ? "true" : "false"), Verbosity::DEBUG); +#endif // Create different DataWriters (depending on the number of entities) // The DWs are attached to the same array index of the topics. @@ -1386,7 +1495,7 @@ class ShapeApplication { logger.log_message("DataWriters created:", Verbosity::DEBUG); for (unsigned int i = 0; i < options->num_topics; ++i) { if (logger.verbosity() == Verbosity::DEBUG) { - printf(" dws[%d]=%p\n",i,(void*)dws[i]); + printf(" dws(%d)=%p\n",i,(void*)dws[i]); } } @@ -1433,7 +1542,8 @@ class ShapeApplication { { logger.log_message(" Presentation Access Scope " + QosUtils::to_string(sub_qos.presentation.access_scope) - + std::string(" : Not supported"), Verbosity::ERROR); + + std::string(" : not supported"), Verbosity::ERROR); + return false; } #endif #if defined(INTERCOM_DDS) @@ -1441,7 +1551,8 @@ class ShapeApplication { { logger.log_message(" Coherent Access with Presentation Access Scope " + QosUtils::to_string(sub_qos.presentation.access_scope) - + std::string(" : Not supported"), Verbosity::ERROR); + + std::string(" : not supported"), Verbosity::ERROR); + return false; } #endif } @@ -1454,9 +1565,19 @@ class ShapeApplication { QosUtils::to_string(sub_qos.presentation.access_scope), Verbosity::DEBUG); #else - logger.log_message(" Presentation Coherent Access = Not supported", Verbosity::ERROR); - logger.log_message(" Presentation Ordered Access = Not supported", Verbosity::ERROR); - logger.log_message(" Presentation Access Scope = Not supported", Verbosity::ERROR); + if (options->coherent_set_enabled) { + logger.log_message(" Presentation Coherent Access = not supported", Verbosity::ERROR); + return false; + } + if (options->ordered_access_enabled) { + logger.log_message(" Presentation Ordered Access = not supported", Verbosity::ERROR); + return false; + } + if ((options->coherent_set_enabled || options->ordered_access_enabled) + && (options->coherent_set_access_scope != INSTANCE_PRESENTATION_QOS)) { + logger.log_message(" Presentation Access Scope = not supported", Verbosity::ERROR); + return false; + } #endif sub = dp->create_subscriber( sub_qos, NULL, LISTENER_STATUS_MASK_NONE ); @@ -1464,15 +1585,30 @@ class ShapeApplication { logger.log_message("failed to create subscriber", Verbosity::ERROR); return false; } + logger.log_message("Subscriber created", Verbosity::DEBUG); logger.log_message("Data Reader QoS:", Verbosity::DEBUG); sub->get_default_datareader_qos( dr_qos ); + +#if defined (RTI_CONNEXT_MICRO) + config_dr_qos(dr_qos); +#endif + dr_qos.reliability FIELD_ACCESSOR.kind = options->reliability_kind; logger.log_message(" Reliability = " + QosUtils::to_string(dr_qos.reliability FIELD_ACCESSOR.kind), Verbosity::DEBUG); dr_qos.durability FIELD_ACCESSOR.kind = options->durability_kind; +#if defined(RTI_CONNEXT_MICRO) + if (dr_qos.durability FIELD_ACCESSOR.kind == TRANSIENT_DURABILITY_QOS) { + logger.log_message(" Durability = TRANSIENT_DURABILITY_QOS : not supported", Verbosity::ERROR); + return false; + } else if (dr_qos.durability FIELD_ACCESSOR.kind == PERSISTENT_DURABILITY_QOS) { + logger.log_message(" Durability = PERSISTENT_DURABILITY_QOS : not supported", Verbosity::ERROR); + return false; + } +#endif logger.log_message(" Durability = " + QosUtils::to_string(dr_qos.durability FIELD_ACCESSOR.kind), Verbosity::DEBUG); -#if defined(RTI_CONNEXT_DDS) +#if defined(RTI_CONNEXT_DDS) || defined (RTI_CONNEXT_MICRO) DataRepresentationIdSeq data_representation_seq; data_representation_seq.ensure_length(1,1); data_representation_seq[0] = options->data_representation; @@ -1505,20 +1641,27 @@ class ShapeApplication { dr_qos.ownership FIELD_ACCESSOR.kind = EXCLUSIVE_OWNERSHIP_QOS; } logger.log_message(" Ownership = " + QosUtils::to_string(dr_qos.ownership FIELD_ACCESSOR.kind), Verbosity::DEBUG); + + if ( options->timebasedfilter_interval_us > 0) { -#if defined(EPROSIMA_FAST_DDS) - logger.log_message(" Time based filter not supported", Verbosity::ERROR); +#if defined(EPROSIMA_FAST_DDS) || defined(RTI_CONNEXT_MICRO) + logger.log_message(" TimeBasedFilter = not supported", Verbosity::ERROR); + return false; #else dr_qos.time_based_filter FIELD_ACCESSOR.minimum_separation.SECONDS_FIELD_NAME = options->timebasedfilter_interval_us / 1000000; dr_qos.time_based_filter FIELD_ACCESSOR.minimum_separation.nanosec = (options->timebasedfilter_interval_us % 1000000) * 1000; #endif } + +#if !defined(EPROSIMA_FAST_DDS) && !defined(RTI_CONNEXT_MICRO) logger.log_message(" TimeBasedFilter = " + - std::to_string(dr_qos.time_based_filter FIELD_ACCESSOR.minimum_separation.SECONDS_FIELD_NAME) + "secs", - Verbosity::DEBUG); + std::to_string(dr_qos.time_based_filter FIELD_ACCESSOR.minimum_separation.SECONDS_FIELD_NAME) + "secs", + Verbosity::DEBUG); logger.log_message(" " + - std::to_string(dr_qos.time_based_filter FIELD_ACCESSOR.minimum_separation.nanosec) + "nanosecs", - Verbosity::DEBUG); + std::to_string(dr_qos.time_based_filter FIELD_ACCESSOR.minimum_separation.nanosec) + "nanosecs", + Verbosity::DEBUG); +#endif + if ( options->deadline_interval_us > 0 ) { dr_qos.deadline FIELD_ACCESSOR.period.SECONDS_FIELD_NAME = options->deadline_interval_us / 1000000; dr_qos.deadline FIELD_ACCESSOR.period.nanosec = (options->deadline_interval_us % 1000000) * 1000;; @@ -1542,14 +1685,17 @@ class ShapeApplication { } if ( options->cft_expression != NULL || options->color != NULL) { + /* For Connext Micro color and cft_expression will be always NULL */ +#if !defined(RTI_CONNEXT_MICRO) + /* filter on specified color */ ContentFilteredTopic *cft = NULL; StringSeq cf_params; for (unsigned int i = 0; i < options->num_topics; ++i) { const std::string filtered_topic_name_str = - std::string(options->topic_name) + - (i > 0 ? std::to_string(i) : "") + - "_filtered"; + std::string(options->topic_name) + + (i > 0 ? std::to_string(i) : "") + + "_filtered"; const char* filtered_topic_name = filtered_topic_name_str.c_str(); const char* filter_expr = nullptr; @@ -1558,7 +1704,7 @@ class ShapeApplication { cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], filter_expr, cf_params); logger.log_message(" ContentFilterTopic = \"" + std::string(filter_expr) + "\"", Verbosity::DEBUG); } else if (options->color != NULL) { -#if defined(RTI_CONNEXT_DDS) + #if defined(RTI_CONNEXT_DDS) char parameter[64]; snprintf(parameter, 64, "'%s'", options->color); StringSeq_push(cf_params, parameter); @@ -1566,7 +1712,7 @@ class ShapeApplication { cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color MATCH %0", cf_params); logger.log_message(" ContentFilterTopic = \"color MATCH " + std::string(parameter) + std::string("\""), Verbosity::DEBUG); -#elif defined(INTERCOM_DDS) + #elif defined(INTERCOM_DDS) char parameter[64]; snprintf(parameter, 64, "'%s'", options->color); StringSeq_push(cf_params, parameter); @@ -1574,17 +1720,17 @@ class ShapeApplication { cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color = %0", cf_params); logger.log_message(" ContentFilterTopic = \"color = " + std::string(parameter) + std::string("\""), Verbosity::DEBUG); -#elif defined(TWINOAKS_COREDX) || defined(OPENDDS) + #elif defined(TWINOAKS_COREDX) || defined(OPENDDS) StringSeq_push(cf_params, options->color); cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color = %0", cf_params); logger.log_message(" ContentFilterTopic = \"color = " + std::string(options->color) + std::string("\""), Verbosity::DEBUG); -#elif defined(EPROSIMA_FAST_DDS) + #elif defined(EPROSIMA_FAST_DDS) cf_params.push_back(std::string("'") + options->color + std::string("'")); cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color = %0", cf_params); logger.log_message(" ContentFilterTopic = \"color = " + cf_params[0] + std::string("\""), Verbosity::DEBUG); -#endif + #endif } if (cft == NULL) { @@ -1599,6 +1745,8 @@ class ShapeApplication { return false; } } +#endif + } else { // Create different DataReaders (depending on the number of entities) // The DRs are attached to the same array index of the topics. @@ -1614,7 +1762,7 @@ class ShapeApplication { logger.log_message("DataReaders created:", Verbosity::DEBUG); for (unsigned int i = 0; i < options->num_topics; ++i) { if (logger.verbosity() == Verbosity::DEBUG) { - printf(" drs[%d]=%p\n",i,(void*)drs[i]); + printf(" drs(%d)=%p\n",i,(void*)drs[i]); } } @@ -1633,7 +1781,7 @@ class ShapeApplication { static void shape_initialize_w_color(ShapeType &shape, const char * color_value) { -#if defined(RTI_CONNEXT_DDS) +#if defined(RTI_CONNEXT_DDS) || defined(RTI_CONNEXT_MICRO) ShapeType_initialize(&shape); #endif @@ -1665,13 +1813,15 @@ class ShapeApplication { #if defined(EPROSIMA_FAST_DDS) // TODO: Remove when Fast DDS supports `get_key_value()` std::map instance_handle_color; +#elif defined(RTI_CONNEXT_MICRO) + std::vector> instance_handle_color; #endif while ( ! all_done ) { ReturnCode_t retval; SampleInfoSeq sample_infos; -#if defined(RTI_CONNEXT_DDS) || defined(OPENDDS) || defined(INTERCOM_DDS) +#if defined(RTI_CONNEXT_DDS) || defined(RTI_CONNEXT_MICRO) || defined(OPENDDS) || defined(INTERCOM_DDS) ShapeTypeSeq samples; #elif defined(TWINOAKS_COREDX) ShapeTypePtrSeq samples; @@ -1680,6 +1830,7 @@ class ShapeApplication { DataSeq samples; #endif +#if defined(RTI_CONNEXT_DDS) || defined(TWINOAKS_COREDX) || defined(INTERCOM_DDS) if (options->coherent_set_enabled) { printf("Reading coherent sets, iteration %d\n",n); } @@ -1689,12 +1840,14 @@ class ShapeApplication { if (options->coherent_set_enabled || options->ordered_access_enabled) { sub->begin_access(); } +#endif for (unsigned int i = 0; i < options->num_topics; ++i) { previous_handles[i] = HANDLE_NIL; do { if (!options->use_read) { if (options->take_read_next_instance) { logger.log_message("Calling take_next_instance() function", Verbosity::DEBUG); +#if !defined(RTI_CONNEXT_MICRO) retval = drs[i]->take_next_instance ( samples, sample_infos, LENGTH_UNLIMITED, @@ -1702,6 +1855,7 @@ class ShapeApplication { ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE ); +#endif } else { logger.log_message("Calling take() function", Verbosity::DEBUG); retval = drs[i]->take ( samples, @@ -1713,6 +1867,7 @@ class ShapeApplication { } } else { /* Use read_next_instance*/ if (options->take_read_next_instance) { +#if !defined(RTI_CONNEXT_MICRO) logger.log_message("Calling read_next_instance() function", Verbosity::DEBUG); retval = drs[i]->read_next_instance ( samples, sample_infos, @@ -1721,6 +1876,7 @@ class ShapeApplication { ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE ); +#endif } else { logger.log_message("Calling read() function", Verbosity::DEBUG); retval = drs[i]->read ( samples, @@ -1743,7 +1899,7 @@ class ShapeApplication { for (decltype(n_samples) n_sample = 0; n_sample < n_samples; n_sample++) { logger.log_message("Processing sample " + std::to_string(n_sample), Verbosity::DEBUG); -#if defined(RTI_CONNEXT_DDS) +#if defined(RTI_CONNEXT_DDS) || defined(RTI_CONNEXT_MICRO) ShapeType *sample = &samples[n_sample]; SampleInfo *sample_info = &sample_infos[n_sample]; #elif defined(TWINOAKS_COREDX) @@ -1767,19 +1923,26 @@ class ShapeApplication { #else if (DDS_UInt8Seq_get_length(&sample->additional_payload_size FIELD_ACCESSOR) > 0) { int additional_payload_index = DDS_UInt8Seq_get_length(&sample->additional_payload_size FIELD_ACCESSOR) - 1; - printf(" {%u}", sample->additional_payload_size FIELD_ACCESSOR [additional_payload_index]); + printf(" {%u}", sample->additional_payload_size FIELD_ACCESSOR [additional_payload_index]); } #endif printf("\n"); #if defined(EPROSIMA_FAST_DDS) instance_handle_color[sample_info->instance_handle] = sample->color FIELD_ACCESSOR STRING_IN; +#elif defined(RTI_CONNEXT_MICRO) + set_instance_color(instance_handle_color, sample_info->instance_handle, sample->color); #endif - } else { + } + + if (sample_info->instance_state != ALIVE_INSTANCE_STATE) { ShapeType shape_key; shape_initialize_w_color(shape_key, NULL); #if defined(EPROSIMA_FAST_DDS) shape_key.color FIELD_ACCESSOR = instance_handle_color[sample_info->instance_handle] NAME_ACCESSOR; +#elif defined(RTI_CONNEXT_MICRO) + // 128 is the max length of the color string + strncpy(shape_key.color, get_instance_color(instance_handle_color, sample_info->instance_handle).c_str(), 128); #else drs[i]->get_key_value(shape_key, sample_info->instance_handle); #endif @@ -1795,7 +1958,7 @@ class ShapeApplication { } } -#if defined(RTI_CONNEXT_DDS) || defined(OPENDDS) || defined(EPROSIMA_FAST_DDS) || defined(INTERCOM_DDS) +#if defined(RTI_CONNEXT_DDS) || defined(RTI_CONNEXT_MICRO) || defined(OPENDDS) || defined(EPROSIMA_FAST_DDS) || defined(INTERCOM_DDS) previous_handles[i] = sample_infos[0].instance_handle; #elif defined(TWINOAKS_COREDX) previous_handles[i] = sample_infos[0]->instance_handle; @@ -1806,9 +1969,11 @@ class ShapeApplication { } while (retval == RETCODE_OK); } +#if defined(RTI_CONNEXT_DDS) || defined(TWINOAKS_COREDX) || defined(INTERCOM_DDS) if (options->coherent_set_enabled || options->ordered_access_enabled) { sub->end_access(); } +#endif // increasing number of iterations n++; @@ -1890,6 +2055,7 @@ class ShapeApplication { } } +#if !defined(RTI_CONNEXT_MICRO) if (options->coherent_set_enabled || options->ordered_access_enabled) { // n also represents the number of samples written per publisher per instance if (options->coherent_set_sample_count != 0 && n % options->coherent_set_sample_count == 0) { @@ -1897,6 +2063,7 @@ class ShapeApplication { pub->begin_coherent_changes(); } } +#endif for (unsigned int i = 0; i < options->num_topics; ++i) { for (unsigned int j = 0; j < options->num_instances; ++j) { @@ -1906,7 +2073,7 @@ class ShapeApplication { shape_set_color(shape, instance_color.c_str()); } -#if defined(RTI_CONNEXT_DDS) || defined(OPENDDS) || defined(INTERCOM_DDS) || defined(TWINOAKS_COREDX) +#if defined(RTI_CONNEXT_DDS) || defined(RTI_CONNEXT_MICRO) || defined(OPENDDS) || defined(INTERCOM_DDS) || defined(TWINOAKS_COREDX) dws[i]->write( shape, HANDLE_NIL ); #elif defined(EPROSIMA_FAST_DDS) dws[i]->write( &shape, HANDLE_NIL ); @@ -1927,6 +2094,7 @@ class ShapeApplication { } } +#if !defined(RTI_CONNEXT_MICRO) if (options->coherent_set_enabled || options->ordered_access_enabled) { // n also represents the number of samples written per publisher per instance if (options->coherent_set_sample_count != 0 @@ -1935,6 +2103,7 @@ class ShapeApplication { pub->end_coherent_changes(); } } +#endif usleep(options->write_period_us); // increase number of iterations @@ -1978,13 +2147,15 @@ class ShapeApplication { /* ensure that all updates have been acked by reader[s] */ /* otherwise the app may terminate before reader has seen all updates */ -#if defined(RTI_CONNEXT_DDS) || defined (OPENDDS) +#if defined(RTI_CONNEXT_DDS) || defined (RTI_CONNEXT_MICRO) || defined (OPENDDS) Duration_t max_wait = {1, 0}; /* should not take long... */ #else Duration_t max_wait( 1, 0 ); /* should not take long... */ #endif for (unsigned int i = 0; i < options->num_topics; ++i) { +#if !defined(RTI_CONNEXT_MICRO) dws[i]->wait_for_acknowledgments( max_wait ); +#endif } return true; diff --git a/test_suite.py b/test_suite.py index 3535094..5bce266 100644 --- a/test_suite.py +++ b/test_suite.py @@ -122,7 +122,7 @@ # RELIABILITY 'Test_Reliability_0' : { - 'apps' : ['-P -t Square -b -z 0', '-S -t Square -b -z 0'], + 'apps' : ['-P -t Square -b -z 0', '-S -t Square -b'], 'expected_codes' : [ReturnCode.OK, ReturnCode.OK], 'check_function' : tsf.test_reliability_order, 'title' : 'Communication between BEST_EFFORT publisher and subscriber',