Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Storage/ETLS/AveragePurchaseValueDailyETL.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sqlite3
import pandas as pd

def load_customer_invoices_count_etl_increment():
def incremental_load_average_purchase_value_etl():
# Step 1: Initialize Spark session
spark = SparkSession.builder \
.appName("ETL - Average Purchase Value Over Time by Customer Type") \
Expand Down Expand Up @@ -99,4 +99,4 @@ def load_customer_invoices_count_etl_increment():
spark.stop()

if __name__ == "__main__":
load_customer_invoices_count_etl_increment()
incremental_load_average_purchase_value_etl()
4 changes: 2 additions & 2 deletions Storage/ETLS/AveragePurchaseValueWeeklyETL.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# import KT_DB # Assuming KT_DB is the library for SQLite operations


def load_customer_invoices_count_etl():
def load_average_purchase_value_etl():
# Step 1: Initialize Spark session
spark = SparkSession.builder \
.appName("ETL - Average Purchase Value Over Time by Customer Type") \
Expand Down Expand Up @@ -66,4 +66,4 @@ def load_customer_invoices_count_etl():


if __name__ == "__main__":
load_customer_invoices_count_etl()
load_average_purchase_value_etl()
16 changes: 16 additions & 0 deletions Storage/Orchestration/ELTWeeklyProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from datetime import datetime
from ..ELTS.CustomerPurchaseFrequencyTotalSpendWeeklyELT import load_elt
from ..ELTS.topSellingArtistsWeeklyELT import load_and_transform_data
from ..ELTS.employeeSalePerformanceCustomerInteractionsWeeklyELT import load_employees_sales_customer_interactions_elt
from ..ELTS.CustomerAverageSpendWeeklyELT import load_average_purchase_value_elt
# from ELTS import X

# Define your Python functions here
Expand All @@ -24,6 +26,12 @@ def run_customer_purchase_frequency_total_spend():
def run_top_sell_artists():
load_and_transform_data()

def run_employees_sales_customer_interactions():
load_employees_sales_customer_interactions_elt()

def run_customer_invoices_count():
load_average_purchase_value_elt()

# Define default arguments for the DAG
default_args = {
'owner': 'airflow',
Expand Down Expand Up @@ -75,6 +83,14 @@ def run_top_sell_artists():
task_id='run_top_sell_artists',
python_callable=run_top_sell_artists,
)
task_employees_sales_customer_interactions(
task_id='run_employees_sales_customer_interactions',
python_callable=run_employees_sales_customer_interactions,
)
task_average_purchase_value_elt(
task_id='run_customer_invoices_count_etl',
python_callable=run_customer_invoices_count,
)
# Define dependencies
task_1 >> task_2
task_3 >> task_4
Expand Down