diff --git a/README.md b/README.md
index eee9d7f0..4518de0f 100755
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@
-
+
diff --git a/scripts/run_tpcc_matrix.sh b/scripts/run_tpcc_matrix.sh
deleted file mode 100755
index 444e8c8e..00000000
--- a/scripts/run_tpcc_matrix.sh
+++ /dev/null
@@ -1,177 +0,0 @@
-#!/usr/bin/env bash
-
-set -euo pipefail
-
-ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
-cd "$ROOT_DIR"
-
-NUM_WARE="${TPCC_NUM_WARE:-1}"
-MAX_DUPLICATE_RETRY="${TPCC_DUPLICATE_RETRY:-1}"
-MAIN_MEASURE_TIME="${TPCC_MAIN_MEASURE_TIME:-}"
-STAMP="${TPCC_RESULT_STAMP:-$(date +%Y-%m-%d_%H-%M-%S)}"
-RESULT_DIR="${TPCC_RESULT_DIR:-$ROOT_DIR/tpcc/results/$STAMP}"
-LOG_DIR="$RESULT_DIR/logs"
-TMP_DIR="$ROOT_DIR/target/tpcc-run-data"
-BINARY="$ROOT_DIR/target/release/tpcc"
-SUMMARY_FILE="$RESULT_DIR/summary.md"
-
-mkdir -p "$LOG_DIR" "$TMP_DIR"
-
-if [[ ! -x "$BINARY" ]]; then
- echo "missing binary: $BINARY" >&2
- echo "build it first with: cargo build -p tpcc --release" >&2
- exit 1
-fi
-
-extract_tpmc() {
- local log_file="$1"
- awk '//{getline; print $1; exit}' "$log_file"
-}
-
-extract_p90() {
- local log_file="$1"
- local label="$2"
- awk -v label="$label" '
- /<90th Percentile RT \(MaxRT\)>/ { in_block = 1; next }
- in_block && index($0, label) {
- gsub(/^[[:space:]]+/, "", $0)
- print $3
- exit
- }
- ' "$log_file"
-}
-
-should_retry_duplicate() {
- local log_file="$1"
- rg -q "UNIQUE constraint failed|duplicate key|primary key|Duplicate" "$log_file"
-}
-
-run_variant() {
- local name="$1"
- local measure_label="$2"
- local db_path="$3"
- shift 3
- local -a cmd=("$@")
- local log_file="$LOG_DIR/$name.log"
- local status="ok"
- local notes="-"
- local attempts=0
- local max_attempts=$((MAX_DUPLICATE_RETRY + 1))
-
- : > "$log_file"
-
- while (( attempts < max_attempts )); do
- attempts=$((attempts + 1))
- rm -rf "$db_path"
-
- {
- printf '## Attempt %s\n' "$attempts"
- printf '$'
- printf ' %q' "${cmd[@]}"
- printf '\n\n'
- } >> "$log_file"
-
- set +e
- "${cmd[@]}" >> "$log_file" 2>&1
- local cmd_status=$?
- set -e
-
- if [[ "$cmd_status" -eq 0 ]]; then
- break
- fi
-
- if (( attempts < max_attempts )) && should_retry_duplicate "$log_file"; then
- notes="retry after duplicate-key failure"
- printf '\n[runner] duplicate-key style failure detected, retrying %s from scratch\n\n' "$name" >> "$log_file"
- continue
- fi
-
- status="failed"
- notes="$(tail -n 5 "$log_file" | tr '\n' ' ' | sed 's/[[:space:]]\\+/ /g; s/^ //; s/ $//')"
- break
- done
-
- local tpmc="-"
- local new_order="-"
- local payment="-"
- local order_status="-"
- local delivery="-"
- local stock_level="-"
-
- if [[ "$status" == "ok" ]]; then
- tpmc="$(extract_tpmc "$log_file" || echo -)"
- new_order="$(extract_p90 "$log_file" "New-Order" || echo -)"
- payment="$(extract_p90 "$log_file" "Payment" || echo -)"
- order_status="$(extract_p90 "$log_file" "Order-Status" || echo -)"
- delivery="$(extract_p90 "$log_file" "Delivery" || echo -)"
- stock_level="$(extract_p90 "$log_file" "Stock-Level" || echo -)"
- fi
-
- printf '| %s | %s | %s | %s | %s | %s | %s | %s | %s | %s | %s | [%s](./logs/%s.log) |\n' \
- "$name" \
- "$status" \
- "$attempts" \
- "$measure_label" \
- "$tpmc" \
- "$new_order" \
- "$payment" \
- "$order_status" \
- "$delivery" \
- "$stock_level" \
- "$notes" \
- "$name" \
- "$name" \
- >> "$SUMMARY_FILE"
-
- rm -rf "$db_path"
-
- cat "$log_file"
-}
-
-cat > "$SUMMARY_FILE" < HepOptimizerPipeline {
vec![
NormalizationRuleImpl::LimitProjectTranspose,
NormalizationRuleImpl::PushLimitThroughJoin,
- NormalizationRuleImpl::PushLimitIntoTableScan,
],
)
.before_batch(
@@ -370,6 +369,11 @@ fn default_optimizer_pipeline() -> HepOptimizerPipeline {
HepBatchStrategy::once_topdown(),
vec![NormalizationRuleImpl::EvaluatorBind],
)
+ .after_batch(
+ "Limit Into Scan".to_string(),
+ HepBatchStrategy::fix_point_topdown(10),
+ vec![NormalizationRuleImpl::PushLimitIntoTableScan],
+ )
.implementations(vec![
// DQL
ImplementationRuleImpl::SimpleAggregate,
diff --git a/src/execution/dml/update.rs b/src/execution/dml/update.rs
index 255f9fd5..df1a22b0 100644
--- a/src/execution/dml/update.rs
+++ b/src/execution/dml/update.rs
@@ -23,10 +23,14 @@ use crate::iter_ext::Itertools;
use crate::planner::operator::update::UpdateOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
-use crate::types::index::Index;
+use crate::types::index::{Index, IndexMeta, IndexType};
use crate::types::tuple::{Schema, Tuple};
use crate::types::tuple_builder::TupleBuilder;
-use std::{collections::HashMap, mem};
+use crate::types::ColumnId;
+use std::{
+ collections::{HashMap, HashSet},
+ mem,
+};
pub struct Update {
table_name: TableName,
@@ -79,6 +83,24 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
}
}
+impl Update {
+ fn index_needs_update(
+ index_meta: &IndexMeta,
+ updated_column_ids: &HashSet,
+ updates_primary_key: bool,
+ ) -> bool {
+ if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) {
+ return false;
+ }
+
+ updates_primary_key
+ || index_meta
+ .column_ids
+ .iter()
+ .any(|column_id| updated_column_ids.contains(column_id))
+ }
+}
+
impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
fn next_tuple(
&mut self,
@@ -91,8 +113,14 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
};
let mut exprs_map = HashMap::with_capacity(self.value_exprs.len());
+ let mut updated_column_ids = HashSet::with_capacity(self.value_exprs.len());
for (column, expr) in self.value_exprs.drain(..) {
- exprs_map.insert(plan_arena.column(column).id(), expr);
+ let column = plan_arena.column(column);
+ let column_id = column
+ .id()
+ .ok_or_else(|| DatabaseError::column_not_found(column.name().to_string()))?;
+ updated_column_ids.insert(column_id);
+ exprs_map.insert(column_id, expr);
}
let table_cache = arena.context().table_cache();
@@ -104,6 +132,13 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
.transpose()?
};
if let Some(table_snapshot) = table_snapshot {
+ let updates_primary_key = table_snapshot.primary_key_indices.iter().any(|index| {
+ table_snapshot
+ .columns
+ .get(*index)
+ .and_then(|column| plan_arena.column(*column).id())
+ .is_some_and(|column_id| updated_column_ids.contains(&column_id))
+ });
let serializers = self
.input_schema
.iter()
@@ -118,17 +153,30 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
let Some(old_pk) = arena.result_tuple().pk.clone() else {
continue;
};
- for (index_meta, exprs) in table_snapshot.index_metas.iter() {
+
+ let mut old_index_values = Vec::new();
+ for (index_offset, (index_meta, exprs)) in
+ table_snapshot.index_metas.iter().enumerate()
+ {
let index_meta = plan_arena.index(*index_meta);
- with_projection_tmp_value(arena, None, exprs, |arena, value| {
- let mut state = arena.local_state(plan_arena);
- let (transaction, table_codec) = state.transaction_codec_mut();
- let index = Index::new(index_meta.id, &value, index_meta.ty);
- transaction.del_index(table_codec, &self.table_name, &index, &old_pk)
+ if !Self::index_needs_update(
+ index_meta,
+ &updated_column_ids,
+ updates_primary_key,
+ ) {
+ continue;
+ }
+
+ with_projection_tmp_value(arena, None, exprs, |_, value| {
+ old_index_values.push((index_offset, value));
+ Ok(())
})?;
}
for (i, column) in self.input_schema.iter().enumerate() {
- if let Some(expr) = exprs_map.get(&plan_arena.column(*column).id()) {
+ let Some(column_id) = plan_arena.column(*column).id() else {
+ continue;
+ };
+ if let Some(expr) = exprs_map.get(&column_id) {
let value = expr.eval(Some(arena.result_tuple()))?;
arena.result_tuple_mut().values[i] = value;
}
@@ -140,19 +188,35 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
);
arena.result_tuple_mut().pk = Some(new_pk.clone());
- if new_pk != old_pk {
+ let primary_key_changed = new_pk != old_pk;
+ if primary_key_changed {
let mut state = arena.local_state(plan_arena);
let (transaction, table_codec) = state.transaction_codec_mut();
transaction.remove_tuple(table_codec, &self.table_name, &old_pk)?;
is_overwrite = false;
}
- for (index_meta, exprs) in table_snapshot.index_metas.iter() {
+
+ for (index_offset, old_value) in old_index_values {
+ let (index_meta, exprs) = &table_snapshot.index_metas[index_offset];
let index_meta = plan_arena.index(*index_meta);
+ let index_id = index_meta.id;
+ let index_ty = index_meta.ty;
with_projection_tmp_value(arena, None, exprs, |arena, value| {
+ if !primary_key_changed && old_value == value {
+ return Ok(());
+ }
+
let mut state = arena.local_state(plan_arena);
let (transaction, table_codec) = state.transaction_codec_mut();
- let index = Index::new(index_meta.id, &value, index_meta.ty);
- transaction.add_index(table_codec, &self.table_name, index, &new_pk)
+ let old_index = Index::new(index_id, &old_value, index_ty);
+ transaction.del_index(
+ table_codec,
+ &self.table_name,
+ &old_index,
+ &old_pk,
+ )?;
+ let new_index = Index::new(index_id, &value, index_ty);
+ transaction.add_index(table_codec, &self.table_name, new_index, &new_pk)
})?;
}
diff --git a/src/expression/range_detacher.rs b/src/expression/range_detacher.rs
index e5212e88..99e70efc 100644
--- a/src/expression/range_detacher.rs
+++ b/src/expression/range_detacher.rs
@@ -18,7 +18,7 @@ use crate::expression::{BinaryOperator, ScalarExpression};
use crate::iter_ext::Itertools;
use crate::planner::PlanArena;
use crate::types::value::DataValue;
-use crate::types::ColumnId;
+use crate::types::{ColumnId, LogicalType};
use kite_sql_serde_macros::ReferenceSerialization;
use std::cmp::Ordering;
use std::collections::Bound;
@@ -39,6 +39,38 @@ pub enum Range {
SortedRanges(Vec),
}
+#[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)]
+pub struct DetachedPredicate {
+ pub(crate) range: Range,
+ pub(crate) residual: Option,
+}
+
+impl DetachedPredicate {
+ fn consumed(range: Range) -> Self {
+ Self {
+ range,
+ residual: None,
+ }
+ }
+
+ fn combine_residuals(
+ left: Option,
+ right: Option,
+ ) -> Option {
+ match (left, right) {
+ (Some(left), Some(right)) => Some(ScalarExpression::Binary {
+ op: BinaryOperator::And,
+ left_expr: Box::new(left),
+ right_expr: Box::new(right),
+ evaluator: None,
+ ty: LogicalType::Boolean,
+ }),
+ (Some(expr), None) | (None, Some(expr)) => Some(expr),
+ (None, None) => None,
+ }
+ }
+}
+
struct TreeNode {
value: Option,
children: Vec>,
@@ -202,34 +234,82 @@ impl<'a, 'p> RangeDetacher<'a, 'p> {
pub(crate) fn detach(
&mut self,
expr: &ScalarExpression,
- ) -> Result