@@ -643,6 +643,8 @@ def _add_node_properties(self, node_name, properties, predecessor_name):
643643 definition .map_out_nodes [predecessor_name ] = {}
644644 definition .map_out_nodes [predecessor_name ][node_name ] = properties
645645
646+ definition .edge_definitions [node_name ] = properties
647+
646648# Pipeline current "stream" and "frame_id" are thread-local variables
647649# Valid for create_stream(), process_frame() and destroy_stream() on main thread
648650# Valid for PipelineElement._create_frames_generator() thread
@@ -724,6 +726,7 @@ def _create_pipeline_graph(self, definition):
724726
725727 definition .map_in_nodes = {}
726728 definition .map_out_nodes = {}
729+ definition .edge_definitions = {}
727730 node_heads , node_successors = Graph .traverse (
728731 definition .graph , self ._add_node_properties )
729732 pipeline_graph = PipelineGraph (node_heads )
@@ -1172,7 +1175,7 @@ def _process_frame_common(self, stream_dict, frame_data_in, new_frame) \
11721175 stream .set_state (self ._process_stream_event (
11731176 element_name , stream_event , frame_data_out ))
11741177 # TODO: Test "stream.state" before continuing
1175- self ._process_map_out (element_name , frame_data_out )
1178+ self ._process_map_out (element , element_name , frame_data_out )
11761179 self ._process_metrics_capture ( # TODO: Move up ?
11771180 metrics , element .name )
11781181 frame .swag .update (frame_data_out )
@@ -1334,34 +1337,51 @@ def _process_metrics_capture(self, metrics, element_name):
13341337 metrics ["pipeline_memory" ] = pipeline_memory # Total so far !
13351338
13361339 def _process_map_in (self , header , element , element_name , swag ):
1337- map_in_names = {}
1338- if element_name in self .definition .map_in_nodes :
1339- map_in_elements = self .definition .map_in_nodes [element_name ]
1340- for in_element , in_map in map_in_elements .items ():
1341- from_name , to_name = next (iter (in_map .items ()))
1342- map_in_names [to_name ] = f"{ element_name } .{ to_name } "
1340+ #map_in_names = {}
1341+ #if element_name in self.definition.map_in_nodes:
1342+ # map_in_elements = self.definition.map_in_nodes[element_name]
1343+ # for in_element, in_map in map_in_elements.items():
1344+ # from_name, to_name = next(iter(in_map.items()))
1345+ # map_in_names[to_name] = f"{element_name}.{to_name}"
1346+ #print(f"map_in_names: {map_in_names}")
1347+
1348+ #inputs = {}
1349+ #input_names = [input["name"] for input in element.definition.input]
1350+
1351+ #for input_name in input_names:
1352+ # try:
1353+ # if input_name in map_in_names:
1354+ # inputs[input_name] = swag[map_in_names[input_name]]
1355+ # else:
1356+ # inputs[input_name] = swag[input_name]
1357+ # except KeyError as key_error:
1358+ # self._error_pipeline(header,
1359+ # f'Function parameter "{input_name}" not found')
13431360
13441361 inputs = {}
1345- input_names = [input ["name" ] for input in element .definition .input ]
1362+ for input in element .definition .input :
1363+ if input ["name" ] in swag :
1364+ inputs [input ["name" ]] = swag [input ["name" ]]
1365+
1366+ edge_definitions = self .definition .edge_definitions .get (element_name , {})
1367+ for predecessor_output , input_name in edge_definitions .items ():
1368+ if predecessor_output in swag :
1369+ inputs [input_name ] = swag [predecessor_output ]
13461370
1347- for input_name in input_names :
1348- try :
1349- if input_name in map_in_names :
1350- inputs [input_name ] = swag [map_in_names [input_name ]]
1351- else :
1352- inputs [input_name ] = swag [input_name ]
1353- except KeyError as key_error :
1354- self ._error_pipeline (header ,
1355- f'Function parameter "{ input_name } " not found' )
13561371 return inputs
13571372
1358- def _process_map_out (self , element_name , frame_data_out ):
1359- if element_name in self .definition .map_out_nodes :
1360- map_out_node = self .definition .map_out_nodes [element_name ]
1361- for out_element , out_map in map_out_node .items ():
1362- from_name , to_name = next (iter (out_map .items ()))
1363- to_name = f"{ out_element } .{ to_name } "
1364- frame_data_out [to_name ] = frame_data_out .pop (from_name )
1373+ def _process_map_out (self , element , element_name , frame_data_out ):
1374+ for output in element .definition .output :
1375+ output_name = output ["name" ]
1376+ qualified_output_name = f"{ element_name } .{ output_name } "
1377+ if output_name in frame_data_out :
1378+ frame_data_out [qualified_output_name ] = frame_data_out [output_name ]
1379+ #if element_name in self.definition.map_out_nodes:
1380+ # map_out_node = self.definition.map_out_nodes[element_name]
1381+ # for out_element, out_map in map_out_node.items():
1382+ # from_name, to_name = next(iter(out_map.items()))
1383+ # to_name = f"{out_element}.{to_name}"
1384+ # frame_data_out[to_name] = frame_data_out.pop(from_name)
13651385
13661386# FIX: _create_frame_generator(): StreamEvent.ERROR -->
13671387# self.destroy_stream(get_stream_id(), graceful=False) # immediately !
0 commit comments