Skip to content
Merged
23 changes: 18 additions & 5 deletions src/query/service/src/schedulers/fragments/fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;

use databend_common_base::base::GlobalUniqName;
use databend_common_catalog::cluster_info::Cluster;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
Expand Down Expand Up @@ -217,11 +218,23 @@ impl FragmentDeriveHandle {

Ok(match exchange_sink.kind {
FragmentKind::Init => None,
FragmentKind::Normal => Some(DataExchange::NodeToNodeExchange(NodeToNodeExchange {
destination_ids: get_executors(cluster),
shuffle_keys: exchange_sink.keys.clone(),
allow_adjust_parallelism: exchange_sink.allow_adjust_parallelism,
})),
FragmentKind::Normal => {
let destination_ids = get_executors(cluster);

let mut destination_channels = Vec::with_capacity(destination_ids.len());

for destination in &destination_ids {
destination_channels
.push((destination.clone(), vec![GlobalUniqName::unique()]));
}

Some(DataExchange::NodeToNodeExchange(NodeToNodeExchange {
destination_ids,
destination_channels,
shuffle_keys: exchange_sink.keys.clone(),
allow_adjust_parallelism: exchange_sink.allow_adjust_parallelism,
}))
}
FragmentKind::Merge => Some(MergeExchange::create(
cluster.local_id(),
exchange_sink.ignore_exchange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,15 @@ impl QueryFragmentsActions {
fn fragments_connections(&self, builder: &mut DataflowDiagramBuilder) -> Result<()> {
for fragment_actions in &self.fragments_actions {
if let Some(exchange) = &fragment_actions.data_exchange {
let fragment_id = fragment_actions.fragment_id;
let destinations = exchange.get_destinations();

for fragment_action in &fragment_actions.fragment_actions {
let source = fragment_action.executor.to_string();

for destination in &destinations {
builder.add_data_edge(&source, destination, fragment_id)?;
for channel in exchange.get_channels(destination) {
builder.add_data_edge(&source, destination, &channel)?;
}
}
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/query/service/src/servers/flight/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,11 @@ impl FlightClient {

#[async_backtrace::framed]
#[fastrace::trace]
pub async fn do_get(
&mut self,
query_id: &str,
target: &str,
fragment: usize,
) -> Result<FlightExchange> {
pub async fn do_get(&mut self, query_id: &str, channel_id: &str) -> Result<FlightExchange> {
let request = RequestBuilder::create(Ticket::default())
.with_metadata("x-type", "exchange_fragment")?
.with_metadata("x-target", target)?
.with_metadata("x-query-id", query_id)?
.with_metadata("x-fragment-id", &fragment.to_string())?
.with_metadata("x-channel-id", channel_id)?
.build();
let request = databend_common_tracing::inject_span_to_tonic_request(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_base::base::GlobalUniqName;
use databend_common_expression::RemoteExpr;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand All @@ -29,19 +30,49 @@ impl DataExchange {
DataExchange::NodeToNodeExchange(exchange) => exchange.destination_ids.clone(),
}
}

pub fn get_channels(&self, destination: &str) -> Vec<String> {
match self {
DataExchange::Merge(exchange) => vec![exchange.channel_id.clone()],
DataExchange::Broadcast(exchange) => {
for (to, channels) in &exchange.destination_channels {
if to == destination {
return channels.clone();
}
}

vec![]
}
DataExchange::NodeToNodeExchange(exchange) => {
for (to, channels) in &exchange.destination_channels {
if to == destination {
return channels.clone();
}
}

vec![]
}
}
}

pub fn get_parallel(&self) -> usize {
1
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NodeToNodeExchange {
pub destination_ids: Vec<String>,
pub shuffle_keys: Vec<RemoteExpr>,
pub destination_channels: Vec<(String, Vec<String>)>,
pub allow_adjust_parallelism: bool,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MergeExchange {
pub destination_id: String,
pub ignore_exchange: bool,
pub channel_id: String,
pub allow_adjust_parallelism: bool,
}

Expand All @@ -55,17 +86,28 @@ impl MergeExchange {
destination_id,
ignore_exchange,
allow_adjust_parallelism,
channel_id: GlobalUniqName::unique(),
})
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BroadcastExchange {
pub destination_ids: Vec<String>,
pub destination_channels: Vec<(String, Vec<String>)>,
}

impl BroadcastExchange {
pub fn create(destination_ids: Vec<String>) -> DataExchange {
DataExchange::Broadcast(BroadcastExchange { destination_ids })
let mut destination_channels = Vec::with_capacity(destination_ids.len());

for destination in &destination_ids {
destination_channels.push((destination.clone(), vec![GlobalUniqName::unique()]));
}

DataExchange::Broadcast(BroadcastExchange {
destination_ids,
destination_channels,
})
}
}
Loading
Loading