Skip to content

Commit 42686d1

Browse files
ldrozdz93LotharKAtt
authored andcommitted
parquet sink: log the event that breaks the schema (#26)
* more debugs for parquet failures * tidy
1 parent 6ac8343 commit 42686d1

1 file changed

Lines changed: 19 additions & 4 deletions

File tree

lib/codecs/src/encoding/format/parquet.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use parquet::{
1616
use serde::{Deserialize, Serialize};
1717
use snafu::*;
1818
use tokio_util::codec::Encoder;
19+
use tracing::error;
1920

2021
use vector_config::configurable_component;
2122
use vector_core::{
@@ -426,17 +427,31 @@ impl<'a, T, F: Fn(&Value) -> Result<T, ParquetSerializerError>> Column<'a, T, F>
426427

427428
fn extract_column(&mut self, events: &[Event]) -> Result<(), ParquetSerializerError> {
428429
for event in events {
429-
match event {
430+
let res = match event {
430431
Event::Log(log) => {
431-
self.extract_value(log.value(), Level::root())?;
432+
self.extract_value(log.value(), Level::root())
432433
}
433434
Event::Trace(trace) => {
434-
self.extract_value(trace.value(), Level::root())?;
435+
self.extract_value(trace.value(), Level::root())
435436
}
436437
Event::Metric(_) => {
437438
panic!("Metrics are not supported.");
438439
}
439-
}
440+
};
441+
res.inspect_err(|error| {
442+
// event to json string
443+
match serde_json::to_string(&event) {
444+
Ok(event) => error!(
445+
error = ?error,
446+
event = event,
447+
),
448+
Err(e) => error!(
449+
error = ?error,
450+
event = ?event,
451+
serde_error = %e,
452+
),
453+
}
454+
})?;
440455
}
441456
Ok(())
442457
}

0 commit comments

Comments
 (0)