Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions dataflows-cloud/helsinki-transit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Connector for Helsinki Transit Dataflow

Use mqtt-source connector to read the Helkinki Transit live data feed.

### Prerequisites

*Checkout the connector configuration file [mqtt-helsinki.yaml](mqtt-helsinki.yaml) to get context on what we are doing.


* Load Jolt Smartmodule to your cluster:

```bash
fluvio hub smartmodule download infinyon/jaq@0.1.0
```

## Create topic with retention

Before starting the connector, create a topic with a specific retention policy. This will ensure that the data is retained for a certain period.

```bash

fluvio topic create events --retention-time 4h
fluvio topic create vehicle-position --retention-time 4h
fluvio topic create average-speed --retention-time 4h
```

### Start the mqtt connector

Checkout the connector configuration file [mqtt-helsinki.yaml](mqtt-helsinki.yaml) for context.

Start the cloud connector:

```bash
fluvio cloud connector create --config connector/mqtt-helsiniki.yaml
```

This connector refreshes the licenses every hour. Use fluvio to see the license numbers downloaded from the server:

```bash
fluvio consume helsinki
```

Use <Ctrl-C> to exit


### Clean-up

Delete connector:

```bash
fluvio cloud connector delete helsinki-mqtt
```
Empty file.
24 changes: 24 additions & 0 deletions dataflows-cloud/helsinki-transit/connector/mqtt-helsiniki.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: 0.1.0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: the file name mqtt-helsiniki.yaml should be mqtt-helsinki.yaml

meta:
version: 0.2.9
name: helsinki-mqtt
type: mqtt-source
topic: helsinki
mqtt:
url: "mqtt://mqtt.hsl.fi"
topic: "/hfp/v2/journey/ongoing/vp/bus/#"
payload_output_type: json


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra space (remove)

transforms:
- uses: infinyon/jaq@0.1.0
with:
filter: |
{
vehicle: .payload.VP.veh,
tst: .payload.VP.tst,
speed: .payload.VP.spd,
lat: .payload.VP.lat,
long: .payload.VP.long,
route: .payload.VP.route
}
197 changes: 197 additions & 0 deletions dataflows-cloud/helsinki-transit/dataflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
apiVersion: 0.5.0
meta:
name: helsinki-stats
version: 0.1.0
namespace: my-org

config:
converter: json
consumer:
default_starting_offset:
value: 0
position: End

types:
event:
type: object
properties:
vehicle:
type: i32
speed:
type: f64
optional: true
lat:
type: f64
optional: true
long:
type: f64
optional: true
route:
type: string
tst:
type: string

vehicle-position:
type: object
properties:
vehicle:
type: i32
speed:
type: f64
lat:
type: f64
long:
type: f64
route:
type: string
ts:
type: i64

average-speed:
type: object
properties:
vehicle:
type: i32
route:
type: string
speed:
type: f64

average-speed-list:
type: list
items:
type: average-speed


topics:
events:
name: helsinki
schema:
value:
type: event

vehicle-position:
schema:
value:
type: vehicle-position

average-speed:
schema:
value:
type: average-speed-list

services:
clean-events:
sources:
- type: topic
id: events

transforms:
- operator: filter
run: |
fn remove_incomplete_events(event: Event) -> Result<bool> {
let allow = event.lat.is_some() && event.long.is_some() && event.speed.is_some();
Ok(allow)
}

- operator: map
dependencies:
- name: chrono
version: "0.4.38"
run: |
fn clean_events(event: Event) -> Result<VehiclePosition> {
use chrono::naive::NaiveDateTime;
let no_timezone = NaiveDateTime::parse_from_str(&event.tst, "%Y-%m-%dT%H:%M:%S.%fZ")?;
let ts = no_timezone.and_utc().timestamp_millis();

let vp = VehiclePosition {
vehicle: event.vehicle,
route: event.route,
speed: event.speed.unwrap_or(0.0),
lat: event.lat.unwrap_or(0.0),
long: event.long.unwrap_or(0.0),
ts: ts,
};

Ok(vp)
}
sinks:
- type: topic
id: vehicle-position

generate-vehicle-stats:
sources:
- type: topic
id: vehicle-position

states:
vehicle-stat:
type: keyed-state
properties:
key:
type: i32
value:
type: arrow-row
properties:
route:
type: string
speed:
type: f64

window:
tumbling:
duration: 5s

assign-timestamp:
run: |
fn assign_timestamp(vp: VehiclePosition, _event_time: i64) -> Result<i64> {
Ok(vp.ts)
}

partition:
assign-key:
run: |
fn assign_key(vp: VehiclePosition) -> Result<i32> {
Ok(vp.vehicle)
}

update-state:
run: |
fn compute_average_speed(vp: VehiclePosition) -> Result<()> {
let mut veh = vehicle_stat();
veh.route = vp.route.clone();
veh.speed = (veh.speed + vp.speed) / 2.0f64;
veh.update()?;
println!("update speed for vehicle {}", vp.vehicle);
Ok(())
}

flush:
run: |
fn collect_vehicle_stats() -> Result<AverageSpeedList> {
let vs = sql("select * from vehicle_stat")?;
let vehicle_col = vs.key()?;
let speed_col = vs.col("speed")?;
let route_col = vs.col("route")?;

let mut avg_speed = vec![];

let rows = vs.rows()?;
while rows.next() {
let vehicle = rows.str(&vehicle_col)?;
let route = rows.str(&route_col)?;
let speed = rows.f64(&speed_col)?;
avg_speed.push(AverageSpeed {
vehicle: vehicle.parse()?,
route,
speed,
}
);
}

Ok(avg_speed)
}

sinks:
- type: topic
id: average-speed