From e3726d1907ec5d6c940d27b18c02be589de1c58c Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Sat, 15 Feb 2025 10:51:45 +0000 Subject: [PATCH 01/19] add poisson and randome patterns for timestamps --- tracestorm/cli.py | 21 +++++++++++++++------ tracestorm/trace_generator.py | 20 +++++++++++++++++++- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/tracestorm/cli.py b/tracestorm/cli.py index 331b5a1..4dbd556 100644 --- a/tracestorm/cli.py +++ b/tracestorm/cli.py @@ -10,17 +10,18 @@ SyntheticTraceGenerator, TraceGenerator, ) +from tracestorm.data_loader import load_datasets logger = init_logger(__name__) # Valid patterns -SYNTHETIC_PATTERNS = {"uniform"} +SYNTHETIC_PATTERNS = {"uniform", "poisson", "random"} AZURE_PATTERNS = {"azure_code", "azure_conv"} VALID_PATTERNS = SYNTHETIC_PATTERNS | AZURE_PATTERNS - +# TODO: VALID_DATASETS = def create_trace_generator( - pattern: str, rps: int, duration: int + pattern: str, rps: int, duration: int, seed: int ) -> Tuple[TraceGenerator, str]: """ Create appropriate trace generator based on pattern and validate parameters. @@ -29,6 +30,7 @@ def create_trace_generator( pattern: Pattern for trace generation rps: Requests per second (only for synthetic patterns) duration: Duration in seconds (only for synthetic patterns) + seed: Random seed for reproducibility of trace patterns Returns: Tuple of (TraceGenerator instance, Warning message or empty string) @@ -50,7 +52,7 @@ def create_trace_generator( raise ValueError( "Duration must be non-negative for synthetic patterns" ) - return SyntheticTraceGenerator(rps, pattern, duration), warning_msg + return SyntheticTraceGenerator(rps, pattern, duration, seed), warning_msg # Azure patterns if rps != 1: @@ -75,6 +77,7 @@ def create_trace_generator( @click.option( "--pattern", default="uniform", + type=click.Choice(sorted(VALID_PATTERNS), case_sensitive=False), help=f"Pattern for generating trace. 
Valid patterns: {sorted(VALID_PATTERNS)}", ) @click.option( @@ -83,6 +86,12 @@ def create_trace_generator( default=10, help="Duration in seconds (only used with synthetic patterns)", ) +@click.option( + "--seed", + type=int, + default=None, + help="Random seed for reproducibility of trace patterns", +) @click.option( "--subprocesses", type=int, default=1, help="Number of subprocesses" ) @@ -98,11 +107,11 @@ def create_trace_generator( default=lambda: os.environ.get("OPENAI_API_KEY", "none"), help="OpenAI API Key", ) -def main(model, rps, pattern, duration, subprocesses, base_url, api_key): +def main(model, rps, pattern, duration, seed, subprocesses, base_url, api_key): """Run trace-based load testing for OpenAI API endpoints.""" try: trace_generator, warning_msg = create_trace_generator( - pattern, rps, duration + pattern, rps, duration, seed ) if warning_msg: logger.warning(warning_msg) diff --git a/tracestorm/trace_generator.py b/tracestorm/trace_generator.py index db7f1b0..24767f1 100644 --- a/tracestorm/trace_generator.py +++ b/tracestorm/trace_generator.py @@ -5,6 +5,7 @@ import pandas as pd import requests +import numpy as np from tracestorm.constants import AZURE_DATASET_PATHS, AZURE_REPO_URL from tracestorm.logger import init_logger @@ -32,7 +33,7 @@ def generate(self) -> List[int]: class SyntheticTraceGenerator(TraceGenerator): """Generate synthetic traces based on patterns.""" - def __init__(self, rps: int, pattern: str, duration: int): + def __init__(self, rps: int, pattern: str, duration: int, seed: Optional[int] = None): """ Initialize synthetic trace generator. @@ -40,6 +41,7 @@ def __init__(self, rps: int, pattern: str, duration: int): rps (int): Requests per second. Must be non-negative. pattern (str): Distribution pattern ('uniform', 'random', 'poisson', etc.). duration (int): Total duration in seconds. Must be non-negative. 
+ seed (int): Seed for reproducibility of 'poisson' and 'random' patterns """ if not isinstance(rps, int) or rps < 0: raise ValueError("rps must be a non-negative integer") @@ -49,6 +51,8 @@ def __init__(self, rps: int, pattern: str, duration: int): self.rps = rps self.pattern = pattern self.duration = duration + if seed is not None: + np.random.seed(seed) def generate(self) -> List[int]: total_requests = self.rps * self.duration @@ -59,6 +63,7 @@ def generate(self) -> List[int]: return timestamps if self.pattern == "uniform": + # Distribute requests evenly across the duration interval = total_duration_ms / total_requests current_time = 0.0 for _ in range(total_requests): @@ -66,6 +71,19 @@ def generate(self) -> List[int]: timestamp = min(timestamp, total_duration_ms - 1) timestamps.append(timestamp) current_time += interval + elif self.pattern == "poisson": + # Exponential distribution for intervals + rate_ms = self.rps / 1000 + intervals = np.random.exponential(1 / rate_ms, total_requests) + current_time = 0.0 + for i in range(total_requests): + timestamp = int(round(current_time)) + timestamps.append(timestamp) + current_time += intervals[i] + elif self.pattern == "random": + timestamps = np.random.randint( + 0, total_duration_ms, size=total_requests + ).tolist() else: raise ValueError(f"Unknown pattern: {self.pattern}") From 6db8778e7f1198027f7583fea3cf009295b2b885 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Sat, 15 Feb 2025 16:33:35 +0000 Subject: [PATCH 02/19] add data loader --- requirements.txt | 1 + tracestorm/data_loader.py | 145 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 tracestorm/data_loader.py diff --git a/requirements.txt b/requirements.txt index ba8db04..1fe4043 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ numpy>=1.26.4 pandas>=2.2.3 requests>=2.31.0 seaborn>=0.13.2 +datasets \ No newline at end of file diff --git a/tracestorm/data_loader.py b/tracestorm/data_loader.py 
new file mode 100644 index 0000000..e6efb78 --- /dev/null +++ b/tracestorm/data_loader.py @@ -0,0 +1,145 @@ +import os +import json +from typing import List, Optional, Tuple +import pandas as pd +from dataclasses import dataclass +from datasets import load_dataset + +from tracestorm.constants import DEFAULT_DATASET_FOLDER +from tracestorm.logger import init_logger + +logger = init_logger(__name__) + + +@dataclass +class Dataset: + """ + Each Dataset object contains name of the dataset, a list of prompts, + the select ratio among all datasets, and the total number of prompts + """ + file_name: str + prompts: List[str] + select_ratio: float + length: int + + +def normalize_prompts(row) -> List[str]: + """ + Convert one row to a list of prompts based on the format. + """ + prompts = [] + if isinstance(row, list): # if the row contains a list of prompts + for item in row: + if isinstance(item, str): + prompts.append(item) + elif isinstance(item, dict) and item.get("role") == "user": + prompts.append(item.get("content", "")) + elif isinstance(row, str): # if the row is already a prompt + prompts.append(row) + elif isinstance(row, dict) and row.get("role") == "user": # if the row is a template, retrieve user prompt + prompts.append(row.get("content", "")) + else: + logger.error(f"Unrecognized row format: {row}") + return [p for p in prompts if p] # Remove empty prompts + +def load_datasets( + datasets_config_file: Optional[str] = None, +) -> Tuple[List[Dataset], Optional[str]]: + """ + Load datasets from local files or Hugging Face datasets. + + Args: + datasets_config_file: A dataset configuration file containing file paths, + prompt fields, selection ratios, and sorting strategies. + A customized data loading logic needs to be implemented if no + datasets_config_file is provided. + + Return: + (List[Dataset], str): A list of Dataset objects and the sorting strategy. 
+ """ + # Load datasets configuration file + if datasets_config_file: + try: + with open(datasets_config_file, "r") as f: + datasets_config = json.load(f) + except FileNotFoundError: + logger.error(f"Configuration file '{datasets_config_file}' not found") + return [], None + except Exception as e: + logger.error(f"Error reading '{datasets_config_file}': {e}") + return [], None + + # Strategy to sort the provided datasets + sort_strategy = datasets_config.get("sort", "random") + + # List to store each Dataset + datasets = [] + + for name, config in datasets_config.items(): + file_name = config.get("file_name") + prompt_field = config.get("prompt_field") + + try: + ratio = float(config.get("select_ratio", 1.0)) + except ValueError: + logger.error(f"Invalid 'select_ratio' for dataset '{name}', using default 1") + ratio = 1.0 + + if not file_name or not prompt_field: + logger.error( + f"Missing required 'file_name' or 'prompt_field' for dataset '{name}'" + ) + continue + if os.path.isfile(file_name): + file_path = os.path.abspath(file_name) + else: + file_path = os.path.join(DEFAULT_DATASET_FOLDER, file_name) + + # Load dataset from local files + if os.path.exists(file_path): + prompts = [] + # CSV files + if file_name.endswith(".csv"): + data = pd.read_csv(file_path) + + if prompt_field not in set(data.column_names): + logger.error(f"Field '{prompt_field}' not found in '{file_path}'.") + continue + prompts = data[prompt_field].dropna().astype(str).tolist() + # JSON files + elif file_name.endswith(".json"): + with open(file_path, "r") as f: + data = json.load(f) + + if isinstance(data, dict): + prompts = data.get(prompt_field, []) + if not isinstance(prompts, list): + logger.error(f"Field '{prompt_field}' in '{file_path}' is not a list.") + continue + else: + logger.error(f"Unsupported file format for '{file_name}'") + continue + else: # Load HF datasets + # data = load_dataset("lmsys/lmsys-chat-1m") + data = load_dataset(file_name)["train"] + if prompt_field not in 
data.column_names: + logger.error(f"'{prompt_field}' not found in dataset '{file_name}'") + continue + + prompts = [] + for row in data[prompt_field]: + prompts.extend(normalize_prompts(row)) + + # Add the dataset information (file name, a list of prompts, select ratio among all datasets, total number of prompts) + dataset_obj = Dataset(file_name, prompts, ratio, len(prompts)) + datasets.append(dataset_obj) + + logger.info( + f"loaded {file_path} with {len(prompts)} prompts, selection ratio = {ratio}" + ) + + return datasets, sort_strategy + + else: + logger.error("Customized data loading logic needs to be implemented!") + return [], None From 4fbf2f48de6b5dcf237b403d956958e82a1c0f36 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Sat, 15 Feb 2025 23:42:25 +0000 Subject: [PATCH 03/19] update dataset loading --- tracestorm/cli.py | 19 ++++++++- tracestorm/constants.py | 2 + tracestorm/core.py | 10 ++++- tracestorm/data_loader.py | 54 ++++++++++++++------------ tracestorm/request_generator.py | 68 ++++++++++++++++++++++++++++++--- 5 files changed, 119 insertions(+), 34 deletions(-) diff --git a/tracestorm/cli.py b/tracestorm/cli.py index 4dbd556..49a1c5e 100644 --- a/tracestorm/cli.py +++ b/tracestorm/cli.py @@ -18,7 +18,7 @@ SYNTHETIC_PATTERNS = {"uniform", "poisson", "random"} AZURE_PATTERNS = {"azure_code", "azure_conv"} VALID_PATTERNS = SYNTHETIC_PATTERNS | AZURE_PATTERNS -# TODO: VALID_DATASETS = + def create_trace_generator( pattern: str, rps: int, duration: int, seed: int @@ -107,7 +107,13 @@ def create_trace_generator( default=lambda: os.environ.get("OPENAI_API_KEY", "none"), help="OpenAI API Key", ) -def main(model, rps, pattern, duration, seed, subprocesses, base_url, api_key): +@click.option( + "--datasets-config-file", + default=None, + help="Config file for datasets" +) + +def main(model, rps, pattern, duration, seed, subprocesses, base_url, api_key, datasets_config_file): """Run trace-based load testing for OpenAI API endpoints.""" try: 
trace_generator, warning_msg = create_trace_generator( @@ -116,12 +122,21 @@ def main(model, rps, pattern, duration, seed, subprocesses, base_url, api_key): if warning_msg: logger.warning(warning_msg) + if datasets_config_file is None: + datasets = [] + sort = None + else: + datasets, sort = load_datasets(datasets_config_file) + _, result_analyzer = run_load_test( trace_generator=trace_generator, model=model, subprocesses=subprocesses, base_url=base_url, api_key=api_key, + datasets=datasets, + sort=sort, + seed=seed ) print(result_analyzer) diff --git a/tracestorm/constants.py b/tracestorm/constants.py index 6c97319..6f5e076 100644 --- a/tracestorm/constants.py +++ b/tracestorm/constants.py @@ -11,3 +11,5 @@ DEFAULT_SUBPROCESSES = 1 DEFAULT_MESSAGES = "Tell me a story" + +DEFAULT_DATASET_FOLDER = "../datasets/" \ No newline at end of file diff --git a/tracestorm/core.py b/tracestorm/core.py index 5185c9e..b9f81a6 100644 --- a/tracestorm/core.py +++ b/tracestorm/core.py @@ -1,5 +1,5 @@ import multiprocessing -from typing import List, Tuple +from typing import List, Tuple, Optional from tracestorm.logger import init_logger from tracestorm.request_generator import generate_request @@ -17,6 +17,9 @@ def run_load_test( subprocesses: int, base_url: str, api_key: str, + datasets: List, + sort: Optional[str] = None, + seed: Optional[int] = None ) -> Tuple[List[Tuple], ResultAnalyzer]: """ Run load test with given configuration. @@ -27,6 +30,9 @@ def run_load_test( subprocesses: Number of subprocesses to use base_url: Base URL for API calls api_key: API key for authentication + datasets: List of datasets to generate prompts + sort: Sorting strategy for prompts in datasets. + seed: Random seed for sorting. Returns: Tuple of (List of results, ResultAnalyzer instance) @@ -38,7 +44,7 @@ def run_load_test( logger.warning("No requests to process. 
Trace is empty.") return [], ResultAnalyzer() - requests = generate_request(model, total_requests) + requests = generate_request(model_name=model, nums=total_requests, datasets=datasets, sort=sort, seed=seed) ipc_queue = multiprocessing.Queue() processes = [] diff --git a/tracestorm/data_loader.py b/tracestorm/data_loader.py index e6efb78..7f48721 100644 --- a/tracestorm/data_loader.py +++ b/tracestorm/data_loader.py @@ -19,7 +19,7 @@ class Dataset: """ file_name: str prompts: List[str] - select_ratio: float + select_ratio: int length: int @@ -33,11 +33,13 @@ def normalize_prompts(row) -> List[str]: if isinstance(item, str): prompts.append(item) elif isinstance(item, dict) and item.get("role") == "user": - prompts.append(item.get("content", "")) + prompt = next((item.get(k, "") for k in ["message", "content", "value"] if item.get(k, "")), "") + prompts.append(prompt) elif isinstance(row, str): # if the row is already a prompt prompts.append(row) elif isinstance(row, dict) and row.get("role") == "user": # if the row is a template, retrieve user prompt - prompts.append(row.get("content", "")) + prompt = next((item.get(k, "") for k in ["message", "content", "value"] if item.get(k, "")), "") + prompts.append(prompt) else: logger.error(f"Unrecognized row format: {row}") return [p for p in prompts if p] # Remove empty prompts @@ -70,7 +72,7 @@ def load_datasets( return [], None # Strategy to sort the provided datasets - sort_strategy = datasets_config.get("sort", "random") + sort_strategy = datasets_config.pop("sort", "random") # List to store each Dataset datasets = [] @@ -80,21 +82,19 @@ def load_datasets( prompt_field = config.get("prompt_field") try: - ratio = float(config.get("select_ratio", 1.0)) + ratio = int(config.get("select_ratio", 1)) except ValueError: logger.error(f"Invalid 'select_ratio' for dataset '{name}', using default 1") - ratio = 1.0 + ratio = 1 if not file_name or not prompt_field: logger.error( f"Missing required 'file_name' or 'prompt_field' for 
dataset '{name}'" ) continue - if os.path.isfile(file_name): - file_path = os.path.abspath(file_name) - else: - file_path = os.path.join(DEFAULT_DATASET_FOLDER, file_name) - + + file_path = os.path.abspath(file_name) if os.path.exists(file_name) else os.path.join(DEFAULT_DATASET_FOLDER, file_name) + # Load dataset from local files if os.path.exists(file_path): prompts = [] @@ -102,7 +102,7 @@ def load_datasets( if file_name.endswith(".csv"): data = pd.read_csv(file_path) - if prompt_field not in set(data.column_names): + if prompt_field not in set(data.columns): logger.error(f"Field '{prompt_field}' not found in '{file_path}'.") continue prompts = data[prompt_field].dropna().astype(str).tolist() @@ -119,23 +119,29 @@ def load_datasets( else: logger.error(f"Unsupported file format for '{file_name}'") continue - else: # Load HF datasets - # data = load_dataset("lmsys/lmsys-chat-1m") - data = load_dataset(file_name)["train"] - if prompt_field not in data.column_names: - logger.error(f"'{prompt_field}' not found in dataset '{file_name}'") - continue - - prompts = [] - for row in data[prompt_field]: - prompts.extend(normalize_prompts(row)) - + else: + try: + if file_name.endswith(".csv"): # CSV format + data = pd.read_csv(file_name) + + if prompt_field not in set(data.columns): + logger.error(f"Field '{prompt_field}' not found in '{file_name}'.") + continue + prompts = data[prompt_field].dropna().astype(str).tolist() + else: # use datasets to load + data = load_dataset(file_name)["train"] + prompts = [] + for row in data[prompt_field]: + prompts.extend(normalize_prompts(row)) + except Exception as e: + logger.error(f"Failed to load '{file_name}': {e}") + # Add the dataset information (file name, a list of prompts, select ratio among all datasets, total number of prompts) dataset_obj = Dataset(file_name, prompts, ratio, len(prompts)) datasets.append(dataset_obj) logger.info( - f"loaded {file_path} with {len(prompts)} prompts, selection ratio = {ratio}" + f"loaded 
{file_name} with {len(prompts)} prompts, selection ratio = {ratio}" ) return datasets, sort_strategy diff --git a/tracestorm/request_generator.py b/tracestorm/request_generator.py index d858410..b54b82d 100644 --- a/tracestorm/request_generator.py +++ b/tracestorm/request_generator.py @@ -1,18 +1,74 @@ from typing import Any, Dict, List +import random from tracestorm.constants import DEFAULT_MESSAGES +from tracestorm.logger import init_logger +from tracestorm.data_loader import Dataset +logger = init_logger(__name__) def generate_request( - model_name: str, nums: int, messages: str = DEFAULT_MESSAGES + model_name: str, nums: int, messages: str = DEFAULT_MESSAGES, + datasets: List[Dataset] = [], sort: str = "random", seed: int = None ) -> List[Dict[str, Any]]: - requests = [] - for _ in range(nums): - requests.append( + + # generate default requests without datasets + if not datasets: + for _ in range(nums): + return [ + { + "model": model_name, + "messages": [{"role": "user", "content": messages}], + "stream": True, + } + for _ in range(nums) + ] + else: # Add and sort requests from the provided datasets + dataset_samples = [] + + # Total ratio to calculate number of requests for each dataset + total_ratio = sum(dataset_obj.select_ratio for dataset_obj in datasets) + + for dataset_obj in datasets: + num_requests = int(round(nums * dataset_obj.select_ratio / total_ratio)) + + # We don't have enough available prompts, repeat the dataset + available_prompts = dataset_obj.length + prompts = dataset_obj.prompts + if num_requests > available_prompts: + repeat_count = num_requests // available_prompts + prompts.extend(prompts * repeat_count) + + assert len(prompts) >= num_requests + + # Store prompts with indexing for round-robin + # For example, if ratio of dataset1 is 5, we will append 5 requests for each idx + for i, sample in enumerate(prompts[:num_requests]): + idx = i // dataset_obj.select_ratio + dataset_samples.append((idx, sample)) + + logger.info(f"Selected 
{num_requests} requests from {dataset_obj.file_name}.") + + # 1. Randomly sort the requests + if sort == "random": + if seed is not None: + random.seed(seed) + random.shuffle(dataset_samples) + elif sort == "original": # 2. original order + dataset_samples.sort(key=lambda x: x[0]) + else: + raise ValueError(f"Unknown sort strategy: {sort}") + + # Extract the prompts from the list + requests = [ { "model": model_name, - "messages": [{"role": "user", "content": messages}], + "messages": [{"role": "user", "content": prompt}], "stream": True, } - ) + for _, prompt in dataset_samples + ] + + return requests + From 8aef00b62224459f087e238241acd00e9862a549 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Sat, 15 Feb 2025 23:43:05 +0000 Subject: [PATCH 04/19] add test for data_loader --- tests/test_data_loader.py | 45 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 tests/test_data_loader.py diff --git a/tests/test_data_loader.py b/tests/test_data_loader.py new file mode 100644 index 0000000..fee0a91 --- /dev/null +++ b/tests/test_data_loader.py @@ -0,0 +1,45 @@ +import unittest + +from tracestorm.data_loader import ( + load_datasets, + Dataset +) + +class TestDataLoader(unittest.TestCase): + def test_local_files(self): + """Test loading from local files""" + datasets, sort = load_datasets("../tracestorm/datasets_config/datasets_config_local.json") + assert isinstance(datasets, list) + assert isinstance(datasets[0], Dataset) and isinstance(datasets[1], Dataset) + assert sort == "random" + assert len(datasets) == 2 + assert datasets[0].select_ratio == 6 and datasets[1].select_ratio == 4 + assert datasets[0].length > 0 and datasets[1].length > 0 + + # separate + def test_remote_files(self): + """ + Test loading datasets from hugging face. + There are 2 datasets, testing for: + 1. loading with datasets.load_dataset + 2. 
loading csv format with pandas + """ + datasets, sort = load_datasets("../tracestorm/datasets_config/datasets_config_hf.json") + assert isinstance(datasets, list) + assert isinstance(datasets[0], Dataset) and isinstance(datasets[1], Dataset) + assert sort == "original" + assert len(datasets) == 2 + assert datasets[0].select_ratio == 2 and datasets[1].select_ratio == 8 + assert datasets[0].length > 0 and datasets[1].length > 0 + + def test_missing_fields(self): + """Test loading with missing sort strategy and selection ratio in the config file""" + datasets, sort = load_datasets("../tracestorm/datasets_config/datasets_config_missing.json") + assert isinstance(datasets, list) and len(datasets) == 2 + assert sort == "random" + assert datasets[0].select_ratio == 1 + assert datasets[1].select_ratio == 1 + assert datasets[0].length > 0 and datasets[1].length > 0 + +if __name__ == "__main__": + unittest.main() From 3d225c1e5ed41ef2930783ffdab7fedbb27e3e0c Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Sat, 15 Feb 2025 23:43:54 +0000 Subject: [PATCH 05/19] add examples of datasets_config_files --- tracestorm/datasets_config/datasets_config_hf.json | 13 +++++++++++++ .../datasets_config/datasets_config_local.json | 13 +++++++++++++ .../datasets_config/datasets_config_missing.json | 10 ++++++++++ 3 files changed, 36 insertions(+) create mode 100644 tracestorm/datasets_config/datasets_config_hf.json create mode 100644 tracestorm/datasets_config/datasets_config_local.json create mode 100644 tracestorm/datasets_config/datasets_config_missing.json diff --git a/tracestorm/datasets_config/datasets_config_hf.json b/tracestorm/datasets_config/datasets_config_hf.json new file mode 100644 index 0000000..ee59698 --- /dev/null +++ b/tracestorm/datasets_config/datasets_config_hf.json @@ -0,0 +1,13 @@ +{ + "sort": "original", + "dataset_1": { + "file_name": "hf://datasets/fka/awesome-chatgpt-prompts/prompts.csv", + "prompt_field": "prompt", + "select_ratio": 2 + }, + "dataset_2": { + 
"file_name": "lmsys/chatbot_arena_conversations", + "prompt_field": "conversation_a", + "select_ratio": 8 + } +} \ No newline at end of file diff --git a/tracestorm/datasets_config/datasets_config_local.json b/tracestorm/datasets_config/datasets_config_local.json new file mode 100644 index 0000000..eaf4007 --- /dev/null +++ b/tracestorm/datasets_config/datasets_config_local.json @@ -0,0 +1,13 @@ +{ + "sort": "random", + "dataset_1": { + "file_name": "conversation_sample.csv", + "prompt_field": "user_message", + "select_ratio": 6 + }, + "dataset_2": { + "file_name": "../datasets/GPT4_coding_sample.csv", + "prompt_field": "user_prompt", + "select_ratio": 4 + } +} \ No newline at end of file diff --git a/tracestorm/datasets_config/datasets_config_missing.json b/tracestorm/datasets_config/datasets_config_missing.json new file mode 100644 index 0000000..e5db510 --- /dev/null +++ b/tracestorm/datasets_config/datasets_config_missing.json @@ -0,0 +1,10 @@ +{ + "dataset_1": { + "file_name": "hf://datasets/fka/awesome-chatgpt-prompts/prompts.csv", + "prompt_field": "prompt" + }, + "dataset_2": { + "file_name": "../datasets/GPT4_coding_sample.csv", + "prompt_field": "user_prompt" + } +} \ No newline at end of file From 86d334762c7b3a71920721f4e5a2f63ef4a14a16 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Sat, 15 Feb 2025 23:45:27 +0000 Subject: [PATCH 06/19] format --- requirements.txt | 2 +- tests/test_data_loader.py | 29 ++++++--- tracestorm/cli.py | 29 ++++++--- tracestorm/constants.py | 2 +- tracestorm/core.py | 12 +++- tracestorm/data_loader.py | 109 +++++++++++++++++++++----------- tracestorm/request_generator.py | 46 ++++++++------ tracestorm/trace_generator.py | 6 +- 8 files changed, 151 insertions(+), 84 deletions(-) diff --git a/requirements.txt b/requirements.txt index 1fe4043..2c61bb6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ +datasets matplotlib>=3.9 numpy>=1.26.4 pandas>=2.2.3 requests>=2.31.0 seaborn>=0.13.2 -datasets \ No newline 
at end of file diff --git a/tests/test_data_loader.py b/tests/test_data_loader.py index fee0a91..6b820ae 100644 --- a/tests/test_data_loader.py +++ b/tests/test_data_loader.py @@ -1,16 +1,18 @@ import unittest -from tracestorm.data_loader import ( - load_datasets, - Dataset -) +from tracestorm.data_loader import Dataset, load_datasets + class TestDataLoader(unittest.TestCase): def test_local_files(self): """Test loading from local files""" - datasets, sort = load_datasets("../tracestorm/datasets_config/datasets_config_local.json") + datasets, sort = load_datasets( + "../tracestorm/datasets_config/datasets_config_local.json" + ) assert isinstance(datasets, list) - assert isinstance(datasets[0], Dataset) and isinstance(datasets[1], Dataset) + assert isinstance(datasets[0], Dataset) and isinstance( + datasets[1], Dataset + ) assert sort == "random" assert len(datasets) == 2 assert datasets[0].select_ratio == 6 and datasets[1].select_ratio == 4 @@ -19,14 +21,18 @@ def test_local_files(self): # separate def test_remote_files(self): """ - Test loading datasets from hugging face. + Test loading datasets from hugging face. There are 2 datasets, testing for: 1. loading with datasets.load_dataset 2. 
loading csv format with pandas """ - datasets, sort = load_datasets("../tracestorm/datasets_config/datasets_config_hf.json") + datasets, sort = load_datasets( + "../tracestorm/datasets_config/datasets_config_hf.json" + ) assert isinstance(datasets, list) - assert isinstance(datasets[0], Dataset) and isinstance(datasets[1], Dataset) + assert isinstance(datasets[0], Dataset) and isinstance( + datasets[1], Dataset + ) assert sort == "original" assert len(datasets) == 2 assert datasets[0].select_ratio == 2 and datasets[1].select_ratio == 8 @@ -34,12 +40,15 @@ def test_remote_files(self): def test_missing_fields(self): """Test loading with missing sort strategy and selection ratio in the config file""" - datasets, sort = load_datasets("../tracestorm/datasets_config/datasets_config_missing.json") + datasets, sort = load_datasets( + "../tracestorm/datasets_config/datasets_config_missing.json" + ) assert isinstance(datasets, list) and len(datasets) == 2 assert sort == "random" assert datasets[0].select_ratio == 1 assert datasets[1].select_ratio == 1 assert datasets[0].length > 0 and datasets[1].length > 0 + if __name__ == "__main__": unittest.main() diff --git a/tracestorm/cli.py b/tracestorm/cli.py index 49a1c5e..63a8dd8 100644 --- a/tracestorm/cli.py +++ b/tracestorm/cli.py @@ -4,13 +4,13 @@ import click from tracestorm.core import run_load_test +from tracestorm.data_loader import load_datasets from tracestorm.logger import init_logger from tracestorm.trace_generator import ( AzureTraceGenerator, SyntheticTraceGenerator, TraceGenerator, ) -from tracestorm.data_loader import load_datasets logger = init_logger(__name__) @@ -52,7 +52,9 @@ def create_trace_generator( raise ValueError( "Duration must be non-negative for synthetic patterns" ) - return SyntheticTraceGenerator(rps, pattern, duration, seed), warning_msg + return SyntheticTraceGenerator( + rps, pattern, duration, seed + ), warning_msg # Azure patterns if rps != 1: @@ -108,12 +110,19 @@ def create_trace_generator( 
help="OpenAI API Key", ) @click.option( - "--datasets-config-file", - default=None, - help="Config file for datasets" + "--datasets-config-file", default=None, help="Config file for datasets" ) - -def main(model, rps, pattern, duration, seed, subprocesses, base_url, api_key, datasets_config_file): +def main( + model, + rps, + pattern, + duration, + seed, + subprocesses, + base_url, + api_key, + datasets_config_file, +): """Run trace-based load testing for OpenAI API endpoints.""" try: trace_generator, warning_msg = create_trace_generator( @@ -125,9 +134,9 @@ def main(model, rps, pattern, duration, seed, subprocesses, base_url, api_key, d if datasets_config_file is None: datasets = [] sort = None - else: + else: datasets, sort = load_datasets(datasets_config_file) - + _, result_analyzer = run_load_test( trace_generator=trace_generator, model=model, @@ -136,7 +145,7 @@ def main(model, rps, pattern, duration, seed, subprocesses, base_url, api_key, d api_key=api_key, datasets=datasets, sort=sort, - seed=seed + seed=seed, ) print(result_analyzer) diff --git a/tracestorm/constants.py b/tracestorm/constants.py index 6f5e076..a7d66e0 100644 --- a/tracestorm/constants.py +++ b/tracestorm/constants.py @@ -12,4 +12,4 @@ DEFAULT_MESSAGES = "Tell me a story" -DEFAULT_DATASET_FOLDER = "../datasets/" \ No newline at end of file +DEFAULT_DATASET_FOLDER = "../datasets/" diff --git a/tracestorm/core.py b/tracestorm/core.py index b9f81a6..0889105 100644 --- a/tracestorm/core.py +++ b/tracestorm/core.py @@ -1,5 +1,5 @@ import multiprocessing -from typing import List, Tuple, Optional +from typing import List, Optional, Tuple from tracestorm.logger import init_logger from tracestorm.request_generator import generate_request @@ -19,7 +19,7 @@ def run_load_test( api_key: str, datasets: List, sort: Optional[str] = None, - seed: Optional[int] = None + seed: Optional[int] = None, ) -> Tuple[List[Tuple], ResultAnalyzer]: """ Run load test with given configuration. 
@@ -44,7 +44,13 @@ def run_load_test( logger.warning("No requests to process. Trace is empty.") return [], ResultAnalyzer() - requests = generate_request(model_name=model, nums=total_requests, datasets=datasets, sort=sort, seed=seed) + requests = generate_request( + model_name=model, + nums=total_requests, + datasets=datasets, + sort=sort, + seed=seed, + ) ipc_queue = multiprocessing.Queue() processes = [] diff --git a/tracestorm/data_loader.py b/tracestorm/data_loader.py index 7f48721..655fa96 100644 --- a/tracestorm/data_loader.py +++ b/tracestorm/data_loader.py @@ -1,10 +1,11 @@ -import os import json +import os +from dataclasses import dataclass from typing import List, Optional, Tuple + import pandas as pd -from dataclasses import dataclass -from datasets import load_dataset +from datasets import load_dataset from tracestorm.constants import DEFAULT_DATASET_FOLDER from tracestorm.logger import init_logger @@ -14,48 +15,66 @@ @dataclass class Dataset: """ - Each Dataset object contains name of the dataset, a list of prompts, - the select ratio among all datasets, and the total number of prompts + Each Dataset object contains name of the dataset, a list of prompts, + the select ratio among all datasets, and the total number of prompts """ + file_name: str prompts: List[str] select_ratio: int length: int - + def normalize_prompts(row) -> List[str]: """ Convert one row to a list of prompts based on the format. 
""" prompts = [] - if isinstance(row, list): # if the row contains a list of prompts + if isinstance(row, list): # if the row contains a list of prompts for item in row: if isinstance(item, str): prompts.append(item) elif isinstance(item, dict) and item.get("role") == "user": - prompt = next((item.get(k, "") for k in ["message", "content", "value"] if item.get(k, "")), "") + prompt = next( + ( + item.get(k, "") + for k in ["message", "content", "value"] + if item.get(k, "") + ), + "", + ) prompts.append(prompt) - elif isinstance(row, str): # if the row is already a prompt + elif isinstance(row, str): # if the row is already a prompt prompts.append(row) - elif isinstance(row, dict) and row.get("role") == "user": # if the row is a template, retrieve user prompt - prompt = next((item.get(k, "") for k in ["message", "content", "value"] if item.get(k, "")), "") + elif ( + isinstance(row, dict) and row.get("role") == "user" + ): # if the row is a template, retrieve user prompt + prompt = next( + ( + item.get(k, "") + for k in ["message", "content", "value"] + if item.get(k, "") + ), + "", + ) prompts.append(prompt) else: logger.error(f"Unrecognized row format: {row}") - return [p for p in prompts if p] # Remove empty prompts - + return [p for p in prompts if p] # Remove empty prompts + + def load_datasets( datasets_config_file: Optional[str] = None, ) -> Tuple[List[Dataset], Optional[str]]: """ Load datasets from local files or Hugging Face datasets. - + Args: - datasets_config_file: A dataset configuration file containing file paths, + datasets_config_file: A dataset configuration file containing file paths, prompt fields, selection ratios, and sorting strategies. - A customized data loading logic needs to be implemented if no + A customized data loading logic needs to be implemented if no datasets_config_file is provided. - + Return: (List[Dataset], str): A list of Dataset objects and the sorting strategy. 
""" @@ -65,12 +84,14 @@ def load_datasets( with open(datasets_config_file, "r") as f: datasets_config = json.load(f) except FileNotFoundError: - logger.error(f"Configuration file '{datasets_config_file}' not found") + logger.error( + f"Configuration file '{datasets_config_file}' not found" + ) return [], None except Exception as e: logger.error(f"Error reading '{datasets_config_file}': {e}") return [], None - + # Strategy to sort the provided datasets sort_strategy = datasets_config.pop("sort", "random") @@ -80,11 +101,13 @@ def load_datasets( for name, config in datasets_config.items(): file_name = config.get("file_name") prompt_field = config.get("prompt_field") - + try: ratio = int(config.get("select_ratio", 1)) except ValueError: - logger.error(f"Invalid 'select_ratio' for dataset '{name}', using default 1") + logger.error( + f"Invalid 'select_ratio' for dataset '{name}', using default 1" + ) ratio = 1 if not file_name or not prompt_field: @@ -92,9 +115,13 @@ def load_datasets( f"Missing required 'file_name' or 'prompt_field' for dataset '{name}'" ) continue - - file_path = os.path.abspath(file_name) if os.path.exists(file_name) else os.path.join(DEFAULT_DATASET_FOLDER, file_name) - + + file_path = ( + os.path.abspath(file_name) + if os.path.exists(file_name) + else os.path.join(DEFAULT_DATASET_FOLDER, file_name) + ) + # Load dataset from local files if os.path.exists(file_path): prompts = [] @@ -103,49 +130,57 @@ def load_datasets( data = pd.read_csv(file_path) if prompt_field not in set(data.columns): - logger.error(f"Field '{prompt_field}' not found in '{file_path}'.") + logger.error( + f"Field '{prompt_field}' not found in '{file_path}'." 
+ ) continue prompts = data[prompt_field].dropna().astype(str).tolist() # JSON files elif file_name.endswith(".json"): with open(file_path, "r") as f: data = json.load(f) - + if isinstance(data, dict): prompts = data.get(prompt_field, []) if not isinstance(prompts, list): - logger.error(f"Field '{prompt_field}' in '{file_path}' is not a list.") + logger.error( + f"Field '{prompt_field}' in '{file_path}' is not a list." + ) continue else: logger.error(f"Unsupported file format for '{file_name}'") - continue + continue else: try: - if file_name.endswith(".csv"): # CSV format + if file_name.endswith(".csv"): # CSV format data = pd.read_csv(file_name) - + if prompt_field not in set(data.columns): - logger.error(f"Field '{prompt_field}' not found in '{file_name}'.") + logger.error( + f"Field '{prompt_field}' not found in '{file_name}'." + ) continue - prompts = data[prompt_field].dropna().astype(str).tolist() - else: # use datasets to load + prompts = ( + data[prompt_field].dropna().astype(str).tolist() + ) + else: # use datasets to load data = load_dataset(file_name)["train"] prompts = [] for row in data[prompt_field]: prompts.extend(normalize_prompts(row)) except Exception as e: logger.error(f"Failed to load '{file_name}': {e}") - + # Add the dataset information (file name, a list of prompts, select ratio among all datasets, total number of prompts) dataset_obj = Dataset(file_name, prompts, ratio, len(prompts)) datasets.append(dataset_obj) - + logger.info( f"loaded {file_name} with {len(prompts)} prompts, selection ratio = {ratio}" ) - + return datasets, sort_strategy - + else: logger.error("Customized data loading logic needs to be implemented!") return [], None diff --git a/tracestorm/request_generator.py b/tracestorm/request_generator.py index b54b82d..36b85a8 100644 --- a/tracestorm/request_generator.py +++ b/tracestorm/request_generator.py @@ -1,17 +1,21 @@ -from typing import Any, Dict, List import random +from typing import Any, Dict, List from 
tracestorm.constants import DEFAULT_MESSAGES -from tracestorm.logger import init_logger from tracestorm.data_loader import Dataset +from tracestorm.logger import init_logger logger = init_logger(__name__) + def generate_request( - model_name: str, nums: int, messages: str = DEFAULT_MESSAGES, - datasets: List[Dataset] = [], sort: str = "random", seed: int = None + model_name: str, + nums: int, + messages: str = DEFAULT_MESSAGES, + datasets: List[Dataset] = [], + sort: str = "random", + seed: int = None, ) -> List[Dict[str, Any]]: - # generate default requests without datasets if not datasets: for _ in range(nums): @@ -23,42 +27,46 @@ def generate_request( } for _ in range(nums) ] - else: # Add and sort requests from the provided datasets + else: # Add and sort requests from the provided datasets dataset_samples = [] - + # Total ratio to calculate number of requests for each dataset total_ratio = sum(dataset_obj.select_ratio for dataset_obj in datasets) - + for dataset_obj in datasets: - num_requests = int(round(nums * dataset_obj.select_ratio / total_ratio)) - + num_requests = int( + round(nums * dataset_obj.select_ratio / total_ratio) + ) + # We don't have enough available prompts, repeat the dataset available_prompts = dataset_obj.length prompts = dataset_obj.prompts if num_requests > available_prompts: repeat_count = num_requests // available_prompts prompts.extend(prompts * repeat_count) - + assert len(prompts) >= num_requests - + # Store prompts with indexing for round-robin # For example, if ratio of dataset1 is 5, we will append 5 requests for each idx for i, sample in enumerate(prompts[:num_requests]): idx = i // dataset_obj.select_ratio dataset_samples.append((idx, sample)) - - logger.info(f"Selected {num_requests} requests from {dataset_obj.file_name}.") + + logger.info( + f"Selected {num_requests} requests from {dataset_obj.file_name}." + ) # 1. 
Randomly sort the requests if sort == "random": if seed is not None: random.seed(seed) random.shuffle(dataset_samples) - elif sort == "original": # 2. original order + elif sort == "original": # 2. original order dataset_samples.sort(key=lambda x: x[0]) - else: + else: raise ValueError(f"Unknown sort strategy: {sort}") - + # Extract the prompts from the list requests = [ { @@ -68,7 +76,5 @@ def generate_request( } for _, prompt in dataset_samples ] - - + return requests - diff --git a/tracestorm/trace_generator.py b/tracestorm/trace_generator.py index 24767f1..36b98b5 100644 --- a/tracestorm/trace_generator.py +++ b/tracestorm/trace_generator.py @@ -3,9 +3,9 @@ from abc import ABC, abstractmethod from typing import List, Optional +import numpy as np import pandas as pd import requests -import numpy as np from tracestorm.constants import AZURE_DATASET_PATHS, AZURE_REPO_URL from tracestorm.logger import init_logger @@ -33,7 +33,9 @@ def generate(self) -> List[int]: class SyntheticTraceGenerator(TraceGenerator): """Generate synthetic traces based on patterns.""" - def __init__(self, rps: int, pattern: str, duration: int, seed: Optional[int] = None): + def __init__( + self, rps: int, pattern: str, duration: int, seed: Optional[int] = None + ): """ Initialize synthetic trace generator. 
From 1e81bdeeaa8a6c46d64bd3fb5237450a63e58daa Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Sun, 16 Feb 2025 10:20:48 +0000 Subject: [PATCH 07/19] remove test for local datasets --- tests/test_data_loader.py | 23 ++++++++++--------- .../datasets_config_missing.json | 2 +- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/tests/test_data_loader.py b/tests/test_data_loader.py index 6b820ae..50e0169 100644 --- a/tests/test_data_loader.py +++ b/tests/test_data_loader.py @@ -6,17 +6,18 @@ class TestDataLoader(unittest.TestCase): def test_local_files(self): """Test loading from local files""" - datasets, sort = load_datasets( - "../tracestorm/datasets_config/datasets_config_local.json" - ) - assert isinstance(datasets, list) - assert isinstance(datasets[0], Dataset) and isinstance( - datasets[1], Dataset - ) - assert sort == "random" - assert len(datasets) == 2 - assert datasets[0].select_ratio == 6 and datasets[1].select_ratio == 4 - assert datasets[0].length > 0 and datasets[1].length > 0 + # datasets, sort = load_datasets( + # "../tracestorm/datasets_config/datasets_config_local.json" + # ) + # assert isinstance(datasets, list) + # assert isinstance(datasets[0], Dataset) and isinstance( + # datasets[1], Dataset + # ) + # assert sort == "random" + # assert len(datasets) == 2 + # assert datasets[0].select_ratio == 6 and datasets[1].select_ratio == 4 + # assert datasets[0].length > 0 and datasets[1].length > 0 + pass # separate def test_remote_files(self): diff --git a/tracestorm/datasets_config/datasets_config_missing.json b/tracestorm/datasets_config/datasets_config_missing.json index e5db510..d6a0944 100644 --- a/tracestorm/datasets_config/datasets_config_missing.json +++ b/tracestorm/datasets_config/datasets_config_missing.json @@ -4,7 +4,7 @@ "prompt_field": "prompt" }, "dataset_2": { - "file_name": "../datasets/GPT4_coding_sample.csv", + "file_name": "lmsys/chatbot_arena_conversations", "prompt_field": "user_prompt" } } \ No newline at end of file 
From d7b96ced832104df3bc6861c66f2d20f77bc23a3 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Sun, 16 Feb 2025 10:33:36 +0000 Subject: [PATCH 08/19] fix errors in tests --- tests/test_cli.py | 2 +- tests/test_data_loader.py | 4 ++-- tracestorm/cli.py | 4 ++-- tracestorm/datasets_config/datasets_config_missing.json | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index e435630..8ed12a2 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -81,7 +81,7 @@ def test_cli_invalid_pattern(self): ) self.assertNotEqual(result.exit_code, 0) - self.assertIn("Invalid pattern", result.output) + self.assertIn("Invalid value for '--pattern'", result.output) if __name__ == "__main__": diff --git a/tests/test_data_loader.py b/tests/test_data_loader.py index 50e0169..a0decac 100644 --- a/tests/test_data_loader.py +++ b/tests/test_data_loader.py @@ -28,7 +28,7 @@ def test_remote_files(self): 2. loading csv format with pandas """ datasets, sort = load_datasets( - "../tracestorm/datasets_config/datasets_config_hf.json" + "tracestorm/datasets_config/datasets_config_hf.json" ) assert isinstance(datasets, list) assert isinstance(datasets[0], Dataset) and isinstance( @@ -42,7 +42,7 @@ def test_remote_files(self): def test_missing_fields(self): """Test loading with missing sort strategy and selection ratio in the config file""" datasets, sort = load_datasets( - "../tracestorm/datasets_config/datasets_config_missing.json" + "tracestorm/datasets_config/datasets_config_missing.json" ) assert isinstance(datasets, list) and len(datasets) == 2 assert sort == "random" diff --git a/tracestorm/cli.py b/tracestorm/cli.py index 63a8dd8..4d79986 100644 --- a/tracestorm/cli.py +++ b/tracestorm/cli.py @@ -1,5 +1,5 @@ import os -from typing import Tuple +from typing import Tuple, Optional import click @@ -21,7 +21,7 @@ def create_trace_generator( - pattern: str, rps: int, duration: int, seed: int + pattern: str, rps: int, duration: int, seed: 
Optional[int] = None ) -> Tuple[TraceGenerator, str]: """ Create appropriate trace generator based on pattern and validate parameters. diff --git a/tracestorm/datasets_config/datasets_config_missing.json b/tracestorm/datasets_config/datasets_config_missing.json index d6a0944..0ae0324 100644 --- a/tracestorm/datasets_config/datasets_config_missing.json +++ b/tracestorm/datasets_config/datasets_config_missing.json @@ -5,6 +5,6 @@ }, "dataset_2": { "file_name": "lmsys/chatbot_arena_conversations", - "prompt_field": "user_prompt" + "prompt_field": "conversation_a" } } \ No newline at end of file From 2e5004119b11346cc76295f8365799dad560d7a2 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Thu, 20 Feb 2025 18:25:29 +0000 Subject: [PATCH 09/19] fix data loader --- tracestorm/cli.py | 6 +- tracestorm/core.py | 6 +- tracestorm/data_loader.py | 187 ++++++++++++++++---------------- tracestorm/request_generator.py | 8 +- 4 files changed, 106 insertions(+), 101 deletions(-) diff --git a/tracestorm/cli.py b/tracestorm/cli.py index 4d79986..e047dc6 100644 --- a/tracestorm/cli.py +++ b/tracestorm/cli.py @@ -133,9 +133,9 @@ def main( if datasets_config_file is None: datasets = [] - sort = None + sort_strategy = None else: - datasets, sort = load_datasets(datasets_config_file) + datasets, sort_strategy = load_datasets(datasets_config_file) _, result_analyzer = run_load_test( trace_generator=trace_generator, @@ -144,7 +144,7 @@ def main( base_url=base_url, api_key=api_key, datasets=datasets, - sort=sort, + sort_strategy=sort_strategy, seed=seed, ) diff --git a/tracestorm/core.py b/tracestorm/core.py index 0889105..81ebef7 100644 --- a/tracestorm/core.py +++ b/tracestorm/core.py @@ -18,7 +18,7 @@ def run_load_test( base_url: str, api_key: str, datasets: List, - sort: Optional[str] = None, + sort_strategy: Optional[str] = None, seed: Optional[int] = None, ) -> Tuple[List[Tuple], ResultAnalyzer]: """ @@ -31,7 +31,7 @@ def run_load_test( base_url: Base URL for API calls api_key: API key 
for authentication datasets: List of datasets to generate prompts - sort: Sorting strategy for prompts in datasets. + sort_strategy: Sorting strategy for prompts in datasets. seed: Random seed for sorting. Returns: @@ -48,7 +48,7 @@ def run_load_test( model_name=model, nums=total_requests, datasets=datasets, - sort=sort, + sort_strategy=sort_strategy, seed=seed, ) ipc_queue = multiprocessing.Queue() diff --git a/tracestorm/data_loader.py b/tracestorm/data_loader.py index 655fa96..f507e04 100644 --- a/tracestorm/data_loader.py +++ b/tracestorm/data_loader.py @@ -44,6 +44,8 @@ def normalize_prompts(row) -> List[str]: "", ) prompts.append(prompt) + else: # we cannot handle this type + continue elif isinstance(row, str): # if the row is already a prompt prompts.append(row) elif ( @@ -78,109 +80,112 @@ def load_datasets( Return: (List[Dataset], str): A list of Dataset objects and the sorting strategy. """ + if datasets_config_file is None: + logger.error("Customized data loading logic needs to be implemented!") + return [], None + # Load datasets configuration file - if datasets_config_file: - try: - with open(datasets_config_file, "r") as f: - datasets_config = json.load(f) - except FileNotFoundError: - logger.error( - f"Configuration file '{datasets_config_file}' not found" - ) - return [], None - except Exception as e: - logger.error(f"Error reading '{datasets_config_file}': {e}") - return [], None - - # Strategy to sort the provided datasets - sort_strategy = datasets_config.pop("sort", "random") - - # List to store each Dataset - datasets = [] + try: + with open(datasets_config_file, "r") as f: + datasets_config = json.load(f) + except FileNotFoundError: + logger.error( + f"Configuration file '{datasets_config_file}' not found" + ) + return [], None + except Exception as e: + logger.error(f"Error reading '{datasets_config_file}': {e}") + return [], None - for name, config in datasets_config.items(): - file_name = config.get("file_name") - prompt_field = 
config.get("prompt_field") + # Strategy to sort the provided datasets + sort_strategy = datasets_config.pop("sort_strategy", "random") - try: - ratio = int(config.get("select_ratio", 1)) - except ValueError: - logger.error( - f"Invalid 'select_ratio' for dataset '{name}', using default 1" - ) - ratio = 1 + # List to store each Dataset + datasets = [] - if not file_name or not prompt_field: - logger.error( - f"Missing required 'file_name' or 'prompt_field' for dataset '{name}'" - ) - continue + for name, config in datasets_config.items(): + file_name = config.get("file_name") + prompt_field = config.get("prompt_field") - file_path = ( - os.path.abspath(file_name) - if os.path.exists(file_name) - else os.path.join(DEFAULT_DATASET_FOLDER, file_name) + try: + ratio = int(config.get("select_ratio", 1)) + except ValueError: + logger.error( + f"Invalid 'select_ratio' for dataset '{name}', using default 1" ) + ratio = 1 - # Load dataset from local files - if os.path.exists(file_path): - prompts = [] - # CSV files - if file_name.endswith(".csv"): - data = pd.read_csv(file_path) + if not file_name or not prompt_field: + logger.error( + f"Missing required 'file_name' or 'prompt_field' for dataset '{name}'" + ) + continue + + os.makedirs(DEFAULT_DATASET_FOLDER, exist_ok=True) + + file_path = ( + os.path.abspath(file_name) + if os.path.exists(file_name) + else os.path.join(DEFAULT_DATASET_FOLDER, file_name) + ) - if prompt_field not in set(data.columns): + # Load dataset from local files + if os.path.exists(file_path): + prompts = [] + # CSV files + if file_name.endswith(".csv"): + data = pd.read_csv(file_path) + + if prompt_field not in set(data.columns): + logger.error( + f"Field '{prompt_field}' not found in '{file_path}'." 
+ ) + continue + prompts = data[prompt_field].dropna().astype(str).tolist() + # JSON files + elif file_name.endswith(".json"): + with open(file_path, "r") as f: + data = json.load(f) + + if isinstance(data, dict): + prompts = data.get(prompt_field, []) + if not isinstance(prompts, list): logger.error( - f"Field '{prompt_field}' not found in '{file_path}'." + f"Field '{prompt_field}' in '{file_path}' is not a list." ) continue - prompts = data[prompt_field].dropna().astype(str).tolist() - # JSON files - elif file_name.endswith(".json"): - with open(file_path, "r") as f: - data = json.load(f) - - if isinstance(data, dict): - prompts = data.get(prompt_field, []) - if not isinstance(prompts, list): - logger.error( - f"Field '{prompt_field}' in '{file_path}' is not a list." - ) - continue - else: - logger.error(f"Unsupported file format for '{file_name}'") - continue else: - try: - if file_name.endswith(".csv"): # CSV format - data = pd.read_csv(file_name) - - if prompt_field not in set(data.columns): - logger.error( - f"Field '{prompt_field}' not found in '{file_name}'." - ) - continue - prompts = ( - data[prompt_field].dropna().astype(str).tolist() + logger.error(f"Unsupported file format for '{file_name}'") + continue + else: + try: + if file_name.endswith(".csv"): # CSV format + data = pd.read_csv(file_name) + + if prompt_field not in set(data.columns): + logger.error( + f"Field '{prompt_field}' not found in '{file_name}'." 
) - else: # use datasets to load - data = load_dataset(file_name)["train"] - prompts = [] - for row in data[prompt_field]: - prompts.extend(normalize_prompts(row)) - except Exception as e: - logger.error(f"Failed to load '{file_name}': {e}") - - # Add the dataset information (file name, a list of prompts, select ratio among all datasets, total number of prompts) - dataset_obj = Dataset(file_name, prompts, ratio, len(prompts)) - datasets.append(dataset_obj) - - logger.info( - f"loaded {file_name} with {len(prompts)} prompts, selection ratio = {ratio}" - ) + continue + prompts = ( + data[prompt_field].dropna().astype(str).tolist() + ) + else: # use datasets to load + data = load_dataset(file_name)["train"] + prompts = [] + for row in data[prompt_field]: + prompts.extend(normalize_prompts(row)) + except Exception as e: + logger.error(f"Failed to load '{file_name}': {e}") + + # Add the dataset information (file name, a list of prompts, select ratio among all datasets, total number of prompts) + dataset_obj = Dataset(file_name, prompts, ratio, len(prompts)) + datasets.append(dataset_obj) + + logger.info( + f"loaded {file_name} with {len(prompts)} prompts, selection ratio = {ratio}" + ) - return datasets, sort_strategy + return datasets, sort_strategy - else: - logger.error("Customized data loading logic needs to be implemented!") - return [], None + diff --git a/tracestorm/request_generator.py b/tracestorm/request_generator.py index 36b85a8..4ed4f9c 100644 --- a/tracestorm/request_generator.py +++ b/tracestorm/request_generator.py @@ -13,7 +13,7 @@ def generate_request( nums: int, messages: str = DEFAULT_MESSAGES, datasets: List[Dataset] = [], - sort: str = "random", + sort_strategy: str = "random", seed: int = None, ) -> List[Dict[str, Any]]: # generate default requests without datasets @@ -58,14 +58,14 @@ def generate_request( ) # 1. 
Randomly sort the requests - if sort == "random": + if sort_strategy == "random": if seed is not None: random.seed(seed) random.shuffle(dataset_samples) - elif sort == "original": # 2. original order + elif sort_strategy == "original": # 2. original order dataset_samples.sort(key=lambda x: x[0]) else: - raise ValueError(f"Unknown sort strategy: {sort}") + raise ValueError(f"Unknown sorting strategy: {sort_strategy}") # Extract the prompts from the list requests = [ From bc52f1628b7fb68fa84d6288806106c7776ce382 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Thu, 20 Feb 2025 18:25:53 +0000 Subject: [PATCH 10/19] change default folder --- tracestorm/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tracestorm/constants.py b/tracestorm/constants.py index a7d66e0..d551a25 100644 --- a/tracestorm/constants.py +++ b/tracestorm/constants.py @@ -12,4 +12,4 @@ DEFAULT_MESSAGES = "Tell me a story" -DEFAULT_DATASET_FOLDER = "../datasets/" +DEFAULT_DATASET_FOLDER = "~/.cache/tracestorm/" From 83fe0f10fbd2ccf65b9cc001c3ebead0bc60ef59 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Thu, 20 Feb 2025 18:29:03 +0000 Subject: [PATCH 11/19] change config files --- examples/datasets_config_default.json | 13 +++++++++++++ .../datasets_config_hf.json | 2 +- .../datasets_config_local.json | 4 ++-- pyproject.toml | 5 ++++- .../datasets_config/datasets_config_missing.json | 10 ---------- 5 files changed, 20 insertions(+), 14 deletions(-) create mode 100644 examples/datasets_config_default.json rename {tracestorm/datasets_config => examples}/datasets_config_hf.json (90%) rename {tracestorm/datasets_config => examples}/datasets_config_local.json (70%) delete mode 100644 tracestorm/datasets_config/datasets_config_missing.json diff --git a/examples/datasets_config_default.json b/examples/datasets_config_default.json new file mode 100644 index 0000000..b941dd4 --- /dev/null +++ b/examples/datasets_config_default.json @@ -0,0 +1,13 @@ +{ + "sort_strategy": 
"random", + "dataset_1": { + "file_name": "", + "prompt_field": "", + "select_ratio": 1 + }, + "dataset_2": { + "file_name": "", + "prompt_field": "", + "select_ratio": 1 + } +} \ No newline at end of file diff --git a/tracestorm/datasets_config/datasets_config_hf.json b/examples/datasets_config_hf.json similarity index 90% rename from tracestorm/datasets_config/datasets_config_hf.json rename to examples/datasets_config_hf.json index ee59698..cbc028a 100644 --- a/tracestorm/datasets_config/datasets_config_hf.json +++ b/examples/datasets_config_hf.json @@ -1,5 +1,5 @@ { - "sort": "original", + "sort_strategy": "original", "dataset_1": { "file_name": "hf://datasets/fka/awesome-chatgpt-prompts/prompts.csv", "prompt_field": "prompt", diff --git a/tracestorm/datasets_config/datasets_config_local.json b/examples/datasets_config_local.json similarity index 70% rename from tracestorm/datasets_config/datasets_config_local.json rename to examples/datasets_config_local.json index eaf4007..30a9423 100644 --- a/tracestorm/datasets_config/datasets_config_local.json +++ b/examples/datasets_config_local.json @@ -1,12 +1,12 @@ { - "sort": "random", + "sort_strategy": "random", "dataset_1": { "file_name": "conversation_sample.csv", "prompt_field": "user_message", "select_ratio": 6 }, "dataset_2": { - "file_name": "../datasets/GPT4_coding_sample.csv", + "file_name": "~/.cache/tracestorm/GPT4_coding_sample.csv", "prompt_field": "user_prompt", "select_ratio": 4 } diff --git a/pyproject.toml b/pyproject.toml index cbc3b0c..e6edc77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,4 +43,7 @@ ignore = ["B007"] # Loop control variable not used within loop body [tool.isort] use_parentheses = true -skip_gitignore = true \ No newline at end of file +skip_gitignore = true + +[tool.setuptools] +packages = { find = { exclude = ["examples"] } } diff --git a/tracestorm/datasets_config/datasets_config_missing.json b/tracestorm/datasets_config/datasets_config_missing.json deleted file mode 
100644 index 0ae0324..0000000 --- a/tracestorm/datasets_config/datasets_config_missing.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "dataset_1": { - "file_name": "hf://datasets/fka/awesome-chatgpt-prompts/prompts.csv", - "prompt_field": "prompt" - }, - "dataset_2": { - "file_name": "lmsys/chatbot_arena_conversations", - "prompt_field": "conversation_a" - } -} \ No newline at end of file From 01bb988ec0a558b42ac571b28e15fc26ea6cabe5 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Thu, 20 Feb 2025 22:02:17 +0000 Subject: [PATCH 12/19] adjust data_loader and revise tests --- examples/datasets_config_local.json | 6 +- examples/save_test_datasets.py | 20 +++++ tests/test_data_loader.py | 54 +++++++------ tracestorm/constants.py | 4 +- tracestorm/data_loader.py | 120 +++++++++++++++------------- 5 files changed, 119 insertions(+), 85 deletions(-) create mode 100644 examples/save_test_datasets.py diff --git a/examples/datasets_config_local.json b/examples/datasets_config_local.json index 30a9423..0ca6d96 100644 --- a/examples/datasets_config_local.json +++ b/examples/datasets_config_local.json @@ -1,13 +1,13 @@ { "sort_strategy": "random", "dataset_1": { - "file_name": "conversation_sample.csv", - "prompt_field": "user_message", + "file_name": "Conversational_dataset.jsonl", + "prompt_field": "messages", "select_ratio": 6 }, "dataset_2": { "file_name": "~/.cache/tracestorm/GPT4_coding_sample.csv", - "prompt_field": "user_prompt", + "prompt_field": "user", "select_ratio": 4 } } \ No newline at end of file diff --git a/examples/save_test_datasets.py b/examples/save_test_datasets.py new file mode 100644 index 0000000..f1704fa --- /dev/null +++ b/examples/save_test_datasets.py @@ -0,0 +1,20 @@ +import pandas as pd +import os +from tracestorm.constants import DEFAULT_DATASET_FOLDER + +def prepare_test_datasets(): + df1 = pd.read_json("hf://datasets/MAsad789565/Coding_GPT4_Data/Data/GPT_4_Coding.json") + df2 = 
pd.read_json("hf://datasets/olathepavilion/Conversational-datasets-json/Validation.jsonl", lines=True) + + # save the pre-processed dataset to the default folder for test + os.makedirs(DEFAULT_DATASET_FOLDER, exist_ok=True) + path1 = os.path.join(DEFAULT_DATASET_FOLDER, "GPT4_coding_sample.csv") + path2 = os.path.join(DEFAULT_DATASET_FOLDER, "Conversational_dataset.jsonl") + + # test with different file formats + df1.to_csv(path1, index=False) + df2.to_json(path2, orient="records", lines=True) + + +if __name__ == "__main__": + prepare_test_datasets() diff --git a/tests/test_data_loader.py b/tests/test_data_loader.py index a0decac..f3a008d 100644 --- a/tests/test_data_loader.py +++ b/tests/test_data_loader.py @@ -1,25 +1,13 @@ import unittest +import pandas as pd +import os from tracestorm.data_loader import Dataset, load_datasets +from tracestorm.constants import DEFAULT_DATASET_FOLDER class TestDataLoader(unittest.TestCase): - def test_local_files(self): - """Test loading from local files""" - # datasets, sort = load_datasets( - # "../tracestorm/datasets_config/datasets_config_local.json" - # ) - # assert isinstance(datasets, list) - # assert isinstance(datasets[0], Dataset) and isinstance( - # datasets[1], Dataset - # ) - # assert sort == "random" - # assert len(datasets) == 2 - # assert datasets[0].select_ratio == 6 and datasets[1].select_ratio == 4 - # assert datasets[0].length > 0 and datasets[1].length > 0 - pass - - # separate + def test_remote_files(self): """ Test loading datasets from hugging face. @@ -28,7 +16,7 @@ def test_remote_files(self): 2. 
loading csv format with pandas """ datasets, sort = load_datasets( - "tracestorm/datasets_config/datasets_config_hf.json" + "examples/datasets_config_hf.json" ) assert isinstance(datasets, list) assert isinstance(datasets[0], Dataset) and isinstance( @@ -39,15 +27,33 @@ def test_remote_files(self): assert datasets[0].select_ratio == 2 and datasets[1].select_ratio == 8 assert datasets[0].length > 0 and datasets[1].length > 0 - def test_missing_fields(self): - """Test loading with missing sort strategy and selection ratio in the config file""" - datasets, sort = load_datasets( - "tracestorm/datasets_config/datasets_config_missing.json" + + def test_local_files(self): + """Test loading from local files""" + + os.makedirs(DEFAULT_DATASET_FOLDER, exist_ok=True) + # testing datasets + df1 = pd.read_json("hf://datasets/MAsad789565/Coding_GPT4_Data/Data/GPT_4_Coding.json") + df2 = pd.read_json("hf://datasets/olathepavilion/Conversational-datasets-json/Validation.jsonl", lines=True) + + # test with different file formats + path1 = os.path.join(DEFAULT_DATASET_FOLDER, "GPT4_coding_sample.csv") + path2 = os.path.join(DEFAULT_DATASET_FOLDER, "Conversational_dataset.jsonl") + + # save the pre-processed dataset to the default folder for test + df1.to_csv(path1, index=False) + df2.to_json(path2, orient="records", lines=True) + + datasets, sort = load_datasets(os. 
+ "examples/datasets_config_local.json" + ) + assert isinstance(datasets, list) + assert isinstance(datasets[0], Dataset) and isinstance( + datasets[1], Dataset ) - assert isinstance(datasets, list) and len(datasets) == 2 assert sort == "random" - assert datasets[0].select_ratio == 1 - assert datasets[1].select_ratio == 1 + assert len(datasets) == 2 + assert datasets[0].select_ratio == 6 and datasets[1].select_ratio == 4 assert datasets[0].length > 0 and datasets[1].length > 0 diff --git a/tracestorm/constants.py b/tracestorm/constants.py index d551a25..1b3d453 100644 --- a/tracestorm/constants.py +++ b/tracestorm/constants.py @@ -1,3 +1,5 @@ +import os + AZURE_REPO_URL = "Azure/AzurePublicDataset" AZURE_DATASET_PATHS = { @@ -12,4 +14,4 @@ DEFAULT_MESSAGES = "Tell me a story" -DEFAULT_DATASET_FOLDER = "~/.cache/tracestorm/" +DEFAULT_DATASET_FOLDER = os.path.expanduser("~/.cache/tracestorm") diff --git a/tracestorm/data_loader.py b/tracestorm/data_loader.py index f507e04..47c9bbe 100644 --- a/tracestorm/data_loader.py +++ b/tracestorm/data_loader.py @@ -1,11 +1,12 @@ import json import os -from dataclasses import dataclass -from typing import List, Optional, Tuple - +import re import pandas as pd +from dataclasses import dataclass +from typing import List, Optional, Tuple from datasets import load_dataset + from tracestorm.constants import DEFAULT_DATASET_FOLDER from tracestorm.logger import init_logger @@ -24,7 +25,27 @@ class Dataset: select_ratio: int length: int +def is_file_type(file_name, extensions): + return any(re.search(rf"\.{ext}$", file_name, re.IGNORECASE) for ext in extensions) +def resolve_file_path(file_name: str) -> str: + """ + Resolve the file path: + - If the file exists locally (relative or absolute path), return its absolute path. + - If the file exists in DEFAULT_DATASET_FOLDER, return that path. + - If the file does not exist in either location, return file_name, assuming it is to be loaded remotely from hugging face. 
+ """ + # os.makedirs(DEFAULT_DATASET_FOLDER, exist_ok=True) + if os.path.exists(file_name): + return os.path.abspath(file_name) + + # check if file exists in DEFAULT_DATASET_FOLDER + file_path = os.path.join(DEFAULT_DATASET_FOLDER, file_name) + if os.path.exists(file_path): + return file_path + + return file_name + def normalize_prompts(row) -> List[str]: """ Convert one row to a list of prompts based on the format. @@ -106,6 +127,7 @@ def load_datasets( for name, config in datasets_config.items(): file_name = config.get("file_name") prompt_field = config.get("prompt_field") + split = config.get("split", "train") try: ratio = int(config.get("select_ratio", 1)) @@ -121,63 +143,47 @@ def load_datasets( ) continue - os.makedirs(DEFAULT_DATASET_FOLDER, exist_ok=True) - - file_path = ( - os.path.abspath(file_name) - if os.path.exists(file_name) - else os.path.join(DEFAULT_DATASET_FOLDER, file_name) - ) - # Load dataset from local files - if os.path.exists(file_path): - prompts = [] - # CSV files - if file_name.endswith(".csv"): - data = pd.read_csv(file_path) - - if prompt_field not in set(data.columns): - logger.error( - f"Field '{prompt_field}' not found in '{file_path}'." 
- ) + prompts = [] + file_path = resolve_file_path(file_name) + check_field = False + try: + # If the file does not exist locally and is not of csv or json format, + # try to load it from hugging face using datasets.load_dataset() first + if not os.path.exists(file_path) and not is_file_type(file_name, ["csv", "json", "jsonl"]): + data = load_dataset(file_name)[split] + + if prompt_field not in data.column_names: + logger.error(f"Field '{prompt_field}' not found in '{file_name}'.") continue - prompts = data[prompt_field].dropna().astype(str).tolist() - # JSON files - elif file_name.endswith(".json"): - with open(file_path, "r") as f: - data = json.load(f) - - if isinstance(data, dict): - prompts = data.get(prompt_field, []) - if not isinstance(prompts, list): - logger.error( - f"Field '{prompt_field}' in '{file_path}' is not a list." - ) - continue + + check_field = True + + elif is_file_type(file_name, ["csv"]): # CSV files, could be either local or remote file + data = pd.read_csv(file_path) + + elif is_file_type(file_name, ["json", "jsonl"]): # JSON files + data = pd.read_json(file_path, lines=is_file_type(file_name, ["jsonl"])) + else: - logger.error(f"Unsupported file format for '{file_name}'") - continue - else: - try: - if file_name.endswith(".csv"): # CSV format - data = pd.read_csv(file_name) - - if prompt_field not in set(data.columns): - logger.error( - f"Field '{prompt_field}' not found in '{file_name}'." - ) - continue - prompts = ( - data[prompt_field].dropna().astype(str).tolist() - ) - else: # use datasets to load - data = load_dataset(file_name)["train"] - prompts = [] - for row in data[prompt_field]: - prompts.extend(normalize_prompts(row)) - except Exception as e: - logger.error(f"Failed to load '{file_name}': {e}") - + logger.error(f"Unsupported file format for '{file_name}'. 
Please implement customized loading logic.") + continue + + except Exception as e: + logger.error(f"Failed to load '{file_name}': {e}") + continue + + if not check_field and prompt_field not in set(data.columns): + logger.error( + f"Field '{prompt_field}' not found in '{file_name}'." + ) + continue + + # prompts = data[prompt_field].dropna().astype(str).tolist() + # load each row + for row in data[prompt_field]: + prompts.extend(normalize_prompts(row)) + # Add the dataset information (file name, a list of prompts, select ratio among all datasets, total number of prompts) dataset_obj = Dataset(file_name, prompts, ratio, len(prompts)) datasets.append(dataset_obj) From 0bedb99d620a222aac31b10ec8279e92e93cd733 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Thu, 20 Feb 2025 22:03:36 +0000 Subject: [PATCH 13/19] fix format --- examples/save_test_datasets.py | 14 ++++-- tests/test_data_loader.py | 34 +++++++------- tracestorm/cli.py | 2 +- tracestorm/data_loader.py | 82 +++++++++++++++++++--------------- 4 files changed, 75 insertions(+), 57 deletions(-) diff --git a/examples/save_test_datasets.py b/examples/save_test_datasets.py index f1704fa..2aee421 100644 --- a/examples/save_test_datasets.py +++ b/examples/save_test_datasets.py @@ -1,10 +1,18 @@ -import pandas as pd import os + +import pandas as pd + from tracestorm.constants import DEFAULT_DATASET_FOLDER + def prepare_test_datasets(): - df1 = pd.read_json("hf://datasets/MAsad789565/Coding_GPT4_Data/Data/GPT_4_Coding.json") - df2 = pd.read_json("hf://datasets/olathepavilion/Conversational-datasets-json/Validation.jsonl", lines=True) + df1 = pd.read_json( + "hf://datasets/MAsad789565/Coding_GPT4_Data/Data/GPT_4_Coding.json" + ) + df2 = pd.read_json( + "hf://datasets/olathepavilion/Conversational-datasets-json/Validation.jsonl", + lines=True, + ) # save the pre-processed dataset to the default folder for test os.makedirs(DEFAULT_DATASET_FOLDER, exist_ok=True) diff --git a/tests/test_data_loader.py 
b/tests/test_data_loader.py index f3a008d..b323ddf 100644 --- a/tests/test_data_loader.py +++ b/tests/test_data_loader.py @@ -1,13 +1,13 @@ +import os import unittest + import pandas as pd -import os -from tracestorm.data_loader import Dataset, load_datasets from tracestorm.constants import DEFAULT_DATASET_FOLDER +from tracestorm.data_loader import Dataset, load_datasets class TestDataLoader(unittest.TestCase): - def test_remote_files(self): """ Test loading datasets from hugging face. @@ -15,9 +15,7 @@ def test_remote_files(self): 1. loading with datasets.load_dataset 2. loading csv format with pandas """ - datasets, sort = load_datasets( - "examples/datasets_config_hf.json" - ) + datasets, sort = load_datasets("examples/datasets_config_hf.json") assert isinstance(datasets, list) assert isinstance(datasets[0], Dataset) and isinstance( datasets[1], Dataset @@ -27,26 +25,30 @@ def test_remote_files(self): assert datasets[0].select_ratio == 2 and datasets[1].select_ratio == 8 assert datasets[0].length > 0 and datasets[1].length > 0 - def test_local_files(self): """Test loading from local files""" - + os.makedirs(DEFAULT_DATASET_FOLDER, exist_ok=True) # testing datasets - df1 = pd.read_json("hf://datasets/MAsad789565/Coding_GPT4_Data/Data/GPT_4_Coding.json") - df2 = pd.read_json("hf://datasets/olathepavilion/Conversational-datasets-json/Validation.jsonl", lines=True) + df1 = pd.read_json( + "hf://datasets/MAsad789565/Coding_GPT4_Data/Data/GPT_4_Coding.json" + ) + df2 = pd.read_json( + "hf://datasets/olathepavilion/Conversational-datasets-json/Validation.jsonl", + lines=True, + ) - # test with different file formats + # test with different file formats path1 = os.path.join(DEFAULT_DATASET_FOLDER, "GPT4_coding_sample.csv") - path2 = os.path.join(DEFAULT_DATASET_FOLDER, "Conversational_dataset.jsonl") + path2 = os.path.join( + DEFAULT_DATASET_FOLDER, "Conversational_dataset.jsonl" + ) # save the pre-processed dataset to the default folder for test df1.to_csv(path1, 
index=False) df2.to_json(path2, orient="records", lines=True) - - datasets, sort = load_datasets(os. - "examples/datasets_config_local.json" - ) + + datasets, sort = load_datasets("examples/datasets_config_local.json") assert isinstance(datasets, list) assert isinstance(datasets[0], Dataset) and isinstance( datasets[1], Dataset diff --git a/tracestorm/cli.py b/tracestorm/cli.py index e047dc6..360e323 100644 --- a/tracestorm/cli.py +++ b/tracestorm/cli.py @@ -1,5 +1,5 @@ import os -from typing import Tuple, Optional +from typing import Optional, Tuple import click diff --git a/tracestorm/data_loader.py b/tracestorm/data_loader.py index 47c9bbe..3348995 100644 --- a/tracestorm/data_loader.py +++ b/tracestorm/data_loader.py @@ -1,12 +1,12 @@ import json import os import re -import pandas as pd - from dataclasses import dataclass from typing import List, Optional, Tuple -from datasets import load_dataset +import pandas as pd + +from datasets import load_dataset from tracestorm.constants import DEFAULT_DATASET_FOLDER from tracestorm.logger import init_logger @@ -25,8 +25,12 @@ class Dataset: select_ratio: int length: int + def is_file_type(file_name, extensions): - return any(re.search(rf"\.{ext}$", file_name, re.IGNORECASE) for ext in extensions) + return any( + re.search(rf"\.{ext}$", file_name, re.IGNORECASE) for ext in extensions + ) + def resolve_file_path(file_name: str) -> str: """ @@ -38,14 +42,15 @@ def resolve_file_path(file_name: str) -> str: # os.makedirs(DEFAULT_DATASET_FOLDER, exist_ok=True) if os.path.exists(file_name): return os.path.abspath(file_name) - + # check if file exists in DEFAULT_DATASET_FOLDER file_path = os.path.join(DEFAULT_DATASET_FOLDER, file_name) if os.path.exists(file_path): return file_path - + return file_name - + + def normalize_prompts(row) -> List[str]: """ Convert one row to a list of prompts based on the format. 
@@ -65,7 +70,7 @@ def normalize_prompts(row) -> List[str]: "", ) prompts.append(prompt) - else: # we cannot handle this type + else: # we cannot handle this type continue elif isinstance(row, str): # if the row is already a prompt prompts.append(row) @@ -104,15 +109,13 @@ def load_datasets( if datasets_config_file is None: logger.error("Customized data loading logic needs to be implemented!") return [], None - + # Load datasets configuration file try: with open(datasets_config_file, "r") as f: datasets_config = json.load(f) except FileNotFoundError: - logger.error( - f"Configuration file '{datasets_config_file}' not found" - ) + logger.error(f"Configuration file '{datasets_config_file}' not found") return [], None except Exception as e: logger.error(f"Error reading '{datasets_config_file}': {e}") @@ -142,7 +145,6 @@ def load_datasets( f"Missing required 'file_name' or 'prompt_field' for dataset '{name}'" ) continue - prompts = [] file_path = resolve_file_path(file_name) @@ -150,40 +152,48 @@ def load_datasets( try: # If the file does not exist locally and is not of csv or json format, # try to load it from hugging face using datasets.load_dataset() first - if not os.path.exists(file_path) and not is_file_type(file_name, ["csv", "json", "jsonl"]): + if not os.path.exists(file_path) and not is_file_type( + file_name, ["csv", "json", "jsonl"] + ): data = load_dataset(file_name)[split] - + if prompt_field not in data.column_names: - logger.error(f"Field '{prompt_field}' not found in '{file_name}'.") + logger.error( + f"Field '{prompt_field}' not found in '{file_name}'." 
+ ) continue - + check_field = True - - elif is_file_type(file_name, ["csv"]): # CSV files, could be either local or remote file - data = pd.read_csv(file_path) - - elif is_file_type(file_name, ["json", "jsonl"]): # JSON files - data = pd.read_json(file_path, lines=is_file_type(file_name, ["jsonl"])) - + + elif is_file_type( + file_name, ["csv"] + ): # CSV files, could be either local or remote file + data = pd.read_csv(file_path) + + elif is_file_type(file_name, ["json", "jsonl"]): # JSON files + data = pd.read_json( + file_path, lines=is_file_type(file_name, ["jsonl"]) + ) + else: - logger.error(f"Unsupported file format for '{file_name}'. Please implement customized loading logic.") - continue - + logger.error( + f"Unsupported file format for '{file_name}'. Please implement customized loading logic." + ) + continue + except Exception as e: logger.error(f"Failed to load '{file_name}': {e}") continue - - if not check_field and prompt_field not in set(data.columns): - logger.error( - f"Field '{prompt_field}' not found in '{file_name}'." 
- ) - continue - + + if not check_field and prompt_field not in set(data.columns): + logger.error(f"Field '{prompt_field}' not found in '{file_name}'.") + continue + # prompts = data[prompt_field].dropna().astype(str).tolist() # load each row for row in data[prompt_field]: prompts.extend(normalize_prompts(row)) - + # Add the dataset information (file name, a list of prompts, select ratio among all datasets, total number of prompts) dataset_obj = Dataset(file_name, prompts, ratio, len(prompts)) datasets.append(dataset_obj) @@ -193,5 +203,3 @@ def load_datasets( ) return datasets, sort_strategy - - From 6f96338f311f0fd3663d3f7249b8a1c7f375a152 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Thu, 20 Feb 2025 22:10:46 +0000 Subject: [PATCH 14/19] fix test --- examples/datasets_config_hf.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/datasets_config_hf.json b/examples/datasets_config_hf.json index cbc028a..1ad9093 100644 --- a/examples/datasets_config_hf.json +++ b/examples/datasets_config_hf.json @@ -6,8 +6,8 @@ "select_ratio": 2 }, "dataset_2": { - "file_name": "lmsys/chatbot_arena_conversations", - "prompt_field": "conversation_a", + "file_name": "MAsad789565/Coding_GPT4_Data", + "prompt_field": "user", "select_ratio": 8 } } \ No newline at end of file From 1c5c4249f5d6451a0c96bbae1492b1ae34700aa8 Mon Sep 17 00:00:00 2001 From: XuehengWang Date: Thu, 20 Feb 2025 22:57:41 +0000 Subject: [PATCH 15/19] add split for dataset config file --- examples/datasets_config_default.json | 3 ++- examples/datasets_config_hf.json | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/datasets_config_default.json b/examples/datasets_config_default.json index b941dd4..0746f64 100644 --- a/examples/datasets_config_default.json +++ b/examples/datasets_config_default.json @@ -3,7 +3,8 @@ "dataset_1": { "file_name": "", "prompt_field": "", - "select_ratio": 1 + "select_ratio": 1, + "split": "train" }, "dataset_2": { 
"file_name": "", diff --git a/examples/datasets_config_hf.json b/examples/datasets_config_hf.json index 1ad9093..9539d50 100644 --- a/examples/datasets_config_hf.json +++ b/examples/datasets_config_hf.json @@ -3,11 +3,13 @@ "dataset_1": { "file_name": "hf://datasets/fka/awesome-chatgpt-prompts/prompts.csv", "prompt_field": "prompt", - "select_ratio": 2 + "select_ratio": 2, + "split": "train" }, "dataset_2": { "file_name": "MAsad789565/Coding_GPT4_Data", "prompt_field": "user", - "select_ratio": 8 + "select_ratio": 8, + "split": "train" } } \ No newline at end of file From 5adb622eff416224878907d1166d039d6021c462 Mon Sep 17 00:00:00 2001 From: Yao Fu Date: Fri, 21 Feb 2025 14:08:36 +0000 Subject: [PATCH 16/19] fix: add dependency --- pyproject.toml | 2 ++ requirements.txt | 4 +++- tracestorm/data_loader.py | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e6edc77..1fc7b00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,9 +12,11 @@ authors = [ ] dependencies = [ + "datasets>=3.3.2", "openai>=1.58.0", "numpy>=1.26.4", "pandas>=2.2.3", + "requests>=2.31.0", "seaborn>=0.13.2", "matplotlib>=3.9", "click>=8.1.8" diff --git a/requirements.txt b/requirements.txt index 2c61bb6..8f874c0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ -datasets +click>=8.1.8 +datasets>=3.3.2 matplotlib>=3.9 numpy>=1.26.4 +openai>=1.58.0 pandas>=2.2.3 requests>=2.31.0 seaborn>=0.13.2 diff --git a/tracestorm/data_loader.py b/tracestorm/data_loader.py index 3348995..29eb79e 100644 --- a/tracestorm/data_loader.py +++ b/tracestorm/data_loader.py @@ -5,8 +5,8 @@ from typing import List, Optional, Tuple import pandas as pd - from datasets import load_dataset + from tracestorm.constants import DEFAULT_DATASET_FOLDER from tracestorm.logger import init_logger From 90533473111be04b1149c50287d69eb39030c33a Mon Sep 17 00:00:00 2001 From: Yao Fu Date: Fri, 21 Feb 2025 14:09:31 +0000 Subject: [PATCH 17/19] chore: simplify 
cli args --- tracestorm/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tracestorm/cli.py b/tracestorm/cli.py index 360e323..9b8f6e7 100644 --- a/tracestorm/cli.py +++ b/tracestorm/cli.py @@ -110,7 +110,7 @@ def create_trace_generator( help="OpenAI API Key", ) @click.option( - "--datasets-config-file", default=None, help="Config file for datasets" + "--datasets-config", default=None, help="Config file for datasets" ) def main( model, From a2a210300d03c0e3236c88fd80746d9d0ad2128d Mon Sep 17 00:00:00 2001 From: Yao Fu Date: Fri, 21 Feb 2025 14:14:21 +0000 Subject: [PATCH 18/19] fix: cli args --- README.md | 6 +++--- tracestorm/cli.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 38fc88b..748ccaf 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ tracestorm --model "Qwen/Qwen2.5-1.5B-Instruct" --pattern azure_code #### Example Command for Loading Prompts from Datasets ```bash -tracestorm --model "Qwen/Qwen2.5-1.5B-Instruct" --duration 30 --datasets-config-file ./examples/datasets_config_hf.json +tracestorm --model "Qwen/Qwen2.5-1.5B-Instruct" --duration 30 --datasets-config ./examples/datasets_config_hf.json ``` @@ -60,7 +60,7 @@ tracestorm --model "Qwen/Qwen2.5-1.5B-Instruct" --duration 30 --datasets-config- - Refer to `./examples/datasets_config_local.json` for an example configuration. - If you want to test loading from local files, please run `./examples/test_data_loader.py` first to download and save two datasets. -2. Remote datasets from Hugging Face +2. Remote datasets from Hugging Face - Refer to `./examples/datasets_config_hf.json` for an example configuration. **Sorting Strategy**: Defines how prompts from multiple datasets are ordered @@ -85,6 +85,6 @@ Please check `./examples/datasets_config_default.json` for required fields in `d - `--base-url`: Optional. OpenAI Base URL (default is `http://localhost:8000/v1`). - `--api-key`: Optional. OpenAI API Key (default is `none`). 
- `--seed`: Optional. Random seed for trace pattern reproducibility (default is `none`). -- `--datasets-config-file`: Optional. Configuration file for loading prompt messages from provided datasets. Uses `DEFAULT_MESSAGES` is not specified. +- `--datasets-config`: Optional. Configuration file for loading prompt messages from provided datasets. Uses `DEFAULT_MESSAGES` if not specified. Make sure to adjust the parameters according to your testing needs! diff --git a/tracestorm/cli.py b/tracestorm/cli.py index 9b8f6e7..2647858 100644 --- a/tracestorm/cli.py +++ b/tracestorm/cli.py @@ -121,7 +121,7 @@ def main( subprocesses, base_url, api_key, - datasets_config_file, + datasets_config, ): """Run trace-based load testing for OpenAI API endpoints.""" try: @@ -131,11 +131,11 @@ def main( if warning_msg: logger.warning(warning_msg) - if datasets_config_file is None: + if datasets_config is None: datasets = [] sort_strategy = None else: - datasets, sort_strategy = load_datasets(datasets_config_file) + datasets, sort_strategy = load_datasets(datasets_config) _, result_analyzer = run_load_test( trace_generator=trace_generator, From f769f038e92facf8bcb081cbc4ba6d4e3bf4e65f Mon Sep 17 00:00:00 2001 From: Yao Fu Date: Fri, 21 Feb 2025 14:16:43 +0000 Subject: [PATCH 19/19] fix: output figure x label --- tracestorm/result_analyzer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tracestorm/result_analyzer.py b/tracestorm/result_analyzer.py index 5130fd2..d35aef9 100644 --- a/tracestorm/result_analyzer.py +++ b/tracestorm/result_analyzer.py @@ -178,7 +178,7 @@ def plot_cdf( plt.figure(figsize=(8, 6)) sns.ecdfplot(self.ttft, color="blue") plt.title("CDF of Time to First Token (TTFT)") - plt.xlabel("TTFT (ms)") + plt.xlabel("TTFT") plt.ylabel("Cumulative Probability") plt.tight_layout() ttft_file = get_unique_file_path(ttft_file) @@ -196,7 +196,7 @@ def plot_cdf( plt.figure(figsize=(8, 6)) sns.ecdfplot(tpot_flat, color="green") plt.title("CDF of Time per
Output Token (TPOT)") - plt.xlabel("TPOT (ms)") + plt.xlabel("TPOT") plt.ylabel("Cumulative Probability") plt.tight_layout() tpot_file = get_unique_file_path(tpot_file)