Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/assets-controller/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- `TokenDataSource` splits v3 asset metadata fetches into batches of at most 120 asset IDs per request, matching the Tokens API limit, and runs chunk requests in parallel with bounded concurrency via `p-limit` ([#8294](https://github.com/MetaMask/core/pull/8294))
- `PriceDataSource` splits v3 spot-price fetches into batches of at most 120 asset IDs per request, matching the same scale as token metadata requests, with bounded concurrency via `p-limit` ([#8294](https://github.com/MetaMask/core/pull/8294))

## [3.2.0]

### Added
Expand Down
9 changes: 8 additions & 1 deletion packages/assets-controller/src/AssetsController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,14 @@ export class AssetsController extends BaseController<
chains: ChainId[],
previousChains: ChainId[],
): void => {
this.#handleActiveChainsUpdate(dataSourceName, chains, previousChains);
try {
this.#handleActiveChainsUpdate(dataSourceName, chains, previousChains);
} catch (error) {
log('Failed to handle active chains update', {
dataSourceName,
error,
});
}
};

this.#backendWebsocketDataSource = new BackendWebsocketDataSource({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import {
MockAnyNamespace,
} from '@metamask/messenger';
import { NetworkStatus } from '@metamask/network-controller';

import {
NetworkState,
RpcEndpoint,
RpcEndpointType,
} from '../../../network-controller/src/NetworkController';
} from '@metamask/network-controller/src/NetworkController';

import {
AssetsControllerMessenger,
getDefaultAssetsControllerState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,75 @@ describe('PriceDataSource', () => {
controller.destroy();
});

it('fetch batches spot price requests in chunks of 120 asset IDs', async () => {
const assetIds = Array.from({ length: 121 }, (_, i) => {
const hexString = i.toString(16).padStart(40, '0');
return `eip155:1/erc20:0x${hexString}` as Caip19AssetId;
});
const balanceState: Record<string, Record<string, unknown>> = {
'mock-account-id': Object.fromEntries(
assetIds.map((id) => [id, { amount: '1' }]),
),
};
const priceResponse = Object.fromEntries(
assetIds.map((id) => [id, createMockPriceData(1)]),
);

const { controller, apiClient, getAssetsState } = setupController({
balanceState,
priceResponse,
});

await controller.fetch(createDataRequest({ chainIds: [] }), getAssetsState);

expect(apiClient.prices.fetchV3SpotPrices).toHaveBeenCalledTimes(2);
const chunkSizes = apiClient.prices.fetchV3SpotPrices.mock.calls
.map((call) => (call[0] as string[]).length)
.sort((a, b) => b - a);
expect(chunkSizes).toStrictEqual([120, 1]);

controller.destroy();
});

it('fetch batches spot price requests per chunk for non-USD currency', async () => {
const assetIds = Array.from({ length: 121 }, (_, i) => {
const hexString = i.toString(16).padStart(40, '0');
return `eip155:1/erc20:0x${hexString}` as Caip19AssetId;
});
const balanceState: Record<string, Record<string, unknown>> = {
'mock-account-id': Object.fromEntries(
assetIds.map((id) => [id, { amount: '1' }]),
),
};
const priceResponse = Object.fromEntries(
assetIds.map((id) => [id, createMockPriceData(1)]),
);

const { controller, apiClient, getAssetsState } = setupController({
getSelectedCurrency: () => 'eur',
balanceState,
priceResponse,
});

await controller.fetch(createDataRequest({ chainIds: [] }), getAssetsState);

expect(apiClient.prices.fetchV3SpotPrices).toHaveBeenCalledTimes(4);
const eurChunks = apiClient.prices.fetchV3SpotPrices.mock.calls.filter(
(call) => (call[1] as { currency?: string }).currency === 'eur',
);
const usdChunks = apiClient.prices.fetchV3SpotPrices.mock.calls.filter(
(call) => (call[1] as { currency?: string }).currency === 'usd',
);
expect(eurChunks).toHaveLength(2);
expect(usdChunks).toHaveLength(2);
const eurSizes = eurChunks
.map((call) => (call[0] as string[]).length)
.sort((a, b) => b - a);
expect(eurSizes).toStrictEqual([120, 1]);

controller.destroy();
});

it('fetch handles getState error gracefully', async () => {
const { controller } = setupController();

Expand Down
63 changes: 45 additions & 18 deletions packages/assets-controller/src/data-sources/PriceDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
} from '@metamask/core-backend';
import { ApiPlatformClient } from '@metamask/core-backend';
import { parseCaipAssetType } from '@metamask/utils';
import pLimit from 'p-limit';

import type { SubscriptionRequest } from './AbstractDataSource';
import { projectLogger, createModuleLogger } from '../logger';
Expand All @@ -24,6 +25,12 @@ import type {
const CONTROLLER_NAME = 'PriceDataSource';
const DEFAULT_POLL_INTERVAL = 60_000; // 1 minute for price updates

/** Price API v3 spot-prices accepts at most this many `assetIds` per request (same cap as tokens `/v3/assets`). */
const V3_SPOT_PRICES_MAX_IDS_PER_REQUEST = 120;

/** Max concurrent spot-price chunk requests (aligned with TokenDataSource metadata fetches). */
const V3_SPOT_PRICES_FETCH_CONCURRENCY = 3;

const log = createModuleLogger(projectLogger, CONTROLLER_NAME);

// ============================================================================
Expand Down Expand Up @@ -219,30 +226,50 @@ export class PriceDataSource {
async #fetchSpotPrices(
assetIds: string[],
): Promise<Record<Caip19AssetId, FungibleAssetPrice>> {
if (assetIds.length === 0) {
return {};
}

const selectedCurrency = this.#getSelectedCurrency();
const chunks: string[][] = [];
for (
let i = 0;
i < assetIds.length;
i += V3_SPOT_PRICES_MAX_IDS_PER_REQUEST
) {
chunks.push(assetIds.slice(i, i + V3_SPOT_PRICES_MAX_IDS_PER_REQUEST));
}

const limit = pLimit(V3_SPOT_PRICES_FETCH_CONCURRENCY);
const queryOpts = { includeMarketData: true as const };

const selectedChunkResults = await Promise.all(
chunks.map((chunk) =>
limit(() =>
this.#apiClient.prices.fetchV3SpotPrices(chunk, {
currency: selectedCurrency,
...queryOpts,
}),
),
),
);
const selectedCurrencyPrices = Object.assign({}, ...selectedChunkResults);

let selectedCurrencyPrices: V3SpotPricesResponse;
let usdPrices: V3SpotPricesResponse;
if (selectedCurrency === 'usd') {
selectedCurrencyPrices = await this.#apiClient.prices.fetchV3SpotPrices(
assetIds,
{
currency: selectedCurrency,
includeMarketData: true,
},
);
usdPrices = selectedCurrencyPrices;
} else {
[selectedCurrencyPrices, usdPrices] = await Promise.all([
this.#apiClient.prices.fetchV3SpotPrices(assetIds, {
currency: selectedCurrency,
includeMarketData: true,
}),
this.#apiClient.prices.fetchV3SpotPrices(assetIds, {
currency: 'usd',
includeMarketData: true,
}),
]);
const usdChunkResults = await Promise.all(
chunks.map((chunk) =>
limit(() =>
this.#apiClient.prices.fetchV3SpotPrices(chunk, {
currency: 'usd',
...queryOpts,
}),
),
),
);
usdPrices = Object.assign({}, ...usdChunkResults);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-USD price fetches serialized instead of parallel

