Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 142 additions & 30 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,21 @@ fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
rows
}

/// Gathers the `id` column values from every batch into one ascending list.
///
/// # Panics
///
/// Panics with the message `id` when a batch has no column named `id`, or
/// when that column cannot be downcast to an `Int32Array`.
fn extract_ids(batches: &[RecordBatch]) -> Vec<i32> {
    let mut collected: Vec<i32> = Vec::new();
    for batch in batches {
        let id_column = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        // Read each row slot by index, exactly as many as the batch reports.
        collected.extend((0..batch.num_rows()).map(|row| id_column.value(row)));
    }
    // Batches arrive in no guaranteed order, so sort for stable comparisons.
    collected.sort();
    collected
}

fn extract_id_name_dt(batches: &[RecordBatch]) -> Vec<(i32, String, String)> {
let mut rows = Vec::new();
for batch in batches {
Expand Down Expand Up @@ -1194,8 +1209,12 @@ fn assert_plan_has_multiple_schema_ids(plan: &Plan, table_name: &str) {
/// Old Parquet files lack the new column; newer ORC/Avro files contain it.
#[tokio::test]
async fn test_read_format_schema_evolution_add_column() {
use paimon::spec::{Datum, PredicateBuilder};

let table_name = "format_schema_evolution_add_column";
let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, table_name).await;
let (plan, batches) = scan_and_read(&catalog, table_name, None).await;
assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
assert_plan_has_multiple_schema_ids(&plan, table_name);

Expand Down Expand Up @@ -1235,14 +1254,39 @@ async fn test_read_format_schema_evolution_add_column() {
],
"Old Parquet rows should have null age and new ORC/Avro rows should keep age values"
);

let pb = PredicateBuilder::new(table.schema().fields());
let filter = pb
.equal("age", Datum::Int(30))
.expect("Failed to build predicate");
let (_, filtered_batches) =
scan_and_read_with_projection_and_filter(&table, Some(&["id", "name"]), filter).await;
assert_eq!(
extract_id_name(&filtered_batches),
vec![(3, "carol".to_string())],
"Projection plus age filter should return only the matching row"
);

let filter = pb.is_null("age").expect("Failed to build predicate");
let (_, filtered_batches) =
scan_and_read_with_projection_and_filter(&table, Some(&["id"]), filter).await;
assert_eq!(
extract_ids(&filtered_batches),
vec![1, 2],
"Projection plus age IS NULL should return rows with null added-column values"
);
}

/// Test reading mixed-format files after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT).
/// Old Parquet files have INT; newer ORC/Avro files have BIGINT.
#[tokio::test]
async fn test_read_format_schema_evolution_type_promotion() {
use paimon::spec::{Datum, PredicateBuilder};

let table_name = "format_schema_evolution_type_promotion";
let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, table_name).await;
let (plan, batches) = scan_and_read(&catalog, table_name, None).await;
assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
assert_plan_has_multiple_schema_ids(&plan, table_name);

Expand Down Expand Up @@ -1283,6 +1327,18 @@ async fn test_read_format_schema_evolution_type_promotion() {
],
"Old Parquet INT rows should be cast to BIGINT and new ORC/Avro BIGINT rows should match"
);

let pb = PredicateBuilder::new(table.schema().fields());
let filter = pb
.greater_than("value", Datum::Long(250))
.expect("Failed to build predicate");
let (_, filtered_batches) =
scan_and_read_with_projection_and_filter(&table, Some(&["id"]), filter).await;
assert_eq!(
extract_ids(&filtered_batches),
vec![3, 4, 5, 6],
"Projection plus promoted BIGINT filter should return matching promoted values"
);
}

