diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs index 7c543a3338..c46be600bb 100644 --- a/core/integration/tests/mod.rs +++ b/core/integration/tests/mod.rs @@ -40,6 +40,7 @@ mod mcp; mod sdk; mod server; mod state; +mod storage; lazy_static! { static ref TESTS_FAILED: AtomicBool = AtomicBool::new(false); diff --git a/core/integration/tests/storage/consumer_offsets.rs b/core/integration/tests/storage/consumer_offsets.rs new file mode 100644 index 0000000000..0fbc628da4 --- /dev/null +++ b/core/integration/tests/storage/consumer_offsets.rs @@ -0,0 +1,180 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_common::{ConsumerKind, IggyError}; +use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; +use std::path::Path; +use std::sync::atomic::Ordering; + +fn write_offset_file(dir: &Path, name: &str, offset: u64) { + std::fs::write(dir.join(name), offset.to_le_bytes()).unwrap(); +} + +#[test] +fn load_consumer_offsets_valid_files() { + let dir = tempfile::tempdir().unwrap(); + write_offset_file(dir.path(), "1", 100); + write_offset_file(dir.path(), "2", 200); + write_offset_file(dir.path(), "3", 300); + + let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap(); + + assert_eq!(offsets.len(), 3); + assert_eq!(offsets[0].consumer_id, 1); + assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 100); + assert_eq!(offsets[0].kind, ConsumerKind::Consumer); + assert_eq!(offsets[1].consumer_id, 2); + assert_eq!(offsets[1].offset.load(Ordering::Relaxed), 200); + assert_eq!(offsets[2].consumer_id, 3); + assert_eq!(offsets[2].offset.load(Ordering::Relaxed), 300); +} + +#[test] +fn load_consumer_offsets_skips_non_numeric_files() { + let dir = tempfile::tempdir().unwrap(); + write_offset_file(dir.path(), ".DS_Store", 0); + write_offset_file(dir.path(), "backup.bak", 0); + write_offset_file(dir.path(), "1", 42); + + let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap(); + + assert_eq!(offsets.len(), 1); + assert_eq!(offsets[0].consumer_id, 1); + assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 42); +} + +#[test] +fn load_consumer_offsets_skips_truncated_files() { + let dir = tempfile::tempdir().unwrap(); + std::fs::write(dir.path().join("1"), [0u8; 3]).unwrap(); + std::fs::write(dir.path().join("2"), []).unwrap(); + write_offset_file(dir.path(), "3", 500); + + let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap(); + + assert_eq!(offsets.len(), 1); + assert_eq!(offsets[0].consumer_id, 3); + assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 500); +} + +#[test] +fn load_consumer_offsets_empty_dir() { + let dir = tempfile::tempdir().unwrap(); + + let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap(); + + assert!(offsets.is_empty()); +} + +#[test] +fn load_consumer_offsets_skips_directories() { + let dir = tempfile::tempdir().unwrap(); + std::fs::create_dir(dir.path().join("123")).unwrap(); + write_offset_file(dir.path(), "1", 77); + + let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap(); + + assert_eq!(offsets.len(), 1); + assert_eq!(offsets[0].consumer_id, 1); + assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 77); +} + +#[test] +fn load_consumer_offsets_nonexistent_dir() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().to_str().unwrap().to_string(); + drop(dir); + + let result = load_consumer_offsets(&path); + + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + IggyError::CannotReadConsumerOffsets(_) + )); +} + +#[test] +fn load_consumer_group_offsets_valid_files() { + let dir = tempfile::tempdir().unwrap(); + write_offset_file(dir.path(), "1", 500); + write_offset_file(dir.path(), "2", 600); + + let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap(); + + assert_eq!(offsets.len(), 2); + for (group_id, offset) in &offsets { + assert_eq!(offset.kind, ConsumerKind::ConsumerGroup); + assert_eq!(offset.consumer_id, group_id.0 as u32); + } + let ids: Vec = offsets.iter().map(|(_, co)| co.consumer_id).collect(); + assert!(ids.contains(&1)); + assert!(ids.contains(&2)); +} + +#[test] +fn load_consumer_group_offsets_skips_non_numeric_files() { + let dir = tempfile::tempdir().unwrap(); + write_offset_file(dir.path(), ".DS_Store", 0); + write_offset_file(dir.path(), "notes.txt", 0); + write_offset_file(dir.path(), "5", 999); + + let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap(); + + assert_eq!(offsets.len(), 1); + assert_eq!(offsets[0].0.0, 5); + assert_eq!(offsets[0].1.consumer_id, 5); + assert_eq!(offsets[0].1.offset.load(Ordering::Relaxed), 999); +} + +#[test] +fn load_consumer_group_offsets_skips_truncated_files() { + let dir = tempfile::tempdir().unwrap(); + std::fs::write(dir.path().join("1"), [0u8; 4]).unwrap(); + write_offset_file(dir.path(), "2", 750); + + let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap(); + + assert_eq!(offsets.len(), 1); + assert_eq!(offsets[0].0.0, 2); + assert_eq!(offsets[0].1.offset.load(Ordering::Relaxed), 750); +} + +#[test] +fn load_consumer_group_offsets_empty_dir() { + let dir = tempfile::tempdir().unwrap(); + + let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap(); + + assert!(offsets.is_empty()); +} + +#[test] +fn load_consumer_group_offsets_nonexistent_dir() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().to_str().unwrap().to_string(); + drop(dir); + + let result = load_consumer_group_offsets(&path); + + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + IggyError::CannotReadConsumerOffsets(_) + )); +} diff --git a/core/integration/tests/storage/mod.rs b/core/integration/tests/storage/mod.rs new file mode 100644 index 0000000000..8d4bac8b05 --- /dev/null +++ b/core/integration/tests/storage/mod.rs @@ -0,0 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +mod consumer_offsets; diff --git a/core/server/src/streaming/partitions/storage.rs b/core/server/src/streaming/partitions/storage.rs index 13f4011f3f..e641b3ec31 100644 --- a/core/server/src/streaming/partitions/storage.rs +++ b/core/server/src/streaming/partitions/storage.rs @@ -25,10 +25,9 @@ use compio::{ fs::{self, OpenOptions, create_dir_all}, io::AsyncWriteAtExt, }; -use err_trail::ErrContext; use iggy_common::{ConsumerKind, IggyError}; use std::{io::Read, path::Path, sync::atomic::AtomicU64}; -use tracing::{error, trace}; +use tracing::{error, trace, warn}; pub async fn create_partition_file_hierarchy( stream_id: usize, @@ -170,20 +169,43 @@ pub fn load_consumer_offsets(path: &str) -> Result, IggyErro let mut consumer_offsets = Vec::new(); let dir_entries = dir_entries.unwrap(); for dir_entry in dir_entries { - let dir_entry = dir_entry.unwrap(); - let metadata = dir_entry.metadata(); - if metadata.is_err() { - break; - } + let dir_entry = match dir_entry { + Ok(entry) => entry, + Err(e) => { + warn!( + "Failed to read directory entry in consumer offsets path: {path}, \ + error: {e}, skipping." + ); + continue; + } + }; + + let metadata = match dir_entry.metadata() { + Ok(m) => m, + Err(e) => { + warn!( + "Failed to read metadata for entry in consumer offsets path: {path}, \ + error: {e}, skipping." + ); + continue; + } + }; - if metadata.unwrap().is_dir() { + if metadata.is_dir() { continue; } - let name = dir_entry.file_name().into_string().unwrap(); - let consumer_id = name.parse::().unwrap_or_else(|_| { - panic!("Invalid consumer ID file with name: '{}'.", name); - }); + let name = dir_entry.file_name().to_string_lossy().to_string(); + let consumer_id = match name.parse::() { + Ok(id) => id, + Err(_) => { + warn!( + "Unexpected non-numeric consumer offset file: '{}', skipping.", + name + ); + continue; + } + }; let path = dir_entry.path(); let path = path.to_str(); @@ -193,19 +215,25 @@ pub fn load_consumer_offsets(path: &str) -> Result, IggyErro } let path = path.unwrap().to_string(); - let file = std::fs::File::open(&path) - .error(|e: &std::io::Error| { - format!("{COMPONENT} (error: {e}) - failed to open offset file, path: {path}") - }) - .map_err(|_| IggyError::CannotReadFile)?; + let file = match std::fs::File::open(&path) { + Ok(f) => f, + Err(e) => { + warn!( + "{COMPONENT} (error: {e}) - failed to open offset file, \ + path: {path}, skipping." + ); + continue; + } + }; let mut cursor = std::io::Cursor::new(file); let mut offset = [0; 8]; - cursor - .get_mut().read_exact(&mut offset) - .error(|e: &std::io::Error| { - format!("{COMPONENT} (error: {e}) - failed to read consumer offset from file, path: {path}") - }) - .map_err(|_| IggyError::CannotReadFile)?; + if let Err(e) = cursor.get_mut().read_exact(&mut offset) { + warn!( + "{COMPONENT} (error: {e}) - failed to read consumer offset from file \ + (truncated or corrupt?), path: {path}, skipping." + ); + continue; + } let offset = AtomicU64::new(u64::from_le_bytes(offset)); consumer_offsets.push(ConsumerOffset { @@ -232,24 +260,44 @@ pub fn load_consumer_group_offsets( let mut consumer_group_offsets = Vec::new(); let dir_entries = dir_entries.unwrap(); for dir_entry in dir_entries { - let dir_entry = dir_entry.unwrap(); - let metadata = dir_entry.metadata(); - if metadata.is_err() { - break; - } + let dir_entry = match dir_entry { + Ok(entry) => entry, + Err(e) => { + warn!( + "Failed to read directory entry in consumer group offsets path: {path}, \ + error: {e}, skipping." + ); + continue; + } + }; + + let metadata = match dir_entry.metadata() { + Ok(m) => m, + Err(e) => { + warn!( + "Failed to read metadata for entry in consumer group offsets path: {path}, \ + error: {e}, skipping." + ); + continue; + } + }; - if metadata.unwrap().is_dir() { + if metadata.is_dir() { continue; } - let name = dir_entry.file_name().into_string().unwrap(); + let name = dir_entry.file_name().to_string_lossy().to_string(); - let consumer_group_id = name.parse::().unwrap_or_else(|_| { - panic!( - "Invalid consumer group ID in consumer group file with name: '{}'.", - name - ); - }); + let consumer_group_id = match name.parse::() { + Ok(id) => id, + Err(_) => { + warn!( + "Unexpected non-numeric consumer group offset file: '{}', skipping.", + name + ); + continue; + } + }; let consumer_group_id = ConsumerGroupId(consumer_group_id as usize); let path = dir_entry.path(); @@ -263,19 +311,25 @@ pub fn load_consumer_group_offsets( } let path = path.unwrap().to_string(); - let file = std::fs::File::open(&path) - .error(|e: &std::io::Error| { - format!("{COMPONENT} (error: {e}) - failed to open offset file, path: {path}") - }) - .map_err(|_| IggyError::CannotReadFile)?; + let file = match std::fs::File::open(&path) { + Ok(f) => f, + Err(e) => { + warn!( + "{COMPONENT} (error: {e}) - failed to open offset file, \ + path: {path}, skipping." + ); + continue; + } + }; let mut cursor = std::io::Cursor::new(file); let mut offset = [0; 8]; - cursor - .get_mut().read_exact(&mut offset) - .error(|e: &std::io::Error| { - format!("{COMPONENT} (error: {e}) - failed to read consumer group offset from file, path: {path}") - }) - .map_err(|_| IggyError::CannotReadFile)?; + if let Err(e) = cursor.get_mut().read_exact(&mut offset) { + warn!( + "{COMPONENT} (error: {e}) - failed to read consumer group offset from file \ + (truncated or corrupt?), path: {path}, skipping." + ); + continue; + } let offset = AtomicU64::new(u64::from_le_bytes(offset)); let consumer_offset = ConsumerOffset {