diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 5ac664de..5b5f0d89 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -44,6 +44,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], handler: RecordHandler[T]) extends VarLenReader with Logging with Serializable { private val DEFAULT_INDEX_SIZE_COMPRESSED_FILES_MB = 1024 private val DEFAULT_FS_INDEX_SIZE_MULTIPLIER = 4 + private val DEFAULT_MAX_FS_BASED_SPLIT_SIZE_MB = 256 protected val cobolSchema: CobolSchema = loadCopyBook(copybookContents) @@ -221,7 +222,10 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], if (isCompressed) { readerProperties.inputSplitSizeCompressedMB.orElse(Some(DEFAULT_INDEX_SIZE_COMPRESSED_FILES_MB)) } else { - readerProperties.inputSplitSizeMB.orElse(readerProperties.fsDefaultBlockSize).map(_ * DEFAULT_FS_INDEX_SIZE_MULTIPLIER) + val defaultIndexSizeBasedOnFsBlock = readerProperties.fsDefaultBlockSize.map { size => + Math.min(size * DEFAULT_FS_INDEX_SIZE_MULTIPLIER, DEFAULT_MAX_FS_BASED_SPLIT_SIZE_MB) + } + readerProperties.inputSplitSizeMB.orElse(defaultIndexSizeBasedOnFsBlock) } } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala index 4d01f8c0..3170ee00 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala @@ -32,9 +32,8 @@ import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder -import za.co.absa.cobrix.spark.cobol.utils.{FileUtils, HDFSUtils, SparkUtils} +import za.co.absa.cobrix.spark.cobol.utils.{FileUtils, HDFSUtils, LRUCache, SparkUtils} -import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ArrayBuffer /** @@ -46,7 +45,9 @@ import scala.collection.mutable.ArrayBuffer * In a nutshell, ideally, there will be as many partitions as are there are indexes. */ private[cobol] object IndexBuilder extends Logging { - private[cobol] val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]() + private val maxCacheSize = 10000 + + private[cobol] val indexCache = new LRUCache[IndexKey, Array[SparseIndexEntry]](maxCacheSize) def buildIndex(filesList: Array[FileWithOrder], cobolReader: Reader, @@ -123,10 +124,11 @@ private[cobol] object IndexBuilder extends Logging { cachingAllowed: Boolean): RDD[SparseIndexEntry] = { val conf = sqlContext.sparkContext.hadoopConfiguration val sconf = new SerializableConfiguration(conf) + val readerOptionsHashCode = reader.getReaderProperties.hashCode() // Splitting between files for which indexes are cached and the list of files for which indexes are not cached val cachedFiles = if (cachingAllowed) { - filesList.filter(f => indexCache.containsKey(f.filePath)) + filesList.filter(f => indexCache.containsKey(IndexKey(f.filePath, readerOptionsHashCode))) } else { Array.empty[FileWithOrder] } @@ -157,7 +159,7 @@ private[cobol] object IndexBuilder extends Logging { filePathOpt.foreach { filePath => logger.info(s"Index stored to cache for file: $filePath.") - indexCache.put(filePath, indexEntries.sortBy(_.offsetFrom)) + indexCache.put(IndexKey(filePath, readerOptionsHashCode), indexEntries.sortBy(_.offsetFrom)) } } } @@ -165,7 +167,7 @@ private[cobol] object IndexBuilder extends Logging { // Getting indexes for files for which indexes are in the cache val cachedIndexes = cachedFiles.flatMap { f => logger.info("Index fetched from cache for file: " + f.filePath) - indexCache.get(f.filePath) + indexCache(IndexKey(f.filePath, readerOptionsHashCode)) .map(ind => ind.copy(fileId = f.order)) } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexKey.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexKey.scala new file mode 100644 index 00000000..d1905d64 --- /dev/null +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexKey.scala @@ -0,0 +1,19 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.index + +case class IndexKey(fileName: String, optionsHashCode: Long) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/LRUCache.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/LRUCache.scala new file mode 100644 index 00000000..f7bbd887 --- /dev/null +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/LRUCache.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.utils + +import scala.collection.JavaConverters._ + +class LRUCache[K,V](maxSize: Int, loadFactor: Float = 0.75f) { + private val cache = new java.util.LinkedHashMap[K, V](Math.min(maxSize, 128), loadFactor, true) { + override def removeEldestEntry(eldest: java.util.Map.Entry[K, V]): Boolean = size() > maxSize + } + + def apply(key: K): V = synchronized { + cache.get(key) + } + + def containsKey(key: K): Boolean = synchronized { + cache.containsKey(key) + } + + def get(key: K): Option[V] = synchronized { + Option(cache.get(key)) + } + + def getMap: Map[K, V] = synchronized { + cache.asScala.toMap + } + + def put(key: K, value: V): Unit = synchronized { + cache.put(key, value) + } + + def remove(key: K): Unit = synchronized { + cache.remove(key) + } +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala index af627b41..947eac93 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala @@ -197,8 +197,12 @@ class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with val pathNameAsCached = s"file:$tempFile" - assert(IndexBuilder.indexCache.get(pathNameAsCached) != null) - assert(IndexBuilder.indexCache.get(pathNameAsCached).length == 2) + val indexCacheSimplified = IndexBuilder.indexCache.getMap.map { + case (k, v) => (k.fileName, v) + } + + assert(indexCacheSimplified.contains(pathNameAsCached)) + assert(indexCacheSimplified(pathNameAsCached).length == 2) assert(actualInitial == expected) assert(actualCached == expected) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/utils/LRUCacheSuite.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/utils/LRUCacheSuite.scala new file mode 100644 index 00000000..bbb234ee --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/utils/LRUCacheSuite.scala @@ -0,0 +1,91 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.utils + +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.spark.cobol.utils.LRUCache + +class LRUCacheSuite extends AnyWordSpec { + "LRUCache" should { + "remember items put there" in { + val cache = new LRUCache[Int, String](3) + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + assert(cache.get(1).contains("one")) + assert(cache.get(2).contains("two")) + assert(cache.get(3).contains("three")) + } + + "forget old items" in { + val cache = new LRUCache[Int, String](3) + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + cache.put(4, "four") + assert(cache.get(1).isEmpty) + assert(cache(1) == null) + assert(cache.get(2).contains("two")) + assert(cache(2) == "two") + assert(cache.get(3).contains("three")) + assert(cache.get(4).contains("four")) + } + + "remember frequently used items" in { + val cache = new LRUCache[Int, String](3) + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + cache.get(1) + cache.get(3) + cache.put(4, "four") + + assert(cache.get(1).contains("one")) + assert(cache.get(2).isEmpty) + assert(cache.get(3).contains("three")) + assert(cache.get(4).contains("four")) + } + + "allow invalidating of values" in { + val cache = new LRUCache[Int, String](3) + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + cache.remove(3) + cache.remove(4) + cache.get(1) + cache.get(3) + cache.put(4, "four") + + assert(cache.containsKey(1)) + assert(!cache.containsKey(8)) + assert(cache(1) == "one") + assert(cache(2) == "two") + assert(cache(3) == null) + assert(cache(4) == "four") + } + + "return the cache as a map" in { + val cache = new LRUCache[Int, String](3) + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + + assert(cache.getMap.size == 3) + } + } +}