Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 58 additions & 9 deletions src/test/java/com/digitalpebble/spruce/EnrichmentPipelineTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static com.digitalpebble.spruce.CURColumn.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -23,6 +25,56 @@
*/
public class EnrichmentPipelineTest {

/** A lightweight test column used by DummyModule to verify enrichment. */
private static final Column DUMMY_ENRICHED = new Column("dummy_enriched", DataTypes.DoubleType) {};

/**
* Minimal {@link EnrichmentModule} for testing pipeline logic.
* Declares a single output column and sets it to a fixed value during process().
*/
static class DummyModule implements EnrichmentModule {

@Override
public Column[] columnsNeeded() {
return new Column[0];
}

@Override
public Column[] columnsAdded() {
return new Column[]{ DUMMY_ENRICHED };
}

@Override
public Row process(Row row) {
return EnrichmentModule.withUpdatedValue(row, DUMMY_ENRICHED, 42.0);
}
}

/**
* Creates a Config pre-loaded with a single DummyModule.
* This avoids loading the real production modules (which require the full CUR schema)
* and keeps the test focused on the pipeline's Usage-filter logic.
*/
private static Config createMockConfig() {
Config config = new Config();
DummyModule module = new DummyModule();
config.getModules().add(module);

// Also inject matching config entry via reflection so configureModules() works
try {
var configsField = Config.class.getDeclaredField("configs");
configsField.setAccessible(true);
@SuppressWarnings("unchecked")
List<Map<String, Object>> configs =
(List<Map<String, Object>>) configsField.get(config);
configs.add(Collections.emptyMap());
} catch (Exception e) {
throw new RuntimeException("Failed to set up mock config", e);
}

return config;
}

/**
* Verifies the selective enrichment logic based on Line Item Type.
* <p>
Expand All @@ -39,21 +91,18 @@ public class EnrichmentPipelineTest {
"Fee, false"
})
public void testPipelineEnrichmentLogic(String lineItemType, boolean expectEnrichment) throws Exception {
Config config = new Config();
Config config = createMockConfig();
EnrichmentPipeline pipeline = new EnrichmentPipeline(config);

// TODO: Validate that modules are loaded (Option 1 - Mocking).
// Currently, new Config() returns an empty list in unit tests.
// We will implement a Mock Config in a separate PR to handle this without
// requiring the full production schema.
// assertFalse(config.getModules().isEmpty(), "Config should have at least one enrichment module");
List<EnrichmentModule> modules = config.getModules();
assertFalse(modules.isEmpty(), "Config should have at least one enrichment module for this test");

// Minimal schema: use ONLY existing constants in CURColumn.
// Minimal schema: base CUR columns + columns added by the DummyModule
StructType schema = new StructType()
.add(PRODUCT_REGION_CODE.getLabel(), PRODUCT_REGION_CODE.getType(), true)
.add(LINE_ITEM_TYPE.getLabel(), LINE_ITEM_TYPE.getType(), true);

for (EnrichmentModule module : config.getModules()) {
for (EnrichmentModule module : modules) {
for (Column c : module.columnsAdded()) {
schema = schema.add(c.getLabel(), c.getType(), true);
}
Expand All @@ -75,7 +124,7 @@ public void testPipelineEnrichmentLogic(String lineItemType, boolean expectEnric
assertEquals("us-east-1", PRODUCT_REGION_CODE.getString(processedRow));

// Verify enrichment logic
for (EnrichmentModule module : config.getModules()) {
for (EnrichmentModule module : modules) {
for (Column c : module.columnsAdded()) {
int fieldIndex = processedRow.fieldIndex(c.getLabel());
if (expectEnrichment) {
Expand Down