Skip to content

Commit c5738bf

Browse files
committed
refactor SnowflakeInput and SnowflakeOutput to include required name field; update configuration examples to use lowercase keys
1 parent 4f37944 commit c5738bf

11 files changed

Lines changed: 50 additions & 495 deletions

File tree

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,4 +179,4 @@ lazy val root = (project in file("."))
179179
name := "dataio",
180180
publish / skip := true
181181
)
182-
.aggregate(core, test, kafka)
182+
.aggregate(core, test, kafka, snowflake)

docs/content/_data/config/pipes/snowflake/batch.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ input:
55
output:
66
type: com.amadeus.dataio.pipes.snowflake.batch.SnowflakeOutput
77
fields:
8-
- name: Mode
8+
- name: mode
99
mandatory: Yes
1010
description: Writing mode on the Snowflake table
11-
example: Mode = "append"
11+
example: mode = "append"
1212

docs/content/_data/config/pipes/snowflake/common.yaml

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ links:
88
url: https://docs.snowflake.com/en/user-guide/spark-connector-use#using-the-connector-in-scala
99

1010
fields:
11-
- name: Options
11+
- name: options
1212
description: Snowflake options to specify such as key = value pairs. These options are then passed as option to the Spark connector for Snowflake
13-
example: Options { Database = "example_database"
14-
Schema = "example_schema"
15-
...
16-
}
13+
example: options { "sfDatabase" = "my_db" }

docs/content/_data/config/pipes/snowflake/streaming.yaml

Lines changed: 0 additions & 21 deletions
This file was deleted.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.amadeus.dataio.pipes.snowflake
2+
3+
object SnowflakeCommons {
4+
val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"
5+
}

snowflake/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,46 +2,46 @@ package com.amadeus.dataio.pipes.snowflake.batch
22

33
import com.amadeus.dataio.core.{Input, Logging}
44
import com.amadeus.dataio.config.fields._
5+
import com.amadeus.dataio.pipes.snowflake.SnowflakeCommons.SNOWFLAKE_CONNECTOR_NAME
56
import com.typesafe.config.{Config, ConfigFactory}
67
import org.apache.spark.sql.{DataFrame, SparkSession}
78

8-
/**
9-
* Class for reading Snowflake input
9+
import scala.util.Try
10+
11+
/** Class for reading Snowflake input.
1012
*
11-
* @param options the snowflake connector options.
1213
* @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
1314
*/
1415
case class SnowflakeInput(
15-
options: Map[String, String],
16+
name: String,
17+
options: Map[String, String] = Map(),
1618
config: Config = ConfigFactory.empty()
1719
) extends Input
1820
with Logging {
19-
20-
val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"
21-
22-
/**
23-
* Reads a batch of data from snowflake.
24-
*
25-
* @param spark The SparkSession which will be used to read the data.
26-
* @return The data that was read.
27-
* @throws Exception If the exactly one of the dateRange/dateColumn fields is None.
28-
*/
2921
override def read(implicit spark: SparkSession): DataFrame = {
22+
logger.info(s"reading: $name")
23+
if (options.nonEmpty) logger.info(s"options: $options")
24+
3025
spark.read.format(SNOWFLAKE_CONNECTOR_NAME).options(options).load()
3126
}
3227

3328
}
3429

3530
object SnowflakeInput {
3631

37-
/**
38-
* Creates a new instance of SnowflakeInput from a typesafe Config object.
32+
/** Creates a new instance of SnowflakeInput from a typesafe Config object.
3933
*
4034
* @param config typesafe Config object containing the configuration fields.
4135
* @return a new SnowflakeInput object.
4236
* @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
4337
*/
4438
def apply(implicit config: Config): SnowflakeInput = {
45-
SnowflakeInput(options = getOptions, config = config)
39+
val name = Try {
40+
config.getString("name")
41+
} getOrElse {
42+
throw new Exception("Missing required `name` field in configuration.")
43+
}
44+
45+
SnowflakeInput(name, options = getOptions, config = config)
4646
}
4747
}

snowflake/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,28 @@ package com.amadeus.dataio.pipes.snowflake.batch
22

33
import com.amadeus.dataio.config.fields._
44
import com.amadeus.dataio.core.{Logging, Output}
5+
import com.amadeus.dataio.pipes.snowflake.SnowflakeCommons.SNOWFLAKE_CONNECTOR_NAME
56
import com.typesafe.config.{Config, ConfigFactory}
67
import org.apache.spark.sql.{Dataset, SparkSession}
78

8-
/**
9-
* Allows to write data to Snowflake.
9+
import scala.util.Try
10+
11+
/** Allows to write data to Snowflake.
1012
*
11-
* @param mode the mode to use.
12-
* @param options the snowflake connector options.
1313
* @param config the config object.
1414
*/
1515
case class SnowflakeOutput(
16+
name: String,
1617
mode: String,
17-
options: Map[String, String],
18+
options: Map[String, String] = Map(),
1819
config: Config = ConfigFactory.empty()
1920
) extends Output
2021
with Logging {
2122

22-
val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"
23-
24-
/**
25-
* Writes data to this output.
26-
*
27-
* @param data The data to write.
28-
* @param spark The SparkSession which will be used to write the data.
29-
*/
3023
override def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
24+
logger.info(s"writing: $name")
25+
if (options.nonEmpty) logger.info(s"options: $options")
26+
logger.info(s"mode: $mode")
3127

3228
data.write
3329
.format(SNOWFLAKE_CONNECTOR_NAME)
@@ -39,18 +35,26 @@ case class SnowflakeOutput(
3935

4036
object SnowflakeOutput {
4137

42-
/**
43-
* Creates a new instance of SnowflakeOutput from a typesafe Config object.
38+
/** Creates a new instance of SnowflakeOutput from a typesafe Config object.
4439
*
4540
* @param config typesafe Config object containing the configuration fields.
4641
* @return a new SnowflakeOutput object.
4742
* @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
4843
*/
4944
def apply(implicit config: Config): SnowflakeOutput = {
50-
51-
val mode = config.getString("Mode")
52-
53-
SnowflakeOutput(mode = mode, options = getOptions, config = config)
45+
val name = Try {
46+
config.getString("name")
47+
} getOrElse {
48+
throw new Exception("Missing required `name` field in configuration.")
49+
}
50+
51+
val mode = Try {
52+
config.getString("mode")
53+
} getOrElse {
54+
throw new Exception("Missing required `mode` field in configuration.")
55+
}
56+
57+
SnowflakeOutput(name, mode = mode, options = getOptions, config = config)
5458
}
5559

5660
}

snowflake/src/main/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutput.scala

Lines changed: 0 additions & 137 deletions
This file was deleted.

snowflake/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala

Lines changed: 0 additions & 58 deletions
This file was deleted.

0 commit comments

Comments
 (0)