Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions docs/guides/session_management_standalone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@ const sessionPool = await SessionPool.open(sessionPoolOptions);
// Get session.
const session = await sessionPool.getSession();

// Increase the errorScore.
session.markBad();
if (session) {
// Increase the errorScore.
session.markBad();

// Throw away the session.
session.retire();
// Throw away the session.
session.retire();

// Lower the errorScore and mark the session good.
session.markGood();
// Lower the errorScore and mark the session good.
session.markGood();

// Return the session to the pool, so it can be reused
sessionPool.reclaimSession(session);
}
20 changes: 17 additions & 3 deletions packages/basic-crawler/src/internals/basic-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,14 @@ export class BasicCrawler<
await Promise.all(context[deferredCleanupKey].map((fn) => fn()));
},
})
.compose({ action: this.resolveSession.bind(this) })
.compose({
action: async (context) => this.resolveSession(context),
cleanup: async (context) => {
if (context?.session) {
await this.sessionPool?.reclaimSession(context.session);
}
},
})
.compose({ action: this.createContextHelpers.bind(this) });
}

Expand Down Expand Up @@ -1068,7 +1075,7 @@ export class BasicCrawler<
const session = this.useSessionPool
? await this._timeoutAndRetry(
async () => {
return await this.sessionPool!.newSession({
return await this.sessionPool!.getSession({
proxyInfo: await this.proxyConfiguration?.newProxyInfo({
request: request ?? undefined,
}),
Expand All @@ -1080,7 +1087,7 @@ export class BasicCrawler<
)
: undefined;

return { session, proxyInfo: session?.proxyInfo };
return { session, proxyInfo: session?.proxyInfo } as const;
}

private async createContextHelpers({ request, session }: { request: Request; session?: Session }) {
Expand Down Expand Up @@ -1994,6 +2001,13 @@ export class BasicCrawler<
* Returns true if either RequestList or RequestQueue have a request ready for processing.
*/
protected async _isTaskReadyFunction() {
if (this.sessionPool && !(await this.sessionPool.hasIdleSessions())) {
this.log.warning(
`No idle session available for the next task. Lower your concurrency or increase the session pool size.`,
);
return false;
}

return this.requestManager !== undefined && !(await this.requestManager.isEmpty());
}

Expand Down
2 changes: 1 addition & 1 deletion packages/browser-crawler/src/internals/browser-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ export abstract class BrowserCrawler<
const launchContextExtends: { session?: Session; proxyInfo?: ProxyInfo } = {};

if (this.sessionPool) {
launchContextExtends.session = await this.sessionPool.newSession({
launchContextExtends.session = await this.sessionPool.getSession({
proxyInfo: await this.proxyConfiguration?.newProxyInfo({
// cannot pass a request here, since session is created on browser launch
}),
Expand Down
179 changes: 58 additions & 121 deletions packages/core/src/session_pool/session_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { EventType } from '../events/event_manager.js';
import type { CrawleeLogger } from '../log.js';
import { serviceLocator } from '../service_locator.js';
import { KeyValueStore } from '../storages/key_value_store.js';
import { entries } from '../typedefs.js';
import { BLOCKED_STATUS_CODES, MAX_POOL_SIZE, PERSIST_STATE_KEY } from './consts.js';
import type { SessionOptions } from './session.js';
import { Session } from './session.js';
Expand Down Expand Up @@ -137,8 +138,7 @@ export class SessionPool extends EventEmitter {
protected maxPoolSize: number;
protected createSessionFunction: CreateSession;
protected keyValueStore!: KeyValueStore;
protected sessions: Session[] = [];
protected sessionMap = new Map<string, Session>();
protected sessions = new Map<string, { busy: boolean; session: Session }>();
protected sessionOptions: SessionOptions;
protected persistStateKeyValueStoreId?: string;
protected persistStateKey: string;
Expand Down Expand Up @@ -205,20 +205,6 @@ export class SessionPool extends EventEmitter {
this.persistStateKey = persistStateKey;
}

/**
* Gets count of usable sessions in the pool.
*/
get usableSessionsCount(): number {
return this.sessions.filter((session) => session.isUsable()).length;
}

/**
* Gets count of retired sessions in the pool.
*/
get retiredSessionsCount(): number {
return this.sessions.filter((session) => !session.isUsable()).length;
}

/**
* Starts periodic state persistence and potentially loads SessionPool state from {@apilink KeyValueStore}.
* It is called automatically by the {@apilink SessionPool.open} function.
Expand Down Expand Up @@ -251,60 +237,38 @@ export class SessionPool extends EventEmitter {
this.isInitialized = true;
}

/**
* Adds a new session to the session pool. The pool automatically creates sessions up to the maximum size of the pool,
* but this allows you to add more sessions once the max pool size is reached.
* This also allows you to add session with overridden session options (e.g. with specific session id).
* @param [options] The configuration options for the session being added to the session pool.
*/
async addSession(options: Session | SessionOptions = {}): Promise<void> {
private async markAsBusy(session: Session) {
this._throwIfNotInitialized();
const { id } = options;
if (id) {
const sessionExists = this.sessionMap.has(id);
if (sessionExists) {
throw new Error(`Cannot add session with id '${id}' as it already exists in the pool`);
}
}

if (!this._hasSpaceForSession()) {
this._removeRetiredSessions();
const sessionData = this.sessions.get(session.id);
if (!sessionData) {
throw new Error('Marking session as busy that is not in the pool');
}

const newSession =
options instanceof Session ? options : await this.createSessionFunction(this, { sessionOptions: options });
this.log.debug(`Adding new Session - ${newSession.id}`);

this._addSession(newSession);
sessionData.busy = true;
}

/**
* Adds a new session to the session pool. The pool automatically creates sessions up to the maximum size of the pool,
* but this allows you to add more sessions once the max pool size is reached.
* This also allows you to add session with overridden session options (e.g. with specific session id).
* @param [options] The configuration options for the session being added to the session pool.
*/
async newSession(sessionOptions?: SessionOptions): Promise<Session> {
async hasIdleSessions(): Promise<boolean> {
this._throwIfNotInitialized();

const newSession = await this.createSessionFunction(this, { sessionOptions });
this._addSession(newSession);

return newSession;
return !!this.sessions.values().find(({ busy }) => !busy);
}

/**
* Gets session.
* If there is space for new session, it creates and returns new session.
* If the session pool is full, it picks a session from the pool,
* If the picked session is usable it is returned, otherwise it creates and returns a new one.
*/
async getSession(): Promise<Session>;
async reclaimSession(session: Session): Promise<void> {
this._throwIfNotInitialized();

/**
* Gets session based on the provided session id or `undefined.
*/
async getSession(sessionId: string): Promise<Session>;
if (!session.isUsable()) {
this.sessions.delete(session.id);
return;
}

const sessionData = this.sessions.get(session.id);
if (!sessionData) {
throw new Error('Reclaiming session that is not in the pool');
}

sessionData.busy = false;
}

/**
* Gets session.
Expand All @@ -313,29 +277,36 @@ export class SessionPool extends EventEmitter {
* If the picked session is usable it is returned, otherwise it creates and returns a new one.
* @param [sessionId] If provided, it returns the usable session with this id, `undefined` otherwise.
*/
async getSession(sessionId?: string): Promise<Session | undefined> {
async getSession(options: SessionOptions = {}): Promise<Session | undefined> {
await this.queue.wait();

try {
this._throwIfNotInitialized();

if (sessionId) {
const session = this.sessionMap.get(sessionId);
if (session && session.isUsable()) return session;
return undefined;
}
// TODO use the custom fetch strategy here
let idleSession = this.sessions.values().find((s) => {
if (s.busy) return false;

if (this._hasSpaceForSession()) {
return await this._createSession();
}
for (const [key, value] of entries(options)) {
if (s.session[key] !== value) return false;
}

return true;
})?.session;

if (!idleSession) {
// The user has requested a specific session, and it's present (but busy)
if (options.id && this.sessions.has(options.id)) {
return undefined;
}

const pickedSession = this._pickSession();
if (pickedSession.isUsable()) {
return pickedSession;
if (this.sessions.size < this.maxPoolSize) {
idleSession = await this._createSession(options);
}
}

this._removeRetiredSessions();
return await this._createSession();
if (idleSession) await this.markAsBusy(idleSession);
return idleSession;
} finally {
this.queue.shift();
}
Expand All @@ -357,11 +328,7 @@ export class SessionPool extends EventEmitter {
* Note that the object's fields can change in future releases.
*/
getState() {
return {
usableSessionsCount: this.usableSessionsCount,
retiredSessionsCount: this.retiredSessionsCount,
sessions: this.sessions.map((session) => session.getState()),
};
return [...this.sessions.values().map(({ session }) => session.getState())];
}

/**
Expand Down Expand Up @@ -408,34 +375,12 @@ export class SessionPool extends EventEmitter {
if (!this.isInitialized) throw new Error('SessionPool is not initialized.');
}

/**
* Removes retired `Session` instances from `SessionPool`.
*/
protected _removeRetiredSessions() {
this.sessions = this.sessions.filter((storedSession) => {
if (storedSession.isUsable()) return true;

this.sessionMap.delete(storedSession.id);
this.log.debug(`Removed Session - ${storedSession.id}`);

return false;
});
}

/**
* Adds `Session` instance to `SessionPool`.
* @param newSession `Session` instance to be added.
*/
protected _addSession(newSession: Session) {
this.sessions.push(newSession);
this.sessionMap.set(newSession.id, newSession);
}

/**
* Gets random index.
*/
protected _getRandomIndex(): number {
return Math.floor(Math.random() * this.sessions.length);
protected _addSession(session: Session) {
this.sessions.set(session.id, { busy: false, session });
}

/**
Expand Down Expand Up @@ -463,8 +408,8 @@ export class SessionPool extends EventEmitter {
* Creates new session and adds it to the pool.
* @returns Newly created `Session` instance.
*/
protected async _createSession(): Promise<Session> {
const newSession = await this.createSessionFunction(this);
protected async _createSession(sessionOptions: SessionOptions): Promise<Session> {
const newSession = await this.createSessionFunction(this, { sessionOptions });
this._addSession(newSession);
this.log.debug(`Created new Session - ${newSession.id}`);

Expand All @@ -475,44 +420,36 @@ export class SessionPool extends EventEmitter {
* Decides whether there is enough space for creating new session.
*/
protected _hasSpaceForSession(): boolean {
return this.sessions.length < this.maxPoolSize;
}

/**
* Picks random session from the `SessionPool`.
* @returns Picked `Session`.
*/
protected _pickSession(): Session {
return this.sessions[this._getRandomIndex()]; // Or maybe we should let the developer to customize the picking algorithm
return this.sessions.size < this.maxPoolSize;
}

/**
* Potentially loads `SessionPool`.
* If the state was persisted it loads the `SessionPool` from the persisted state.
*/
protected async _maybeLoadSessionPool(): Promise<void> {
const loadedSessionPool = await this.keyValueStore.getValue<{ sessions: Dictionary[] }>(this.persistStateKey);
const loadedSessions = await this.keyValueStore.getValue<Dictionary[]>(this.persistStateKey);

if (!loadedSessionPool) return;
if (!loadedSessions) return;

// Invalidate old sessions and load active sessions only
this.log.debug('Recreating state from KeyValueStore', {
persistStateKeyValueStoreId: this.persistStateKeyValueStoreId,
persistStateKey: this.persistStateKey,
});

for (const sessionObject of loadedSessionPool.sessions) {
sessionObject.sessionPool = this;
sessionObject.createdAt = new Date(sessionObject.createdAt as string);
sessionObject.expiresAt = new Date(sessionObject.expiresAt as string);
const recreatedSession = await this.createSessionFunction(this, { sessionOptions: sessionObject });
for (const sessionOptions of loadedSessions) {
sessionOptions.sessionPool = this;
sessionOptions.createdAt = new Date(sessionOptions.createdAt as string);
sessionOptions.expiresAt = new Date(sessionOptions.expiresAt as string);
const recreatedSession = await this.createSessionFunction(this, { sessionOptions });

if (recreatedSession.isUsable()) {
this._addSession(recreatedSession);
}
}

this.log.debug(`${this.usableSessionsCount} active sessions loaded from KeyValueStore`);
this.log.debug(`${this.sessions.size} sessions loaded from KeyValueStore`);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/memory-storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
"fs-extra": "^11.3.0",
"json5": "^2.2.3",
"mime-types": "^3.0.1",
"proper-lockfile": "^4.1.2",
"p-limit": "^6.2.0",
"proper-lockfile": "^4.1.2",
"tslib": "^2.8.1"
}
}
Loading
Loading