/// Stats pruning should treat a newly added column as all-NULL for old files.
Expand Down Expand Up @@ -1516,14 +1572,14 @@ async fn test_read_schema_evolution_drop_column() {
/// Old files have the old physical field name; reader should map by field id.
#[tokio::test]
async fn test_read_schema_evolution_rename_column() {
let (plan, batches) =
scan_and_read_with_fs_catalog("schema_evolution_rename_column", None).await;
use paimon::spec::{Datum, PredicateBuilder};

assert_plan_file_formats(
&plan,
&["avro", "orc", "parquet"],
"schema_evolution_rename_column",
);
let catalog = create_file_system_catalog();
let table_name = "schema_evolution_rename_column";
let table = get_table_from_catalog(&catalog, table_name).await;
let (plan, batches) = scan_and_read(&catalog, table_name, None).await;

assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);

let mut rows: Vec<(i32, String)> = Vec::new();
for batch in &batches {
Expand Down Expand Up @@ -1558,8 +1614,7 @@ async fn test_read_schema_evolution_rename_column() {
);

let (_, projected_batches) =
scan_and_read_with_fs_catalog("schema_evolution_rename_column", Some(&["renamed_payload"]))
.await;
scan_and_read(&catalog, table_name, Some(&["renamed_payload"])).await;
let mut projected_values = Vec::new();
for batch in &projected_batches {
assert_eq!(
Expand All @@ -1586,14 +1641,30 @@ async fn test_read_schema_evolution_rename_column() {
],
"Projection on renamed column should still use field-id mapping"
);

let pb = PredicateBuilder::new(table.schema().fields());
let filter = pb
.equal("renamed_payload", Datum::String("parquet-old".into()))
.expect("Failed to build predicate");
let (_, filtered_batches) =
scan_and_read_with_projection_and_filter(&table, Some(&["id"]), filter).await;
assert_eq!(
extract_ids(&filtered_batches),
vec![1],
"Projection plus filter on renamed column should map old Parquet field ids"
);
}

/// Test reading a mixed-format table after ALTER TABLE DROP COLUMN.
/// Old Parquet/ORC data files have the dropped column; new Avro files do not.
#[tokio::test]
async fn test_read_mixed_format_schema_evolution_drop_column() {
use paimon::spec::{Datum, PredicateBuilder};

let table_name = "mixed_format_schema_evolution_drop_column";
let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, table_name).await;
let (plan, batches) = scan_and_read(&catalog, table_name, None).await;
assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);

for batch in &batches {
Expand Down Expand Up @@ -1632,11 +1703,7 @@ async fn test_read_mixed_format_schema_evolution_drop_column() {
"Mixed-format DROP COLUMN should expose only remaining columns from all file formats"
);

let (_, projected_batches) = scan_and_read_with_fs_catalog(
"mixed_format_schema_evolution_drop_column",
Some(&["name", "id"]),
)
.await;
let (_, projected_batches) = scan_and_read(&catalog, table_name, Some(&["name", "id"])).await;

let mut projected_rows: Vec<(i32, String)> = Vec::new();
for batch in &projected_batches {
Expand Down Expand Up @@ -1678,21 +1745,32 @@ async fn test_read_mixed_format_schema_evolution_drop_column() {
],
"Projection should read remaining columns across old and new file schemas"
);

let pb = PredicateBuilder::new(table.schema().fields());
let filter = pb
.equal("name", Datum::String("orc-carol".into()))
.expect("Failed to build predicate");
let (_, filtered_batches) =
scan_and_read_with_projection_and_filter(&table, Some(&["id"]), filter).await;
assert_eq!(
extract_ids(&filtered_batches),
vec![3],
"Projection plus filter should read remaining columns after DROP COLUMN"
);
}

/// Test reading a mixed-format table after ALTER COLUMN ... FIRST/AFTER.
/// Old files keep the original physical column order; new files use moved columns.
#[tokio::test]
async fn test_read_mixed_format_schema_evolution_reorder_move_column() {
let (plan, batches) =
scan_and_read_with_fs_catalog("mixed_format_schema_evolution_reorder_move_column", None)
.await;
use paimon::spec::{Datum, PredicateBuilder};

assert_plan_file_formats(
&plan,
&["avro", "orc", "parquet"],
"mixed_format_schema_evolution_reorder_move_column",
);
let table_name = "mixed_format_schema_evolution_reorder_move_column";
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, table_name).await;
let (plan, batches) = scan_and_read(&catalog, table_name, None).await;

assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);

for batch in &batches {
let schema = batch.schema();
Expand Down Expand Up @@ -1741,11 +1819,8 @@ async fn test_read_mixed_format_schema_evolution_reorder_move_column() {
"Mixed-format REORDER/MOVE COLUMN should read values by field id, not physical position"
);

let (_, projected_batches) = scan_and_read_with_fs_catalog(
"mixed_format_schema_evolution_reorder_move_column",
Some(&["id", "right_value"]),
)
.await;
let (_, projected_batches) =
scan_and_read(&catalog, table_name, Some(&["id", "right_value"])).await;
let mut projected_rows: Vec<(i32, String)> = Vec::new();
for batch in &projected_batches {
let schema = batch.schema();
Expand Down Expand Up @@ -1781,6 +1856,43 @@ async fn test_read_mixed_format_schema_evolution_reorder_move_column() {
],
"Projection should still map reordered old and new files by field id"
);

let pb = PredicateBuilder::new(table.schema().fields());
let filter = pb
.equal("left_value", Datum::String("orc-left-3".into()))
.expect("Failed to build predicate");
let (_, filtered_batches) =
scan_and_read_with_projection_and_filter(&table, Some(&["right_value", "id"]), filter)
.await;

let mut filtered_rows: Vec<(i32, String)> = Vec::new();
for batch in &filtered_batches {
let schema = batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(
field_names,
vec!["right_value", "id"],
"Projection plus filter should preserve caller-specified order"
);

let right_value = batch
.column_by_name("right_value")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
.expect("filtered right_value");
let id = batch
.column_by_name("id")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
.expect("filtered id");
for i in 0..batch.num_rows() {
filtered_rows.push((id.value(i), right_value.value(i).to_string()));
}
}
filtered_rows.sort_by_key(|(id, _)| *id);
assert_eq!(
filtered_rows,
vec![(3, "orc-right-3".into())],
"Projection plus filter should map reordered fields by id"
);
}

// ---------------------------------------------------------------------------
Expand Down
Loading