Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/neuron/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ typedef struct neu_plugin_intf_funs {
int (*start)(neu_plugin_t *plugin);
int (*stop)(neu_plugin_t *plugin);
int (*setting)(neu_plugin_t *plugin, const char *setting);
int (*try_connect)(neu_plugin_t *plugin);

int (*request)(neu_plugin_t *plugin, neu_reqresp_head_t *head, void *data);

Expand Down
17 changes: 9 additions & 8 deletions plugins/datalayers/datalayers_plugin_intf.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,13 @@ int datalayers_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
}

const neu_plugin_intf_funs_t datalayers_plugin_intf_funs = {
.open = datalayers_plugin_open,
.close = datalayers_plugin_close,
.init = datalayers_plugin_init,
.uninit = datalayers_plugin_uninit,
.start = datalayers_plugin_start,
.stop = datalayers_plugin_stop,
.setting = datalayers_plugin_config,
.request = datalayers_plugin_request,
.open = datalayers_plugin_open,
.close = datalayers_plugin_close,
.init = datalayers_plugin_init,
.uninit = datalayers_plugin_uninit,
.start = datalayers_plugin_start,
.stop = datalayers_plugin_stop,
.setting = datalayers_plugin_config,
.request = datalayers_plugin_request,
.try_connect = NULL,
};
17 changes: 9 additions & 8 deletions plugins/ekuiper/plugin_ekuiper.c
Original file line number Diff line number Diff line change
Expand Up @@ -423,14 +423,15 @@ static int ekuiper_plugin_request(neu_plugin_t * plugin,
}

static const neu_plugin_intf_funs_t plugin_intf_funs = {
.open = ekuiper_plugin_open,
.close = ekuiper_plugin_close,
.init = ekuiper_plugin_init,
.uninit = ekuiper_plugin_uninit,
.start = ekuiper_plugin_start,
.stop = ekuiper_plugin_stop,
.setting = ekuiper_plugin_config,
.request = ekuiper_plugin_request,
.open = ekuiper_plugin_open,
.close = ekuiper_plugin_close,
.init = ekuiper_plugin_init,
.uninit = ekuiper_plugin_uninit,
.start = ekuiper_plugin_start,
.stop = ekuiper_plugin_stop,
.setting = ekuiper_plugin_config,
.request = ekuiper_plugin_request,
.try_connect = NULL,
};

const neu_plugin_module_t neu_plugin_module = {
Expand Down
17 changes: 9 additions & 8 deletions plugins/file/file_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ static int driver_write(neu_plugin_t *plugin, void *req, neu_datatag_t *tag,
neu_value_u value);

static const neu_plugin_intf_funs_t plugin_intf_funs = {
.open = driver_open,
.close = driver_close,
.init = driver_init,
.uninit = driver_uninit,
.start = driver_start,
.stop = driver_stop,
.setting = driver_config,
.request = driver_request,
.open = driver_open,
.close = driver_close,
.init = driver_init,
.uninit = driver_uninit,
.start = driver_start,
.stop = driver_stop,
.setting = driver_config,
.request = driver_request,
.try_connect = NULL,

.driver.validate_tag = driver_validate_tag,
.driver.group_timer = driver_group_timer,
Expand Down
17 changes: 9 additions & 8 deletions plugins/modbus/modbus_rtu.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ static int driver_write(neu_plugin_t *plugin, void *req, neu_datatag_t *tag,
static int driver_write_tags(neu_plugin_t *plugin, void *req, UT_array *tags);

static const neu_plugin_intf_funs_t plugin_intf_funs = {
.open = driver_open,
.close = driver_close,
.init = driver_init,
.uninit = driver_uninit,
.start = driver_start,
.stop = driver_stop,
.setting = driver_config,
.request = driver_request,
.open = driver_open,
.close = driver_close,
.init = driver_init,
.uninit = driver_uninit,
.start = driver_start,
.stop = driver_stop,
.setting = driver_config,
.request = driver_request,
.try_connect = NULL,

.driver.validate_tag = driver_validate_tag,
.driver.group_timer = driver_group_timer,
Expand Down
33 changes: 25 additions & 8 deletions plugins/modbus/modbus_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static int driver_stop(neu_plugin_t *plugin);
static int driver_config(neu_plugin_t *plugin, const char *config);
static int driver_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
void *data);
static int driver_connect(neu_plugin_t *plugin);

static int driver_validate_tag(neu_plugin_t *plugin, neu_datatag_t *tag);
static int driver_group_timer(neu_plugin_t *plugin, neu_plugin_group_t *group);
Expand All @@ -46,14 +47,15 @@ static int driver_test_read_tag(neu_plugin_t *plugin, void *req,
neu_datatag_t tag);

static const neu_plugin_intf_funs_t plugin_intf_funs = {
.open = driver_open,
.close = driver_close,
.init = driver_init,
.uninit = driver_uninit,
.start = driver_start,
.stop = driver_stop,
.setting = driver_config,
.request = driver_request,
.open = driver_open,
.close = driver_close,
.init = driver_init,
.uninit = driver_uninit,
.start = driver_start,
.stop = driver_stop,
.setting = driver_config,
.request = driver_request,
.try_connect = driver_connect,

.driver.validate_tag = driver_validate_tag,
.driver.group_timer = driver_group_timer,
Expand Down Expand Up @@ -388,6 +390,21 @@ static int driver_config(neu_plugin_t *plugin, const char *config)
return 0;
}

static int driver_connect(neu_plugin_t *plugin)
{
if (plugin->conn != NULL) {
if (neu_conn_is_connected(plugin->conn)) {
plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED;
} else {
neu_conn_connect(plugin->conn);
if (neu_conn_is_connected(plugin->conn)) {
plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED;
}
}
}
return 0;
}

static int driver_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
void *data)
{
Expand Down
17 changes: 9 additions & 8 deletions plugins/monitor/monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,15 @@ static int monitor_plugin_stop(neu_plugin_t *plugin)
}

static const neu_plugin_intf_funs_t plugin_intf_funs = {
.open = monitor_plugin_open,
.close = monitor_plugin_close,
.init = monitor_plugin_init,
.uninit = monitor_plugin_uninit,
.start = monitor_plugin_start,
.stop = monitor_plugin_stop,
.setting = monitor_plugin_config,
.request = monitor_plugin_request,
.open = monitor_plugin_open,
.close = monitor_plugin_close,
.init = monitor_plugin_init,
.uninit = monitor_plugin_uninit,
.start = monitor_plugin_start,
.stop = monitor_plugin_stop,
.setting = monitor_plugin_config,
.request = monitor_plugin_request,
.try_connect = NULL,
};

#define DEFAULT_MONITOR_PLUGIN_DESCR \
Expand Down
17 changes: 9 additions & 8 deletions plugins/mqtt/mqtt_plugin_intf.c
Original file line number Diff line number Diff line change
Expand Up @@ -622,12 +622,13 @@ int mqtt_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
}

const neu_plugin_intf_funs_t mqtt_plugin_intf_funs = {
.open = mqtt_plugin_open,
.close = mqtt_plugin_close,
.init = mqtt_plugin_init,
.uninit = mqtt_plugin_uninit,
.start = mqtt_plugin_start,
.stop = mqtt_plugin_stop,
.setting = mqtt_plugin_config,
.request = mqtt_plugin_request,
.open = mqtt_plugin_open,
.close = mqtt_plugin_close,
.init = mqtt_plugin_init,
.uninit = mqtt_plugin_uninit,
.start = mqtt_plugin_start,
.stop = mqtt_plugin_stop,
.setting = mqtt_plugin_config,
.request = mqtt_plugin_request,
.try_connect = NULL,
};
22 changes: 22 additions & 0 deletions src/adapter/adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -2097,6 +2097,10 @@ int neu_adapter_uninit(neu_adapter_t *adapter)
neu_event_del_io(adapter->events, adapter->control_io);

if (adapter->module->type == NEU_NA_TYPE_DRIVER) {
if (adapter->timer_connect != NULL) {
neu_event_del_timer(adapter->events, adapter->timer_connect);
adapter->timer_connect = NULL;
}
neu_adapter_driver_destroy((neu_adapter_driver_t *) adapter);
}

Expand Down Expand Up @@ -2133,6 +2137,19 @@ int neu_adapter_start(neu_adapter_t *adapter)
neu_adapter_driver_start_group_timer(
(neu_adapter_driver_t *) adapter);
}

