Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 161 additions & 51 deletions crates/paimon/src/arrow/format/mosaic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
use std::collections::HashSet;
use std::io;

pub(crate) struct MosaicFormatReader;
Expand All @@ -46,16 +47,28 @@ impl FormatFileReader for MosaicFormatReader {
row_selection: Option<Vec<RowRange>>,
) -> crate::Result<ArrowRecordBatchStream> {
// Mosaic predicates are currently residual; callers must re-check them for exact filtering.
let target_schema = build_target_arrow_schema(read_fields)?;
validate_mosaic_schema(&target_schema)?;

let file_bytes = reader.read(0..file_size).await?;
let mosaic_reader = MosaicReader::new(MemoryInputFile::new(file_bytes), file_size)
.map_err(mosaic_read_error)?;
let projected_names = read_fields

let file_column_names = mosaic_reader
.schema()
.columns
.iter()
.map(|column| column.name.as_str())
.collect::<HashSet<_>>();
let existing_read_fields = read_fields
.iter()
.filter(|field| file_column_names.contains(field.name()))
.cloned()
.collect::<Vec<_>>();
let read_schema = build_target_arrow_schema(&existing_read_fields)?;
validate_mosaic_schema(&read_schema)?;
let projected_names = existing_read_fields
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>();
let all_projected_columns_missing = !read_fields.is_empty() && projected_names.is_empty();
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

Ok(try_stream! {
Expand All @@ -82,24 +95,25 @@ impl FormatFileReader for MosaicFormatReader {
}
}

let mut row_group_reader = if projected_names.is_empty() {
mosaic_reader
.row_group_reader_by_names(row_group_index, &[])
.map_err(mosaic_read_error)?
let batch = if all_projected_columns_missing {
let row_count = selected_indices
.as_ref()
.map_or(row_group_rows, UInt64Array::len);
empty_batch(read_schema.clone(), row_count)?
} else {
let names = projected_names
.iter()
.map(String::as_str)
.collect::<Vec<_>>();
mosaic_reader
let mut row_group_reader = mosaic_reader
.row_group_reader_by_names(row_group_index, &names)
.map_err(mosaic_read_error)?
};
.map_err(mosaic_read_error)?;

let batch = row_group_reader
.read_columns()
.map_err(mosaic_read_error)?;
let batch = take_rows(batch, selected_indices.as_ref(), &target_schema)?;
let batch = row_group_reader
.read_columns()
.map_err(mosaic_read_error)?;
take_rows(batch, selected_indices.as_ref(), &read_schema)?
};
for chunk in split_batch(batch, batch_size) {
yield chunk;
}
Expand Down Expand Up @@ -251,15 +265,7 @@ fn take_rows(
};

if batch.num_columns() == 0 {
return RecordBatch::try_new_with_options(
target_schema.clone(),
Vec::new(),
&RecordBatchOptions::new().with_row_count(Some(indices.len())),
)
.map_err(|e| Error::UnexpectedError {
message: format!("Failed to build empty Mosaic RecordBatch: {e}"),
source: Some(Box::new(e)),
});
return empty_batch(target_schema.clone(), indices.len());
}

let columns = batch
Expand Down Expand Up @@ -287,15 +293,7 @@ fn ensure_schema(batch: RecordBatch, target_schema: &SchemaRef) -> crate::Result
}

if batch.num_columns() == 0 {
return RecordBatch::try_new_with_options(
target_schema.clone(),
Vec::new(),
&RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
)
.map_err(|e| Error::UnexpectedError {
message: format!("Failed to build empty Mosaic RecordBatch: {e}"),
source: Some(Box::new(e)),
});
return empty_batch(target_schema.clone(), batch.num_rows());
}

RecordBatch::try_new(target_schema.clone(), batch.columns().to_vec()).map_err(|e| {
Expand All @@ -306,6 +304,18 @@ fn ensure_schema(batch: RecordBatch, target_schema: &SchemaRef) -> crate::Result
})
}

fn empty_batch(schema: SchemaRef, row_count: usize) -> crate::Result<RecordBatch> {
RecordBatch::try_new_with_options(
schema,
Vec::new(),
&RecordBatchOptions::new().with_row_count(Some(row_count)),
)
.map_err(|e| Error::UnexpectedError {
message: format!("Failed to build empty Mosaic RecordBatch: {e}"),
source: Some(Box::new(e)),
})
}

