diff --git a/tape/build.gradle b/tape/build.gradle index 2148d0e2..33aecac0 100644 --- a/tape/build.gradle +++ b/tape/build.gradle @@ -18,4 +18,8 @@ checkstyle { toolVersion = '7.7' } +test { + maxHeapSize = '4G' // maximum heap size +} + apply from: rootProject.file('gradle/gradle-mvn-push.gradle') diff --git a/tape/src/main/java/com/squareup/tape2/QueueFile.java b/tape/src/main/java/com/squareup/tape2/QueueFile.java index c1cae7b6..4f8a68b7 100644 --- a/tape/src/main/java/com/squareup/tape2/QueueFile.java +++ b/tape/src/main/java/com/squareup/tape2/QueueFile.java @@ -454,13 +454,21 @@ private void expandIfNecessary(long dataLength) throws IOException { // Calculate the position of the tail end of the data in the ring buffer long endOfLastElement = wrapPosition(last.position + Element.HEADER_LENGTH + last.length); long count = 0; + long pos = headerLength; // If the buffer is split, we need to make it contiguous if (endOfLastElement <= first.position) { FileChannel channel = raf.getChannel(); channel.position(fileLength); // destination position count = endOfLastElement - headerLength; - if (channel.transferTo(headerLength, count, channel) != count) { - throw new AssertionError("Copied insufficient number of bytes!"); + // Transfer data in batches because of the write limitation of 2GB in FileChannelImpl. + long remainingToTransfer = count; + while (remainingToTransfer > 0) { + long sizeToTransfer = min(remainingToTransfer, Integer.MAX_VALUE); + if (channel.transferTo(pos, sizeToTransfer, channel) != sizeToTransfer) { + throw new AssertionError("Copied insufficient number of bytes!"); + } + remainingToTransfer -= sizeToTransfer; + pos += sizeToTransfer; } } diff --git a/tape/src/test/java/com/squareup/tape2/QueueFileTest.java b/tape/src/test/java/com/squareup/tape2/QueueFileTest.java index 669d0beb..73b73a14 100644 --- a/tape/src/test/java/com/squareup/tape2/QueueFileTest.java +++ b/tape/src/test/java/com/squareup/tape2/QueueFileTest.java @@ -751,6 +751,51 @@ private QueueFile newQueueFile(boolean zero) throws IOException { queue.close(); } + /** + * Exercise a bug where an expanding queue file needs to transfer large data (more than 2GB). + */ + @Test public void testFileExpansionMoveLargeElements() throws IOException { + QueueFile queue = newQueueFile(); + + // Create test data - (1GB - 4 byte - 4 byte) block + // We can mock the large elements transfer as follows: + // 1. add four blocks to saturate the queue file; + // 2. remove first three blocks; + // 3. add three blocks to make the last position smaller than first position; + // 4. add one new block which trigger the expansion of queue file + // and large data transfer (first three blocks approximately 3GB). + int blockSize = 1024 * 1024 * 1024 - headerLength - Element.HEADER_LENGTH; + byte[] values = new byte[blockSize]; + for (int i = 0; i < blockSize; i++) { + values[i] = (byte) (i + 1); + } + + // Saturate the queue file + queue.add(values); + queue.add(values); + queue.add(values); + queue.add(values); + + // Remove first three blocks and add three blocks + queue.remove(); + queue.remove(); + queue.remove(); + queue.add(values); + queue.add(values); + queue.add(values); + + // Cause the queue file to expand and a large data transfer + queue.add(values); + + // Make sure values are not corrupted + for (int i = 0; i < 5; i++) { + assertThat(queue.peek()).isEqualTo(values); + queue.remove(); + } + + queue.close(); + } + /** * Exercise a bug where opening a queue whose first or last element's header * was non contiguous throws an {@link java.io.EOFException}.