@@ -58,7 +58,7 @@
"private_key = PRIVATE_KEY.encode('utf-8')\n",
"\n",
"table_to_read = \"\"\n",
"request_uri = f'https://hackney-planning.tascomi.com/rest/v1/{table_to_read}'\n",
"request_uri = f'https://hackney-planning.idoxcloud.com/rest/v1/{table_to_read}'\n",
"request_method = 'GET'"
]
},
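For context, the notebook cell above now targets the renamed idoxcloud.com host. A minimal sketch of issuing that GET by hand, assuming valid API keys: the table name and both header values below are placeholders, and the real X-Hash comes from calculate_auth_hash in the ingestion script that follows.

import requests

table_to_read = "contacts"  # placeholder; the notebook leaves this blank
request_uri = f"https://hackney-planning.idoxcloud.com/rest/v1/{table_to_read}"

# authenticate_tascomi (in the script below) sets these two headers for real.
headers = {
    "X-Public": "<public-key>",  # placeholder public API key
    "X-Hash": "<auth-hash>",     # placeholder signature from calculate_auth_hash
}

response = requests.get(request_uri, headers=headers)
print(response.status_code)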
100 changes: 64 additions & 36 deletions scripts/jobs/planning/tascomi_api_ingestion.py
@@ -3,33 +3,37 @@
from pyspark.sql.functions import udf, col, explode, lit
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql import Row
-import hmac, hashlib;
-import base64;
+import hmac
+import hashlib
+import base64
from datetime import datetime, timedelta
from dateutil import tz
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from math import ceil
-from scripts.helpers.helpers import get_glue_env_var, get_secret, add_import_time_columns, PARTITION_KEYS, table_exists_in_catalog
+from scripts.helpers.helpers import get_glue_env_var, get_secret, add_import_time_columns, PARTITION_KEYS, \
+table_exists_in_catalog
import json
from distutils.util import strtobool


def authenticate_tascomi(headers, public_key, private_key):
auth_hash = calculate_auth_hash(public_key, private_key)
headers['X-Public'] = public_key
headers['X-Hash'] = auth_hash
return headers


def not_today(date_str):
if not date_str:
return True
date = datetime.strptime(date_str[:19], "%Y-%m-%d %H:%M:%S")
return date.date() != datetime.now().date()


def get_tascomi_resource(page_number, url, body):
global public_key
global private_key
@@ -46,20 +50,23 @@ def get_tascomi_resource(page_number, url, body):
res = {}
try:
res = requests.get(url, data=body, headers=headers)
-if not res.text or json.loads(res.text) == None:
+if not res.text or json.loads(res.text) is None:
print(f"Null data response, with status code {res.status_code} for page {page_number}")
return ([""], url, res.status_code, "Null data response.")
records = json.loads(res.text)

-serialized_records = [json.dumps(remove_gis_image(record)) for record in records if not_today(record['last_updated']) ]
+serialized_records = [json.dumps(remove_gis_image(record)) for record in records if
+not_today(record['last_updated'])]

return (serialized_records, url, res.status_code, "")

except Exception as e:
exception = str(e)
print(f"ERROR: {exception} when getting page {page_number}. Status code {res.status_code}, response text {res.text}")
print(
f"ERROR: {exception} when getting page {page_number}. Status code {res.status_code}, response text {res.text}")
return ([""], url, res.status_code, exception)


def calculate_auth_hash(public_key, private_key):
tz_ldn = tz.gettz('Europe/London')
now = datetime.now(tz_ldn)
@@ -68,7 +75,8 @@
token = crypt.hexdigest().encode('utf-8')
return base64.b64encode(hmac.new(private_key.encode('utf-8'), token, hashlib.sha256).hexdigest().encode('utf-8'))

-def get_number_of_pages(resource, query = ""):
+
+def get_number_of_pages(resource, query=""):
global public_key
global private_key

@@ -79,45 +87,53 @@ def get_number_of_pages(resource, query = ""):

headers = authenticate_tascomi(headers, public_key, private_key)

-url = f'https://hackney-planning.tascomi.com/rest/v1/{resource}{query}'
+url = f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}{query}'
res = requests.get(url, data="", headers=headers)
if res.status_code == 202:
logger.info(f"received status code 202, whilst getting number of pages for {resource}, with query {query}")
-return { 'success': True, 'number_of_pages': 0 }
+return {'success': True, 'number_of_pages': 0}
if res.status_code == 200:
number_of_results = res.headers['X-Number-Of-Results']
results_per_page = res.headers['X-Results-Per-Page']

-return { 'success': True, 'number_of_pages': ceil(int(number_of_results) / int(results_per_page)) }
+return {'success': True, 'number_of_pages': ceil(int(number_of_results) / int(results_per_page))}
error_message = f"Recieved status code {res.status_code} whilst trying to get number of pages for {resource}, {query}"
logger.info(error_message)
-return { 'success': False, 'error_message': error_message }
+return {'success': False, 'error_message': error_message}


def get_days_since_last_import(last_import_date):
yesterday = datetime.now()
last_import_datetime = datetime.strptime(last_import_date, "%Y%m%d")
number_days_to_query = (yesterday - last_import_datetime).days
-days = [ datetime.strftime(yesterday - timedelta(days=day), "%Y-%m-%d") for day in range(1, number_days_to_query + 1)]
+days = [datetime.strftime(yesterday - timedelta(days=day), "%Y-%m-%d") for day in
+range(1, number_days_to_query + 1)]
days.sort()
return days

-def get_last_import_date(glue_context, database, resource):
+
+def get_last_import_date(glue_context, database, resource):
if not table_exists_in_catalog(glue_context, f"api_response_{resource}", database):
logger.info(f"Couldn't find table api_response_{resource} in database {database}.")
return None

logger.info(f"found table for {resource} api response in {database}")
return glue_context.sql(f"SELECT max(import_date) as max_import_date FROM `{database}`.api_response_{resource} where import_api_status_code = '200'").take(1)[0].max_import_date
return glue_context.sql(
f"SELECT max(import_date) as max_import_date FROM `{database}`.api_response_{resource} where import_api_status_code = '200'").take(
1)[0].max_import_date


def throw_if_unsuccessful(success_state, message):
if not success_state:
raise Exception(message)


def get_failures_from_last_import(database, resource, last_import_date):
requests_df = glue_context.sql(f"SELECT page_number, import_api_url_requested as url, '' as body from `{database}`.api_response_{resource} where import_api_status_code != '200' and import_date={last_import_date}")
return { "requests": requests_df, "count": requests_df.count() }

+requests_df = glue_context.sql(
+f"SELECT page_number, import_api_url_requested as url, '' as body from `{database}`.api_response_{resource} where import_api_status_code != '200' and import_date={last_import_date}")
+return {"requests": requests_df, "count": requests_df.count()}


def get_requests_since_last_import(resource, last_import_date):
requests_list = []
for day in get_days_since_last_import(last_import_date):
@@ -127,13 +143,16 @@ def get_requests_since_last_import(resource, last_import_date):

number_of_pages = number_of_pages_reponse["number_of_pages"]
logger.info(f"Number of pages to retrieve for {day}: {number_of_pages}")
-requests_list += [ RequestRow(page_number, f'https://hackney-planning.tascomi.com/rest/v1/{resource}?page={page_number}&last_updated={day}', "") for page_number in range(1, number_of_pages + 1)]
+requests_list += [RequestRow(page_number,
+f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}&last_updated={day}',
+"") for page_number in range(1, number_of_pages + 1)]
number_of_requests = len(requests_list)
if number_of_requests == 0:
return { "requests": [], "count": 0 }
return {"requests": [], "count": 0}
requests_list = sc.parallelize(requests_list)
requests_list = glue_context.createDataFrame(requests_list)
return { "requests": requests_list, "count": number_of_requests }
return {"requests": requests_list, "count": number_of_requests}


def get_requests_for_full_load(resource):
number_of_pages_reponse = get_number_of_pages(resource)
@@ -142,22 +161,25 @@ def get_requests_for_full_load(resource):

number_of_pages = number_of_pages_reponse["number_of_pages"]
logger.info(f"Number of pages to retrieve: {number_of_pages}")
-requests_list = [RequestRow(page_number, f'https://hackney-planning.tascomi.com/rest/v1/{resource}?page={page_number}', "") for page_number in range(1, number_of_pages + 1)]
+requests_list = [
+RequestRow(page_number, f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}', "") for
+page_number in range(1, number_of_pages + 1)]
number_of_requests = len(requests_list)
requests_list = sc.parallelize(requests_list)
requests_list = glue_context.createDataFrame(requests_list)
return { "requests": requests_list, "count": number_of_requests }
return {"requests": requests_list, "count": number_of_requests}


def get_requests(last_import_date, resource, database):
retry_arg_value = get_glue_env_var('retry_failure_from_previous_import', 'false')
try:
retry_failures = strtobool(retry_arg_value)
except ValueError:
raise Exception(f"--retry_failure_from_previous_import value must be recognised as a bool, received: {retry_arg_value}.")
raise Exception(
f"--retry_failure_from_previous_import value must be recognised as a bool, received: {retry_arg_value}.")

if not last_import_date:
logger.info(f"Retrieving full load of data")
logger.info("Retrieving full load of data")
return get_requests_for_full_load(resource)
if retry_failures:
logger.info(f"Getting failed requests from import on date {last_import_date}")
@@ -175,20 +197,22 @@ def calculate_number_of_partitions(number_of_requests, number_of_workers):
else:
return max_partitions


def remove_gis_image(records):
records.pop("gis_map_image_base64", None)
return records


def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list, partitions):
request_df = requests_list.repartition(partitions)
response_df = request_df.withColumn("response", get_tascomi_resource_udf(col("page_number"), col("url"), col("body")))
response_df = request_df.withColumn("response",
get_tascomi_resource_udf(col("page_number"), col("url"), col("body")))

-tascomi_responses_df = response_df.select( \
-col("page_number"),
-explode(col("response.response_data")).alias(f"{resource}"), \
-col("response.import_api_url_requested").alias("import_api_url_requested"), \
-col("response.import_api_status_code").alias("import_api_status_code"), \
-col("response.import_exception_thrown").alias("import_exception_thrown"))
+tascomi_responses_df = response_df.select(col("page_number"),
+explode(col("response.response_data")).alias(f"{resource}"),
+col("response.import_api_url_requested").alias("import_api_url_requested"),
+col("response.import_api_status_code").alias("import_api_status_code"),
+col("response.import_exception_thrown").alias("import_exception_thrown"))

tascomi_responses_df = add_import_time_columns(tascomi_responses_df)

@@ -201,10 +225,13 @@ def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, reque

return tascomi_responses_df


def get_failed_requests(data_frame):
-return data_frame\
-.where((data_frame.import_api_status_code != '200') & (data_frame.import_api_status_code != '202'))\
-.select(data_frame.page_number.alias("page_number"), data_frame.import_api_url_requested.alias("url"), lit("").alias("body"))
+return data_frame \
+.where((data_frame.import_api_status_code != '200') & (data_frame.import_api_status_code != '202')) \
+.select(data_frame.page_number.alias("page_number"), data_frame.import_api_url_requested.alias("url"),
+lit("").alias("body"))


if __name__ == "__main__":
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
@@ -247,7 +274,8 @@ def get_failed_requests(data_frame):
partitions = calculate_number_of_partitions(requests_list["count"], number_of_workers)
logger.info(f"Using {partitions} partitions to repartition the RDD.")

-tascomi_responses = retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list["requests"], partitions)
+tascomi_responses = retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource,
+requests_list["requests"], partitions)
else:
logger.info("No requests, exiting job")

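As a reference for the pagination logic in this file: get_number_of_pages derives the page count from the X-Number-Of-Results and X-Results-Per-Page response headers, and get_requests_for_full_load turns that count into one URL per page. A self-contained sketch of that arithmetic, with invented header values:

from math import ceil

resource = "contacts"  # hypothetical resource name
# Stand-ins for the X-Number-Of-Results and X-Results-Per-Page header values.
number_of_results, results_per_page = "34607", "200"

number_of_pages = ceil(int(number_of_results) / int(results_per_page))  # 174 pages

page_urls = [
    f"https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}"
    for page_number in range(1, number_of_pages + 1)
]
print(len(page_urls), page_urls[0])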
102 changes: 75 additions & 27 deletions scripts/tests/planning/test_tascomi_parse_tables.py
@@ -7,39 +7,87 @@


class TestTascomiParsingRefinement:

def test_column_expansion(self, spark):
-response = self.parse_json_into_dataframe(spark, 'contacts', [{'contacts': '{"id": "34607",'
-' "creation_user_id": null,'
-' "title_id": "4"}'}])
-expected = ['id', 'creation_user_id', 'title_id',
-'page_number', 'import_api_url_requested', 'import_api_status_code',
-'import_exception_thrown', 'import_datetime', 'import_timestamp',
-'import_year', 'import_month', 'import_day', 'import_date']
+response = self.parse_json_into_dataframe(
+spark,
+"contacts",
+[
+{
+"contacts": '{"id": "34607",'
+' "creation_user_id": null,'
+' "title_id": "4"}'
+}
+],
+)
+expected = [
+"id",
+"creation_user_id",
+"title_id",
+"page_number",
+"import_api_url_requested",
+"import_api_status_code",
+"import_exception_thrown",
+"import_datetime",
+"import_timestamp",
+"import_year",
+"import_month",
+"import_day",
+"import_date",
+]
TestCase().assertCountEqual(list(response[0]), expected)

def test_parsed_row_data(self, spark):
-response = self.parse_json_into_dataframe(spark, 'contacts', [{'contacts': '{"id": "34607",'
-' "creation_user_id": null,'
-' "title_id": "4"}'}])
-expected = {'id': '34607', 'creation_user_id': None, 'title_id': '4', 'page_number': 691,
-'import_api_url_requested': 'https://hackney-planning.tascomi.com/rest/v1/contacts?page=691',
-'import_api_status_code': 200, 'import_exception_thrown': '',
-'import_datetime': datetime(2021, 9, 16, 13, 10), 'import_timestamp': '1631797859.247579',
-'import_year': '2021', 'import_month': '09', 'import_day': '16',
-'import_date': '20210916'}
+response = self.parse_json_into_dataframe(
+spark,
+"contacts",
+[
+{
+"contacts": '{"id": "34607",'
+' "creation_user_id": null,'
+' "title_id": "4"}'
+}
+],
+)
+expected = {
+"id": "34607",
+"creation_user_id": None,
+"title_id": "4",
+"page_number": 691,
+"import_api_url_requested": "https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691",
+"import_api_status_code": 200,
+"import_exception_thrown": "",
+"import_datetime": datetime(2021, 9, 16, 13, 10),
+"import_timestamp": "1631797859.247579",
+"import_year": "2021",
+"import_month": "09",
+"import_day": "16",
+"import_date": "20210916",
+}
assertions.dictionaryContains(response[0], expected)

def parse_json_into_dataframe(self, spark, column, data):
-data_with_imports = [{'page_number': 691,
-'import_api_url_requested': 'https://hackney-planning.tascomi.com/rest/v1/contacts?page=691',
-'import_api_status_code': 200, 'import_exception_thrown': '',
-'import_datetime': datetime(2021, 9, 16, 13, 10), 'import_timestamp': '1631797859.247579',
-'import_year': '2021', 'import_month': '09', 'import_day': '16',
-'import_date': '20210916', **i} for i in data]
+data_with_imports = [
+{
+"page_number": 691,
+"import_api_url_requested": "https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691",
+"import_api_status_code": 200,
+"import_exception_thrown": "",
+"import_datetime": datetime(2021, 9, 16, 13, 10),
+"import_timestamp": "1631797859.247579",
+"import_year": "2021",
+"import_month": "09",
+"import_day": "16",
+"import_date": "20210916",
+**i,
+}
+for i in data
+]
query_data = spark.createDataFrame(
-spark.sparkContext.parallelize(
-[Row(**i) for i in data_with_imports]
-)
+spark.sparkContext.parallelize([Row(**i) for i in data_with_imports])
)
-return [row.asDict() for row in parse_json_into_dataframe(spark, column, query_data).rdd.collect()]
+return [
+row.asDict()
+for row in parse_json_into_dataframe(
+spark, column, query_data
+).rdd.collect()
+]
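These tests feed parse_json_into_dataframe rows whose contacts column holds a raw JSON string alongside the import metadata columns, and assert that the JSON fields surface as top-level columns. The helper's implementation is not part of this diff; the expansion it is expected to perform amounts to something like this from_json sketch, assuming a local SparkSession and an explicit schema (the real helper may infer it instead):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.master("local[1]").getOrCreate()

# One row shaped like the test fixture: a JSON string plus one metadata column.
df = spark.createDataFrame(
    [('{"id": "34607", "creation_user_id": null, "title_id": "4"}', 691)],
    ["contacts", "page_number"],
)

# Explicit schema for the JSON payload.
schema = StructType([
    StructField("id", StringType()),
    StructField("creation_user_id", StringType()),
    StructField("title_id", StringType()),
])

parsed = df.withColumn("parsed", from_json(col("contacts"), schema)).select(
    "parsed.*", "page_number"
)
parsed.show()  # id=34607, creation_user_id=null, title_id=4, page_number=691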