Skip to content

Commit f1a3b78

Browse files
authored
restore parking_copy_ringgo_sftp_data_to_raw (#2280)
1 parent f5e5a67 commit f1a3b78

1 file changed

Lines changed: 79 additions & 0 deletions

File tree

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import sys
2+
3+
import boto3
4+
from awsglue.context import GlueContext
5+
from awsglue.job import Job
6+
from awsglue.transforms import *
7+
from awsglue.utils import getResolvedOptions
8+
from pyspark.context import SparkContext
9+
from pyspark.sql import SparkSession
10+
from pyspark.sql.functions import col, concat, lit, when
11+
from pyspark.sql.types import IntegerType
12+
13+
from scripts.helpers.helpers import (
14+
PARTITION_KEYS,
15+
add_import_time_columns,
16+
clean_column_names,
17+
get_glue_env_var,
18+
get_latest_partitions,
19+
get_s3_subfolders,
20+
)
21+
22+
# Shared AWS / Spark context objects, created once at module import time
# (Glue runs this script top-to-bottom as a job).
s3_client = boto3.client("s3")  # NOTE(review): not referenced below — confirm it is needed
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
glue_context = GlueContext(sc)

# Glue's logger writes to the job's CloudWatch log stream.
logger = glue_context.get_logger()
28+
29+
30+
def data_source_landing_to_raw(bucket_source, bucket_target, s3_prefix):
    """Copy the latest landing-zone CSV data to the raw zone as parquet.

    Reads header-bearing CSV files from ``bucket_source/s3_prefix``, keeps
    only the most recent import partitions, normalises column names,
    zero-pads the ``import_month`` / ``import_day`` partition values to two
    digits, and appends the result to ``bucket_target/s3_prefix``
    partitioned by ``PARTITION_KEYS``.

    Args:
        bucket_source: Fully qualified source bucket URI, e.g. ``s3://bucket``.
        bucket_target: Fully qualified target bucket URI.
        s3_prefix: Key prefix under both buckets identifying the dataset.
    """
    # Fix: the original log calls concatenated label and value with no
    # separator ("bucket_targetmy-bucket"), making the logs unreadable.
    logger.info(f"bucket_target: {bucket_target}")
    logger.info(f"s3_prefix: {s3_prefix}")

    data_source = spark.read.option("header", "true").csv(
        f"{bucket_source}/{s3_prefix}"
    )
    latest_data = get_latest_partitions(data_source)
    logger.info(f"latest_data: {latest_data}")

    logger.info(f"Retrieved data source from s3 path {bucket_source}/{s3_prefix}")

    data_frame = clean_column_names(latest_data)
    logger.info(f"Using Columns: {data_frame.columns}")

    def _zero_pad(frame, column):
        # Cast to int (CSV reads everything as string), then left-pad
        # single-digit values with "0" so partition paths sort lexically
        # ("01".."12" rather than "1".."12").
        frame = frame.withColumn(column, col(column).cast(IntegerType()))
        return frame.withColumn(
            column,
            when(col(column) < 10, concat(lit("0"), col(column))).otherwise(
                col(column)
            ),
        )

    date_partition_formatted = _zero_pad(
        _zero_pad(data_frame, "import_month"), "import_day"
    )

    # Append (never overwrite) so repeated runs accumulate partitions.
    date_partition_formatted.write.mode("append").partitionBy(*PARTITION_KEYS).parquet(
        f"{bucket_target}/{s3_prefix}"
    )
64+
65+
66+
# Resolve the job name from the Glue job arguments and initialise the job
# so bookmarks/metrics are tracked for this run.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Bucket names and key prefix come from Glue job environment variables.
# Empty-string defaults — presumably validated upstream; TODO confirm a
# missing variable should not abort the job here instead.
s3_bucket_target = get_glue_env_var("s3_bucket_target", "")
s3_prefix = get_glue_env_var("s3_prefix", "")
s3_bucket_source = get_glue_env_var("s3_bucket_source", "")

data_source_landing_to_raw(
    "s3://" + s3_bucket_source, "s3://" + s3_bucket_target, s3_prefix
)

# Mark the run complete so Glue job bookmarks advance.
job.commit()

0 commit comments

Comments
 (0)