From 87c1c8097893e5aa738bca7857a224669932bcab Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Wed, 17 Jun 2026 13:20:47 +0100 Subject: [PATCH 1/2] - introduce CycleBufferPool as a concept that encapsulates MemoryPool with the growth logic - support `-d` in the benchmark --- src/RESPite.Benchmark/BenchmarkBase.cs | 21 +++- src/RESPite.Benchmark/readme.md | 23 ++--- src/RESPite/Buffers/CycleBuffer.cs | 26 ++--- src/RESPite/Buffers/CycleBufferPool.cs | 96 +++++++++++++++++++ src/RESPite/PublicAPI/PublicAPI.Shipped.txt | 3 - src/RESPite/PublicAPI/PublicAPI.Unshipped.txt | 7 ++ 6 files changed, 142 insertions(+), 34 deletions(-) create mode 100644 src/RESPite/Buffers/CycleBufferPool.cs diff --git a/src/RESPite.Benchmark/BenchmarkBase.cs b/src/RESPite.Benchmark/BenchmarkBase.cs index 50994b23f..6e08b0515 100644 --- a/src/RESPite.Benchmark/BenchmarkBase.cs +++ b/src/RESPite.Benchmark/BenchmarkBase.cs @@ -59,7 +59,7 @@ public enum PipelineStrategy protected BenchmarkBase(string[] args) { - int operations = 100_000; + int operations = 100_000, payloadSize = -1; string tests = ""; for (int i = 0; i < args.Length; i++) @@ -81,6 +81,9 @@ protected BenchmarkBase(string[] args) case "-P" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp) && tmp > 0: PipelineDepth = tmp; break; + case "-d" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp) && tmp > 0: + payloadSize = tmp; + break; case "-w" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp): WriteMode = tmp; break; @@ -125,7 +128,21 @@ protected BenchmarkBase(string[] args) _operationsPerClient = operations / ClientCount; - Payload = "abc"u8.ToArray(); + if (payloadSize < 0) + { + Payload = "abc"u8.ToArray(); + } + else + { + var arr = new byte[payloadSize]; + var rand = new Random(payloadSize); // use the size as the seed, for repeatability + for (int i = 0; i < arr.Length; i++) + { + arr[i] = (byte)rand.Next(32, 127); // space thru ~ (ignore DEL) + } + + Payload = arr; + } } public abstract Task RunAll(); diff --git a/src/RESPite.Benchmark/readme.md b/src/RESPite.Benchmark/readme.md index ea8fe2105..cf8a6b0c0 100644 --- a/src/RESPite.Benchmark/readme.md +++ b/src/RESPite.Benchmark/readme.md @@ -21,35 +21,36 @@ Example usage: Basic options, for parity: -- `-h ` Server hostname (default 127.0.0.1) -- `-p ` Server port (default 6379) +- `-h ` Server hostname (default 127.0.0.1). +- `-p ` Server port (default 6379). - `-c ` Number of parallel connections (default 50). -- `-n ` Total number of requests (default 100000) +- `-n ` Total number of requests (default 100000). +- `-d ` Data size of SET/GET value in bytes (default 3). - `-P ` Pipeline requests. Default 1 (no pipeline). -- `-l` Loop. Run the tests forever -- `-q` Quiet. Just show query/sec values +- `-l` Loop. Run the tests forever. +- `-q` Quiet. Just show query/sec values. - `-t ` Only run the comma separated list of tests. The test names are the same as the ones produced as output. ## Custom options Additional options specific to this tool: -- `+m` / `-m`: enable or disable (default) multiplexing: when enabled clients share a connection, otherwise each client has a separate connection -- `--batch` / `--queue` pipelining should using batching (default) or queueing strategy -- `--basic` : perform basic typical IO operations rather than synthetic benchmarks +- `+m` / `-m`: enable or disable (default) multiplexing: when enabled clients share a connection, otherwise each client has a separate connection. +- `--batch` / `--queue` pipelining should using batching (default) or queueing strategy. +- `--basic` : perform basic typical IO operations rather than synthetic benchmarks. ## Internal options These exist mostly for Marc's benefit: -- `-w ` Specify the internal write-mode -- `+x` / `-x`: enable or disable (default) cancellation support (irrelevant until later v3 tranche) +- `-w ` Specify the internal write-mode. +- `+x` / `-x`: enable or disable (default) cancellation support (irrelevant until later v3 tranche). ## Local example To build and run from source, `dotnet run` can be used with everything after `--` being args to the command: ``` -dotnet run -p:TargetVer=3 -f net10.0 -c Release -- -q +m --batch -n 500000 +dotnet run -p:TargetVer=3 -f net10.0 -c Release -- -q -c 50 -P 100 +m --queue -n 500000 -q -l -t INCR ``` \ No newline at end of file diff --git a/src/RESPite/Buffers/CycleBuffer.cs b/src/RESPite/Buffers/CycleBuffer.cs index 14774b357..334171c12 100644 --- a/src/RESPite/Buffers/CycleBuffer.cs +++ b/src/RESPite/Buffers/CycleBuffer.cs @@ -38,28 +38,18 @@ public partial struct CycleBuffer // note: if someone uses an uninitialized CycleBuffer (via default): that's a skills issue; git gud public static CycleBuffer Create( - MemoryPool? pool = null, - int pageSize = DefaultPageSize, - ICycleBufferCallback? callback = null) - { - pool ??= DefaultPool; - if (pageSize <= 0) pageSize = DefaultPageSize; - if (pageSize > pool.MaxBufferSize) pageSize = pool.MaxBufferSize; - return new CycleBuffer(pool, pageSize, callback); - } + CycleBufferPool? pool = null, + ICycleBufferCallback? callback = null) => new(pool, callback); - private CycleBuffer(MemoryPool pool, int pageSize, ICycleBufferCallback? callback) + private CycleBuffer(CycleBufferPool? pool, ICycleBufferCallback? callback = null) { - Pool = pool; - PageSize = pageSize; + _pool = pool ?? CycleBufferPool.Default; _callback = callback; leasedStart = -1; } - private const int DefaultPageSize = 8 * 1024; - - public int PageSize { get; } - public MemoryPool Pool { get; } + public CycleBufferPool Pool => _pool; + private readonly CycleBufferPool _pool; private readonly ICycleBufferCallback? _callback; private Segment? startSegment, endSegment; @@ -412,7 +402,7 @@ private Segment GetNextSegment() } } - Segment newSegment = Segment.Create(Pool.Rent(PageSize)); + Segment newSegment = Segment.Create(endSegment is null ? _pool.Rent() : _pool.Rent(GetAllCommitted())); if (endSegment is null) { // tabula rasa @@ -452,7 +442,7 @@ public Memory GetUncommittedMemory(int hint = 0) { if (!memory.IsEmpty) return MemoryMarshal.AsMemory(memory); } - else if (memory.Length >= Math.Min(hint, PageSize >> 2)) // respect the hint up to 1/4 of the page size + else if (memory.Length >= Math.Min(hint, 1024)) // respect the hint, or 1k (only relevant when large requests are made) { return MemoryMarshal.AsMemory(memory); } diff --git a/src/RESPite/Buffers/CycleBufferPool.cs b/src/RESPite/Buffers/CycleBufferPool.cs new file mode 100644 index 000000000..76a87e4dc --- /dev/null +++ b/src/RESPite/Buffers/CycleBufferPool.cs @@ -0,0 +1,96 @@ +using System.Buffers; +using System.Diagnostics.CodeAnalysis; + +namespace RESPite.Buffers; + +[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)] +public abstract class CycleBufferPool +{ + /// + /// Create an initial buffer. + /// + public virtual IMemoryOwner Rent() => Rent(default); + + /// + /// Create a buffer with knowledge of the existing leased data. + /// + public abstract IMemoryOwner Rent(in ReadOnlySequence existing); + + // new MemoryPool(...) would be a non-growing buffer pool. + public static CycleBufferPool Default { get; } = new GrowingMemoryPool(minBytes: 8 * 1024); + + private class MemoryPool : CycleBufferPool + { +#if TRACK_MEMORY + private static MemoryPool DefaultPool => MemoryTrackedPool.Shared; +#else + private static MemoryPool DefaultPool => MemoryPool.Shared; +#endif + private readonly MemoryPool _pool; + private readonly int _minBytes, _maxBytes; + + public MemoryPool(int minBytes, MemoryPool? pool = null, int maxBytes = int.MaxValue) + { + _pool = pool ?? DefaultPool; + // capture the max bytes, without exceeding the pool's max size + _maxBytes = Math.Min(maxBytes, _pool.MaxBufferSize); + // capture the min bytes, applying a rigid lower bound, and not overlapping the max bytes + _minBytes = Math.Min(Math.Max(minBytes, 16), _maxBytes); + } + + /// + /// Rent a chunk using the specified size as a hint. + /// + protected IMemoryOwner Rent(int bytes) + { +#if NET + bytes = Math.Clamp(bytes, _minBytes, _maxBytes); +#else + bytes = Math.Min(Math.Max(bytes, _minBytes), _maxBytes); +#endif + return _pool.Rent(bytes); + } + + // by default, use fixed size without reference to the existing data; subclasses can tweak + + /// + public override IMemoryOwner Rent() => Rent(_minBytes); + + /// + public override IMemoryOwner Rent(in ReadOnlySequence existing) => Rent(_minBytes); + } + + private sealed class GrowingMemoryPool(int minBytes, MemoryPool? pool = null, int maxBytes = int.MaxValue) + : MemoryPool(minBytes, pool, maxBytes) + { + public override IMemoryOwner Rent(in ReadOnlySequence existing) + { + if (existing.IsEmpty) return base.Rent(existing); + // use a growth strategy looking at the size of the last segment + int lastChunk; + if (existing.IsSingleSegment) + { + lastChunk = existing.First.Length; + } + else if (existing.End.GetObject() is ReadOnlySequenceSegment segment) + { + lastChunk = segment.Memory.Length; // note we ignore GetInteger() intentionally + } + else + { + // do it the hard way; note we'll only observe the reserved size, rather + // than the actual buffer size, but that's the best we can do + lastChunk = 0; + foreach (var chunk in existing) + { + if (!chunk.IsEmpty) lastChunk = chunk.Length; + } + + if (lastChunk is 0) lastChunk = existing.First.Length; // paranoia + } + + // "max" here is to account for overflow - i.e. stop growing if that happens (unlikely) + return Rent(Math.Max(lastChunk, lastChunk << 1)); + } + } +} diff --git a/src/RESPite/PublicAPI/PublicAPI.Shipped.txt b/src/RESPite/PublicAPI/PublicAPI.Shipped.txt index 27160d830..f5ab197bf 100644 --- a/src/RESPite/PublicAPI/PublicAPI.Shipped.txt +++ b/src/RESPite/PublicAPI/PublicAPI.Shipped.txt @@ -32,8 +32,6 @@ [SER004]RESPite.Buffers.CycleBuffer.GetCommittedLength() -> long [SER004]RESPite.Buffers.CycleBuffer.GetUncommittedMemory(int hint = 0) -> System.Memory [SER004]RESPite.Buffers.CycleBuffer.GetUncommittedSpan(int hint = 0) -> System.Span -[SER004]RESPite.Buffers.CycleBuffer.PageSize.get -> int -[SER004]RESPite.Buffers.CycleBuffer.Pool.get -> System.Buffers.MemoryPool! [SER004]RESPite.Buffers.CycleBuffer.Release() -> void [SER004]RESPite.Buffers.CycleBuffer.TryGetCommitted(out System.ReadOnlySpan span) -> bool [SER004]RESPite.Buffers.CycleBuffer.TryGetFirstCommittedMemory(int minBytes, out System.ReadOnlyMemory memory) -> bool @@ -203,7 +201,6 @@ [SER004]RESPite.Messages.RespScanState.TryRead(System.ReadOnlySpan value, out int bytesRead) -> bool [SER004]RESPite.RespException [SER004]RESPite.RespException.RespException(string! message) -> void -[SER004]static RESPite.Buffers.CycleBuffer.Create(System.Buffers.MemoryPool? pool = null, int pageSize = 8192, RESPite.Buffers.ICycleBufferCallback? callback = null) -> RESPite.Buffers.CycleBuffer [SER004]static RESPite.Messages.RespFrameScanner.Default.get -> RESPite.Messages.RespFrameScanner! [SER004]static RESPite.Messages.RespFrameScanner.Subscription.get -> RESPite.Messages.RespFrameScanner! [SER004]virtual RESPite.Messages.RespAttributeReader.Read(ref RESPite.Messages.RespReader reader, ref T value) -> void diff --git a/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt b/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt index 9acf1fc40..f10c7cc88 100644 --- a/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt +++ b/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt @@ -4,3 +4,10 @@ [SER004]static RESPite.AsciiHash.EqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool [SER004]static RESPite.AsciiHash.SequenceEqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool [SER004]static RESPite.AsciiHash.SequenceEqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool +[SER004]abstract RESPite.Buffers.CycleBufferPool.Rent(in System.Buffers.ReadOnlySequence existing) -> System.Buffers.IMemoryOwner! +[SER004]RESPite.Buffers.CycleBufferPool +[SER004]RESPite.Buffers.CycleBufferPool.CycleBufferPool() -> void +[SER004]static RESPite.Buffers.CycleBufferPool.Default.get -> RESPite.Buffers.CycleBufferPool! +[SER004]virtual RESPite.Buffers.CycleBufferPool.Rent() -> System.Buffers.IMemoryOwner! +[SER004]static RESPite.Buffers.CycleBuffer.Create(RESPite.Buffers.CycleBufferPool? pool = null, RESPite.Buffers.ICycleBufferCallback? callback = null) -> RESPite.Buffers.CycleBuffer +[SER004]RESPite.Buffers.CycleBuffer.Pool.get -> RESPite.Buffers.CycleBufferPool! \ No newline at end of file From 6350db9d94ee3e298ada3cd0a3305bde88281d02 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Wed, 17 Jun 2026 14:32:41 +0100 Subject: [PATCH 2/2] - add ConfigurationOptions support - add [SkipLocalsInit] to RESPite --- src/RESPite/AssemblyInfo.cs | 1 + src/RESPite/Messages/RespReader.cs | 1 + src/RESPite/Shared/AsciiHash.Instance.cs | 1 + .../Shared}/SkipLocalsInit.cs | 0 src/StackExchange.Redis/AssemblyInfoHack.cs | 7 +++--- .../BufferedStreamWriter.Switchable.cs | 5 ++-- .../BufferedStreamWriter.cs | 10 ++++---- .../ConfigurationOptions.cs | 25 +++++++++++++++++++ .../PhysicalConnection.Read.cs | 6 +++-- .../PhysicalConnection.Write.cs | 5 ++-- .../PublicAPI/PublicAPI.Unshipped.txt | 4 +++ .../BufferedStreamWriterTests.cs | 10 ++++---- .../StackExchange.Redis.Tests/ConfigTests.cs | 2 ++ 13 files changed, 57 insertions(+), 20 deletions(-) create mode 100644 src/RESPite/AssemblyInfo.cs rename src/{StackExchange.Redis => RESPite/Shared}/SkipLocalsInit.cs (100%) diff --git a/src/RESPite/AssemblyInfo.cs b/src/RESPite/AssemblyInfo.cs new file mode 100644 index 000000000..f5476101b --- /dev/null +++ b/src/RESPite/AssemblyInfo.cs @@ -0,0 +1 @@ +[assembly: CLSCompliant(true)] diff --git a/src/RESPite/Messages/RespReader.cs b/src/RESPite/Messages/RespReader.cs index 86ff08f40..6b5dc2228 100644 --- a/src/RESPite/Messages/RespReader.cs +++ b/src/RESPite/Messages/RespReader.cs @@ -714,6 +714,7 @@ internal readonly T ParseBytes(Parser parser, TState } } + [CLSCompliant(false)] public readonly unsafe bool TryParseScalar( delegate* managed, out T, bool> parser, out T value) { diff --git a/src/RESPite/Shared/AsciiHash.Instance.cs b/src/RESPite/Shared/AsciiHash.Instance.cs index 53db4ff27..50ac4d245 100644 --- a/src/RESPite/Shared/AsciiHash.Instance.cs +++ b/src/RESPite/Shared/AsciiHash.Instance.cs @@ -35,6 +35,7 @@ public AsciiHash(string? value) : this(value is null ? [] : Encoding.ASCII.GetBy public override bool Equals(object? other) => other is AsciiHash hash && Equals(hash); /// + [CLSCompliant(false)] public bool Equals(in AsciiHash other) { return (_length == other.Length & _hashCS == other._hashCS) diff --git a/src/StackExchange.Redis/SkipLocalsInit.cs b/src/RESPite/Shared/SkipLocalsInit.cs similarity index 100% rename from src/StackExchange.Redis/SkipLocalsInit.cs rename to src/RESPite/Shared/SkipLocalsInit.cs diff --git a/src/StackExchange.Redis/AssemblyInfoHack.cs b/src/StackExchange.Redis/AssemblyInfoHack.cs index 50cdc2c1c..f2e7744d4 100644 --- a/src/StackExchange.Redis/AssemblyInfoHack.cs +++ b/src/StackExchange.Redis/AssemblyInfoHack.cs @@ -1,6 +1,5 @@ -// Yes, this is embarrassing. However, in .NET Core the including AssemblyInfo (ifdef'd or not) will screw with -// your version numbers. Therefore, we need to move the attribute out into another file...this file. -// When .csproj merges in, this should be able to return to Properties/AssemblyInfo.cs -using System; +using System; +using System.Runtime.CompilerServices; [assembly: CLSCompliant(true)] +[module: SkipLocalsInit] diff --git a/src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs b/src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs index 153c3962a..4fe9c5f31 100644 --- a/src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs +++ b/src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Sources; +using RESPite.Buffers; namespace StackExchange.Redis; @@ -13,8 +14,8 @@ internal sealed class SwitchableBufferedStreamWriter : CycleBufferStreamWriter, private ManualResetValueTaskSourceCore _readerTask; private bool _syncSignalled; - public SwitchableBufferedStreamWriter(Stream target, CancellationToken cancellationToken, bool initiallySync) - : base(target, cancellationToken, initiallySync ? StateFlags.None : StateFlags.AsyncMode) + public SwitchableBufferedStreamWriter(CycleBufferPool? pool, Stream target, CancellationToken cancellationToken, bool initiallySync) + : base(pool, target, cancellationToken, initiallySync ? StateFlags.None : StateFlags.AsyncMode) { _readerTask.RunContinuationsAsynchronously = true; // we never want the flusher to take over the copying if (initiallySync) diff --git a/src/StackExchange.Redis/BufferedStreamWriter.cs b/src/StackExchange.Redis/BufferedStreamWriter.cs index 85a993a71..a839a8ad5 100644 --- a/src/StackExchange.Redis/BufferedStreamWriter.cs +++ b/src/StackExchange.Redis/BufferedStreamWriter.cs @@ -57,7 +57,7 @@ public enum WriteMode public virtual bool IsSync => false; - public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connectionType, Stream target, CancellationToken cancellationToken) + public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connectionType, Stream target, ConfigurationOptions? options, CancellationToken cancellationToken) { if (connectionType is ConnectionType.Subscription | mode is WriteMode.Default) { @@ -67,8 +67,8 @@ public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connect } return mode switch { - WriteMode.Sync => new SwitchableBufferedStreamWriter(target, cancellationToken, initiallySync: true), - WriteMode.Async => new SwitchableBufferedStreamWriter(target, cancellationToken, initiallySync: false), + WriteMode.Sync => new SwitchableBufferedStreamWriter(options?.RequestCycleBufferPool, target, cancellationToken, initiallySync: true), + WriteMode.Async => new SwitchableBufferedStreamWriter(options?.RequestCycleBufferPool, target, cancellationToken, initiallySync: false), WriteMode.Pipe => new PipeStreamWriter(target, cancellationToken), _ => throw new ArgumentOutOfRangeException(nameof(mode)), }; @@ -115,10 +115,10 @@ public virtual void DebugSetLog(Action log) { } internal abstract class CycleBufferStreamWriter : BufferedStreamWriter, ICycleBufferCallback { - protected CycleBufferStreamWriter(Stream target, CancellationToken cancellationToken, StateFlags flags = StateFlags.None) + protected CycleBufferStreamWriter(CycleBufferPool? pool, Stream target, CancellationToken cancellationToken, StateFlags flags = StateFlags.None) : base(target, cancellationToken) { - _buffer = CycleBuffer.Create(callback: this); + _buffer = CycleBuffer.Create(pool: pool, callback: this); _stateFlags = flags; } diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs index e8114cdb6..91de712f4 100644 --- a/src/StackExchange.Redis/ConfigurationOptions.cs +++ b/src/StackExchange.Redis/ConfigurationOptions.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.ComponentModel; +using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Net; using System.Net.Security; @@ -12,6 +13,8 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; +using RESPite; +using RESPite.Buffers; using StackExchange.Redis.Configuration; namespace StackExchange.Redis @@ -1413,5 +1416,27 @@ internal static bool TryParseRedisProtocol(string? value, out RedisProtocol prot protocol = default; return false; } + + /// + /// The buffer pool to use when buffering responses. + /// + public CycleBufferPool? ResponseCycleBufferPool + { + [Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)] + get; + [Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)] + set; + } + + /// + /// The buffer pool to use when buffering requests. + /// + public CycleBufferPool? RequestCycleBufferPool + { + [Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)] + get; + [Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)] + set; + } } } diff --git a/src/StackExchange.Redis/PhysicalConnection.Read.cs b/src/StackExchange.Redis/PhysicalConnection.Read.cs index af5421301..774db2c6b 100644 --- a/src/StackExchange.Redis/PhysicalConnection.Read.cs +++ b/src/StackExchange.Redis/PhysicalConnection.Read.cs @@ -65,6 +65,8 @@ static void StartReadingSync(PhysicalConnection conn, CancellationToken cancella private void StartReadAllAsync(CancellationToken cancellationToken) => Task.Run(() => ReadAllAsync(cancellationToken)).RedisFireAndForget(); + private CycleBufferPool? ReaderBufferPool => BridgeCouldBeNull?.Multiplexer?.RawConfig?.ResponseCycleBufferPool; + private async Task ReadAllAsync(CancellationToken cancellationToken) { var tail = _ioStream ?? Stream.Null; @@ -73,7 +75,7 @@ private async Task ReadAllAsync(CancellationToken cancellationToken) // preserve existing state if transitioning _readStatus = ReadStatus.Init; _readState = default; - _readBuffer = CycleBuffer.Create(); + _readBuffer = CycleBuffer.Create(pool: ReaderBufferPool); } try { @@ -130,7 +132,7 @@ private void ReadAllSync(CancellationToken cancellationToken) var tail = _ioStream ?? Stream.Null; _readStatus = ReadStatus.Init; _readState = default; - _readBuffer = CycleBuffer.Create(); + _readBuffer = CycleBuffer.Create(pool: ReaderBufferPool); try { int read; diff --git a/src/StackExchange.Redis/PhysicalConnection.Write.cs b/src/StackExchange.Redis/PhysicalConnection.Write.cs index 84d3430e1..0d213927d 100644 --- a/src/StackExchange.Redis/PhysicalConnection.Write.cs +++ b/src/StackExchange.Redis/PhysicalConnection.Write.cs @@ -22,9 +22,10 @@ private void InitOutput(Stream? stream) { if (stream is null) return; _ioStream = stream; - _output = BufferedStreamWriter.Create(WriteMode, connectionType, stream, OutputCancel); + var config = BridgeCouldBeNull?.Multiplexer?.RawConfig; + _output = BufferedStreamWriter.Create(WriteMode, connectionType, stream, config, OutputCancel); #if DEBUG - if (BridgeCouldBeNull?.Multiplexer.RawConfig.OutputLog is { } log) + if (config?.OutputLog is { } log) { _output.DebugSetLog(log); } diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt index 56c23463e..4a8bf8b68 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt @@ -1,4 +1,8 @@ #nullable enable +[SER004]StackExchange.Redis.ConfigurationOptions.RequestCycleBufferPool.get -> RESPite.Buffers.CycleBufferPool? +[SER004]StackExchange.Redis.ConfigurationOptions.RequestCycleBufferPool.set -> void +[SER004]StackExchange.Redis.ConfigurationOptions.ResponseCycleBufferPool.get -> RESPite.Buffers.CycleBufferPool? +[SER004]StackExchange.Redis.ConfigurationOptions.ResponseCycleBufferPool.set -> void [SER005]StackExchange.Redis.TestHarness [SER005]StackExchange.Redis.TestHarness.BufferValidator [SER005]StackExchange.Redis.TestHarness.ChannelPrefix.get -> StackExchange.Redis.RedisChannel diff --git a/tests/StackExchange.Redis.Tests/BufferedStreamWriterTests.cs b/tests/StackExchange.Redis.Tests/BufferedStreamWriterTests.cs index 56e1c86e9..932153db0 100644 --- a/tests/StackExchange.Redis.Tests/BufferedStreamWriterTests.cs +++ b/tests/StackExchange.Redis.Tests/BufferedStreamWriterTests.cs @@ -17,7 +17,7 @@ public class BufferedStreamWriterTests public async Task FlushStateDoesNotLeakIntoNextPageActivation(WriteMode mode) { var stream = new ObservedStream(); - var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, null, CancellationToken.None); try { Write(writer, 1, 1); @@ -50,7 +50,7 @@ public async Task FlushStateDoesNotLeakIntoNextPageActivation(WriteMode mode) public async Task WriterDoesNotLoseFlushRequestedDuringDrainFlush(WriteMode mode) { var stream = new ObservedStream(); - var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, null, CancellationToken.None); try { stream.BlockNextFlush(); @@ -81,7 +81,7 @@ public async Task WriterFaultsWriteCompleteAfterTargetWriteFailure(WriteMode mod { var failure = new IOException("simulated target write failure"); var stream = new ObservedStream { WriteException = failure }; - var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, null, CancellationToken.None); Write(writer, 1, 1); writer.Flush(); @@ -101,7 +101,7 @@ public async Task WriterFaultsWriteCompleteAfterTargetWriteFailure(WriteMode mod public async Task SyncWriterTransitionsToAsyncWhileIdleAndPreservesBufferedData() { var stream = new ObservedStream(); - var writer = BufferedStreamWriter.Create(BufferedStreamWriter.WriteMode.Sync, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create(BufferedStreamWriter.WriteMode.Sync, ConnectionType.Interactive, stream, null, CancellationToken.None); try { Assert.True(writer.IsSync); @@ -135,7 +135,7 @@ public async Task SyncWriterTransitionsToAsyncWhileIdleAndPreservesBufferedData( public async Task SyncWriterTransitionsToAsyncAfterActiveSyncDrain() { var stream = new ObservedStream(); - var writer = BufferedStreamWriter.Create(BufferedStreamWriter.WriteMode.Sync, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create(BufferedStreamWriter.WriteMode.Sync, ConnectionType.Interactive, stream, null, CancellationToken.None); try { stream.BlockNextWrite(); diff --git a/tests/StackExchange.Redis.Tests/ConfigTests.cs b/tests/StackExchange.Redis.Tests/ConfigTests.cs index 561308d30..a117fc9e5 100644 --- a/tests/StackExchange.Redis.Tests/ConfigTests.cs +++ b/tests/StackExchange.Redis.Tests/ConfigTests.cs @@ -90,6 +90,8 @@ orderby name "password", "proxy", "reconnectRetryPolicy", + "RequestCycleBufferPool", + "ResponseCycleBufferPool", "responseTimeout", "ServiceName", "SocketManager",