Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 47 additions & 14 deletions src/main/java/com/digitalpebble/spruce/CURColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

package com.digitalpebble.spruce;

import org.apache.spark.SparkIllegalArgumentException;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;

Expand All @@ -10,22 +12,53 @@
/** Columns from CUR reports **/
public class CURColumn extends Column {

public static Column LINE_ITEM_OPERATION = new CURColumn("line_item_operation", StringType);
public static Column LINE_ITEM_PRODUCT_CODE = new CURColumn("line_item_product_code", StringType);
public static Column LINE_ITEM_TYPE = new CURColumn("line_item_line_item_type", StringType);
public static Column LINE_ITEM_USAGE_TYPE = new CURColumn("line_item_usage_type", StringType);
public static Column PRICING_UNIT= new CURColumn("pricing_unit", StringType);
public static Column PRODUCT = new CURColumn("product", MapType.apply(StringType,StringType));
public static Column PRODUCT_INSTANCE_TYPE = new CURColumn("product_instance_type", StringType);
public static Column PRODUCT_INSTANCE_FAMILY = new CURColumn("product_instance_family", StringType);
public static Column PRODUCT_PRODUCT_FAMILY = new CURColumn("product_product_family", StringType);
public static Column PRODUCT_REGION_CODE = new CURColumn("product_region_code", StringType);
public static Column PRODUCT_FROM_REGION_CODE = new CURColumn("product_from_region_code", StringType);
public static Column PRODUCT_TO_REGION_CODE = new CURColumn("product_to_region_code", StringType);
public static Column PRODUCT_SERVICE_CODE = new CURColumn("product_servicecode", StringType);
public static Column USAGE_AMOUNT = new CURColumn("line_item_usage_amount", DoubleType);
public static CURColumn LINE_ITEM_OPERATION = new CURColumn("line_item_operation", StringType);
public static CURColumn LINE_ITEM_PRODUCT_CODE = new CURColumn("line_item_product_code", StringType);
public static CURColumn LINE_ITEM_TYPE = new CURColumn("line_item_line_item_type", StringType);
public static CURColumn LINE_ITEM_USAGE_TYPE = new CURColumn("line_item_usage_type", StringType);
public static CURColumn PRICING_UNIT= new CURColumn("pricing_unit", StringType);
public static CURColumn PRODUCT = new CURColumn("product", MapType.apply(StringType,StringType));
public static CURColumn PRODUCT_INSTANCE_TYPE = new CURColumn("product_instance_type", StringType);
public static CURColumn PRODUCT_INSTANCE_FAMILY = new CURColumn("product_instance_family", StringType);
public static CURColumn PRODUCT_PRODUCT_FAMILY = new CURColumn("product_product_family", StringType);
public static CURColumn PRODUCT_REGION_CODE = new CURColumn("product_region_code", StringType);
public static CURColumn PRODUCT_FROM_REGION_CODE = new CURColumn("product_from_region_code", StringType);
public static CURColumn PRODUCT_TO_REGION_CODE = new CURColumn("product_to_region_code", StringType);
public static CURColumn PRODUCT_SERVICE_CODE = new CURColumn("product_servicecode", StringType);
public static CURColumn USAGE_AMOUNT = new CURColumn("line_item_usage_amount", DoubleType);

/** Creates a CUR column with the given label and Spark data type. */
CURColumn(String label, DataType type) {
    super(label, type);
}

/**
 * Reads the value of this column from the given row as a primitive double.
 *
 * @param r the row to read from
 * @return the double value at this column's resolved index
 */
public double getDouble(Row r) {
    final int idx = resolveIndex(r);
    return r.getDouble(idx);
}

/**
 * Reads the value of this column from the given row as a String.
 *
 * @param r        the row to read from
 * @param optional when {@code true}, a column missing from the schema yields
 *                 {@code null} instead of an exception
 * @return the String value, or {@code null} when the field is absent and
 *         {@code optional} is set
 * @throws SparkIllegalArgumentException when the field is absent and
 *         {@code optional} is {@code false}
 */
public String getString(Row r, boolean optional) {
    if (!optional) {
        // Required column: let a missing-field exception propagate to the caller.
        return r.getString(resolveIndex(r));
    }
    try {
        return r.getString(resolveIndex(r));
    } catch (SparkIllegalArgumentException missing) {
        // Field not defined in the schema: treat as a null value.
        return null;
    }
}

/**
 * Reads the value of this column from the given row as a String,
 * propagating an exception if the field is not in the schema.
 *
 * @param r the row to read from
 * @return the String value at this column's resolved index
 */
public String getString(Row r) {
    final boolean optional = false;
    return getString(r, optional);
}

/**
 * Tells whether the value of this column is null in the given row.
 *
 * @param r the row to inspect
 * @return {@code true} when the value at this column's index is null
 */
public boolean isNullAt(Row r) {
    final int idx = resolveIndex(r);
    return r.isNullAt(idx);
}
}
39 changes: 0 additions & 39 deletions src/main/java/com/digitalpebble/spruce/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

package com.digitalpebble.spruce;

