Skip to content

Commit 0a0c44b

Browse files
committed
ByteBufBsonDocument & ByteBufBsonArray refactoring
* Now implement `Closeable` to track and manage lifecycle with try-with-resources * `ByteBufBsonDocument`: Added resource tracking, OP_MSG parsing, caching strategy * `ByteBufBsonArray`: Added resource tracking and cleanup CommandMessage Changes: * `getCommandDocument()` returns `ByteBufBsonDocument` (was `BsonDocument`) * Delegates document composition to `ByteBufBsonDocument` * Simplified `OP_MSG` document sequence parsing JAVA-6010
1 parent 5560012 commit 0a0c44b

11 files changed

Lines changed: 2253 additions & 1103 deletions

driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonArray.java

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.bson.ByteBuf;
2424
import org.bson.io.ByteBufferBsonInput;
2525

26+
import java.io.Closeable;
2627
import java.util.ArrayList;
2728
import java.util.Collection;
2829
import java.util.Iterator;
@@ -33,20 +34,31 @@
3334

3435
import static com.mongodb.internal.connection.ByteBufBsonHelper.readBsonValue;
3536

36-
final class ByteBufBsonArray extends BsonArray {
37+
final class ByteBufBsonArray extends BsonArray implements Closeable {
3738
private final ByteBuf byteBuf;
3839

40+
/**
41+
* List of resources that need to be closed when this array is closed.
42+
* Tracks the main ByteBuf and iterator duplicates. Iterator buffers are automatically
43+
* removed and released when iteration completes normally to prevent memory accumulation.
44+
*/
45+
private final List<Closeable> trackedResources = new ArrayList<>();
46+
private boolean closed;
47+
3948
ByteBufBsonArray(final ByteBuf byteBuf) {
4049
this.byteBuf = byteBuf;
50+
trackedResources.add(byteBuf::release);
4151
}
4252

4353
@Override
4454
public Iterator<BsonValue> iterator() {
55+
ensureOpen();
4556
return new ByteBufBsonArrayIterator();
4657
}
4758

4859
@Override
4960
public List<BsonValue> getValues() {
61+
ensureOpen();
5062
List<BsonValue> values = new ArrayList<>();
5163
for (BsonValue cur: this) {
5264
//noinspection UseBulkOperation
@@ -59,6 +71,7 @@ public List<BsonValue> getValues() {
5971

6072
@Override
6173
public int size() {
74+
ensureOpen();
6275
int size = 0;
6376
for (BsonValue ignored : this) {
6477
size++;
@@ -68,11 +81,13 @@ public int size() {
6881

6982
@Override
7083
public boolean isEmpty() {
84+
ensureOpen();
7185
return !iterator().hasNext();
7286
}
7387

7488
@Override
7589
public boolean equals(final Object o) {
90+
ensureOpen();
7691
if (o == this) {
7792
return true;
7893
}
@@ -91,6 +106,7 @@ public boolean equals(final Object o) {
91106

92107
@Override
93108
public int hashCode() {
109+
ensureOpen();
94110
int hashCode = 1;
95111
for (BsonValue cur : this) {
96112
hashCode = 31 * hashCode + (cur == null ? 0 : cur.hashCode());
@@ -100,6 +116,7 @@ public int hashCode() {
100116

101117
@Override
102118
public boolean contains(final Object o) {
119+
ensureOpen();
103120
for (BsonValue cur : this) {
104121
if (Objects.equals(o, cur)) {
105122
return true;
@@ -111,6 +128,7 @@ public boolean contains(final Object o) {
111128

112129
@Override
113130
public Object[] toArray() {
131+
ensureOpen();
114132
Object[] retVal = new Object[size()];
115133
Iterator<BsonValue> it = iterator();
116134
for (int i = 0; i < retVal.length; i++) {
@@ -122,6 +140,7 @@ public Object[] toArray() {
122140
@Override
123141
@SuppressWarnings("unchecked")
124142
public <T> T[] toArray(final T[] a) {
143+
ensureOpen();
125144
int size = size();
126145
T[] retVal = a.length >= size ? a : (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size);
127146
Iterator<BsonValue> it = iterator();
@@ -133,6 +152,7 @@ public <T> T[] toArray(final T[] a) {
133152

134153
@Override
135154
public boolean containsAll(final Collection<?> c) {
155+
ensureOpen();
136156
for (Object e : c) {
137157
if (!contains(e)) {
138158
return false;
@@ -143,6 +163,7 @@ public boolean containsAll(final Collection<?> c) {
143163

144164
@Override
145165
public BsonValue get(final int index) {
166+
ensureOpen();
146167
if (index < 0) {
147168
throw new IndexOutOfBoundsException("Index out of range: " + index);
148169
}
@@ -159,6 +180,7 @@ public BsonValue get(final int index) {
159180

160181
@Override
161182
public int indexOf(final Object o) {
183+
ensureOpen();
162184
int i = 0;
163185
for (BsonValue cur : this) {
164186
if (Objects.equals(o, cur)) {
@@ -172,6 +194,7 @@ public int indexOf(final Object o) {
172194

173195
@Override
174196
public int lastIndexOf(final Object o) {
197+
ensureOpen();
175198
ListIterator<BsonValue> listIterator = listIterator(size());
176199
while (listIterator.hasPrevious()) {
177200
if (Objects.equals(o, listIterator.previous())) {
@@ -183,17 +206,20 @@ public int lastIndexOf(final Object o) {
183206

184207
@Override
185208
public ListIterator<BsonValue> listIterator() {
209+
ensureOpen();
186210
return listIterator(0);
187211
}
188212

189213
@Override
190214
public ListIterator<BsonValue> listIterator(final int index) {
215+
ensureOpen();
191216
// Not the most efficient way to do this, but unlikely anyone will notice in practice
192217
return new ArrayList<>(this).listIterator(index);
193218
}
194219

195220
@Override
196221
public List<BsonValue> subList(final int fromIndex, final int toIndex) {
222+
ensureOpen();
197223
if (fromIndex < 0) {
198224
throw new IndexOutOfBoundsException("fromIndex = " + fromIndex);
199225
}
@@ -234,6 +260,7 @@ public boolean addAll(final Collection<? extends BsonValue> c) {
234260

235261
@Override
236262
public boolean addAll(final int index, final Collection<? extends BsonValue> c) {
263+
ensureOpen();
237264
throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
238265
}
239266

@@ -267,11 +294,43 @@ public BsonValue remove(final int index) {
267294
throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
268295
}
269296

297+
@Override
298+
public void close(){
299+
if (!closed) {
300+
for (Closeable closeable : trackedResources) {
301+
try {
302+
closeable.close();
303+
} catch (Exception e) {
304+
// Log and continue closing other resources
305+
}
306+
}
307+
trackedResources.clear();
308+
closed = true;
309+
}
310+
}
311+
312+
private void ensureOpen() {
313+
if (closed) {
314+
throw new IllegalStateException("The BsonArray resources have been released.");
315+
}
316+
}
317+
270318
private class ByteBufBsonArrayIterator implements Iterator<BsonValue> {
271-
private final ByteBuf duplicatedByteBuf = byteBuf.duplicate();
272-
private final BsonBinaryReader bsonReader;
319+
private ByteBuf duplicatedByteBuf;
320+
private BsonBinaryReader bsonReader;
321+
private Closeable resourceHandle;
322+
private boolean finished;
273323

274324
{
325+
ensureOpen();
326+
duplicatedByteBuf = byteBuf.duplicate();
327+
resourceHandle = () -> {
328+
if (duplicatedByteBuf != null) {
329+
duplicatedByteBuf.release();
330+
duplicatedByteBuf = null;
331+
}
332+
};
333+
trackedResources.add(resourceHandle);
275334
bsonReader = new BsonBinaryReader(new ByteBufferBsonInput(duplicatedByteBuf));
276335
// While one might expect that this would be a call to BsonReader#readStartArray that doesn't work because BsonBinaryReader
277336
// expects to be positioned at the start at the beginning of a document, not an array. Fortunately, a BSON array has exactly
@@ -283,7 +342,11 @@ private class ByteBufBsonArrayIterator implements Iterator<BsonValue> {
283342

284343
@Override
285344
public boolean hasNext() {
286-
return bsonReader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT;
345+
boolean hasNext = bsonReader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT;
346+
if (!hasNext) {
347+
cleanup();
348+
}
349+
return hasNext;
287350
}
288351

289352
@Override
@@ -292,9 +355,22 @@ public BsonValue next() {
292355
throw new NoSuchElementException();
293356
}
294357
bsonReader.skipName();
295-
BsonValue value = readBsonValue(duplicatedByteBuf, bsonReader);
358+
BsonValue value = readBsonValue(duplicatedByteBuf, bsonReader, trackedResources);
296359
bsonReader.readBsonType();
297360
return value;
298361
}
362+
363+
private void cleanup() {
364+
if (!finished) {
365+
finished = true;
366+
// Remove from tracked resources since we're cleaning up immediately
367+
trackedResources.remove(resourceHandle);
368+
try {
369+
resourceHandle.close();
370+
} catch (Exception e) {
371+
// Ignore
372+
}
373+
}
374+
}
299375
}
300376
}

0 commit comments

Comments
 (0)