diff --git a/functions-python/feed_sync_dispatcher_transitland/.coveragerc b/functions-python/feed_sync_dispatcher_transitland/.coveragerc
deleted file mode 100644
index f1916e53f..000000000
--- a/functions-python/feed_sync_dispatcher_transitland/.coveragerc
+++ /dev/null
@@ -1,11 +0,0 @@
-[run]
-omit =
-    */test*/*
-    */database_gen/*
-    */dataset_service/*
-    */helpers/*
-    */shared/*
-
-[report]
-exclude_lines =
-    if __name__ == .__main__.:
\ No newline at end of file
diff --git a/functions-python/feed_sync_dispatcher_transitland/.env.rename_me b/functions-python/feed_sync_dispatcher_transitland/.env.rename_me
deleted file mode 100644
index 3250ba24d..000000000
--- a/functions-python/feed_sync_dispatcher_transitland/.env.rename_me
+++ /dev/null
@@ -1,5 +0,0 @@
-# Environment variables for the function to run locally. Delete this line after renaming the file.
-FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/MobilityDatabase
-PROJECT_ID=my-project-id
-PUBSUB_TOPIC_NAME=my-topic
-TRANSITLAND_API_KEY=your-api-key
diff --git a/functions-python/feed_sync_dispatcher_transitland/README.md b/functions-python/feed_sync_dispatcher_transitland/README.md
deleted file mode 100644
index 303f1ef35..000000000
--- a/functions-python/feed_sync_dispatcher_transitland/README.md
+++ /dev/null
@@ -1,79 +0,0 @@
-# Feed Sync Dispatcher Transitland
-This directory contains the GCP serverless function that triggers the feed sync in Transitland.
-The function publishes one Pub/Sub message per Transitland feed to be synced:
-```json
-{
-    "message": {
-        "data": {
-            "external_id": "feeds_onestop_id",
-            "feed_id": "feed_id",
-            "execution_id": "execution_id",
-            "feed_url": "feed_url",
-            "spec": "spec",
-            "auth_info_url": "auth_info_url",
-            "auth_param_name": "auth_param_name",
-            "type": "type",
-            "operator_name": "operator_name",
-            "country": "country",
-            "state_province": "state_province",
-            "city_name": "city_name",
-            "source": "TLD",
-            "payload_type": "payload_type"
-        }
-    }
-}
-```
-
-# Function configuration
-The function is configured using the following environment variables:
-- `PUBSUB_TOPIC_NAME`: The Pub/Sub topic to publish the messages to.
-- `PROJECT_ID`: The GCP project ID.
-- `TRANSITLAND_API_KEY`: The Transitland API key (secret).
-- `TRANSITLAND_FEED_URL`: The Transitland feeds endpoint URL.
-- `TRANSITLAND_OPERATOR_URL`: The Transitland operators endpoint URL.
-
-# Local development
-The local development of this function follows the same steps as the other functions.
-
-To install the Google Pub/Sub emulator, refer to the [README.md](../README.md) file.
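For orientation, the following is a minimal, hypothetical sketch (not part of the deleted sources) of publishing one message in the format documented above. The `test-project` and `feed-sync-transitland` names are the emulator placeholders used in the steps below, and all field values are illustrative.

```python
# Hypothetical sketch: publish one dispatcher-style message to the emulator.
# Assumes PUBSUB_EMULATOR_HOST is exported, as in the local-testing steps below.
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("test-project", "feed-sync-transitland")

payload = {
    "external_id": "feeds_onestop_id",  # illustrative values only
    "feed_id": "feed_id",
    "execution_id": "execution_id",
    "feed_url": "https://example.com/gtfs.zip",
    "spec": "gtfs",
    "source": "TLD",
    "payload_type": "new",
}

# Pub/Sub carries raw bytes, so the payload is JSON-encoded before publishing.
future = publisher.publish(topic_path, data=json.dumps(payload).encode("utf-8"))
print(future.result())  # message ID returned once the publish is acknowledged
```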
-
-## Python requirements
-
-- Install the requirements
-```bash
-  pip install -r ./functions-python/feed_sync_dispatcher_transitland/requirements.txt
-```
-
-## Test locally with Google Cloud Emulators
-
-- Execute the following commands to start the emulators:
-```bash
-  gcloud beta emulators pubsub start --project=test-project --host-port='localhost:8043'
-```
-
-- Create a Pub/Sub topic in the emulator:
-```bash
-  curl -X PUT "http://localhost:8043/v1/projects/test-project/topics/feed-sync-transitland"
-```
-
-- Start the function
-```bash
-  export PUBSUB_EMULATOR_HOST=localhost:8043 && ./scripts/function-python-run.sh --function_name feed_sync_dispatcher_transitland
-```
-
-- [Optional]: Create a local subscription to print published messages:
-```bash
-./scripts/pubsub_message_print.sh feed-sync-transitland
-```
-
-- Execute the function
-```bash
-  curl http://localhost:8080
-```
-
-- To run or debug from your IDE, use the file `main_local_debug.py`
-
-# Test
-- Run the tests
-```bash
-  ./scripts/api-tests.sh --folder functions-python/feed_sync_dispatcher_transitland
-```
diff --git a/functions-python/feed_sync_dispatcher_transitland/function_config.json b/functions-python/feed_sync_dispatcher_transitland/function_config.json
deleted file mode 100644
index 70606e46a..000000000
--- a/functions-python/feed_sync_dispatcher_transitland/function_config.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-    "name": "feed-sync-dispatcher-transitland",
-    "description": "Feed Sync Dispatcher for Transitland",
-    "entry_point": "feed_sync_dispatcher_transitland",
-    "timeout": 3600,
-    "trigger_http": true,
-    "include_folders": ["helpers"],
-    "include_api_folders": ["database_gen", "database", "common"],
-    "secret_environment_variables": [
-        {
-            "key": "FEEDS_DATABASE_URL"
-        }
-    ],
-    "ingress_settings": "ALLOW_ALL",
-    "max_instance_request_concurrency": 1,
-    "max_instance_count": 1,
-    "min_instance_count": 0,
-    "available_cpu": 1,
-    "available_memory": "1Gi"
-}
diff --git a/functions-python/feed_sync_dispatcher_transitland/main_local_debug.py b/functions-python/feed_sync_dispatcher_transitland/main_local_debug.py
deleted file mode 100644
index 0fc346f9f..000000000
--- a/functions-python/feed_sync_dispatcher_transitland/main_local_debug.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Code to debug locally without affecting the runtime cloud function
-
-
-# Requirements:
-# - Google Cloud SDK installed
-# - Make sure to have the following environment variables set in your .env.local_test file
-# - Local database in running state
-# - Follow the instructions in the README.md file
-#
-# Usage:
-# - python feed_sync_dispatcher_transitland/main_local_debug.py
-
-from main import feed_sync_dispatcher_transitland
-from dotenv import load_dotenv
-
-# Load environment variables from .env.local_test
-load_dotenv(dotenv_path=".env.local_test")
-
-if __name__ == "__main__":
-
-    class RequestObject:
-        def __init__(self, headers):
-            self.headers = headers
-
-    request = RequestObject({"X-Cloud-Trace-Context": "1234567890abcdef"})
-    feed_sync_dispatcher_transitland(request)
diff --git a/functions-python/feed_sync_dispatcher_transitland/requirements.txt b/functions-python/feed_sync_dispatcher_transitland/requirements.txt
deleted file mode 100644
index 74ecc9807..000000000
--- a/functions-python/feed_sync_dispatcher_transitland/requirements.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-# Common packages
-functions-framework==3.*
-google-cloud-logging
-psycopg2-binary==2.9.6
-aiohttp~=3.10.5
-asyncio~=3.4.3
-urllib3~=2.5.0
-requests~=2.32.3
-attrs~=23.1.0
-pluggy~=1.3.0 -certifi~=2025.8.3 -pandas - -# SQL Alchemy and Geo Alchemy -SQLAlchemy==2.0.23 -geoalchemy2==0.14.7 - -# Google specific packages for this function -google-cloud-pubsub -cloudevents~=1.10.1 - -# Configuration -python-dotenv==1.0.0 \ No newline at end of file diff --git a/functions-python/feed_sync_dispatcher_transitland/requirements_dev.txt b/functions-python/feed_sync_dispatcher_transitland/requirements_dev.txt deleted file mode 100644 index 9ee50adce..000000000 --- a/functions-python/feed_sync_dispatcher_transitland/requirements_dev.txt +++ /dev/null @@ -1,2 +0,0 @@ -Faker -pytest~=7.4.3 \ No newline at end of file diff --git a/functions-python/feed_sync_dispatcher_transitland/src/__init__.py b/functions-python/feed_sync_dispatcher_transitland/src/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/functions-python/feed_sync_dispatcher_transitland/src/main.py b/functions-python/feed_sync_dispatcher_transitland/src/main.py deleted file mode 100644 index cae657834..000000000 --- a/functions-python/feed_sync_dispatcher_transitland/src/main.py +++ /dev/null @@ -1,397 +0,0 @@ -# -# MobilityData 2024 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import logging -import os -import random -import time -from typing import Optional - -import functions_framework -import pandas as pd -import requests -from google.cloud.pubsub_v1.futures import Future -from requests.exceptions import RequestException, HTTPError -from sqlalchemy.orm import Session -from shared.database.database import with_db_session - -from shared.database_gen.sqlacodegen_models import Feed -from shared.helpers.feed_sync.feed_sync_common import FeedSyncProcessor, FeedSyncPayload -from shared.helpers.feed_sync.feed_sync_dispatcher import feed_sync_dispatcher -from shared.helpers.feed_sync.models import TransitFeedSyncPayload -from shared.helpers.logger import init_logger -from shared.helpers.pub_sub import get_pubsub_client, get_execution_id -from typing import Tuple, List -from collections import defaultdict - - -# Environment variables -PUBSUB_TOPIC_NAME = os.getenv("PUBSUB_TOPIC_NAME") -PROJECT_ID = os.getenv("PROJECT_ID") -TRANSITLAND_API_KEY = os.getenv("TRANSITLAND_API_KEY") -TRANSITLAND_OPERATOR_URL = os.getenv("TRANSITLAND_OPERATOR_URL") -TRANSITLAND_FEED_URL = os.getenv("TRANSITLAND_FEED_URL") - -# session instance to reuse connections -session = requests.Session() -init_logger() - - -def process_feed_urls(feed: dict, urls_in_db: List[str]) -> Tuple[List[str], List[str]]: - """ - Extracts the valid feed URLs and their corresponding entity types from the feed dictionary. If the same URL - corresponds to multiple entity types, the types are concatenated with a comma. 
-    """
-    url_keys_to_types = {
-        "static_current": "",
-        "realtime_alerts": "sa",
-        "realtime_trip_updates": "tu",
-        "realtime_vehicle_positions": "vp",
-    }
-
-    urls = feed.get("urls", {})
-    url_to_entity_types = defaultdict(list)
-
-    for key, entity_type in url_keys_to_types.items():
-        if (url := urls.get(key)) and (url not in urls_in_db):
-            if entity_type:
-                logging.info(f"Found URL for entity type: {entity_type}")
-            url_to_entity_types[url].append(entity_type)
-
-    valid_urls = []
-    entity_types = []
-
-    for url, types in url_to_entity_types.items():
-        valid_urls.append(url)
-        logging.info(f"URL = {url}, Entity types = {types}")
-        entity_types.append(",".join(types))
-
-    return valid_urls, entity_types
-
-
-class TransitFeedSyncProcessor(FeedSyncProcessor):
-    @with_db_session
-    def process_sync(
-        self, execution_id: Optional[str], db_session: Session
-    ) -> List[FeedSyncPayload]:
-        """
-        Fetches, extracts, combines, and filters feed and operator data retrieved from the
-        TransitLand API, then prepares the payloads to publish to the queue.
-        """
-        feeds_data_gtfs_rt = self.get_data(
-            TRANSITLAND_FEED_URL, TRANSITLAND_API_KEY, "gtfs_rt", session
-        )
-        logging.info(
-            "Fetched %s GTFS-RT feeds from TransitLand API",
-            len(feeds_data_gtfs_rt["feeds"]),
-        )
-
-        feeds_data_gtfs = self.get_data(
-            TRANSITLAND_FEED_URL, TRANSITLAND_API_KEY, "gtfs", session
-        )
-        logging.info(
-            "Fetched %s GTFS feeds from TransitLand API", len(feeds_data_gtfs["feeds"])
-        )
-        feeds_data = feeds_data_gtfs["feeds"] + feeds_data_gtfs_rt["feeds"]
-
-        operators_data = self.get_data(
-            TRANSITLAND_OPERATOR_URL, TRANSITLAND_API_KEY, session=session
-        )
-        logging.info(
-            "Fetched %s operators from TransitLand API",
-            len(operators_data["operators"]),
-        )
-        all_urls = set(
-            [element[0] for element in db_session.query(Feed.producer_url).all()]
-        )
-        feeds = self.extract_feeds_data(feeds_data, all_urls)
-        operators = self.extract_operators_data(operators_data)
-
-        # Convert operators and feeds to pandas DataFrames
-        operators_df = pd.DataFrame(operators)
-        feeds_df = pd.DataFrame(feeds)
-
-        # Merge operators and feeds DataFrames on 'operator_feed_id' and 'feed_id'
-        combined_df = pd.merge(
-            operators_df,
-            feeds_df,
-            left_on="operator_feed_id",
-            right_on="feed_id",
-            how="inner",
-        )
-
-        # Filter out rows where 'feed_url' is missing
-        combined_df = combined_df[combined_df["feed_url"].notna()]
-
-        # Group by 'stable_id' and concatenate 'operator_name' while keeping first values of other columns
-        df_grouped = (
-            combined_df.groupby("stable_id")
-            .agg(
-                {
-                    "operator_name": lambda x: ", ".join(x),
-                    "feeds_onestop_id": "first",
-                    "feed_id": "first",
-                    "feed_url": "first",
-                    "operator_feed_id": "first",
-                    "spec": "first",
-                    "entity_types": "first",
-                    "country": "first",
-                    "state_province": "first",
-                    "city_name": "first",
-                    "auth_info_url": "first",
-                    "auth_param_name": "first",
-                    "type": "first",
-                }
-            )
-            .reset_index()
-        )
-
-        # Filter out unwanted countries
-        countries_not_included = ["France", "Japan"]
-        filtered_df = df_grouped[
-            ~df_grouped["country"]
-            .str.lower()
-            .isin([c.lower() for c in countries_not_included])
-        ]
-        logging.info(
-            "Filtered out %s feeds from countries: %s",
-            len(df_grouped) - len(filtered_df),
-            countries_not_included,
-        )
-
-        # Drop duplicate feed URLs
-        filtered_df = filtered_df.drop_duplicates(
-            subset=["feed_url"]
-        )
-
-        # Convert filtered DataFrame to dictionary format
-        combined_data = 
filtered_df.to_dict(orient="records") - logging.info("Prepared %s feeds for publishing", len(combined_data)) - - payloads = [] - for data in combined_data: - external_id = data["feeds_onestop_id"] - feed_url = data["feed_url"] - source = "tld" - - if not self.check_external_id(db_session, external_id, source): - payload_type = "new" - else: - mbd_feed_url = self.get_mbd_feed_url(db_session, external_id, source) - if mbd_feed_url != feed_url: - payload_type = "update" - else: - continue - - # prepare payload - payload = TransitFeedSyncPayload( - external_id=external_id, - stable_id=data["stable_id"], - entity_types=data["entity_types"], - feed_id=data["feed_id"], - execution_id=execution_id, - feed_url=feed_url, - spec=data["spec"], - auth_info_url=data["auth_info_url"], - auth_param_name=data["auth_param_name"], - type=data["type"], - operator_name=data["operator_name"], - country=data["country"], - state_province=data["state_province"], - city_name=data["city_name"], - source="tld", - payload_type=payload_type, - ) - payloads.append(FeedSyncPayload(external_id=external_id, payload=payload)) - - return payloads - - def get_data( - self, - url, - api_key, - spec=None, - session=None, - max_retries=3, - initial_delay=60, - max_delay=120, - ): - """ - This function retrieves data from the specified Transitland feeds and operator endpoints. - Handles rate limits, retries, and error cases. - Returns the parsed data as a dictionary containing feeds and operators. - """ - headers = {"apikey": api_key} - params = {"spec": spec} if spec else {} - all_data = {"feeds": [], "operators": []} - delay = initial_delay - response = None - - logging.info("Fetching data from %s", url) - while url: - for attempt in range(max_retries): - try: - response = session.get( - url, headers=headers, params=params, timeout=30 - ) - response.raise_for_status() - data = response.json() - all_data["feeds"].extend(data.get("feeds", [])) - all_data["operators"].extend(data.get("operators", [])) - url = data.get("meta", {}).get("next") - logging.info( - "Fetched %s feeds and %s operators", - len(all_data["feeds"]), - len(all_data["operators"]), - ) - logging.info("Next URL: %s", url) - delay = initial_delay - break - except (RequestException, HTTPError) as e: - logging.error("Attempt %s failed: %s", attempt + 1, e) - if response is not None and response.status_code == 429: - logging.warning("Rate limit hit. Waiting for %s seconds", delay) - time.sleep(delay + random.uniform(0, 1)) - delay = min(delay * 2, max_delay) - elif attempt == max_retries - 1: - logging.error( - "Failed to fetch data after %s attempts.", max_retries - ) - return all_data - else: - logging.info("Retrying in %s seconds", delay) - time.sleep(delay) - logging.info("Finished fetching data.") - return all_data - - def extract_feeds_data(self, feeds_data: dict, urls_in_db: List[str]) -> List[dict]: - """ - This function extracts relevant data from the Transitland feeds endpoint containing feeds information. - Returns a list of dictionaries representing each feed. 
- """ - feeds = [] - for feed in feeds_data: - feed_urls, entity_types = process_feed_urls(feed, urls_in_db) - logging.info("Feed %s has %s valid URL(s)", feed["id"], len(feed_urls)) - logging.info("Feed %s entity types: %s", feed["id"], entity_types) - if len(feed_urls) == 0: - logging.warning("Feed URL not found for feed %s", feed["id"]) - continue - - for feed_url, entity_types in zip(feed_urls, entity_types): - if entity_types is not None and len(entity_types) > 0: - stable_id = f"{feed['id']}-{entity_types.replace(',', '_')}" - else: - stable_id = feed["id"] - logging.info("Stable ID: %s", stable_id) - feeds.append( - { - "feed_id": feed["id"], - "stable_id": stable_id, - "feed_url": feed_url, - "entity_types": entity_types if len(entity_types) > 0 else None, - "spec": feed["spec"].lower(), - "feeds_onestop_id": feed["onestop_id"], - "auth_info_url": feed["authorization"].get("info_url"), - "auth_param_name": feed["authorization"].get("param_name"), - "type": feed["authorization"].get("type"), - } - ) - return feeds - - def extract_operators_data(self, operators_data: dict) -> List[dict]: - """ - This function extracts relevant data from the Transitland operators endpoint. - Constructs a list of dictionaries containing information about each operator. - """ - operators = [] - for operator in operators_data["operators"]: - if operator.get("agencies") and operator["agencies"][0].get("places"): - places = operator["agencies"][0]["places"] - place = places[1] if len(places) > 1 else places[0] - - for related_feed in operator.get("feeds", []): - operator_data = { - "operator_name": operator.get("name"), - "operator_feed_id": related_feed["id"], - "country": place.get("adm0_name") if place else None, - "state_province": place.get("adm1_name") if place else None, - "city_name": place.get("city_name") if place else None, - } - operators.append(operator_data) - return operators - - def check_external_id( - self, db_session: Session, external_id: str, source: str - ) -> bool: - """ - Checks if the external_id exists in the public.externalid table for the given source. - :param db_session: SQLAlchemy session - :param external_id: The external_id (feeds_onestop_id) to check - :param source: The source to filter by (e.g., 'tld' for TransitLand) - :return: True if the feed exists, False otherwise - """ - results = ( - db_session.query(Feed) - .filter(Feed.externalids.any(associated_id=external_id)) - .all() - ) - return results is not None and len(results) > 0 - - def get_mbd_feed_url( - self, db_session: Session, external_id: str, source: str - ) -> Optional[str]: - """ - Retrieves the feed_url from the public.feed table in the mbd for the given external_id. - :param db_session: SQLAlchemy session - :param external_id: The external_id (feeds_onestop_id) from TransitLand - :param source: The source to filter by (e.g., 'tld' for TransitLand) - :return: feed_url in mbd if exists, otherwise None - """ - results = ( - db_session.query(Feed) - .filter(Feed.externalids.any(associated_id=external_id)) - .all() - ) - return results[0].producer_url if results else None - - def publish_callback( - self, future: Future, payload: FeedSyncPayload, topic_path: str - ): - """ - Callback function for when the message is published to Pub/Sub. - This function logs the result of the publishing operation. 
- """ - if future.exception(): - print( - f"Error publishing transit land feed {payload.external_id} " - f"to Pub/Sub topic {topic_path}: {future.exception()}" - ) - else: - print(f"Published transit land feed {payload.external_id}.") - - -@functions_framework.http -def feed_sync_dispatcher_transitland(request): - """ - HTTP Function entry point queries the transitland API and publishes events to a Pub/Sub topic to be processed. - """ - publisher = get_pubsub_client() - topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_NAME) - transit_land_feed_sync_processor = TransitFeedSyncProcessor() - execution_id = get_execution_id(request, "feed-sync-dispatcher") - feed_sync_dispatcher(transit_land_feed_sync_processor, topic_path, execution_id) - return "Feed sync dispatcher executed successfully." diff --git a/functions-python/feed_sync_dispatcher_transitland/tests/test_feed_sync.py b/functions-python/feed_sync_dispatcher_transitland/tests/test_feed_sync.py deleted file mode 100644 index 0c040111f..000000000 --- a/functions-python/feed_sync_dispatcher_transitland/tests/test_feed_sync.py +++ /dev/null @@ -1,312 +0,0 @@ -import pytest -from unittest.mock import Mock, patch, call -from requests import Session as RequestsSession -from sqlalchemy.orm import Session as DBSession - -from shared.database_gen.sqlacodegen_models import Gtfsfeed -from main import ( - TransitFeedSyncProcessor, -) -import pandas as pd -from requests.exceptions import HTTPError - -from shared.helpers.feed_sync.feed_sync_common import FeedSyncPayload - - -@pytest.fixture -def processor(): - return TransitFeedSyncProcessor() - - -@patch("main.requests.Session.get") -def test_get_data(mock_get, processor): - mock_response = Mock() - mock_response.json.return_value = { - "feeds": [ - { - "id": "feed1", - "urls": {"static_current": "http://example.com/feed1"}, - "spec": "gtfs", - "onestop_id": "onestop1", - "authorization": {}, - } - ], - "operators": [], - } - mock_response.status_code = 200 - mock_get.return_value = mock_response - - result = processor.get_data( - "https://api.transit.land", "dummy_api_key", session=RequestsSession() - ) - assert "feeds" in result - assert result["feeds"][0]["id"] == "feed1" - - -@patch("main.requests.Session.get") -def test_get_data_rate_limit(mock_get, processor): - mock_response = Mock() - mock_response.status_code = 429 - mock_response.json.return_value = {"feeds": [], "operators": []} - mock_get.return_value = mock_response - - result = processor.get_data( - "https://api.transit.land", - "dummy_api_key", - session=RequestsSession(), - max_retries=1, - ) - assert result == {"feeds": [], "operators": []} - - -def test_extract_feeds_data(processor): - feeds_data = [ - { - "id": "feed1", - "urls": {"static_current": "http://example.com"}, - "spec": "gtfs", - "onestop_id": "onestop1", - "authorization": {}, - } - ] - result = processor.extract_feeds_data(feeds_data, []) - assert len(result) == 1 - assert result[0]["feed_id"] == "feed1" - - -def test_extract_operators_data(processor): - operators_data = { - "operators": [ - { - "name": "Operator 1", - "feeds": [{"id": "feed1"}], - "agencies": [{"places": [{"adm0_name": "USA"}]}], - } - ] - } - result = processor.extract_operators_data(operators_data) - assert len(result) == 1 - assert result[0]["operator_name"] == "Operator 1" - - -def test_check_external_id(processor): - mock_db_session = Mock(spec=DBSession) - mock_db_session.query.return_value.filter.return_value.all.return_value = (1,) - result = 
processor.check_external_id(mock_db_session, "onestop1", "TLD") - assert result is True - - mock_db_session.query.return_value.filter.return_value.all.return_value = None - result = processor.check_external_id(mock_db_session, "onestop2", "TLD") - assert result is False - - -def test_get_mbd_feed_url(processor): - mock_db_session = Mock(spec=DBSession) - mock_db_session.query.return_value.filter.return_value.all.return_value = [ - Gtfsfeed(producer_url="http://example.com/feed1") - ] - result = processor.get_mbd_feed_url(mock_db_session, "onestop1", "TLD") - assert result == "http://example.com/feed1" - - mock_db_session.query.return_value.filter.return_value.all.return_value = None - result = processor.get_mbd_feed_url(mock_db_session, "onestop2", "TLD") - assert result is None - - -def test_process_sync_new_feed(processor): - mock_db_session = Mock(spec=DBSession) - mock_db_session.query.return_value.all.return_value = [] - feeds_data = [ - { - "id": "feed1", - "urls": {"static_current": "http://example.com"}, - "spec": "gtfs", - "onestop_id": "onestop1", - "authorization": {}, - } - ] - operators_data = [ - { - "name": "Operator 1", - "feeds": [{"id": "feed1"}], - "agencies": [{"places": [{"adm0_name": "USA"}]}], - } - ] - processor.get_data = Mock( - return_value={"feeds": feeds_data, "operators": operators_data} - ) - processor.check_external_id = Mock(return_value=False) - payloads = processor.process_sync("exec123", db_session=mock_db_session) - assert len(payloads) == 1, "Expected 1 payload" - assert payloads[0].payload.payload_type == "new" - - -def test_process_sync_updated_feed(processor): - mock_db_session = Mock(spec=DBSession) - mock_db_session.query.return_value.all.return_value = [] - feeds_data = [ - { - "id": "feed1", - "urls": {"static_current": "http://example.com"}, - "spec": "gtfs", - "onestop_id": "onestop1", - "authorization": {}, - } - ] - operators_data = [ - { - "name": "Operator 1", - "feeds": [{"id": "feed1"}], - "agencies": [{"places": [{"adm0_name": "USA"}]}], - } - ] - processor.get_data = Mock( - return_value={"feeds": feeds_data, "operators": operators_data} - ) - processor.check_external_id = Mock(return_value=True) - processor.get_mbd_feed_url = Mock(return_value="http://example-2.com") - payloads = processor.process_sync("exec123", db_session=mock_db_session) - assert len(payloads) == 1, "Expected 1 payload" - assert payloads[0].payload.payload_type == "update" - - -def test_process_sync_unchanged_feed(processor): - mock_db_session = Mock(spec=DBSession) - mock_db_session.query.return_value.all.return_value = [] - feeds_data = [ - { - "id": "feed1", - "urls": {"static_current": "http://example.com"}, - "spec": "gtfs", - "onestop_id": "onestop1", - "authorization": {}, - } - ] - operators_data = [ - { - "name": "Operator 1", - "feeds": [{"id": "feed1"}], - "agencies": [{"places": [{"adm0_name": "USA"}]}], - } - ] - processor.get_data = Mock( - return_value={"feeds": feeds_data, "operators": operators_data} - ) - processor.check_external_id = Mock(return_value=True) - processor.get_mbd_feed_url = Mock(return_value="http://example.com") - payloads = processor.process_sync("exec123", db_session=mock_db_session) - assert len(payloads) == 0, "No payloads expected" - - -def test_merge_and_filter_dataframes(processor): - operators = [ - { - "operator_name": "Operator 1", - "operator_feed_id": "feed1", - "country": "USA", - "state_province": "CA", - "city_name": "San Francisco", - }, - { - "operator_name": "Operator 2", - "operator_feed_id": "feed2", - 
"country": "Japan", - "state_province": "Tokyo", - "city_name": "Tokyo", - }, - ] - feeds = [ - { - "feed_id": "feed1", - "feed_url": "http://example.com/feed1", - "spec": "gtfs", - "feeds_onestop_id": "onestop1", - "auth_info_url": None, - "auth_param_name": None, - "type": None, - }, - { - "feed_id": "feed2", - "feed_url": "http://example.com/feed2", - "spec": "gtfs", - "feeds_onestop_id": "onestop2", - "auth_info_url": None, - "auth_param_name": None, - "type": None, - }, - ] - - operators_df = pd.DataFrame(operators) - feeds_df = pd.DataFrame(feeds) - - combined_df = pd.merge( - operators_df, - feeds_df, - left_on="operator_feed_id", - right_on="feed_id", - how="inner", - ) - combined_df = combined_df[combined_df["feed_url"].notna()] - countries_not_included = ["France", "Japan"] - filtered_df = combined_df[ - ~combined_df["country"] - .str.lower() - .isin([c.lower() for c in countries_not_included]) - ] - - assert len(filtered_df) == 1 - assert filtered_df.iloc[0]["operator_name"] == "Operator 1" - assert filtered_df.iloc[0]["feed_id"] == "feed1" - - -def test_publish_callback_success(processor): - future = Mock() - future.exception.return_value = None - payload = FeedSyncPayload(external_id="onestop1", payload=None) - topic_path = "projects/project-id/topics/topic-name" - - with patch("builtins.print") as mock_print: - processor.publish_callback(future, payload, topic_path) - mock_print.assert_called_with("Published transit land feed onestop1.") - - -def test_publish_callback_failure(processor): - future = Mock() - future.exception.return_value = Exception("Publish error") - payload = FeedSyncPayload(external_id="onestop1", payload=None) - topic_path = "projects/project-id/topics/topic-name" - - with patch("builtins.print") as mock_print: - processor.publish_callback(future, payload, topic_path) - mock_print.assert_called_with( - f"Error publishing transit land feed onestop1 to Pub/Sub topic {topic_path}: Publish error" - ) - - -def test_get_data_retries(processor): - # Mock the requests session - mock_session = Mock(spec=RequestsSession) - - mock_response = Mock() - mock_response.raise_for_status.side_effect = HTTPError() - mock_response.status_code = 500 - - mock_session.get.return_value = mock_response - - with patch("time.sleep", return_value=None) as mock_sleep: - result = processor.get_data( - url="http://example.com", - api_key="dummy_api_key", - session=mock_session, - max_retries=3, - initial_delay=1, - max_delay=2, - ) - - assert mock_session.get.call_count == 3 - - assert mock_sleep.call_count == 2 - - mock_sleep.assert_has_calls([call(1), call(1)]) - - assert result == {"feeds": [], "operators": []} diff --git a/functions-python/feed_sync_process_transitland/.coveragerc b/functions-python/feed_sync_process_transitland/.coveragerc deleted file mode 100644 index f1916e53f..000000000 --- a/functions-python/feed_sync_process_transitland/.coveragerc +++ /dev/null @@ -1,11 +0,0 @@ -[run] -omit = - */test*/* - */database_gen/* - */dataset_service/* - */helpers/* - */shared/* - -[report] -exclude_lines = - if __name__ == .__main__.: \ No newline at end of file diff --git a/functions-python/feed_sync_process_transitland/.env.rename_me b/functions-python/feed_sync_process_transitland/.env.rename_me deleted file mode 100644 index 601002cd5..000000000 --- a/functions-python/feed_sync_process_transitland/.env.rename_me +++ /dev/null @@ -1,5 +0,0 @@ -# Environment variables for tokens function to run locally. Delete this line after rename the file. 
-FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:54320/MobilityDatabase
-PROJECT_ID=mobility-feeds-dev
-PUBSUB_TOPIC_NAME=my-topic
-DATASET_BATCH_TOPIC_NAME=dataset_batch_topic_{env}_
diff --git a/functions-python/feed_sync_process_transitland/README.md b/functions-python/feed_sync_process_transitland/README.md
deleted file mode 100644
index 8420508f3..000000000
--- a/functions-python/feed_sync_process_transitland/README.md
+++ /dev/null
@@ -1,107 +0,0 @@
-# TLD Feed Sync Process
-
-Subscribed to the topic set in the `feed-sync-dispatcher` function, `feed-sync-process` is triggered for each message published. It handles the processing of feed updates, ensuring data consistency and integrity. The function performs the following operations:
-
-1. **Feed Status Check**: It verifies the current state of the feed in the database using `external_id` and `source`.
-2. **URL Validation**: Checks if the feed URL already exists in the database.
-3. **Feed Processing**: Based on the current state:
-   - If no existing feed is found, creates a new feed entry
-   - If the feed exists with a different URL, creates a new feed and deprecates the old one
-   - If the feed exists with the same URL, no action is taken
-4. **Batch Processing Trigger**: For non-authenticated feeds, publishes events to the dataset batch topic for further processing.
-
-The function maintains feed history through the `redirectingid` table and ensures proper status tracking with 'active' and 'deprecated' states.
-
-# Message Format
-The function expects a Pub/Sub message with the following format:
-```json
-{
-    "message": {
-        "data": {
-            "external_id": "feed-identifier",
-            "feed_id": "unique-feed-id",
-            "feed_url": "http://example.com/feed",
-            "execution_id": "execution-identifier",
-            "spec": "gtfs",
-            "auth_info_url": null,
-            "auth_param_name": null,
-            "type": null,
-            "operator_name": "Transit Agency Name",
-            "country": "Country Name",
-            "state_province": "State/Province",
-            "city_name": "City Name",
-            "source": "TLD",
-            "payload_type": "new|update"
-        }
-    }
-}
-```
-
-# Function Configuration
-The function is configured using the following environment variables:
-- `PROJECT_ID`: The Google Cloud project ID
-- `DATASET_BATCH_TOPIC_NAME`: The name of the topic for batch processing triggers
-- `FEEDS_DATABASE_URL`: The URL of the feeds database
-- `ENV`: [Optional] Environment identifier (e.g., 'dev', 'prod')
-
-# Database Schema
-The function interacts with the following tables:
-1. `feed`: Stores feed information
-   - Contains fields like id, data_type, feed_name, producer_url, etc.
-   - Tracks feed status ('active' or 'deprecated')
-   - Uses CURRENT_TIMESTAMP for created_at
-
-2. `externalid`: Maps external identifiers to feed IDs
-   - Links external_id and source to feed entries
-   - Maintains source tracking
-
-3. `redirectingid`: Tracks feed updates
-   - Maps old feed IDs to new ones
-   - Maintains update history
-
-# Local development
-The local development of this function follows the same steps as the other functions.
-
-To install the Google Pub/Sub emulator, refer to the [README.md](../README.md) file.
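As a reading aid for the message format above, here is a minimal, hypothetical sketch of how the function unpacks the Pub/Sub envelope. It mirrors the base64-plus-JSON decoding done in `src/main.py` (`process_feed_event`); `decode_feed_payload` is an illustrative name, not a helper that exists in the codebase.

```python
# Hypothetical sketch: decode the Pub/Sub envelope delivered to the function.
import base64
import json


def decode_feed_payload(cloud_event) -> dict:
    """Return the feed payload dict carried by a Pub/Sub CloudEvent."""
    raw = cloud_event.data["message"]["data"]  # base64-encoded JSON string
    return json.loads(base64.b64decode(raw).decode("utf-8"))
```

The resulting dict has the keys shown in the JSON example above and is used to build a `TransitFeedSyncPayload`.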
-
-## Python requirements
-
-- Install the requirements
-```bash
-  pip install -r ./functions-python/feed_sync_process_transitland/requirements.txt
-```
-
-## Test locally with Google Cloud Emulators
-
-- Execute the following commands to start the emulators:
-```bash
-  gcloud beta emulators pubsub start --project=test-project --host-port='localhost:8043'
-```
-
-- Create a Pub/Sub topic in the emulator:
-```bash
-  curl -X PUT "http://localhost:8043/v1/projects/test-project/topics/feed-sync-process-transitland"
-```
-
-- Start the function
-```bash
-  export PUBSUB_EMULATOR_HOST=localhost:8043 && ./scripts/function-python-run.sh --function_name feed_sync_process_transitland
-```
-
-- [Optional]: Create a local subscription to print published messages:
-```bash
-./scripts/pubsub_message_print.sh feed-sync-process-transitland
-```
-
-- Execute the function
-```bash
-  curl http://localhost:8080
-```
-
-- To run or debug from your IDE, use the file `main_local_debug.py`
-
-# Test
-- Run the tests
-```bash
-  ./scripts/api-tests.sh --folder functions-python/feed_sync_process_transitland
-```
diff --git a/functions-python/feed_sync_process_transitland/function_config.json b/functions-python/feed_sync_process_transitland/function_config.json
deleted file mode 100644
index 06254765f..000000000
--- a/functions-python/feed_sync_process_transitland/function_config.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-    "name": "feed-sync-process-transitland",
-    "description": "Feed Sync process for Transitland feeds",
-    "entry_point": "process_feed_event",
-    "timeout": 540,
-    "memory": "512Mi",
-    "trigger_http": true,
-    "include_folders": ["helpers"],
-    "include_api_folders": ["database_gen", "database", "common"],
-    "secret_environment_variables": [
-        {
-            "key": "FEEDS_DATABASE_URL"
-        }
-    ],
-    "ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
-    "max_instance_request_concurrency": 1,
-    "max_instance_count": 10,
-    "min_instance_count": 0,
-    "available_cpu": 1
-}
diff --git a/functions-python/feed_sync_process_transitland/main_local_debug.py b/functions-python/feed_sync_process_transitland/main_local_debug.py
deleted file mode 100644
index 60a3b1723..000000000
--- a/functions-python/feed_sync_process_transitland/main_local_debug.py
+++ /dev/null
@@ -1,173 +0,0 @@
-"""
-Code to debug locally without affecting the runtime cloud function.
- -Requirements: -- Google Cloud SDK installed -- Make sure to have the following environment variables set in your .env.local file: - - PROJECT_ID - - DATASET_BATCH_TOPIC_NAME - - FEEDS_DATABASE_URL -- Local database in running state - -Usage: -- python feed_sync_process_transitland/main_local_debug.py -""" - -import base64 -import json -import os -from unittest.mock import MagicMock, patch -import logging -import sys - -import pytest -from dotenv import load_dotenv - -# Configure local logging first -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - stream=sys.stdout, -) - -logger = logging.getLogger("feed_processor") - -# Mock the Google Cloud Logger - - -class MockLogger: - - """Mock logger class""" - - @staticmethod - def init_logger(): - return MagicMock() - - def __init__(self, name): - self.name = name - - def get_logger(self): - return logger - - def addFilter(self, filter): - pass - - -with patch("helpers.logger.Logger", MockLogger): - from feed_sync_process_transitland.src.main import process_feed_event - -# Load environment variables -load_dotenv(dotenv_path=".env.rename_me") - - -class CloudEvent: - """Cloud Event data structure.""" - - def __init__(self, attributes: dict, data: dict): - self.attributes = attributes - self.data = data - - -@pytest.fixture -def mock_pubsub(): - """Fixture to mock PubSub client""" - with patch("google.cloud.pubsub_v1.PublisherClient") as mock_publisher: - publisher_instance = MagicMock() - - def mock_topic_path(project_id, topic_id): - return f"projects/{project_id}/topics/{topic_id}" - - def mock_publish(topic_path, data): - logger.info( - f"[LOCAL DEBUG] Would publish to {topic_path}: {data.decode('utf-8')}" - ) - future = MagicMock() - future.result.return_value = "message_id" - return future - - publisher_instance.topic_path.side_effect = mock_topic_path - publisher_instance.publish.side_effect = mock_publish - mock_publisher.return_value = publisher_instance - - yield mock_publisher - - -def process_event_safely(cloud_event, description=""): - """Process event with error handling.""" - try: - logger.info(f"\nProcessing {description}:") - logger.info("-" * 50) - result = process_feed_event(cloud_event) - logger.info(f"Process result: {result}") - return True - except Exception as e: - logger.error(f"Error processing {description}: {str(e)}") - return False - - -def main(): - """Main function to run local debug tests""" - logger.info("Starting local debug session...") - - # Define test event data - test_payload = { - "external_id": "test-feed-1", - "feed_id": "feed1", - "feed_url": "https://example.com/test-feed-2", - "execution_id": "local-debug-123", - "spec": "gtfs", - "auth_info_url": None, - "auth_param_name": None, - "type": None, - "operator_name": "Test Operator", - "country": "USA", - "state_province": "CA", - "city_name": "Test City", - "source": "TLD", - "payload_type": "new", - } - - # Create cloud event - cloud_event = CloudEvent( - attributes={ - "type": "com.google.cloud.pubsub.topic.publish", - "source": f"//pubsub.googleapis.com/projects/{os.getenv('PROJECT_ID')}/topics/test-topic", - }, - data={ - "message": { - "data": base64.b64encode( - json.dumps(test_payload).encode("utf-8") - ).decode("utf-8") - } - }, - ) - - # Set up mocks - with patch( - "google.cloud.pubsub_v1.PublisherClient", new_callable=MagicMock - ) as mock_publisher, patch("google.cloud.logging.Client", MagicMock()): - publisher_instance = MagicMock() - - def mock_topic_path(project_id, topic_id): - 
return f"projects/{project_id}/topics/{topic_id}" - - def mock_publish(topic_path, data): - logger.info( - f"[LOCAL DEBUG] Would publish to {topic_path}: {data.decode('utf-8')}" - ) - future = MagicMock() - future.result.return_value = "message_id" - return future - - publisher_instance.topic_path.side_effect = mock_topic_path - publisher_instance.publish.side_effect = mock_publish - mock_publisher.return_value = publisher_instance - - # Process test event - process_event_safely(cloud_event, "test feed event") - - logger.info("Local debug session completed.") - - -if __name__ == "__main__": - main() diff --git a/functions-python/feed_sync_process_transitland/requirements.txt b/functions-python/feed_sync_process_transitland/requirements.txt deleted file mode 100644 index d6c998465..000000000 --- a/functions-python/feed_sync_process_transitland/requirements.txt +++ /dev/null @@ -1,26 +0,0 @@ -# Common packages -functions-framework==3.* -google-cloud-logging -psycopg2-binary==2.9.6 -aiohttp~=3.10.5 -asyncio~=3.4.3 -urllib3~=2.5.0 -requests~=2.32.3 -attrs~=23.1.0 -pluggy~=1.3.0 -certifi~=2025.8.3 - -# SQL Alchemy and Geo Alchemy -SQLAlchemy==2.0.23 -geoalchemy2==0.14.7 - -# Google specific packages for this function -google-cloud-pubsub -cloudevents~=1.10.1 - -# Additional packages for this function -pandas -pycountry - -# Configuration -python-dotenv==1.0.0 \ No newline at end of file diff --git a/functions-python/feed_sync_process_transitland/requirements_dev.txt b/functions-python/feed_sync_process_transitland/requirements_dev.txt deleted file mode 100644 index 9ee50adce..000000000 --- a/functions-python/feed_sync_process_transitland/requirements_dev.txt +++ /dev/null @@ -1,2 +0,0 @@ -Faker -pytest~=7.4.3 \ No newline at end of file diff --git a/functions-python/feed_sync_process_transitland/src/__init__.py b/functions-python/feed_sync_process_transitland/src/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/functions-python/feed_sync_process_transitland/src/feed_processor_utils.py b/functions-python/feed_sync_process_transitland/src/feed_processor_utils.py deleted file mode 100644 index ec15f7731..000000000 --- a/functions-python/feed_sync_process_transitland/src/feed_processor_utils.py +++ /dev/null @@ -1,107 +0,0 @@ -import logging -import uuid -from datetime import datetime -from typing import Tuple, Optional -from sqlalchemy.orm import Session -import requests - -from shared.common.locations_utils import create_or_get_location -from shared.helpers.feed_sync.models import TransitFeedSyncPayload as FeedPayload -from shared.database_gen.sqlacodegen_models import ( - Gtfsfeed, - Gtfsrealtimefeed, - Externalid, - Entitytype, - Feed, -) - - -def check_url_status(url: str) -> bool: - """Check if a URL is reachable.""" - try: - response = requests.head(url, timeout=10) - result = response.status_code < 400 or response.status_code == 403 - if not result: - logging.error( - "Url [%s] replied with status code: [%s]", url, response.status_code - ) - return result - except requests.RequestException: - logging.warning("Failed to reach URL: %s", url) - return False - - -def get_feed_model(spec: str) -> Tuple[type, str]: - """Map feed specification to model and type.""" - spec_lower = spec.lower().replace("-", "_") - if spec_lower == "gtfs": - return Gtfsfeed, spec_lower - if spec_lower == "gtfs_rt": - return Gtfsrealtimefeed, spec_lower - raise ValueError(f"Invalid feed specification: {spec}") - - -def get_tlnd_authentication_type(auth_type: Optional[str]) -> str: - """Map 
TransitLand authentication type to database format."""
-    if auth_type in (None, ""):
-        return "0"
-    if auth_type == "query_param":
-        return "1"
-    if auth_type == "header":
-        return "2"
-    raise ValueError(f"Invalid authentication type: {auth_type}")
-
-
-def create_new_feed(session: Session, stable_id: str, payload: FeedPayload) -> Feed:
-    """Create a new feed and its dependencies."""
-    feed_type, data_type = get_feed_model(payload.spec)
-
-    # Create new feed
-    new_feed = feed_type(
-        id=str(uuid.uuid4()),
-        stable_id=stable_id,
-        producer_url=payload.feed_url,
-        data_type=data_type,
-        authentication_type=get_tlnd_authentication_type(payload.type),
-        authentication_info_url=payload.auth_info_url,
-        api_key_parameter_name=payload.auth_param_name,
-        status="active",
-        provider=payload.operator_name,
-        operational_status="wip",  # Default to wip
-        created_at=datetime.now(),
-    )
-
-    # Add external ID relationship
-    external_id = Externalid(
-        feed_id=new_feed.id,
-        associated_id=payload.external_id,
-        source=payload.source,
-    )
-    new_feed.externalids = [external_id]
-
-    # Add entity types if applicable
-    if feed_type == Gtfsrealtimefeed and payload.entity_types:
-        entity_type_names = payload.entity_types.split(",")
-        for entity_name in entity_type_names:
-            entity = session.query(Entitytype).filter_by(name=entity_name).first()
-            if not entity:
-                entity = Entitytype(name=entity_name)
-                session.add(entity)
-            new_feed.entitytypes.append(entity)
-
-    # Add location if provided
-    location = create_or_get_location(
-        session,
-        payload.country,
-        payload.state_province,
-        payload.city_name,
-    )
-    if location:
-        new_feed.locations = [location]
-        logging.debug("Added location for feed %s", new_feed.id)
-
-    # Persist the new feed
-    session.add(new_feed)
-    session.flush()
-    logging.info("Created new feed with ID: %s", new_feed.id)
-    return new_feed
diff --git a/functions-python/feed_sync_process_transitland/src/main.py b/functions-python/feed_sync_process_transitland/src/main.py
deleted file mode 100644
index 5c74f68a0..000000000
--- a/functions-python/feed_sync_process_transitland/src/main.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#
-# MobilityData 2024
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# -import base64 -import json -import logging -import os -from typing import Optional, List - -import functions_framework -from google.cloud import pubsub_v1 -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.orm import Session - -from shared.database.database import with_db_session -from shared.helpers.logger import init_logger -from shared.database_gen.sqlacodegen_models import Feed -from shared.helpers.feed_sync.models import TransitFeedSyncPayload as FeedPayload -from feed_processor_utils import check_url_status, create_new_feed - -# Environment variables -PROJECT_ID = os.getenv("PROJECT_ID") -DATASET_BATCH_TOPIC = os.getenv("DATASET_BATCH_TOPIC_NAME") -FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL") - -init_logger() - - -class FeedProcessor: - def __init__(self, db_session: Session): - self.session = db_session - self.publisher = pubsub_v1.PublisherClient() - self.feed_stable_id: Optional[str] = None - - def process_feed(self, payload: FeedPayload) -> None: - """Process a feed based on its database state.""" - try: - logging.info( - "Processing feed: external_id=%s, feed_id=%s", - payload.external_id, - payload.feed_id, - ) - if not check_url_status(payload.feed_url): - logging.error("Feed URL not reachable: %s. Skipping.", payload.feed_url) - return - - self.feed_stable_id = f"{payload.source}-{payload.stable_id}".lower() - current_feeds = self._get_current_feeds(payload.external_id, payload.source) - - if not current_feeds: - new_feed = self._process_new_feed_or_skip(payload) - else: - new_feed = self._process_existing_feed_refs(payload, current_feeds) - - self.session.commit() - self._publish_to_batch_topic_if_needed(payload, new_feed) - except SQLAlchemyError as e: - self._rollback_transaction(f"Database error: {str(e)}") - except Exception as e: - self._rollback_transaction(f"Error processing feed: {str(e)}") - - def _process_new_feed_or_skip(self, payload: FeedPayload) -> Optional[Feed]: - """Process a new feed or skip if the URL already exists.""" - if self._check_feed_url_exists(payload.feed_url): - logging.error("Feed URL already exists: %s. Skipping.", payload.feed_url) - return - logging.info("Creating new feed for external_id: %s", payload.external_id) - return create_new_feed(self.session, self.feed_stable_id, payload) - - def _process_existing_feed_refs( - self, payload: FeedPayload, current_feeds: List[Feed] - ) -> Optional[Feed]: - """Process existing feeds, updating if necessary.""" - matching_feeds = [ - f for f in current_feeds if f.producer_url == payload.feed_url - ] - if matching_feeds: - logging.info( - "Feed with URL already exists: %s. Skipping.", payload.feed_url - ) - return - - stable_id_matches = [ - f for f in current_feeds if self.feed_stable_id in f.stable_id - ] - reference_count = len(stable_id_matches) - active_match = [f for f in stable_id_matches if f.status == "active"] - if reference_count > 0: - logging.info("Updating feed for stable_id: %s", self.feed_stable_id) - self.feed_stable_id = f"{self.feed_stable_id}_{reference_count}".lower() - new_feed = self._deprecate_old_feed(payload, active_match[0].id) - else: - logging.info( - "No matching stable_id. 
Creating new feed for %s.", payload.external_id - ) - new_feed = create_new_feed(self.session, self.feed_stable_id, payload) - return new_feed - - def _check_feed_url_exists(self, feed_url: str) -> bool: - """Check if a feed with the given URL exists.""" - existing_feeds = ( - self.session.query(Feed).filter_by(producer_url=feed_url).count() - ) - return existing_feeds > 0 - - def _get_current_feeds(self, external_id: str, source: str) -> List[Feed]: - """Retrieve current feeds for a given external ID and source.""" - return ( - self.session.query(Feed) - .filter(Feed.externalids.any(associated_id=external_id, source=source)) - .all() - ) - - def _deprecate_old_feed( - self, payload: FeedPayload, old_feed_id: Optional[str] - ) -> Feed: - """Update the status of an old feed and create a new one.""" - if old_feed_id: - old_feed = self.session.get(Feed, old_feed_id) - if old_feed: - old_feed.status = "deprecated" - logging.info("Deprecated old feed: %s", old_feed.id) - return create_new_feed(self.session, self.feed_stable_id, payload) - - def _publish_to_batch_topic_if_needed( - self, payload: FeedPayload, feed: Optional[Feed] - ) -> None: - """Publishes a feed to the dataset batch topic if it meets the necessary criteria.""" - if ( - feed is not None - and feed.authentication_type == "0" # Authentication type check - and payload.spec == "gtfs" # Only for GTFS feeds - ): - self._publish_to_topic(feed, payload) - - def _publish_to_topic(self, feed: Feed, payload: FeedPayload) -> None: - """Publishes the feed to the configured Pub/Sub topic.""" - topic_path = self.publisher.topic_path(PROJECT_ID, DATASET_BATCH_TOPIC) - logging.debug(f"Publishing to Pub/Sub topic: {topic_path}") - - message_data = { - "execution_id": payload.execution_id, - "producer_url": feed.producer_url, - "feed_stable_id": feed.stable_id, - "feed_id": feed.id, - "dataset_id": None, - "dataset_hash": None, - "authentication_type": feed.authentication_type, - "authentication_info_url": feed.authentication_info_url, - "api_key_parameter_name": feed.api_key_parameter_name, - } - - try: - # Convert to JSON string - json_message = json.dumps(message_data) - future = self.publisher.publish( - topic_path, data=json_message.encode("utf-8") - ) - future.add_done_callback( - lambda _: logging.info( - "Published feed %s to dataset batch topic", feed.stable_id - ) - ) - future.result() - logging.info("Message published for feed %s", feed.stable_id) - except Exception as e: - logging.error("Error publishing to dataset batch topic: %s", str(e)) - raise - - def _rollback_transaction(self, message: str) -> None: - """Rollback the current transaction and log an error.""" - logging.error(message) - self.session.rollback() - - -@with_db_session -@functions_framework.cloud_event -def process_feed_event(cloud_event, db_session: Session) -> str: - """Cloud Function entry point for feed processing.""" - try: - message_data = base64.b64decode(cloud_event.data["message"]["data"]).decode() - payload = FeedPayload(**json.loads(message_data)) - processor = FeedProcessor(db_session) - processor.process_feed(payload) - result = "Feed processing completed successfully." 
-        logging.info(result)
-        return result
-    except Exception as e:
-        result = f"Error processing feed event: {str(e)}"
-        logging.error(result)
-        return result
diff --git a/functions-python/feed_sync_process_transitland/tests/test_feed_processor_utils.py b/functions-python/feed_sync_process_transitland/tests/test_feed_processor_utils.py
deleted file mode 100644
index 82746ec77..000000000
--- a/functions-python/feed_sync_process_transitland/tests/test_feed_processor_utils.py
+++ /dev/null
@@ -1,75 +0,0 @@
-from unittest.mock import patch
-
-import requests
-
-from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gtfsrealtimefeed
-from feed_processor_utils import (
-    check_url_status,
-    get_feed_model,
-    get_tlnd_authentication_type,
-    create_new_feed,
-)
-from shared.database.database import configure_polymorphic_mappers, with_db_session
-from shared.helpers.feed_sync.models import TransitFeedSyncPayload
-from test_shared.test_utils.database_utils import default_db_url
-
-
-@patch("requests.head")
-def test_check_url_status(mock_head):
-    mock_head.return_value.status_code = 200
-    assert check_url_status("http://example.com")
-    mock_head.return_value.status_code = 404
-    assert not check_url_status("http://example.com/404")
-    mock_head.return_value.status_code = 403
-    assert check_url_status("http://example.com/403")
-    mock_head.side_effect = requests.RequestException("Error")
-    assert not check_url_status("http://example.com/exception")
-
-
-def test_get_feed_model():
-    assert get_feed_model("gtfs") == (Gtfsfeed, "gtfs")
-    assert get_feed_model("gtfs_rt") == (Gtfsrealtimefeed, "gtfs_rt")
-    try:
-        get_feed_model("invalid")
-        assert False
-    except ValueError:
-        assert True
-
-
-def test_get_tlnd_authentication_type():
-    assert get_tlnd_authentication_type(None) == "0"
-    assert get_tlnd_authentication_type("") == "0"
-    assert get_tlnd_authentication_type("query_param") == "1"
-    assert get_tlnd_authentication_type("header") == "2"
-    try:
-        get_tlnd_authentication_type("invalid")
-        assert False
-    except ValueError:
-        assert True
-
-
-@with_db_session(db_url=default_db_url)
-def test_create_new_feed_gtfs_rt(db_session):
-    payload = {
-        "spec": "gtfs_rt",
-        "entity_types": "tu",
-        "feed_url": "http://example.com",
-        "feed_id": "102_tu",
-        "stable_id": "tld-102_tu",
-        "type": "query_param",
-        "auth_info_url": "http://example.com/info",
-        "auth_param_name": "key",
-        "operator_name": "Operator 1",
-        "external_id": "onestop1",
-        "source": "tld",
-        "country": "USA",
-        "state_province": "California",
-        "city_name": "San Francisco",
-    }
-    feed_payload = TransitFeedSyncPayload(**payload)
-    configure_polymorphic_mappers()
-    new_feed = create_new_feed(db_session, "tld-102_tu", feed_payload)
-    db_session.delete(new_feed)
-    assert new_feed.stable_id == "tld-102_tu"
-    assert new_feed.data_type == "gtfs_rt"
-    assert len(new_feed.entitytypes) == 1
diff --git a/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py b/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py
deleted file mode 100644
index 87b5f06fd..000000000
--- a/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py
+++ /dev/null
@@ -1,355 +0,0 @@
-import base64
-import json
-from unittest import mock
-from unittest.mock import patch, Mock, MagicMock
-
-import pytest
-from sqlalchemy.exc import SQLAlchemyError
-from sqlalchemy.orm import Session as DBSession
-
-from shared.database_gen.sqlacodegen_models import Feed, Gtfsfeed
-from shared.helpers.feed_sync.models 
import TransitFeedSyncPayload as FeedPayload - -from main import FeedProcessor, process_feed_event - - -@pytest.fixture -def mock_feed(): - """Fixture for a Feed model instance""" - return Mock() - - -@pytest.fixture -def mock_external_id(): - """Fixture for an ExternalId model instance""" - return Mock() - - -@pytest.fixture -def mock_location(): - """Fixture for a Location model instance""" - return Mock() - - -@pytest.fixture -def mock_db_session(): - return Mock() - - -@pytest.fixture -def feed_payload(): - """Fixture for feed payload.""" - return FeedPayload( - external_id="test123", - stable_id="feed1", - feed_id="feed1", - feed_url="https://example.com", - execution_id="exec123", - spec="gtfs", - auth_info_url=None, - auth_param_name=None, - type=None, - operator_name="Test Operator", - country="United States", - state_province="CA", - city_name="Test City", - source="TLD", - payload_type="new", - ) - - -@mock.patch.dict( - "os.environ", - { - "GOOGLE_APPLICATION_CREDENTIALS": "dummy-credentials.json", - }, -) -class TestFeedProcessor: - """Test suite for FeedProcessor.""" - - @pytest.fixture - def processor(self): - """Fixture for FeedProcessor with mocked dependencies.""" - # mock for the database session - mock_session = Mock(spec=DBSession) - - # Mock the PublisherClient - with patch("google.cloud.pubsub_v1.PublisherClient") as MockPublisherClient: - mock_publisher = MockPublisherClient.return_value - processor = FeedProcessor(mock_session) - processor.publisher = mock_publisher - mock_publisher.topic_path = Mock() - mock_publisher.publish = Mock() - - mock_query = Mock() - mock_filter = Mock() - mock_query.filter.return_value = mock_filter - mock_filter.first.return_value = None - mock_session.query.return_value = mock_query - - return processor - - @staticmethod - def _create_payload_dict(feed_payload: FeedPayload) -> dict: - """Helper method to create a payload dictionary from a FeedPayload object.""" - return { - "external_id": feed_payload.external_id, - "feed_id": feed_payload.feed_id, - "feed_url": feed_payload.feed_url, - "execution_id": feed_payload.execution_id, - "spec": feed_payload.spec, - "auth_info_url": feed_payload.auth_info_url, - "auth_param_name": feed_payload.auth_param_name, - "type": feed_payload.type, - "operator_name": feed_payload.operator_name, - "country": feed_payload.country, - "state_province": feed_payload.state_province, - "city_name": feed_payload.city_name, - "source": feed_payload.source, - "payload_type": feed_payload.payload_type, - } - - def test_get_current_feed_info(self, processor, feed_payload): - """Test retrieving current feed information.""" - # Mock database query - processor.session.query.return_value.filter.return_value.all.return_value = [ - Feed( - id="feed-uuid", - producer_url="https://example.com/feed", - stable_id="TLD-test123", - status="active", - ) - ] - - feeds = processor._get_current_feeds( - feed_payload.external_id, feed_payload.source - ) - - # Assertions - assert len(feeds) == 1 - feed_id, url = feeds[0].id, feeds[0].producer_url - assert feed_id == "feed-uuid" - assert url == "https://example.com/feed" - - # Test case when feed does not exist - processor.session.query.return_value.filter.return_value.all.return_value = [] - feeds = processor._get_current_feeds( - feed_payload.external_id, feed_payload.source - ) - assert len(feeds) == 0 - - def test_check_feed_url_exists_comprehensive(self, processor): - """Test comprehensive feed URL existence checks.""" - test_url = "https://example.com/feed" - - # Test case 1: 
-
-    def test_check_feed_url_exists_comprehensive(self, processor):
-        """Test comprehensive feed URL existence checks."""
-        test_url = "https://example.com/feed"
-
-        # Test case 1: Active feed exists
-        processor.session.query.return_value.filter_by.return_value.count.return_value = (
-            1
-        )
-
-        result = processor._check_feed_url_exists(test_url)
-        assert result is True
-
-    @patch("main.check_url_status")
-    def test_database_error_handling(
-        self, mock_check_url_status, processor, feed_payload
-    ):
-        """Test database error handling in different scenarios."""
-
-        mock_check_url_status.return_value = True
-        # Test case 1: General database error during feed processing
-        processor.session.query.side_effect = SQLAlchemyError("Database error")
-        processor._rollback_transaction = MagicMock(return_value=None)
-        processor.process_feed(feed_payload)
-
-        processor._rollback_transaction.assert_called_once()
-
-    def test_publish_to_batch_topic_comprehensive(self, processor, feed_payload):
-        """Test publishing to batch topic including success, error, and message format validation."""
-
-        # Test case 1: Successful publish with message format validation
-        processor.publisher.topic_path.return_value = "test_topic"
-        mock_future = Mock()
-        processor.publisher.publish.return_value = mock_future
-
-        processor._publish_to_batch_topic_if_needed(
-            feed_payload,
-            Feed(
-                id="test-id",
-                authentication_type="0",
-                producer_url=feed_payload.feed_url,
-                stable_id=f"{feed_payload.source}-{feed_payload.feed_id}".lower(),
-            ),
-        )
-
-        # Verify publish was called and message format
-        topic_arg, message_arg = processor.publisher.publish.call_args
-        assert topic_arg == ("test_topic",)
-        assert "feed_stable_id" in json.loads(message_arg["data"])
-        assert "tld-feed1" == json.loads(message_arg["data"])["feed_stable_id"]
-
-    def test_process_feed_event_validation(self):
-        """Test feed event processing with various invalid payloads."""
-
-        # Test case 1: Empty payload
-        empty_payload_data = base64.b64encode(json.dumps({}).encode("utf-8")).decode()
-        cloud_event = Mock()
-        cloud_event.data = {"message": {"data": empty_payload_data}}
-
-        process_feed_event(cloud_event)
-
-        # Test case 2: Invalid field
-        invalid_payload_data = base64.b64encode(
-            json.dumps({"invalid": "data"}).encode("utf-8")
-        ).decode()
-        cloud_event.data = {"message": {"data": invalid_payload_data}}
-
-        process_feed_event(cloud_event)
-
-        # Test case 3: Type error
-        type_error_payload = {"external_id": 12345, "feed_url": True, "feed_id": None}
-        payload_data = base64.b64encode(
-            json.dumps(type_error_payload).encode("utf-8")
-        ).decode()
-        cloud_event.data = {"message": {"data": payload_data}}
-
-        process_feed_event(cloud_event)
-
-    def test_process_feed_event_pubsub_error(
-        self, processor, feed_payload, mock_db_session
-    ):
-        """Test feed event processing handles missing credentials error."""
-        # Create cloud event with valid payload
-        payload_dict = self._create_payload_dict(feed_payload)
-        payload_data = base64.b64encode(
-            json.dumps(payload_dict).encode("utf-8")
-        ).decode()
-
-        # Create cloud event mock with minimal required structure
-        cloud_event = Mock()
-        cloud_event.data = {"message": {"data": payload_data}}
-
-        # Mock database session with minimal setup
-        mock_session = MagicMock()
-        mock_session.query.return_value.filter.return_value.all.return_value = []
-
-        process_feed_event(cloud_event, db_session=mock_session)
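The publish test above unpacks `call_args` into positional and keyword arguments, a `(args, kwargs)` pair. A standalone sketch of that mechanism:

```python
from unittest.mock import Mock

publish = Mock()
publish("projects/p/topics/t", data=b'{"feed_stable_id": "tld-feed1"}')

# call_args unpacks as (positional_args, keyword_args)
args, kwargs = publish.call_args
assert args == ("projects/p/topics/t",)
assert b"tld-feed1" in kwargs["data"]
```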
{"message": {"data": "invalid-base64"}} - - process_feed_event(cloud_event) - - def test_process_feed_event_invalid_json(self): - """Test handling of invalid JSON in cloud event""" - # Create invalid base64 encoded JSON - invalid_json = base64.b64encode(b'{"invalid": "json"').decode() - - cloud_event = Mock() - cloud_event.data = {"message": {"data": invalid_json}} - - # Process the event - result = process_feed_event(cloud_event) - - # Verify error handling - assert result.startswith("Error processing feed event") - - @patch("main.create_new_feed") - def test_process_new_feed_or_skip( - self, create_new_feed_mock, processor, feed_payload - ): - """Test processing new feed or skipping existing feed.""" - processor._check_feed_url_exists = MagicMock() - # Test case 1: New feed - processor._check_feed_url_exists.return_value = False - processor._process_new_feed_or_skip(feed_payload) - create_new_feed_mock.assert_called_once() - - @patch("main.create_new_feed") - def test_process_new_feed_skip(self, create_new_feed_mock, processor, feed_payload): - """Test processing new feed or skipping existing feed.""" - processor._check_feed_url_exists = MagicMock() - # Test case 2: Existing feed - processor._check_feed_url_exists.return_value = True - processor._process_new_feed_or_skip(feed_payload) - create_new_feed_mock.assert_not_called() - - @patch("main.create_new_feed") - def test_process_existing_feed_refs( - self, create_new_feed_mock, processor, feed_payload - ): - """Test processing existing feed references.""" - # 1. Existing feed with same url - matching_feeds = [ - Gtfsfeed( - id="feed-uuid", - producer_url="https://example.com", - stable_id="TLD-test123", - status="active", - ) - ] - new_feed = processor._process_existing_feed_refs(feed_payload, matching_feeds) - assert new_feed is None - - # 2. Existing feed with same stable_id - matching_feeds = [ - Gtfsfeed( - id="feed-uuid", - producer_url="https://example.com/different", - stable_id="tld-feed1", - status="active", - ) - ] - processor.feed_stable_id = "tld-feed1" - processor._deprecate_old_feed = MagicMock( - return_value=Feed( - id="feed-uuid", - producer_url="https://example.com/different", - stable_id="tld-feed1_2", - status="active", - ) - ) - new_feed = processor._process_existing_feed_refs(feed_payload, matching_feeds) - assert new_feed is not None - - # 3. 
-
-    @patch("main.create_new_feed")
-    def test_update_feed(self, create_new_feed_mock, processor, feed_payload):
-        """Test updating an existing feed."""
-        # No matching feed
-        processor._deprecate_old_feed(feed_payload, None)
-        create_new_feed_mock.assert_called_once()
-        # Provided id but no db entity
-        processor.session.get.return_value = None
-        processor._deprecate_old_feed(feed_payload, "feed-uuid")
-        create_new_feed_mock.assert_called()
-        # Update existing feed
-        returned_feed = Gtfsfeed(
-            id="feed-uuid",
-            producer_url="https://example.com",
-            stable_id="TLD-test123",
-            status="active",
-        )
-        processor.session.get.return_value = returned_feed
-        processor._deprecate_old_feed(feed_payload, "feed-uuid")
-        assert returned_feed.status == "deprecated"
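The storage-object removals in the `main.tf` diff below name each archive with `substr(filebase64sha256(...), 0, 10)`, so the object name, and hence the deployed source, changes only when the zip content changes. A rough Python equivalent of that Terraform built-in (the path below is a placeholder):

```python
import base64
import hashlib

def filebase64sha256(path: str) -> str:
    """Approximates Terraform's filebase64sha256():
    base64 of the raw SHA-256 digest of the file contents."""
    with open(path, "rb") as f:
        digest = hashlib.sha256(f.read()).digest()
    return base64.b64encode(digest).decode()

# Object name as composed in main.tf, e.g.:
# f"feed-sync-dispatcher-transitland-{filebase64sha256('function.zip')[:10]}.zip"
```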
diff --git a/infra/functions-python/main.tf b/infra/functions-python/main.tf
index 204986c57..668542b93 100644
--- a/infra/functions-python/main.tf
+++ b/infra/functions-python/main.tf
@@ -48,12 +48,6 @@ locals {
   function_reverse_geolocation_populate_config = jsondecode(file("${path.module}/../../functions-python/reverse_geolocation_populate/function_config.json"))
   function_reverse_geolocation_populate_zip    = "${path.module}/../../functions-python/reverse_geolocation_populate/.dist/reverse_geolocation_populate.zip"

-  function_feed_sync_dispatcher_transitland_config = jsondecode(file("${path.module}/../../functions-python/feed_sync_dispatcher_transitland/function_config.json"))
-  function_feed_sync_dispatcher_transitland_zip    = "${path.module}/../../functions-python/feed_sync_dispatcher_transitland/.dist/feed_sync_dispatcher_transitland.zip"
-
-  function_feed_sync_process_transitland_config = jsondecode(file("${path.module}/../../functions-python/feed_sync_process_transitland/function_config.json"))
-  function_feed_sync_process_transitland_zip    = "${path.module}/../../functions-python/feed_sync_process_transitland/.dist/feed_sync_process_transitland.zip"
-
   function_operations_api_config = jsondecode(file("${path.module}/../../functions-python/operations_api/function_config.json"))
   function_operations_api_zip    = "${path.module}/../../functions-python/operations_api/.dist/operations_api.zip"

@@ -180,19 +174,8 @@ resource "google_storage_bucket_object" "gbfs_validation_report_zip" {
   source = local.function_gbfs_validation_report_zip
 }

-# 6. Feed sync dispatcher transitland
-resource "google_storage_bucket_object" "feed_sync_dispatcher_transitland_zip" {
-  bucket = google_storage_bucket.functions_bucket.name
-  name   = "feed-sync-dispatcher-transitland-${substr(filebase64sha256(local.function_feed_sync_dispatcher_transitland_zip), 0, 10)}.zip"
-  source = local.function_feed_sync_dispatcher_transitland_zip
-}
-
-# 7. Feed sync process transitland
-resource "google_storage_bucket_object" "feed_sync_process_transitland_zip" {
-  bucket = google_storage_bucket.functions_bucket.name
-  name   = "feed-sync-process-transitland-${substr(filebase64sha256(local.function_feed_sync_process_transitland_zip), 0, 10)}.zip"
-  source = local.function_feed_sync_process_transitland_zip
-}
+# 6. Feed sync dispatcher transitland - Deleted
+# 7. Feed sync process transitland - Deleted

 # 8. Operations API
 resource "google_storage_bucket_object" "operations_api_zip" {
@@ -596,27 +579,6 @@
 }

-resource "google_cloud_scheduler_job" "transit_land_scraping_scheduler" {
-  name        = "transitland-scraping-scheduler-${var.environment}"
-  description = "Schedule the transitland scraping function"
-  time_zone   = "Etc/UTC"
-  schedule    = var.transitland_scraping_schedule
-  region      = var.gcp_region
-  paused      = var.environment == "prod" ? false : true
-  depends_on  = [google_cloudfunctions2_function.feed_sync_dispatcher_transitland, google_cloudfunctions2_function_iam_member.transitland_feeds_dispatcher_invoker]
-  http_target {
-    http_method = "POST"
-    uri         = google_cloudfunctions2_function.feed_sync_dispatcher_transitland.url
-    oidc_token {
-      service_account_email = google_service_account.functions_service_account.email
-    }
-    headers = {
-      "Content-Type" = "application/json"
-    }
-  }
-  attempt_deadline = "320s"
-}
-
 # 5.3 Create function that subscribes to the Pub/Sub topic
 resource "google_cloudfunctions2_function" "gbfs_validator_pubsub" {
   name = "${local.function_gbfs_validation_report_config.name}-pubsub"
@@ -671,59 +633,6 @@
   }
 }

-# 6. functions/feed_sync_dispatcher_transitland cloud function
-# 6.1 Create Pub/Sub topic
-resource "google_pubsub_topic" "transitland_feeds_dispatch" {
-  name = "transitland-feeds-dispatch"
-}
-# 6.2 Create batch function that publishes to the Pub/Sub topic
-resource "google_cloudfunctions2_function" "feed_sync_dispatcher_transitland" {
-  name        = "${local.function_feed_sync_dispatcher_transitland_config.name}-batch"
-  description = local.function_feed_sync_dispatcher_transitland_config.description
-  location    = var.gcp_region
-  depends_on  = [google_project_iam_member.event-receiving, google_secret_manager_secret_iam_member.secret_iam_member]
-
-  build_config {
-    runtime     = var.python_runtime
-    entry_point = local.function_feed_sync_dispatcher_transitland_config.entry_point
-    source {
-      storage_source {
-        bucket = google_storage_bucket.functions_bucket.name
-        object = google_storage_bucket_object.feed_sync_dispatcher_transitland_zip.name
-      }
-    }
-  }
-  service_config {
-    environment_variables = {
-      PROJECT_ID               = var.project_id
-      PYTHONNODEBUGRANGES      = 0
-      PUBSUB_TOPIC_NAME        = google_pubsub_topic.transitland_feeds_dispatch.name
-      TRANSITLAND_API_KEY      = var.transitland_api_key
-      TRANSITLAND_OPERATOR_URL = "https://transit.land/api/v2/rest/operators"
-      TRANSITLAND_FEED_URL     = "https://transit.land/api/v2/rest/feeds"
-    }
-    available_memory                 = local.function_feed_sync_dispatcher_transitland_config.available_memory
-    timeout_seconds                  = local.function_feed_sync_dispatcher_transitland_config.timeout
-    available_cpu                    = local.function_feed_sync_dispatcher_transitland_config.available_cpu
-    max_instance_request_concurrency = local.function_feed_sync_dispatcher_transitland_config.max_instance_request_concurrency
-    max_instance_count               = local.function_feed_sync_dispatcher_transitland_config.max_instance_count
-    min_instance_count               = local.function_feed_sync_dispatcher_transitland_config.min_instance_count
-    service_account_email            = google_service_account.functions_service_account.email
-    ingress_settings                 = local.function_feed_sync_dispatcher_transitland_config.ingress_settings
-    vpc_connector                    = data.google_vpc_access_connector.vpc_connector.id
-    vpc_connector_egress_settings    = "PRIVATE_RANGES_ONLY"
-    dynamic "secret_environment_variables" {
-      for_each = local.function_feed_sync_dispatcher_transitland_config.secret_environment_variables
-      content {
-        key        = secret_environment_variables.value["key"]
-        project_id = var.project_id
-        secret     = "${upper(var.environment)}_${secret_environment_variables.value["key"]}"
-        version    = "latest"
-      }
-    }
-  }
-}
-
 # 7. functions/operations_api cloud function
 resource "google_cloudfunctions2_function" "operations_api" {
   name = "${local.function_operations_api_config.name}"
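The deleted `transit_land_scraping_scheduler` job authenticated its HTTP target with an `oidc_token` block. Roughly, the equivalent client-side flow mints an identity token for the function URL and sends it as a bearer header; a hedged sketch using `google-auth` (the URL is a placeholder, and ambient credentials are assumed):

```python
import requests
import google.auth.transport.requests
import google.oauth2.id_token

# Placeholder audience: the invoked function's URL
audience = "https://REGION-PROJECT.cloudfunctions.net/feed-sync-dispatcher"

# Fetch an OIDC identity token scoped to that audience
auth_req = google.auth.transport.requests.Request()
token = google.oauth2.id_token.fetch_id_token(auth_req, audience)

resp = requests.post(
    audience,
    headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
)
resp.raise_for_status()
```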
@@ -769,58 +678,6 @@
     }
   }
 }
-# 8. functions/feed_sync_process_transitland cloud function
-resource "google_cloudfunctions2_function" "feed_sync_process_transitland" {
-  name        = "${local.function_feed_sync_process_transitland_config.name}-pubsub"
-  description = local.function_feed_sync_process_transitland_config.description
-  location    = var.gcp_region
-  depends_on  = [google_project_iam_member.event-receiving, google_secret_manager_secret_iam_member.secret_iam_member]
-  event_trigger {
-    trigger_region        = var.gcp_region
-    service_account_email = google_service_account.functions_service_account.email
-    event_type            = "google.cloud.pubsub.topic.v1.messagePublished"
-    pubsub_topic          = google_pubsub_topic.transitland_feeds_dispatch.id
-    retry_policy          = "RETRY_POLICY_RETRY"
-  }
-  build_config {
-    runtime     = var.python_runtime
-    entry_point = local.function_feed_sync_process_transitland_config.entry_point
-    source {
-      storage_source {
-        bucket = google_storage_bucket.functions_bucket.name
-        object = google_storage_bucket_object.feed_sync_process_transitland_zip.name
-      }
-    }
-  }
-  service_config {
-    available_memory                 = local.function_feed_sync_process_transitland_config.memory
-    timeout_seconds                  = local.function_feed_sync_process_transitland_config.timeout
-    available_cpu                    = local.function_feed_sync_process_transitland_config.available_cpu
-    max_instance_request_concurrency = local.function_feed_sync_process_transitland_config.max_instance_request_concurrency
-    max_instance_count               = local.function_feed_sync_process_transitland_config.max_instance_count
-    min_instance_count               = local.function_feed_sync_process_transitland_config.min_instance_count
-    service_account_email            = google_service_account.functions_service_account.email
-    ingress_settings                 = var.environment == "dev" ? "ALLOW_ALL" : local.function_feed_sync_process_transitland_config.ingress_settings
-    vpc_connector                    = data.google_vpc_access_connector.vpc_connector.id
-    vpc_connector_egress_settings    = "PRIVATE_RANGES_ONLY"
-    environment_variables = {
-      PYTHONNODEBUGRANGES      = 0
-      DB_REUSE_SESSION         = "True"
-      PROJECT_ID               = var.project_id
-      PUBSUB_TOPIC_NAME        = google_pubsub_topic.transitland_feeds_dispatch.name
-      DATASET_BATCH_TOPIC_NAME = data.google_pubsub_topic.datasets_batch_topic.name
-    }
-    dynamic "secret_environment_variables" {
-      for_each = local.function_feed_sync_process_transitland_config.secret_environment_variables
-      content {
-        key        = secret_environment_variables.value["key"]
-        project_id = var.project_id
-        secret     = "${upper(var.environment)}_${secret_environment_variables.value["key"]}"
-        version    = "latest"
-      }
-    }
-  }
-}

 # 9. functions/backfill_dataset_service_date_range cloud function
 # Fills all the NULL values for service date range in the gtfs datasets table
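The removed `event_trigger` above delivered `google.cloud.pubsub.topic.v1.messagePublished` events to the function's entry point. A hedged sketch of the handler shape this implies, consistent with how the deleted tests invoke `process_feed_event` (this is not the deleted `main.py`; the optional `db_session` keyword merely mirrors the test calls):

```python
import base64
import json

import functions_framework

@functions_framework.cloud_event
def process_feed_event(cloud_event, db_session=None):
    # Pub/Sub delivers the published bytes base64-encoded under message.data
    try:
        raw = cloud_event.data["message"]["data"]
        payload = json.loads(base64.b64decode(raw).decode("utf-8"))
        # ... feed-processing logic would use the payload fields here ...
        return f"Processed feed {payload.get('feed_id')}"
    except Exception as e:
        # The deleted tests assert on a string with this prefix
        return f"Error processing feed event: {e}"
```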
@@ -1518,19 +1375,10 @@ resource "google_cloudfunctions2_function_iam_member" "gbfs_validator_batch_invoker" {
   member = "serviceAccount:${google_service_account.functions_service_account.email}"
 }

-resource "google_cloudfunctions2_function_iam_member" "transitland_feeds_dispatcher_invoker" {
-  project        = var.project_id
-  location       = var.gcp_region
-  cloud_function = google_cloudfunctions2_function.feed_sync_dispatcher_transitland.name
-  role           = "roles/cloudfunctions.invoker"
-  member         = "serviceAccount:${google_service_account.functions_service_account.email}"
-}
-
 # Grant permissions to the service account to publish to the pubsub topic
 resource "google_pubsub_topic_iam_member" "functions_publisher" {
   for_each = {
     validate_gbfs_feed = google_pubsub_topic.validate_gbfs_feed.name
-    feed_sync_dispatcher_transitland = google_pubsub_topic.transitland_feeds_dispatch.name
     dataset_batch = data.google_pubsub_topic.datasets_batch_topic.name
   }

@@ -1544,7 +1392,6 @@ resource "google_pubsub_topic_iam_member" "functions_subscriber" {
 resource "google_pubsub_topic_iam_member" "functions_subscriber" {
   for_each = {
     validate_gbfs_feed = google_pubsub_topic.validate_gbfs_feed.name
-    feed_sync_dispatcher_transitland = google_pubsub_topic.transitland_feeds_dispatch.name
   }

   project = var.project_id