diff --git a/src/main/java/com/digitalpebble/spruce/CURColumn.java b/src/main/java/com/digitalpebble/spruce/CURColumn.java
index 5642270..b3a4ab4 100644
--- a/src/main/java/com/digitalpebble/spruce/CURColumn.java
+++ b/src/main/java/com/digitalpebble/spruce/CURColumn.java
@@ -2,6 +2,8 @@
package com.digitalpebble.spruce;
+import org.apache.spark.SparkIllegalArgumentException;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
@@ -10,22 +12,53 @@
/** Columns from CUR reports **/
public class CURColumn extends Column {
- public static Column LINE_ITEM_OPERATION = new CURColumn("line_item_operation", StringType);
- public static Column LINE_ITEM_PRODUCT_CODE = new CURColumn("line_item_product_code", StringType);
- public static Column LINE_ITEM_TYPE = new CURColumn("line_item_line_item_type", StringType);
- public static Column LINE_ITEM_USAGE_TYPE = new CURColumn("line_item_usage_type", StringType);
- public static Column PRICING_UNIT= new CURColumn("pricing_unit", StringType);
- public static Column PRODUCT = new CURColumn("product", MapType.apply(StringType,StringType));
- public static Column PRODUCT_INSTANCE_TYPE = new CURColumn("product_instance_type", StringType);
- public static Column PRODUCT_INSTANCE_FAMILY = new CURColumn("product_instance_family", StringType);
- public static Column PRODUCT_PRODUCT_FAMILY = new CURColumn("product_product_family", StringType);
- public static Column PRODUCT_REGION_CODE = new CURColumn("product_region_code", StringType);
- public static Column PRODUCT_FROM_REGION_CODE = new CURColumn("product_from_region_code", StringType);
- public static Column PRODUCT_TO_REGION_CODE = new CURColumn("product_to_region_code", StringType);
- public static Column PRODUCT_SERVICE_CODE = new CURColumn("product_servicecode", StringType);
- public static Column USAGE_AMOUNT = new CURColumn("line_item_usage_amount", DoubleType);
+ public static CURColumn LINE_ITEM_OPERATION = new CURColumn("line_item_operation", StringType);
+ public static CURColumn LINE_ITEM_PRODUCT_CODE = new CURColumn("line_item_product_code", StringType);
+ public static CURColumn LINE_ITEM_TYPE = new CURColumn("line_item_line_item_type", StringType);
+ public static CURColumn LINE_ITEM_USAGE_TYPE = new CURColumn("line_item_usage_type", StringType);
+ public static CURColumn PRICING_UNIT = new CURColumn("pricing_unit", StringType);
+ public static CURColumn PRODUCT = new CURColumn("product", MapType.apply(StringType,StringType));
+ public static CURColumn PRODUCT_INSTANCE_TYPE = new CURColumn("product_instance_type", StringType);
+ public static CURColumn PRODUCT_INSTANCE_FAMILY = new CURColumn("product_instance_family", StringType);
+ public static CURColumn PRODUCT_PRODUCT_FAMILY = new CURColumn("product_product_family", StringType);
+ public static CURColumn PRODUCT_REGION_CODE = new CURColumn("product_region_code", StringType);
+ public static CURColumn PRODUCT_FROM_REGION_CODE = new CURColumn("product_from_region_code", StringType);
+ public static CURColumn PRODUCT_TO_REGION_CODE = new CURColumn("product_to_region_code", StringType);
+ public static CURColumn PRODUCT_SERVICE_CODE = new CURColumn("product_servicecode", StringType);
+ public static CURColumn USAGE_AMOUNT = new CURColumn("line_item_usage_amount", DoubleType);
CURColumn(String l, DataType t) {
super(l, t);
}
+
+ /** Returns the double value for this column in the given row. */
+ public double getDouble(Row r) {
+ return r.getDouble(resolveIndex(r));
+ }
+
+ /**
+ * Returns the String value for this column in the given row.
+ * If optional is true, returns null when the field is not in the schema;
+ * otherwise propagates the exception.
+ */
+ public String getString(Row r, boolean optional) {
+ try {
+ return r.getString(resolveIndex(r));
+ } catch (SparkIllegalArgumentException e) {
+ if (optional) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ /** Returns the String value for this column in the given row. */
+ public String getString(Row r) {
+ return getString(r, false);
+ }
+
+ /** Returns true if the value for this column is null in the given row. */
+ public boolean isNullAt(Row r) {
+ return r.isNullAt(resolveIndex(r));
+ }
}
diff --git a/src/main/java/com/digitalpebble/spruce/Column.java b/src/main/java/com/digitalpebble/spruce/Column.java
index cabe18d..9cd6e97 100644
--- a/src/main/java/com/digitalpebble/spruce/Column.java
+++ b/src/main/java/com/digitalpebble/spruce/Column.java
@@ -2,7 +2,6 @@
package com.digitalpebble.spruce;
-import org.apache.spark.SparkIllegalArgumentException;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
@@ -55,43 +54,5 @@ public int resolveIndex(Row r) {
cachedIndex = index;
return index;
}
-
- /**
- * Utility method to get the value for the column in the row
- **/
- public double getDouble(Row r) {
- return r.getDouble(resolveIndex(r));
- }
-
-
- /**
- * Utility method to get the value for the column in the row or null if it does not exist.
- * Returns null if the field is not defined in the schema and optional is set to true,
- * propagates an exception otherwise.
- **/
- public String getString(Row r, boolean optional) {
- try {
- return r.getString(resolveIndex(r));
- } catch (SparkIllegalArgumentException e) {
- if (optional) {
- return null;
- }
- throw e;
- }
- }
-
- /**
- * Utility method to get the value for the column in the row or null if it does not exist
- **/
- public String getString(Row r) {
- return getString(r, false);
- }
-
- /**
- * Utility method to get check that a value is non null
- **/
- public boolean isNullAt(Row r) {
- return r.isNullAt(resolveIndex(r));
- }
}
diff --git a/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java b/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java
index a1f5a4c..a4d0db5 100644
--- a/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java
+++ b/src/main/java/com/digitalpebble/spruce/EnrichmentModule.java
@@ -3,7 +3,6 @@
package com.digitalpebble.spruce;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import java.io.Serializable;
import java.util.Map;
@@ -11,7 +10,10 @@
/**
* A module adds new columns to a Dataset and populates them based on its content.
* The columns can represent energy or water consumption, carbon intensity, carbon emissions etc...
- * The bulk of the work is done in the map function.
+ *
+ * Modules read CUR (input) columns from the original {@link Row} and read/write
+ * Spruce (enrichment) columns via a shared {@code Map}.
+ * The pipeline materialises one final Row at the end, avoiding per-module row copies.
**/
public interface EnrichmentModule extends Serializable {
@@ -25,53 +27,12 @@ default void init(Map params){}
/** Returns the columns added by this module **/
Column[] columnsAdded();
- Row process(Row row);
-
- static Row withUpdatedValue(Row row, Column column, Object newValue) {
- Object[] values = new Object[row.size()];
- for (int i = 0; i < row.size(); i++) {
- values[i] = row.get(i);
- }
- int index = column.resolveIndex(row);
- values[index] = newValue;
- return new GenericRowWithSchema(values, row.schema());
- }
-
- static Row withUpdatedValue(Row row, Column column, Double newValue, boolean add) {
- Object[] values = new Object[row.size()];
- for (int i = 0; i < row.size(); i++) {
- values[i] = row.get(i);
- }
- int index = column.resolveIndex(row);
- Object existing = values[index];
- if (add && existing instanceof Double) {
- values[index] = newValue + (Double) existing;
- } else {
- values[index] = newValue;
- }
- return new GenericRowWithSchema(values, row.schema());
- }
-
- static Row withUpdatedValues(Row row, Map updates) {
- Object[] values = new Object[row.size()];
- for (int i = 0; i < row.size(); i++) {
- values[i] = row.get(i);
- }
-
- for (Map.Entry entry : updates.entrySet()) {
- Column column = entry.getKey();
- Object newValue = entry.getValue();
-
- int index;
- try {
- index = column.resolveIndex(row);
- } catch (IllegalArgumentException e) {
- throw new RuntimeException("Field not found in row: " + column.getLabel(), e);
- }
-
- values[index] = newValue;
- }
-
- return new GenericRowWithSchema(values, row.schema());
- }
+ /**
+ * Enrich the given row by reading input columns from {@code row} and
+ * reading/writing enrichment columns via {@code enrichedValues}.
+ *
+ * @param row the immutable original row from the dataset
+ * @param enrichedValues shared map accumulating enrichment values across all modules
+ */
+ void enrich(Row row, Map<Column, Object> enrichedValues);
}
diff --git a/src/main/java/com/digitalpebble/spruce/EnrichmentPipeline.java b/src/main/java/com/digitalpebble/spruce/EnrichmentPipeline.java
index 6da9f38..9ac2a1f 100644
--- a/src/main/java/com/digitalpebble/spruce/EnrichmentPipeline.java
+++ b/src/main/java/com/digitalpebble/spruce/EnrichmentPipeline.java
@@ -4,9 +4,12 @@
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import static com.digitalpebble.spruce.CURColumn.LINE_ITEM_TYPE;
@@ -41,10 +44,21 @@ public Row next() {
// usage filter - only need to enrich entries that correspond to a usage (no tax, discount or fee)
boolean usage = usageFilter(row);
if (!usage) return row;
+
+ Map<Column, Object> enriched = new HashMap<>();
for (EnrichmentModule module : enrichmentModules) {
- row = module.process(row);
+ module.enrich(row, enriched);
+ }
+
+ // Materialise the final row — single copy instead of one per module
+ Object[] values = new Object[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ values[i] = row.get(i);
+ }
+ for (Map.Entry<Column, Object> entry : enriched.entrySet()) {
+ values[entry.getKey().resolveIndex(row)] = entry.getValue();
}
- return row;
+ return new GenericRowWithSchema(values, row.schema());
}
};
}
diff --git a/src/main/java/com/digitalpebble/spruce/SpruceColumn.java b/src/main/java/com/digitalpebble/spruce/SpruceColumn.java
index 990fabf..5142cf6 100644
--- a/src/main/java/com/digitalpebble/spruce/SpruceColumn.java
+++ b/src/main/java/com/digitalpebble/spruce/SpruceColumn.java
@@ -4,23 +4,35 @@
import org.apache.spark.sql.types.DataType;
+import java.util.Map;
+
import static org.apache.spark.sql.types.DataTypes.DoubleType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/** Defines columns added by the EnrichmentModules **/
public class SpruceColumn extends Column {
- public static Column ENERGY_USED = new SpruceColumn("operational_energy_kwh", DoubleType);
- public static Column CARBON_INTENSITY = new SpruceColumn("carbon_intensity", DoubleType);
- public static Column OPERATIONAL_EMISSIONS = new SpruceColumn("operational_emissions_co2eq_g", DoubleType);
- public static Column EMBODIED_EMISSIONS = new SpruceColumn("embodied_emissions_co2eq_g", DoubleType);
- public static Column EMBODIED_ADP = new SpruceColumn("embodied_adp_sbeq_g", DoubleType);
- public static Column CPU_LOAD = new SpruceColumn("cpu_load_percentage", DoubleType);
- public static Column PUE = new SpruceColumn("power_usage_effectiveness", DoubleType);
- public static Column REGION = new SpruceColumn("region", StringType);
+ public static SpruceColumn ENERGY_USED = new SpruceColumn("operational_energy_kwh", DoubleType);
+ public static SpruceColumn CARBON_INTENSITY = new SpruceColumn("carbon_intensity", DoubleType);
+ public static SpruceColumn OPERATIONAL_EMISSIONS = new SpruceColumn("operational_emissions_co2eq_g", DoubleType);
+ public static SpruceColumn EMBODIED_EMISSIONS = new SpruceColumn("embodied_emissions_co2eq_g", DoubleType);
+ public static SpruceColumn EMBODIED_ADP = new SpruceColumn("embodied_adp_sbeq_g", DoubleType);
+ public static SpruceColumn CPU_LOAD = new SpruceColumn("cpu_load_percentage", DoubleType);
+ public static SpruceColumn PUE = new SpruceColumn("power_usage_effectiveness", DoubleType);
+ public static SpruceColumn REGION = new SpruceColumn("region", StringType);
private SpruceColumn(String l, DataType t) {
super(l, t);
}
+ /** Returns the String value for this column from the enriched values map, or null if absent. */
+ public String getString(Map<Column, Object> enrichedValues) {
+ return (String) enrichedValues.get(this);
+ }
+
+ /** Returns the Double value for this column from the enriched values map, or null if absent. */
+ public Double getDouble(Map<Column, Object> enrichedValues) {
+ return (Double) enrichedValues.get(this);
+ }
+
}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java b/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java
index 398dc57..b6f0981 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/ConstantLoad.java
@@ -33,7 +33,7 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
- return EnrichmentModule.withUpdatedValue(row, CPU_LOAD, load_value);
+ public void enrich(Row row, Map enrichedValues) {
+ enrichedValues.put(CPU_LOAD, load_value);
}
}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java b/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java
index 804c248..8c0bc68 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/OperationalEmissions.java
@@ -6,6 +6,8 @@
import com.digitalpebble.spruce.EnrichmentModule;
import org.apache.spark.sql.Row;
+import java.util.Map;
+
import static com.digitalpebble.spruce.SpruceColumn.*;
/**
@@ -25,22 +27,18 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
- if (ENERGY_USED.isNullAt(row)) {
- return row;
- }
-
- if (CARBON_INTENSITY.isNullAt(row)) {
- return row;
- }
+ public void enrich(Row row, Map enrichedValues) {
+ Double energyUsed = ENERGY_USED.getDouble(enrichedValues);
+ if (energyUsed == null) return;
- final double energyUsed = ENERGY_USED.getDouble(row);
+ Double carbon_intensity = CARBON_INTENSITY.getDouble(enrichedValues);
+ if (carbon_intensity == null) return;
// take into account the PUE if present
- final double pue = PUE.isNullAt(row) ? 1.0 : PUE.getDouble(row);
- final double carbon_intensity = CARBON_INTENSITY.getDouble(row);
+ Double pueVal = PUE.getDouble(enrichedValues);
+ final double pue = pueVal != null ? pueVal : 1.0;
final double emissions = energyUsed * carbon_intensity * pue;
- return EnrichmentModule.withUpdatedValue(row, OPERATIONAL_EMISSIONS, emissions);
+ enrichedValues.put(OPERATIONAL_EMISSIONS, emissions);
}
}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/PUE.java b/src/main/java/com/digitalpebble/spruce/modules/PUE.java
index 46742bd..f0543ca 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/PUE.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/PUE.java
@@ -84,22 +84,17 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
- if (ENERGY_USED.isNullAt(row)) {
- return row;
- }
+ public void enrich(Row row, Map enrichedValues) {
+ Double energyUsed = ENERGY_USED.getDouble(enrichedValues);
+ if (energyUsed == null) return;
- double energyUsed = ENERGY_USED.getDouble(row);
- if (energyUsed <= 0) return row;
+ if (energyUsed <= 0) return;
- String region = null;
- if (!REGION.isNullAt(row)) {
- region = REGION.getString(row);
- }
+ String region = REGION.getString(enrichedValues);
double pueToApply = getPueForRegion(region);
- return EnrichmentModule.withUpdatedValue(row, SpruceColumn.PUE, pueToApply);
+ enrichedValues.put(SpruceColumn.PUE, pueToApply);
}
private double getPueForRegion(String region) {
@@ -119,4 +114,4 @@ private double getPueForRegion(String region) {
return defaultPueValue;
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java b/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java
index 73e4e39..7ecc293 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/RegionExtraction.java
@@ -2,10 +2,13 @@
package com.digitalpebble.spruce.modules;
+import com.digitalpebble.spruce.CURColumn;
import com.digitalpebble.spruce.Column;
import com.digitalpebble.spruce.EnrichmentModule;
import org.apache.spark.sql.Row;
+import java.util.Map;
+
import static com.digitalpebble.spruce.CURColumn.*;
import static com.digitalpebble.spruce.SpruceColumn.*;
@@ -14,7 +17,7 @@
**/
public class RegionExtraction implements EnrichmentModule {
- private static final Column[] location_columns = new Column[]{PRODUCT_REGION_CODE, PRODUCT_FROM_REGION_CODE, PRODUCT_TO_REGION_CODE};
+ private static final CURColumn[] location_columns = new CURColumn[]{PRODUCT_REGION_CODE, PRODUCT_FROM_REGION_CODE, PRODUCT_TO_REGION_CODE};
@Override
public Column[] columnsNeeded() {
@@ -27,16 +30,16 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
+ public void enrich(Row row, Map enrichedValues) {
// get the location
// in most cases you have a product_region_code but can be product_to_region_code or product_from_region_code
// when the traffic is between two regions or to/from the outside
- for (Column c : location_columns) {
+ for (CURColumn c : location_columns) {
String locationCode = c.getString(row);
if (locationCode != null) {
- return EnrichmentModule.withUpdatedValue(row, REGION, locationCode);
+ enrichedValues.put(REGION, locationCode);
+ return;
}
}
- return row;
}
}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/Serverless.java b/src/main/java/com/digitalpebble/spruce/modules/Serverless.java
index f756c35..0b3a7e1 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/Serverless.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/Serverless.java
@@ -4,20 +4,14 @@
import com.digitalpebble.spruce.Column;
import com.digitalpebble.spruce.EnrichmentModule;
-import com.digitalpebble.spruce.modules.ccf.Storage;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
import java.util.Map;
import static com.digitalpebble.spruce.CURColumn.*;
-import static com.digitalpebble.spruce.CURColumn.PRICING_UNIT;
-import static com.digitalpebble.spruce.CURColumn.PRODUCT_SERVICE_CODE;
import static com.digitalpebble.spruce.SpruceColumn.*;
-import static com.digitalpebble.spruce.Utils.loadJSONResources;
/**
* Estimates the energy usage for CPU and memory of serverless services
@@ -74,10 +68,10 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
+ public void enrich(Row row, Map enrichedValues) {
String usage_type = LINE_ITEM_USAGE_TYPE.getString(row);
if (usage_type == null) {
- return row;
+ return;
}
String operation = LINE_ITEM_OPERATION.getString(row);
@@ -86,7 +80,8 @@ public Row process(Row row) {
if (usage_type.endsWith("-GB-Hours")) {
double amount_gb = USAGE_AMOUNT.getDouble(row);
double energy = amount_gb * memory_coefficient_kwh;
- return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy);
+ enrichedValues.put(ENERGY_USED, energy);
+ return;
}
// cpu
@@ -95,14 +90,16 @@ public Row process(Row row) {
boolean isARM = usage_type.contains("-ARM-");
double coefficient = isARM ? arm_cpu_coefficient_kwh : x86_cpu_coefficient_kwh;
double energy = amount_vcpu * coefficient;
- return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy);
+ enrichedValues.put(ENERGY_USED, energy);
+ return;
}
}
else if (usage_type.contains("EMR-SERVERLESS")) {
if (usage_type.endsWith("MemoryGBHours")) {
double amount_gb = USAGE_AMOUNT.getDouble(row);
double energy = amount_gb * memory_coefficient_kwh;
- return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy);
+ enrichedValues.put(ENERGY_USED, energy);
+ return;
}
// cpu
@@ -111,11 +108,9 @@ else if (usage_type.contains("EMR-SERVERLESS")) {
boolean isARM = usage_type.contains("-ARM-");
double coefficient = isARM ? arm_cpu_coefficient_kwh : x86_cpu_coefficient_kwh;
double energy = amount_vcpu * coefficient;
- return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy);
+ enrichedValues.put(ENERGY_USED, energy);
+ return;
}
}
-
-
- return row;
}
}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java
index 3e73c4c..f7be778 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPI.java
@@ -11,7 +11,6 @@
import org.jspecify.annotations.Nullable;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -57,7 +56,7 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
+ public void enrich(Row row, Map enrichedValues) {
if (cache == null) {
cache = Caffeine.newBuilder()
@@ -77,7 +76,7 @@ public Row process(Row row) {
String instanceType = PRODUCT_INSTANCE_TYPE.getString(row);
if (instanceType == null) {
- return row;
+ return;
}
final String service_code = PRODUCT_SERVICE_CODE.getString(row);
@@ -85,7 +84,7 @@ public Row process(Row row) {
final String product_code = LINE_ITEM_PRODUCT_CODE.getString(row);
if (operation == null || product_code == null) {
- return row;
+ return;
}
// conditions for EC2 instances
@@ -106,12 +105,12 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) {
instanceType = instanceType.substring(3);
}
} else {
- return row;
+ return;
}
// don't look for instance types that are known to be unknown
if (unknownInstanceTypes.contains(instanceType)) {
- return row;
+ return;
}
Impacts impacts = cache.getIfPresent(instanceType);
@@ -123,19 +122,17 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) {
} catch (InstanceTypeUknown e1) {
LOG.info("Unknown instance type {}", instanceType);
unknownInstanceTypes.add(instanceType);
- return row;
+ return;
} catch (IOException e) {
LOG.error("Exception caught when retrieving estimates for {}", instanceType, e);
- return row;
+ return;
}
}
double amount = USAGE_AMOUNT.getDouble(row);
- Map kv = new HashMap<>();
- kv.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount);
- kv.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount);
- kv.put(EMBODIED_ADP, impacts.getAbioticDepletionPotentialGramsSbeq() * amount);
- return EnrichmentModule.withUpdatedValues(row, kv);
+ enrichedValues.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount);
+ enrichedValues.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount);
+ enrichedValues.put(EMBODIED_ADP, impacts.getAbioticDepletionPotentialGramsSbeq() * amount);
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java
index af0efdc..6d4532b 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstatic.java
@@ -73,11 +73,11 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
+ public void enrich(Row row, Map enrichedValues) {
String instanceType = PRODUCT_INSTANCE_TYPE.getString(row);
if (instanceType == null) {
- return row;
+ return;
}
final String service_code = PRODUCT_SERVICE_CODE.getString(row);
@@ -85,7 +85,7 @@ public Row process(Row row) {
final String product_code = LINE_ITEM_PRODUCT_CODE.getString(row);
if (operation == null || product_code == null) {
- return row;
+ return;
}
// conditions for EC2 instances
@@ -106,12 +106,12 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) {
instanceType = instanceType.substring(3);
}
} else {
- return row;
+ return;
}
// don't look for instance types that are known to be unknown
if (unknownInstanceTypes.contains(instanceType)) {
- return row;
+ return;
}
final Impacts impacts = impactsMap.get(instanceType);
@@ -119,15 +119,13 @@ else if (product_code.equals("AmazonES") && operation.equals("ESDomain")) {
if (impacts == null) {
LOG.info("Unknown instance type {}", instanceType);
unknownInstanceTypes.add(instanceType);
- return row;
+ return;
}
double amount = USAGE_AMOUNT.getDouble(row);
- Map kv = new HashMap<>();
- kv.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount);
- kv.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount);
- kv.put(EMBODIED_ADP, impacts.getAbioticDepletionPotentialGramsSbeq() * amount);
- return EnrichmentModule.withUpdatedValues(row, kv);
+ enrichedValues.put(ENERGY_USED, impacts.getFinalEnergyKWh() * amount);
+ enrichedValues.put(EMBODIED_EMISSIONS, impacts.getEmbeddedEmissionsGramsCO2eq() * amount);
+ enrichedValues.put(EMBODIED_ADP, impacts.getAbioticDepletionPotentialGramsSbeq() * amount);
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java
index acf206e..80581bb 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/ccf/Accelerators.java
@@ -52,19 +52,19 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
+ public void enrich(Row row, Map enrichedValues) {
// limit to EC2 instances
String instanceType = PRODUCT_INSTANCE_TYPE.getString(row);
if (instanceType == null) {
- return row;
+ return;
}
final String operation = LINE_ITEM_OPERATION.getString(row);
final String product_code = LINE_ITEM_PRODUCT_CODE.getString(row);
if (operation == null || product_code == null) {
- return row;
+ return;
}
// conditions for EC2 instances
@@ -72,7 +72,7 @@ public Row process(Row row) {
LOG.debug("EC2 instance {}", instanceType);
}
else {
- return row;
+ return;
}
// check that they have a GPU
@@ -85,7 +85,7 @@ public Row process(Row row) {
if ("GPU instance".equals(fam)) {
LOG.debug("Lacking info for instance type with GPU {}", instanceType);
}
- return row;
+ return;
}
String gpu = instanceTypeInfo.get("type").toString();
@@ -105,6 +105,10 @@ public Row process(Row row) {
energy_used = (amount * energy_used * quantity / 1000);
// add it to an existing value or create it
- return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy_used, true);
+ Double existing = ENERGY_USED.getDouble(enrichedValues);
+ if (existing != null) {
+ energy_used += existing;
+ }
+ enrichedValues.put(ENERGY_USED, energy_used);
}
}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java
index 72b3e04..5a83f9e 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/ccf/Networking.java
@@ -47,10 +47,10 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
+ public void enrich(Row row, Map enrichedValues) {
String service_code = PRODUCT_SERVICE_CODE.getString(row);
if (service_code == null || !service_code.equals("AWSDataTransfer")) {
- return row;
+ return;
}
// apply only to rows corresponding to networking in or out of a region
int index = PRODUCT.resolveIndex(row);
@@ -58,7 +58,7 @@ public Row process(Row row) {
String transfer_type = (String) productMap.getOrDefault("transfer_type", "");
if (!transfer_type.startsWith("InterRegion")) {
- return row;
+ return;
}
// TODO consider extending to AWS Outbound and Inbound
@@ -67,6 +67,6 @@ public Row process(Row row) {
double amount_gb = USAGE_AMOUNT.getDouble(row);
double energy_gb = amount_gb * network_coefficient;
- return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy_gb);
+ enrichedValues.put(ENERGY_USED, energy_gb);
}
}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java b/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java
index 2a9661e..9ab3ca5 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/ccf/Storage.java
@@ -77,22 +77,22 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
+ public void enrich(Row row, Map enrichedValues) {
final String operation = LINE_ITEM_OPERATION.getString(row);
if (operation == null) {
- return row;
+ return;
}
// implement the logic from CCF
// first check that the unit corresponds to storage
final String unit = PRICING_UNIT.getString(row);
if (unit == null || !units.contains(unit)) {
- return row;
+ return;
}
final String usage_type = LINE_ITEM_USAGE_TYPE.getString(row);
if (usage_type == null) {
- return row;
+ return;
}
final String serviceCode = PRODUCT_SERVICE_CODE.getString(row);
@@ -101,7 +101,8 @@ public Row process(Row row) {
// loop on the values from the resources
for (String ssd : ssd_usage_types) {
if (usage_type.endsWith(ssd)) {
- return enrich(row, false, replication);
+ computeEnergy(row, enrichedValues, false, replication);
+ return;
}
}
@@ -110,14 +111,16 @@ public Row process(Row row) {
if (serviceCode != null && !usage_type.contains("Backup")) {
for (String service : ssd_services) {
if (serviceCode.endsWith(service)) {
- return enrich(row, false, replication);
+ computeEnergy(row, enrichedValues, false, replication);
+ return;
}
}
}
for (String hdd : hdd_usage_types) {
if (usage_type.endsWith(hdd)) {
- return enrich(row, true, replication);
+ computeEnergy(row, enrichedValues, true, replication);
+ return;
}
}
@@ -126,13 +129,10 @@ public Row process(Row row) {
if ("Storage".equals(product_product_family)) {
log.debug("Storage type not found for {} {}", operation, usage_type);
}
-
- // not been found
- return row;
}
- private Row enrich(Row row, boolean isHDD, int replication) {
+ private void computeEnergy(Row row, Map enrichedValues, boolean isHDD, int replication) {
double coefficient = isHDD ? hdd_gb_coefficient : ssd_gb_coefficient;
double amount = USAGE_AMOUNT.getDouble(row);
String unit = PRICING_UNIT.getString(row);
@@ -143,7 +143,7 @@ private Row enrich(Row row, boolean isHDD, int replication) {
}
// to kwh
double energy_kwh = amount /1000 * coefficient * replication;
- return EnrichmentModule.withUpdatedValue(row, ENERGY_USED, energy_kwh);
+ enrichedValues.put(ENERGY_USED, energy_kwh);
}
/**
@@ -211,4 +211,4 @@ private static boolean containsAny(String usageType, String... patterns) {
return false;
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java b/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java
index 8c7e4dc..672d809 100644
--- a/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java
+++ b/src/main/java/com/digitalpebble/spruce/modules/electricitymaps/AverageCarbonIntensity.java
@@ -77,23 +77,23 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
- if (ENERGY_USED.isNullAt(row)) {
- return row;
+ public void enrich(Row row, Map enrichedValues) {
+ if (!enrichedValues.containsKey(ENERGY_USED)) {
+ return;
}
- String locationCode = REGION.getString(row);
+ String locationCode = REGION.getString(enrichedValues);
// no location found - skip
if (locationCode == null) {
- return row;
+ return;
}
// get intensity for the location
Double coeff = getAverageIntensity(Provider.AWS, locationCode);
if (coeff == null) {
// if the coefficient is 0 it means that the region is not supported
- return row;
+ return;
}
- return EnrichmentModule.withUpdatedValue(row, CARBON_INTENSITY, coeff);
+ enrichedValues.put(CARBON_INTENSITY, coeff);
}
}
diff --git a/src/test/java/com/digitalpebble/spruce/ColumnTest.java b/src/test/java/com/digitalpebble/spruce/ColumnTest.java
index e934854..099953e 100644
--- a/src/test/java/com/digitalpebble/spruce/ColumnTest.java
+++ b/src/test/java/com/digitalpebble/spruce/ColumnTest.java
@@ -13,7 +13,7 @@ public class ColumnTest {
@Test
public void testGetLabelAndType() {
- TestColumn col = new TestColumn("field1", DataTypes.StringType);
+ CURColumn col = new CURColumn("field1", DataTypes.StringType);
assertEquals("field1", col.getLabel());
assertEquals(DataTypes.StringType, col.getType());
}
@@ -26,7 +26,7 @@ public void testGetStringAndIsNullAt() {
// non-null value
Row r1 = new GenericRowWithSchema(new Object[]{"hello"}, schema);
- TestColumn col = new TestColumn(name, DataTypes.StringType);
+ CURColumn col = new CURColumn(name, DataTypes.StringType);
assertFalse(col.isNullAt(r1));
assertEquals("hello", col.getString(r1));
@@ -36,7 +36,7 @@ public void testGetStringAndIsNullAt() {
assertNull(col.getString(r2));
// ask for a field not in the schema but allow it
- TestColumn col2 = new TestColumn("unknown", DataTypes.StringType);
+ CURColumn col2 = new CURColumn("unknown", DataTypes.StringType);
assertNull(col2.getString(r2, true));
boolean exception = false;
@@ -55,7 +55,7 @@ public void testGetDoubleAndIsNullAt() {
// non-null double
Row r1 = new GenericRowWithSchema(new Object[]{3.14159}, schema);
- TestColumn col = new TestColumn(name, DataTypes.DoubleType);
+ CURColumn col = new CURColumn(name, DataTypes.DoubleType);
assertFalse(col.isNullAt(r1));
assertEquals(3.14159, col.getDouble(r1), 1e-9);
@@ -63,12 +63,4 @@ public void testGetDoubleAndIsNullAt() {
Row r2 = new GenericRowWithSchema(new Object[]{null}, schema);
assertTrue(col.isNullAt(r2));
}
-
- // minimal concrete implementation for testing
- static class TestColumn extends Column {
- TestColumn(String l, DataType t) {
- super(l, t);
- }
- }
}
-
diff --git a/src/test/java/com/digitalpebble/spruce/ConfigTest.java b/src/test/java/com/digitalpebble/spruce/ConfigTest.java
index 42d6f16..5d19987 100644
--- a/src/test/java/com/digitalpebble/spruce/ConfigTest.java
+++ b/src/test/java/com/digitalpebble/spruce/ConfigTest.java
@@ -37,8 +37,7 @@ public Column[] columnsAdded() {
}
@Override
- public Row process(Row row) {
- return row;
+ public void enrich(Row row, Map enrichedValues) {
}
}
diff --git a/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java b/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java
index d02b49c..8caf02d 100644
--- a/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java
+++ b/src/test/java/com/digitalpebble/spruce/EnrichmentModuleTest.java
@@ -8,62 +8,94 @@
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
import static com.digitalpebble.spruce.SpruceColumn.ENERGY_USED;
import static org.junit.jupiter.api.Assertions.*;
+/**
+ * Tests that modules correctly write to and read from the enrichedValues map.
+ */
class EnrichmentModuleTest {
- private Row baseRow;
-
- @BeforeEach
- void setUp() {
- StructField[] fields = new StructField[]{
- DataTypes.createStructField("id", DataTypes.IntegerType, false),
- DataTypes.createStructField(ENERGY_USED.getLabel(), DataTypes.DoubleType, true),
- DataTypes.createStructField("note", DataTypes.StringType, true)
+ @Test
+ void enrichWritesToMap() {
+ // A minimal module that writes a value
+ EnrichmentModule module = new EnrichmentModule() {
+ @Override
+ public Column[] columnsNeeded() { return new Column[0]; }
+ @Override
+ public Column[] columnsAdded() { return new Column[]{ENERGY_USED}; }
+ @Override
+ public void enrich(Row row, Map enrichedValues) {
+ enrichedValues.put(ENERGY_USED, 42.0);
+ }
};
- StructType schema = new StructType(fields);
- baseRow = new GenericRowWithSchema(new Object[]{1, 10.0, "initial"}, schema);
- }
- @Test
- void withUpdatedValue() {
- // replace value
- Row replaced = EnrichmentModule.withUpdatedValue(baseRow, ENERGY_USED, 5.0, false);
- assertEquals(5.0, replaced.getDouble(replaced.fieldIndex(ENERGY_USED.getLabel())), 0.0001);
-
- // add to existing value
- Row added = EnrichmentModule.withUpdatedValue(baseRow, ENERGY_USED, 2.5, true);
- assertEquals(12.5, added.getDouble(added.fieldIndex(ENERGY_USED.getLabel())), 0.0001);
+ StructType schema = Utils.getSchema(module);
+ Row row = new GenericRowWithSchema(new Object[]{null}, schema);
+ Map enriched = new HashMap<>();
+ module.enrich(row, enriched);
+
+ assertEquals(42.0, enriched.get(ENERGY_USED));
}
@Test
- void testWithUpdatedValue() {
- // using the simple setter that infers replacement (non-Double add overload)
- Row updated = EnrichmentModule.withUpdatedValue(baseRow, ENERGY_USED, 42.0);
- assertEquals(42.0, updated.getDouble(updated.fieldIndex(ENERGY_USED.getLabel())), 0.0001);
+ void enrichAddsToExistingValue() {
+ // Simulates the Accelerators add-to-existing pattern
+ EnrichmentModule module = new EnrichmentModule() {
+ @Override
+ public Column[] columnsNeeded() { return new Column[0]; }
+ @Override
+ public Column[] columnsAdded() { return new Column[]{ENERGY_USED}; }
+ @Override
+ public void enrich(Row row, Map enrichedValues) {
+ Double existing = (Double) enrichedValues.get(ENERGY_USED);
+ double newVal = 2.5;
+ enrichedValues.put(ENERGY_USED, (existing != null ? existing : 0.0) + newVal);
+ }
+ };
- // update non-numeric field (should simply set new value)
- Column noteCol = new ProxyColumn("note", DataTypes.StringType);
+ StructType schema = Utils.getSchema(module);
+ Row row = new GenericRowWithSchema(new Object[]{null}, schema);
+
+ Map enriched = new HashMap<>();
+ enriched.put(ENERGY_USED, 10.0);
+ module.enrich(row, enriched);
- Row noteUpdated = EnrichmentModule.withUpdatedValue(baseRow, noteCol, "updated");
- assertEquals("updated", noteUpdated.getString(noteUpdated.fieldIndex("note")));
+ assertEquals(12.5, enriched.get(ENERGY_USED));
}
@Test
- void withUpdatedValues() {
- java.util.Map updates = new java.util.HashMap<>();
- updates.put(ENERGY_USED, 7.5);
- updates.put(new ProxyColumn("note", DataTypes.StringType), "changed");
+ void enrichReadsFromMap() {
+ // Simulates a downstream module reading a value set by an earlier module
+ Column noteCol = new ProxyColumn("note", DataTypes.StringType);
+
+ EnrichmentModule module = new EnrichmentModule() {
+ @Override
+ public Column[] columnsNeeded() { return new Column[]{ENERGY_USED}; }
+ @Override
+ public Column[] columnsAdded() { return new Column[]{noteCol}; }
+ @Override
+ public void enrich(Row row, Map enrichedValues) {
+ Object val = enrichedValues.get(ENERGY_USED);
+ if (val != null) {
+ enrichedValues.put(noteCol, "energy=" + val);
+ }
+ }
+ };
+
+ StructType schema = Utils.getSchema(module);
+ Row row = new GenericRowWithSchema(new Object[]{null, null}, schema);
- Row updated = EnrichmentModule.withUpdatedValues(baseRow, updates);
+ Map enriched = new HashMap<>();
+ enriched.put(ENERGY_USED, 7.5);
+ module.enrich(row, enriched);
- assertEquals(1, updated.getInt(updated.fieldIndex("id")));
- assertEquals(7.5, updated.getDouble(updated.fieldIndex(ENERGY_USED.getLabel())), 0.0001);
- assertEquals("changed", updated.getString(updated.fieldIndex("note")));
+ assertEquals("energy=7.5", enriched.get(noteCol));
}
static class ProxyColumn extends Column {
@@ -71,4 +103,4 @@ static class ProxyColumn extends Column {
super(l, t);
}
}
-}
\ No newline at end of file
+}
diff --git a/src/test/java/com/digitalpebble/spruce/EnrichmentPipelineTest.java b/src/test/java/com/digitalpebble/spruce/EnrichmentPipelineTest.java
index b5892b3..1aac9db 100644
--- a/src/test/java/com/digitalpebble/spruce/EnrichmentPipelineTest.java
+++ b/src/test/java/com/digitalpebble/spruce/EnrichmentPipelineTest.java
@@ -63,8 +63,8 @@ public void testPipelineEnrichmentLogic(String lineItemType, boolean expectEnric
values[schema.fieldIndex(PRODUCT_REGION_CODE.getLabel())] = "us-east-1";
values[schema.fieldIndex(LINE_ITEM_TYPE.getLabel())] = lineItemType;
- Row inputRow = new GenericRowWithSchema(values, schema);
- List inputList = Collections.singletonList(inputRow);
+ Row row = new GenericRowWithSchema(values, schema);
+ List inputList = Collections.singletonList(row);
Iterator results = pipeline.call(inputList.iterator());
diff --git a/src/test/java/com/digitalpebble/spruce/EnrichmentStrategyBenchmark.java b/src/test/java/com/digitalpebble/spruce/EnrichmentStrategyBenchmark.java
new file mode 100644
index 0000000..07ee422
--- /dev/null
+++ b/src/test/java/com/digitalpebble/spruce/EnrichmentStrategyBenchmark.java
@@ -0,0 +1,255 @@
+// SPDX-License-Identifier: Apache-2.0
+
+package com.digitalpebble.spruce;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.digitalpebble.spruce.SpruceColumn.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Benchmarks the current row-copy-per-module enrichment approach against a
+ * map-based approach where modules accumulate updates in a {@code Map}
+ * and a single Row is materialised at the end of the pipeline.
+ *
+ * Current approach
+ * Every call to {@link EnrichmentModule#withUpdatedValue} allocates a new
+ * {@code Object[]}, copies all column values from the previous row,
+ * sets the target value, and wraps it in a new {@link GenericRowWithSchema}.
+ * With 6 modules and a 108-column schema this means 6 full array copies and
+ * 6 Row allocations per input row.
+ *
+ * Map approach
+ * Modules read CUR (input) columns from the immutable original Row and
+ * read/write Spruce (enrichment) columns via a shared {@code Map}.
+ * One {@link GenericRowWithSchema} is created at the very end — reducing
+ * N array copies to 1.
+ *
+ * The simulated pipeline mirrors the real module chain:
+ *
+ * - RegionExtraction → writes REGION
+ * - BoaviztAPIstatic → writes ENERGY_USED, EMBODIED_EMISSIONS, EMBODIED_ADP
+ * - Accelerators → adds to ENERGY_USED
+ * - PUE → reads ENERGY_USED, writes PUE
+ * - CarbonIntensity → reads REGION, writes CARBON_INTENSITY
+ * - OperationalEmissions → reads ENERGY_USED, PUE, CARBON_INTENSITY → writes OPERATIONAL_EMISSIONS
+ *
+ */
+public class EnrichmentStrategyBenchmark {
+
+ /** CUR input columns — realistic width for an AWS CUR report. */
+ private static final int CUR_WIDTH = 100;
+
+ private static final Column[] SPRUCE_COLUMNS = {
+ REGION, ENERGY_USED, EMBODIED_EMISSIONS, EMBODIED_ADP,
+ PUE, CARBON_INTENSITY, OPERATIONAL_EMISSIONS, CPU_LOAD
+ };
+
+ private static StructType schema;
+ private static Row templateRow;
+ private static int totalWidth;
+
+ @BeforeAll
+ static void setup() {
+ StructField[] fields = new StructField[CUR_WIDTH + SPRUCE_COLUMNS.length];
+ for (int i = 0; i < CUR_WIDTH; i++) {
+ fields[i] = DataTypes.createStructField("cur_col_" + i, DataTypes.StringType, true);
+ }
+ for (int i = 0; i < SPRUCE_COLUMNS.length; i++) {
+ fields[CUR_WIDTH + i] = DataTypes.createStructField(
+ SPRUCE_COLUMNS[i].getLabel(), SPRUCE_COLUMNS[i].getType(), true);
+ }
+ schema = new StructType(fields);
+ totalWidth = schema.length();
+
+ Object[] values = new Object[totalWidth];
+ for (int i = 0; i < CUR_WIDTH; i++) {
+ values[i] = "cur_value_" + i;
+ }
+ // Spruce columns start as null — same as the real pipeline
+ templateRow = new GenericRowWithSchema(values, schema);
+ }
+
+ // ==================================================================
+ // CURRENT APPROACH — one Row copy per module update
+ // ==================================================================
+
+ /** Mirrors the old per-module row-copy approach. */
+ private static Row copyAndSet(Row row, Column column, Object newValue) {
+ Object[] values = new Object[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ values[i] = row.get(i);
+ }
+ values[column.resolveIndex(row)] = newValue;
+ return new GenericRowWithSchema(values, row.schema());
+ }
+
+ /** Mirrors the old bulk row-copy approach. */
+ private static Row copyAndSetBulk(Row row, Map updates) {
+ Object[] values = new Object[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ values[i] = row.get(i);
+ }
+ for (Map.Entry e : updates.entrySet()) {
+ values[e.getKey().resolveIndex(row)] = e.getValue();
+ }
+ return new GenericRowWithSchema(values, row.schema());
+ }
+
+ /**
+ * 6-module pipeline using the current row-copy strategy.
+ * Each module produces a new Row; subsequent modules read from it.
+ */
+ static Row currentApproach(Row row) {
+ // 1. RegionExtraction → REGION
+ row = copyAndSet(row, REGION, "us-east-1");
+
+ // 2. BoaviztAPIstatic → ENERGY_USED, EMBODIED_EMISSIONS, EMBODIED_ADP
+ Map boavizta = new HashMap<>(4);
+ boavizta.put(ENERGY_USED, 0.042);
+ boavizta.put(EMBODIED_EMISSIONS, 0.15);
+ boavizta.put(EMBODIED_ADP, 0.003);
+ row = copyAndSetBulk(row, boavizta);
+
+ // 3. Accelerators → adds to ENERGY_USED (withUpdatedValue with add=true)
+ double existing = row.getDouble(ENERGY_USED.resolveIndex(row));
+ row = copyAndSet(row, ENERGY_USED, existing + 0.008);
+
+ // 4. PUE → reads ENERGY_USED, writes PUE
+ @SuppressWarnings("unused")
+ double energy4 = row.getDouble(ENERGY_USED.resolveIndex(row));
+ row = copyAndSet(row, PUE, 1.15);
+
+ // 5. AverageCarbonIntensity → reads REGION, writes CARBON_INTENSITY
+ @SuppressWarnings("unused")
+ String region = row.getString(REGION.resolveIndex(row));
+ row = copyAndSet(row, CARBON_INTENSITY, 450.0);
+
+ // 6. OperationalEmissions → reads ENERGY_USED, PUE, CARBON_INTENSITY → OPERATIONAL_EMISSIONS
+ double energy = row.getDouble(ENERGY_USED.resolveIndex(row));
+ double pue = row.getDouble(PUE.resolveIndex(row));
+ double ci = row.getDouble(CARBON_INTENSITY.resolveIndex(row));
+ row = copyAndSet(row, OPERATIONAL_EMISSIONS, energy * pue * ci);
+
+ return row;
+ }
+
+ // ==================================================================
+ // MAP APPROACH — modules enrich a shared Map, one Row at the end
+ // ==================================================================
+
+ /**
+ * Same 6-module pipeline but enrichment values accumulate in a Map.
+ * CUR columns are read from the immutable original row.
+ * Spruce columns written by earlier modules are read from the map.
+ * A single Row is materialised at the very end.
+ */
+ static Row mapApproach(Row originalRow) {
+ Map enriched = new HashMap<>();
+
+ // 1. RegionExtraction
+ enriched.put(REGION, "us-east-1");
+
+ // 2. BoaviztAPIstatic
+ enriched.put(ENERGY_USED, 0.042);
+ enriched.put(EMBODIED_EMISSIONS, 0.15);
+ enriched.put(EMBODIED_ADP, 0.003);
+
+ // 3. Accelerators — adds to ENERGY_USED (reads previous value from map)
+ Double existingEnergy = (Double) enriched.get(ENERGY_USED);
+ enriched.put(ENERGY_USED, (existingEnergy != null ? existingEnergy : 0.0) + 0.008);
+
+ // 4. PUE — reads ENERGY_USED from map
+ @SuppressWarnings("unused")
+ double energy4 = (Double) enriched.get(ENERGY_USED);
+ enriched.put(PUE, 1.15);
+
+ // 5. AverageCarbonIntensity — reads REGION from map
+ @SuppressWarnings("unused")
+ String region = (String) enriched.get(REGION);
+ enriched.put(CARBON_INTENSITY, 450.0);
+
+ // 6. OperationalEmissions — reads from map
+ double energy = (Double) enriched.get(ENERGY_USED);
+ double pue = (Double) enriched.get(PUE);
+ double ci = (Double) enriched.get(CARBON_INTENSITY);
+ enriched.put(OPERATIONAL_EMISSIONS, energy * pue * ci);
+
+ // --- Single row materialisation ---
+ Object[] values = new Object[originalRow.size()];
+ for (int i = 0; i < originalRow.size(); i++) {
+ values[i] = originalRow.get(i);
+ }
+ for (Map.Entry e : enriched.entrySet()) {
+ values[e.getKey().resolveIndex(originalRow)] = e.getValue();
+ }
+ return new GenericRowWithSchema(values, originalRow.schema());
+ }
+
+ // ==================================================================
+ // Benchmark
+ // ==================================================================
+
+ @Test
+ void benchmarkMapVsRowCopy() {
+ final int warmup = 100_000;
+ final int iterations = 1_000_000;
+
+ // ---- Warmup: let HotSpot JIT compile both paths ----
+ for (int i = 0; i < warmup; i++) {
+ currentApproach(templateRow);
+ mapApproach(templateRow);
+ }
+
+ // ---- Measure current approach ----
+ long t0 = System.nanoTime();
+ Row resultCurrent = null;
+ for (int i = 0; i < iterations; i++) {
+ resultCurrent = currentApproach(templateRow);
+ }
+ long currentNs = System.nanoTime() - t0;
+
+ // ---- Measure map approach ----
+ t0 = System.nanoTime();
+ Row resultMap = null;
+ for (int i = 0; i < iterations; i++) {
+ resultMap = mapApproach(templateRow);
+ }
+ long mapNs = System.nanoTime() - t0;
+
+ // ---- Verify both approaches produce identical results ----
+ for (int i = 0; i < totalWidth; i++) {
+ Object a = resultCurrent.get(i);
+ Object b = resultMap.get(i);
+ if (a instanceof Double da && b instanceof Double db) {
+ assertEquals(da, db, 1e-12, "Mismatch at column " + i);
+ } else {
+ assertEquals(a, b, "Mismatch at column " + i);
+ }
+ }
+
+ // ---- Report ----
+ long currentMs = currentNs / 1_000_000;
+ long mapMs = mapNs / 1_000_000;
+ double speedup = (double) currentNs / mapNs;
+
+ System.out.println();
+ System.out.println("=== Enrichment Strategy Benchmark ===");
+ System.out.printf("Schema width: %d columns (%d CUR + %d Spruce)%n",
+ totalWidth, CUR_WIDTH, SPRUCE_COLUMNS.length);
+ System.out.printf("Pipeline: 6 modules (mirrors real chain)%n");
+ System.out.printf("Rows processed: %,d%n%n", iterations);
+ System.out.printf("Current (row-copy per module): %,d ms%n", currentMs);
+ System.out.printf("Map (single row creation): %,d ms%n", mapMs);
+ System.out.printf("Speedup: %.2fx%n", speedup);
+ }
+}
diff --git a/src/test/java/com/digitalpebble/spruce/modules/ConstantLoadTest.java b/src/test/java/com/digitalpebble/spruce/modules/ConstantLoadTest.java
index 6a58fed..9dc1781 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/ConstantLoadTest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/ConstantLoadTest.java
@@ -2,12 +2,17 @@
package com.digitalpebble.spruce.modules;
+import com.digitalpebble.spruce.Column;
+import com.digitalpebble.spruce.SpruceColumn;
import com.digitalpebble.spruce.Utils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.junit.jupiter.api.Assertions.*;
class ConstantLoadTest {
@@ -16,10 +21,11 @@ class ConstantLoadTest {
private final StructType schema = Utils.getSchema(load);
@Test
- void process() {
+ void enrich() {
Object[] values = new Object[] {null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = load.process(row);
- assertEquals(50d, enriched.getDouble(0));
+ Map enriched = new HashMap<>();
+ load.enrich(row, enriched);
+ assertEquals(50d, enriched.get(SpruceColumn.CPU_LOAD));
}
-}
\ No newline at end of file
+}
diff --git a/src/test/java/com/digitalpebble/spruce/modules/OperationalEmissionsTest.java b/src/test/java/com/digitalpebble/spruce/modules/OperationalEmissionsTest.java
index 85a8a63..bf6cbca 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/OperationalEmissionsTest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/OperationalEmissionsTest.java
@@ -2,12 +2,16 @@
package com.digitalpebble.spruce.modules;
+import com.digitalpebble.spruce.Column;
import com.digitalpebble.spruce.Utils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
import static com.digitalpebble.spruce.SpruceColumn.*;
import static org.junit.jupiter.api.Assertions.*;
@@ -20,31 +24,34 @@ public class OperationalEmissionsTest {
@Test
void processNoValues() {
- Object[] values = new Object[] {null, null, null};
+ Object[] values = new Object[] {null, null, null, null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = module.process(row);
- // missing values comes back as it was
- assertEquals(row, enriched);
+ Map enriched = new HashMap<>();
+ module.enrich(row, enriched);
+ // missing values - map should not contain OPERATIONAL_EMISSIONS
+ assertFalse(enriched.containsKey(OPERATIONAL_EMISSIONS));
}
@Test
void processNoPUE() {
- Object[] values = new Object[] {10d, 321.04d, null, null};
- Row row = new GenericRowWithSchema(values, schema);
- Row enriched = module.process(row);
+ Row row = new GenericRowWithSchema(new Object[]{null, null, null, null}, schema);
+ Map enriched = new HashMap<>();
+ enriched.put(ENERGY_USED, 10d);
+ enriched.put(CARBON_INTENSITY, 321.04d);
+ module.enrich(row, enriched);
double expected = 10 * 321.04;
- double result = OPERATIONAL_EMISSIONS.getDouble(enriched);
- assertEquals(expected, result);
+ assertEquals(expected, (Double) enriched.get(OPERATIONAL_EMISSIONS));
}
@Test
void processWithPUE() {
- Object[] values = new Object[] {10d, 321.04d, 1.15, null};
- Row row = new GenericRowWithSchema(values, schema);
- Row enriched = module.process(row);
+ Row row = new GenericRowWithSchema(new Object[]{null, null, null, null}, schema);
+ Map enriched = new HashMap<>();
+ enriched.put(ENERGY_USED, 10d);
+ enriched.put(CARBON_INTENSITY, 321.04d);
+ enriched.put(PUE, 1.15);
+ module.enrich(row, enriched);
double expected = 10 * 321.04 * 1.15;
- double result = OPERATIONAL_EMISSIONS.getDouble(enriched);
- assertEquals(expected, result);
+ assertEquals(expected, (Double) enriched.get(OPERATIONAL_EMISSIONS));
}
-
-}
\ No newline at end of file
+}
diff --git a/src/test/java/com/digitalpebble/spruce/modules/PUETest.java b/src/test/java/com/digitalpebble/spruce/modules/PUETest.java
index bd9b208..4e84a8f 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/PUETest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/PUETest.java
@@ -2,6 +2,7 @@
package com.digitalpebble.spruce.modules;
+import com.digitalpebble.spruce.Column;
import com.digitalpebble.spruce.SpruceColumn;
import com.digitalpebble.spruce.Utils;
import org.apache.spark.sql.Row;
@@ -15,7 +16,9 @@
import java.util.HashMap;
import java.util.Map;
+import static com.digitalpebble.spruce.SpruceColumn.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
class PUETest {
@@ -31,10 +34,10 @@ void setUp() {
@Test
void processNoValues() {
- Object[] values = new Object[] {null, null, null};
- Row row = new GenericRowWithSchema(values, schema);
- Row enriched = pue.process(row);
- assertEquals(row, enriched);
+ Row row = new GenericRowWithSchema(new Object[]{null, null, null}, schema);
+ Map enriched = new HashMap<>();
+ pue.enrich(row, enriched);
+ assertFalse(enriched.containsKey(SpruceColumn.PUE));
}
@Test
@@ -44,11 +47,13 @@ void processCustomConfiguration() {
config.put("default", 2.5);
customPue.init(config);
- Object[] values = new Object[] {10d, "Mars-Region", null};
- Row row = new GenericRowWithSchema(values, schema);
- Row enriched = customPue.process(row);
+ Row row = new GenericRowWithSchema(new Object[]{null, null, null}, schema);
+ Map enriched = new HashMap<>();
+ enriched.put(ENERGY_USED, 10d);
+ enriched.put(REGION, "Mars-Region");
+ customPue.enrich(row, enriched);
- assertEquals(2.5, SpruceColumn.PUE.getDouble(enriched), 0.001);
+ assertEquals(2.5, (Double) enriched.get(SpruceColumn.PUE), 0.001);
}
@ParameterizedTest
@@ -61,10 +66,14 @@ void processCustomConfiguration() {
"100, eu-central-2, 1.11" // Regex match (eu-.+)
})
void processRegionPUEValues(double energyUsed, String region, double expectedPUE) {
- Object[] values = new Object[] {energyUsed, region, null};
- Row row = new GenericRowWithSchema(values, schema);
- Row enriched = pue.process(row);
+ Row row = new GenericRowWithSchema(new Object[]{null, null, null}, schema);
+ Map enriched = new HashMap<>();
+ enriched.put(ENERGY_USED, energyUsed);
+ if (region != null) {
+ enriched.put(REGION, region);
+ }
+ pue.enrich(row, enriched);
- assertEquals(expectedPUE, SpruceColumn.PUE.getDouble(enriched), 0.001, "Failed for region: " + region);
+ assertEquals(expectedPUE, (Double) enriched.get(SpruceColumn.PUE), 0.001, "Failed for region: " + region);
}
-}
\ No newline at end of file
+}
diff --git a/src/test/java/com/digitalpebble/spruce/modules/RegionExtractionTest.java b/src/test/java/com/digitalpebble/spruce/modules/RegionExtractionTest.java
index de77ae7..a9b04e0 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/RegionExtractionTest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/RegionExtractionTest.java
@@ -2,12 +2,16 @@
package com.digitalpebble.spruce.modules;
+import com.digitalpebble.spruce.Column;
import com.digitalpebble.spruce.Utils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
import static com.digitalpebble.spruce.SpruceColumn.REGION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -23,32 +27,36 @@ public class RegionExtractionTest{
@Test
void processEmpty() {
Row row = generateRow(null, null, null);
- Row enriched = region.process(row);
- assertNull(REGION.getString(enriched));
+ Map enriched = new HashMap<>();
+ region.enrich(row, enriched);
+ assertNull(enriched.get(REGION));
}
@Test
void process() {
String reg = "us-east-1";
Row row = generateRow(reg, null, null);
- Row enriched = region.process(row);
- assertEquals(reg, REGION.getString(enriched));
+ Map enriched = new HashMap<>();
+ region.enrich(row, enriched);
+ assertEquals(reg, enriched.get(REGION));
}
@Test
void process2() {
String reg = "us-east-1";
Row row = generateRow(reg, "ignore_me", null);
- Row enriched = region.process(row);
- assertEquals(reg, REGION.getString(enriched));
+ Map enriched = new HashMap<>();
+ region.enrich(row, enriched);
+ assertEquals(reg, enriched.get(REGION));
}
@Test
void process3() {
String reg = "us-east-1";
Row row = generateRow(null, reg, null);
- Row enriched = region.process(row);
- assertEquals(reg, REGION.getString(enriched));
+ Map enriched = new HashMap<>();
+ region.enrich(row, enriched);
+ assertEquals(reg, enriched.get(REGION));
}
private Row generateRow(String PRODUCT_REGION_CODE, String PRODUCT_FROM_REGION_CODE, String PRODUCT_TO_REGION_CODE){
diff --git a/src/test/java/com/digitalpebble/spruce/modules/ServerlessTest.java b/src/test/java/com/digitalpebble/spruce/modules/ServerlessTest.java
index eb292fa..3309dc3 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/ServerlessTest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/ServerlessTest.java
@@ -2,14 +2,19 @@
package com.digitalpebble.spruce.modules;
+import com.digitalpebble.spruce.Column;
import com.digitalpebble.spruce.Utils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
import static com.digitalpebble.spruce.SpruceColumn.ENERGY_USED;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
class ServerlessTest {
@@ -19,66 +24,73 @@ class ServerlessTest {
@Test
void processEmptyRow() {
Row row = generateRow(null, null, null);
- Row enriched = serverless.process(row);
- assertEquals(row, enriched);
+ Map enriched = new HashMap<>();
+ serverless.enrich(row, enriched);
+ assertFalse(enriched.containsKey(ENERGY_USED));
}
@Test
void processFargateMemory() {
double quantity = 10d;
Row row = generateRow("FargateTask", quantity, "xx-Fargate-GB-Hours");
- Row enriched = serverless.process(row);
+ Map enriched = new HashMap<>();
+ serverless.enrich(row, enriched);
double expected = serverless.memory_coefficient_kwh * quantity;
- assertEquals(expected, ENERGY_USED.getDouble(enriched));
+ assertEquals(expected, enriched.get(ENERGY_USED));
}
@Test
void processFargatevCPU() {
double quantity = 4d;
Row row = generateRow("FargateTask", quantity, "xx-Fargate-vCPU-Hours:perCPU");
- Row enriched = serverless.process(row);
+ Map enriched = new HashMap<>();
+ serverless.enrich(row, enriched);
double expected = serverless.x86_cpu_coefficient_kwh * quantity;
- assertEquals(expected, ENERGY_USED.getDouble(enriched));
+ assertEquals(expected, enriched.get(ENERGY_USED));
}
@Test
void processFargatevCPUARM() {
double quantity = 4d;
Row row = generateRow("FargateTask", quantity, "xx-Fargate-ARM-vCPU-Hours:perCPU");
- Row enriched = serverless.process(row);
+ Map enriched = new HashMap<>();
+ serverless.enrich(row, enriched);
double expected = serverless.arm_cpu_coefficient_kwh * quantity;
- assertEquals(expected, ENERGY_USED.getDouble(enriched));
+ assertEquals(expected, enriched.get(ENERGY_USED));
}
@Test
void processEMRvCPUARM() {
double quantity = 4d;
Row row = generateRow("Worker", quantity, "xxx-EMR-SERVERLESS-ARM-vCPUHours");
- Row enriched = serverless.process(row);
+ Map enriched = new HashMap<>();
+ serverless.enrich(row, enriched);
double expected = serverless.arm_cpu_coefficient_kwh * quantity;
- assertEquals(expected, ENERGY_USED.getDouble(enriched));
+ assertEquals(expected, enriched.get(ENERGY_USED));
}
@Test
void processEMRvCPU() {
double quantity = 4d;
Row row = generateRow("Worker", quantity, "xx-EMR-SERVERLESS-vCPUHours");
- Row enriched = serverless.process(row);
+ Map enriched = new HashMap<>();
+ serverless.enrich(row, enriched);
double expected = serverless.x86_cpu_coefficient_kwh * quantity;
- assertEquals(expected, ENERGY_USED.getDouble(enriched));
+ assertEquals(expected, enriched.get(ENERGY_USED));
}
@Test
void processEMRMemory() {
double quantity = 10d;
Row row = generateRow("Worker", quantity, "EUN1-EMR-SERVERLESS-ARM-MemoryGBHours");
- Row enriched = serverless.process(row);
+ Map enriched = new HashMap<>();
+ serverless.enrich(row, enriched);
double expected = serverless.memory_coefficient_kwh * quantity;
- assertEquals(expected, ENERGY_USED.getDouble(enriched));
+ assertEquals(expected, enriched.get(ENERGY_USED));
}
private Row generateRow(String LINE_ITEM_OPERATION, Object USAGE_AMOUNT, String LINE_ITEM_USAGE_TYPE){
Object[] values = new Object[] {LINE_ITEM_OPERATION, USAGE_AMOUNT, LINE_ITEM_USAGE_TYPE, null};
return new GenericRowWithSchema(values, schema);
}
-}
\ No newline at end of file
+}
diff --git a/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPITest.java b/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPITest.java
index f666bc7..b16f137 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPITest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPITest.java
@@ -85,10 +85,11 @@ void testInitWithCustomAddress() {
void testProcessWithNullValues(String instanceType, String serviceCode, String operation, String productCode) {
Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = api.process(row);
+ Map enriched = new HashMap<>();
+ api.enrich(row, enriched);
- // Should return the original row unchanged
- assertEquals(row, enriched);
+ // Should not add any enrichment
+ assertTrue(enriched.isEmpty());
}
static Stream nullValueTestCases() {
@@ -106,24 +107,18 @@ static Stream nullValueTestCases() {
void testProcessWithEmptyValues(String instanceType, String serviceCode, String operation, String productCode) {
Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null};
Row row = new GenericRowWithSchema(values, schema);
+ Map enriched = new HashMap<>();
- // Test cases 1 and 2 (null and empty instance types) should throw IllegalArgumentException
- // Test cases 3 and 4 (empty strings for all fields) should return unchanged rows
- // because BoaviztAPI.process() has early returns before calling BoaviztAPIClient.getEnergyEstimates()
- if (instanceType == null || (instanceType != null && instanceType.trim().isEmpty())) {
- // These should throw IllegalArgumentException if they reach the API call
+ if (instanceType == null || instanceType.trim().isEmpty()) {
try {
- Row enriched = api.process(row);
- // If no exception was thrown, the row should be unchanged
- assertEquals(row, enriched, "Should return unchanged row for null/empty instance types that don't reach API call");
+ api.enrich(row, enriched);
+ assertTrue(enriched.isEmpty(), "Should not enrich for null/empty instance types");
} catch (IllegalArgumentException e) {
- // This is also valid - the validation caught it
assertTrue(e.getMessage().contains("Instance type cannot be null, empty, or whitespace only"));
}
} else {
- // Other test cases should return unchanged rows
- Row enriched = api.process(row);
- assertEquals(row, enriched, "Should return unchanged row for other empty value cases");
+ api.enrich(row, enriched);
+ assertTrue(enriched.isEmpty(), "Should not enrich for other empty value cases");
}
}
@@ -141,10 +136,11 @@ static Stream emptyValueTestCases() {
void testProcessWithUnsupportedValues(String instanceType, String serviceCode, String operation, String productCode) {
Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = api.process(row);
+ Map enriched = new HashMap<>();
+ api.enrich(row, enriched);
- // Should return the original row unchanged for unsupported services/operations
- assertEquals(row, enriched);
+ // Should not add any enrichment for unsupported services/operations
+ assertTrue(enriched.isEmpty());
}
static Stream unsupportedValueTestCases() {
@@ -161,10 +157,11 @@ static Stream unsupportedValueTestCases() {
void testProcessWithEdgeCases(String instanceType, String serviceCode, String operation, String productCode) {
Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = api.process(row);
+ Map enriched = new HashMap<>();
+ api.enrich(row, enriched);
- // Should return the original row unchanged for edge cases
- assertEquals(row, enriched);
+ // Should not add any enrichment for edge cases
+ assertTrue(enriched.isEmpty());
}
static Stream edgeCaseTestCases() {
@@ -198,10 +195,12 @@ void tearDown() throws IOException {
mockWebServer.shutdown();
}
- private Row enrich(String productInstanceType, String productServiceCode,String lineItemOperation, String lineItemProductCode, double usageAmount){
+ private Map callEnrich(String productInstanceType, String productServiceCode, String lineItemOperation, String lineItemProductCode, double usageAmount){
Object[] values = new Object[]{productInstanceType, productServiceCode, lineItemOperation, lineItemProductCode, usageAmount, null, null, null};
Row row = new GenericRowWithSchema(values, schema);
- return api.process(row);
+ Map enriched = new HashMap<>();
+ api.enrich(row, enriched);
+ return enriched;
}
@Test
@@ -213,11 +212,10 @@ void testProcessEC2InstanceWithValidData() throws IOException {
.setResponseCode(200)
.addHeader("Content-Type", "application/json"));
- Row enriched = enrich("t3.micro", "AmazonEC2", "RunInstances", "AmazonEC2", 10);
+ Map enriched = callEnrich("t3.micro", "AmazonEC2", "RunInstances", "AmazonEC2", 10);
- // The row should be processed and enriched with energy data
- assertNotNull(enriched);
- // Verify that the row was enriched (you can add more specific assertions here)
+ // The map should contain enrichment data
+ assertFalse(enriched.isEmpty());
}
@Test
@@ -229,10 +227,10 @@ void testProcessElasticSearchInstanceWithValidData() throws IOException {
.setResponseCode(200)
.addHeader("Content-Type", "application/json"));
- Row enriched = enrich("t3.micro.search", "AmazonES", "ESDomain", "AmazonES", 1.0);
+ Map enriched = callEnrich("t3.micro.search", "AmazonES", "ESDomain", "AmazonES", 1.0);
- // The row should be processed and enriched with energy data
- assertNotNull(enriched);
+ // The map should contain enrichment data
+ assertFalse(enriched.isEmpty());
}
@Test
@@ -244,10 +242,10 @@ void testProcessRDSInstanceWithValidData() throws IOException {
.setResponseCode(200)
.addHeader("Content-Type", "application/json"));
- Row enriched = enrich("db.t3.micro", "AmazonRDS", "CreateDBInstance", "AmazonRDS", 1.0);
+ Map enriched = callEnrich("db.t3.micro", "AmazonRDS", "CreateDBInstance", "AmazonRDS", 1.0);
- // The row should be processed and enriched with energy data
- assertNotNull(enriched);
+ // The map should contain enrichment data
+ assertFalse(enriched.isEmpty());
}
@ParameterizedTest
@@ -260,10 +258,10 @@ void testProcessEC2WithDifferentOperationPrefixes(String operation) throws IOExc
.setResponseCode(200)
.addHeader("Content-Type", "application/json"));
- Row enriched = enrich("t3.micro", "AmazonEC2", operation, "AmazonEC2", 1.0);
+ Map enriched = callEnrich("t3.micro", "AmazonEC2", operation, "AmazonEC2", 1.0);
- // The row should be processed and enriched with energy data
- assertNotNull(enriched);
+ // The map should contain enrichment data
+ assertFalse(enriched.isEmpty());
}
static Stream validEC2OperationTestCases() {
@@ -284,10 +282,10 @@ void testProcessRDSWithDifferentOperationPrefixes(String operation) throws IOExc
.setResponseCode(200)
.addHeader("Content-Type", "application/json"));
- Row enriched = enrich("db.t3.micro", "AmazonRDS", operation, "AmazonRDS", 1.0);
+ Map enriched = callEnrich("db.t3.micro", "AmazonRDS", operation, "AmazonRDS", 1.0);
- // The row should be processed and enriched with energy data
- assertNotNull(enriched);
+ // The map should contain enrichment data
+ assertFalse(enriched.isEmpty());
}
static Stream validRDSOperationTestCases() {
@@ -308,10 +306,10 @@ void testProcessWithComplexInstanceTypes(String instanceType) throws IOException
.setResponseCode(200)
.addHeader("Content-Type", "application/json"));
- Row enriched = enrich(instanceType, "AmazonEC2", "RunInstances", "AmazonEC2", 1.0);
+ Map enriched = callEnrich(instanceType, "AmazonEC2", "RunInstances", "AmazonEC2", 1.0);
- // The row should be processed and enriched with energy data
- assertNotNull(enriched);
+ // The map should contain enrichment data
+ assertFalse(enriched.isEmpty());
}
static Stream complexInstanceTypeTestCases() {
diff --git a/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstaticTest.java b/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstaticTest.java
index 3db5067..f39b1f7 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstaticTest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/boavizta/BoaviztAPIstaticTest.java
@@ -17,6 +17,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import java.util.HashMap;
+import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -61,10 +62,11 @@ void testColumnsAdded() {
void testProcessWithNullValues(String instanceType, String serviceCode, String operation, String productCode) {
Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null, null, null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = module.process(row);
+ Map enriched = new HashMap<>();
+ module.enrich(row, enriched);
- // Should return the original row unchanged
- assertEquals(row, enriched);
+ // Should not add any enrichment
+ assertTrue(enriched.isEmpty());
}
static Stream nullValueTestCases() {
@@ -82,24 +84,18 @@ static Stream nullValueTestCases() {
void testProcessWithEmptyValues(String instanceType, String serviceCode, String operation, String productCode) {
Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null, null, null};
Row row = new GenericRowWithSchema(values, schema);
+ Map enriched = new HashMap<>();
- // Test cases 1 and 2 (null and empty instance types) should throw IllegalArgumentException
- // Test cases 3 and 4 (empty strings for all fields) should return unchanged rows
- // because BoaviztAPI.process() has early returns before calling BoaviztAPIClient.getEnergyEstimates()
if (instanceType == null || instanceType.trim().isEmpty()) {
- // These should throw IllegalArgumentException if they reach the API call
try {
- Row enriched = module.process(row);
- // If no exception was thrown, the row should be unchanged
- assertEquals(row, enriched, "Should return unchanged row for null/empty instance types that don't reach API call");
+ module.enrich(row, enriched);
+ assertTrue(enriched.isEmpty(), "Should not enrich for null/empty instance types");
} catch (IllegalArgumentException e) {
- // This is also valid - the validation caught it
assertTrue(e.getMessage().contains("Instance type cannot be null, empty, or whitespace only"));
}
} else {
- // Other test cases should return unchanged rows
- Row enriched = module.process(row);
- assertEquals(row, enriched, "Should return unchanged row for other empty value cases");
+ module.enrich(row, enriched);
+ assertTrue(enriched.isEmpty(), "Should not enrich for other empty value cases");
}
}
@@ -117,10 +113,11 @@ static Stream emptyValueTestCases() {
void testProcessWithUnsupportedValues(String instanceType, String serviceCode, String operation, String productCode) {
Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null, null, null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = module.process(row);
+ Map enriched = new HashMap<>();
+ module.enrich(row, enriched);
- // Should return the original row unchanged for unsupported services/operations
- assertEquals(row, enriched);
+ // Should not add any enrichment for unsupported services/operations
+ assertTrue(enriched.isEmpty());
}
static Stream unsupportedValueTestCases() {
@@ -136,10 +133,11 @@ static Stream unsupportedValueTestCases() {
void testProcessWithEdgeCases(String instanceType, String serviceCode, String operation, String productCode) {
Object[] values = new Object[]{instanceType, serviceCode, operation, productCode, null, null, null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = module.process(row);
+ Map enriched = new HashMap<>();
+ module.enrich(row, enriched);
- // Should return the original row unchanged for edge cases
- assertEquals(row, enriched);
+ // Should not add any enrichment for edge cases
+ assertTrue(enriched.isEmpty());
}
static Stream edgeCaseTestCases() {
diff --git a/src/test/java/com/digitalpebble/spruce/modules/ccf/AcceleratorsTest.java b/src/test/java/com/digitalpebble/spruce/modules/ccf/AcceleratorsTest.java
index 203fe3d..c364be4 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/ccf/AcceleratorsTest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/ccf/AcceleratorsTest.java
@@ -2,6 +2,7 @@
package com.digitalpebble.spruce.modules.ccf;
+import com.digitalpebble.spruce.Column;
import com.digitalpebble.spruce.Utils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
@@ -11,17 +12,17 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import static com.digitalpebble.spruce.SpruceColumn.ENERGY_USED;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
class AcceleratorsTest {
private static final Accelerators accelerators = new Accelerators();
-
private static final StructType schema = Utils.getSchema(accelerators);
@BeforeAll
@@ -29,22 +30,29 @@ static void initialize() {
accelerators.init(Map.of());
}
- // {PRODUCT_INSTANCE_TYPE, LINE_ITEM_OPERATION, LINE_ITEM_PRODUCT_CODE, USAGE_AMOUNT};
private static Stream provideArgsWithType() {
- return Stream.of(Arguments.of("g5.xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.0855d), Arguments.of("g5.xlarge", "RunInstances", "AmazonEC2", 2.0d, 0.171d), Arguments.of("g4dn.xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.0395d), Arguments.of("c5.xlarge", "RunInstances", "AmazonEC2", 1.0d, null), Arguments.of("g6.8xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.04d));
+ return Stream.of(
+ Arguments.of("g5.xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.0855d),
+ Arguments.of("g5.xlarge", "RunInstances", "AmazonEC2", 2.0d, 0.171d),
+ Arguments.of("g4dn.xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.0395d),
+ Arguments.of("c5.xlarge", "RunInstances", "AmazonEC2", 1.0d, null),
+ Arguments.of("g6.8xlarge", "RunInstances", "AmazonEC2", 1.0d, 0.04d)
+ );
}
@ParameterizedTest
@MethodSource("provideArgsWithType")
- void process(String PRODUCT_INSTANCE_TYPE, String LINE_ITEM_OPERATION, String LINE_ITEM_PRODUCT_CODE, double USAGE_AMOUNT, Double expected) {
- Object[] values = new Object[]{PRODUCT_INSTANCE_TYPE, LINE_ITEM_OPERATION, LINE_ITEM_PRODUCT_CODE, USAGE_AMOUNT, null};
+ void process(String PRODUCT_INSTANCE_TYPE, String LINE_ITEM_OPERATION,
+ String LINE_ITEM_PRODUCT_CODE, double USAGE_AMOUNT, Double expected) {
+ Object[] values = new Object[]{PRODUCT_INSTANCE_TYPE, LINE_ITEM_OPERATION,
+ LINE_ITEM_PRODUCT_CODE, USAGE_AMOUNT, null};
Row row = new GenericRowWithSchema(values, schema);
- Row result = accelerators.process(row);
+        Map<Column, Object> enriched = new HashMap<>();
+ accelerators.enrich(row, enriched);
if (expected != null) {
- assertEquals(expected, ENERGY_USED.getDouble(result), 0.0001);
+ assertEquals(expected, (Double) enriched.get(ENERGY_USED), 0.0001);
} else {
- assertTrue(ENERGY_USED.isNullAt(result));
+ assertFalse(enriched.containsKey(ENERGY_USED));
}
}
-
-}
\ No newline at end of file
+}
diff --git a/src/test/java/com/digitalpebble/spruce/modules/ccf/NetworkingTest.java b/src/test/java/com/digitalpebble/spruce/modules/ccf/NetworkingTest.java
index 73f8f72..965cd87 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/ccf/NetworkingTest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/ccf/NetworkingTest.java
@@ -2,6 +2,7 @@
package com.digitalpebble.spruce.modules.ccf;
+import com.digitalpebble.spruce.Column;
import com.digitalpebble.spruce.Utils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
@@ -12,35 +13,32 @@
import java.util.HashMap;
import java.util.Map;
+import static com.digitalpebble.spruce.SpruceColumn.ENERGY_USED;
import static org.junit.jupiter.api.Assertions.*;
class NetworkingTest {
private Networking networking = new Networking();
-
private StructType schema = Utils.getSchema(networking);
@Test
void processNoValues() {
- Object[] values = new Object[] {null, null, null};
+ Object[] values = new Object[] {null, null, null, null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = networking.process(row);
- // missing values comes back as it was
- assertEquals(row, enriched);
+        Map<Column, Object> enriched = new HashMap<>();
+ networking.enrich(row, enriched);
+ assertFalse(enriched.containsKey(ENERGY_USED));
}
@Test
void processValues() {
- // 10 GBs of data transfer between regions
Map product = new HashMap<>();
product.put("transfer_type", "InterRegion");
- ;
Object[] values = new Object[] {"AWSDataTransfer", JavaConverters.asScala(product), 10d, null};
Row row = new GenericRowWithSchema(values, schema);
- Row enriched = networking.process(row);
- // should have an estimated 0.01 kwh
+        Map<Column, Object> enriched = new HashMap<>();
+ networking.enrich(row, enriched);
double expected = networking.network_coefficient * 10;
- assertEquals(0.01d, enriched.getDouble(3));
+        assertEquals(expected, (Double) enriched.get(ENERGY_USED));
}
-
-}
\ No newline at end of file
+}
diff --git a/src/test/java/com/digitalpebble/spruce/modules/ccf/StorageTest.java b/src/test/java/com/digitalpebble/spruce/modules/ccf/StorageTest.java
index 6b40fd5..f5f2ed3 100644
--- a/src/test/java/com/digitalpebble/spruce/modules/ccf/StorageTest.java
+++ b/src/test/java/com/digitalpebble/spruce/modules/ccf/StorageTest.java
@@ -2,6 +2,7 @@
package com.digitalpebble.spruce.modules.ccf;
+import com.digitalpebble.spruce.Column;
import com.digitalpebble.spruce.Utils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
@@ -11,16 +12,17 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import static com.digitalpebble.spruce.SpruceColumn.ENERGY_USED;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
class StorageTest {
private static final Storage storage = new Storage();
-
private static final StructType schema = Utils.getSchema(storage);
@BeforeAll
@@ -29,11 +31,21 @@ static void initialize() {
}
private static Stream provideArgsWithType() {
- return Stream.of(Arguments.of("Storage", 0.1d, "EUW2-TimedStorage-ByteHrs", "AmazonS3", "GB-Mo", false), Arguments.of("Storage", 0.1d, "SomeUsageType", "AmazonDocDB", "GB-Mo", false), Arguments.of("CreateVolume", 10d, "EUW2-EBS:VolumeUsage", "AmazonEC2", "GB-Mo", false), Arguments.of("CreateVolume-Gp2", 10d, "EBS:VolumeUsage.gp2", "AmazonEC2", "GB-Mo", true), Arguments.of("CreateVolume-Gp3", 10d, "VolumeUsage.gp3", "AmazonEC2", "GB-Mo", true));
+ return Stream.of(
+ Arguments.of("Storage", 0.1d, "EUW2-TimedStorage-ByteHrs", "AmazonS3", "GB-Mo", false),
+ Arguments.of("Storage", 0.1d, "SomeUsageType", "AmazonDocDB", "GB-Mo", false),
+ Arguments.of("CreateVolume", 10d, "EUW2-EBS:VolumeUsage", "AmazonEC2", "GB-Mo", false),
+ Arguments.of("CreateVolume-Gp2", 10d, "EBS:VolumeUsage.gp2", "AmazonEC2", "GB-Mo", true),
+ Arguments.of("CreateVolume-Gp3", 10d, "VolumeUsage.gp3", "AmazonEC2", "GB-Mo", true)
+ );
}
private static Stream provideArgsWrongUnit() {
- return Stream.of(Arguments.of("Storage", 10d, "SomeUsageType", "AmazonDocDB", "vCPU-hour"), Arguments.of("CreateVolume-Gp3", 0.1, "EBS:VolumeP-IOPS.gp3", "AmazonEC2", "IOPS-Mo"), Arguments.of("CreateVolume-Gp2", 0.1, "EBS:VolumeP-Throughput.gp3", "AmazonEC2", "GiBps-mo"));
+ return Stream.of(
+ Arguments.of("Storage", 10d, "SomeUsageType", "AmazonDocDB", "vCPU-hour"),
+ Arguments.of("CreateVolume-Gp3", 0.1, "EBS:VolumeP-IOPS.gp3", "AmazonEC2", "IOPS-Mo"),
+ Arguments.of("CreateVolume-Gp2", 0.1, "EBS:VolumeP-Throughput.gp3", "AmazonEC2", "GiBps-mo")
+ );
}
@ParameterizedTest
@@ -41,21 +53,25 @@ private static Stream provideArgsWrongUnit() {
void process(String operation, double amount, String usage, String service, String unit, boolean isSSD) {
Object[] values = new Object[]{operation, amount, usage, service, unit, null};
Row row = new GenericRowWithSchema(values, schema);
- Row result = storage.process(row);
+        Map<Column, Object> enriched = new HashMap<>();
+ storage.enrich(row, enriched);
double gb_hours = Utils.Conversions.GBMonthsToGBHours(amount);
int replication = storage.getReplicationFactor(service, usage);
double coef = isSSD ? storage.ssd_gb_coefficient : storage.hdd_gb_coefficient;
double expected = gb_hours * coef * replication / 1000;
- assertEquals(expected, ENERGY_USED.getDouble(result), 0.0001);
+ assertEquals(expected, (Double) enriched.get(ENERGY_USED), 0.0001);
}
@ParameterizedTest
@MethodSource("provideArgsWrongUnit")
- void processSSDServiceWrongUnit(String LINE_ITEM_OPERATION, double USAGE_AMOUNT, String LINE_ITEM_USAGE_TYPE, String PRODUCT_SERVICE_CODE, String PRICING_UNIT) {
- Object[] values = new Object[]{LINE_ITEM_OPERATION, USAGE_AMOUNT, LINE_ITEM_USAGE_TYPE, PRODUCT_SERVICE_CODE, PRICING_UNIT, null};
+ void processSSDServiceWrongUnit(String LINE_ITEM_OPERATION, double USAGE_AMOUNT,
+ String LINE_ITEM_USAGE_TYPE, String PRODUCT_SERVICE_CODE,
+ String PRICING_UNIT) {
+ Object[] values = new Object[]{LINE_ITEM_OPERATION, USAGE_AMOUNT, LINE_ITEM_USAGE_TYPE,
+ PRODUCT_SERVICE_CODE, PRICING_UNIT, null};
Row row = new GenericRowWithSchema(values, schema);
- Row result = storage.process(row);
- assertEquals(row, result);
+        Map<Column, Object> enriched = new HashMap<>();
+ storage.enrich(row, enriched);
+ assertFalse(enriched.containsKey(ENERGY_USED));
}
-
-}
\ No newline at end of file
+}