SunTransit continuously fetches live vehicle positions from multiple transit agencies, calculates delays for each stop, and stores this data for analysis. The system powers a dashboard that lets users track real-time vehicle locations and evaluate agency performance, similar to how FlightRadar24 visualizes flights. Designed for fault tolerance, scalability, and cloud agnosticism, SunTransit can be deployed seamlessly across any cloud platform.
Due to cost constraints, SunTransit currently runs locally rather than in the cloud and fetches data from only two transit agencies: Valley Metro and the Massachusetts Bay Transportation Authority.
- Kafka: Acts as the message broker. There is one topic per transit agency, and each topic is dedicated to that agency's vehicle positions.
- Spark: Handles all major data processing thanks to its scalability, distributed computing, and support for large-scale batch analytics. It combines the GTFS-Realtime feeds from the Kafka topics with the GTFS schedules to compute delays. It runs three jobs (one streaming, two batch), described below.
- Amazon S3: Stores the delay of each vehicle at every stop it served. Data is saved in Parquet format, partitioned by agency and date.
- Redis: Stores only the live position of each vehicle, with a TTL of 2 minutes. It is used exclusively by the dashboard to visualize all vehicles on the map (see the sketch after this list).
- PostgreSQL: Stores the mean delay aggregated at the stop, route, and agency level in three tables: `stops_mean_delay`, `route_mean_delay`, and `agency_mean_delay`.
- MongoDB: Stores the Kafka offsets for the batch jobs so that each run starts from the next unread message in the topic.
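For illustration, here is a minimal sketch of how a live vehicle position could be written with a 2-minute expiry using redis-py. It uses a plain keyed JSON value rather than the Redis time series the streaming job writes to, and the key layout and connection details are assumptions, not the project's actual code.

```python
import json
import redis

# Placeholder connection details; the real values come from credentials.env.
r = redis.Redis(host="redis", password="myPassword123", decode_responses=True)

def store_live_position(agency: str, vehicle_id: str, lat: float, lon: float) -> None:
    """Keep only the latest position of a vehicle; Redis drops it after 2 minutes."""
    key = f"{agency}:vehicle:{vehicle_id}"  # hypothetical key layout
    r.set(key, json.dumps({"lat": lat, "lon": lon}), ex=120)  # TTL = 120 seconds
```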
- `producer/producer.py`: Fetches the GTFS-Realtime feeds from a transit agency and pushes them to that agency's Kafka topic.
- `spark-jobs/push_redis.py` (Job 1): A Spark streaming job that reads vehicle positions from the Kafka topic, discards repeated messages, and saves the latest position of each vehicle in a Redis time series with a retention of 2 minutes.
- `spark-jobs/batch/delay_calculator.py` (Job 2): A Spark batch job that runs every hour and processes the last hour's messages from the Kafka topic. It calculates each vehicle's delay in reaching its scheduled stops and saves the results to S3 in Parquet format.
- `spark-jobs/analyze_daily_records.py` (Job 3): A Spark batch job that runs at 2 AM and processes the previous day's (T-1) data. It reads that day's records from S3 and computes the mean delay across trips for each stop and route, and then for the whole agency.

The simplified sketches below illustrate each of these pieces; they are not the project's exact implementation.
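As a rough illustration of `producer/producer.py`, the sketch below fetches one GTFS-Realtime vehicle positions feed and forwards the raw protobuf bytes to that agency's Kafka topic. The feed URL, topic name, polling interval, and the kafka-python client are assumptions.

```python
import time

import requests
from kafka import KafkaProducer  # kafka-python client, assumed here

# Hypothetical agency feed and topic name.
FEED_URL = "https://example.com/gtfs-rt/vehiclepositions.pb"
TOPIC = "valleymetro-vehicle-positions"

producer = KafkaProducer(bootstrap_servers="localhost:9092")

while True:
    # Push the raw GTFS-RT protobuf payload; downstream Spark jobs decode it.
    payload = requests.get(FEED_URL, timeout=10).content
    producer.send(TOPIC, value=payload)
    producer.flush()
    time.sleep(30)  # polling interval is an assumption
```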
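The core of Job 2 is comparing observed arrival times against the GTFS schedule and persisting the result. Below is a minimal PySpark sketch of that idea; the input DataFrame (already joined with the schedule), the column names, and the bucket path are assumptions.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("delay-calculator-sketch").getOrCreate()

# Tiny stand-in for the real input: observed arrivals already joined with the
# GTFS schedule. Column names are assumptions.
arrivals = spark.createDataFrame(
    [("valleymetro", "route_7", "trip_1", "stop_42",
      "2024-05-01 08:03:10", "2024-05-01 08:01:00")],
    ["agency", "route_id", "trip_id", "stop_id", "actual_arrival", "scheduled_arrival"],
)

delays = (
    arrivals
    .withColumn("delay_seconds",
                F.unix_timestamp("actual_arrival") - F.unix_timestamp("scheduled_arrival"))
    .withColumn("date", F.to_date("actual_arrival"))
)

# Same layout the project describes: Parquet on S3, partitioned by agency and date.
delays.write.mode("append").partitionBy("agency", "date") \
    .parquet("s3a://<your-bucket>/delays/")  # bucket name is a placeholder
```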
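Job 3 is essentially a set of group-bys over the previous day's Parquet data, written to the three PostgreSQL tables. A rough sketch follows; the S3 path, column names, and JDBC connection details are placeholders.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("daily-analysis-sketch").getOrCreate()

# Read the previous day's records; the path mirrors the agency/date partitioning above.
day = spark.read.parquet("s3a://<your-bucket>/delays/agency=valleymetro/date=2024-05-01/")

stop_mean = day.groupBy("stop_id").agg(F.avg("delay_seconds").alias("mean_delay"))
route_mean = day.groupBy("route_id").agg(F.avg("delay_seconds").alias("mean_delay"))
agency_mean = day.agg(F.avg("delay_seconds").alias("mean_delay"))

# Write each aggregate to its PostgreSQL table over JDBC; URL and credentials are placeholders.
jdbc_url = "jdbc:postgresql://<host>:<port>/<dbname>"
props = {"user": "<user>", "password": "<password>", "driver": "org.postgresql.Driver"}
stop_mean.write.jdbc(jdbc_url, "stops_mean_delay", mode="append", properties=props)
route_mean.write.jdbc(jdbc_url, "route_mean_delay", mode="append", properties=props)
agency_mean.write.jdbc(jdbc_url, "agency_mean_delay", mode="append", properties=props)
```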
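Finally, the batch jobs need to remember where the previous run stopped. A minimal pymongo sketch of that offset bookkeeping is below; the database name, collection name, and document shape are assumptions.

```python
from pymongo import MongoClient

# Connection string is a placeholder; the real value comes from MONGODB_URL.
client = MongoClient("mongodb+srv://<username>:<password>@<cluster-url>/<dbname>")
offsets = client["suntransit"]["kafka_offsets"]  # hypothetical database/collection names

def load_start_offset(topic: str, partition: int) -> int:
    """Return the offset the next batch run should start from (0 if never run)."""
    doc = offsets.find_one({"topic": topic, "partition": partition})
    return doc["next_offset"] if doc else 0

def save_next_offset(topic: str, partition: int, last_processed: int) -> None:
    """Record the next unread offset so the following run resumes after it."""
    offsets.update_one(
        {"topic": topic, "partition": partition},
        {"$set": {"next_offset": last_processed + 1}},
        upsert=True,
    )
```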
Before running SunTransit, provision the following cloud resources (you can use your preferred providers; examples are given below):
- S3 Bucket: Used for storing Parquet files with delay data. Example: Amazon S3
- MongoDB Database: Used for storing batch job offsets. Example: MongoDB Atlas
- PostgreSQL Database: Used for storing aggregated delay metrics. Example: Neon.tech
Make sure to note the connection details for each resource, as you'll need them when creating your `credentials.env` file.
- After provisioning your cloud resources, create a `credentials.env` file with the following values:

  ```
  AWS_ACCESS_KEY_ID=<your-aws-access-key-id>
  AWS_SECRET_ACCESS_KEY=<your-aws-secret-access-key>
  REDIS_HOST=redis
  REDIS_PASSWORD=myPassword123
  MONGODB_URL="mongodb+srv://<username>:<password>@<cluster-url>/<dbname>"
  POSTGRESQL_URL="jdbc:postgresql://<host>:<port>/<dbname>"
  ```

- Place this `credentials.env` file in both of these locations:
  - `spark-jobs/env/credentials.env`
  - `flask_app/credentials.env`
- Update `S3_BUCKET` in `spark-jobs/env/.env` with your bucket name.
- Start all services with Docker Compose:

  ```
  docker compose up -d --scale spark-worker=3
  ```

- Initialize Kafka topics and start the producer (Job 1):

  ```
  bash project-init.sh
  ```

- Schedule the batch and analysis jobs:
  - Set up a cron job to run `batch_job.sh` every hour.
  - Set up a cron job to run `analysis_job.sh` daily at 2 AM.

  Example cron entries:

  ```
  0 * * * * /path/to/batch_job.sh
  0 2 * * * /path/to/analysis_job.sh
  ```

Currently working on


