diff --git a/core/src/main/scala/com/amadeus/dataio/config/fields/DateFilterConfigurator.scala b/core/src/main/scala/com/amadeus/dataio/config/fields/DateFilterConfigurator.scala index c4ee46d..051b9ca 100644 --- a/core/src/main/scala/com/amadeus/dataio/config/fields/DateFilterConfigurator.scala +++ b/core/src/main/scala/com/amadeus/dataio/config/fields/DateFilterConfigurator.scala @@ -1,51 +1,85 @@ package com.amadeus.dataio.config.fields +import com.amadeus.dataio.core.Logging import com.amadeus.dataio.core.time.DateRange import com.typesafe.config.Config import org.apache.spark.sql.Column import org.apache.spark.sql.functions.col +import java.time.LocalDate import scala.util.Try -trait DateFilterConfigurator { +trait DateFilterConfigurator extends Logging{ + def getDateFilterConfig(implicit config: Config): Option[DateFilterConfig] = { + if (!config.hasPath("date_filter")) { + return None + } - /** @param config The typesafe Config object holding the configuration. - * @return The date range, or None. - * @throws IllegalArgumentException If the data was found but is not formatted properly. 
- */ - def getDateFilterRange(implicit config: Config): Option[DateRange] = { - // If only reference or offset is present for a given syntax, this is incorrect - val syntaxTwoOk = testArguments("date_filter.reference", "date_filter.offset") - - // if any is false, we have an incomplete conf - if (!syntaxTwoOk) { - throw new IllegalArgumentException("Configuration incomplete for date filter reference/offset") - } else { - Try { - getArguments("date_filter.reference", "date_filter.offset") - }.toOption + val filterConfig = config.getConfig("date_filter") + + val referenceOffsetConfig = getReferenceOffset(filterConfig) + val fromUntilConfig = getFromUntil(filterConfig) + + (referenceOffsetConfig, fromUntilConfig) match { + case (Some(_), Some(_)) => + throw new IllegalArgumentException( + "date_filter: Cannot use both reference+offset and from/until syntaxes simultaneously" + ) + + case (Some(config), None) => + Some(config) + + case (None, Some(config)) => + Some(config) + + case (None, None) => + logger.warn("date_filter: configuration found but no valid syntax detected (expected reference+offset or from/until).") + None } } - /** @param dateReferencePath the path in the config holding Date Reference - * @param offsetPath the path in the config holding Date Offset - * @return True if the config holds either both keys or none - */ - private def testArguments(dateReferencePath: String, offsetPath: String)(implicit config: Config): Boolean = { - // there must be both keys or none => true - (config.hasPath(dateReferencePath) && config.hasPath(offsetPath)) || - (!config.hasPath(dateReferencePath) && !config.hasPath(offsetPath)) + private def getFromUntil(config: Config): Option[DateFilterConfig] = { + val hasFrom = config.hasPath("from") + val hasUntil = config.hasPath("until") + (hasFrom, hasUntil) match { + case (true, false) => + val from = LocalDate.parse(config.getString("from")) + Some(DateFilterConfig.FromOnly(from)) + case (false, true) => + val until = 
LocalDate.parse(config.getString("until")) + Some(DateFilterConfig.UntilOnly(until)) + case (true, true) => + val from = LocalDate.parse(config.getString("from")) + val until = LocalDate.parse(config.getString("until")) + + if (!from.isBefore(until)) { + throw new IllegalArgumentException( + s"date_filter: 'from' ($from) must be before 'until' ($until). " + + s"For a single day, use: from = '$from', until = '${from.plusDays(1)}'" + ) + } + Some(DateFilterConfig.Range(DateRange(from, until))) + case _ => None + } } - /** @param dateReferencePath the path in the config holding Date Reference - * @param offsetPath the path in the config holding Date Offset - * @return The DateRange corresponding to the reference and the offset - */ - private def getArguments(dateReferencePath: String, offsetPath: String)(implicit config: Config): DateRange = { - val referenceDate = config.getString(dateReferencePath) - val offset = config.getString(offsetPath) - DateRange(referenceDate, offset) + private def getReferenceOffset(config: Config): Option[DateFilterConfig] = { + val hasReference = config.hasPath("reference") + val hasOffset = config.hasPath("offset") + + (hasReference, hasOffset) match { + case (true, true) => + val reference = config.getString("reference") + val offset = config.getString("offset") + Some(DateFilterConfig.Range(DateRange(reference, offset))) + case (true, false) | (false, true) => + throw new IllegalArgumentException( + "date_filter with reference/offset requires both 'reference' and 'offset'" + ) + case _ => + None + } } /** @param config The typesafe Config object holding the configuration. 
@@ -57,3 +91,11 @@ trait DateFilterConfigurator { }.toOption } } + +sealed trait DateFilterConfig + +object DateFilterConfig { + case class Range(dateRange: DateRange) extends DateFilterConfig + case class FromOnly(from: LocalDate) extends DateFilterConfig // >= from (inclusive) + case class UntilOnly(until: LocalDate) extends DateFilterConfig // < until (exclusive) +} diff --git a/core/src/main/scala/com/amadeus/dataio/core/time/Formats.scala b/core/src/main/scala/com/amadeus/dataio/core/time/Formats.scala index fa247b4..eb2afb1 100644 --- a/core/src/main/scala/com/amadeus/dataio/core/time/Formats.scala +++ b/core/src/main/scala/com/amadeus/dataio/core/time/Formats.scala @@ -6,8 +6,7 @@ import java.time.temporal.ChronoField import java.time.temporal.ChronoField._ /** - * Collection of [[com.amadeus.bdp.api.functions.time.Formats.LabeledFormatter]]. - * Formats are related to dates and can be used as placeholder in a [[com.amadeus.bdp.api.functions.paths.TemplatePath]]. + * Collection of LabeledFormatter. * * @see Documentation for pattern symbols: [[https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html]] */ @@ -118,7 +117,7 @@ object Formats { val LbxSecond = LabeledFormatter("second", DateTimeFormatter.ofPattern("ss"), """(\d{2})""", Set(SECOND_OF_MINUTE)) /** - * Set of all [[com.amadeus.bdp.api.functions.time.Formats.LabeledFormatter LabeledFormatter]] defined into [[com.amadeus.bdp.api.functions.time.Formats Formats]] object + * Set of all LabeledFormatter instances defined in the Formats object. */ val AllFormats: Set[LabeledFormatter] = Set(BasicIsoDate, IsoLocalDate, IsoLocalTime, IsoOrdinalDate, IsoWeekDate, Iso8601, IsoInstant, LbxYear, LbxMonth, LbxWeek, LbxDay, LbxHour, LbxMinute, LbxSecond) @@ -134,7 +133,7 @@ object Formats { private val labelToFormatterMap = AllFormats.map(formatter => (formatter.label, formatter)).toMap /** - * Gets a [[com.amadeus.bdp.api.functions.time.Formats.LabeledFormatter LabeledFormatter]] from its label. + * Gets a LabeledFormatter from its label. 
* * @param label the label * @return an [[scala.Option Option]] containing the formatter or [[scala.None None]] if there is no formatter for the given label diff --git a/core/src/main/scala/com/amadeus/dataio/core/transformers/DateFilterer.scala b/core/src/main/scala/com/amadeus/dataio/core/transformers/DateFilterer.scala index f5e6a3f..1038175 100644 --- a/core/src/main/scala/com/amadeus/dataio/core/transformers/DateFilterer.scala +++ b/core/src/main/scala/com/amadeus/dataio/core/transformers/DateFilterer.scala @@ -1,39 +1,75 @@ package com.amadeus.dataio.core.transformers +import com.amadeus.dataio.config.fields.DateFilterConfig import com.amadeus.dataio.core.Logging import com.amadeus.dataio.core.time.DateRange import org.apache.spark.sql.{Column, Dataset} + import java.sql.Date -import java.time.LocalTime +import java.time.{LocalDate, LocalTime} trait DateFilterer extends Logging { - val dateRange: Option[DateRange] + val dateFilterConfig: Option[DateFilterConfig] val dateColumn: Option[Column] def applyDateFilter[T](ds: Dataset[T]): Dataset[T] = { - (dateRange, dateColumn) match { - case (Some(range), Some(column)) => - val dateFrom: Date = Date.valueOf(range.from.toLocalDate) - - // if we have a time after midnight, include the right extreme of the DateRange - val dateUntil = - if (range.until.toLocalTime.isAfter(LocalTime.of(0, 0))) Date.valueOf(range.until.plusDays(1).toLocalDate) - else Date.valueOf(range.until.toLocalDate) - - logger.info(s"date_filter: $column >= $dateFrom and $column < $dateUntil") - ds.filter(column >= dateFrom and column < dateUntil) - case (Some(range), None) => throw new Exception("date_filter requires a date column") - case (None, Some(column)) => throw new Exception("date_filter requires a date range") + (dateFilterConfig, dateColumn) match { + case (Some(config), Some(column)) => + applyFilter(ds, column, config) + case (Some(_), None) => + throw new Exception("date_filter requires a date column") + case (None, Some(_)) => + 
throw new Exception("date_filter requires a date configuration") case (_, _) => ds } } + + private def applyFilter[T](ds: Dataset[T], column: Column, filterConfig: DateFilterConfig): Dataset[T] = { + filterConfig match { + case DateFilterConfig.Range(range) => + applyRangeFilter(ds, column, range) + + case DateFilterConfig.FromOnly(fromStr) => + applyFromFilter(ds, column, fromStr) + + case DateFilterConfig.UntilOnly(untilStr) => + applyUntilFilter(ds, column, untilStr) + } + } + + private def applyRangeFilter[T](ds: Dataset[T], column: Column, range: DateRange): Dataset[T] = { + val dateFrom = Date.valueOf(range.from.toLocalDate) + + val dateUntil = + if (range.until.toLocalTime.isAfter(LocalTime.of(0, 0))) + Date.valueOf(range.until.plusDays(1).toLocalDate) + else + Date.valueOf(range.until.toLocalDate) + + logger.info(s"date_filter: $column >= $dateFrom AND $column < $dateUntil") + ds.filter(column >= dateFrom && column < dateUntil) + } + + private def applyFromFilter[T](ds: Dataset[T], column: Column, dateFrom: LocalDate): Dataset[T] = { + val from = Date.valueOf(dateFrom) + + logger.info(s"date_filter: $column >= $from") + ds.filter(column >= from) + } + + private def applyUntilFilter[T](ds: Dataset[T], column: Column, dateUntil: LocalDate): Dataset[T] = { + val until = Date.valueOf(dateUntil) + + logger.info(s"date_filter: $column < $until") + ds.filter(column < until) + } } object DateFilterer { - def apply[T](range: Option[DateRange], column: Option[Column]): Dataset[T] => Dataset[T] = { + def apply[T](filterConfig: Option[DateFilterConfig], column: Option[Column]): Dataset[T] => Dataset[T] = { new DateFilterer { - override val dateRange: Option[DateRange] = range + override val dateFilterConfig: Option[DateFilterConfig] = filterConfig override val dateColumn: Option[Column] = column }.applyDateFilter } diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/spark/batch/SparkInput.scala 
b/core/src/main/scala/com/amadeus/dataio/pipes/spark/batch/SparkInput.scala index 62ed153..9865d09 100644 --- a/core/src/main/scala/com/amadeus/dataio/pipes/spark/batch/SparkInput.scala +++ b/core/src/main/scala/com/amadeus/dataio/pipes/spark/batch/SparkInput.scala @@ -1,6 +1,6 @@ package com.amadeus.dataio.pipes.spark.batch -import com.amadeus.dataio.core.time.DateRange +import com.amadeus.dataio.config.fields.DateFilterConfig import com.amadeus.dataio.core.transformers.{Coalescer, DateFilterer, Repartitioner} import com.amadeus.dataio.core.{Input, Logging, SchemaRegistry} import com.amadeus.dataio.pipes.spark.{SparkPathSource, SparkSource, SparkSourceConfigurator, SparkTableSource} @@ -16,16 +16,16 @@ import scala.util.Try * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity. */ case class SparkInput( - name: String, - source: Option[SparkSource] = None, - options: Map[String, String] = Map(), - dateRange: Option[DateRange] = None, - dateColumn: Option[Column] = None, - repartitionExprs: Option[String] = None, - repartitionNum: Option[Int] = None, - coalesce: Option[Int] = None, - schema: Option[String] = None, - config: Config = ConfigFactory.empty() + name: String, + source: Option[SparkSource] = None, + options: Map[String, String] = Map(), + dateFilterConfig: Option[DateFilterConfig] = None, + dateColumn: Option[Column] = None, + repartitionExprs: Option[String] = None, + repartitionNum: Option[Int] = None, + coalesce: Option[Int] = None, + schema: Option[String] = None, + config: Config = ConfigFactory.empty() ) extends Input with DateFilterer with Repartitioner @@ -86,7 +86,7 @@ object SparkInput extends SparkSourceConfigurator { val source = getSparkSource val options = getOptions - val dateRange = getDateFilterRange + val dateFilterConfig = getDateFilterConfig val dateColumn = getDateFilterColumn val repartitionExprs = getRepartitionExprs val repartitionNum = getRepartitionNum @@ -97,7 +97,7 @@ object 
SparkInput extends SparkSourceConfigurator { name, source, options, - dateRange, + dateFilterConfig, dateColumn, repartitionExprs, repartitionNum, diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/spark/streaming/SparkInput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/spark/streaming/SparkInput.scala index 0da5de3..360a06f 100644 --- a/core/src/main/scala/com/amadeus/dataio/pipes/spark/streaming/SparkInput.scala +++ b/core/src/main/scala/com/amadeus/dataio/pipes/spark/streaming/SparkInput.scala @@ -1,6 +1,6 @@ package com.amadeus.dataio.pipes.spark.streaming -import com.amadeus.dataio.core.time.DateRange +import com.amadeus.dataio.config.fields.DateFilterConfig import com.amadeus.dataio.core.transformers.{Coalescer, DateFilterer, Repartitioner} import com.amadeus.dataio.core.{Input, Logging, SchemaRegistry} import com.amadeus.dataio.pipes.spark.{SparkPathSource, SparkSource, SparkTableSource} @@ -17,16 +17,16 @@ import scala.util.Try * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity. 
*/ case class SparkInput( - name: String, - source: Option[SparkSource] = None, - options: Map[String, String] = Map(), - dateRange: Option[DateRange] = None, - dateColumn: Option[Column] = None, - repartitionExprs: Option[String] = None, - repartitionNum: Option[Int] = None, - coalesce: Option[Int] = None, - schema: Option[String] = None, - config: Config = ConfigFactory.empty() + name: String, + source: Option[SparkSource] = None, + options: Map[String, String] = Map(), + dateFilterConfig: Option[DateFilterConfig] = None, + dateColumn: Option[Column] = None, + repartitionExprs: Option[String] = None, + repartitionNum: Option[Int] = None, + coalesce: Option[Int] = None, + schema: Option[String] = None, + config: Config = ConfigFactory.empty() ) extends Input with Repartitioner with Coalescer @@ -87,7 +87,7 @@ object SparkInput { val source = getSparkSource val options = getOptions - val dateRange = getDateFilterRange + val dateFilterConfig = getDateFilterConfig val dateColumn = getDateFilterColumn val repartitionExprs = getRepartitionExprs val repartitionNum = getRepartitionNum @@ -98,7 +98,7 @@ object SparkInput { name, source, options, - dateRange, + dateFilterConfig, dateColumn, repartitionExprs, repartitionNum, diff --git a/core/src/test/scala/com/amadeus/dataio/config/fields/DateFilterConfiguratorTest.scala b/core/src/test/scala/com/amadeus/dataio/config/fields/DateFilterConfiguratorTest.scala index 1bfa83f..784f29a 100644 --- a/core/src/test/scala/com/amadeus/dataio/config/fields/DateFilterConfiguratorTest.scala +++ b/core/src/test/scala/com/amadeus/dataio/config/fields/DateFilterConfiguratorTest.scala @@ -1,86 +1,227 @@ package com.amadeus.dataio.config.fields -import com.typesafe.config.{Config, ConfigFactory} +import com.amadeus.dataio.testutils.ConfigCreator import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import java.time.LocalDate -class DateFilterConfiguratorTest extends AnyFlatSpec with Matchers { - behavior of 
"getDateFilterRange" +class DateFilterConfiguratorTest extends AnyFlatSpec with Matchers with ConfigCreator { - it should "return date range when proper config exists" in { - val configStr = """ - date_filter { - reference= "2023-01-01" - offset = "+5D" - } - """ - implicit val config: Config = ConfigFactory.parseString(configStr) + // Test fixture to create a configurator instance + trait TestConfigurator extends DateFilterConfigurator - val result = getDateFilterRange - result.isDefined should be(true) + behavior of "DateFilterConfigurator.getDateFilterConfig" - val dateRange = result.get - dateRange.from.toLocalDate should be(LocalDate.parse("2023-01-01")) - dateRange.until.toLocalDate should be(LocalDate.parse("2023-01-06")) + // ===== No Configuration Tests ===== + + it should "return None when date_filter config is not present" in new TestConfigurator { + implicit val config = createConfig(""" + |some_other_config = "value" + |""".stripMargin) + + getDateFilterConfig shouldBe None + } + + // ===== reference+offset Syntax Tests ===== + + it should "parse reference+offset syntax" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | reference = "2026-01-25" + | offset = "-7D" + |} + |""".stripMargin) + + val result = getDateFilterConfig + result shouldBe defined + result.get shouldBe a[DateFilterConfig.Range] } - it should "throw IllegalArgumentException when only reference exists" in { - val configStr = - """ - date_filter.reference = "2023-01-01" - """ - implicit val config: Config = ConfigFactory.parseString(configStr) + it should "throw exception when only reference is provided (missing offset)" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | reference = "2026-01-25" + |} + |""".stripMargin) an[IllegalArgumentException] should be thrownBy { - getDateFilterRange + getDateFilterConfig } } - it should "throw IllegalArgumentException when only offset exists" in { - val configStr = - """ - 
date_filter.offset = "+5D" - """ - implicit val config: Config = ConfigFactory.parseString(configStr) + it should "throw exception when only offset is provided (missing reference)" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | offset = "-7D" + |} + |""".stripMargin) an[IllegalArgumentException] should be thrownBy { - getDateFilterRange + getDateFilterConfig } } - it should "return None when neither reference nor offset exists" in { - val configStr = - """ - some_other_config = "value" - """ - implicit val config: Config = ConfigFactory.parseString(configStr) + // ===== from/until Syntax Tests ===== - val result = getDateFilterRange - result should be(None) + it should "parse new syntax with both from and until" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | from = "2026-01-18" + | until = "2026-01-25" + |} + |""".stripMargin) + + val result = getDateFilterConfig + result shouldBe defined + result.get shouldBe a[DateFilterConfig.Range] } - "getDateFilterColumn" should "return a Column when column exists in config" in { - val configStr = - """ - date_filter.column = "created_at" - """ - implicit val config: Config = ConfigFactory.parseString(configStr) + it should "parse new syntax with only from" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | from = "2026-01-20" + |} + |""".stripMargin) + + val result = getDateFilterConfig + result shouldBe defined + result.get match { + case DateFilterConfig.FromOnly(date) => + date shouldBe LocalDate.parse("2026-01-20") + case _ => fail("Expected FromOnly") + } + } - val result = getDateFilterColumn - result.isDefined should be(true) - // Cannot directly compare Column objects, so checking if defined is sufficient + it should "parse new syntax with only until" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | until = "2026-01-25" + |} + |""".stripMargin) + + val result = 
getDateFilterConfig + result shouldBe defined + result.get match { + case DateFilterConfig.UntilOnly(date) => + date shouldBe LocalDate.parse("2026-01-25") + case _ => fail("Expected UntilOnly") + } + } + + // ===== Error Tests ===== + + it should "throw exception when from == until" in new TestConfigurator { + implicit val configStr = """ + |date_filter { + | column = "bookingDate" + | from = "2026-01-15" + | until = "2026-01-15" + |} + |""".stripMargin + + implicit val config = createConfig(configStr) + + an[IllegalArgumentException] should be thrownBy { + getDateFilterConfig + } + } + + it should "throw exception when reference, offset and from are present" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | reference = "2026-01-25" + | offset = "-7D" + | from = "2026-01-18" + |} + |""".stripMargin) + + an[IllegalArgumentException] should be thrownBy { + getDateFilterConfig + } + } + + it should "throw exception when reference, offset, and until are present" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | reference = "2026-01-25" + | offset = "-7D" + | until = "2026-01-30" + |} + |""".stripMargin) + + an[IllegalArgumentException] should be thrownBy { + getDateFilterConfig + } + } + + // ===== Invalid Configuration Tests ===== + + it should "return None and log warning when date_filter exists but has no valid syntax" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | column = "booking_date" + |} + |""".stripMargin) + + // Should return None and log a warning + getDateFilterConfig shouldBe None + } + + it should "throw exception for invalid date format in from" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | from = "invalid-date" + |} + |""".stripMargin) + + an[Exception] should be thrownBy { + getDateFilterConfig + } } - it should "return None when column doesn't exist in config" in { - val configStr = - """ - 
some_other_config = "value" - """ - implicit val config: Config = ConfigFactory.parseString(configStr) + it should "throw exception for invalid date format in until" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | until = "not-a-date" + |} + |""".stripMargin) + + an[Exception] should be thrownBy { + getDateFilterConfig + } + } + + // ===== Column Configuration Tests ===== + + behavior of "DateFilterConfigurator.getDateFilterColumn" + + it should "return column when specified" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | column = "booking_date" + |} + |""".stripMargin) val result = getDateFilterColumn - result should be(None) + result shouldBe defined + } + + it should "return None when column is not specified" in new TestConfigurator { + implicit val config = createConfig(""" + |date_filter { + | from = "2026-01-20" + |} + |""".stripMargin) + + getDateFilterColumn shouldBe None + } + + it should "return None when date_filter is not present" in new TestConfigurator { + implicit val config = createConfig(""" + |other_config = "value" + |""".stripMargin) + + getDateFilterColumn shouldBe None } } diff --git a/core/src/test/scala/com/amadeus/dataio/core/transformers/DateFiltererTest.scala b/core/src/test/scala/com/amadeus/dataio/core/transformers/DateFiltererTest.scala index 4794888..f81a297 100644 --- a/core/src/test/scala/com/amadeus/dataio/core/transformers/DateFiltererTest.scala +++ b/core/src/test/scala/com/amadeus/dataio/core/transformers/DateFiltererTest.scala @@ -1,139 +1,258 @@ package com.amadeus.dataio.core.transformers +import com.amadeus.dataio.config.fields.DateFilterConfig import com.amadeus.dataio.core.time.DateRange import com.amadeus.dataio.testutils.SparkSpec -import org.apache.spark.sql.{Column, Dataset} +import org.apache.spark.sql.Dataset import org.apache.spark.sql.functions.col -import org.scalatest.matchers.should.Matchers import java.sql.Date -import 
java.time.LocalDateTime - -case class TestRecord(id: Int, eventDate: Date, value: Double) -class DateFiltererTest extends SparkSpec with Matchers { - - val testData = Seq( - TestRecord(1, Date.valueOf("2023-01-01"), 10.0), - TestRecord(2, Date.valueOf("2023-01-15"), 20.0), - TestRecord(3, Date.valueOf("2023-01-31"), 30.0), - TestRecord(4, Date.valueOf("2023-02-01"), 40.0), - TestRecord(5, Date.valueOf("2023-02-15"), 50.0), - TestRecord(6, Date.valueOf("2023-02-28"), 60.0), - TestRecord(7, Date.valueOf("2023-03-01"), 70.0) - ) - - "DateFilterer" should "filter dataset with proper date range" in sparkTest { spark => +import java.time.{LocalDate, LocalDateTime} + +// Case classes must be outside the test class for Spark encoder generation +case class EventData(id: Int, eventDate: Date, name: String) +case class CustomEventData(id: Int, timestamp: Date, name: String) + +class DateFiltererTest extends SparkSpec { + + // Helper to create test datasets + def createTestDataset()(implicit spark: org.apache.spark.sql.SparkSession): Dataset[EventData] = { import spark.implicits._ + Seq( + EventData(1, Date.valueOf("2026-01-15"), "Event 1"), + EventData(2, Date.valueOf("2026-01-18"), "Event 2"), + EventData(3, Date.valueOf("2026-01-20"), "Event 3"), + EventData(4, Date.valueOf("2026-01-22"), "Event 4"), + EventData(5, Date.valueOf("2026-01-25"), "Event 5"), + EventData(6, Date.valueOf("2026-01-28"), "Event 6"), + EventData(7, Date.valueOf("2026-01-30"), "Event 7") + ).toDS() + } - val testDs: Dataset[TestRecord] = spark.createDataset(testData) - // Create date range for all of January 2023 - val rangeStart = LocalDateTime.of(2023, 1, 1, 0, 0) - val rangeEnd = LocalDateTime.of(2023, 1, 31, 23, 59, 59) - val dateRange = DateRange(rangeStart, rangeEnd) + behavior of "DateFilterer.applyDateFilter" - // Create a date filterer with our range and column - val filterer = DateFilterer[TestRecord](Some(dateRange), Some(col("eventDate"))) + // ===== No Filter Tests ===== - // Apply 
filter - val filtered = filterer(testDs) + it should "return original dataset when both config and column are None" in sparkTest { implicit spark => + val ds = createTestDataset() + val filterer = DateFilterer[EventData](None, None) - // Check results - filtered.count() shouldBe 3 - filtered.collect().map(_.id) should contain theSameElementsAs Seq(1, 2, 3) + val result = filterer(ds) + result should haveCountOf(ds.count()) } - it should "filter with midnight time in until date correctly" in sparkTest { spark => - import spark.implicits._ + it should "throw exception when config is provided but column is None" in sparkTest { implicit spark => + val ds = createTestDataset() + val config = DateFilterConfig.FromOnly(LocalDate.parse("2026-01-20")) + val filterer = DateFilterer[EventData](Some(config), None) - val testDs: Dataset[TestRecord] = spark.createDataset(testData) - // Create date range ending at midnight - val rangeStart = LocalDateTime.of(2023, 1, 1, 0, 0) - val rangeEnd = LocalDateTime.of(2023, 2, 1, 0, 0) // Midnight - val dateRange = DateRange(rangeStart, rangeEnd) + an[Exception] should be thrownBy { + filterer(ds) + } + } - // Create a date filterer with our range and column - val filterer = DateFilterer[TestRecord](Some(dateRange), Some(col("eventDate"))) + it should "throw exception when column is provided but config is None" in sparkTest { implicit spark => + val ds = createTestDataset() + val filterer = DateFilterer[EventData](None, Some(col("eventDate"))) - // Apply filter - val filtered = filterer(testDs) + an[Exception] should be thrownBy { + filterer(ds) + } + } + + // ===== FromOnly Tests (>= date) ===== + + it should "filter correctly with FromOnly (>= date)" in sparkTest { implicit spark => + val ds = createTestDataset() + val config = DateFilterConfig.FromOnly(LocalDate.parse("2026-01-20")) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) + + val result = filterer(ds) - // Check results - should include January 
but not February 1st - filtered.count() shouldBe 3 - filtered.collect().map(_.id) should contain theSameElementsAs Seq(1, 2, 3) + result should haveCountOf(5) + result.collect().map(_.id) should contain theSameElementsAs Seq(3, 4, 5, 6, 7) } - it should "filter with time after midnight in until date correctly" in sparkTest { spark => - import spark.implicits._ + it should "include the boundary date with FromOnly" in sparkTest { implicit spark => + val ds = createTestDataset() + val config = DateFilterConfig.FromOnly(LocalDate.parse("2026-01-20")) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) - val testDs: Dataset[TestRecord] = spark.createDataset(testData) - // Create date range ending just after midnight - val rangeStart = LocalDateTime.of(2023, 1, 1, 0, 0) - val rangeEnd = LocalDateTime.of(2023, 2, 1, 0, 1) // Just after midnight - val dateRange = DateRange(rangeStart, rangeEnd) + val result = filterer(ds).collect() - // Create a date filterer with our range and column - val filterer = DateFilterer[TestRecord](Some(dateRange), Some(col("eventDate"))) + result.exists(_.eventDate == Date.valueOf("2026-01-20")) shouldBe true + } - // Apply filter - val filtered = filterer(testDs) + it should "return empty dataset when FromOnly date is after all events" in sparkTest { implicit spark => + val ds = createTestDataset() + val config = DateFilterConfig.FromOnly(LocalDate.parse("2026-02-01")) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) - // Check results - should include January and February 1st - filtered should haveCountOf(4) - filtered.collect().map(_.id) should contain theSameElementsAs Seq(1, 2, 3, 4) + val result = filterer(ds) + result should beEmpty } - it should "throw exception when date range is specified but column is missing" in sparkTest { spark => - import spark.implicits._ + it should "return all events when FromOnly date is before all events" in sparkTest { implicit spark => + val ds = 
createTestDataset() + val config = DateFilterConfig.FromOnly(LocalDate.parse("2026-01-01")) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) - val testDs: Dataset[TestRecord] = spark.createDataset(testData) - val rangeStart = LocalDateTime.of(2023, 1, 1, 0, 0) - val rangeEnd = LocalDateTime.of(2023, 1, 31, 23, 59, 59) - val range = DateRange(rangeStart, rangeEnd) + val result = filterer(ds) + result should haveCountOf(7) + } - val filterer = new DateFilterer { - override val dateRange: Option[DateRange] = Some(range) - override val dateColumn: Option[Column] = None - } + // ===== UntilOnly Tests (< date) ===== - val exception = intercept[Exception] { - filterer.applyDateFilter(testDs) - } + it should "filter correctly with UntilOnly (< date)" in sparkTest { implicit spark => + val ds = createTestDataset() + val config = DateFilterConfig.UntilOnly(LocalDate.parse("2026-01-25")) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) - exception.getMessage shouldBe "date_filter requires a date column" + val result = filterer(ds) + + result should haveCountOf(4) + result.collect().map(_.id) should contain theSameElementsAs Seq(1, 2, 3, 4) } - it should "throw exception when column is specified but date range is missing" in sparkTest { spark => - import spark.implicits._ + it should "exclude the boundary date with UntilOnly" in sparkTest { implicit spark => + val ds = createTestDataset() + val config = DateFilterConfig.UntilOnly(LocalDate.parse("2026-01-25")) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) - val testDs: Dataset[TestRecord] = spark.createDataset(testData) - val filterer = new DateFilterer { - override val dateRange: Option[DateRange] = None - override val dateColumn: Option[Column] = Some(col("eventDate")) - } + val result = filterer(ds).collect() - val exception = intercept[Exception] { - filterer.applyDateFilter(testDs) - } + result.exists(_.eventDate == 
Date.valueOf("2026-01-25")) shouldBe false + } + + it should "return empty dataset when UntilOnly date is before all events" in sparkTest { implicit spark => + val ds = createTestDataset() + val config = DateFilterConfig.UntilOnly(LocalDate.parse("2026-01-01")) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) + + val result = filterer(ds) + result should beEmpty + } + + it should "return all events when UntilOnly date is after all events" in sparkTest { implicit spark => + val ds = createTestDataset() + val config = DateFilterConfig.UntilOnly(LocalDate.parse("2026-02-01")) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) + + val result = filterer(ds) + result should haveCountOf(7) + } + + // ===== Range Tests ===== + + it should "filter correctly with DateRange using LocalDate" in sparkTest { implicit spark => + val ds = createTestDataset() + // DateRange.apply(LocalDate, LocalDate) converts to midnight LocalDateTime + val dateRange = DateRange(LocalDate.parse("2026-01-20"), LocalDate.parse("2026-01-25")) + val config = DateFilterConfig.Range(dateRange) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) + + val result = filterer(ds) + + // Filter: >= 2026-01-20 AND < 2026-01-25 (from inclusive, until exclusive) + // Should include: 2026-01-20, 2026-01-22 (ids 3, 4) + result should haveCountOf(2) + result.collect().map(_.id) should contain theSameElementsAs Seq(3, 4) + } + + it should "filter correctly with DateRange using reference and offset" in sparkTest { implicit spark => + val ds = createTestDataset() + // DateRange with reference and offset (legacy syntax) + val dateRange = DateRange("2026-01-20", "+5D") + val config = DateFilterConfig.Range(dateRange) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) + + val result = filterer(ds) + + // from = 2026-01-20, until = 2026-01-20 + 5D = 2026-01-25 + // Filter: >= 2026-01-20 AND < 2026-01-25 (from 
inclusive, until exclusive) + // Should include: 2026-01-20, 2026-01-22 (ids 3, 4) + result should haveCountOf(2) + result.collect().map(_.id) should contain theSameElementsAs Seq(3, 4) + } + + it should "include from boundary and exclude until boundary in Range" in sparkTest { implicit spark => + val ds = createTestDataset() + val dateRange = DateRange(LocalDate.parse("2026-01-20"), LocalDate.parse("2026-01-25")) + val config = DateFilterConfig.Range(dateRange) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) + + val result = filterer(ds).collect() + + // from is inclusive + result.exists(_.eventDate == Date.valueOf("2026-01-20")) shouldBe true + // until is exclusive + result.exists(_.eventDate == Date.valueOf("2026-01-25")) shouldBe false + } + + it should "return empty dataset when Range is outside all events" in sparkTest { implicit spark => + val ds = createTestDataset() + val dateRange = DateRange(LocalDate.parse("2026-02-01"), LocalDate.parse("2026-02-10")) + val config = DateFilterConfig.Range(dateRange) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) - exception.getMessage shouldBe "date_filter requires a date range" + val result = filterer(ds) + result should beEmpty } - it should "handle complex date ranges correctly" in sparkTest { spark => + it should "handle one day range correctly" in sparkTest { implicit spark => + val ds = createTestDataset() + // To get exactly one day, until should be the next day + val dateRange = DateRange(LocalDate.parse("2026-01-20"), LocalDate.parse("2026-01-21")) + val config = DateFilterConfig.Range(dateRange) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) + + val result = filterer(ds) + + result should haveCountOf(1) + result.collect().head.id shouldBe 3 + } + + it should "return an empty result when from == until" in sparkTest { implicit spark => + val ds = createTestDataset() + // from == until is an empty range: from is inclusive but until is exclusive, so no date can match
+ val dateRange = DateRange(LocalDate.parse("2026-01-20"), LocalDate.parse("2026-01-20")) + val config = DateFilterConfig.Range(dateRange) + val filterer = DateFilterer[EventData](Some(config), Some(col("eventDate"))) + + val result = filterer(ds) + + result should haveCountOf(0) + } + + // ===== Combined Logic Tests ===== + + it should "work with different column names" in sparkTest { implicit spark => import spark.implicits._ - val testDs: Dataset[TestRecord] = spark.createDataset(testData) - // Create date range for a specific period - val rangeStart = LocalDateTime.of(2023, 1, 15, 0, 0) // Mid-January - val rangeEnd = LocalDateTime.of(2023, 2, 15, 0, 0) // Mid-February - val dateRange = DateRange(rangeStart, rangeEnd) + val ds = Seq( + CustomEventData(1, Date.valueOf("2026-01-15"), "Event 1"), + CustomEventData(2, Date.valueOf("2026-01-25"), "Event 2") + ).toDS() + + val config = DateFilterConfig.FromOnly(LocalDate.parse("2026-01-20")) + val filterer = DateFilterer[CustomEventData](Some(config), Some(col("timestamp"))) + + val result = filterer(ds) + result should haveCountOf(1) + result.collect().head.id shouldBe 2 + } - // Create a date filterer with our range and column - val filterer = DateFilterer[TestRecord](Some(dateRange), Some(col("eventDate"))) - // Apply filter - val filtered = filterer(testDs) + it should "work with Dataset[Row] not just typed datasets" in sparkTest { implicit spark => + val ds = createTestDataset().toDF() + val config = DateFilterConfig.FromOnly(LocalDate.parse("2026-01-20")) + + // Note: For untyped datasets, we need to use the untyped apply method + val filterer = new DateFilterer { + override val dateFilterConfig = Some(config) + override val dateColumn = Some(col("eventDate")) + } - // Should include Jan 15 through Feb 15 - filtered should haveCountOf(3) - filtered should containTheSameRowsAs(testDs.filter($"eventDate" >= "2023-01-15" and $"eventDate" < "2023-02-15")) + val result = filterer.applyDateFilter(ds) + 
result should haveCountOf(5) } -} +} \ No newline at end of file diff --git a/core/src/test/scala/com/amadeus/dataio/integration/DateFilterIntegrationTest.scala b/core/src/test/scala/com/amadeus/dataio/integration/DateFilterIntegrationTest.scala new file mode 100644 index 0000000..3a36769 --- /dev/null +++ b/core/src/test/scala/com/amadeus/dataio/integration/DateFilterIntegrationTest.scala @@ -0,0 +1,242 @@ +package com.amadeus.dataio.integration + +import com.amadeus.dataio.config.fields.DateFilterConfigurator +import com.amadeus.dataio.core.transformers.DateFilterer +import com.amadeus.dataio.testutils.SparkSpec +import org.apache.spark.sql.Dataset + +import java.sql.Date + +// Case class must be outside the test class for Spark encoder generation +case class Booking(id: Int, bookingDate: Date, customerName: String) + +class DateFilterIntegrationTest extends SparkSpec { + + def createBookings()(implicit spark: org.apache.spark.sql.SparkSession): Dataset[Booking] = { + import spark.implicits._ + Seq( + Booking(1, Date.valueOf("2026-01-10"), "Alice"), + Booking(2, Date.valueOf("2026-01-15"), "Bob"), + Booking(3, Date.valueOf("2026-01-20"), "Charlie"), + Booking(4, Date.valueOf("2026-01-25"), "David"), + Booking(5, Date.valueOf("2026-01-30"), "Eve") + ).toDS() + } + + object TestConfigurator extends DateFilterConfigurator + + behavior of "DateFilter Integration (Configurator + Filterer)" + + // ===== Reference+Offset Syntax Integration Tests ===== + + it should "work end-to-end with reference+offset syntax" in sparkTest { implicit spark => + val configStr = """ + |date_filter { + | column = "bookingDate" + | reference = "2026-01-25" + | offset = "-7D" + |} + |""".stripMargin + + implicit val config = createConfig(configStr) + + val filterConfig = TestConfigurator.getDateFilterConfig + val column = TestConfigurator.getDateFilterColumn + + filterConfig shouldBe defined + column shouldBe defined + + val ds = createBookings() + val filterer = 
DateFilterer[Booking](filterConfig, column) + val result = filterer(ds) + + // reference + offset = 2026-01-25 + (-7D) = 2026-01-18 + // Filter: >= 2026-01-18 AND < 2026-01-25 (from inclusive, until exclusive) + // Should include: 2026-01-20 (id 3) + result should haveCountOf(1) + result.collect().map(_.id) should contain theSameElementsAs Seq(3) + } + + // ===== From/Until Syntax Integration Tests ===== + + it should "work end-to-end with from/until syntax" in sparkTest { implicit spark => + val configStr = """ + |date_filter { + | column = "bookingDate" + | from = "2026-01-15" + | until = "2026-01-25" + |} + |""".stripMargin + + implicit val config = createConfig(configStr) + + val filterConfig = TestConfigurator.getDateFilterConfig + val column = TestConfigurator.getDateFilterColumn + + val ds = createBookings() + val filterer = DateFilterer[Booking](filterConfig, column) + val result = filterer(ds) + + // Filter: >= 2026-01-15 AND < 2026-01-25 + // Should include: 2026-01-15, 2026-01-20 (ids 2, 3) + result should haveCountOf(2) + result.collect().map(_.id) should contain theSameElementsAs Seq(2, 3) + } + + it should "work end-to-end with only from" in sparkTest { implicit spark => + val configStr = """ + |date_filter { + | column = "bookingDate" + | from = "2026-01-20" + |} + |""".stripMargin + + implicit val config = createConfig(configStr) + + val filterConfig = TestConfigurator.getDateFilterConfig + val column = TestConfigurator.getDateFilterColumn + + val ds = createBookings() + val filterer = DateFilterer[Booking](filterConfig, column) + val result = filterer(ds) + + // Should include bookings >= 2026-01-20 + result should haveCountOf(3) + result.collect().map(_.id) should contain theSameElementsAs Seq(3, 4, 5) + } + + it should "work end-to-end with only until" in sparkTest { implicit spark => + val configStr = """ + |date_filter { + | column = "bookingDate" + | until = "2026-01-25" + |} + |""".stripMargin + + implicit val config = createConfig(configStr) + 
+ val filterConfig = TestConfigurator.getDateFilterConfig + val column = TestConfigurator.getDateFilterColumn + + val ds = createBookings() + val filterer = DateFilterer[Booking](filterConfig, column) + val result = filterer(ds) + + // Should include bookings < 2026-01-25 + result should haveCountOf(3) + result.collect().map(_.id) should contain theSameElementsAs Seq(1, 2, 3) + } + + // ===== Edge Cases Integration Tests ===== + + it should "handle single day range correctly (until = from + 1)" in sparkTest { implicit spark => + val configStr = """ + |date_filter { + | column = "bookingDate" + | from = "2026-01-20" + | until = "2026-01-21" + |} + |""".stripMargin + + implicit val config = createConfig(configStr) + + val filterConfig = TestConfigurator.getDateFilterConfig + val column = TestConfigurator.getDateFilterColumn + + val ds = createBookings() + val result = DateFilterer[Booking](filterConfig, column)(ds) + + // Should include only 2026-01-20 + result should haveCountOf(1) + result.collect().head.id shouldBe 3 + } + + // ===== No Filter Integration Test ===== + + it should "return all data when no date_filter configuration is present" in sparkTest { implicit spark => + val configStr = """ + |some_other_config = "value" + |""".stripMargin + + implicit val config = createConfig(configStr) + + val filterConfig = TestConfigurator.getDateFilterConfig + val column = TestConfigurator.getDateFilterColumn + + filterConfig shouldBe None + column shouldBe None + + val ds = createBookings() + val filterer = DateFilterer[Booking](filterConfig, column) + val result = filterer(ds) + + result should haveCountOf(5) + } + + // ===== Error Handling Integration Tests ===== + + it should "fail when column is missing but config is present" in sparkTest { implicit spark => + val configStr = """ + |date_filter { + | from = "2026-01-20" + |} + |""".stripMargin + + implicit val config = createConfig(configStr) + + val filterConfig = TestConfigurator.getDateFilterConfig + val column 
= TestConfigurator.getDateFilterColumn + + filterConfig shouldBe defined + column shouldBe None + + val ds = createBookings() + val filterer = DateFilterer[Booking](filterConfig, column) + + an[Exception] should be thrownBy { + filterer(ds) + } + } + + // ===== Boundary Condition Tests ===== + + it should "correctly handle inclusive from boundary" in sparkTest { implicit spark => + val configStr = """ + |date_filter { + | column = "bookingDate" + | from = "2026-01-20" + |} + |""".stripMargin + + implicit val config = createConfig(configStr) + + val filterConfig = TestConfigurator.getDateFilterConfig + val column = TestConfigurator.getDateFilterColumn + + val ds = createBookings() + val result = DateFilterer[Booking](filterConfig, column)(ds).collect() + + // Should include the boundary date 2026-01-20 + result.exists(_.bookingDate == Date.valueOf("2026-01-20")) shouldBe true + } + + it should "correctly handle exclusive until boundary" in sparkTest { implicit spark => + val configStr = """ + |date_filter { + | column = "bookingDate" + | until = "2026-01-20" + |} + |""".stripMargin + + implicit val config = createConfig(configStr) + + val filterConfig = TestConfigurator.getDateFilterConfig + val column = TestConfigurator.getDateFilterColumn + + val ds = createBookings() + val result = DateFilterer[Booking](filterConfig, column)(ds).collect() + + // Should NOT include the boundary date 2026-01-20 + result.exists(_.bookingDate == Date.valueOf("2026-01-20")) shouldBe false + } +} \ No newline at end of file diff --git a/docs/content/configuration/date-filters.md b/docs/content/configuration/date-filters.md index 2778513..6fc95cc 100644 --- a/docs/content/configuration/date-filters.md +++ b/docs/content/configuration/date-filters.md @@ -28,32 +28,88 @@ is available. ## Fields -`date_filter` requires a `reference` and an `offset`, in order to define a date range, as well as a `column` field in -order to specify where to apply the filter. 
+`date_filter` always requires a `column` field (where the filter is applied), plus **one** of the following range definitions: -{% include fields_table.md fields=page.fields %} +* **Relative range**: `reference` + `offset` +* **Absolute range**: `from` and/or `until` (you can provide both, or only one of them) -If the upper limit of the date range has a time past midnight, it will include the day (e.g. if the upper limit is 2022-09-28, 03h00, the 28th of September will be included in the range). The lower limit of the date range is always included. +`reference` + `offset` and `from`/`until` are mutually exclusive. +{: .warning} + +### Common field + +* `column` *(required)*: the date column used for filtering. + +### Relative range: `reference` + `offset` + +* `reference` *(required)*: the anchor date (ISO date format, e.g. `2023-07-01`). +* `offset` *(required)*: a duration relative to `reference` (e.g. `-7D`, `-1M`, `+3D`), defining the other bound. + +The resulting interval is the range between `reference` and `reference + offset` (order doesn’t matter: the earlier date becomes the lower bound, the later date becomes the upper bound). The lower bound is inclusive and the upper bound is exclusive. + +### Absolute range: `from` / `until` + +* `from` *(optional)*: lower bound (inclusive). +* `until` *(optional)*: upper bound (exclusive). + +If only `from` is provided, the filter is **open-ended on the upper side**. +If only `until` is provided, the filter is **open-ended on the lower side**. + +If both `from` and `until` are provided, `from` must be strictly earlier than `until`. {: .info} ## Example -Here's an example of input using the `date_filter` feature: +### Relative range (reference + offset) ```hocon -(...) - input { name = "my-input" type = "com.amadeus.dataio.pipes.spark.batch.SparkInput" format = "delta" path = "hdfs://path/to/data" + date_filter { reference = "2023-07-01" offset = "-7D" column = "date" - } + } } +``` + +### Absolute range (from + until) -(...) 
+```hocon +input { + name = "my-input" + type = "com.amadeus.dataio.pipes.spark.batch.SparkInput" + format = "delta" + path = "hdfs://path/to/data" + + date_filter { + from = "2023-06-24" + until = "2023-07-01" + column = "date" + } +} +``` + +### Absolute range (only one bound) + +Only `from`: + +```hocon +date_filter { + from = "2023-06-24" + column = "date" +} +``` + +Only `until`: + +```hocon +date_filter { + until = "2023-07-01" + column = "date" +} ```