diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 55ae930c7f01..81e91a1df7db 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -59,7 +60,8 @@ ASTPtr rewriteSelectQuery( const ASTPtr & query, const std::string & remote_database, const std::string & remote_table, - ASTPtr table_function_ptr) + ASTPtr table_function_ptr, + ASTPtr additional_filter) { auto modified_query_ast = query->clone(); @@ -72,6 +74,17 @@ ASTPtr rewriteSelectQuery( if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) { + if (additional_filter) + { + if (select_query.where()) + select_query.setExpression( + ASTSelectQuery::Expression::WHERE, + makeASTFunction("and", select_query.where(), additional_filter->clone())); + else + select_query.setExpression( + ASTSelectQuery::Expression::WHERE, additional_filter->clone()); + } + if (table_function_ptr) select_query.addTableFunction(table_function_ptr); else diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/src/Interpreters/ClusterProxy/SelectStreamFactory.h index 0b6246c31f94..413688504e92 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.h +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.h @@ -41,7 +41,8 @@ ASTPtr rewriteSelectQuery( const ASTPtr & query, const std::string & remote_database, const std::string & remote_table, - ASTPtr table_function_ptr = nullptr); + ASTPtr table_function_ptr = nullptr, + ASTPtr additional_filter = nullptr); using ColumnsDescriptionByShardNum = std::unordered_map; using AdditionalShardFilterGenerator = std::function; diff --git a/src/Storages/HybridSegmentPruner.cpp b/src/Storages/HybridSegmentPruner.cpp new file mode 100644 index 000000000000..1715469204b5 --- /dev/null +++ b/src/Storages/HybridSegmentPruner.cpp @@ -0,0 +1,118 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace +{ + +ASTPtr makeIdentityKeyAST(const Names & column_names) +{ + auto key_ast = make_intrusive(); + key_ast->name = "tuple"; + key_ast->arguments = make_intrusive(); + key_ast->children.push_back(key_ast->arguments); + for (const auto & name : column_names) + key_ast->arguments->children.push_back(make_intrusive(name)); + return key_ast; +} + +NamesAndTypesList filterComparable(const NamesAndTypesList & in) +{ + NamesAndTypesList out; + for (const auto & c : in) + if (c.type && c.type->isComparable()) + out.push_back(c); + return out; +} + +KeyDescription buildIdentityKey(const NamesAndTypesList & comparable_cols, ContextPtr context) +{ + Names names; + names.reserve(comparable_cols.size()); + for (const auto & c : comparable_cols) + names.push_back(c.name); + return KeyDescription::getKeyFromAST( + makeIdentityKeyAST(names), + ColumnsDescription{comparable_cols}, + VirtualColumnsDescription{}, + context); +} + +NamesAndTypesList namesAndTypesFromKey(const KeyDescription & key) +{ + NamesAndTypesList out; + for (size_t i = 0; i < key.column_names.size(); ++i) + out.emplace_back(key.column_names[i], key.data_types[i]); + return out; +} + +} + +HybridSegmentPruner::HybridSegmentPruner( + const ActionsDAGWithInversionPushDown & filter_dag, + const NamesAndTypesList & hybrid_columns, + ContextPtr context_) + : identity_key(buildIdentityKey(filterComparable(hybrid_columns), context_)) + , user_condition(filter_dag, context_, + identity_key.column_names, identity_key.expression, + /*single_point=*/ false) + , context(std::move(context_)) +{ + useless = identity_key.column_names.empty() || user_condition.alwaysUnknownOrTrue(); +} + +bool HybridSegmentPruner::canBePruned(const ASTPtr & substituted_segment_predicate) const +{ + if (useless || !substituted_segment_predicate) + return false; + + auto segment_ast = substituted_segment_predicate->clone(); + auto sample = namesAndTypesFromKey(identity_key); + auto syntax_result = TreeRewriter(context).analyze(segment_ast, sample); + auto segment_dag = ExpressionAnalyzer(segment_ast, syntax_result, context).getActionsDAG(true); + ActionsDAGWithInversionPushDown segment_filter(segment_dag.getOutputs().at(0), context); + + KeyCondition segment_condition( + segment_filter, context, + identity_key.column_names, identity_key.expression, + /*single_point=*/ false); + + Hyperrectangle rect; + rect.reserve(identity_key.column_names.size()); + + for (size_t i = 0; i < identity_key.column_names.size(); ++i) + { + Ranges col_ranges; + if (!segment_condition.extractPlainRangesForColumn(i, col_ranges)) + { + rect.push_back(Range::createWholeUniverse()); + continue; + } + + if (col_ranges.empty()) + return true; + + if (col_ranges.size() != 1) + { + rect.push_back(Range::createWholeUniverse()); + continue; + } + + rect.push_back(col_ranges.front()); + } + + return !user_condition.checkInHyperrectangle(rect, identity_key.data_types).can_be_true; +} + +} diff --git a/src/Storages/HybridSegmentPruner.h b/src/Storages/HybridSegmentPruner.h new file mode 100644 index 000000000000..7b6db05c9871 --- /dev/null +++ b/src/Storages/HybridSegmentPruner.h @@ -0,0 +1,47 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ + +/// Hybrid-segment pruner, modeled after PartitionPruner / Iceberg::ManifestFilesPruner / +/// Paimon::PartitionPruner. +/// +/// Build one KeyCondition over the user filter (PREWHERE+WHERE represented as an +/// ActionsDAG) using all comparable Hybrid columns as the key. For each segment, build +/// a second KeyCondition from its (already watermark-substituted) predicate AST and +/// use `KeyCondition::extractPlainRangesForColumn` to obtain a Hyperrectangle (fail-open +/// to whole-universe per column when extraction is ambiguous). Then ask +/// `KeyCondition::checkInHyperrectangle(rect, types).can_be_true`. The segment can be +/// pruned iff the answer is false. +/// +/// canBePruned() returns true only when (user_filter AND segment_predicate) is provably +/// empty. It returns false in all other cases — unsupported segment shapes, missing user +/// filter, exceptions — so the caller falls back to scanning the segment normally. +class HybridSegmentPruner +{ +public: + HybridSegmentPruner( + const ActionsDAGWithInversionPushDown & filter_dag, + const NamesAndTypesList & hybrid_columns, + ContextPtr context); + + bool canBePruned(const ASTPtr & substituted_segment_predicate) const; + + /// True if the user filter is unrecognizable / always-true on the Hybrid key columns: + /// no segment can ever be pruned, so callers can short-circuit. + bool isUseless() const { return useless; } + +private: + KeyDescription identity_key; + KeyCondition user_condition; + ContextPtr context; + bool useless = false; +}; + +} diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp index e4336674f66f..aa4cf9324e13 100644 --- a/src/Storages/MergeTree/KeyCondition.cpp +++ b/src/Storages/MergeTree/KeyCondition.cpp @@ -4158,10 +4158,18 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const if (key_columns.size() != 1) return false; + return extractPlainRangesForColumn(0, ranges); +} + +bool KeyCondition::extractPlainRangesForColumn(size_t column_index, Ranges & ranges) const +{ + if (column_index >= key_columns.size()) + return false; + if (hasMonotonicFunctionsChain()) return false; - /// All Ranges in rpn_stack is plain. + /// All Ranges in rpn_stack are plain. std::stack rpn_stack; for (const auto & element : rpn) @@ -4212,14 +4220,31 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const { if (element.function == RPNElement::FUNCTION_IN_RANGE) { - rpn_stack.push(PlainRanges(element.range)); + if (element.getKeyColumn() != column_index) + rpn_stack.push(PlainRanges::makeUniverse()); + else + rpn_stack.push(PlainRanges(element.range)); } else if (element.function == RPNElement::FUNCTION_NOT_IN_RANGE) { - rpn_stack.push(PlainRanges(element.range.invertRange())); + if (element.getKeyColumn() != column_index) + rpn_stack.push(PlainRanges::makeUniverse()); + else + rpn_stack.push(PlainRanges(element.range.invertRange())); } else if (element.function == RPNElement::FUNCTION_IN_SET) { + /// Only single-column set atoms are supported. For multi-column tuple-IN, bail out; + /// the caller falls back to "can't prune" (see `HybridSegmentPruner::canBePruned`). + const auto & mapping = element.set_index->getIndexesMapping(); + if (mapping.size() != 1) + return false; + if (mapping[0].key_index != column_index) + { + rpn_stack.push(PlainRanges::makeUniverse()); + continue; + } + if (element.set_index->hasMonotonicFunctionsChain()) return false; @@ -4245,6 +4270,15 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const } else if (element.function == RPNElement::FUNCTION_NOT_IN_SET) { + const auto & mapping = element.set_index->getIndexesMapping(); + if (mapping.size() != 1) + return false; + if (mapping[0].key_index != column_index) + { + rpn_stack.push(PlainRanges::makeUniverse()); + continue; + } + if (element.set_index->hasMonotonicFunctionsChain()) return false; @@ -4311,7 +4345,7 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const } if (rpn_stack.size() != 1) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::extractPlainRanges"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::extractPlainRangesForColumn"); ranges = std::move(rpn_stack.top().ranges); return true; diff --git a/src/Storages/MergeTree/KeyCondition.h b/src/Storages/MergeTree/KeyCondition.h index 915312e8f4e0..6889e338bb60 100644 --- a/src/Storages/MergeTree/KeyCondition.h +++ b/src/Storages/MergeTree/KeyCondition.h @@ -235,6 +235,11 @@ class KeyCondition /// - `(x < 10 AND x % 2 = 0) OR (x < 20 AND x % 3 = 0)` -> { "(-Inf, +Inf)" } (no partial extraction across OR branches) Ranges extractBounds() const; + /// Same stack algorithm as extractPlainRanges, but for a multi-column key: logical ops apply + /// as usual, while atoms that constrain other key columns become the universe for `column_index`. + /// Returns false if the RPN contains unsupported atoms for this extraction (same as extractPlainRanges). + bool extractPlainRangesForColumn(size_t column_index, Ranges & ranges) const; + /// The expression is stored as Reverse Polish Notation. struct RPNElement { diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 6c67afa80b4f..44ca16ef4ec9 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -97,7 +98,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -497,6 +500,26 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( } } + /// Hybrid segment pruning: mirror the per-shard pruning above, but at the segment level. + /// When a segment's predicate is provably unsatisfiable with the user query, drop it from + /// the plan. The base segment is signalled to `read()` by emptying `optimized_cluster` — + /// the same idiom `optimize_skip_unused_shards` uses for empty shard sets — and `nodes` is + /// recomputed automatically from the empty cluster. The verdict is recomputed in `read()` + /// for per-segment skipping; both calls read the watermark snapshot frozen on + /// `storage_snapshot` (see `HybridSnapshotData`), so the two verdicts agree even under a + /// concurrent `ALTER MODIFY SETTING hybrid_watermark_*`. + HybridPruningVerdict pruning_verdict; + if (!segments.empty() || base_segment_predicate) + { + pruning_verdict = computeHybridPruningVerdict(query_info, storage_snapshot, local_context); + if (pruning_verdict.base_pruned) + { + query_info.optimized_cluster = cluster->getClusterWithMultipleShards({}); + cluster = query_info.optimized_cluster; + nodes = getClusterQueriedNodes(settings, cluster); + } + } + if (settings[Setting::distributed_group_by_no_merge]) { if (settings[Setting::distributed_group_by_no_merge] == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION) @@ -523,6 +546,17 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( if (to_stage == QueryProcessingStage::WithMergeableState) return QueryProcessingStage::WithMergeableState; + // TODO: check logic + if (!segments.empty()) + { + size_t surviving_segments = segments.size(); + for (bool is_pruned : pruning_verdict.segments_pruned) + if (is_pruned && surviving_segments > 0) + --surviving_segments; + nodes += surviving_segments; + } + + /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. if (nodes == 1) @@ -746,6 +780,23 @@ std::optional StorageDistributed::getOptimizedQueryP return QueryProcessingStage::Complete; } +StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr) const +{ + /// For Hybrid tables, freeze the watermark snapshot at snapshot acquisition time so + /// every later phase (`getQueryProcessingStage()`, `read()`) operates on the same + /// values. A concurrent `ALTER MODIFY SETTING hybrid_watermark_*` cannot change what + /// this query sees, which keeps the pruning verdict — and therefore the chosen + /// processing stage — consistent with the planned segment set. + if (!segments.empty() || base_segment_predicate) + { + auto data = std::make_unique(); + data->watermark_snapshot = hybrid_watermark_params.get(); + return std::make_shared(*this, metadata_snapshot, std::move(data)); + } + return std::make_shared(*this, metadata_snapshot); +} + + namespace { @@ -857,7 +908,8 @@ bool rewriteJoinToGlobalJoinIfNeeded(QueryTreeNodePtr join_tree) QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, const StorageSnapshotPtr & distributed_storage_snapshot, const StorageID & remote_storage_id, - const ASTPtr & remote_table_function) + const ASTPtr & remote_table_function, + const ASTPtr & additional_filter = nullptr) { auto & planner_context = query_info.planner_context; const auto & query_context = planner_context->getQueryContext(); @@ -924,7 +976,23 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, replacement_table_expression->setAlias(query_info.table_expression->getAlias()); + QueryTreeNodePtr filter; + if (additional_filter) + { + filter = buildQueryTree(additional_filter->clone(), query_context); + QueryAnalysisPass(replacement_table_expression).run(filter, query_context); + } + auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression)); + + if (filter) + { + auto & query_node = query_tree_to_modify->as(); + query_node.getWhere() = query_node.hasWhere() + ? mergeConditionNodes({query_node.getWhere(), filter}, query_context) + : std::move(filter); + } + ReplaseAliasColumnsVisitor replace_alias_columns_visitor; replace_alias_columns_visitor.visit(query_tree_to_modify); @@ -945,6 +1013,136 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, return buildQueryTreeForShard(query_info.planner_context, query_tree_to_modify, /*allow_global_join_for_right_table*/ false); } +std::optional> tryGetParamTypeAndName(const ASTPtr & node) +{ + if (auto * func = node->as(); func && func->name == "hybridParam") + { + auto * arg_list = func->arguments ? func->arguments->as() : nullptr; + if (!arg_list || arg_list->children.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "hybridParam() requires exactly 2 arguments: (name, type)"); + + auto * name_lit = arg_list->children[0]->as(); + auto * type_lit = arg_list->children[1]->as(); + if (!name_lit || name_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "hybridParam() first argument (name) must be a string literal"); + if (!type_lit || type_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "hybridParam() second argument (type) must be a string literal"); + + const auto & param_name = name_lit->value.safeGet(); + const auto & type_name = type_lit->value.safeGet(); + return {{param_name, type_name}}; + } + return std::nullopt; +} + +template +void visitHybridParams(ASTPtrType & node, Visitor & visitor) +{ + if (!node) + return; + + if (auto param_type_and_name = tryGetParamTypeAndName(node); param_type_and_name.has_value()) + { + visitor(node, *param_type_and_name); + return; + } + + for (auto & child : node->children) + visitHybridParams(child, visitor); +} + +} + +ASTPtr StorageDistributed::substituteHybridWatermarks( + ASTPtr predicate_ast, + const MultiVersion::Version & watermarks) +{ + if (!predicate_ast) + return predicate_ast; + predicate_ast = predicate_ast->clone(); + + auto substitute = [&](ASTPtr & node, const std::pair & param_type_and_name) + { + const auto & [param_name, type_name] = param_type_and_name; + + if (!watermarks) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", + param_name, param_name); + + auto it = watermarks->find(param_name); + if (it == watermarks->end()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", + param_name, param_name); + + auto data_type = DataTypeFactory::instance().get(type_name); + auto col = data_type->createColumn(); + ReadBufferFromString buf(it->second); + data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); + node = make_intrusive((*col)[0]); + }; + + visitHybridParams(predicate_ast, substitute); + return predicate_ast; +} + +MultiVersion::Version +StorageDistributed::getHybridWatermarkSnapshot(const StorageSnapshotPtr & storage_snapshot) const +{ + if (const auto * hybrid_data = storage_snapshot->data + ? dynamic_cast(storage_snapshot->data.get()) + : nullptr) + return hybrid_data->watermark_snapshot; + return hybrid_watermark_params.get(); +} + +StorageDistributed::HybridPruningVerdict StorageDistributed::computeHybridPruningVerdict( + const SelectQueryInfo & query_info, + const StorageSnapshotPtr & storage_snapshot, + const ContextPtr & local_context) const +{ + StorageDistributed::HybridPruningVerdict verdict; + verdict.segments_pruned.assign(segments.size(), false); + + if (segments.empty() && !base_segment_predicate) + return verdict; + + /// Without a materialized user filter (legacy non-analyzer path, or a query before + /// filter actions are computed) we can't prune. Fail open — same precedent as + /// `skipUnusedShardsWithAnalyzer()`. The DAG is per-table-expression, so JOIN-side + /// predicates are already excluded; no JOIN guard needed. + if (!query_info.filter_actions_dag) + return verdict; + + NamesAndTypesList hybrid_columns = storage_snapshot->metadata->getColumns().getAllPhysical(); + ActionsDAGWithInversionPushDown inverted_dag( + query_info.filter_actions_dag->getOutputs().at(0), local_context); + HybridSegmentPruner pruner(inverted_dag, hybrid_columns, local_context); + if (pruner.isUseless()) + return verdict; + + /// Both `getQueryProcessingStage()` and `read()` reach this function with the same + /// `storage_snapshot`, so the watermark snapshot frozen by `getStorageSnapshot()` + /// makes the verdict identical across the two calls even under a concurrent + /// `ALTER MODIFY SETTING hybrid_watermark_*`. + auto watermarks = getHybridWatermarkSnapshot(storage_snapshot); + + auto check = [&](const ASTPtr & predicate_ast) -> bool + { + if (!predicate_ast) + return false; + return pruner.canBePruned( + substituteHybridWatermarks(predicate_ast, watermarks)); + }; + + if (base_segment_predicate) + verdict.base_pruned = check(base_segment_predicate); + + for (size_t i = 0; i < segments.size(); ++i) + verdict.segments_pruned[i] = check(segments[i].predicate_ast); + + return verdict; } void StorageDistributed::read( @@ -961,7 +1159,56 @@ void StorageDistributed::read( SelectQueryInfo modified_query_info = query_info; + std::vector additional_query_infos; + const auto & settings = local_context->getSettingsRef(); + auto metadata_ptr = getInMemoryMetadataPtr(local_context, false); + + auto describe_segment_target = [&](const HybridSegment & segment) -> String + { + if (segment.storage_id) + return segment.storage_id->getNameForLogs(); + if (segment.table_function_ast) + return segment.table_function_ast->formatForLogging(); + chassert(false, "Hybrid segment is missing both storage_id and table_function_ast"); + return String{""}; + }; + + auto describe_base_target = [&]() -> String + { + if (remote_table_function_ptr) + return remote_table_function_ptr->formatForLogging(); + if (!remote_database.empty()) + return remote_database + "." + remote_table; + return remote_table; + }; + + String base_target = describe_base_target(); + + const bool log_hybrid_query_rewrites = (!segments.empty() || base_segment_predicate); + + auto log_rewritten_query = [&](const String & target, const ASTPtr & ast) + { + if (!log_hybrid_query_rewrites || !ast) + return; + + LOG_TRACE(log, "rewriteSelectQuery (target: {}) -> {}", target, ast->formatForLogging()); + }; + + /// Recompute the Hybrid pruning verdict for per-segment skipping. The watermark snapshot + /// it depends on was frozen at `getStorageSnapshot()` time and is reused via + /// `HybridSnapshotData`, so this verdict matches the one `getQueryProcessingStage()` + /// produced — both the surviving-segment set and the substitution of `hybridParam(...)` + /// literals stay consistent with the chosen processing stage even under a concurrent + /// `ALTER MODIFY SETTING hybrid_watermark_*`. + HybridPruningVerdict pruning_verdict; + if (!segments.empty() || base_segment_predicate) + pruning_verdict = computeHybridPruningVerdict(query_info, storage_snapshot, local_context); + + auto watermark_snapshot = getHybridWatermarkSnapshot(storage_snapshot); + + if (pruning_verdict.base_pruned) + LOG_TRACE(log, "Hybrid segment pruned (target: {})", base_target); if (settings[Setting::allow_experimental_analyzer]) { @@ -970,7 +1217,8 @@ void StorageDistributed::read( auto query_tree_distributed = buildQueryTreeDistributed(modified_query_info, query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, remote_storage_id, - remote_table_function_ptr); + remote_table_function_ptr, + substituteHybridWatermarks(base_segment_predicate, watermark_snapshot)); Block block = *InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze()); /** For distributed tables we do not need constants in header, since we don't send them to remote servers. * Moreover, constants can break some functions like `hostName` that are constants only for local queries. @@ -983,8 +1231,41 @@ void StorageDistributed::read( modified_query_info.query_tree = std::move(query_tree_distributed); - /// Return directly (with correct header) if no shard to query. - if (modified_query_info.getCluster()->getShardsInfo().empty()) + if (!segments.empty()) + { + for (size_t segment_idx = 0; segment_idx < segments.size(); ++segment_idx) + { + const auto & segment = segments[segment_idx]; + if (pruning_verdict.segments_pruned[segment_idx]) + { + LOG_TRACE(log, "Hybrid segment pruned (target: {})", describe_segment_target(segment)); + continue; + } + + ASTPtr substituted_predicate = substituteHybridWatermarks(segment.predicate_ast, watermark_snapshot); + + // Create a modified query info with the segment predicate + SelectQueryInfo additional_query_info = query_info; + + auto additional_query_tree = buildQueryTreeDistributed(additional_query_info, + query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, + segment.storage_id ? *segment.storage_id : StorageID::createEmpty(), + segment.storage_id ? nullptr : segment.table_function_ast, + substituted_predicate); + + additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree); + additional_query_info.query_tree = std::move(additional_query_tree); + log_rewritten_query(describe_segment_target(segment), additional_query_info.query); + + additional_query_infos.push_back(std::move(additional_query_info)); + } + } + + /// Empty cluster + nothing else to plan: take the same path Distributed already uses + /// when `optimize_skip_unused_shards` filters every shard. For Hybrid this is the + /// "all segments pruned" case (base pruned via empty `optimized_cluster`, every + /// additional pruned via the segments loop above). + if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_query_infos.empty()) return; } else @@ -993,9 +1274,50 @@ void StorageDistributed::read( modified_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, modified_query_info.query, - remote_database, remote_table, remote_table_function_ptr); + remote_database, remote_table, remote_table_function_ptr, + substituteHybridWatermarks(base_segment_predicate, watermark_snapshot)); + log_rewritten_query(base_target, modified_query_info.query); - if (modified_query_info.getCluster()->getShardsInfo().empty()) + if (!segments.empty()) + { + for (size_t segment_idx = 0; segment_idx < segments.size(); ++segment_idx) + { + const auto & segment = segments[segment_idx]; + if (pruning_verdict.segments_pruned[segment_idx]) + { + LOG_TRACE(log, "Hybrid segment pruned (target: {})", describe_segment_target(segment)); + continue; + } + + ASTPtr resolved_predicate = substituteHybridWatermarks(segment.predicate_ast, watermark_snapshot); + SelectQueryInfo additional_query_info = query_info; + + if (segment.storage_id) + { + additional_query_info.query = ClusterProxy::rewriteSelectQuery( + local_context, additional_query_info.query, + segment.storage_id->database_name, segment.storage_id->table_name, + nullptr, + resolved_predicate); + } + else + { + additional_query_info.query = ClusterProxy::rewriteSelectQuery( + local_context, additional_query_info.query, + "", "", segment.table_function_ast, + resolved_predicate); + } + + log_rewritten_query(describe_segment_target(segment), additional_query_info.query); + additional_query_infos.push_back(std::move(additional_query_info)); + } + } + + /// Empty cluster + nothing else to plan: take the same path Distributed already uses + /// when `optimize_skip_unused_shards` filters every shard. For Hybrid this is the + /// "all segments pruned" case (base pruned via empty `optimized_cluster`, every + /// additional pruned via the segments loop above). + if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_query_infos.empty()) { Pipe pipe(std::make_shared(header)); auto read_from_pipe = std::make_unique(std::move(pipe)); @@ -1006,30 +1328,72 @@ void StorageDistributed::read( } } - ClusterProxy::SelectStreamFactory select_stream_factory = - ClusterProxy::SelectStreamFactory( + /// Hybrid case 2: base pruned (cluster empty via `getQueryProcessingStage`'s empty + /// `optimized_cluster`) and at least one additional segment survives. The all-pruned + /// subcase is already handled by the existing empty-cluster early-returns above. We + /// can't call `ClusterProxy::executeQuery` with an empty cluster (its + /// `updateSettingsAndClientInfoForCluster` dereferences `getShardsAddresses().front()` + /// when `is_remote_function=true`), so build the local plans directly. The block below + /// is the same shape as the `additional_query_infos` block in `ClusterProxy::executeQuery` + /// — that block uses the original context (not `new_context`), so we don't depend on the + /// shared distributed-context setup. + if (modified_query_info.getCluster()->getShardsInfo().empty() && !additional_query_infos.empty()) + { + const Block & header_block = *header; + std::vector plans; + plans.reserve(additional_query_infos.size()); + for (const auto & additional_query_info : additional_query_infos) + { + plans.emplace_back(createLocalPlan( + additional_query_info.query, header_block, local_context, + processed_stage, /*shard_num=*/0, /*shard_count=*/1, /*build_logical_plan=*/false, "")); + } + + if (plans.size() == 1) + { + query_plan = std::move(*plans.front()); + } + else + { + SharedHeaders input_headers; + input_headers.reserve(plans.size()); + for (auto & plan : plans) + input_headers.emplace_back(plan->getCurrentHeader()); + + auto union_step = std::make_unique(std::move(input_headers)); + union_step->setStepDescription("Hybrid"); + query_plan.unitePlans(std::move(union_step), std::move(plans)); + } + return; + } + + if (!modified_query_info.getCluster()->getShardsInfo().empty() || !additional_query_infos.empty()) + { + ClusterProxy::SelectStreamFactory select_stream_factory = + ClusterProxy::SelectStreamFactory( + header, + storage_snapshot, + processed_stage); + + auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey( + *modified_query_info.getCluster(), local_context, metadata_ptr->columns); + + ClusterProxy::executeQuery( + query_plan, header, - storage_snapshot, - processed_stage); - - auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey( - *modified_query_info.getCluster(), local_context, getInMemoryMetadataPtr(local_context, false)->columns); - - ClusterProxy::executeQuery( - query_plan, - header, - processed_stage, - remote_storage, - remote_table_function_ptr, - select_stream_factory, - log, - local_context, - modified_query_info, - sharding_key_expr, - sharding_key_column_name, - *distributed_settings, - shard_filter_generator, - is_remote_function); + processed_stage, + remote_storage, + remote_table_function_ptr, + select_stream_factory, + log, + local_context, + modified_query_info, + sharding_key_expr, + sharding_key_column_name, + *distributed_settings, + shard_filter_generator, + is_remote_function); + } /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. if (!query_plan.isInitialized()) diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 5438b6b400e3..78652ad79da9 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -4,10 +4,12 @@ #include #include #include +#include #include #include #include +#include #include namespace DB @@ -50,6 +52,26 @@ class StorageDistributed final : public IStorage, WithContext friend class StorageSystemDistributionQueue; public: + struct HybridSegment + { + ASTPtr table_function_ast; + ASTPtr predicate_ast; + std::optional storage_id; + + HybridSegment(ASTPtr table_function_ast_, ASTPtr predicate_ast_) + : table_function_ast(std::move(table_function_ast_)) + , predicate_ast(std::move(predicate_ast_)) + {} + + HybridSegment(ASTPtr table_function_ast_, ASTPtr predicate_ast_, StorageID storage_id_) + : table_function_ast(std::move(table_function_ast_)) + , predicate_ast(std::move(predicate_ast_)) + , storage_id(std::move(storage_id_)) + {} + }; + + using WatermarkParams = std::map; + StorageDistributed( const StorageID & id_, const ColumnsDescription & columns_, @@ -88,6 +110,8 @@ class StorageDistributed final : public IStorage, WithContext QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; + StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override; + void read( QueryPlan & query_plan, const Names & column_names, @@ -167,6 +191,46 @@ class StorageDistributed final : public IStorage, WithContext std::vector getDirectoryQueueStatuses() const; static IColumn::Selector createSelector(ClusterPtr cluster, const ColumnWithTypeAndName & result); + + /// Substitute hybridParam(name, type) calls in `predicate_ast` with literal values from + /// `watermarks`. Returns a fresh cloned AST. Pass-through for nullptr. + static ASTPtr substituteHybridWatermarks( + ASTPtr predicate_ast, + const MultiVersion::Version & watermarks); + + /// Hybrid-specific snapshot-time state attached to `StorageSnapshot::data`. Populated + /// once in `StorageDistributed::getStorageSnapshot()` so the watermark values seen by + /// `getQueryProcessingStage()` and `read()` cannot diverge mid-query under a concurrent + /// `ALTER MODIFY SETTING hybrid_watermark_*`. Without this, two independent + /// `MultiVersion::get()` calls could observe different versions and produce inconsistent + /// pruning verdicts (e.g. a `Complete`-stage plan unioned without final merge). + struct HybridSnapshotData : public StorageSnapshot::Data + { + MultiVersion::Version watermark_snapshot; + }; + + /// Per-query Hybrid pruning verdict. Recomputed in both `getQueryProcessingStage()` + /// (to drive the stage decision and empty `optimized_cluster` when the base is pruned) + /// and `read()` (to skip planning of pruned additional segments). The verdict is + /// deterministic across both calls because the watermark snapshot it depends on is + /// taken once at `getStorageSnapshot()` time and reused via `HybridSnapshotData`. + struct HybridPruningVerdict + { + bool base_pruned = false; + std::vector segments_pruned; + }; + + HybridPruningVerdict computeHybridPruningVerdict( + const SelectQueryInfo & query_info, + const StorageSnapshotPtr & storage_snapshot, + const ContextPtr & local_context) const; + + /// Read the frozen watermark snapshot attached by `getStorageSnapshot()`. The standard read + /// path always attaches `HybridSnapshotData` for Hybrid tables; the fall-through to a live + /// `MultiVersion::get()` is for code paths that bypass `getStorageSnapshot()`. + MultiVersion::Version getHybridWatermarkSnapshot( + const StorageSnapshotPtr & storage_snapshot) const; + /// Apply the following settings: /// - optimize_skip_unused_shards /// - force_optimize_skip_unused_shards @@ -271,6 +335,10 @@ class StorageDistributed final : public IStorage, WithContext pcg64 rng; bool is_remote_function; + + MultiVersion hybrid_watermark_params; + ASTPtr base_segment_predicate; + std::vector segments; }; } diff --git a/src/Storages/tests/gtest_hybrid_segment_pruner.cpp b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp new file mode 100644 index 000000000000..02af38dd3865 --- /dev/null +++ b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp @@ -0,0 +1,106 @@ +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace DB; + +namespace +{ + +ASTPtr parseExpression(const std::string & text) +{ + ParserExpression parser; + return parseQuery(parser, text, 4096, 1000, 1000000); +} + +NamesAndTypesList hybridColumnsForTests() +{ + return { + {"ts", std::make_shared()}, + {"date", std::make_shared()}, + {"customerid", std::make_shared()}, + {"x", std::make_shared()}, + {"y", std::make_shared()}, + }; +} + +/// Build a HybridSegmentPruner over `where_text` and ask whether `segment_text` can be pruned. +/// The user-side ActionsDAG is built via the same `TreeRewriter + ExpressionAnalyzer` idiom +/// the planner uses to populate `query_info.filter_actions_dag`. +bool canPrune(const std::string & where_text, const std::string & segment_text) +{ + auto context = getContext().context; + auto cols = hybridColumnsForTests(); + + auto where_ast = parseExpression(where_text); + auto syntax_result = TreeRewriter(context).analyze(where_ast, cols); + /// `add_aliases=true` projects the DAG outputs to the predicate only, mirroring the shape of + /// the analyzer-built `query_info.filter_actions_dag` (one output = the filter expression). + /// With `add_aliases=false` the source columns are also kept as outputs, so `getOutputs().at(0)` + /// can point to an input column instead of the predicate. + auto dag = ExpressionAnalyzer(where_ast, syntax_result, context).getActionsDAG(true); + + ActionsDAGWithInversionPushDown inverted(dag.getOutputs().at(0), context); + HybridSegmentPruner pruner(inverted, cols, context); + + return pruner.canBePruned(parseExpression(segment_text)); +} + +class HybridSegmentPrunerTest : public ::testing::Test +{ +public: + static void SetUpTestSuite() + { + tryRegisterFunctions(); + } +}; + +} + +TEST_F(HybridSegmentPrunerTest, RangeContradictionPrunes) +{ + /// `ts > '2025-10-01'` (user) ∧ `ts <= '2025-09-01'` (segment) is unsat → prune. + EXPECT_TRUE(canPrune("ts > '2025-10-01'", "ts <= '2025-09-01'")); +} + +TEST_F(HybridSegmentPrunerTest, OverlappingRangeKeeps) +{ + /// `ts > '2025-10-01'` (user) ∧ `ts > '2025-08-01'` (segment) is satisfiable → keep. + EXPECT_FALSE(canPrune("ts > '2025-10-01'", "ts > '2025-08-01'")); +} + +TEST_F(HybridSegmentPrunerTest, BoundedDnfWithConstantFolding) +{ + /// `(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))` + /// (user) ∧ `date < '2015-01-01'` (segment): KeyCondition handles the OR by itself; the segment + /// hyperrectangle on `date` is (-∞, '2015-01-01'), which excludes both yesterday() and today(). + EXPECT_TRUE(canPrune( + "(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))", + "date < '2015-01-01'")); +} + +TEST_F(HybridSegmentPrunerTest, OrAlternativeNotMandatoryConstraint) +{ + /// `(x < 0 OR y = 1) AND x > 5` (user) ∧ `x > 0` (segment): the OR's `y = 1` branch is + /// satisfiable inside the segment hyperrectangle (e.g. x = 10, y = 1) → keep. + EXPECT_FALSE(canPrune("(x < 0 OR y = 1) AND x > 5", "x > 0")); +} + +TEST_F(HybridSegmentPrunerTest, UnsupportedAtomInOrKeeps) +{ + /// The OR contains an atom KeyCondition can't analyze (`length(toString(x)) > 10`), + /// so it conservatively keeps the segment. + EXPECT_FALSE(canPrune("(length(toString(x)) > 10 OR x = 1) AND x = 2", "x > 0")); +} diff --git a/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference b/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference new file mode 100644 index 000000000000..29d991c63970 --- /dev/null +++ b/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference @@ -0,0 +1,105 @@ +-- {echoOn} + +-- Test 1: Baseline (no pruning) — both segments planned, Union (Hybrid) present. +-- SELECT * surfaces the actual rows from both segments so a buggy pruner that swaps or +-- drops a segment would change the values, not just the count. +SELECT * FROM pruning_t ORDER BY ts; +2025-06-15 00:00:00 4 +2025-08-01 00:00:00 3 +2025-10-15 00:00:00 1 +2025-11-01 00:00:00 2 +SELECT * FROM pruning_t WHERE value > 0 ORDER BY ts; +2025-06-15 00:00:00 4 +2025-08-01 00:00:00 3 +2025-10-15 00:00:00 1 +2025-11-01 00:00:00 2 +EXPLAIN SELECT count() FROM pruning_t WHERE value > 0; +Expression ((Project names + Projection)) + MergingAggregated + Union (Hybrid) + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_hot) + MergingAggregated + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_cold) +-- Test 2: Cold (additional) segment pruned via range contradiction — only base remains. +-- The surviving rows must be the two hot rows; the cold rows must not leak through. +SELECT * FROM pruning_t WHERE ts > '2025-10-01' ORDER BY ts; +2025-10-15 00:00:00 1 +2025-11-01 00:00:00 2 +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 3: Hot (base) segment pruned — only cold remains as a local plan. +-- The surviving rows must be the two cold rows. +SELECT * FROM pruning_t WHERE ts <= '2025-08-01' ORDER BY ts; +2025-06-15 00:00:00 4 +2025-08-01 00:00:00 3 +EXPLAIN SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 4: PREWHERE participates in pruning. +SELECT * FROM pruning_t PREWHERE ts > '2025-10-01' ORDER BY ts; +2025-10-15 00:00:00 1 +2025-11-01 00:00:00 2 +EXPLAIN SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 5: All segments pruned — getQueryProcessingStage returns FetchColumns, +-- planner inserts ReadNothing, AggregatingTransform synthesizes the default row. +-- The meaningful answer here is "zero rows", so `count()` is the right shape. +SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +0 +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +Expression ((Project names + Projection)) + Aggregating + Expression (Before GROUP BY) + Filter ((WHERE + Change column names to column identifiers)) + ReadNothing (Read from NullSource) +-- {echoOn} + +-- Test 6: three segments, only hot survives. +SELECT * FROM pruning_t3 WHERE ts > '2025-10-01' ORDER BY ts; +2025-10-15 00:00:00 1 +2025-11-01 00:00:00 2 +EXPLAIN SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 7: OR alternative is not a mandatory constraint — hot survives via the OR. +SELECT * FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01' ORDER BY ts; +2025-10-15 00:00:00 1 +2025-11-01 00:00:00 2 +EXPLAIN SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_hot) +-- Test 8: JOIN — pruner conservatively skips, both segments planned. EXPLAIN is omitted +-- because JOIN-side ordering depends on randomized settings the test harness cycles +-- through (e.g. query_plan_join_swap_table); we verify by projecting the joined columns. +SELECT t.ts, t.value, d.label +FROM pruning_t AS t +INNER JOIN dim AS d ON t.value = d.id +WHERE d.id > 1 AND t.ts <= '2025-08-01' +ORDER BY t.ts; +2025-06-15 00:00:00 4 d +2025-08-01 00:00:00 3 c +-- Test 9: SELECT alias shadows a Hybrid column used by segment predicates. With default +-- prefer_column_name_to_alias=0 the WHERE's `ts` resolves to the alias expression (a +-- constant true for every row); if the pruner mistakenly treated the unresolved `ts` as +-- the Hybrid column it would prune the cold segment (`ts <= '2025-09-01'`) and silently +-- drop those rows. All 4 rows must survive — projecting `value` confirms which rows. +SELECT ts, value FROM (SELECT toDateTime('2025-11-01') AS ts, value FROM pruning_t WHERE ts > '2025-10-01') ORDER BY value; +2025-11-01 00:00:00 1 +2025-11-01 00:00:00 2 +2025-11-01 00:00:00 3 +2025-11-01 00:00:00 4 diff --git a/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql b/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql new file mode 100644 index 000000000000..59424afc46ca --- /dev/null +++ b/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql @@ -0,0 +1,134 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: requires remote() table function + +SET allow_experimental_hybrid_table = 1; + +-- The EXPLAIN-based assertions below print plan shapes that some randomized session +-- settings perturb. Pin them for deterministic output. None of these affect pruning logic; +-- they just stabilize how the plan is rendered. +SET prefer_localhost_replica = 1; -- avoid ReadFromRemote vs ReadFromMergeTree flips +SET query_plan_join_swap_table = 'false'; -- pin JOIN side ordering +SET use_query_condition_cache = 0; -- consistent EXPLAIN across runs +SET optimize_trivial_count_query = 1; +SET parallel_replicas_local_plan = 0; +SET session_timezone = 'UTC'; -- pin DateTime formatting in SELECT * output + +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC; +DROP TABLE IF EXISTS pruning_t SYNC; +DROP TABLE IF EXISTS pruning_t3 SYNC; +DROP TABLE IF EXISTS pruning_or SYNC; +DROP TABLE IF EXISTS dim SYNC; + +CREATE TABLE local_hot (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +CREATE TABLE local_cold (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +INSERT INTO local_hot VALUES ('2025-10-15', 1), ('2025-11-01', 2); +INSERT INTO local_cold VALUES ('2025-08-01', 3), ('2025-06-15', 4); + +CREATE TABLE pruning_t +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +-- {echoOn} + +-- Test 1: Baseline (no pruning) — both segments planned, Union (Hybrid) present. +-- SELECT * surfaces the actual rows from both segments so a buggy pruner that swaps or +-- drops a segment would change the values, not just the count. +SELECT * FROM pruning_t ORDER BY ts; +SELECT * FROM pruning_t WHERE value > 0 ORDER BY ts; +EXPLAIN SELECT count() FROM pruning_t WHERE value > 0; + +-- Test 2: Cold (additional) segment pruned via range contradiction — only base remains. +-- The surviving rows must be the two hot rows; the cold rows must not leak through. +SELECT * FROM pruning_t WHERE ts > '2025-10-01' ORDER BY ts; +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; + +-- Test 3: Hot (base) segment pruned — only cold remains as a local plan. +-- The surviving rows must be the two cold rows. +SELECT * FROM pruning_t WHERE ts <= '2025-08-01' ORDER BY ts; +EXPLAIN SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; + +-- Test 4: PREWHERE participates in pruning. +SELECT * FROM pruning_t PREWHERE ts > '2025-10-01' ORDER BY ts; +EXPLAIN SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; + +-- Test 5: All segments pruned — getQueryProcessingStage returns FetchColumns, +-- planner inserts ReadNothing, AggregatingTransform synthesizes the default row. +-- The meaningful answer here is "zero rows", so `count()` is the right shape. +SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; + +-- {echoOff} + +-- Test 6: three-segment table; cold + middle pruned, only hot kept. +CREATE TABLE local_warm (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +INSERT INTO local_warm VALUES ('2025-09-15', 5), ('2025-09-25', 6); + +CREATE TABLE pruning_t3 +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_warm'), + ts > hybridParam('hybrid_watermark_cold', 'DateTime') + AND ts <= hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_cold', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-10-01', hybrid_watermark_cold = '2025-09-01' +AS local_hot; + +CREATE TABLE pruning_or +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +CREATE TABLE dim (id UInt64, label String) ENGINE = MergeTree ORDER BY id; +INSERT INTO dim VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'); + +-- {echoOn} + +-- Test 6: three segments, only hot survives. +SELECT * FROM pruning_t3 WHERE ts > '2025-10-01' ORDER BY ts; +EXPLAIN SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; + +-- Test 7: OR alternative is not a mandatory constraint — hot survives via the OR. +SELECT * FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01' ORDER BY ts; +EXPLAIN SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; + +-- Test 8: JOIN — pruner conservatively skips, both segments planned. EXPLAIN is omitted +-- because JOIN-side ordering depends on randomized settings the test harness cycles +-- through (e.g. query_plan_join_swap_table); we verify by projecting the joined columns. +SELECT t.ts, t.value, d.label +FROM pruning_t AS t +INNER JOIN dim AS d ON t.value = d.id +WHERE d.id > 1 AND t.ts <= '2025-08-01' +ORDER BY t.ts; + +-- Test 9: SELECT alias shadows a Hybrid column used by segment predicates. With default +-- prefer_column_name_to_alias=0 the WHERE's `ts` resolves to the alias expression (a +-- constant true for every row); if the pruner mistakenly treated the unresolved `ts` as +-- the Hybrid column it would prune the cold segment (`ts <= '2025-09-01'`) and silently +-- drop those rows. All 4 rows must survive — projecting `value` confirms which rows. +SELECT ts, value FROM (SELECT toDateTime('2025-11-01') AS ts, value FROM pruning_t WHERE ts > '2025-10-01') ORDER BY value; + +-- {echoOff} + +DROP TABLE IF EXISTS dim SYNC; +DROP TABLE IF EXISTS pruning_or SYNC; +DROP TABLE IF EXISTS pruning_t3 SYNC; +DROP TABLE IF EXISTS pruning_t SYNC; +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC;