diff --git a/dataflows-cloud/helsinki-transit/README.md b/dataflows-cloud/helsinki-transit/README.md new file mode 100644 index 0000000..b1cb45b --- /dev/null +++ b/dataflows-cloud/helsinki-transit/README.md @@ -0,0 +1,52 @@ +# Connector for Helsinki Transit Dataflow + +Use mqtt-source connector to read the Helkinki Transit live data feed. + +### Prerequisites + +*Checkout the connector configuration file [mqtt-helsinki.yaml](mqtt-helsinki.yaml) to get context on what we are doing. + + +* Load Jolt Smartmodule to your cluster: + + ```bash + fluvio hub smartmodule download infinyon/jaq@0.1.0 + ``` + +## Create topic with retention + +Before starting the connector, create a topic with a specific retention policy. This will ensure that the data is retained for a certain period. + +```bash + +fluvio topic create events --retention-time 4h +fluvio topic create vehicle-position --retention-time 4h +fluvio topic create average-speed --retention-time 4h +``` + +### Start the mqtt connector + +Checkout the connector configuration file [mqtt-helsinki.yaml](mqtt-helsinki.yaml) for context. + +Start the cloud connector: + +```bash +fluvio cloud connector create --config connector/mqtt-helsiniki.yaml +``` + +This connector refreshes the licenses every hour. Use fluvio to see the license numbers downloaded from the server: + +```bash +fluvio consume helsinki +``` + +Use to exit + + +### Clean-up + +Delete connector: + +```bash +fluvio cloud connector delete helsinki-mqtt +``` diff --git a/dataflows-cloud/helsinki-transit/connector/Makefile b/dataflows-cloud/helsinki-transit/connector/Makefile new file mode 100644 index 0000000..e69de29 diff --git a/dataflows-cloud/helsinki-transit/connector/mqtt-helsiniki.yaml b/dataflows-cloud/helsinki-transit/connector/mqtt-helsiniki.yaml new file mode 100644 index 0000000..f8dbc2a --- /dev/null +++ b/dataflows-cloud/helsinki-transit/connector/mqtt-helsiniki.yaml @@ -0,0 +1,24 @@ +apiVersion: 0.1.0 +meta: + version: 0.2.9 + name: helsinki-mqtt + type: mqtt-source + topic: helsinki +mqtt: + url: "mqtt://mqtt.hsl.fi" + topic: "/hfp/v2/journey/ongoing/vp/bus/#" + payload_output_type: json + + +transforms: + - uses: infinyon/jaq@0.1.0 + with: + filter: | + { + vehicle: .payload.VP.veh, + tst: .payload.VP.tst, + speed: .payload.VP.spd, + lat: .payload.VP.lat, + long: .payload.VP.long, + route: .payload.VP.route + } \ No newline at end of file diff --git a/dataflows-cloud/helsinki-transit/dataflow.yaml b/dataflows-cloud/helsinki-transit/dataflow.yaml new file mode 100644 index 0000000..131af14 --- /dev/null +++ b/dataflows-cloud/helsinki-transit/dataflow.yaml @@ -0,0 +1,197 @@ +apiVersion: 0.5.0 +meta: + name: helsinki-stats + version: 0.1.0 + namespace: my-org + +config: + converter: json + consumer: + default_starting_offset: + value: 0 + position: End + +types: + event: + type: object + properties: + vehicle: + type: i32 + speed: + type: f64 + optional: true + lat: + type: f64 + optional: true + long: + type: f64 + optional: true + route: + type: string + tst: + type: string + + vehicle-position: + type: object + properties: + vehicle: + type: i32 + speed: + type: f64 + lat: + type: f64 + long: + type: f64 + route: + type: string + ts: + type: i64 + + average-speed: + type: object + properties: + vehicle: + type: i32 + route: + type: string + speed: + type: f64 + + average-speed-list: + type: list + items: + type: average-speed + + +topics: + events: + name: helsinki + schema: + value: + type: event + + vehicle-position: + schema: + value: + type: vehicle-position + + average-speed: + schema: + value: + type: average-speed-list + +services: + clean-events: + sources: + - type: topic + id: events + + transforms: + - operator: filter + run: | + fn remove_incomplete_events(event: Event) -> Result { + let allow = event.lat.is_some() && event.long.is_some() && event.speed.is_some(); + Ok(allow) + } + + - operator: map + dependencies: + - name: chrono + version: "0.4.38" + run: | + fn clean_events(event: Event) -> Result { + use chrono::naive::NaiveDateTime; + let no_timezone = NaiveDateTime::parse_from_str(&event.tst, "%Y-%m-%dT%H:%M:%S.%fZ")?; + let ts = no_timezone.and_utc().timestamp_millis(); + + let vp = VehiclePosition { + vehicle: event.vehicle, + route: event.route, + speed: event.speed.unwrap_or(0.0), + lat: event.lat.unwrap_or(0.0), + long: event.long.unwrap_or(0.0), + ts: ts, + }; + + Ok(vp) + } + sinks: + - type: topic + id: vehicle-position + + generate-vehicle-stats: + sources: + - type: topic + id: vehicle-position + + states: + vehicle-stat: + type: keyed-state + properties: + key: + type: i32 + value: + type: arrow-row + properties: + route: + type: string + speed: + type: f64 + + window: + tumbling: + duration: 5s + + assign-timestamp: + run: | + fn assign_timestamp(vp: VehiclePosition, _event_time: i64) -> Result { + Ok(vp.ts) + } + + partition: + assign-key: + run: | + fn assign_key(vp: VehiclePosition) -> Result { + Ok(vp.vehicle) + } + + update-state: + run: | + fn compute_average_speed(vp: VehiclePosition) -> Result<()> { + let mut veh = vehicle_stat(); + veh.route = vp.route.clone(); + veh.speed = (veh.speed + vp.speed) / 2.0f64; + veh.update()?; + println!("update speed for vehicle {}", vp.vehicle); + Ok(()) + } + + flush: + run: | + fn collect_vehicle_stats() -> Result { + let vs = sql("select * from vehicle_stat")?; + let vehicle_col = vs.key()?; + let speed_col = vs.col("speed")?; + let route_col = vs.col("route")?; + + let mut avg_speed = vec![]; + + let rows = vs.rows()?; + while rows.next() { + let vehicle = rows.str(&vehicle_col)?; + let route = rows.str(&route_col)?; + let speed = rows.f64(&speed_col)?; + avg_speed.push(AverageSpeed { + vehicle: vehicle.parse()?, + route, + speed, + } + ); + } + + Ok(avg_speed) + } + + sinks: + - type: topic + id: average-speed