Medium Severity

For non-USD users, the old code ran selected-currency and USD fetchV3SpotPrices calls in parallel via Promise.all. The new code fully awaits all selected-currency chunk results before starting any USD chunk requests, serializing what was previously parallel. Since the pLimit instance is shared, both sets of chunks could safely be dispatched together through the same limiter. For the common case (≤120 assets, single chunk per currency), this roughly doubles the price-fetch latency on every poll tick for all non-USD users.

Fix in Cursor Fix in Web

}

const prices: Record<Caip19AssetId, FungibleAssetPrice> = {};
Expand Down
22 changes: 16 additions & 6 deletions packages/assets-controller/src/data-sources/RpcDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,14 @@ export class RpcDataSource extends AbstractDataSource<
balanceFetcherMessenger,
{ pollingInterval: balanceInterval },
);
this.#balanceFetcher.setOnBalanceUpdate(
this.#handleBalanceUpdate.bind(this),
);
// Polling controller awaits this callback; rejections must not become unhandled.
this.#balanceFetcher.setOnBalanceUpdate(async (result) => {
try {
await this.#handleBalanceUpdate(result);
} catch (error) {
log('Balance update handler failed', { error });
}
});

// Initialize TokenDetector with polling interval
this.#tokenDetector = new TokenDetector(
Expand All @@ -295,9 +300,14 @@ export class RpcDataSource extends AbstractDataSource<
useExternalService: this.#useExternalService,
},
);
this.#tokenDetector.setOnDetectionUpdate(
this.#handleDetectionUpdate.bind(this),
);
// Sync throw in the detector would reject the poll tick if uncaught.
this.#tokenDetector.setOnDetectionUpdate((result) => {
try {
this.#handleDetectionUpdate(result);
} catch (error) {
log('Detection update handler failed', { error });
}
});

this.#subscribeToNetworkController();
this.#subscribeToTransactionEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ export class SnapDataSource extends AbstractDataSource<
// Transform the snap keyring payload to DataResponse format
let assetsBalance: NonNullable<DataResponse['assetsBalance']> | undefined;

