diff --git a/src/ingest/__main__.py b/src/ingest/__main__.py index 42ec784..820b888 100644 --- a/src/ingest/__main__.py +++ b/src/ingest/__main__.py @@ -8,13 +8,16 @@ # from ingest.tasks.aftermarket.snks_drops import SnksDropsIngest # from ingest.tasks.aftermarket.stadium_goods import StadiumGoodsIngest -# from ingest.tasks.aftermarket.stockx.sitemap import SitemapIngest -# from ingest.tasks.aftermarket.tsdb import TheSneakerDatabaseIngest -from ingest.tasks.fix import fix_duplicate_skus +from ingest.tasks.aftermarket.stockx.sitemap import execute as sitemap_ingest +from ingest.utils.sessions import init_sessions # from ingest.tasks.retail.nike import NikeIngest # from ingest.tasks.retail.puma import PumaIngest -# from ingest.tasks.useragents import UserAgentIngest +# from ingest.tasks.useragents import execute as useragent_ingest + +# from ingest.tasks.aftermarket.tsdb import TheSneakerDatabaseIngest +# from ingest.tasks.fix import fix_duplicate_skus + logging.config.dictConfig(LOGGING_CONFIG) logger = logging.getLogger(__name__) @@ -22,12 +25,12 @@ tasks = { # "snks_drops": SnksDropsIngest().execute, # "stadium_goods": StadiumGoodsIngest().execute, - # "stockx_sitemap": SitemapIngest().execute, + "stockx_sitemap": sitemap_ingest, # "tsdb": TheSneakerDatabaseIngest().execute, # "nike": NikeIngest().execute, # "puma": PumaIngest().execute, - # "useragents": UserAgentIngest.execute, - "dupes": fix_duplicate_skus, + # "useragents": useragent_ingest, + # "dupes": fix_duplicate_skus, } @@ -39,6 +42,7 @@ def parse_arguments() -> argparse.Namespace: async def main(): await init_db() + await init_sessions() args = parse_arguments() if args.tasks: to_run = args.tasks.split(",") @@ -46,11 +50,11 @@ async def main(): to_run = [] async with asyncio.TaskGroup() as tg: for task_name, task in tasks.items(): - if task_name not in to_run: + if task_name in to_run: + logger.info(f"Running {task_name}") + tg.create_task(task()) + else: logger.info(f"Skipping {task_name}") - continue - logger.info(f"Running {task_name}") - tg.create_task(task()) logger.info("All tasks completed!") diff --git a/src/ingest/db/instance.py b/src/ingest/db/instance.py index 8f828bb..e2952fa 100644 --- a/src/ingest/db/instance.py +++ b/src/ingest/db/instance.py @@ -2,10 +2,11 @@ import os from beanie import init_beanie -from motor.motor_asyncio import AsyncIOMotorClient - from core.models.details import SiteMapLink from core.models.shoes import Sneaker +from motor.motor_asyncio import AsyncIOMotorClient + +from ingest.models.misc import StockxToken, Useragent logger = logging.getLogger(__name__) @@ -25,6 +26,6 @@ async def init_db(): client = AsyncIOMotorClient(CONNECTION_STRING) await init_beanie( database=client.get_database(DATABASE_NAME), - document_models=[Sneaker, SiteMapLink], + document_models=[Sneaker, SiteMapLink, StockxToken, Useragent], ) logger.info("Connected to database: %s", DATABASE_NAME) diff --git a/src/ingest/models/json.py b/src/ingest/models/json.py index b6d921d..588be2c 100644 --- a/src/ingest/models/json.py +++ b/src/ingest/models/json.py @@ -6,11 +6,10 @@ import requests from ingest.config import ARCHIVE_DIR, DATA_DIR -from ingest.models.task import IngestInterface from ingest.utils.helpers import copy_and_mkdir -class Ingest(IngestInterface, ABC): +class Ingest(ABC): def __init__(self, brand: str, download_url: str = None) -> None: self.brand = brand self.download_url = download_url @@ -25,7 +24,7 @@ def __init__(self, brand: str, download_url: str = None) -> None: def download(self, headers: dict) -> None: """Downloads the brand's web page and writes it to the filesystem.""" - r = requests.get(self.download_url) + r = requests.get(self.download_url, headers=headers) if r.status_code == 200: with open(file=self.paths["html"], mode="w", encoding="utf-8") as html_file: html_file.write(r.text) diff --git a/src/ingest/models/misc.py b/src/ingest/models/misc.py index c920a6a..8423620 100644 --- a/src/ingest/models/misc.py +++ b/src/ingest/models/misc.py @@ -1,16 +1,17 @@ from beanie import Document +from pydantic import Field class StockxToken(Document): - type: str - token: str + id: str + value: str class Settings: - collection = "OAuth" + name = "OAuth" class Useragent(Document): useragent: str class Settings: - collection = "useragents" + name = "useragents" diff --git a/src/ingest/models/stockx_product_page.py b/src/ingest/models/stockx_product_page.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/ingest/models/task.py b/src/ingest/models/task.py deleted file mode 100644 index 6d46cd3..0000000 --- a/src/ingest/models/task.py +++ /dev/null @@ -1,11 +0,0 @@ -from abc import ABC, abstractmethod - - -class IngestInterface(ABC): - @abstractmethod - def execute(self) -> None: - """ - Performs all of the necessary steps to ingest data. - Executed asynchronously by the main thread. - """ - pass diff --git a/src/ingest/tasks/aftermarket/goat.py b/src/ingest/tasks/aftermarket/goat.py index 4dbcbca..dffc763 100644 --- a/src/ingest/tasks/aftermarket/goat.py +++ b/src/ingest/tasks/aftermarket/goat.py @@ -35,8 +35,8 @@ def __get_from_hidden_api(url, additional_params={}, additional_headers={}): headers={**default_headers, **additional_headers}, params={**default_params, **additional_params}, ) - json = response.json() - return json + response_json = response.json() + return response_json def get_trending_sneakers(): diff --git a/src/ingest/tasks/aftermarket/sneakercrush.py b/src/ingest/tasks/aftermarket/sneakercrush.py index d1cfd6f..4279100 100644 --- a/src/ingest/tasks/aftermarket/sneakercrush.py +++ b/src/ingest/tasks/aftermarket/sneakercrush.py @@ -1,57 +1,58 @@ from core.graphql.sc_operations import Operations from sgqlc.endpoint.requests import RequestsEndpoint -from ingest.db.instance import client -from ingest.models.task import IngestInterface +BATCH_SIZE = 1000 -class SneakerCrushIngest(IngestInterface): - def execute(self) -> None: - endpoint = RequestsEndpoint( - url="https://thesneakercrush.com/graphql", - base_headers={ - "x-secret": "xDqkUbTE3B6KvSlEbKva1xopqMr9BR9bbwcY4+uGuRvpRyWiA60UTN5YkTCulD2x" - }, - ) - op = Operations.query.get_sneakers - hasNextPage = True - p = 0 - while hasNextPage: - data = endpoint(op, {"sort": {"date": -1}, "perPage": 1000, "page": p}) - if data["data"]["ReleasePagination"]["items"]: - for item in data["data"]["ReleasePagination"]["items"]: - if item["_id"]: - item["sneakerCrushId"] = item["_id"] - del item["_id"] - client["sneakercrush-sneakers"].insert_many( - data["data"]["ReleasePagination"]["items"], - ) - print(f"Inserted 1000 sneakers!") - if data["data"]["ReleasePagination"]["pageInfo"]: - hasNextPage = data["data"]["ReleasePagination"]["pageInfo"][ - "hasNextPage" - ] - print(f"Page {p} done!") - else: - hasNextPage = False - p += 1 - print("Done") +async def execute() -> None: + endpoint = RequestsEndpoint( + url="https://thesneakercrush.com/graphql", + base_headers={ + "x-secret": "xDqkUbTE3B6KvSlEbKva1xopqMr9BR9bbwcY4+uGuRvpRyWiA60UTN5YkTCulD2x" + }, + ) + op = Operations.query.get_sneakers + hasNextPage = True + p = 0 + while hasNextPage: + data = endpoint(op, {"sort": {"date": -1}, "perPage": BATCH_SIZE, "page": p}) + if data["data"]["ReleasePagination"]["items"]: + for item in data["data"]["ReleasePagination"]["items"]: + if item["_id"]: + item["sneakerCrushId"] = item["_id"] + del item["_id"] + sneakers = [] + for release in data["data"]["ReleasePagination"]["items"]: + # TODO parse each release into a Sneaker document + # attempt some merge strategy + sneakers.append(Sneaker()) + if len(sneakers) >= BATCH_SIZE: + await Sneaker.insert_many(sneakers) + print(f"Inserted {BATCH_SIZE} sneakers!") + pass + + if data["data"]["ReleasePagination"]["pageInfo"]: + hasNextPage = data["data"]["ReleasePagination"]["pageInfo"]["hasNextPage"] + print(f"Page {p} done!") + else: + hasNextPage = False + p += 1 + print("Done") - def drop_dupes(self) -> None: - dupes = client["sneakercrush-sneakers"].aggregate( - [ - { - "$group": { - "_id": "$sneakerCrushId", - "uniqueIds": {"$addToSet": "$_id"}, - "count": {"$sum": 1}, - } - }, - {"$match": {"count": {"$gt": 1}}}, - ] - ) - for document in dupes: - document["uniqueIds"].pop(0) - client["sneakercrush-sneakers"].delete_many( - {"_id": {"$in": document["uniqueIds"]}} - ) + +def drop_dupes() -> None: + dupes = Sneaker.aggregate( + [ + { + "$group": { + "_id": "$sneakerCrushId", + "uniqueIds": {"$addToSet": "$_id"}, + "count": {"$sum": 1}, + } + }, + {"$match": {"count": {"$gt": 1}}}, + ] + ) + for document in dupes: + document["uniqueIds"].pop(0) + Sneakers.delete_many({"_id": {"$in": document["uniqueIds"]}}) diff --git a/src/ingest/tasks/aftermarket/snks_drops.py b/src/ingest/tasks/aftermarket/snks_drops.py index bca24f5..6d96e46 100644 --- a/src/ingest/tasks/aftermarket/snks_drops.py +++ b/src/ingest/tasks/aftermarket/snks_drops.py @@ -1,52 +1,51 @@ +import logging from datetime import UTC, datetime from core.models.details import Images, Prices from core.models.shoes import Sneaker -from ingest.models.task import IngestInterface + from ingest.utils.helpers import try_guess_brand from ingest.utils.sessions import session +logger = logging.getLogger(__name__) -class SnksDropsIngest(IngestInterface): - def ingest(self) -> None: - releases = session.get( - "https://snkr-rest-api-nfrpf.ondigitalocean.app/api/ios/releases" - ) - if releases.status_code != 200: - raise Exception("Failed to fetch releases from snkrs-drops") - else: - sneakers = [] - for release in releases.json()["items"]: - if not release["pid"] or release["pid"] == "TBD": - continue - release_date = datetime.strptime( - release["date"][:-1], "%Y-%m-%dT%H:%M:%S.%f" - ) + +async def execute() -> None: + releases = session.get( + "https://snkr-rest-api-nfrpf.ondigitalocean.app/api/ios/releases" + ) + if releases.status_code != 200: + raise Exception("Failed to fetch releases from snkrs-drops") + else: + sneakers = [] + for release in releases.json()["items"]: + if not release["pid"] or release["pid"] == "TBD": + continue + release_date = datetime.strptime( + release["date"][:-1], "%Y-%m-%dT%H:%M:%S.%f" + ) + price = None + try: + price = float(release["price"].replace("$", "")) + except ValueError: price = None - try: - price = float(release["price"].replace("$", "")) - except ValueError: - price = None - sneakers.append( - Sneaker( - brand=try_guess_brand(release["name"]), - sku=release["pid"], - name=release["name"], - colorway=None, - audience=None, - releaseDate=release_date, - images=Images( - original=release["images"][0], - alternateAngles=release["images"][1:], - ), - links=None, - prices=Prices(retail=price) if price else None, - sizes=None, - description=None, - dateAdded=datetime.now(UTC), - ) + sneakers.append( + Sneaker( + brand=try_guess_brand(release["name"]), + sku=release["pid"], + name=release["name"], + colorway=None, + audience=None, + releaseDate=release_date, + images=Images( + original=release["images"][0], + alternateAngles=release["images"][1:], + ), + links=None, + prices=Prices(retail=price) if price else None, + sizes=None, + description=None, + dateAdded=datetime.now(UTC), ) - Sneaker.insert_many(sneakers) - - def execute(self) -> None: - self.ingest() + ) + Sneaker.insert_many(sneakers) diff --git a/src/ingest/tasks/aftermarket/stadium_goods.py b/src/ingest/tasks/aftermarket/stadium_goods.py index 338269b..ba21fa4 100644 --- a/src/ingest/tasks/aftermarket/stadium_goods.py +++ b/src/ingest/tasks/aftermarket/stadium_goods.py @@ -8,103 +8,136 @@ from selenium.webdriver.common.proxy import Proxy, ProxyType from ingest.db.mutations import upsert_many -from ingest.models.task import IngestInterface from ingest.utils.helpers import extract_redux_state, try_guess_audience from ingest.utils.proxies import get_working_proxy +driver = webdriver.Chrome() +collection_name = "stadiumgoods" +logger = logging.getLogger(__name__) -class StadiumGoodsIngest(IngestInterface): - def __init__(self): - self.driver = webdriver.Chrome() - self.collection_name = "stadiumgoods" - self.logger = logging.getLogger(__name__) - def grab_redux_with_selenium(self, url: str) -> dict: - self.driver.get(url) - redux_state_script = self.driver.find_element( - By.CSS_SELECTOR, "body > script:nth-child(3)" - ) - return extract_redux_state(redux_state_script.get_attribute("innerHTML")) +def grab_redux_with_selenium(url: str) -> dict: + driver.get(url) + redux_state_script = driver.find_element( + By.CSS_SELECTOR, "body > script:nth-child(3)" + ) + return extract_redux_state(redux_state_script.get_attribute("innerHTML")) - def shopping_index(self, url: str) -> int: - page_data = self.grab_redux_with_selenium(url) - shoes = [] - for productId, product in page_data["entities"]["products"].items(): - brand = page_data["entities"]["brands"][str(product["brand"])]["name"] - audience = [try_guess_audience(product["genderName"])] - links = Links( - stadium_goods=f"https://www.stadiumgoods.com/shopping/{product['slug'].strip()}" - ) - prices = Prices(stadium_goods=product["prices"]) - images = None - angles = [ - list(angle["sources"].values())[-1] for angle in product["images"] - ] - images = Images(original=angles[0], alternateAngles=angles[1:]) - shoes.append( - StadiumGoods( - stadiumGoodsId=productId, - brand=brand, - name=product["shortDescription"], - audience=audience, - links=links, - prices=prices, - images=images, - ) + +def shopping_index(url: str) -> int: + page_data = grab_redux_with_selenium(url) + sneakers = [] + for productId, product in page_data["entities"]["products"].items(): + brand = page_data["entities"]["brands"][str(product["brand"])]["name"] + audience = [try_guess_audience(product["genderName"])] + links = Links( + stadium_goods=f"https://www.stadiumgoods.com/shopping/{product['slug'].strip()}" + ) + prices = Prices(stadium_goods=product["prices"]) + images = None + angles = [list(angle["sources"].values())[-1] for angle in product["images"]] + images = Images(original=angles[0], alternateAngles=angles[1:]) + sneakers.append( + Sneaker( + stadiumGoodsId=productId, + brand=brand, + name=product["shortDescription"], + audience=audience, + links=links, + prices=prices, + images=images, ) - upsert_many(shoes, self.collection_name) - return int( - list(page_data["entities"]["searchResults"].values())[0]["products"][ - "totalPages" - ] ) + upsert_many(sneakers, collection_name) + return int( + list(page_data["entities"]["searchResults"].values())[0]["products"][ + "totalPages" + ] + ) - def product_page(self, url: str, productId: str) -> StadiumGoods: - page_data = self.grab_redux_with_selenium(url) - product_data = page_data["entities"]["products"][productId] - sku = product_data["sku"].strip().replace(" ", "-") - description = product_data["description"] - colorway = next( - ( - x["color"]["name"] - for x in product_data["colors"] - if "DesignerColor" in x["tags"] - ), - None, - ) - prices = {} - sizes = [] - for variant in product_data["variants"]: - prices[variant["sizeDescription"]] = variant["price"][ - "includingTaxesWithoutDiscount" - ] - sizes.append( - float( - "".join( - i for i in variant["sizeDescription"] if i.isdigit() or i == "." - ) + +def product_page(url: str, productId: str) -> Sneaker: + page_data = grab_redux_with_selenium(url) + product_data = page_data["entities"]["products"][productId] + sku = product_data["sku"].strip().replace(" ", "-") + description = product_data["description"] + colorway = next( + ( + x["color"]["name"] + for x in product_data["colors"] + if "DesignerColor" in x["tags"] + ), + None, + ) + prices = {} + sizes = [] + for variant in product_data["variants"]: + prices[variant["sizeDescription"]] = variant["price"][ + "includingTaxesWithoutDiscount" + ] + sizes.append( + float( + "".join( + i for i in variant["sizeDescription"] if i.isdigit() or i == "." ) ) - sizes = list(set(sizes)) - return StadiumGoods( - stadiumGoodsId=productId, - sku=sku, - description=description, - colorway=colorway, - prices=Prices(stadium_goods=prices), - sizes=sizes, ) + sizes = list(set(sizes)) + return Sneaker( + stadiumGoodsId=productId, + sku=sku, + description=description, + colorway=colorway, + prices=Prices(stadium_goods=prices), + sizes=sizes, + ) + - def execute(self) -> None: - page = 1 - totalPages = 500 - while page <= totalPages: - url = f"https://www.stadiumgoods.com/en-us/shopping?categories=139499|139522|139515|186123|139557|139569|195318&pageindex={page}" +def execute(self) -> None: + page = 1 + totalPages = 500 + while page <= totalPages: + url = f"https://www.stadiumgoods.com/en-us/shopping?categories=139499|139522|139515|186123|139557|139569|195318&pageindex={page}" + try: + totalPages = shopping_index(url) + page += 1 + except NoSuchElementException: + driver.quit() + proxy_address = get_working_proxy() + proxy = Proxy( + { + "proxyType": ProxyType.MANUAL, + "httpProxy": proxy_address, + "sslProxy": proxy_address, + "noProxy": "localhost", + } + ) + driver = webdriver.Chrome(proxy=proxy) + except Exception as e: + logger.error(e) + + +def execute_product_pages() -> None: + updates = [] + queue = Sneaker.find({"$or": [{"sku": {"$exists": False}}, {"sku": None}]}) + for i in range(len(queue)): + product = queue[i] + logger.info( + f"Scraping Stadium Goods '{product['stadiumGoodsId']}' {i+1}/{len(queue)}" + ) + keep_trying = True + retry_count = 0 + while keep_trying and retry_count < 5: try: - totalPages = self.shopping_index(url) - page += 1 + update = product_page( + product["links"]["stadium_goods"], product["stadiumGoodsId"] + ) + updates.append(update) + logger.info("Done") + keep_trying = False except NoSuchElementException: - self.driver.quit() + retry_count += 1 + driver.quit() proxy_address = get_working_proxy() proxy = Proxy( { @@ -114,42 +147,8 @@ def execute(self) -> None: "noProxy": "localhost", } ) - self.driver = webdriver.Chrome(proxy=proxy) + driver = webdriver.Chrome(proxy=proxy) except Exception as e: - self.logger.error(e) - - def execute_product_pages(self) -> None: - updates = [] - queue = Sneaker.find({"$or": [{"sku": {"$exists": False}}, {"sku": None}]}) - for i in range(len(queue)): - product = queue[i] - self.logger.info( - f"Scraping Stadium Goods '{product['stadiumGoodsId']}' {i+1}/{len(queue)}" - ) - keep_trying = True - retry_count = 0 - while keep_trying and retry_count < 5: - try: - update = self.product_page( - product["links"]["stadium_goods"], product["stadiumGoodsId"] - ) - updates.append(update) - self.logger.info("Done") - keep_trying = False - except NoSuchElementException: - retry_count += 1 - self.driver.quit() - proxy_address = get_working_proxy() - proxy = Proxy( - { - "proxyType": ProxyType.MANUAL, - "httpProxy": proxy_address, - "sslProxy": proxy_address, - "noProxy": "localhost", - } - ) - self.driver = webdriver.Chrome(proxy=proxy) - except Exception as e: - self.logger.error(e) - keep_trying = False - upsert_many(updates, self.collection_name) + logger.error(e) + keep_trying = False + upsert_many(updates, collection_name) diff --git a/src/ingest/tasks/aftermarket/stockx/official_api.py b/src/ingest/tasks/aftermarket/stockx/official_api.py index 0d7c044..22a78a2 100644 --- a/src/ingest/tasks/aftermarket/stockx/official_api.py +++ b/src/ingest/tasks/aftermarket/stockx/official_api.py @@ -1,28 +1,80 @@ -from ingest.models.task import IngestInterface +from core.models.details import SiteMapLink +from core.models.shoes import Images, Links, Sneaker + +from ingest.utils.helpers import should_skip_link, try_guess_audience from ingest.utils.sessions import stockx_session +base_url = "https://api.stockx.com/v2" + + +def _api_get(slug: str): + return stockx_session.get(f"{base_url}/{slug.strip()}").json() + + +def get_single_product(product_id: str) -> dict: + return _api_get(f"catalog/products/{product_id.strip()}") + + +def search_products(query: str) -> dict: + return _api_get(f"search?query={query.strip()}") -class StockXOfficialApiIngest(IngestInterface): - def __init__(self): - self.base_url = "https://api.stockx.com/v2" - def get_single_product(self, product_id: str) -> dict: - return stockx_session.get( - f"{self.base_url}/catalog/products/{product_id.strip()}" - ).json() +def get_single_product_variant(product_id: str, variant_id: str) -> dict: + return _api_get( + f"catalog/products/{product_id.strip()}/product/{variant_id.strip()}" + ) - def search_products(self, query: str) -> dict: - return stockx_session.get( - f"{self.base_url}/search?query={query.strip()}" - ).json() - def get_single_product_variant(self, product_id: str, variant_id: str) -> dict: - return stockx_session.get( - f"{self.base_url}/catalog/products/{product_id.strip()}/product/{variant_id.strip()}" - ).json() +def get_multiple_products(query: str) -> dict: + raise NotImplementedError("This method is not yet implemented") - def get_multiple_products(self, query: str) -> dict: - return - def execute(self) -> None: - pass +async def execute() -> None: + found_otherwise = [] + async for link in SiteMapLink.aggregate( + [ + { + "$match": { + "scraped": False, + "ignored": {"$ne": True}, + "error": {"$ne": True}, + } + }, + {"$addFields": {"linkLength": {"$strLenCP": "$url"}}}, + {"$sort": {"linkLength": 1}}, + {"$project": {"linkLength": 0}}, + ], + projection_model=SiteMapLink, + ): + link.ignored = should_skip_link(link) + if link.ignored: + link.save() + else: + slug = link.url.split("/")[-1] + product_data = search_products(slug) + if product_data: + Sneaker.find_one( + name=product_data["Product"]["title"], + sku=product_data["Product"]["styleId"], + images=[ + Images(url=image) + for image in product_data["Product"]["media"]["imageUrl"] + ], + links=Links(stockx=link.url), + audience=try_guess_audience(product_data["Product"]["title"]), + ).upsert() + else: + found_otherwise.append(link) + if product_data: + Sneaker.find_one( + name=product_data["Product"]["title"], + sku=product_data["Product"]["styleId"], + images=[ + Images(url=image) + for image in product_data["Product"]["media"]["imageUrl"] + ], + links=Links(stockx=link.url), + audience=try_guess_audience(product_data["Product"]["title"]), + ).upsert() + link.scraped = True + link.save() diff --git a/src/ingest/tasks/aftermarket/stockx/sitemap.py b/src/ingest/tasks/aftermarket/stockx/sitemap.py index 31ac524..b3cbd4a 100644 --- a/src/ingest/tasks/aftermarket/stockx/sitemap.py +++ b/src/ingest/tasks/aftermarket/stockx/sitemap.py @@ -1,77 +1,64 @@ import logging import os import urllib.parse -from os import path +from datetime import UTC, datetime +from beanie import BulkWriter +from beanie.odm.operators.update.general import Set from bs4 import BeautifulSoup - from core.models.details import SiteMapLink -from ingest.config import DATA_DIR -from ingest.db.mutations import upsert_sitemap_links -from ingest.models.task import IngestInterface -from ingest.utils.helpers import create_and_write_file -from ingest.utils.sessions import session +from ingest.utils.sessions import session -class SitemapIngest(IngestInterface): - """ - A class for ingesting sitemaps. +# The URL of the sitemap "root", can recursively ingest from it if it contains sub-sitemaps. +url = "https://stockx.com/sitemap/sitemap-index.xml" +# A list of functions that take a URL and return a boolean, when a function returns True the link is excluded. +filters = [lambda x: x.startswith("https://stockx.com/search?s=")] +SCRAPFLY_API_KEY = os.environ.get("SCRAPFLY_API_KEY") +if not SCRAPFLY_API_KEY: + raise EnvironmentError("Please set the SCRAPFLY_API_KEY environment variable.") +logger = logging.getLogger(__name__) - Parameters: - - url (str): The URL of the sitemap "root", can recursively ingest from it if it contains sub-sitemaps. - - name (str): The name of the sitemap. - - mongo_collection (str): The name of the MongoDB collection to store the sitemap links in. - - filters (List[Callable[[str], bool]]): A list of functions that take a URL and return a boolean, when a function returns True the link is excluded. - """ - def __init__(self) -> None: - self.url = "https://stockx.com/sitemap/sitemap-index.xml" - self.filters = ([lambda x: x.startswith("https://stockx.com/search?s=")],) - self.name = ("StockX",) - self.mongo_collection = ("stockx-links",) - self.API_KEY = os.environ.get("SCRAPFLY_API_KEY") - if not self.API_KEY: - raise EnvironmentError( - "Please set the SCRAPFLY_API_KEY environment variable." - ) - self.session = session - self.logger = logging.getLogger(__name__) +async def recursiveIngest(url: str) -> None: + logger.info(f"Ingesting {url}") + try: + params = { + "url": url, + "tags": "player,project:default", + "country": "us", + "asp": "true", + "render_js": "true", + "key": SCRAPFLY_API_KEY, + } + xmlText = session.get( + f"https://api.scrapfly.io/scrape?{urllib.parse.urlencode(params)}", + ).json()["result"]["content"] + xml_document = BeautifulSoup(xmlText, "lxml") + if xml_document.find(name="sitemapindex"): + for loc_element in xml_document.findAll(name="loc"): + await recursiveIngest(loc_element.text) + elif xml_document.find(name="urlset"): + async with BulkWriter() as bw: + for linkAddress in [ + loc_element.text for loc_element in xml_document.findAll(name="loc") + ]: + if not any([f(linkAddress) for f in filters]): + await SiteMapLink.find_one(url=linkAddress).upsert( + Set({SiteMapLink.lastSeenOnSitemap: datetime.now(UTC)}), + on_insert=SiteMapLink( + url=linkAddress, + scraped=False, + lastSeenOnSitemap=datetime.now(UTC), + ), + ) + bw.commit() + print(f"Done with {url}") + else: + raise Exception("Unknown XML/sitemap structure") + except Exception as e: + logger.error(e) - def recursiveIngest(self, url: str) -> None: - self.logger.info(f"Ingesting {url}") - try: - params = { - "url": url, - "tags": "player,project:default", - "country": "us", - "asp": "true", - "render_js": "true", - "key": SCRAPFLY_API_KEY, - } - xmlText = self.session.get( - f"https://api.scrapfly.io/scrape?{urllib.parse.urlencode(params)}", - ).json()["result"]["content"] - create_and_write_file( - path.join(DATA_DIR, "xml", url.split("/")[-1]), - xmlText, - ) - xml_document = BeautifulSoup(xmlText, "lxml") - if xml_document.find(name="sitemapindex"): - [self.recursiveIngest(l.text) for l in xml_document.findAll(name="loc")] - elif xml_document.find(name="urlset"): - productPages = [ - SiteMapLink(url=linkAddress, scraped=False, lastScraped=None) - for linkAddress in [ - locElement.text - for locElement in xml_document.findAll(name="loc") - ] - if not any([f(linkAddress) for f in self.filters]) - ] - upsert_sitemap_links(productPages, self.mongo_collection) - else: - raise Exception("Unknown XML/sitemap structure") - except Exception as e: - self.logger.error(e) - def execute(self) -> None: - self.recursiveIngest(self.url) +async def execute() -> None: + await recursiveIngest(url) diff --git a/src/ingest/tasks/useragents.py b/src/ingest/tasks/useragents.py index aa47a49..32f42d6 100644 --- a/src/ingest/tasks/useragents.py +++ b/src/ingest/tasks/useragents.py @@ -1,50 +1,43 @@ import asyncio import os -from beanie import Document - -from ingest.models.task import IngestInterface +from ingest.models.misc import Useragent from ingest.utils.sessions import session - -class UserAgent(Document): - useragent: str - - class Settings: - collection = "useragents" - - -class UserAgentIngest(IngestInterface): - def __init__(self): - self.url = "https://api.whatismybrowser.com/api/v2/user_agent_database_search" - self.headers = { - "X-API-KEY": os.environ.get("SOLESEARCH_WIMB_API_KEY", "API_KEY_NOT_SET"), - } - - async def __get_useragents_api( - self, operating_system_name: str, software_name: str - ) -> None: - querystring = { - "order_by": "first_seen_at", - "limit": "500", - "software_type": "browser", - "operating_system_name": operating_system_name, - "software_name": software_name, - } - useragents = [ - UserAgent(useragent=result["user_agent"]) - for result in session.get( - self.url, - headers=self.headers, - params=querystring, - ).json()["search_results"]["user_agents"] - ] - UserAgent.insert_many(useragents) - - async def execute(self) -> None: - await UserAgent.delete_all() - async with asyncio.TaskGroup() as tg: - tg.create_task(self.__get_useragents_api("macOS", "Safari")) - tg.create_task(self.__get_useragents_api("macOS", "Chrome")) - tg.create_task(self.__get_useragents_api("Windows", "Chrome")) - tg.create_task(self.__get_useragents_api("Windows", "Firefox")) +url = "https://api.whatismybrowser.com/api/v2/user_agent_database_search" +SOLESEARCH_WIMB_API_KEY = os.environ.get("SOLESEARCH_WIMB_API_KEY", None) +if SOLESEARCH_WIMB_API_KEY is None: + raise ValueError("SOLESEARCH_WIMB_API_KEY not set in environment variables") +headers = { + "X-API-KEY": os.environ.get("SOLESEARCH_WIMB_API_KEY"), +} + + +async def __get_useragents_api(operating_system_name: str, software_name: str) -> None: + querystring = { + "order_by": "first_seen_at", + "limit": "500", + "software_type": "browser", + "operating_system_name": operating_system_name, + "software_name": software_name, + } + useragents = [ + Useragent(useragent=result["user_agent"]) + for result in session.get( + url, + headers=headers, + params=querystring, + ).json()[ + "search_results" + ]["user_agents"] + ] + Useragent.insert_many(useragents) + + +async def execute() -> None: + await Useragent.delete_all() + async with asyncio.TaskGroup() as tg: + tg.create_task(__get_useragents_api("macOS", "Safari")) + tg.create_task(__get_useragents_api("macOS", "Chrome")) + tg.create_task(__get_useragents_api("Windows", "Chrome")) + tg.create_task(__get_useragents_api("Windows", "Firefox")) diff --git a/src/ingest/utils/sessions.py b/src/ingest/utils/sessions.py index 0cd86b9..f2e12ae 100644 --- a/src/ingest/utils/sessions.py +++ b/src/ingest/utils/sessions.py @@ -11,19 +11,7 @@ logger = logging.getLogger(__name__) session = Session() - -session.mount("https://stockx.com/", LimiterAdapter(per_second=1, per_minute=10)) -session.mount("https://www.nike.com/", LimiterAdapter(per_second=0.25, per_minute=11)) -session.mount("https://us.puma.com/", LimiterAdapter(per_second=1)) -session.mount( - "https://snkr-rest-api-nfrpf.ondigitalocean.app/", LimiterAdapter(per_second=1) -) -session.mount( - "https://the-sneaker-database.p.rapidapi.com/", - LimiterAdapter(per_second=0.25, per_minute=10), -) -session.mount("https://api.whatismybrowser.com/", LimiterAdapter(per_hour=1000)) -session.mount("https://api.stockx.vlour.me", LimiterAdapter(per_hour=1000)) +stockx_session = Session() class TokenRefreshAdapter(HTTPAdapter): @@ -44,12 +32,14 @@ def __init__(self, *args, **kwargs): "Please set the SOLESEARCH_STOCKX_CLIENT_SECRET environment variable." ) self.max_retries = 3 - self.token = StockxToken.find_one(StockxToken.type == "access_token").token - self.refresh_token = StockxToken.find_one( - StockxToken.type == "refresh_token" - ).token super().__init__(*args, **kwargs) + async def _init(self): + access_token = await StockxToken.get("access_token") + self.token = access_token.value + refresh_token = await StockxToken.get("refresh_token") + self.refresh_token = refresh_token.value + def send(self, request, **kwargs): request.headers["Content-Type"] = "application/json" request.headers["Authorization"] = f"Bearer {self.token}" @@ -81,18 +71,34 @@ def renew_token(self): ) ).json() self.token = tokens["access_token"] - update_stockx_tokens(tokens) + self.update_stockx_tokens(tokens) logging.info("Token renewed!") + async def update_stockx_tokens(self, tokens: dict): + for token_type in ["id_token", "access_token", "refresh_token"]: + if token_type in tokens: + db_token = await StockxToken.get(token_type) + db_token.value = tokens[token_type] + await db_token.save() + logger.info(f"Updated {token_type}") -async def update_stockx_tokens(tokens: dict): - for token_type in ["id_token", "access_token", "refresh_token"]: - if token_type in tokens: - db_token = await StockxToken.find_one(StockxToken.type == token_type) - db_token.token = tokens[token_type] - logger.info(f"Updated {token_type}") +async def init_sessions(): + session.mount("https://stockx.com/", LimiterAdapter(per_second=1, per_minute=10)) + session.mount( + "https://www.nike.com/", LimiterAdapter(per_second=0.25, per_minute=11) + ) + session.mount("https://us.puma.com/", LimiterAdapter(per_second=1)) + session.mount( + "https://snkr-rest-api-nfrpf.ondigitalocean.app/", LimiterAdapter(per_second=1) + ) + session.mount( + "https://the-sneaker-database.p.rapidapi.com/", + LimiterAdapter(per_second=0.25, per_minute=10), + ) + session.mount("https://api.whatismybrowser.com/", LimiterAdapter(per_hour=1000)) + session.mount("https://api.stockx.vlour.me", LimiterAdapter(per_hour=1000)) -stockx_session = Session() -adapter = TokenRefreshAdapter() -stockx_session.mount("https://api.stockx.com", adapter) + stockx_adapter = TokenRefreshAdapter() + await stockx_adapter._init() + stockx_session.mount("https://api.stockx.com", stockx_adapter)