From 94be261e76e9bef061649be407e89e949da3ec1e Mon Sep 17 00:00:00 2001 From: kould Date: Sun, 21 Jun 2026 09:39:16 +0800 Subject: [PATCH] perf: reuse sqlite prepared statements in tpcc --- README.md | 4 +- tpcc/README.md | 77 ++++++++--------- tpcc/src/backend/dual.rs | 88 +++++++++++++------ tpcc/src/backend/kitesql_lmdb.rs | 33 ++++--- tpcc/src/backend/kitesql_rocksdb.rs | 33 ++++--- tpcc/src/backend/mod.rs | 43 +++++----- tpcc/src/backend/sqlite.rs | 129 +++++++++++++++++----------- tpcc/src/delivery.rs | 32 +++---- tpcc/src/main.rs | 60 +++++++------ tpcc/src/new_ord.rs | 40 ++++----- tpcc/src/order_stat.rs | 28 +++--- tpcc/src/payment.rs | 42 ++++----- tpcc/src/slev.rs | 24 +++--- 13 files changed, 346 insertions(+), 287 deletions(-) diff --git a/README.md b/README.md index a0d40765..4e2f0c7b 100755 --- a/README.md +++ b/README.md @@ -204,8 +204,8 @@ Recent 720-second local comparison on the machine above: | --- | ---: | ---: | ---: | ---: | ---: | ---: | | KiteSQL LMDB | 73638 | 0.001s | 0.001s | 0.001s | 0.002s | 0.001s | | KiteSQL RocksDB | 39051 | 0.001s | 0.001s | 0.002s | 0.009s | 0.001s | -| SQLite balanced | 39835 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | -| SQLite practical | 37911 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | +| SQLite balanced | 56788 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | +| SQLite practical | 44049 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | The detailed raw outputs are recorded in [tpcc/README.md](tpcc/README.md). #### 👉[check more](tpcc/README.md) diff --git a/tpcc/README.md b/tpcc/README.md index 57a89810..f3e13560 100644 --- a/tpcc/README.md +++ b/tpcc/README.md @@ -31,12 +31,11 @@ Local 720-second comparison on the machine above: | --- | ---: | ---: | ---: | ---: | ---: | ---: | | KiteSQL LMDB | 73638 | 0.001s | 0.001s | 0.001s | 0.002s | 0.001s | | KiteSQL RocksDB | 39051 | 0.001s | 0.001s | 0.002s | 0.009s | 0.001s | -| SQLite balanced | 39835 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | -| SQLite practical | 37911 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | +| SQLite balanced | 56788 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | +| SQLite practical | 44049 | 0.001s | 0.001s | 0.001s | 0.001s | 0.001s | - All rows are from fresh 720-second reruns with `--num-ware 1` and the default `--max-retry 5`. - SQLite rows use the `balanced` and `practical` profiles respectively. -- Raw logs for this run were generated under `tpcc/results/2026-06-20_12-16-15/`. ### KiteSQL LMDB ```shell @@ -189,11 +188,11 @@ Transaction Summary (elapsed 720.0s) +--------------+---------+------+---------+-------+ | Transaction | Success | Late | Failure | Total | +--------------+---------+------+---------+-------+ -| New-Order | 478016 | 0 | 4738 | 482754 | -| Payment | 477993 | 0 | 0 | 477993 | -| Order-Status | 47800 | 0 | 0 | 47800 | -| Delivery | 47799 | 0 | 0 | 47799 | -| Stock-Level | 47799 | 0 | 0 | 47799 | +| New-Order | 681461 | 0 | 6997 | 688458 | +| Payment | 681435 | 0 | 0 | 681435 | +| Order-Status | 68143 | 0 | 0 | 68143 | +| Delivery | 68144 | 0 | 0 | 68144 | +| Stock-Level | 68144 | 0 | 0 | 68144 | +--------------+---------+------+---------+-------+ (all must be [OK]) [transaction percentage] @@ -213,37 +212,34 @@ Transaction Summary (elapsed 720.0s) 1.New-Order -0.001, 472255 -0.002, 5697 -0.003, 64 +0.001, 681310 +0.002, 151 2.Payment -0.001, 477992 -0.002, 1 +0.001, 681435 3.Order-Status -0.001, 47800 +0.001, 68143 4.Delivery -0.001, 46716 -0.002, 1070 -0.003, 13 +0.001, 68121 +0.002, 23 5.Stock-Level -0.001, 47799 +0.001, 68144 <90th Percentile RT (MaxRT)> - New-Order : 0.001 (0.003) + New-Order : 0.001 (0.001) Payment : 0.001 (0.001) -Order-Status : 0.001 (0.001) - Delivery : 0.001 (0.003) +Order-Status : 0.001 (0.000) + Delivery : 0.001 (0.001) Stock-Level : 0.001 (0.000) -39835 Tpmc +56788 Tpmc ``` ### SQLite practical @@ -252,11 +248,11 @@ Transaction Summary (elapsed 720.0s) +--------------+---------+------+---------+-------+ | Transaction | Success | Late | Failure | Total | +--------------+---------+------+---------+-------+ -| New-Order | 454941 | 0 | 4578 | 459519 | -| Payment | 454917 | 0 | 0 | 454917 | -| Order-Status | 45491 | 0 | 0 | 45491 | -| Delivery | 45492 | 0 | 0 | 45492 | -| Stock-Level | 45491 | 0 | 0 | 45491 | +| New-Order | 528594 | 0 | 5563 | 534157 | +| Payment | 528570 | 0 | 0 | 528570 | +| Order-Status | 52857 | 0 | 0 | 52857 | +| Delivery | 52857 | 0 | 0 | 52857 | +| Stock-Level | 52857 | 0 | 0 | 52857 | +--------------+---------+------+---------+-------+ (all must be [OK]) [transaction percentage] @@ -276,36 +272,37 @@ Transaction Summary (elapsed 720.0s) 1.New-Order -0.001, 452749 -0.002, 2175 -0.003, 17 +0.001, 528448 +0.002, 141 +0.003, 5 2.Payment -0.001, 454917 +0.001, 528567 +0.002, 3 3.Order-Status -0.001, 45491 +0.001, 52857 4.Delivery -0.001, 45045 -0.002, 442 -0.003, 5 +0.001, 52831 +0.002, 25 +0.003, 1 5.Stock-Level -0.001, 45491 +0.001, 52857 <90th Percentile RT (MaxRT)> - New-Order : 0.001 (0.002) - Payment : 0.001 (0.001) -Order-Status : 0.001 (0.001) + New-Order : 0.001 (0.003) + Payment : 0.001 (0.002) +Order-Status : 0.001 (0.000) Delivery : 0.001 (0.003) Stock-Level : 0.001 (0.000) -37911 Tpmc +44049 Tpmc ``` ## Refer to diff --git a/tpcc/src/backend/dual.rs b/tpcc/src/backend/dual.rs index 8d13e7d7..c88d87fd 100644 --- a/tpcc/src/backend/dual.rs +++ b/tpcc/src/backend/dual.rs @@ -13,9 +13,10 @@ // limitations under the License. use super::kitesql_rocksdb::{KiteSqlRocksDbBackend, KiteSqlRocksDbTransaction, KiteSqlTxnResult}; -use super::sqlite::{SqliteBackend, SqliteResult, SqliteTransaction}; +use super::sqlite::{SqliteBackend, SqlitePreparedStatement, SqliteResult, SqliteTransaction}; use super::{ - BackendControl, BackendTransaction, DbParam, PreparedStatement, SimpleExecutor, StatementSpec, + BackendControl, BackendTransaction, DbParam, KiteSqlPreparedStatement, PreparedStatement, + SimpleExecutor, StatementSpec, }; use crate::{TpccError, STOCK_LEVEL_DISTINCT_SQL, STOCK_LEVEL_DISTINCT_SQLITE}; use kite_sql::types::tuple::Tuple; @@ -28,6 +29,18 @@ pub struct DualBackend { sqlite: SqliteBackend, } +pub struct DualPreparedStatement<'a> { + kitesql: KiteSqlPreparedStatement, + sqlite: SqlitePreparedStatement<'a>, + spec: StatementSpec, +} + +impl PreparedStatement for DualPreparedStatement<'_> { + fn spec(&self) -> &StatementSpec { + &self.spec + } +} + impl DualBackend { pub fn new(path: &str, rocksdb_stats: bool) -> Result { Ok(Self { @@ -38,6 +51,11 @@ impl DualBackend { } impl BackendControl for DualBackend { + type PreparedStatement<'a> + = DualPreparedStatement<'a> + where + Self: 'a; + type Transaction<'a> = DualTransaction<'a> where @@ -46,8 +64,29 @@ impl BackendControl for DualBackend { fn prepare_statements( &self, specs: &[Vec], - ) -> Result>, TpccError> { - self.kitesql.prepare_statements(specs) + ) -> Result>>, TpccError> { + let sqlite_specs: Vec> = specs + .iter() + .map(|group| group.iter().map(sqlite_statement_spec).collect()) + .collect(); + let kitesql_groups = self.kitesql.prepare_statements(specs)?; + let sqlite_groups = self.sqlite.prepare_statements(&sqlite_specs)?; + let mut groups = Vec::with_capacity(kitesql_groups.len()); + + for (kitesql_group, sqlite_group) in kitesql_groups.into_iter().zip(sqlite_groups) { + let mut group = Vec::with_capacity(kitesql_group.len()); + for (kitesql, sqlite) in kitesql_group.into_iter().zip(sqlite_group) { + let spec = kitesql.spec().clone(); + group.push(DualPreparedStatement { + kitesql, + sqlite, + spec, + }); + } + groups.push(group); + } + + Ok(groups) } fn new_transaction(&self) -> Result, TpccError> { @@ -78,18 +117,17 @@ pub struct DualTransaction<'a> { } impl<'a> BackendTransaction for DualTransaction<'a> { + type PreparedStatement = DualPreparedStatement<'a>; + fn query_one( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result { - let spec = statement.spec().clone(); - let sqlite_stmt = PreparedStatement::Sqlite { - spec: sqlite_statement_spec(&spec), - }; + let spec = statement.spec.clone(); - let kitesql_iter = self.kitesql.execute_raw(statement, params)?; - let sqlite_iter = self.sqlite.execute_raw(&sqlite_stmt, params)?; + let kitesql_iter = self.kitesql.execute_raw(&mut statement.kitesql, params)?; + let sqlite_iter = self.sqlite.execute_raw(&mut statement.sqlite, params)?; if is_select_sql(&spec) { if spec.sql == STOCK_LEVEL_DISTINCT_SQL { @@ -114,17 +152,14 @@ impl<'a> BackendTransaction for DualTransaction<'a> { fn query_nth( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], n: usize, ) -> Result { - let spec = statement.spec().clone(); - let sqlite_stmt = PreparedStatement::Sqlite { - spec: sqlite_statement_spec(&spec), - }; + let spec = statement.spec.clone(); - let kitesql_iter = self.kitesql.execute_raw(statement, params)?; - let sqlite_iter = self.sqlite.execute_raw(&sqlite_stmt, params)?; + let kitesql_iter = self.kitesql.execute_raw(&mut statement.kitesql, params)?; + let sqlite_iter = self.sqlite.execute_raw(&mut statement.sqlite, params)?; if spec.sql == STOCK_LEVEL_DISTINCT_SQL { let kitesql_rows = collect_all_rows(kitesql_iter)?; @@ -141,16 +176,13 @@ impl<'a> BackendTransaction for DualTransaction<'a> { fn execute_drain( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result<(), TpccError> { - let spec = statement.spec().clone(); - let sqlite_stmt = PreparedStatement::Sqlite { - spec: sqlite_statement_spec(&spec), - }; + let spec = statement.spec.clone(); - let kitesql_iter = self.kitesql.execute_raw(statement, params)?; - let sqlite_iter = self.sqlite.execute_raw(&sqlite_stmt, params)?; + let kitesql_iter = self.kitesql.execute_raw(&mut statement.kitesql, params)?; + let sqlite_iter = self.sqlite.execute_raw(&mut statement.sqlite, params)?; if is_select_sql(&spec) { if spec.sql == STOCK_LEVEL_DISTINCT_SQL { @@ -182,7 +214,7 @@ fn normalize_sqlite_sql(sql: &str) -> Option> { } } -fn drain_sqlite_iter(mut iter: SqliteResult<'_>) -> Result<(), TpccError> { +fn drain_sqlite_iter(mut iter: SqliteResult<'_, '_>) -> Result<(), TpccError> { while let Some(row) = iter.next() { row?; } @@ -211,7 +243,7 @@ where fn query_ordered_nth( mut kitesql_iter: KiteSqlTxnResult<'_, T>, - mut sqlite_iter: SqliteResult<'_>, + mut sqlite_iter: SqliteResult<'_, '_>, sql: &'static str, n: usize, ) -> Result { @@ -261,7 +293,7 @@ fn query_ordered_nth( fn drain_and_compare_ordered( mut kitesql_iter: KiteSqlTxnResult<'_, T>, - mut sqlite_iter: SqliteResult<'_>, + mut sqlite_iter: SqliteResult<'_, '_>, sql: &'static str, ) -> Result<(), TpccError> { loop { diff --git a/tpcc/src/backend/kitesql_lmdb.rs b/tpcc/src/backend/kitesql_lmdb.rs index d551e12b..86c1d06c 100644 --- a/tpcc/src/backend/kitesql_lmdb.rs +++ b/tpcc/src/backend/kitesql_lmdb.rs @@ -14,7 +14,8 @@ use super::kitesql_rocksdb::{execute_kitesql_batch, KiteSqlTxnResult}; use super::{ - BackendControl, BackendTransaction, DbParam, PreparedStatement, SimpleExecutor, StatementSpec, + BackendControl, BackendTransaction, DbParam, KiteSqlPreparedStatement, SimpleExecutor, + StatementSpec, }; use crate::TpccError; use kite_sql::db::{prepare, DBTransaction, DataBaseBuilder, Database}; @@ -37,13 +38,13 @@ impl KiteSqlLmdbBackend { fn prepare_spec_groups( &self, specs: &[Vec], - ) -> Result>, TpccError> { + ) -> Result>, TpccError> { let mut groups = Vec::with_capacity(specs.len()); for group in specs { let mut prepared = Vec::with_capacity(group.len()); for spec in group { let statement = prepare(spec.sql)?; - prepared.push(PreparedStatement::KiteSql { + prepared.push(KiteSqlPreparedStatement { statement, spec: spec.clone(), }); @@ -61,6 +62,11 @@ impl KiteSqlLmdbBackend { } impl BackendControl for KiteSqlLmdbBackend { + type PreparedStatement<'a> + = KiteSqlPreparedStatement + where + Self: 'a; + type Transaction<'a> = KiteSqlLmdbTransactionWrapper<'a> where @@ -69,7 +75,7 @@ impl BackendControl for KiteSqlLmdbBackend { fn prepare_statements( &self, specs: &[Vec], - ) -> Result>, TpccError> { + ) -> Result>>, TpccError> { self.prepare_spec_groups(specs) } @@ -91,22 +97,21 @@ pub struct KiteSqlLmdbTransactionWrapper<'a> { impl<'a> KiteSqlLmdbTransactionWrapper<'a> { pub(crate) fn execute_raw<'b>( &'b mut self, - statement: &PreparedStatement, + statement: &mut KiteSqlPreparedStatement, params: &[DbParam], ) -> Result>, TpccError> { - let PreparedStatement::KiteSql { statement, .. } = statement else { - return Err(TpccError::InvalidBackend); - }; Ok(KiteSqlTxnResult::new( - self.inner.execute(statement, params)?, + self.inner.execute(&statement.statement, params)?, )) } } impl<'a> BackendTransaction for KiteSqlLmdbTransactionWrapper<'a> { + type PreparedStatement = KiteSqlPreparedStatement; + fn query_one( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result { self.execute_raw(statement, params)? @@ -117,7 +122,7 @@ impl<'a> BackendTransaction for KiteSqlLmdbTransactionWrapper<'a> { fn query_nth( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], n: usize, ) -> Result { @@ -134,7 +139,7 @@ impl<'a> BackendTransaction for KiteSqlLmdbTransactionWrapper<'a> { fn execute_drain( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; @@ -144,7 +149,7 @@ impl<'a> BackendTransaction for KiteSqlLmdbTransactionWrapper<'a> { fn with_query_one( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, ) -> Result<(), TpccError> { @@ -155,7 +160,7 @@ impl<'a> BackendTransaction for KiteSqlLmdbTransactionWrapper<'a> { fn with_query_nth( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], n: usize, visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, diff --git a/tpcc/src/backend/kitesql_rocksdb.rs b/tpcc/src/backend/kitesql_rocksdb.rs index db1ab87d..ce225111 100644 --- a/tpcc/src/backend/kitesql_rocksdb.rs +++ b/tpcc/src/backend/kitesql_rocksdb.rs @@ -13,7 +13,8 @@ // limitations under the License. use super::{ - BackendControl, BackendTransaction, DbParam, PreparedStatement, SimpleExecutor, StatementSpec, + BackendControl, BackendTransaction, DbParam, KiteSqlPreparedStatement, SimpleExecutor, + StatementSpec, }; use crate::TpccError; use kite_sql::binder::{command_type, CommandType}; @@ -56,13 +57,13 @@ impl KiteSqlRocksBackend { fn prepare_spec_groups( &self, specs: &[Vec], - ) -> Result>, TpccError> { + ) -> Result>, TpccError> { let mut groups = Vec::with_capacity(specs.len()); for group in specs { let mut prepared = Vec::with_capacity(group.len()); for spec in group { let statement = prepare(spec.sql)?; - prepared.push(PreparedStatement::KiteSql { + prepared.push(KiteSqlPreparedStatement { statement, spec: spec.clone(), }); @@ -80,6 +81,11 @@ impl KiteSqlRocksBackend { } impl BackendControl for KiteSqlRocksBackend { + type PreparedStatement<'a> + = KiteSqlPreparedStatement + where + Self: 'a; + type Transaction<'a> = KiteSqlRocksTransaction<'a, S> where @@ -88,7 +94,7 @@ impl BackendControl for KiteSqlRocksBackend { fn prepare_statements( &self, specs: &[Vec], - ) -> Result>, TpccError> { + ) -> Result>>, TpccError> { self.prepare_spec_groups(specs) } @@ -164,22 +170,21 @@ pub struct KiteSqlRocksTransaction<'a, S: Storage> { impl<'a, S: Storage> KiteSqlRocksTransaction<'a, S> { pub(crate) fn execute_raw<'b>( &'b mut self, - statement: &PreparedStatement, + statement: &mut KiteSqlPreparedStatement, params: &[DbParam], ) -> Result>, TpccError> { - let PreparedStatement::KiteSql { statement, .. } = statement else { - return Err(TpccError::InvalidBackend); - }; Ok(KiteSqlTxnResult::new( - self.inner.execute(statement, params)?, + self.inner.execute(&statement.statement, params)?, )) } } impl<'a, S: Storage> BackendTransaction for KiteSqlRocksTransaction<'a, S> { + type PreparedStatement = KiteSqlPreparedStatement; + fn query_one( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result { self.execute_raw(statement, params)? @@ -190,7 +195,7 @@ impl<'a, S: Storage> BackendTransaction for KiteSqlRocksTransaction<'a, S> { fn query_nth( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], n: usize, ) -> Result { @@ -207,7 +212,7 @@ impl<'a, S: Storage> BackendTransaction for KiteSqlRocksTransaction<'a, S> { fn execute_drain( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; @@ -217,7 +222,7 @@ impl<'a, S: Storage> BackendTransaction for KiteSqlRocksTransaction<'a, S> { fn with_query_one( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, ) -> Result<(), TpccError> { @@ -228,7 +233,7 @@ impl<'a, S: Storage> BackendTransaction for KiteSqlRocksTransaction<'a, S> { fn with_query_nth( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], n: usize, visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, diff --git a/tpcc/src/backend/mod.rs b/tpcc/src/backend/mod.rs index db38a639..9a0fe915 100644 --- a/tpcc/src/backend/mod.rs +++ b/tpcc/src/backend/mod.rs @@ -29,14 +29,17 @@ pub trait SimpleExecutor { } pub trait BackendControl: SimpleExecutor { - type Transaction<'a>: BackendTransaction + 'a + type PreparedStatement<'a>: PreparedStatement + 'a + where + Self: 'a; + type Transaction<'a>: BackendTransaction> + 'a where Self: 'a; fn prepare_statements( &self, specs: &[Vec], - ) -> Result>, TpccError>; + ) -> Result>>, TpccError>; fn new_transaction(&self) -> Result, TpccError>; @@ -46,28 +49,30 @@ pub trait BackendControl: SimpleExecutor { } pub trait BackendTransaction { + type PreparedStatement: PreparedStatement; + fn query_one( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result; fn query_nth( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], n: usize, ) -> Result; fn execute_drain( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result<(), TpccError>; fn with_query_one( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, ) -> Result<(), TpccError> { @@ -77,7 +82,7 @@ pub trait BackendTransaction { fn with_query_nth( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], n: usize, visitor: &mut dyn FnMut(&Tuple) -> Result<(), TpccError>, @@ -108,21 +113,17 @@ pub struct StatementSpec { } #[derive(Clone)] -pub enum PreparedStatement { - KiteSql { - statement: Statement, - spec: StatementSpec, - }, - Sqlite { - spec: StatementSpec, - }, +pub struct KiteSqlPreparedStatement { + pub statement: Statement, + pub spec: StatementSpec, +} + +pub trait PreparedStatement { + fn spec(&self) -> &StatementSpec; } -impl PreparedStatement { - pub fn spec(&self) -> &StatementSpec { - match self { - PreparedStatement::KiteSql { spec, .. } => spec, - PreparedStatement::Sqlite { spec } => spec, - } +impl PreparedStatement for KiteSqlPreparedStatement { + fn spec(&self) -> &StatementSpec { + &self.spec } } diff --git a/tpcc/src/backend/sqlite.rs b/tpcc/src/backend/sqlite.rs index 8ffc4759..1ac4606c 100644 --- a/tpcc/src/backend/sqlite.rs +++ b/tpcc/src/backend/sqlite.rs @@ -23,7 +23,7 @@ use kite_sql::types::tuple::Tuple; use kite_sql::types::value::{DataValue, Utf8Type}; use kite_sql::types::CharLengthUnits; use rust_decimal::Decimal; -use sqlite::{Connection, CursorWithOwnership, Row, Statement as SqliteStatement, Value}; +use sqlite::{Connection, State, Statement as SqliteStatement, Value}; pub struct SqliteBackend { connection: Connection, @@ -51,17 +51,21 @@ impl SqliteBackend { fn prepare_spec_groups( &self, specs: &[Vec], - ) -> Result>, TpccError> { - Ok(specs - .iter() - .map(|group| { - group - .iter() - .cloned() - .map(|spec| PreparedStatement::Sqlite { spec }) - .collect() - }) - .collect()) + ) -> Result>>, TpccError> { + let mut groups = Vec::with_capacity(specs.len()); + + for group in specs { + let mut prepared = Vec::with_capacity(group.len()); + for spec in group { + let statement = self.connection.prepare(spec.sql)?; + prepared.push(SqlitePreparedStatement { + statement, + spec: spec.clone(), + }); + } + groups.push(prepared); + } + Ok(groups) } fn start_transaction(&self) -> Result, TpccError> { @@ -74,6 +78,11 @@ impl SqliteBackend { } impl BackendControl for SqliteBackend { + type PreparedStatement<'a> + = SqlitePreparedStatement<'a> + where + Self: 'a; + type Transaction<'a> = SqliteTransaction<'a> where @@ -82,7 +91,7 @@ impl BackendControl for SqliteBackend { fn prepare_statements( &self, specs: &[Vec], - ) -> Result>, TpccError> { + ) -> Result>>, TpccError> { self.prepare_spec_groups(specs) } @@ -105,18 +114,26 @@ pub struct SqliteTransaction<'a> { finished: bool, } +pub struct SqlitePreparedStatement<'a> { + statement: SqliteStatement<'a>, + spec: StatementSpec, +} + +impl PreparedStatement for SqlitePreparedStatement<'_> { + fn spec(&self) -> &StatementSpec { + &self.spec + } +} + impl<'a> SqliteTransaction<'a> { pub(crate) fn execute_raw<'b>( &'b mut self, - statement: &PreparedStatement, + statement: &'b mut SqlitePreparedStatement<'a>, params: &[DbParam], - ) -> Result, TpccError> { - let PreparedStatement::Sqlite { spec } = statement else { - return Err(TpccError::InvalidBackend); - }; - let mut stmt = self.connection.prepare(spec.sql)?; - bind_params(&mut stmt, params)?; - SqliteResult::new(stmt.into_iter(), spec.result_types) + ) -> Result, TpccError> { + statement.statement.reset()?; + bind_params(&mut statement.statement, params)?; + SqliteResult::new(&mut statement.statement, statement.spec.result_types) } } @@ -129,9 +146,11 @@ impl Drop for SqliteTransaction<'_> { } impl<'a> BackendTransaction for SqliteTransaction<'a> { + type PreparedStatement = SqlitePreparedStatement<'a>; + fn query_one( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result { match self.execute_raw(statement, params)?.next() { @@ -142,7 +161,7 @@ impl<'a> BackendTransaction for SqliteTransaction<'a> { fn query_nth( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], n: usize, ) -> Result { @@ -160,7 +179,7 @@ impl<'a> BackendTransaction for SqliteTransaction<'a> { fn execute_drain( &mut self, - statement: &PreparedStatement, + statement: &mut Self::PreparedStatement, params: &[DbParam], ) -> Result<(), TpccError> { let mut iter = self.execute_raw(statement, params)?; @@ -288,58 +307,66 @@ fn normalize_sqlite_sql(sql: &str) -> Option { Some(trimmed.to_string()) } -pub struct SqliteResult<'a> { - cursor: CursorWithOwnership<'a>, +pub struct SqliteResult<'stmt, 'conn> { + statement: &'stmt mut SqliteStatement<'conn>, column_types: &'static [ColumnType], } -impl<'a> SqliteResult<'a> { +impl<'stmt, 'conn> SqliteResult<'stmt, 'conn> { fn new( - cursor: CursorWithOwnership<'a>, + statement: &'stmt mut SqliteStatement<'conn>, column_types: &'static [ColumnType], ) -> Result { Ok(Self { - cursor, + statement, column_types, }) } } -impl Iterator for SqliteResult<'_> { +impl Drop for SqliteResult<'_, '_> { + fn drop(&mut self) { + let _ = self.statement.reset(); + } +} + +impl Iterator for SqliteResult<'_, '_> { type Item = Result; fn next(&mut self) -> Option { - let row = match self.cursor.next()? { - Ok(row) => row, + match self.statement.next() { + Ok(State::Row) => Some(convert_statement_row(&self.statement, self.column_types)), + Ok(State::Done) => None, Err(err) => return Some(Err(TpccError::Sqlite(err))), - }; - - Some(convert_row(row, self.column_types)) + } } } -fn convert_row(row: Row, types: &[ColumnType]) -> Result { +fn convert_statement_row( + statement: &SqliteStatement<'_>, + types: &[ColumnType], +) -> Result { let mut values = Vec::with_capacity(types.len()); for (idx, column_type) in types.iter().enumerate() { let value = match column_type { - ColumnType::Int8 => DataValue::Int8(row.try_read::(idx)? as i8), - ColumnType::Int16 => DataValue::Int16(row.try_read::(idx)? as i16), - ColumnType::Int32 => DataValue::Int32(row.try_read::(idx)? as i32), - ColumnType::Int64 => DataValue::Int64(row.try_read::(idx)?), - ColumnType::Decimal => DataValue::Decimal(read_decimal(&row, idx)?), + ColumnType::Int8 => DataValue::Int8(statement.read::(idx)? as i8), + ColumnType::Int16 => DataValue::Int16(statement.read::(idx)? as i16), + ColumnType::Int32 => DataValue::Int32(statement.read::(idx)? as i32), + ColumnType::Int64 => DataValue::Int64(statement.read::(idx)?), + ColumnType::Decimal => DataValue::Decimal(read_decimal(statement, idx)?), ColumnType::Utf8 => DataValue::Utf8 { - value: row.try_read::<&str, _>(idx)?.to_string(), + value: statement.read::(idx)?, ty: Utf8Type::Variable(None), unit: CharLengthUnits::Characters, }, ColumnType::DateTime => { - let text: &str = row.try_read(idx)?; - parse_datetime(text)? + let text: String = statement.read(idx)?; + parse_datetime(&text)? } ColumnType::NullableDateTime => { - let text: Option<&str> = row.try_read(idx)?; + let text: Option = statement.read(idx)?; match text { - Some(value) => parse_datetime(value)?, + Some(value) => parse_datetime(&value)?, None => DataValue::Null, } } @@ -354,13 +381,13 @@ fn parse_datetime(text: &str) -> Result { Ok(DataValue::from(&dt)) } -fn read_decimal(row: &Row, idx: usize) -> Result { - if let Ok(text) = row.try_read::<&str, _>(idx) { - return Ok(Decimal::from_str_exact(text)?); +fn read_decimal(statement: &SqliteStatement<'_>, idx: usize) -> Result { + if let Ok(text) = statement.read::(idx) { + return Ok(Decimal::from_str_exact(&text)?); } - if let Ok(value) = row.try_read::(idx) { + if let Ok(value) = statement.read::(idx) { return Ok(Decimal::from_str_exact(&value.to_string())?); } - let value: i64 = row.try_read(idx)?; + let value: i64 = statement.read(idx)?; Ok(Decimal::from_str_exact(&value.to_string())?) } diff --git a/tpcc/src/delivery.rs b/tpcc/src/delivery.rs index 406d6bb6..927d6d55 100644 --- a/tpcc/src/delivery.rs +++ b/tpcc/src/delivery.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::backend::{BackendTransaction, PreparedStatement}; +use crate::backend::BackendTransaction; use crate::load::DIST_PER_WARE; use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction}; use chrono::Utc; @@ -38,10 +38,10 @@ pub(crate) struct DeliveryTest; impl TpccTransaction for Delivery { type Args = DeliveryArgs; - fn run( - tx: &mut dyn BackendTransaction, + fn run( + tx: &mut T, args: &Self::Args, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { let now = Utc::now().naive_utc(); @@ -49,7 +49,7 @@ impl TpccTransaction for Delivery { // "SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = ? AND no_w_id = ?" let mut no_o_id = 0; tx.with_query_one( - &statements[0], + &mut statements[0], &[ ("$1", DataValue::Int8(d_id as i8)), ("$2", DataValue::Int16(args.w_id as i16)), @@ -65,7 +65,7 @@ impl TpccTransaction for Delivery { } // "DELETE FROM new_orders WHERE no_o_id = ? AND no_d_id = ? AND no_w_id = ?" tx.execute_drain( - &statements[1], + &mut statements[1], &[ ("$1", DataValue::Int32(no_o_id)), ("$2", DataValue::Int8(d_id as i8)), @@ -75,7 +75,7 @@ impl TpccTransaction for Delivery { // "SELECT o_c_id FROM orders WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?" let mut c_id = 0; tx.with_query_one( - &statements[2], + &mut statements[2], &[ ("$1", DataValue::Int32(no_o_id)), ("$2", DataValue::Int8(d_id as i8)), @@ -88,7 +88,7 @@ impl TpccTransaction for Delivery { )?; // "UPDATE orders SET o_carrier_id = ? WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?" tx.execute_drain( - &statements[3], + &mut statements[3], &[ ("$1", DataValue::Int8(args.o_carrier_id as i8)), ("$2", DataValue::Int32(no_o_id)), @@ -98,7 +98,7 @@ impl TpccTransaction for Delivery { )?; // "UPDATE order_line SET ol_delivery_d = ? WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?" tx.execute_drain( - &statements[4], + &mut statements[4], &[ ("$1", DataValue::from(&now)), ("$2", DataValue::Int32(no_o_id)), @@ -109,7 +109,7 @@ impl TpccTransaction for Delivery { // "SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?" let mut ol_total = Default::default(); tx.with_query_one( - &statements[5], + &mut statements[5], &[ ("$1", DataValue::Int32(no_o_id)), ("$2", DataValue::Int8(d_id as i8)), @@ -122,7 +122,7 @@ impl TpccTransaction for Delivery { )?; // "UPDATE customer SET c_balance = c_balance + ? , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = ? AND c_d_id = ? AND c_w_id = ?" tx.execute_drain( - &statements[6], + &mut statements[6], &[ ("$1", DataValue::Decimal(ol_total)), ("$2", DataValue::Int32(c_id)), @@ -137,17 +137,13 @@ impl TpccTransaction for Delivery { } impl TpccTest for DeliveryTest { - fn name(&self) -> &'static str { - "Delivery" - } - - fn do_transaction( + fn do_transaction( &self, rng: &mut ThreadRng, - tx: &mut dyn BackendTransaction, + tx: &mut T, num_ware: usize, _: &TpccArgs, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { let w_id = rng.gen_range(0..num_ware) + 1; let o_carrier_id = rng.gen_range(1..10); diff --git a/tpcc/src/main.rs b/tpcc/src/main.rs index ac040b8d..1ff185bd 100644 --- a/tpcc/src/main.rs +++ b/tpcc/src/main.rs @@ -16,9 +16,7 @@ use crate::backend::dual::DualBackend; use crate::backend::kitesql_lmdb::KiteSqlLmdbBackend; use crate::backend::kitesql_rocksdb::{KiteSqlOptimisticRocksDbBackend, KiteSqlRocksDbBackend}; use crate::backend::sqlite::{SqliteBackend, SqliteProfile}; -use crate::backend::{ - BackendControl, BackendTransaction, ColumnType, PreparedStatement, StatementSpec, -}; +use crate::backend::{BackendControl, BackendTransaction, ColumnType, StatementSpec}; use crate::delivery::DeliveryTest; use crate::load::Load; use crate::new_ord::NewOrdTest; @@ -74,23 +72,21 @@ pub(crate) const STOCK_LEVEL_DISTINCT_SQLITE: &str = "SELECT DISTINCT ol_i_id FR pub(crate) trait TpccTransaction { type Args; - fn run( - tx: &mut dyn BackendTransaction, + fn run( + tx: &mut T, args: &Self::Args, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError>; } pub(crate) trait TpccTest { - fn name(&self) -> &'static str; - - fn do_transaction( + fn do_transaction( &self, rng: &mut ThreadRng, - tx: &mut dyn BackendTransaction, + tx: &mut T, num_ware: usize, args: &TpccArgs, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError>; } @@ -193,19 +189,12 @@ fn run_tpcc( Load::load_ord(rng, backend, args.num_ware)?; let statement_specs = statement_specs(); - let test_statements = backend.prepare_statements(&statement_specs)?; + let mut test_statements = backend.prepare_statements(&statement_specs)?; let mut rt_hist = RtHist::new(); let mut success = [0usize; 5]; let mut late = [0usize; 5]; let mut failure = [0usize; 5]; - let tests: Vec> = vec![ - Box::new(NewOrdTest), - Box::new(PaymentTest), - Box::new(OrderStatTest), - Box::new(DeliveryTest), - Box::new(SlevTest), - ]; let tpcc_args = TpccArgs { joins: args.joins }; let duration = Duration::new(args.measure_time, 0); @@ -228,8 +217,7 @@ fn run_tpcc( while tpcc_start.elapsed() < duration { let i = seq_gen.get(); - let tpcc_test = &tests[i]; - let statement = &test_statements[i]; + let statement = &mut test_statements[i]; let mut is_succeed = false; let mut last_error = None; @@ -238,7 +226,7 @@ fn run_tpcc( let mut tx = backend.new_transaction()?; if let Err(err) = - tpcc_test.do_transaction(rng, &mut tx, args.num_ware, &tpcc_args, statement) + do_tpcc_transaction::(i, rng, &mut tx, args.num_ware, &tpcc_args, statement) { failure[i] += 1; last_error = Some(err); @@ -260,7 +248,7 @@ fn run_tpcc( if let Some(err) = last_error { eprintln!( "[{}] Error after {} retries: {}", - tpcc_test.name(), + tpcc_test_name(i), args.max_retry, err ); @@ -281,7 +269,7 @@ fn run_tpcc( print_checkpoint( round_count / CHECK_POINT_COUNT, round_count, - tpcc_test.name(), + tpcc_test_name(i), p90, &success, &late, @@ -323,6 +311,28 @@ fn run_tpcc( Ok(()) } +fn tpcc_test_name(index: usize) -> &'static str { + TX_NAMES[index] +} + +fn do_tpcc_transaction<'a, B: BackendControl>( + index: usize, + rng: &mut ThreadRng, + tx: &mut B::Transaction<'a>, + num_ware: usize, + args: &TpccArgs, + statements: &mut [B::PreparedStatement<'a>], +) -> Result<(), TpccError> { + match index { + 0 => NewOrdTest.do_transaction(rng, tx, num_ware, args, statements), + 1 => PaymentTest.do_transaction(rng, tx, num_ware, args, statements), + 2 => OrderStatTest.do_transaction(rng, tx, num_ware, args, statements), + 3 => DeliveryTest.do_transaction(rng, tx, num_ware, args, statements), + 4 => SlevTest.do_transaction(rng, tx, num_ware, args, statements), + _ => Err(TpccError::InvalidTransaction), + } +} + #[cfg(all(unix, feature = "pprof"))] struct PprofSession { guard: ProfilerGuard<'static>, @@ -740,6 +750,7 @@ pub enum TpccError { EmptyTuples, MaxRetry, InvalidBackend, + InvalidTransaction, InvalidParameter, InvalidDateTime, BackendMismatch(String), @@ -757,6 +768,7 @@ impl fmt::Display for TpccError { Self::EmptyTuples => f.write_str("tuples is empty"), Self::MaxRetry => f.write_str("maximum retries reached"), Self::InvalidBackend => f.write_str("invalid backend usage"), + Self::InvalidTransaction => f.write_str("invalid transaction index"), Self::InvalidParameter => f.write_str("invalid parameter name"), Self::InvalidDateTime => f.write_str("invalid datetime value"), Self::BackendMismatch(value) => write!(f, "backend mismatch: {value}"), diff --git a/tpcc/src/new_ord.rs b/tpcc/src/new_ord.rs index ec3550a3..5c4376f3 100644 --- a/tpcc/src/new_ord.rs +++ b/tpcc/src/new_ord.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::backend::{BackendTransaction, PreparedStatement}; +use crate::backend::BackendTransaction; use crate::load::{nu_rand, CUST_PER_DIST, DIST_PER_WARE, MAX_ITEMS, MAX_NUM_ITEMS}; use crate::{other_ware, TpccArgs, TpccError, TpccTest, TpccTransaction, ALLOW_MULTI_WAREHOUSE_TX}; use chrono::Utc; @@ -67,10 +67,10 @@ pub(crate) struct NewOrdTest; impl TpccTransaction for NewOrd { type Args = NewOrdArgs; - fn run( - tx: &mut dyn BackendTransaction, + fn run( + tx: &mut T, args: &Self::Args, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { let mut price = vec![Decimal::default(); MAX_NUM_ITEMS]; let mut iname = vec![String::new(); MAX_NUM_ITEMS]; @@ -86,7 +86,7 @@ impl TpccTransaction for NewOrd { let mut c_credit = String::new(); let mut w_tax = Decimal::default(); tx.with_query_one( - &statements[0], + &mut statements[0], &[ ("$1", DataValue::Int16(args.w_id as i16)), ("$2", DataValue::Int16(args.w_id as i16)), @@ -109,7 +109,7 @@ impl TpccTransaction for NewOrd { let mut c_last = String::new(); let mut c_credit = String::new(); tx.with_query_one( - &statements[1], + &mut statements[1], &[ ("$1", DataValue::Int16(args.w_id as i16)), ("$2", DataValue::Int8(args.d_id as i8)), @@ -125,7 +125,7 @@ impl TpccTransaction for NewOrd { // "SELECT w_tax FROM warehouse WHERE w_id = ?" let mut w_tax = Decimal::default(); tx.with_query_one( - &statements[2], + &mut statements[2], &[("$1", DataValue::Int16(args.w_id as i16))], &mut |tuple| { w_tax = tuple.values[0].decimal().unwrap(); @@ -139,7 +139,7 @@ impl TpccTransaction for NewOrd { let mut d_next_o_id = 0; let mut d_tax = Decimal::default(); tx.with_query_one( - &statements[3], + &mut statements[3], &[ ("$1", DataValue::Int8(args.d_id as i8)), ("$2", DataValue::Int16(args.w_id as i16)), @@ -152,7 +152,7 @@ impl TpccTransaction for NewOrd { )?; // "UPDATE district SET d_next_o_id = ? + 1 WHERE d_id = ? AND d_w_id = ?" tx.execute_drain( - &statements[4], + &mut statements[4], &[ ("$1", DataValue::Int32(d_next_o_id)), ("$2", DataValue::Int8(args.d_id as i8)), @@ -162,7 +162,7 @@ impl TpccTransaction for NewOrd { let o_id = d_next_o_id; // "INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_ol_cnt, o_all_local) VALUES(?, ?, ?, ?, ?, ?, ?)" tx.execute_drain( - &statements[5], + &mut statements[5], &[ ("$1", DataValue::Int32(o_id)), ("$2", DataValue::Int8(args.d_id as i8)), @@ -175,7 +175,7 @@ impl TpccTransaction for NewOrd { )?; // "INSERT INTO new_orders (no_o_id, no_d_id, no_w_id) VALUES (?,?,?)" tx.execute_drain( - &statements[6], + &mut statements[6], &[ ("$1", DataValue::Int32(o_id)), ("$2", DataValue::Int8(args.d_id as i8)), @@ -214,7 +214,7 @@ impl TpccTransaction for NewOrd { let mut i_price = Decimal::default(); let mut i_name = String::new(); let mut i_data = String::new(); - tx.with_query_one(&statements[7], ¶ms, &mut |tuple| { + tx.with_query_one(&mut statements[7], ¶ms, &mut |tuple| { i_price = tuple.values[0].decimal().unwrap(); i_name = tuple.values[1].utf8().unwrap().to_string(); i_data = tuple.values[2].utf8().unwrap().to_string(); @@ -241,7 +241,7 @@ impl TpccTransaction for NewOrd { let mut s_dist_08 = String::new(); let mut s_dist_09 = String::new(); let mut s_dist_10 = String::new(); - tx.with_query_one(&statements[8], ¶ms, &mut |tuple| { + tx.with_query_one(&mut statements[8], ¶ms, &mut |tuple| { s_quantity = tuple.values[0].i16().unwrap(); s_data = tuple.values[1].utf8().unwrap().to_string(); s_dist_01 = tuple.values[2].utf8().unwrap().to_string(); @@ -280,7 +280,7 @@ impl TpccTransaction for NewOrd { ("$2", DataValue::Int32(ol_i_id as i32)), ("$3", DataValue::Int16(ol_supply_w_id as i16)), ]; - tx.execute_drain(&statements[9], ¶ms)?; + tx.execute_drain(&mut statements[9], ¶ms)?; // Tips: Integers always have 7 digits, so divide by 10 here let mut ol_amount = Decimal::from(ol_quantity) @@ -304,7 +304,7 @@ impl TpccTransaction for NewOrd { ("$8", DataValue::Decimal(ol_amount.round_dp(2))), ("$9", DataValue::from(ol_dist_info)), ]; - tx.execute_drain(&statements[10], ¶ms)?; + tx.execute_drain(&mut statements[10], ¶ms)?; } Ok(()) @@ -312,17 +312,13 @@ impl TpccTransaction for NewOrd { } impl TpccTest for NewOrdTest { - fn name(&self) -> &'static str { - "New-Order" - } - - fn do_transaction( + fn do_transaction( &self, rng: &mut ThreadRng, - tx: &mut dyn BackendTransaction, + tx: &mut T, num_ware: usize, args: &TpccArgs, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { let mut all_local = 1; let notfound = MAX_ITEMS + 1; diff --git a/tpcc/src/order_stat.rs b/tpcc/src/order_stat.rs index 883bd288..fa76ba1d 100644 --- a/tpcc/src/order_stat.rs +++ b/tpcc/src/order_stat.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::backend::{BackendTransaction, PreparedStatement}; +use crate::backend::BackendTransaction; use crate::load::{last_name, nu_rand, CUST_PER_DIST, DIST_PER_WARE}; use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction}; use kite_sql::types::value::DataValue; @@ -52,16 +52,16 @@ pub(crate) struct OrderStatTest; impl TpccTransaction for OrderStat { type Args = OrderStatArgs; - fn run( - tx: &mut dyn BackendTransaction, + fn run( + tx: &mut T, args: &Self::Args, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { let (_c_balance, _c_first, _c_middle, _c_last) = if args.by_name { // "SELECT count(c_id) FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ?" let mut name_cnt = 0usize; tx.with_query_one( - &statements[0], + &mut statements[0], &[ ("$1", DataValue::Int16(args.w_id as i16)), ("$2", DataValue::Int8(args.d_id as i8)), @@ -86,7 +86,7 @@ impl TpccTransaction for OrderStat { let mut c_first = String::new(); let mut c_middle = String::new(); let mut c_last = String::new(); - tx.with_query_nth(&statements[1], ¶ms, target, &mut |tuple| { + tx.with_query_nth(&mut statements[1], ¶ms, target, &mut |tuple| { c_balance = tuple.values[0].decimal().unwrap(); c_first = tuple.values[1].utf8().unwrap().to_string(); c_middle = tuple.values[2].utf8().unwrap().to_string(); @@ -101,7 +101,7 @@ impl TpccTransaction for OrderStat { let mut c_middle = String::new(); let mut c_last = String::new(); tx.with_query_one( - &statements[2], + &mut statements[2], &[ ("$1", DataValue::Int16(args.w_id as i16)), ("$2", DataValue::Int8(args.d_id as i8)), @@ -127,7 +127,7 @@ impl TpccTransaction for OrderStat { ("$6", DataValue::Int32(args.c_id as i32)), ]; let mut o_id = 0; - tx.with_query_one(&statements[3], ¶ms, &mut |tuple| { + tx.with_query_one(&mut statements[3], ¶ms, &mut |tuple| { o_id = tuple.values[0].i32().unwrap(); Ok(()) })?; @@ -137,7 +137,7 @@ impl TpccTransaction for OrderStat { ("$2", DataValue::Int8(args.d_id as i8)), ("$3", DataValue::Int32(o_id)), ]; - tx.with_query_one(&statements[4], ¶ms, &mut |_| Ok(()))?; + tx.with_query_one(&mut statements[4], ¶ms, &mut |_| Ok(()))?; // let ol_i_id = tuple.values[0].i32(); // let ol_supply_w_id = tuple.values[1].i16(); // let ol_quantity = tuple.values[2].i8(); @@ -149,17 +149,13 @@ impl TpccTransaction for OrderStat { } impl TpccTest for OrderStatTest { - fn name(&self) -> &'static str { - "Order-Status" - } - - fn do_transaction( + fn do_transaction( &self, rng: &mut ThreadRng, - tx: &mut dyn BackendTransaction, + tx: &mut T, num_ware: usize, _: &TpccArgs, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { let w_id = rng.gen_range(0..num_ware) + 1; let d_id = rng.gen_range(1..DIST_PER_WARE); diff --git a/tpcc/src/payment.rs b/tpcc/src/payment.rs index dbdeee9a..5b4f846a 100644 --- a/tpcc/src/payment.rs +++ b/tpcc/src/payment.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::backend::{BackendTransaction, PreparedStatement}; +use crate::backend::BackendTransaction; use crate::load::{last_name, nu_rand, CUST_PER_DIST, DIST_PER_WARE}; use crate::{other_ware, TpccArgs, TpccError, TpccTest, TpccTransaction, ALLOW_MULTI_WAREHOUSE_TX}; use chrono::Utc; @@ -65,15 +65,15 @@ impl TpccTransaction for Payment { type Args = PaymentArgs; #[allow(unused_variables)] - fn run( - tx: &mut dyn BackendTransaction, + fn run( + tx: &mut T, args: &Self::Args, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { let now = Utc::now(); // "UPDATE warehouse SET w_ytd = w_ytd + ? WHERE w_id = ?" tx.execute_drain( - &statements[0], + &mut statements[0], &[ ("$1", DataValue::Decimal(args.h_amount)), ("$2", DataValue::Int16(args.w_id as i16)), @@ -87,7 +87,7 @@ impl TpccTransaction for Payment { let mut w_zip = String::new(); let mut w_name = String::new(); tx.with_query_one( - &statements[1], + &mut statements[1], &[("$1", DataValue::Int16(args.w_id as i16))], &mut |tuple| { w_street_1 = tuple.values[0].utf8().unwrap().to_string(); @@ -102,7 +102,7 @@ impl TpccTransaction for Payment { // "UPDATE district SET d_ytd = d_ytd + ? WHERE d_w_id = ? AND d_id = ?" tx.execute_drain( - &statements[2], + &mut statements[2], &[ ("$1", DataValue::Decimal(args.h_amount)), ("$2", DataValue::Int16(args.w_id as i16)), @@ -118,7 +118,7 @@ impl TpccTransaction for Payment { let mut d_zip = String::new(); let mut d_name = String::new(); tx.with_query_one( - &statements[3], + &mut statements[3], &[ ("$1", DataValue::Int16(args.w_id as i16)), ("$2", DataValue::Int8(args.d_id as i8)), @@ -139,7 +139,7 @@ impl TpccTransaction for Payment { // "SELECT count(c_id) FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_last = ?" let mut name_cnt = 0; tx.with_query_one( - &statements[4], + &mut statements[4], &[ ("$1", DataValue::Int16(args.c_w_id as i16)), ("$2", DataValue::Int8(args.c_d_id as i8)), @@ -160,7 +160,7 @@ impl TpccTransaction for Payment { name_cnt += 1; } let target = name_cnt as usize / 2 - 1; - tx.with_query_nth(&statements[5], ¶ms, target, &mut |tuple| { + tx.with_query_nth(&mut statements[5], ¶ms, target, &mut |tuple| { c_id = tuple.values[0].i32().unwrap(); Ok(()) })?; @@ -181,7 +181,7 @@ impl TpccTransaction for Payment { let mut c_balance = Decimal::default(); let mut c_since = Default::default(); tx.with_query_one( - &statements[6], + &mut statements[6], &[ ("$1", DataValue::Int16(args.c_w_id as i16)), ("$2", DataValue::Int8(args.c_d_id as i8)), @@ -212,7 +212,7 @@ impl TpccTransaction for Payment { // "SELECT c_data FROM customer WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" let mut c_data = String::new(); tx.with_query_one( - &statements[7], + &mut statements[7], &[ ("$1", DataValue::Int16(args.c_w_id as i16)), ("$2", DataValue::Int8(args.c_d_id as i8)), @@ -229,7 +229,7 @@ impl TpccTransaction for Payment { // "UPDATE customer SET c_balance = ?, c_data = ? WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" tx.execute_drain( - &statements[8], + &mut statements[8], &[ ("$1", DataValue::Decimal(c_balance)), ("$2", DataValue::from(c_data)), @@ -241,7 +241,7 @@ impl TpccTransaction for Payment { } else { // "UPDATE customer SET c_balance = ? WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" tx.execute_drain( - &statements[9], + &mut statements[9], &[ ("$1", DataValue::Decimal(c_balance)), ("$2", DataValue::Int16(args.c_w_id as i16)), @@ -253,7 +253,7 @@ impl TpccTransaction for Payment { } else { // "UPDATE customer SET c_balance = ? WHERE c_w_id = ? AND c_d_id = ? AND c_id = ?" tx.execute_drain( - &statements[9], + &mut statements[9], &[ ("$1", DataValue::Decimal(c_balance)), ("$2", DataValue::Int16(args.c_w_id as i16)), @@ -265,7 +265,7 @@ impl TpccTransaction for Payment { let h_data = format!("\\0{d_name} \\0"); // "INSERT INTO history(h_c_d_id, h_c_w_id, h_c_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES(?, ?, ?, ?, ?, ?, ?, ?)" tx.execute_drain( - &statements[10], + &mut statements[10], &[ ("$1", DataValue::Int8(args.c_d_id as i8)), ("$2", DataValue::Int16(args.c_w_id as i16)), @@ -283,17 +283,13 @@ impl TpccTransaction for Payment { } impl TpccTest for PaymentTest { - fn name(&self) -> &'static str { - "Payment" - } - - fn do_transaction( + fn do_transaction( &self, rng: &mut ThreadRng, - tx: &mut dyn BackendTransaction, + tx: &mut T, num_ware: usize, _: &TpccArgs, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { let w_id = rng.gen_range(0..num_ware) + 1; let d_id = rng.gen_range(1..DIST_PER_WARE); diff --git a/tpcc/src/slev.rs b/tpcc/src/slev.rs index c3bb10e3..6b24718f 100644 --- a/tpcc/src/slev.rs +++ b/tpcc/src/slev.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::backend::{BackendTransaction, PreparedStatement}; +use crate::backend::BackendTransaction; use crate::load::DIST_PER_WARE; use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction}; use kite_sql::types::value::DataValue; @@ -38,15 +38,15 @@ pub(crate) struct SlevTest; impl TpccTransaction for Slev { type Args = SlevArgs; - fn run( - tx: &mut dyn BackendTransaction, + fn run( + tx: &mut T, args: &Self::Args, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { // "SELECT d_next_o_id FROM district WHERE d_id = ? AND d_w_id = ?" let mut d_next_o_id = 0; tx.with_query_one( - &statements[0], + &mut statements[0], &[ ("$1", DataValue::Int8(args.d_id as i8)), ("$2", DataValue::Int16(args.w_id as i16)), @@ -59,7 +59,7 @@ impl TpccTransaction for Slev { // "SELECT DISTINCT ol_i_id FROM order_line WHERE ol_w_id = ? AND ol_d_id = ? AND ol_o_id < ? AND ol_o_id >= (? - 20)" let mut ol_i_id = 0; tx.with_query_one( - &statements[1], + &mut statements[1], &[ ("$1", DataValue::Int16(args.w_id as i16)), ("$2", DataValue::Int8(args.d_id as i8)), @@ -73,7 +73,7 @@ impl TpccTransaction for Slev { )?; // "SELECT count(*) FROM stock WHERE s_w_id = ? AND s_i_id = ? AND s_quantity < ?" tx.with_query_one( - &statements[2], + &mut statements[2], &[ ("$1", DataValue::Int16(args.w_id as i16)), ("$2", DataValue::Int8(ol_i_id as i8)), @@ -88,17 +88,13 @@ impl TpccTransaction for Slev { } impl TpccTest for SlevTest { - fn name(&self) -> &'static str { - "Stock-Level" - } - - fn do_transaction( + fn do_transaction( &self, rng: &mut ThreadRng, - tx: &mut dyn BackendTransaction, + tx: &mut T, num_ware: usize, _: &TpccArgs, - statements: &[PreparedStatement], + statements: &mut [T::PreparedStatement], ) -> Result<(), TpccError> { let w_id = rng.gen_range(0..num_ware) + 1; let d_id = rng.gen_range(1..DIST_PER_WARE);