|
| 1 | +# r: compas_xr>=2.0.0 |
| 2 | +""" |
| 3 | +Component to stream data from Realtime Database. |
| 4 | +
|
| 5 | +Streams updates from Firebase Realtime Database using a background worker. |
| 6 | +
|
| 7 | +COMPAS XR v1.0.0 |
| 8 | +""" |
| 9 | + |
| 10 | +import time |
| 11 | + |
| 12 | +import Grasshopper |
| 13 | +from compas_eve.ghpython import BackgroundWorker |
| 14 | + |
| 15 | +from compas_xr.realtime_database import RealtimeDatabase |
| 16 | + |
| 17 | + |
| 18 | +def start_rtdb_stream(worker, config_filepath, rtdb_path, return_full_data): |
| 19 | + worker.config_filepath = config_filepath |
| 20 | + worker.rtdb_path = rtdb_path |
| 21 | + worker.return_full_data = bool(return_full_data) |
| 22 | + worker.db = RealtimeDatabase(config_filepath) |
| 23 | + worker.stream_id = None |
| 24 | + worker.update_count = 0 |
| 25 | + worker.display_message("Connecting...") |
| 26 | + |
| 27 | + def on_message(message): |
| 28 | + evt = message.get("event") |
| 29 | + pth = message.get("path") |
| 30 | + raw_dat = message.get("data") |
| 31 | + |
| 32 | + if worker.return_full_data: |
| 33 | + # Pull the full current subtree from the subscribed path. |
| 34 | + try: |
| 35 | + dat = worker.db.get_data(worker.rtdb_path) |
| 36 | + except Exception: |
| 37 | + dat = raw_dat |
| 38 | + else: |
| 39 | + # Use raw event delta payload. |
| 40 | + dat = raw_dat |
| 41 | + |
| 42 | + worker.update_count += 1 |
| 43 | + worker.update_result((evt, pth, dat), delay=1) |
| 44 | + |
| 45 | + mode = "full" if worker.return_full_data else "delta" |
| 46 | + worker.display_message("Received Update #{} ({})".format(worker.update_count, mode)) |
| 47 | + |
| 48 | + stream_obj = worker.db.stream_data(rtdb_path, on_message) |
| 49 | + worker.stream_id = getattr(stream_obj, "_stream_id", None) |
| 50 | + |
| 51 | + mode = "full" if worker.return_full_data else "delta" |
| 52 | + worker.display_message("Streaming... waiting for updates ({})".format(mode)) |
| 53 | + |
| 54 | + while not worker.has_requested_cancellation(): |
| 55 | + time.sleep(0.1) |
| 56 | + |
| 57 | + return None |
| 58 | + |
| 59 | + |
| 60 | +def stop_rtdb_stream(worker): |
| 61 | + # Called by worker.dispose(). |
| 62 | + if hasattr(worker, "db") and worker.db and hasattr(worker, "stream_id") and worker.stream_id: |
| 63 | + try: |
| 64 | + worker.db.close_stream(worker.stream_id) |
| 65 | + except Exception as e: |
| 66 | + worker.display_message("Stop error: {}".format(e)) |
| 67 | + worker.display_message("Stopped") |
| 68 | + |
| 69 | + |
| 70 | +class StreamRealtimeDatabaseComponent(Grasshopper.Kernel.GH_ScriptInstance): |
| 71 | + def RunScript(self, config_filepath, path, stream, return_full_data): |
| 72 | + event = None |
| 73 | + event_path = None |
| 74 | + data = None |
| 75 | + status = "stopped" |
| 76 | + |
| 77 | + if stream is None: |
| 78 | + stream = False |
| 79 | + |
| 80 | + if return_full_data is None: |
| 81 | + return_full_data = True |
| 82 | + |
| 83 | + if not stream: |
| 84 | + BackgroundWorker.stop_instance_by_component(ghenv) # noqa: F821 |
| 85 | + return event, event_path, data, status |
| 86 | + |
| 87 | + if not config_filepath or not path: |
| 88 | + status = "error: provide config_filepath and path" |
| 89 | + return event, event_path, data, status |
| 90 | + |
| 91 | + worker = BackgroundWorker.instance_by_component( |
| 92 | + ghenv, # noqa: F821 |
| 93 | + start_rtdb_stream, |
| 94 | + dispose_function=stop_rtdb_stream, |
| 95 | + auto_set_done=False, |
| 96 | + force_new=False, |
| 97 | + args=(config_filepath, path, return_full_data), |
| 98 | + ) |
| 99 | + |
| 100 | + must_restart = False |
| 101 | + if hasattr(worker, "config_filepath") and hasattr(worker, "rtdb_path"): |
| 102 | + if worker.config_filepath != config_filepath or worker.rtdb_path != path: |
| 103 | + must_restart = True |
| 104 | + elif hasattr(worker, "return_full_data") and worker.return_full_data != bool(return_full_data): |
| 105 | + must_restart = True |
| 106 | + |
| 107 | + if must_restart: |
| 108 | + worker = BackgroundWorker.instance_by_component( |
| 109 | + ghenv, # noqa: F821 |
| 110 | + start_rtdb_stream, |
| 111 | + dispose_function=stop_rtdb_stream, |
| 112 | + auto_set_done=False, |
| 113 | + force_new=True, |
| 114 | + args=(config_filepath, path, return_full_data), |
| 115 | + ) |
| 116 | + |
| 117 | + if not worker.is_working(): |
| 118 | + worker.start_work() |
| 119 | + |
| 120 | + if hasattr(worker, "result") and worker.result: |
| 121 | + event, event_path, data = worker.result |
| 122 | + |
| 123 | + if worker.is_working(): |
| 124 | + mode = "full" if bool(return_full_data) else "delta" |
| 125 | + status = "streaming ({}, {})".format(getattr(worker, "stream_id", None), mode) |
| 126 | + elif worker.is_done(): |
| 127 | + status = "done" |
| 128 | + else: |
| 129 | + status = "idle" |
| 130 | + |
| 131 | + return event, event_path, data, status |
0 commit comments