Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
serde_yaml_ng: 0.10.0, "MIT",
serial_test: 3.3.1, "MIT",
serial_test_derive: 3.3.1, "MIT",
server: 0.6.3-edge.1, "Apache-2.0",
server: 0.6.3-edge.2, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
sha3: 0.10.8, "Apache-2.0 OR MIT",
Expand Down
3 changes: 1 addition & 2 deletions core/binary_protocol/src/utils/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,8 +656,7 @@ pub fn map_topic(payload: Bytes) -> Result<TopicDetails, IggyError> {
compression_algorithm: topic.compression_algorithm,
max_topic_size: topic.max_topic_size,
replication_factor: topic.replication_factor,
#[allow(clippy::cast_possible_truncation)]
partitions_count: partitions.len() as u32,
partitions_count: topic.partitions_count,
partitions,
};
Ok(topic)
Expand Down
2 changes: 1 addition & 1 deletion core/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[package]
name = "server"
version = "0.6.3-edge.1"
version = "0.6.3-edge.2"
edition = "2024"
license = "Apache-2.0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,14 @@
use crate::binary::command::{
BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
};
use crate::binary::handlers::consumer_groups::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::shard::IggyShard;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
use crate::state::command::EntryCommand;
use crate::state::models::CreateConsumerGroupWithId;
use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload};
use crate::streaming::session::Session;
use err_trail::ErrContext;
use iggy_common::create_consumer_group::CreateConsumerGroup;
use iggy_common::{Identifier, IggyError, SenderKind};
use iggy_common::{IggyError, SenderKind};
use std::rc::Rc;
use tracing::{debug, instrument};

Expand All @@ -51,94 +45,23 @@ impl ServerCommandHandler for CreateConsumerGroup {
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard.ensure_authenticated(session)?;
let (stream_id, topic_id) = shard.resolve_topic_id(&self.stream_id, &self.topic_id)?;
shard
.metadata
.perm_create_consumer_group(session.get_user_id(), stream_id, topic_id)?;

let request = ShardRequest {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
partition_id: 0,
payload: ShardRequestPayload::CreateConsumerGroup {
let request =
ShardRequest::control_plane(ShardRequestPayload::CreateConsumerGroupRequest {
user_id: session.get_user_id(),
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
name: self.name.clone(),
},
};

let message = ShardMessage::Request(request);
match shard.send_request_to_shard_or_recoil(None, message).await? {
ShardSendRequestResult::Recoil(message) => {
if let ShardMessage::Request(ShardRequest { payload, .. }) = message
&& let ShardRequestPayload::CreateConsumerGroup {
stream_id: recoil_stream_id,
topic_id: recoil_topic_id,
name,
..
} = payload
{
let cg_id =
shard.create_consumer_group(&recoil_stream_id, &recoil_topic_id, name)?;

let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
shard
.state
.apply(
session.get_user_id(),
&EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId {
group_id: cg_id as u32,
command: self,
}),
)
.await
.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to apply create consumer group for stream_id: {stream_id}, topic_id: {topic_id}, group_id: {cg_id}, session: {session}"
)
})?;
command: self,
});

let (numeric_stream_id, numeric_topic_id) =
shard.resolve_topic_id(&stream_id, &topic_id)?;