import org.apache.spark.SparkIllegalArgumentException;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
Expand Down Expand Up @@ -55,43 +54,5 @@ public int resolveIndex(Row r) {
cachedIndex = index;
return index;
}

/**
* Utility method to get the value for the column in the row
**/
public double getDouble(Row r) {
return r.getDouble(resolveIndex(r));
}


/**
* Utility method to get the value for the column in the row or null if it does not exist.
* Returns null if the field is not defined in the schema and optional is set to true,
* propagates an exception otherwise.
**/
public String getString(Row r, boolean optional) {
try {
return r.getString(resolveIndex(r));
} catch (SparkIllegalArgumentException e) {
if (optional) {
return null;
}
throw e;
}
}

/**
* Utility method to get the value for the column in the row or null if it does not exist
**/
public String getString(Row r) {
return getString(r, false);
}

/**
* Utility method to get check that a value is non null
**/
public boolean isNullAt(Row r) {
return r.isNullAt(resolveIndex(r));
}
}

63 changes: 12 additions & 51 deletions src/main/java/com/digitalpebble/spruce/EnrichmentModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
package com.digitalpebble.spruce;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;

import java.io.Serializable;
import java.util.Map;

/**
* A module adds new columns to a Dataset and populates them based on its content.
* The columns can represent energy or water consumption, carbon intensity, carbon emissions etc...
* The bulk of the work is done in the map function.
*
* <p>Modules read CUR (input) columns from the original {@link Row} and read/write
* Spruce (enrichment) columns via a shared {@code Map<Column, Object>}.
* The pipeline materialises one final Row at the end, avoiding per-module row copies.
**/

public interface EnrichmentModule extends Serializable {
Expand All @@ -25,53 +27,12 @@ default void init(Map<String, Object> params){}
/** Returns the columns added by this module **/
Column[] columnsAdded();

Row process(Row row);

static Row withUpdatedValue(Row row, Column column, Object newValue) {
Object[] values = new Object[row.size()];
for (int i = 0; i < row.size(); i++) {
values[i] = row.get(i);
}
int index = column.resolveIndex(row);
values[index] = newValue;
return new GenericRowWithSchema(values, row.schema());
}

static Row withUpdatedValue(Row row, Column column, Double newValue, boolean add) {
Object[] values = new Object[row.size()];
for (int i = 0; i < row.size(); i++) {
values[i] = row.get(i);
}
int index = column.resolveIndex(row);
Object existing = values[index];
if (add && existing instanceof Double) {
values[index] = newValue + (Double) existing;
} else {
values[index] = newValue;
}
return new GenericRowWithSchema(values, row.schema());
}

static Row withUpdatedValues(Row row, Map<Column, Object> updates) {
Object[] values = new Object[row.size()];
for (int i = 0; i < row.size(); i++) {
values[i] = row.get(i);
}

for (Map.Entry<Column, Object> entry : updates.entrySet()) {
Column column = entry.getKey();
Object newValue = entry.getValue();

int index;
try {
index = column.resolveIndex(row);
} catch (IllegalArgumentException e) {
throw new RuntimeException("Field not found in row: " + column.getLabel(), e);
}

values[index] = newValue;
}

return new GenericRowWithSchema(values, row.schema());
}
/**
* Enrich the given row by reading input columns from {@code row} and
* reading/writing enrichment columns via {@code enrichedValues}.
*
* @param row the immutable original row from the dataset
* @param enrichedValues shared map accumulating enrichment values across all modules
*/
void enrich(Row row, Map<Column, Object> enrichedValues);
}
18 changes: 16 additions & 2 deletions src/main/java/com/digitalpebble/spruce/EnrichmentPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static com.digitalpebble.spruce.CURColumn.LINE_ITEM_TYPE;

Expand Down Expand Up @@ -41,10 +44,21 @@ public Row next() {
// usage filter - only need to enrich entries that correspond to a usage (no tax, discount or fee)
boolean usage = usageFilter(row);
if (!usage) return row;

Map<Column, Object> enriched = new HashMap<>();
for (EnrichmentModule module : enrichmentModules) {
row = module.process(row);
module.enrich(row, enriched);
}

// Materialise the final row — single copy instead of one per module
Object[] values = new Object[row.size()];
for (int i = 0; i < row.size(); i++) {
values[i] = row.get(i);
}
for (Map.Entry<Column, Object> entry : enriched.entrySet()) {
values[entry.getKey().resolveIndex(row)] = entry.getValue();
}
return row;
return new GenericRowWithSchema(values, row.schema());
}
};
}
Expand Down
28 changes: 20 additions & 8 deletions src/main/java/com/digitalpebble/spruce/SpruceColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,35 @@

import org.apache.spark.sql.types.DataType;

import java.util.Map;

