Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ Suggests:
rmarkdown,
GTFSwizard,
mapview,
xml2
xml2,
RProtoBuf
Remotes:
gmatosferreira/GTFSwizard
VignetteBuilder: knitr
Expand Down
4 changes: 4 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ export(osm_shapes_to_routes)
export(osm_trips_to_routes)
export(prioritize_lanes)
export(query_mobilitydatabase)
export(rt_collect)
export(rt_collect_protobuf)
export(rt_extend_prioritization)
export(unify)
import(RProtoBuf)
import(callr)
import(dplyr)
import(httr)
Expand Down
114 changes: 114 additions & 0 deletions R/rt_collect.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#' Collect GTFS-RT data
#'
#'
#' @param gtfs_rt_url String. URL of the GTFS-RT feed in JSON format.
#' @param destination_file String. File to save the downloaded GTFS-RT data. Content is appended in each iteration.
#' @param header_key String (Default "header"). Key in the JSON corresponding to the feed header. Set to NA if not present.
#' @param entity_key String (Default "entity"). Key in the JSON corresponding to the feed entities. Set to NA if response is a flat list.
#' @param fields_collect Character vector. Fields to extract from each entity in the feed. Nested fields are addressed with dots (e.g. "vehicle.trip.trip_id"). Missing fields yield NA columns.
#' @param scrape_interval Integer (Default 60). Interval in seconds between each download. Negative to run only once.
#' @param log_file String (Optional). Path to a log file to save download logs.
#'
#' @details
#' Downloads GTFS-RT data from the specified URL at regular intervals and appends
#' the extracted fields, as CSV rows, to the destination file. A header row is
#' written only when the destination file does not yet exist.
#'
#' Unless \code{scrape_interval} is negative, this function will run indefinitely
#' until manually stopped.
#'
#'
#' @examples
#' \dontrun{
#' GTFShift::rt_collect("https://api.example.com/gtfs-rt", "gtfs_rt_data.csv")
#' }
#'
#' @import jsonlite
#' @import progress
#'
#' @export
rt_collect <- function(
  gtfs_rt_url, destination_file,
  header_key="header", # Optional
  entity_key="entity",
  fields_collect = c("id", "vehicle.trip.trip_id", "vehicle.position.latitude", "vehicle.position.longitude", "vehicle.position.speed", "vehicle.timestamp", "vehicle.current_status", "vehicle.current_stop_sequence", "vehicle.stop_id"),
  scrape_interval = 60, log_file = NA
) {
  # Emit a message and, when a log file was given, append it there too
  log_line <- function(m) {
    message(m)
    if (!is.na(log_file)) cat(paste(m, "\n"), file = log_file, append = TRUE)
  }

  # Walk a dotted path (e.g. "vehicle.position.speed") down the nested
  # entity structure; returns scalar NA when any path component is missing
  extract_field <- function(data, field) {
    for (part in strsplit(field, ".", fixed = TRUE)[[1]]) {
      if (!(part %in% names(data))) return(NA)
      data <- data[[part]]
    }
    data
  }

  # Log script start
  log_line(sprintf("[%s] Starting GTFS-RT data collection from %s",
                   format(Sys.time(), "%Y-%m-%d %H:%M:%S"), gtfs_rt_url))

  # Each scrape_interval seconds, download the GTFS-RT feed and append it
  # to the destination file
  count <- 0
  repeat {
    count <- count + 1
    timestamp <- format(Sys.time(), "%Y%m%d_%H%M%S")
    feed <- jsonlite::fromJSON(gtfs_rt_url)

    entities <- if (!is.na(entity_key)) {
      as.data.frame(feed[[entity_key]])
    } else {
      feed
    }

    # Build the output frame one column per requested field. The frame is
    # pre-sized from the entity table so that a missing field simply becomes
    # an NA column (scalar NA recycles over n rows). Previously the row
    # count was fixed by whichever field happened to come FIRST, so a
    # missing first field collapsed the frame to a single row and broke
    # the assignment of the remaining full-length columns.
    n_rows <- NROW(entities)
    feed_df <- as.data.frame(matrix(nrow = n_rows, ncol = 0))
    for (field in fields_collect) {
      feed_df[[field]] <- extract_field(entities, field)
    }

    # Attach feed-level metadata, when a header section is present
    if (!is.na(header_key)) {
      header <- feed[[header_key]]
      if ("timestamp" %in% names(header)) {
        feed_df$feed_timestamp <- header$timestamp
      }
      if ("incrementality" %in% names(header)) {
        feed_df$feed_incrementality <- header$incrementality
      }
    }

    write.table(
      feed_df,
      file = destination_file,
      sep = ",",
      row.names = FALSE,
      col.names = !file.exists(destination_file), # only write header if file is new
      append = TRUE
    )

    log_line(sprintf("[%s] Iteration %d completed", timestamp, count))

    # Negative interval means "run once"
    if (scrape_interval < 0) {
      break
    }

    # Sleep until the next iteration, updating a progress bar while waiting
    interval_start <- Sys.time()
    pb <- progress::progress_bar$new( # Track progress
      format = "Sleeping [:bar] :percent :spin elapsed=:elapsed",
      clear = FALSE, show_after = 0
    )
    pb$update(0)
    repeat {
      elapsed_time <- as.numeric(difftime(Sys.time(), interval_start, units = "secs"))
      if (elapsed_time >= scrape_interval) break
      pb$update(min(elapsed_time / scrape_interval, 1))
      Sys.sleep(0.1)
    }
    pb$update(1)
  }
}
112 changes: 112 additions & 0 deletions R/rt_collect_protobuf.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#' Collect GTFS-RT data (with Protocol Buffers support)
#'
#'
#' @param gtfs_rt_url String. URL of the Protocol Buffers GTFS-RT feed.
#' @param destination_file String. File to save the downloaded GTFS-RT data. Content is appended in each iteration.
#' @param fields_collect Character vector. Fields to extract from each entity in the feed.
#' @param scrape_interval Integer (Default 60). Interval in seconds between each download. Negative to run only once.
#' @param log_file String (Optional). Path to a log file to save download logs.
#'
#' @details
#' Downloads GTFS-RT data from the specified URL at regular intervals and saves them to the destination file.
#'
#' Each iteration decodes the protobuf feed, serializes it to a temporary JSON
#' file, and delegates field extraction and CSV writing to
#' \code{rt_collect()} (run for a single iteration).
#'
#' Unless \code{scrape_interval} is negative, this function will run
#' indefinitely until manually stopped.
#'
#'
#' @examples
#' \dontrun{
#' GTFShift::rt_collect_protobuf("https://api.example.com/gtfs-rt-protobuf", "gtfs_rt_data.csv")
#' }
#'
#' @import RProtoBuf
#' @import jsonlite
#' @import progress
#'
#' @export
rt_collect_protobuf <- function(
  gtfs_rt_url, destination_file,
  fields_collect = c("id", "vehicle.trip.trip_id", "vehicle.position.latitude", "vehicle.position.longitude", "vehicle.position.speed", "vehicle.timestamp", "vehicle.current_status", "vehicle.current_stop_sequence", "vehicle.stop_id"),
  scrape_interval = 60, log_file = NA
) {
  # Emit a message and, when a log file was given, append it there too
  log_line <- function(m) {
    message(m)
    if (!is.na(log_file)) cat(paste(m, "\n"), file = log_file, append = TRUE)
  }

  # Log script start
  log_line(sprintf("[%s] Starting GTFS-RT data collection from %s",
                   format(Sys.time(), "%Y-%m-%d %H:%M:%S"), gtfs_rt_url))

  # The .proto descriptor does not change between iterations, so load it
  # once up front instead of re-reading it on every scrape
  RProtoBuf::readProtoFiles(system.file("extdata", "gtfs-realtime.proto", package = "GTFShift"))

  # Recursively convert an RProtoBuf Message into a plain named R list
  protobuf_to_list <- function(msg) {
    if (!inherits(msg, "Message")) return(msg)

    fields <- names(msg)

    lapply(fields, function(f) {
      value <- msg[[f]]

      # recursively convert nested Message objects
      if (inherits(value, "Message")) {
        protobuf_to_list(value)
      } else if (is.list(value)) {
        lapply(value, protobuf_to_list)
      } else {
        value
      }
    }) |> setNames(fields)
  }

  # Each scrape_interval seconds, download the GTFS-RT feed and append it
  # to the destination file
  count <- 0
  repeat {
    count <- count + 1
    timestamp <- format(Sys.time(), "%Y%m%d_%H%M%S")

    # Download and decode the protobuf feed
    f <- file(gtfs_rt_url, "rb")
    feed <- RProtoBuf::read(`transit_realtime.FeedMessage`, f)
    close(f)

    # Serialize to a temporary JSON file and reuse rt_collect() for the
    # field extraction and CSV writing (single iteration, no extra logging)
    temp_json <- tempfile(fileext = ".json")
    jsonlite::write_json(
      protobuf_to_list(feed),
      temp_json,
      pretty = TRUE,
      auto_unbox = TRUE
    )
    suppressMessages({
      rt_collect(
        gtfs_rt_url = temp_json,
        destination_file = destination_file,
        fields_collect = fields_collect,
        scrape_interval = -1,
        log_file = NA
      )
    })
    unlink(temp_json) # the original leaked one temp file per iteration

    log_line(sprintf("[%s] Iteration %d completed", timestamp, count))

    # Negative interval means "run once"
    if (scrape_interval < 0) {
      break
    }

    # Sleep until the next iteration, updating a progress bar while waiting
    interval_start <- Sys.time()
    pb <- progress::progress_bar$new( # Track progress
      format = "Sleeping [:bar] :percent :spin elapsed=:elapsed",
      clear = FALSE, show_after = 0
    )
    pb$update(0)
    repeat {
      elapsed_time <- as.numeric(difftime(Sys.time(), interval_start, units = "secs"))
      if (elapsed_time >= scrape_interval) break
      pb$update(min(elapsed_time / scrape_interval, 1))
      Sys.sleep(0.1)
    }
    pb$update(1)
  }
}
138 changes: 138 additions & 0 deletions R/rt_extend_prioritization.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#' Extend prioritization with GTFS-RT metrics
#'
#' This function extends lane segment indicators for prioritization with metrics produced with GTFS-RT data.
#'
#' @param lane_prioritization sf data.frame. Result of \code{GTFShift::prioritize_lanes()}
#' @param rt_collection sf data.frame. GTFS-RT data collection. Must include \code{speed} column.
#' @param lane_buffer numeric (Default 15). Buffer distance (in meters) to create around lane segments to capture nearby GTFS-RT points.
#'
#' @details
#' Extends the \code{lane_prioritization} data with speed metrics calculated from the GTFS-RT data points that fall within a buffer around each lane segment.
#'
#' Refer to \code{GTFShift::rt_collect()} for details on GTFS-RT data collection.
#'
#' Each heavy spatial step runs in a background R session (\code{callr::r_bg})
#' so the progress spinner stays responsive while the main session waits.
#'
#' @returns The \code{lane_prioritization} \code{sf} \code{data.frame}, extended with the following columns:
#' \itemize{
#'  \item \code{speed_avg}, the average speed of the vehicles on the way.
#'  \item \code{speed_median}, the median speed of the vehicles on the way.
#'  \item \code{speed_p25}, the 25th percentile speed of the vehicles on the way.
#'  \item \code{speed_p75}, the 75th percentile speed of the vehicles on the way.
#'  \item \code{speed_count}, the number of speed observations on the way.
#' }
#'
#' @examples
#' \dontrun{
#' rt_collect_file <- "gtfs_rt_data.csv"
#' GTFShift::rt_collect("https://api.example.com/gtfs-rt", rt_collect_file)
#' lane_prioritization <- GTFShift::prioritize_lanes(gtfs, osm_query)
#'
#' rt_collection <- read.csv(rt_collect_file) |> sf::st_as_sf(coords = c("longitude", "latitude"), crs = 4326)
#' lane_prioritization_extended <- GTFShift::rt_extend_prioritization(
#'  lane_prioritization = lane_prioritization,
#'  rt_collection = rt_collection
#' )
#' }
#'
#' @import progress
#' @import dplyr
#' @import callr
#'
#' @export
rt_extend_prioritization <- function(
    lane_prioritization,
    rt_collection,
    lane_buffer = 15 # in meters
) {
  # 1. Validate inputs
  required_cols <- c("way_osm_id", "geometry")
  missing_cols <- setdiff(required_cols, colnames(lane_prioritization))
  if (length(missing_cols) > 0) {
    stop(paste("lane_prioritization is missing required columns:", paste(missing_cols, collapse = ", ")))
  }
  rt_attr_speed <- "speed"
  required_rt_cols <- c(rt_attr_speed)
  missing_rt_cols <- setdiff(required_rt_cols, colnames(rt_collection))
  if (length(missing_rt_cols) > 0) {
    stop(paste("rt_collection is missing required columns:", paste(missing_rt_cols, collapse = ", ")))
  }

  # Display feedback
  pb <- progress::progress_bar$new( # Track progress
    format = "Extending prioritization with GTFS-RT metrics [:bar] :percent :spin elapsed=:elapsed",
    clear = FALSE, show_after=0
  )
  pb$update(0)

  # 2. Get only updates IN_TRANSIT
  if ("current_status" %in% colnames(rt_collection)) {
    rt_collection <- rt_collection %>%
      dplyr::filter(current_status == "IN_TRANSIT_TO")
  }
  pb$update(0.166)

  # 3. Get unique lane segments (to optimize spatial join)
  # NOTE: library(sf) is attached in every background session so that sf's
  # S3 methods for the dplyr generics (distinct.sf, select.sf) are
  # registered there; otherwise the data.frame methods could be dispatched
  # and silently drop the geometry column.
  job <- callr::r_bg(function(lane_prioritization) { # update spinner while blocking method call
    library(sf)
    return(lane_prioritization |>
             dplyr::distinct(way_osm_id, .keep_all = TRUE) |>
             dplyr::select(way_osm_id))
  }, args=list(lane_prioritization))
  while (job$is_alive()) { pb$tick(0); Sys.sleep(0.1) }
  lanes_unique <- job$get_result()
  pb$update(0.333)

  # 4. Create buffers in lane segments to overlap with updates
  # (buffer in a metric CRS, then transform back to the original CRS)
  job <- callr::r_bg(function(lanes_unique, lane_buffer) { # update spinner while blocking method call
    library(sf)
    return(sf::st_buffer(
      sf::st_transform(lanes_unique, crs=3857),
      dist=lane_buffer
    ) |> sf::st_transform(crs=sf::st_crs(lanes_unique)))
  }, args=list(lanes_unique, lane_buffer))
  while (job$is_alive()) { pb$tick(0); Sys.sleep(0.1) }
  lane_buffers <- job$get_result()
  pb$update(0.5)

  # 5. Spatial join between lane buffers and rt_collection points
  job <- callr::r_bg(function(rt_collection, lane_buffers) { # update spinner while blocking method call
    library(sf)
    return(sf::st_join(
      rt_collection,
      lane_buffers |> dplyr::select(way_osm_id),
      left = FALSE,
      join = sf::st_within
    ) |> sf::st_drop_geometry())
  }, args=list(rt_collection, lane_buffers))
  while (job$is_alive()) { pb$tick(0); Sys.sleep(0.1) }
  overlap <- job$get_result()
  pb$update(0.666)

  # 6. Aggregate speed metrics by way_osm_id
  job <- callr::r_bg(function(overlap, rt_attr_speed) { # update spinner while blocking method call
    return(overlap |>
             dplyr::group_by(way_osm_id) |>
             dplyr::summarise(
               speed_avg = mean(.data[[rt_attr_speed]], na.rm = TRUE),
               speed_median = stats::median(.data[[rt_attr_speed]], na.rm = TRUE),
               speed_p25 = stats::quantile(.data[[rt_attr_speed]], probs = 0.25, na.rm = TRUE),
               speed_p75 = stats::quantile(.data[[rt_attr_speed]], probs = 0.75, na.rm = TRUE),
               speed_count = dplyr::n()
             ) |>
             dplyr::ungroup())
  }, args=list(overlap, rt_attr_speed))
  while (job$is_alive()) { pb$tick(0); Sys.sleep(0.1) }
  speed_metrics <- job$get_result()
  pb$update(0.833)

  # 7. Join speed metrics back to lane_prioritization
  job <- callr::r_bg(function(lane_prioritization, speed_metrics) { # update spinner while blocking method call
    library(sf)
    return(lane_prioritization |>
             dplyr::left_join(speed_metrics, by = "way_osm_id"))
  }, args=list(lane_prioritization, speed_metrics))
  while (job$is_alive()) { pb$tick(0); Sys.sleep(0.1) }
  lane_prioritization_extended <- job$get_result()
  pb$update(1)
  pb$terminate()

  return(lane_prioritization_extended)
}
Loading