let cg_identifier = Identifier::numeric(cg_id as u32).unwrap();
let response = shard
.metadata
.get_consumer_group(numeric_stream_id, numeric_topic_id, cg_id)
.map(|cg| mapper::map_consumer_group_from_meta(&cg))
.ok_or_else(|| {
IggyError::ConsumerGroupIdNotFound(cg_identifier, topic_id.clone())
})?;
sender.send_ok_response(&response).await?;
} else {
unreachable!(
"Expected a CreateConsumerGroup request inside of CreateConsumerGroup handler"
);
}
match shard.send_to_control_plane(request).await? {
ShardResponse::CreateConsumerGroupResponse(data) => {
sender
.send_ok_response(&mapper::map_consumer_group_from_response(&data))
.await?;
}
ShardSendRequestResult::Response(response) => match response {
ShardResponse::CreateConsumerGroupResponse(cg_id) => {
let (numeric_stream_id, numeric_topic_id) =
shard.resolve_topic_id(&self.stream_id, &self.topic_id)?;
ShardResponse::ErrorResponse(err) => return Err(err),
_ => unreachable!("Expected CreateConsumerGroupResponse"),
}

let cg_identifier = Identifier::numeric(cg_id as u32).unwrap();
let response = shard
.metadata
.get_consumer_group(numeric_stream_id, numeric_topic_id, cg_id)
.map(|cg| mapper::map_consumer_group_from_meta(&cg))
.ok_or_else(|| {
IggyError::ConsumerGroupIdNotFound(cg_identifier, self.topic_id.clone())
})?;
sender.send_ok_response(&response).await?;
}
ShardResponse::ErrorResponse(err) => return Err(err),
_ => unreachable!(
"Expected a CreateConsumerGroupResponse inside of CreateConsumerGroup handler"
),
},
};
Ok(HandlerResult::Finished)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,11 @@
use crate::binary::command::{
BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
};
use crate::binary::handlers::consumer_groups::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::metadata::ConsumerGroupMeta;
use crate::shard::IggyShard;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
use crate::state::command::EntryCommand;
use crate::streaming::polling_consumer::ConsumerGroupId;
use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload};
use crate::streaming::session::Session;
use err_trail::ErrContext;
use iggy_common::delete_consumer_group::DeleteConsumerGroup;
use iggy_common::{IggyError, SenderKind};
use std::rc::Rc;
Expand All @@ -51,112 +44,21 @@ impl ServerCommandHandler for DeleteConsumerGroup {
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard.ensure_authenticated(session)?;
let (numeric_stream_id, numeric_topic_id) =
shard.resolve_topic_id(&self.stream_id, &self.topic_id)?;
shard.metadata.perm_delete_consumer_group(
session.get_user_id(),
numeric_stream_id,
numeric_topic_id,
)?;

let request = ShardRequest {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
partition_id: 0,
payload: ShardRequestPayload::DeleteConsumerGroup {
let request =
ShardRequest::control_plane(ShardRequestPayload::DeleteConsumerGroupRequest {
user_id: session.get_user_id(),
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
group_id: self.group_id.clone(),
},
};
command: self,
});

let message = ShardMessage::Request(request);
let cg_meta: ConsumerGroupMeta = match shard
.send_request_to_shard_or_recoil(None, message)
.await?
{
ShardSendRequestResult::Recoil(message) => {
if let ShardMessage::Request(ShardRequest { payload, .. }) = message
&& let ShardRequestPayload::DeleteConsumerGroup {
stream_id,
topic_id,
group_id,
..
} = payload
{
shard.delete_consumer_group(&stream_id, &topic_id, &group_id).error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to delete consumer group with ID: {} for topic with ID: {} in stream with ID: {} for session: {}",
group_id, topic_id, stream_id, session
)
})?
} else {
unreachable!(
"Expected a DeleteConsumerGroup request inside of DeleteConsumerGroup handler"
);
}
}
ShardSendRequestResult::Response(response) => match response {
ShardResponse::DeleteConsumerGroupResponse => {
sender.send_empty_ok_response().await?;
return Ok(HandlerResult::Finished);
}
ShardResponse::ErrorResponse(err) => return Err(err),
_ => unreachable!(
"Expected a DeleteConsumerGroupResponse inside of DeleteConsumerGroup handler"
),
},
};

let cg_id = cg_meta.id;

for (_, member) in cg_meta.members.iter() {
if let Err(err) = shard.client_manager.leave_consumer_group(
member.client_id,
numeric_stream_id,
numeric_topic_id,
cg_id,
) {
tracing::warn!(
"{COMPONENT} (error: {err}) - failed to make client leave consumer group for client ID: {}, group ID: {}",
member.client_id,
cg_id
);
match shard.send_to_control_plane(request).await? {
ShardResponse::DeleteConsumerGroupResponse => {
sender.send_empty_ok_response().await?;
}
ShardResponse::ErrorResponse(err) => return Err(err),
_ => unreachable!("Expected DeleteConsumerGroupResponse"),
}

let cg_id_spez = ConsumerGroupId(cg_id);
shard.delete_consumer_group_offsets(
cg_id_spez,
&self.stream_id,
&self.topic_id,
&cg_meta.partitions,
).await.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to delete consumer group offsets for group ID: {} in stream: {}, topic: {}",
cg_id_spez,
self.stream_id,
self.topic_id
)
})?;

let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
shard
.state
.apply(
session.get_user_id(),
&EntryCommand::DeleteConsumerGroup(self),
)
.await
.error(|e: &IggyError| {
format!(
"{COMPONENT} (error: {e}) - failed to apply delete consumer group for stream_id: {}, topic_id: {}, group_id: {cg_id}, session: {session}",
stream_id, topic_id
)
})?;
sender.send_empty_ok_response().await?;
Ok(HandlerResult::Finished)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,20 @@ impl ServerCommandHandler for GetConsumerGroup {
) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard.ensure_authenticated(session)?;
let Ok((stream_id, topic_id, numeric_group_id)) =
shard.resolve_consumer_group_id(&self.stream_id, &self.topic_id, &self.group_id)

let Some(consumer_group) = shard.metadata.query_consumer_group(
session.get_user_id(),
&self.stream_id,
&self.topic_id,
&self.group_id,
)?
else {
sender.send_empty_ok_response().await?;
return Ok(HandlerResult::Finished);
};
if shard
.metadata
.perm_get_consumer_group(session.get_user_id(), stream_id, topic_id)
.is_err()
{
sender.send_empty_ok_response().await?;
return Ok(HandlerResult::Finished);
}

let consumer_group = shard
.metadata
.get_consumer_group(stream_id, topic_id, numeric_group_id)
.map(|cg| mapper::map_consumer_group_from_meta(&cg))
.ok_or_else(|| {
IggyError::ConsumerGroupIdNotFound(self.group_id.clone(), self.topic_id.clone())
})?;
sender.send_ok_response(&consumer_group).await?;
let response = mapper::map_consumer_group_from_meta(&consumer_group);
sender.send_ok_response(&response).await?;
Ok(HandlerResult::Finished)
}
}
Expand Down
Loading
Loading