From 8ee91ec185e197ec164c637567f36066c54f95a3 Mon Sep 17 00:00:00 2001 From: liujiwen-up Date: Tue, 16 Jun 2026 23:44:28 +0800 Subject: [PATCH] test: cover filters for mixed-format schema evolution --- crates/integration_tests/tests/read_tables.rs | 172 +++++++++++++++--- 1 file changed, 142 insertions(+), 30 deletions(-) diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 5d2d4db9..90f16cb5 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -144,6 +144,21 @@ fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> { rows } +fn extract_ids(batches: &[RecordBatch]) -> Vec { + let mut ids = Vec::new(); + for batch in batches { + let id = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("id"); + for i in 0..batch.num_rows() { + ids.push(id.value(i)); + } + } + ids.sort(); + ids +} + fn extract_id_name_dt(batches: &[RecordBatch]) -> Vec<(i32, String, String)> { let mut rows = Vec::new(); for batch in batches { @@ -1194,8 +1209,12 @@ fn assert_plan_has_multiple_schema_ids(plan: &Plan, table_name: &str) { /// Old Parquet files lack the new column; newer ORC/Avro files contain it. #[tokio::test] async fn test_read_format_schema_evolution_add_column() { + use paimon::spec::{Datum, PredicateBuilder}; + let table_name = "format_schema_evolution_add_column"; - let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await; + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, table_name).await; + let (plan, batches) = scan_and_read(&catalog, table_name, None).await; assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name); assert_plan_has_multiple_schema_ids(&plan, table_name); @@ -1235,14 +1254,39 @@ async fn test_read_format_schema_evolution_add_column() { ], "Old Parquet rows should have null age and new ORC/Avro rows should keep age values" ); + + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb + .equal("age", Datum::Int(30)) + .expect("Failed to build predicate"); + let (_, filtered_batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["id", "name"]), filter).await; + assert_eq!( + extract_id_name(&filtered_batches), + vec![(3, "carol".to_string())], + "Projection plus age filter should return only the matching row" + ); + + let filter = pb.is_null("age").expect("Failed to build predicate"); + let (_, filtered_batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["id"]), filter).await; + assert_eq!( + extract_ids(&filtered_batches), + vec![1, 2], + "Projection plus age IS NULL should return rows with null added-column values" + ); } /// Test reading mixed-format files after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT). /// Old Parquet files have INT; newer ORC/Avro files have BIGINT. #[tokio::test] async fn test_read_format_schema_evolution_type_promotion() { + use paimon::spec::{Datum, PredicateBuilder}; + let table_name = "format_schema_evolution_type_promotion"; - let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await; + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, table_name).await; + let (plan, batches) = scan_and_read(&catalog, table_name, None).await; assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name); assert_plan_has_multiple_schema_ids(&plan, table_name); @@ -1283,6 +1327,18 @@ async fn test_read_format_schema_evolution_type_promotion() { ], "Old Parquet INT rows should be cast to BIGINT and new ORC/Avro BIGINT rows should match" ); + + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb + .greater_than("value", Datum::Long(250)) + .expect("Failed to build predicate"); + let (_, filtered_batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["id"]), filter).await; + assert_eq!( + extract_ids(&filtered_batches), + vec![3, 4, 5, 6], + "Projection plus promoted BIGINT filter should return matching promoted values" + ); } /// Stats pruning should treat a newly added column as all-NULL for old files. @@ -1516,14 +1572,14 @@ async fn test_read_schema_evolution_drop_column() { /// Old files have the old physical field name; reader should map by field id. #[tokio::test] async fn test_read_schema_evolution_rename_column() { - let (plan, batches) = - scan_and_read_with_fs_catalog("schema_evolution_rename_column", None).await; + use paimon::spec::{Datum, PredicateBuilder}; - assert_plan_file_formats( - &plan, - &["avro", "orc", "parquet"], - "schema_evolution_rename_column", - ); + let catalog = create_file_system_catalog(); + let table_name = "schema_evolution_rename_column"; + let table = get_table_from_catalog(&catalog, table_name).await; + let (plan, batches) = scan_and_read(&catalog, table_name, None).await; + + assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name); let mut rows: Vec<(i32, String)> = Vec::new(); for batch in &batches { @@ -1558,8 +1614,7 @@ async fn test_read_schema_evolution_rename_column() { ); let (_, projected_batches) = - scan_and_read_with_fs_catalog("schema_evolution_rename_column", Some(&["renamed_payload"])) - .await; + scan_and_read(&catalog, table_name, Some(&["renamed_payload"])).await; let mut projected_values = Vec::new(); for batch in &projected_batches { assert_eq!( @@ -1586,14 +1641,30 @@ async fn test_read_schema_evolution_rename_column() { ], "Projection on renamed column should still use field-id mapping" ); + + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb + .equal("renamed_payload", Datum::String("parquet-old".into())) + .expect("Failed to build predicate"); + let (_, filtered_batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["id"]), filter).await; + assert_eq!( + extract_ids(&filtered_batches), + vec![1], + "Projection plus filter on renamed column should map old Parquet field ids" + ); } /// Test reading a mixed-format table after ALTER TABLE DROP COLUMN. /// Old Parquet/ORC data files have the dropped column; new Avro files do not. #[tokio::test] async fn test_read_mixed_format_schema_evolution_drop_column() { + use paimon::spec::{Datum, PredicateBuilder}; + let table_name = "mixed_format_schema_evolution_drop_column"; - let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await; + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, table_name).await; + let (plan, batches) = scan_and_read(&catalog, table_name, None).await; assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name); for batch in &batches { @@ -1632,11 +1703,7 @@ async fn test_read_mixed_format_schema_evolution_drop_column() { "Mixed-format DROP COLUMN should expose only remaining columns from all file formats" ); - let (_, projected_batches) = scan_and_read_with_fs_catalog( - "mixed_format_schema_evolution_drop_column", - Some(&["name", "id"]), - ) - .await; + let (_, projected_batches) = scan_and_read(&catalog, table_name, Some(&["name", "id"])).await; let mut projected_rows: Vec<(i32, String)> = Vec::new(); for batch in &projected_batches { @@ -1678,21 +1745,32 @@ async fn test_read_mixed_format_schema_evolution_drop_column() { ], "Projection should read remaining columns across old and new file schemas" ); + + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb + .equal("name", Datum::String("orc-carol".into())) + .expect("Failed to build predicate"); + let (_, filtered_batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["id"]), filter).await; + assert_eq!( + extract_ids(&filtered_batches), + vec![3], + "Projection plus filter should read remaining columns after DROP COLUMN" + ); } /// Test reading a mixed-format table after ALTER COLUMN ... FIRST/AFTER. /// Old files keep the original physical column order; new files use moved columns. #[tokio::test] async fn test_read_mixed_format_schema_evolution_reorder_move_column() { - let (plan, batches) = - scan_and_read_with_fs_catalog("mixed_format_schema_evolution_reorder_move_column", None) - .await; + use paimon::spec::{Datum, PredicateBuilder}; - assert_plan_file_formats( - &plan, - &["avro", "orc", "parquet"], - "mixed_format_schema_evolution_reorder_move_column", - ); + let table_name = "mixed_format_schema_evolution_reorder_move_column"; + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, table_name).await; + let (plan, batches) = scan_and_read(&catalog, table_name, None).await; + + assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name); for batch in &batches { let schema = batch.schema(); @@ -1741,11 +1819,8 @@ async fn test_read_mixed_format_schema_evolution_reorder_move_column() { "Mixed-format REORDER/MOVE COLUMN should read values by field id, not physical position" ); - let (_, projected_batches) = scan_and_read_with_fs_catalog( - "mixed_format_schema_evolution_reorder_move_column", - Some(&["id", "right_value"]), - ) - .await; + let (_, projected_batches) = + scan_and_read(&catalog, table_name, Some(&["id", "right_value"])).await; let mut projected_rows: Vec<(i32, String)> = Vec::new(); for batch in &projected_batches { let schema = batch.schema(); @@ -1781,6 +1856,43 @@ async fn test_read_mixed_format_schema_evolution_reorder_move_column() { ], "Projection should still map reordered old and new files by field id" ); + + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb + .equal("left_value", Datum::String("orc-left-3".into())) + .expect("Failed to build predicate"); + let (_, filtered_batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["right_value", "id"]), filter) + .await; + + let mut filtered_rows: Vec<(i32, String)> = Vec::new(); + for batch in &filtered_batches { + let schema = batch.schema(); + let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + field_names, + vec!["right_value", "id"], + "Projection plus filter should preserve caller-specified order" + ); + + let right_value = batch + .column_by_name("right_value") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("filtered right_value"); + let id = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("filtered id"); + for i in 0..batch.num_rows() { + filtered_rows.push((id.value(i), right_value.value(i).to_string())); + } + } + filtered_rows.sort_by_key(|(id, _)| *id); + assert_eq!( + filtered_rows, + vec![(3, "orc-right-3".into())], + "Projection plus filter should map reordered fields by id" + ); } // ---------------------------------------------------------------------------