diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java index 2b12a3b784c7a..b03d6762cdc92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java @@ -37,8 +37,10 @@ import org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream; import org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer; import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer; +import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; @@ -323,7 +325,12 @@ private void cancelPages(UUID nodeId) { } /** {@inheritDoc} */ - @Override public boolean onDone(Collection res, Throwable err) { + @Override protected boolean onDone(@Nullable Collection res, @Nullable Throwable err, boolean cancel) { + boolean done = super.onDone(res, err, cancel); + + if (!done) + return false; + if (cctx.kernalContext().performanceStatistics().enabled() && startTimeNanos > 0) { GridCacheQueryType type = qry.query().type(); @@ -340,9 +347,9 @@ private void cancelPages(UUID nodeId) { reqId, startTimeNanos, System.nanoTime() - startTimeNanos, - err == null); + err == null || QueryUtils.wasCancelled(err)); } - return super.onDone(res, err); + return true; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java index 2dcc4aa59da05..09a11ad22d6a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta; import org.apache.ignite.internal.processors.cache.CacheObjectUtils; @@ -365,7 +366,7 @@ void clear() { /** {@inheritDoc} */ @Override public boolean cancel() throws IgniteCheckedException { if (onCancelled()) { - cancelQuery(new IgniteCheckedException("Query was cancelled.")); + cancelQuery(new QueryCancelledException()); return true; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java index 9fde5568cb543..f4b17a202a0b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java @@ -429,7 +429,7 @@ public void unregister(long qryId, @Nullable Throwable failReason) { qry.id(), qry.startTime(), System.nanoTime() - qry.startTimeNanos(), - !failed); + !failed || QueryUtils.wasCancelled(failReason)); } if (!qryFinishedListeners.isEmpty()) { diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java index 97661fb41b1a9..70a586011fb4a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -34,6 +35,7 @@ import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.IndexQuery; import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.client.Config; @@ -48,7 +50,6 @@ import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; -import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -87,14 +88,19 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic @Parameterized.Parameter(1) public ClientType clientType; + /** */ + @Parameterized.Parameter(2) + public boolean fetchAll; + /** @return Test parameters. */ - @Parameterized.Parameters(name = "pageSize={0}, clientType={1}") + @Parameterized.Parameters(name = "pageSize={0}, clientType={1}, fetchAll={2}") public static Collection parameters() { List res = new ArrayList<>(); for (Integer pageSize : new Integer[] {ENTRY_COUNT, ENTRY_COUNT / 10}) { for (ClientType clientType : new ClientType[] {SERVER, CLIENT, THIN_CLIENT}) - res.add(new Object[] {pageSize, clientType}); + for (boolean fetchAll : new boolean[] {true, false}) + res.add(new Object[] {pageSize, clientType, fetchAll}); } return res; @@ -233,8 +239,6 @@ public void testSqlFieldsQueryWithReducer() throws Exception { /** @throws Exception If failed. */ @Test public void testSqlFieldsLocalQuery() throws Exception { - Assume.assumeTrue(clientType == SERVER); - String sql = "select * from " + DEFAULT_CACHE_NAME; SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(pageSize).setLocal(true); @@ -290,6 +294,21 @@ public void testDdlAndDmlQueries() throws Exception { runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, true, false, false); } + /** @throws Exception If failed. */ + @Test + public void testCursorNotFullyRead() throws Exception { + query(new SqlFieldsQuery("create table " + SQL_TABLE + " (id int, val varchar, primary key (id))")); + + for (int i = 0; i < 20; i++) + query(new SqlFieldsQuery("insert into " + SQL_TABLE + " (id) values (" + i + ")")); + + String sql = "SELECT id FROM " + SQL_TABLE; + + SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(10); + + runQueryAndCheck(SQL_FIELDS, qry, sql, true, false, false); + } + /** Runs query and checks statistics. */ private void runQueryAndCheck( GridCacheQueryType expType, @@ -305,23 +324,7 @@ private void runQueryAndCheck( startCollectStatistics(); - Collection expNodeIds = new ArrayList<>(); - - if (clientType == SERVER) { - srv.cache(DEFAULT_CACHE_NAME).query(qry).getAll(); - - expNodeIds.add(srv.localNode().id()); - } - else if (clientType == CLIENT) { - client.cache(DEFAULT_CACHE_NAME).query(qry).getAll(); - - expNodeIds.add(client.localNode().id()); - } - else if (clientType == THIN_CLIENT) { - thinClient.cache(DEFAULT_CACHE_NAME).query(qry).getAll(); - - expNodeIds.addAll(nodeIds(client.cluster().forServers().nodes())); - } + Collection expNodeIds = query(qry); Set readsNodes = new HashSet<>(); @@ -488,4 +491,40 @@ public void testMultipleStatementsSql() throws Exception { assertTrue("Queries was not handled: " + expQrs, expQrs.isEmpty()); assertEquals("Unexpected IDs: " + qryIds, qrsWithReads.size(), qryIds.size()); } + + /** */ + private Collection query(Query qry) { + Collection expNodeIds = new ArrayList<>(); + + QueryCursor cursor = null; + + if (clientType == SERVER) { + cursor = srv.cache(DEFAULT_CACHE_NAME).query(qry); + + expNodeIds.add(srv.localNode().id()); + } + else if (clientType == CLIENT) { + cursor = client.cache(DEFAULT_CACHE_NAME).query(qry); + + expNodeIds.add(client.localNode().id()); + } + else if (clientType == THIN_CLIENT) { + cursor = thinClient.cache(DEFAULT_CACHE_NAME).query(qry); + + expNodeIds.addAll(nodeIds(client.cluster().forServers().nodes())); + } + + if (fetchAll) + cursor.getAll(); + else { + Iterator iter = cursor.iterator(); + + if (iter.hasNext()) + iter.next(); + + cursor.close(); + } + + return expNodeIds; + } }