diff --git a/Cargo.lock b/Cargo.lock index 7812a2d6..fc80de1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1073,7 +1073,7 @@ dependencies = [ [[package]] name = "kite_sql" -version = "0.3.2" +version = "0.3.3" dependencies = [ "bumpalo", "chrono", @@ -1102,7 +1102,7 @@ dependencies = [ [[package]] name = "kite_sql_serde_macros" -version = "0.2.2" +version = "0.2.3" dependencies = [ "darling", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index 526f8048..eaaa021c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "kite_sql" -version = "0.3.2" +version = "0.3.3" edition = "2021" build = "build.rs" authors = ["Kould ", "Xwg "] @@ -53,7 +53,7 @@ required-features = ["pprof"] bumpalo = { version = "3", default-features = false, features = ["collections"] } ordered-float = { version = "4" } paste = { version = "1" } -kite_sql_serde_macros = { version = "0.2.2", path = "kite_sql_serde_macros" } +kite_sql_serde_macros = { version = "0.2.3", path = "kite_sql_serde_macros" } # Optional dependencies for features comfy-table = { version = "7", default-features = false, optional = true } diff --git a/examples/transaction.rs b/examples/transaction.rs index 9e145307..00b4c868 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -49,33 +49,40 @@ mod app { .run("insert into t1 values(0, 0), (1, 1)")? .done()?; - assert!(database.run("select * from t1")?.next().is_none()); + let mut hidden_iter = database.run("select * from t1")?; + assert!(hidden_iter.next_tuple(|_, _| ())?.is_none()); + hidden_iter.done()?; transaction.commit()?; let mut iter = database.run("select * from t1")?; - assert_eq!( - iter.next().unwrap()?, - Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]) - ); - assert_eq!( - iter.next().unwrap()?, - Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(1)]) - ); - assert!(iter.next().is_none()); + iter.next_tuple(|_, tuple| { + assert_eq!( + tuple, + &Tuple::new(None, vec![DataValue::Int32(0), DataValue::Int32(0)]) + ); + })? + .unwrap(); + iter.next_tuple(|_, tuple| { + assert_eq!( + tuple, + &Tuple::new(None, vec![DataValue::Int32(1), DataValue::Int32(1)]) + ); + })? + .unwrap(); + assert!(iter.next_tuple(|_, _| ())?.is_none()); iter.done()?; let mut tx2 = database.new_transaction()?; tx2.run("update t1 set c2 = 99 where c1 = 0")?.done()?; + let mut c2_iter = database.run("select c2 from t1 where c1 = 0")?; assert_eq!( - database - .run("select c2 from t1 where c1 = 0")? - .next() - .unwrap()? - .values[0] - .i32(), + c2_iter + .next_tuple(|_, tuple| tuple.values[0].i32())? + .unwrap(), Some(0) ); + c2_iter.done()?; drop(tx2); database.ddl("drop table t1")?; diff --git a/kite_sql_serde_macros/Cargo.toml b/kite_sql_serde_macros/Cargo.toml index a30b5785..3ca20a2a 100644 --- a/kite_sql_serde_macros/Cargo.toml +++ b/kite_sql_serde_macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kite_sql_serde_macros" -version = "0.2.2" +version = "0.2.3" edition = "2021" description = "Derive macros for KiteSQL" documentation = "https://docs.rs/kite_sql_serde_macros/latest/kite_sql_serde_macros/" diff --git a/kite_sql_serde_macros/src/orm.rs b/kite_sql_serde_macros/src/orm.rs index 0b2b5752..b79dbfdc 100644 --- a/kite_sql_serde_macros/src/orm.rs +++ b/kite_sql_serde_macros/src/orm.rs @@ -244,7 +244,7 @@ pub(crate) fn handle(ast: DeriveInput) -> Result { }); field_initializers.push(quote! { #field_name: ::kite_sql::orm::take_value_at::<#field_ty>( - &mut tuple, + tuple, #field_index_ident, #column_name_lit, )? @@ -410,7 +410,7 @@ pub(crate) fn handle(ast: DeriveInput) -> Result { { fn from_query_row( schema: &::kite_sql::types::tuple::SchemaView<'_, '_>, - mut tuple: ::kite_sql::types::tuple::Tuple, + tuple: &mut ::kite_sql::types::tuple::Tuple, ) -> ::std::result::Result { let mut __kite_orm_found_fields = 0usize; #(#field_index_declarations)* diff --git a/kite_sql_serde_macros/src/projection.rs b/kite_sql_serde_macros/src/projection.rs index e9b5b5ce..1067fcb3 100644 --- a/kite_sql_serde_macros/src/projection.rs +++ b/kite_sql_serde_macros/src/projection.rs @@ -88,7 +88,7 @@ pub(crate) fn handle(ast: DeriveInput) -> Result { }); field_initializers.push(quote! { #field_name: ::kite_sql::orm::take_value_at::<#field_ty>( - &mut tuple, + tuple, #field_index_ident, #field_name_lit, )? @@ -121,7 +121,7 @@ pub(crate) fn handle(ast: DeriveInput) -> Result { { fn from_query_row( schema: &::kite_sql::types::tuple::SchemaView<'_, '_>, - mut tuple: ::kite_sql::types::tuple::Tuple, + tuple: &mut ::kite_sql::types::tuple::Tuple, ) -> ::std::result::Result { let mut __kite_orm_found_fields = 0usize; #(#field_index_declarations)* diff --git a/scripts/run_tpcc_stable.py b/scripts/run_tpcc_stable.py new file mode 100755 index 00000000..61187f67 --- /dev/null +++ b/scripts/run_tpcc_stable.py @@ -0,0 +1,498 @@ +#!/usr/bin/env python3 +"""Run the TPCC matrix only when the machine is cool and idle enough.""" + +from __future__ import annotations + +import argparse +import datetime as dt +import os +import re +import shlex +import shutil +import subprocess +import sys +import time +from dataclasses import dataclass +from pathlib import Path + + +ROOT_DIR = Path(__file__).resolve().parents[1] +DEFAULT_BINARY = ROOT_DIR / "target" / "release" / "tpcc" +DEFAULT_TMP_DIR = ROOT_DIR / "target" / "tpcc-stable-run-data" + + +@dataclass(frozen=True) +class Variant: + name: str + backend: str + db_name: str + sqlite_profile: str | None = None + + +@dataclass(frozen=True) +class Health: + cpu_percent: float | None + max_temp_c: float | None + temp_sources: int + + def format(self) -> str: + cpu = "-" if self.cpu_percent is None else f"{self.cpu_percent:.1f}%" + temp = "-" if self.max_temp_c is None else f"{self.max_temp_c:.1f}C" + return f"cpu={cpu}, max_temp={temp}, temp_sources={self.temp_sources}" + + +VARIANTS = [ + Variant("kitesql-lmdb", "kitesql-lmdb", "kitesql-lmdb"), + Variant("kitesql-rocksdb", "kitesql-rocksdb", "kitesql-rocksdb"), + Variant("sqlite-balanced", "sqlite", "sqlite-balanced.sqlite", "balanced"), + Variant("sqlite-practical", "sqlite", "sqlite-practical.sqlite", "practical"), +] + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Run kitesql-lmdb, kitesql-rocksdb, sqlite-balanced, and " + "sqlite-practical TPCC benchmarks with cooldown gates between runs." + ) + ) + parser.add_argument("--binary", type=Path, default=DEFAULT_BINARY) + parser.add_argument("--result-dir", type=Path) + parser.add_argument("--tmp-dir", type=Path, default=DEFAULT_TMP_DIR) + parser.add_argument("--stamp", default=dt.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")) + parser.add_argument("--num-ware", type=int, default=int(os.getenv("TPCC_NUM_WARE", "1"))) + parser.add_argument("--measure-time", type=int, default=None) + parser.add_argument("--tpcc-max-retry", type=int, default=5) + parser.add_argument("--duplicate-retry", type=int, default=1) + parser.add_argument("--cool-temp-c", type=float, default=65.0) + parser.add_argument("--idle-cpu-percent", type=float, default=20.0) + parser.add_argument("--min-cooldown-sec", type=int, default=300) + parser.add_argument("--stable-samples", type=int, default=3) + parser.add_argument("--sample-interval-sec", type=float, default=10.0) + parser.add_argument("--max-wait-sec", type=int, default=7200) + parser.add_argument("--skip-health-check", action="store_true") + parser.add_argument( + "--build", + action="store_true", + help="Run `cargo build -p tpcc --release` before the benchmark matrix.", + ) + return parser.parse_args() + + +def read_proc_stat() -> tuple[int, int] | None: + try: + line = Path("/proc/stat").read_text(encoding="utf-8").splitlines()[0] + except (OSError, IndexError): + return None + parts = line.split() + if not parts or parts[0] != "cpu": + return None + values = [int(v) for v in parts[1:]] + idle = values[3] + (values[4] if len(values) > 4 else 0) + total = sum(values) + return idle, total + + +def sample_cpu_percent(interval_sec: float) -> float | None: + first = read_proc_stat() + if first is None: + return None + time.sleep(interval_sec) + second = read_proc_stat() + if second is None: + return None + idle_delta = second[0] - first[0] + total_delta = second[1] - first[1] + if total_delta <= 0: + return None + busy_delta = max(0, total_delta - idle_delta) + return 100.0 * busy_delta / total_delta + + +def is_likely_cpu_sensor(label: str, name: str) -> bool: + text = f"{label} {name}".lower() + cpu_tokens = ( + "cpu", + "core", + "package", + "pkg", + "k10temp", + "zenpower", + "coretemp", + "x86_pkg_temp", + "tctl", + "tdie", + "ccd", + ) + non_cpu_tokens = ("nvme", "ssd", "pch", "acpitz", "wifi", "iwlwifi", "battery", "bat") + if any(token in text for token in cpu_tokens): + return True + if any(token in text for token in non_cpu_tokens): + return False + return False + + +def read_temp_file(path: Path) -> float | None: + try: + raw = path.read_text(encoding="utf-8").strip() + value = float(raw) + except (OSError, ValueError): + return None + if value > 1000.0: + value /= 1000.0 + if -20.0 <= value <= 130.0: + return value + return None + + +def read_temperature_samples() -> list[float]: + samples: list[float] = [] + + for hwmon in Path("/sys/class/hwmon").glob("hwmon*"): + name = "" + try: + name = (hwmon / "name").read_text(encoding="utf-8").strip() + except OSError: + pass + for temp_input in hwmon.glob("temp*_input"): + label_path = temp_input.with_name(temp_input.name.replace("_input", "_label")) + try: + label = label_path.read_text(encoding="utf-8").strip() + except OSError: + label = "" + if not is_likely_cpu_sensor(label, name): + continue + value = read_temp_file(temp_input) + if value is not None: + samples.append(value) + + for zone in Path("/sys/class/thermal").glob("thermal_zone*"): + try: + zone_type = (zone / "type").read_text(encoding="utf-8").strip() + except OSError: + zone_type = "" + if not is_likely_cpu_sensor(zone_type, ""): + continue + value = read_temp_file(zone / "temp") + if value is not None: + samples.append(value) + + return samples + + +def sample_health(args: argparse.Namespace) -> Health: + cpu_percent = sample_cpu_percent(args.sample_interval_sec) + temps = read_temperature_samples() + return Health( + cpu_percent=cpu_percent, + max_temp_c=max(temps) if temps else None, + temp_sources=len(temps), + ) + + +def append_line(path: Path, line: str = "") -> None: + with path.open("a", encoding="utf-8") as file: + file.write(line) + file.write("\n") + + +def wait_for_ready( + args: argparse.Namespace, + log_file: Path, + variant_name: str, + previous_finished_at: float | None, +) -> Health: + if args.skip_health_check: + health = Health(cpu_percent=None, max_temp_c=None, temp_sources=0) + append_line(log_file, f"[runner] health checks skipped for {variant_name}") + return health + + append_line(log_file, f"[runner] waiting for stable machine state before {variant_name}") + append_line( + log_file, + "[runner] gates: " + f"temp<={args.cool_temp_c:.1f}C, " + f"cpu<={args.idle_cpu_percent:.1f}%, " + f"min_cooldown={args.min_cooldown_sec}s, " + f"stable_samples={args.stable_samples}", + ) + + start = time.monotonic() + stable = 0 + last_health = Health(cpu_percent=None, max_temp_c=None, temp_sources=0) + + while True: + elapsed = time.monotonic() - start + if elapsed > args.max_wait_sec: + raise TimeoutError( + f"waited {args.max_wait_sec}s for {variant_name}, last state: {last_health.format()}" + ) + + cooldown_left = 0.0 + if previous_finished_at is not None: + cooldown_left = max( + 0.0, args.min_cooldown_sec - (time.monotonic() - previous_finished_at) + ) + + health = sample_health(args) + last_health = health + temp_ok = health.max_temp_c is None or health.max_temp_c <= args.cool_temp_c + cpu_ok = health.cpu_percent is None or health.cpu_percent <= args.idle_cpu_percent + cooldown_ok = cooldown_left <= 0.0 + + reasons = [] + if not cooldown_ok: + reasons.append(f"cooldown_left={cooldown_left:.0f}s") + if not temp_ok: + reasons.append(f"temp={health.max_temp_c:.1f}C") + if not cpu_ok: + reasons.append(f"cpu={health.cpu_percent:.1f}%") + + if cooldown_ok and temp_ok and cpu_ok: + stable += 1 + append_line( + log_file, + f"[runner] ready sample {stable}/{args.stable_samples}: {health.format()}", + ) + if stable >= args.stable_samples: + append_line(log_file, f"[runner] machine accepted for {variant_name}") + return health + else: + stable = 0 + append_line( + log_file, + f"[runner] waiting: {health.format()}, reason={', '.join(reasons)}", + ) + + +def remove_path(path: Path) -> None: + if path.is_dir(): + shutil.rmtree(path) + elif path.exists(): + path.unlink() + + +def quote_cmd(cmd: list[str]) -> str: + return " ".join(shlex.quote(part) for part in cmd) + + +def build_command(args: argparse.Namespace, variant: Variant, db_path: Path) -> list[str]: + cmd = [ + str(args.binary), + "--backend", + variant.backend, + "--num-ware", + str(args.num_ware), + "--max-retry", + str(args.tpcc_max_retry), + "--path", + str(db_path), + ] + if args.measure_time is not None: + cmd.extend(["--measure-time", str(args.measure_time)]) + if variant.sqlite_profile is not None: + cmd.extend(["--sqlite-profile", variant.sqlite_profile]) + return cmd + + +def duplicate_key_failure(log_text: str) -> bool: + needles = ( + "UNIQUE constraint failed", + "duplicate key", + "primary key", + "Duplicate", + ) + return any(needle in log_text for needle in needles) + + +def extract_tpmc(log_text: str) -> str: + match = re.search(r"\s*\n\s*([0-9.]+)", log_text) + return match.group(1) if match else "-" + + +def extract_p90(log_text: str, label: str) -> str: + marker = "<90th Percentile RT (MaxRT)>" + if marker not in log_text: + return "-" + block = log_text.split(marker, 1)[1] + for line in block.splitlines(): + if label not in line: + continue + parts = line.split() + return parts[2] if len(parts) >= 3 else "-" + return "-" + + +def summarize_log(log_file: Path, status: str, notes: str) -> dict[str, str]: + log_text = log_file.read_text(encoding="utf-8", errors="replace") + if status != "ok": + return { + "tpmc": "-", + "new_order": "-", + "payment": "-", + "order_status": "-", + "delivery": "-", + "stock_level": "-", + "notes": notes, + } + return { + "tpmc": extract_tpmc(log_text), + "new_order": extract_p90(log_text, "New-Order"), + "payment": extract_p90(log_text, "Payment"), + "order_status": extract_p90(log_text, "Order-Status"), + "delivery": extract_p90(log_text, "Delivery"), + "stock_level": extract_p90(log_text, "Stock-Level"), + "notes": notes, + } + + +def run_variant( + args: argparse.Namespace, + variant: Variant, + log_dir: Path, + summary_file: Path, + previous_finished_at: float | None, +) -> float: + log_file = log_dir / f"{variant.name}.log" + db_path = args.tmp_dir / variant.db_name + log_file.write_text("", encoding="utf-8") + + start_health = wait_for_ready(args, log_file, variant.name, previous_finished_at) + status = "ok" + notes = f"start {start_health.format()}" + attempts = 0 + max_attempts = args.duplicate_retry + 1 + cmd = build_command(args, variant, db_path) + + while attempts < max_attempts: + attempts += 1 + remove_path(db_path) + append_line(log_file) + append_line(log_file, f"## Attempt {attempts}") + append_line(log_file, f"$ {quote_cmd(cmd)}") + append_line(log_file) + + with log_file.open("a", encoding="utf-8", errors="replace") as file: + proc = subprocess.run(cmd, cwd=ROOT_DIR, stdout=file, stderr=subprocess.STDOUT) + + if proc.returncode == 0: + break + + log_text = log_file.read_text(encoding="utf-8", errors="replace") + if attempts < max_attempts and duplicate_key_failure(log_text): + notes = "retry after duplicate-key failure" + append_line( + log_file, + f"[runner] duplicate-key style failure detected, retrying {variant.name} from scratch", + ) + continue + + status = "failed" + tail = " ".join(log_text.splitlines()[-5:]).strip() + notes = tail or f"exit code {proc.returncode}" + break + + finished_at = time.monotonic() + end_health = sample_health(args) if not args.skip_health_check else Health(None, None, 0) + append_line(log_file, f"[runner] finished state: {end_health.format()}") + + result = summarize_log(log_file, status, notes) + append_line( + summary_file, + "| {name} | {status} | {attempts} | {measure} | {tpmc} | {new_order} | " + "{payment} | {order_status} | {delivery} | {stock_level} | {notes} | " + "[{name}](./logs/{name}.log) |".format( + name=variant.name, + status=status, + attempts=attempts, + measure=f"{args.measure_time}s" if args.measure_time is not None else "tpcc default (720s)", + tpmc=result["tpmc"], + new_order=result["new_order"], + payment=result["payment"], + order_status=result["order_status"], + delivery=result["delivery"], + stock_level=result["stock_level"], + notes=result["notes"].replace("|", "\\|"), + ), + ) + + remove_path(db_path) + print(log_file.read_text(encoding="utf-8", errors="replace"), end="") + return finished_at + + +def write_summary_header(args: argparse.Namespace, summary_file: Path) -> None: + measure = f"{args.measure_time}s" if args.measure_time is not None else "tpcc default (720s)" + summary_file.write_text( + "\n".join( + [ + "# TPCC Stable Run Summary", + "", + f"- Timestamp: {args.stamp}", + f"- Warehouses: {args.num_ware}", + f"- TPCC max retry: {args.tpcc_max_retry}", + f"- Duplicate-key retries per variant: {args.duplicate_retry}", + f"- Measure time: {measure}", + f"- Binary: `{args.binary}`", + ( + "- Machine gates: " + f"temp<={args.cool_temp_c:.1f}C, " + f"cpu<={args.idle_cpu_percent:.1f}%, " + f"min_cooldown={args.min_cooldown_sec}s, " + f"stable_samples={args.stable_samples}, " + f"sample_interval={args.sample_interval_sec}s" + ), + "", + "| Variant | Status | Attempts | Measure Time | TpmC | New-Order p90 | Payment p90 | Order-Status p90 | Delivery p90 | Stock-Level p90 | Notes | Raw Log |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- | --- |", + ] + ) + + "\n", + encoding="utf-8", + ) + + +def main() -> int: + args = parse_args() + args.binary = args.binary.resolve() + args.tmp_dir = args.tmp_dir.resolve() + result_dir = ( + args.result_dir.resolve() + if args.result_dir is not None + else ROOT_DIR / "tpcc" / "results" / args.stamp + ) + log_dir = result_dir / "logs" + summary_file = result_dir / "summary.md" + log_dir.mkdir(parents=True, exist_ok=True) + args.tmp_dir.mkdir(parents=True, exist_ok=True) + + if args.build: + subprocess.run(["cargo", "build", "-p", "tpcc", "--release"], cwd=ROOT_DIR, check=True) + if not args.binary.exists() or not os.access(args.binary, os.X_OK): + print( + f"missing executable binary: {args.binary}\n" + "build it first with: cargo build -p tpcc --release", + file=sys.stderr, + ) + return 1 + + write_summary_header(args, summary_file) + + previous_finished_at: float | None = None + try: + for variant in VARIANTS: + previous_finished_at = run_variant( + args, variant, log_dir, summary_file, previous_finished_at + ) + finally: + try: + args.tmp_dir.rmdir() + except OSError: + pass + + print(f"\nGenerated summary: {summary_file}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/bin/shell.rs b/src/bin/shell.rs index 23bb7dae..1769421a 100644 --- a/src/bin/shell.rs +++ b/src/bin/shell.rs @@ -15,7 +15,7 @@ #[cfg(not(target_arch = "wasm32"))] mod native { use comfy_table::{Cell, Table}; - use kite_sql::db::{BorrowResultIter, DBTransaction, DataBaseBuilder, Database}; + use kite_sql::db::{DBTransaction, DataBaseBuilder, Database, ResultIter}; use kite_sql::errors::DatabaseError; use kite_sql::storage::rocksdb::RocksStorage; use rustyline::config::Configurer; @@ -172,7 +172,7 @@ Transaction commands: fn print_table(mut iter: I) -> Result<(), DatabaseError> where - I: BorrowResultIter, + I: ResultIter, { let (table, schema_len, row_count) = create_table(&mut iter)?; iter.done()?; @@ -192,7 +192,7 @@ Transaction commands: fn create_table(iter: &mut I) -> Result<(Table, usize, usize), DatabaseError> where - I: BorrowResultIter, + I: ResultIter, { let mut table = Table::new(); let (header, schema_len) = iter.schema(|schema| { @@ -209,13 +209,14 @@ Transaction commands: } let mut row_count = 0usize; - while let Some(tuple) = iter.next_borrowed_tuple()? { - row_count += 1; - let row = tuple + while let Some(row) = iter.next_tuple(|_, tuple| { + tuple .values .iter() .map(|value| Cell::new(format!("{value}"))) - .collect::>(); + .collect::>() + })? { + row_count += 1; table.add_row(row); } diff --git a/src/db.rs b/src/db.rs index f1303294..07cb3e8b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -926,23 +926,20 @@ where } } -/// Borrowing interface for result iterators returned by database execution APIs. -pub trait BorrowResultIter { +/// Common interface for result iterators returned by database execution APIs. +pub trait ResultIter { /// Borrows the output schema for the current result set. fn schema(&self, f: impl FnOnce(&SchemaView<'_, '_>) -> R) -> R; - /// Returns the next row as a borrowed tuple. - fn next_borrowed_tuple(&mut self) -> Result, DatabaseError>; + /// Advances to the next row and runs `f` while the executor row slot is borrowed. + fn next_tuple( + &mut self, + f: impl FnOnce(&SchemaView<'_, '_>, &mut Tuple) -> R, + ) -> Result, DatabaseError>; /// Finishes consuming the iterator and flushes any remaining work. fn done(self) -> Result<(), DatabaseError>; -} -/// Common interface for owned-tuple result iterators. -/// -/// This remains for compatibility with existing callers that expect -/// `Iterator>`. -pub trait ResultIter: BorrowResultIter + Iterator> { #[cfg(feature = "orm")] /// Converts this iterator into a typed ORM iterator. /// @@ -958,8 +955,6 @@ pub trait ResultIter: BorrowResultIter + Iterator ResultIter for I where I: BorrowResultIter + Iterator> {} - #[cfg(feature = "orm")] /// Typed adapter over a [`ResultIter`] that yields ORM models instead of raw tuples. pub struct OrmIter { @@ -1000,11 +995,10 @@ where type Item = Result; fn next(&mut self) -> Option { - let tuple = match self.inner.next()? { - Ok(tuple) => tuple, - Err(err) => return Some(Err(err)), - }; - Some(self.inner.schema(|schema| T::from_query_row(schema, tuple))) + self.inner + .next_tuple(|schema, tuple| T::from_query_row(schema, tuple)) + .transpose() + .map(|row| row.and_then(std::convert::identity)) } } @@ -1032,8 +1026,11 @@ impl DatabaseIter<'_, S> { } #[inline] - pub fn next_borrowed_tuple(&mut self) -> Result, DatabaseError> { - unsafe { (*self.inner).next_borrowed_tuple() } + pub fn next_tuple( + &mut self, + f: impl FnOnce(&SchemaView<'_, '_>, &mut Tuple) -> R, + ) -> Result, DatabaseError> { + unsafe { (*self.inner).next_tuple(f) } } #[inline] @@ -1048,21 +1045,16 @@ impl DatabaseIter<'_, S> { } } -impl Iterator for DatabaseIter<'_, S> { - type Item = Result; - - fn next(&mut self) -> Option { - unsafe { (*self.inner).next() } - } -} - -impl BorrowResultIter for DatabaseIter<'_, S> { +impl ResultIter for DatabaseIter<'_, S> { fn schema(&self, f: impl FnOnce(&SchemaView<'_, '_>) -> R) -> R { DatabaseIter::schema(self, f) } - fn next_borrowed_tuple(&mut self) -> Result, DatabaseError> { - DatabaseIter::next_borrowed_tuple(self) + fn next_tuple( + &mut self, + f: impl FnOnce(&SchemaView<'_, '_>, &mut Tuple) -> R, + ) -> Result, DatabaseError> { + DatabaseIter::next_tuple(self, f) } fn done(self) -> Result<(), DatabaseError> { @@ -1175,7 +1167,10 @@ impl<'a, T: Transaction + 'a> TransactionIter<'a, T> { } #[inline] - pub fn next_borrowed_tuple(&mut self) -> Result, DatabaseError> { + pub fn next_tuple( + &mut self, + f: impl FnOnce(&SchemaView<'_, '_>, &mut Tuple) -> R, + ) -> Result, DatabaseError> { let Some(executor) = self.executor.as_mut() else { return Ok(None); }; @@ -1185,7 +1180,10 @@ impl<'a, T: Transaction + 'a> TransactionIter<'a, T> { .as_mut() .expect("result iterator plan arena is unavailable after statement completion"); match unsafe { (*executor_ptr).next_tuple(plan_arena) } { - Ok(Some(tuple)) => Ok(Some(tuple)), + Ok(Some(tuple)) => { + let schema = SchemaView::new(&self.schema, plan_arena); + Ok(Some(f(&schema, tuple))) + } Ok(None) => { self.finish_statement_scope()?; Ok(None) @@ -1199,12 +1197,12 @@ impl<'a, T: Transaction + 'a> TransactionIter<'a, T> { #[inline] pub fn done(mut self) -> Result<(), DatabaseError> { - while self.next_borrowed_tuple()?.is_some() {} + while self.next_tuple(|_, _| ())?.is_some() {} Ok(()) } fn done_with_ddl_apply(mut self) -> Result<(PlanArena<'a>, Vec), DatabaseError> { - while self.next_borrowed_tuple()?.is_some() {} + while self.next_tuple(|_, _| ())?.is_some() {} Ok(( self.plan_arena .take() @@ -1220,39 +1218,16 @@ impl Drop for TransactionIter<'_, T> { } } -impl Iterator for TransactionIter<'_, T> { - type Item = Result; - - fn next(&mut self) -> Option { - let result = { - let executor = self.executor.as_mut()?; - let plan_arena = self - .plan_arena - .as_mut() - .expect("result iterator plan arena is unavailable after statement completion"); - executor.next_tuple(plan_arena) - }; - match result { - Ok(Some(tuple)) => Some(Ok(tuple.clone())), - Ok(None) => match self.finish_statement_scope() { - Ok(()) => None, - Err(err) => Some(Err(err)), - }, - Err(err) => match self.finish_statement_scope() { - Ok(()) => Some(Err(err)), - Err(scope_err) => Some(Err(scope_err)), - }, - } - } -} - -impl BorrowResultIter for TransactionIter<'_, T> { +impl ResultIter for TransactionIter<'_, T> { fn schema(&self, f: impl FnOnce(&SchemaView<'_, '_>) -> R) -> R { TransactionIter::schema(self, f) } - fn next_borrowed_tuple(&mut self) -> Result, DatabaseError> { - TransactionIter::next_borrowed_tuple(self) + fn next_tuple( + &mut self, + f: impl FnOnce(&SchemaView<'_, '_>, &mut Tuple) -> R, + ) -> Result, DatabaseError> { + TransactionIter::next_tuple(self, f) } fn done(self) -> Result<(), DatabaseError> { @@ -1266,7 +1241,7 @@ pub(crate) mod test { use crate::catalog::{ColumnCatalog, ColumnDesc}; #[cfg(feature = "unsafe_txdb_checkpoint")] use crate::db::CatalogKind; - use crate::db::{BorrowResultIter, DataBaseBuilder, DatabaseError}; + use crate::db::{DataBaseBuilder, DatabaseError, ResultIter}; use crate::expression::ScalarExpression; use crate::planner::operator::join::JoinCondition; use crate::planner::operator::Operator; @@ -1338,15 +1313,30 @@ pub(crate) mod test { Ok(()) } + fn tuple_owned(tuple: &Tuple) -> Tuple { + Tuple { + pk: tuple.pk.clone(), + values: tuple.values.clone(), + } + } + + fn next_tuple_owned(iter: &mut I) -> Result, DatabaseError> { + iter.next_tuple(|_, tuple| tuple_owned(tuple)) + } + + fn next_values(iter: &mut I) -> Result>, DatabaseError> { + iter.next_tuple(|_, tuple| tuple.values.clone()) + } + fn read_single_i32(mut iter: I) -> Result where - I: BorrowResultIter + Iterator>, + I: ResultIter, { - let value = match iter.next().transpose()?.map(|tuple| tuple.values) { - Some(values) => match values.as_slice() { - [DataValue::Int32(value)] => *value, - other => panic!("expected a single Int32 column, got {other:?}"), - }, + let value = match iter.next_tuple(|_, tuple| match tuple.values.as_slice() { + [DataValue::Int32(value)] => *value, + other => panic!("expected a single Int32 column, got {other:?}"), + })? { + Some(value) => value, None => panic!("expected one result row"), }; iter.done()?; @@ -1359,11 +1349,11 @@ pub(crate) mod test { sql: &str, ) -> Result { let mut iter = database.run(sql)?; - let value = match iter.next().transpose()?.map(|tuple| tuple.values) { - Some(values) => match values.as_slice() { - [DataValue::Int32(value)] => *value, - other => panic!("expected a single Int32 column, got {other:?}"), - }, + let value = match iter.next_tuple(|_, tuple| match tuple.values.as_slice() { + [DataValue::Int32(value)] => *value, + other => panic!("expected a single Int32 column, got {other:?}"), + })? { + Some(value) => value, None => panic!("expected one result row for query: {sql}"), }; iter.done()?; @@ -1376,9 +1366,15 @@ pub(crate) mod test { let mut database = DataBaseBuilder::path(temp_dir.path()).build_rocksdb()?; database.ddl("create table t1(c1 int primary key, c2 boolean, c3 int)")?; - for result in database.run("select * from t1")? { - println!("{:#?}", result?); - } + let mut iter = database.run("select * from t1")?; + while iter + .next_tuple(|_, tuple| { + println!("{tuple:#?}"); + () + })? + .is_some() + {} + iter.done()?; Ok(()) } @@ -1396,13 +1392,13 @@ pub(crate) mod test { assert_eq!(column.datatype(), &LogicalType::Date); }); assert_eq!( - iter.next().unwrap()?, + next_tuple_owned(&mut iter)?.unwrap(), Tuple::new( None, vec![DataValue::Date32(Local::now().num_days_from_ce())] ) ); - assert!(iter.next().is_none()); + assert!(iter.next_tuple(|_, _| ())?.is_none()); Ok(()) } @@ -1423,11 +1419,11 @@ pub(crate) mod test { assert_eq!(column.datatype(), &LogicalType::Integer); }); assert_eq!( - iter.next().unwrap()?, + next_tuple_owned(&mut iter)?.unwrap(), Tuple::new(None, vec![DataValue::Int32(3)]) ); assert_eq!( - iter.next().unwrap()?, + next_tuple_owned(&mut iter)?.unwrap(), Tuple::new(None, vec![DataValue::Int32(4)]) ); Ok(()) @@ -1715,7 +1711,7 @@ pub(crate) mod test { let mut iter = kite_sql.execute(statement, &[("$1", DataValue::Int32(0))])?; - let row = iter.next().unwrap()?; + let row = next_tuple_owned(&mut iter)?.unwrap(); let plan = row.values[0].utf8().unwrap(); assert!(plan.contains("Projection")); assert!(plan.contains("Filter (")); @@ -1737,7 +1733,7 @@ pub(crate) mod test { ("$4", DataValue::Int32(0)), ], )?; - let row = iter.next().unwrap()?; + let row = next_tuple_owned(&mut iter)?.unwrap(); let plan = row.values[0].utf8().unwrap(); assert!(plan.contains("Projection")); assert!(plan.contains("Aggregate")); @@ -1757,7 +1753,7 @@ pub(crate) mod test { ("$4", DataValue::Int32(0)), ], )?; - let row = iter.next().unwrap()?; + let row = next_tuple_owned(&mut iter)?.unwrap(); let plan = row.values[0].utf8().unwrap(); assert!(plan.contains("Projection")); assert!(plan.contains("LeftOuter Join")); @@ -1812,22 +1808,19 @@ pub(crate) mod test { let collect_plan = |sql: &str| -> Result { let mut iter = kite_sql.run(sql)?; - let rows = iter.by_ref().collect::, _>>()?; + let mut lines = Vec::new(); + while let Some(row) = next_tuple_owned(&mut iter)? { + if let Some(DataValue::Utf8 { value, .. }) = row.values.first() { + lines.push(value.clone()); + } + } iter.done()?; - Ok(rows - .iter() - .filter_map(|row| match row.values.first() { - Some(DataValue::Utf8 { value, .. }) => Some(value.as_str()), - _ => None, - }) - .collect::>() - .join("\n")) + Ok(lines.join("\n")) }; let collect_ids = |sql: &str| -> Result, DatabaseError> { let mut iter = kite_sql.run(sql)?; let mut ids = Vec::new(); - for row in iter.by_ref() { - let row = row?; + while let Some(row) = next_tuple_owned(&mut iter)? { ids.push(row.values[0].i32().unwrap()); } iter.done()?; @@ -1944,22 +1937,19 @@ pub(crate) mod test { let collect_plan = |sql: &str| -> Result { let mut iter = kite_sql.run(sql)?; - let rows = iter.by_ref().collect::, _>>()?; + let mut lines = Vec::new(); + while let Some(row) = next_tuple_owned(&mut iter)? { + if let Some(DataValue::Utf8 { value, .. }) = row.values.first() { + lines.push(value.clone()); + } + } iter.done()?; - Ok(rows - .iter() - .filter_map(|row| match row.values.first() { - Some(DataValue::Utf8 { value, .. }) => Some(value.as_str()), - _ => None, - }) - .collect::>() - .join("\n")) + Ok(lines.join("\n")) }; let collect_ids = |sql: &str| -> Result, DatabaseError> { let mut iter = kite_sql.run(sql)?; let mut ids = Vec::new(); - for row in iter.by_ref() { - let row = row?; + while let Some(row) = next_tuple_owned(&mut iter)? { ids.push(row.values[0].i32().unwrap()); } iter.done()?; @@ -2012,14 +2002,14 @@ pub(crate) mod test { "insert into t_multi values(0, 0); insert into t_multi values(1, 1); select * from t_multi order by a", )?; assert_eq!( - iter.next().unwrap()?.values, + next_values(&mut iter)?.unwrap(), vec![DataValue::Int32(0), DataValue::Int32(0)] ); assert_eq!( - iter.next().unwrap()?.values, + next_values(&mut iter)?.unwrap(), vec![DataValue::Int32(1), DataValue::Int32(1)] ); - assert!(iter.next().is_none()); + assert!(iter.next_tuple(|_, _| ())?.is_none()); iter.done()?; Ok(()) @@ -2125,16 +2115,16 @@ pub(crate) mod test { let mut iter_2 = tx_2.run("select * from t1")?; assert_eq!( - iter_1.next().unwrap()?.values, + next_values(&mut iter_1)?.unwrap(), vec![DataValue::Int32(0), DataValue::Int32(0)] ); assert_eq!( - iter_1.next().unwrap()?.values, + next_values(&mut iter_1)?.unwrap(), vec![DataValue::Int32(1), DataValue::Int32(1)] ); assert_eq!( - iter_2.next().unwrap()?.values, + next_values(&mut iter_2)?.unwrap(), vec![DataValue::Int32(3), DataValue::Int32(3)] ); drop(iter_1); @@ -2162,20 +2152,20 @@ pub(crate) mod test { "insert into t_multi_tx values(0, 0); insert into t_multi_tx values(1, 1); select * from t_multi_tx order by a", )?; assert_eq!( - iter.next().unwrap()?.values, + next_values(&mut iter)?.unwrap(), vec![DataValue::Int32(0), DataValue::Int32(0)] ); assert_eq!( - iter.next().unwrap()?.values, + next_values(&mut iter)?.unwrap(), vec![DataValue::Int32(1), DataValue::Int32(1)] ); - assert!(iter.next().is_none()); + assert!(iter.next_tuple(|_, _| ())?.is_none()); iter.done()?; tx.commit()?; let mut check_iter = kite_sql.run("select count(*) from t_multi_tx")?; assert_eq!( - check_iter.next().unwrap()?.values, + next_values(&mut check_iter)?.unwrap(), vec![DataValue::Int32(2)] ); check_iter.done()?; @@ -2194,7 +2184,8 @@ pub(crate) mod test { tx.run("insert into t_iter_drop values (0, 0), (1, 1)")? .done()?; - assert!(kite_sql.run("select * from t_iter_drop")?.next().is_none()); + let mut iter = kite_sql.run("select * from t_iter_drop")?; + assert!(iter.next_tuple(|_, _| ())?.is_none()); tx.commit()?; @@ -2213,14 +2204,14 @@ pub(crate) mod test { let mut iter = kite_sql.run("select * from t_iter_guard order by a")?; assert_eq!( - iter.next().unwrap()?.values, + next_values(&mut iter)?.unwrap(), vec![DataValue::Int32(0), DataValue::Int32(0)] ); assert_eq!( - iter.next().unwrap()?.values, + next_values(&mut iter)?.unwrap(), vec![DataValue::Int32(1), DataValue::Int32(1)] ); - assert!(iter.next().is_none()); + assert!(iter.next_tuple(|_, _| ())?.is_none()); iter.done()?; kite_sql.ddl("drop table t_iter_guard")?; @@ -2321,20 +2312,20 @@ pub(crate) mod test { let mut iter_2 = tx_2.run("select * from t1")?; assert_eq!( - iter_1.next().unwrap()?.values, + next_values(&mut iter_1)?.unwrap(), vec![DataValue::Int32(0), DataValue::Int32(0)] ); assert_eq!( - iter_1.next().unwrap()?.values, + next_values(&mut iter_1)?.unwrap(), vec![DataValue::Int32(1), DataValue::Int32(1)] ); assert_eq!( - iter_2.next().unwrap()?.values, + next_values(&mut iter_2)?.unwrap(), vec![DataValue::Int32(0), DataValue::Int32(0)] ); assert_eq!( - iter_2.next().unwrap()?.values, + next_values(&mut iter_2)?.unwrap(), vec![DataValue::Int32(3), DataValue::Int32(3)] ); drop(iter_1); diff --git a/src/execution/ddl/mod.rs b/src/execution/ddl/mod.rs index e0141708..ca1e68d3 100644 --- a/src/execution/ddl/mod.rs +++ b/src/execution/ddl/mod.rs @@ -191,7 +191,7 @@ where transaction.append_tuple( table_codec, table_name.as_ref(), - tuple.clone(), + tuple, new_serializers(), true, )?; diff --git a/src/execution/dml/copy_from_file.rs b/src/execution/dml/copy_from_file.rs index 6216772e..6e1ca075 100644 --- a/src/execution/dml/copy_from_file.rs +++ b/src/execution/dml/copy_from_file.rs @@ -107,7 +107,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for CopyFromFile { let chunk = tuple_builder.build_with_row(record.iter())?; let mut state = arena.local_state(plan_arena); let (transaction, table_codec) = state.transaction_codec_mut(); - transaction.append_tuple(table_codec, &table_name, chunk, &serializers, false)?; + transaction.append_tuple(table_codec, &table_name, &chunk, &serializers, false)?; size += 1; } @@ -193,7 +193,9 @@ mod tests { &transaction, ); - let result = executor.next().expect("copy from file should yield once")?; + let result = executor + .next_tuple()? + .expect("copy from file should yield once"); assert_eq!(result.values[0].to_string(), "2"); Ok(()) diff --git a/src/execution/dml/copy_to_file.rs b/src/execution/dml/copy_to_file.rs index d7a64be7..af946131 100644 --- a/src/execution/dml/copy_to_file.rs +++ b/src/execution/dml/copy_to_file.rs @@ -190,7 +190,7 @@ mod tests { &transaction, ); - let tuple = executor.next().expect("executor should yield once")?; + let tuple = executor.next_tuple()?.expect("executor should yield once"); let mut rdr = csv::Reader::from_path(file_path)?; let headers = rdr.headers()?.clone(); diff --git a/src/execution/dml/delete.rs b/src/execution/dml/delete.rs index 7dd33e99..10005b67 100644 --- a/src/execution/dml/delete.rs +++ b/src/execution/dml/delete.rs @@ -93,26 +93,27 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Delete { let mut deleted_count = 0; while arena.next_tuple(input, plan_arena)? { - let tuple = arena.result_tuple().clone(); - if let Some(tuple_id) = &tuple.pk { - for (index_id, index_ty, exprs) in index_templates.iter() { - with_projection_tmp_value(arena, Some(&tuple), exprs, |arena, value| { - let mut state = arena.local_state(plan_arena); - let (transaction, table_codec) = state.transaction_codec_mut(); - transaction.del_index( - table_codec, - &self.table_name, - &Index::new(*index_id, &value, *index_ty), - tuple_id, - ) - })?; - } + let Some(tuple_id) = arena.result_tuple().pk.clone() else { + continue; + }; - let mut state = arena.local_state(plan_arena); - let (transaction, table_codec) = state.transaction_codec_mut(); - transaction.remove_tuple(table_codec, &self.table_name, tuple_id)?; - deleted_count += 1; + for (index_id, index_ty, exprs) in index_templates.iter() { + with_projection_tmp_value(arena, None, exprs, |arena, value| { + let mut state = arena.local_state(plan_arena); + let (transaction, table_codec) = state.transaction_codec_mut(); + transaction.del_index( + table_codec, + &self.table_name, + &Index::new(*index_id, &value, *index_ty), + &tuple_id, + ) + })?; } + + let mut state = arena.local_state(plan_arena); + let (transaction, table_codec) = state.transaction_codec_mut(); + transaction.remove_tuple(table_codec, &self.table_name, &tuple_id)?; + deleted_count += 1; } TupleBuilder::build_result_into(arena.result_tuple_mut(), deleted_count.to_string()); diff --git a/src/execution/dml/insert.rs b/src/execution/dml/insert.rs index 71867b45..ec90c883 100644 --- a/src/execution/dml/insert.rs +++ b/src/execution/dml/insert.rs @@ -131,18 +131,17 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Insert { .iter() .map(|column| plan_arena.column(*column).datatype().serializable()) .collect_vec(); + let mut tuple = Tuple::new(None, Vec::with_capacity(table_snapshot.columns_len)); let mut inserted_count = 0; while arena.next_tuple(input, plan_arena)? { - let values = arena.result_tuple().values.clone(); - - let mut tuple_map = HashMap::new(); - for (i, value) in values.into_iter().enumerate() { + let mut tuple_map = HashMap::with_capacity(self.input_schema.len()); + for (i, value) in arena.result_tuple_mut().values.drain(..).enumerate() { let column = plan_arena.column(self.input_schema[i]); tuple_map.insert(Self::column_key(column, self.is_mapping_by_name), value); } - let mut values = Vec::with_capacity(table_snapshot.columns_len); + tuple.values.clear(); for column in table_snapshot.columns.iter() { let column = plan_arena.column(*column); let mut value = { @@ -159,10 +158,12 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Insert { if value.is_null() && !column.nullable() { return Err(DatabaseError::not_null_column(column.name().to_string())); } - values.push(value) + tuple.values.push(value) } - let pk = Tuple::primary_projection(table_snapshot.primary_key_indices, &values); - let tuple = Tuple::new(Some(pk), values); + tuple.pk = Some(Tuple::primary_projection( + table_snapshot.primary_key_indices, + &tuple.values, + )); for (index_meta, exprs) in table_snapshot.index_metas.iter() { let index_meta = plan_arena.index(*index_meta); @@ -179,7 +180,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Insert { transaction.append_tuple( table_codec, &self.table_name, - tuple, + &tuple, &serializers, self.is_overwrite, )?; diff --git a/src/execution/dml/update.rs b/src/execution/dml/update.rs index b0f7bfa5..255f9fd5 100644 --- a/src/execution/dml/update.rs +++ b/src/execution/dml/update.rs @@ -26,7 +26,7 @@ use crate::storage::Transaction; use crate::types::index::Index; use crate::types::tuple::{Schema, Tuple}; use crate::types::tuple_builder::TupleBuilder; -use std::collections::HashMap; +use std::{collections::HashMap, mem}; pub struct Update { table_name: TableName, @@ -113,15 +113,14 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update { let mut updated_count = 0; while arena.next_tuple(input, plan_arena)? { - let mut tuple = arena.result_tuple().clone(); let mut is_overwrite = true; - let Some(old_pk) = tuple.pk.clone() else { + let Some(old_pk) = arena.result_tuple().pk.clone() else { continue; }; for (index_meta, exprs) in table_snapshot.index_metas.iter() { let index_meta = plan_arena.index(*index_meta); - with_projection_tmp_value(arena, Some(&tuple), exprs, |arena, value| { + with_projection_tmp_value(arena, None, exprs, |arena, value| { let mut state = arena.local_state(plan_arena); let (transaction, table_codec) = state.transaction_codec_mut(); let index = Index::new(index_meta.id, &value, index_meta.ty); @@ -130,18 +129,18 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update { } for (i, column) in self.input_schema.iter().enumerate() { if let Some(expr) = exprs_map.get(&plan_arena.column(*column).id()) { - let value = expr.eval(Some(&tuple))?; - tuple.values[i] = value; + let value = expr.eval(Some(arena.result_tuple()))?; + arena.result_tuple_mut().values[i] = value; } } - tuple.pk = Some(Tuple::primary_projection( + let new_pk = Tuple::primary_projection( table_snapshot.primary_key_indices, - &tuple.values, - )); - let new_pk = tuple.pk.as_ref().ok_or(DatabaseError::PrimaryKeyNotFound)?; + &arena.result_tuple().values, + ); + arena.result_tuple_mut().pk = Some(new_pk.clone()); - if new_pk != &old_pk { + if new_pk != old_pk { let mut state = arena.local_state(plan_arena); let (transaction, table_codec) = state.transaction_codec_mut(); transaction.remove_tuple(table_codec, &self.table_name, &old_pk)?; @@ -149,20 +148,21 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update { } for (index_meta, exprs) in table_snapshot.index_metas.iter() { let index_meta = plan_arena.index(*index_meta); - with_projection_tmp_value(arena, Some(&tuple), exprs, |arena, value| { + with_projection_tmp_value(arena, None, exprs, |arena, value| { let mut state = arena.local_state(plan_arena); let (transaction, table_codec) = state.transaction_codec_mut(); let index = Index::new(index_meta.id, &value, index_meta.ty); - transaction.add_index(table_codec, &self.table_name, index, new_pk) + transaction.add_index(table_codec, &self.table_name, index, &new_pk) })?; } + let tuple = mem::take(arena.result_tuple_mut()); let mut state = arena.local_state(plan_arena); let (transaction, table_codec) = state.transaction_codec_mut(); transaction.append_tuple( table_codec, &self.table_name, - tuple, + &tuple, &serializers, is_overwrite, )?; diff --git a/src/execution/dql/aggregate/hash_agg.rs b/src/execution/dql/aggregate/hash_agg.rs index 2b948bcf..c8f54979 100644 --- a/src/execution/dql/aggregate/hash_agg.rs +++ b/src/execution/dql/aggregate/hash_agg.rs @@ -18,16 +18,37 @@ use crate::execution::{ build_read, ExecArena, ExecId, ExecNode, ExecutionContext, ExecutorNode, ReadExecutor, }; use crate::expression::ScalarExpression; -use crate::iter_ext::Itertools; use crate::planner::operator::aggregate::AggregateOperator; use crate::planner::LogicalPlan; use crate::storage::Transaction; +use crate::types::tuple::Tuple; use crate::types::value::DataValue; -use std::collections::hash_map::{Entry, IntoIter as HashMapIntoIter}; +use std::collections::hash_map::IntoIter as HashMapIntoIter; use std::collections::HashMap; type HashAggOutput = HashMapIntoIter, Vec>>; +fn update_accumulators( + accs: &mut [Box], + agg_calls: &[ScalarExpression], + tuple: &Tuple, +) -> Result<(), DatabaseError> { + for (acc, expr) in accs.iter_mut().zip(agg_calls.iter()) { + let ScalarExpression::AggCall { args, .. } = expr else { + unreachable!() + }; + if args.len() > 1 { + return Err(DatabaseError::UnsupportedStmt( + "currently aggregate functions only support a single Column as a parameter" + .to_string(), + )); + } + let value = args[0].eval(Some(tuple))?; + acc.update_value(&value)?; + } + Ok(()) +} + pub struct HashAggExecutor { agg_calls: Vec, groupby_exprs: Vec, @@ -71,32 +92,21 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for HashAggExecutor { if self.output.is_none() { let mut group_hash_accs: HashMap, Vec>> = HashMap::new(); + let mut group_keys = Vec::with_capacity(self.groupby_exprs.len()); while arena.next_tuple(self.input, plan_arena)? { let tuple = arena.result_tuple(); - let group_keys = self - .groupby_exprs - .iter() - .map(|expr| expr.eval(Some(tuple))) - .try_collect()?; - - let entry = match group_hash_accs.entry(group_keys) { - Entry::Occupied(entry) => entry.into_mut(), - Entry::Vacant(entry) => entry.insert(create_accumulators(&self.agg_calls)?), - }; - - for (acc, expr) in entry.iter_mut().zip(self.agg_calls.iter()) { - let ScalarExpression::AggCall { args, .. } = expr else { - unreachable!() - }; - if args.len() > 1 { - return Err(DatabaseError::UnsupportedStmt( - "currently aggregate functions only support a single Column as a parameter" - .to_string(), - )); - } - let value = args[0].eval(Some(tuple))?; - acc.update_value(&value)?; + group_keys.clear(); + for expr in &self.groupby_exprs { + group_keys.push(expr.eval(Some(tuple))?); + } + + if let Some(accs) = group_hash_accs.get_mut(group_keys.as_slice()) { + update_accumulators(accs, &self.agg_calls, tuple)?; + } else { + let mut accs = create_accumulators(&self.agg_calls)?; + update_accumulators(&mut accs, &self.agg_calls, tuple)?; + group_hash_accs.insert(group_keys.clone(), accs); } } diff --git a/src/execution/dql/join/hash_join.rs b/src/execution/dql/join/hash_join.rs index 68422c3e..8ba9a7cd 100644 --- a/src/execution/dql/join/hash_join.rs +++ b/src/execution/dql/join/hash_join.rs @@ -33,7 +33,7 @@ use crate::types::tuple::Tuple; use crate::types::value::DataValue; use bumpalo::Bump; use std::collections::HashMap; -use std::mem::transmute; +use std::mem::{self, transmute}; pub struct HashJoin { state: HashJoinState, @@ -157,7 +157,7 @@ impl HashJoin { let mut build_count = 0usize; while arena.next_tuple(self.left_input, plan_arena)? { - let tuple = arena.result_tuple().clone(); + let tuple = mem::take(arena.result_tuple_mut()); Self::eval_keys(&self.on_left_keys, &tuple, &mut build_buf)?; match build_map.get_mut(&build_buf) { @@ -283,7 +283,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for HashJoin { if !arena.next_tuple(self.right_input, plan_arena)? { break true; } - let tuple = arena.result_tuple().clone(); + let tuple = mem::take(arena.result_tuple_mut()); Self::eval_keys(&self.on_right_keys, &tuple, &mut probe_buf)?; probe_state = Some(ProbeState { is_keys_has_null: probe_buf.iter().any(DataValue::is_null), diff --git a/src/execution/dql/join/nested_loop_join.rs b/src/execution/dql/join/nested_loop_join.rs index f47a56a3..51ad5e0f 100644 --- a/src/execution/dql/join/nested_loop_join.rs +++ b/src/execution/dql/join/nested_loop_join.rs @@ -15,6 +15,8 @@ //! Defines the nested loop join executor, it supports [`JoinType::Inner`], [`JoinType::LeftOuter`], //! [`JoinType::RightOuter`], [`JoinType::Cross`], [`JoinType::Full`]. +use std::mem; + use crate::errors::DatabaseError; use crate::execution::dql::join::RowBitmap; use crate::execution::{ @@ -187,7 +189,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for NestedLoopJoin { arena.finish(); return Ok(()); } - let left_tuple = arena.result_tuple().clone(); + let left_tuple = mem::take(arena.result_tuple_mut()); state = NestedLoopJoinState::ScanRight { active_left: ActiveLeftState { @@ -205,7 +207,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for NestedLoopJoin { mut right_bitmap, } => { while arena.next_tuple(active_left.right_input, plan_arena)? { - let right_tuple = arena.result_tuple().clone(); + let right_tuple = mem::take(arena.result_tuple_mut()); let idx = active_left.right_index; active_left.right_index += 1; @@ -334,7 +336,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for NestedLoopJoin { mut right_emit_index, } => { while arena.next_tuple(right_input, plan_arena)? { - let mut right_tuple = arena.result_tuple().clone(); + let mut right_tuple = mem::take(arena.result_tuple_mut()); let idx = right_emit_index; right_emit_index += 1; @@ -1211,9 +1213,8 @@ mod test { )?; let mut actual = Vec::new(); - for row in iter.by_ref() { - let tuple = row?; - actual.push(tuple_to_strings(&tuple)); + while let Some(row) = iter.next_tuple(|_, tuple| tuple_to_strings(tuple))? { + actual.push(row); } iter.done()?; diff --git a/src/execution/dql/mark_apply.rs b/src/execution/dql/mark_apply.rs index de4eaf5f..d2fbfc38 100644 --- a/src/execution/dql/mark_apply.rs +++ b/src/execution/dql/mark_apply.rs @@ -22,7 +22,6 @@ use crate::storage::Transaction; use crate::types::index::RuntimeIndexProbe; use crate::types::tuple::{SplitTupleRef, Tuple}; use crate::types::value::DataValue; -use std::mem; #[derive(PartialEq, Eq)] enum QuantifiedPredicateOutcome { @@ -36,7 +35,6 @@ pub struct MarkApply { op: MarkApplyOperator, right_input_plan: LogicalPlan, left_input: ExecId, - left_tuple: Tuple, } impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for MarkApply { @@ -54,7 +52,6 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for MarkApply { op, right_input_plan: right_input, left_input, - left_tuple: Tuple::default(), })) } } @@ -70,12 +67,11 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for MarkApply { return Ok(()); } - self.left_tuple = mem::take(arena.result_tuple_mut()); - let marker = self.mark_value(arena, plan_arena)?; + let mut left_tuple = arena.result_tuple().clone(); + let marker = self.mark_value(arena, plan_arena, &left_tuple)?; - arena.produce_tuple(mem::take(&mut self.left_tuple)); - arena.result_tuple_mut().values.push(marker); - arena.resume(); + left_tuple.values.push(marker); + arena.produce_tuple(left_tuple); Ok(()) } } @@ -142,27 +138,31 @@ impl MarkApply { result } - fn parameterized_probe_value(&self) -> Result, DatabaseError> { + fn parameterized_probe_value( + &self, + left_tuple: &Tuple, + ) -> Result, DatabaseError> { self.op .parameterized_probe() - .map(|probe| probe.eval(Some(&self.left_tuple))) + .map(|probe| probe.eval(Some(left_tuple))) .transpose() } fn mark_value<'a, T: Transaction + 'a>( - &mut self, + &self, arena: &mut ExecArena<'a, T>, plan_arena: &mut crate::planner::PlanArena<'a>, + left_tuple: &Tuple, ) -> Result { match self.op.kind { MarkApplyKind::Exists => self.with_right_input( arena, plan_arena, - self.parameterized_probe_value()?, + self.parameterized_probe_value(left_tuple)?, |arena, plan_arena, right_input| { while arena.next_tuple(right_input, plan_arena)? { let right_tuple = arena.result_tuple(); - if self.exists_predicate_matched(&self.left_tuple, right_tuple)? { + if self.exists_predicate_matched(left_tuple, right_tuple)? { return Ok(DataValue::Boolean(true)); } } @@ -171,7 +171,7 @@ impl MarkApply { }, ), MarkApplyKind::Quantified(MarkApplyQuantifier::Any) => { - if let Some(probe_value) = self.parameterized_probe_value()? { + if let Some(probe_value) = self.parameterized_probe_value(left_tuple)? { if !probe_value.is_null() { if self.with_right_input( arena, @@ -180,10 +180,8 @@ impl MarkApply { |arena, plan_arena, right_input| { while arena.next_tuple(right_input, plan_arena)? { let right_tuple = arena.result_tuple(); - if self.quantified_predicate_outcome( - &self.left_tuple, - right_tuple, - )? == QuantifiedPredicateOutcome::True + if self.quantified_predicate_outcome(left_tuple, right_tuple)? + == QuantifiedPredicateOutcome::True { return Ok(true); } @@ -202,10 +200,8 @@ impl MarkApply { |arena, plan_arena, right_input| { while arena.next_tuple(right_input, plan_arena)? { let right_tuple = arena.result_tuple(); - if self.quantified_predicate_outcome( - &self.left_tuple, - right_tuple, - )? == QuantifiedPredicateOutcome::Null + if self.quantified_predicate_outcome(left_tuple, right_tuple)? + == QuantifiedPredicateOutcome::Null { return Ok(true); } @@ -227,6 +223,7 @@ impl MarkApply { plan_arena, right_input, MarkApplyQuantifier::Any, + left_tuple, ) }) } @@ -237,6 +234,7 @@ impl MarkApply { plan_arena, right_input, MarkApplyQuantifier::All, + left_tuple, ) }) } @@ -249,12 +247,13 @@ impl MarkApply { plan_arena: &mut crate::planner::PlanArena<'a>, right_input: ExecId, quantifier: MarkApplyQuantifier, + left_tuple: &Tuple, ) -> Result { let mut saw_null = false; while arena.next_tuple(right_input, plan_arena)? { let right_tuple = arena.result_tuple(); - match self.quantified_predicate_outcome(&self.left_tuple, right_tuple)? { + match self.quantified_predicate_outcome(left_tuple, right_tuple)? { QuantifiedPredicateOutcome::True => { if matches!(quantifier, MarkApplyQuantifier::Any) { return Ok(DataValue::Boolean(true)); @@ -581,15 +580,15 @@ mod tests { &transaction, ); - let mut exec = MarkApply { + let exec = MarkApply { op, right_input_plan: right, left_input: 0, - left_tuple: Tuple::new(None, vec![DataValue::Int32(2), DataValue::Int32(1)]), }; + let left_tuple = Tuple::new(None, vec![DataValue::Int32(2), DataValue::Int32(1)]); assert_eq!( - exec.mark_value(&mut arena, &mut plan_arena)?, + exec.mark_value(&mut arena, &mut plan_arena, &left_tuple)?, DataValue::Boolean(true) ); assert_eq!( @@ -629,15 +628,15 @@ mod tests { &transaction, ); - let mut exec = MarkApply { + let exec = MarkApply { op, right_input_plan: right, left_input: 0, - left_tuple: Tuple::new(None, vec![DataValue::Int32(2)]), }; + let left_tuple = Tuple::new(None, vec![DataValue::Int32(2)]); assert_eq!( - exec.mark_value(&mut arena, &mut plan_arena)?, + exec.mark_value(&mut arena, &mut plan_arena, &left_tuple)?, DataValue::Boolean(true) ); assert_eq!( @@ -677,15 +676,15 @@ mod tests { &transaction, ); - let mut exec = MarkApply { + let exec = MarkApply { op, right_input_plan: right, left_input: 0, - left_tuple: Tuple::new(None, vec![DataValue::Null]), }; + let left_tuple = Tuple::new(None, vec![DataValue::Null]); assert_eq!( - exec.mark_value(&mut arena, &mut plan_arena)?, + exec.mark_value(&mut arena, &mut plan_arena, &left_tuple)?, DataValue::Null ); assert_eq!( diff --git a/src/execution/dql/scalar_apply.rs b/src/execution/dql/scalar_apply.rs index 2278c3b2..3f80ef44 100644 --- a/src/execution/dql/scalar_apply.rs +++ b/src/execution/dql/scalar_apply.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::mem; - use crate::errors::DatabaseError; use crate::execution::{ build_read, ExecArena, ExecId, ExecNode, ExecutionContext, ExecutorNode, ReadExecutor, @@ -22,6 +20,7 @@ use crate::planner::operator::scalar_apply::ScalarApplyOperator; use crate::planner::LogicalPlan; use crate::storage::Transaction; use crate::types::tuple::Tuple; +use std::mem; pub struct ScalarApply { left_input: ExecId, diff --git a/src/execution/dql/set_membership.rs b/src/execution/dql/set_membership.rs index f61d5600..ecd041a0 100644 --- a/src/execution/dql/set_membership.rs +++ b/src/execution/dql/set_membership.rs @@ -21,6 +21,7 @@ use crate::planner::LogicalPlan; use crate::storage::Transaction; use crate::types::tuple::Tuple; use std::collections::HashMap; +use std::mem; pub struct SetMembership { kind: SetMembershipKind, @@ -87,7 +88,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for SetMembership { while arena.next_tuple(self.right_input, plan_arena)? { *self .right_counts - .entry(arena.result_tuple().clone()) + .entry(mem::take(arena.result_tuple_mut())) .or_insert(0) += 1; } self.built = true; diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index c3bddc94..449ccc4c 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -18,12 +18,11 @@ use crate::execution::{ }; use crate::planner::operator::sort::{SortField, SortOperator}; use crate::planner::LogicalPlan; -use crate::storage::table_codec::BumpBytes; use crate::storage::Transaction; use crate::types::tuple::Tuple; use bumpalo::Bump; use std::cmp::Ordering; -use std::mem::{transmute, MaybeUninit}; +use std::mem::{self, transmute, MaybeUninit}; pub(crate) type BumpVec<'bump, T> = bumpalo::collections::Vec<'bump, T>; @@ -36,57 +35,21 @@ impl<'a, T> NullableVec<'a, T> { NullableVec(BumpVec::new_in(arena)) } - #[inline] - pub(crate) fn with_capacity(capacity: usize, arena: &'a Bump) -> NullableVec<'a, T> { - NullableVec(BumpVec::with_capacity_in(capacity, arena)) - } - - #[inline] - pub(crate) fn fill_capacity(capacity: usize, arena: &'a Bump) -> NullableVec<'a, T> { - let mut data = BumpVec::with_capacity_in(capacity, arena); - for _ in 0..capacity { - data.push(MaybeUninit::uninit()); - } - NullableVec(data) - } - #[inline] pub(crate) fn put(&mut self, item: T) { self.0.push(MaybeUninit::new(item)); } - #[inline] - pub(crate) fn set(&mut self, pos: usize, item: T) { - self.0[pos] = MaybeUninit::new(item); - } - - #[inline] - pub(crate) fn take(&mut self, offset: usize) -> T { - unsafe { self.0[offset].assume_init_read() } - } - #[inline] pub(crate) fn len(&self) -> usize { self.0.len() } - #[inline] - pub(crate) fn is_empty(&self) -> bool { - self.0.is_empty() - } - #[inline] pub(crate) fn iter(&self) -> impl Iterator { self.0.iter().map(|item| unsafe { item.assume_init_ref() }) } - #[inline] - pub(crate) fn take_iter(&mut self) -> impl Iterator + '_ { - self.0 - .iter_mut() - .map(|item| unsafe { item.assume_init_read() }) - } - #[inline] pub(crate) fn into_iter(self) -> impl Iterator + 'a { self.0 @@ -95,179 +58,63 @@ impl<'a, T> NullableVec<'a, T> { } } -pub struct RemappingIterator<'a, T> { - tuples: NullableVec<'a, (usize, Tuple)>, - indices: T, -} - -impl> RemappingIterator<'_, T> { - pub fn new(tuples: NullableVec<(usize, Tuple)>, indices: T) -> RemappingIterator { - RemappingIterator { tuples, indices } - } -} - -impl> Iterator for RemappingIterator<'_, T> { - type Item = Tuple; - - fn next(&mut self) -> Option { - self.indices.next().map(|pos| self.tuples.take(pos).1) - } -} - -const BUCKET_SIZE: usize = u8::MAX as usize + 1; - -// LSD Radix Sort -pub(crate) fn radix_sort<'a, T, A: AsRef<[u8]>>( - tuples: &mut NullableVec<'a, (T, A)>, - arena: &'a Bump, -) { - if tuples.is_empty() { - return; - } - let max_len = tuples - .iter() - .map(|(_, bytes)| bytes.as_ref().len()) - .max() - .unwrap(); - - let mut buf = NullableVec::fill_capacity(tuples.len(), arena); - - let mut count = [0usize; BUCKET_SIZE]; - let mut pos = [0usize; BUCKET_SIZE]; - - for i in (0..max_len).rev() { - count.fill(0); - - for (_, value) in tuples.iter() { - let bytes = value.as_ref(); - let idx = if bytes.len() > i { bytes[i] } else { 0 }; - count[idx as usize] += 1; - } - - { - let mut sum = 0; - for j in 0..BUCKET_SIZE { - let c = count[j]; - pos[j] = sum; - sum += c; - } +pub(crate) fn sort_tuples<'a>( + sort_fields: &[SortField], + mut tuples: NullableVec<'a, (usize, Tuple)>, +) -> Result + 'a, DatabaseError> { + let fn_nulls_first = |nulls_first: bool| { + if nulls_first { + Ordering::Greater + } else { + Ordering::Less } - - for (t, value) in tuples.take_iter() { - let bytes = value.as_ref(); - let idx = if bytes.len() > i { bytes[i] } else { 0 }; - let p = pos[idx as usize]; - buf.set(p, (t, value)); - pos[idx as usize] += 1; + }; + // Extract the results of calculating SortFields to avoid double calculation + // of data during comparison. + let mut eval_values = vec![Vec::with_capacity(tuples.len()); sort_fields.len()]; + + for (x, SortField { expr, .. }) in sort_fields.iter().enumerate() { + for (_, tuple) in tuples.iter() { + eval_values[x].push(expr.eval(Some(tuple))?); } - std::mem::swap(tuples, &mut buf); } -} - -pub enum SortBy { - Radix, - Fast, -} -impl SortBy { - pub(crate) fn sorted_tuples<'a>( - &self, - arena: &'a Bump, - sort_fields: &[SortField], - mut tuples: NullableVec<'a, (usize, Tuple)>, - ) -> Result + 'a>, DatabaseError> { - match self { - SortBy::Radix => { - let mut sort_keys = NullableVec::with_capacity(tuples.len(), arena); - - for (i, (_, tuple)) in tuples.iter().enumerate() { - let mut full_key = BumpVec::new_in(arena); - - for SortField { - expr, - nulls_first, - asc, - } in sort_fields - { - let mut key = BumpBytes::new_in(arena); - - expr.eval(Some(tuple))? - .memcomparable_encode_with_null_order(&mut key, *nulls_first)?; - - if !asc && key.len() > 1 { - for byte in key.iter_mut().skip(1) { - *byte ^= 0xFF; - } - } - full_key.extend(key); + tuples.0.sort_by(|tuple_1, tuple_2| { + let (i_1, _) = unsafe { tuple_1.assume_init_ref() }; + let (i_2, _) = unsafe { tuple_2.assume_init_ref() }; + let mut ordering = Ordering::Equal; + + for ( + x, + SortField { + asc, nulls_first, .. + }, + ) in sort_fields.iter().enumerate() + { + let value_1 = &eval_values[x][*i_1]; + let value_2 = &eval_values[x][*i_2]; + + ordering = match (value_1.is_null(), value_2.is_null()) { + (false, true) => fn_nulls_first(*nulls_first), + (true, false) => fn_nulls_first(*nulls_first).reverse(), + _ => { + let mut ordering = value_1.partial_cmp(value_2).unwrap_or(Ordering::Equal); + if !*asc { + ordering = ordering.reverse(); } - sort_keys.put((i, full_key)) + ordering } - radix_sort(&mut sort_keys, arena); - - Ok(Box::new(RemappingIterator::new( - tuples, - sort_keys.into_iter().map(|(i, _)| i), - ))) + }; + if ordering != Ordering::Equal { + break; } - SortBy::Fast => { - let fn_nulls_first = |nulls_first: bool| { - if nulls_first { - Ordering::Greater - } else { - Ordering::Less - } - }; - // Extract the results of calculating SortFields to avoid double calculation - // of data during comparison - let mut eval_values = vec![Vec::with_capacity(sort_fields.len()); tuples.len()]; - - for (x, SortField { expr, .. }) in sort_fields.iter().enumerate() { - for (_, tuple) in tuples.iter() { - eval_values[x].push(expr.eval(Some(tuple))?); - } - } - - tuples.0.sort_by(|tuple_1, tuple_2| { - let (i_1, _) = unsafe { tuple_1.assume_init_ref() }; - let (i_2, _) = unsafe { tuple_2.assume_init_ref() }; - let mut ordering = Ordering::Equal; + } - for ( - x, - SortField { - asc, nulls_first, .. - }, - ) in sort_fields.iter().enumerate() - { - let value_1 = &eval_values[x][*i_1]; - let value_2 = &eval_values[x][*i_2]; - - ordering = match (value_1.is_null(), value_2.is_null()) { - (false, true) => fn_nulls_first(*nulls_first), - (true, false) => fn_nulls_first(*nulls_first).reverse(), - _ => { - let mut ordering = - value_1.partial_cmp(value_2).unwrap_or(Ordering::Equal); - if !*asc { - ordering = ordering.reverse(); - } - ordering - } - }; - if ordering != Ordering::Equal { - break; - } - } + ordering + }); + drop(eval_values); - ordering - }); - drop(eval_values); - - Ok(Box::new(tuples.into_iter().map(|(_, tuple)| tuple))) - } - } - } + Ok(tuples.into_iter().map(|(_, tuple)| tuple)) } pub struct Sort { @@ -310,22 +157,16 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Sort { while arena.next_tuple(self.input, plan_arena)? { let offset = tuples.len(); - tuples.put((offset, arena.result_tuple().clone())); + tuples.put((offset, mem::take(arena.result_tuple_mut()))); } - let sort_by = if tuples.len() > 256 { - SortBy::Radix - } else { - SortBy::Fast - }; let limit = self.limit.unwrap_or(tuples.len()); - let rows = sort_by.sorted_tuples(&self.arena, &self.sort_fields, tuples)?; - let rows: Box + '_> = Box::new(rows.take(limit)); - // The arena lives at a stable boxed address, so we can keep the old iterator shape + let rows = sort_tuples(&self.sort_fields, tuples)?; + // The arena lives at a stable boxed address, so we can keep the iterator // and resume it across executor polls. self.output = Some(unsafe { transmute:: + '_>, Box>>( - rows, + Box::new(rows.take(limit)), ) }); } @@ -343,7 +184,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Sort { mod test { use crate::catalog::{ColumnCatalog, ColumnDesc}; use crate::errors::DatabaseError; - use crate::execution::dql::sort::{radix_sort, NullableVec, SortBy}; + use crate::execution::dql::sort::{sort_tuples, NullableVec}; use crate::expression::ScalarExpression; use crate::planner::operator::sort::SortField; use crate::types::tuple::Tuple; @@ -351,27 +192,6 @@ mod test { use crate::types::LogicalType; use bumpalo::Bump; - #[test] - fn test_radix_sort() { - let arena = Bump::new(); - { - let mut indices = NullableVec::with_capacity(4, &arena); - indices.put((0usize, "abc".as_bytes().to_vec())); - indices.put((1, "abz".as_bytes().to_vec())); - indices.put((2, "abe".as_bytes().to_vec())); - indices.put((3, "abcd".as_bytes().to_vec())); - - radix_sort(&mut indices, &arena); - - let mut iter = indices.iter(); - - assert_eq!(Some(&(0, "abc".as_bytes().to_vec())), iter.next()); - assert_eq!(Some(&(3, "abcd".as_bytes().to_vec())), iter.next()); - assert_eq!(Some(&(2, "abe".as_bytes().to_vec())), iter.next()); - assert_eq!(Some(&(1, "abz".as_bytes().to_vec())), iter.next()); - } - } - #[test] fn test_single_value_desc_and_null_first() -> Result<(), DatabaseError> { let table_arena = crate::planner::TableArenaCell::default(); @@ -475,49 +295,22 @@ mod test { } }; - // RadixSort - fn_asc_and_nulls_first_eq(SortBy::Radix.sorted_tuples( - &arena, - &fn_sort_fields(true, true), - fn_tuples(), - )?); - fn_asc_and_nulls_last_eq(SortBy::Radix.sorted_tuples( - &arena, - &fn_sort_fields(true, false), - fn_tuples(), - )?); - fn_desc_and_nulls_first_eq(SortBy::Radix.sorted_tuples( - &arena, - &fn_sort_fields(false, true), - fn_tuples(), - )?); - fn_desc_and_nulls_last_eq(SortBy::Radix.sorted_tuples( - &arena, - &fn_sort_fields(false, false), - fn_tuples(), - )?); - - // FastSort - fn_asc_and_nulls_first_eq(SortBy::Fast.sorted_tuples( - &arena, + fn_asc_and_nulls_first_eq(Box::new(sort_tuples( &fn_sort_fields(true, true), fn_tuples(), - )?); - fn_asc_and_nulls_last_eq(SortBy::Fast.sorted_tuples( - &arena, + )?)); + fn_asc_and_nulls_last_eq(Box::new(sort_tuples( &fn_sort_fields(true, false), fn_tuples(), - )?); - fn_desc_and_nulls_first_eq(SortBy::Fast.sorted_tuples( - &arena, + )?)); + fn_desc_and_nulls_first_eq(Box::new(sort_tuples( &fn_sort_fields(false, true), fn_tuples(), - )?); - fn_desc_and_nulls_last_eq(SortBy::Fast.sorted_tuples( - &arena, + )?)); + fn_desc_and_nulls_last_eq(Box::new(sort_tuples( &fn_sort_fields(false, false), fn_tuples(), - )?); + )?)); Ok(()) } @@ -732,49 +525,22 @@ mod test { } }; - // RadixSort - fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( - &arena, - &fn_sort_fields(true, true, true, true), - fn_tuples(), - )?); - fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( - &arena, - &fn_sort_fields(true, false, true, true), - fn_tuples(), - )?); - fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( - &arena, - &fn_sort_fields(false, true, true, true), - fn_tuples(), - )?); - fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Radix.sorted_tuples( - &arena, - &fn_sort_fields(false, false, true, true), - fn_tuples(), - )?); - - // FastSort - fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( - &arena, + fn_asc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(Box::new(sort_tuples( &fn_sort_fields(true, true, true, true), fn_tuples(), - )?); - fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( - &arena, + )?)); + fn_asc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(Box::new(sort_tuples( &fn_sort_fields(true, false, true, true), fn_tuples(), - )?); - fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( - &arena, + )?)); + fn_desc_1_and_nulls_first_1_and_asc_2_and_nulls_first_2_eq(Box::new(sort_tuples( &fn_sort_fields(false, true, true, true), fn_tuples(), - )?); - fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(SortBy::Fast.sorted_tuples( - &arena, + )?)); + fn_desc_1_and_nulls_last_1_and_asc_2_and_nulls_first_2_eq(Box::new(sort_tuples( &fn_sort_fields(false, false, true, true), fn_tuples(), - )?); + )?)); Ok(()) } diff --git a/src/execution/dql/top_k.rs b/src/execution/dql/top_k.rs index 3fc9e258..31fed1b0 100644 --- a/src/execution/dql/top_k.rs +++ b/src/execution/dql/top_k.rs @@ -26,7 +26,7 @@ use crate::types::tuple::Tuple; use bumpalo::Bump; use std::cmp::Ordering; use std::collections::{btree_set::IntoIter as BTreeSetIntoIter, BTreeSet}; -use std::mem::transmute; +use std::mem::{self, transmute}; #[derive(Eq, PartialEq, Debug)] struct CmpItem<'a> { @@ -143,7 +143,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for TopK { &self.arena, &self.sort_fields, &mut set, - arena.result_tuple().clone(), + mem::take(arena.result_tuple_mut()), keep_count, )?; } diff --git a/src/execution/mod.rs b/src/execution/mod.rs index f2948c79..6438caa0 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -147,11 +147,11 @@ impl<'a, T: Transaction + 'a> Executor<'a, T> { pub(crate) fn next_tuple( &mut self, plan_arena: &mut PlanArena<'a>, - ) -> Result, DatabaseError> { + ) -> Result, DatabaseError> { if !self.arena.next_tuple(self.root, plan_arena)? { return Ok(None); } - Ok(Some(self.arena.result_tuple())) + Ok(Some(self.arena.result_tuple_mut())) } pub(crate) fn take_ddl_apply(&mut self) -> Vec { @@ -1043,24 +1043,12 @@ mod test_utils { plan_arena: PlanArena<'a>, } - impl<'a, T: Transaction + 'a> TestExecutor<'a, T> { - pub(crate) fn next_tuple(&mut self) -> Result, DatabaseError> { + impl TestExecutor<'_, T> { + pub(crate) fn next_tuple(&mut self) -> Result, DatabaseError> { self.executor.next_tuple(&mut self.plan_arena) } } - impl Iterator for TestExecutor<'_, T> { - type Item = Result; - - fn next(&mut self) -> Option { - match self.next_tuple() { - Ok(Some(tuple)) => Some(Ok(tuple.clone())), - Ok(None) => None, - Err(err) => Some(Err(err)), - } - } - } - pub(crate) fn execute<'a, T, E>( executor: E, cache: ExecutionContext<'a>, diff --git a/src/macros/mod.rs b/src/macros/mod.rs index fb67e4ab..dd177b58 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -38,8 +38,11 @@ #[macro_export] macro_rules! from_tuple { ($struct_name:ident, ($($field_name:ident : $field_type:ty => $closure:expr),+)) => { - impl<'__kite_schema, '__kite_arena> From<(&::kite_sql::types::tuple::SchemaView<'__kite_schema, '__kite_arena>, ::kite_sql::types::tuple::Tuple)> for $struct_name { - fn from((schema, mut tuple): (&::kite_sql::types::tuple::SchemaView<'__kite_schema, '__kite_arena>, ::kite_sql::types::tuple::Tuple)) -> Self { + impl ::kite_sql::orm::FromQueryRow for $struct_name { + fn from_query_row( + schema: &::kite_sql::types::tuple::SchemaView<'_, '_>, + tuple: &mut ::kite_sql::types::tuple::Tuple, + ) -> ::std::result::Result { fn try_get(tuple: &mut ::kite_sql::types::tuple::Tuple, schema: &::kite_sql::types::tuple::SchemaView<'_, '_>, field_name: &str) -> Option<::kite_sql::types::value::DataValue> { let ty = ::kite_sql::types::LogicalType::type_trans::()?; let idx = schema.position(field_name)?; @@ -49,14 +52,14 @@ macro_rules! from_tuple { let mut struct_instance = $struct_name::default(); $( - if let Some(value) = try_get::<$field_type>(&mut tuple, schema, stringify!($field_name)) { + if let Some(value) = try_get::<$field_type>(tuple, schema, stringify!($field_name)) { $closure( &mut struct_instance, value ); } )+ - struct_instance + Ok(struct_instance) } } }; diff --git a/src/optimizer/core/histogram.rs b/src/optimizer/core/histogram.rs index 2141a96c..883ab1c4 100644 --- a/src/optimizer/core/histogram.rs +++ b/src/optimizer/core/histogram.rs @@ -367,6 +367,23 @@ impl Histogram { } } + fn top_n_count_or_fallback( + &self, + value: &DataValue, + sketch: &CountMinSketch, + top_n: &ColumnTopN, + ) -> usize { + let Some(entry) = top_n.get_entry(value) else { + return self.equal_count(value, sketch); + }; + if entry.error() == 0 || entry.error() < self.average_count() { + return entry.count(); + } + + let lower = entry.count().saturating_sub(entry.error()); + self.equal_count(value, sketch).clamp(lower, entry.count()) + } + pub fn collect_count( &self, ranges: &[Range], @@ -604,10 +621,8 @@ impl Histogram { Range::Eq(value) => { *count += if value.is_null() { self.meta.null_count - } else if let Some(count) = top_n.get(value) { - count } else { - self.equal_count(value, sketch) + self.top_n_count_or_fallback(value, sketch, top_n) }; *binary_i += 1 } @@ -1130,6 +1145,36 @@ mod tests { Ok(()) } + #[test] + fn test_eq_count_falls_back_when_top_n_error_is_large() -> Result<(), DatabaseError> { + let mut builder = HistogramBuilder::new(&index_meta(), ANALYZE_STATISTICS_RELATIVE_ERROR)?; + + for value in 0..10_000 { + builder.append(DataValue::Int32(value))?; + } + + let (histogram, sketch, _) = builder.build(100)?; + + let mut top_n = ColumnTopN::default(); + top_n.add_with_size(1, DataValue::Int32(1), 10); + top_n.add_with_size(1, DataValue::Int32(7), 1); + let entry = top_n.get_entry(&DataValue::Int32(7)).unwrap(); + assert_eq!(entry.count(), 11); + assert_eq!(entry.error(), 10); + + let fallback = histogram.equal_count(&DataValue::Int32(7), &sketch); + let lower = entry.count().saturating_sub(entry.error()); + let expected = fallback.clamp(lower, entry.count()); + assert!(expected < entry.count()); + + assert_eq!( + histogram.collect_count(&[Range::Eq(DataValue::Int32(7))], &sketch, &top_n)?, + expected + ); + + Ok(()) + } + #[test] fn test_collect_count_ignores_tuple_prefix_endpoint_count() -> Result<(), DatabaseError> { let mut builder = HistogramBuilder::new(&index_meta(), ANALYZE_STATISTICS_RELATIVE_ERROR)?; diff --git a/src/optimizer/core/top_n.rs b/src/optimizer/core/top_n.rs index eab8858c..62773c5e 100644 --- a/src/optimizer/core/top_n.rs +++ b/src/optimizer/core/top_n.rs @@ -70,14 +70,6 @@ impl ColumnTopN { self.add_with_options(top_n_size, value, count, 0); } - pub fn merge_with_size(&mut self, other: ColumnTopN, top_n_size: usize) { - for entry in other.values { - if self.should_insert(&entry.value, entry.count, entry.error) { - self.insert_new_with_options(top_n_size, entry); - } - } - } - pub fn finish_with_size(mut self, top_n_size: usize) -> Self { self.prune_to_capacity(top_n_size); self @@ -134,11 +126,19 @@ impl ColumnTopN { } } - fn insert_new_with_options(&mut self, capacity: usize, entry: ColumnTopNEntry) { + fn insert_new_with_options(&mut self, capacity: usize, mut entry: ColumnTopNEntry) { if capacity == 0 { return; } + if self.values.len() >= capacity { + let Some(min_entry) = self.prune_min() else { + return; + }; + entry.count = min_entry.count.saturating_add(entry.count); + entry.error = min_entry.count.saturating_add(entry.error); + } + let index = self.find(&entry.value).unwrap_or_else(|index| index); self.values.insert(index, entry); self.on_insert(index); @@ -159,15 +159,12 @@ impl ColumnTopN { } } - fn prune_min(&mut self) -> Option<()> { - let (min_index, next_min_index) = match self.min_index.take() { - Some(index) if index < self.values.len() => (index, None), - _ => self.find_min_and_next_index()?, - }; - self.values.remove(min_index); + fn prune_min(&mut self) -> Option { + let (min_index, next_min_index) = self.find_min_and_next_index()?; + let entry = self.values.remove(min_index); self.min_index = next_min_index.map(|index| if index > min_index { index - 1 } else { index }); - Some(()) + Some(entry) } fn on_insert(&mut self, index: usize) { @@ -232,19 +229,17 @@ mod tests { use crate::types::value::DataValue; #[test] - fn top_n_prunes_to_capacity() { + fn top_n_replaces_min_counter_when_full() { let mut top_n = ColumnTopN::default(); top_n.add_with_size(2, DataValue::Int32(1), 5); top_n.add_with_size(2, DataValue::Int32(2), 3); top_n.add_with_size(2, DataValue::Int32(3), 1); - assert_eq!(top_n.len(), 2); - - top_n.add_with_size(2, DataValue::Int32(4), 1); - assert_eq!(top_n.len(), 2); assert_eq!(top_n.get(&DataValue::Int32(1)), Some(5)); - assert_eq!(top_n.get(&DataValue::Int32(2)), Some(3)); + let entry = top_n.get_entry(&DataValue::Int32(3)).unwrap(); + assert_eq!(entry.count(), 4); + assert_eq!(entry.error(), 3); } #[test] @@ -261,23 +256,9 @@ mod tests { .windows(2) .all(|pair| pair[0].value() < pair[1].value())); assert_eq!(top_n.get(&DataValue::Int32(1)), Some(5)); - assert_eq!(top_n.get(&DataValue::Int32(2)), Some(1)); - } - - #[test] - fn top_n_merge_accumulates_entries() { - let mut left = ColumnTopN::default(); - left.add_with_size(2, DataValue::Int32(1), 3); - left.add_with_size(2, DataValue::Int32(2), 2); - - let mut right = ColumnTopN::default(); - right.add_with_size(2, DataValue::Int32(1), 4); - right.add_with_size(2, DataValue::Int32(3), 1); - - left.merge_with_size(right, 2); - - assert_eq!(left.get(&DataValue::Int32(1)), Some(7)); - assert_eq!(left.len(), 2); + let entry = top_n.get_entry(&DataValue::Int32(3)).unwrap(); + assert_eq!(entry.count(), 2); + assert_eq!(entry.error(), 1); } #[test] diff --git a/src/orm/mod.rs b/src/orm/mod.rs index bef198e4..c31d9146 100644 --- a/src/orm/mod.rs +++ b/src/orm/mod.rs @@ -6,8 +6,7 @@ use crate::binder::{ }; use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableName}; use crate::db::{ - BindSource, BorrowResultIter, DBTransaction, Database, DatabaseIter, OrmIter, ResultIter, - TransactionIter, + BindSource, DBTransaction, Database, DatabaseIter, OrmIter, ResultIter, TransactionIter, }; use crate::errors::DatabaseError; use crate::expression::{self, AliasType, ScalarExpression}; @@ -60,28 +59,26 @@ pub struct DescribeColumn { pub default: String, } -impl From<(&SchemaView<'_, '_>, Tuple)> for DescribeColumn { - fn from((_, tuple): (&SchemaView<'_, '_>, Tuple)) -> Self { - let mut values = tuple.values.into_iter(); - - let field = describe_text_value(values.next()); - let data_type = describe_text_value(values.next()); - let len = describe_text_value(values.next()); +impl FromQueryRow for DescribeColumn { + fn from_query_row(_: &SchemaView<'_, '_>, tuple: &mut Tuple) -> Result { + let field = describe_text_value(take_projected_value(tuple, 0)); + let data_type = describe_text_value(take_projected_value(tuple, 1)); + let len = describe_text_value(take_projected_value(tuple, 2)); let nullable = matches!( - values.next(), + take_projected_value(tuple, 3), Some(DataValue::Utf8 { value, .. }) if value == "true" ); - let key = describe_text_value(values.next()); - let default = describe_text_value(values.next()); + let key = describe_text_value(take_projected_value(tuple, 4)); + let default = describe_text_value(take_projected_value(tuple, 5)); - Self { + Ok(Self { field, data_type, len, nullable, key, default, - } + }) } } @@ -2524,26 +2521,18 @@ pub trait FromDataValue: Sized { /// ``` pub trait FromQueryTuple: Sized { /// Decodes one projected tuple into `Self`. - fn from_query_tuple(tuple: Tuple) -> Result; + fn from_query_tuple(tuple: &mut Tuple) -> Result; } /// Conversion trait from a query result row into a Rust value. /// /// `#[derive(Model)]` and `#[derive(Projection)]` generate this automatically. -/// Types that still implement the older `From<(&SchemaView, Tuple)>` mapping -/// are also accepted through a compatibility implementation. pub trait FromQueryRow: Sized { /// Decodes one result row into `Self`. - fn from_query_row(schema: &SchemaView<'_, '_>, tuple: Tuple) -> Result; -} - -impl FromQueryRow for T -where - T: for<'view, 'schema, 'arena> From<(&'view SchemaView<'schema, 'arena>, Tuple)>, -{ - fn from_query_row(schema: &SchemaView<'_, '_>, tuple: Tuple) -> Result { - Ok(T::from((schema, tuple))) - } + fn from_query_row( + schema: &SchemaView<'_, '_>, + tuple: &mut Tuple, + ) -> Result; } /// Typed adapter over a [`ResultIter`] that yields projected values instead of raw tuples. @@ -2649,8 +2638,9 @@ where fn next(&mut self) -> Option { self.inner - .next() - .map(|result| result.and_then(extract_value_from_tuple::)) + .next_tuple(|_, tuple| extract_value_from_tuple::(tuple)) + .transpose() + .map(|value| value.and_then(std::convert::identity)) } } @@ -2664,8 +2654,9 @@ where fn next(&mut self) -> Option { self.inner - .next() - .map(|result| result.and_then(extract_projected_tuple::)) + .next_tuple(|_, tuple| extract_projected_tuple::(tuple)) + .transpose() + .map(|tuple| tuple.and_then(std::convert::identity)) } } @@ -2937,24 +2928,26 @@ macro_rules! impl_from_query_tuple { $($name: FromDataValue,)+ { #[allow(non_snake_case)] - fn from_query_tuple(tuple: Tuple) -> Result { + fn from_query_tuple(tuple: &mut Tuple) -> Result { let expected_len = [$(stringify!($name)),+].len(); - let mut values = tuple.values.into_iter(); + if tuple.values.len() != expected_len { + return Err(DatabaseError::MisMatch( + "the expected tuple projection width", + "the query result", + )); + } + let mut indexes = 0..expected_len; $( let $name = extract_projected_data_value::<$name>( - values.next(), + take_projected_value( + tuple, + indexes.next().expect("checked projected tuple width"), + ), expected_len, )?; )+ - if values.next().is_some() { - return Err(DatabaseError::MisMatch( - "the expected tuple projection width", - "the query result", - )); - } - Ok(($($name,)+)) } } @@ -3019,13 +3012,12 @@ where I: ResultIter, T: FromQueryRow, { - Ok(match iter.next() { - Some(tuple) => { - let tuple = tuple?; - Some(iter.schema(|schema| T::from_query_row(schema, tuple))?) - } - None => None, - }) + Ok( + match iter.next_tuple(|schema, tuple| T::from_query_row(schema, tuple))? { + Some(row) => Some(row?), + None => None, + }, + ) } fn convert_projected_value(value: DataValue) -> Result { @@ -3045,6 +3037,13 @@ fn invalid_from_data_value(value: &DataValue) -> DatabaseError { )) } +fn take_projected_value(tuple: &mut Tuple, index: usize) -> Option { + tuple + .values + .get_mut(index) + .map(|value| std::mem::replace(value, DataValue::Null)) +} + fn extract_projected_data_value( value: Option, _expected_len: usize, @@ -3056,9 +3055,9 @@ fn extract_projected_data_value( convert_projected_value::(value) } -fn extract_value_from_tuple(mut tuple: Tuple) -> Result { +fn extract_value_from_tuple(tuple: &mut Tuple) -> Result { let value = if tuple.values.len() == 1 { - tuple.values.swap_remove(0) + take_projected_value(tuple, 0).expect("checked one projected expression") } else { return Err(DatabaseError::MisMatch( "one projected expression", @@ -3069,7 +3068,7 @@ fn extract_value_from_tuple(mut tuple: Tuple) -> Result(value) } -fn extract_projected_tuple(tuple: Tuple) -> Result { +fn extract_projected_tuple(tuple: &mut Tuple) -> Result { T::from_query_tuple(tuple) } diff --git a/src/python.rs b/src/python.rs index 12dd9558..e83492e6 100644 --- a/src/python.rs +++ b/src/python.rs @@ -162,13 +162,16 @@ enum PythonResultIterInner { } impl PythonResultIterInner { - fn next_tuple(&mut self) -> Result, DatabaseError> { + fn next_tuple( + &mut self, + f: impl FnOnce(&mut Tuple) -> R, + ) -> Result, DatabaseError> { match self { #[cfg(feature = "lmdb")] - PythonResultIterInner::Lmdb(iter) => iter.next_borrowed_tuple(), - PythonResultIterInner::Memory(iter) => iter.next_borrowed_tuple(), + PythonResultIterInner::Lmdb(iter) => iter.next_tuple(|_, tuple| f(tuple)), + PythonResultIterInner::Memory(iter) => iter.next_tuple(|_, tuple| f(tuple)), #[cfg(feature = "rocksdb")] - PythonResultIterInner::Rocks(iter) => iter.next_borrowed_tuple(), + PythonResultIterInner::Rocks(iter) => iter.next_tuple(|_, tuple| f(tuple)), } } @@ -251,7 +254,7 @@ impl PythonDatabase { pub fn execute(&self, sql: &str) -> PyResult<()> { let mut iter = self.inner.run(sql).map_err(to_py_err)?; - while iter.next_tuple().map_err(to_py_err)?.is_some() {} + while iter.next_tuple(|_| ()).map_err(to_py_err)?.is_some() {} iter.done().map_err(to_py_err)?; Ok(()) @@ -290,8 +293,11 @@ impl PythonResultIter { pub fn next(&mut self, py: Python<'_>) -> PyResult> { let iter = self.inner_mut()?; - match iter.next_tuple().map_err(to_py_err)? { - Some(tuple) => tuple_to_python_row(py, tuple).map(Some), + match iter + .next_tuple(|tuple| tuple_to_python_row(py, tuple)) + .map_err(to_py_err)? + { + Some(row) => row.map(Some), None => Ok(None), } } @@ -308,8 +314,11 @@ impl PythonResultIter { .ok_or_else(|| PyValueError::new_err("iterator already consumed"))?; let mut rows = Vec::new(); - while let Some(tuple) = iter.next_tuple().map_err(to_py_err)? { - rows.push(tuple_to_python_row(py, tuple)?); + while let Some(row) = iter + .next_tuple(|tuple| tuple_to_python_row(py, tuple)) + .map_err(to_py_err)? + { + rows.push(row?); } iter.done().map_err(to_py_err)?; diff --git a/src/storage/lmdb.rs b/src/storage/lmdb.rs index d3f0c06d..39659355 100644 --- a/src/storage/lmdb.rs +++ b/src/storage/lmdb.rs @@ -371,7 +371,7 @@ mod tests { .unwrap(); let mut iter = kite_sql.run("select b from t1 where a = 2").unwrap(); - let tuple = iter.next().unwrap().unwrap(); + let tuple = iter.next_tuple(|_, tuple| tuple.clone()).unwrap().unwrap(); assert_eq!(tuple.values[0].to_string(), "20"); iter.done().unwrap(); } diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 5e329126..36c802a1 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -200,7 +200,7 @@ mod wasm_tests { transaction.append_tuple( &mut table_codec, "test", - Tuple::new( + &Tuple::new( Some(DataValue::Int32(1)), vec![DataValue::Int32(1), DataValue::Boolean(true)], ), @@ -213,7 +213,7 @@ mod wasm_tests { transaction.append_tuple( &mut table_codec, "test", - Tuple::new( + &Tuple::new( Some(DataValue::Int32(2)), vec![DataValue::Int32(2), DataValue::Boolean(true)], ), @@ -345,7 +345,7 @@ mod native_tests { transaction.append_tuple( &mut table_codec, "test", - Tuple::new( + &Tuple::new( Some(DataValue::Int32(1)), vec![DataValue::Int32(1), DataValue::Boolean(true)], ), @@ -358,7 +358,7 @@ mod native_tests { transaction.append_tuple( &mut table_codec, "test", - Tuple::new( + &Tuple::new( Some(DataValue::Int32(2)), vec![DataValue::Int32(2), DataValue::Boolean(true)], ), diff --git a/src/storage/mod.rs b/src/storage/mod.rs index ec9500aa..6b121a63 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -385,7 +385,7 @@ pub trait Transaction: Sized { &mut self, table_codec: &mut TableCodec, table_name: &str, - tuple: Tuple, + tuple: &Tuple, serializers: I, is_overwrite: bool, ) -> Result<(), DatabaseError> @@ -400,7 +400,7 @@ pub trait Transaction: Sized { table_codec.with_tuple( table_name, tuple_id, - Some((&tuple, &mut write_value)), + Some((tuple, &mut write_value)), |key, value| { if !is_overwrite && self.exists(key)? { return Err(DatabaseError::DuplicatePrimaryKey); @@ -2289,7 +2289,7 @@ mod test { let plan_arena = PlanArena::new(&table_arena); let tuples = build_tuples(); - for tuple in tuples.iter().cloned() { + for tuple in tuples.iter() { transaction.append_tuple( &mut table_codec, "t1", @@ -2592,7 +2592,7 @@ mod test { for (tuple_id, index) in indexes.iter().cloned() { transaction.add_index(&mut table_codec, "t1", index, &tuple_id)?; } - for tuple in tuples.iter().cloned() { + for tuple in tuples.iter() { transaction.append_tuple( &mut table_codec, "t1", @@ -2715,13 +2715,7 @@ mod test { Index::new(index_id, &initial_tuple.values[2], IndexType::Normal), initial_tuple.pk.as_ref().unwrap(), )?; - setup_tx.append_tuple( - &mut table_codec, - "t1", - initial_tuple.clone(), - &serializers, - false, - )?; + setup_tx.append_tuple(&mut table_codec, "t1", &initial_tuple, &serializers, false)?; setup_tx.commit()?; index_id @@ -2765,13 +2759,7 @@ mod test { Index::new(index_id, &updated_tuple.values[2], IndexType::Normal), updated_tuple.pk.as_ref().unwrap(), )?; - writer_tx.append_tuple( - &mut table_codec, - "t1", - updated_tuple.clone(), - &serializers, - true, - )?; + writer_tx.append_tuple(&mut table_codec, "t1", &updated_tuple, &serializers, true)?; writer_tx.commit()?; let after_update = table_codec.with_tuple("t1", &tuple_id, None, |key, _| { diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index e2fc0dd7..37cee65f 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -877,7 +877,7 @@ mod test { transaction.append_tuple( &mut table_codec, "test", - Tuple::new( + &Tuple::new( Some(DataValue::Int32(1)), vec![DataValue::Int32(1), DataValue::Boolean(true)], ), @@ -890,7 +890,7 @@ mod test { transaction.append_tuple( &mut table_codec, "test", - Tuple::new( + &Tuple::new( Some(DataValue::Int32(2)), vec![DataValue::Int32(2), DataValue::Boolean(true)], ), diff --git a/src/wasm.rs b/src/wasm.rs index 1c5d1158..9380be87 100644 --- a/src/wasm.rs +++ b/src/wasm.rs @@ -170,7 +170,7 @@ impl WasmDatabase { pub fn execute(&self, sql: &str) -> Result<(), JsValue> { let mut iter = self.inner.run(sql).map_err(to_js_err)?; - while iter.next_borrowed_tuple().map_err(to_js_err)?.is_some() {} + while iter.next_tuple(|_, _| ()).map_err(to_js_err)?.is_some() {} Ok(()) } @@ -192,8 +192,11 @@ impl WasmResultIter { .inner .as_mut() .ok_or_else(|| to_js_err("iterator already consumed"))?; - match iter.next_borrowed_tuple().map_err(to_js_err)? { - Some(tuple) => tuple_to_wasm_row(tuple), + match iter + .next_tuple(|_, tuple| tuple_to_wasm_row(tuple)) + .map_err(to_js_err)? + { + Some(row) => row, None => Ok(JsValue::undefined()), } } @@ -230,8 +233,11 @@ impl WasmResultIter { .take() .ok_or_else(|| to_js_err("iterator already consumed"))?; let rows = Array::new(); - while let Some(tuple) = iter.next_borrowed_tuple().map_err(to_js_err)? { - rows.push(&tuple_to_wasm_row(tuple)?); + while let Some(row) = iter + .next_tuple(|_, tuple| tuple_to_wasm_row(tuple)) + .map_err(to_js_err)? + { + rows.push(&row?); } iter.done().map_err(to_js_err)?; Ok(rows.into()) diff --git a/tests/macros-test/src/main.rs b/tests/macros-test/src/main.rs index c620b4b1..b91cba13 100644 --- a/tests/macros-test/src/main.rs +++ b/tests/macros-test/src/main.rs @@ -66,6 +66,33 @@ mod test { (Tuple::new(None, values), schema) } + fn tuple_owned(tuple: &Tuple) -> Tuple { + Tuple { + pk: tuple.pk.clone(), + values: tuple.values.clone(), + } + } + + fn collect_result_tuples(mut iter: I) -> Result, DatabaseError> { + let mut rows = Vec::new(); + while let Some(row) = iter.next_tuple(|_, tuple| tuple_owned(tuple))? { + rows.push(row); + } + iter.done()?; + Ok(rows) + } + + fn drain_result_iter(mut iter: I) -> Result<(), DatabaseError> { + while iter.next_tuple(|_, _| ())?.is_some() {} + iter.done() + } + + fn has_result_row(mut iter: I) -> Result { + let exists = iter.next_tuple(|_, _| ())?.is_some(); + iter.done()?; + Ok(exists) + } + fn build_test_database() -> Result<(TempDir, Database), DatabaseError> { let temp_dir = TempDir::new().expect("create temp dir for ORM test"); let database = DataBaseBuilder::path(temp_dir.path()).build_rocksdb()?; @@ -328,9 +355,10 @@ mod test { fn test_from_tuple() { let table_arena = TableArenaCell::default(); let mut plan_arena = PlanArena::new(&table_arena); - let (tuple, schema) = build_tuple(&mut plan_arena); + let (mut tuple, schema) = build_tuple(&mut plan_arena); let schema = SchemaView::new(&schema, &plan_arena); - let my_struct = MyStruct::from((&schema, tuple)); + let my_struct = + ::from_query_row(&schema, &mut tuple).unwrap(); println!("{:?}", my_struct); @@ -356,7 +384,7 @@ mod test { true, ColumnDesc::new(LogicalType::Integer, None, true, None).unwrap(), ))); - let tuple = Tuple::new( + let mut tuple = Tuple::new( None, vec![ DataValue::Int32(9), @@ -371,7 +399,8 @@ mod test { let schema = SchemaView::new(&schema, &plan_arena); let derived = - ::from_query_row(&schema, tuple).unwrap(); + ::from_query_row(&schema, &mut tuple) + .unwrap(); assert_eq!(derived.c1, 9); assert_eq!(derived.name, "LOL"); @@ -448,9 +477,7 @@ mod test { } database.analyze_model::()?; - let mut iter = database.run("describe wallets")?; - let rows = iter.by_ref().collect::, _>>()?; - iter.done()?; + let rows = collect_result_tuples(database.run("describe wallets")?)?; let balance = rows .iter() @@ -476,9 +503,7 @@ mod test { create_model_table::(&mut database)?; - let mut iter = database.run("describe country_codes")?; - let rows = iter.by_ref().collect::, _>>()?; - iter.done()?; + let rows = collect_result_tuples(database.run("describe country_codes")?)?; let code = rows .iter() @@ -517,9 +542,7 @@ mod test { Some(MigratingUserV3 { id: 1, age: 18 }) ); - let describe_rows = database - .run("describe migrating_users")? - .collect::, _>>()?; + let describe_rows = collect_result_tuples(database.run("describe migrating_users")?)?; let column_names = describe_rows .iter() .filter_map(|row| match row.values.first() { @@ -727,25 +750,17 @@ mod test { .unwrap() as usize; assert_eq!(count, 2); - let exists = database - .bind(|ctx| { - ctx.from::()? - .filter(|e| e.column(User::id())?.eq(2))? - .exists() - })? - .next() - .transpose()? - .is_some(); + let exists = has_result_row(database.bind(|ctx| { + ctx.from::()? + .filter(|e| e.column(User::id())?.eq(2))? + .exists() + })?)?; assert!(exists); - let missing = database - .bind(|ctx| { - ctx.from::()? - .filter(|e| e.column(User::id())?.eq(99))? - .exists() - })? - .next() - .transpose()? - .is_some(); + let missing = has_result_row(database.bind(|ctx| { + ctx.from::()? + .filter(|e| e.column(User::id())?.eq(99))? + .exists() + })?)?; assert!(!missing); let two_users = database @@ -2353,7 +2368,7 @@ mod test { })? .finish() }) - .and_then(|iter| iter.collect::, _>>().map(|_| ())); + .and_then(drain_result_iter); assert!(join_subquery.is_err()); let group_by_subquery = database @@ -2367,7 +2382,7 @@ mod test { })? .finish() }) - .and_then(|iter| iter.collect::, _>>().map(|_| ())); + .and_then(drain_result_iter); assert!(group_by_subquery.is_err()); let having_subquery = database @@ -2385,7 +2400,7 @@ mod test { })? .finish() }) - .and_then(|iter| iter.collect::, _>>().map(|_| ())); + .and_then(drain_result_iter); assert!(having_subquery.is_err()); let sort_subquery = database @@ -2399,7 +2414,7 @@ mod test { })? .finish() }) - .and_then(|iter| iter.collect::, _>>().map(|_| ())); + .and_then(drain_result_iter); assert!(sort_subquery.is_err()); database.drop_table::()?; @@ -2452,9 +2467,8 @@ mod test { } database.analyze_model::()?; - let mut explain_iter = database.run("explain select age from users where age = 1050")?; - let explain_rows = explain_iter.by_ref().collect::, _>>()?; - explain_iter.done()?; + let explain_rows = + collect_result_tuples(database.run("explain select age from users where age = 1050")?)?; let explain_plan = explain_rows .iter() .filter_map(|row| match row.values.first() { diff --git a/tests/sqllogictest/src/lib.rs b/tests/sqllogictest/src/lib.rs index 17dd9785..f379fb35 100644 --- a/tests/sqllogictest/src/lib.rs +++ b/tests/sqllogictest/src/lib.rs @@ -72,14 +72,14 @@ fn collect_output( let types = vec![DefaultColumnType::Any; iter.schema(|schema| schema.len())]; let mut rows = Vec::new(); - while let Some(tuple) = iter.next_borrowed_tuple()? { - rows.push( - tuple - .values - .iter() - .map(|value| format!("{}", value)) - .collect(), - ); + while let Some(row) = iter.next_tuple(|_, tuple| { + tuple + .values + .iter() + .map(|value| format!("{}", value)) + .collect() + })? { + rows.push(row); } iter.done()?; if rows.is_empty() { diff --git a/tpcc/README.md b/tpcc/README.md index f3e13560..f723fe82 100644 --- a/tpcc/README.md +++ b/tpcc/README.md @@ -11,6 +11,35 @@ Use `./scripts/run_tpcc_matrix.sh` to run the TPCC performance comparison in one - If a run fails with a duplicate-key style error, the script clears that backend's database and retries that variant once. - Outputs are written to `tpcc/results//`, including `summary.md` and per-backend raw logs. +For more stable local numbers on machines that thermal-throttle under sustained TPCC load, use the Python runner: + +```shell +./scripts/run_tpcc_stable.py --build +``` + +It runs the same four variants, but waits before each variant until the machine has enough consecutive stable samples: + +- CPU temperature is at or below `--cool-temp-c` (default `65.0`). +- CPU usage is at or below `--idle-cpu-percent` (default `20.0`). +- At least `--min-cooldown-sec` seconds have passed after the previous variant (default `300`). +- The state remains stable for `--stable-samples` samples (default `3`, sampled every `10s`). + +Example shorter smoke run: + +```shell +./scripts/run_tpcc_stable.py --measure-time 60 --cool-temp-c 60 --min-cooldown-sec 180 +``` + +Before a formal local run, clear old TPCC data and Linux page cache so each matrix starts from a comparable state: + +```shell +rm -rf target/tpcc-stable-run-data target/tpcc-run-data kite_sql_tpcc kite_sql_tpcc.sqlite +sync +sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches' +``` + +The runner reads CPU temperatures from Linux `/sys/class/hwmon` and `/sys/class/thermal` when available. If no CPU temperature sensor is exposed, it still gates on CPU usage and the fixed cooldown window, and records the missing temperature source in the raw log. + Duplicate-key note: The benchmark stores `history.h_date` as `timestamp(6)`, so high-throughput `Payment` transactions do not collide on second-level timestamp buckets. A duplicate-primary-key failure during TPCC should be treated as a run failure and investigated or rerun from a clean database. @@ -25,16 +54,18 @@ TPCC_DUPLICATE_RETRY=1 ./scripts/run_tpcc_matrix.sh - Tips: TPC-C currently runs as a single worker. ## 720s comparison -Local 720-second comparison on the machine above: +Local stable-run 720-second comparison on the machine above: | Backend | TpmC | New-Order p90 | Payment p90 | Order-Status p90 | Delivery p90 | Stock-Level p90 | | --- | ---: | ---: | ---: | ---: | ---: | ---: | -| KiteSQL LMDB | 73638 | 0.001s | 0.001s | 0.001s | 0.002s | 0.001s | -| KiteSQL RocksDB | 39051 | 0.001s | 0.001s | 0.002s | 0.009s | 0.001s | -| SQLite balanced | 56788 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | -| SQLite practical | 44049 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | - -- All rows are from fresh 720-second reruns with `--num-ware 1` and the default `--max-retry 5`. +| KiteSQL LMDB | 71345 | 0.001s | 0.001s | 0.001s | 0.002s | 0.001s | +| KiteSQL RocksDB | 40563 | 0.001s | 0.001s | 0.002s | 0.009s | 0.001s | +| SQLite balanced | 67527 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | +| SQLite practical | 64774 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | + +- All rows are from `./scripts/run_tpcc_stable.py --build` at `2026-06-23_00-50-48`. +- The stable-run gates were `temp<=65.0C`, `cpu<=20.0%`, `min_cooldown=300s`, `stable_samples=3`, and `sample_interval=10.0s`. +- All rows use `--num-ware 1`, `--max-retry 5`, and TPCC's default 720-second measure time. - SQLite rows use the `balanced` and `practical` profiles respectively. ### KiteSQL LMDB @@ -43,11 +74,11 @@ Transaction Summary (elapsed 720.0s) +--------------+---------+------+---------+-------+ | Transaction | Success | Late | Failure | Total | +--------------+---------+------+---------+-------+ -| New-Order | 883651 | 0 | 8950 | 892601 | -| Payment | 883630 | 0 | 0 | 883630 | -| Order-Status | 88363 | 0 | 0 | 88363 | -| Delivery | 88363 | 0 | 0 | 88363 | -| Stock-Level | 88363 | 0 | 0 | 88363 | +| New-Order | 856143 | 0 | 8829 | 864972 | +| Payment | 856120 | 0 | 0 | 856120 | +| Order-Status | 85612 | 0 | 0 | 85612 | +| Delivery | 85612 | 0 | 0 | 85612 | +| Stock-Level | 85612 | 0 | 0 | 85612 | +--------------+---------+------+---------+-------+ (all must be [OK]) [transaction percentage] @@ -67,41 +98,35 @@ Transaction Summary (elapsed 720.0s) 1.New-Order -0.001, 883629 -0.002, 19 -0.003, 3 +0.001, 856143 2.Payment -0.001, 883628 -0.002, 2 +0.001, 856120 3.Order-Status -0.001, 85670 -0.002, 2219 -0.003, 474 +0.001, 83223 +0.002, 2103 +0.003, 286 4.Delivery -0.002, 88351 -0.003, 8 -0.004, 2 -0.005, 1 -0.006, 1 +0.002, 85606 +0.003, 6 5.Stock-Level -0.001, 88363 +0.001, 85612 <90th Percentile RT (MaxRT)> - New-Order : 0.001 (0.003) - Payment : 0.001 (0.001) -Order-Status : 0.001 (0.003) - Delivery : 0.002 (0.006) - Stock-Level : 0.001 (0.001) + New-Order : 0.001 (0.001) + Payment : 0.001 (0.000) +Order-Status : 0.001 (0.002) + Delivery : 0.002 (0.002) + Stock-Level : 0.001 (0.000) -73638 Tpmc +71345 Tpmc ``` ### KiteSQL RocksDB @@ -110,11 +135,11 @@ Transaction Summary (elapsed 720.0s) +--------------+---------+------+---------+-------+ | Transaction | Success | Late | Failure | Total | +--------------+---------+------+---------+-------+ -| New-Order | 468621 | 0 | 4749 | 473370 | -| Payment | 468598 | 0 | 0 | 468598 | -| Order-Status | 46859 | 0 | 0 | 46859 | -| Delivery | 46860 | 0 | 0 | 46860 | -| Stock-Level | 46860 | 0 | 0 | 46860 | +| New-Order | 486763 | 0 | 5029 | 491792 | +| Payment | 486739 | 0 | 0 | 486739 | +| Order-Status | 48674 | 0 | 0 | 48674 | +| Delivery | 48674 | 0 | 0 | 48674 | +| Stock-Level | 48674 | 0 | 0 | 48674 | +--------------+---------+------+---------+-------+ (all must be [OK]) [transaction percentage] @@ -134,52 +159,53 @@ Transaction Summary (elapsed 720.0s) 1.New-Order -0.001, 468582 -0.002, 35 -0.003, 1 +0.001, 486709 +0.002, 54 2.Payment -0.001, 468590 -0.002, 4 +0.001, 486736 +0.002, 2 3.Order-Status -0.001, 41541 -0.002, 4161 -0.003, 702 -0.004, 332 -0.005, 116 -0.006, 6 +0.001, 43202 +0.002, 4215 +0.003, 717 +0.004, 395 +0.005, 137 +0.006, 7 +0.007, 1 4.Delivery -0.002, 2614 -0.003, 7226 -0.004, 6196 -0.005, 3654 -0.006, 5167 -0.007, 6457 -0.008, 6343 -0.009, 5644 -0.010, 3185 -0.011, 363 -0.012, 7 -0.013, 2 -0.015, 2 +0.002, 3572 +0.003, 8048 +0.004, 7083 +0.005, 4455 +0.006, 5238 +0.007, 6637 +0.008, 6138 +0.009, 4702 +0.010, 2626 +0.011, 164 +0.012, 4 +0.013, 3 +0.014, 3 +0.022, 1 5.Stock-Level -0.001, 46860 +0.001, 48674 <90th Percentile RT (MaxRT)> - New-Order : 0.001 (0.008) - Payment : 0.001 (0.008) -Order-Status : 0.002 (0.010) - Delivery : 0.009 (0.014) - Stock-Level : 0.001 (0.000) + New-Order : 0.001 (0.001) + Payment : 0.001 (0.007) +Order-Status : 0.002 (0.007) + Delivery : 0.009 (0.022) + Stock-Level : 0.001 (0.001) -39051 Tpmc +40563 Tpmc ``` ### SQLite balanced @@ -188,11 +214,11 @@ Transaction Summary (elapsed 720.0s) +--------------+---------+------+---------+-------+ | Transaction | Success | Late | Failure | Total | +--------------+---------+------+---------+-------+ -| New-Order | 681461 | 0 | 6997 | 688458 | -| Payment | 681435 | 0 | 0 | 681435 | -| Order-Status | 68143 | 0 | 0 | 68143 | -| Delivery | 68144 | 0 | 0 | 68144 | -| Stock-Level | 68144 | 0 | 0 | 68144 | +| New-Order | 810343 | 0 | 8322 | 818665 | +| Payment | 810320 | 0 | 0 | 810320 | +| Order-Status | 81032 | 0 | 0 | 81032 | +| Delivery | 81032 | 0 | 0 | 81032 | +| Stock-Level | 81032 | 0 | 0 | 81032 | +--------------+---------+------+---------+-------+ (all must be [OK]) [transaction percentage] @@ -212,34 +238,33 @@ Transaction Summary (elapsed 720.0s) 1.New-Order -0.001, 681310 -0.002, 151 +0.001, 810329 +0.002, 14 2.Payment -0.001, 681435 +0.001, 810320 3.Order-Status -0.001, 68143 +0.001, 81032 4.Delivery -0.001, 68121 -0.002, 23 +0.001, 81032 5.Stock-Level -0.001, 68144 +0.001, 81032 <90th Percentile RT (MaxRT)> New-Order : 0.001 (0.001) - Payment : 0.001 (0.001) + Payment : 0.001 (0.000) Order-Status : 0.001 (0.000) Delivery : 0.001 (0.001) Stock-Level : 0.001 (0.000) -56788 Tpmc +67527 Tpmc ``` ### SQLite practical @@ -248,11 +273,11 @@ Transaction Summary (elapsed 720.0s) +--------------+---------+------+---------+-------+ | Transaction | Success | Late | Failure | Total | +--------------+---------+------+---------+-------+ -| New-Order | 528594 | 0 | 5563 | 534157 | -| Payment | 528570 | 0 | 0 | 528570 | -| Order-Status | 52857 | 0 | 0 | 52857 | -| Delivery | 52857 | 0 | 0 | 52857 | -| Stock-Level | 52857 | 0 | 0 | 52857 | +| New-Order | 777304 | 0 | 8128 | 785432 | +| Payment | 777280 | 0 | 0 | 777280 | +| Order-Status | 77728 | 0 | 0 | 77728 | +| Delivery | 77728 | 0 | 0 | 77728 | +| Stock-Level | 77728 | 0 | 0 | 77728 | +--------------+---------+------+---------+-------+ (all must be [OK]) [transaction percentage] @@ -272,37 +297,33 @@ Transaction Summary (elapsed 720.0s) 1.New-Order -0.001, 528448 -0.002, 141 -0.003, 5 +0.001, 777302 +0.002, 2 2.Payment -0.001, 528567 -0.002, 3 +0.001, 777280 3.Order-Status -0.001, 52857 +0.001, 77728 4.Delivery -0.001, 52831 -0.002, 25 -0.003, 1 +0.001, 77728 5.Stock-Level -0.001, 52857 +0.001, 77728 <90th Percentile RT (MaxRT)> - New-Order : 0.001 (0.003) - Payment : 0.001 (0.002) + New-Order : 0.001 (0.001) + Payment : 0.001 (0.000) Order-Status : 0.001 (0.000) - Delivery : 0.001 (0.003) + Delivery : 0.001 (0.001) Stock-Level : 0.001 (0.000) -44049 Tpmc +64774 Tpmc ``` ## Refer to diff --git a/tpcc/src/backend/dual.rs b/tpcc/src/backend/dual.rs index c88d87fd..1a2793b9 100644 --- a/tpcc/src/backend/dual.rs +++ b/tpcc/src/backend/dual.rs @@ -119,11 +119,11 @@ pub struct DualTransaction<'a> { impl<'a> BackendTransaction for DualTransaction<'a> { type PreparedStatement = DualPreparedStatement<'a>; - fn query_one( + fn execute_drain( &mut self, statement: &mut Self::PreparedStatement, params: &[DbParam], - ) -> Result { + ) -> Result<(), TpccError> { let spec = statement.spec.clone(); let kitesql_iter = self.kitesql.execute_raw(&mut statement.kitesql, params)?; @@ -131,71 +131,53 @@ impl<'a> BackendTransaction for DualTransaction<'a> { if is_select_sql(&spec) { if spec.sql == STOCK_LEVEL_DISTINCT_SQL { - let kitesql_rows = collect_all_rows(kitesql_iter)?; - let sqlite_rows = collect_all_rows(sqlite_iter)?; - compare_unordered_rows(&kitesql_rows, &sqlite_rows, spec.sql)?; - return kitesql_rows - .into_iter() - .next() - .ok_or(TpccError::EmptyTuples); + let (kitesql_counts, kitesql_len) = collect_kitesql_value_counts(kitesql_iter)?; + let sqlite_rows = collect_sqlite_rows(sqlite_iter)?; + compare_unordered_rows(kitesql_counts, kitesql_len, &sqlite_rows, spec.sql) + } else { + drain_and_compare_ordered(kitesql_iter, sqlite_iter, spec.sql) } - query_ordered_nth(kitesql_iter, sqlite_iter, spec.sql, 0) } else { drain_sqlite_iter(sqlite_iter)?; - let mut kitesql_iter = kitesql_iter; - match kitesql_iter.next() { - Some(row) => row, - None => Err(TpccError::EmptyTuples), - } + drain_kitesql_iter(kitesql_iter) } } - fn query_nth( + fn with_query_one( &mut self, statement: &mut Self::PreparedStatement, params: &[DbParam], - n: usize, - ) -> Result { - let spec = statement.spec.clone(); - - let kitesql_iter = self.kitesql.execute_raw(&mut statement.kitesql, params)?; - let sqlite_iter = self.sqlite.execute_raw(&mut statement.sqlite, params)?; - - if spec.sql == STOCK_LEVEL_DISTINCT_SQL { - let kitesql_rows = collect_all_rows(kitesql_iter)?; - let sqlite_rows = collect_all_rows(sqlite_iter)?; - compare_unordered_rows(&kitesql_rows, &sqlite_rows, spec.sql)?; - return kitesql_rows - .into_iter() - .nth(n) - .ok_or(TpccError::EmptyTuples); - } - - query_ordered_nth(kitesql_iter, sqlite_iter, spec.sql, n) + visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, + ) -> Result<(), TpccError> { + self.with_query_nth(statement, params, 0, visitor) } - fn execute_drain( + fn with_query_nth( &mut self, statement: &mut Self::PreparedStatement, params: &[DbParam], + n: usize, + visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, ) -> Result<(), TpccError> { let spec = statement.spec.clone(); let kitesql_iter = self.kitesql.execute_raw(&mut statement.kitesql, params)?; let sqlite_iter = self.sqlite.execute_raw(&mut statement.sqlite, params)?; - if is_select_sql(&spec) { - if spec.sql == STOCK_LEVEL_DISTINCT_SQL { - let kitesql_rows = collect_all_rows(kitesql_iter)?; - let sqlite_rows = collect_all_rows(sqlite_iter)?; - compare_unordered_rows(&kitesql_rows, &sqlite_rows, spec.sql) - } else { - drain_and_compare_ordered(kitesql_iter, sqlite_iter, spec.sql) - } - } else { + if spec.sql == STOCK_LEVEL_DISTINCT_SQL { + let (kitesql_counts, kitesql_len) = collect_kitesql_value_counts(kitesql_iter)?; + let sqlite_rows = collect_sqlite_rows(sqlite_iter)?; + compare_unordered_rows(kitesql_counts, kitesql_len, &sqlite_rows, spec.sql)?; + let tuple = sqlite_rows.get(n).ok_or(TpccError::EmptyTuples)?; + return visitor(tuple); + } + + if !is_select_sql(&spec) { drain_sqlite_iter(sqlite_iter)?; - drain_kitesql_iter(kitesql_iter) + return with_kitesql_nth(kitesql_iter, n, visitor); } + + with_ordered_nth(kitesql_iter, sqlite_iter, spec.sql, n, visitor) } fn commit(self) -> Result<(), TpccError> { @@ -224,16 +206,11 @@ fn drain_sqlite_iter(mut iter: SqliteResult<'_, '_>) -> Result<(), TpccError> { fn drain_kitesql_iter( mut iter: KiteSqlTxnResult<'_, T>, ) -> Result<(), TpccError> { - while let Some(row) = iter.next() { - row?; - } + while iter.skip_next_tuple()? {} Ok(()) } -fn collect_all_rows(mut iter: I) -> Result, TpccError> -where - I: Iterator>, -{ +fn collect_sqlite_rows(mut iter: SqliteResult<'_, '_>) -> Result, TpccError> { let mut rows = Vec::new(); while let Some(row) = iter.next() { rows.push(row?); @@ -241,42 +218,71 @@ where Ok(rows) } -fn query_ordered_nth( +fn collect_kitesql_value_counts( + mut iter: KiteSqlTxnResult<'_, T>, +) -> Result<(HashMap, usize>, usize), TpccError> { + let mut counts = HashMap::new(); + let mut len = 0; + while let Some(()) = iter.with_next_tuple(|tuple| { + *counts.entry(tuple.values.clone()).or_insert(0) += 1; + len += 1; + Ok(()) + })? {} + Ok((counts, len)) +} + +fn with_kitesql_nth( + mut iter: KiteSqlTxnResult<'_, T>, + n: usize, + visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, +) -> Result<(), TpccError> { + for _ in 0..n { + if !iter.skip_next_tuple()? { + return Err(TpccError::EmptyTuples); + } + } + iter.with_next_tuple(|tuple| visitor(tuple))? + .ok_or(TpccError::EmptyTuples) +} + +fn with_ordered_nth( mut kitesql_iter: KiteSqlTxnResult<'_, T>, mut sqlite_iter: SqliteResult<'_, '_>, sql: &'static str, n: usize, -) -> Result { - let mut result = None; + visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, +) -> Result<(), TpccError> { + let mut visited = false; let mut index = 0usize; loop { - match kitesql_iter.next() { - Some(kitesql_row) => { - let kitesql_tuple = kitesql_row?; - let sqlite_tuple = match sqlite_iter.next() { - Some(row) => row?, - None => { - return Err(TpccError::BackendMismatch(format!( - "SQLite returned fewer rows for SQL: {}", - sql - ))) - } - }; - if kitesql_tuple.values != sqlite_tuple.values { - println!("[Dual] mismatch SQL: {}", sql); - println!(" KiteSQL row: {:?}", kitesql_tuple.values); - println!(" SQLite row: {:?}", sqlite_tuple.values); + match kitesql_iter.with_next_tuple(|kitesql_tuple| { + let sqlite_tuple = match sqlite_iter.next() { + Some(row) => row?, + None => { return Err(TpccError::BackendMismatch(format!( - "Result mismatch for SQL: {}", + "SQLite returned fewer rows for SQL: {}", sql - ))); - } - if index == n { - result = Some(kitesql_tuple.clone()); + ))) } - index += 1; + }; + if kitesql_tuple.values != sqlite_tuple.values { + println!("[Dual] mismatch SQL: {}", sql); + println!(" KiteSQL row: {:?}", kitesql_tuple.values); + println!(" SQLite row: {:?}", sqlite_tuple.values); + return Err(TpccError::BackendMismatch(format!( + "Result mismatch for SQL: {}", + sql + ))); + } + if index == n { + visitor(&sqlite_tuple)?; + visited = true; } + index += 1; + Ok(()) + })? { + Some(()) => {} None => { if let Some(extra) = sqlite_iter.next() { extra?; @@ -285,7 +291,11 @@ fn query_ordered_nth( sql ))); } - return result.ok_or(TpccError::EmptyTuples); + return if visited { + Ok(()) + } else { + Err(TpccError::EmptyTuples) + }; } } } @@ -297,28 +307,28 @@ fn drain_and_compare_ordered( sql: &'static str, ) -> Result<(), TpccError> { loop { - match kitesql_iter.next() { - Some(kitesql_row) => { - let kitesql_tuple = kitesql_row?; - let sqlite_tuple = match sqlite_iter.next() { - Some(row) => row?, - None => { - return Err(TpccError::BackendMismatch(format!( - "SQLite returned fewer rows for SQL: {}", - sql - ))) - } - }; - if kitesql_tuple.values != sqlite_tuple.values { - println!("[Dual] mismatch SQL: {}", sql); - println!(" KiteSQL row: {:?}", kitesql_tuple.values); - println!(" SQLite row: {:?}", sqlite_tuple.values); + match kitesql_iter.with_next_tuple(|kitesql_tuple| { + let sqlite_tuple = match sqlite_iter.next() { + Some(row) => row?, + None => { return Err(TpccError::BackendMismatch(format!( - "Result mismatch for SQL: {}", + "SQLite returned fewer rows for SQL: {}", sql - ))); + ))) } + }; + if kitesql_tuple.values != sqlite_tuple.values { + println!("[Dual] mismatch SQL: {}", sql); + println!(" KiteSQL row: {:?}", kitesql_tuple.values); + println!(" SQLite row: {:?}", sqlite_tuple.values); + return Err(TpccError::BackendMismatch(format!( + "Result mismatch for SQL: {}", + sql + ))); } + Ok(()) + })? { + Some(()) => {} None => { if let Some(extra) = sqlite_iter.next() { extra?; @@ -334,21 +344,18 @@ fn drain_and_compare_ordered( } fn compare_unordered_rows( - kitesql_rows: &[Tuple], + mut counts: HashMap, usize>, + kitesql_len: usize, sqlite_rows: &[Tuple], sql: &'static str, ) -> Result<(), TpccError> { - if kitesql_rows.len() != sqlite_rows.len() { + if kitesql_len != sqlite_rows.len() { return Err(TpccError::BackendMismatch(format!( "SQLite returned different row count for SQL: {}", sql ))); } - let mut counts: HashMap, usize> = HashMap::new(); - for row in kitesql_rows { - *counts.entry(row.values.clone()).or_insert(0) += 1; - } for row in sqlite_rows { match counts.get_mut(&row.values) { Some(count) => { diff --git a/tpcc/src/backend/kitesql_lmdb.rs b/tpcc/src/backend/kitesql_lmdb.rs index 86c1d06c..92026ad9 100644 --- a/tpcc/src/backend/kitesql_lmdb.rs +++ b/tpcc/src/backend/kitesql_lmdb.rs @@ -109,41 +109,13 @@ impl<'a> KiteSqlLmdbTransactionWrapper<'a> { impl<'a> BackendTransaction for KiteSqlLmdbTransactionWrapper<'a> { type PreparedStatement = KiteSqlPreparedStatement; - fn query_one( - &mut self, - statement: &mut Self::PreparedStatement, - params: &[DbParam], - ) -> Result { - self.execute_raw(statement, params)? - .next_borrowed_tuple()? - .cloned() - .ok_or(TpccError::EmptyTuples) - } - - fn query_nth( - &mut self, - statement: &mut Self::PreparedStatement, - params: &[DbParam], - n: usize, - ) -> Result { - let mut iter = self.execute_raw(statement, params)?; - for _ in 0..n { - if iter.next_borrowed_tuple()?.is_none() { - return Err(TpccError::EmptyTuples); - } - } - iter.next_borrowed_tuple()? - .cloned() - .ok_or(TpccError::EmptyTuples) - } - fn execute_drain( &mut self, statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; - while iter.next_borrowed_tuple()?.is_some() {} + while iter.skip_next_tuple()? {} Ok(()) } @@ -154,8 +126,8 @@ impl<'a> BackendTransaction for KiteSqlLmdbTransactionWrapper<'a> { visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; - let tuple = iter.next_borrowed_tuple()?.ok_or(TpccError::EmptyTuples)?; - visitor(tuple) + iter.with_next_tuple(|tuple| visitor(tuple))? + .ok_or(TpccError::EmptyTuples) } fn with_query_nth( @@ -167,12 +139,12 @@ impl<'a> BackendTransaction for KiteSqlLmdbTransactionWrapper<'a> { ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; for _ in 0..n { - if iter.next_borrowed_tuple()?.is_none() { + if !iter.skip_next_tuple()? { return Err(TpccError::EmptyTuples); } } - let tuple = iter.next_borrowed_tuple()?.ok_or(TpccError::EmptyTuples)?; - visitor(tuple) + iter.with_next_tuple(|tuple| visitor(tuple))? + .ok_or(TpccError::EmptyTuples) } fn commit(self) -> Result<(), TpccError> { diff --git a/tpcc/src/backend/kitesql_rocksdb.rs b/tpcc/src/backend/kitesql_rocksdb.rs index ce225111..6a6e14da 100644 --- a/tpcc/src/backend/kitesql_rocksdb.rs +++ b/tpcc/src/backend/kitesql_rocksdb.rs @@ -182,41 +182,13 @@ impl<'a, S: Storage> KiteSqlRocksTransaction<'a, S> { impl<'a, S: Storage> BackendTransaction for KiteSqlRocksTransaction<'a, S> { type PreparedStatement = KiteSqlPreparedStatement; - fn query_one( - &mut self, - statement: &mut Self::PreparedStatement, - params: &[DbParam], - ) -> Result { - self.execute_raw(statement, params)? - .next_borrowed_tuple()? - .cloned() - .ok_or(TpccError::EmptyTuples) - } - - fn query_nth( - &mut self, - statement: &mut Self::PreparedStatement, - params: &[DbParam], - n: usize, - ) -> Result { - let mut iter = self.execute_raw(statement, params)?; - for _ in 0..n { - if iter.next_borrowed_tuple()?.is_none() { - return Err(TpccError::EmptyTuples); - } - } - iter.next_borrowed_tuple()? - .cloned() - .ok_or(TpccError::EmptyTuples) - } - fn execute_drain( &mut self, statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; - while iter.next_borrowed_tuple()?.is_some() {} + while iter.skip_next_tuple()? {} Ok(()) } @@ -227,8 +199,8 @@ impl<'a, S: Storage> BackendTransaction for KiteSqlRocksTransaction<'a, S> { visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; - let tuple = iter.next_borrowed_tuple()?.ok_or(TpccError::EmptyTuples)?; - visitor(tuple) + iter.with_next_tuple(|tuple| visitor(tuple))? + .ok_or(TpccError::EmptyTuples) } fn with_query_nth( @@ -240,12 +212,12 @@ impl<'a, S: Storage> BackendTransaction for KiteSqlRocksTransaction<'a, S> { ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; for _ in 0..n { - if iter.next_borrowed_tuple()?.is_none() { + if !iter.skip_next_tuple()? { return Err(TpccError::EmptyTuples); } } - let tuple = iter.next_borrowed_tuple()?.ok_or(TpccError::EmptyTuples)?; - visitor(tuple) + iter.with_next_tuple(|tuple| visitor(tuple))? + .ok_or(TpccError::EmptyTuples) } fn commit(self) -> Result<(), TpccError> { @@ -261,15 +233,25 @@ impl<'a, T: Transaction + 'a> KiteSqlTxnResult<'a, T> { Self(iter) } - pub(crate) fn next_borrowed_tuple(&mut self) -> Result, TpccError> { - self.0.next_borrowed_tuple().map_err(TpccError::from) + pub(crate) fn skip_next_tuple(&mut self) -> Result { + Ok(self + .0 + .next_tuple(|_, _| ()) + .map_err(TpccError::from)? + .is_some()) } -} -impl Iterator for KiteSqlTxnResult<'_, T> { - type Item = Result; - - fn next(&mut self) -> Option { - self.0.next().map(|item| item.map_err(TpccError::from)) + pub(crate) fn with_next_tuple( + &mut self, + f: impl FnOnce(&Tuple) -> Result, + ) -> Result, TpccError> { + match self + .0 + .next_tuple(|_, tuple| f(tuple)) + .map_err(TpccError::from)? + { + Some(result) => result.map(Some), + None => Ok(None), + } } } diff --git a/tpcc/src/backend/mod.rs b/tpcc/src/backend/mod.rs index 9a0fe915..1ee25287 100644 --- a/tpcc/src/backend/mod.rs +++ b/tpcc/src/backend/mod.rs @@ -51,19 +51,6 @@ pub trait BackendControl: SimpleExecutor { pub trait BackendTransaction { type PreparedStatement: PreparedStatement; - fn query_one( - &mut self, - statement: &mut Self::PreparedStatement, - params: &[DbParam], - ) -> Result; - - fn query_nth( - &mut self, - statement: &mut Self::PreparedStatement, - params: &[DbParam], - n: usize, - ) -> Result; - fn execute_drain( &mut self, statement: &mut Self::PreparedStatement, @@ -75,10 +62,7 @@ pub trait BackendTransaction { statement: &mut Self::PreparedStatement, params: &[DbParam], visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, - ) -> Result<(), TpccError> { - let tuple = self.query_one(statement, params)?; - visitor(&tuple) - } + ) -> Result<(), TpccError>; fn with_query_nth( &mut self, @@ -86,10 +70,7 @@ pub trait BackendTransaction { params: &[DbParam], n: usize, visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, - ) -> Result<(), TpccError> { - let tuple = self.query_nth(statement, params, n)?; - visitor(&tuple) - } + ) -> Result<(), TpccError>; fn commit(self) -> Result<(), TpccError>; } diff --git a/tpcc/src/backend/sqlite.rs b/tpcc/src/backend/sqlite.rs index 1ac4606c..f690a606 100644 --- a/tpcc/src/backend/sqlite.rs +++ b/tpcc/src/backend/sqlite.rs @@ -148,45 +148,44 @@ impl Drop for SqliteTransaction<'_> { impl<'a> BackendTransaction for SqliteTransaction<'a> { type PreparedStatement = SqlitePreparedStatement<'a>; - fn query_one( + fn execute_drain( &mut self, statement: &mut Self::PreparedStatement, params: &[DbParam], - ) -> Result { - match self.execute_raw(statement, params)?.next() { - Some(row) => row, - None => Err(TpccError::EmptyTuples), + ) -> Result<(), TpccError> { + let mut iter = self.execute_raw(statement, params)?; + while let Some(row) = iter.next() { + row?; } + Ok(()) } - fn query_nth( + fn with_query_one( &mut self, statement: &mut Self::PreparedStatement, params: &[DbParam], - n: usize, - ) -> Result { + visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, + ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; - for _ in 0..n { - if iter.next().transpose()?.is_none() { - return Err(TpccError::EmptyTuples); - } - } - match iter.next() { - Some(row) => row, - None => Err(TpccError::EmptyTuples), - } + let tuple = iter.next().transpose()?.ok_or(TpccError::EmptyTuples)?; + visitor(&tuple) } - fn execute_drain( + fn with_query_nth( &mut self, statement: &mut Self::PreparedStatement, params: &[DbParam], + n: usize, + visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; - while let Some(row) = iter.next() { - row?; + for _ in 0..n { + if iter.next().transpose()?.is_none() { + return Err(TpccError::EmptyTuples); + } } - Ok(()) + let tuple = iter.next().transpose()?.ok_or(TpccError::EmptyTuples)?; + visitor(&tuple) } fn commit(mut self) -> Result<(), TpccError> { diff --git a/tpcc/src/main.rs b/tpcc/src/main.rs index 1ff185bd..06849802 100644 --- a/tpcc/src/main.rs +++ b/tpcc/src/main.rs @@ -810,6 +810,7 @@ impl_from_tpcc_error!(std::io::Error, Io); #[test] fn explain_tpcc() -> Result<(), DatabaseError> { use kite_sql::db::{DataBaseBuilder, ResultIter}; + use kite_sql::types::tuple::Tuple; fn create_table(mut iter: I) -> Result { let mut output = iter.schema(|schema| { @@ -822,8 +823,7 @@ fn explain_tpcc() -> Result<(), DatabaseError> { if !output.is_empty() { output.push('\n'); } - for tuple in iter.by_ref() { - let tuple = tuple?; + while let Some(()) = iter.next_tuple(|_, tuple| { output.push_str( &tuple .values @@ -833,71 +833,94 @@ fn explain_tpcc() -> Result<(), DatabaseError> { .join("\t"), ); output.push('\n'); - } + })? {} iter.done()?; Ok(output) } + fn with_next_tuple( + mut iter: I, + f: impl FnOnce(&Tuple) -> R, + ) -> Result { + let value = iter + .next_tuple(|_, tuple| f(tuple))? + .expect("expected one tuple"); + iter.done()?; + Ok(value) + } + let database = DataBaseBuilder::path(tpcc_db_path()).build_lmdb()?; let mut tx = database.new_transaction()?; - let customer_tuple = tx - .run("SELECT c_w_id, c_d_id, c_id, c_last, c_balance, c_data FROM customer limit 1")? - .next() - .unwrap()?; - let district_tuple = tx - .run("SELECT d_id, d_w_id, d_next_o_id FROM district limit 1")? - .next() - .unwrap()?; - let item_tuple = tx.run("SELECT i_id FROM item limit 1")?.next().unwrap()?; - let stock_tuple = tx - .run("SELECT s_i_id, s_w_id, s_quantity FROM stock limit 1")? - .next() - .unwrap()?; - let orders_tuple = tx - .run("SELECT o_w_id, o_d_id, o_c_id, o_id, o_carrier_id FROM orders limit 1")? - .next() - .unwrap()?; - let order_line_tuple = tx - .run("SELECT ol_w_id, ol_d_id, ol_o_id, ol_delivery_d FROM order_line limit 1")? - .next() - .unwrap()?; - let new_order_tuple = tx - .run("SELECT no_d_id, no_w_id, no_o_id FROM new_orders limit 1")? - .next() - .unwrap()?; - - let c_w_id = customer_tuple.values[0].clone(); - let c_d_id = customer_tuple.values[1].clone(); - let c_id = customer_tuple.values[2].clone(); - let c_last = customer_tuple.values[3].clone(); - let c_balance = customer_tuple.values[4].clone(); - let c_data = customer_tuple.values[5].clone(); - - let d_id = district_tuple.values[0].clone(); - let d_w_id = district_tuple.values[1].clone(); - let d_next_o_id = district_tuple.values[2].clone(); - - let i_id = item_tuple.values[0].clone(); - - let s_i_id = stock_tuple.values[0].clone(); - let s_w_id = stock_tuple.values[1].clone(); - let s_quantity = stock_tuple.values[2].clone(); - - let o_w_id = orders_tuple.values[0].clone(); - let o_d_id = orders_tuple.values[1].clone(); - let o_c_id = orders_tuple.values[2].clone(); - let o_id = orders_tuple.values[3].clone(); - let o_carrier_id = orders_tuple.values[4].clone(); - - let ol_w_id = order_line_tuple.values[0].clone(); - let ol_d_id = order_line_tuple.values[1].clone(); - let ol_o_id = order_line_tuple.values[2].clone(); - let ol_delivery_d = order_line_tuple.values[3].clone(); - - let no_d_id = new_order_tuple.values[0].clone(); - let no_w_id = new_order_tuple.values[1].clone(); - let no_o_id = new_order_tuple.values[2].clone(); + let (c_w_id, c_d_id, c_id, c_last, c_balance, c_data) = with_next_tuple( + tx.run("SELECT c_w_id, c_d_id, c_id, c_last, c_balance, c_data FROM customer limit 1")?, + |tuple| { + ( + tuple.values[0].clone(), + tuple.values[1].clone(), + tuple.values[2].clone(), + tuple.values[3].clone(), + tuple.values[4].clone(), + tuple.values[5].clone(), + ) + }, + )?; + let (d_id, d_w_id, d_next_o_id) = with_next_tuple( + tx.run("SELECT d_id, d_w_id, d_next_o_id FROM district limit 1")?, + |tuple| { + ( + tuple.values[0].clone(), + tuple.values[1].clone(), + tuple.values[2].clone(), + ) + }, + )?; + let i_id = with_next_tuple(tx.run("SELECT i_id FROM item limit 1")?, |tuple| { + tuple.values[0].clone() + })?; + let (s_i_id, s_w_id, s_quantity) = with_next_tuple( + tx.run("SELECT s_i_id, s_w_id, s_quantity FROM stock limit 1")?, + |tuple| { + ( + tuple.values[0].clone(), + tuple.values[1].clone(), + tuple.values[2].clone(), + ) + }, + )?; + let (o_w_id, o_d_id, o_c_id, o_id, o_carrier_id) = with_next_tuple( + tx.run("SELECT o_w_id, o_d_id, o_c_id, o_id, o_carrier_id FROM orders limit 1")?, + |tuple| { + ( + tuple.values[0].clone(), + tuple.values[1].clone(), + tuple.values[2].clone(), + tuple.values[3].clone(), + tuple.values[4].clone(), + ) + }, + )?; + let (ol_w_id, ol_d_id, ol_o_id, ol_delivery_d) = with_next_tuple( + tx.run("SELECT ol_w_id, ol_d_id, ol_o_id, ol_delivery_d FROM order_line limit 1")?, + |tuple| { + ( + tuple.values[0].clone(), + tuple.values[1].clone(), + tuple.values[2].clone(), + tuple.values[3].clone(), + ) + }, + )?; + let (no_d_id, no_w_id, no_o_id) = with_next_tuple( + tx.run("SELECT no_d_id, no_w_id, no_o_id FROM new_orders limit 1")?, + |tuple| { + ( + tuple.values[0].clone(), + tuple.values[1].clone(), + tuple.values[2].clone(), + ) + }, + )?; // ORDER { println!("========Explain on Order");