diff --git a/README.md b/README.md index f36654cf4..2ebcad752 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,11 @@ git submodule update --init --recursive ## Versioning The examples contained in this branch were built and tested against **RTI Connext -7.5.0**. If you need examples that have been built and tested against previous +7.7.0**. If you need examples that have been built and tested against previous versions of RTI Connext, please check out the corresponding release branch: +- [release/7.6.0](https://github.com/rticommunity/rticonnextdds-examples/tree/release/7.6.0) +- [release/7.5.0](https://github.com/rticommunity/rticonnextdds-examples/tree/release/7.5.0) - [release/7.4.0](https://github.com/rticommunity/rticonnextdds-examples/tree/release/7.4.0) - [release/7.3.0](https://github.com/rticommunity/rticonnextdds-examples/tree/release/7.3.0) - [release/7.2.0](https://github.com/rticommunity/rticonnextdds-examples/tree/release/7.2.0) diff --git a/VERSION b/VERSION index b9bc2fdcb..d21b198c8 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -latest \ No newline at end of file +7.7.0 \ No newline at end of file diff --git a/examples/connext_dds/asynchronous_publication/cs/AsyncPublicationExample.csproj b/examples/connext_dds/asynchronous_publication/cs/AsyncPublicationExample.csproj index f7469d3b6..67c37c9b3 100644 --- a/examples/connext_dds/asynchronous_publication/cs/AsyncPublicationExample.csproj +++ b/examples/connext_dds/asynchronous_publication/cs/AsyncPublicationExample.csproj @@ -6,7 +6,7 @@ - + diff --git a/examples/connext_dds/custom_flow_controller/cs/FlowControllerExample.csproj b/examples/connext_dds/custom_flow_controller/cs/FlowControllerExample.csproj index f7469d3b6..67c37c9b3 100644 --- a/examples/connext_dds/custom_flow_controller/cs/FlowControllerExample.csproj +++ b/examples/connext_dds/custom_flow_controller/cs/FlowControllerExample.csproj @@ -6,7 +6,7 @@ - + diff --git a/examples/connext_dds/dynamic_data_sequences/cs/DynamicDataSequencesExample.csproj b/examples/connext_dds/dynamic_data_sequences/cs/DynamicDataSequencesExample.csproj index a76fa5a9e..7d3e3e093 100644 --- a/examples/connext_dds/dynamic_data_sequences/cs/DynamicDataSequencesExample.csproj +++ b/examples/connext_dds/dynamic_data_sequences/cs/DynamicDataSequencesExample.csproj @@ -6,7 +6,7 @@ - + diff --git a/examples/connext_dds/dynamic_data_using_publisher_subscriber/cs/Shapes.csproj b/examples/connext_dds/dynamic_data_using_publisher_subscriber/cs/Shapes.csproj index f7469d3b6..67c37c9b3 100644 --- a/examples/connext_dds/dynamic_data_using_publisher_subscriber/cs/Shapes.csproj +++ b/examples/connext_dds/dynamic_data_using_publisher_subscriber/cs/Shapes.csproj @@ -6,7 +6,7 @@ - + diff --git a/examples/connext_dds/group_coherent_presentation/cs/GroupCoherentExample.csproj b/examples/connext_dds/group_coherent_presentation/cs/GroupCoherentExample.csproj index f7469d3b6..67c37c9b3 100644 --- a/examples/connext_dds/group_coherent_presentation/cs/GroupCoherentExample.csproj +++ b/examples/connext_dds/group_coherent_presentation/cs/GroupCoherentExample.csproj @@ -6,7 +6,7 @@ - + diff --git a/examples/connext_dds/partitions/cs/PartitionsExample.csproj b/examples/connext_dds/partitions/cs/PartitionsExample.csproj index cf9bc115d..b758bfb45 100644 --- a/examples/connext_dds/partitions/cs/PartitionsExample.csproj +++ b/examples/connext_dds/partitions/cs/PartitionsExample.csproj @@ -6,7 +6,7 @@ - + diff --git a/examples/connext_dds/remote_procedure_call/py/inventory_client.py b/examples/connext_dds/remote_procedure_call/py/inventory_client.py index 78ec9611c..26c416d99 100644 --- a/examples/connext_dds/remote_procedure_call/py/inventory_client.py +++ b/examples/connext_dds/remote_procedure_call/py/inventory_client.py @@ -21,21 +21,13 @@ class InventoryClient(InventoryService, rpc.ClientBase): ... -async def wait_for_service(client: InventoryClient): - while client.matched_service_count == 0: - await sleep(0.1) - - async def run_client(args): participant = dds.DomainParticipant(args.domain) client = InventoryClient( participant, "Inventory", max_wait_per_call=dds.Duration(20) ) - # For versions 7.4.0 and below: - await wait_for_service(client) - # For newer versions you can use the following: - # await client.wait_for_service_async(dds.Duration(20)) + await client.wait_for_service_async(dds.Duration(20)) print("Initial inventory: ", await client.get_inventory()) diff --git a/examples/connext_dds/request_reply/cs/Primes.csproj b/examples/connext_dds/request_reply/cs/Primes.csproj index 9f7f3d34b..132b22605 100644 --- a/examples/connext_dds/request_reply/cs/Primes.csproj +++ b/examples/connext_dds/request_reply/cs/Primes.csproj @@ -6,7 +6,7 @@ - + diff --git a/examples/connext_secure/CMakeLists.txt b/examples/connext_secure/CMakeLists.txt index 4df2dc5a4..3ceb72855 100644 --- a/examples/connext_secure/CMakeLists.txt +++ b/examples/connext_secure/CMakeLists.txt @@ -24,6 +24,7 @@ if(NOT DEFINED CONNEXTDDS_CONNEXT_SECURE_EXAMPLES) set(CONNEXTDDS_CONNEXT_SECURE_EXAMPLES "cds" "certificate_revocation_list" + "dynamic_permissions" "lightweight" "whitelist" ) diff --git a/examples/connext_secure/dynamic_permissions/README.md b/examples/connext_secure/dynamic_permissions/README.md new file mode 100644 index 000000000..4d9cc53d6 --- /dev/null +++ b/examples/connext_secure/dynamic_permissions/README.md @@ -0,0 +1,93 @@ +# Example Code: Dynamic Permissions + +## Concept + +This example showcases how the Security Plugins enforce Permissions Document +expiration, and how the Permissions Document can be renewed to resume +communication. + +## Building the Example + +Use the following commands to build the example and get the executables that +you can run: + +```sh +cd c++11/ +mkdir build && cd build +cmake .. +cmake --build . +``` + +You can optionally pass the +``-DCONNEXTDDS_DIR=``, +``-DOPENSSL_ROOT_DIR=``, +``-DCONNEXTDDS_ARCH=``, +``-DCMAKE_BUILD_TYPE=``, and +``-DBUILD_SHARED_LIBS=`` variables to the cmake configuration step. + +After building the example, you will have a publisher Permissions Document that +expires in 1 minute. If you need to re-create it, please remove this file from +your build directory and re-run the ``createExpiringPermissions`` target. + +```sh +rm security/ecdsa01/xml/Permissions2_expiring.xml && \ + cmake --build . --target createExpiringPermissions +``` + +## Running the example + +Demo is based on a standard rtiddsgen publisher and subscriber example code. + +Run a publisher and a subscriber in separate terminal windows. + +```sh +./dynamic_permissions_publisher +``` + +```sh +./dynamic_permissions_subscriber +``` + +Verify that they communicate and that the subscriber is receiving data. + +```sh +# Publisher +Writing ::DynamicPermissions, count 0 +Writing ::DynamicPermissions, count 1 +# [...] + +# Subscriber +::DynamicPermissions subscriber sleeping up to 1 sec... +[value: 0] +::DynamicPermissions subscriber sleeping up to 1 sec... +[value: 1] +::DynamicPermissions subscriber sleeping up to 1 sec... +# [...] +``` + +Once the Permissions Document of the publisher DomainParticipant expires, you +will see the following error messages: + +```sh +# Publisher +ERROR [0x831AB06E,0x43876C36,0xFD825600:0x000001C1|ADVANCE NOTIFY INVALID LOCAL PERMISSIONS|CHECK STATUS|LC:Security] RTI_Security_PermissionsGrant_isValidTime:{"DDS:Security:LogTopicV2":{"f":"10","s":"3","t":{"s":"1748517658","n":"108000"},"h":"RTISP-10036","i":"0.0.0.0","a":"RTI Secure DDS Application","p":"85264","k":"50331706","x":[{"DDS":[{"domain_id":"0"},{"guid":"831AB06E.43876C36.FD825600.000001C1"},{"plugin_class":"DDS:Access:Permissions"},{"plugin_method":"RTI_Security_PermissionsGrant_isValidTime"}]}],"m":"now is after not_after of permissions file"}} +ERROR [0x831AB06E,0x43876C36,0xFD825600:0x000001C1|ADVANCE NOTIFY INVALID LOCAL PERMISSIONS|CHECK STATUS|LC:Security] RTI_Security_AccessControl_validate_status:{"DDS:Security:LogTopicV2":{"f":"10","s":"3","t":{"s":"1748517658","n":"192000"},"h":"RTISP-10036","i":"0.0.0.0","a":"RTI Secure DDS Application","p":"85264","k":"50331706","x":[{"DDS":[{"domain_id":"0"},{"guid":"831AB06E.43876C36.FD825600.000001C1"},{"plugin_class":"DDS:Access:Permissions"},{"plugin_method":"RTI_Security_AccessControl_validate_status"}]}],"m":"permissions' validity period is invalid."}} +ERROR [0x831AB06E,0x43876C36,0xFD825600:0x000001C1|ADVANCE NOTIFY INVALID LOCAL PERMISSIONS|CHECK STATUS|LC:Security] PRESParticipant_onSecurityLocalCredentialValidateEvent:FAILED TO VALIDATE | Local permissions credentials. +ERROR [0x831AB06E,0x43876C36,0xFD825600:0x000001C1|ADVANCE NOTIFY INVALID LOCAL PERMISSIONS|LC:Security] PRESParticipant_onSecurityLocalCredentialEventListener:FAILED TO VALIDATE | Local credentials. + +# Subscriber +ERROR [PARSE MESSAGE|0xDED844B7,0x87B9550F,0xB66DD964:0x000201C4{Entity=DR,MessageKind=DATA}|RECEIVE FROM 0x831AB06E,0x43876C36,0xFD825600:0x000201C3|:0x000001C1{Domain=0}|RECEIVE SAMPLE|PROCESS HANDSHAKE|GET SECURITY STATE|LC:Security] RTI_Security_PermissionsGrant_isValidTime:{"DDS:Security:LogTopicV2":{"f":"10","s":"3","t":{"s":"1748517682","n":"984966998"},"h":"RTISP-10036","i":"0.0.0.0","a":"RTI Secure DDS Application","p":"85248","k":"50331706","x":[{"DDS":[{"domain_id":"0"},{"guid":"DED844B7.87B9550F.B66DD964.000001C1"},{"plugin_class":"DDS:Access:Permissions"},{"plugin_method":"RTI_Security_PermissionsGrant_isValidTime"}]}],"m":"now is after not_after of permissions file"}} +ERROR [PARSE MESSAGE|0xDED844B7,0x87B9550F,0xB66DD964:0x000201C4{Entity=DR,MessageKind=DATA}|RECEIVE FROM 0x831AB06E,0x43876C36,0xFD825600:0x000201C3|:0x000001C1{Domain=0}|RECEIVE SAMPLE|PROCESS HANDSHAKE|GET SECURITY STATE|LC:Security] RTI_Security_AccessControl_validatePermissionsDocument:{"DDS:Security:LogTopicV2":{"f":"10","s":"3","t":{"s":"1748517682","n":"985028998"},"h":"RTISP-10036","i":"0.0.0.0","a":"RTI Secure DDS Application","p":"85248","k":"50331706","x":[{"DDS":[{"domain_id":"0"},{"guid":"DED844B7.87B9550F.B66DD964.000001C1"},{"plugin_class":"DDS:Access:Permissions"},{"plugin_method":"RTI_Security_AccessControl_validatePermissionsDocument"}]}],"m":"grant has invalid time"}} +ERROR [PARSE MESSAGE|0xDED844B7,0x87B9550F,0xB66DD964:0x000201C4{Entity=DR,MessageKind=DATA}|RECEIVE FROM 0x831AB06E,0x43876C36,0xFD825600:0x000201C3|:0x000001C1{Domain=0}|RECEIVE SAMPLE|PROCESS HANDSHAKE|GET SECURITY STATE|LC:Security] RTI_Security_AccessControl_validate_remote_permissions:{"DDS:Security:LogTopicV2":{"f":"10","s":"1","t":{"s":"1748517682","n":"985044998"},"h":"RTISP-10036","i":"0.0.0.0","a":"RTI Secure DDS Application","p":"85248","k":"50331706","x":[{"DDS":[{"domain_id":"0"},{"guid":"DED844B7.87B9550F.B66DD964.000001C1"},{"plugin_class":"DDS:Access:Permissions"},{"plugin_method":"RTI_Security_AccessControl_validate_remote_permissions"}]}],"m":"failed to validate remote permissions"}} +ERROR [PARSE MESSAGE|0xDED844B7,0x87B9550F,0xB66DD964:0x000201C4{Entity=DR,MessageKind=DATA}|RECEIVE FROM 0x831AB06E,0x43876C36,0xFD825600:0x000201C3|:0x000001C1{Domain=0}|RECEIVE SAMPLE|PROCESS HANDSHAKE|GET SECURITY STATE|LC:Security] DDS_DomainParticipantTrustPlugins_forwardGetAuthenticatedRemoteParticipantSecurityState:FAILED TO VALIDATE | Remote permissions. +ERROR [PARSE MESSAGE|0xDED844B7,0x87B9550F,0xB66DD964:0x000201C4{Entity=DR,MessageKind=DATA}|RECEIVE FROM 0x831AB06E,0x43876C36,0xFD825600:0x000201C3|:0x000001C1{Domain=0}|RECEIVE SAMPLE|PROCESS HANDSHAKE|LC:Security] PRESParticipant_authorizeRemoteParticipant:{"DDS:Security:LogTopicV2":{"f":"10","s":"3","t":{"s":"1748517682","n":"985078998"},"h":"RTISP-10036","i":"0.0.0.0","a":"RTI Secure DDS Application","p":"85248","k":"50331706","x":[{"DDS":[{"domain_id":"0"},{"guid":"DED844B7.87B9550F.B66DD964.000001C1"},{"plugin_class":"RTI:Auth"},{"plugin_method":"PRESParticipant_authorizeRemoteParticipant"}]}],"m":"unauthorized remote participant 831ab06e.43876c36.fd825600 denied by local participant ded844b7.87b9550f.b66dd964"}} +ERROR [PARSE MESSAGE|0xDED844B7,0x87B9550F,0xB66DD964:0x000201C4{Entity=DR,MessageKind=DATA}|RECEIVE FROM 0x831AB06E,0x43876C36,0xFD825600:0x000201C3|:0x000001C1{Domain=0}|RECEIVE SAMPLE|PROCESS HANDSHAKE|LC:Security] PRESParticipant_processHandshake:FAILED TO VALIDATE | Failed to authorize remote DP (GUID: 0x831AB06E,0x43876C36,0xFD825600:0x000001C1). +``` + +Communication will stop. + +## Renewing the Permissions Document + +This example updates the publisher DomainParticipant's Permissions Document +after 70 samples. At that point, communication with the subscriber will +resume. diff --git a/examples/connext_secure/dynamic_permissions/c++11/CMakeLists.txt b/examples/connext_secure/dynamic_permissions/c++11/CMakeLists.txt new file mode 100644 index 000000000..c4fac74da --- /dev/null +++ b/examples/connext_secure/dynamic_permissions/c++11/CMakeLists.txt @@ -0,0 +1,67 @@ +# +# (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. +# +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the Software. Licensee has the right to distribute object form +# only for use with RTI products. The Software is provided "as is", with no +# warranty of any type, including any warranty for fitness for any purpose. +# RTI is under no obligation to maintain or support the Software. RTI shall +# not be liable for any incidental or consequential damages arising out of the +# use or inability to use the software. +# +cmake_minimum_required(VERSION 3.11) +project(rtiexamples-dynamic-permissions) +list(APPEND CMAKE_MODULE_PATH + "${CMAKE_CURRENT_SOURCE_DIR}/../../../../resources/cmake/Modules" +) +include(ConnextDdsConfigureCmakeUtils) +connextdds_configure_cmake_utils() + +find_package(RTIConnextDDS + "7.0.0" + REQUIRED + COMPONENTS + security_plugins +) + +if(NOT TARGET RTIConnextDDS::security_plugins) + message(WARNING "RTIConnextDDS::security_plugins component is missing. Skipping example") + return() +endif() + +# Include ConnextDdsAddExample.cmake from resources/cmake +include(ConnextDdsAddExample) + +connextdds_add_example( + IDL "dynamic_permissions" + LANG "C++11" +) + +include (ConnextDdsGenerateSecurityArtifacts) +connextdds_generate_security_artifacts() + +# Do a copy of the original subscriber's Permissions Document, but with the +# validity modified so that it expires in 1 minute. +add_custom_command( + OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/security/ecdsa01/xml/Permissions2_expiring.xml" + COMMAND ${CMAKE_COMMAND} + -DINPUT_FILE="${CMAKE_CURRENT_BINARY_DIR}/security/ecdsa01/xml/Permissions2.xml" + -DOUTPUT_FILE="${CMAKE_CURRENT_BINARY_DIR}/security/ecdsa01/xml/Permissions2_expiring.xml" + -P ${CMAKE_SOURCE_DIR}/modify_permissions.cmake + DEPENDS "${CMAKE_CURRENT_BINARY_DIR}/security/ecdsa01/xml/Permissions2.xml" +) + +# Sign the modified Permissions Document +connextdds_openssl_smime_sign( + INPUT "${CMAKE_CURRENT_BINARY_DIR}/security/ecdsa01/xml/Permissions2_expiring.xml" + OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/security/ecdsa01/xml/signed/signed_Permissions2_expiring.p7s" + SIGNER_CERTIFICATE "${CMAKE_CURRENT_BINARY_DIR}/security/ecdsa01/certs/ca_cert.pem" + PRIVATE_KEY_FILE "${CMAKE_CURRENT_BINARY_DIR}/security/ecdsa01/certs/ca_key.pem" +) + +# Create a Permissions Document that is about to expire +add_custom_target(createExpiringPermissions + ALL + DEPENDS + dynamic_permissions_securityArtifacts + "${CMAKE_CURRENT_BINARY_DIR}/security/ecdsa01/xml/signed/signed_Permissions2_expiring.p7s") diff --git a/examples/connext_secure/dynamic_permissions/c++11/USER_QOS_PROFILES.xml b/examples/connext_secure/dynamic_permissions/c++11/USER_QOS_PROFILES.xml new file mode 100644 index 000000000..2b31c22dd --- /dev/null +++ b/examples/connext_secure/dynamic_permissions/c++11/USER_QOS_PROFILES.xml @@ -0,0 +1,68 @@ + + + + + + + + + + dds.sec.auth.identity_ca + file:security/ecdsa01/certs/ca_cert.pem + + + dds.sec.auth.identity_certificate + file:security/ecdsa01/certs/peer1_cert.pem + + + dds.sec.auth.private_key + file:security/ecdsa01/certs/peer1_key.pem + + + dds.sec.access.permissions_ca + file:security/ecdsa01/certs/ca_cert.pem + + + dds.sec.access.governance + file:security/ecdsa01/xml/signed/signed_Governance.p7s + + + dds.sec.access.permissions + file:security/ecdsa01/xml/signed/signed_Permissions1.p7s + + + + + + + + + + + dds.sec.auth.identity_certificate + file:security/ecdsa01/certs/peer2_cert.pem + + + dds.sec.auth.private_key + file:security/ecdsa01/certs/peer2_key.pem + + + dds.sec.access.permissions + file:security/ecdsa01/xml/signed/signed_Permissions2_expiring.p7s + + + + + + + diff --git a/examples/connext_secure/dynamic_permissions/c++11/application.h b/examples/connext_secure/dynamic_permissions/c++11/application.h new file mode 100644 index 000000000..b4353c51e --- /dev/null +++ b/examples/connext_secure/dynamic_permissions/c++11/application.h @@ -0,0 +1,132 @@ +/* + * (c) Copyright, Real-Time Innovations, 2025. All rights reserved. + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the software solely for use with RTI Connext DDS. Licensee may + * redistribute copies of the software provided that all such copies are subject + * to this license. The software is provided "as is", with no warranty of any + * type, including any warranty for fitness for any purpose. RTI is under no + * obligation to maintain or support the software. RTI shall not be liable for + * any incidental or consequential damages arising out of the use or inability + * to use the software. + */ + +#ifndef APPLICATION_H +#define APPLICATION_H + +#include +#include +#include + +namespace application { + +// Catch control-C and tell application to shut down +bool shutdown_requested = false; + +inline void stop_handler(int) +{ + shutdown_requested = true; + std::cout << "preparing to shut down..." << std::endl; +} + +inline void setup_signal_handlers() +{ + signal(SIGINT, stop_handler); + signal(SIGTERM, stop_handler); +} + +enum ParseReturn { PARSE_RETURN_OK, PARSE_RETURN_FAILURE, PARSE_RETURN_EXIT }; + +struct ApplicationArguments { + ParseReturn parse_result; + unsigned int domain_id; + unsigned int sample_count; + NDDS_Config_LogVerbosity verbosity; +}; + +inline void set_verbosity(ApplicationArguments &arguments, int verbosity) +{ + switch (verbosity) { + case 0: + arguments.verbosity = NDDS_CONFIG_LOG_VERBOSITY_SILENT; + break; + case 1: + arguments.verbosity = NDDS_CONFIG_LOG_VERBOSITY_ERROR; + break; + case 2: + arguments.verbosity = NDDS_CONFIG_LOG_VERBOSITY_WARNING; + break; + case 3: + arguments.verbosity = NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL; + break; + default: + arguments.verbosity = NDDS_CONFIG_LOG_VERBOSITY_ERROR; + break; + } +} + +// Parses application arguments for example. Returns whether to exit. +inline void parse_arguments( + ApplicationArguments &arguments, + int argc, + char *argv[]) +{ + int arg_processing = 1; + bool show_usage = false; + arguments.domain_id = 0; + arguments.sample_count = INT_MAX; + arguments.verbosity = NDDS_CONFIG_LOG_VERBOSITY_ERROR; + arguments.parse_result = PARSE_RETURN_OK; + + while (arg_processing < argc) { + if ((argc > arg_processing + 1) + && (strcmp(argv[arg_processing], "-d") == 0 + || strcmp(argv[arg_processing], "--domain") == 0)) { + arguments.domain_id = atoi(argv[arg_processing + 1]); + arg_processing += 2; + } else if ( + (argc > arg_processing + 1) + && (strcmp(argv[arg_processing], "-s") == 0 + || strcmp(argv[arg_processing], "--sample-count") == 0)) { + arguments.sample_count = atoi(argv[arg_processing + 1]); + arg_processing += 2; + } else if ( + (argc > arg_processing + 1) + && (strcmp(argv[arg_processing], "-v") == 0 + || strcmp(argv[arg_processing], "--verbosity") == 0)) { + set_verbosity(arguments, atoi(argv[arg_processing + 1])); + arg_processing += 2; + } else if ( + strcmp(argv[arg_processing], "-h") == 0 + || strcmp(argv[arg_processing], "--help") == 0) { + std::cout << "Example application." << std::endl; + show_usage = true; + arguments.parse_result = PARSE_RETURN_EXIT; + break; + } else { + std::cout << "Bad parameter." << std::endl; + show_usage = true; + arguments.parse_result = PARSE_RETURN_FAILURE; + break; + } + } + if (show_usage) { + std::cout << "Usage:\n" + " -d, --domain Domain ID this " + "application will\n" + " subscribe in. \n" + " Default: 0\n" + " -s, --sample_count Number of samples to " + "receive before\n" + " cleanly shutting down. \n" + " Default: infinite\n" + " -v, --verbosity How much debugging output " + "to show.\n" + " Range: 0-3 \n" + " Default: 1" + << std::endl; + } +} + +} // namespace application + +#endif // APPLICATION_H diff --git a/examples/connext_secure/dynamic_permissions/c++11/application.hpp b/examples/connext_secure/dynamic_permissions/c++11/application.hpp new file mode 100644 index 000000000..3c8820e8b --- /dev/null +++ b/examples/connext_secure/dynamic_permissions/c++11/application.hpp @@ -0,0 +1,141 @@ +/* +* (c) Copyright, Real-Time Innovations, 2025. All rights reserved. +* RTI grants Licensee a license to use, modify, compile, and create derivative +* works of the software solely for use with RTI Connext DDS. Licensee may +* redistribute copies of the software provided that all such copies are subject +* to this license. The software is provided "as is", with no warranty of any +* type, including any warranty for fitness for any purpose. RTI is under no +* obligation to maintain or support the software. RTI shall not be liable for +* any incidental or consequential damages arising out of the use or inability +* to use the software. +*/ + +#ifndef APPLICATION_HPP +#define APPLICATION_HPP + +#include +#include +#include + +namespace application { + + // Catch control-C and tell application to shut down + bool shutdown_requested = false; + + inline void stop_handler(int) + { + shutdown_requested = true; + std::cout << "preparing to shut down..." << std::endl; + } + + inline void setup_signal_handlers() + { + signal(SIGINT, stop_handler); + signal(SIGTERM, stop_handler); + } + + enum class ParseReturn { + ok, + failure, + exit + }; + + struct ApplicationArguments { + ParseReturn parse_result; + unsigned int domain_id; + unsigned int sample_count; + rti::config::Verbosity verbosity; + + ApplicationArguments( + ParseReturn parse_result_param, + unsigned int domain_id_param, + unsigned int sample_count_param, + rti::config::Verbosity verbosity_param) + : parse_result(parse_result_param), + domain_id(domain_id_param), + sample_count(sample_count_param), + verbosity(verbosity_param) {} + }; + + inline void set_verbosity( + rti::config::Verbosity& verbosity, + int verbosity_value) + { + switch (verbosity_value) { + case 0: + verbosity = rti::config::Verbosity::SILENT; + break; + case 1: + verbosity = rti::config::Verbosity::EXCEPTION; + break; + case 2: + verbosity = rti::config::Verbosity::WARNING; + break; + case 3: + verbosity = rti::config::Verbosity::STATUS_ALL; + break; + default: + verbosity = rti::config::Verbosity::EXCEPTION; + break; + } + } + + // Parses application arguments for example. + inline ApplicationArguments parse_arguments(int argc, char *argv[]) + { + int arg_processing = 1; + bool show_usage = false; + ParseReturn parse_result = ParseReturn::ok; + unsigned int domain_id = 0; + unsigned int sample_count = (std::numeric_limits::max)(); + rti::config::Verbosity verbosity(rti::config::Verbosity::EXCEPTION); + + while (arg_processing < argc) { + if ((argc > arg_processing + 1) + && (strcmp(argv[arg_processing], "-d") == 0 + || strcmp(argv[arg_processing], "--domain") == 0)) { + domain_id = atoi(argv[arg_processing + 1]); + arg_processing += 2; + } else if ((argc > arg_processing + 1) + && (strcmp(argv[arg_processing], "-s") == 0 + || strcmp(argv[arg_processing], "--sample-count") == 0)) { + sample_count = atoi(argv[arg_processing + 1]); + arg_processing += 2; + } else if ((argc > arg_processing + 1) + && (strcmp(argv[arg_processing], "-v") == 0 + || strcmp(argv[arg_processing], "--verbosity") == 0)) { + set_verbosity(verbosity, atoi(argv[arg_processing + 1])); + arg_processing += 2; + } else if (strcmp(argv[arg_processing], "-h") == 0 + || strcmp(argv[arg_processing], "--help") == 0) { + std::cout << "Example application." << std::endl; + show_usage = true; + parse_result = ParseReturn::exit; + break; + } else { + std::cout << "Bad parameter." << std::endl; + show_usage = true; + parse_result = ParseReturn::failure; + break; + } + } + if (show_usage) { + std::cout << "Usage:\n"\ + " -d, --domain Domain ID this application will\n" \ + " subscribe in. \n" + " Default: 0\n"\ + " -s, --sample_count Number of samples to receive before\n"\ + " cleanly shutting down. \n" + " Default: infinite\n" + " -v, --verbosity How much debugging output to show.\n"\ + " Range: 0-3 \n" + " Default: 1" + << std::endl; + } + + return ApplicationArguments(parse_result, domain_id, sample_count, verbosity); + } + +} // namespace application + +#endif // APPLICATION_HPP diff --git a/examples/connext_secure/dynamic_permissions/c++11/dynamic_permissions.idl b/examples/connext_secure/dynamic_permissions/c++11/dynamic_permissions.idl new file mode 100644 index 000000000..1b39e1cd6 --- /dev/null +++ b/examples/connext_secure/dynamic_permissions/c++11/dynamic_permissions.idl @@ -0,0 +1,14 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ +struct DynamicPermissions { + int32 value; +}; diff --git a/examples/connext_secure/dynamic_permissions/c++11/dynamic_permissions_publisher.cxx b/examples/connext_secure/dynamic_permissions/c++11/dynamic_permissions_publisher.cxx new file mode 100644 index 000000000..570b4fb7f --- /dev/null +++ b/examples/connext_secure/dynamic_permissions/c++11/dynamic_permissions_publisher.cxx @@ -0,0 +1,104 @@ +/* + * (c) Copyright, Real-Time Innovations, 2025. All rights reserved. + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the software solely for use with RTI Connext DDS. Licensee may + * redistribute copies of the software provided that all such copies are subject + * to this license. The software is provided "as is", with no warranty of any + * type, including any warranty for fitness for any purpose. RTI is under no + * obligation to maintain or support the software. RTI shall not be liable for + * any incidental or consequential damages arising out of the use or inability + * to use the software. + */ + +#include + +#include +#include // for sleep() +#include // for logging + +#include "application.hpp" // for command line parsing and ctrl-c +#include "dynamic_permissions.hpp" + +void run_publisher_application( + unsigned int domain_id, + unsigned int sample_count) +{ + // Start communicating in a domain, usually one participant per application + dds::domain::DomainParticipant participant( + domain_id, + dds::core::QosProvider::Default().participant_qos( + "dynamic_permissions_Library::publisher")); + + // Create a Topic with a name and a datatype + dds::topic::Topic<::DynamicPermissions> topic( + participant, + "Example DynamicPermissions"); + + // Create a Publisher + dds::pub::Publisher publisher(participant); + + // Create a DataWriter with default QoS + dds::pub::DataWriter<::DynamicPermissions> writer(publisher, topic); + + ::DynamicPermissions data; + // Main loop, write data + for (unsigned int samples_written = 0; + !application::shutdown_requested && samples_written < sample_count; + samples_written++) { + // Modify the data to be written here + data.value = static_cast(samples_written); + std::cout << "Writing ::DynamicPermissions, count " << samples_written + << std::endl; + + writer.write(data); + + // Send once every second + rti::util::sleep(dds::core::Duration(1)); + + // The Permissions Document expires after 1 minute (~60 samples). + // Let's update it after 70 samples. At this point, the publisher and + // subscriber lost communication. This will be fixed by updating the + // Permissions Document. + if (samples_written == 70) { + std::cout << "Updating Permissions Document" << std::endl; + dds::domain::qos::DomainParticipantQos updated_qos = + participant.qos(); + updated_qos->property.set(rti::core::policy::Property::Entry( + "dds.sec.access.permissions", + "security/ecdsa01/xml/signed/signed_Permissions2.p7s")); + participant << updated_qos; + } + } +} + +int main(int argc, char *argv[]) +{ + using namespace application; + + // Parse arguments and handle control-C + auto arguments = parse_arguments(argc, argv); + if (arguments.parse_result == ParseReturn::exit) { + return EXIT_SUCCESS; + } else if (arguments.parse_result == ParseReturn::failure) { + return EXIT_FAILURE; + } + setup_signal_handlers(); + + // Sets Connext verbosity to help debugging + rti::config::Logger::instance().verbosity(arguments.verbosity); + + try { + run_publisher_application(arguments.domain_id, arguments.sample_count); + } catch (const std::exception &ex) { + // This will catch DDS exceptions + std::cerr << "Exception in run_publisher_application(): " << ex.what() + << std::endl; + return EXIT_FAILURE; + } + + // Releases the memory used by the participant factory. Optional at + // application exit + dds::domain::DomainParticipant::finalize_participant_factory(); + + return EXIT_SUCCESS; +} diff --git a/examples/connext_secure/dynamic_permissions/c++11/dynamic_permissions_subscriber.cxx b/examples/connext_secure/dynamic_permissions/c++11/dynamic_permissions_subscriber.cxx new file mode 100644 index 000000000..42edf4d60 --- /dev/null +++ b/examples/connext_secure/dynamic_permissions/c++11/dynamic_permissions_subscriber.cxx @@ -0,0 +1,113 @@ +/* + * (c) Copyright, Real-Time Innovations, 2025. All rights reserved. + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the software solely for use with RTI Connext DDS. Licensee may + * redistribute copies of the software provided that all such copies are subject + * to this license. The software is provided "as is", with no warranty of any + * type, including any warranty for fitness for any purpose. RTI is under no + * obligation to maintain or support the software. RTI shall not be liable for + * any incidental or consequential damages arising out of the use or inability + * to use the software. + */ + +#include +#include + +#include +#include +#include + +#include "dynamic_permissions.hpp" +#include "application.hpp" // for command line parsing and ctrl-c + +int process_data(dds::sub::DataReader<::DynamicPermissions> reader) +{ + // Take all samples + int count = 0; + dds::sub::LoanedSamples<::DynamicPermissions> samples = reader.take(); + for (auto sample : samples) { + if (sample.info().valid()) { + count++; + std::cout << sample.data() << std::endl; + } else { + std::cout << "Instance state changed to " + << sample.info().state().instance_state() << std::endl; + } + } + + return count; +} // The LoanedSamples destructor returns the loan + +void run_subscriber_application( + unsigned int domain_id, + unsigned int sample_count) +{ + // Start communicating in a domain, usually one participant per application + dds::domain::DomainParticipant participant( + domain_id, + dds::core::QosProvider::Default().participant_qos( + "dynamic_permissions_Library::subscriber")); + + // Create a Topic with a name and a datatype + dds::topic::Topic<::DynamicPermissions> topic( + participant, + "Example DynamicPermissions"); + + // Create a Subscriber and DataReader with default Qos + dds::sub::Subscriber subscriber(participant); + dds::sub::DataReader<::DynamicPermissions> reader(subscriber, topic); + + // Create a ReadCondition for any data received on this reader and set a + // handler to process the data + unsigned int samples_read = 0; + dds::sub::cond::ReadCondition read_condition( + reader, + dds::sub::status::DataState::any(), + [reader, &samples_read]() { + samples_read += process_data(reader); + }); + + // WaitSet will be woken when the attached condition is triggered + dds::core::cond::WaitSet waitset; + waitset += read_condition; + + while (!application::shutdown_requested && samples_read < sample_count) { + std::cout << "::DynamicPermissions subscriber sleeping up to 1 sec..." + << std::endl; + + // Run the handlers of the active conditions. Wait for up to 1 second. + waitset.dispatch(dds::core::Duration(1)); + } +} + +int main(int argc, char *argv[]) +{ + using namespace application; + + // Parse arguments and handle control-C + auto arguments = parse_arguments(argc, argv); + if (arguments.parse_result == ParseReturn::exit) { + return EXIT_SUCCESS; + } else if (arguments.parse_result == ParseReturn::failure) { + return EXIT_FAILURE; + } + setup_signal_handlers(); + + // Sets Connext verbosity to help debugging + rti::config::Logger::instance().verbosity(arguments.verbosity); + + try { + run_subscriber_application(arguments.domain_id, arguments.sample_count); + } catch (const std::exception &ex) { + // This will catch DDS exceptions + std::cerr << "Exception in run_subscriber_application(): " << ex.what() + << std::endl; + return EXIT_FAILURE; + } + + // Releases the memory used by the participant factory. Optional at + // application exit + dds::domain::DomainParticipant::finalize_participant_factory(); + + return EXIT_SUCCESS; +} diff --git a/examples/connext_secure/dynamic_permissions/c++11/modify_permissions.cmake b/examples/connext_secure/dynamic_permissions/c++11/modify_permissions.cmake new file mode 100644 index 000000000..ddd239f75 --- /dev/null +++ b/examples/connext_secure/dynamic_permissions/c++11/modify_permissions.cmake @@ -0,0 +1,46 @@ +file(READ ${INPUT_FILE} CONTENTS) + +# Find the positions of the start and end tags +string(FIND "${CONTENTS}" "" START_INDEX) +string(FIND "${CONTENTS}" "" END_INDEX) + +if(START_INDEX EQUAL -1 OR END_INDEX EQUAL -1) + message(FATAL_ERROR "Tags not found in the input file") +endif() + +# Compute the new contents +string(LENGTH "" NOT_AFTER_LENGTH) +math(EXPR START_INDEX "${START_INDEX} + ${NOT_AFTER_LENGTH}") +string(SUBSTRING "${CONTENTS}" 0 ${START_INDEX} BEFORE_START) + +string(LENGTH "${CONTENTS}" TOTAL_LENGTH) +math(EXPR TRAILING_LENGTH "${TOTAL_LENGTH} - ${END_INDEX}") +string(SUBSTRING "${CONTENTS}" ${END_INDEX} ${TRAILING_LENGTH} AFTER_END) + +# Replace with the current date + 1 minute +string(TIMESTAMP current_epoch "%s" UTC) +MATH(EXPR expiring_epoch "(${current_epoch} + 60)") + +# SOURCE_DATE_EPOCH allows the date and time to be set externally by an exported +# environment variable. If the SOURCE_DATE_EPOCH environment variable is set, +# the string(TIMESTAMP [...]) cmake command will return its value instead of the +# current time. +# Backup. +if (DEFINED ENV{SOURCE_DATE_EPOCH}) + set(_old_source_date_epoch ENV{SOURCE_DATE_EPOCH}) +endif() +# +# Set new value. +set(ENV{SOURCE_DATE_EPOCH} ${expiring_epoch}) +# +# Get the timestamp that we want. +string(TIMESTAMP expiring_date "%Y-%m-%dT%H:%M:%S" UTC) +# +# Revert old value. +if (DEFINED _old_source_date_epoch) + set(ENV{SOURCE_DATE_EPOCH} ${_old_source_date_epoch}) +else() + unset(ENV{SOURCE_DATE_EPOCH}) +endif() + +file(WRITE ${OUTPUT_FILE} "${BEFORE_START}${expiring_date}${AFTER_END}") diff --git a/examples/routing_service/udp_socket_adapter/CMakeLists.txt b/examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt similarity index 91% rename from examples/routing_service/udp_socket_adapter/CMakeLists.txt rename to examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt index 4181e5ae9..0fc9b611b 100644 --- a/examples/routing_service/udp_socket_adapter/CMakeLists.txt +++ b/examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt @@ -1,5 +1,5 @@ # -# (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. +# (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. # # RTI grants Licensee a license to use, modify, compile, and create derivative # works of the Software. Licensee has the right to distribute object form @@ -37,6 +37,8 @@ add_library(${PROJECT_NAME} "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.hpp" "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.cxx" "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.hpp" "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.cxx" "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.hpp" ) diff --git a/examples/routing_service/udp_socket_adapter_dynamic/README.md b/examples/routing_service/udp_socket_adapter_dynamic/README.md new file mode 100644 index 000000000..9ab2ece60 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/README.md @@ -0,0 +1,213 @@ +# Example Code: Routing Service C++11 Socket Adapter using Dynamic Data + +## Example Description + +This example shows how to implement a simple Routing Service Adapter plugin +in C++11 to receive data from a UDP socket using RTI Routing Service. + +This examples uses dynamic data API and there is no need to know the data type +information beforehand. + +The code in this directory provides the following components: + +- `src/SocketAdapter` implements the plugin that is loaded by *RTI Routing +Service*. It responsible for creating and deleting connections. +- `src/SocketConnection` implements a connection. This component is +responsible for the creation and deletion of `StreamReaders`. +- `src/SocketInputDiscoveryStreamReader` implements the logic necessary to +propagate information about the discovered input streams (in this case +sockets) to the Routing Service. +- `src/SocketStreamReader` implements a `StreamReader` that reads sample +information from a UDP socket. +- `src/SocketStreamWriter` implements a `StreamWriter` that sends sample +information to a UDP socket. + + +For more details, please refer to the *RTI Routing Service SDK* documentation. + +## Building the C++ example + +In order to build this example, you need to define the variables +`CONNEXTDDS_DIR` and `CONNEXTDDS_ARCH`. You can do so by exporting them +manually, by sourcing the `rtisetenv` script for your architecture, or by +passing them to the `cmake` command as arguments: + +```bash +mkdir build +cd build +cmake -DCONNEXTDDS_DIR= \ # If not exported + -DCONNEXTDDS_ARCH= \ # If not exported + -DBUILD_SHARED_LIBS=ON|OFF \ # ON is preferred + -DCMAKE_BUILD_TYPE=Debug|Release .. +cmake --build . +cd .. +``` + +Example command for Windows: + +```bash +cmake .. -DCONNEXTDDS_DIR="%NDDSHOME%" -DCONNEXTDDS_ARCH=x64Win64VS2015 -DBUILD_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release -A x64 -G "Visual Studio 17 2022" +cd .. +``` + +**Note**: You do not need to define `CONNEXTDDS_ARCH` if you only have one +architecture target installed in your system. + +**Note**: When compiling on a Windows 64-bit machine you will need to add the +`-A x64` parameter to the call to CMake. + +**Note:** If you are using a multi-configuration generator, such as Visual +Studio Solutions, you can specify the configuration mode to build as follows: + +```bash +cmake --build . --config Release|Debug +``` + +Here is more information about generating +[Visual Studio Solutions for Windows using CMake](https://cmake.org/cmake/help/v3.16/generator/Visual%20Studio%2016%202019.html#platform-selection). + +**Note:** `BUILD_SHARED_LIBS` allows you to control if the generated library +for this example is a static or a dynamic shared library. The following +sections assume you are building a dynamic shared library. However, Routing +Service also supports static linking of adapters. To use this functionality +you would need to create an application that uses Routing Service as a library +component and statically links to this `SocketAdapter` library. + +### Cross-compilation + +When you need to cross-compile the example, the above +command will not work, the assigned compiler won't be the cross-compiler and +errors may happen when linking against the cross-compiled Connext binaries. +To fix this, you have to create a file with the architecture name and call +CMake with a specific flag called ``-DCMAKE_TOOLCHAIN_FILE``. +An example of the file to create with the toolchain settings (e.g. for an +ARM architectures): + +```cmake +set(CMAKE_SYSTEM_NAME Linux) +set(toolchain_path "/arm-bcm2708/gcc-linaro-arm-linux-gnueabihf-raspbian") +set(CMAKE_C_COMPILER "${toolchain_path}/bin/arm-linux-gnueabihf-gcc") +set(CMAKE_CXX_COMPILER "${toolchain_path}/bin/arm-linux-gnueabihf-g++") +``` + +Then you can call CMake like this: + +```bash +cmake -DCONNEXTDDS_DIR= -DCMAKE_TOOLCHAIN_FILE= + -DCONNEXTDDS_ARCH= .. +``` + +## Running the C++ example + +To run the example, you just need to run the following commands from the top +level folder. This example has been written to allow easy experimentation with +the RTI DDSPing tool shipped with *RTI Connext DDS* installer bundle. If you wish +to create a real Routing Service adapter, you should modify the code and XML accordingly. + +There are 2 configurations (`-cfgName`) in the Routing Service XML file: + +- **SocketAdapterToDDS** - It reads data from a UDP socket using the +SocketAdapter and outputs it to DDS. You can visualize the ouptut by running: + +- **DDSToSocketAdapter** - It sends data from DDS to a UDP socket. You can +publish DDS data by running command: + + +To run Routing Service, you will need first to set up your environment as +follows. + +Before running the RTI Routing Service, you need to specify where the +`SocketAdapterCpp` library is located as shown below: + +Linux: + +```bash +$export RTI_LD_LIBRARY_PATH=$NDDSHOME/lib/: +``` + +Windows: + +```bash +set PATH=%NDDSHOME%/lib/; +``` + +The SocketAdapterCpp library will be in the `./build` folder. + +```bash +# From the build/ directory +$NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +``` + +Here is an output from a sample run: + +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_dynamic/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +RTI Routing Service 7.3.0 executing (with name SocketAdapterToSocketAdapter) +``` + +Now you'll need to send data to the UDP sockets. By default, DDS Ping data is +expected on `127.0.0.1:10203`. You can change both the expected type and topic name +and the UDP socket configuration on `RsSocketAdapter.xml`. + +To run a simple test, run in different terminals: + +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_dynamic/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + + +```bash + $NDDSHOME/bin/rtiddsping -publisher -domainId 0 +``` + +## Running a data-diode example + +You can configure a data-diode scenario by using two Routing Services instances; +- One using **DDSToSocketAdapter** configuration to publish DDS data over a one direction UDP socket +- The other using **SocketAdapterToDDS** configuration to convert back to DDS samples +``` + ┌───────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ + │ Connext │ │ Routing │ ┌────────────────┐ │ Routing │ │ Connext │ + │ App ├─►│ Service ├───►│ UDP DATA DIODE ├──►│ Service ├─►│ App │ + │ │ │ DDS TO UDP │ └────────────────┘ │ UDP TO DDS │ │ │ + └───────────┘ └─────────────┘ └─────────────┘ └───────────┘ +``` +To run this example in a local machine: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_dynamic/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +RTI Routing Service 7.3.0 executing (with configuration=SocketAdapterToDDS) +``` +And in a different terminal: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_dynamic/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + +Using the default configuration from RsSocketAdapter.xml, you need to publish DDS Ping data +on domain id 0 and subscribe to DSS Ping data on domain id 1: + +```bash + $NDDSHOME/bin/rtiddsping -publisher -domainId 0 +``` + +```bash + $NDDSHOME/bin/rtiddsping -subscriber -domainId 1 +``` + +## Requirements + +To run this example you will need: + +- RTI Connext Professional version 6.0.0 or higher. +- CMake version 3.10 or higher. +- A target platform with support for RTI Routing Service and C++11. +- Python3. diff --git a/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml new file mode 100644 index 000000000..5a693012f --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml @@ -0,0 +1,105 @@ + + + + + + SocketAdapterCpp + SocketAdapter_create_adapter_plugin + + + + + + + + + + 1 + + + + + + IMMEDIATE + PingType + PingStream + + + + + + receive_address + 127.0.0.1 + + + + receive_port + 10203 + + + + + + ON_DOMAIN_MATCH + PingType + PingTopic + + + + + + + + + + + + + + 0 + + + + + + + ON_ROUTE_MATCH + PingType + PingStream + + + + send_address + 127.0.0.1 + + + send_port + 0 + + + dest_address + 127.0.0.1 + + + dest_port + 10203 + + + + + + ON_DOMAIN_MATCH + PingType + PingTopic + + + + + + + + \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx similarity index 95% rename from examples/routing_service/udp_socket_adapter/src/SocketAdapter.cxx rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx index a649473f0..a73226590 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp similarity index 96% rename from examples/routing_service/udp_socket_adapter/src/SocketAdapter.hpp rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp index aec7eee95..0042adae7 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx new file mode 100644 index 000000000..2b7eb7434 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx @@ -0,0 +1,75 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "SocketConnection.hpp" +#include "SocketStreamReader.hpp" +#include "SocketStreamWriter.hpp" +#include "SocketStreamWriter.hpp" + +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketConnection::SocketConnection( + StreamReaderListener *input_stream_discovery_listener, + StreamReaderListener *output_stream_discovery_listener, + const PropertySet &properties) + : input_discovery_reader_( + properties, + input_stream_discovery_listener) {}; + +StreamReader *SocketConnection::create_stream_reader( + Session *session, + const StreamInfo &info, + const PropertySet &properties, + StreamReaderListener *listener) +{ + return new SocketStreamReader(this, info, properties, listener); +} + +StreamWriter *SocketConnection::create_stream_writer( + Session *session, + const StreamInfo &info, + const PropertySet &properties) + { + return new SocketStreamWriter(this, info, properties); +} + +void SocketConnection::delete_stream_reader(StreamReader *reader) +{ + SocketStreamReader *socket_reader = + dynamic_cast(reader); + socket_reader->shutdown_socket_reader_thread(); + delete reader; +} + +void SocketConnection::delete_stream_writer(StreamWriter *writer) +{ + SocketStreamWriter *socket_writer = + dynamic_cast(writer); + delete writer; +} + +DiscoveryStreamReader *SocketConnection::input_stream_discovery_reader() +{ + return &input_discovery_reader_; +} + +DiscoveryStreamReader *SocketConnection::output_stream_discovery_reader() +{ + return nullptr; +} + +void SocketConnection::dispose_discovery_stream( + const rti::routing::StreamInfo &stream_info) +{ + input_discovery_reader_.dispose(stream_info); +} diff --git a/examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp similarity index 81% rename from examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp index c79ccb00a..ae49f6c0c 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -37,13 +37,23 @@ class SocketConnection : public rti::routing::adapter::Connection { const rti::routing::PropertySet &properties, rti::routing::adapter::StreamReaderListener *listener) final; - // This function will also stop the receiving socket thread + rti::routing::adapter::StreamWriter *create_stream_writer( + rti::routing::adapter::Session *session, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &properties) final; + void delete_stream_reader( rti::routing::adapter::StreamReader *reader) final; + void delete_stream_writer( + rti::routing::adapter::StreamWriter *writer) final; + rti::routing::adapter::DiscoveryStreamReader * input_stream_discovery_reader() final; + rti::routing::adapter::DiscoveryStreamReader * + output_stream_discovery_reader() final; + /** * @brief This function is called by the SocketStreamReader to indicate * that it's time to dispose the route. The dispose set by the diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx new file mode 100644 index 000000000..a55323e11 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx @@ -0,0 +1,88 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "SocketInputDiscoveryStreamReader.hpp" + +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketInputDiscoveryStreamReader::SocketInputDiscoveryStreamReader( + const PropertySet &, + StreamReaderListener *input_stream_discovery_listener) +{ + input_stream_discovery_listener_ = input_stream_discovery_listener; +} + +void SocketInputDiscoveryStreamReader::dispose( + const rti::routing::StreamInfo &stream_info) +{ + /** + * This guard is essential since the take() and return_loan() operations + * triggered by calling on_data_available() execute on an internal Routing + * Service thread. The custom dispose() operation doesn't run on that + * thread. Since the take() and return_loan() operations also need to access + * the data_samples_ list this protection is required. + */ + std::lock_guard guard(data_samples_mutex_); + + std::unique_ptr stream_info_disposed( + new StreamInfo( + stream_info.stream_name(), + stream_info.type_info().type_name())); + stream_info_disposed.get()->disposed(true); + + this->data_samples_.push_back(std::move(stream_info_disposed)); + input_stream_discovery_listener_->on_data_available(this); +} + +void SocketInputDiscoveryStreamReader::take( + std::vector &stream) +{ + /** + * This guard is essential since the take() and return_loan() operations + * triggered by calling on_data_available() execute on an internal Routing + * Service thread. The custom dispose() operation doesn't run on that + * thread. Since the take() and return_loan() operations also need to access + * the data_samples_ list this protection is required. + */ + std::lock_guard guard(data_samples_mutex_); + std::transform( + data_samples_.begin(), + data_samples_.end(), + std::back_inserter(stream), + [](const std::unique_ptr &element) { + return element.get(); + }); +} + +void SocketInputDiscoveryStreamReader::return_loan( + std::vector &stream) +{ + /** + * This guard is essential since the take() and return_loan() operations + * triggered by calling on_data_available() execute on an internal Routing + * Service thread. The custom dispose() operation doesn't run on that + * thread. Since the take() and return_loan() operations also need to access + * the data_samples_ list this protection is required. + */ + std::lock_guard guard(data_samples_mutex_); + + /** + * For discovery streams there will never be any outstanding return_loan(). + * Thus we can be sure that each take() will be followed by a call to + * return_loan(), before the next take() executes. + */ + this->data_samples_.erase( + data_samples_.begin(), + data_samples_.begin() + stream.size()); + stream.clear(); +} diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp new file mode 100644 index 000000000..7f2bdb373 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp @@ -0,0 +1,61 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETDISCOVERYSTREAMREADER_HPP +#define SOCKETDISCOVERYSTREAMREADER_HPP + +#include +#include + +#include +#include + +/** + * This class implements a DiscoveryStreamReader, a special kind of StreamReader + * that provide discovery information about the available streams and their + * types. + */ + +class SocketInputDiscoveryStreamReader + : public rti::routing::adapter::DiscoveryStreamReader { +public: + SocketInputDiscoveryStreamReader( + const rti::routing::PropertySet &, + rti::routing::adapter::StreamReaderListener + *input_stream_discovery_listener); + + void take(std::vector &) final; + + void return_loan(std::vector &) final; + + /** + * @brief Custom operation defined to indicate disposing off an + * when the SocketStreamReader has finished reading from the socket. + * The SocketInputDiscoveryStreamReader will then create a new + * discovery sample indicating that the stream has been disposed. + * This will cause the Routing Service to start tearing down the Routes + * associated with having the corresponding + * and . + * + * @param stream_info \b in. Reference to a StreamInfo object which should + * be used when creating a new StreamInfo sample with disposed set to true + */ + void dispose(const rti::routing::StreamInfo &stream_info); + +private: + std::mutex data_samples_mutex_; + std::vector> data_samples_; + rti::routing::adapter::StreamReaderListener + *input_stream_discovery_listener_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx new file mode 100644 index 000000000..b040e958b --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx @@ -0,0 +1,148 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#ifdef _WIN32 + #include + #pragma comment(lib, "ws2_32.lib") +#else + #include + #include + #include + #include + #include +#endif +#include "SocketStreamReader.hpp" +#include +#include +#include + + +using namespace dds::core::xtypes; +using namespace rti::routing; +using namespace rti::routing::adapter; + +void SocketStreamReader::socket_reading_thread() +{ + while (!stop_thread_) { + int received_bytes = 0; + socket->receive_data( + received_buffer_, + &received_bytes, + BUFFER_MAX_SIZE); + + // Most likely received nothing or there was an error + // Not doing any error handling here + if (received_bytes <= 0) { + // Sleep for a small period of time to avoid busy waiting + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + + { + std::lock_guard lock(buffer_mutex_); + received_buffers_.emplace(received_buffer_, received_buffer_ + received_bytes); + } + + reader_listener_->on_data_available(this); + } + + socket_connection_->dispose_discovery_stream(stream_info_); +} + +SocketStreamReader::SocketStreamReader( + SocketConnection *connection, + const StreamInfo &info, + const PropertySet &properties, + StreamReaderListener *listener) + : stop_thread_(false), + stream_info_(info.stream_name(), info.type_info().type_name()) +{ + socket_connection_ = connection; + reader_listener_ = listener; + adapter_type_ = static_cast(info.type_info().type_representation()); + + // Parse the properties provided in the xml configuration file + for (const auto &property : properties) { + if (property.first == RECEIVE_ADDRESS_STRING) { + receive_address_ = property.second; + } else if (property.first == RECEIVE_PORT_STRING) { + receive_port_ = std::stoi(property.second); + } + } + + socket = std::unique_ptr( + new UdpSocket(receive_address_.c_str(), receive_port_)); + + socketreader_thread_ = + std::thread(&SocketStreamReader::socket_reading_thread, this); +} + +void SocketStreamReader::take( + std::vector &samples, + std::vector &infos) +{ + take_buffer_.clear(); + { + std::unique_lock lock(buffer_mutex_); + if (received_buffers_.empty()) { + // No data available + samples.clear(); + infos.clear(); + return; + } + take_buffer_ = std::move(received_buffers_.front()); + received_buffers_.pop(); + } + + dds::core::xtypes::DynamicData deserialized_sample(*adapter_type_); + rti::core::xtypes::from_cdr_buffer(deserialized_sample, take_buffer_); + + samples.resize(1); + infos.resize(1); + + std::unique_ptr sample(new DynamicData(*adapter_type_)); + *sample = deserialized_sample; + samples[0] = sample.release(); + + return; +} + +void SocketStreamReader::return_loan( + std::vector &samples, + std::vector &infos) +{ + for (int i = 0; i < samples.size(); ++i) { + delete samples[i]; + delete infos[i]; + } + samples.clear(); + infos.clear(); +} + +void SocketStreamReader::shutdown_socket_reader_thread() +{ + stop_thread_ = true; + socketreader_thread_.join(); +} + +SocketStreamReader::~SocketStreamReader() +{ +} diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp new file mode 100644 index 000000000..21517a9ff --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp @@ -0,0 +1,90 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETSTREAMREADER_HPP +#define SOCKETSTREAMREADER_HPP + +#include +#include +#include +#include + +#include "SocketConnection.hpp" +#include "UdpSocket.hpp" + +#include +#include + +#define BUFFER_MAX_SIZE 1024 +#define RECEIVE_ADDRESS_STRING "receive_address" +#define RECEIVE_PORT_STRING "receive_port" + +/** + * @brief StreamReader implementation for UDP socket input in RTI Routing Service. + * + * SocketStreamReader is a specific implementation of rti::routing::adapter::DynamicDataStreamReader + * that receives data from a UDP socket and makes it available to RTI Routing Service as DynamicData samples. + * + * This class manages a background thread to continuously read UDP packets from a specified address and port, + * buffering received data for consumption by the Routing Service. It supports thread-safe queuing of incoming + * data, loaning and returning DynamicData samples, and clean shutdown of the reading thread. + * + */ + +class SocketStreamReader : public rti::routing::adapter::DynamicDataStreamReader { +public: + SocketStreamReader( + SocketConnection *connection, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &, + rti::routing::adapter::StreamReaderListener *listener); + + void take( + std::vector &, + std::vector &) final; + + void return_loan( + std::vector &, + std::vector &) final; + + void shutdown_socket_reader_thread(); + + ~SocketStreamReader(); + +private: + /** + * @brief Function used by socketreader_thread_ to read samples from the + * socket. + */ + void socket_reading_thread(); + + SocketConnection *socket_connection_; + rti::routing::adapter::StreamReaderListener *reader_listener_; + + std::unique_ptr socket; + + std::thread socketreader_thread_; + bool stop_thread_; + + std::ifstream input_socket_stream_; + std::string receive_address_; + int receive_port_; + char received_buffer_[BUFFER_MAX_SIZE]; + std::queue> received_buffers_; + std::mutex buffer_mutex_; + std::vector take_buffer_; + + rti::routing::StreamInfo stream_info_; + dds::core::xtypes::DynamicType *adapter_type_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx new file mode 100644 index 000000000..8d08b0665 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx @@ -0,0 +1,101 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include +#include +#include +#include +#include + +#include +#include +#ifdef _WIN32 + #include + #pragma comment(lib, "ws2_32.lib") +#else + #include + #include + #include + #include + #include +#endif + +#include +#include +#include +#include "SocketStreamWriter.hpp" +#include "SocketStreamReader.hpp" //use ShapeType from here +#include + +using namespace dds::core::xtypes; +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketStreamWriter::SocketStreamWriter( + SocketConnection *connection, + const StreamInfo &info, + const PropertySet &properties + ) + : stream_info_(info.stream_name(), info.type_info().type_name()) +{ + + socket_connection_ = connection; + + adapter_type_ = + static_cast(info.type_info().type_representation()); + + + // Parse the properties provided in the xml configuration file + for (const auto &property : properties) { + if (property.first == SEND_ADDRESS_STRING) { + send_address_ = property.second; + } + else if (property.first == SEND_PORT_STRING) { + send_port_ = std::stoi(property.second); + } + else if (property.first == DEST_ADDRESS_STRING) + { + dest_address_ = property.second; + } + else if (property.first == DEST_PORT_STRING) + { + dest_port_ = std::stoi(property.second); + } + } + + socket = std::unique_ptr(new UdpSocket( + send_address_.c_str(), + send_port_)); +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos) +{ + size_t len = 0; + for (const auto sample : samples) { + serialization_buffer_.clear(); + rti::core::xtypes::to_cdr_buffer(serialization_buffer_, *sample); + // Send the serialized data + len = socket->send_data( + serialization_buffer_.data(), + serialization_buffer_.size(), + dest_address_.c_str(), + dest_port_); + } + + return len; +} + +SocketStreamWriter::~SocketStreamWriter() +{ +} diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp new file mode 100644 index 000000000..39fc32af8 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp @@ -0,0 +1,76 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETSTREAMWRITER_HPP +#define SOCKETSTREAMWRITER_HPP + +#include +#include +#include +#include + +#include "SocketConnection.hpp" +#include "UdpSocket.hpp" + +#include +#include + +#define BUFFER_MAX_SIZE 1024 +#define SEND_ADDRESS_STRING "send_address" +#define SEND_PORT_STRING "send_port" +#define DEST_ADDRESS_STRING "dest_address" +#define DEST_PORT_STRING "dest_port" + +/** + * @brief StreamWriter implementation for UDP socket output in RTI Routing Service. + * + * SocketStreamWriter is a specific implementation of rti::routing::adapter::DynamicDataStreamWriter + * that sends data to a UDP socket, making it available for external consumers outside DDS. + * + * This class is responsible for serializing DynamicData samples received from Routing Service + * and transmitting them as UDP packets to a specified destination address and port. + * It manages socket creation, serialization buffers, and the configuration of destination + * parameters via properties. + * + */ + +class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter { +public: + explicit SocketStreamWriter( + SocketConnection *connection, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet & + ); + + virtual int write( + const std::vector &, + const std::vector &) final; + + ~SocketStreamWriter(); + + +private: + + SocketConnection *socket_connection_; + std::vector serialization_buffer_; + std::unique_ptr socket; + + int send_port_; + int dest_port_; + + std::string send_address_; + std::string dest_address_; + rti::routing::StreamInfo stream_info_; + dds::core::xtypes::DynamicType *adapter_type_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx similarity index 85% rename from examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx rename to examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx index 3c1bf9f61..8da702a3d 100644 --- a/examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -97,4 +97,15 @@ void UdpSocket::receive_data( &client_addr_len); return; +} + +int UdpSocket::send_data(char* tx_buffer, int tx_length, const char* destAddr, int destPort) +{ + sockaddr_in dest_addr; + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(destPort); + dest_addr.sin_addr.s_addr = inet_addr(destAddr); + + size_t length = sendto(sockfd, tx_buffer, tx_length, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr)); + return (int)length; } \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp similarity index 56% rename from examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp rename to examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp index bc0773f41..024429e1b 100644 --- a/examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -10,6 +10,9 @@ * use or inability to use the software. */ +#ifndef UDPSOCKETUTILS_HPP +#define UDPSOCKETUTILS_HPP + #ifdef _WIN32 #include #include @@ -28,6 +31,20 @@ #pragma comment(lib, "ws2_32.lib") #endif +/** + * @brief Utility class for UDP socket communication in the RTI Routing Service UDP Socket Adapter. + * + * UdpSocket provides a lightweight abstraction for UDP socket communication, + * supporting both Windows and POSIX systems. It is designed to be used by the Routing Service + * UDP socket adapter to send and receive raw UDP packets as part of data bridging between + * external UDP sources and DDS. + * + * The class handles socket creation, binding to a specified IP address and port, and + * ensures non-blocking operation for efficient integration with multi-threaded applications. + * It provides methods for receiving data from any UDP client and for sending data to a + * specified destination address and port. + */ + class UdpSocket { public: UdpSocket(const char* ip, int port); @@ -37,6 +54,12 @@ class UdpSocket { int* received_bytes, int size_of_original_buffer); + int send_data( + char* tx_buffer, + int tx_length, + const char* destAddr, + int destPort); + private: #ifdef _WIN32 SOCKET sockfd; @@ -48,3 +71,5 @@ class UdpSocket { void init_socket(); void bind_socket(const char* ip, int port); }; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt b/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt new file mode 100644 index 000000000..0fc9b611b --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt @@ -0,0 +1,58 @@ +# +# (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. +# +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the Software. Licensee has the right to distribute object form +# only for use with RTI products. The Software is provided "as is", with no +# warranty of any type, including any warranty for fitness for any purpose. +# RTI is under no obligation to maintain or support the Software. RTI shall +# not be liable for any incidental or consequential damages arising out of the +# use or inability to use the software. +# +cmake_minimum_required(VERSION 3.11) +project(SocketAdapterCpp) + +# Find RTI Connext dependencies +list(APPEND CMAKE_MODULE_PATH + "${CMAKE_CURRENT_SOURCE_DIR}/../../../resources/cmake/Modules" +) +include(ConnextDdsConfigureCmakeUtils) +connextdds_configure_cmake_utils() + +find_package( + RTIConnextDDS "7.3.0" + REQUIRED + COMPONENTS + core + routing_service +) + +# It may not be necessary to include the hpp files +add_library(${PROJECT_NAME} + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketAdapter.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketAdapter.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketConnection.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketConnection.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.hpp" +) + +set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 11) +set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD_REQUIRED ON) + +target_link_libraries(${PROJECT_NAME} + RTIConnextDDS::routing_service_infrastructure + RTIConnextDDS::cpp2_api +) + +# To differentiate between debug and release builds +set_target_properties(${PROJECT_NAME} + PROPERTIES + DEBUG_POSTFIX "d" +) diff --git a/examples/routing_service/udp_socket_adapter/README.md b/examples/routing_service/udp_socket_adapter_typed/README.md similarity index 64% rename from examples/routing_service/udp_socket_adapter/README.md rename to examples/routing_service/udp_socket_adapter_typed/README.md index e69bfb04a..e84e8988a 100644 --- a/examples/routing_service/udp_socket_adapter/README.md +++ b/examples/routing_service/udp_socket_adapter_typed/README.md @@ -5,6 +5,8 @@ This example shows how to implement a simple Routing Service Adapter plugin in C++11 to receive data from a UDP socket using RTI Routing Service. +This example requires including a Types.xml file with the data type information. + The code in this directory provides the following components: - `src/SocketAdapter` implements the plugin that is loaded by *RTI Routing @@ -16,8 +18,12 @@ propagate information about the discovered input streams (in this case sockets) to the Routing Service. - `src/SocketStreamReader` implements an `StreamReader` that reads sample information from a UDP socket. +- `src/SocketStreamWriter` implements an `StreamWriter` that sends sample +information to a UDP socket. - `test/send_shape_to_socket.py` implements a simple tester to send shape type data to a UDP socket. +- `test/receive_shape_from_socket.py` implements a simple tester to receive shape +type data from a UDP socket. For more details, please refer to the *RTI Routing Service SDK* documentation. @@ -101,7 +107,7 @@ the Shapes Demo shipped with *RTI Connext DDS* installer bundle. You will find some hardcoded references to ShapeType and Square. If you wish to create a real Routing Service adapter, you should modify the code and XML accordingly. -There is 1 configuration (`-cfgName`) in the Routing Service XML file: +There are 2 configurations (`-cfgName`) in the Routing Service XML file: - **SocketAdapterToDDS** - It reads data from a UDP socket using the SocketAdapter and outputs it to DDS. You can visualize the ouptut by @@ -111,6 +117,8 @@ subscribing to Squares in Shapes Demo or running: $NDDSHOME/bin/rtiddsspy -printSample ``` +- **DDSToSocketAdapter** - It sends data from DDS to a UDP socket. + To run Routing Service, you will need first to set up your environment as follows. @@ -139,7 +147,7 @@ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdap Here is an output from a sample run: ```bash -$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter/build/ +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ $ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS RTI Routing Service 7.3.0 executing (with name SocketAdapterToSocketAdapter) @@ -158,6 +166,62 @@ python3 test/send_shape_to_socket.py 127.0.0.1 10203 You can now open a Shapes Demo instance on domain 0 and subscribe to Squares. You should start receiving a red Square. +Alternatively, you can also execute the test to send UDP sockets from DDS data: + +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + +And to test the UDP socket content run: +```bash +python3 test/read_shape_from_socket.py 10203 +``` + + +## Running a data-diode example + +You can configure a data-diode scenario by using two Routing Services instances; +- One using **DDSToSocketAdapter** configuration to publish DDS data over a one direction UDP socket +- The other using **SocketAdapterToDDS** configuration to convert back to DDS samples +``` + ┌───────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ + │ Connext │ │ Routing │ ┌────────────────┐ │ Routing │ │ Connext │ + │ App ├─►│ Service ├───►│ UDP DATA DIODE ├──►│ Service ├─►│ App │ + │ │ │ DDS TO UDP │ └────────────────┘ │ UDP TO DDS │ │ │ + └───────────┘ └─────────────┘ └─────────────┘ └───────────┘ +``` +To run this example in a local machine: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +RTI Routing Service 7.3.0 executing (with configuration=SocketAdapterToDDS) +``` +And in a different terminal: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + +Using the default configuration from RsSocketAdapter.xml, you need to publish Squares +on domain id 0 and subscribe to Squares on domain id 1 using rtishapes demo: + +```bash +$ $NDDSHOME/bin/rtishapesdemo -domainId 0 +``` + +```bash +$ $NDDSHOME/bin/rtishapesdemo -domainId 1 +``` +Then start publishing and subscribing to the Square topic. +You should be able to see red squares in the subscriber application. +Keep in mind the shape color has been overwritten in the adapter for showcasing purposes. + ## Requirements To run this example you will need: diff --git a/examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml similarity index 58% rename from examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml rename to examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml index 757cf5948..15cfeb391 100644 --- a/examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml +++ b/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml @@ -28,13 +28,12 @@ - + - 0 - - - + 1 + + @@ -70,11 +69,61 @@ You could use Triangle or Circle if you also modify the other references to Square that are hardcoded in the adapter code --> Square - + - + + + + + + + 0 + + + + + + + ON_ROUTE_MATCH + ShapeType + Square + + + + send_address + 127.0.0.1 + + + send_port + 0 + + + dest_address + 127.0.0.1 + + + dest_port + 10203 + + + + + + ON_DOMAIN_MATCH + ShapeType + Square + + + + + + + + \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter/Types.xml b/examples/routing_service/udp_socket_adapter_typed/Types.xml similarity index 100% rename from examples/routing_service/udp_socket_adapter/Types.xml rename to examples/routing_service/udp_socket_adapter_typed/Types.xml diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx new file mode 100644 index 000000000..a73226590 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx @@ -0,0 +1,49 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "SocketAdapter.hpp" +#include "SocketConnection.hpp" + +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketAdapter::SocketAdapter(PropertySet &properties) +{ +} + +Connection *SocketAdapter::create_connection( + rti::routing::adapter::detail::StreamReaderListener + *input_stream_discovery_listener, + rti::routing::adapter::detail::StreamReaderListener + *output_stream_discovery_listener, + const PropertySet &properties) +{ + return new SocketConnection( + input_stream_discovery_listener, + output_stream_discovery_listener, + properties); +} + +void SocketAdapter::delete_connection(Connection *connection) +{ + /** + * Perform cleanup pertaining to the connection object here. + */ + delete connection; +} + +rti::config::LibraryVersion SocketAdapter::get_version() const +{ + return { 1, 0, 0, 'r' }; +} + +RTI_ADAPTER_PLUGIN_CREATE_FUNCTION_DEF(SocketAdapter) diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp new file mode 100644 index 000000000..0042adae7 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp @@ -0,0 +1,52 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETADAPTER_HPP +#define SOCKETADAPTER_HPP + +#include +#include +#include +#include +#include + +/* + * This is the initialization of the RS adapter. For simplicity, this adapter + * only reads from UDP and writes to DDS, not the other way around + */ +class SocketAdapter : public rti::routing::adapter::AdapterPlugin { +public: + explicit SocketAdapter(rti::routing::PropertySet &); + + rti::routing::adapter::Connection *create_connection( + rti::routing::adapter::detail::StreamReaderListener *, + rti::routing::adapter::detail::StreamReaderListener *, + const rti::routing::PropertySet &) final; + + void delete_connection(rti::routing::adapter::Connection *connection) final; + + rti::config::LibraryVersion get_version() const; +}; + +/** + * This macro defines a C-linkage symbol that can be used as create function + * for plug-in registration through XML. + * + * The generated symbol has the name: + * + * \code + * SocketAdapterPlugin_create_adapter_plugin + * \endcode + */ +RTI_ADAPTER_PLUGIN_CREATE_FUNCTION_DECL(SocketAdapter) + +#endif diff --git a/examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx similarity index 75% rename from examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx rename to examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx index 729ce9bdf..c8f24a6fd 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -12,6 +12,7 @@ #include "SocketConnection.hpp" #include "SocketStreamReader.hpp" +#include "SocketStreamWriter.hpp" using namespace rti::routing; using namespace rti::routing::adapter; @@ -33,6 +34,14 @@ StreamReader *SocketConnection::create_stream_reader( return new SocketStreamReader(this, info, properties, listener); } +StreamWriter *SocketConnection::create_stream_writer( + Session *session, + const StreamInfo &info, + const PropertySet &properties) +{ + return new SocketStreamWriter(this, info, properties); +} + void SocketConnection::delete_stream_reader(StreamReader *reader) { SocketStreamReader *socket_reader = @@ -40,12 +49,22 @@ void SocketConnection::delete_stream_reader(StreamReader *reader) socket_reader->shutdown_socket_reader_thread(); delete reader; } +void SocketConnection::delete_stream_writer(StreamWriter *writer) +{ + SocketStreamWriter *socket_writer = dynamic_cast(writer); + delete writer; +} DiscoveryStreamReader *SocketConnection::input_stream_discovery_reader() { return &input_discovery_reader_; } +DiscoveryStreamReader *SocketConnection::output_stream_discovery_reader() +{ + return nullptr; +} + void SocketConnection::dispose_discovery_stream( const rti::routing::StreamInfo &stream_info) { diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp new file mode 100644 index 000000000..ae49f6c0c --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp @@ -0,0 +1,72 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETCONNECTION_HPP +#define SOCKETCONNECTION_HPP + +#include +#include + + #include "SocketInputDiscoveryStreamReader.hpp" + +/* + * This class creates the RS Connection, which is an access point to our + * example data domain (a UDP socket) + */ +class SocketConnection : public rti::routing::adapter::Connection { +public: + SocketConnection( + rti::routing::adapter::StreamReaderListener + *input_stream_discovery_listener, + rti::routing::adapter::StreamReaderListener + *output_stream_discovery_listener, + const rti::routing::PropertySet &properties); + + rti::routing::adapter::StreamReader *create_stream_reader( + rti::routing::adapter::Session *session, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &properties, + rti::routing::adapter::StreamReaderListener *listener) final; + + rti::routing::adapter::StreamWriter *create_stream_writer( + rti::routing::adapter::Session *session, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &properties) final; + + void delete_stream_reader( + rti::routing::adapter::StreamReader *reader) final; + + void delete_stream_writer( + rti::routing::adapter::StreamWriter *writer) final; + + rti::routing::adapter::DiscoveryStreamReader * + input_stream_discovery_reader() final; + + rti::routing::adapter::DiscoveryStreamReader * + output_stream_discovery_reader() final; + + /** + * @brief This function is called by the SocketStreamReader to indicate + * that it's time to dispose the route. The dispose set by the + * SocketInputDiscoveryStreamReader starts the chain of cleanup procedure. + * + * @param stream_info \b in. Reference to a StreamInfo object which should + * be used when creating a new StreamInfo sample with disposed set to true + */ + void dispose_discovery_stream( + const rti::routing::StreamInfo &stream_info); + + private: + SocketInputDiscoveryStreamReader input_discovery_reader_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx similarity index 98% rename from examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.cxx rename to examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx index 79c3dddad..ecfd9c68e 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp similarity index 97% rename from examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.hpp rename to examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp index 48c75f337..5103762d9 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx similarity index 98% rename from examples/routing_service/udp_socket_adapter/src/SocketStreamReader.cxx rename to examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx index dbc2675c3..efec348ad 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp similarity index 97% rename from examples/routing_service/udp_socket_adapter/src/SocketStreamReader.hpp rename to examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp index 5ab1e65a6..b3adf6d48 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx new file mode 100644 index 000000000..90d66d14f --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx @@ -0,0 +1,130 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include +#include +#include +#include +#include + +#include +#include +#ifdef _WIN32 + #include + #pragma comment(lib, "ws2_32.lib") +#else + #include + #include + #include + #include + #include +#endif + +#include +#include +#include +#include "SocketStreamWriter.hpp" +#include "SocketStreamReader.hpp" //use ShapeType from here +#include + +using namespace dds::core::xtypes; +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketStreamWriter::SocketStreamWriter( + SocketConnection *connection, + const StreamInfo &info, + const PropertySet &properties + ) + : stream_info_(info.stream_name(), info.type_info().type_name()) +{ + + socket_connection_ = connection; + + adapter_type_ = + static_cast(info.type_info().type_representation()); + + + // Parse the properties provided in the xml configuration file + for (const auto &property : properties) { + if (property.first == SEND_ADDRESS_STRING) { + send_address_ = property.second; + } + else if (property.first == SEND_PORT_STRING) { + send_port_ = std::stoi(property.second); + } + else if (property.first == DEST_ADDRESS_STRING) + { + dest_address_ = property.second; + } + else if (property.first == DEST_PORT_STRING) + { + dest_port_ = std::stoi(property.second); + } + } + + socket = std::unique_ptr(new UdpSocket( + send_address_.c_str(), + send_port_)); +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos) +{ + size_t len = 0; + + ShapeType shapes; + uint32_t tempObject=0; + + for (const auto sample : samples) { + //send sample out UDP interface + if (sample->member_exists_in_type("shapesize")) + { + shapes.shapesize = sample->value("shapesize"); + shapes.x = sample->value("x"); + shapes.y = sample->value("y"); + len = +socket->send_data((char*)&shapes, sizeof(shapes), dest_address_.c_str(), dest_port_); + } + else + { + Logger::instance().local("Received Sample that is not valid ShapeType"); + } + } + return len; +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos, + const SelectorState &selector_state) +{ + int len; + len = write(samples, infos); + return len; +} + +void SocketStreamWriter::return_loan( + std::vector &samples, + std::vector &infos) +{ + for (int i = 0; i < samples.size(); ++i) { + delete samples[i]; + delete infos[i]; + } + samples.clear(); + infos.clear(); +} + +SocketStreamWriter::~SocketStreamWriter() +{ +} diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp new file mode 100644 index 000000000..053a92810 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp @@ -0,0 +1,84 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETSTREAMWRITER_HPP +#define SOCKETSTREAMWRITER_HPP + +#include +#include +#include +#include + +#include "SocketConnection.hpp" +#include "UdpSocket.hpp" + +#include +#include + +#define BUFFER_MAX_SIZE 1024 +#define SEND_ADDRESS_STRING "send_address" +#define SEND_PORT_STRING "send_port" +#define DEST_ADDRESS_STRING "dest_address" +#define DEST_PORT_STRING "dest_port" + +class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter { +public: + explicit SocketStreamWriter( + SocketConnection *connection, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet & + ); + + virtual int + write(const std::vector &, + const std::vector &) final; + + virtual int write( + const std::vector &, + const std::vector &, + const rti::routing::adapter::SelectorState &selector_state) final; + + virtual void return_loan( + std::vector &, + std::vector &) final; + + ~SocketStreamWriter(); + + +private: + /** + * @brief Function used by socketreader_thread_ to read samples from the + * socket. + */ + + + SocketConnection *socket_connection_; + + std::unique_ptr socket; + + int send_port_; + int dest_port_; + + std::string send_address_; + std::string dest_address_; + rti::routing::StreamInfo stream_info_; + dds::core::xtypes::DynamicType *adapter_type_; + + struct ShapeType { + int x; + int y; + int shapesize; + }; + +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx new file mode 100644 index 000000000..8da702a3d --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx @@ -0,0 +1,111 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "UdpSocket.hpp" + +#include + + +UdpSocket::UdpSocket(const char *ip, int port) +{ +#ifdef _WIN32 + WSADATA wsaData; + if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { + throw dds::core::IllegalOperationError("WSAStartup failed"); + } +#endif + + // Socket initialization + init_socket(); + memset(&server_addr, 0, sizeof(server_addr)); + + // Using non-blocking sockets for easier thread management +#ifdef _WIN32 + unsigned long nonBlocking = 1; + if (ioctlsocket(sockfd, FIONBIO, &nonBlocking) != 0) { + std::cerr << "Error setting socket to non-blocking\n"; + closesocket(sockfd); + WSACleanup(); + throw dds::core::IllegalOperationError("ioctlsocket failed"); + } +#else + fcntl(sockfd, F_SETFL, O_NONBLOCK); +#endif + + // Bind the socket + bind_socket(ip, port); +} + +UdpSocket::~UdpSocket() +{ +#ifdef _WIN32 + closesocket(sockfd); + WSACleanup(); +#else + close(sockfd); +#endif +} + +void UdpSocket::init_socket() +{ + if ((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + throw dds::core::IllegalOperationError("Socket creation failed"); + } +} + +void UdpSocket::bind_socket(const char *ip, int port) +{ + server_addr.sin_family = AF_INET; + inet_pton(AF_INET, ip, &(server_addr.sin_addr)); + server_addr.sin_port = htons(port); + + if (bind(sockfd, + (const struct sockaddr *) &server_addr, + sizeof(server_addr)) + == -1) { + throw dds::core::IllegalOperationError("Bind failed"); + } +} + +void UdpSocket::receive_data( + char *received_buffer, + int *received_bytes, + int size_of_original_buffer) +{ + socklen_t len = sizeof(server_addr); + + socklen_t client_addr_len = sizeof(client_addr); + + /** Receive data.Since it's non-blocking, it will return right away most + * of the times + */ + *received_bytes = recvfrom( + sockfd, + received_buffer, + size_of_original_buffer, + 0, + (struct sockaddr *) &client_addr, + &client_addr_len); + + return; +} + +int UdpSocket::send_data(char* tx_buffer, int tx_length, const char* destAddr, int destPort) +{ + sockaddr_in dest_addr; + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(destPort); + dest_addr.sin_addr.s_addr = inet_addr(destAddr); + + size_t length = sendto(sockfd, tx_buffer, tx_length, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr)); + return (int)length; +} \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp new file mode 100644 index 000000000..024429e1b --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp @@ -0,0 +1,75 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef UDPSOCKETUTILS_HPP +#define UDPSOCKETUTILS_HPP + +#ifdef _WIN32 + #include + #include +#else + #include + #include + #include + #include + #include + #include +#endif + +#include + +#ifdef _WIN32 + #pragma comment(lib, "ws2_32.lib") +#endif + +/** + * @brief Utility class for UDP socket communication in the RTI Routing Service UDP Socket Adapter. + * + * UdpSocket provides a lightweight abstraction for UDP socket communication, + * supporting both Windows and POSIX systems. It is designed to be used by the Routing Service + * UDP socket adapter to send and receive raw UDP packets as part of data bridging between + * external UDP sources and DDS. + * + * The class handles socket creation, binding to a specified IP address and port, and + * ensures non-blocking operation for efficient integration with multi-threaded applications. + * It provides methods for receiving data from any UDP client and for sending data to a + * specified destination address and port. + */ + +class UdpSocket { +public: + UdpSocket(const char* ip, int port); + ~UdpSocket(); + void receive_data( + char* received_buffer, + int* received_bytes, + int size_of_original_buffer); + + int send_data( + char* tx_buffer, + int tx_length, + const char* destAddr, + int destPort); + +private: +#ifdef _WIN32 + SOCKET sockfd; +#else + int sockfd; +#endif + struct sockaddr_in server_addr, client_addr; + + void init_socket(); + void bind_socket(const char* ip, int port); +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py b/examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py new file mode 100644 index 000000000..f29f078e5 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py @@ -0,0 +1,47 @@ +import socket +import sys +import struct + +# Receive and parse data from the socket +def receive_data(sock): + # Receive data from the socket + data, addr = sock.recvfrom(1024) # Buffer size of 1024 bytes + # Unpack the data as 3 int types (x, y, shapesize) + x, y, size = struct.unpack("iii", data) + return x, y, size, addr + + +def main(): + if len(sys.argv) != 2: + print( + "Usage: python3 read_shape_from_socket.py " + ) + return + + # Input arguments + port = int(sys.argv[1]) + + samples_received = 0 + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + try: + # Bind the socket to listen on the specified port + sock.bind(('', port)) + print(f"Listening for shape data on port {port}...") + + while True: + # Receive data from the socket + x, y, size, addr = receive_data(sock) + + # Print the received shape data + samples_received += 1 + print(f"Sample #{samples_received} from {addr}: x={x}, y={y}, size={size}") + + except KeyboardInterrupt: + print("\nExiting...") + except Exception as e: + print(f"Error: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/routing_service/udp_socket_adapter/test/send_shape_to_socket.py b/examples/routing_service/udp_socket_adapter_typed/test/send_shape_to_socket.py similarity index 100% rename from examples/routing_service/udp_socket_adapter/test/send_shape_to_socket.py rename to examples/routing_service/udp_socket_adapter_typed/test/send_shape_to_socket.py diff --git a/resources/cmake/Modules/ConnextDdsGenerateSecurityArtifacts.cmake b/resources/cmake/Modules/ConnextDdsGenerateSecurityArtifacts.cmake index 65fe6693b..168178f27 100644 --- a/resources/cmake/Modules/ConnextDdsGenerateSecurityArtifacts.cmake +++ b/resources/cmake/Modules/ConnextDdsGenerateSecurityArtifacts.cmake @@ -90,7 +90,7 @@ function(connextdds_generate_security_artifacts) ) set(xmls_name Governance Permissions1 Permissions2) foreach(xml ${xmls_name}) - list(APPEND artifacts_input_files "${xml}.xml") + list(APPEND artifacts_input_files "${openssl_working_dir}/xml/${xml}.xml") endforeach() add_custom_command( @@ -143,12 +143,12 @@ function(connextdds_generate_security_artifacts) OUTPUT_KEY_FILE "${ca_key_file}" OUTPUT_CERT_FILE "${ca_cert_file}" CRL_NUMBER_FILE "${openssl_temporary_dir}/crlNumber" - TEXT DIGEST SHA256 DAYS ${expiration_days} ECPARAM_NAME prime256v1 ECPARAM_OUTPUT_FILE "${openssl_temporary_dir}/ecdsaparam" CONFIG_FILE "${ca_config_file}" + CA_EXTENSION v3_ca WORKING_DIRECTORY "${openssl_working_dir}" ) @@ -157,7 +157,6 @@ function(connextdds_generate_security_artifacts) OUTPUT_CERT_FILE "${peer1_cert_file}" OUTPUT_CERT_REQUEST_FILE "${openssl_temporary_dir}/peer1_req_cert.pem" OUTPUT_KEY_FILE "${peer1_key_file}" - TEXT ECPARAM_NAME "prime256v1" ECPARAM_OUTPUT_FILE "${openssl_temporary_dir}/ecdsaparam1" CONFIG_FILE "${peer1_config_file}" @@ -173,7 +172,6 @@ function(connextdds_generate_security_artifacts) OUTPUT_CERT_FILE "${peer2_cert_file}" OUTPUT_CERT_REQUEST_FILE "${openssl_temporary_dir}/peer2_req_cert.pem" OUTPUT_KEY_FILE "${peer2_key_file}" - TEXT ECPARAM_NAME "prime256v1" ECPARAM_OUTPUT_FILE "${openssl_temporary_dir}/ecdsaparam1" CONFIG_FILE "${peer2_config_file}" diff --git a/resources/security/xml/Governance.xml b/resources/security/xml/Governance.xml index 21e8ebf12..eedc857a7 100644 --- a/resources/security/xml/Governance.xml +++ b/resources/security/xml/Governance.xml @@ -1,6 +1,6 @@ + xsi:noNamespaceSchemaLocation="http://community.rti.com/schema/7.7.0/dds_security_governance.xsd"> @@ -10,6 +10,7 @@ false true + true NONE NONE ENCRYPT diff --git a/tests/examples/connext_dds/request_reply/cs/Primes.csproj b/tests/examples/connext_dds/request_reply/cs/Primes.csproj index 80b434c38..abd8a63c9 100644 --- a/tests/examples/connext_dds/request_reply/cs/Primes.csproj +++ b/tests/examples/connext_dds/request_reply/cs/Primes.csproj @@ -6,8 +6,8 @@ - - + + diff --git a/tutorials/rpc/py/robot_client.py b/tutorials/rpc/py/robot_client.py index b3452faa9..b80451772 100644 --- a/tutorials/rpc/py/robot_client.py +++ b/tutorials/rpc/py/robot_client.py @@ -24,10 +24,7 @@ async def main(): participant = dds.DomainParticipant(domain_id=0) client = RobotControlClient(participant, "MyRobotControl") - # For versions 7.4.0 and below: - sleep(2) - # For newer versions you can use the following: - # await client.wait_for_service_async(dds.Duration(20)) + await client.wait_for_service_async(dds.Duration(20)) # Available in Connext 7.5.0+ print("Calling walk_to...") result = await client.walk_to(