Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,51 +1,85 @@
package com.amadeus.dataio.config.fields

import com.amadeus.dataio.core.Logging
import com.amadeus.dataio.core.time.DateRange
import com.typesafe.config.Config
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

import java.time.LocalDate
import scala.util.Try

trait DateFilterConfigurator {
trait DateFilterConfigurator extends Logging{
/** Reads the optional `date_filter` section of the configuration.
  *
  * Two mutually exclusive syntaxes are supported:
  *   - `reference` + `offset` (both keys required together)
  *   - `from` / `until` (either key alone, or both)
  *
  * @param config The typesafe Config object holding the configuration.
  * @return The parsed date filter configuration, or None when the section is absent
  *         or contains neither syntax.
  * @throws IllegalArgumentException If both syntaxes are present simultaneously,
  *                                  or if a syntax is incomplete or invalid.
  */
def getDateFilterConfig(implicit config: Config): Option[DateFilterConfig] = {
  if (!config.hasPath("date_filter")) None
  else {
    val filterConfig = config.getConfig("date_filter")

    (getReferenceOffset(filterConfig), getFromUntil(filterConfig)) match {
      case (Some(_), Some(_)) =>
        throw new IllegalArgumentException(
          "date_filter: Cannot use both reference+offset and from/until syntaxes simultaneously"
        )
      case (referenceOffset, fromUntil) =>
        // At most one of the two is defined here; warn when the section exists but neither matched.
        val resolved = referenceOffset.orElse(fromUntil)
        if (resolved.isEmpty) {
          logger.warn("date_filter: configuration found but no valid syntax detected (expected reference+offset or from/until).")
        }
        resolved
    }
  }
}

/** @param dateReferencePath the path in the config holding Date Reference
* @param offsetPath the path in the config holding Date Offset
* @return True if the config holds either both keys or none
*/
private def testArguments(dateReferencePath: String, offsetPath: String)(implicit config: Config): Boolean = {
// there must be both keys or none => true
(config.hasPath(dateReferencePath) && config.hasPath(offsetPath)) ||
(!config.hasPath(dateReferencePath) && !config.hasPath(offsetPath))
/** Parses the `from`/`until` syntax of the date filter section.
  *
  * @param config the `date_filter` sub-configuration
  * @return a FromOnly, UntilOnly or Range filter depending on which keys are present,
  *         or None when neither key exists
  * @throws IllegalArgumentException when both keys are present but `from` is not strictly before `until`
  */
private def getFromUntil(config: Config): Option[DateFilterConfig] = {
  def parseDate(key: String): Option[LocalDate] =
    if (config.hasPath(key)) Some(LocalDate.parse(config.getString(key))) else None

  (parseDate("from"), parseDate("until")) match {
    case (Some(from), Some(until)) =>
      // A range must be non-empty: 'until' is exclusive, so from == until would select nothing.
      if (!from.isBefore(until)) {
        throw new IllegalArgumentException(
          s"date_filter: 'from' ($from) must be before 'until' ($until). " +
            s"For a single day, use: from = '$from', until = '${from.plusDays(1)}'"
        )
      }
      Some(DateFilterConfig.Range(DateRange(from, until)))
    case (Some(from), None) =>
      Some(DateFilterConfig.FromOnly(from))
    case (None, Some(until)) =>
      Some(DateFilterConfig.UntilOnly(until))
    case (None, None) =>
      None
  }
}

/** @param dateReferencePath the path in the config holding Date Reference
* @param offsetPath the path in the config holding Date Offset
* @return The DateRange corresponding to the reference and the offset
*/
private def getArguments(dateReferencePath: String, offsetPath: String)(implicit config: Config): DateRange = {
val referenceDate = config.getString(dateReferencePath)
val offset = config.getString(offsetPath)
DateRange(referenceDate, offset)
/** Parses the `reference`/`offset` syntax of the date filter section.
  *
  * @param config the `date_filter` sub-configuration
  * @return a Range filter when both keys are present, None when neither is
  * @throws IllegalArgumentException when only one of the two keys is present
  */
private def getReferenceOffset(config: Config): Option[DateFilterConfig] = {
  val hasReference = config.hasPath("reference")
  val hasOffset    = config.hasPath("offset")

  if (hasReference && hasOffset) {
    val reference = config.getString("reference")
    val offset    = config.getString("offset")
    Some(DateFilterConfig.Range(DateRange(reference, offset)))
  } else if (hasReference || hasOffset) {
    // One key without the other is an incomplete configuration, not an absent one.
    throw new IllegalArgumentException(
      "date_filter with reference/offset requires both 'reference' and 'offset'"
    )
  } else {
    None
  }
}

/** @param config The typesafe Config object holding the configuration.
Expand All @@ -57,3 +91,11 @@ trait DateFilterConfigurator {
}.toOption
}
}

/** Parsed representation of the `date_filter` configuration section. */
sealed trait DateFilterConfig

object DateFilterConfig {
  /** Bounded interval, built from either reference+offset or from+until. */
  final case class Range(dateRange: DateRange) extends DateFilterConfig
  /** Lower bound only: keep rows where the date column is >= from (inclusive). */
  final case class FromOnly(from: LocalDate) extends DateFilterConfig
  /** Upper bound only: keep rows where the date column is < until (exclusive). */
  final case class UntilOnly(until: LocalDate) extends DateFilterConfig
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import java.time.temporal.ChronoField
import java.time.temporal.ChronoField._

/**
* Collection of [[com.amadeus.bdp.api.functions.time.Formats.LabeledFormatter]].
* Formats are related to dates and can be used as placeholder in a [[com.amadeus.bdp.api.functions.paths.TemplatePath]].
* Collection of LabeledFormatter.
*
* @see Documentation for pattern symbols: [[https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html]]
*/
Expand Down Expand Up @@ -118,7 +117,7 @@ object Formats {
val LbxSecond = LabeledFormatter("second", DateTimeFormatter.ofPattern("ss"), """(\d{2})""", Set(SECOND_OF_MINUTE))

/**
* Set of all [[com.amadeus.bdp.api.functions.time.Formats.LabeledFormatter LabeledFormatter]] defined into [[com.amadeus.bdp.api.functions.time.Formats Formats]] object
* Set of all
*/
val AllFormats: Set[LabeledFormatter] =
Set(BasicIsoDate, IsoLocalDate, IsoLocalTime, IsoOrdinalDate, IsoWeekDate, Iso8601, IsoInstant, LbxYear, LbxMonth, LbxWeek, LbxDay, LbxHour, LbxMinute, LbxSecond)
Expand All @@ -134,7 +133,7 @@ object Formats {
private val labelToFormatterMap = AllFormats.map(formatter => (formatter.label, formatter)).toMap

/**
* Gets a [[com.amadeus.bdp.api.functions.time.Formats.LabeledFormatter LabeledFormatter]] from its label.
* Gets a LabeledFormatter from its label.
*
* @param label the label
* @return an [[scala.Option Option]] containing the formatter or [[scala.None None]] if there is no formatter for the given label
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,75 @@
package com.amadeus.dataio.core.transformers

import com.amadeus.dataio.config.fields.DateFilterConfig
import com.amadeus.dataio.core.Logging
import com.amadeus.dataio.core.time.DateRange
import org.apache.spark.sql.{Column, Dataset}

import java.sql.Date
import java.time.LocalTime
import java.time.{LocalDate, LocalTime}

trait DateFilterer extends Logging {
val dateRange: Option[DateRange]
val dateFilterConfig: Option[DateFilterConfig]
val dateColumn: Option[Column]

/** Applies the configured date filter to the dataset.
  *
  * Both a filter configuration and a date column must be provided together;
  * providing only one of the two is a configuration error. When neither is
  * provided, the dataset passes through unchanged.
  *
  * @param ds the dataset to filter
  * @return the filtered dataset, or `ds` unchanged when no filter is configured
  * @throws Exception when exactly one of dateFilterConfig/dateColumn is defined
  */
def applyDateFilter[T](ds: Dataset[T]): Dataset[T] = {
  (dateFilterConfig, dateColumn) match {
    case (Some(config), Some(column)) =>
      applyFilter(ds, column, config)
    case (Some(_), None) =>
      throw new Exception("date_filter requires a date column")
    case (None, Some(_)) =>
      throw new Exception("date_filter requires a date configuration")
    case (_, _) =>
      ds
  }
}

/** Dispatches to the filter implementation matching the date filter syntax.
  *
  * @param ds           the dataset to filter
  * @param column       the column holding the date to compare against
  * @param filterConfig the parsed date filter configuration
  * @return the filtered dataset
  */
private def applyFilter[T](ds: Dataset[T], column: Column, filterConfig: DateFilterConfig): Dataset[T] = {
  filterConfig match {
    case DateFilterConfig.Range(range) =>
      applyRangeFilter(ds, column, range)

    // Renamed bindings: these are LocalDate values, not strings (were fromStr/untilStr).
    case DateFilterConfig.FromOnly(from) =>
      applyFromFilter(ds, column, from)

    case DateFilterConfig.UntilOnly(until) =>
      applyUntilFilter(ds, column, until)
  }
}

/** Keeps rows whose date column falls inside the range: >= from (inclusive), < until (exclusive). */
private def applyRangeFilter[T](ds: Dataset[T], column: Column, range: DateRange): Dataset[T] = {
  val dateFrom = Date.valueOf(range.from.toLocalDate)

  // A time component after midnight means the range reaches into the following day,
  // so the exclusive upper bound moves one day forward.
  val upperDay =
    if (range.until.toLocalTime.isAfter(LocalTime.MIDNIGHT)) range.until.plusDays(1).toLocalDate
    else range.until.toLocalDate
  val dateUntil = Date.valueOf(upperDay)

  logger.info(s"date_filter: $column >= $dateFrom AND $column < $dateUntil")
  ds.filter(column >= dateFrom && column < dateUntil)
}

/** Keeps rows whose date column is on or after `dateFrom` (inclusive lower bound). */
private def applyFromFilter[T](ds: Dataset[T], column: Column, dateFrom: LocalDate): Dataset[T] = {
  val from = Date.valueOf(dateFrom)
  logger.info(s"date_filter: $column >= $from")
  ds.filter(column >= from)
}

/** Keeps rows whose date column is strictly before `dateUntil` (exclusive upper bound). */
private def applyUntilFilter[T](ds: Dataset[T], column: Column, dateUntil: LocalDate): Dataset[T] = {
  val until = Date.valueOf(dateUntil)
  logger.info(s"date_filter: $column < $until")
  ds.filter(column < until)
}
}

object DateFilterer {
/** Builds a standalone date-filtering transformation.
  *
  * @param filterConfig the optional parsed date filter configuration
  * @param column       the optional column holding the date to compare against
  * @return a function applying the configured date filter to a dataset
  */
def apply[T](filterConfig: Option[DateFilterConfig], column: Option[Column]): Dataset[T] => Dataset[T] = {
  new DateFilterer {
    override val dateFilterConfig: Option[DateFilterConfig] = filterConfig
    override val dateColumn: Option[Column]                 = column
  }.applyDateFilter
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.amadeus.dataio.pipes.spark.batch

import com.amadeus.dataio.core.time.DateRange
import com.amadeus.dataio.config.fields.DateFilterConfig
import com.amadeus.dataio.core.transformers.{Coalescer, DateFilterer, Repartitioner}
import com.amadeus.dataio.core.{Input, Logging, SchemaRegistry}
import com.amadeus.dataio.pipes.spark.{SparkPathSource, SparkSource, SparkSourceConfigurator, SparkTableSource}
Expand All @@ -16,16 +16,16 @@ import scala.util.Try
* @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
*/
case class SparkInput(
name: String,
source: Option[SparkSource] = None,
options: Map[String, String] = Map(),
dateRange: Option[DateRange] = None,
dateColumn: Option[Column] = None,
repartitionExprs: Option[String] = None,
repartitionNum: Option[Int] = None,
coalesce: Option[Int] = None,
schema: Option[String] = None,
config: Config = ConfigFactory.empty()
name: String,
source: Option[SparkSource] = None,
options: Map[String, String] = Map(),
dateFilterConfig: Option[DateFilterConfig] = None,
dateColumn: Option[Column] = None,
repartitionExprs: Option[String] = None,
repartitionNum: Option[Int] = None,
coalesce: Option[Int] = None,
schema: Option[String] = None,
config: Config = ConfigFactory.empty()
) extends Input
with DateFilterer
with Repartitioner
Expand Down Expand Up @@ -86,7 +86,7 @@ object SparkInput extends SparkSourceConfigurator {
val source = getSparkSource

val options = getOptions
val dateRange = getDateFilterRange
val dateFilterConfig = getDateFilterConfig
val dateColumn = getDateFilterColumn
val repartitionExprs = getRepartitionExprs
val repartitionNum = getRepartitionNum
Expand All @@ -97,7 +97,7 @@ object SparkInput extends SparkSourceConfigurator {
name,
source,
options,
dateRange,
dateFilterConfig,
dateColumn,
repartitionExprs,
repartitionNum,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.amadeus.dataio.pipes.spark.streaming

import com.amadeus.dataio.core.time.DateRange
import com.amadeus.dataio.config.fields.DateFilterConfig
import com.amadeus.dataio.core.transformers.{Coalescer, DateFilterer, Repartitioner}
import com.amadeus.dataio.core.{Input, Logging, SchemaRegistry}
import com.amadeus.dataio.pipes.spark.{SparkPathSource, SparkSource, SparkTableSource}
Expand All @@ -17,16 +17,16 @@ import scala.util.Try
* @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
*/
case class SparkInput(
name: String,
source: Option[SparkSource] = None,
options: Map[String, String] = Map(),
dateRange: Option[DateRange] = None,
dateColumn: Option[Column] = None,
repartitionExprs: Option[String] = None,
repartitionNum: Option[Int] = None,
coalesce: Option[Int] = None,
schema: Option[String] = None,
config: Config = ConfigFactory.empty()
name: String,
source: Option[SparkSource] = None,
options: Map[String, String] = Map(),
dateFilterConfig: Option[DateFilterConfig] = None,
dateColumn: Option[Column] = None,
repartitionExprs: Option[String] = None,
repartitionNum: Option[Int] = None,
coalesce: Option[Int] = None,
schema: Option[String] = None,
config: Config = ConfigFactory.empty()
) extends Input
with Repartitioner
with Coalescer
Expand Down Expand Up @@ -87,7 +87,7 @@ object SparkInput {
val source = getSparkSource

val options = getOptions
val dateRange = getDateFilterRange
val dateFilterConfig = getDateFilterConfig
val dateColumn = getDateFilterColumn
val repartitionExprs = getRepartitionExprs
val repartitionNum = getRepartitionNum
Expand All @@ -98,7 +98,7 @@ object SparkInput {
name,
source,
options,
dateRange,
dateFilterConfig,
dateColumn,
repartitionExprs,
repartitionNum,
Expand Down
Loading