-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Introduce CachingCollectorManager to parallelize search when using CachingCollector #16247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0cde4ba
7bc5217
0466d2f
22efafa
6ccf0f6
55e79e2
dab05cc
7666028
5d85a91
4472875
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.lucene.search; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
|
|
||
| /** | ||
| * A {@link CollectorManager} that wraps a delegate {@link CollectorManager} and caches all | ||
| * collected documents (and optionally scores) per slice, so they can be replayed to a second-pass | ||
| * {@link CollectorManager} without re-running the query. | ||
| * | ||
| * <p>One {@link CachingCollector} is created per slice. During {@link #replay}, each cached slice | ||
| * is replayed into a fresh second-pass collector, and all second-pass collectors are reduced | ||
| * together. This works correctly with both sequential and concurrent search. | ||
| * | ||
| * <p>Example usage: | ||
| * | ||
| * <pre class="prettyprint"> | ||
| * CachingCollectorManager<C1, R1> caching = new CachingCollectorManager<>( | ||
| * firstPassManager, cacheScores, maxRAMMB, null); | ||
| * R1 firstResult = searcher.search(query, caching); | ||
| * | ||
| * if (caching.isCached()) { | ||
| * R2 secondResult = caching.replay(secondPassManager); | ||
| * } else { | ||
| * // cache overflowed — re-run the query | ||
| * R2 secondResult = searcher.search(query, secondPassManager); | ||
| * } | ||
| * </pre> | ||
| * | ||
| * @lucene.experimental | ||
| */ | ||
| public class CachingCollectorManager<C extends Collector, R> | ||
| implements CollectorManager<CachingCollector, R> { | ||
|
|
||
| private final CollectorManager<C, R> delegate; | ||
| private final boolean cacheScores; | ||
| private final Double maxRAMMB; | ||
| private final Integer maxDocsToCache; | ||
|
|
||
| // One CachingCollector per slice | ||
| private final List<CachingCollector> cachingCollectors = new ArrayList<>(); | ||
|
|
||
| /** | ||
| * @param delegate the first-pass {@link CollectorManager} | ||
| * @param cacheScores whether to cache scores in addition to document IDs | ||
| * @param maxRAMMB the maximum RAM in MB to use per slice cache, or null if using maxDocsToCache | ||
| * @param maxDocsToCache the maximum number of documents to cache per slice, or null if using | ||
| * maxRAMMB | ||
| */ | ||
| public CachingCollectorManager( | ||
| CollectorManager<C, R> delegate, | ||
| boolean cacheScores, | ||
| Double maxRAMMB, | ||
| Integer maxDocsToCache) { | ||
| if (maxRAMMB == null && maxDocsToCache == null || maxRAMMB != null && maxDocsToCache != null) { | ||
| throw new IllegalArgumentException("Exactly one of maxRAMMB or maxDocsToCache must be set"); | ||
| } | ||
| this.delegate = delegate; | ||
| this.cacheScores = cacheScores; | ||
| this.maxRAMMB = maxRAMMB; | ||
| this.maxDocsToCache = maxDocsToCache; | ||
| } | ||
|
|
||
| @Override | ||
| public CachingCollector newCollector() throws IOException { | ||
| C collector = delegate.newCollector(); | ||
| CachingCollector cache = | ||
| maxDocsToCache != null | ||
| ? CachingCollector.create(collector, cacheScores, maxDocsToCache) | ||
| : CachingCollector.create(collector, cacheScores, maxRAMMB); | ||
| cachingCollectors.add(cache); | ||
| return cache; | ||
| } | ||
|
|
||
| @Override | ||
| @SuppressWarnings("unchecked") | ||
| public R reduce(Collection<CachingCollector> collectors) throws IOException { | ||
| List<C> originals = new ArrayList<>(collectors.size()); | ||
| for (CachingCollector cache : collectors) { | ||
| originals.add((C) cache.in); | ||
| } | ||
| return delegate.reduce(originals); | ||
| } | ||
|
|
||
| /** | ||
| * Returns {@code true} if the search has been run and all per-slice caches are intact (none | ||
| * overflowed their RAM/doc budget). Returns {@code false} if the search has not yet been run or | ||
| * any cache overflowed. | ||
| */ | ||
| public boolean isCached() { | ||
| return !cachingCollectors.isEmpty() | ||
|
javanna marked this conversation as resolved.
|
||
| && cachingCollectors.stream().allMatch(CachingCollector::isCached); | ||
|
javanna marked this conversation as resolved.
|
||
| } | ||
|
|
||
| /** | ||
| * Replays each per-slice cache into a fresh second-pass collector, then reduces all results. | ||
| * | ||
| * @throws IllegalStateException if {@link #isCached()} returns {@code false} | ||
| */ | ||
| public <C2 extends Collector, R2> R2 replay(CollectorManager<C2, R2> secondPassManager) | ||
| throws IOException { | ||
| if (!isCached()) { | ||
| throw new IllegalStateException("cache is not available; re-run the query instead"); | ||
| } | ||
| List<C2> secondCollectors = new ArrayList<>(cachingCollectors.size()); | ||
|
javanna marked this conversation as resolved.
|
||
| for (CachingCollector cache : cachingCollectors) { | ||
| C2 secondCollector = secondPassManager.newCollector(); | ||
| cache.replay(secondCollector); | ||
| secondCollectors.add(secondCollector); | ||
| } | ||
| return secondPassManager.reduce(secondCollectors); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.lucene.search; | ||
|
|
||
| import java.io.IOException; | ||
| import org.apache.lucene.document.Document; | ||
| import org.apache.lucene.store.Directory; | ||
| import org.apache.lucene.tests.index.RandomIndexWriter; | ||
| import org.apache.lucene.tests.util.LuceneTestCase; | ||
|
|
||
| public class TestCachingCollectorManager extends LuceneTestCase { | ||
|
|
||
| public void testCacheOverflow() throws IOException { | ||
| Directory dir = newDirectory(); | ||
| RandomIndexWriter iw = new RandomIndexWriter(random(), dir); | ||
| for (int i = 0; i < atLeast(10); i++) { | ||
| iw.addDocument(new Document()); | ||
| } | ||
| IndexSearcher searcher = newSearcher(iw.getReader()); | ||
| iw.close(); | ||
|
|
||
| CachingCollectorManager<TopScoreDocCollector, TopDocs> caching = | ||
| new CachingCollectorManager<>( | ||
| new TopScoreDocCollectorManager(10, Integer.MAX_VALUE), false, null, 0); | ||
|
|
||
| searcher.search(MatchAllDocsQuery.INSTANCE, caching); | ||
| assertFalse(caching.isCached()); | ||
| assertThrows( | ||
| IllegalStateException.class, | ||
| () -> caching.replay(new TopScoreDocCollectorManager(10, Integer.MAX_VALUE))); | ||
|
|
||
| searcher.getIndexReader().close(); | ||
| dir.close(); | ||
| } | ||
|
|
||
| public void testNotCachedBeforeSearch() { | ||
| CachingCollectorManager<TopScoreDocCollector, TopDocs> caching = | ||
| new CachingCollectorManager<>( | ||
| new TopScoreDocCollectorManager(10, Integer.MAX_VALUE), false, null, Integer.MAX_VALUE); | ||
| assertFalse(caching.isCached()); | ||
|
|
||
| assertThrows( | ||
| IllegalStateException.class, | ||
| () -> caching.replay(new TopScoreDocCollectorManager(10, Integer.MAX_VALUE))); | ||
| } | ||
|
|
||
| public void testBasic() throws IOException { | ||
| Directory dir = newDirectory(); | ||
| RandomIndexWriter iw = new RandomIndexWriter(random(), dir); | ||
| for (int i = 0; i < 10; i++) { | ||
| iw.addDocument(new Document()); | ||
| } | ||
| IndexSearcher searcher = newSearcher(iw.getReader()); | ||
| iw.close(); | ||
|
|
||
| CachingCollectorManager<TopScoreDocCollector, TopDocs> caching = | ||
| new CachingCollectorManager<>( | ||
| new TopScoreDocCollectorManager(10, Integer.MAX_VALUE), true, null, Integer.MAX_VALUE); | ||
|
|
||
| TopDocs firstResult = searcher.search(MatchAllDocsQuery.INSTANCE, caching); | ||
| assertTrue(caching.isCached()); | ||
| assertEquals(10, firstResult.totalHits.value()); | ||
|
|
||
| TopDocs replayResult = caching.replay(new TopScoreDocCollectorManager(10, Integer.MAX_VALUE)); | ||
| assertEquals(firstResult.totalHits.value(), replayResult.totalHits.value()); | ||
| assertEquals(firstResult.scoreDocs.length, replayResult.scoreDocs.length); | ||
|
|
||
| searcher.getIndexReader().close(); | ||
| dir.close(); | ||
| } | ||
|
|
||
| public void testConstructor() { | ||
| assertThrows( | ||
| IllegalArgumentException.class, | ||
| () -> | ||
| new CachingCollectorManager<>( | ||
| new TopScoreDocCollectorManager(10, Integer.MAX_VALUE), false, null, null)); | ||
|
|
||
| assertThrows( | ||
| IllegalArgumentException.class, | ||
| () -> | ||
| new CachingCollectorManager<>( | ||
| new TopScoreDocCollectorManager(10, Integer.MAX_VALUE), false, 1.0, 1)); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we also add a test for the happy path?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sorry, where? I mean, these tests never verify the normal functioning of the collector manager, i.e. calling search against it without exceptions. Or am I not looking in the right place?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, sorry for the misunderstanding, added a new test method testBasic() to test the happy path, thanks! |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also throw if both are non-null, given that only one will be used and the other silently ignored?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed that: an exception is now thrown if both are non-null.