diff --git a/awscrt/mqtt.py b/awscrt/mqtt.py index 7529168ea..ed2fc248a 100644 --- a/awscrt/mqtt.py +++ b/awscrt/mqtt.py @@ -15,7 +15,7 @@ from awscrt.http import HttpProxyOptions, HttpRequest from awscrt.io import ClientBootstrap, ClientTlsContext, SocketOptions from dataclasses import dataclass -from awscrt.mqtt5 import Client as Mqtt5Client +from awscrt.mqtt5 import Client as Mqtt5Client, SdkMetrics class QoS(IntEnum): @@ -330,6 +330,10 @@ class Connection(NativeResource): proxy_options (Optional[awscrt.http.HttpProxyOptions]): Optional proxy options for all connections. + + enable_metrics (bool): If true, enable IoT SDK metrics in CONNECT packet username field, otherwise, disabled. + Default to True. You may set it to false if you are not using AWS IoT services, and + using a custom authentication mechanism. """ def __init__(self, @@ -355,7 +359,8 @@ def __init__(self, proxy_options=None, on_connection_success=None, on_connection_failure=None, - on_connection_closed=None + on_connection_closed=None, + enable_metrics=True, ): assert isinstance(client, Client) or isinstance(client, Mqtt5Client) @@ -408,6 +413,10 @@ def __init__(self, self.password = password self.socket_options = socket_options if socket_options else SocketOptions() self.proxy_options = proxy_options if proxy_options else websocket_proxy_options + if enable_metrics: + self.metrics = SdkMetrics() + else: + self.metrics = None self._binding = _awscrt.mqtt_client_connection_new( self, @@ -524,7 +533,8 @@ def on_connect(error_code, return_code, session_present): self.password, self.clean_session, on_connect, - self.proxy_options + self.proxy_options, + self.metrics ) except Exception as e: diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 7c5e4f31f..1d03401a9 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -17,6 +17,18 @@ from inspect import signature +@dataclass +class SdkMetrics: + """ + Configuration for IoT SDK metrics that are embedded in MQTT username field. + + Args: + library_name (str): The SDK library name (e.g., "IoTDeviceSDK/Python") + + """ + library_name: str = "IoTDeviceSDK/Python" + + class QoS(IntEnum): """MQTT message delivery quality of service. @@ -1158,6 +1170,7 @@ class ConnectPacket: will_delay_interval_sec (int): A time interval, in seconds, that the server should wait (for a session reconnection) before sending the will message associated with the connection's session. If omitted or None, the server will send the will when the associated session is destroyed. If the session is destroyed before a will delay interval has elapsed, then the will must be sent at the time of session destruction. will (PublishPacket): The definition of a message to be published when the connection's session is destroyed by the server or when the will delay interval has elapsed, whichever comes first. If None, then nothing will be sent. user_properties (Sequence[UserProperty]): List of MQTT5 user properties included with the packet. + """ keep_alive_interval_sec: int = None client_id: str = None @@ -1338,6 +1351,8 @@ class ClientOptions: on_lifecycle_event_connection_success_fn (Callable[[LifecycleConnectSuccessData],]): Callback for Lifecycle Event Connection Success. on_lifecycle_event_connection_failure_fn (Callable[[LifecycleConnectFailureData],]): Callback for Lifecycle Event Connection Failure. on_lifecycle_event_disconnection_fn (Callable[[LifecycleDisconnectData],]): Callback for Lifecycle Event Disconnection. + enable_metrics (bool): If true, enable IoT SDK metrics in CONNECT packet username field, otherwise, disabled. Default to True. You may set it to false if you are not using AWS IoT services, and using a custom authentication mechanism. + """ host_name: str port: int = None @@ -1364,6 +1379,7 @@ class ClientOptions: on_lifecycle_event_connection_success_fn: Callable[[LifecycleConnectSuccessData], None] = None on_lifecycle_event_connection_failure_fn: Callable[[LifecycleConnectFailureData], None] = None on_lifecycle_event_disconnection_fn: Callable[[LifecycleDisconnectData], None] = None + enable_metrics: bool = True def _check_callback(callback): @@ -1392,6 +1408,7 @@ def __init__(self, client_options: ClientOptions): self._on_lifecycle_connection_failure_cb = _check_callback( client_options.on_lifecycle_event_connection_failure_fn) self._on_lifecycle_disconnection_cb = _check_callback(client_options.on_lifecycle_event_disconnection_fn) + self._enable_metrics = client_options.enable_metrics def _ws_handshake_transform(self, http_request_binding, http_headers_binding, native_userdata): if self._ws_handshake_transform_cb is None: @@ -1704,7 +1721,8 @@ def __init__( ping_timeout_ms: int, keep_alive_secs: int, ack_timeout_secs: int, - clean_session: int): + clean_session: int, + enable_metrics: bool): self.host_name = host_name self.port = port self.client_id = "" if client_id is None else client_id @@ -1715,6 +1733,7 @@ def __init__( self.keep_alive_secs: int = 1200 if keep_alive_secs is None else keep_alive_secs self.ack_timeout_secs: int = 0 if ack_timeout_secs is None else ack_timeout_secs self.clean_session: bool = True if clean_session is None else clean_session + self.enable_metrics: bool = True if enable_metrics is None else enable_metrics class Client(NativeResource): @@ -1728,7 +1747,6 @@ class Client(NativeResource): """ def __init__(self, client_options: ClientOptions): - super().__init__() core = _ClientCore(client_options) @@ -1746,6 +1764,12 @@ def __init__(self, client_options: ClientOptions): if not socket_options: socket_options = SocketOptions() + # Handle metrics configuration + if client_options.enable_metrics: + self.metrics = SdkMetrics() + else: + self.metrics = None + if not connect_options.will: is_will_none = True will = PublishPacket() @@ -1785,6 +1809,8 @@ def __init__(self, client_options: ClientOptions): will.correlation_data_bytes or will.correlation_data, will.content_type, will.user_properties, + client_options.enable_metrics, + self.metrics.library_name if self.metrics else None, client_options.session_behavior, client_options.extended_validation_and_flow_control_options, client_options.offline_queue_behavior, @@ -1811,7 +1837,8 @@ def __init__(self, client_options: ClientOptions): keep_alive_secs=connect_options.keep_alive_interval_sec, ack_timeout_secs=client_options.ack_timeout_sec, clean_session=( - client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True)) + client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True), + enable_metrics=client_options.enable_metrics) def start(self): """Notifies the MQTT5 client that you want it maintain connectivity to the configured endpoint. @@ -2043,5 +2070,6 @@ def new_connection(self, on_connection_interrupted=None, on_connection_resumed=N use_websockets=False, websocket_proxy_options=None, websocket_handshake_transform=None, - proxy_options=None + proxy_options=None, + enable_metrics=self.adapter_options.enable_metrics ) diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 1d512d927..154e9c5dd 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 1d512d92709f60b74e2cafa018e69a2e647f28e9 +Subproject commit 154e9c5dd19ec6d8d4cd690492d2a029f13e2035 diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 243af6a0e..90be7554b 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -831,6 +831,10 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { PyObject *will_delay_interval_sec_py; /* optional uint32_t */ PyObject *user_properties_py; /* optional */ + /* Metrics */ + PyObject *is_metrics_enabled_py; /* optional enable metrics */ + struct aws_byte_cursor metrics_library_name; /* optional IoT SDK metrics username */ + /* Will */ PyObject *is_will_none_py; /* optional PublishPacket */ PyObject *will_qos_val_py; @@ -862,7 +866,7 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { if (!PyArg_ParseTuple( args, - "Os#IOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOOOOOOOOOOOOO", + "Os#IOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOz#OOOOOOOOOOOOO", /* O */ &self_py, /* s */ &host_name.ptr, /* # */ &host_name.len, @@ -904,6 +908,11 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { /* # */ &will_content_type.len, /* O */ &will_user_properties_py, + /* Metrics */ + /* O */ &is_metrics_enabled_py, + /* z */ &metrics_library_name.ptr, + /* # */ &metrics_library_name.len, + /* O */ &session_behavior_py, /* O */ &extended_validation_and_flow_control_options_py, /* O */ &offline_queue_behavior_py, @@ -1279,6 +1288,14 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { connect_options.will = &will; } + /* METRICS */ + struct aws_mqtt_iot_sdk_metrics metrics_tmp; + AWS_ZERO_STRUCT(metrics_tmp); + if (PyObject_IsTrue(is_metrics_enabled_py)) { + metrics_tmp.library_name = metrics_library_name; + client_options.metrics = &metrics_tmp; + } + /* CALLBACKS */ Py_INCREF(client_core_py); diff --git a/source/mqtt_client_connection.c b/source/mqtt_client_connection.c index 78a26057e..60261aab2 100644 --- a/source/mqtt_client_connection.c +++ b/source/mqtt_client_connection.c @@ -437,6 +437,39 @@ static void s_on_connect( PyGILState_Release(state); } +/* If unsuccessful, false is returned and a Python error has been set */ +bool s_set_metrics(struct aws_mqtt_client_connection *connection, PyObject *metrics) { + assert(metrics && (metrics != Py_None)); + + if (connection == NULL) { + return false; + } + + bool success = false; + + PyObject *library_name_py = PyObject_GetAttrString(metrics, "library_name"); + struct aws_byte_cursor library_name = aws_byte_cursor_from_pyunicode(library_name_py); + if (!library_name.ptr) { + PyErr_SetString(PyExc_TypeError, "metrics.library_name must be str type"); + goto done; + } + + struct aws_mqtt_iot_sdk_metrics metrics_struct = { + .library_name = library_name, + }; + + if (aws_mqtt_client_connection_set_metrics(connection, &metrics_struct)) { + PyErr_SetAwsLastError(); + success = false; + } + + success = true; + +done: + Py_DECREF(library_name_py); + return success; +} + /* If unsuccessful, false is returned and a Python error has been set */ bool s_set_will(struct aws_mqtt_client_connection *connection, PyObject *will) { assert(will && (will != Py_None)); @@ -668,9 +701,10 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args) PyObject *is_clean_session; PyObject *on_connect; PyObject *proxy_options_py; + PyObject *metrics_py; if (!PyArg_ParseTuple( args, - "Os#s#IOOKKHIIOz#z#OOO", + "Os#s#IOOKKHIIOz#z#OOOO", &impl_capsule, &client_id, &client_id_len, @@ -691,7 +725,8 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args) &password_len, &is_clean_session, &on_connect, - &proxy_options_py)) { + &proxy_options_py, + &metrics_py)) { return NULL; } @@ -773,6 +808,13 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args) } } + // If metrics is None, we do not set metrics at all. + if (metrics_py != Py_None) { + if (!s_set_metrics(py_connection->native, metrics_py)) { + goto done; + } + } + if (on_connect != Py_None) { Py_INCREF(on_connect); py_connection->on_connect = on_connect; diff --git a/test/test_mqtt.py b/test/test_mqtt.py index f8435316e..4040601d1 100644 --- a/test/test_mqtt.py +++ b/test/test_mqtt.py @@ -629,7 +629,8 @@ def _test_mqtt311_direct_connect_basic_auth(self): host_name=input_host_name, port=input_port, username=input_username, - password=input_password) + password=input_password, + enable_metrics=False) connection.connect().result(TIMEOUT) connection.disconnect().result(TIMEOUT) @@ -760,7 +761,8 @@ def sign_function(transform_args, **kwargs): username=input_username, password=input_password, use_websockets=True, - websocket_handshake_transform=sign_function) + websocket_handshake_transform=sign_function, + enable_metrics=False) connection.connect().result(TIMEOUT) connection.disconnect().result(TIMEOUT) diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index 83ee5928a..55e11d045 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -203,7 +203,8 @@ def _test_direct_connect_minimum(self): client_options = mqtt5.ClientOptions( host_name=input_host_name, - port=input_port + port=input_port, + enable_metrics=False ) callbacks = Mqtt5TestCallbacks() client = self._create_client(client_options=client_options, callbacks=callbacks) @@ -229,7 +230,8 @@ def _test_direct_connect_basic_auth(self): client_options = mqtt5.ClientOptions( host_name=input_host_name, port=input_port, - connect_options=connect_options + connect_options=connect_options, + enable_metrics=False ) callbacks = Mqtt5TestCallbacks() client = self._create_client(client_options=client_options, callbacks=callbacks) @@ -416,7 +418,8 @@ def _test_websocket_connect_basic_auth(self): client_options = mqtt5.ClientOptions( host_name=input_host_name, port=input_port, - connect_options=connect_options + connect_options=connect_options, + enable_metrics=False ) callbacks = Mqtt5TestCallbacks() client_options.websocket_handshake_transform = callbacks.ws_handshake_transform @@ -1845,6 +1848,74 @@ def _test_operation_statistics_uc1(self): def test_operation_statistics_uc1(self): test_retry_wrapper(self._test_operation_statistics_uc1) + # ============================================================== + # METRICS TEST CASES + # ============================================================== + + def _test_metrics_enabled_default(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + client_options.connect_options = mqtt5.ConnectPacket(client_id=create_client_id()) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + callbacks = Mqtt5TestCallbacks() + client = self._create_client(client_options=client_options, callbacks=callbacks) + + # Verify metrics are enabled by default + self.assertTrue(client_options.enable_metrics) + self.assertIsNotNone(client.metrics) + self.assertEqual(client.metrics.library_name, "IoTDeviceSDK/Python") + + client.start() + callbacks.future_connection_success.result(TIMEOUT) + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_metrics_enabled_default(self): + test_retry_wrapper(self._test_metrics_enabled_default) + + def _test_metrics_disabled(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883, + enable_metrics=False + ) + client_options.connect_options = mqtt5.ConnectPacket( + client_id=create_client_id() + ) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + callbacks = Mqtt5TestCallbacks() + client = self._create_client(client_options=client_options, callbacks=callbacks) + + # Verify metrics are disabled + self.assertFalse(client_options.enable_metrics) + self.assertIsNone(client.metrics) + + client.start() + callbacks.future_connection_success.result(TIMEOUT) + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_metrics_disabled(self): + test_retry_wrapper(self._test_metrics_disabled) + if __name__ == 'main': unittest.main()