-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
47 lines (39 loc) · 1.74 KB
/
test.py
File metadata and controls
47 lines (39 loc) · 1.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# #define our topics/queues
# INPUT_TOPIC = "raw_input"
# PROCESS_A_OUTPUT_TOPIC = "process_A _Results"
# #state store for pending joins
# pending_joins ={}
# #process A function
# async def process_a(message):
# correlation_id = message.get("id")
# result = await some_processing_logic(message)
# return {
# "correlation_id":correlation_id,
# "result":result
# }
# #process B with temporal join
# async def process_b():
# # process messages when both parts are available
# async def handle_complete_join(input_data,output_data):
# result = combine_data(input_data,output_Data)
# return result
# #input consumer
# async def on_input_message(message):
# correlation_id = message.get("id")
# if correlation_id not in pending_joins :
# pending_joins[correlation_id] = {"input": None, "output": None}
# pending_joins[correlation_id]["input"] = message
# await try_process(correlation_id)
# #output consumer
# async def on_output_message(message):
# correlation_id = message.get("correlation_id")
# if correlation_id not in pending_joins :
# pending_joins[correlation_id] = {"input": None, "output": None}
# pending_joins[correlation_id]["output"] = message
# await try_process(correlation_id)
# #check if we can process the complete join
# async def try_process(correlation_id):
# join_data = pending_joins.get(correlation_id)
# if join_data and join_data["input"] and join_data["output"]:
# await handle_complete_join(join_data["input"],join_data["output"])
# del pending_joins[correlation_id]# cleanup after processing