Service: Use iterator to avoid high space complexity #3415
flyrain wants to merge 3 commits into apache:main
Conversation
```java
.filter(mf -> seenPaths.add(mf.path()))
.filter(mf -> TaskUtils.exists(mf.path(), fileIO))
```
```java
Set<String> uniquePaths = tableMetadata.snapshots().stream()
    .flatMap(sn -> sn.allManifests(fileIO).stream())
    .map(ManifestFile::path)
    .collect(Collectors.toSet());
return uniquePaths.parallelStream() // Parallel here!
    .filter(path -> TaskUtils.exists(path, fileIO))
    .map(path -> createManifestTask(...));
```
Once we call .collect(Collectors.toSet()), the stream is fully materialized, which loses the benefit of lazy execution. Here we are trying to lower the memory footprint through lazy execution.
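To make the difference concrete, here is a standalone sketch (illustrative only, not the PR's code): a stateful `filter(set::add)` dedupes element by element and respects downstream short-circuiting, while `collect(Collectors.toSet())` forces the entire source to be traversed before anything downstream runs.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyDedupeDemo {
    // How many source elements are pulled when deduping lazily and
    // short-circuiting after the first 2 distinct elements.
    static int visitedWhenLazy() {
        AtomicInteger visited = new AtomicInteger();
        Set<String> seen = new HashSet<>();
        Stream.of("a", "b", "a", "c", "d")
            .peek(p -> visited.incrementAndGet())
            .filter(seen::add)   // stateful dedupe, one element at a time
            .limit(2)            // downstream short-circuit
            .count();
        return visited.get();    // only 2 elements were ever visited
    }

    // Same pipeline, but materialized into a Set first: every source
    // element is visited before any downstream operation runs.
    static int visitedWhenEager() {
        AtomicInteger visited = new AtomicInteger();
        Stream.of("a", "b", "a", "c", "d")
            .peek(p -> visited.incrementAndGet())
            .collect(Collectors.toSet())
            .stream()
            .limit(2)
            .count();
        return visited.get();    // all 5 elements were visited
    }

    public static void main(String[] args) {
        System.out.println(visitedWhenLazy() + " vs " + visitedWhenEager()); // 2 vs 5
    }
}
```

The PR's pipeline is not short-circuited by a `limit`, but the same property is what lets a batch iterator consume the stream incrementally instead of all at once.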
```java
  createAndRegisterTasks(batch, metaStoreManager, polarisCallContext, tableEntity);
  totalCount += batch.size();
}
```
Can we explicitly call batch.clear()?
We could, but we don't have to, as this is the last batch.
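For reference, the batching loop under discussion can be sketched like this (a hypothetical helper, names are illustrative and not the PR's actual code): the last partial batch is flushed after the loop, and the list simply becomes unreachable afterwards, so an explicit `clear()` would add nothing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class BatchDrain {
    // Drains an iterator into fixed-size batches and hands each one to a sink.
    static int drain(Iterator<String> items, int batchSize, Consumer<List<String>> sink) {
        List<String> batch = new ArrayList<>(batchSize);
        int totalCount = 0;
        while (items.hasNext()) {
            batch.add(items.next());
            if (batch.size() == batchSize) {
                sink.accept(batch);
                totalCount += batch.size();
                batch = new ArrayList<>(batchSize); // fresh list; the old one is collectible
            }
        }
        if (!batch.isEmpty()) {      // final (possibly partial) batch
            sink.accept(batch);      // no clear() needed: batch goes out of scope here
            totalCount += batch.size();
        }
        return totalCount;
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        int total = drain(List.of("a", "b", "c", "d", "e").iterator(), 2,
                          b -> sizes.add(b.size()));
        System.out.println(total + " " + sizes); // 5 [2, 2, 1]
    }
}
```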
singhpk234 left a comment
LGTM, this seems like a nice improvement thanks @flyrain !
@pingtimeout: what is your take on this PR?
@dimas-b This PR is very confusing to me; after review, I do not think it fixes anything at all...
pingtimeout left a comment
Thanks @flyrain for the attempt at fixing the high space complexity issue. This is a good start, but I don't think we are quite there yet.
As far as I can tell, the space complexity of table cleanup was O(UM + PM + S + ST + PST + T). And with this change, it is O(UM + PM + S + ST + PST + batchSize) where:
- `PM` = number of previous metadata files
- `S` = number of snapshots
- `ST` = number of statistics files
- `PST` = number of partition statistics files
- `UM` = number of unique manifest files across all snapshots
- `T` = total number of created TaskEntities
You can see that by running the code with a large number of files under constrained memory. With the current code, there is always a number of files that results in an OOME, proving that the space complexity issue has not been solved by this change. You may want to use realistic (longer) paths to surface the issue faster.
I want to emphasize one critical point that must be addressed before this PR is merged. In #3256, you said the following:
> please take a look to see if that solves the problem. It'd be really nice to run this with the same setup we used to validate the current PR which is this PR fixed the issue
Which contradicts the box that you checked in the description of this PR: Added/updated tests with good coverage, or manually tested (and explained how). Were you able to reproduce the issue before attempting to write a fix?
To summarize: based on my review of the code, I am convinced that this does not solve the underlying issue. And based on the lack of testing, I do not think this PR is ready. I appreciate the desire to provide an alternative to #3256. But I think #3256 is the best option we have, all things considered.
```java
}

@Test
public void testMetadataFileBatchingWithManyFiles() throws IOException {
```
This test is named testMetadataFileBatchingWithManyFiles but only creates 24 files in total. Unfortunately that does not prove that the code is better at handling large tables.
The intent of this unit test is not to simulate a truly large table, but to validate the batching behavior and correctness when metadata files are processed incrementally. As is common practice, we avoid stress or scale tests in unit tests, since they would significantly slow down CI execution and are better suited for a dedicated benchmark.
I understand that unit tests should be quick to avoid slowing down CI. My main concern here is whether this code change has been tested at scale. And if so, how?
Thanks for bringing it up. I think it's a good idea to have a benchmark; more details are in #3256 (comment).
```java
.stream()
// distinct by manifest path, since multiple snapshots will contain the same manifest
// Use stateful filter to dedupe while streaming
.filter(mf -> seenPaths.add(mf.path()))
```
This line adds all unique manifest files across all snapshots to a set that is maintained in memory. Even though the stream is lazy, all unique manifest paths are materialized on the heap. This means that the space complexity does not change.
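A standalone sketch of that point (illustrative only): even though the pipeline is lazy, the stateful dedupe set ends up retaining every unique path on the heap once the stream has been consumed.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

public class SeenPathsGrowth {
    // Lazy evaluation controls WHEN elements flow through the pipeline,
    // not how much state the stateful filter accumulates along the way.
    static int seenSizeAfterStreaming() {
        Set<String> seenPaths = new HashSet<>();
        Stream.of("m1.avro", "m2.avro", "m1.avro", "m3.avro", "m2.avro")
            .filter(seenPaths::add)  // lazy, but state accumulates here
            .forEach(p -> { });      // consume the stream
        return seenPaths.size();     // every unique path is retained
    }

    public static void main(String[] args) {
        System.out.println(seenSizeAfterStreaming()); // 3
    }
}
```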
Thanks for the detailed analysis. I agree that the only remaining unbounded structure here is the in-memory set used to dedupe manifest paths. I do not think this is a practical concern.
To put concrete numbers on it: in an extreme case of 1 million file paths, at an estimated 50 to 100 bytes per path including object and set overhead, the memory footprint would be roughly 40 MB to 95 MB, which is acceptable. That is already a very large table cleanup scenario. At that scale, the question becomes whether we even want the Polaris server itself to handle such a task synchronously in memory; a delegation service would be a better fit in that case.
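For reference, the back-of-envelope arithmetic behind that estimate (a sketch only; the real per-entry cost varies with JVM settings, path length, String encoding, and HashSet node overhead):

```java
public class PathSetFootprint {
    // Rough estimate: entries * bytes-per-entry, reported in MiB.
    static long estimateMiB(long paths, long bytesPerEntry) {
        return paths * bytesPerEntry / (1024 * 1024);
    }

    public static void main(String[] args) {
        // 1 million paths at an assumed 50-100 bytes each
        System.out.println(estimateMiB(1_000_000, 50) + " MiB to "
            + estimateMiB(1_000_000, 100) + " MiB"); // 47 MiB to 95 MiB
    }
}
```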
```java
int batchSize = callContext.getRealmConfig().getConfig(BATCH_SIZE_CONFIG_KEY, 10);
return getMetadataFileBatches(tableMetadata, batchSize).stream()

// Stream all metadata files without materializing them all at once
```
The only thing this change does is postpone the call to the .map(...) methods; afaict the memory consumption stays identical.
The main change is that stream().toList() has been removed to avoid fully materializing the results in memory. Instead, an iterator is used together with a configurable batch size (taskPersistenceBatchSize) to read and process items incrementally. This bounds memory usage, as shown in lines 169 to 175.
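A minimal sketch of the iterator-based consumption (illustrative, not the PR's code): pulling one batch from a stream's iterator evaluates only as many elements as that batch needs, even when the source is unbounded, which is what keeps the per-batch memory bounded.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class IteratorBatching {
    // Counts how many source elements were produced while filling one batch.
    static int producedForOneBatch(int batchSize) {
        AtomicInteger produced = new AtomicInteger();
        Iterator<Integer> it = Stream.iterate(0, i -> i + 1) // unbounded source
            .peek(i -> produced.incrementAndGet())
            .iterator();
        List<Integer> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && it.hasNext()) {
            batch.add(it.next());
        }
        return produced.get(); // only batchSize elements were ever evaluated
    }

    public static void main(String[] args) {
        System.out.println(producedForOneBatch(10));
    }
}
```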
The parameters of the stream construction are eager, so I am afraid the only thing lazily evaluated here is the call to .flatMap(Function.identity())
The comment is misleading; removed. However, all file paths here are part of the metadata.json file, and we've already loaded metadata.json into memory as a table metadata object. Applying lazy evaluation doesn't make sense here.
6ac5e4e to 789a4b8
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.
Fix #2365 (comment)
Checklist
- `CHANGELOG.md` (if needed)
- `site/content/in-dev/unreleased` (if needed)