From f3eecd01afa2466327a89beee9f55cd640c9161d Mon Sep 17 00:00:00 2001 From: fengzero Date: Tue, 24 Feb 2026 06:35:33 +0000 Subject: [PATCH] core(driver): try connect device when no tags --- include/neuron/plugin.h | 1 + plugins/datalayers/datalayers_plugin_intf.c | 17 ++++++----- plugins/ekuiper/plugin_ekuiper.c | 17 ++++++----- plugins/file/file_plugin.c | 17 ++++++----- plugins/modbus/modbus_rtu.c | 17 ++++++----- plugins/modbus/modbus_tcp.c | 33 ++++++++++++++++----- plugins/monitor/monitor.c | 17 ++++++----- plugins/mqtt/mqtt_plugin_intf.c | 17 ++++++----- src/adapter/adapter.c | 22 ++++++++++++++ src/adapter/adapter_internal.h | 1 + src/adapter/driver/driver.c | 25 ++++++++++++++++ src/adapter/driver/driver_internal.h | 2 ++ 12 files changed, 130 insertions(+), 56 deletions(-) diff --git a/include/neuron/plugin.h b/include/neuron/plugin.h index 96773d3b4..d089d0aae 100644 --- a/include/neuron/plugin.h +++ b/include/neuron/plugin.h @@ -89,6 +89,7 @@ typedef struct neu_plugin_intf_funs { int (*start)(neu_plugin_t *plugin); int (*stop)(neu_plugin_t *plugin); int (*setting)(neu_plugin_t *plugin, const char *setting); + int (*try_connect)(neu_plugin_t *plugin); int (*request)(neu_plugin_t *plugin, neu_reqresp_head_t *head, void *data); diff --git a/plugins/datalayers/datalayers_plugin_intf.c b/plugins/datalayers/datalayers_plugin_intf.c index e539ee99b..8331edc9f 100644 --- a/plugins/datalayers/datalayers_plugin_intf.c +++ b/plugins/datalayers/datalayers_plugin_intf.c @@ -266,12 +266,13 @@ int datalayers_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head, } const neu_plugin_intf_funs_t datalayers_plugin_intf_funs = { - .open = datalayers_plugin_open, - .close = datalayers_plugin_close, - .init = datalayers_plugin_init, - .uninit = datalayers_plugin_uninit, - .start = datalayers_plugin_start, - .stop = datalayers_plugin_stop, - .setting = datalayers_plugin_config, - .request = datalayers_plugin_request, + .open = datalayers_plugin_open, + .close = datalayers_plugin_close, + .init = datalayers_plugin_init, + .uninit = datalayers_plugin_uninit, + .start = datalayers_plugin_start, + .stop = datalayers_plugin_stop, + .setting = datalayers_plugin_config, + .request = datalayers_plugin_request, + .try_connect = NULL, }; \ No newline at end of file diff --git a/plugins/ekuiper/plugin_ekuiper.c b/plugins/ekuiper/plugin_ekuiper.c index 582f0144e..ad98730ca 100644 --- a/plugins/ekuiper/plugin_ekuiper.c +++ b/plugins/ekuiper/plugin_ekuiper.c @@ -423,14 +423,15 @@ static int ekuiper_plugin_request(neu_plugin_t * plugin, } static const neu_plugin_intf_funs_t plugin_intf_funs = { - .open = ekuiper_plugin_open, - .close = ekuiper_plugin_close, - .init = ekuiper_plugin_init, - .uninit = ekuiper_plugin_uninit, - .start = ekuiper_plugin_start, - .stop = ekuiper_plugin_stop, - .setting = ekuiper_plugin_config, - .request = ekuiper_plugin_request, + .open = ekuiper_plugin_open, + .close = ekuiper_plugin_close, + .init = ekuiper_plugin_init, + .uninit = ekuiper_plugin_uninit, + .start = ekuiper_plugin_start, + .stop = ekuiper_plugin_stop, + .setting = ekuiper_plugin_config, + .request = ekuiper_plugin_request, + .try_connect = NULL, }; const neu_plugin_module_t neu_plugin_module = { diff --git a/plugins/file/file_plugin.c b/plugins/file/file_plugin.c index 4aecf2974..d48b51988 100644 --- a/plugins/file/file_plugin.c +++ b/plugins/file/file_plugin.c @@ -49,14 +49,15 @@ static int driver_write(neu_plugin_t *plugin, void *req, neu_datatag_t *tag, neu_value_u value); static const neu_plugin_intf_funs_t plugin_intf_funs = { - .open = driver_open, - .close = driver_close, - .init = driver_init, - .uninit = driver_uninit, - .start = driver_start, - .stop = driver_stop, - .setting = driver_config, - .request = driver_request, + .open = driver_open, + .close = driver_close, + .init = driver_init, + .uninit = driver_uninit, + .start = driver_start, + .stop = driver_stop, + .setting = driver_config, + .request = driver_request, + .try_connect = NULL, .driver.validate_tag = driver_validate_tag, .driver.group_timer = driver_group_timer, diff --git a/plugins/modbus/modbus_rtu.c b/plugins/modbus/modbus_rtu.c index 7b83d28e4..faf4d6a64 100644 --- a/plugins/modbus/modbus_rtu.c +++ b/plugins/modbus/modbus_rtu.c @@ -44,14 +44,15 @@ static int driver_write(neu_plugin_t *plugin, void *req, neu_datatag_t *tag, static int driver_write_tags(neu_plugin_t *plugin, void *req, UT_array *tags); static const neu_plugin_intf_funs_t plugin_intf_funs = { - .open = driver_open, - .close = driver_close, - .init = driver_init, - .uninit = driver_uninit, - .start = driver_start, - .stop = driver_stop, - .setting = driver_config, - .request = driver_request, + .open = driver_open, + .close = driver_close, + .init = driver_init, + .uninit = driver_uninit, + .start = driver_start, + .stop = driver_stop, + .setting = driver_config, + .request = driver_request, + .try_connect = NULL, .driver.validate_tag = driver_validate_tag, .driver.group_timer = driver_group_timer, diff --git a/plugins/modbus/modbus_tcp.c b/plugins/modbus/modbus_tcp.c index 493c938a4..72a521166 100644 --- a/plugins/modbus/modbus_tcp.c +++ b/plugins/modbus/modbus_tcp.c @@ -36,6 +36,7 @@ static int driver_stop(neu_plugin_t *plugin); static int driver_config(neu_plugin_t *plugin, const char *config); static int driver_request(neu_plugin_t *plugin, neu_reqresp_head_t *head, void *data); +static int driver_connect(neu_plugin_t *plugin); static int driver_validate_tag(neu_plugin_t *plugin, neu_datatag_t *tag); static int driver_group_timer(neu_plugin_t *plugin, neu_plugin_group_t *group); @@ -46,14 +47,15 @@ static int driver_test_read_tag(neu_plugin_t *plugin, void *req, neu_datatag_t tag); static const neu_plugin_intf_funs_t plugin_intf_funs = { - .open = driver_open, - .close = driver_close, - .init = driver_init, - .uninit = driver_uninit, - .start = driver_start, - .stop = driver_stop, - .setting = driver_config, - .request = driver_request, + .open = driver_open, + .close = driver_close, + .init = driver_init, + .uninit = driver_uninit, + .start = driver_start, + .stop = driver_stop, + .setting = driver_config, + .request = driver_request, + .try_connect = driver_connect, .driver.validate_tag = driver_validate_tag, .driver.group_timer = driver_group_timer, @@ -388,6 +390,21 @@ static int driver_config(neu_plugin_t *plugin, const char *config) return 0; } +static int driver_connect(neu_plugin_t *plugin) +{ + if (plugin->conn != NULL) { + if (neu_conn_is_connected(plugin->conn)) { + plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED; + } else { + neu_conn_connect(plugin->conn); + if (neu_conn_is_connected(plugin->conn)) { + plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED; + } + } + } + return 0; +} + static int driver_request(neu_plugin_t *plugin, neu_reqresp_head_t *head, void *data) { diff --git a/plugins/monitor/monitor.c b/plugins/monitor/monitor.c index aebbbb214..6e8675a18 100644 --- a/plugins/monitor/monitor.c +++ b/plugins/monitor/monitor.c @@ -397,14 +397,15 @@ static int monitor_plugin_stop(neu_plugin_t *plugin) } static const neu_plugin_intf_funs_t plugin_intf_funs = { - .open = monitor_plugin_open, - .close = monitor_plugin_close, - .init = monitor_plugin_init, - .uninit = monitor_plugin_uninit, - .start = monitor_plugin_start, - .stop = monitor_plugin_stop, - .setting = monitor_plugin_config, - .request = monitor_plugin_request, + .open = monitor_plugin_open, + .close = monitor_plugin_close, + .init = monitor_plugin_init, + .uninit = monitor_plugin_uninit, + .start = monitor_plugin_start, + .stop = monitor_plugin_stop, + .setting = monitor_plugin_config, + .request = monitor_plugin_request, + .try_connect = NULL, }; #define DEFAULT_MONITOR_PLUGIN_DESCR \ diff --git a/plugins/mqtt/mqtt_plugin_intf.c b/plugins/mqtt/mqtt_plugin_intf.c index 61435a653..42c5e8599 100644 --- a/plugins/mqtt/mqtt_plugin_intf.c +++ b/plugins/mqtt/mqtt_plugin_intf.c @@ -622,12 +622,13 @@ int mqtt_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head, } const neu_plugin_intf_funs_t mqtt_plugin_intf_funs = { - .open = mqtt_plugin_open, - .close = mqtt_plugin_close, - .init = mqtt_plugin_init, - .uninit = mqtt_plugin_uninit, - .start = mqtt_plugin_start, - .stop = mqtt_plugin_stop, - .setting = mqtt_plugin_config, - .request = mqtt_plugin_request, + .open = mqtt_plugin_open, + .close = mqtt_plugin_close, + .init = mqtt_plugin_init, + .uninit = mqtt_plugin_uninit, + .start = mqtt_plugin_start, + .stop = mqtt_plugin_stop, + .setting = mqtt_plugin_config, + .request = mqtt_plugin_request, + .try_connect = NULL, }; diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c index 5bf01c245..62c0136e5 100644 --- a/src/adapter/adapter.c +++ b/src/adapter/adapter.c @@ -2097,6 +2097,10 @@ int neu_adapter_uninit(neu_adapter_t *adapter) neu_event_del_io(adapter->events, adapter->control_io); if (adapter->module->type == NEU_NA_TYPE_DRIVER) { + if (adapter->timer_connect != NULL) { + neu_event_del_timer(adapter->events, adapter->timer_connect); + adapter->timer_connect = NULL; + } neu_adapter_driver_destroy((neu_adapter_driver_t *) adapter); } @@ -2133,6 +2137,19 @@ int neu_adapter_start(neu_adapter_t *adapter) neu_adapter_driver_start_group_timer( (neu_adapter_driver_t *) adapter); } + + if (adapter->module->type == NEU_NA_TYPE_DRIVER) { + neu_event_timer_param_t param = { + .second = 5, + .millisecond = 0, + .usr_data = (void *) adapter, + .type = NEU_EVENT_TIMER_BLOCK, + }; + + param.cb = neu_adapter_driver_try_connect; + adapter->timer_connect = + neu_event_add_timer(adapter->events, param); + } } return error; @@ -2176,6 +2193,11 @@ int neu_adapter_stop(neu_adapter_t *adapter) (neu_adapter_driver_t *) adapter); } neu_adapter_reset_metrics(adapter); + + if (adapter->timer_connect != NULL) { + neu_event_del_timer(adapter->events, adapter->timer_connect); + adapter->timer_connect = NULL; + } } return error; diff --git a/src/adapter/adapter_internal.h b/src/adapter/adapter_internal.h index 3ff190c75..d6325434e 100644 --- a/src/adapter/adapter_internal.h +++ b/src/adapter/adapter_internal.h @@ -55,6 +55,7 @@ struct neu_adapter { neu_event_timer_t *timer_lev; int64_t timestamp_lev; + neu_event_timer_t *timer_connect; // metrics neu_node_metrics_t *metrics; diff --git a/src/adapter/driver/driver.c b/src/adapter/driver/driver.c index 672e49eef..f40c7d4d4 100644 --- a/src/adapter/driver/driver.c +++ b/src/adapter/driver/driver.c @@ -741,6 +741,31 @@ int neu_adapter_driver_uninit(neu_adapter_driver_t *driver) return 0; } +int neu_adapter_driver_try_connect(void *param) +{ + neu_adapter_driver_t *driver = (neu_adapter_driver_t *) param; + if (driver->tag_cnt > 0) { + return 0; + } + + if (driver->adapter.state != NEU_NODE_RUNNING_STATE_RUNNING) { + return 0; + } + + neu_plugin_common_t *common = + neu_plugin_to_plugin_common(driver->adapter.plugin); + if (common->link_state == NEU_NODE_LINK_STATE_CONNECTED) { + return 0; + } + + if (driver->adapter.module->intf_funs->try_connect == NULL) { + return 0; + } + + return driver->adapter.module->intf_funs->try_connect( + driver->adapter.plugin); +} + static inline void start_group_timer(neu_adapter_driver_t *driver, group_t *grp) { uint32_t interval = neu_group_get_interval(grp->group); diff --git a/src/adapter/driver/driver_internal.h b/src/adapter/driver/driver_internal.h index a7cce8278..4a1c217e0 100644 --- a/src/adapter/driver/driver_internal.h +++ b/src/adapter/driver/driver_internal.h @@ -28,6 +28,8 @@ void neu_adapter_driver_destroy(neu_adapter_driver_t *driver); int neu_adapter_driver_init(neu_adapter_driver_t *driver); int neu_adapter_driver_uninit(neu_adapter_driver_t *driver); +int neu_adapter_driver_try_connect(void *param); + void neu_adapter_driver_start_group_timer(neu_adapter_driver_t *driver); void neu_adapter_driver_stop_group_timer(neu_adapter_driver_t *driver);