Skip to content

Commit 510e69d

Browse files
committed
[ECO-5065] feat: add support for managing message annotations via REST and Realtime APIs
Introduced new classes and functionality to enable creating, retrieving, and deleting annotations on messages, both synchronously and asynchronously. Extended protocol support to handle annotation operations and added serialization/deserialization logic for annotations and summaries. Add 256 bit AES CBC encrypted variable length data generated by Java client library SDK (#49)
1 parent 41e3213 commit 510e69d

16 files changed

Lines changed: 1321 additions & 32 deletions

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.ably.lib.http.HttpUtils;
1616
import io.ably.lib.objects.LiveObjects;
1717
import io.ably.lib.objects.LiveObjectsPlugin;
18+
import io.ably.lib.rest.RestAnnotation;
1819
import io.ably.lib.transport.ConnectionManager;
1920
import io.ably.lib.transport.ConnectionManager.QueuedMessage;
2021
import io.ably.lib.transport.Defaults;
@@ -105,6 +106,8 @@ public LiveObjects getObjects() throws AblyException {
105106
return liveObjectsPlugin.getInstance(name);
106107
}
107108

109+
public final RealtimeAnnotation annotations;
110+
108111
/***
109112
* internal
110113
*
@@ -887,7 +890,7 @@ private void onMessage(final ProtocolMessage protocolMessage) {
887890
if(msg.createdAt == null && msg.action == MessageAction.MESSAGE_CREATE) msg.createdAt = msg.timestamp;
888891

889892
try {
890-
msg.decode(options, decodingContext);
893+
if (msg.data != null) msg.decode(options, decodingContext);
891894
} catch (MessageDecodeException e) {
892895
if (e.errorInfo.code == 40018) {
893896
Log.e(TAG, String.format(Locale.ROOT, "Delta message decode failure - %s. Message id = %s, channel = %s", e.errorInfo.message, msg.id, name));
@@ -1310,6 +1313,10 @@ else if(stateChange.current.equals(failureState)) {
13101313
state = ChannelState.initialized;
13111314
this.decodingContext = new DecodingContext();
13121315
this.liveObjectsPlugin = liveObjectsPlugin;
1316+
this.annotations = new RealtimeAnnotation(
1317+
this,
1318+
new RestAnnotation(name, ably.http, ably.options, options)
1319+
);
13131320
}
13141321

13151322
void onChannelMessage(ProtocolMessage msg) {
@@ -1376,6 +1383,9 @@ void onChannelMessage(ProtocolMessage msg) {
13761383
case error:
13771384
setFailed(msg.error);
13781385
break;
1386+
case annotation:
1387+
annotations.onAnnotation(msg);
1388+
break;
13791389
default:
13801390
Log.e(TAG, "onChannelMessage(): Unexpected message action (" + msg.action + ")");
13811391
}
@@ -1402,6 +1412,17 @@ public void once(ChannelState state, ChannelStateListener listener) {
14021412
super.once(state.getChannelEvent(), listener);
14031413
}
14041414

1415+
/**
1416+
* (Internal) Sends a protocol message and provides a callback for completion.
1417+
*
1418+
* @param protocolMessage the protocol message to be sent
1419+
* @param listener the listener to be notified upon completion of the message delivery
1420+
*/
1421+
public void sendProtocolMessage(ProtocolMessage protocolMessage, CompletionListener listener) throws AblyException {
1422+
ConnectionManager connectionManager = ably.connection.connectionManager;
1423+
connectionManager.send(protocolMessage, ably.options.queueMessages, listener);
1424+
}
1425+
14051426
private static final String TAG = Channel.class.getName();
14061427
final AblyRealtime ably;
14071428
final String basePath;
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
package io.ably.lib.realtime;
2+
3+
import io.ably.lib.rest.RestAnnotation;
4+
import io.ably.lib.types.AblyException;
5+
import io.ably.lib.types.Annotation;
6+
import io.ably.lib.types.AnnotationAction;
7+
import io.ably.lib.types.AsyncPaginatedResult;
8+
import io.ably.lib.types.Callback;
9+
import io.ably.lib.types.ErrorInfo;
10+
import io.ably.lib.types.MessageDecodeException;
11+
import io.ably.lib.types.PaginatedResult;
12+
import io.ably.lib.types.Param;
13+
import io.ably.lib.types.ProtocolMessage;
14+
import io.ably.lib.util.Log;
15+
import io.ably.lib.util.Multicaster;
16+
17+
import java.util.ArrayList;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Locale;
21+
import java.util.Map;
22+
23+
/**
24+
* RealtimeAnnotation provides subscription capabilities for annotations received on a channel.
25+
* It allows adding or removing listeners to handle annotation events and facilitates broadcasting
26+
* those events to the appropriate listeners.
27+
* <p>
28+
* Note: This is an experimental API. While the underlying functionality is stable,
29+
* the public API may change in future releases.
30+
*/
31+
public class RealtimeAnnotation {
32+
33+
private static final String TAG = RealtimeAnnotation.class.getName();
34+
35+
private final ChannelBase channel;
36+
private final RestAnnotation restAnnotation;
37+
private final AnnotationMulticaster listeners = new AnnotationMulticaster();
38+
private final Map<String, AnnotationMulticaster> typeListeners = new HashMap<>();
39+
40+
public RealtimeAnnotation(ChannelBase channel, RestAnnotation restAnnotation) {
41+
this.channel = channel;
42+
this.restAnnotation = restAnnotation;
43+
}
44+
45+
public void publish(String messageSerial, Annotation annotation, CompletionListener listener) throws AblyException {
46+
Log.v(TAG, String.format("publish(MsgSerial, Annotation); channel = %s", channel.name));
47+
48+
// (RSAN1, RSAN1a3)
49+
if (annotation.type == null) {
50+
throw AblyException.fromErrorInfo(new ErrorInfo("Annotation type must be specified", 400, 40000));
51+
}
52+
53+
// (RSAN1, RSAN1c1)
54+
annotation.messageSerial = messageSerial;
55+
// (RSAN1, RSAN1c2)
56+
if (annotation.action == null) {
57+
annotation.action = AnnotationAction.ANNOTATION_CREATE;
58+
}
59+
60+
try {
61+
// (RSAN1, RSAN1c3)
62+
annotation.encode(channel.options);
63+
} catch (MessageDecodeException e) {
64+
throw AblyException.fromThrowable(e);
65+
}
66+
67+
Log.v(TAG, String.format("RealtimeAnnotations.publish(): channelName = %s, sending annotation with messageSerial = %s, type = %s",
68+
channel.name, messageSerial, annotation.type));
69+
70+
ProtocolMessage protocolMessage = new ProtocolMessage();
71+
protocolMessage.action = ProtocolMessage.Action.annotation;
72+
protocolMessage.channel = channel.name;
73+
protocolMessage.annotations = new Annotation[]{annotation};
74+
75+
channel.sendProtocolMessage(protocolMessage, listener);
76+
}
77+
78+
public void publish(String messageSerial, Annotation annotation) throws AblyException {
79+
publish(messageSerial, annotation, null);
80+
}
81+
82+
public void delete(String messageSerial, Annotation annotation, CompletionListener listener) throws AblyException {
83+
Log.v(TAG, String.format("delete(MsgSerial, Annotation); channel = %s", channel.name));
84+
annotation.action = AnnotationAction.ANNOTATION_DELETE;
85+
publish(messageSerial, annotation, listener);
86+
}
87+
88+
public void delete(String messageSerial, Annotation annotation) throws AblyException {
89+
delete(messageSerial, annotation, null);
90+
}
91+
92+
/**
93+
* Retrieves a paginated list of annotations associated with the specified message serial.
94+
* <p>
95+
* Note: This is an experimental API. While the underlying functionality is stable,
96+
* the public API may change in future releases.
97+
*
98+
* @param messageSerial the unique serial identifier for the message being annotated.
99+
* @param params an array of query parameters for filtering or modifying the request.
100+
* @return a {@link PaginatedResult} containing the matching annotations.
101+
* @throws AblyException if an error occurs during the retrieval process.
102+
*/
103+
public PaginatedResult<Annotation> get(String messageSerial, Param[] params) throws AblyException {
104+
return restAnnotation.get(messageSerial, params);
105+
}
106+
107+
/**
108+
* Retrieves a paginated list of annotations associated with the specified message serial.
109+
* <p>
110+
* Note: This is an experimental API. While the underlying functionality is stable,
111+
* the public API may change in future releases.
112+
*
113+
* @param messageSerial the unique serial identifier for the message being annotated
114+
* @return a PaginatedResult containing the matching annotations
115+
* @throws AblyException if an error occurs during the retrieval process
116+
*/
117+
public PaginatedResult<Annotation> get(String messageSerial) throws AblyException {
118+
return restAnnotation.get(messageSerial, null);
119+
}
120+
121+
/**
122+
* Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
123+
* <p>
124+
* Note: This is an experimental API. While the underlying functionality is stable,
125+
* the public API may change in future releases.
126+
*
127+
* @param messageSerial the unique serial identifier for the message being annotated.
128+
* @param params an array of query parameters for filtering or modifying the request.
129+
* @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
130+
*/
131+
public void getAsync(String messageSerial, Param[] params, Callback<AsyncPaginatedResult<Annotation>> callback) {
132+
restAnnotation.getAsync(messageSerial, params, callback);
133+
}
134+
135+
/**
136+
* Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
137+
* <p>
138+
* Note: This is an experimental API. While the underlying functionality is stable,
139+
* the public API may change in future releases.
140+
*
141+
* @param messageSerial the unique serial identifier for the message being annotated.
142+
* @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
143+
*/
144+
public void getAsync(String messageSerial, Callback<AsyncPaginatedResult<Annotation>> callback) {
145+
restAnnotation.getAsync(messageSerial, null, callback);
146+
}
147+
148+
/**
149+
* Subscribes the given {@link AnnotationListener} to the channel, allowing it to receive annotations.
150+
* If the channel's attach on subscribe option is enabled, the channel is attached automatically.
151+
* <p>
152+
* Note: This is an experimental API. While the underlying functionality is stable,
153+
* the public API may change in future releases.
154+
*
155+
* @param listener the listener to be subscribed to the channel
156+
* @throws AblyException if an error occurs during channel attachment
157+
*/
158+
public synchronized void subscribe(AnnotationListener listener) throws AblyException {
159+
Log.v(TAG, String.format("subscribe(); annotations in channel = %s", channel.name));
160+
listeners.add(listener);
161+
if (channel.attachOnSubscribeEnabled()) {
162+
channel.attach();
163+
}
164+
}
165+
166+
/**
167+
* Unsubscribes the specified {@link AnnotationListener} from the channel, stopping it
168+
* from receiving further annotations. Any corresponding type-specific listeners
169+
* associated with the listener are also removed.
170+
* <p>
171+
* Note: This is an experimental API. While the underlying functionality is stable,
172+
* the public API may change in future releases.
173+
*
174+
* @param listener the {@link AnnotationListener} to be unsubscribed
175+
*/
176+
public synchronized void unsubscribe(AnnotationListener listener) {
177+
Log.v(TAG, String.format("unsubscribe(); annotations in channel = %s", channel.name));
178+
listeners.remove(listener);
179+
for (AnnotationMulticaster multicaster : typeListeners.values()) {
180+
multicaster.remove(listener);
181+
}
182+
}
183+
184+
/**
185+
* Subscribes the given {@link AnnotationListener} to the channel for a specific annotation type,
186+
* allowing it to receive annotations of the specified type. If the channel's attach on subscribe
187+
* option is enabled, the channel is attached automatically.
188+
* <p>
189+
* Note: This is an experimental API. While the underlying functionality is stable,
190+
* the public API may change in future releases.
191+
*
192+
* @param type the specific annotation type to subscribe to; if null, subscribes to all types
193+
* @param listener the {@link AnnotationListener} to be subscribed
194+
*/
195+
public synchronized void subscribe(String type, AnnotationListener listener) throws AblyException {
196+
Log.v(TAG, String.format("subscribe(); annotations in channel = %s; single type = %s", channel.name, type));
197+
subscribeImpl(type, listener);
198+
if (channel.attachOnSubscribeEnabled()) {
199+
channel.attach();
200+
}
201+
}
202+
203+
/**
204+
* Unsubscribes the specified {@link AnnotationListener} from receiving annotations
205+
* of a particular type within the channel. If there are no remaining listeners
206+
* for the specified type, the type-specific listener collection is also removed.
207+
* <p>
208+
* Note: This is an experimental API. While the underlying functionality is stable,
209+
* the public API may change in future releases.
210+
*
211+
* @param type the specific annotation type to unsubscribe from; if null, unsubscribes
212+
* from all annotations associated with the listener
213+
* @param listener the {@link AnnotationListener} to be unsubscribed
214+
*/
215+
public synchronized void unsubscribe(String type, AnnotationListener listener) {
216+
Log.v(TAG, String.format("unsubscribe(); annotations in channel = %s; single type = %s", channel.name, type));
217+
unsubscribeImpl(type, listener);
218+
}
219+
220+
/**
221+
* Internal method. Handles incoming annotation messages from the protocol layer.
222+
*
223+
* @param protocolMessage the protocol message containing annotation data
224+
*/
225+
public void onAnnotation(ProtocolMessage protocolMessage) {
226+
List<Annotation> annotations = new ArrayList<>();
227+
for (int i = 0; i < protocolMessage.annotations.length; i++) {
228+
Annotation annotation = protocolMessage.annotations[i];
229+
try {
230+
if (annotation.data != null) annotation.decode(channel.options);
231+
} catch (MessageDecodeException e) {
232+
Log.e(TAG, String.format(Locale.ROOT, "%s on channel %s", e.errorInfo.message, channel.name));
233+
}
234+
/* populate fields derived from protocol message */
235+
if (annotation.connectionId == null) annotation.connectionId = protocolMessage.connectionId;
236+
if (annotation.timestamp == 0) annotation.timestamp = protocolMessage.timestamp;
237+
if (annotation.id == null) annotation.id = protocolMessage.id + ':' + i;
238+
annotations.add(annotation);
239+
}
240+
broadcastAnnotation(annotations);
241+
}
242+
243+
private void broadcastAnnotation(List<Annotation> annotations) {
244+
for (Annotation annotation : annotations) {
245+
listeners.onAnnotation(annotation);
246+
247+
String type = annotation.type != null ? annotation.type : "";
248+
AnnotationMulticaster eventListener = typeListeners.get(type);
249+
if (eventListener != null) eventListener.onAnnotation(annotation);
250+
}
251+
}
252+
253+
private void subscribeImpl(String type, AnnotationListener listener) {
254+
String annotationType = type != null ? type : "";
255+
AnnotationMulticaster typeSpecificListeners = typeListeners.get(annotationType);
256+
if (typeSpecificListeners == null) {
257+
typeSpecificListeners = new AnnotationMulticaster();
258+
typeListeners.put(annotationType, typeSpecificListeners);
259+
}
260+
typeSpecificListeners.add(listener);
261+
}
262+
263+
private void unsubscribeImpl(String type, AnnotationListener listener) {
264+
AnnotationMulticaster listeners = typeListeners.get(type);
265+
if (listeners != null) {
266+
listeners.remove(listener);
267+
if (listeners.isEmpty()) {
268+
typeListeners.remove(type);
269+
}
270+
}
271+
}
272+
273+
public interface AnnotationListener {
274+
void onAnnotation(Annotation annotation);
275+
}
276+
277+
private static class AnnotationMulticaster extends Multicaster<AnnotationListener> implements AnnotationListener {
278+
@Override
279+
public void onAnnotation(Annotation annotation) {
280+
for (final AnnotationListener member : getMembers()) {
281+
try {
282+
member.onAnnotation(annotation);
283+
} catch (Exception e) {
284+
Log.e(TAG, e.getMessage(), e);
285+
}
286+
}
287+
}
288+
}
289+
}

lib/src/main/java/io/ably/lib/rest/ChannelBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ public class ChannelBase {
3838
*/
3939
public final Presence presence;
4040

41+
/**
42+
* Represents the annotations associated with a channel message.
43+
* This field provides functionality for managing annotations.
44+
*/
45+
public final RestAnnotation annotations;
46+
47+
4148
/**
4249
* Publish a message on this channel using the REST API.
4350
* Since the REST API is stateless, this request is made independently
@@ -315,6 +322,7 @@ private BasePaginatedQuery.ResultRequest<PresenceMessage> historyImpl(Http http,
315322
this.options = options;
316323
this.basePath = "/channels/" + HttpUtils.encodeURIComponent(name);
317324
this.presence = new Presence();
325+
this.annotations = new RestAnnotation(name, ably.http, ably.options, options);
318326
}
319327

320328
private final AblyBase ably;

0 commit comments

Comments
 (0)