Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
with:
node-version: 22.x
- name: Start MongoDB
uses: supercharge/mongodb-github-action@1.3.0
uses: supercharge/mongodb-github-action@1.12.1
with:
mongodb-version: 4.2
- name: Setup Maven Cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.collect.Lists;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.UnwindOptions;
import com.mongodb.client.model.Variable;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.time.LocalDate;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand All @@ -26,15 +26,19 @@
import static com.conveyal.datatools.manager.DataManager.hasConfigProperty;
import static com.conveyal.datatools.manager.DataManager.isExtensionEnabled;
import static com.conveyal.datatools.manager.DataManager.isModuleEnabled;
import static com.mongodb.client.model.Aggregates.group;
import static com.mongodb.client.model.Aggregates.limit;
import static com.mongodb.client.model.Aggregates.lookup;
import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Aggregates.project;
import static com.mongodb.client.model.Aggregates.replaceRoot;
import static com.mongodb.client.model.Aggregates.sort;
import static com.mongodb.client.model.Aggregates.unwind;
import static com.mongodb.client.model.Filters.expr;
import static com.mongodb.client.model.Filters.in;
import static com.mongodb.client.model.Projections.computed;
import static com.mongodb.client.model.Projections.fields;
import static com.mongodb.client.model.Projections.include;
import static com.mongodb.client.model.Sorts.descending;
import static java.util.Objects.requireNonNullElse;

