Skip to content

Commit ad9d1f1

Browse files
committed
POC for in flight search requests in tasks API
1 parent b52e63f commit ad9d1f1

8 files changed

Lines changed: 136 additions & 20 deletions

File tree

gradle/run.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.opensearch.gradle.testclusters.RunTask
3131

3232
apply plugin: 'opensearch.testclusters'
3333

34-
def numNodes = findProperty('numNodes') as Integer ?: 1
34+
def numNodes = findProperty('numNodes') as Integer ?: 2
3535
def numZones = findProperty('numZones') as Integer ?: 1
3636

3737
testClusters {

server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ public void onFailure(Exception e) {
179179
});
180180
} else {
181181
taskResourceTrackingService.refreshResourceStats(runningTask);
182+
// POC: Always use detailed=true to include the search source
182183
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
183184
listener.onResponse(new GetTaskResponse(new TaskResult(false, info)));
184185
}

server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ protected ListTasksResponse newResponse(
100100

101101
@Override
102102
protected void taskOperation(ListTasksRequest request, Task task, ActionListener<TaskInfo> listener) {
103-
listener.onResponse(task.taskInfo(clusterService.localNode().getId(), request.getDetailed()));
103+
// Always include detailed information if this is a search task to get the query source
104+
boolean detailed = request.getDetailed() || task instanceof org.opensearch.action.search.SearchTask;
105+
listener.onResponse(task.taskInfo(clusterService.localNode().getId(), detailed));
104106
}
105107

106108
@Override

server/src/main/java/org/opensearch/action/search/SearchRequest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,16 @@ public String pipeline() {
715715

716716
@Override
717717
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
718-
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
718+
return new SearchTask(
719+
id,
720+
type,
721+
action,
722+
this::buildDescription,
723+
parentTaskId,
724+
headers,
725+
cancelAfterTimeInterval,
726+
source
727+
);
719728
}
720729

721730
public final String buildDescription() {

server/src/main/java/org/opensearch/action/search/SearchTask.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.common.annotation.PublicApi;
3636
import org.opensearch.common.unit.TimeValue;
3737
import org.opensearch.core.tasks.TaskId;
38+
import org.opensearch.search.builder.SearchSourceBuilder;
3839
import org.opensearch.tasks.SearchBackpressureTask;
3940
import org.opensearch.wlm.QueryGroupTask;
4041

@@ -53,6 +54,7 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask
5354
// generating description in a lazy way since source can be quite big
5455
private final Supplier<String> descriptionSupplier;
5556
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
57+
private final SearchSourceBuilder sourceBuilder;
5658

5759
public SearchTask(
5860
long id,
@@ -62,7 +64,7 @@ public SearchTask(
6264
TaskId parentTaskId,
6365
Map<String, String> headers
6466
) {
65-
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT);
67+
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, null);
6668
}
6769

6870
public SearchTask(
@@ -76,6 +78,22 @@ public SearchTask(
7678
) {
7779
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
7880
this.descriptionSupplier = descriptionSupplier;
81+
this.sourceBuilder = null;
82+
}
83+
84+
public SearchTask(
85+
long id,
86+
String type,
87+
String action,
88+
Supplier<String> descriptionSupplier,
89+
TaskId parentTaskId,
90+
Map<String, String> headers,
91+
TimeValue cancelAfterTimeInterval,
92+
SearchSourceBuilder sourceBuilder
93+
) {
94+
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
95+
this.descriptionSupplier = descriptionSupplier;
96+
this.sourceBuilder = sourceBuilder;
7997
}
8098

8199
@Override
@@ -106,4 +124,11 @@ public final SearchProgressListener getProgressListener() {
106124
public boolean shouldCancelChildrenOnCancellation() {
107125
return true;
108126
}
127+
128+
/**
129+
* Returns the search source builder associated with this task, if any.
130+
*/
131+
public SearchSourceBuilder getSourceBuilder() {
132+
return sourceBuilder;
133+
}
109134
}

server/src/main/java/org/opensearch/tasks/Task.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.tasks;
3434

3535
import org.opensearch.ExceptionsHelper;
36+
import org.opensearch.action.search.SearchTask;
3637
import org.opensearch.common.annotation.PublicApi;
3738
import org.opensearch.core.action.ActionResponse;
3839
import org.opensearch.core.action.NotifyOnceListener;
@@ -178,9 +179,16 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS
178179
String description = null;
179180
Task.Status status = null;
180181
TaskResourceStats resourceStats = null;
182+
Object searchSource = null;
181183
if (detailed) {
182184
description = getDescription();
183185
status = getStatus();
186+
if (this instanceof SearchTask) {
187+
SearchTask searchTask = (SearchTask) this;
188+
if (searchTask.getSourceBuilder() != null) {
189+
searchSource = searchTask.getSourceBuilder();
190+
}
191+
}
184192
}
185193
if (excludeStats == false) {
186194
resourceStats = new TaskResourceStats(new HashMap<>() {
@@ -192,20 +200,27 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS
192200
}
193201
}, getThreadUsage());
194202
}
195-
return taskInfo(localNodeId, description, status, resourceStats);
203+
return taskInfo(localNodeId, description, status, resourceStats, searchSource);
196204
}
197205

