26 changes: 15 additions & 11 deletions src/ingest/__main__.py
@@ -8,26 +8,29 @@

# from ingest.tasks.aftermarket.snks_drops import SnksDropsIngest
# from ingest.tasks.aftermarket.stadium_goods import StadiumGoodsIngest
# from ingest.tasks.aftermarket.stockx.sitemap import SitemapIngest
# from ingest.tasks.aftermarket.tsdb import TheSneakerDatabaseIngest
from ingest.tasks.fix import fix_duplicate_skus
from ingest.tasks.aftermarket.stockx.sitemap import execute as sitemap_ingest
from ingest.utils.sessions import init_sessions

# from ingest.tasks.retail.nike import NikeIngest
# from ingest.tasks.retail.puma import PumaIngest
# from ingest.tasks.useragents import UserAgentIngest
# from ingest.tasks.useragents import execute as useragent_ingest

# from ingest.tasks.aftermarket.tsdb import TheSneakerDatabaseIngest
# from ingest.tasks.fix import fix_duplicate_skus


logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger(__name__)

tasks = {
# "snks_drops": SnksDropsIngest().execute,
# "stadium_goods": StadiumGoodsIngest().execute,
# "stockx_sitemap": SitemapIngest().execute,
"stockx_sitemap": sitemap_ingest,
# "tsdb": TheSneakerDatabaseIngest().execute,
# "nike": NikeIngest().execute,
# "puma": PumaIngest().execute,
# "useragents": UserAgentIngest.execute,
"dupes": fix_duplicate_skus,
# "useragents": useragent_ingest,
# "dupes": fix_duplicate_skus,
}


@@ -39,18 +42,19 @@ def parse_arguments() -> argparse.Namespace:

async def main():
await init_db()
await init_sessions()
args = parse_arguments()
if args.tasks:
to_run = args.tasks.split(",")
else:
to_run = []
async with asyncio.TaskGroup() as tg:
for task_name, task in tasks.items():
if task_name not in to_run:
if task_name in to_run:
logger.info(f"Running {task_name}")
tg.create_task(task())
else:
logger.info(f"Skipping {task_name}")
continue
logger.info(f"Running {task_name}")
tg.create_task(task())
logger.info("All tasks completed!")


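Reviewer note on the dispatch refactor: the class-based ingests (`SitemapIngest().execute`, etc.) become module-level async callables, and the gate flips from `not in`/`continue` to a plain `in` check. A minimal, self-contained sketch of the pattern (stand-in task names, not the real ingest functions):

```python
import asyncio

async def demo_task():
    # stand-in for sitemap_ingest / useragent_ingest
    await asyncio.sleep(0)
    print("demo task ran")

tasks = {"demo": demo_task}

async def main(to_run: list[str]) -> None:
    async with asyncio.TaskGroup() as tg:  # TaskGroup requires Python 3.11+
        for name, task in tasks.items():
            if name in to_run:
                tg.create_task(task())  # scheduled concurrently, awaited on exit

asyncio.run(main(["demo"]))
```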
7 changes: 4 additions & 3 deletions src/ingest/db/instance.py
@@ -2,10 +2,11 @@
import os

from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient

from core.models.details import SiteMapLink
from core.models.shoes import Sneaker
from motor.motor_asyncio import AsyncIOMotorClient

from ingest.models.misc import StockxToken, Useragent

logger = logging.getLogger(__name__)

@@ -25,6 +26,6 @@ async def init_db():
client = AsyncIOMotorClient(CONNECTION_STRING)
await init_beanie(
database=client.get_database(DATABASE_NAME),
document_models=[Sneaker, SiteMapLink],
document_models=[Sneaker, SiteMapLink, StockxToken, Useragent],
)
logger.info("Connected to database: %s", DATABASE_NAME)
5 changes: 2 additions & 3 deletions src/ingest/models/json.py
@@ -6,11 +6,10 @@
import requests

from ingest.config import ARCHIVE_DIR, DATA_DIR
from ingest.models.task import IngestInterface
from ingest.utils.helpers import copy_and_mkdir


class Ingest(IngestInterface, ABC):
class Ingest(ABC):
def __init__(self, brand: str, download_url: str = None) -> None:
self.brand = brand
self.download_url = download_url
@@ -25,7 +24,7 @@ def __init__(self, brand: str, download_url: str = None) -> None:

def download(self, headers: dict) -> None:
"""Downloads the brand's web page and writes it to the filesystem."""
r = requests.get(self.download_url)
r = requests.get(self.download_url, headers=headers)
if r.status_code == 200:
with open(file=self.paths["html"], mode="w", encoding="utf-8") as html_file:
html_file.write(r.text)
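The substantive fix in `download`: `headers` was accepted but silently dropped, so every request went out with requests' default `User-Agent`. A minimal sketch of the corrected call (`fetch_page` and the `timeout` are illustrative additions, not part of the diff — requests has no default timeout):

```python
import requests

def fetch_page(url: str, headers: dict) -> str | None:
    # timeout added defensively; without one, requests can block indefinitely
    r = requests.get(url, headers=headers, timeout=30)
    return r.text if r.status_code == 200 else None
```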
9 changes: 5 additions & 4 deletions src/ingest/models/misc.py
@@ -1,16 +1,17 @@
from beanie import Document
from pydantic import Field


class StockxToken(Document):
type: str
token: str
id: str
value: str

class Settings:
collection = "OAuth"
name = "OAuth"


class Useragent(Document):
useragent: str

class Settings:
collection = "useragents"
name = "useragents"
Empty file.
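The `Settings` change matters because Beanie reads the collection name from `Settings.name`; a `collection` key is most likely ignored, which would have sent these documents to default collections named after the classes. A quick check, assuming `init_db()` has already run:

```python
from ingest.models.misc import StockxToken, Useragent

# get_motor_collection() reports the collection each model is bound to
print(StockxToken.get_motor_collection().name)  # expected: "OAuth"
print(Useragent.get_motor_collection().name)    # expected: "useragents"
```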
11 changes: 0 additions & 11 deletions src/ingest/models/task.py

This file was deleted.

4 changes: 2 additions & 2 deletions src/ingest/tasks/aftermarket/goat.py
@@ -35,8 +35,8 @@ def __get_from_hidden_api(url, additional_params={}, additional_headers={}):
headers={**default_headers, **additional_headers},
params={**default_params, **additional_params},
)
json = response.json()
return json
response_json = response.json()
return response_json


def get_trending_sneakers():
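The rename from `json` to `response_json` avoids shadowing the stdlib `json` module inside the function. A toy reproduction of the failure mode the rename prevents (hypothetical function, not from this file):

```python
import json

def parse_response():
    json = {"status": "ok"}   # local binding shadows the module in this scope
    return json.dumps(json)   # AttributeError: 'dict' object has no attribute 'dumps'
```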
103 changes: 52 additions & 51 deletions src/ingest/tasks/aftermarket/sneakercrush.py
@@ -1,57 +1,58 @@
from core.graphql.sc_operations import Operations
from sgqlc.endpoint.requests import RequestsEndpoint

from ingest.db.instance import client
from ingest.models.task import IngestInterface
from core.models.shoes import Sneaker

BATCH_SIZE = 1000


class SneakerCrushIngest(IngestInterface):
def execute(self) -> None:
endpoint = RequestsEndpoint(
url="https://thesneakercrush.com/graphql",
base_headers={
"x-secret": "xDqkUbTE3B6KvSlEbKva1xopqMr9BR9bbwcY4+uGuRvpRyWiA60UTN5YkTCulD2x"
},
)
op = Operations.query.get_sneakers
hasNextPage = True
p = 0
while hasNextPage:
data = endpoint(op, {"sort": {"date": -1}, "perPage": 1000, "page": p})
if data["data"]["ReleasePagination"]["items"]:
for item in data["data"]["ReleasePagination"]["items"]:
if item["_id"]:
item["sneakerCrushId"] = item["_id"]
del item["_id"]
client["sneakercrush-sneakers"].insert_many(
data["data"]["ReleasePagination"]["items"],
)
print(f"Inserted 1000 sneakers!")
if data["data"]["ReleasePagination"]["pageInfo"]:
hasNextPage = data["data"]["ReleasePagination"]["pageInfo"][
"hasNextPage"
]
print(f"Page {p} done!")
else:
hasNextPage = False
p += 1
print("Done")
async def execute() -> None:
endpoint = RequestsEndpoint(
url="https://thesneakercrush.com/graphql",
base_headers={
"x-secret": "xDqkUbTE3B6KvSlEbKva1xopqMr9BR9bbwcY4+uGuRvpRyWiA60UTN5YkTCulD2x"
},
)
op = Operations.query.get_sneakers
hasNextPage = True
p = 0
while hasNextPage:
data = endpoint(op, {"sort": {"date": -1}, "perPage": BATCH_SIZE, "page": p})
if data["data"]["ReleasePagination"]["items"]:
for item in data["data"]["ReleasePagination"]["items"]:
if item["_id"]:
item["sneakerCrushId"] = item["_id"]
del item["_id"]
sneakers = []
for release in data["data"]["ReleasePagination"]["items"]:
# TODO parse each release into a Sneaker document
# attempt some merge strategy
sneakers.append(Sneaker())
if len(sneakers) >= BATCH_SIZE:
await Sneaker.insert_many(sneakers)
print(f"Inserted {BATCH_SIZE} sneakers!")
sneakers.clear()  # reset the buffer so the next batch starts empty

if data["data"]["ReleasePagination"]["pageInfo"]:
hasNextPage = data["data"]["ReleasePagination"]["pageInfo"]["hasNextPage"]
print(f"Page {p} done!")
else:
hasNextPage = False
p += 1
print("Done")

def drop_dupes(self) -> None:
dupes = client["sneakercrush-sneakers"].aggregate(
[
{
"$group": {
"_id": "$sneakerCrushId",
"uniqueIds": {"$addToSet": "$_id"},
"count": {"$sum": 1},
}
},
{"$match": {"count": {"$gt": 1}}},
]
)
for document in dupes:
document["uniqueIds"].pop(0)
client["sneakercrush-sneakers"].delete_many(
{"_id": {"$in": document["uniqueIds"]}}
)

async def drop_dupes() -> None:
dupes = Sneaker.aggregate(
[
{
"$group": {
"_id": "$sneakerCrushId",
"uniqueIds": {"$addToSet": "$_id"},
"count": {"$sum": 1},
}
},
{"$match": {"count": {"$gt": 1}}},
]
)
async for document in dupes:
document["uniqueIds"].pop(0)  # keep one copy per sneakerCrushId
await Sneaker.find({"_id": {"$in": document["uniqueIds"]}}).delete()
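Two caveats on this rewrite: `execute` still never flushes a final partial batch (pages shorter than `BATCH_SIZE` are built and then discarded), and `drop_dupes` deletes in bulk, so the aggregation is worth previewing first. A standalone Motor sketch of that preview (the database name `sneakers` is an assumption; the collection name comes from the old client code):

```python
from motor.motor_asyncio import AsyncIOMotorClient

async def preview_dupes(uri: str) -> None:
    coll = AsyncIOMotorClient(uri)["sneakers"]["sneakercrush-sneakers"]
    pipeline = [
        {"$group": {
            "_id": "$sneakerCrushId",           # group by source id
            "uniqueIds": {"$addToSet": "$_id"},
            "count": {"$sum": 1},
        }},
        {"$match": {"count": {"$gt": 1}}},      # keep only duplicated ids
    ]
    async for doc in coll.aggregate(pipeline):
        print(doc["_id"], len(doc["uniqueIds"]) - 1, "extra copies")
```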
83 changes: 41 additions & 42 deletions src/ingest/tasks/aftermarket/snks_drops.py
@@ -1,52 +1,51 @@
import logging
from datetime import UTC, datetime

from core.models.details import Images, Prices
from core.models.shoes import Sneaker
from ingest.models.task import IngestInterface

from ingest.utils.helpers import try_guess_brand
from ingest.utils.sessions import session

logger = logging.getLogger(__name__)

class SnksDropsIngest(IngestInterface):
def ingest(self) -> None:
releases = session.get(
"https://snkr-rest-api-nfrpf.ondigitalocean.app/api/ios/releases"
)
if releases.status_code != 200:
raise Exception("Failed to fetch releases from snkrs-drops")
else:
sneakers = []
for release in releases.json()["items"]:
if not release["pid"] or release["pid"] == "TBD":
continue
release_date = datetime.strptime(
release["date"][:-1], "%Y-%m-%dT%H:%M:%S.%f"
)

async def execute() -> None:
releases = session.get(
"https://snkr-rest-api-nfrpf.ondigitalocean.app/api/ios/releases"
)
if releases.status_code != 200:
raise Exception("Failed to fetch releases from snkrs-drops")
else:
sneakers = []
for release in releases.json()["items"]:
if not release["pid"] or release["pid"] == "TBD":
continue
release_date = datetime.strptime(
release["date"][:-1], "%Y-%m-%dT%H:%M:%S.%f"
)
price = None
try:
price = float(release["price"].replace("$", ""))
except ValueError:
price = None
try:
price = float(release["price"].replace("$", ""))
except ValueError:
price = None
sneakers.append(
Sneaker(
brand=try_guess_brand(release["name"]),
sku=release["pid"],
name=release["name"],
colorway=None,
audience=None,
releaseDate=release_date,
images=Images(
original=release["images"][0],
alternateAngles=release["images"][1:],
),
links=None,
prices=Prices(retail=price) if price else None,
sizes=None,
description=None,
dateAdded=datetime.now(UTC),
)
sneakers.append(
Sneaker(
brand=try_guess_brand(release["name"]),
sku=release["pid"],
name=release["name"],
colorway=None,
audience=None,
releaseDate=release_date,
images=Images(
original=release["images"][0],
alternateAngles=release["images"][1:],
),
links=None,
prices=Prices(retail=price) if price else None,
sizes=None,
description=None,
dateAdded=datetime.now(UTC),
)
Sneaker.insert_many(sneakers)

def execute(self) -> None:
self.ingest()
)
await Sneaker.insert_many(sneakers)
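Side note on the date handling: the `[:-1]` slice strips the trailing `Z` so `strptime` can parse the timestamp. On Python 3.11+, `datetime.fromisoformat` accepts the `Z` directly and returns a timezone-aware value — an alternative worth considering, not what this diff does:

```python
from datetime import datetime

release_date = datetime.fromisoformat("2024-05-01T09:00:00.000Z")  # aware, UTC
price_raw = "$150"
try:
    price = float(price_raw.replace("$", ""))  # same parse as the diff
except ValueError:
    price = None
print(release_date, price)
```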