/**
Expand Down Expand Up @@ -164,8 +168,10 @@ public static List<FeedSourceSummary> getFeedSourceSummaries(String projectId, S
match(
in("projectId", projectId)
),

// Project only necessary fields early to reduce document size.
project(
Projections.fields(Projections.include(
include(
"_id",
"name",
"deployable",
Expand All @@ -174,11 +180,12 @@ public static List<FeedSourceSummary> getFeedSourceSummaries(String projectId, S
"labelIds",
"url",
"filename",
"noteIds")
"noteIds"
)
),
sort(Sorts.ascending("name"))
);

return extractFeedSourceSummaries(projectId, organizationId, stages);
}

Expand All @@ -188,31 +195,103 @@ public static List<FeedSourceSummary> getFeedSourceSummaries(String projectId, S
* If this is updated, be sure to also update the matching Mongo query.
*/
public static Map<String, FeedVersionSummary> getLatestFeedVersionForFeedSources(String projectId) {
List<Bson> stages = Lists.newArrayList(
List<Bson> feedVersionPipeline = Arrays.asList(
// Match FeedVersion documents where feedSourceId equals the feedSourceId passed from the outer document.
match(
in("projectId", projectId)
expr(
new Document("$eq", Arrays.asList("$feedSourceId", "$$feedSourceId"))
)
),
lookup("FeedVersion", "_id", "feedSourceId", "feedVersions"),
lookup("FeedVersion", "publishedVersionId", "namespace", "publishedFeedVersion"),
unwind("$feedVersions"),
unwind("$publishedFeedVersion", new UnwindOptions().preserveNullAndEmptyArrays(true)),
sort(Sorts.descending("feedVersions.version")),
group(
"$_id",
Accumulators.first("publishedFeedVersionErrorCount", "$publishedFeedVersion.validationResult.errorCount"),
Accumulators.first("publishedFeedVersionStartDate", "$publishedFeedVersion.validationResult.firstCalendarDate"),
Accumulators.first("publishedFeedVersionEndDate", "$publishedFeedVersion.validationResult.lastCalendarDate"),
Accumulators.first("publishedVersionId", "$publishedVersionId"),
Accumulators.first("feedVersionId", "$feedVersions._id"),
Accumulators.first("firstCalendarDate", "$feedVersions.validationResult.firstCalendarDate"),
Accumulators.first("lastCalendarDate", "$feedVersions.validationResult.lastCalendarDate"),
Accumulators.first("errorCount", "$feedVersions.validationResult.errorCount"),
Accumulators.first("processedByExternalPublisher", "$feedVersions.processedByExternalPublisher"),
Accumulators.first("sentToExternalPublisher", "$feedVersions.sentToExternalPublisher"),
Accumulators.first("gtfsPlusValidation", "$feedVersions.gtfsPlusValidation"),
Accumulators.first("namespace", "$feedVersions.namespace")
sort(descending("version")),
limit(1),
// Project only the fields needed from the FeedVersion to reduce payload size.
project(
include(
"version",
"_id",
"validationResult",
"processedByExternalPublisher",
"sentToExternalPublisher",
"gtfsPlusValidation",
"namespace"
)
)
);

// Define the variable passed into the lookup pipeline.
List<Variable<String>> feedSourceId = List.of(new Variable<>("feedSourceId", "$_id"));

// $lookup that uses the above pipeline to produce "latestFeedVersion" (an array with at most one element).
Bson lookupLatestFeedVersion = lookup(
"FeedVersion",
feedSourceId,
feedVersionPipeline,
"latestFeedVersion"
);

// Pipeline to find the published FeedVersion by namespace (or identifier stored in publishedVersionId)
List<Bson> publishedFeedVersionPipeline = Arrays.asList(
// Match FeedVersion documents where namespace equals the outer document's publishedVersionId.
match(
expr(
new Document("$eq", Arrays.asList("$namespace", "$$publishedVersionId"))
)
),
limit(1),
// Project only the validationResult because that's all that is needed later.
project(include("validationResult"))
);

// Pass publishedVersionId from the local document into the lookup pipeline.
List<Variable<String>> publishedVersionId = List.of(new Variable<>("publishedVersionId", "$publishedVersionId"));

// $lookup that uses the above pipeline to produce "publishedFeedVersion" (an array with at most one element).
Bson lookupPublishedFeedVersion = lookup(
"FeedVersion",
publishedVersionId,
publishedFeedVersionPipeline,
"publishedFeedVersion"
);

// Top-level aggregation stages that combine the lookups and map required fields into a slimmed down result.
List<Bson> stages = Arrays.asList(
// Start by filtering documents by projectId (reduces the number of input documents early).
match(in("projectId", projectId)),

// Attach the latest FeedVersion (as an array "latestFeedVersion").
lookupLatestFeedVersion,

// Attach the published FeedVersion (as an array "publishedFeedVersion").
lookupPublishedFeedVersion,

// Unwind the latestFeedVersion array into a single document.
unwind("$latestFeedVersion", new UnwindOptions().preserveNullAndEmptyArrays(true)),

// Unwind the publishedFeedVersion array into a single document.
unwind("$publishedFeedVersion", new UnwindOptions().preserveNullAndEmptyArrays(true)),

// Final projection: select and compute only the fields needed for the output to minimize size.
project(fields(
// keep the raw publishedVersionId field for reference.
include("publishedVersionId"),

// Published feed version fields (mapped from the nested publishedFeedVersion.validationResult).
computed("publishedFeedVersionErrorCount", "$publishedFeedVersion.validationResult.errorCount"),
computed("publishedFeedVersionStartDate", "$publishedFeedVersion.validationResult.firstCalendarDate"),
computed("publishedFeedVersionEndDate", "$publishedFeedVersion.validationResult.lastCalendarDate"),

// Latest feed version fields (mapped from the nested latestFeedVersion).
computed("feedVersionId", "$latestFeedVersion._id"),
computed("firstCalendarDate", "$latestFeedVersion.validationResult.firstCalendarDate"),
computed("lastCalendarDate", "$latestFeedVersion.validationResult.lastCalendarDate"),
computed("errorCount", "$latestFeedVersion.validationResult.errorCount"),
computed("processedByExternalPublisher", "$latestFeedVersion.processedByExternalPublisher"),
computed("sentToExternalPublisher", "$latestFeedVersion.sentToExternalPublisher"),
computed("gtfsPlusValidation", "$latestFeedVersion.gtfsPlusValidation"),
computed("namespace", "$latestFeedVersion.namespace")
))
);

return extractFeedVersionSummaries(
"FeedSource",
"feedVersionId",
Expand All @@ -228,27 +307,63 @@ public static Map<String, FeedVersionSummary> getLatestFeedVersionForFeedSources
* If this is updated, be sure to also update the matching Mongo query.
*/
public static Map<String, FeedVersionSummary> getFeedVersionsFromLatestDeployment(String projectId) {
List<Bson> stages = Lists.newArrayList(
match(
in("_id", projectId)
),
lookup("Deployment", "_id", "projectId", "deployments"),
unwind("$deployments"),
replaceRoot("$deployments"),
sort(Sorts.descending("lastUpdated")),
limit(1),
lookup("FeedVersion", "feedVersionIds", "_id", "feedVersions"),
unwind("$feedVersions"),
replaceRoot("$feedVersions"),
List<Bson> stages = new ArrayList<>();
stages.add(match(in("_id", projectId)));

// Lookup Deployments for the project.
stages.add(lookup(
"Deployment",
"_id",
"projectId",
"deployments"
));

// Unwind deployments array to get individual deployment documents.
stages.add(unwind("$deployments"));

// Project only fields needed from deployment to reduce doc size before sorting.
stages.add(project(fields(
computed("deployment", "$deployments._id"),
computed("lastUpdated", "$deployments.lastUpdated"),
computed("feedVersionIds", "$deployments.feedVersionIds")
)));

// Sort deployments by lastUpdated descending.
stages.add(sort(descending("lastUpdated")));
stages.add(limit(1));

List<Bson> feedVersionPipeline = Arrays.asList(
match(expr(new Document("$in", Arrays.asList("$_id", "$$feedVersionIds")))),
project(
Projections.fields(Projections.include(
include(
"feedSourceId",
"validationResult.firstCalendarDate",
"validationResult.lastCalendarDate",
"validationResult.errorCount")
"validationResult.errorCount"
)
)
);

// Use pipeline form of lookup to fetch FeedVersions matching deployment’s feedVersionIds
List<Variable<String>> feedVersionIds = List.of(new Variable<>("feedVersionIds", "$feedVersionIds"));

stages.add(lookup(
"FeedVersion",
feedVersionIds,
feedVersionPipeline,
"feedVersions"
));
stages.add(unwind("$feedVersions", new UnwindOptions().preserveNullAndEmptyArrays(false)));
stages.add(replaceRoot("$feedVersions"));
// Final projection: select and compute only the fields needed for the output to minimize size.
stages.add(project(
include(
"_id",
"feedSourceId",
"validationResult"
)
));

return extractFeedVersionSummaries(
"Project",
"_id",
Expand All @@ -263,27 +378,46 @@ public static Map<String, FeedVersionSummary> getFeedVersionsFromLatestDeploymen
* <a href="src/main/resources/mongo/getFeedVersionsFromPinnedDeployment.js">getFeedVersionsFromPinnedDeployment.js</a>.
*/
public static Map<String, FeedVersionSummary> getFeedVersionsFromPinnedDeployment(String projectId) {
List<Bson> stages = Lists.newArrayList(
List<Bson> stages = new ArrayList<>();

// Match projects by projectId.
stages.add(match(in("_id", projectId)));

// Project only pinnedDeploymentId to keep doc small.
stages.add(project(include("pinnedDeploymentId")));

// Lookup Deployment documents by pinnedDeploymentId.
stages.add(lookup("Deployment", "pinnedDeploymentId", "_id", "deployment"));

// Unwind deployment array (assuming single deployment per project).
stages.add(unwind("$deployment"));

// Define pipeline in $lookup to filter and project FeedVersion docs.
List<Bson> feedVersionPipeline = Arrays.asList(
match(
in("_id", projectId)
),
project(
Projections.fields(Projections.include("pinnedDeploymentId"))
expr(
new Document("$in", Arrays.asList("$_id", "$$feedVersionIds"))
)
),
lookup("Deployment", "pinnedDeploymentId", "_id", "deployment"),
unwind("$deployment"),
lookup("FeedVersion", "deployment.feedVersionIds", "_id", "feedVersions"),
unwind("$feedVersions"),
replaceRoot("$feedVersions"),
project(
Projections.fields(Projections.include(
include(
"_id",
"feedSourceId",
"validationResult.firstCalendarDate",
"validationResult.lastCalendarDate",
"validationResult.errorCount")
"validationResult.errorCount"
)
)
);

// Define variable for correlated lookup on FeedVersion collection.
List<Variable<String>> feedVersionIds = List.of(
new Variable<>("feedVersionIds", "$deployment.feedVersionIds")
);

// Lookup FeedVersion docs with pipeline and store as feedVersions array.
stages.add(lookup("FeedVersion", feedVersionIds, feedVersionPipeline, "feedVersions"));

return extractFeedVersionSummaries(
"Project",
"_id",
Expand Down
Loading
Loading