Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<p align="center">
<a href="https://github.com/KipData/KiteSQL/actions/workflows/ci.yml"><img src="https://github.com/KipData/KiteSQL/actions/workflows/ci.yml/badge.svg" alt="CI"></img></a>
<a href="https://deepwiki.com/KipData/KiteSQL"><img src="https://deepwiki.com/badge.svg" alt="Ask DeepWiki"></img></a>
<a href="https://discord.gg/89cxtD2g"><img src="https://img.shields.io/badge/Discord-Join_us-5865F2?logo=discord&logoColor=white" alt="Discord"></a>
<a href="https://discord.gg/dU8eVGpJ8h"><img src="https://img.shields.io/badge/Discord-Join_us-5865F2?logo=discord&logoColor=white" alt="Discord"></a>
<a href="https://crates.io/crates/kite_sql/"><img src="https://img.shields.io/crates/v/kite_sql.svg"></a>
<a href="https://github.com/KipData/KiteSQL" target="_blank">
<img src="https://img.shields.io/github/stars/KipData/KiteSQL.svg?style=social" alt="github star"/>
Expand Down
177 changes: 0 additions & 177 deletions scripts/run_tpcc_matrix.sh

This file was deleted.

6 changes: 5 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ fn default_optimizer_pipeline() -> HepOptimizerPipeline {
vec![
NormalizationRuleImpl::LimitProjectTranspose,
NormalizationRuleImpl::PushLimitThroughJoin,
NormalizationRuleImpl::PushLimitIntoTableScan,
],
)
.before_batch(
Expand Down Expand Up @@ -370,6 +369,11 @@ fn default_optimizer_pipeline() -> HepOptimizerPipeline {
HepBatchStrategy::once_topdown(),
vec![NormalizationRuleImpl::EvaluatorBind],
)
.after_batch(
"Limit Into Scan".to_string(),
HepBatchStrategy::fix_point_topdown(10),
vec![NormalizationRuleImpl::PushLimitIntoTableScan],
)
.implementations(vec![
// DQL
ImplementationRuleImpl::SimpleAggregate,
Expand Down
92 changes: 78 additions & 14 deletions src/execution/dml/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ use crate::iter_ext::Itertools;
use crate::planner::operator::update::UpdateOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::index::Index;
use crate::types::index::{Index, IndexMeta, IndexType};
use crate::types::tuple::{Schema, Tuple};
use crate::types::tuple_builder::TupleBuilder;
use std::{collections::HashMap, mem};
use crate::types::ColumnId;
use std::{
collections::{HashMap, HashSet},
mem,
};

pub struct Update {
table_name: TableName,
Expand Down Expand Up @@ -79,6 +83,24 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
}
}

impl Update {
fn index_needs_update(
index_meta: &IndexMeta,
updated_column_ids: &HashSet<ColumnId>,
updates_primary_key: bool,
) -> bool {
if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) {
return false;
}

updates_primary_key
|| index_meta
.column_ids
.iter()
.any(|column_id| updated_column_ids.contains(column_id))
}
}

impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
fn next_tuple(
&mut self,
Expand All @@ -91,8 +113,14 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
};

let mut exprs_map = HashMap::with_capacity(self.value_exprs.len());
let mut updated_column_ids = HashSet::with_capacity(self.value_exprs.len());
for (column, expr) in self.value_exprs.drain(..) {
exprs_map.insert(plan_arena.column(column).id(), expr);
let column = plan_arena.column(column);
let column_id = column
.id()
.ok_or_else(|| DatabaseError::column_not_found(column.name().to_string()))?;
updated_column_ids.insert(column_id);
exprs_map.insert(column_id, expr);
}

let table_cache = arena.context().table_cache();
Expand All @@ -104,6 +132,13 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
.transpose()?
};
if let Some(table_snapshot) = table_snapshot {
let updates_primary_key = table_snapshot.primary_key_indices.iter().any(|index| {
table_snapshot
.columns
.get(*index)
.and_then(|column| plan_arena.column(*column).id())
.is_some_and(|column_id| updated_column_ids.contains(&column_id))
});
let serializers = self
.input_schema
.iter()
Expand All @@ -118,17 +153,30 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
let Some(old_pk) = arena.result_tuple().pk.clone() else {
continue;
};
for (index_meta, exprs) in table_snapshot.index_metas.iter() {

let mut old_index_values = Vec::new();
for (index_offset, (index_meta, exprs)) in
table_snapshot.index_metas.iter().enumerate()
{
let index_meta = plan_arena.index(*index_meta);
with_projection_tmp_value(arena, None, exprs, |arena, value| {
let mut state = arena.local_state(plan_arena);
let (transaction, table_codec) = state.transaction_codec_mut();
let index = Index::new(index_meta.id, &value, index_meta.ty);
transaction.del_index(table_codec, &self.table_name, &index, &old_pk)
if !Self::index_needs_update(
index_meta,
&updated_column_ids,
updates_primary_key,
) {
continue;
}

with_projection_tmp_value(arena, None, exprs, |_, value| {
old_index_values.push((index_offset, value));
Ok(())
})?;
}
for (i, column) in self.input_schema.iter().enumerate() {
if let Some(expr) = exprs_map.get(&plan_arena.column(*column).id()) {
let Some(column_id) = plan_arena.column(*column).id() else {
continue;
};
if let Some(expr) = exprs_map.get(&column_id) {
let value = expr.eval(Some(arena.result_tuple()))?;
arena.result_tuple_mut().values[i] = value;
}
Expand All @@ -140,19 +188,35 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update {
);
arena.result_tuple_mut().pk = Some(new_pk.clone());

if new_pk != old_pk {
let primary_key_changed = new_pk != old_pk;
if primary_key_changed {
let mut state = arena.local_state(plan_arena);
let (transaction, table_codec) = state.transaction_codec_mut();
transaction.remove_tuple(table_codec, &self.table_name, &old_pk)?;
is_overwrite = false;
}
for (index_meta, exprs) in table_snapshot.index_metas.iter() {

for (index_offset, old_value) in old_index_values {
let (index_meta, exprs) = &table_snapshot.index_metas[index_offset];
let index_meta = plan_arena.index(*index_meta);
let index_id = index_meta.id;
let index_ty = index_meta.ty;
with_projection_tmp_value(arena, None, exprs, |arena, value| {
if !primary_key_changed && old_value == value {
return Ok(());
}

let mut state = arena.local_state(plan_arena);
let (transaction, table_codec) = state.transaction_codec_mut();
let index = Index::new(index_meta.id, &value, index_meta.ty);
transaction.add_index(table_codec, &self.table_name, index, &new_pk)
let old_index = Index::new(index_id, &old_value, index_ty);
transaction.del_index(
table_codec,
&self.table_name,
&old_index,
&old_pk,
)?;
let new_index = Index::new(index_id, &value, index_ty);
transaction.add_index(table_codec, &self.table_name, new_index, &new_pk)
})?;
}

Expand Down
Loading
Loading