if (adapter->module->type == NEU_NA_TYPE_DRIVER) {
neu_event_timer_param_t param = {
.second = 5,
.millisecond = 0,
.usr_data = (void *) adapter,
.type = NEU_EVENT_TIMER_BLOCK,
};

param.cb = neu_adapter_driver_try_connect;
adapter->timer_connect =
neu_event_add_timer(adapter->events, param);
}
}

return error;
Expand Down Expand Up @@ -2176,6 +2193,11 @@ int neu_adapter_stop(neu_adapter_t *adapter)
(neu_adapter_driver_t *) adapter);
}
neu_adapter_reset_metrics(adapter);

if (adapter->timer_connect != NULL) {
neu_event_del_timer(adapter->events, adapter->timer_connect);
adapter->timer_connect = NULL;
}
}

return error;
Expand Down
1 change: 1 addition & 0 deletions src/adapter/adapter_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct neu_adapter {

neu_event_timer_t *timer_lev;
int64_t timestamp_lev;
neu_event_timer_t *timer_connect;

// metrics
neu_node_metrics_t *metrics;
Expand Down
25 changes: 25 additions & 0 deletions src/adapter/driver/driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,31 @@ int neu_adapter_driver_uninit(neu_adapter_driver_t *driver)
return 0;
}

int neu_adapter_driver_try_connect(void *param)
{
neu_adapter_driver_t *driver = (neu_adapter_driver_t *) param;
if (driver->tag_cnt > 0) {
return 0;
}

if (driver->adapter.state != NEU_NODE_RUNNING_STATE_RUNNING) {
return 0;
}

neu_plugin_common_t *common =
neu_plugin_to_plugin_common(driver->adapter.plugin);
if (common->link_state == NEU_NODE_LINK_STATE_CONNECTED) {
return 0;
}

if (driver->adapter.module->intf_funs->try_connect == NULL) {
return 0;
}

return driver->adapter.module->intf_funs->try_connect(
driver->adapter.plugin);
}

static inline void start_group_timer(neu_adapter_driver_t *driver, group_t *grp)
{
uint32_t interval = neu_group_get_interval(grp->group);
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/driver/driver_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ void neu_adapter_driver_destroy(neu_adapter_driver_t *driver);
int neu_adapter_driver_init(neu_adapter_driver_t *driver);
int neu_adapter_driver_uninit(neu_adapter_driver_t *driver);

int neu_adapter_driver_try_connect(void *param);

void neu_adapter_driver_start_group_timer(neu_adapter_driver_t *driver);
void neu_adapter_driver_stop_group_timer(neu_adapter_driver_t *driver);

Expand Down