for (const [accountId, assets] of Object.entries(payload.balances)) {
for (const [accountId, assets] of Object.entries(payload?.balances ?? {})) {
let accountAssets: Record<Caip19AssetId, AssetBalance> | undefined;

for (const [assetId, balance] of Object.entries(assets)) {
for (const [assetId, balance] of Object.entries(assets ?? {})) {
let chainId: ChainId;
try {
chainId = extractChainFromAssetId(assetId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,14 @@ export class StakedBalanceDataSource extends AbstractDataSource<
});

// Wire the callback so polling results flow back to subscriptions
this.#stakedBalanceFetcher.setOnStakedBalanceUpdate(
this.#handleStakedBalanceUpdate.bind(this),
);
// Polling controller invokes this synchronously; keep failures inside the poll tick.
this.#stakedBalanceFetcher.setOnStakedBalanceUpdate((result) => {
try {
this.#handleStakedBalanceUpdate(result);
} catch (error) {
log('Staked balance update handler failed', { error });
}
});

this.#messenger.subscribe(
'TransactionController:transactionConfirmed',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,39 @@ describe('TokenDataSource', () => {
expect(next).toHaveBeenCalledWith(context);
});

it('middleware chunks fetchV3Assets when more than 120 asset IDs are requested', async () => {
const assetIds = Array.from({ length: 121 }, (_, i) => {
const hexString = (i + 1).toString(16).padStart(40, '0');
return `eip155:1/erc20:0x${hexString}` as Caip19AssetId;
});

const { controller, apiClient } = setupController({
supportedNetworks: ['eip155:1'],
});

apiClient.tokens.fetchV3Assets.mockImplementation((ids: string[]) =>
Promise.resolve(ids.map((id) => createMockAssetResponse(id))),
);

const next = jest.fn().mockResolvedValue(undefined);
const context = createMiddlewareContext({
response: {
detectedAssets: {
'mock-account-id': assetIds,
},
},
});

await controller.assetsMiddleware(context, next);

expect(apiClient.tokens.fetchV3Assets).toHaveBeenCalledTimes(2);
expect(apiClient.tokens.fetchV3Assets.mock.calls[0][0]).toHaveLength(120);
expect(apiClient.tokens.fetchV3Assets.mock.calls[1][0]).toHaveLength(1);
expect(context.response.assetsInfo?.[assetIds[0]]?.symbol).toBe('TEST');
expect(context.response.assetsInfo?.[assetIds[120]]?.symbol).toBe('TEST');
expect(next).toHaveBeenCalledWith(context);
});

it('middleware transforms native asset type correctly', async () => {
const { controller } = setupController({
supportedNetworks: ['eip155:1'],
Expand Down
51 changes: 38 additions & 13 deletions packages/assets-controller/src/data-sources/TokenDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { V3AssetResponse } from '@metamask/core-backend';
import { ApiPlatformClient } from '@metamask/core-backend';
import { parseCaipAssetType } from '@metamask/utils';
import type { CaipAssetType } from '@metamask/utils';
import pLimit from 'p-limit';

import { isStakingContractAssetId } from './evm-rpc-services';
import { projectLogger, createModuleLogger } from '../logger';
Expand All @@ -19,6 +20,12 @@ import type {

const CONTROLLER_NAME = 'TokenDataSource';

/** Tokens API `/v3/assets` accepts at most this many `assetIds` per request. */
const V3_ASSETS_MAX_IDS_PER_REQUEST = 120;

/** Max concurrent `/v3/assets` chunk requests (same default scale as balance middleware). */
const V3_ASSETS_FETCH_CONCURRENCY = 3;

const MIN_TOKEN_OCCURRENCES = 3;

const log = createModuleLogger(projectLogger, CONTROLLER_NAME);
Expand Down Expand Up @@ -243,20 +250,38 @@ export class TokenDataSource {
}

try {
// Use ApiPlatformClient for fetching asset metadata
// API returns an array with assetId as a property on each item
const metadataResponse = await this.#apiClient.tokens.fetchV3Assets(
supportedAssetIds,
{
includeIconUrl: true,
includeMarketData: true,
includeMetadata: true,
includeLabels: true,
includeRwaData: true,
includeAggregators: true,
includeOccurrences: true,
},
const metadataQueryOptions = {
includeIconUrl: true,
includeMarketData: true,
includeMetadata: true,
includeLabels: true,
includeRwaData: true,
includeAggregators: true,
includeOccurrences: true,
};

// API returns an array with assetId as a property on each item.
// Request in chunks to stay within the per-request asset ID limit.
const chunks: string[][] = [];
for (
let i = 0;
i < supportedAssetIds.length;
i += V3_ASSETS_MAX_IDS_PER_REQUEST
) {
chunks.push(
supportedAssetIds.slice(i, i + V3_ASSETS_MAX_IDS_PER_REQUEST),
);
}

const limit = pLimit(V3_ASSETS_FETCH_CONCURRENCY);
const chunkResponses = await Promise.all(
chunks.map((chunk) =>
limit(() =>
this.#apiClient.tokens.fetchV3Assets(chunk, metadataQueryOptions),
),
),
);
const metadataResponse = chunkResponses.flat();

response.assetsInfo ??= {};

Expand Down