Skip to content

Commit 30feaa2

Browse files
committed
update the date_filter feature with new absolute open-ended filtering (from / until)
1 parent d25e541 commit 30feaa2

9 files changed

Lines changed: 874 additions & 239 deletions

File tree

Lines changed: 74 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,85 @@
11
package com.amadeus.dataio.config.fields
22

3+
import com.amadeus.dataio.core.Logging
34
import com.amadeus.dataio.core.time.DateRange
45
import com.typesafe.config.Config
56
import org.apache.spark.sql.Column
67
import org.apache.spark.sql.functions.col
78

9+
import java.time.LocalDate
810
import scala.util.Try
911

10-
trait DateFilterConfigurator {
12+
trait DateFilterConfigurator extends Logging{
13+
def getDateFilterConfig(implicit config: Config): Option[DateFilterConfig] = {
14+
if (!config.hasPath("date_filter")) {
15+
return None
16+
}
1117

12-
/** @param config The typesafe Config object holding the configuration.
13-
* @return The date range, or None.
14-
* @throws IllegalArgumentException If the data was found but is not formatted properly.
15-
*/
16-
def getDateFilterRange(implicit config: Config): Option[DateRange] = {
17-
// If only reference or offset is present for a given syntax, this is incorrect
18-
val syntaxTwoOk = testArguments("date_filter.reference", "date_filter.offset")
19-
20-
// if any is false, we have an incomplete conf
21-
if (!syntaxTwoOk) {
22-
throw new IllegalArgumentException("Configuration incomplete for date filter reference/offset")
23-
} else {
24-
Try {
25-
getArguments("date_filter.reference", "date_filter.offset")
26-
}.toOption
18+
val filterConfig = config.getConfig("date_filter")
19+
20+
val referenceOffsetConfig = getReferenceOffset(filterConfig)
21+
val fromUntilConfig = getFromUntil(filterConfig)
22+
23+
(referenceOffsetConfig, fromUntilConfig) match {
24+
case (Some(_), Some(_)) =>
25+
throw new IllegalArgumentException(
26+
"date_filter: Cannot use both reference+offset and from/until syntaxes simultaneously"
27+
)
28+
29+
case (Some(config), None) =>
30+
Some(config)
31+
32+
case (None, Some(config)) =>
33+
Some(config)
34+
35+
case (None, None) =>
36+
logger.warn("date_filter: configuration found but no valid syntax detected (expected reference+offset or from/until).")
37+
None
2738
}
2839
}
2940

30-
/** @param dateReferencePath the path in the config holding Date Reference
31-
* @param offsetPath the path in the config holding Date Offset
32-
* @return True if the config holds either both keys or none
33-
*/
34-
private def testArguments(dateReferencePath: String, offsetPath: String)(implicit config: Config): Boolean = {
35-
// there must be both keys or none => true
36-
(config.hasPath(dateReferencePath) && config.hasPath(offsetPath)) ||
37-
(!config.hasPath(dateReferencePath) && !config.hasPath(offsetPath))
41+
/** Reads the absolute `from` / `until` syntax from the `date_filter` configuration block.
  *
  * @param config The `date_filter` sub-configuration.
  * @return `FromOnly` when only `from` is set, `UntilOnly` when only `until` is set,
  *         `Range` when both are set, or None when neither key is present.
  * @throws java.time.format.DateTimeParseException If a value cannot be parsed by `LocalDate.parse`.
  *                                                  The message names the offending key.
  * @throws IllegalArgumentException If both keys are set and `from` is not strictly before `until`.
  */
private def getFromUntil(config: Config): Option[DateFilterConfig] = {
  // Reads one optional date key. A parse failure is rethrown with the same exception type
  // so existing handlers keep working, but with a message that points at the config key.
  def readDate(key: String): Option[LocalDate] =
    if (!config.hasPath(key)) None
    else {
      val raw = config.getString(key)
      try Some(LocalDate.parse(raw))
      catch {
        case e: java.time.format.DateTimeParseException =>
          throw new java.time.format.DateTimeParseException(
            s"date_filter: '$key' ($raw) is not a valid ISO-8601 date (e.g. 2024-01-31)",
            raw,
            e.getErrorIndex,
            e
          )
      }
    }

  // `from` is read before `until`, so the first invalid key is the one reported.
  (readDate("from"), readDate("until")) match {
    case (Some(from), Some(until)) =>
      // The upper bound is exclusive, so equal dates would describe an empty range.
      if (!from.isBefore(until)) {
        throw new IllegalArgumentException(
          s"date_filter: 'from' ($from) must be before 'until' ($until). " +
            s"For a single day, use: from = '$from', until = '${from.plusDays(1)}'"
        )
      }
      Some(DateFilterConfig.Range(DateRange(from, until)))

    case (Some(from), None) =>
      Some(DateFilterConfig.FromOnly(from))

    case (None, Some(until)) =>
      Some(DateFilterConfig.UntilOnly(until))

    case (None, None) =>
      None
  }
}
4066

41-
/** @param dateReferencePath the path in the config holding Date Reference
42-
* @param offsetPath the path in the config holding Date Offset
43-
* @return The DateRange corresponding to the reference and the offset
44-
*/
45-
private def getArguments(dateReferencePath: String, offsetPath: String)(implicit config: Config): DateRange = {
46-
val referenceDate = config.getString(dateReferencePath)
47-
val offset = config.getString(offsetPath)
48-
DateRange(referenceDate, offset)
67+
/** Reads the relative `reference` + `offset` syntax from the `date_filter` configuration block.
  *
  * @param config The `date_filter` sub-configuration.
  * @return A `Range` built from both values, or None when neither key is present.
  * @throws IllegalArgumentException If exactly one of `reference` / `offset` is present.
  */
private def getReferenceOffset(config: Config): Option[DateFilterConfig] = {
  val referencePresent = config.hasPath("reference")
  val offsetPresent    = config.hasPath("offset")

  if (referencePresent != offsetPresent) {
    // Exactly one of the two keys is set: the syntax is incomplete.
    throw new IllegalArgumentException(
      "date_filter with reference/offset requires both 'reference' and 'offset'"
    )
  } else if (referencePresent) {
    val range = DateRange(config.getString("reference"), config.getString("offset"))
    Some(DateFilterConfig.Range(range))
  } else {
    None
  }
}
5084

5185
/** @param config The typesafe Config object holding the configuration.
@@ -57,3 +91,11 @@ trait DateFilterConfigurator {
5791
}.toOption
5892
}
5993
}
94+
95+
/** Date filter resolved from a `date_filter` configuration block.
  *
  * The hierarchy is sealed, so a pattern match over it is checked for exhaustiveness.
  */
sealed trait DateFilterConfig

object DateFilterConfig {

  /** Filter bounded on both sides by a [[DateRange]].
    *
    * @param dateRange The range to keep.
    */
  final case class Range(dateRange: DateRange) extends DateFilterConfig

  /** Open-ended filter keeping dates on or after `from`.
    *
    * @param from Lower bound, inclusive (`>= from`).
    */
  final case class FromOnly(from: LocalDate) extends DateFilterConfig

  /** Open-ended filter keeping dates strictly before `until`.
    *
    * @param until Upper bound, exclusive (`< until`).
    */
  final case class UntilOnly(until: LocalDate) extends DateFilterConfig
}

core/src/main/scala/com/amadeus/dataio/core/time/Formats.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ import java.time.temporal.ChronoField
66
import java.time.temporal.ChronoField._
77

88
/**
9-
* Collection of [[com.amadeus.bdp.api.functions.time.Formats.LabeledFormatter]].
10-
* Formats are related to dates and can be used as placeholder in a [[com.amadeus.bdp.api.functions.paths.TemplatePath]].
9+
* Collection of LabeledFormatter.
1110
*
1211
* @see Documentation for pattern symbols: [[https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html]]
1312
*/
@@ -118,7 +117,7 @@ object Formats {
118117
val LbxSecond = LabeledFormatter("second", DateTimeFormatter.ofPattern("ss"), """(\d{2})""", Set(SECOND_OF_MINUTE))
119118

120119
/**
121-
* Set of all [[com.amadeus.bdp.api.functions.time.Formats.LabeledFormatter LabeledFormatter]] defined into [[com.amadeus.bdp.api.functions.time.Formats Formats]] object
120+
* Set of all LabeledFormatter instances defined in the Formats object.
122121
*/
123122
val AllFormats: Set[LabeledFormatter] =
124123
Set(BasicIsoDate, IsoLocalDate, IsoLocalTime, IsoOrdinalDate, IsoWeekDate, Iso8601, IsoInstant, LbxYear, LbxMonth, LbxWeek, LbxDay, LbxHour, LbxMinute, LbxSecond)
@@ -134,7 +133,7 @@ object Formats {
134133
private val labelToFormatterMap = AllFormats.map(formatter => (formatter.label, formatter)).toMap
135134

136135
/**
137-
* Gets a [[com.amadeus.bdp.api.functions.time.Formats.LabeledFormatter LabeledFormatter]] from its label.
136+
* Gets a LabeledFormatter from its label.
138137
*
139138
* @param label the label
140139
* @return an [[scala.Option Option]] containing the formatter or [[scala.None None]] if there is no formatter for the given label

core/src/main/scala/com/amadeus/dataio/core/transformers/DateFilterer.scala

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,75 @@
11
package com.amadeus.dataio.core.transformers
22

3+
import com.amadeus.dataio.config.fields.DateFilterConfig
34
import com.amadeus.dataio.core.Logging
45
import com.amadeus.dataio.core.time.DateRange
56
import org.apache.spark.sql.{Column, Dataset}
7+
68
import java.sql.Date
7-
import java.time.LocalTime
9+
import java.time.{LocalDate, LocalTime}
810

911
trait DateFilterer extends Logging {
10-
val dateRange: Option[DateRange]
12+
val dateFilterConfig: Option[DateFilterConfig]
1113
val dateColumn: Option[Column]
1214

1315
def applyDateFilter[T](ds: Dataset[T]): Dataset[T] = {
14-
(dateRange, dateColumn) match {
15-
case (Some(range), Some(column)) =>
16-
val dateFrom: Date = Date.valueOf(range.from.toLocalDate)
17-
18-
// if we have a time after midnight, include the right extreme of the DateRange
19-
val dateUntil =
20-
if (range.until.toLocalTime.isAfter(LocalTime.of(0, 0))) Date.valueOf(range.until.plusDays(1).toLocalDate)
21-
else Date.valueOf(range.until.toLocalDate)
22-
23-
logger.info(s"date_filter: $column >= $dateFrom and $column < $dateUntil")
24-
ds.filter(column >= dateFrom and column < dateUntil)
25-
case (Some(range), None) => throw new Exception("date_filter requires a date column")
26-
case (None, Some(column)) => throw new Exception("date_filter requires a date range")
16+
(dateFilterConfig, dateColumn) match {
17+
case (Some(config), Some(column)) =>
18+
applyFilter(ds, column, config)
19+
case (Some(_), None) =>
20+
throw new Exception("date_filter requires a date column")
21+
case (None, Some(_)) =>
22+
throw new Exception("date_filter requires a date configuration")
2723
case (_, _) =>
2824
ds
2925
}
3026
}
27+
28+
/** Dispatches to the filter implementation matching the kind of date filter configured.
  *
  * @param ds           The dataset to filter.
  * @param column       The date column the filter is applied on.
  * @param filterConfig The resolved date filter.
  * @return The filtered dataset.
  */
private def applyFilter[T](ds: Dataset[T], column: Column, filterConfig: DateFilterConfig): Dataset[T] =
  filterConfig match {
    case DateFilterConfig.FromOnly(lowerBound)  => applyFromFilter(ds, column, lowerBound)
    case DateFilterConfig.UntilOnly(upperBound) => applyUntilFilter(ds, column, upperBound)
    case DateFilterConfig.Range(bounded)        => applyRangeFilter(ds, column, bounded)
  }
40+
41+
/** Keeps the rows whose date column falls within the given range.
  *
  * The lower bound is inclusive and the upper bound is exclusive. When the end of the range
  * carries a time of day after midnight, the upper bound is moved to the next day so that
  * the end day itself is kept.
  *
  * @param ds     The dataset to filter.
  * @param column The date column the filter is applied on.
  * @param range  The range to keep.
  * @return The filtered dataset.
  */
private def applyRangeFilter[T](ds: Dataset[T], column: Column, range: DateRange): Dataset[T] = {
  val lowerBound = Date.valueOf(range.from.toLocalDate)

  // An end after midnight means the end day is part of the range.
  val endsAfterMidnight = range.until.toLocalTime.isAfter(LocalTime.of(0, 0))
  val lastDay           = if (endsAfterMidnight) range.until.plusDays(1) else range.until
  val upperBound        = Date.valueOf(lastDay.toLocalDate)

  logger.info(s"date_filter: $column >= $lowerBound AND $column < $upperBound")

  val withinRange = (column >= lowerBound).and(column < upperBound)
  ds.filter(withinRange)
}
53+
54+
/** Keeps the rows whose date column is on or after the given date.
  *
  * @param ds       The dataset to filter.
  * @param column   The date column the filter is applied on.
  * @param dateFrom Lower bound, inclusive.
  * @return The filtered dataset.
  */
private def applyFromFilter[T](ds: Dataset[T], column: Column, dateFrom: LocalDate): Dataset[T] = {
  val lowerBound  = Date.valueOf(dateFrom)
  val onOrAfter   = column >= lowerBound

  logger.info(s"date_filter: $column >= $lowerBound")
  ds.filter(onOrAfter)
}
60+
61+
/** Keeps the rows whose date column is strictly before the given date.
  *
  * @param ds        The dataset to filter.
  * @param column    The date column the filter is applied on.
  * @param dateUntil Upper bound, exclusive.
  * @return The filtered dataset.
  */
private def applyUntilFilter[T](ds: Dataset[T], column: Column, dateUntil: LocalDate): Dataset[T] = {
  val upperBound = Date.valueOf(dateUntil)
  val before     = column < upperBound

  logger.info(s"date_filter: $column < $upperBound")
  ds.filter(before)
}
3167
}
3268

3369
object DateFilterer {
34-
def apply[T](range: Option[DateRange], column: Option[Column]): Dataset[T] => Dataset[T] = {
70+
def apply[T](filterConfig: Option[DateFilterConfig], column: Option[Column]): Dataset[T] => Dataset[T] = {
3571
new DateFilterer {
36-
override val dateRange: Option[DateRange] = range
72+
override val dateFilterConfig: Option[DateFilterConfig] = filterConfig
3773
override val dateColumn: Option[Column] = column
3874
}.applyDateFilter
3975
}

core/src/main/scala/com/amadeus/dataio/pipes/spark/batch/SparkInput.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.amadeus.dataio.pipes.spark.batch
22

3-
import com.amadeus.dataio.core.time.DateRange
3+
import com.amadeus.dataio.config.fields.DateFilterConfig
44
import com.amadeus.dataio.core.transformers.{Coalescer, DateFilterer, Repartitioner}
55
import com.amadeus.dataio.core.{Input, Logging, SchemaRegistry}
66
import com.amadeus.dataio.pipes.spark.{SparkPathSource, SparkSource, SparkSourceConfigurator, SparkTableSource}
@@ -16,16 +16,16 @@ import scala.util.Try
1616
* @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
1717
*/
1818
case class SparkInput(
19-
name: String,
20-
source: Option[SparkSource] = None,
21-
options: Map[String, String] = Map(),
22-
dateRange: Option[DateRange] = None,
23-
dateColumn: Option[Column] = None,
24-
repartitionExprs: Option[String] = None,
25-
repartitionNum: Option[Int] = None,
26-
coalesce: Option[Int] = None,
27-
schema: Option[String] = None,
28-
config: Config = ConfigFactory.empty()
19+
name: String,
20+
source: Option[SparkSource] = None,
21+
options: Map[String, String] = Map(),
22+
dateFilterConfig: Option[DateFilterConfig] = None,
23+
dateColumn: Option[Column] = None,
24+
repartitionExprs: Option[String] = None,
25+
repartitionNum: Option[Int] = None,
26+
coalesce: Option[Int] = None,
27+
schema: Option[String] = None,
28+
config: Config = ConfigFactory.empty()
2929
) extends Input
3030
with DateFilterer
3131
with Repartitioner
@@ -86,7 +86,7 @@ object SparkInput extends SparkSourceConfigurator {
8686
val source = getSparkSource
8787

8888
val options = getOptions
89-
val dateRange = getDateFilterRange
89+
val dateFilterConfig = getDateFilterConfig
9090
val dateColumn = getDateFilterColumn
9191
val repartitionExprs = getRepartitionExprs
9292
val repartitionNum = getRepartitionNum
@@ -97,7 +97,7 @@ object SparkInput extends SparkSourceConfigurator {
9797
name,
9898
source,
9999
options,
100-
dateRange,
100+
dateFilterConfig,
101101
dateColumn,
102102
repartitionExprs,
103103
repartitionNum,

core/src/main/scala/com/amadeus/dataio/pipes/spark/streaming/SparkInput.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.amadeus.dataio.pipes.spark.streaming
22

3-
import com.amadeus.dataio.core.time.DateRange
3+
import com.amadeus.dataio.config.fields.DateFilterConfig
44
import com.amadeus.dataio.core.transformers.{Coalescer, DateFilterer, Repartitioner}
55
import com.amadeus.dataio.core.{Input, Logging, SchemaRegistry}
66
import com.amadeus.dataio.pipes.spark.{SparkPathSource, SparkSource, SparkTableSource}
@@ -17,16 +17,16 @@ import scala.util.Try
1717
* @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
1818
*/
1919
case class SparkInput(
20-
name: String,
21-
source: Option[SparkSource] = None,
22-
options: Map[String, String] = Map(),
23-
dateRange: Option[DateRange] = None,
24-
dateColumn: Option[Column] = None,
25-
repartitionExprs: Option[String] = None,
26-
repartitionNum: Option[Int] = None,
27-
coalesce: Option[Int] = None,
28-
schema: Option[String] = None,
29-
config: Config = ConfigFactory.empty()
20+
name: String,
21+
source: Option[SparkSource] = None,
22+
options: Map[String, String] = Map(),
23+
dateFilterConfig: Option[DateFilterConfig] = None,
24+
dateColumn: Option[Column] = None,
25+
repartitionExprs: Option[String] = None,
26+
repartitionNum: Option[Int] = None,
27+
coalesce: Option[Int] = None,
28+
schema: Option[String] = None,
29+
config: Config = ConfigFactory.empty()
3030
) extends Input
3131
with Repartitioner
3232
with Coalescer
@@ -87,7 +87,7 @@ object SparkInput {
8787
val source = getSparkSource
8888

8989
val options = getOptions
90-
val dateRange = getDateFilterRange
90+
val dateFilterConfig = getDateFilterConfig
9191
val dateColumn = getDateFilterColumn
9292
val repartitionExprs = getRepartitionExprs
9393
val repartitionNum = getRepartitionNum
@@ -98,7 +98,7 @@ object SparkInput {
9898
name,
9999
source,
100100
options,
101-
dateRange,
101+
dateFilterConfig,
102102
dateColumn,
103103
repartitionExprs,
104104
repartitionNum,

0 commit comments

Comments
 (0)