Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.target.nativememoryallocator.map

import com.github.benmanes.caffeine.cache.stats.CacheStats
import com.target.nativememoryallocator.buffer.NativeMemoryBuffer
import com.target.nativememoryallocator.buffer.NativeMemoryBufferMetadata
import com.target.nativememoryallocator.buffer.OnHeapMemoryBuffer

Expand Down Expand Up @@ -86,6 +87,13 @@ interface NativeMemoryMap<KEY_TYPE : Any, VALUE_TYPE : Any> : BaseNativeMemoryMa
*/
fun get(key: KEY_TYPE): VALUE_TYPE?

/**
* Get a value from the map using the [key], copying the native memory contents into [onHeapMemoryBuffer].
*
* @return true if key was found and value copied into [onHeapMemoryBuffer], false otherwise.
*/
fun getIntoOnHeapMemoryBuffer(key: KEY_TYPE, onHeapMemoryBuffer: OnHeapMemoryBuffer): Boolean

/**
* [Set] of [KEY_TYPE] for the map.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ internal class NativeMemoryMapImpl<KEY_TYPE : Any, VALUE_TYPE : Any>(
}
}

override fun getIntoOnHeapMemoryBuffer(key: KEY_TYPE, onHeapMemoryBuffer: OnHeapMemoryBuffer): Boolean {
var found = false

cacheMap.computeIfPresent(key) { _, currentBuffer ->
found = true

currentBuffer.copyToOnHeapMemoryBuffer(onHeapMemoryBuffer = onHeapMemoryBuffer)

currentBuffer
}

return found
}

override val keys: Set<KEY_TYPE>
get() = cacheMap.keys

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.target.nativememoryallocator.map.impl

import com.target.nativememoryallocator.buffer.OnHeapMemoryBuffer
import com.target.nativememoryallocator.map.NativeMemoryMap
import com.target.nativememoryallocator.map.NativeMemoryMapOperationCounters
import java.util.concurrent.atomic.AtomicLong
Expand Down Expand Up @@ -76,6 +77,23 @@ internal class OperationCountedNativeMemoryMapImpl<KEY_TYPE : Any, VALUE_TYPE :
return getResult
}

/**
* Delegate to [NativeMemoryMap.getIntoOnHeapMemoryBuffer] and then update [operationCounters].
*/
override fun getIntoOnHeapMemoryBuffer(key: KEY_TYPE, onHeapMemoryBuffer: OnHeapMemoryBuffer): Boolean {
val getResult = nativeMemoryMap.getIntoOnHeapMemoryBuffer(key = key, onHeapMemoryBuffer = onHeapMemoryBuffer)

operationCounters.apply {
if (getResult) {
numGetsNonNullValue.incrementAndGet()
} else {
numGetsNullValue.incrementAndGet()
}
}

return getResult
}

/**
* Delegate to [NativeMemoryMap.put] and then update [operationCounters].
*/
Expand Down
52 changes: 52 additions & 0 deletions src/test/kotlin/com/target/map/impl/NativeMemoryMapImplTest.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.target.map.impl

import com.target.nativememoryallocator.allocator.NativeMemoryAllocator
import com.target.nativememoryallocator.allocator.NativeMemoryAllocatorBuilder
import com.target.nativememoryallocator.buffer.NativeMemoryBuffer
import com.target.nativememoryallocator.buffer.OnHeapMemoryBuffer
import com.target.nativememoryallocator.buffer.OnHeapMemoryBufferFactory
Expand Down Expand Up @@ -279,6 +280,7 @@ class NativeMemoryMapImplTest {
}
}


@Test
fun `test put reuse buffer`() {
val serializedValue1 = ByteArray(10)
Expand Down Expand Up @@ -441,4 +443,54 @@ class NativeMemoryMapImplTest {
mockNativeMemoryAllocator.freeNativeMemoryBuffer(buffer = mockNativeMemoryBuffer)
}
}

@Test
fun `test put then getIntoOnHeapMemoryBuffer`() {

class NMAStringSerializer : NativeMemoryMapSerializer<String> {

override fun deserializeFromOnHeapMemoryBuffer(onHeapMemoryBuffer: OnHeapMemoryBuffer): String =
String(
onHeapMemoryBuffer.array, 0, onHeapMemoryBuffer.getReadableBytes()
)

override fun serializeToByteArray(value: String): ByteArray = value.toByteArray()
}

clearAllMocks()

val nativeMemoryAllocator = NativeMemoryAllocatorBuilder(
pageSizeBytes = 1024,
nativeMemorySizeBytes = 16 * 1024 * 1024, // 16 MB
).build()

val testValue = "hello world"

val nativeMemoryMap = NativeMemoryMapImpl(
valueSerializer = NMAStringSerializer(),
nativeMemoryAllocator = nativeMemoryAllocator,
useThreadLocalOnHeapReadBuffer = false,
threadLocalOnHeapReadBufferInitialCapacityBytes = (256 * 1024),
cacheMap = ConcurrentHashMap(),
)

val putResult = nativeMemoryMap.put(key = 1, value = testValue)
putResult shouldBe NativeMemoryMap.PutResult.ALLOCATED_NEW_BUFFER

val readOnHeapMemoryBuffer = OnHeapMemoryBufferFactory.newOnHeapMemoryBuffer(initialCapacityBytes = 1)

nativeMemoryMap.getIntoOnHeapMemoryBuffer(
key = 1,
onHeapMemoryBuffer = readOnHeapMemoryBuffer
) shouldBe true

readOnHeapMemoryBuffer.getReadableBytes() shouldBe 11
String(readOnHeapMemoryBuffer.toTrimmedArray(), 0, readOnHeapMemoryBuffer.getReadableBytes()) shouldBe testValue

nativeMemoryMap.getIntoOnHeapMemoryBuffer(key = 2, onHeapMemoryBuffer = readOnHeapMemoryBuffer) shouldBe false

nativeMemoryMap.keys shouldBe setOf(1)
nativeMemoryMap.size shouldBe 1
}

}