Skip to content

Commit 2b36a0b

Browse files
authored
refactor(query): supports parallel data transmission between nodes (#18984)
* refactor(query): support parall for exchange * refactor(query): support parall for exchange * refactor(query): support parall for exchange * refactor(query): support parall for exchange * refactor(query): support parall for exchange
1 parent 7a7096a commit 2b36a0b

File tree

14 files changed

+309
-261
lines changed

14 files changed

+309
-261
lines changed

โ€Žsrc/query/service/src/schedulers/fragments/fragmenter.rsโ€Ž

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::BTreeMap;
1717
use std::collections::HashMap;
1818
use std::sync::Arc;
1919

20+
use databend_common_base::base::GlobalUniqName;
2021
use databend_common_catalog::cluster_info::Cluster;
2122
use databend_common_catalog::table_context::TableContext;
2223
use databend_common_exception::Result;
@@ -217,11 +218,23 @@ impl FragmentDeriveHandle {
217218

218219
Ok(match exchange_sink.kind {
219220
FragmentKind::Init => None,
220-
FragmentKind::Normal => Some(DataExchange::NodeToNodeExchange(NodeToNodeExchange {
221-
destination_ids: get_executors(cluster),
222-
shuffle_keys: exchange_sink.keys.clone(),
223-
allow_adjust_parallelism: exchange_sink.allow_adjust_parallelism,
224-
})),
221+
FragmentKind::Normal => {
222+
let destination_ids = get_executors(cluster);
223+
224+
let mut destination_channels = Vec::with_capacity(destination_ids.len());
225+
226+
for destination in &destination_ids {
227+
destination_channels
228+
.push((destination.clone(), vec![GlobalUniqName::unique()]));
229+
}
230+
231+
Some(DataExchange::NodeToNodeExchange(NodeToNodeExchange {
232+
destination_ids,
233+
destination_channels,
234+
shuffle_keys: exchange_sink.keys.clone(),
235+
allow_adjust_parallelism: exchange_sink.allow_adjust_parallelism,
236+
}))
237+
}
225238
FragmentKind::Merge => Some(MergeExchange::create(
226239
cluster.local_id(),
227240
exchange_sink.ignore_exchange,

โ€Žsrc/query/service/src/schedulers/fragments/query_fragment_actions.rsโ€Ž

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,15 @@ impl QueryFragmentsActions {
230230
fn fragments_connections(&self, builder: &mut DataflowDiagramBuilder) -> Result<()> {
231231
for fragment_actions in &self.fragments_actions {
232232
if let Some(exchange) = &fragment_actions.data_exchange {
233-
let fragment_id = fragment_actions.fragment_id;
234233
let destinations = exchange.get_destinations();
235234

236235
for fragment_action in &fragment_actions.fragment_actions {
237236
let source = fragment_action.executor.to_string();
238237

239238
for destination in &destinations {
240-
builder.add_data_edge(&source, destination, fragment_id)?;
239+
for channel in exchange.get_channels(destination) {
240+
builder.add_data_edge(&source, destination, &channel)?;
241+
}
241242
}
242243
}
243244
}

โ€Žsrc/query/service/src/servers/flight/flight_client.rsโ€Ž

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,11 @@ impl FlightClient {
136136

137137
#[async_backtrace::framed]
138138
#[fastrace::trace]
139-
pub async fn do_get(
140-
&mut self,
141-
query_id: &str,
142-
target: &str,
143-
fragment: usize,
144-
) -> Result<FlightExchange> {
139+
pub async fn do_get(&mut self, query_id: &str, channel_id: &str) -> Result<FlightExchange> {
145140
let request = RequestBuilder::create(Ticket::default())
146141
.with_metadata("x-type", "exchange_fragment")?
147-
.with_metadata("x-target", target)?
148142
.with_metadata("x-query-id", query_id)?
149-
.with_metadata("x-fragment-id", &fragment.to_string())?
143+
.with_metadata("x-channel-id", channel_id)?
150144
.build();
151145
let request = databend_common_tracing::inject_span_to_tonic_request(request);
152146

โ€Žsrc/query/service/src/servers/flight/v1/exchange/data_exchange.rsโ€Ž

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_base::base::GlobalUniqName;
1516
use databend_common_expression::RemoteExpr;
1617

1718
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -29,19 +30,49 @@ impl DataExchange {
2930
DataExchange::NodeToNodeExchange(exchange) => exchange.destination_ids.clone(),
3031
}
3132
}
33+
34+
pub fn get_channels(&self, destination: &str) -> Vec<String> {
35+
match self {
36+
DataExchange::Merge(exchange) => vec![exchange.channel_id.clone()],
37+
DataExchange::Broadcast(exchange) => {
38+
for (to, channels) in &exchange.destination_channels {
39+
if to == destination {
40+
return channels.clone();
41+
}
42+
}
43+
44+
vec![]
45+
}
46+
DataExchange::NodeToNodeExchange(exchange) => {
47+
for (to, channels) in &exchange.destination_channels {
48+
if to == destination {
49+
return channels.clone();
50+
}
51+
}
52+
53+
vec![]
54+
}
55+
}
56+
}
57+
58+
pub fn get_parallel(&self) -> usize {
59+
1
60+
}
3261
}
3362

3463
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
3564
pub struct NodeToNodeExchange {
3665
pub destination_ids: Vec<String>,
3766
pub shuffle_keys: Vec<RemoteExpr>,
67+
pub destination_channels: Vec<(String, Vec<String>)>,
3868
pub allow_adjust_parallelism: bool,
3969
}
4070

4171
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4272
pub struct MergeExchange {
4373
pub destination_id: String,
4474
pub ignore_exchange: bool,
75+
pub channel_id: String,
4576
pub allow_adjust_parallelism: bool,
4677
}
4778

@@ -55,17 +86,28 @@ impl MergeExchange {
5586
destination_id,
5687
ignore_exchange,
5788
allow_adjust_parallelism,
89+
channel_id: GlobalUniqName::unique(),
5890
})
5991
}
6092
}
6193

6294
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
6395
pub struct BroadcastExchange {
6496
pub destination_ids: Vec<String>,
97+
pub destination_channels: Vec<(String, Vec<String>)>,
6598
}
6699

67100
impl BroadcastExchange {
68101
pub fn create(destination_ids: Vec<String>) -> DataExchange {
69-
DataExchange::Broadcast(BroadcastExchange { destination_ids })
102+
let mut destination_channels = Vec::with_capacity(destination_ids.len());
103+
104+
for destination in &destination_ids {
105+
destination_channels.push((destination.clone(), vec![GlobalUniqName::unique()]));
106+
}
107+
108+
DataExchange::Broadcast(BroadcastExchange {
109+
destination_ids,
110+
destination_channels,
111+
})
70112
}
71113
}

0 commit comments

Comments
ย (0)