diff --git a/ably.d.ts b/ably.d.ts index bd596e4a0d..58b35fc4a1 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -1467,6 +1467,11 @@ export interface BatchPublishSuccessResult { * A unique ID prefixed to the {@link Message.id} of each published message. */ messageId: string; + /** + * An array of message serials corresponding 1:1 to the messages that were published. + * A serial may be null if the message was discarded due to a configured conflation rule. + */ + serials: (string | null)[]; } /** @@ -2317,26 +2322,26 @@ export declare interface Channel { * * @param messages - An array of {@link Message} objects. * @param options - Optional parameters, such as [`quickAck`](https://faqs.ably.com/why-are-some-rest-publishes-on-a-channel-slow-and-then-typically-faster-on-subsequent-publishes) sent as part of the query string. - * @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure. + * @returns A promise which, upon success, will be fulfilled with a {@link PublishResult} object containing the serials of the published messages. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. */ - publish(messages: Message[], options?: PublishOptions): Promise; + publish(messages: Message[], options?: PublishOptions): Promise; /** * Publishes a message to the channel. * * @param message - A {@link Message} object. * @param options - Optional parameters, such as [`quickAck`](https://faqs.ably.com/why-are-some-rest-publishes-on-a-channel-slow-and-then-typically-faster-on-subsequent-publishes) sent as part of the query string. - * @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure. + * @returns A promise which, upon success, will be fulfilled with a {@link PublishResult} object containing the serial of the published message. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. */ - publish(message: Message, options?: PublishOptions): Promise; + publish(message: Message, options?: PublishOptions): Promise; /** * Publishes a single message to the channel with the given event name and payload. * * @param name - The name of the message. * @param data - The payload of the message. * @param options - Optional parameters, such as [`quickAck`](https://faqs.ably.com/why-are-some-rest-publishes-on-a-channel-slow-and-then-typically-faster-on-subsequent-publishes) sent as part of the query string. - * @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure. + * @returns A promise which, upon success, will be fulfilled with a {@link PublishResult} object containing the serial of the published message. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. */ - publish(name: string, data: any, options?: PublishOptions): Promise; + publish(name: string, data: any, options?: PublishOptions): Promise; /** * Retrieves a {@link ChannelDetails} object for the channel, which includes status and occupancy metrics. * @@ -2355,19 +2360,28 @@ export declare interface Channel { * * @param message - A {@link Message} object containing a populated `serial` field and the fields to update. * @param operation - An optional {@link MessageOperation} object containing metadata about the update operation. - * @param params - Optional parameters sent as part of the query string. - * @returns A promise which on success will be fulfilled, and on failure, rejected with an {@link ErrorInfo} object which explains the error. + * @param options - Optional parameters to modify how the publish is made. + * @returns A promise which, upon success, will be fulfilled with an {@link UpdateDeleteResult} object containing the serial of the new version of the message. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. */ - updateMessage(message: Message, operation?: MessageOperation, params?: Record): Promise; + updateMessage(message: Message, operation?: MessageOperation, options?: PublishOptions): Promise; /** * Marks a message as deleted by publishing an update with an action of `MESSAGE_DELETE`. This does not remove the message from the server, and the full message history remains accessible. Uses patch semantics: non-null `name`, `data`, and `extras` fields in the provided message will replace the corresponding fields in the existing message, while null fields will be left unchanged (meaning that if you for example want the `MESSAGE_DELETE` to have an empty data, you should explicitly set the `data` to an empty object). * * @param message - A {@link Message} object containing a populated `serial` field. * @param operation - An optional {@link MessageOperation} object containing metadata about the delete operation. - * @param params - Optional parameters sent as part of the query string. - * @returns A promise which on success will be fulfilled, and on failure, rejected with an {@link ErrorInfo} object which explains the error. + * @param options - Optional parameters to modify how the publish is made. + * @returns A promise which, upon success, will be fulfilled with an {@link UpdateDeleteResult} object containing the serial of the new version of the message. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. */ - deleteMessage(message: Message, operation?: MessageOperation, params?: Record): Promise; + deleteMessage(message: Message, operation?: MessageOperation, options?: PublishOptions): Promise; + /** + * Appends data to an existing message. The supplied `data` field is appended to the previous message's data, while all other fields (`name`, `extras`) replace the previous values if provided. + * + * @param message - A {@link Message} object containing a populated `serial` field and the data to append. + * @param operation - An optional {@link MessageOperation} object containing metadata about the append operation. + * @param options - Optional parameters to modify how the publish is made. + * @returns A promise which, upon success, will be fulfilled with an {@link UpdateDeleteResult} object containing the serial of the new version of the message. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. + */ + appendMessage(message: Message, operation?: MessageOperation, options?: PublishOptions): Promise; /** * Retrieves all historical versions of a specific message, ordered by version. This includes the original message and all subsequent updates or delete operations. * @@ -2564,23 +2578,26 @@ export declare interface RealtimeChannel extends EventEmitter; + publish(name: string, data: any, options?: PublishOptions): Promise; /** * Publishes an array of messages to the channel. When publish is called with this client library, it won't attempt to implicitly attach to the channel. * * @param messages - An array of {@link Message} objects. - * @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure. + * @param options - Optional parameters sent as part of the protocol message. + * @returns A promise which, upon success, will be fulfilled with a {@link PublishResult} object containing the serials of the published messages. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. */ - publish(messages: Message[]): Promise; + publish(messages: Message[], options?: PublishOptions): Promise; /** * Publish a message to the channel. When publish is called with this client library, it won't attempt to implicitly attach to the channel. * * @param message - A {@link Message} object. - * @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure. + * @param options - Optional parameters sent as part of the protocol message. + * @returns A promise which, upon success, will be fulfilled with a {@link PublishResult} object containing the serial of the published message. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. */ - publish(message: Message): Promise; + publish(message: Message, options?: PublishOptions): Promise; /** * If the channel is already in the given state, returns a promise which immediately resolves to `null`. Else, calls {@link EventEmitter.once | `once()`} to return a promise which resolves the next time the channel transitions to the given state. * @@ -2599,19 +2616,28 @@ export declare interface RealtimeChannel extends EventEmitter): Promise; + updateMessage(message: Message, operation?: MessageOperation, options?: PublishOptions): Promise; /** * Marks a message as deleted by publishing an update with an action of `MESSAGE_DELETE`. This does not remove the message from the server, and the full message history remains accessible. Uses patch semantics: non-null `name`, `data`, and `extras` fields in the provided message will replace the corresponding fields in the existing message, while null fields will be left unchanged (meaning that if you for example want the `MESSAGE_DELETE` to have an empty data, you should explicitly set the `data` to an empty object). * * @param message - A {@link Message} object containing a populated `serial` field. * @param operation - An optional {@link MessageOperation} object containing metadata about the delete operation. - * @param params - Optional parameters sent as part of the query string. - * @returns A promise which on success will be fulfilled, and on failure, rejected with an {@link ErrorInfo} object which explains the error. + * @param options - Optional parameters to modify how the publish is made. + * @returns A promise which, upon success, will be fulfilled with an {@link UpdateDeleteResult} object containing the serial of the new version of the message. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. + */ + deleteMessage(message: Message, operation?: MessageOperation, options?: PublishOptions): Promise; + /** + * Appends data to an existing message. The supplied `data` field is appended to the previous message's data, while all other fields (`name`, `extras`) replace the previous values if provided. + * + * @param message - A {@link Message} object containing a populated `serial` field and the data to append. + * @param operation - An optional {@link MessageOperation} object containing metadata about the append operation. + * @param options - Optional parameters to modify how the publish is made. + * @returns A promise which, upon success, will be fulfilled with an {@link UpdateDeleteResult} object containing the serial of the new version of the message. Upon failure, the promise will be rejected with an {@link ErrorInfo} object which explains the error. */ - deleteMessage(message: Message, operation?: MessageOperation, params?: Record): Promise; + appendMessage(message: Message, operation?: MessageOperation, options?: PublishOptions): Promise; /** * Retrieves all historical versions of a specific message, ordered by version. This includes the original message and all subsequent updates or delete operations. * @@ -2630,12 +2656,7 @@ export declare interface RealtimeChannel extends EventEmitter; } +/** + * Contains the result of a publish operation. + */ +export interface PublishResult { + /** + * An array of message serials corresponding 1:1 to the messages that were published. + * A serial may be null if the message was discarded due to a configured conflation rule. + */ + serials: (string | null)[]; +} + +/** + * Contains the result of an update or delete message operation. + */ +export interface UpdateDeleteResult { + /** + * The serial of the new version of the updated or deleted message. + * Will be null if the message was superseded by a subsequent update before it could be published. + */ + versionSerial: string | null; +} + /** * The namespace containing the different types of message actions. */ @@ -3003,6 +3046,12 @@ declare namespace MessageActions { * the message.serial is the serial of the message for which this is a summary. */ type MESSAGE_SUMMARY = 'message.summary'; + /** + * Message action for an appended message. The `serial` field identifies the message to which + * data is being appended. The `data` field is appended to the previous message's data, while + * all other fields replace the previous values. + */ + type MESSAGE_APPEND = 'message.append'; } /** @@ -3013,7 +3062,8 @@ export type MessageAction = | MessageActions.MESSAGE_UPDATE | MessageActions.MESSAGE_DELETE | MessageActions.META - | MessageActions.MESSAGE_SUMMARY; + | MessageActions.MESSAGE_SUMMARY + | MessageActions.MESSAGE_APPEND; /** * The namespace containing the different types of annotation actions. diff --git a/src/common/lib/client/realtimeannotations.ts b/src/common/lib/client/realtimeannotations.ts index a4f86d30bc..b87a132994 100644 --- a/src/common/lib/client/realtimeannotations.ts +++ b/src/common/lib/client/realtimeannotations.ts @@ -45,12 +45,12 @@ class RealtimeAnnotations { channel: channelName, annotations: [wireAnnotation], }); - return this.channel.sendMessage(pm); + await this.channel.sendMessage(pm); } async delete(msgOrSerial: string | Message, annotationValues: Partial>): Promise { annotationValues.action = 'annotation.delete'; - return this.publish(msgOrSerial, annotationValues); + await this.publish(msgOrSerial, annotationValues); } async subscribe(..._args: unknown[] /* [type], listener */): Promise { diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 74792cb4f0..929b1115ed 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -237,24 +237,27 @@ class RealtimeChannel extends EventEmitter { return false; } - async publish(...args: any[]): Promise { + async publish(...args: any[]): Promise { + const first = args[0], + second = args[1]; let messages: Message[]; - let argCount = args.length; - - if (argCount == 1) { - if (Utils.isObject(args[0])) { - messages = [Message.fromValues(args[0])]; - } else if (Array.isArray(args[0])) { - messages = Message.fromValuesArray(args[0]); - } else { - throw new ErrorInfo( - 'The single-argument form of publish() expects a message object or an array of message objects', - 40013, - 400, - ); - } + let params: Record | undefined; + + if (typeof first === 'string' || first === null || first === undefined) { + messages = [Message.fromValues({ name: first, data: second })]; + params = args[2]; + } else if (Utils.isObject(first)) { + messages = [Message.fromValues(first)]; + params = args[1]; + } else if (Array.isArray(first)) { + messages = Message.fromValuesArray(first); + params = args[1]; } else { - messages = [Message.fromValues({ name: args[0], data: args[1] })]; + throw new ErrorInfo( + 'The single-argument form of publish() expects a message object or an array of message objects', + 40013, + 400, + ); } const maxMessageSize = this.client.options.maxMessageSize; // TODO get rid of CipherOptions type assertion, indicates channeloptions types are broken @@ -278,8 +281,14 @@ class RealtimeChannel extends EventEmitter { 'sending message; channel state is ' + this.state + ', message count = ' + wireMessages.length, ); - const pm = protocolMessageFromValues({ action: actions.MESSAGE, channel: this.name, messages: wireMessages }); - return this.sendMessage(pm); + const pm = protocolMessageFromValues({ + action: actions.MESSAGE, + channel: this.name, + messages: wireMessages, + params: params ? Utils.stringifyValues(params) : undefined, + }); + const res = await this.sendMessage(pm); + return res || { serials: [] }; } throwIfUnpublishableState(): void { @@ -486,13 +495,13 @@ class RealtimeChannel extends EventEmitter { connectionManager.send(syncMessage); } - async sendMessage(msg: ProtocolMessage): Promise { + async sendMessage(msg: ProtocolMessage): Promise { return new Promise((resolve, reject) => { - this.connectionManager.send(msg, this.client.options.queueMessages, (err) => { + this.connectionManager.send(msg, this.client.options.queueMessages, (err, publishResponse) => { if (err) { reject(err); } else { - resolve(); + resolve(publishResponse); } }); }); @@ -504,16 +513,16 @@ class RealtimeChannel extends EventEmitter { channel: this.name, presence: presence, }); - return this.sendMessage(msg); + await this.sendMessage(msg); } - sendState(objectMessages: WireObjectMessage[]): Promise { + async sendState(objectMessages: WireObjectMessage[]): Promise { const msg = protocolMessageFromValues({ action: actions.OBJECT, channel: this.name, state: objectMessages, }); - return this.sendMessage(msg); + await this.sendMessage(msg); } // Access to this method is synchronised by ConnectionManager#processChannelMessage, in order to synchronise access to the state stored in _decodingContext. @@ -1023,16 +1032,64 @@ class RealtimeChannel extends EventEmitter { return restMixin.getMessage(this, serialOrMessage); } - async updateMessage(message: Message, operation?: API.MessageOperation, params?: Record): Promise { + async updateMessage( + message: Message, + operation?: API.MessageOperation, + params?: Record, + ): Promise { Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.updateMessage()', 'channel = ' + this.name); - const restMixin = this.client.rest.channelMixin; - return restMixin.updateDeleteMessage(this, { isDelete: false }, message, operation, params); + return this.sendUpdate(message, 'message.update', operation, params); } - async deleteMessage(message: Message, operation?: API.MessageOperation, params?: Record): Promise { + async deleteMessage( + message: Message, + operation?: API.MessageOperation, + params?: Record, + ): Promise { Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.deleteMessage()', 'channel = ' + this.name); - const restMixin = this.client.rest.channelMixin; - return restMixin.updateDeleteMessage(this, { isDelete: true }, message, operation, params); + return this.sendUpdate(message, 'message.delete', operation, params); + } + + async appendMessage( + message: Message, + operation?: API.MessageOperation, + params?: Record, + ): Promise { + Logger.logAction(this.logger, Logger.LOG_MICRO, 'RealtimeChannel.appendMessage()', 'channel = ' + this.name); + return this.sendUpdate(message, 'message.append', operation, params); + } + + private async sendUpdate( + message: Message, + action: 'message.update' | 'message.delete' | 'message.append', + operation?: API.MessageOperation, + params?: Record, + ): Promise { + if (!message.serial) { + throw new ErrorInfo( + 'This message lacks a serial and cannot be updated. Make sure you have enabled "Message annotations, updates, and deletes" in channel settings on your dashboard.', + 40003, + 400, + ); + } + + this.throwIfUnpublishableState(); + + const updateDeleteMsg = Message.fromValues({ + ...message, + action: action, + version: operation, + }); + + const wireMessage = await updateDeleteMsg.encode(this.channelOptions as CipherOptions); + const pm = protocolMessageFromValues({ + action: actions.MESSAGE, + channel: this.name, + messages: [wireMessage], + params: params ? Utils.stringifyValues(params) : undefined, + }); + const publishResponse = await this.sendMessage(pm); + return { versionSerial: publishResponse?.serials?.[0] ?? null }; } async getMessageVersions( diff --git a/src/common/lib/client/restchannel.ts b/src/common/lib/client/restchannel.ts index 9ac6c3dc73..5b5cf0be85 100644 --- a/src/common/lib/client/restchannel.ts +++ b/src/common/lib/client/restchannel.ts @@ -21,6 +21,8 @@ import type RestAnnotations from './restannotations'; const MSG_ID_ENTROPY_BYTES = 9; +type RestPublishResponse = API.PublishResult & { channel?: string; messageId?: string }; + function allEmptyIds(messages: Array) { return messages.every(function (message: Message) { return !message.id; @@ -75,7 +77,7 @@ class RestChannel { return this.client.rest.channelMixin.history(this, params); } - async publish(...args: any[]): Promise { + async publish(...args: any[]): Promise { const first = args[0], second = args[1]; let messages: Array; @@ -132,19 +134,31 @@ class RestChannel { ); } - await this._publish(serializeMessage(wireMessages, client._MsgPack, format), headers, params); + return this._publish(serializeMessage(wireMessages, client._MsgPack, format), headers, params); } - async _publish(requestBody: RequestBody | null, headers: Record, params: any): Promise { - await Resource.post( - this.client, - this.client.rest.channelMixin.basePath(this) + '/messages', + async _publish( + requestBody: RequestBody | null, + headers: Record, + params: any, + ): Promise { + const client = this.client; + const format = client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json; + const { body, unpacked } = await Resource.post( + client, + client.rest.channelMixin.basePath(this) + '/messages', requestBody, headers, params, null, true, ); + const decoded = + (unpacked ? body : Utils.decodeBody(body, client._MsgPack, format)) || + ({} as RestPublishResponse); + delete decoded['channel']; + delete decoded['messageId']; + return decoded; } async status(): Promise { @@ -156,14 +170,31 @@ class RestChannel { return this.client.rest.channelMixin.getMessage(this, serialOrMessage); } - async updateMessage(message: Message, operation?: API.MessageOperation, params?: Record): Promise { + async updateMessage( + message: Message, + operation?: API.MessageOperation, + params?: Record, + ): Promise { Logger.logAction(this.logger, Logger.LOG_MICRO, 'RestChannel.updateMessage()', 'channel = ' + this.name); - return this.client.rest.channelMixin.updateDeleteMessage(this, { isDelete: false }, message, operation, params); + return this.client.rest.channelMixin.updateDeleteMessage(this, 'message.update', message, operation, params); } - async deleteMessage(message: Message, operation?: API.MessageOperation, params?: Record): Promise { + async deleteMessage( + message: Message, + operation?: API.MessageOperation, + params?: Record, + ): Promise { Logger.logAction(this.logger, Logger.LOG_MICRO, 'RestChannel.deleteMessage()', 'channel = ' + this.name); - return this.client.rest.channelMixin.updateDeleteMessage(this, { isDelete: true }, message, operation, params); + return this.client.rest.channelMixin.updateDeleteMessage(this, 'message.delete', message, operation, params); + } + + async appendMessage( + message: Message, + operation?: API.MessageOperation, + params?: Record, + ): Promise { + Logger.logAction(this.logger, Logger.LOG_MICRO, 'RestChannel.appendMessage()', 'channel = ' + this.name); + return this.client.rest.channelMixin.updateDeleteMessage(this, 'message.append', message, operation, params); } async getMessageVersions( diff --git a/src/common/lib/client/restchannelmixin.ts b/src/common/lib/client/restchannelmixin.ts index dc24fb3dd4..85dae2b732 100644 --- a/src/common/lib/client/restchannelmixin.ts +++ b/src/common/lib/client/restchannelmixin.ts @@ -8,7 +8,6 @@ import { CipherOptions } from '../types/basemessage'; import Defaults from '../util/defaults'; import PaginatedResource, { PaginatedResult } from './paginatedresource'; import Resource from './resource'; -import { RequestBody } from 'common/types/http'; export interface RestHistoryParams { start?: number; @@ -17,15 +16,6 @@ export interface RestHistoryParams { limit?: number; } -type UpdateDeleteRequest = { - serial: string; - data?: any; - name?: string | null; - encoding?: string | null; - extras?: any; - operation?: API.MessageOperation; -}; - export class RestChannelMixin { static basePath(channel: RestChannel | RealtimeChannel) { return '/channels/' + encodeURIComponent(channel.name); @@ -101,11 +91,11 @@ export class RestChannelMixin { static async updateDeleteMessage( channel: RestChannel | RealtimeChannel, - opts: { isDelete: boolean }, + action: 'message.update' | 'message.delete' | 'message.append', message: Message, operation?: API.MessageOperation, params?: Record, - ): Promise { + ): Promise { if (!message.serial) { throw new ErrorInfo( 'This message lacks a serial and cannot be updated. Make sure you have enabled "Message annotations, updates, and deletes" in channel settings on your dashboard.', @@ -119,33 +109,27 @@ export class RestChannelMixin { const headers = Defaults.defaultPostHeaders(client.options); Utils.mixin(headers, client.options.headers); - let encoded: WireMessage | null = null; - if (message.data !== undefined) { - encoded = await Message.fromValues(message).encode(channel.channelOptions as CipherOptions); - } - - const req: UpdateDeleteRequest = { - serial: message.serial, - operation: operation, - name: message.name, - data: encoded && encoded.data, - encoding: encoded && encoded.encoding, - extras: message.extras, - }; + // construct a new Message to avoid mutating the message the user passes in + const requestMessage = Message.fromValues(message); + requestMessage.action = action; + requestMessage.version = operation; - const requestBody: RequestBody = serializeMessage(req, client._MsgPack, format); + const encoded = await requestMessage.encode(channel.channelOptions as CipherOptions); + const requestBody = serializeMessage(encoded, client._MsgPack, format); - const method = opts.isDelete ? Resource.post : Resource.patch; - const pathSuffix = opts.isDelete ? '/delete' : ''; - await method( + let method = Resource.patch; + const { body, unpacked } = await method( client, - this.basePath(channel) + '/messages/' + encodeURIComponent(message.serial) + pathSuffix, + this.basePath(channel) + '/messages/' + encodeURIComponent(message.serial), requestBody, headers, params || {}, null, true, ); + + const decoded = unpacked ? body : Utils.decodeBody(body, client._MsgPack, format); + return decoded || { versionSerial: null }; } static getMessageVersions( diff --git a/src/common/lib/transport/connectionmanager.ts b/src/common/lib/transport/connectionmanager.ts index e2d00a2f41..1e69794add 100644 --- a/src/common/lib/transport/connectionmanager.ts +++ b/src/common/lib/transport/connectionmanager.ts @@ -4,7 +4,7 @@ import ProtocolMessage, { fromValues as protocolMessageFromValues, } from 'common/lib/types/protocolmessage'; import * as Utils from 'common/lib/util/utils'; -import Protocol, { PendingMessage } from './protocol'; +import Protocol, { PendingMessage, PublishCallback } from './protocol'; import Defaults, { getAgentString } from 'common/lib/util/defaults'; import Platform, { TransportImplementations } from 'common/platform'; import EventEmitter from '../util/eventemitter'; @@ -14,11 +14,8 @@ import ConnectionStateChange from 'common/lib/client/connectionstatechange'; import ConnectionErrors, { isRetriable } from './connectionerrors'; import ErrorInfo, { IPartialErrorInfo, PartialErrorInfo } from 'common/lib/types/errorinfo'; import Auth from 'common/lib/client/auth'; -import Message, { getMessagesSize } from 'common/lib/types/message'; -import Multicaster, { MulticasterInstance } from 'common/lib/util/multicaster'; import Transport, { TransportCtor } from './transport'; import * as API from '../../../../ably'; -import { ErrCallback } from 'common/types/utils'; import HttpStatusCodes from 'common/constants/HttpStatusCodes'; import BaseRealtime from '../client/baserealtime'; import { NormalisedClientOptions } from 'common/types/ClientOptions'; @@ -31,44 +28,6 @@ const haveSessionStorage = () => typeof Platform.WebStorage !== 'undefined' && P const noop = function () {}; const transportPreferenceName = 'ably-transport-preference'; -function bundleWith(dest: ProtocolMessage, src: ProtocolMessage, maxSize: number) { - let action; - if (dest.channel !== src.channel) { - /* RTL6d3 */ - return false; - } - if ((action = dest.action) !== actions.PRESENCE && action !== actions.MESSAGE) { - /* RTL6d - can only bundle messages or presence */ - return false; - } - if (action !== src.action) { - /* RTL6d4 */ - return false; - } - const kind = action === actions.PRESENCE ? 'presence' : 'messages', - proposed = (dest as Record)[kind].concat((src as Record)[kind]), - size = getMessagesSize(proposed); - if (size > maxSize) { - /* RTL6d1 */ - return false; - } - if (!Utils.allSame(proposed, 'clientId')) { - /* RTL6d2 */ - return false; - } - if ( - !proposed.every(function (msg: Message) { - return !msg.id; - }) - ) { - /* RTL6d7 */ - return false; - } - /* we're good to go! */ - (dest as Record)[kind] = proposed; - return true; -} - type RecoveryContext = { connectionKey: string; msgSerial: number; @@ -1785,7 +1744,7 @@ class ConnectionManager extends EventEmitter { * event queueing ******************/ - send(msg: ProtocolMessage, queueEvent?: boolean, callback?: ErrCallback): void { + send(msg: ProtocolMessage, queueEvent?: boolean, callback?: PublishCallback): void { callback = callback || noop; const state = this.state; @@ -1839,22 +1798,9 @@ class ConnectionManager extends EventEmitter { } } - queue(msg: ProtocolMessage, callback: ErrCallback): void { + queue(msg: ProtocolMessage, callback: PublishCallback): void { Logger.logAction(this.logger, Logger.LOG_MICRO, 'ConnectionManager.queue()', 'queueing event'); - const lastQueued = this.queuedMessages.last(); - const maxSize = this.options.maxMessageSize; - /* If have already attempted to send a message, don't merge more messages - * into it, as if the previous send actually succeeded and realtime ignores - * the dup, they'll be lost */ - if (lastQueued && !lastQueued.sendAttempted && bundleWith(lastQueued.message, msg, maxSize)) { - if (!lastQueued.merged) { - lastQueued.callback = Multicaster.create(this.logger, [lastQueued.callback]); - lastQueued.merged = true; - } - (lastQueued.callback as MulticasterInstance).push(callback); - } else { - this.queuedMessages.push(new PendingMessage(msg, callback)); - } + this.queuedMessages.push(new PendingMessage(msg, callback)); } sendQueuedMessages(): void { diff --git a/src/common/lib/transport/messagequeue.ts b/src/common/lib/transport/messagequeue.ts index f5d775e0a8..75ef5594f6 100644 --- a/src/common/lib/transport/messagequeue.ts +++ b/src/common/lib/transport/messagequeue.ts @@ -2,6 +2,7 @@ import ErrorInfo from '../types/errorinfo'; import EventEmitter from '../util/eventemitter'; import Logger from '../util/logger'; import { PendingMessage } from './protocol'; +import type * as API from '../../../../ably'; class MessageQueue extends EventEmitter { messages: Array; @@ -44,7 +45,11 @@ class MessageQueue extends EventEmitter { * * @param selector - Describes which messages to target. 'all' means all messages in the queue (regardless of whether they have had a `msgSerial` assigned); `serial` / `count` targets a range of messages described by an `ACK` or `NACK` received from Ably (this assumes that all the messages in the queue have had a `msgSerial` assigned). */ - completeMessages(selector: 'all' | { serial: number; count: number }, err?: ErrorInfo | null): void { + completeMessages( + selector: 'all' | { serial: number; count: number }, + err?: ErrorInfo | null, + res?: API.PublishResult[], + ): void { Logger.logAction( this.logger, Logger.LOG_MICRO, @@ -73,8 +78,10 @@ class MessageQueue extends EventEmitter { } } - for (const message of completeMessages) { - (message.callback as Function)(err); + for (let i = 0; i < completeMessages.length; i++) { + const message = completeMessages[i]; + const publishResponse = res?.[i]; + (message.callback as Function)(err, publishResponse); } if (messages.length == 0) this.emit('idle'); diff --git a/src/common/lib/transport/protocol.ts b/src/common/lib/transport/protocol.ts index a9eb3eb7d6..da82984a7a 100644 --- a/src/common/lib/transport/protocol.ts +++ b/src/common/lib/transport/protocol.ts @@ -6,16 +6,19 @@ import Logger from '../util/logger'; import MessageQueue from './messagequeue'; import ErrorInfo from '../types/errorinfo'; import Transport from './transport'; -import { ErrCallback } from '../../types/utils'; +import { StandardCallback, ErrCallback } from '../../types/utils'; +import * as API from '../../../../ably'; + +export type PublishCallback = StandardCallback; export class PendingMessage { message: ProtocolMessage; - callback?: ErrCallback; + callback?: PublishCallback; merged: boolean; sendAttempted: boolean; ackRequired: boolean; - constructor(message: ProtocolMessage, callback?: ErrCallback) { + constructor(message: ProtocolMessage, callback?: PublishCallback) { this.message = message; this.callback = callback; this.merged = false; @@ -35,17 +38,17 @@ class Protocol extends EventEmitter { super(transport.logger); this.transport = transport; this.messageQueue = new MessageQueue(this.logger); - transport.on('ack', (serial: number, count: number) => { - this.onAck(serial, count); + transport.on('ack', (serial: number, count: number, res?: API.PublishResult[]) => { + this.onAck(serial, count, res); }); transport.on('nack', (serial: number, count: number, err: ErrorInfo) => { this.onNack(serial, count, err); }); } - onAck(serial: number, count: number): void { + onAck(serial: number, count: number, res?: API.PublishResult[]): void { Logger.logAction(this.logger, Logger.LOG_MICRO, 'Protocol.onAck()', 'serial = ' + serial + '; count = ' + count); - this.messageQueue.completeMessages({ serial, count }); + this.messageQueue.completeMessages({ serial, count }, null, res); } onNack(serial: number, count: number, err: ErrorInfo): void { diff --git a/src/common/lib/transport/transport.ts b/src/common/lib/transport/transport.ts index 28024bcb07..afa2db1945 100644 --- a/src/common/lib/transport/transport.ts +++ b/src/common/lib/transport/transport.ts @@ -161,7 +161,7 @@ abstract class Transport extends EventEmitter { this.onDisconnect(message); break; case actions.ACK: - this.emit('ack', message.msgSerial, message.count); + this.emit('ack', message.msgSerial, message.count, message.res); break; case actions.NACK: this.emit('nack', message.msgSerial, message.count, message.error); diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index 8516fc05cc..061cde9528 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -20,12 +20,23 @@ import type RealtimeChannel from '../client/realtimechannel'; import type ErrorInfo from './errorinfo'; type Channel = RestChannel | RealtimeChannel; -const actions: API.MessageAction[] = ['message.create', 'message.update', 'message.delete', 'meta', 'message.summary']; - -function stringifyAction(action: number | undefined): string { +const actions: API.MessageAction[] = [ + 'message.create', + 'message.update', + 'message.delete', + 'meta', + 'message.summary', + 'message.append', +]; + +export function stringifyAction(action: number | undefined): API.MessageAction { return actions[action || 0] || 'unknown'; } +export function encodeAction(action: API.MessageAction): number { + return actions.indexOf(action || 'message.create'); +} + function getMessageSize(msg: WireMessage) { let size = 0; if (msg.name) { diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index 031100c77f..14ab7c9288 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -175,6 +175,7 @@ class ProtocolMessage { auth?: unknown; connectionDetails?: Record; params?: Record; + res?: API.PublishResult[]; hasFlag = (flag: string): boolean => { return ((this.flags as number) & flags[flag]) > 0; diff --git a/src/common/lib/util/defaults.ts b/src/common/lib/util/defaults.ts index d544352c2c..76f9b68135 100644 --- a/src/common/lib/util/defaults.ts +++ b/src/common/lib/util/defaults.ts @@ -91,7 +91,7 @@ const Defaults = { maxMessageSize: 65536, version, - protocolVersion: 4, + protocolVersion: 5, agent, getPort, getHttpScheme, diff --git a/src/common/lib/util/utils.ts b/src/common/lib/util/utils.ts index c37d2ca346..9c158442cd 100644 --- a/src/common/lib/util/utils.ts +++ b/src/common/lib/util/utils.ts @@ -245,6 +245,10 @@ export function toQueryString(params?: Record | null): string { return parts.length ? '?' + parts.join('&') : ''; } +export function stringifyValues(params: Record): Record { + return Object.fromEntries(Object.entries(params).map(([k, v]) => [k, String(v)])); +} + export function parseQueryString(query: string): Record { let match; const search = /([^?&=]+)=?([^&]*)/g; diff --git a/src/platform/react-hooks/src/hooks/useChannelAttach.ts b/src/platform/react-hooks/src/hooks/useChannelAttach.ts index 6d525e14ef..533382b3a8 100644 --- a/src/platform/react-hooks/src/hooks/useChannelAttach.ts +++ b/src/platform/react-hooks/src/hooks/useChannelAttach.ts @@ -39,7 +39,7 @@ export function useChannelAttach( logError(ably, reason.toString()); }); } - }, [shouldAttachToTheChannel, channel]); + }, [shouldAttachToTheChannel, channel, ably]); // we expose `connectionState` here for reuse in the usePresence hook, where we need to prevent // entering and leaving presence in a similar manner diff --git a/test/browser/modular.test.js b/test/browser/modular.test.js index 94a0460f9e..008351d02e 100644 --- a/test/browser/modular.test.js +++ b/test/browser/modular.test.js @@ -22,6 +22,7 @@ import { XHRRequest, MessageInteractions, Annotations, + ErrorInfo, } from '../../build/modular/index.mjs'; function registerAblyModularTests(Helper) { @@ -609,12 +610,12 @@ function registerAblyModularTests(Helper) { const channelName = 'channel'; const channel = rest.channels.get(channelName); const contentTypeUsedForPublishPromise = new Promise((resolve, reject) => { + const originalDo = rest.http.do; rest.http.do = async (method, path, headers, body, params) => { - if (!(method == 'post' && path == `/channels/${channelName}/messages`)) { - return new Promise(() => {}); + if (method == 'post' && path == `/channels/${channelName}/messages`) { + resolve(headers['content-type']); } - resolve(headers['content-type']); - return { error: null }; + return originalDo.call(rest.http, method, path, headers, body, params); }; }); diff --git a/test/realtime/init.test.js b/test/realtime/init.test.js index a5d3587e9f..67d8df6609 100644 --- a/test/realtime/init.test.js +++ b/test/realtime/init.test.js @@ -43,7 +43,7 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, Helper, chai) { return transport.recvRequest.recvUri; })(); try { - expect(connectUri.indexOf('v=4') > -1, 'Check uri includes v=4').to.be.ok; + expect(connectUri.indexOf('v=5') > -1, 'Check uri includes v=5').to.be.ok; } catch (err) { helper.closeAndFinish(done, realtime, err); return; diff --git a/test/realtime/message.test.js b/test/realtime/message.test.js index 81a004c3ec..8c3d234eb3 100644 --- a/test/realtime/message.test.js +++ b/test/realtime/message.test.js @@ -1179,82 +1179,6 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }); }); - /** - * Publish a series of messages that exercise various bundling constraints, check they're satisfied. - * - * @spec RTL6d - * @spec RTL6d1 - * @spec RTL6d2 - * @spec RTL6d3 - * @spec RTL6d5 - * @spec RTL6d6 - * @spec RTL6d7 - * @specskip - */ - it.skip('bundling', function (done) { - var helper = this.test.helper, - realtime = helper.AblyRealtime({ maxMessageSize: 256, autoConnect: false }), - channelOne = realtime.channels.get('bundlingOne'), - channelTwo = realtime.channels.get('bundlingTwo'); - - /* RTL6d3; RTL6d5 */ - channelTwo.publish('2a', { expectedBundle: 0 }); - channelOne.publish('a', { expectedBundle: 1 }); - channelOne.publish([ - { name: 'b', data: { expectedBundle: 1 } }, - { name: 'c', data: { expectedBundle: 1 } }, - ]); - channelOne.publish('d', { expectedBundle: 1 }); - channelTwo.publish('2b', { expectedBundle: 2 }); - channelOne.publish('e', { expectedBundle: 3 }); - channelOne.publish({ name: 'f', data: { expectedBundle: 3 } }); - /* RTL6d2 */ - channelOne.publish({ name: 'g', data: { expectedBundle: 4 }, clientId: 'foo' }); - channelOne.publish({ name: 'h', data: { expectedBundle: 4 }, clientId: 'foo' }); - channelOne.publish({ name: 'i', data: { expectedBundle: 5 }, clientId: 'bar' }); - channelOne.publish('j', { expectedBundle: 6 }); - /* RTL6d1 */ - channelOne.publish('k', { - expectedBundle: 7, - moreData: - 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', - }); - channelOne.publish('l', { expectedBundle: 8 }); - /* RTL6d7 */ - channelOne.publish({ name: 'm', id: 'bundle_m', data: { expectedBundle: 9 } }); - channelOne.publish('z_last', { expectedBundle: 10 }); - - helper.recordPrivateApi('call.transport.onProtocolMessage'); - var queue = realtime.connection.connectionManager.queuedMessages; - var messages; - try { - for (var i = 0; i <= 10; i++) { - messages = queue.messages[i].message.messages || queue.messages[i].message.presence; - for (var j = 0; j < messages.length; j++) { - expect(JSON.parse(messages[j].data).expectedBundle).to.equal(i); - } - } - } catch (err) { - helper.closeAndFinish(done, realtime, err); - return; - } - - /* RTL6d6 */ - var currentName = ''; - channelOne.subscribe(function (msg) { - try { - expect(currentName < msg.name, 'Check final ordering preserved').to.be.ok; - } catch (err) { - helper.closeAndFinish(done, realtime, err); - } - currentName = msg.name; - if (currentName === 'z_last') { - helper.closeAndFinish(done, realtime); - } - }); - realtime.connect(); - }); - /** * @spec RSL1k2 * @spec RSL1k5 diff --git a/test/realtime/updates-deletes.test.js b/test/realtime/updates-deletes.test.js new file mode 100644 index 0000000000..76f14ad36a --- /dev/null +++ b/test/realtime/updates-deletes.test.js @@ -0,0 +1,280 @@ +'use strict'; + +define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { + const expect = chai.expect; + + describe('realtime/message-operations', function () { + this.timeout(60 * 1000); + + before(function (done) { + const helper = Helper.forHook(this); + helper.setupApp(function (err) { + if (err) { + done(err); + return; + } + done(); + }); + }); + + /** + * Test that publish returns serials + */ + it('Should return serials from publish', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:rt_publish_serials'); + await channel.attach(); + + const result = await channel.publish('test-message', { value: 'test' }); + expect(result).to.have.property('serials'); + expect(result.serials).to.be.an('array'); + expect(result.serials.length).to.equal(1); + expect(result.serials[0]).to.be.a('string'); + } finally { + realtime.close(); + } + }); + + /** + * Test that publish with multiple messages returns multiple serials + */ + it('Should return multiple serials for batch publish', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:rt_publish_batch_serials'); + await channel.attach(); + + const messages = [ + { name: 'msg1', data: 'data1' }, + { name: 'msg2', data: 'data2' }, + { name: 'msg3', data: 'data3' }, + ]; + + const result = await channel.publish(messages); + expect(result.serials).to.be.an('array'); + expect(result.serials.length).to.equal(3); + result.serials.forEach((serial) => { + expect(serial).to.be.a('string'); + }); + } finally { + realtime.close(); + } + }); + + /** + * Test updateMessage over realtime connection + */ + it('Should update a message over realtime', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:rt_updatesanddeletes_update'); + await channel.attach(); + + // Set up subscription to capture the update + const updatePromise = new Promise((resolve) => { + channel.subscribe((msg) => { + if (msg.action === 'message.update') { + resolve(msg); + } + }); + }); + + const { serials } = await channel.publish('original-message', { value: 'original' }); + const serial = serials[0]; + + const updateMessage = { + serial: serial, + data: { value: 'updated via realtime' }, + }; + + const operation = { + clientId: 'rt-updater-client', + description: 'Realtime update operation', + metadata: { reason: 'testing realtime' }, + }; + + const updateResult = await channel.updateMessage(updateMessage, operation); + expect(updateResult).to.have.property('versionSerial'); + expect(updateResult.versionSerial).to.be.a('string'); + + const updateMsg = await updatePromise; + + expect(updateMsg.serial).to.equal(serial); + expect(updateMsg.version?.serial).to.equal(updateResult.versionSerial); + expect(updateMsg.version?.clientId).to.equal('rt-updater-client'); + expect(updateMsg.version?.description).to.equal('Realtime update operation'); + expect(updateMsg.version?.metadata).to.deep.equal({ reason: 'testing realtime' }); + expect(updateMsg.action).to.equal('message.update'); + expect(updateMsg.data).to.deep.equal({ value: 'updated via realtime' }); + expect(updateMsg.name).to.equal('original-message'); + } finally { + realtime.close(); + } + }); + + /** + * Test deleteMessage over realtime connection + */ + it('Should delete a message over realtime', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:rt_updatesanddeletes_delete'); + await channel.attach(); + + // Set up subscription to capture the delete + const deletePromise = new Promise((resolve) => { + channel.subscribe((msg) => { + if (msg.action === 'message.delete') { + resolve(msg); + } + }); + }); + + const { serials } = await channel.publish('message-to-delete', { value: 'will be deleted' }); + const serial = serials[0]; + + const operation = { + clientId: 'rt-deleter-client', + description: 'Realtime delete operation', + metadata: { reason: 'testing realtime delete' }, + }; + + const deletion = { serial, data: {} }; + + const deleteResult = await channel.deleteMessage(deletion, operation); + expect(deleteResult).to.have.property('versionSerial'); + expect(deleteResult.versionSerial).to.be.a('string'); + + const deleteMsg = await deletePromise; + + expect(deleteMsg.serial).to.equal(serial); + expect(deleteMsg.version?.serial).to.equal(deleteResult.versionSerial); + expect(deleteMsg.data).to.deep.equal({}); + expect(deleteMsg.name).to.equal('message-to-delete'); + expect(deleteMsg.version?.clientId).to.equal('rt-deleter-client'); + expect(deleteMsg.version?.description).to.equal('Realtime delete operation'); + expect(deleteMsg.version?.metadata).to.deep.equal({ reason: 'testing realtime delete' }); + expect(deleteMsg.action).to.equal('message.delete'); + } finally { + realtime.close(); + } + }); + + /** + * Test error handling for updateMessage without serial + */ + it('Should error when called without serial', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:rt_updatesanddeletes_error'); + await channel.attach(); + + try { + await channel.updateMessage({ data: 'test' }); + expect.fail('Should have thrown an error'); + } catch (err) { + expect(err).to.have.property('code', 40003); + } + + try { + await channel.deleteMessage({}); + expect.fail('Should have thrown an error'); + } catch (err) { + expect(err).to.have.property('code', 40003); + } + + try { + await channel.appendMessage({ data: 'test' }); + expect.fail('Should have thrown an error'); + } catch (err) { + expect(err).to.have.property('code', 40003); + } + } finally { + realtime.close(); + } + }); + + /** + * Test appendMessage over realtime connection + */ + it('Should append to a message over realtime', async function () { + const helper = this.test.helper; + const realtime = helper.AblyRealtime(); + + try { + const channel = realtime.channels.get('mutable:rt_updatesanddeletes_append'); + await channel.attach(); + + const appendPromise = new Promise((resolve) => { + channel.subscribe((msg) => { + if (msg.action === 'message.append') { + resolve(msg); + } + }); + }); + + const { serials } = await channel.publish('original-message', 'Hello'); + const serial = serials[0]; + + const appendMessage = { + serial: serial, + data: ' World', + }; + + const operation = { + clientId: 'rt-appender-client', + description: 'Realtime append operation', + metadata: { reason: 'testing realtime append' }, + }; + + const appendResult = await channel.appendMessage(appendMessage, operation); + expect(appendResult).to.have.property('versionSerial'); + expect(appendResult.versionSerial).to.be.a('string'); + + const appendMsg = await appendPromise; + + expect(appendMsg.serial).to.equal(serial); + expect(appendMsg.version?.serial).to.equal(appendResult.versionSerial); + expect(appendMsg.version?.clientId).to.equal('rt-appender-client'); + expect(appendMsg.version?.description).to.equal('Realtime append operation'); + expect(appendMsg.version?.metadata).to.deep.equal({ reason: 'testing realtime append' }); + expect(appendMsg.action).to.equal('message.append'); + expect(appendMsg.name).to.equal('original-message'); + expect(appendMsg.data).to.equal(' World'); + + // now reattach with rewind to get the full concatenated message + await channel.detach(); + const updatePromise = new Promise((resolve) => { + channel.subscribe((msg) => { + resolve(msg); + }); + }); + + await channel.setOptions({ params: { rewind: '1' } }); + await channel.attach(); + const updatedMsg = await updatePromise; + expect(updatedMsg.serial).to.equal(serial); + expect(updatedMsg.version?.serial).to.equal(appendResult.versionSerial); + expect(updatedMsg.version?.clientId).to.equal('rt-appender-client'); + expect(updatedMsg.version?.description).to.equal('Realtime append operation'); + expect(updatedMsg.version?.metadata).to.deep.equal({ reason: 'testing realtime append' }); + expect(updatedMsg.action).to.equal('message.update'); + expect(updatedMsg.name).to.equal('original-message'); + expect(updatedMsg.data).to.equal('Hello World'); + } finally { + realtime.close(); + } + }); + }); +}); diff --git a/test/rest/http.test.js b/test/rest/http.test.js index b6bc179b56..450156fc7a 100644 --- a/test/rest/http.test.js +++ b/test/rest/http.test.js @@ -34,7 +34,7 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, Helper, chai) { // This test should not directly validate version against Defaults.version, as // ultimately the version header has been derived from that value. - expect(headers['X-Ably-Version']).to.equal('4', 'Verify current version number'); + expect(headers['X-Ably-Version']).to.equal('5', 'Verify current version number'); helper.recordPrivateApi('read.Defaults.version'); expect(headers['Ably-Agent'].indexOf('ably-js/' + Defaults.version) > -1, 'Verify agent').to.be.ok; expect(headers['Ably-Agent'].indexOf('custom-agent/0.1.2') > -1, 'Verify custom agent').to.be.ok; diff --git a/test/rest/updates-deletes.test.js b/test/rest/updates-deletes.test.js index 9591052c8d..53c389f730 100644 --- a/test/rest/updates-deletes.test.js +++ b/test/rest/updates-deletes.test.js @@ -7,18 +7,53 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async this.timeout(60 * 1000); before(function (done) { - console.log('Setting up app'); const helper = Helper.forHook(this); helper.setupApp(function (err) { if (err) { - console.log('Error setting up app: ', err); done(err); + return; } - console.log('App setup complete'); done(); }); }); + /** + * Test that publish returns serials + */ + it('Should return serials from publish', async function () { + const helper = this.test.helper; + const rest = helper.AblyRest({}); + const channel = rest.channels.get('mutable:publish_serials'); + + const result = await channel.publish('test-message', { value: 'test' }); + expect(result).to.have.property('serials'); + expect(result.serials).to.be.an('array'); + expect(result.serials.length).to.equal(1); + expect(result.serials[0]).to.be.a('string'); + }); + + /** + * Test that publish with multiple messages returns multiple serials + */ + it('Should return multiple serials for batch publish', async function () { + const helper = this.test.helper; + const rest = helper.AblyRest({}); + const channel = rest.channels.get('mutable:publish_batch_serials'); + + const messages = [ + { name: 'msg1', data: 'data1' }, + { name: 'msg2', data: 'data2' }, + { name: 'msg3', data: 'data3' }, + ]; + + const result = await channel.publish(messages); + expect(result.serials).to.be.an('array'); + expect(result.serials.length).to.equal(3); + result.serials.forEach((serial) => { + expect(serial).to.be.a('string'); + }); + }); + /** * Test getMessage functionality * @@ -29,25 +64,11 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async const rest = helper.AblyRest({}); const channel = rest.channels.get('mutable:updatesanddeletes_get'); - // First publish a message - await channel.publish('test-message', { value: 'original' }); - - // Wait for the message to appear in history - let originalMessage; - await helper.waitFor(async () => { - const page = await channel.history(); - if (page.items.length > 0) { - originalMessage = page.items[0]; - return true; - } - return false; - }, 10000); - - expect(originalMessage.serial, 'Message has a serial').to.be.ok; + const { serials } = await channel.publish('test-message', { value: 'original' }); + const serial = serials[0]; - // Now retrieve the message by serial - const retrievedMessage = await channel.getMessage(originalMessage.serial); - expect(retrievedMessage.serial).to.equal(originalMessage.serial); + const retrievedMessage = await channel.getMessage(serial); + expect(retrievedMessage.serial).to.equal(serial); expect(retrievedMessage.name).to.equal('test-message'); expect(retrievedMessage.data).to.deep.equal({ value: 'original' }); }); @@ -62,23 +83,11 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async const rest = helper.AblyRest({}); const channel = rest.channels.get('mutable:updatesanddeletes_get_obj'); - // First publish a message - await channel.publish('test-message-obj', { value: 'original' }); - - // Wait for the message to appear in history - let originalMessage; - await helper.waitFor(async () => { - const page = await channel.history(); - if (page.items.length > 0) { - originalMessage = page.items[0]; - return true; - } - return false; - }, 10000); + const { serials } = await channel.publish('test-message-obj', { value: 'original' }); + const serial = serials[0]; - // Retrieve the message by passing the Message object - const retrievedMessage = await channel.getMessage(originalMessage); - expect(retrievedMessage.serial).to.equal(originalMessage.serial); + const retrievedMessage = await channel.getMessage({ serial }); + expect(retrievedMessage.serial).to.equal(serial); expect(retrievedMessage.name).to.equal('test-message-obj'); }); @@ -92,23 +101,12 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async const rest = helper.AblyRest({}); const channel = rest.channels.get('mutable:updatesanddeletes_update_meta'); - // First publish a message - await channel.publish('original-message', { value: 'original' }); - - // Wait for the message to appear in history - let originalMessage; - await helper.waitFor(async () => { - const page = await channel.history(); - if (page.items.length > 0) { - originalMessage = page.items[0]; - return true; - } - return false; - }, 10000); + const { serials } = await channel.publish('original-message', { value: 'original' }); + const serial = serials[0]; + const originalMessage = await channel.getMessage(serial); - // Update the message with operation metadata const updateMessage = { - serial: originalMessage.serial, + serial: serial, data: { value: 'updated with metadata' }, }; @@ -118,7 +116,9 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async metadata: { reason: 'testing' }, }; - await channel.updateMessage(updateMessage, operation); + const updateResult = await channel.updateMessage(updateMessage, operation); + expect(updateResult).to.have.property('versionSerial'); + expect(updateResult.versionSerial).to.be.a('string'); // Wait for the update to be the latest message let latestMessage; @@ -126,14 +126,13 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async latestMessage = await channel.getMessage(originalMessage.serial); return latestMessage.data.value === 'updated with metadata'; }, 10000); - expect(latestMessage.serial).to.equal(originalMessage.serial); + expect(latestMessage.serial).to.equal(serial); expect(latestMessage.version?.serial > originalMessage.serial).to.be.ok; expect(latestMessage.version?.timestamp).to.be.greaterThan(originalMessage.timestamp); expect(latestMessage.version?.clientId).to.equal('updater-client'); expect(latestMessage.version?.description).to.equal('Test update operation'); expect(latestMessage.version?.metadata).to.deep.equal({ reason: 'testing' }); expect(latestMessage.action).to.equal('message.update'); - // patch semantics: data was updated, name should be unchanged expect(latestMessage.data).to.deep.equal({ value: 'updated with metadata' }); expect(latestMessage.name).to.equal('original-message'); }); @@ -141,37 +140,28 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /** * Test deleteMessage with operation metadata * - * @spec RSL13a + * @spec RSL15a */ it('Should delete a message (with operation metadata)', async function () { const helper = this.test.helper; const rest = helper.AblyRest({}); const channel = rest.channels.get('mutable:updatesanddeletes_delete_meta'); - // First publish a message - await channel.publish('message-to-delete', { value: 'will be deleted' }); + const { serials } = await channel.publish('message-to-delete', { value: 'will be deleted' }); + const serial = serials[0]; + const originalMessage = await channel.getMessage(serial); - // Wait for the message to appear in history - let originalMessage; - await helper.waitFor(async () => { - const page = await channel.history(); - if (page.items.length > 0) { - originalMessage = page.items[0]; - return true; - } - return false; - }, 10000); - - // Delete the message with operation metadata const operation = { clientId: 'deleter-client', description: 'Test delete operation', metadata: { reason: 'inappropriate content' }, }; - const deletion = Object.assign({}, originalMessage, { data: {} }); + const deletion = { serial, data: {} }; - await channel.deleteMessage(deletion, operation); + const deleteResult = await channel.deleteMessage(deletion, operation); + expect(deleteResult).to.have.property('versionSerial'); + expect(deleteResult.versionSerial).to.be.a('string'); // Wait for the delete to be the latest message let latestMessage; @@ -179,9 +169,8 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async latestMessage = await channel.getMessage(originalMessage.serial); return latestMessage.action === 'message.delete'; }, 10000); - expect(latestMessage.serial).to.equal(originalMessage.serial); + expect(latestMessage.serial).to.equal(serial); expect(latestMessage.data).to.deep.equal({}); - // expect name to be still present if not explicitly erased expect(latestMessage.name).to.equal('message-to-delete'); expect(latestMessage.version?.serial > originalMessage.serial).to.be.ok; expect(latestMessage.version?.timestamp).to.be.greaterThan(originalMessage.timestamp); @@ -201,37 +190,25 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async const rest = helper.AblyRest({}); const channel = rest.channels.get('mutable:updatesanddeletes_versions'); - // First publish a message - await channel.publish('versioned-message', { value: 'version-1' }); + const { serials } = await channel.publish('versioned-message', { value: 'version-1' }); + const serial = serials[0]; - // Wait for the message to appear in history - let originalMessage; - await helper.waitFor(async () => { - const page = await channel.history(); - if (page.items.length > 0) { - originalMessage = page.items[0]; - return true; - } - return false; - }, 10000); - - // Update the message const updateMessage = { - serial: originalMessage.serial, + serial: serial, data: { value: 'version-2' }, }; await channel.updateMessage(updateMessage); - // Get all versions + // Wait for versions to be available let items; await helper.waitFor(async () => { - const versionsPage = await channel.getMessageVersions(originalMessage.serial); + const versionsPage = await channel.getMessageVersions(serial); items = versionsPage.items; return items.length >= 2; }, 10000); + expect(items.length).to.be.at.least(2); - // Check that we have both the original and the update const actions = items.map((m) => m.action); expect(actions).to.include('message.create'); expect(actions).to.include('message.update'); @@ -300,5 +277,67 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async expect(err).to.have.property('code', 40003); } }); + + /** + * Test appendMessage with operation metadata + * + * @spec RSL15 + */ + it('Should append to a message (with operation metadata)', async function () { + const helper = this.test.helper; + const rest = helper.AblyRest({}); + const channel = rest.channels.get('mutable:updatesanddeletes_append_meta'); + + const { serials } = await channel.publish('original-message', 'Hello'); + const serial = serials[0]; + const originalMessage = await channel.getMessage(serial); + + const appendMessage = { + serial: serial, + data: ' World', + }; + + const operation = { + clientId: 'appender-client', + description: 'Test append operation', + metadata: { reason: 'testing append' }, + }; + + const appendResult = await channel.appendMessage(appendMessage, operation); + expect(appendResult).to.have.property('versionSerial'); + expect(appendResult.versionSerial).to.be.a('string'); + + // Wait for the append to be the latest message + let latestMessage; + await helper.waitFor(async () => { + latestMessage = await channel.getMessage(originalMessage.serial); + return latestMessage.data === 'Hello World'; + }, 10000); + expect(latestMessage.serial).to.equal(serial); + expect(latestMessage.version?.serial > originalMessage.serial).to.be.ok; + expect(latestMessage.version?.timestamp).to.be.greaterThan(originalMessage.timestamp); + expect(latestMessage.version?.clientId).to.equal('appender-client'); + expect(latestMessage.version?.description).to.equal('Test append operation'); + expect(latestMessage.version?.metadata).to.deep.equal({ reason: 'testing append' }); + expect(latestMessage.action).to.equal('message.update'); + expect(latestMessage.data).to.equal('Hello World'); + expect(latestMessage.name).to.equal('original-message'); + }); + + /** + * Test error handling for appendMessage without serial + */ + it('Should error when appendMessage called without serial', async function () { + const helper = this.test.helper; + const rest = helper.AblyRest({}); + const channel = rest.channels.get('mutable:updatesanddeletes_error'); + + try { + await channel.appendMessage({ data: 'test' }); + expect.fail('Should have thrown an error'); + } catch (err) { + expect(err).to.have.property('code', 40003); + } + }); }); });