diff --git a/crates/paimon/src/arrow/format/mosaic.rs b/crates/paimon/src/arrow/format/mosaic.rs index 74727ba6..f631fada 100644 --- a/crates/paimon/src/arrow/format/mosaic.rs +++ b/crates/paimon/src/arrow/format/mosaic.rs @@ -17,8 +17,9 @@ use super::{FilePredicates, FormatFileReader}; use crate::arrow::build_target_arrow_schema; +use crate::arrow::filtering::{predicates_may_match_with_schema, StatsAccessor}; use crate::io::FileRead; -use crate::spec::DataField; +use crate::spec::{DataField, DataType as PaimonDataType, Datum, Predicate}; use crate::table::{ArrowRecordBatchStream, RowRange}; use crate::Error; use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; @@ -28,7 +29,10 @@ use async_trait::async_trait; use bytes::Bytes; use futures::StreamExt; use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess}; -use std::collections::HashSet; +use paimon_mosaic_core::schema::MosaicSchema; +use paimon_mosaic_core::stats::ColumnStats; +use paimon_mosaic_core::values::Value as MosaicValue; +use std::collections::{HashMap, HashSet}; use std::io; pub(crate) struct MosaicFormatReader; @@ -42,11 +46,10 @@ impl FormatFileReader for MosaicFormatReader { reader: Box, file_size: u64, read_fields: &[DataField], - _predicates: Option<&FilePredicates>, + predicates: Option<&FilePredicates>, batch_size: Option, row_selection: Option>, ) -> crate::Result { - // Mosaic predicates are currently residual; callers must re-check them for exact filtering. let file_bytes = reader.read(0..file_size).await?; let mosaic_reader = MosaicReader::new(MemoryInputFile::new(file_bytes), file_size) .map_err(mosaic_read_error)?; @@ -70,6 +73,16 @@ impl FormatFileReader for MosaicFormatReader { .collect::>(); let all_projected_columns_missing = !read_fields.is_empty() && projected_names.is_empty(); let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + let predicate_state = predicates.map(|predicates| { + let file_fields = predicates.file_fields.clone(); + let file_column_indices = + build_file_column_indices(mosaic_reader.schema(), &file_fields); + ( + predicates.predicates.clone(), + file_fields, + file_column_indices, + ) + }); Ok(try_stream! { let mut row_group_start = 0usize; @@ -95,6 +108,21 @@ impl FormatFileReader for MosaicFormatReader { } } + if let Some((predicates, file_fields, file_column_indices)) = &predicate_state { + let row_group_stats = mosaic_reader + .row_group_stats(row_group_index) + .map_err(mosaic_read_error)?; + if !row_group_may_match( + row_group_rows, + row_group_stats, + file_column_indices, + predicates, + file_fields, + )? { + continue; + } + } + let batch = if all_projected_columns_missing { let row_count = selected_indices .as_ref() @@ -123,6 +151,167 @@ impl FormatFileReader for MosaicFormatReader { } } +struct MosaicRowGroupStats<'a> { + row_count: i64, + row_group_stats: &'a [ColumnStats], + file_column_indices: &'a [Option], +} + +impl StatsAccessor for MosaicRowGroupStats<'_> { + fn row_count(&self) -> i64 { + self.row_count + } + + fn null_count(&self, index: usize) -> Option { + i64::try_from(self.column_stats(index)?.null_count).ok() + } + + fn min_value(&self, index: usize, data_type: &PaimonDataType) -> Option { + mosaic_value_to_datum(self.column_stats(index)?.min.as_ref()?, data_type) + } + + fn max_value(&self, index: usize, data_type: &PaimonDataType) -> Option { + mosaic_value_to_datum(self.column_stats(index)?.max.as_ref()?, data_type) + } +} + +impl MosaicRowGroupStats<'_> { + fn column_stats(&self, index: usize) -> Option<&ColumnStats> { + let column_index = self.file_column_indices.get(index).copied().flatten()?; + self.row_group_stats + .iter() + .find(|stats| stats.column_index == column_index) + } +} + +fn row_group_may_match( + row_group_rows: usize, + row_group_stats: &[ColumnStats], + file_column_indices: &[Option], + predicates: &[Predicate], + file_fields: &[DataField], +) -> crate::Result { + let row_count = i64::try_from(row_group_rows).map_err(|e| Error::DataInvalid { + message: "Mosaic row group row count exceeds i64".to_string(), + source: Some(Box::new(e)), + })?; + let identity_mapping = (0..file_fields.len()).map(Some).collect::>(); + let stats = MosaicRowGroupStats { + row_count, + row_group_stats, + file_column_indices, + }; + Ok(predicates_may_match_with_schema( + predicates, + &stats, + &identity_mapping, + file_fields, + )) +} + +fn build_file_column_indices( + mosaic_schema: &MosaicSchema, + file_fields: &[DataField], +) -> Vec> { + let by_name = mosaic_schema + .columns + .iter() + .enumerate() + .map(|(index, column)| (column.name.as_str(), index)) + .collect::>(); + file_fields + .iter() + .map(|field| by_name.get(field.name()).copied()) + .collect() +} + +fn mosaic_value_to_datum(value: &MosaicValue, data_type: &PaimonDataType) -> Option { + match (value, data_type) { + (MosaicValue::Boolean(value), PaimonDataType::Boolean(_)) => Some(Datum::Bool(*value)), + (MosaicValue::TinyInt(value), PaimonDataType::TinyInt(_)) => Some(Datum::TinyInt(*value)), + (MosaicValue::SmallInt(value), PaimonDataType::SmallInt(_)) => { + Some(Datum::SmallInt(*value)) + } + (MosaicValue::Integer(value), PaimonDataType::Int(_)) => Some(Datum::Int(*value)), + (MosaicValue::BigInt(value), PaimonDataType::BigInt(_)) => Some(Datum::Long(*value)), + (MosaicValue::Float(value), PaimonDataType::Float(_)) => Some(Datum::Float(*value)), + (MosaicValue::Double(value), PaimonDataType::Double(_)) => Some(Datum::Double(*value)), + (MosaicValue::Date(value), PaimonDataType::Date(_)) => Some(Datum::Date(*value)), + (MosaicValue::Time(value), PaimonDataType::Time(_)) => Some(Datum::Time(*value)), + (MosaicValue::String(value), PaimonDataType::Char(_)) + | (MosaicValue::String(value), PaimonDataType::VarChar(_)) => { + String::from_utf8(value.clone()).ok().map(Datum::String) + } + (MosaicValue::Bytes(value), PaimonDataType::Binary(_)) + | (MosaicValue::Bytes(value), PaimonDataType::VarBinary(_)) => { + Some(Datum::Bytes(value.clone())) + } + (MosaicValue::DecimalCompact(value), PaimonDataType::Decimal(decimal)) => { + Some(Datum::Decimal { + unscaled: i128::from(*value), + precision: decimal.precision(), + scale: decimal.scale(), + }) + } + (MosaicValue::TimestampMillis(value), PaimonDataType::Timestamp(timestamp)) + if timestamp.precision() <= 3 => + { + Some(Datum::Timestamp { + millis: *value, + nanos: 0, + }) + } + (MosaicValue::TimestampMillis(value), PaimonDataType::LocalZonedTimestamp(timestamp)) + if timestamp.precision() <= 3 => + { + Some(Datum::LocalZonedTimestamp { + millis: *value, + nanos: 0, + }) + } + (MosaicValue::TimestampMicros(value), PaimonDataType::Timestamp(timestamp)) + if (4..=6).contains(×tamp.precision()) => + { + let (millis, nanos) = micros_to_millis_nanos(*value); + Some(Datum::Timestamp { millis, nanos }) + } + (MosaicValue::TimestampMicros(value), PaimonDataType::LocalZonedTimestamp(timestamp)) + if (4..=6).contains(×tamp.precision()) => + { + let (millis, nanos) = micros_to_millis_nanos(*value); + Some(Datum::LocalZonedTimestamp { millis, nanos }) + } + ( + MosaicValue::TimestampNanos { + millis, + nanos_of_milli, + }, + PaimonDataType::Timestamp(timestamp), + ) if (7..=9).contains(×tamp.precision()) => Some(Datum::Timestamp { + millis: *millis, + nanos: *nanos_of_milli, + }), + ( + MosaicValue::TimestampNanos { + millis, + nanos_of_milli, + }, + PaimonDataType::LocalZonedTimestamp(timestamp), + ) if (7..=9).contains(×tamp.precision()) => Some(Datum::LocalZonedTimestamp { + millis: *millis, + nanos: *nanos_of_milli, + }), + _ => None, + } +} + +fn micros_to_millis_nanos(micros: i64) -> (i64, i32) { + ( + micros.div_euclid(1000), + (micros.rem_euclid(1000) * 1000) as i32, + ) +} + #[derive(Clone)] struct MemoryInputFile { data: Bytes, @@ -341,9 +530,12 @@ fn mosaic_read_error(error: io::Error) -> Error { #[cfg(test)] mod tests { use super::*; - use crate::arrow::format::FormatFileReader; - use crate::spec::{ArrayType, DataType, IntType, RowType, VarCharType}; - use arrow_array::{Array, Int32Array, StringArray}; + use crate::arrow::format::{FilePredicates, FormatFileReader}; + use crate::spec::{ + ArrayType, DataType, Datum, IntType, Predicate, PredicateBuilder, RowType, TimestampType, + VarCharType, + }; + use arrow_array::{Array, Int32Array, StringArray, TimestampMicrosecondArray}; use arrow_schema::{DataType as ArrowDataType, Field, Schema}; use bytes::Bytes; use futures::TryStreamExt; @@ -434,20 +626,39 @@ mod tests { .unwrap() } + fn batch(ids: Vec, names: Vec<&str>, scores: Vec) -> RecordBatch { + RecordBatch::try_new( + arrow_schema(), + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(StringArray::from(names)), + Arc::new(Int32Array::from(scores)), + ], + ) + .unwrap() + } + fn write_mosaic(batch: &RecordBatch) -> Bytes { + write_mosaic_batches(std::slice::from_ref(batch), Vec::new()) + } + + fn write_mosaic_batches(batches: &[RecordBatch], stats_columns: Vec) -> Bytes { let out = MemOutputFile::new(); let mut writer = MosaicWriter::new( out, - batch.schema().as_ref(), + batches[0].schema().as_ref(), WriterOptions { compression: COMPRESSION_NONE, num_buckets: 2, - row_group_max_size: u64::MAX, + row_group_max_size: 1, + stats_columns, ..Default::default() }, ) .unwrap(); - writer.write_batch(batch).unwrap(); + for batch in batches { + writer.write_batch(batch).unwrap(); + } writer.close().unwrap(); Bytes::from(writer.output().data.to_vec()) } @@ -456,6 +667,15 @@ mod tests { data: Bytes, read_fields: &[DataField], row_selection: Option>, + ) -> crate::Result> { + read_batches_with_predicates(data, read_fields, None, row_selection).await + } + + async fn read_batches_with_predicates( + data: Bytes, + read_fields: &[DataField], + predicates: Option<&FilePredicates>, + row_selection: Option>, ) -> crate::Result> { let file_size = data.len() as u64; MosaicFormatReader @@ -463,7 +683,7 @@ mod tests { Box::new(TestFileRead { data }), file_size, read_fields, - None, + predicates, None, row_selection, ) @@ -472,6 +692,96 @@ mod tests { .await } + fn collect_i32_column(batches: &[RecordBatch], column_index: usize) -> Vec { + batches + .iter() + .flat_map(|batch| { + batch + .column(column_index) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec() + }) + .collect() + } + + fn predicate_file_predicates( + fields: Vec, + predicates: Vec, + ) -> FilePredicates { + FilePredicates { + predicates, + file_fields: fields, + } + } + + fn multi_row_group_mosaic(stats_columns: Vec) -> Bytes { + write_mosaic_batches( + &[ + batch(vec![1, 2], vec!["a", "b"], vec![10, 20]), + batch(vec![10, 11], vec!["c", "d"], vec![30, 40]), + batch(vec![20, 21], vec!["e", "f"], vec![50, 60]), + ], + stats_columns, + ) + } + + fn timestamp_fields() -> Vec { + vec![ + DataField::new( + 0, + "ts".to_string(), + DataType::Timestamp(TimestampType::new(6).unwrap()), + ), + DataField::new(1, "id".to_string(), DataType::Int(IntType::new())), + ] + } + + fn timestamp_arrow_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new( + "ts", + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + Field::new("id", ArrowDataType::Int32, true), + ])) + } + + fn timestamp_batch(micros: Vec, ids: Vec) -> RecordBatch { + RecordBatch::try_new( + timestamp_arrow_schema(), + vec![ + Arc::new(TimestampMicrosecondArray::from(micros)), + Arc::new(Int32Array::from(ids)), + ], + ) + .unwrap() + } + + fn write_timestamp_mosaic_batches(batches: &[RecordBatch]) -> Bytes { + let out = MemOutputFile::new(); + let mut writer = MosaicWriter::new( + out, + batches[0].schema().as_ref(), + WriterOptions { + compression: COMPRESSION_NONE, + num_buckets: 1, + row_group_max_size: 1, + stats_columns: vec!["ts".to_string()], + ..Default::default() + }, + ) + .unwrap(); + for batch in batches { + writer.write_batch(batch).unwrap(); + } + writer.close().unwrap(); + Bytes::from(writer.output().data.to_vec()) + } + #[tokio::test] async fn test_read_basic_mosaic_file() { let data = write_mosaic(&sample_batch()); @@ -539,6 +849,170 @@ mod tests { assert_eq!(ids.values(), &[2, 3, 5]); } + #[tokio::test] + async fn test_read_predicate_prunes_non_matching_row_groups() { + let fields = data_fields(); + let builder = PredicateBuilder::new(&fields); + let predicates = predicate_file_predicates( + fields.clone(), + vec![builder.equal("id", Datum::Int(10)).unwrap()], + ); + let data = multi_row_group_mosaic(vec!["id".to_string()]); + let batches = read_batches_with_predicates(data, &fields, Some(&predicates), None) + .await + .unwrap(); + + assert_eq!(collect_i32_column(&batches, 0), vec![10, 11]); + } + + #[tokio::test] + async fn test_read_predicate_prunes_all_row_groups() { + let fields = data_fields(); + let builder = PredicateBuilder::new(&fields); + let predicates = predicate_file_predicates( + fields.clone(), + vec![builder.equal("id", Datum::Int(99)).unwrap()], + ); + let data = multi_row_group_mosaic(vec!["id".to_string()]); + let batches = read_batches_with_predicates(data, &fields, Some(&predicates), None) + .await + .unwrap(); + + assert!(batches.is_empty()); + } + + #[tokio::test] + async fn test_read_predicate_missing_stats_fails_open() { + let fields = data_fields(); + let builder = PredicateBuilder::new(&fields); + let predicates = predicate_file_predicates( + fields.clone(), + vec![builder.equal("id", Datum::Int(99)).unwrap()], + ); + let data = multi_row_group_mosaic(vec!["score".to_string()]); + let batches = read_batches_with_predicates(data, &fields, Some(&predicates), None) + .await + .unwrap(); + + assert_eq!(collect_i32_column(&batches, 0), vec![1, 2, 10, 11, 20, 21]); + } + + #[tokio::test] + async fn test_read_predicate_combines_with_row_selection() { + let fields = data_fields(); + let builder = PredicateBuilder::new(&fields); + let predicates = predicate_file_predicates( + fields.clone(), + vec![builder.equal("id", Datum::Int(20)).unwrap()], + ); + let data = multi_row_group_mosaic(vec!["id".to_string()]); + let batches = read_batches_with_predicates( + data, + &fields, + Some(&predicates), + Some(vec![RowRange::new(0, 4)]), + ) + .await + .unwrap(); + + assert_eq!(collect_i32_column(&batches, 0), vec![20]); + } + + #[tokio::test] + async fn test_read_predicate_filter_column_not_projected() { + let fields = data_fields(); + let builder = PredicateBuilder::new(&fields); + let predicates = predicate_file_predicates( + fields.clone(), + vec![builder.equal("id", Datum::Int(10)).unwrap()], + ); + let projected = vec![fields[2].clone()]; + let data = multi_row_group_mosaic(vec!["id".to_string()]); + let batches = read_batches_with_predicates(data, &projected, Some(&predicates), None) + .await + .unwrap(); + + assert_eq!(collect_i32_column(&batches, 0), vec![30, 40]); + } + + #[tokio::test] + async fn test_read_predicate_negative_timestamp_microseconds() { + let fields = timestamp_fields(); + let builder = PredicateBuilder::new(&fields); + let predicates = predicate_file_predicates( + fields.clone(), + vec![builder + .equal( + "ts", + Datum::Timestamp { + millis: -1, + nanos: 999_000, + }, + ) + .unwrap()], + ); + let data = write_timestamp_mosaic_batches(&[ + timestamp_batch(vec![-1, -1], vec![1, 2]), + timestamp_batch(vec![1_000], vec![3]), + ]); + let read_fields = vec![fields[1].clone()]; + let batches = read_batches_with_predicates(data, &read_fields, Some(&predicates), None) + .await + .unwrap(); + + assert_eq!(collect_i32_column(&batches, 0), vec![1, 2]); + } + + #[tokio::test] + async fn test_read_predicate_all_missing_projection_keeps_row_count() { + let fields = data_fields(); + let predicates = predicate_file_predicates(fields, vec![Predicate::AlwaysTrue]); + let projected = vec![field( + 3, + "new_score", + DataType::Int(IntType::with_nullable(true)), + )]; + let data = multi_row_group_mosaic(vec!["id".to_string()]); + let batches = read_batches_with_predicates(data, &projected, Some(&predicates), None) + .await + .unwrap(); + + assert_eq!(batches.iter().map(RecordBatch::num_rows).sum::(), 6); + assert!(batches.iter().all(|batch| batch.num_columns() == 0)); + } + + #[tokio::test] + async fn test_read_predicate_missing_column_false_prunes_all_row_groups() { + let fields = data_fields(); + let predicates = predicate_file_predicates(fields.clone(), vec![Predicate::AlwaysFalse]); + let projected = vec![ + fields[0].clone(), + field(3, "new_score", DataType::Int(IntType::with_nullable(true))), + ]; + let data = multi_row_group_mosaic(vec!["id".to_string()]); + let batches = read_batches_with_predicates(data, &projected, Some(&predicates), None) + .await + .unwrap(); + + assert!(batches.is_empty()); + } + + #[tokio::test] + async fn test_read_predicate_missing_column_is_null_keeps_row_groups() { + let fields = data_fields(); + let predicates = predicate_file_predicates(fields.clone(), vec![Predicate::AlwaysTrue]); + let projected = vec![ + fields[0].clone(), + field(3, "new_score", DataType::Int(IntType::with_nullable(true))), + ]; + let data = multi_row_group_mosaic(vec!["id".to_string()]); + let batches = read_batches_with_predicates(data, &projected, Some(&predicates), None) + .await + .unwrap(); + + assert_eq!(collect_i32_column(&batches, 0), vec![1, 2, 10, 11, 20, 21]); + } + #[tokio::test] async fn test_read_projection_with_missing_column() { let fields = data_fields(); diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index a884d3e7..f11acc76 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -150,12 +150,11 @@ impl<'a> ReadBuilder<'a> { /// Stats pruning is per file. Files with a different `schema_id`, /// incompatible stats layout, or inconclusive stats are kept. /// - /// [`TableRead`] may use supported non-partition data predicates on regular - /// Parquet and ORC read paths for conservative row-group pruning. Parquet - /// may also use native row filtering. Unsupported predicates, formats - /// without reader pruning, and data-evolution reads remain residual and - /// should still be applied by the caller if exact filtering semantics are - /// required. + /// [`TableRead`] may use supported non-partition data predicates on formats + /// with reader pruning for conservative row-group pruning. Parquet may also + /// use native row filtering. Unsupported predicates, formats without reader + /// pruning, and data-evolution reads remain residual and should still be + /// applied by the caller if exact filtering semantics are required. pub fn with_filter(&mut self, filter: Predicate) -> &mut Self { self.filter = normalize_filter(self.table, filter); self.try_extract_row_id_ranges();