Skip to content

Commit 05093f0

Browse files
committed
Propagate cancellation within leaf search
1 parent 0358ca3 commit 05093f0

File tree

2 files changed

+32
-48
lines changed

2 files changed

+32
-48
lines changed

quickwit/quickwit-search/src/leaf.rs

Lines changed: 28 additions & 45 deletions
Original file line number · Diff line number · Diff line change
@@ -41,7 +41,7 @@ use tantivy::directory::FileSlice;
4141
use tantivy::fastfield::FastFieldReaders;
4242
use tantivy::schema::Field;
4343
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term};
44-
use tokio::task::JoinError;
44+
use tokio::task::{JoinError, JoinSet};
4545
use tracing::*;
4646

4747
use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector};
@@ -1202,8 +1202,7 @@ pub async fn multi_index_leaf_search(
12021202
//
12031203
// It is a little bit tricky how to handle which is now the incremental_merge_collector, one
12041204
// per index, e.g. when to merge results and how to avoid lock contention.
1205-
let mut leaf_request_tasks = Vec::new();
1206-
1205+
let mut join_set = JoinSet::new();
12071206
for leaf_search_request_ref in leaf_search_request.leaf_requests.into_iter() {
12081207
let index_uri = quickwit_common::uri::Uri::from_str(
12091208
leaf_search_request
@@ -1226,7 +1225,7 @@ pub async fn multi_index_leaf_search(
12261225
})?
12271226
.clone();
12281227

1229-
let leaf_request_future = tokio::spawn({
1228+
join_set.spawn({
12301229
let storage_resolver = storage_resolver.clone();
12311230
let searcher_context = searcher_context.clone();
12321231
let search_request = search_request.clone();
@@ -1241,33 +1240,16 @@ pub async fn multi_index_leaf_search(
12411240
doc_mapper,
12421241
aggregation_limits,
12431242
)
1243+
.in_current_span()
12441244
.await
12451245
}
1246-
.in_current_span()
12471246
});
1248-
leaf_request_tasks.push(leaf_request_future);
12491247
}
12501248

1251-
let leaf_responses: Vec<crate::Result<LeafSearchResponse>> = tokio::time::timeout(
1252-
searcher_context.searcher_config.request_timeout(),
1253-
try_join_all(leaf_request_tasks),
1254-
)
1255-
.await??;
12561249
let merge_collector = make_merge_collector(&search_request, aggregation_limits)?;
12571250
let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
1258-
for result in leaf_responses {
1259-
match result {
1260-
Ok(result) => {
1261-
incremental_merge_collector.add_result(result)?;
1262-
}
1263-
Err(err) => {
1264-
incremental_merge_collector.add_failed_split(SplitSearchError {
1265-
split_id: "unknown".to_string(),
1266-
error: format!("{err}"),
1267-
retryable_error: true,
1268-
});
1269-
}
1270-
}
1251+
while let Some(result) = join_set.join_next().await {
1252+
incremental_merge_collector.add_result(result??)?;
12711253
}
12721254

12731255
crate::search_thread_pool()
@@ -1349,9 +1331,6 @@ pub async fn single_doc_mapping_leaf_search(
13491331

13501332
let split_filter = Arc::new(RwLock::new(split_filter));
13511333

1352-
let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> =
1353-
Vec::with_capacity(split_with_req.len());
1354-
13551334
let merge_collector = make_merge_collector(&request, aggregations_limits.clone())?;
13561335
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
13571336
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));
@@ -1379,6 +1358,8 @@ pub async fn single_doc_mapping_leaf_search(
13791358
split_filter: split_filter.clone(),
13801359
});
13811360

1361+
let mut join_set = JoinSet::new();
1362+
let mut split_with_task_id = Vec::with_capacity(split_with_req.len());
13821363
for ((split, search_request), permit_fut) in
13831364
split_with_req.into_iter().zip(permit_futures.into_iter())
13841365
{
@@ -1394,35 +1375,37 @@ pub async fn single_doc_mapping_leaf_search(
13941375
leaf_search_state_guard.set_state(SplitSearchState::PrunedBeforeWarmup);
13951376
continue;
13961377
};
1397-
1398-
leaf_search_single_split_join_handles.push((
1399-
split.split_id.clone(),
1400-
tokio::spawn(
1401-
leaf_search_single_split_wrapper(
1402-
simplified_search_request,
1403-
leaf_search_context.clone(),
1404-
index_storage.clone(),
1405-
split,
1406-
leaf_split_search_permit,
1407-
aggregations_limits.clone(),
1408-
)
1409-
.in_current_span(),
1410-
),
1411-
));
1378+
let split_id = split.split_id.clone();
1379+
let handle = join_set.spawn(
1380+
leaf_search_single_split_wrapper(
1381+
simplified_search_request,
1382+
leaf_search_context.clone(),
1383+
index_storage.clone(),
1384+
split,
1385+
leaf_split_search_permit,
1386+
aggregations_limits.clone(),
1387+
)
1388+
.in_current_span(),
1389+
);
1390+
split_with_task_id.push((split_id, handle.id()));
14121391
}
14131392

14141393
// TODO we could cancel running splits when !run_all_splits and the running split can no
14151394
// longer give better results after some other split answered.
14161395
let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new();
14171396

1418-
// There is no need to use `join_all`, as these are spawned tasks.
1419-
for (split, leaf_search_join_handle) in leaf_search_single_split_join_handles {
1397+
while let Some(leaf_search_join_result) = join_set.join_next().await {
14201398
// splits that did not panic were already added to the collector
1421-
if let Err(join_error) = leaf_search_join_handle.await {
1399+
if let Err(join_error) = leaf_search_join_result {
14221400
if join_error.is_cancelled() {
14231401
// An explicit task cancellation is not an error.
14241402
continue;
14251403
}
1404+
let position = split_with_task_id
1405+
.iter()
1406+
.position(|(_, task_id)| *task_id == join_error.id())
1407+
.unwrap();
1408+
let (split, _) = split_with_task_id.remove(position);
14261409
if join_error.is_panic() {
14271410
error!(split=%split, "leaf search task panicked");
14281411
} else {

quickwit/quickwit-search/src/service.rs

Lines changed: 4 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -186,7 +186,7 @@ impl SearchService for SearchServiceImpl {
186186
.map(|req| req.split_offsets.len())
187187
.sum::<usize>();
188188

189-
LeafSearchMetricsFuture {
189+
let tracked_future = LeafSearchMetricsFuture {
190190
tracked: multi_index_leaf_search(
191191
self.searcher_context.clone(),
192192
leaf_search_request,
@@ -195,8 +195,9 @@ impl SearchService for SearchServiceImpl {
195195
start: Instant::now(),
196196
targeted_splits: num_splits,
197197
status: None,
198-
}
199-
.await
198+
};
199+
let timeout = self.searcher_context.searcher_config.request_timeout();
200+
tokio::time::timeout(timeout, tracked_future).await?
200201
}
201202

202203
async fn fetch_docs(

0 commit comments

Comments (0)