6 changes: 6 additions & 0 deletions .github/workflows/tpcds-reusable.yml
@@ -151,6 +151,12 @@ jobs:
components:
cargo
rustfmt
clippy

- name: Cargo clippy
run: |
# Deny unwrap() only for now; -D warnings will be enabled later to enforce all default lints.
cargo clippy --all-targets --workspace -- -A warnings -A clippy::all -D clippy::unwrap_used

- name: Cargo test
run: |
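A note on the flag order in the clippy step above: -A warnings -A clippy::all first allows every default lint, and the trailing -D clippy::unwrap_used re-denies just that one lint, so CI currently fails only on unwrap() calls without yet enforcing the rest of clippy. A minimal sketch of what this rejects and the expect() form the PR migrates to (illustrative function, not code from this repository):

// Illustrative only: under -D clippy::unwrap_used the commented body is rejected,
// while the expect() version passes and carries a diagnostic message.
fn parse_port(raw: Option<&str>) -> u16 {
    // raw.unwrap().parse().unwrap()              // denied by clippy::unwrap_used
    raw.expect("port value must be provided")
        .parse()
        .expect("port value must be a valid u16")
}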
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -27,6 +27,10 @@ members = [
"native-engine/auron-memmgr",
]

[workspace.lints.clippy]
unwrap_used = "deny"
panic = "deny"

[profile.release]
opt-level = 3
lto = true
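The [workspace.lints.clippy] table above only takes effect in member crates that opt in with lints.workspace = true in their own Cargo.toml; whether the members do so is not shown in this diff. Once active, a call site where an unwrap or panic is genuinely intended has to allow the lint locally, roughly as in this illustrative sketch (not code from this repository):

// Illustrative only: keeping a deliberate unwrap() under unwrap_used = "deny"
// by allowing the lint at the narrowest scope, with a justification comment.
#[allow(clippy::unwrap_used)] // the literal below is a valid socket address
fn loopback_placeholder() -> std::net::SocketAddr {
    "127.0.0.1:0".parse().unwrap()
}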
4 changes: 4 additions & 0 deletions dev/mvn-build-helper/build-native.sh
@@ -84,6 +84,10 @@ if [ ! -f "$cache_libpath" ] || [ "$new_checksum" != "$old_checksum" ]; then
echo "Running cargo fmt..."
cargo fmt --all -q -- 2>&1

echo "Running cargo clippy..."
# Deny unwrap() only for now; -D warnings will be enabled later to enforce all default lints.
cargo clippy --all-targets --workspace -- -A warnings -A clippy::all -D clippy::unwrap_used 2>&1

echo "Building native with [$profile] profile..."
cargo build --profile="$profile" $features_arg --verbose --locked --frozen 2>&1

16 changes: 10 additions & 6 deletions native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -87,9 +87,13 @@ macro_rules! jni_map_error_with_env {
match $result {
Ok(result) => $crate::jni_bridge::datafusion::error::Result::Ok(result),
Err($crate::jni_bridge::jni::errors::Error::JavaException) => {
let ex = $env.exception_occurred().unwrap();
$env.exception_describe().unwrap();
$env.exception_clear().unwrap();
let ex = $env
.exception_occurred()
.expect("failed to obtain pending Java execption object");
$env.exception_describe()
.expect("failed to print Java exception to stderr");
$env.exception_clear()
.expect("failed to clear pending Java exception");
let message_obj = $env
.call_method_unchecked(
ex,
@@ -102,13 +106,13 @@ macro_rules! jni_map_error_with_env {
.clone(),
&[],
)
.unwrap()
.expect("call Java Throwable.toString() failed")
.l()
.unwrap();
.expect("expected object return from Throwable.toString()");
let message = $env
.get_string(message_obj.into())
.map(|s| String::from(s))
.unwrap();
.expect("failed to read Throwable.toString() result as Java string");

Err(
$crate::jni_bridge::datafusion::error::DataFusionError::External(
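Background for the hunk above: while a Java exception is pending on the current thread, most further JNI calls are invalid, which is why the macro fetches the Throwable, prints it, and clears it before asking for its message. A standalone sketch of that drain step, assuming the jni crate's JNIEnv methods used in this file (receiver mutability differs between jni versions):

use jni::JNIEnv;

// Sketch only: check for, print, and clear a pending Java exception so that
// later JNI calls on this thread remain valid.
fn drain_pending_exception(env: &JNIEnv) -> jni::errors::Result<()> {
    if env.exception_check()? {     // is an exception pending?
        env.exception_describe()?;  // print the exception and its stack trace to stderr
        env.exception_clear()?;     // clear it before issuing further JNI calls
    }
    Ok(())
}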
4 changes: 3 additions & 1 deletion native-engine/auron-jni-bridge/src/lib.rs
@@ -34,7 +34,9 @@ pub fn ensure_jni_bridge_inited() -> Result<()> {

pub fn is_task_running() -> bool {
fn is_task_running_impl() -> Result<bool> {
if !jni_call_static!(JniBridge.isTaskRunning() -> bool).unwrap() {
if !jni_call_static!(JniBridge.isTaskRunning() -> bool)
.expect("calling JniBridge.isTaskRunning() error")
{
jni_exception_clear!()?;
return Ok(false);
}
90 changes: 66 additions & 24 deletions native-engine/auron-serde/src/from_proto.rs
@@ -151,7 +151,11 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
Ok(Arc::new(FilterExec::try_new(predicates, input)?))
}
PhysicalPlanType::ParquetScan(scan) => {
let conf: FileScanConfig = scan.base_conf.as_ref().unwrap().try_into()?;
let conf: FileScanConfig = scan
.base_conf
.as_ref()
.expect("base_conf must be set for ParquetScan")
.try_into()?;
let predicate = scan
.pruning_predicates
.iter()
@@ -168,7 +172,11 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
)))
}
PhysicalPlanType::OrcScan(scan) => {
let conf: FileScanConfig = scan.base_conf.as_ref().unwrap().try_into()?;
let conf: FileScanConfig = scan
.base_conf
.as_ref()
.expect("base_conf must be set for OrcScan")
.try_into()?;
let predicate = scan
.pruning_predicates
.iter()
@@ -192,10 +200,18 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.on
.iter()
.map(|col| {
let left_key =
try_parse_physical_expr(&col.left.as_ref().unwrap(), &left.schema())?;
let right_key =
try_parse_physical_expr(&col.right.as_ref().unwrap(), &right.schema())?;
let left_key = try_parse_physical_expr(
&col.left
.as_ref()
.expect("hash join: left join key must be present"),
&left.schema(),
)?;
let right_key = try_parse_physical_expr(
&col.right
.as_ref()
.expect("hash join: right join key must be present"),
&right.schema(),
)?;
Ok((left_key, right_key))
})
.collect::<Result<_, Self::Error>>()?;
@@ -229,10 +245,18 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.on
.iter()
.map(|col| {
let left_key =
try_parse_physical_expr(&col.left.as_ref().unwrap(), &left.schema())?;
let right_key =
try_parse_physical_expr(&col.right.as_ref().unwrap(), &right.schema())?;
let left_key = try_parse_physical_expr(
&col.left
.as_ref()
.expect("sort-merge join: left join key must be present"),
&left.schema(),
)?;
let right_key = try_parse_physical_expr(
&col.right
.as_ref()
.expect("sort-merge join: right join key must be present"),
&right.schema(),
)?;
Ok((left_key, right_key))
})
.collect::<Result<_, Self::Error>>()?;
@@ -270,7 +294,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {

Ok(Arc::new(ShuffleWriterExec::try_new(
input,
output_partitioning.unwrap(),
output_partitioning.expect("shuffle writer: output_partitioning must be set"),
shuffle_writer.output_data_file.clone(),
shuffle_writer.output_index_file.clone(),
)?))
@@ -285,7 +309,8 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
)?;
Ok(Arc::new(RssShuffleWriterExec::try_new(
input,
output_partitioning.unwrap(),
output_partitioning
.expect("rss shuffle writer: output_partitioning must be set"),
rss_shuffle_writer.rss_partition_writer_resource_id.clone(),
)?))
}
@@ -339,10 +364,18 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.on
.iter()
.map(|col| {
let left_key =
try_parse_physical_expr(&col.left.as_ref().unwrap(), &left.schema())?;
let right_key =
try_parse_physical_expr(&col.right.as_ref().unwrap(), &right.schema())?;
let left_key = try_parse_physical_expr(
&col.left
.as_ref()
.expect("broadcast join: left join key must be present"),
&left.schema(),
)?;
let right_key = try_parse_physical_expr(
&col.right
.as_ref()
.expect("broadcast join: right join key must be present"),
&right.schema(),
)?;
Ok((left_key, right_key))
})
.collect::<Result<_, Self::Error>>()?;
@@ -471,7 +504,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {

let agg = match AggFunction::from(agg_function) {
AggFunction::Udaf => {
let udaf = agg_node.udaf.as_ref().unwrap();
let udaf = agg_node.udaf.as_ref().expect("udaf missing");
let serialized = udaf.serialized.clone();
create_udaf_agg(serialized, return_type, agg_children_exprs)?
}
@@ -696,7 +729,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
children,
)?,
GenerateFunction::Udtf => {
let udtf = pb_generator.udtf.as_ref().unwrap();
let udtf = pb_generator.udtf.as_ref().expect("udtf missing");
let serialized = udtf.serialized.clone();
let return_schema = Arc::new(convert_required!(udtf.return_schema)?);
create_udtf_generator(serialized, return_schema, children)?
@@ -1136,21 +1169,27 @@ pub fn parse_protobuf_partitioning(
.collect::<Result<Vec<PhysicalExprRef>, _>>()?;
Ok(Some(Partitioning::HashPartitioning(
expr,
hash_part.partition_count.try_into().unwrap(),
hash_part
.partition_count
.try_into()
.expect("hash repartition: invalid partition_count"),
)))
}

RepartitionType::RoundRobinRepartition(round_robin_part) => {
Ok(Some(Partitioning::RoundRobinPartitioning(
round_robin_part.partition_count.try_into().unwrap(),
round_robin_part
.partition_count
.try_into()
.expect("round-robin repartition: invalid partition_count"),
)))
}

RepartitionType::RangeRepartition(range_part) => {
if range_part.partition_count == 1 {
Ok(Some(Partitioning::SinglePartitioning()))
} else {
let sort = range_part.sort_expr.clone().unwrap();
let sort = range_part.sort_expr.clone().expect("sort_expr missing");
let exprs = try_parse_physical_sort_expr(&input, &sort).unwrap_or_else(|e| {
panic!("Failed to parse physical sort expressions: {}", e);
});
@@ -1187,7 +1226,10 @@ pub fn parse_protobuf_partitioning(
let bound_rows = sort_row_converter.lock().convert_columns(&bound_cols)?;
Ok(Some(Partitioning::RangePartitioning(
exprs,
range_part.partition_count.try_into().unwrap(),
range_part
.partition_count
.try_into()
.expect("range partition: invalid partition_count"),
Arc::new(bound_rows),
)))
}
@@ -1250,12 +1292,12 @@ impl From<&protobuf::ColumnStats> for ColumnStatistics {
max_value: cs
.max_value
.as_ref()
.map(|m| Precision::Exact(m.try_into().unwrap()))
.map(|m| Precision::Exact(m.try_into().expect("invalid max_value")))
.unwrap_or(Precision::Absent),
min_value: cs
.min_value
.as_ref()
.map(|m| Precision::Exact(m.try_into().unwrap()))
.map(|m| Precision::Exact(m.try_into().expect("invalid min_value")))
.unwrap_or(Precision::Absent),
sum_value: Precision::Absent,
distinct_count: Precision::Exact(cs.distinct_count as usize),
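All of the expect() calls above sit inside a TryInto implementation that already returns a Result, so they encode a deliberate panic-on-malformed-plan policy rather than a necessity. For comparison only, and not what this PR does (the helper and error variant below are illustrative), a missing required field could instead be propagated as a DataFusion error:

use datafusion::error::{DataFusionError, Result};

// Comparison sketch, not from this PR: turn a missing protobuf field into an
// error the caller can handle instead of panicking during plan conversion.
fn required<T>(field: Option<T>, what: &str) -> Result<T> {
    field.ok_or_else(|| DataFusionError::Internal(format!("{what} must be set")))
}

// Inside the TryInto impl this would read roughly:
//     let conf: FileScanConfig = required(scan.base_conf.as_ref(), "ParquetScan.base_conf")?.try_into()?;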
1 change: 1 addition & 0 deletions native-engine/auron/Cargo.toml
@@ -47,6 +47,7 @@ panic-message = { workspace = true }
prost = { workspace = true }
tokio = { workspace = true }
chrono = { workspace = true }
parking_lot = { workspace = true }

[dependencies.tikv-jemalloc-ctl]
version = "0.6.1"
9 changes: 4 additions & 5 deletions native-engine/auron/src/alloc.rs
@@ -20,12 +20,11 @@

use std::{
alloc::{GlobalAlloc, Layout},
sync::{
Mutex,
atomic::{AtomicUsize, Ordering::SeqCst},
},
sync::atomic::{AtomicUsize, Ordering::SeqCst},
};

use parking_lot::Mutex;

#[cfg(any(feature = "jemalloc", feature = "jemalloc-pprof"))]
#[cfg(not(windows))]
#[cfg_attr(not(windows), global_allocator)]
@@ -57,7 +56,7 @@ impl<T: GlobalAlloc> DebugAlloc<T> {
}

fn update(&self) {
let _lock = self.mutex.lock().unwrap();
let _lock = self.mutex.lock();
let current = self.current.load(SeqCst);
let last_updated = self.last_updated.load(SeqCst);
let delta = (current as isize - last_updated as isize).abs();
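The unwrap() on lock() can be dropped here because parking_lot::Mutex, unlike std::sync::Mutex, has no lock poisoning: lock() returns the guard directly instead of a Result. A minimal sketch of the difference (illustrative counter, not code from this file):

// std::sync::Mutex: lock() returns Result<MutexGuard, PoisonError<_>>, so callers
// must unwrap()/expect() or otherwise handle a poisoned lock.
fn bump_std(counter: &std::sync::Mutex<usize>) {
    *counter.lock().expect("mutex poisoned") += 1;
}

// parking_lot::Mutex: no poisoning, lock() returns the guard directly, which is
// why the unwrap() disappears in DebugAlloc::update() above.
fn bump_parking_lot(counter: &parking_lot::Mutex<usize>) {
    *counter.lock() += 1;
}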
10 changes: 8 additions & 2 deletions native-engine/auron/src/exec.rs
@@ -62,7 +62,10 @@ pub extern "system" fn Java_org_apache_auron_jni_JniBridge_callNative(
INIT.get_or_try_init(|| {
// logging is not initialized at this moment
eprintln!("------ initializing auron native environment ------");
let log_level = env.get_string(log_level).map(|s| String::from(s)).unwrap();
let log_level = env
.get_string(log_level)
.map(|s| String::from(s))
.expect("init: failed to read log_level from env");
eprintln!("initializing logging with level: {}", log_level);
init_logging(log_level.as_str());

@@ -103,7 +106,10 @@ pub extern "system" fn Java_org_apache_auron_jni_JniBridge_callNative(
// create execution runtime
let runtime = Box::new(NativeExecutionRuntime::start(
native_wrapper,
SESSION.get().unwrap().task_ctx(),
SESSION
.get()
.expect("session must be initialized")
.task_ctx(),
)?);

// returns runtime raw pointer
9 changes: 4 additions & 5 deletions native-engine/auron/src/http/mod.rs
@@ -18,9 +18,8 @@ mod memory_profiling
#[cfg(feature = "jemalloc-pprof")]
mod pprof;

use std::sync::Mutex;

use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use poem::{Route, RouteMethod, Server, listener::TcpListener};

pub static HTTP_SERVICE: OnceCell<HttpService> = OnceCell::new();
@@ -50,7 +49,7 @@ impl DefaultHTTPServer {
.worker_threads(1)
.enable_io()
.build()
.unwrap(),
.expect("fast fail: error initializing tokio runtime"),
handlers: Mutex::new(vec![]),
}
}
@@ -66,7 +65,7 @@ impl HTTPServer for DefaultHTTPServer {
fn start(&self) {
if let Some(port) = find_available_port() {
let mut app = Route::new();
let handlers = self.handlers.lock().unwrap();
let handlers = self.handlers.lock();
for handler in handlers.iter() {
app = app.at(handler.get_route_path(), handler.get_route_method());
}
@@ -83,7 +82,7 @@
}

fn register_handler(&self, handler: Box<dyn Handler + Send + Sync>) {
let mut handlers = self.handlers.lock().unwrap();
let mut handlers = self.handlers.lock();
handlers.push(handler);
}
}
2 changes: 1 addition & 1 deletion native-engine/auron/src/http/pprof.rs
@@ -34,7 +34,7 @@ impl Default for PProfRequest {
fn default() -> Self {
PProfRequest {
seconds: 5,
frequency: NonZeroI32::new(100).unwrap(),
frequency: NonZeroI32::new(100).expect("non-zero frequency"),
}
}
}
2 changes: 1 addition & 1 deletion native-engine/auron/src/lib.rs
@@ -55,7 +55,7 @@ fn handle_unwinded(err: Box<dyn Any + Send>) {
}

fn handle_unwinded_scope<T: Default, E: Debug>(scope: impl FnOnce() -> Result<T, E>) -> T {
match std::panic::catch_unwind(AssertUnwindSafe(|| scope().unwrap())) {
match std::panic::catch_unwind(AssertUnwindSafe(|| scope().expect("scope failed"))) {
Ok(v) => v,
Err(err) => {
handle_unwinded(err);