Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions crates/fluss/src/client/table/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::client::connection::FlussConnection;
use crate::client::metadata::Metadata;
use crate::error::{Error, Result};
use crate::metadata::{RowType, TableBucket, TableInfo};
use crate::record::kv::SCHEMA_ID_LENGTH;
use crate::row::InternalRow;
use crate::row::compacted::CompactedRow;
use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
Expand Down Expand Up @@ -64,7 +65,10 @@ impl<'a> LookupResult<'a> {
pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
match self.rows.len() {
0 => Ok(None),
1 => Ok(Some(CompactedRow::from_bytes(self.row_type, &self.rows[0]))),
1 => Ok(Some(CompactedRow::from_bytes(
self.row_type,
&self.rows[0][SCHEMA_ID_LENGTH..],
))),
_ => Err(Error::UnexpectedError {
message: "LookupResult contains multiple rows, use get_rows() instead".to_string(),
source: None,
Comment on lines -67 to 74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more of a brainstorming for later, on paper can still panic if the response is smaller than 2 bytes right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. That is right, however that would be an unexpected case. I would expect that schema id field and header to be there, having said that, making this more defensive by returning Error would be a good improvement to make

Expand All @@ -76,7 +80,8 @@ impl<'a> LookupResult<'a> {
pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
self.rows
.iter()
.map(|bytes| CompactedRow::from_bytes(self.row_type, bytes))
// TODO Add schema id check and fetch when implementing prefix lookup
.map(|bytes| CompactedRow::from_bytes(self.row_type, &bytes[SCHEMA_ID_LENGTH..]))
.collect()
}
}
Expand Down
Loading