Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
import org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer;
import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
Expand Down Expand Up @@ -323,7 +325,12 @@ private void cancelPages(UUID nodeId) {
}

/** {@inheritDoc} */
@Override public boolean onDone(Collection<R> res, Throwable err) {
@Override protected boolean onDone(@Nullable Collection<R> res, @Nullable Throwable err, boolean cancel) {
boolean done = super.onDone(res, err, cancel);

if (!done)
return false;

if (cctx.kernalContext().performanceStatistics().enabled() && startTimeNanos > 0) {
GridCacheQueryType type = qry.query().type();

Expand All @@ -340,9 +347,9 @@ private void cancelPages(UUID nodeId) {
reqId,
startTimeNanos,
System.nanoTime() - startTimeNanos,
err == null);
err == null || QueryUtils.wasCancelled(err));
}

return super.onDone(res, err);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
Expand Down Expand Up @@ -365,7 +366,7 @@ void clear() {
/** {@inheritDoc} */
@Override public boolean cancel() throws IgniteCheckedException {
if (onCancelled()) {
cancelQuery(new IgniteCheckedException("Query was cancelled."));
cancelQuery(new QueryCancelledException());

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ public void unregister(long qryId, @Nullable Throwable failReason) {
qry.id(),
qry.startTime(),
System.nanoTime() - qry.startTimeNanos(),
!failed);
!failed || QueryUtils.wasCancelled(failReason));
}

if (!qryFinishedListeners.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
Expand All @@ -34,6 +35,7 @@
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.client.Config;
Expand All @@ -48,7 +50,6 @@
import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -87,14 +88,19 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic
@Parameterized.Parameter(1)
public ClientType clientType;

/** */
@Parameterized.Parameter(2)
public boolean fetchAll;

/** @return Test parameters. */
@Parameterized.Parameters(name = "pageSize={0}, clientType={1}")
@Parameterized.Parameters(name = "pageSize={0}, clientType={1}, fetchAll={2}")
public static Collection<?> parameters() {
List<Object[]> res = new ArrayList<>();

for (Integer pageSize : new Integer[] {ENTRY_COUNT, ENTRY_COUNT / 10}) {
for (ClientType clientType : new ClientType[] {SERVER, CLIENT, THIN_CLIENT})
res.add(new Object[] {pageSize, clientType});
for (boolean fetchAll : new boolean[] {true, false})
res.add(new Object[] {pageSize, clientType, fetchAll});
}

return res;
Expand Down Expand Up @@ -233,8 +239,6 @@ public void testSqlFieldsQueryWithReducer() throws Exception {
/** @throws Exception If failed. */
@Test
public void testSqlFieldsLocalQuery() throws Exception {
Assume.assumeTrue(clientType == SERVER);

String sql = "select * from " + DEFAULT_CACHE_NAME;

SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(pageSize).setLocal(true);
Expand Down Expand Up @@ -290,6 +294,21 @@ public void testDdlAndDmlQueries() throws Exception {
runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, true, false, false);
}

/** @throws Exception If failed. */
@Test
public void testCursorNotFullyRead() throws Exception {
query(new SqlFieldsQuery("create table " + SQL_TABLE + " (id int, val varchar, primary key (id))"));

for (int i = 0; i < 20; i++)
query(new SqlFieldsQuery("insert into " + SQL_TABLE + " (id) values (" + i + ")"));

String sql = "SELECT id FROM " + SQL_TABLE;

SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(10);

runQueryAndCheck(SQL_FIELDS, qry, sql, true, false, false);
}

/** Runs query and checks statistics. */
private void runQueryAndCheck(
GridCacheQueryType expType,
Expand All @@ -305,23 +324,7 @@ private void runQueryAndCheck(

startCollectStatistics();

Collection<UUID> expNodeIds = new ArrayList<>();

if (clientType == SERVER) {
srv.cache(DEFAULT_CACHE_NAME).query(qry).getAll();

expNodeIds.add(srv.localNode().id());
}
else if (clientType == CLIENT) {
client.cache(DEFAULT_CACHE_NAME).query(qry).getAll();

expNodeIds.add(client.localNode().id());
}
else if (clientType == THIN_CLIENT) {
thinClient.cache(DEFAULT_CACHE_NAME).query(qry).getAll();

expNodeIds.addAll(nodeIds(client.cluster().forServers().nodes()));
}
Collection<UUID> expNodeIds = query(qry);

Set<UUID> readsNodes = new HashSet<>();

Expand Down Expand Up @@ -488,4 +491,40 @@ public void testMultipleStatementsSql() throws Exception {
assertTrue("Queries was not handled: " + expQrs, expQrs.isEmpty());
assertEquals("Unexpected IDs: " + qryIds, qrsWithReads.size(), qryIds.size());
}

/** */
private Collection<UUID> query(Query<?> qry) {
Collection<UUID> expNodeIds = new ArrayList<>();

QueryCursor<?> cursor = null;

if (clientType == SERVER) {
cursor = srv.cache(DEFAULT_CACHE_NAME).query(qry);

expNodeIds.add(srv.localNode().id());
}
else if (clientType == CLIENT) {
cursor = client.cache(DEFAULT_CACHE_NAME).query(qry);

expNodeIds.add(client.localNode().id());
}
else if (clientType == THIN_CLIENT) {
cursor = thinClient.cache(DEFAULT_CACHE_NAME).query(qry);

expNodeIds.addAll(nodeIds(client.cluster().forServers().nodes()));
}

if (fetchAll)
cursor.getAll();
else {
Iterator<?> iter = cursor.iterator();

if (iter.hasNext())
iter.next();

cursor.close();
}

return expNodeIds;
}
}