198206
/**
199207
* Build a {@link TaskInfo} for this task without resource stats.
200208
*/
201209
protected final TaskInfo taskInfo(String localNodeId, String description, Status status) {
202-
return taskInfo(localNodeId, description, status, null);
210+
return taskInfo(localNodeId, description, status, null, null);
203211
}
204212

205213
/**
206214
* Build a proper {@link TaskInfo} for this task.
207215
*/
208216
protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) {
217+
return taskInfo(localNodeId, description, status, resourceStats, null);
218+
}
219+
220+
/**
221+
* Build a proper {@link TaskInfo} for this task.
222+
*/
223+
protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats, Object searchSource) {
209224
boolean cancelled = this instanceof CancellableTask && ((CancellableTask) this).isCancelled();
210225
Long cancellationStartTime = null;
211226
if (cancelled) {
@@ -224,7 +239,8 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
224239
parentTask,
225240
headers,
226241
resourceStats,
227-
cancellationStartTime
242+
cancellationStartTime,
243+
searchSource
228244
);
229245
}
230246

server/src/main/java/org/opensearch/tasks/TaskInfo.java

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
9797

9898
private final TaskResourceStats resourceStats;
9999

100+
private final Object searchSource;
101+
100102
public TaskInfo(
101103
TaskId taskId,
102104
String type,
@@ -124,6 +126,7 @@ public TaskInfo(
124126
parentTaskId,
125127
headers,
126128
resourceStats,
129+
null,
127130
null
128131
);
129132
}
@@ -142,6 +145,40 @@ public TaskInfo(
142145
Map<String, String> headers,
143146
TaskResourceStats resourceStats,
144147
Long cancellationStartTime
148+
) {
149+
this(
150+
taskId,
151+
type,
152+
action,
153+
description,
154+
status,
155+
startTime,
156+
runningTimeNanos,
157+
cancellable,
158+
cancelled,
159+
parentTaskId,
160+
headers,
161+
resourceStats,
162+
cancellationStartTime,
163+
null
164+
);
165+
}
166+
167+
public TaskInfo(
168+
TaskId taskId,
169+
String type,
170+
String action,
171+
String description,
172+
Task.Status status,
173+
long startTime,
174+
long runningTimeNanos,
175+
boolean cancellable,
176+
boolean cancelled,
177+
TaskId parentTaskId,
178+
Map<String, String> headers,
179+
TaskResourceStats resourceStats,
180+
Long cancellationStartTime,
181+
Object searchSource
145182
) {
146183
if (cancellable == false && cancelled == true) {
147184
throw new IllegalArgumentException("task cannot be cancelled");
@@ -159,6 +196,7 @@ public TaskInfo(
159196
this.headers = headers;
160197
this.resourceStats = resourceStats;
161198
this.cancellationStartTime = cancellationStartTime;
199+
this.searchSource = searchSource;
162200
}
163201

164202
/**
@@ -194,6 +232,8 @@ public TaskInfo(StreamInput in) throws IOException {
194232
} else {
195233
cancellationStartTime = null;
196234
}
235+
// For now, searchSource is not serialized over the wire
236+
searchSource = null;
197237
}
198238

199239
@Override
@@ -300,6 +340,13 @@ public TaskResourceStats getResourceStats() {
300340
return resourceStats;
301341
}
302342

343+
/**
344+
* Returns the search source for this task if this is a search task
345+
*/
346+
public Object getSearchSource() {
347+
return searchSource;
348+
}
349+
303350
@Override
304351
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
305352
builder.field("node", taskId.getNodeId());
@@ -335,6 +382,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
335382
if (cancellationStartTime != null) {
336383
builder.humanReadableField("cancellation_time_millis", "cancellation_time", new TimeValue(cancellationStartTime));
337384
}
385+
// if (searchSource != null && params.paramAsBoolean("detailed", false)) {
386+
if (searchSource != null) {
387+
builder.field("search_source", searchSource);
388+
}
338389
return builder;
339390
}
340391

@@ -367,18 +418,19 @@ public static TaskInfo fromXContent(XContentParser parser) {
367418
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
368419
return new TaskInfo(
369420
id,
370-
type,
371-
action,
372-
description,
373-
status,
374-
startTime,
375-
runningTimeNanos,
376-
cancellable,
377-
cancelled,
421+
type,
422+
action,
423+
description,
424+
status,
425+
startTime,
426+
runningTimeNanos,
427+
cancellable,
428+
cancelled,
378429
parentTaskId,
379-
headers,
380-
resourceStats,
381-
cancellationStartTime
430+
headers,
431+
resourceStats,
432+
cancellationStartTime,
433+
null // searchSource not parsed from XContent yet
382434
);
383435
});
384436
static {
@@ -424,7 +476,8 @@ public boolean equals(Object obj) {
424476
&& Objects.equals(status, other.status)
425477
&& Objects.equals(headers, other.headers)
426478
&& Objects.equals(resourceStats, other.resourceStats)
427-
&& Objects.equals(cancellationStartTime, other.cancellationStartTime);
479+
&& Objects.equals(cancellationStartTime, other.cancellationStartTime)
480+
&& Objects.equals(searchSource, other.searchSource);
428481
}
429482

430483
@Override
@@ -442,7 +495,8 @@ public int hashCode() {
442495
status,
443496
headers,
444497
resourceStats,
445-
cancellationStartTime
498+
cancellationStartTime,
499+
searchSource
446500
);
447501
}
448502
}

server/src/main/java/org/opensearch/tasks/TaskManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.opensearch.ExceptionsHelper;
3939
import org.opensearch.OpenSearchException;
4040
import org.opensearch.OpenSearchTimeoutException;
41+
import org.opensearch.action.search.SearchShardTask;
42+
import org.opensearch.action.search.SearchTask;
4143
import org.opensearch.cluster.ClusterChangedEvent;
4244
import org.opensearch.cluster.ClusterStateApplier;
4345
import org.opensearch.cluster.node.DiscoveryNode;
@@ -320,6 +322,13 @@ public Task unregister(Task task) {
320322
}
321323
}
322324

325+
try {
326+
if (task instanceof SearchTask || task instanceof SearchShardTask) {
327+
Thread.sleep(100000);
328+
}
329+
} catch (InterruptedException e) {
330+
throw new RuntimeException(e);
331+
}
323332
if (task instanceof CancellableTask) {
324333
CancellableTaskHolder holder = cancellableTasks.remove(task.getId());
325334
if (holder != null) {

0 commit comments

Comments
 (0)