Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/config/commander_threaded.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Commander:
ExecutionNode:
-
threads: 2
-
threads: 2
-
threads: 2
-
threads: 2
-
threads: 2

4 changes: 2 additions & 2 deletions core/core/cognitive_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(self, name, class_name, **params):
)

#Periodic publishing of activation
self.activation_publish_timer=self.create_timer(0.01, self.publish_activation_callback, callback_group=self.cbgroup_activation)
self.activation_publish_timer=self.create_timer(0.01, self.publish_activation_callback, callback_group=self.cbgroup_server)

#Service clients to add or delete nodes from the LTM
service_name_add_LTM = 'ltm_0' + '/add_node' # TODO choose LTM ID
Expand Down Expand Up @@ -387,7 +387,7 @@ def create_activation_input(self, node: dict):
subscriber=self.create_subscription(Activation, 'cognitive_node/' + str(name) + '/activation', self.read_activation_callback, 1, callback_group=self.cbgroup_activation)
data=Activation()
updated=False
new_input=dict(subscriber=subscriber, data=data, updated=updated)
new_input=dict(subscriber=subscriber, node_type=node_type, data=data, updated=updated)
self.activation_inputs[name]=new_input
self.get_logger().debug(f'Created new activation input: {name} of type {node_type}')
else:
Expand Down
36 changes: 23 additions & 13 deletions core/core/commander_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import rclpy
import yaml
import random
import traceback
import multiprocessing as mp

from rclpy.node import Node
Expand Down Expand Up @@ -41,6 +42,7 @@ def __init__(self):
self.cbgroup_server=MutuallyExclusiveCallbackGroup()
self.node_clients={}
self.random_seed = self.declare_parameter('random_seed', value = 0).get_parameter_value().integer_value
self.global_params = {"random_seed": self.random_seed}


# Add Execution Node Service for the Execution Nodes
Expand Down Expand Up @@ -363,7 +365,12 @@ def create_node(self, request, response):
"""
name = str(request.name)
class_name = str(request.class_name)
parameters = str(request.parameters)
yaml_parameters = str(request.parameters)
parameters={}
if yaml_parameters:
parameters = yaml.safe_load(yaml_parameters)
parameters.update(self.global_params)
yaml_parameters = yaml.dump(parameters)

self.get_logger().info(f'Creating new {class_name} {name}...')

Expand All @@ -375,7 +382,7 @@ def create_node(self, request, response):

ex = self.get_lowest_load_executor()

executor_response = self.send_create_request_to_executor(ex, name, class_name, parameters)
executor_response = self.send_create_request_to_executor(ex, name, class_name, yaml_parameters)

self.register_node(ex, name)

Expand Down Expand Up @@ -565,23 +572,27 @@ def load_experiment(self, request, response):
self.get_logger().info(f'Loading file {experiment_file}')

nodes = data['LTM']['Nodes']

self.global_params["globals"] = data['LTM'].get("Globals", {})
if data.get('Control'):
self.global_params['Control'] = data['Control']
if data['LTM'].get('Connectors'):
self.global_params['Connectors'] = data['LTM']['Connectors']
for class_name, node_list in nodes.items():
for node in node_list:
name = node['name']
class_name = node['class_name']
if node.get('parameters'):
parameters = str(node['parameters'])
parameters = str({**node['parameters'], **self.global_params})
else:
parameters = ''
parameters = str(self.global_params)
self.get_logger().info(f"Loading {class_name} {name}...")

if self.node_exists(name):
self.get_logger().info(f"Node {name} already exists.")

else:
new_ex = node.get('new_executor', False)
new_threads = node.get('thread', 1)
new_threads = node.get('threads', 1)
if new_ex:
ex=self.add_execution_node(new_threads)
else:
Expand All @@ -603,17 +614,12 @@ def load_experiment(self, request, response):
params_dict['LTM_id']='ltm_0' #TODO Handle multiple LTMs
if data['LTM'].get('Files'):
params_dict['Files']=data['LTM']['Files']
if data['LTM'].get('Connectors'):
params_dict['Connectors']=data['LTM']['Connectors']
if data.get('Control'):
params_dict['Control']=data['Control']
params_dict['random_seed']=self.random_seed
parameters=str(params_dict)
parameters=str({**params_dict, **self.global_params})

self.get_logger().info(f"Loading {class_name} {name}...")

new_ex = experiment_data.get('new_executor', False)
new_threads = experiment_data.get('thread', 1)
new_threads = experiment_data.get('threads', 1)
if new_ex:
ex=self.add_execution_node(new_threads)
else:
Expand Down Expand Up @@ -945,6 +951,10 @@ def main(args=None):
except KeyboardInterrupt:
print('Keyboard Interrupt Detected: Shutting down execution nodes...')
commander.process_shutdown()
except Exception as e:
rclpy.logging.get_logger(f"execution_node_{id}").error(
f"Unhandled exception: {e}\n{traceback.format_exc()}"
)
finally:
commander.destroy_node()

Expand Down
18 changes: 17 additions & 1 deletion core/core/execution_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import yaml
import importlib
import asyncio
import traceback

import rclpy.logging
from rclpy.node import Node
Expand Down Expand Up @@ -147,7 +148,12 @@ async def create_node(self, request, response):

self.get_logger().info(f'Creating new {class_name} {name}...')

new_node = class_from_classname(class_name)(name, **parameters)
try:
new_node = class_from_classname(class_name)(name, **parameters)
except Exception as e:
self.get_logger().error(f"Unhandled exception: {e}\n{traceback.format_exc()}")
response.created = False
return response

self.nodes[name] = new_node

Expand Down Expand Up @@ -396,15 +402,25 @@ def create_execution_node(id: int, threads: int, args=None):
"""
rclpy.init(args=args)
if threads>1:
rclpy.logging.get_logger(f"execution_node_{id}").info(
f"Creating multi-threaded executor with {threads} threads."
)
executor=MultiThreadedExecutor(num_threads=threads)
else:
rclpy.logging.get_logger(f"execution_node_{id}").info(
f"Creating single-threaded executor."
)
executor = SingleThreadedExecutor() # TODO: TBD if it is single or multi threaded executor.
execution_node = ExecutionNode(executor, id)
executor.add_node(execution_node)
try:
executor.spin()
except KeyboardInterrupt:
pass
except Exception as e:
rclpy.logging.get_logger(f"execution_node_{id}").error(
f"Unhandled exception: {e}\n{traceback.format_exc()}"
)
finally:
pass
for node in execution_node.nodes.values():
Expand Down
Loading