Skip to content

Commit f100b6a

Browse files
committed
fix: added expired_at filter to message pipeline
1 parent 6a8ccb4 commit f100b6a

2 files changed

Lines changed: 46 additions & 2 deletions

File tree

src/handlers/subscribe-message-handler.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ import { anyPass, equals, isNil, map, propSatisfies, uniqWith } from 'ramda'
33
import { pipeline } from 'stream/promises'
44

55
import { createEndOfStoredEventsNoticeMessage, createNoticeMessage, createOutgoingEventMessage } from '../utils/messages'
6+
import { DBEvent, Event } from '../@types/event'
67
import { IAbortable, IMessageHandler } from '../@types/message-handlers'
78
import { isEventMatchingFilter, toNostrEvent } from '../utils/event'
89
import { streamEach, streamEnd, streamFilter, streamMap } from '../utils/stream'
910
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
1011
import { createLogger } from '../factories/logger-factory'
11-
import { Event } from '../@types/event'
1212
import { IEventRepository } from '../@types/repositories'
1313
import { IWebSocketAdapter } from '../@types/adapters'
1414
import { Settings } from '../@types/settings'
@@ -55,6 +55,8 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
5555
const sendEOSE = () =>
5656
this.webSocket.emit(WebSocketAdapterEvent.Message, createEndOfStoredEventsNoticeMessage(subscriptionId))
5757
const isSubscribedToEvent = SubscribeMessageHandler.isClientSubscribedToEvent(filters)
58+
const isNotExpired = (event: DBEvent) =>
59+
typeof event.expires_at !== 'number' || event.expires_at > Math.floor(Date.now() / 1000)
5860

5961
const findEvents = this.eventRepository.findByFilters(filters).stream()
6062

@@ -64,6 +66,7 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
6466
await pipeline(
6567
findEvents,
6668
streamFilter(propSatisfies(isNil, 'deleted_at')),
69+
streamFilter(isNotExpired),
6770
streamMap(toNostrEvent),
6871
streamFilter(isSubscribedToEvent),
6972
streamEach(sendEvent),

test/unit/handlers/subscribe-message-handler.spec.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@ import { WebSocketAdapterEvent } from '../../../src/constants/adapter'
1717
chai.use(chaiAsPromised)
1818
const { expect } = chai
1919

20-
const toDbEvent = (event: Event) => ({
20+
const toDbEvent = (
21+
event: Event,
22+
metadata: { expires_at?: number, deleted_at?: Date | null } = {},
23+
) => ({
2124
event_id: Buffer.from(event.id, 'hex'),
2225
event_kind: event.kind,
2326
event_pubkey: Buffer.from(event.pubkey, 'hex'),
2427
event_created_at: event.created_at,
2528
event_content: event.content,
2629
event_tags: event.tags,
2730
event_signature: Buffer.from(event.sig, 'hex'),
31+
...metadata,
2832
})
2933

3034
describe('SubscribeMessageHandler', () => {
@@ -165,6 +169,43 @@ describe('SubscribeMessageHandler', () => {
165169
)
166170
})
167171

172+
it('does not send expired events', async () => {
173+
isClientSubscribedToEventStub.returns(always(true))
174+
175+
const now = Math.floor(Date.now() / 1000)
176+
const promise = (handler as any).fetchAndSend(subscriptionId, filters)
177+
178+
stream.write(toDbEvent(event, { expires_at: now - 1 }))
179+
stream.end()
180+
181+
await promise
182+
183+
expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters)
184+
expect(webSocketOnMessageStub).to.have.been.calledOnceWithExactly(
185+
['EOSE', subscriptionId],
186+
)
187+
})
188+
189+
it('sends event if expiration is in the future', async () => {
190+
isClientSubscribedToEventStub.returns(always(true))
191+
192+
const now = Math.floor(Date.now() / 1000)
193+
const promise = (handler as any).fetchAndSend(subscriptionId, filters)
194+
195+
stream.write(toDbEvent(event, { expires_at: now + 60 }))
196+
stream.end()
197+
198+
await promise
199+
200+
expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters)
201+
expect(webSocketOnMessageStub).to.have.been.calledWithExactly(
202+
['EVENT', subscriptionId, event],
203+
)
204+
expect(webSocketOnMessageStub).to.have.been.calledWithExactly(
205+
['EOSE', subscriptionId],
206+
)
207+
})
208+
168209
it('sends EOSE', async () => {
169210
const promise = (handler as any).fetchAndSend(subscriptionId, filters)
170211

0 commit comments

Comments
 (0)