Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 11 additions & 30 deletions apps/worker-service/src/backtests/backtests.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ export class BacktestsService implements OnModuleInit, OnModuleDestroy {
}

/**
* Fetch historical candles by paginating through exchange API.
* 200 candles per request, walking backwards from endDate.
* Fetch historical candles for the given date range using paginated exchange API calls.
*/
private async fetchHistoricalCandles(
exchange: ExchangeId,
Expand All @@ -233,36 +232,18 @@ export class BacktestsService implements OnModuleInit, OnModuleDestroy {
if (!adapterFactory) throw new Error(`Unsupported exchange: ${exchange}`);

const adapter = adapterFactory();
const allCandles: Candle[] = [];
const startMs = startDate.getTime();
const endMs = endDate.getTime();

// Fetch in pages of 200 candles
// Most exchanges return candles in reverse chronological order,
// but our adapter reverses them to chronological
let fetchedCandles = await adapter.getCandles(symbol, interval, 200);

// Filter to date range
for (const c of fetchedCandles) {
if (c.timestamp >= startMs && c.timestamp <= endMs) {
allCandles.push(c);
}
}

// If we need more historical data, paginate
// This is a simplified approach — for a production system we'd
// use exchange-specific pagination parameters (e.g., `endTime`)
if (allCandles.length > 0) {
// Sort chronologically
allCandles.sort((a, b) => a.timestamp - b.timestamp);
}

// Cache for reuse
if (allCandles.length > 0) {
await this.redis.set(cacheKey, JSON.stringify(allCandles), 'EX', CANDLE_CACHE_TTL);
const candles = await adapter.getCandlesByRange(
symbol,
interval,
startDate.getTime(),
endDate.getTime(),
);

if (candles.length > 0) {
await this.redis.set(cacheKey, JSON.stringify(candles), 'EX', CANDLE_CACHE_TTL);
}

return allCandles;
return candles;
}

/**
Expand Down
72 changes: 72 additions & 0 deletions packages/exchange-adapters/src/binance/binance.rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ import { IExchangeRest } from '../interfaces/exchange-rest';

const BASE_URL = 'https://api.binance.com';

function parseIntervalMs(interval: string): number {
const INTERVAL_MS: Record<string, number> = {
'1m': 60_000,
'5m': 300_000,
'15m': 900_000,
'1h': 3_600_000,
'4h': 14_400_000,
'1d': 86_400_000,
};
return INTERVAL_MS[interval] ?? 60_000;
}

interface BinanceOrderResponse {
orderId: number;
symbol: string;
Expand Down Expand Up @@ -168,6 +180,66 @@ export class BinanceRest implements IExchangeRest {
}));
}

async getCandlesByRange(
symbol: string,
interval: string,
startTime: number,
endTime: number,
): Promise<Candle[]> {
const PAGE_LIMIT = 1000;
const intervalMs = parseIntervalMs(interval);
const allCandles: Candle[] = [];
let pageStart = startTime;
const MAX_PAGES = 100;

for (let page = 0; page < MAX_PAGES; page++) {
const res = await fetch(
`${BASE_URL}/api/v3/klines?symbol=${symbol}&interval=${interval}&limit=${PAGE_LIMIT}&startTime=${pageStart}&endTime=${endTime}`,
);
if (!res.ok) {
const body = await res.text();
throw new Error(`Binance API error ${res.status}: ${body}`);
}
const data = (await res.json()) as Array<
[
number,
string,
string,
string,
string,
string,
number,
string,
number,
string,
string,
string,
]
>;
if (data.length === 0) break;

for (const k of data) {
allCandles.push({
exchange: this.exchangeId,
symbol,
interval,
open: k[1],
high: k[2],
low: k[3],
close: k[4],
volume: k[5],
timestamp: k[0],
});
}

if (data.length < PAGE_LIMIT) break;
pageStart = data[data.length - 1][0] + intervalMs;
if (pageStart > endTime) break;
}

return allCandles;
}

private mapOrderResult(o: BinanceOrderResponse): OrderResult {
return {
exchange: this.exchangeId,
Expand Down
74 changes: 74 additions & 0 deletions packages/exchange-adapters/src/bybit/bybit.rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ import { IExchangeRest } from '../interfaces/exchange-rest';
const BASE_URL = 'https://api.bybit.com';
const RECV_WINDOW = '5000';

function parseIntervalMs(interval: string): number {
const INTERVAL_MS: Record<string, number> = {
'1m': 60_000,
'5m': 300_000,
'15m': 900_000,
'1h': 3_600_000,
'4h': 14_400_000,
'1d': 86_400_000,
};
return INTERVAL_MS[interval] ?? 60_000;
}

export class BybitRest implements IExchangeRest {
readonly exchangeId = 'bybit' as const;

Expand Down Expand Up @@ -214,6 +226,68 @@ export class BybitRest implements IExchangeRest {
}));
}

async getCandlesByRange(
symbol: string,
interval: string,
startTime: number,
endTime: number,
): Promise<Candle[]> {
const INTERVAL_MAP: Record<string, string> = {
'1m': '1',
'5m': '5',
'15m': '15',
'1h': '60',
'4h': '240',
'1d': 'D',
};
const bybitInterval = INTERVAL_MAP[interval] || '1';
const intervalMs = parseIntervalMs(interval);
Comment on lines +243 to +244
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Bybit candle interval handling has the same silent fallback-to-1m behavior as Upbit.

An unsupported interval currently falls back to Bybit '1' with intervalMs as 1 minute, which can silently distort backtest data. Consider validating interval against INTERVAL_MAP and throwing for unsupported values instead of defaulting.

const PAGE_LIMIT = 1000;
const allCandles: Candle[] = [];
let pageStart = startTime;
const MAX_PAGES = 100;

for (let page = 0; page < MAX_PAGES; page++) {
const res = await fetch(
`${BASE_URL}/v5/market/kline?category=spot&symbol=${symbol}&interval=${bybitInterval}&limit=${PAGE_LIMIT}&start=${pageStart}&end=${endTime}`,
);
if (!res.ok) {
const body = await res.text();
throw new Error(`Bybit API error ${res.status}: ${body}`);
}
const data = (await res.json()) as {
retCode: number;
retMsg: string;
result: { list: Array<[string, string, string, string, string, string, string]> };
};
if (data.retCode !== 0) {
throw new Error(`Bybit API error: ${data.retMsg}`);
}
const list = data.result?.list ?? [];
if (list.length === 0) break;

// Bybit returns newest-first; reverse to chronological
const page_candles = list.reverse().map((k) => ({
exchange: this.exchangeId,
symbol,
interval,
open: k[1],
high: k[2],
low: k[3],
close: k[4],
volume: k[5],
timestamp: Number(k[0]),
}));
allCandles.push(...page_candles);

if (list.length < PAGE_LIMIT) break;
pageStart = page_candles[page_candles.length - 1].timestamp + intervalMs;
if (pageStart > endTime) break;
}

return allCandles;
}

private mapOrder(o: BybitOrder): OrderResult {
return {
exchange: this.exchangeId,
Expand Down
6 changes: 6 additions & 0 deletions packages/exchange-adapters/src/interfaces/exchange-rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ export interface IExchangeRest {
): Promise<OrderResult>;
getMarkets(): Promise<Market[]>;
getCandles(symbol: string, interval: string, limit?: number): Promise<Candle[]>;
getCandlesByRange(
symbol: string,
interval: string,
startTime: number,
endTime: number,
): Promise<Candle[]>;
}
84 changes: 84 additions & 0 deletions packages/exchange-adapters/src/upbit/upbit.rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ import { IExchangeRest } from '../interfaces/exchange-rest';

const BASE_URL = 'https://api.upbit.com';

function parseIntervalMs(interval: string): number {
const INTERVAL_MS: Record<string, number> = {
'1m': 60_000,
'5m': 300_000,
'15m': 900_000,
'1h': 3_600_000,
'4h': 14_400_000,
'1d': 86_400_000,
};
return INTERVAL_MS[interval] ?? 60_000;
}

interface UpbitTrade {
market: string;
uuid: string;
Expand Down Expand Up @@ -177,6 +189,78 @@ export class UpbitRest implements IExchangeRest {
}));
}

async getCandlesByRange(
symbol: string,
interval: string,
startTime: number,
endTime: number,
): Promise<Candle[]> {
const INTERVAL_MAP: Record<string, string> = {
'1m': 'minutes/1',
'5m': 'minutes/5',
'15m': 'minutes/15',
'1h': 'minutes/60',
'4h': 'minutes/240',
'1d': 'days',
};
const path = INTERVAL_MAP[interval] || 'minutes/1';
const intervalMs = parseIntervalMs(interval);
const PAGE_LIMIT = 200;
const allCandles: Candle[] = [];
// Upbit paginates backward via `to` (exclusive end, ISO 8601)
let pageEnd = endTime;
const MAX_PAGES = 100;

for (let page = 0; page < MAX_PAGES; page++) {
const toParam = new Date(pageEnd + 1).toISOString();
const res = await fetch(
`${BASE_URL}/v1/candles/${path}?market=${symbol}&count=${PAGE_LIMIT}&to=${toParam}`,
);
if (!res.ok) {
const body = await res.text();
throw new Error(`Upbit API error ${res.status}: ${body}`);
}
const data = (await res.json()) as Array<{
candle_date_time_utc: string;
opening_price: number;
high_price: number;
low_price: number;
trade_price: number;
candle_acc_trade_volume: number;
timestamp: number;
}>;
if (data.length === 0) break;

// data is newest-first; filter to range and prepend to result
const inRange: Candle[] = [];
for (const k of data) {
if (k.timestamp < startTime) continue;
if (k.timestamp > endTime) continue;
inRange.push({
exchange: this.exchangeId,
symbol,
interval,
open: String(k.opening_price),
high: String(k.high_price),
low: String(k.low_price),
close: String(k.trade_price),
volume: String(k.candle_acc_trade_volume),
timestamp: k.timestamp,
});
}
// prepend so final array is chronological
allCandles.unshift(...inRange.reverse());

const oldestInPage = data[data.length - 1].timestamp;
if (oldestInPage <= startTime) break;
if (data.length < PAGE_LIMIT) break;
pageEnd = oldestInPage - intervalMs;
if (pageEnd < startTime) break;
}

return allCandles;
}

private mapOrderResponse(o: UpbitOrderResponse): OrderResult {
const execVol = parseFloat(o.executed_volume);

Expand Down
Loading