fn split_batch(batch: RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
if batch_size == 0 || batch.num_rows() <= batch_size {
return vec![batch];
Expand Down Expand Up @@ -400,6 +410,10 @@ mod tests {
]
}

/// Test helper: builds a `DataField` from an id, a borrowed name and a data type.
fn field(id: i32, name: &str, data_type: DataType) -> DataField {
    let owned_name = name.to_owned();
    DataField::new(id, owned_name, data_type)
}

fn arrow_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("id", ArrowDataType::Int32, false),
Expand Down Expand Up @@ -526,32 +540,128 @@ mod tests {
}

#[tokio::test]
async fn test_unsupported_type_returns_error() {
let unsupported = vec![DataField::new(
async fn test_read_projection_with_missing_column() {
    // Projection: first known field, an extra `new_score` field, then the second known field.
    let known = data_fields();
    let extra = field(3, "new_score", DataType::Int(IntType::with_nullable(true)));
    let projected = vec![known[0].clone(), extra, known[1].clone()];

    let file_bytes = write_mosaic(&sample_batch());
    let batches = read_batches(file_bytes, &projected, None).await.unwrap();

    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 5);
    // Only two of the three projected fields are expected in the output.
    assert_eq!(batch.num_columns(), 2);
    let schema = batch.schema();
    assert_eq!(schema.field(0).name(), "id");
    assert_eq!(schema.field(1).name(), "name");

    let id_values = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(id_values.values(), &[1, 2, 3, 4, 5]);
}

#[tokio::test]
async fn test_read_projection_with_missing_unsupported_column() {
    // An array-typed `new_items` field is projected next to the first known field;
    // the read is expected to succeed and return only the known column.
    let known = data_fields();
    let array_type = DataType::Array(ArrayType::new(DataType::Int(IntType::new())));
    let projected = vec![known[0].clone(), field(3, "new_items", array_type)];

    let file_bytes = write_mosaic(&sample_batch());
    let batches = read_batches(file_bytes, &projected, None).await.unwrap();

    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 5);
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.schema().field(0).name(), "id");
}

#[tokio::test]
async fn test_read_projection_with_existing_unsupported_column_returns_error() {
let projected = vec![field(
0,
"items".to_string(),
"id",
DataType::Array(ArrayType::new(DataType::Int(IntType::new()))),
)];
let result = MosaicFormatReader
.read_batch_stream(
Box::new(TestFileRead { data: Bytes::new() }),
0,
&unsupported,
None,
None,
None,
)
.await;
let err = match result {
Ok(_) => panic!("expected unsupported Mosaic type error"),
Err(err) => err,
};
let data = write_mosaic(&sample_batch());
let err = read_batches(data, &projected, None).await.unwrap_err();

assert!(
matches!(err, Error::Unsupported { message } if message.contains("Mosaic format does not support column 'items'"))
matches!(err, Error::Unsupported { message } if message.contains("Mosaic format does not support column 'id'"))
);
}

#[tokio::test]
async fn test_read_projection_all_columns_missing() {
    // Both projected fields use names (`new_score`, `new_name`) that this test
    // expects to be dropped, leaving a batch with rows but no columns.
    let score = field(3, "new_score", DataType::Int(IntType::with_nullable(true)));
    let name = field(
        4,
        "new_name",
        DataType::VarChar(VarCharType::with_nullable(true, 20).unwrap()),
    );
    let projected = vec![score, name];

    let file_bytes = write_mosaic(&sample_batch());
    let batches = read_batches(file_bytes, &projected, None).await.unwrap();

    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 5);
    assert_eq!(batch.num_columns(), 0);
    assert!(batch.schema().fields().is_empty());
}

#[tokio::test]
async fn test_read_projection_all_columns_missing_with_row_selection() {
    // Single projected field `new_score`, combined with a row selection of 1..3;
    // the resulting column-less batch must report the selected row count.
    let only_field = field(3, "new_score", DataType::Int(IntType::with_nullable(true)));
    let projected = vec![only_field];

    let file_bytes = write_mosaic(&sample_batch());
    let selection = Some(vec![RowRange::new(1, 3)]);
    let batches = read_batches(file_bytes, &projected, selection)
        .await
        .unwrap();

    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(batch.num_columns(), 0);
}

#[tokio::test]
async fn test_read_projection_with_missing_column_and_row_selection() {
    // Third known field plus an extra `new_id` field, read through two row ranges.
    let known = data_fields();
    let extra = field(3, "new_id", DataType::Int(IntType::with_nullable(true)));
    let projected = vec![known[2].clone(), extra];

    let file_bytes = write_mosaic(&sample_batch());
    let selection = vec![RowRange::new(0, 1), RowRange::new(4, 4)];
    let batches = read_batches(file_bytes, &projected, Some(selection))
        .await
        .unwrap();

    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    // The two ranges together are expected to select three rows.
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.schema().field(0).name(), "score");

    let score_values = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(score_values.values(), &[10, 20, 50]);
}

#[test]
fn test_validate_row_type_as_unsupported() {
let unsupported = vec![DataField::new(
Expand Down
Loading
Loading