From eca5053a19e1df1b69c72a15441607ec4600e478 Mon Sep 17 00:00:00 2001 From: Julien Date: Sat, 14 Feb 2026 08:11:45 +0000 Subject: [PATCH 1/3] perf: replace per-module row copying with shared map enrichment Each enrichment module previously created a new Row (copying all column values) for every update. With 6 modules and a 108-column schema, this meant 6 full array copies per input row. The new approach has modules read/write enrichment values via a shared Map, with a single Row materialised at the end of the pipeline (~4.4x speedup). Co-Authored-By: Claude Opus 4.6 --- .../spruce/EnrichmentModule.java | 63 +---- .../spruce/EnrichmentPipeline.java | 18 +- .../spruce/modules/ConstantLoad.java | 4 +- .../spruce/modules/OperationalEmissions.java | 23 +- .../com/digitalpebble/spruce/modules/PUE.java | 20 +- .../spruce/modules/RegionExtraction.java | 10 +- .../spruce/modules/Serverless.java | 37 ++- .../spruce/modules/boavizta/BoaviztAPI.java | 35 ++- .../modules/boavizta/BoaviztAPIstatic.java | 32 ++- .../spruce/modules/ccf/Accelerators.java | 26 +- .../spruce/modules/ccf/Networking.java | 16 +- .../spruce/modules/ccf/Storage.java | 40 +-- .../AverageCarbonIntensity.java | 14 +- .../com/digitalpebble/spruce/ConfigTest.java | 3 +- .../spruce/EnrichmentModuleTest.java | 108 +++++--- .../spruce/EnrichmentStrategyBenchmark.java | 255 ++++++++++++++++++ .../spruce/modules/ConstantLoadTest.java | 14 +- .../modules/OperationalEmissionsTest.java | 39 +-- .../digitalpebble/spruce/modules/PUETest.java | 35 ++- .../spruce/modules/RegionExtractionTest.java | 24 +- .../spruce/modules/ServerlessTest.java | 42 +-- .../modules/boavizta/BoaviztAPITest.java | 82 +++--- .../boavizta/BoaviztAPIstaticTest.java | 38 ++- .../spruce/modules/ccf/AcceleratorsTest.java | 30 ++- .../spruce/modules/ccf/NetworkingTest.java | 22 +- .../spruce/modules/ccf/StorageTest.java | 38 ++- 26 files changed, 691 insertions(+), 377 deletions(-) create mode 100644 
src/test/java/com/digitalpebble/spruce/EnrichmentStrategyBenchmark.java diff --git a/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java b/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java index a1f5a4c..90321b5 100644 --- a/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java +++ b/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java @@ -3,7 +3,6 @@ package com.digitalpebble.spruce; import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import java.io.Serializable; import java.util.Map; @@ -11,7 +10,10 @@ /** * A module adds new columns to a Dataset and populates them based on its content. * The columns can represent energy or water consumption, carbon intensity, carbon emissions etc... - * The bulk of the work is done in the map function. + * + *

Modules read CUR (input) columns from the original {@link Row} and read/write + * Spruce (enrichment) columns via a shared {@code Map}. + * The pipeline materialises one final Row at the end, avoiding per-module row copies. **/ public interface EnrichmentModule extends Serializable { @@ -25,53 +27,12 @@ default void init(Map params){} /** Returns the columns added by this module **/ Column[] columnsAdded(); - Row process(Row row); - - static Row withUpdatedValue(Row row, Column column, Object newValue) { - Object[] values = new Object[row.size()]; - for (int i = 0; i < row.size(); i++) { - values[i] = row.get(i); - } - int index = column.resolveIndex(row); - values[index] = newValue; - return new GenericRowWithSchema(values, row.schema()); - } - - static Row withUpdatedValue(Row row, Column column, Double newValue, boolean add) { - Object[] values = new Object[row.size()]; - for (int i = 0; i < row.size(); i++) { - values[i] = row.get(i); - } - int index = column.resolveIndex(row); - Object existing = values[index]; - if (add && existing instanceof Double) { - values[index] = newValue + (Double) existing; - } else { - values[index] = newValue; - } - return new GenericRowWithSchema(values, row.schema()); - } - - static Row withUpdatedValues(Row row, Map updates) { - Object[] values = new Object[row.size()]; - for (int i = 0; i < row.size(); i++) { - values[i] = row.get(i); - } - - for (Map.Entry entry : updates.entrySet()) { - Column column = entry.getKey(); - Object newValue = entry.getValue(); - - int index; - try { - index = column.resolveIndex(row); - } catch (IllegalArgumentException e) { - throw new RuntimeException("Field not found in row: " + column.getLabel(), e); - } - - values[index] = newValue; - } - - return new GenericRowWithSchema(values, row.schema()); - } + /** + * Enrich the given row by reading input columns from {@code inputRow} and + * reading/writing enrichment columns via {@code enrichedValues}. 
+ * + * @param inputRow the immutable original row from the dataset + * @param enrichedValues shared map accumulating enrichment values across all modules + */ + void enrich(Row inputRow, Map enrichedValues); } diff --git a/src/main/java/com/digitalpebble/spruce/EnrichmentPipeline.java b/src/main/java/com/digitalpebble/spruce/EnrichmentPipeline.java index 6da9f38..9ac2a1f 100644 --- a/src/main/java/com/digitalpebble/spruce/EnrichmentPipeline.java +++ b/src/main/java/com/digitalpebble/spruce/EnrichmentPipeline.java @@ -4,9 +4,12 @@ import org.apache.spark.api.java.function.MapPartitionsFunction; import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import static com.digitalpebble.spruce.CURColumn.LINE_ITEM_TYPE; @@ -41,10 +44,21 @@ public Row next() { // usage filter - only need to enrich entries that correspond to a usage (no tax, discount or fee) boolean usage = usageFilter(row); if (!usage) return row; + + Map enriched = new HashMap<>(); for (EnrichmentModule module : enrichmentModules) { - row = module.process(row); + module.enrich(row, enriched); + } + + // Materialise the final row — single copy instead of one per module + Object[] values = new Object[row.size()]; + for (int i = 0; i < row.size(); i++) { + values[i] = row.get(i); + } + for (Map.Entry entry : enriched.entrySet()) { + values[entry.getKey().resolveIndex(row)] = entry.getValue(); } - return row; + return new GenericRowWithSchema(values, row.schema()); } }; } diff --git a/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java b/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java index 398dc57..2b30e7d 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java +++ b/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java @@ -33,7 +33,7 @@ public Column[] columnsAdded() { } @Override - public Row 
process(Row row) { - return EnrichmentModule.withUpdatedValue(row, CPU_LOAD, load_value); + public void enrich(Row inputRow, Map enrichedValues) { + enrichedValues.put(CPU_LOAD, load_value); } } diff --git a/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java b/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java index 804c248..9ebc42c 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java +++ b/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java @@ -6,6 +6,8 @@ import com.digitalpebble.spruce.EnrichmentModule; import org.apache.spark.sql.Row; +import java.util.Map; + import static com.digitalpebble.spruce.SpruceColumn.*; /** @@ -25,22 +27,21 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { - if (ENERGY_USED.isNullAt(row)) { - return row; - } + public void enrich(Row inputRow, Map enrichedValues) { + Object energyObj = enrichedValues.get(ENERGY_USED); + if (energyObj == null) return; - if (CARBON_INTENSITY.isNullAt(row)) { - return row; - } + Object ciObj = enrichedValues.get(CARBON_INTENSITY); + if (ciObj == null) return; - final double energyUsed = ENERGY_USED.getDouble(row); + final double energyUsed = (Double) energyObj; // take into account the PUE if present - final double pue = PUE.isNullAt(row) ? 1.0 : PUE.getDouble(row); - final double carbon_intensity = CARBON_INTENSITY.getDouble(row); + Object pueObj = enrichedValues.get(PUE); + final double pue = pueObj != null ? 
(Double) pueObj : 1.0; + final double carbon_intensity = (Double) ciObj; final double emissions = energyUsed * carbon_intensity * pue; - return EnrichmentModule.withUpdatedValue(row, OPERATIONAL_EMISSIONS, emissions); + enrichedValues.put(OPERATIONAL_EMISSIONS, emissions); } } diff --git a/src/main/java/com/digitalpebble/spruce/modules/PUE.java b/src/main/java/com/digitalpebble/spruce/modules/PUE.java index 46742bd..5195cdf 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/PUE.java +++ b/src/main/java/com/digitalpebble/spruce/modules/PUE.java @@ -84,22 +84,18 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { - if (ENERGY_USED.isNullAt(row)) { - return row; - } + public void enrich(Row inputRow, Map enrichedValues) { + Object energyObj = enrichedValues.get(ENERGY_USED); + if (energyObj == null) return; - double energyUsed = ENERGY_USED.getDouble(row); - if (energyUsed <= 0) return row; + double energyUsed = (Double) energyObj; + if (energyUsed <= 0) return; - String region = null; - if (!REGION.isNullAt(row)) { - region = REGION.getString(row); - } + String region = (String) enrichedValues.get(REGION); double pueToApply = getPueForRegion(region); - return EnrichmentModule.withUpdatedValue(row, SpruceColumn.PUE, pueToApply); + enrichedValues.put(SpruceColumn.PUE, pueToApply); } private double getPueForRegion(String region) { @@ -119,4 +115,4 @@ private double getPueForRegion(String region) { return defaultPueValue; } -} \ No newline at end of file +} diff --git a/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java b/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java index 73e4e39..9f9e105 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java +++ b/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java @@ -6,6 +6,8 @@ import com.digitalpebble.spruce.EnrichmentModule; import org.apache.spark.sql.Row; +import java.util.Map; + import static 
com.digitalpebble.spruce.CURColumn.*; import static com.digitalpebble.spruce.SpruceColumn.*; @@ -27,16 +29,16 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { + public void enrich(Row inputRow, Map enrichedValues) { // get the location // in most cases you have a product_region_code but can be product_to_region_code or product_from_region_code // when the traffic is between two regions or to/from the outside for (Column c : location_columns) { - String locationCode = c.getString(row); + String locationCode = c.getString(inputRow); if (locationCode != null) { - return EnrichmentModule.withUpdatedValue(row, REGION, locationCode); + enrichedValues.put(REGION, locationCode); + return; } } - return row; } } diff --git a/src/main/java/com/digitalpebble/spruce/modules/Serverless.java b/src/main/java/com/digitalpebble/spruce/modules/Serverless.java index f756c35..3322649 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/Serverless.java +++ b/src/main/java/com/digitalpebble/spruce/modules/Serverless.java @@ -4,20 +4,14 @@ import com.digitalpebble.spruce.Column; import com.digitalpebble.spruce.EnrichmentModule; -import com.digitalpebble.spruce.modules.ccf.Storage; import org.apache.spark.sql.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; import java.util.Map; import static com.digitalpebble.spruce.CURColumn.*; -import static com.digitalpebble.spruce.CURColumn.PRICING_UNIT; -import static com.digitalpebble.spruce.CURColumn.PRODUCT_SERVICE_CODE; import static com.digitalpebble.spruce.SpruceColumn.*; -import static com.digitalpebble.spruce.Utils.loadJSONResources; /** * Estimates the energy usage for CPU and memory of serverless services @@ -74,48 +68,49 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { - String usage_type = LINE_ITEM_USAGE_TYPE.getString(row); + public void enrich(Row inputRow, Map enrichedValues) { + String usage_type = 
LINE_ITEM_USAGE_TYPE.getString(inputRow); if (usage_type == null) { - return row; + return; } - String operation = LINE_ITEM_OPERATION.getString(row); + String operation = LINE_ITEM_OPERATION.getString(inputRow); if ("FargateTask".equals(operation)) { // memory if (usage_type.endsWith("-GB-Hours")) { - double amount_gb = USAGE_AMOUNT.getDouble(row); + double amount_gb = USAGE_AMOUNT.getDouble(inputRow); double energy = amount_gb * memory_coefficient_kwh; - return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy); + enrichedValues.put(ENERGY_USED, energy); + return; } // cpu if (usage_type.endsWith("-vCPU-Hours:perCPU")) { - double amount_vcpu = USAGE_AMOUNT.getDouble(row); + double amount_vcpu = USAGE_AMOUNT.getDouble(inputRow); boolean isARM = usage_type.contains("-ARM-"); double coefficient = isARM ? arm_cpu_coefficient_kwh : x86_cpu_coefficient_kwh; double energy = amount_vcpu * coefficient; - return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy); + enrichedValues.put(ENERGY_USED, energy); + return; } } else if (usage_type.contains("EMR-SERVERLESS")) { if (usage_type.endsWith("MemoryGBHours")) { - double amount_gb = USAGE_AMOUNT.getDouble(row); + double amount_gb = USAGE_AMOUNT.getDouble(inputRow); double energy = amount_gb * memory_coefficient_kwh; - return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy); + enrichedValues.put(ENERGY_USED, energy); + return; } // cpu if (usage_type.endsWith("-vCPUHours")) { - double amount_vcpu = USAGE_AMOUNT.getDouble(row); + double amount_vcpu = USAGE_AMOUNT.getDouble(inputRow); boolean isARM = usage_type.contains("-ARM-"); double coefficient = isARM ? 
arm_cpu_coefficient_kwh : x86_cpu_coefficient_kwh; double energy = amount_vcpu * coefficient; - return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy); + enrichedValues.put(ENERGY_USED, energy); + return; } } - - - return row; } } diff --git a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java index 3e73c4c..0744b42 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java +++ b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java @@ -11,7 +11,6 @@ import org.jspecify.annotations.Nullable; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -57,7 +56,7 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { + public void enrich(Row inputRow, Map enrichedValues) { if (cache == null) { cache = Caffeine.newBuilder() @@ -75,17 +74,17 @@ public Row process(Row row) { // TODO handle non-default CPU loads - String instanceType = PRODUCT_INSTANCE_TYPE.getString(row); + String instanceType = PRODUCT_INSTANCE_TYPE.getString(inputRow); if (instanceType == null) { - return row; + return; } - final String service_code = PRODUCT_SERVICE_CODE.getString(row); - final String operation = LINE_ITEM_OPERATION.getString(row); - final String product_code = LINE_ITEM_PRODUCT_CODE.getString(row); + final String service_code = PRODUCT_SERVICE_CODE.getString(inputRow); + final String operation = LINE_ITEM_OPERATION.getString(inputRow); + final String product_code = LINE_ITEM_PRODUCT_CODE.getString(inputRow); if (operation == null || product_code == null) { - return row; + return; } // conditions for EC2 instances @@ -106,12 +105,12 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) { instanceType = instanceType.substring(3); } } else { - return row; + return; } // don't look for instance types 
that are known to be unknown if (unknownInstanceTypes.contains(instanceType)) { - return row; + return; } Impacts impacts = cache.getIfPresent(instanceType); @@ -123,19 +122,17 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) { } catch (InstanceTypeUknown e1) { LOG.info("Unknown instance type {}", instanceType); unknownInstanceTypes.add(instanceType); - return row; + return; } catch (IOException e) { LOG.error("Exception caught when retrieving estimates for {}", instanceType, e); - return row; + return; } } - double amount = USAGE_AMOUNT.getDouble(row); + double amount = USAGE_AMOUNT.getDouble(inputRow); - Map kv = new HashMap<>(); - kv.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount); - kv.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount); - kv.put(EMBODIED_ADP, impacts.getAbioticDepletionPotentialGramsSbeq() * amount); - return EnrichmentModule.withUpdatedValues(row, kv); + enrichedValues.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount); + enrichedValues.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount); + enrichedValues.put(EMBODIED_ADP, impacts.getAbioticDepletionPotentialGramsSbeq() * amount); } -} \ No newline at end of file +} diff --git a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java index af0efdc..c45dfe0 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java +++ b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java @@ -73,19 +73,19 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { + public void enrich(Row inputRow, Map enrichedValues) { - String instanceType = PRODUCT_INSTANCE_TYPE.getString(row); + String instanceType = PRODUCT_INSTANCE_TYPE.getString(inputRow); if (instanceType == null) { - return row; + return; } - final String service_code = 
PRODUCT_SERVICE_CODE.getString(row); - final String operation = LINE_ITEM_OPERATION.getString(row); - final String product_code = LINE_ITEM_PRODUCT_CODE.getString(row); + final String service_code = PRODUCT_SERVICE_CODE.getString(inputRow); + final String operation = LINE_ITEM_OPERATION.getString(inputRow); + final String product_code = LINE_ITEM_PRODUCT_CODE.getString(inputRow); if (operation == null || product_code == null) { - return row; + return; } // conditions for EC2 instances @@ -106,12 +106,12 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) { instanceType = instanceType.substring(3); } } else { - return row; + return; } // don't look for instance types that are known to be unknown if (unknownInstanceTypes.contains(instanceType)) { - return row; + return; } final Impacts impacts = impactsMap.get(instanceType); @@ -119,15 +119,13 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) { if (impacts == null) { LOG.info("Unknown instance type {}", instanceType); unknownInstanceTypes.add(instanceType); - return row; + return; } - double amount = USAGE_AMOUNT.getDouble(row); + double amount = USAGE_AMOUNT.getDouble(inputRow); - Map kv = new HashMap<>(); - kv.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount); - kv.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount); - kv.put(EMBODIED_ADP, impacts.getAbioticDepletionPotentialGramsSbeq() * amount); - return EnrichmentModule.withUpdatedValues(row, kv); + enrichedValues.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount); + enrichedValues.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount); + enrichedValues.put(EMBODIED_ADP, impacts.getAbioticDepletionPotentialGramsSbeq() * amount); } -} \ No newline at end of file +} diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java index acf206e..afef43a 100644 --- 
a/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java +++ b/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java @@ -52,19 +52,19 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { + public void enrich(Row inputRow, Map enrichedValues) { // limit to EC2 instances - String instanceType = PRODUCT_INSTANCE_TYPE.getString(row); + String instanceType = PRODUCT_INSTANCE_TYPE.getString(inputRow); if (instanceType == null) { - return row; + return; } - final String operation = LINE_ITEM_OPERATION.getString(row); - final String product_code = LINE_ITEM_PRODUCT_CODE.getString(row); + final String operation = LINE_ITEM_OPERATION.getString(inputRow); + final String product_code = LINE_ITEM_PRODUCT_CODE.getString(inputRow); if (operation == null || product_code == null) { - return row; + return; } // conditions for EC2 instances @@ -72,7 +72,7 @@ public Row process(Row row) { LOG.debug("EC2 instance {}", instanceType); } else { - return row; + return; } // check that they have a GPU @@ -81,11 +81,11 @@ public Row process(Row row) { if (instanceTypeInfo == null) { // check product instance family // if GPU then log if we have no info about it - String fam = PRODUCT_INSTANCE_FAMILY.getString(row, true); + String fam = PRODUCT_INSTANCE_FAMILY.getString(inputRow, true); if ("GPU instance".equals(fam)) { LOG.debug("Lacking info for instance type with GPU {}", instanceType); } - return row; + return; } String gpu = instanceTypeInfo.get("type").toString(); @@ -99,12 +99,16 @@ public Row process(Row row) { // minWatts + (gpu_utilisation_percent / 100) * (maxWatts - minWatts) double energy_used = minWatts + ((double) gpu_utilisation_percent / 100) * (maxWatts - minWatts); - double amount = USAGE_AMOUNT.getDouble(row); + double amount = USAGE_AMOUNT.getDouble(inputRow); // watts to kw energy_used = (amount * energy_used * quantity / 1000); // add it to an existing value or create it - return 
EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy_used, true); + Double existing = (Double) enrichedValues.get(ENERGY_USED); + if (existing != null) { + energy_used += existing; + } + enrichedValues.put(ENERGY_USED, energy_used); } } diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java index 72b3e04..5801d97 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java +++ b/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java @@ -47,26 +47,26 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { - String service_code = PRODUCT_SERVICE_CODE.getString(row); + public void enrich(Row inputRow, Map enrichedValues) { + String service_code = PRODUCT_SERVICE_CODE.getString(inputRow); if (service_code == null || !service_code.equals("AWSDataTransfer")) { - return row; + return; } // apply only to rows corresponding to networking in or out of a region - int index = PRODUCT.resolveIndex(row); - Map productMap = row.getJavaMap(index); + int index = PRODUCT.resolveIndex(inputRow); + Map productMap = inputRow.getJavaMap(index); String transfer_type = (String) productMap.getOrDefault("transfer_type", ""); if (!transfer_type.startsWith("InterRegion")) { - return row; + return; } // TODO consider extending to AWS Outbound and Inbound // get the amount of data transferred - double amount_gb = USAGE_AMOUNT.getDouble(row); + double amount_gb = USAGE_AMOUNT.getDouble(inputRow); double energy_gb = amount_gb * network_coefficient; - return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy_gb); + enrichedValues.put(ENERGY_USED, energy_gb); } } diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java index 2a9661e..b6a4083 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java +++ 
b/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java @@ -77,31 +77,32 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { - final String operation = LINE_ITEM_OPERATION.getString(row); + public void enrich(Row inputRow, Map enrichedValues) { + final String operation = LINE_ITEM_OPERATION.getString(inputRow); if (operation == null) { - return row; + return; } // implement the logic from CCF // first check that the unit corresponds to storage - final String unit = PRICING_UNIT.getString(row); + final String unit = PRICING_UNIT.getString(inputRow); if (unit == null || !units.contains(unit)) { - return row; + return; } - final String usage_type = LINE_ITEM_USAGE_TYPE.getString(row); + final String usage_type = LINE_ITEM_USAGE_TYPE.getString(inputRow); if (usage_type == null) { - return row; + return; } - final String serviceCode = PRODUCT_SERVICE_CODE.getString(row); + final String serviceCode = PRODUCT_SERVICE_CODE.getString(inputRow); int replication = getReplicationFactor(serviceCode, usage_type); // loop on the values from the resources for (String ssd : ssd_usage_types) { if (usage_type.endsWith(ssd)) { - return enrich(row, false, replication); + computeEnergy(inputRow, enrichedValues, false, replication); + return; } } @@ -110,32 +111,31 @@ public Row process(Row row) { if (serviceCode != null && !usage_type.contains("Backup")) { for (String service : ssd_services) { if (serviceCode.endsWith(service)) { - return enrich(row, false, replication); + computeEnergy(inputRow, enrichedValues, false, replication); + return; } } } for (String hdd : hdd_usage_types) { if (usage_type.endsWith(hdd)) { - return enrich(row, true, replication); + computeEnergy(inputRow, enrichedValues, true, replication); + return; } } // Log so that can improve coverage in the longer term - String product_product_family = PRODUCT_PRODUCT_FAMILY.getString(row); + String product_product_family = PRODUCT_PRODUCT_FAMILY.getString(inputRow); if 
("Storage".equals(product_product_family)) { log.debug("Storage type not found for {} {}", operation, usage_type); } - - // not been found - return row; } - private Row enrich(Row row, boolean isHDD, int replication) { + private void computeEnergy(Row inputRow, Map enrichedValues, boolean isHDD, int replication) { double coefficient = isHDD ? hdd_gb_coefficient : ssd_gb_coefficient; - double amount = USAGE_AMOUNT.getDouble(row); - String unit = PRICING_UNIT.getString(row); + double amount = USAGE_AMOUNT.getDouble(inputRow); + String unit = PRICING_UNIT.getString(inputRow); // normalisation if (!"GB-Hours".equals(unit)) { // it is in GBMonth @@ -143,7 +143,7 @@ private Row enrich(Row row, boolean isHDD, int replication) { } // to kwh double energy_kwh = amount /1000 * coefficient * replication; - return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy_kwh); + enrichedValues.put(ENERGY_USED, energy_kwh); } /** @@ -211,4 +211,4 @@ private static boolean containsAny(String usageType, String... 
patterns) { return false; } -} \ No newline at end of file +} diff --git a/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java b/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java index 8c7e4dc..c8375aa 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java +++ b/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java @@ -77,23 +77,23 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { - if (ENERGY_USED.isNullAt(row)) { - return row; + public void enrich(Row inputRow, Map enrichedValues) { + if (!enrichedValues.containsKey(ENERGY_USED)) { + return; } - String locationCode = REGION.getString(row); + String locationCode = (String) enrichedValues.get(REGION); // no location found - skip if (locationCode == null) { - return row; + return; } // get intensity for the location Double coeff = getAverageIntensity(Provider.AWS, locationCode); if (coeff == null) { // if the coefficient is 0 it means that the region is not supported - return row; + return; } - return EnrichmentModule.withUpdatedValue(row, CARBON_INTENSITY, coeff); + enrichedValues.put(CARBON_INTENSITY, coeff); } } diff --git a/src/test/java/com/digitalpebble/spruce/ConfigTest.java b/src/test/java/com/digitalpebble/spruce/ConfigTest.java index 42d6f16..4ef70b3 100644 --- a/src/test/java/com/digitalpebble/spruce/ConfigTest.java +++ b/src/test/java/com/digitalpebble/spruce/ConfigTest.java @@ -37,8 +37,7 @@ public Column[] columnsAdded() { } @Override - public Row process(Row row) { - return row; + public void enrich(Row inputRow, Map enrichedValues) { } } diff --git a/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java b/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java index d02b49c..33d5d23 100644 --- a/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java +++ 
b/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java @@ -8,62 +8,94 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.HashMap; +import java.util.Map; + import static com.digitalpebble.spruce.SpruceColumn.ENERGY_USED; import static org.junit.jupiter.api.Assertions.*; +/** + * Tests that modules correctly write to and read from the enrichedValues map. + */ class EnrichmentModuleTest { - private Row baseRow; - - @BeforeEach - void setUp() { - StructField[] fields = new StructField[]{ - DataTypes.createStructField("id", DataTypes.IntegerType, false), - DataTypes.createStructField(ENERGY_USED.getLabel(), DataTypes.DoubleType, true), - DataTypes.createStructField("note", DataTypes.StringType, true) + @Test + void enrichWritesToMap() { + // A minimal module that writes a value + EnrichmentModule module = new EnrichmentModule() { + @Override + public Column[] columnsNeeded() { return new Column[0]; } + @Override + public Column[] columnsAdded() { return new Column[]{ENERGY_USED}; } + @Override + public void enrich(Row inputRow, Map enrichedValues) { + enrichedValues.put(ENERGY_USED, 42.0); + } }; - StructType schema = new StructType(fields); - baseRow = new GenericRowWithSchema(new Object[]{1, 10.0, "initial"}, schema); - } - @Test - void withUpdatedValue() { - // replace value - Row replaced = EnrichmentModule.withUpdatedValue(baseRow, ENERGY_USED, 5.0, false); - assertEquals(5.0, replaced.getDouble(replaced.fieldIndex(ENERGY_USED.getLabel())), 0.0001); - - // add to existing value - Row added = EnrichmentModule.withUpdatedValue(baseRow, ENERGY_USED, 2.5, true); - assertEquals(12.5, added.getDouble(added.fieldIndex(ENERGY_USED.getLabel())), 0.0001); + StructType schema = Utils.getSchema(module); + Row row = new GenericRowWithSchema(new Object[]{null}, schema); + Map enriched 
= new HashMap<>(); + module.enrich(row, enriched); + + assertEquals(42.0, enriched.get(ENERGY_USED)); } @Test - void testWithUpdatedValue() { - // using the simple setter that infers replacement (non-Double add overload) - Row updated = EnrichmentModule.withUpdatedValue(baseRow, ENERGY_USED, 42.0); - assertEquals(42.0, updated.getDouble(updated.fieldIndex(ENERGY_USED.getLabel())), 0.0001); + void enrichAddsToExistingValue() { + // Simulates the Accelerators add-to-existing pattern + EnrichmentModule module = new EnrichmentModule() { + @Override + public Column[] columnsNeeded() { return new Column[0]; } + @Override + public Column[] columnsAdded() { return new Column[]{ENERGY_USED}; } + @Override + public void enrich(Row inputRow, Map enrichedValues) { + Double existing = (Double) enrichedValues.get(ENERGY_USED); + double newVal = 2.5; + enrichedValues.put(ENERGY_USED, (existing != null ? existing : 0.0) + newVal); + } + }; - // update non-numeric field (should simply set new value) - Column noteCol = new ProxyColumn("note", DataTypes.StringType); + StructType schema = Utils.getSchema(module); + Row row = new GenericRowWithSchema(new Object[]{null}, schema); + + Map enriched = new HashMap<>(); + enriched.put(ENERGY_USED, 10.0); + module.enrich(row, enriched); - Row noteUpdated = EnrichmentModule.withUpdatedValue(baseRow, noteCol, "updated"); - assertEquals("updated", noteUpdated.getString(noteUpdated.fieldIndex("note"))); + assertEquals(12.5, enriched.get(ENERGY_USED)); } @Test - void withUpdatedValues() { - java.util.Map updates = new java.util.HashMap<>(); - updates.put(ENERGY_USED, 7.5); - updates.put(new ProxyColumn("note", DataTypes.StringType), "changed"); + void enrichReadsFromMap() { + // Simulates a downstream module reading a value set by an earlier module + Column noteCol = new ProxyColumn("note", DataTypes.StringType); + + EnrichmentModule module = new EnrichmentModule() { + @Override + public Column[] columnsNeeded() { return new Column[]{ENERGY_USED}; 
} + @Override + public Column[] columnsAdded() { return new Column[]{noteCol}; } + @Override + public void enrich(Row inputRow, Map enrichedValues) { + Object val = enrichedValues.get(ENERGY_USED); + if (val != null) { + enrichedValues.put(noteCol, "energy=" + val); + } + } + }; + + StructType schema = Utils.getSchema(module); + Row row = new GenericRowWithSchema(new Object[]{null, null}, schema); - Row updated = EnrichmentModule.withUpdatedValues(baseRow, updates); + Map enriched = new HashMap<>(); + enriched.put(ENERGY_USED, 7.5); + module.enrich(row, enriched); - assertEquals(1, updated.getInt(updated.fieldIndex("id"))); - assertEquals(7.5, updated.getDouble(updated.fieldIndex(ENERGY_USED.getLabel())), 0.0001); - assertEquals("changed", updated.getString(updated.fieldIndex("note"))); + assertEquals("energy=7.5", enriched.get(noteCol)); } static class ProxyColumn extends Column { @@ -71,4 +103,4 @@ static class ProxyColumn extends Column { super(l, t); } } -} \ No newline at end of file +} diff --git a/src/test/java/com/digitalpebble/spruce/EnrichmentStrategyBenchmark.java b/src/test/java/com/digitalpebble/spruce/EnrichmentStrategyBenchmark.java new file mode 100644 index 0000000..07ee422 --- /dev/null +++ b/src/test/java/com/digitalpebble/spruce/EnrichmentStrategyBenchmark.java @@ -0,0 +1,255 @@ +// SPDX-License-Identifier: Apache-2.0 + +package com.digitalpebble.spruce; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static com.digitalpebble.spruce.SpruceColumn.*; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Benchmarks the current row-copy-per-module enrichment approach against a + * map-based 
approach where modules accumulate updates in a {@code Map<Column, Object>}
+ * and a single Row is materialised at the end of the pipeline.
+ *
+ * <p><b>Current approach</b>
+ * Every call to {@link EnrichmentModule#withUpdatedValue} allocates a new
+ * {@code Object[]}, copies all column values from the previous row,
+ * sets the target value, and wraps it in a new {@link GenericRowWithSchema}.
+ * With 6 modules and a 108-column schema this means 6 full array copies and
+ * 6 Row allocations per input row.
+ *
+ * <p><b>Map approach</b>
+ * Modules read CUR (input) columns from the immutable original Row and
+ * read/write Spruce (enrichment) columns via a shared {@code Map<Column, Object>}.
+ * One {@link GenericRowWithSchema} is created at the very end — reducing
+ * N array copies to 1.
+ *

The simulated pipeline mirrors the real module chain:
+ * <ol>
+ *   <li>RegionExtraction → writes REGION</li>
+ *   <li>BoaviztAPIstatic → writes ENERGY_USED, EMBODIED_EMISSIONS, EMBODIED_ADP</li>
+ *   <li>Accelerators → adds to ENERGY_USED</li>
+ *   <li>PUE → reads ENERGY_USED, writes PUE</li>
+ *   <li>CarbonIntensity → reads REGION, writes CARBON_INTENSITY</li>
+ *   <li>OperationalEmissions → reads ENERGY_USED, PUE, CARBON_INTENSITY → writes OPERATIONAL_EMISSIONS</li>
+ * </ol>
+ */ +public class EnrichmentStrategyBenchmark { + + /** CUR input columns — realistic width for an AWS CUR report. */ + private static final int CUR_WIDTH = 100; + + private static final Column[] SPRUCE_COLUMNS = { + REGION, ENERGY_USED, EMBODIED_EMISSIONS, EMBODIED_ADP, + PUE, CARBON_INTENSITY, OPERATIONAL_EMISSIONS, CPU_LOAD + }; + + private static StructType schema; + private static Row templateRow; + private static int totalWidth; + + @BeforeAll + static void setup() { + StructField[] fields = new StructField[CUR_WIDTH + SPRUCE_COLUMNS.length]; + for (int i = 0; i < CUR_WIDTH; i++) { + fields[i] = DataTypes.createStructField("cur_col_" + i, DataTypes.StringType, true); + } + for (int i = 0; i < SPRUCE_COLUMNS.length; i++) { + fields[CUR_WIDTH + i] = DataTypes.createStructField( + SPRUCE_COLUMNS[i].getLabel(), SPRUCE_COLUMNS[i].getType(), true); + } + schema = new StructType(fields); + totalWidth = schema.length(); + + Object[] values = new Object[totalWidth]; + for (int i = 0; i < CUR_WIDTH; i++) { + values[i] = "cur_value_" + i; + } + // Spruce columns start as null — same as the real pipeline + templateRow = new GenericRowWithSchema(values, schema); + } + + // ================================================================== + // CURRENT APPROACH — one Row copy per module update + // ================================================================== + + /** Mirrors the old per-module row-copy approach. */ + private static Row copyAndSet(Row row, Column column, Object newValue) { + Object[] values = new Object[row.size()]; + for (int i = 0; i < row.size(); i++) { + values[i] = row.get(i); + } + values[column.resolveIndex(row)] = newValue; + return new GenericRowWithSchema(values, row.schema()); + } + + /** Mirrors the old bulk row-copy approach. 
*/ + private static Row copyAndSetBulk(Row row, Map updates) { + Object[] values = new Object[row.size()]; + for (int i = 0; i < row.size(); i++) { + values[i] = row.get(i); + } + for (Map.Entry e : updates.entrySet()) { + values[e.getKey().resolveIndex(row)] = e.getValue(); + } + return new GenericRowWithSchema(values, row.schema()); + } + + /** + * 6-module pipeline using the current row-copy strategy. + * Each module produces a new Row; subsequent modules read from it. + */ + static Row currentApproach(Row row) { + // 1. RegionExtraction → REGION + row = copyAndSet(row, REGION, "us-east-1"); + + // 2. BoaviztAPIstatic → ENERGY_USED, EMBODIED_EMISSIONS, EMBODIED_ADP + Map boavizta = new HashMap<>(4); + boavizta.put(ENERGY_USED, 0.042); + boavizta.put(EMBODIED_EMISSIONS, 0.15); + boavizta.put(EMBODIED_ADP, 0.003); + row = copyAndSetBulk(row, boavizta); + + // 3. Accelerators → adds to ENERGY_USED (withUpdatedValue with add=true) + double existing = row.getDouble(ENERGY_USED.resolveIndex(row)); + row = copyAndSet(row, ENERGY_USED, existing + 0.008); + + // 4. PUE → reads ENERGY_USED, writes PUE + @SuppressWarnings("unused") + double energy4 = row.getDouble(ENERGY_USED.resolveIndex(row)); + row = copyAndSet(row, PUE, 1.15); + + // 5. AverageCarbonIntensity → reads REGION, writes CARBON_INTENSITY + @SuppressWarnings("unused") + String region = row.getString(REGION.resolveIndex(row)); + row = copyAndSet(row, CARBON_INTENSITY, 450.0); + + // 6. 
OperationalEmissions → reads ENERGY_USED, PUE, CARBON_INTENSITY → OPERATIONAL_EMISSIONS + double energy = row.getDouble(ENERGY_USED.resolveIndex(row)); + double pue = row.getDouble(PUE.resolveIndex(row)); + double ci = row.getDouble(CARBON_INTENSITY.resolveIndex(row)); + row = copyAndSet(row, OPERATIONAL_EMISSIONS, energy * pue * ci); + + return row; + } + + // ================================================================== + // MAP APPROACH — modules enrich a shared Map, one Row at the end + // ================================================================== + + /** + * Same 6-module pipeline but enrichment values accumulate in a Map. + * CUR columns are read from the immutable original row. + * Spruce columns written by earlier modules are read from the map. + * A single Row is materialised at the very end. + */ + static Row mapApproach(Row originalRow) { + Map enriched = new HashMap<>(); + + // 1. RegionExtraction + enriched.put(REGION, "us-east-1"); + + // 2. BoaviztAPIstatic + enriched.put(ENERGY_USED, 0.042); + enriched.put(EMBODIED_EMISSIONS, 0.15); + enriched.put(EMBODIED_ADP, 0.003); + + // 3. Accelerators — adds to ENERGY_USED (reads previous value from map) + Double existingEnergy = (Double) enriched.get(ENERGY_USED); + enriched.put(ENERGY_USED, (existingEnergy != null ? existingEnergy : 0.0) + 0.008); + + // 4. PUE — reads ENERGY_USED from map + @SuppressWarnings("unused") + double energy4 = (Double) enriched.get(ENERGY_USED); + enriched.put(PUE, 1.15); + + // 5. AverageCarbonIntensity — reads REGION from map + @SuppressWarnings("unused") + String region = (String) enriched.get(REGION); + enriched.put(CARBON_INTENSITY, 450.0); + + // 6. 
OperationalEmissions — reads from map + double energy = (Double) enriched.get(ENERGY_USED); + double pue = (Double) enriched.get(PUE); + double ci = (Double) enriched.get(CARBON_INTENSITY); + enriched.put(OPERATIONAL_EMISSIONS, energy * pue * ci); + + // --- Single row materialisation --- + Object[] values = new Object[originalRow.size()]; + for (int i = 0; i < originalRow.size(); i++) { + values[i] = originalRow.get(i); + } + for (Map.Entry e : enriched.entrySet()) { + values[e.getKey().resolveIndex(originalRow)] = e.getValue(); + } + return new GenericRowWithSchema(values, originalRow.schema()); + } + + // ================================================================== + // Benchmark + // ================================================================== + + @Test + void benchmarkMapVsRowCopy() { + final int warmup = 100_000; + final int iterations = 1_000_000; + + // ---- Warmup: let HotSpot JIT compile both paths ---- + for (int i = 0; i < warmup; i++) { + currentApproach(templateRow); + mapApproach(templateRow); + } + + // ---- Measure current approach ---- + long t0 = System.nanoTime(); + Row resultCurrent = null; + for (int i = 0; i < iterations; i++) { + resultCurrent = currentApproach(templateRow); + } + long currentNs = System.nanoTime() - t0; + + // ---- Measure map approach ---- + t0 = System.nanoTime(); + Row resultMap = null; + for (int i = 0; i < iterations; i++) { + resultMap = mapApproach(templateRow); + } + long mapNs = System.nanoTime() - t0; + + // ---- Verify both approaches produce identical results ---- + for (int i = 0; i < totalWidth; i++) { + Object a = resultCurrent.get(i); + Object b = resultMap.get(i); + if (a instanceof Double da && b instanceof Double db) { + assertEquals(da, db, 1e-12, "Mismatch at column " + i); + } else { + assertEquals(a, b, "Mismatch at column " + i); + } + } + + // ---- Report ---- + long currentMs = currentNs / 1_000_000; + long mapMs = mapNs / 1_000_000; + double speedup = (double) currentNs / mapNs; + + 
System.out.println(); + System.out.println("=== Enrichment Strategy Benchmark ==="); + System.out.printf("Schema width: %d columns (%d CUR + %d Spruce)%n", + totalWidth, CUR_WIDTH, SPRUCE_COLUMNS.length); + System.out.printf("Pipeline: 6 modules (mirrors real chain)%n"); + System.out.printf("Rows processed: %,d%n%n", iterations); + System.out.printf("Current (row-copy per module): %,d ms%n", currentMs); + System.out.printf("Map (single row creation): %,d ms%n", mapMs); + System.out.printf("Speedup: %.2fx%n", speedup); + } +} diff --git a/src/test/java/com/digitalpebble/spruce/modules/ConstantLoadTest.java b/src/test/java/com/digitalpebble/spruce/modules/ConstantLoadTest.java index 6a58fed..9dc1781 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/ConstantLoadTest.java +++ b/src/test/java/com/digitalpebble/spruce/modules/ConstantLoadTest.java @@ -2,12 +2,17 @@ package com.digitalpebble.spruce.modules; +import com.digitalpebble.spruce.Column; +import com.digitalpebble.spruce.SpruceColumn; import com.digitalpebble.spruce.Utils; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; +import java.util.HashMap; +import java.util.Map; + import static org.junit.jupiter.api.Assertions.*; class ConstantLoadTest { @@ -16,10 +21,11 @@ class ConstantLoadTest { private final StructType schema = Utils.getSchema(load); @Test - void process() { + void enrich() { Object[] values = new Object[] {null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = load.process(row); - assertEquals(50d, enriched.getDouble(0)); + Map enriched = new HashMap<>(); + load.enrich(row, enriched); + assertEquals(50d, enriched.get(SpruceColumn.CPU_LOAD)); } -} \ No newline at end of file +} diff --git a/src/test/java/com/digitalpebble/spruce/modules/OperationalEmissionsTest.java 
b/src/test/java/com/digitalpebble/spruce/modules/OperationalEmissionsTest.java index 85a8a63..bf6cbca 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/OperationalEmissionsTest.java +++ b/src/test/java/com/digitalpebble/spruce/modules/OperationalEmissionsTest.java @@ -2,12 +2,16 @@ package com.digitalpebble.spruce.modules; +import com.digitalpebble.spruce.Column; import com.digitalpebble.spruce.Utils; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; +import java.util.HashMap; +import java.util.Map; + import static com.digitalpebble.spruce.SpruceColumn.*; import static org.junit.jupiter.api.Assertions.*; @@ -20,31 +24,34 @@ public class OperationalEmissionsTest { @Test void processNoValues() { - Object[] values = new Object[] {null, null, null}; + Object[] values = new Object[] {null, null, null, null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = module.process(row); - // missing values comes back as it was - assertEquals(row, enriched); + Map enriched = new HashMap<>(); + module.enrich(row, enriched); + // missing values - map should not contain OPERATIONAL_EMISSIONS + assertFalse(enriched.containsKey(OPERATIONAL_EMISSIONS)); } @Test void processNoPUE() { - Object[] values = new Object[] {10d, 321.04d, null, null}; - Row row = new GenericRowWithSchema(values, schema); - Row enriched = module.process(row); + Row row = new GenericRowWithSchema(new Object[]{null, null, null, null}, schema); + Map enriched = new HashMap<>(); + enriched.put(ENERGY_USED, 10d); + enriched.put(CARBON_INTENSITY, 321.04d); + module.enrich(row, enriched); double expected = 10 * 321.04; - double result = OPERATIONAL_EMISSIONS.getDouble(enriched); - assertEquals(expected, result); + assertEquals(expected, (Double) enriched.get(OPERATIONAL_EMISSIONS)); } @Test void processWithPUE() { - Object[] values = new Object[] {10d, 
321.04d, 1.15, null}; - Row row = new GenericRowWithSchema(values, schema); - Row enriched = module.process(row); + Row row = new GenericRowWithSchema(new Object[]{null, null, null, null}, schema); + Map enriched = new HashMap<>(); + enriched.put(ENERGY_USED, 10d); + enriched.put(CARBON_INTENSITY, 321.04d); + enriched.put(PUE, 1.15); + module.enrich(row, enriched); double expected = 10 * 321.04 * 1.15; - double result = OPERATIONAL_EMISSIONS.getDouble(enriched); - assertEquals(expected, result); + assertEquals(expected, (Double) enriched.get(OPERATIONAL_EMISSIONS)); } - -} \ No newline at end of file +} diff --git a/src/test/java/com/digitalpebble/spruce/modules/PUETest.java b/src/test/java/com/digitalpebble/spruce/modules/PUETest.java index bd9b208..4e84a8f 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/PUETest.java +++ b/src/test/java/com/digitalpebble/spruce/modules/PUETest.java @@ -2,6 +2,7 @@ package com.digitalpebble.spruce.modules; +import com.digitalpebble.spruce.Column; import com.digitalpebble.spruce.SpruceColumn; import com.digitalpebble.spruce.Utils; import org.apache.spark.sql.Row; @@ -15,7 +16,9 @@ import java.util.HashMap; import java.util.Map; +import static com.digitalpebble.spruce.SpruceColumn.*; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; class PUETest { @@ -31,10 +34,10 @@ void setUp() { @Test void processNoValues() { - Object[] values = new Object[] {null, null, null}; - Row row = new GenericRowWithSchema(values, schema); - Row enriched = pue.process(row); - assertEquals(row, enriched); + Row row = new GenericRowWithSchema(new Object[]{null, null, null}, schema); + Map enriched = new HashMap<>(); + pue.enrich(row, enriched); + assertFalse(enriched.containsKey(SpruceColumn.PUE)); } @Test @@ -44,11 +47,13 @@ void processCustomConfiguration() { config.put("default", 2.5); customPue.init(config); - Object[] values = new Object[] {10d, "Mars-Region", null}; - 
Row row = new GenericRowWithSchema(values, schema); - Row enriched = customPue.process(row); + Row row = new GenericRowWithSchema(new Object[]{null, null, null}, schema); + Map enriched = new HashMap<>(); + enriched.put(ENERGY_USED, 10d); + enriched.put(REGION, "Mars-Region"); + customPue.enrich(row, enriched); - assertEquals(2.5, SpruceColumn.PUE.getDouble(enriched), 0.001); + assertEquals(2.5, (Double) enriched.get(SpruceColumn.PUE), 0.001); } @ParameterizedTest @@ -61,10 +66,14 @@ void processCustomConfiguration() { "100, eu-central-2, 1.11" // Regex match (eu-.+) }) void processRegionPUEValues(double energyUsed, String region, double expectedPUE) { - Object[] values = new Object[] {energyUsed, region, null}; - Row row = new GenericRowWithSchema(values, schema); - Row enriched = pue.process(row); + Row row = new GenericRowWithSchema(new Object[]{null, null, null}, schema); + Map enriched = new HashMap<>(); + enriched.put(ENERGY_USED, energyUsed); + if (region != null) { + enriched.put(REGION, region); + } + pue.enrich(row, enriched); - assertEquals(expectedPUE, SpruceColumn.PUE.getDouble(enriched), 0.001, "Failed for region: " + region); + assertEquals(expectedPUE, (Double) enriched.get(SpruceColumn.PUE), 0.001, "Failed for region: " + region); } -} \ No newline at end of file +} diff --git a/src/test/java/com/digitalpebble/spruce/modules/RegionExtractionTest.java b/src/test/java/com/digitalpebble/spruce/modules/RegionExtractionTest.java index de77ae7..a9b04e0 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/RegionExtractionTest.java +++ b/src/test/java/com/digitalpebble/spruce/modules/RegionExtractionTest.java @@ -2,12 +2,16 @@ package com.digitalpebble.spruce.modules; +import com.digitalpebble.spruce.Column; import com.digitalpebble.spruce.Utils; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; +import 
java.util.HashMap; +import java.util.Map; + import static com.digitalpebble.spruce.SpruceColumn.REGION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -23,32 +27,36 @@ public class RegionExtractionTest{ @Test void processEmpty() { Row row = generateRow(null, null, null); - Row enriched = region.process(row); - assertNull(REGION.getString(enriched)); + Map enriched = new HashMap<>(); + region.enrich(row, enriched); + assertNull(enriched.get(REGION)); } @Test void process() { String reg = "us-east-1"; Row row = generateRow(reg, null, null); - Row enriched = region.process(row); - assertEquals(reg, REGION.getString(enriched)); + Map enriched = new HashMap<>(); + region.enrich(row, enriched); + assertEquals(reg, enriched.get(REGION)); } @Test void process2() { String reg = "us-east-1"; Row row = generateRow(reg, "ignore_me", null); - Row enriched = region.process(row); - assertEquals(reg, REGION.getString(enriched)); + Map enriched = new HashMap<>(); + region.enrich(row, enriched); + assertEquals(reg, enriched.get(REGION)); } @Test void process3() { String reg = "us-east-1"; Row row = generateRow(null, reg, null); - Row enriched = region.process(row); - assertEquals(reg, REGION.getString(enriched)); + Map enriched = new HashMap<>(); + region.enrich(row, enriched); + assertEquals(reg, enriched.get(REGION)); } private Row generateRow(String PRODUCT_REGION_CODE, String PRODUCT_FROM_REGION_CODE, String PRODUCT_TO_REGION_CODE){ diff --git a/src/test/java/com/digitalpebble/spruce/modules/ServerlessTest.java b/src/test/java/com/digitalpebble/spruce/modules/ServerlessTest.java index eb292fa..3309dc3 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/ServerlessTest.java +++ b/src/test/java/com/digitalpebble/spruce/modules/ServerlessTest.java @@ -2,14 +2,19 @@ package com.digitalpebble.spruce.modules; +import com.digitalpebble.spruce.Column; import com.digitalpebble.spruce.Utils; import 
org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; +import java.util.HashMap; +import java.util.Map; + import static com.digitalpebble.spruce.SpruceColumn.ENERGY_USED; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; class ServerlessTest { @@ -19,66 +24,73 @@ class ServerlessTest { @Test void processEmptyRow() { Row row = generateRow(null, null, null); - Row enriched = serverless.process(row); - assertEquals(row, enriched); + Map enriched = new HashMap<>(); + serverless.enrich(row, enriched); + assertFalse(enriched.containsKey(ENERGY_USED)); } @Test void processFargateMemory() { double quantity = 10d; Row row = generateRow("FargateTask", quantity, "xx-Fargate-GB-Hours"); - Row enriched = serverless.process(row); + Map enriched = new HashMap<>(); + serverless.enrich(row, enriched); double expected = serverless.memory_coefficient_kwh * quantity; - assertEquals(expected, ENERGY_USED.getDouble(enriched)); + assertEquals(expected, enriched.get(ENERGY_USED)); } @Test void processFargatevCPU() { double quantity = 4d; Row row = generateRow("FargateTask", quantity, "xx-Fargate-vCPU-Hours:perCPU"); - Row enriched = serverless.process(row); + Map enriched = new HashMap<>(); + serverless.enrich(row, enriched); double expected = serverless.x86_cpu_coefficient_kwh * quantity; - assertEquals(expected, ENERGY_USED.getDouble(enriched)); + assertEquals(expected, enriched.get(ENERGY_USED)); } @Test void processFargatevCPUARM() { double quantity = 4d; Row row = generateRow("FargateTask", quantity, "xx-Fargate-ARM-vCPU-Hours:perCPU"); - Row enriched = serverless.process(row); + Map enriched = new HashMap<>(); + serverless.enrich(row, enriched); double expected = serverless.arm_cpu_coefficient_kwh * quantity; - assertEquals(expected, ENERGY_USED.getDouble(enriched)); + 
assertEquals(expected, enriched.get(ENERGY_USED)); } @Test void processEMRvCPUARM() { double quantity = 4d; Row row = generateRow("Worker", quantity, "xxx-EMR-SERVERLESS-ARM-vCPUHours"); - Row enriched = serverless.process(row); + Map enriched = new HashMap<>(); + serverless.enrich(row, enriched); double expected = serverless.arm_cpu_coefficient_kwh * quantity; - assertEquals(expected, ENERGY_USED.getDouble(enriched)); + assertEquals(expected, enriched.get(ENERGY_USED)); } @Test void processEMRvCPU() { double quantity = 4d; Row row = generateRow("Worker", quantity, "xx-EMR-SERVERLESS-vCPUHours"); - Row enriched = serverless.process(row); + Map enriched = new HashMap<>(); + serverless.enrich(row, enriched); double expected = serverless.x86_cpu_coefficient_kwh * quantity; - assertEquals(expected, ENERGY_USED.getDouble(enriched)); + assertEquals(expected, enriched.get(ENERGY_USED)); } @Test void processEMRMemory() { double quantity = 10d; Row row = generateRow("Worker", quantity, "EUN1-EMR-SERVERLESS-ARM-MemoryGBHours"); - Row enriched = serverless.process(row); + Map enriched = new HashMap<>(); + serverless.enrich(row, enriched); double expected = serverless.memory_coefficient_kwh * quantity; - assertEquals(expected, ENERGY_USED.getDouble(enriched)); + assertEquals(expected, enriched.get(ENERGY_USED)); } private Row generateRow(String LINE_ITEM_OPERATION, Object USAGE_AMOUNT, String LINE_ITEM_USAGE_TYPE){ Object[] values = new Object[] {LINE_ITEM_OPERATION, USAGE_AMOUNT, LINE_ITEM_USAGE_TYPE, null}; return new GenericRowWithSchema(values, schema); } -} \ No newline at end of file +} diff --git a/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPITest.java b/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPITest.java index f666bc7..b16f137 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPITest.java +++ b/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPITest.java @@ -85,10 +85,11 @@ void 
testInitWithCustomAddress() { void testProcessWithNullValues(String instanceType, String serviceCode, String operation, String productCode) { Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = api.process(row); + Map enriched = new HashMap<>(); + api.enrich(row, enriched); - // Should return the original row unchanged - assertEquals(row, enriched); + // Should not add any enrichment + assertTrue(enriched.isEmpty()); } static Stream nullValueTestCases() { @@ -106,24 +107,18 @@ static Stream nullValueTestCases() { void testProcessWithEmptyValues(String instanceType, String serviceCode, String operation, String productCode) { Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null}; Row row = new GenericRowWithSchema(values, schema); + Map enriched = new HashMap<>(); - // Test cases 1 and 2 (null and empty instance types) should throw IllegalArgumentException - // Test cases 3 and 4 (empty strings for all fields) should return unchanged rows - // because BoaviztAPI.process() has early returns before calling BoaviztAPIClient.getEnergyEstimates() - if (instanceType == null || (instanceType != null && instanceType.trim().isEmpty())) { - // These should throw IllegalArgumentException if they reach the API call + if (instanceType == null || instanceType.trim().isEmpty()) { try { - Row enriched = api.process(row); - // If no exception was thrown, the row should be unchanged - assertEquals(row, enriched, "Should return unchanged row for null/empty instance types that don't reach API call"); + api.enrich(row, enriched); + assertTrue(enriched.isEmpty(), "Should not enrich for null/empty instance types"); } catch (IllegalArgumentException e) { - // This is also valid - the validation caught it assertTrue(e.getMessage().contains("Instance type cannot be null, empty, or whitespace only")); } } else { - // Other test cases should return 
unchanged rows - Row enriched = api.process(row); - assertEquals(row, enriched, "Should return unchanged row for other empty value cases"); + api.enrich(row, enriched); + assertTrue(enriched.isEmpty(), "Should not enrich for other empty value cases"); } } @@ -141,10 +136,11 @@ static Stream emptyValueTestCases() { void testProcessWithUnsupportedValues(String instanceType, String serviceCode, String operation, String productCode) { Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = api.process(row); + Map enriched = new HashMap<>(); + api.enrich(row, enriched); - // Should return the original row unchanged for unsupported services/operations - assertEquals(row, enriched); + // Should not add any enrichment for unsupported services/operations + assertTrue(enriched.isEmpty()); } static Stream unsupportedValueTestCases() { @@ -161,10 +157,11 @@ static Stream unsupportedValueTestCases() { void testProcessWithEdgeCases(String instanceType, String serviceCode, String operation, String productCode) { Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = api.process(row); + Map enriched = new HashMap<>(); + api.enrich(row, enriched); - // Should return the original row unchanged for edge cases - assertEquals(row, enriched); + // Should not add any enrichment for edge cases + assertTrue(enriched.isEmpty()); } static Stream edgeCaseTestCases() { @@ -198,10 +195,12 @@ void tearDown() throws IOException { mockWebServer.shutdown(); } - private Row enrich(String productInstanceType, String productServiceCode,String lineItemOperation, String lineItemProductCode, double usageAmount){ + private Map callEnrich(String productInstanceType, String productServiceCode, String lineItemOperation, String lineItemProductCode, double usageAmount){ Object[] values = new 
Object[]{productInstanceType, productServiceCode, lineItemOperation, lineItemProductCode, usageAmount, null, null, null}; Row row = new GenericRowWithSchema(values, schema); - return api.process(row); + Map enriched = new HashMap<>(); + api.enrich(row, enriched); + return enriched; } @Test @@ -213,11 +212,10 @@ void testProcessEC2InstanceWithValidData() throws IOException { .setResponseCode(200) .addHeader("Content-Type", "application/json")); - Row enriched = enrich("t3.micro", "AmazonEC2", "RunInstances", "AmazonEC2", 10); + Map enriched = callEnrich("t3.micro", "AmazonEC2", "RunInstances", "AmazonEC2", 10); - // The row should be processed and enriched with energy data - assertNotNull(enriched); - // Verify that the row was enriched (you can add more specific assertions here) + // The map should contain enrichment data + assertFalse(enriched.isEmpty()); } @Test @@ -229,10 +227,10 @@ void testProcessElasticSearchInstanceWithValidData() throws IOException { .setResponseCode(200) .addHeader("Content-Type", "application/json")); - Row enriched = enrich("t3.micro.search", "AmazonES", "ESDomain", "AmazonES", 1.0); + Map enriched = callEnrich("t3.micro.search", "AmazonES", "ESDomain", "AmazonES", 1.0); - // The row should be processed and enriched with energy data - assertNotNull(enriched); + // The map should contain enrichment data + assertFalse(enriched.isEmpty()); } @Test @@ -244,10 +242,10 @@ void testProcessRDSInstanceWithValidData() throws IOException { .setResponseCode(200) .addHeader("Content-Type", "application/json")); - Row enriched = enrich("db.t3.micro", "AmazonRDS", "CreateDBInstance", "AmazonRDS", 1.0); + Map enriched = callEnrich("db.t3.micro", "AmazonRDS", "CreateDBInstance", "AmazonRDS", 1.0); - // The row should be processed and enriched with energy data - assertNotNull(enriched); + // The map should contain enrichment data + assertFalse(enriched.isEmpty()); } @ParameterizedTest @@ -260,10 +258,10 @@ void 
testProcessEC2WithDifferentOperationPrefixes(String operation) throws IOExc .setResponseCode(200) .addHeader("Content-Type", "application/json")); - Row enriched = enrich("t3.micro", "AmazonEC2", operation, "AmazonEC2", 1.0); + Map enriched = callEnrich("t3.micro", "AmazonEC2", operation, "AmazonEC2", 1.0); - // The row should be processed and enriched with energy data - assertNotNull(enriched); + // The map should contain enrichment data + assertFalse(enriched.isEmpty()); } static Stream validEC2OperationTestCases() { @@ -284,10 +282,10 @@ void testProcessRDSWithDifferentOperationPrefixes(String operation) throws IOExc .setResponseCode(200) .addHeader("Content-Type", "application/json")); - Row enriched = enrich("db.t3.micro", "AmazonRDS", operation, "AmazonRDS", 1.0); + Map enriched = callEnrich("db.t3.micro", "AmazonRDS", operation, "AmazonRDS", 1.0); - // The row should be processed and enriched with energy data - assertNotNull(enriched); + // The map should contain enrichment data + assertFalse(enriched.isEmpty()); } static Stream validRDSOperationTestCases() { @@ -308,10 +306,10 @@ void testProcessWithComplexInstanceTypes(String instanceType) throws IOException .setResponseCode(200) .addHeader("Content-Type", "application/json")); - Row enriched = enrich(instanceType, "AmazonEC2", "RunInstances", "AmazonEC2", 1.0); + Map enriched = callEnrich(instanceType, "AmazonEC2", "RunInstances", "AmazonEC2", 1.0); - // The row should be processed and enriched with energy data - assertNotNull(enriched); + // The map should contain enrichment data + assertFalse(enriched.isEmpty()); } static Stream complexInstanceTypeTestCases() { diff --git a/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstaticTest.java b/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstaticTest.java index 3db5067..f39b1f7 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstaticTest.java +++ 
b/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstaticTest.java @@ -17,6 +17,7 @@ import org.junit.jupiter.params.provider.MethodSource; import java.util.HashMap; +import java.util.Map; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -61,10 +62,11 @@ void testColumnsAdded() { void testProcessWithNullValues(String instanceType, String serviceCode, String operation, String productCode) { Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null, null, null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = module.process(row); + Map enriched = new HashMap<>(); + module.enrich(row, enriched); - // Should return the original row unchanged - assertEquals(row, enriched); + // Should not add any enrichment + assertTrue(enriched.isEmpty()); } static Stream nullValueTestCases() { @@ -82,24 +84,18 @@ static Stream nullValueTestCases() { void testProcessWithEmptyValues(String instanceType, String serviceCode, String operation, String productCode) { Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null, null, null}; Row row = new GenericRowWithSchema(values, schema); + Map enriched = new HashMap<>(); - // Test cases 1 and 2 (null and empty instance types) should throw IllegalArgumentException - // Test cases 3 and 4 (empty strings for all fields) should return unchanged rows - // because BoaviztAPI.process() has early returns before calling BoaviztAPIClient.getEnergyEstimates() if (instanceType == null || instanceType.trim().isEmpty()) { - // These should throw IllegalArgumentException if they reach the API call try { - Row enriched = module.process(row); - // If no exception was thrown, the row should be unchanged - assertEquals(row, enriched, "Should return unchanged row for null/empty instance types that don't reach API call"); + module.enrich(row, enriched); + assertTrue(enriched.isEmpty(), "Should not enrich for null/empty 
instance types"); } catch (IllegalArgumentException e) { - // This is also valid - the validation caught it assertTrue(e.getMessage().contains("Instance type cannot be null, empty, or whitespace only")); } } else { - // Other test cases should return unchanged rows - Row enriched = module.process(row); - assertEquals(row, enriched, "Should return unchanged row for other empty value cases"); + module.enrich(row, enriched); + assertTrue(enriched.isEmpty(), "Should not enrich for other empty value cases"); } } @@ -117,10 +113,11 @@ static Stream emptyValueTestCases() { void testProcessWithUnsupportedValues(String instanceType, String serviceCode, String operation, String productCode) { Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null, null, null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = module.process(row); + Map enriched = new HashMap<>(); + module.enrich(row, enriched); - // Should return the original row unchanged for unsupported services/operations - assertEquals(row, enriched); + // Should not add any enrichment for unsupported services/operations + assertTrue(enriched.isEmpty()); } static Stream unsupportedValueTestCases() { @@ -136,10 +133,11 @@ static Stream unsupportedValueTestCases() { void testProcessWithEdgeCases(String instanceType, String serviceCode, String operation, String productCode) { Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null, null, null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = module.process(row); + Map enriched = new HashMap<>(); + module.enrich(row, enriched); - // Should return the original row unchanged for edge cases - assertEquals(row, enriched); + // Should not add any enrichment for edge cases + assertTrue(enriched.isEmpty()); } static Stream edgeCaseTestCases() { diff --git a/src/test/java/com/digitalpebble/spruce/modules/ccf/AcceleratorsTest.java 
b/src/test/java/com/digitalpebble/spruce/modules/ccf/AcceleratorsTest.java index 203fe3d..c364be4 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/ccf/AcceleratorsTest.java +++ b/src/test/java/com/digitalpebble/spruce/modules/ccf/AcceleratorsTest.java @@ -2,6 +2,7 @@ package com.digitalpebble.spruce.modules.ccf; +import com.digitalpebble.spruce.Column; import com.digitalpebble.spruce.Utils; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; @@ -11,17 +12,17 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; import static com.digitalpebble.spruce.SpruceColumn.ENERGY_USED; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; class AcceleratorsTest { private static final Accelerators accelerators = new Accelerators(); - private static final StructType schema = Utils.getSchema(accelerators); @BeforeAll @@ -29,22 +30,29 @@ static void initialize() { accelerators.init(Map.of()); } - // {PRODUCT_INSTANCE_TYPE, LINE_ITEM_OPERATION, LINE_ITEM_PRODUCT_CODE, USAGE_AMOUNT}; private static Stream provideArgsWithType() { - return Stream.of(Arguments.of("g5.xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.0855d), Arguments.of("g5.xlarge", "RunInstances", "AmazonEC2", 2.0d, 0.171d), Arguments.of("g4dn.xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.0395d), Arguments.of("c5.xlarge", "RunInstances", "AmazonEC2", 1.0d, null), Arguments.of("g6.8xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.04d)); + return Stream.of( + Arguments.of("g5.xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.0855d), + Arguments.of("g5.xlarge", "RunInstances", "AmazonEC2", 2.0d, 0.171d), + Arguments.of("g4dn.xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.0395d), + Arguments.of("c5.xlarge", 
"RunInstances", "AmazonEC2", 1.0d, null), + Arguments.of("g6.8xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.04d) + ); } @ParameterizedTest @MethodSource("provideArgsWithType") - void process(String PRODUCT_INSTANCE_TYPE, String LINE_ITEM_OPERATION, String LINE_ITEM_PRODUCT_CODE, double USAGE_AMOUNT, Double expected) { - Object[] values = new Object[]{PRODUCT_INSTANCE_TYPE, LINE_ITEM_OPERATION, LINE_ITEM_PRODUCT_CODE, USAGE_AMOUNT, null}; + void process(String PRODUCT_INSTANCE_TYPE, String LINE_ITEM_OPERATION, + String LINE_ITEM_PRODUCT_CODE, double USAGE_AMOUNT, Double expected) { + Object[] values = new Object[]{PRODUCT_INSTANCE_TYPE, LINE_ITEM_OPERATION, + LINE_ITEM_PRODUCT_CODE, USAGE_AMOUNT, null}; Row row = new GenericRowWithSchema(values, schema); - Row result = accelerators.process(row); + Map enriched = new HashMap<>(); + accelerators.enrich(row, enriched); if (expected != null) { - assertEquals(expected, ENERGY_USED.getDouble(result), 0.0001); + assertEquals(expected, (Double) enriched.get(ENERGY_USED), 0.0001); } else { - assertTrue(ENERGY_USED.isNullAt(result)); + assertFalse(enriched.containsKey(ENERGY_USED)); } } - -} \ No newline at end of file +} diff --git a/src/test/java/com/digitalpebble/spruce/modules/ccf/NetworkingTest.java b/src/test/java/com/digitalpebble/spruce/modules/ccf/NetworkingTest.java index 73f8f72..965cd87 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/ccf/NetworkingTest.java +++ b/src/test/java/com/digitalpebble/spruce/modules/ccf/NetworkingTest.java @@ -2,6 +2,7 @@ package com.digitalpebble.spruce.modules.ccf; +import com.digitalpebble.spruce.Column; import com.digitalpebble.spruce.Utils; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; @@ -12,35 +13,32 @@ import java.util.HashMap; import java.util.Map; +import static com.digitalpebble.spruce.SpruceColumn.ENERGY_USED; import static org.junit.jupiter.api.Assertions.*; class NetworkingTest { private Networking 
networking = new Networking(); - private StructType schema = Utils.getSchema(networking); @Test void processNoValues() { - Object[] values = new Object[] {null, null, null}; + Object[] values = new Object[] {null, null, null, null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = networking.process(row); - // missing values comes back as it was - assertEquals(row, enriched); + Map enriched = new HashMap<>(); + networking.enrich(row, enriched); + assertFalse(enriched.containsKey(ENERGY_USED)); } @Test void processValues() { - // 10 GBs of data transfer between regions Map product = new HashMap<>(); product.put("transfer_type", "InterRegion"); - ; Object[] values = new Object[] {"AWSDataTransfer", JavaConverters.asScala(product), 10d, null}; Row row = new GenericRowWithSchema(values, schema); - Row enriched = networking.process(row); - // should have an estimated 0.01 kwh + Map enriched = new HashMap<>(); + networking.enrich(row, enriched); double expected = networking.network_coefficient * 10; - assertEquals(0.01d, enriched.getDouble(3)); + assertEquals(0.01d, (Double) enriched.get(ENERGY_USED)); } - -} \ No newline at end of file +} diff --git a/src/test/java/com/digitalpebble/spruce/modules/ccf/StorageTest.java b/src/test/java/com/digitalpebble/spruce/modules/ccf/StorageTest.java index 6b40fd5..f5f2ed3 100644 --- a/src/test/java/com/digitalpebble/spruce/modules/ccf/StorageTest.java +++ b/src/test/java/com/digitalpebble/spruce/modules/ccf/StorageTest.java @@ -2,6 +2,7 @@ package com.digitalpebble.spruce.modules.ccf; +import com.digitalpebble.spruce.Column; import com.digitalpebble.spruce.Utils; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; @@ -11,16 +12,17 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; import static 
com.digitalpebble.spruce.SpruceColumn.ENERGY_USED; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; class StorageTest { private static final Storage storage = new Storage(); - private static final StructType schema = Utils.getSchema(storage); @BeforeAll @@ -29,11 +31,21 @@ static void initialize() { } private static Stream provideArgsWithType() { - return Stream.of(Arguments.of("Storage", 0.1d, "EUW2-TimedStorage-ByteHrs", "AmazonS3", "GB-Mo", false), Arguments.of("Storage", 0.1d, "SomeUsageType", "AmazonDocDB", "GB-Mo", false), Arguments.of("CreateVolume", 10d, "EUW2-EBS:VolumeUsage", "AmazonEC2", "GB-Mo", false), Arguments.of("CreateVolume-Gp2", 10d, "EBS:VolumeUsage.gp2", "AmazonEC2", "GB-Mo", true), Arguments.of("CreateVolume-Gp3", 10d, "VolumeUsage.gp3", "AmazonEC2", "GB-Mo", true)); + return Stream.of( + Arguments.of("Storage", 0.1d, "EUW2-TimedStorage-ByteHrs", "AmazonS3", "GB-Mo", false), + Arguments.of("Storage", 0.1d, "SomeUsageType", "AmazonDocDB", "GB-Mo", false), + Arguments.of("CreateVolume", 10d, "EUW2-EBS:VolumeUsage", "AmazonEC2", "GB-Mo", false), + Arguments.of("CreateVolume-Gp2", 10d, "EBS:VolumeUsage.gp2", "AmazonEC2", "GB-Mo", true), + Arguments.of("CreateVolume-Gp3", 10d, "VolumeUsage.gp3", "AmazonEC2", "GB-Mo", true) + ); } private static Stream provideArgsWrongUnit() { - return Stream.of(Arguments.of("Storage", 10d, "SomeUsageType", "AmazonDocDB", "vCPU-hour"), Arguments.of("CreateVolume-Gp3", 0.1, "EBS:VolumeP-IOPS.gp3", "AmazonEC2", "IOPS-Mo"), Arguments.of("CreateVolume-Gp2", 0.1, "EBS:VolumeP-Throughput.gp3", "AmazonEC2", "GiBps-mo")); + return Stream.of( + Arguments.of("Storage", 10d, "SomeUsageType", "AmazonDocDB", "vCPU-hour"), + Arguments.of("CreateVolume-Gp3", 0.1, "EBS:VolumeP-IOPS.gp3", "AmazonEC2", "IOPS-Mo"), + Arguments.of("CreateVolume-Gp2", 0.1, "EBS:VolumeP-Throughput.gp3", "AmazonEC2", "GiBps-mo") + ); } @ParameterizedTest @@ -41,21 +53,25 @@ private 
static Stream provideArgsWrongUnit() { void process(String operation, double amount, String usage, String service, String unit, boolean isSSD) { Object[] values = new Object[]{operation, amount, usage, service, unit, null}; Row row = new GenericRowWithSchema(values, schema); - Row result = storage.process(row); + Map enriched = new HashMap<>(); + storage.enrich(row, enriched); double gb_hours = Utils.Conversions.GBMonthsToGBHours(amount); int replication = storage.getReplicationFactor(service, usage); double coef = isSSD ? storage.ssd_gb_coefficient : storage.hdd_gb_coefficient; double expected = gb_hours * coef * replication / 1000; - assertEquals(expected, ENERGY_USED.getDouble(result), 0.0001); + assertEquals(expected, (Double) enriched.get(ENERGY_USED), 0.0001); } @ParameterizedTest @MethodSource("provideArgsWrongUnit") - void processSSDServiceWrongUnit(String LINE_ITEM_OPERATION, double USAGE_AMOUNT, String LINE_ITEM_USAGE_TYPE, String PRODUCT_SERVICE_CODE, String PRICING_UNIT) { - Object[] values = new Object[]{LINE_ITEM_OPERATION, USAGE_AMOUNT, LINE_ITEM_USAGE_TYPE, PRODUCT_SERVICE_CODE, PRICING_UNIT, null}; + void processSSDServiceWrongUnit(String LINE_ITEM_OPERATION, double USAGE_AMOUNT, + String LINE_ITEM_USAGE_TYPE, String PRODUCT_SERVICE_CODE, + String PRICING_UNIT) { + Object[] values = new Object[]{LINE_ITEM_OPERATION, USAGE_AMOUNT, LINE_ITEM_USAGE_TYPE, + PRODUCT_SERVICE_CODE, PRICING_UNIT, null}; Row row = new GenericRowWithSchema(values, schema); - Row result = storage.process(row); - assertEquals(row, result); + Map enriched = new HashMap<>(); + storage.enrich(row, enriched); + assertFalse(enriched.containsKey(ENERGY_USED)); } - -} \ No newline at end of file +} From a93655be04d7bf4f8f4fc6a93b9a2c5c065f35e5 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Sun, 15 Feb 2026 18:33:39 +0000 Subject: [PATCH 2/3] Method refactoring in Column - move Row-specific accessors to CURColumn + added utility methods to SpruceColumn Signed-off-by: Julien Nioche --- 
.../com/digitalpebble/spruce/CURColumn.java | 61 ++++++++++++++----- .../java/com/digitalpebble/spruce/Column.java | 39 ------------ .../digitalpebble/spruce/SpruceColumn.java | 28 ++++++--- .../spruce/modules/OperationalEmissions.java | 15 ++--- .../com/digitalpebble/spruce/modules/PUE.java | 7 +-- .../spruce/modules/RegionExtraction.java | 5 +- .../spruce/modules/ccf/Accelerators.java | 2 +- .../AverageCarbonIntensity.java | 2 +- .../com/digitalpebble/spruce/ColumnTest.java | 16 ++--- 9 files changed, 85 insertions(+), 90 deletions(-) diff --git a/src/main/java/com/digitalpebble/spruce/CURColumn.java b/src/main/java/com/digitalpebble/spruce/CURColumn.java index 5642270..b3a4ab4 100644 --- a/src/main/java/com/digitalpebble/spruce/CURColumn.java +++ b/src/main/java/com/digitalpebble/spruce/CURColumn.java @@ -2,6 +2,8 @@ package com.digitalpebble.spruce; +import org.apache.spark.SparkIllegalArgumentException; +import org.apache.spark.sql.Row; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.MapType; @@ -10,22 +12,53 @@ /** Columns from CUR reports **/ public class CURColumn extends Column { - public static Column LINE_ITEM_OPERATION = new CURColumn("line_item_operation", StringType); - public static Column LINE_ITEM_PRODUCT_CODE = new CURColumn("line_item_product_code", StringType); - public static Column LINE_ITEM_TYPE = new CURColumn("line_item_line_item_type", StringType); - public static Column LINE_ITEM_USAGE_TYPE = new CURColumn("line_item_usage_type", StringType); - public static Column PRICING_UNIT= new CURColumn("pricing_unit", StringType); - public static Column PRODUCT = new CURColumn("product", MapType.apply(StringType,StringType)); - public static Column PRODUCT_INSTANCE_TYPE = new CURColumn("product_instance_type", StringType); - public static Column PRODUCT_INSTANCE_FAMILY = new CURColumn("product_instance_family", StringType); - public static Column PRODUCT_PRODUCT_FAMILY = new CURColumn("product_product_family", 
StringType); - public static Column PRODUCT_REGION_CODE = new CURColumn("product_region_code", StringType); - public static Column PRODUCT_FROM_REGION_CODE = new CURColumn("product_from_region_code", StringType); - public static Column PRODUCT_TO_REGION_CODE = new CURColumn("product_to_region_code", StringType); - public static Column PRODUCT_SERVICE_CODE = new CURColumn("product_servicecode", StringType); - public static Column USAGE_AMOUNT = new CURColumn("line_item_usage_amount", DoubleType); + public static CURColumn LINE_ITEM_OPERATION = new CURColumn("line_item_operation", StringType); + public static CURColumn LINE_ITEM_PRODUCT_CODE = new CURColumn("line_item_product_code", StringType); + public static CURColumn LINE_ITEM_TYPE = new CURColumn("line_item_line_item_type", StringType); + public static CURColumn LINE_ITEM_USAGE_TYPE = new CURColumn("line_item_usage_type", StringType); + public static CURColumn PRICING_UNIT= new CURColumn("pricing_unit", StringType); + public static CURColumn PRODUCT = new CURColumn("product", MapType.apply(StringType,StringType)); + public static CURColumn PRODUCT_INSTANCE_TYPE = new CURColumn("product_instance_type", StringType); + public static CURColumn PRODUCT_INSTANCE_FAMILY = new CURColumn("product_instance_family", StringType); + public static CURColumn PRODUCT_PRODUCT_FAMILY = new CURColumn("product_product_family", StringType); + public static CURColumn PRODUCT_REGION_CODE = new CURColumn("product_region_code", StringType); + public static CURColumn PRODUCT_FROM_REGION_CODE = new CURColumn("product_from_region_code", StringType); + public static CURColumn PRODUCT_TO_REGION_CODE = new CURColumn("product_to_region_code", StringType); + public static CURColumn PRODUCT_SERVICE_CODE = new CURColumn("product_servicecode", StringType); + public static CURColumn USAGE_AMOUNT = new CURColumn("line_item_usage_amount", DoubleType); CURColumn(String l, DataType t) { super(l, t); } + + /** Returns the double value for this column in 
the given row. */ + public double getDouble(Row r) { + return r.getDouble(resolveIndex(r)); + } + + /** + * Returns the String value for this column in the given row. + * If optional is true, returns null when the field is not in the schema; + * otherwise propagates the exception. + */ + public String getString(Row r, boolean optional) { + try { + return r.getString(resolveIndex(r)); + } catch (SparkIllegalArgumentException e) { + if (optional) { + return null; + } + throw e; + } + } + + /** Returns the String value for this column in the given row. */ + public String getString(Row r) { + return getString(r, false); + } + + /** Returns true if the value for this column is null in the given row. */ + public boolean isNullAt(Row r) { + return r.isNullAt(resolveIndex(r)); + } } diff --git a/src/main/java/com/digitalpebble/spruce/Column.java b/src/main/java/com/digitalpebble/spruce/Column.java index cabe18d..9cd6e97 100644 --- a/src/main/java/com/digitalpebble/spruce/Column.java +++ b/src/main/java/com/digitalpebble/spruce/Column.java @@ -2,7 +2,6 @@ package com.digitalpebble.spruce; -import org.apache.spark.SparkIllegalArgumentException; import org.apache.spark.sql.Row; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructType; @@ -55,43 +54,5 @@ public int resolveIndex(Row r) { cachedIndex = index; return index; } - - /** - * Utility method to get the value for the column in the row - **/ - public double getDouble(Row r) { - return r.getDouble(resolveIndex(r)); - } - - - /** - * Utility method to get the value for the column in the row or null if it does not exist. - * Returns null if the field is not defined in the schema and optional is set to true, - * propagates an exception otherwise. 
- **/ - public String getString(Row r, boolean optional) { - try { - return r.getString(resolveIndex(r)); - } catch (SparkIllegalArgumentException e) { - if (optional) { - return null; - } - throw e; - } - } - - /** - * Utility method to get the value for the column in the row or null if it does not exist - **/ - public String getString(Row r) { - return getString(r, false); - } - - /** - * Utility method to get check that a value is non null - **/ - public boolean isNullAt(Row r) { - return r.isNullAt(resolveIndex(r)); - } } diff --git a/src/main/java/com/digitalpebble/spruce/SpruceColumn.java b/src/main/java/com/digitalpebble/spruce/SpruceColumn.java index 990fabf..5142cf6 100644 --- a/src/main/java/com/digitalpebble/spruce/SpruceColumn.java +++ b/src/main/java/com/digitalpebble/spruce/SpruceColumn.java @@ -4,23 +4,35 @@ import org.apache.spark.sql.types.DataType; +import java.util.Map; + import static org.apache.spark.sql.types.DataTypes.DoubleType; import static org.apache.spark.sql.types.DataTypes.StringType; /** Defines columns added by the EnrichmentModules **/ public class SpruceColumn extends Column { - public static Column ENERGY_USED = new SpruceColumn("operational_energy_kwh", DoubleType); - public static Column CARBON_INTENSITY = new SpruceColumn("carbon_intensity", DoubleType); - public static Column OPERATIONAL_EMISSIONS = new SpruceColumn("operational_emissions_co2eq_g", DoubleType); - public static Column EMBODIED_EMISSIONS = new SpruceColumn("embodied_emissions_co2eq_g", DoubleType); - public static Column EMBODIED_ADP = new SpruceColumn("embodied_adp_sbeq_g", DoubleType); - public static Column CPU_LOAD = new SpruceColumn("cpu_load_percentage", DoubleType); - public static Column PUE = new SpruceColumn("power_usage_effectiveness", DoubleType); - public static Column REGION = new SpruceColumn("region", StringType); + public static SpruceColumn ENERGY_USED = new SpruceColumn("operational_energy_kwh", DoubleType); + public static SpruceColumn 
CARBON_INTENSITY = new SpruceColumn("carbon_intensity", DoubleType); + public static SpruceColumn OPERATIONAL_EMISSIONS = new SpruceColumn("operational_emissions_co2eq_g", DoubleType); + public static SpruceColumn EMBODIED_EMISSIONS = new SpruceColumn("embodied_emissions_co2eq_g", DoubleType); + public static SpruceColumn EMBODIED_ADP = new SpruceColumn("embodied_adp_sbeq_g", DoubleType); + public static SpruceColumn CPU_LOAD = new SpruceColumn("cpu_load_percentage", DoubleType); + public static SpruceColumn PUE = new SpruceColumn("power_usage_effectiveness", DoubleType); + public static SpruceColumn REGION = new SpruceColumn("region", StringType); private SpruceColumn(String l, DataType t) { super(l, t); } + /** Returns the String value for this column from the enriched values map, or null if absent. */ + public String getString(Map enrichedValues) { + return (String) enrichedValues.get(this); + } + + /** Returns the Double value for this column from the enriched values map, or null if absent. 
*/ + public Double getDouble(Map enrichedValues) { + return (Double) enrichedValues.get(this); + } + } diff --git a/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java b/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java index 9ebc42c..f9a3603 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java +++ b/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java @@ -28,18 +28,15 @@ public Column[] columnsAdded() { @Override public void enrich(Row inputRow, Map enrichedValues) { - Object energyObj = enrichedValues.get(ENERGY_USED); - if (energyObj == null) return; + Double energyUsed = ENERGY_USED.getDouble(enrichedValues); + if (energyUsed == null) return; - Object ciObj = enrichedValues.get(CARBON_INTENSITY); - if (ciObj == null) return; - - final double energyUsed = (Double) energyObj; + Double carbon_intensity = CARBON_INTENSITY.getDouble(enrichedValues); + if (carbon_intensity == null) return; // take into account the PUE if present - Object pueObj = enrichedValues.get(PUE); - final double pue = pueObj != null ? (Double) pueObj : 1.0; - final double carbon_intensity = (Double) ciObj; + Double pueVal = PUE.getDouble(enrichedValues); + final double pue = pueVal != null ? 
pueVal : 1.0; final double emissions = energyUsed * carbon_intensity * pue; enrichedValues.put(OPERATIONAL_EMISSIONS, emissions); diff --git a/src/main/java/com/digitalpebble/spruce/modules/PUE.java b/src/main/java/com/digitalpebble/spruce/modules/PUE.java index 5195cdf..bf60d6a 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/PUE.java +++ b/src/main/java/com/digitalpebble/spruce/modules/PUE.java @@ -85,13 +85,12 @@ public Column[] columnsAdded() { @Override public void enrich(Row inputRow, Map enrichedValues) { - Object energyObj = enrichedValues.get(ENERGY_USED); - if (energyObj == null) return; + Double energyUsed = ENERGY_USED.getDouble(enrichedValues); + if (energyUsed == null) return; - double energyUsed = (Double) energyObj; if (energyUsed <= 0) return; - String region = (String) enrichedValues.get(REGION); + String region = REGION.getString(enrichedValues); double pueToApply = getPueForRegion(region); diff --git a/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java b/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java index 9f9e105..d14a1b7 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java +++ b/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java @@ -2,6 +2,7 @@ package com.digitalpebble.spruce.modules; +import com.digitalpebble.spruce.CURColumn; import com.digitalpebble.spruce.Column; import com.digitalpebble.spruce.EnrichmentModule; import org.apache.spark.sql.Row; @@ -16,7 +17,7 @@ **/ public class RegionExtraction implements EnrichmentModule { - private static final Column[] location_columns = new Column[]{PRODUCT_REGION_CODE, PRODUCT_FROM_REGION_CODE, PRODUCT_TO_REGION_CODE}; + private static final CURColumn[] location_columns = new CURColumn[]{PRODUCT_REGION_CODE, PRODUCT_FROM_REGION_CODE, PRODUCT_TO_REGION_CODE}; @Override public Column[] columnsNeeded() { @@ -33,7 +34,7 @@ public void enrich(Row inputRow, Map enrichedValues) { // get the location // in most 
cases you have a product_region_code but can be product_to_region_code or product_from_region_code // when the traffic is between two regions or to/from the outside - for (Column c : location_columns) { + for (CURColumn c : location_columns) { String locationCode = c.getString(inputRow); if (locationCode != null) { enrichedValues.put(REGION, locationCode); diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java index afef43a..4d06caf 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java +++ b/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java @@ -105,7 +105,7 @@ public void enrich(Row inputRow, Map enrichedValues) { energy_used = (amount * energy_used * quantity / 1000); // add it to an existing value or create it - Double existing = (Double) enrichedValues.get(ENERGY_USED); + Double existing = ENERGY_USED.getDouble(enrichedValues); if (existing != null) { energy_used += existing; } diff --git a/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java b/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java index c8375aa..7399ff5 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java +++ b/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java @@ -82,7 +82,7 @@ public void enrich(Row inputRow, Map enrichedValues) { return; } - String locationCode = (String) enrichedValues.get(REGION); + String locationCode = REGION.getString(enrichedValues); // no location found - skip if (locationCode == null) { return; diff --git a/src/test/java/com/digitalpebble/spruce/ColumnTest.java b/src/test/java/com/digitalpebble/spruce/ColumnTest.java index e934854..099953e 100644 --- a/src/test/java/com/digitalpebble/spruce/ColumnTest.java +++ 
b/src/test/java/com/digitalpebble/spruce/ColumnTest.java @@ -13,7 +13,7 @@ public class ColumnTest { @Test public void testGetLabelAndType() { - TestColumn col = new TestColumn("field1", DataTypes.StringType); + CURColumn col = new CURColumn("field1", DataTypes.StringType); assertEquals("field1", col.getLabel()); assertEquals(DataTypes.StringType, col.getType()); } @@ -26,7 +26,7 @@ public void testGetStringAndIsNullAt() { // non-null value Row r1 = new GenericRowWithSchema(new Object[]{"hello"}, schema); - TestColumn col = new TestColumn(name, DataTypes.StringType); + CURColumn col = new CURColumn(name, DataTypes.StringType); assertFalse(col.isNullAt(r1)); assertEquals("hello", col.getString(r1)); @@ -36,7 +36,7 @@ public void testGetStringAndIsNullAt() { assertNull(col.getString(r2)); // ask for a field not in the schema but allow it - TestColumn col2 = new TestColumn("unknown", DataTypes.StringType); + CURColumn col2 = new CURColumn("unknown", DataTypes.StringType); assertNull(col2.getString(r2, true)); boolean exception = false; @@ -55,7 +55,7 @@ public void testGetDoubleAndIsNullAt() { // non-null double Row r1 = new GenericRowWithSchema(new Object[]{3.14159}, schema); - TestColumn col = new TestColumn(name, DataTypes.DoubleType); + CURColumn col = new CURColumn(name, DataTypes.DoubleType); assertFalse(col.isNullAt(r1)); assertEquals(3.14159, col.getDouble(r1), 1e-9); @@ -63,12 +63,4 @@ public void testGetDoubleAndIsNullAt() { Row r2 = new GenericRowWithSchema(new Object[]{null}, schema); assertTrue(col.isNullAt(r2)); } - - // minimal concrete implementation for testing - static class TestColumn extends Column { - TestColumn(String l, DataType t) { - super(l, t); - } - } } - From a437b647f19fd748f3105f4b8e376e85e2617594 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Sat, 21 Feb 2026 08:01:33 +0000 Subject: [PATCH 3/3] Rename inputRow into row Signed-off-by: Julien Nioche --- .../spruce/EnrichmentModule.java | 6 ++--- .../spruce/modules/ConstantLoad.java | 
2 +- .../spruce/modules/OperationalEmissions.java | 2 +- .../com/digitalpebble/spruce/modules/PUE.java | 2 +- .../spruce/modules/RegionExtraction.java | 4 ++-- .../spruce/modules/Serverless.java | 14 +++++------ .../spruce/modules/boavizta/BoaviztAPI.java | 12 +++++----- .../modules/boavizta/BoaviztAPIstatic.java | 12 +++++----- .../spruce/modules/ccf/Accelerators.java | 12 +++++----- .../spruce/modules/ccf/Networking.java | 10 ++++---- .../spruce/modules/ccf/Storage.java | 24 +++++++++---------- .../AverageCarbonIntensity.java | 2 +- .../com/digitalpebble/spruce/ConfigTest.java | 2 +- .../spruce/EnrichmentModuleTest.java | 6 ++--- .../spruce/EnrichmentPipelineTest.java | 4 ++-- 15 files changed, 57 insertions(+), 57 deletions(-) diff --git a/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java b/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java index 90321b5..a4d0db5 100644 --- a/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java +++ b/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java @@ -28,11 +28,11 @@ default void init(Map params){} Column[] columnsAdded(); /** - * Enrich the given row by reading input columns from {@code inputRow} and + * Enrich the given row by reading input columns from {@code row} and * reading/writing enrichment columns via {@code enrichedValues}. 
* - * @param inputRow the immutable original row from the dataset + * @param row the immutable original row from the dataset * @param enrichedValues shared map accumulating enrichment values across all modules */ - void enrich(Row inputRow, Map enrichedValues); + void enrich(Row row, Map enrichedValues); } diff --git a/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java b/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java index 2b30e7d..b6f0981 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java +++ b/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java @@ -33,7 +33,7 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { enrichedValues.put(CPU_LOAD, load_value); } } diff --git a/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java b/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java index f9a3603..8c0bc68 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java +++ b/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java @@ -27,7 +27,7 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { Double energyUsed = ENERGY_USED.getDouble(enrichedValues); if (energyUsed == null) return; diff --git a/src/main/java/com/digitalpebble/spruce/modules/PUE.java b/src/main/java/com/digitalpebble/spruce/modules/PUE.java index bf60d6a..f0543ca 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/PUE.java +++ b/src/main/java/com/digitalpebble/spruce/modules/PUE.java @@ -84,7 +84,7 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { Double energyUsed = ENERGY_USED.getDouble(enrichedValues); if (energyUsed == null) return; 
diff --git a/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java b/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java index d14a1b7..7ecc293 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java +++ b/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java @@ -30,12 +30,12 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { // get the location // in most cases you have a product_region_code but can be product_to_region_code or product_from_region_code // when the traffic is between two regions or to/from the outside for (CURColumn c : location_columns) { - String locationCode = c.getString(inputRow); + String locationCode = c.getString(row); if (locationCode != null) { enrichedValues.put(REGION, locationCode); return; diff --git a/src/main/java/com/digitalpebble/spruce/modules/Serverless.java b/src/main/java/com/digitalpebble/spruce/modules/Serverless.java index 3322649..0b3a7e1 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/Serverless.java +++ b/src/main/java/com/digitalpebble/spruce/modules/Serverless.java @@ -68,17 +68,17 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { - String usage_type = LINE_ITEM_USAGE_TYPE.getString(inputRow); + public void enrich(Row row, Map enrichedValues) { + String usage_type = LINE_ITEM_USAGE_TYPE.getString(row); if (usage_type == null) { return; } - String operation = LINE_ITEM_OPERATION.getString(inputRow); + String operation = LINE_ITEM_OPERATION.getString(row); if ("FargateTask".equals(operation)) { // memory if (usage_type.endsWith("-GB-Hours")) { - double amount_gb = USAGE_AMOUNT.getDouble(inputRow); + double amount_gb = USAGE_AMOUNT.getDouble(row); double energy = amount_gb * memory_coefficient_kwh; enrichedValues.put(ENERGY_USED, energy); return; @@ -86,7 +86,7 @@ public void 
enrich(Row inputRow, Map enrichedValues) { // cpu if (usage_type.endsWith("-vCPU-Hours:perCPU")) { - double amount_vcpu = USAGE_AMOUNT.getDouble(inputRow); + double amount_vcpu = USAGE_AMOUNT.getDouble(row); boolean isARM = usage_type.contains("-ARM-"); double coefficient = isARM ? arm_cpu_coefficient_kwh : x86_cpu_coefficient_kwh; double energy = amount_vcpu * coefficient; @@ -96,7 +96,7 @@ public void enrich(Row inputRow, Map enrichedValues) { } else if (usage_type.contains("EMR-SERVERLESS")) { if (usage_type.endsWith("MemoryGBHours")) { - double amount_gb = USAGE_AMOUNT.getDouble(inputRow); + double amount_gb = USAGE_AMOUNT.getDouble(row); double energy = amount_gb * memory_coefficient_kwh; enrichedValues.put(ENERGY_USED, energy); return; @@ -104,7 +104,7 @@ else if (usage_type.contains("EMR-SERVERLESS")) { // cpu if (usage_type.endsWith("-vCPUHours")) { - double amount_vcpu = USAGE_AMOUNT.getDouble(inputRow); + double amount_vcpu = USAGE_AMOUNT.getDouble(row); boolean isARM = usage_type.contains("-ARM-"); double coefficient = isARM ? 
arm_cpu_coefficient_kwh : x86_cpu_coefficient_kwh; double energy = amount_vcpu * coefficient; diff --git a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java index 0744b42..f7be778 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java +++ b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java @@ -56,7 +56,7 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { if (cache == null) { cache = Caffeine.newBuilder() @@ -74,14 +74,14 @@ public void enrich(Row inputRow, Map enrichedValues) { // TODO handle non-default CPU loads - String instanceType = PRODUCT_INSTANCE_TYPE.getString(inputRow); + String instanceType = PRODUCT_INSTANCE_TYPE.getString(row); if (instanceType == null) { return; } - final String service_code = PRODUCT_SERVICE_CODE.getString(inputRow); - final String operation = LINE_ITEM_OPERATION.getString(inputRow); - final String product_code = LINE_ITEM_PRODUCT_CODE.getString(inputRow); + final String service_code = PRODUCT_SERVICE_CODE.getString(row); + final String operation = LINE_ITEM_OPERATION.getString(row); + final String product_code = LINE_ITEM_PRODUCT_CODE.getString(row); if (operation == null || product_code == null) { return; @@ -129,7 +129,7 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) { } } - double amount = USAGE_AMOUNT.getDouble(inputRow); + double amount = USAGE_AMOUNT.getDouble(row); enrichedValues.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount); enrichedValues.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount); diff --git a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java index c45dfe0..6d4532b 100644 --- 
a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java +++ b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java @@ -73,16 +73,16 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { - String instanceType = PRODUCT_INSTANCE_TYPE.getString(inputRow); + String instanceType = PRODUCT_INSTANCE_TYPE.getString(row); if (instanceType == null) { return; } - final String service_code = PRODUCT_SERVICE_CODE.getString(inputRow); - final String operation = LINE_ITEM_OPERATION.getString(inputRow); - final String product_code = LINE_ITEM_PRODUCT_CODE.getString(inputRow); + final String service_code = PRODUCT_SERVICE_CODE.getString(row); + final String operation = LINE_ITEM_OPERATION.getString(row); + final String product_code = LINE_ITEM_PRODUCT_CODE.getString(row); if (operation == null || product_code == null) { return; @@ -122,7 +122,7 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) { return; } - double amount = USAGE_AMOUNT.getDouble(inputRow); + double amount = USAGE_AMOUNT.getDouble(row); enrichedValues.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount); enrichedValues.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount); diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java index 4d06caf..80581bb 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java +++ b/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java @@ -52,16 +52,16 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { // limit to EC2 instances - String instanceType = PRODUCT_INSTANCE_TYPE.getString(inputRow); + String instanceType = 
PRODUCT_INSTANCE_TYPE.getString(row); if (instanceType == null) { return; } - final String operation = LINE_ITEM_OPERATION.getString(inputRow); - final String product_code = LINE_ITEM_PRODUCT_CODE.getString(inputRow); + final String operation = LINE_ITEM_OPERATION.getString(row); + final String product_code = LINE_ITEM_PRODUCT_CODE.getString(row); if (operation == null || product_code == null) { return; @@ -81,7 +81,7 @@ public void enrich(Row inputRow, Map enrichedValues) { if (instanceTypeInfo == null) { // check product instance family // if GPU then log if we have no info about it - String fam = PRODUCT_INSTANCE_FAMILY.getString(inputRow, true); + String fam = PRODUCT_INSTANCE_FAMILY.getString(row, true); if ("GPU instance".equals(fam)) { LOG.debug("Lacking info for instance type with GPU {}", instanceType); } @@ -99,7 +99,7 @@ public void enrich(Row inputRow, Map enrichedValues) { // minWatts + (gpu_utilisation_percent / 100) * (maxWatts - minWatts) double energy_used = minWatts + ((double) gpu_utilisation_percent / 100) * (maxWatts - minWatts); - double amount = USAGE_AMOUNT.getDouble(inputRow); + double amount = USAGE_AMOUNT.getDouble(row); // watts to kw energy_used = (amount * energy_used * quantity / 1000); diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java index 5801d97..5a83f9e 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java +++ b/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java @@ -47,14 +47,14 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { - String service_code = PRODUCT_SERVICE_CODE.getString(inputRow); + public void enrich(Row row, Map enrichedValues) { + String service_code = PRODUCT_SERVICE_CODE.getString(row); if (service_code == null || !service_code.equals("AWSDataTransfer")) { return; } // apply only to rows corresponding to networking in or 
out of a region - int index = PRODUCT.resolveIndex(inputRow); - Map productMap = inputRow.getJavaMap(index); + int index = PRODUCT.resolveIndex(row); + Map productMap = row.getJavaMap(index); String transfer_type = (String) productMap.getOrDefault("transfer_type", ""); if (!transfer_type.startsWith("InterRegion")) { @@ -64,7 +64,7 @@ public void enrich(Row inputRow, Map enrichedValues) { // TODO consider extending to AWS Outbound and Inbound // get the amount of data transferred - double amount_gb = USAGE_AMOUNT.getDouble(inputRow); + double amount_gb = USAGE_AMOUNT.getDouble(row); double energy_gb = amount_gb * network_coefficient; enrichedValues.put(ENERGY_USED, energy_gb); diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java index b6a4083..9ab3ca5 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java +++ b/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java @@ -77,31 +77,31 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { - final String operation = LINE_ITEM_OPERATION.getString(inputRow); + public void enrich(Row row, Map enrichedValues) { + final String operation = LINE_ITEM_OPERATION.getString(row); if (operation == null) { return; } // implement the logic from CCF // first check that the unit corresponds to storage - final String unit = PRICING_UNIT.getString(inputRow); + final String unit = PRICING_UNIT.getString(row); if (unit == null || !units.contains(unit)) { return; } - final String usage_type = LINE_ITEM_USAGE_TYPE.getString(inputRow); + final String usage_type = LINE_ITEM_USAGE_TYPE.getString(row); if (usage_type == null) { return; } - final String serviceCode = PRODUCT_SERVICE_CODE.getString(inputRow); + final String serviceCode = PRODUCT_SERVICE_CODE.getString(row); int replication = getReplicationFactor(serviceCode, usage_type); // loop on the values from the resources 
for (String ssd : ssd_usage_types) { if (usage_type.endsWith(ssd)) { - computeEnergy(inputRow, enrichedValues, false, replication); + computeEnergy(row, enrichedValues, false, replication); return; } } @@ -111,7 +111,7 @@ public void enrich(Row inputRow, Map enrichedValues) { if (serviceCode != null && !usage_type.contains("Backup")) { for (String service : ssd_services) { if (serviceCode.endsWith(service)) { - computeEnergy(inputRow, enrichedValues, false, replication); + computeEnergy(row, enrichedValues, false, replication); return; } } @@ -119,23 +119,23 @@ public void enrich(Row inputRow, Map enrichedValues) { for (String hdd : hdd_usage_types) { if (usage_type.endsWith(hdd)) { - computeEnergy(inputRow, enrichedValues, true, replication); + computeEnergy(row, enrichedValues, true, replication); return; } } // Log so that can improve coverage in the longer term - String product_product_family = PRODUCT_PRODUCT_FAMILY.getString(inputRow); + String product_product_family = PRODUCT_PRODUCT_FAMILY.getString(row); if ("Storage".equals(product_product_family)) { log.debug("Storage type not found for {} {}", operation, usage_type); } } - private void computeEnergy(Row inputRow, Map enrichedValues, boolean isHDD, int replication) { + private void computeEnergy(Row row, Map enrichedValues, boolean isHDD, int replication) { double coefficient = isHDD ? 
hdd_gb_coefficient : ssd_gb_coefficient; - double amount = USAGE_AMOUNT.getDouble(inputRow); - String unit = PRICING_UNIT.getString(inputRow); + double amount = USAGE_AMOUNT.getDouble(row); + String unit = PRICING_UNIT.getString(row); // normalisation if (!"GB-Hours".equals(unit)) { // it is in GBMonth diff --git a/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java b/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java index 7399ff5..672d809 100644 --- a/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java +++ b/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java @@ -77,7 +77,7 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { if (!enrichedValues.containsKey(ENERGY_USED)) { return; } diff --git a/src/test/java/com/digitalpebble/spruce/ConfigTest.java b/src/test/java/com/digitalpebble/spruce/ConfigTest.java index 4ef70b3..5d19987 100644 --- a/src/test/java/com/digitalpebble/spruce/ConfigTest.java +++ b/src/test/java/com/digitalpebble/spruce/ConfigTest.java @@ -37,7 +37,7 @@ public Column[] columnsAdded() { } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { } } diff --git a/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java b/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java index 33d5d23..8caf02d 100644 --- a/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java +++ b/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java @@ -30,7 +30,7 @@ void enrichWritesToMap() { @Override public Column[] columnsAdded() { return new Column[]{ENERGY_USED}; } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { enrichedValues.put(ENERGY_USED, 
42.0); } }; @@ -52,7 +52,7 @@ void enrichAddsToExistingValue() { @Override public Column[] columnsAdded() { return new Column[]{ENERGY_USED}; } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { Double existing = (Double) enrichedValues.get(ENERGY_USED); double newVal = 2.5; enrichedValues.put(ENERGY_USED, (existing != null ? existing : 0.0) + newVal); @@ -80,7 +80,7 @@ void enrichReadsFromMap() { @Override public Column[] columnsAdded() { return new Column[]{noteCol}; } @Override - public void enrich(Row inputRow, Map enrichedValues) { + public void enrich(Row row, Map enrichedValues) { Object val = enrichedValues.get(ENERGY_USED); if (val != null) { enrichedValues.put(noteCol, "energy=" + val); diff --git a/src/test/java/com/digitalpebble/spruce/EnrichmentPipelineTest.java b/src/test/java/com/digitalpebble/spruce/EnrichmentPipelineTest.java index b5892b3..1aac9db 100644 --- a/src/test/java/com/digitalpebble/spruce/EnrichmentPipelineTest.java +++ b/src/test/java/com/digitalpebble/spruce/EnrichmentPipelineTest.java @@ -63,8 +63,8 @@ public void testPipelineEnrichmentLogic(String lineItemType, boolean expectEnric values[schema.fieldIndex(PRODUCT_REGION_CODE.getLabel())] = "us-east-1"; values[schema.fieldIndex(LINE_ITEM_TYPE.getLabel())] = lineItemType; - Row inputRow = new GenericRowWithSchema(values, schema); - List inputList = Collections.singletonList(inputRow); + Row row = new GenericRowWithSchema(values, schema); + List inputList = Collections.singletonList(row); Iterator results = pipeline.call(inputList.iterator());