import static org.apache.spark.sql.types.DataTypes.DoubleType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/** Defines columns added by the EnrichmentModules **/
public class SpruceColumn extends Column {

public static Column ENERGY_USED = new SpruceColumn("operational_energy_kwh", DoubleType);
public static Column CARBON_INTENSITY = new SpruceColumn("carbon_intensity", DoubleType);
public static Column OPERATIONAL_EMISSIONS = new SpruceColumn("operational_emissions_co2eq_g", DoubleType);
public static Column EMBODIED_EMISSIONS = new SpruceColumn("embodied_emissions_co2eq_g", DoubleType);
public static Column EMBODIED_ADP = new SpruceColumn("embodied_adp_sbeq_g", DoubleType);
public static Column CPU_LOAD = new SpruceColumn("cpu_load_percentage", DoubleType);
public static Column PUE = new SpruceColumn("power_usage_effectiveness", DoubleType);
public static Column REGION = new SpruceColumn("region", StringType);
public static SpruceColumn ENERGY_USED = new SpruceColumn("operational_energy_kwh", DoubleType);
public static SpruceColumn CARBON_INTENSITY = new SpruceColumn("carbon_intensity", DoubleType);
public static SpruceColumn OPERATIONAL_EMISSIONS = new SpruceColumn("operational_emissions_co2eq_g", DoubleType);
public static SpruceColumn EMBODIED_EMISSIONS = new SpruceColumn("embodied_emissions_co2eq_g", DoubleType);
public static SpruceColumn EMBODIED_ADP = new SpruceColumn("embodied_adp_sbeq_g", DoubleType);
public static SpruceColumn CPU_LOAD = new SpruceColumn("cpu_load_percentage", DoubleType);
public static SpruceColumn PUE = new SpruceColumn("power_usage_effectiveness", DoubleType);
public static SpruceColumn REGION = new SpruceColumn("region", StringType);

/** Creates an enrichment column with the given label and Spark data type. */
private SpruceColumn(String label, DataType type) {
    super(label, type);
}

/**
 * Looks up the String value for this column in the shared enrichment map.
 *
 * @param enrichedValues enrichment values keyed by column
 * @return the String value, or {@code null} when no module has set one
 */
public String getString(Map<Column, Object> enrichedValues) {
    final Object value = enrichedValues.get(this);
    return (String) value;
}

/**
 * Looks up the Double value for this column in the shared enrichment map.
 *
 * @param enrichedValues enrichment values keyed by column
 * @return the Double value, or {@code null} when no module has set one
 */
public Double getDouble(Map<Column, Object> enrichedValues) {
    final Object value = enrichedValues.get(this);
    return (Double) value;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public Column[] columnsAdded() {
}

@Override
public Row process(Row row) {
return EnrichmentModule.withUpdatedValue(row, CPU_LOAD, load_value);
public void enrich(Row row, Map<Column, Object> enrichedValues) {
enrichedValues.put(CPU_LOAD, load_value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.digitalpebble.spruce.EnrichmentModule;
import org.apache.spark.sql.Row;

import java.util.Map;

import static com.digitalpebble.spruce.SpruceColumn.*;

/**
Expand All @@ -25,22 +27,18 @@ public Column[] columnsAdded() {
}

@Override
public Row process(Row row) {
if (ENERGY_USED.isNullAt(row)) {
return row;
}

if (CARBON_INTENSITY.isNullAt(row)) {
return row;
}
public void enrich(Row row, Map<Column, Object> enrichedValues) {
Double energyUsed = ENERGY_USED.getDouble(enrichedValues);
if (energyUsed == null) return;

final double energyUsed = ENERGY_USED.getDouble(row);
Double carbon_intensity = CARBON_INTENSITY.getDouble(enrichedValues);
if (carbon_intensity == null) return;

// take into account the PUE if present
final double pue = PUE.isNullAt(row) ? 1.0 : PUE.getDouble(row);
final double carbon_intensity = CARBON_INTENSITY.getDouble(row);
Double pueVal = PUE.getDouble(enrichedValues);
final double pue = pueVal != null ? pueVal : 1.0;
final double emissions = energyUsed * carbon_intensity * pue;

return EnrichmentModule.withUpdatedValue(row, OPERATIONAL_EMISSIONS, emissions);
enrichedValues.put(OPERATIONAL_EMISSIONS, emissions);
}
}
19 changes: 7 additions & 12 deletions src/main/java/com/digitalpebble/spruce/modules/PUE.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,17 @@ public Column[] columnsAdded() {
}

@Override
public Row process(Row row) {
if (ENERGY_USED.isNullAt(row)) {
return row;
}
public void enrich(Row row, Map<Column, Object> enrichedValues) {
Double energyUsed = ENERGY_USED.getDouble(enrichedValues);
if (energyUsed == null) return;

double energyUsed = ENERGY_USED.getDouble(row);
if (energyUsed <= 0) return row;
if (energyUsed <= 0) return;

String region = null;
if (!REGION.isNullAt(row)) {
region = REGION.getString(row);
}
String region = REGION.getString(enrichedValues);

double pueToApply = getPueForRegion(region);

return EnrichmentModule.withUpdatedValue(row, SpruceColumn.PUE, pueToApply);
enrichedValues.put(SpruceColumn.PUE, pueToApply);
}

private double getPueForRegion(String region) {
Expand All @@ -119,4 +114,4 @@ private double getPueForRegion(String region) {

return defaultPueValue;
}
}
}
Loading
Loading