diff --git a/labs/01_ingestion_with_glue/README.md b/labs/01_ingestion_with_glue/README.md index 8fd5b3f..49ab045 100644 --- a/labs/01_ingestion_with_glue/README.md +++ b/labs/01_ingestion_with_glue/README.md @@ -8,12 +8,15 @@ - [Configure Permissions](#configure-permissions) - [Creating a Policy for Amazon S3 Bucket (Console)](#creating-a-policy-for-amazon-s3-bucket-console) - [Creating a Role for AWS Service Glue (Console)](#creating-a-role-for-aws-service-glue-console) - - [Creating a Development Endpoint and Notebook - Step 1](#creating-a-development-endpoint-and-notebook---step-1) - [Create data catalog from S3 files](#create-data-catalog-from-s3-files) - [Transform the data to Parquet format](#transform-the-data-to-parquet-format) + - [Adding a source from the catalog](#adding-a-source-from-the-catalog) + - [Adding transforms](#adding-transforms) + - [Storing the results](#storing-the-results) + - [Running the job](#running-the-job) + - [Monitoring the job](#monitoring-the-job) - [Add a crawler for curated data](#add-a-crawler-for-curated-data) - [Schema Validation](#schema-validation) - - [Creating a Development Endpoint and Notebook - Step 2](#creating-a-development-endpoint-and-notebook---step-2) In this Lab we will create a schema from your data optimized for analytics and place the result in an S3 bucket-based data lake. @@ -108,30 +111,6 @@ In this lab we will: NOTE: “AWSGlueServiceRole” is an AWS Managed Policy to provide Glue with needed permissions to access S3 data. However, you still need to allow access to your specific S3 bucket for Glue by attaching “BYOD-S3Policy” created policy. -## Creating a Development Endpoint and Notebook - Step 1 - -> Development endpoint and notebook will be used in Lab 5 of this workshop. Since it takes a bit of time to create the resources, we are doing it now so that they will be ready when we need them. - -In AWS Glue, you can create an environment — known as a development endpoint — that you can use to iteratively develop and test your extract, transform, and load (ETL) scripts. - -You can then create a notebook that connects to the endpoint, and use your notebook to author and test your ETL script. When you're satisfied with the results of your development process, you can create an ETL job that runs your script. With this process, you can add functions and debug your scripts in an interactive manner. - -It is also possible to connect your local IDE to this endpoint, which is explained here: [Tutorial: Set Up PyCharm Professional with a Development Endpoint](https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-pycharm.html) - -How to create an endpoint and use it from a notebook: - -Go to Glue in the console https://console.aws.amazon.com/glue/ -1. On the left menu, click in Dev endpoints and **Add endpoint**. -2. Development endpoint name: `byod` -3. IAM role: **glue-processor-role** -4. Click **Next** -5. Select Skip networking information -6. Click **Next** -7. Click **Next** \- No need to Add SSH public key for now -8. Click **Finish** - -It will take a while to create the endpoint. - ## Create data catalog from S3 files We will be using AWS Glue Crawlers to infer the schema of the files and create data catalog. Without a crawler, you can still read data from the S3 by a Glue job, but it will not be able to determine data types (string, int, etc) for each column. 
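+
+If you prefer to script the crawler instead of creating it in the console, the sketch below shows roughly the same setup with boto3. It is only an illustration: the crawler name, database name and S3 path are placeholders, not values defined by this workshop.
+
+```python
+import boto3
+
+glue = boto3.client("glue")
+
+# Placeholder names - substitute your own crawler name, database and bucket/prefix
+glue.create_crawler(
+    Name="byod-raw-crawler",
+    Role="glue-processor-role",  # the role created earlier in this lab
+    DatabaseName="YOUR-DATABASE-NAME",
+    Targets={"S3Targets": [{"Path": "s3://YOUR-BUCKET-NAME/raw/"}]},
+)
+
+# Start the crawler; the inferred tables show up in the Glue Data Catalog once it finishes
+glue.start_crawler(Name="byod-raw-crawler")
+```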
@@ -161,70 +140,106 @@ In the following section, we will create one job per each file to transform the We will place this data under the folder named "_curated_" in the data lake. -- In the Glue Console select the **Jobs** section in the left navigation panel' -- Click on the _Add job_ button; -- specify a name (preferably **TABLE-NAME-1-job**) in the name field, then select the _"glue-processor-role"_; -- select Type: **Spark** -- make sure Glue version 2 is selected: "Spark 2.4, Python 3 with improved job startup times (Glue Version 2.0)" (If you want to read more about version 2: [Glue version 2 announced](https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration/)) -- select the option "_A new script to be authored by you_"; -- Provide a script name (preferably **TABLE-NAME-1-job-script.py**) -- Tick the checkbox for "_Job Metrics_", under **Monitoring Options** and DO NOT hit **Next** yet; -- Under "Security configuration, script libraries, and job parameters (optional)", check that **Worker type** is "Standard" and **Number of workers** is "10". This determines the worker type and the number of processing units to be used for the job. Higher numbers result in faster processing times but may incur higher costs. This should be determined according to data size, data type etc. (further info can be found in [Glue documentation](https://docs.aws.amazon.com/glue/latest/dg/add-job.html).) - hit **Next** -- click **Next**, then **Save job and edit script**. You will be redirected to script editor. -- Paste the following code to the editor. **DONT FORGET TO PUT IN YOUR INPUT AND OUTPUT FOLDER LOCATIONS.** +- In the Glue Console select **AWS Glue Studio** +- On the AWS Glue Studio home page, **Create and manage jobs** -This step needs to be done per each file you have. +![create and manage jobs](./img/ingestion/aws-glue-studio-2.jpg) -```python -import sys -import datetime -import re -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext -from awsglue.context import GlueContext -from awsglue.job import Job -glueContext = GlueContext(SparkContext.getOrCreate()) -job = Job(glueContext) - -## DONT FORGET TO PUT IN YOUR INPUT AND OUTPUT LOCATIONS BELOW. -your_database_name = "YOUR-DATABASE-NAME" -your_table_name = "YOUR-TABLE-NAME" -output_location = "s3://YOUR-BUCKET-NAME/curated/TABLE-NAME" - -job.init("byod-workshop" + str(datetime.datetime.now().timestamp())) - -#load our data from the catalog that we created with a crawler -dynamicF = glueContext.create_dynamic_frame.from_catalog( - database = your_database_name, - table_name = your_table_name, - transformation_ctx = "dynamicF") - -# invalid characters in column names are replaced by _ -df = dynamicF.toDF() -def canonical(x): return re.sub("[ ,;{}()\n\t=]+", '_', x.lower()) -renamed_cols = [canonical(c) for c in df.columns] -df = df.toDF(*renamed_cols) - -# write our dataframe in parquet format to an output s3 bucket -df.write.mode("overwrite").format("parquet").save(output_location) - -job.commit() +- AWS Glue Studio supports different sources, including Amazon S3, Amazon RDS, Amazon Kinesis and Apache Kafka. For the transformation you will use one AWS table as the data source and one S3 bucket as the destination. + +- In the **Create Job** section, select **Source and target added to the graph**. Make sure **S3** is configured as the both the **Source** and **Target** then click **Create**. 
+
+![create job](./img/ingestion/aws-glue-studio-3.png)
+This takes you to the Visual Canvas to create an AWS Glue job. You should already see the canvas prepopulated with a basic diagram.
+- Change the **Job name** from **Untitled job** to the desired name (preferably **TABLE-NAME-1-job**)
+![rename job](./img/ingestion/aws-glue-studio-4.png)
+
+### Adding a source from the catalog
+1. Select the **Data source - S3** bucket node.
+2. On the **Data source properties - S3** tab, choose the relevant Database and table. Leave the partition predicate field empty.
+
+   ![add source](./img/ingestion/aws-glue-studio-5.png)
+
+### Adding transforms
+
+A transform is the AWS Glue Studio component where the data is modified. You can use the transforms that are part of this service or supply your own custom code.
+
+1. One **ApplyMapping** transform has automatically been added for you. Click it to modify it.
+2. On the **Transform** tab, change the data types for specific columns to the desired values. You can also choose to rename columns.
+3. Drop the columns that you will not require downstream.
+
+![rename columns](./img/ingestion/aws-glue-studio-6.png)
+
+Now we will add a second, custom transform after the **ApplyMapping** node, where we replace invalid characters that you may have in your column headers. Spark doesn't accept certain characters in field names, including spaces, so it is better to fix this before we send the data downstream.
+1. Click the first **ApplyMapping** transform node.
+2. Click the **(+)** icon.
+
+![](./img/ingestion/aws-glue-studio-7.png)
+
+3. On the Node properties tab, for Name enter **Column Header Cleaner**.
+4. For Node type, choose **Custom transform**.
+
+![](./img/ingestion/aws-glue-studio-77.png)
+
+5. On the Transform tab, in the Code block, change the function name from MyTransform to **ColumnHeaderCleaner**.
+6. Enter the following code under the function body:
+```
+    import re
+    def canonical(x): return re.sub("[ ,;{}()\n\t=]+", '_', x.lower())
+
+    # take the first DynamicFrame in the incoming collection and convert it to a Spark DataFrame
+    selected = dfc.select(list(dfc.keys())[0]).toDF()
+
+    # clean up the column names and wrap the result back into a DynamicFrameCollection
+    renamed_cols = [canonical(c) for c in selected.columns]
+    cleaned_df = DynamicFrame.fromDF(selected.toDF(*renamed_cols), glueContext, "cleaned_df")
+    return DynamicFrameCollection({"cleaned_df": cleaned_df}, glueContext)
+```
+![](./img/ingestion/aws-glue-studio-8.png)
+
+After adding the custom transformation to the AWS Glue job, you want to store the result of the transformation in the S3 bucket. To do this, you need a Select from collection transform to read the output from the **Column Header Cleaner** node and send it to the destination.
+
+7. Choose the **New node** node.
+8. Leave the **Transform** tab with the default values.
+9. On the **Node Properties** tab, change the name of the transform to **Select Aggregated Data**.
+10. Leave everything else with the default values.
+
+### Storing the results
+1. Select the **Data target - S3 bucket** node.
+2. Under **Node properties**, change the Node parent to be the **Select Aggregated Data** transform.
+![](./img/ingestion/aws-glue-studio-99.png)
+
+3. Under **Data target properties - S3**, select **Parquet** as the format and **GZIP** as the compression type. Select the curated location as the **S3 target location**.
-Notice that we have a section in this script where we are replacing invalid characters that you may have in your column headers. Spark doesn't accept certain characters in field names including spaces.
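+
+For reference, the write performed by this S3 target node is roughly equivalent to the script snippet below. This is only a sketch: `cleaned_df` is assumed to be the DynamicFrame produced by the **Column Header Cleaner** transform, and the output path is a placeholder. The screenshot that follows shows the same settings in the Glue Studio console.
+
+```python
+# Sketch of what the S3 target node does (placeholder output path)
+output_location = "s3://YOUR-BUCKET-NAME/curated/TABLE-NAME"
+
+(cleaned_df.toDF()                      # DynamicFrame -> Spark DataFrame
+    .write.mode("overwrite")
+    .option("compression", "gzip")      # matches the GZIP compression chosen above
+    .parquet(output_location))
+```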
+![](./img/ingestion/aws-glue-studio-9.png)
-Click \* **Save** and **Run Job**
+If you followed this guide closely, your final schematic should look similar to the one below:
+![](./img/ingestion/aws-glue-studio-10.png)
-![add a glue job](./img/ingestion/glue-job3.png)
+### Running the job
-Check the status of the job by selecting the job and go to history tab in the lower panel. In order to continue we need to wait until this job is done, this can take around 5 minutes (and up to 10 minutes to start), depending on the size of your dataset.
+1. Under **Job details**, select _"glue-processor-role"_ as the IAM Role.
+2. Select Type: **Spark**.
+3. Make sure Glue version 2 is selected: "Glue 2.0 - Supports spark 2.4, Scala 2, Python 3" (If you want to read more about version 2: [Glue version 2 announced](https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration/))
+4. Check that **G.1X** is selected as the **Worker type** and that **Number of workers** is "10". This determines the worker type and the number of processing units to be used for the job. Higher numbers result in faster processing times but may incur higher costs. This should be determined according to data size, data type, etc. (further info can be found in the [Glue documentation](https://docs.aws.amazon.com/glue/latest/dg/add-job.html)).
-![add a glue job](./img/ingestion/seejob.png)
+![](./img/ingestion/aws-glue-studio-11.png)
-To make sure the job transformed the data, go to S3, you should see a new sub-folder called curated with data on it.
+5. Click **Save** and **Run Job**.
-Now, remember to repeat this last step per each file you had originally.
+### Monitoring the job
+AWS Glue Studio offers a job monitoring dashboard that provides comprehensive information about your jobs. You can get job statistics and see detailed information about each job and its status while it is running.
+
+1. In the AWS Glue Studio navigation panel, choose **Monitoring**.
+2. Choose the entry with the job name you configured above.
+3. To get more details about the job run, choose **View run details**.
+
+![](./img/ingestion/aws-glue-studio-13.jpg)
+
+Wait until **Run Status** changes to **Succeeded**. This can take up to several minutes, depending on the size of your dataset.
+
+![](./img/ingestion/aws-glue-studio-12.png)
+
+**NOTE: Remember to repeat these job creation steps for each file you had originally.**

## Add a crawler for curated data

@@ -260,15 +275,6 @@ NOTE: If you have any "id" column as integer, please make sure type is set to "d
 - Click Save.

-## Creating a Development Endpoint and Notebook - Step 2
-
-1. In the glue console, Go to Notebooks, click Create notebook
-2. Notebook name: aws-glue-`byod`
-3. Attach to development: choose the endpoint created some steps back
-4. Create a new IAM Role.
-5.
**Create notebook** - - Now go to lab 2 : [Orchestration](../02_orchestration/README.md) diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-10.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-10.png new file mode 100644 index 0000000..d9fb124 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-10.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-11.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-11.png new file mode 100644 index 0000000..cf07795 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-11.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-12.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-12.png new file mode 100644 index 0000000..451b237 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-12.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-13.jpg b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-13.jpg new file mode 100644 index 0000000..b9730f1 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-13.jpg differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-2.jpg b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-2.jpg new file mode 100644 index 0000000..21374a0 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-2.jpg differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-3.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-3.png new file mode 100644 index 0000000..c60892c Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-3.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-4.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-4.png new file mode 100644 index 0000000..5d2c0b3 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-4.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-5.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-5.png new file mode 100644 index 0000000..0019140 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-5.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-6.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-6.png new file mode 100644 index 0000000..ed2e59e Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-6.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-7.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-7.png new file mode 100644 index 0000000..139a752 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-7.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-77.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-77.png new file mode 100644 index 0000000..96cb7db Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-77.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-8.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-8.png new file mode 100644 index 0000000..9045b32 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-8.png differ diff 
--git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-9.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-9.png new file mode 100644 index 0000000..0bf5615 Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-9.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-99.png b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-99.png new file mode 100644 index 0000000..ccbb14c Binary files /dev/null and b/labs/01_ingestion_with_glue/img/ingestion/aws-glue-studio-99.png differ diff --git a/labs/01_ingestion_with_glue/img/ingestion/seejob.png b/labs/01_ingestion_with_glue/img/ingestion/seejob.png deleted file mode 100644 index 2b8812d..0000000 Binary files a/labs/01_ingestion_with_glue/img/ingestion/seejob.png and /dev/null differ diff --git a/labs/05_transformations/README.md b/labs/05_transformations/README.md index 3ff5474..7591ac4 100644 --- a/labs/05_transformations/README.md +++ b/labs/05_transformations/README.md @@ -2,127 +2,89 @@ # Lab 05 - Transformations -- [Open the notebook](#open-the-notebook) -- [Transformations](#transformations) - - [Drop Columns](#drop-columns) - - [Example NY Taxis dataset](#example-ny-taxis-dataset) - - [Convert to Time stamp](#convert-to-time-stamp) - - [Example NY Taxis dataset](#example-ny-taxis-dataset-1) -- [Partitioning](#partitioning) -- [Run this in a Glue Job](#run-this-in-a-glue-job) -- [Other time formats](#other-time-formats) +- [Lab 05 - Transformations](#lab-05---transformations) + - [Accessing Glue](#accessing-glue) + - [Transformations](#transformations) + - [Altering the Columns](#altering-the-columns) + - [Convert to Time stamp](#convert-to-time-stamp) + - [Example NY Taxis dataset](#example-ny-taxis-dataset) + - [Partitioning](#partitioning) + - [Other time formats](#other-time-formats) -Now we are going to start cleaning, transforming, aggregating and partitioning data. For development and debugging purposes, we are going to use the Developer Endpoint and Notebook we created some steps back. +Below are some considerations for cleaning, transforming, aggregating and partitioning data. For development and debugging purposes Glue Studio, just like we did a few steps back. -## Open the notebook +## Accessing Glue -Click in the Notebooks and Open the Notebook created. This will launch Jupyter Notebook. Go to New -> Sparkmagic (PySpark) +- In the Glue Console select **AWS Glue Studio** +- On the AWS Glue Studio home page, **Create and manage jobs** -A brand new notebook will be opened. We will be adding and running code blocks one by one, to make it easier to understand operations step by step and we will be able to find errors faster. It should look something like this: +![](./img/aws-glue-studio-2.jpg) -![notebook](./img/notebook.png) +- Select **Blank graph** and click **Create** -1. Make sure that the notebook is running pyspark -2. This is the plus button for adding new lines - In this image I have added two lines of code -3. Once you add a line of code, then click Run -4. Once it has run, then you should see a number here +![](./img/aws-glue-studio-1.png) -Click plus [2] - We will start by importing all the libraries we need +Create a new node by clicking **(+)** and select its type to be **Data source - S3**. 
-``` python -import sys -import datetime -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext -from awsglue.context import GlueContext -from awsglue.job import Job - -from pyspark.sql.functions import lit -from awsglue.dynamicframe import DynamicFrame +![](./img/aws-glue-studio-3.png) -glueContext = GlueContext(SparkContext.getOrCreate()) -job = Job(glueContext) -job.init("byod-workshop-" + str(datetime.datetime.now().timestamp())) - -``` -Then click Run - -**Dynamic Frame vs Spark/ Data frames** -One of the major abstractions in Apache Spark is the SparkSQL DataFrame, which is similar to the DataFrame construct found in R and Pandas. A DataFrame is similar to a table and supports functional-style (map/reduce/filter/etc.) operations and SQL operations (select, project, aggregate). - -DataFrames are powerful and widely used, but they have limitations with respect to extract, transform, and load (ETL) operations. Most significantly, they require a schema to be specified before any data is loaded. To address these limitations, AWS Glue introduces the DynamicFrame. A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required, and explicitly encodes schema inconsistencies using a choice (or union) type. - -It is possible to convert a DataFrame to a DynamicFrame and vice versa with ```toDF()``` and ```fromDF()``` methods. - -We are going to use the data we transformed to parquet in previous steps. For that, we create a dynamic frame pointing to the database and table that our crawler inferred, then we are going to show the schema - -If you do not remember the database/table names, just go to Databases/ Table tab in Glue and copy its names. - -Click plus [2] and add the following code in a separate line - -``` python -dynamicF = glueContext.create_dynamic_frame.from_catalog(database="DATABASE_NAME", table_name="TABLE_NAME") -dynamicF.printSchema() -``` -Then click Run +Point it to the desired source data. ## Transformations You probably have a large number of columns and some of them can have complicated names. To analyze the data, perhaps we may not need all the columns, just a small set of them, and to make easier to recall, we may want to change the name of the columns. Therefore, we are going to select only the columns we are interested in, drop the rest of them and we are going to rename them. -### Drop Columns +### Altering the Columns -There are two different ways to drop columns +There are two different ways to drop columns from your data: -1. You use the select_fields method to drop all the columns and keep just the ones you need +1. You use the **SelectFields** method to drop all the columns and keep just the ones you need -``` python -dynamicF= dynamicF.select_fields(['COLUMN1_TO_KEEP','COLUMN2_TO_KEEP']).rename_field('COLUMN1_TO_RENAME', 'NEW_COLUMN_NAME').rename_field('COLUMN2_TO_RENAME', 'NEW_COLUMN_NAME') -dynamicF.printSchema() -``` +![](./img/aws-glue-studio-select-fields.png) -2. You use the drop_fields method to keep all the columns and just drop the ones you do not need. +2. You use the **DropFields** node to keep all the columns and just drop the ones you do not need. 
-``` python -dynamicF = dynamicF.drop_fields(['COLUMN1_TO_DROP','COLUMN2_TO_DROP']).rename_field('COLUMN1_TO_RENAME', 'NEW_COLUMN_NAME').rename_field('COLUMN2_TO_RENAME', 'NEW_COLUMN_NAME') -dynamicF.printSchema() -``` - -For the rename part, we are using the ```rename_field()``` method. This should be invoked for each column you want to rename +![](./img/aws-glue-studio-drop-fields.png) +You can also use the **RenameField** node to rename columns. This should be invoked for each column you want to rename. -#### Example NY Taxis dataset - -``` python -dynamicF = dynamicF.select_fields(['tpep_pickup_datetime','trip_distance']).rename_field('tpep_pickup_datetime', 'pickup_datetime') -dynamicF.printSchema() -``` +![](./img/aws-glue-studio-rename-field.png) ### Convert to Time stamp Please check the datetime column schema, from the previous step. It may be string or another type different than what we may need it to be. Therefore, we are going to do some transformations. +We will achieve this using a custom transform. +1. Click the first last node in your schema. +2. Click the **(+)** icon. +3. On the Node properties tab, for Name enter **Date Converter**. +4. For Node type, choose **Custom transform** +5. On the Transform tab for Code block, change the function name from MyTransform to **DateConverter** +6. Enter the following code under the function body: + First, let's add the libraries we need to make this conversion: -``` python -from pyspark.sql.functions import date_format -from pyspark.sql.functions import to_date -from pyspark.sql.types import DateType -from pyspark.sql.functions import year, month, dayofmonth, date_format +``` python + from pyspark.sql.functions import date_format + from pyspark.sql.functions import to_date + from pyspark.sql.types import DateType + from pyspark.sql.functions import year, month, dayofmonth, date_format ``` + Then, depending on the format of our current field, we may want to convert it into another format that contains year and month only. This will allow us later to partition our data according to year and month easily. Select which line of code you will use according to your date type format. -First, we need to change the format from dynamic frame to dataframe (If you do not remember the difference between dynamic frame and data frame, you can read again the explanation above). This will allow us to use some the libraries previously imported: +**Dynamic Frame vs Spark/ Data frames** One of the major abstractions in Apache Spark is the SparkSQL DataFrame, which is similar to the DataFrame construct found in R and Pandas. A DataFrame is similar to a table and supports functional-style (map/reduce/filter/etc.) operations and SQL operations (select, project, aggregate). +DataFrames are powerful and widely used, but they have limitations with respect to extract, transform, and load (ETL) operations. Most significantly, they require a schema to be specified before any data is loaded. To address these limitations, AWS Glue introduces the DynamicFrame. A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required, and explicitly encodes schema inconsistencies using a choice (or union) type. + +It is possible to convert a DataFrame to a DynamicFrame and vice versa with toDF() and fromDF() methods. +First, we need to change the format from dynamic frame collection to dataframe. 
This will allow us to use some the libraries previously imported. ``` python -df = dynamicF.toDF() -df.show() +df = dfc.select(list(dfc.keys())[0]).toDF() ``` - **ISO 8601 TIMESTAMP** Below is example code that can be used to do the conversion from ISO 8601 date format. Please substitute your own date-format in place of yyyy-MM-dd @@ -142,11 +104,21 @@ df = df.withColumn('pickup_datetime', to_date("pickup_datetime", "yyyy-MM-dd")). df.show() ``` +Finally, convert the DataFrame back to a Dynamic Frame and return the required Dynamic Frame Collection: +``` python + dyf = DynamicFrame.fromDF(df, glueContext,"dyf") + return DynamicFrameCollection({"dyf": dyf}, glueContext) +``` + +If you followed along with this guide, your custom code should look similar to the following: + +![](./img/aws-glue-studio-to-date.png) + ## Partitioning Partitioning the data greatly increases the performance of your queries and reduce costs. For example, if you only need last month's data from a large dataset, if the data is partitioned by day, month and year, then you can use a "where" clause in your query and Athena will only use relevant folders and will not scan the unnecessary ones. -In order to partition our data, we will first create extra columns for year, month and day. Then when writing the data into S3, we will partition it by year, month and day. So, our final folder structure will be like: +In order to partition the data, you can add one or multiple partition keys when writing the data to S3 using the output node in Glue Studio. You can try creating separate columns for year, month and day to use as partitions. Then, the final folder structure will be like: ``` /curated/TABLE-NAME-1/year=YYYY/month=MM/day=DD/file1.parquet @@ -154,52 +126,13 @@ In order to partition our data, we will first create extra columns for year, mon You can also add additional partitions if you know you will often use those fields to filter data. For example, if you will often filter your data on product types, you can add a column for that field and also partition by that column additionally. -Add this code at the end of your script: +Add this code to your custom script, before converting the data frame back into a dynamic frame: ```python df = df.withColumn('year', year(df.trx_date)).withColumn('month', month(df.trx_date)).withColumn('day', dayofmonth(df.trx_date)) - -df.show() - -``` - -See that there are three extra fields for year, month and day - If you want, you can also drop the "trx_date" column using ```df.drop('trx_date')``` - Please note that since we are using data frame instead of dynamic frame, we can not use the same method *drop_fields* introduced earlier. - -## Run this in a Glue Job - - -Please add these lines to the end of your notebook - -``` python -## DONT FORGET TO PUT IN YOUR BUCKET NAME. -output_location = "s3://YOUR-BUCKET/curated" - -df.write.mode("overwrite").partitionBy("year","month","day").parquet(output_location) - -job.commit() -``` - -Now, let's export our job and move it into a glue job. - -![exporting notebook to glue](./img/notebook-to-glue.png) - -1. Click File -2. Download as > Pyspark (txt) - -Please open the txt file, **Remove any line containing**: -```python -.show() -.printSchema() ``` -and copy it. 
In the AWS Glue Console (https://console.aws.amazon.com/glue/), click on **Jobs**, and **Add Job** -- Name: `byod-data-transformation` -- IAM Role: glue-processor-role -- This job runs: A new script to be authored by you -- Monitoring - Job metrics -- Connections - Save job and edit script -- Now, paste the txt downloaded from the notebook -- Save and Run +See that there are three extra fields for year, month and day - If you want, you can also drop the original "trx_date" column using ```df.drop('trx_date')```. Alternatively, once you convert the data frame back to a dynamic frame, you can use a *Drop Fields* node transformation as introduced earlier. ## Other time formats @@ -214,7 +147,7 @@ df = df.withColumn('trx_date', date_format(from_unixtime(df['{YOUR_DATE_COL_NAME **OTHER DATE FORMATS** -To convert unique data formats, we use to_date() function to specify how to parse your value specifying date literals in second attribute (Look at resources section for more information). +To convert unique data formats, we use to_date() function to specify how to parse your value specifying date literals in second attribute. You can read the reference for date literals here: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html. ``` python ## Adding trx_date date column with yyyy-MM-dd format converting a current timestamp/unix date format diff --git a/labs/05_transformations/img/Screenshot 2020-03-12 at 15.42.04 copy.png b/labs/05_transformations/img/Screenshot 2020-03-12 at 15.42.04 copy.png deleted file mode 100644 index badc059..0000000 Binary files a/labs/05_transformations/img/Screenshot 2020-03-12 at 15.42.04 copy.png and /dev/null differ diff --git a/labs/05_transformations/img/Screenshot 2020-03-12 at 15.42.04.png b/labs/05_transformations/img/Screenshot 2020-03-12 at 15.42.04.png deleted file mode 100644 index badc059..0000000 Binary files a/labs/05_transformations/img/Screenshot 2020-03-12 at 15.42.04.png and /dev/null differ diff --git a/labs/05_transformations/img/aws-glue-studio-1.png b/labs/05_transformations/img/aws-glue-studio-1.png new file mode 100644 index 0000000..7021937 Binary files /dev/null and b/labs/05_transformations/img/aws-glue-studio-1.png differ diff --git a/labs/05_transformations/img/aws-glue-studio-2.jpg b/labs/05_transformations/img/aws-glue-studio-2.jpg new file mode 100644 index 0000000..21374a0 Binary files /dev/null and b/labs/05_transformations/img/aws-glue-studio-2.jpg differ diff --git a/labs/05_transformations/img/aws-glue-studio-3.png b/labs/05_transformations/img/aws-glue-studio-3.png new file mode 100644 index 0000000..7f4e658 Binary files /dev/null and b/labs/05_transformations/img/aws-glue-studio-3.png differ diff --git a/labs/05_transformations/img/aws-glue-studio-drop-fields.png b/labs/05_transformations/img/aws-glue-studio-drop-fields.png new file mode 100644 index 0000000..b8b809c Binary files /dev/null and b/labs/05_transformations/img/aws-glue-studio-drop-fields.png differ diff --git a/labs/05_transformations/img/aws-glue-studio-rename-field.png b/labs/05_transformations/img/aws-glue-studio-rename-field.png new file mode 100644 index 0000000..ab7ef33 Binary files /dev/null and b/labs/05_transformations/img/aws-glue-studio-rename-field.png differ diff --git a/labs/05_transformations/img/aws-glue-studio-select-fields.png b/labs/05_transformations/img/aws-glue-studio-select-fields.png new file mode 100644 index 0000000..68242fd Binary files /dev/null and b/labs/05_transformations/img/aws-glue-studio-select-fields.png differ diff 
--git a/labs/05_transformations/img/aws-glue-studio-to-date.png b/labs/05_transformations/img/aws-glue-studio-to-date.png new file mode 100644 index 0000000..61479e4 Binary files /dev/null and b/labs/05_transformations/img/aws-glue-studio-to-date.png differ diff --git a/labs/05_transformations/img/notebook-to-glue.png b/labs/05_transformations/img/notebook-to-glue.png deleted file mode 100644 index 640f011..0000000 Binary files a/labs/05_transformations/img/notebook-to-glue.png and /dev/null differ diff --git a/labs/05_transformations/img/notebook.png b/labs/05_transformations/img/notebook.png deleted file mode 100644 index 3e91f73..0000000 Binary files a/labs/05_transformations/img/notebook.png and /dev/null differ diff --git a/labs/99_Wrap_up_and_clean/README.md b/labs/99_Wrap_up_and_clean/README.md index 7956630..d8df942 100644 --- a/labs/99_Wrap_up_and_clean/README.md +++ b/labs/99_Wrap_up_and_clean/README.md @@ -2,34 +2,15 @@ # Conclusion Lab : Wrap up and cleaning existing resources -- [Terminate your resources](#terminate-your-resources) -- [Clean other resources:](#clean-other-resources) +- [Conclusion Lab : Wrap up and cleaning existing resources](#conclusion-lab--wrap-up-and-cleaning-existing-resources) - [Glue Crawlers and Jobs](#glue-crawlers-and-jobs) - [Quicksight](#quicksight) Before wrapping up, we strongly recommend you go through these steps to clean the recurrent resources incurred by the workshop. -Most of the charges incurred follows a pay as you go model, but some resources incur charges unless you stop them: +Most of the charges incurred follows a pay as you go model, but some resources incur charges unless you stop them. -- Glue development endpoints incur charges until you stop them. cf. (cf. [Glue detailed pricing](https://aws.amazon.com/glue/pricing/)) -- Quicksight is priced per user/month (cf. [Quicksight pricing](https://aws.amazon.com/quicksight/pricing/)) - - -## Terminate your resources - -As the Glue development endpoint and the notebook will incur charges, let's stop them : - -* Go to [Glue](https://console.aws.amazon.com/glue/home?region=us-east-1#etl:tab=devEndpoints) - * Select the endpoint - Action -> Delete - -* Go to the [notebook](https://console.aws.amazon.com/glue/home?region=us-east-1#etl:tab=notebooks) - * Select the notebook - Action -> Stop, then Delete - -## Clean other resources: - -Other resources that potentially incur charges may need to be cleaned: - -### Glue Crawlers and Jobs +## Glue Crawlers and Jobs Make sure you haven't put any recurrence on jobs and triggers: @@ -39,7 +20,9 @@ Make sure you haven't put any recurrence on jobs and triggers: * Now click on **Triggers** * check you have no triggers with a schedule type -### Quicksight +## Quicksight + +Quicksight is priced per user/month (cf. [Quicksight pricing](https://aws.amazon.com/quicksight/pricing/)) If you want to cancel your subscription to Quicksight you can follow these procedure: (https://docs.aws.amazon.com/quicksight/latest/user/closing-account.html)