Binary file added bitepy/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file added bitepy/__pycache__/simulation.cpython-311.pyc
Binary file not shown.
330 changes: 329 additions & 1 deletion bitepy/data.py
@@ -8,13 +8,20 @@
# Licensed under MIT License, see https://opensource.org/license/mit
######################################################################

import gc
import multiprocessing
import pandas as pd
import numpy as np
import polars as pl
from zipfile import ZipFile
import os
import time
import zipfile
from io import BytesIO, StringIO
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from pathlib import Path
from datetime import datetime
from datetime import datetime, date, timedelta

try:
from ._bitepy import Simulation_cpp
@@ -24,6 +31,266 @@
) from e


# ── Constants for fast EPEX parsing ──────────────────────────────────────────

_EPEX_DROP_COLS = [
"LinkedBasketId", "DeliveryArea", "ParentId", "DeliveryEnd", "Currency",
"Product", "UserDefinedBlock", "RevisionNo", "ExecutionRestriction",
"CreationTime", "QuantityUnit", "Volume", "VolumeUnit",
]

_EPEX_RENAME_MAP = {
"OrderId": "order",
"InitialId": "initial",
"DeliveryStart": "start",
"Side": "side",
"Price": "price",
"ValidityTime": "validity",
"ActionCode": "action",
"TransactionTime": "transaction",
"Quantity": "quantity",
}

_EPEX_DEDUP_COLS = ["OrderId", "InitialId", "ActionCode", "ValidityTime", "Price", "Quantity"]

_EPEX_PRODUCT_FILTERS = {
"Hourly": ["Intraday_Hour_Power", "XBID_Hour_Power"],
"Quarter-Hourly": ["Intraday_Quarter_Hour_Power", "XBID_Quarter_Hour_Power"],
}


def _read_raw_epex_file_fast(timestamp: date, datapath: str, product: str = "Hourly") -> pl.DataFrame:
"""Read and process a single raw EPEX zip file using Polars."""
year = timestamp.strftime("%Y")
month = timestamp.strftime("%m")
datestr = f"Continuous_Orders-DE-{timestamp.strftime('%Y%m%d')}"

folder = f"{datapath}/{year}/{month}"
    zip_file_name = next((f for f in os.listdir(folder) if datestr in f), None)
    if zip_file_name is None:
        raise FileNotFoundError(f"No raw file matching '{datestr}' in {folder}")
csv_file_name = zip_file_name[:-4]

with ZipFile(f"{folder}/{zip_file_name}") as zf:
raw_bytes = zf.read(csv_file_name)

df = pl.read_csv(BytesIO(raw_bytes), skip_rows=1, infer_schema_length=10000)

# Filter and clean
df = (
df.unique(subset=_EPEX_DEDUP_COLS, keep="first", maintain_order=True)
.filter(pl.col("UserDefinedBlock") == "N")
.filter(pl.col("Product").is_in(_EPEX_PRODUCT_FILTERS[product]))
.filter(pl.col("ActionCode").is_in(["A", "D", "C", "I"]))
.drop(_EPEX_DROP_COLS)
.rename(_EPEX_RENAME_MAP)
.with_columns(
pl.col("start").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%SZ"),
pl.col("validity").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%SZ", strict=False),
pl.col("transaction").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.fZ"),
)
)

# Remove iceberg orders
iceberg_ids = df.filter(pl.col("action") == "I")["initial"].unique().to_list()
df = df.filter(~pl.col("initial").is_in(iceberg_ids))
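    # Note: any order chain that ever carries an "I" action is dropped in
    # full, not just the "I" rows themselves.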

df = df.with_row_index("_idx")

# Process change messages (shift instead of while loop)
c_orders = df.filter(pl.col("action") == "C")["order"].unique().to_list()
a_orders = df.filter(pl.col("action") == "A")["order"].unique().to_list()
orders_to_process = list(set(c_orders) & set(a_orders))

if orders_to_process:
chain = (
df.filter(
pl.col("order").is_in(orders_to_process)
& pl.col("action").is_in(["A", "C"])
)
.sort("_idx")
)

chain = chain.with_columns(
pl.col("transaction").shift(-1).over("order").alias("_new_validity")
)
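        # shift(-1) within each order chain pulls the next message's
        # transaction time onto the current row: an amended order stops being
        # valid at exactly the moment its successor message arrives.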

updates = chain.filter(pl.col("_new_validity").is_not_null())
update_map = dict(zip(updates["_idx"].to_list(), updates["_new_validity"].to_list()))

if update_map:
update_indices = list(update_map.keys())
df = df.with_columns(
pl.when(pl.col("_idx").is_in(update_indices))
.then(
pl.col("_idx").replace_strict(
update_map, default=None, return_dtype=pl.Datetime("ms")
)
)
.otherwise(pl.col("validity"))
.alias("validity")
)

c_indices = df.filter(
pl.col("order").is_in(orders_to_process) & (pl.col("action") == "C")
)["_idx"].to_list()
df = df.with_columns(
pl.when(pl.col("_idx").is_in(c_indices))
.then(pl.lit("A"))
.otherwise(pl.col("action"))
.alias("action")
)

# Process cancel messages
cancel_messages = df.filter(pl.col("action") == "D")
a_orders_for_cancel = df.filter(pl.col("action") == "A")["order"].unique().to_list()
cancel_messages = cancel_messages.filter(pl.col("order").is_in(a_orders_for_cancel))

if not cancel_messages.is_empty():
a_rows = (
df.filter(
(pl.col("action") == "A") & pl.col("order").is_in(cancel_messages["order"].to_list())
)
.sort("transaction", "_idx")
.unique(subset=["order"], keep="last", maintain_order=True)
.select("order", "_idx")
.rename({"_idx": "_a_idx"})
)

merged = cancel_messages.select("order", "transaction", "_idx").join(a_rows, on="order")

update_map = dict(zip(merged["_a_idx"].to_list(), merged["transaction"].to_list()))
if update_map:
df = df.with_columns(
pl.when(pl.col("_idx").is_in(list(update_map.keys())))
.then(
pl.col("_idx").replace_strict(
update_map, default=None, return_dtype=pl.Datetime("ms")
)
)
.otherwise(pl.col("validity"))
.alias("validity")
)

df = df.filter(pl.col("action") != "D").drop("order", "action", "_idx")

df = df.with_columns(
pl.col("start").dt.strftime("%Y-%m-%dT%H:%M:%SZ"),
pl.col("transaction").dt.strftime("%Y-%m-%dT%H:%M:%S.%3fZ"),
pl.col("validity").dt.strftime("%Y-%m-%dT%H:%M:%S.%3fZ"),
)

return df
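
# A minimal usage sketch for the reader above (date and path are illustrative,
# not from this repo):
#
#   from datetime import date
#   df = _read_raw_epex_file_fast(date(2021, 6, 1), "/data/epex", product="Hourly")
#   print(df.select("initial", "side", "price", "quantity").head())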


def _read_raw_files_parallel(
dates: list[date],
marketdatapath: str,
product: str = "Hourly",
max_workers: int = 2,
verbose: bool = False,
) -> dict[date, pl.DataFrame]:
"""Read a batch of raw EPEX files in parallel. Returns dict of date -> DataFrame."""
empty_schema = {
"initial": pl.Int64, "side": pl.Utf8, "start": pl.Utf8,
"transaction": pl.Utf8, "validity": pl.Utf8,
"price": pl.Float64, "quantity": pl.Float64,
}
raw_data: dict[date, pl.DataFrame] = {}

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(_read_raw_epex_file_fast, dt, marketdatapath, product): dt
for dt in dates
}
for future in as_completed(futures):
dt = futures[future]
try:
raw_data[dt] = future.result()
if verbose:
print(f" Read raw file for {dt} ({raw_data[dt].shape[0]} rows)")
except Exception as e:
print(f" ERROR reading {dt}: {e}")
raw_data[dt] = pl.DataFrame(schema=empty_schema)

return raw_data
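
# Sketch (dates and path hypothetical): fetch three consecutive days with two
# worker threads.
#
#   days = [date(2021, 6, 1) + timedelta(days=i) for i in range(3)]
#   raw = _read_raw_files_parallel(days, "/data/epex", max_workers=2)
#
# Failed reads are mapped to an empty DataFrame with the fallback schema, so
# downstream code never has to special-case a missing day.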


def _save_day(dt1: date, raw_data: dict[date, pl.DataFrame], savepath: str, verbose: bool):
"""Combine raw files for dt1 and dt1+1, filter to transaction date dt1, and save."""
dt2 = dt1 + timedelta(days=1)
df1 = raw_data.get(dt1, pl.DataFrame())
df2 = raw_data.get(dt2, pl.DataFrame())

frames = [f for f in [df1, df2] if not f.is_empty()]
if not frames:
if verbose:
print(f" No data for {dt1}, skipping")
return

df = pl.concat(frames)

df = df.with_columns(
pl.col("transaction").str.slice(0, 10).alias("transaction_date"),
pl.col("price").cast(pl.Float64).round(2),
pl.col("quantity").cast(pl.Float64).round(1),
)

group = df.filter(pl.col("transaction_date") == dt1.isoformat())

    if group.is_empty():
        if verbose:
            print(f"  No rows dated {dt1} after filtering, skipping")
        return

group = (
group.sort("transaction")
.with_row_index("")
.drop("transaction_date")
)
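    # with_row_index("") adds an unnamed leading index column, presumably to
    # match the CSV layout produced by the original parse_market_data.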

group = group.with_columns(pl.col("validity").fill_null(""))
group = group.select(["", "initial", "side", "start", "transaction", "validity", "price", "quantity"])

daily_filename = f"orderbook_{dt1}.csv"
    zip_path = os.path.join(savepath, f"{daily_filename}.zip")

buf = StringIO()
group.write_csv(buf)
csv_bytes = buf.getvalue().encode("utf-8")

with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
zf.writestr(daily_filename, csv_bytes)

if verbose:
print(f" Saved {dt1}: {group.shape[0]} rows")
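
# Sanity-check sketch: pandas infers zip compression from the extension, so a
# day written above can be read back directly (path hypothetical):
#
#   import pandas as pd
#   check = pd.read_csv("processed/orderbook_2021-06-01.csv.zip")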


def _process_chunk(
chunk_start: date,
chunk_end: date,
marketdatapath: str,
savepath: str,
product: str,
max_workers: int,
verbose: bool,
):
"""Process a single chunk in a subprocess. All memory is freed when this exits."""
raw_dates = []
d = chunk_start
while d <= chunk_end + timedelta(days=1):
raw_dates.append(d)
d += timedelta(days=1)

if verbose:
print(f"\n Chunk {chunk_start} to {chunk_end} ({len(raw_dates)} raw files)")

raw_data = _read_raw_files_parallel(raw_dates, marketdatapath, product, max_workers, verbose)

d = chunk_start
while d <= chunk_end:
_save_day(d, raw_data, savepath, verbose)
d += timedelta(days=1)


class Data:
def __init__(self):
"""Initialize a Data instance."""
@@ -467,6 +734,67 @@ def parse_market_data(self, start_date_str: str, end_date_str: str, marketdatapa

print("\nWriting CSV data completed.")

def parse_market_data_fast(self, start_date_str: str, end_date_str: str, marketdatapath: str,
savepath: str, market_type: str, product: str = "Hourly",
max_workers: int = 2, chunk_size: int = 3, verbose: bool = True):
"""
Fast version of parse_market_data using Polars + parallel raw file reading.

Same interface as parse_market_data, but ~2x faster for EPEX data (2021+ format).
Currently supports EPEX only; any other market_type falls back to parse_market_data.

Processes the date range in chunks of `chunk_size` days to limit memory usage.
Each chunk runs in a subprocess so memory is truly freed by the OS between chunks.

Args:
start_date_str (str): Start date in format "YYYY-MM-DD"
end_date_str (str): End date in format "YYYY-MM-DD"
marketdatapath (str): Path to market data folder with yearly/monthly subfolders
savepath (str): Directory where processed CSV files will be saved
market_type (str): "EPEX" or "NordPool" (NordPool falls back to original method)
product (str, optional): "Hourly" or "Quarter-Hourly". Defaults to "Hourly".
max_workers (int, optional): Number of parallel threads for reading raw files. Defaults to 2.
chunk_size (int, optional): Number of target days per chunk. Defaults to 3.
verbose (bool, optional): Print progress messages. Defaults to True.
"""
if market_type != "EPEX":
return self.parse_market_data(start_date_str, end_date_str, marketdatapath, savepath, market_type, verbose)

if product not in _EPEX_PRODUCT_FILTERS:
raise ValueError(f"Unknown product '{product}'. Must be one of: {list(_EPEX_PRODUCT_FILTERS.keys())}")

os.makedirs(savepath, exist_ok=True)

start = date.fromisoformat(start_date_str)
end = date.fromisoformat(end_date_str)

        if start > end:
            raise ValueError("Start date is after end date.")
        if start.year < 2020:
            raise ValueError("Years before 2020 are not supported.")

t0 = time.time()

# Process in chunks, each in a subprocess so memory is truly freed
chunk_start = start
while chunk_start <= end:
chunk_end = min(chunk_start + timedelta(days=chunk_size - 1), end)

p = multiprocessing.Process(
target=_process_chunk,
args=(chunk_start, chunk_end, marketdatapath, savepath, product, max_workers, verbose),
)
p.start()
p.join()

if p.exitcode != 0:
raise RuntimeError(f"Chunk {chunk_start} to {chunk_end} failed (exit code {p.exitcode})")

chunk_start = chunk_end + timedelta(days=1)

if verbose:
print(f"\nTotal time: {time.time() - t0:.1f}s")
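
    # Usage sketch (paths hypothetical): convert two weeks of EPEX hourly
    # data, three target days per subprocess chunk, two reader threads:
    #
    #   data = Data()
    #   data.parse_market_data_fast(
    #       "2021-06-01", "2021-06-14",
    #       marketdatapath="/data/epex",
    #       savepath="processed/",
    #       market_type="EPEX",
    #       product="Hourly",
    #       max_workers=2,
    #       chunk_size=3,
    #   )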

def create_bins_from_csv(self, csv_list: list, save_path: str, verbose: bool = True):
"""
Convert zipped CSV files of pre-processed order book data into binary files.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -39,6 +39,7 @@ classifiers = [
dependencies = [
"numpy>=1.16.0",
"pandas>=0.24.0",
"polars>=1.0.0",
"matplotlib>=3.0.0",
"tqdm>=4.0.0",
]