Skip to content

Commit 47b3c86

Browse files
committed
[ECO-5426] refactor: created ObjectsManager for handling incomingobjects
1 parent 6982441 commit 47b3c86

File tree

3 files changed

+258
-260
lines changed

3 files changed

+258
-260
lines changed
Lines changed: 21 additions & 260 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,35 @@
11
package io.ably.lib.objects
22

3-
import io.ably.lib.objects.type.BaseLiveObject
4-
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
5-
import io.ably.lib.objects.type.livemap.DefaultLiveMap
63
import io.ably.lib.types.Callback
74
import io.ably.lib.types.ProtocolMessage
85
import io.ably.lib.util.Log
9-
import java.util.concurrent.ConcurrentHashMap
6+
7+
/**
8+
* @spec RTO2 - enum representing objects state
9+
*/
10+
internal enum class ObjectsState {
11+
INITIALIZED,
12+
SYNCING,
13+
SYNCED
14+
}
1015

1116
/**
1217
* Default implementation of LiveObjects interface.
1318
* Provides the core functionality for managing live objects on a channel.
14-
*
15-
* @spec RTO1 - Provides access to the root LiveMap object
16-
* @spec RTO2 - Validates channel modes for operations
17-
* @spec RTO3 - Maintains an objects pool for all live objects on the channel
18-
* @spec RTO4 - Handles channel attachment and sync initiation
19-
* @spec RTO5 - Processes OBJECT_SYNC messages during sync sequences
20-
* @spec RTO6 - Creates zero-value objects when needed
2119
*/
22-
internal class DefaultLiveObjects(private val channelName: String, private val adapter: LiveObjectsAdapter): LiveObjects {
20+
internal class DefaultLiveObjects(private val channelName: String, internal val adapter: LiveObjectsAdapter): LiveObjects {
2321
private val tag = "DefaultLiveObjects"
24-
25-
/**
26-
* @spec RTO2 - Objects state enum matching JavaScript ObjectsState
27-
*/
28-
private enum class ObjectsState {
29-
INITIALIZED,
30-
SYNCING,
31-
SYNCED
32-
}
33-
34-
private var state = ObjectsState.INITIALIZED
35-
3622
/**
3723
* @spec RTO3 - Objects pool storing all live objects by object ID
3824
*/
39-
private val objectsPool = ObjectsPool(adapter)
25+
internal val objectsPool = ObjectsPool(adapter)
26+
27+
internal var state = ObjectsState.INITIALIZED
4028

4129
/**
42-
* @spec RTO5 - Sync objects data pool for collecting sync messages
43-
*/
44-
private val syncObjectsDataPool = ConcurrentHashMap<String, ObjectState>()
45-
private var currentSyncId: String? = null
46-
/**
47-
* @spec RTO7 - Buffered object operations during sync
30+
* @spec RTO4 - Used for handling object messages and object sync messages
4831
*/
49-
private val bufferedObjectOperations = mutableListOf<ObjectMessage>() // RTO7a
32+
private val objectsManager = ObjectsManager(this)
5033

5134
/**
5235
* @spec RTO1 - Returns the root LiveMap object with proper validation and sync waiting
@@ -93,17 +76,14 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
9376

9477
/**
9578
* Handles a ProtocolMessage containing proto action as `object` or `object_sync`.
96-
* This method implements the same logic as the JavaScript handleObjectMessages and handleObjectSyncMessages.
9779
*
9880
* @spec RTL1 - Processes incoming object messages and object sync messages
9981
* @spec RTL15b - Sets channel serial for OBJECT messages
10082
* @spec OM2 - Populates missing fields from parent protocol message
10183
*/
10284
fun handle(protocolMessage: ProtocolMessage) {
10385
// RTL15b - Set channel serial for OBJECT messages
104-
if (protocolMessage.action == ProtocolMessage.Action.`object`) {
105-
setChannelSerial(protocolMessage.channelSerial)
106-
}
86+
adapter.setChannelSerial(channelName, protocolMessage)
10787

10888
if (protocolMessage.state == null || protocolMessage.state.isEmpty()) {
10989
Log.w(tag, "Received ProtocolMessage with null or empty objects, ignoring")
@@ -121,217 +101,18 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
121101
}
122102

123103
when (protocolMessage.action) {
124-
ProtocolMessage.Action.`object` -> handleObjectMessages(objects)
125-
ProtocolMessage.Action.object_sync -> handleObjectSyncMessages(objects, protocolMessage.channelSerial)
104+
ProtocolMessage.Action.`object` -> objectsManager.handleObjectMessages(objects)
105+
ProtocolMessage.Action.object_sync -> objectsManager.handleObjectSyncMessages(objects, protocolMessage.channelSerial)
126106
else -> Log.w(tag, "Ignoring protocol message with unhandled action: ${protocolMessage.action}")
127107
}
128108
}
129109

130-
/**
131-
* Handles object messages (non-sync messages).
132-
*
133-
* @spec RTO8 - Buffers messages if not synced, applies immediately if synced
134-
*/
135-
private fun handleObjectMessages(objectMessages: List<ObjectMessage>) {
136-
if (state != ObjectsState.SYNCED) {
137-
// RTO7 - The client receives object messages in realtime over the channel concurrently with the sync sequence.
138-
// Some of the incoming object messages may have already been applied to the objects described in
139-
// the sync sequence, but others may not; therefore we must buffer these messages so that we can apply
140-
// them to the objects once the sync is complete.
141-
Log.v(tag, "Buffering ${objectMessages.size} object messages, state: $state")
142-
bufferedObjectOperations.addAll(objectMessages) // RTO8a
143-
return
144-
}
145-
146-
// Apply messages immediately if synced
147-
applyObjectMessages(objectMessages) // RTO8b
148-
}
149-
150-
/**
151-
* Handles object sync messages.
152-
*
153-
* @spec RTO5 - Parses sync channel serial and manages sync sequences
154-
*/
155-
private fun handleObjectSyncMessages(objectMessages: List<ObjectMessage>, syncChannelSerial: String?) {
156-
val (syncId, syncCursor) = parseSyncChannelSerial(syncChannelSerial) // RTO5a
157-
val newSyncSequence = currentSyncId != syncId
158-
if (newSyncSequence) {
159-
// RTO5a2 - new sync sequence started
160-
startNewSync(syncId)
161-
}
162-
163-
// RTO5a3 - continue current sync sequence
164-
applyObjectSyncMessages(objectMessages) // RTO5b
165-
166-
// RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync
167-
if (syncChannelSerial.isNullOrEmpty() || syncCursor.isNullOrEmpty()) {
168-
// defer the state change event until the next tick if this was a new sync sequence
169-
// to allow any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
170-
endSync(newSyncSequence)
171-
}
172-
}
173-
174-
/**
175-
* Parses sync channel serial to extract syncId and syncCursor.
176-
*
177-
* @spec RTO5 - Sync channel serial parsing logic
178-
*/
179-
private fun parseSyncChannelSerial(syncChannelSerial: String?): Pair<String?, String?> {
180-
if (syncChannelSerial.isNullOrEmpty()) {
181-
return Pair(null, null)
182-
}
183-
184-
// RTO5a1 - syncChannelSerial is a two-part identifier: <sequence id>:<cursor value>
185-
val match = Regex("^([\\w-]+):(.*)$").find(syncChannelSerial)
186-
return if (match != null) {
187-
val syncId = match.groupValues[1]
188-
val syncCursor = match.groupValues[2]
189-
Pair(syncId, syncCursor)
190-
} else {
191-
Pair(null, null)
192-
}
193-
}
194-
195-
/**
196-
* Starts a new sync sequence.
197-
*
198-
* @spec RTO5 - Sync sequence initialization
199-
*/
200-
private fun startNewSync(syncId: String?) {
201-
Log.v(tag, "Starting new sync sequence: syncId=$syncId")
202-
203-
// need to discard all buffered object operation messages on new sync start
204-
bufferedObjectOperations.clear() // RTO5a2b
205-
syncObjectsDataPool.clear() // RTO5a2a
206-
currentSyncId = syncId
207-
stateChange(ObjectsState.SYNCING, false)
208-
}
209-
210-
/**
211-
* Ends the current sync sequence.
212-
*
213-
* @spec RTO5c - Applies sync data and buffered operations
214-
*/
215-
private fun endSync(deferStateEvent: Boolean) {
216-
Log.v(tag, "Ending sync sequence")
217-
applySync()
218-
// should apply buffered object operations after we applied the sync.
219-
// can use regular non-sync object.operation logic
220-
applyObjectMessages(bufferedObjectOperations) // RTO5c6
221-
222-
bufferedObjectOperations.clear() // RTO5c5
223-
syncObjectsDataPool.clear() // RTO5c4
224-
currentSyncId = null // RTO5c3
225-
stateChange(ObjectsState.SYNCED, deferStateEvent)
226-
}
227-
228-
/**
229-
* Applies sync data to objects pool.
230-
*
231-
* @spec RTO5c - Processes sync data and updates objects pool
232-
*/
233-
private fun applySync() {
234-
if (syncObjectsDataPool.isEmpty()) {
235-
return
236-
}
237-
238-
val receivedObjectIds = mutableSetOf<String>()
239-
val existingObjectUpdates = mutableListOf<Pair<BaseLiveObject, Any>>()
240-
241-
// RTO5c1
242-
for ((objectId, objectState) in syncObjectsDataPool) {
243-
receivedObjectIds.add(objectId)
244-
val existingObject = objectsPool.get(objectId)
245-
246-
// RTO5c1a
247-
if (existingObject != null) {
248-
// Update existing object
249-
val update = existingObject.applyObjectSync(objectState) // RTO5c1a1
250-
existingObjectUpdates.add(Pair(existingObject, update))
251-
} else { // RTO5c1b
252-
// RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool
253-
val newObject = createObjectFromState(objectState)
254-
newObject.applyObjectSync(objectState)
255-
objectsPool.set(objectId, newObject)
256-
}
257-
}
258-
259-
// RTO5c2 - need to remove LiveObject instances from the ObjectsPool for which objectIds were not received during the sync sequence
260-
objectsPool.deleteExtraObjectIds(receivedObjectIds)
261-
262-
// call subscription callbacks for all updated existing objects
263-
existingObjectUpdates.forEach { (obj, update) ->
264-
obj.notifyUpdated(update)
265-
}
266-
}
267-
268-
/**
269-
* Applies object messages to objects.
270-
*
271-
* @spec RTO9 - Creates zero-value objects if they don't exist
272-
*/
273-
private fun applyObjectMessages(objectMessages: List<ObjectMessage>) {
274-
// RTO9a
275-
for (objectMessage in objectMessages) {
276-
if (objectMessage.operation == null) {
277-
// RTO9a1
278-
Log.w(tag, "Object message received without operation field, skipping message: ${objectMessage.id}")
279-
continue
280-
}
281-
282-
val objectOperation: ObjectOperation = objectMessage.operation // RTO9a2
283-
// RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
284-
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
285-
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
286-
// since they need to be able to eventually initialize themselves from that *_CREATE op.
287-
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
288-
// and then we can always apply the operation on the existing object in the pool.
289-
val obj = objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId) // RTO9a2a1
290-
obj.applyObject(objectMessage) // RTO9a2a2, RTO9a2a3
291-
}
292-
}
293-
294-
/**
295-
* Applies sync messages to sync data pool.
296-
*
297-
* @spec RTO5b - Collects object states during sync sequence
298-
*/
299-
private fun applyObjectSyncMessages(objectMessages: List<ObjectMessage>) {
300-
for (objectMessage in objectMessages) {
301-
if (objectMessage.objectState == null) {
302-
Log.w(tag, "Object message received during OBJECT_SYNC without object field, skipping message: ${objectMessage.id}")
303-
continue
304-
}
305-
306-
val objectState: ObjectState = objectMessage.objectState
307-
if (objectState.counter != null || objectState.map != null) {
308-
syncObjectsDataPool[objectState.objectId] = objectState
309-
} else {
310-
// RTO5c1b1c - object state must contain either counter or map data
311-
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
312-
}
313-
}
314-
}
315-
316-
/**
317-
* Creates an object from object state.
318-
*
319-
* @spec RTO5c1b - Creates objects from object state based on type
320-
*/
321-
private fun createObjectFromState(objectState: ObjectState): BaseLiveObject {
322-
return when {
323-
objectState.counter != null -> DefaultLiveCounter.zeroValue(objectState.objectId, adapter) // RTO5c1b1a
324-
objectState.map != null -> DefaultLiveMap.zeroValue(objectState.objectId, adapter, objectsPool) // RTO5c1b1b
325-
else -> throw clientError("Object state must contain either counter or map data") // RTO5c1b1c
326-
}
327-
}
328-
329110
/**
330111
* Changes the state and emits events.
331112
*
332113
* @spec RTO2 - Emits state change events for syncing and synced states
333114
*/
334-
private fun stateChange(newState: ObjectsState, deferEvent: Boolean) {
115+
internal fun stateChange(newState: ObjectsState, deferEvent: Boolean) {
335116
if (state == newState) {
336117
return
337118
}
@@ -342,29 +123,9 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
342123
// TODO: Emit state change events
343124
}
344125

345-
/**
346-
* @spec RTO2 - Validates channel modes for operations
347-
*/
348-
private fun throwIfMissingChannelMode(expectedMode: String) {
349-
// TODO: Implement channel mode validation
350-
// RTO2a - channel.modes is only populated on channel attachment, so use it only if it is set
351-
// RTO2b - otherwise as a best effort use user provided channel options
352-
}
353-
354-
private fun setChannelSerial(channelSerial: String?) {
355-
if (channelSerial.isNullOrEmpty()) {
356-
Log.w(tag, "setChannelSerial called with null or empty value, ignoring")
357-
return
358-
}
359-
Log.v(tag, "Setting channel serial for channelName: $channelName, value: $channelSerial")
360-
adapter.setChannelSerial(channelName, channelSerial)
361-
}
362-
126+
// Dispose of any resources associated with this LiveObjects instance
363127
fun dispose() {
364-
// Dispose of any resources associated with this LiveObjects instance
365-
// For example, close any open connections or clean up references
366128
objectsPool.dispose()
367-
syncObjectsDataPool.clear()
368-
bufferedObjectOperations.clear()
129+
objectsManager.dispose()
369130
}
370131
}

live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ internal fun LiveObjectsAdapter.ensureMessageSizeWithinLimit(objectMessages: Arr
3232
}
3333
}
3434

35+
internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMessage: ProtocolMessage) {
36+
if (protocolMessage.action == ProtocolMessage.Action.`object`) {
37+
val channelSerial = protocolMessage.channelSerial
38+
if (channelSerial.isNullOrEmpty()) return
39+
setChannelSerial(channelName, channelSerial)
40+
}
41+
}
42+
3543
internal enum class ProtocolMessageFormat(private val value: String) {
3644
Msgpack("msgpack"),
3745
Json("json");

0 commit comments

Comments
 (0)