Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/RESPite.Benchmark/BenchmarkBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public enum PipelineStrategy

protected BenchmarkBase(string[] args)
{
int operations = 100_000;
int operations = 100_000, payloadSize = -1;

string tests = "";
for (int i = 0; i < args.Length; i++)
Expand All @@ -81,6 +81,9 @@ protected BenchmarkBase(string[] args)
case "-P" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp) && tmp > 0:
PipelineDepth = tmp;
break;
case "-d" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp) && tmp > 0:
payloadSize = tmp;
break;
case "-w" when i != args.Length - 1 && int.TryParse(args[++i], out int tmp):
WriteMode = tmp;
break;
Expand Down Expand Up @@ -125,7 +128,21 @@ protected BenchmarkBase(string[] args)

_operationsPerClient = operations / ClientCount;

Payload = "abc"u8.ToArray();
if (payloadSize < 0)
{
Payload = "abc"u8.ToArray();
}
else
{
var arr = new byte[payloadSize];
var rand = new Random(payloadSize); // use the size as the seed, for repeatability
for (int i = 0; i < arr.Length; i++)
{
arr[i] = (byte)rand.Next(32, 127); // space thru ~ (ignore DEL)
}

Payload = arr;
}
}

public abstract Task RunAll();
Expand Down
23 changes: 12 additions & 11 deletions src/RESPite.Benchmark/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,36 @@ Example usage:

Basic options, for parity:

- `-h <hostname>` Server hostname (default 127.0.0.1)
- `-p <port>` Server port (default 6379)
- `-h <hostname>` Server hostname (default 127.0.0.1).
- `-p <port>` Server port (default 6379).
- `-c <clients>` Number of parallel connections (default 50).
- `-n <requests>` Total number of requests (default 100000)
- `-n <requests>` Total number of requests (default 100000).
- `-d <size>` Data size of SET/GET value in bytes (default 3).
- `-P <numreq>` Pipeline <numreq> requests. Default 1 (no pipeline).
- `-l` Loop. Run the tests forever
- `-q` Quiet. Just show query/sec values
- `-l` Loop. Run the tests forever.
- `-q` Quiet. Just show query/sec values.
- `-t <tests>` Only run the comma separated list of tests. The test names are the same as the ones produced as output.

## Custom options

Additional options specific to this tool:

- `+m` / `-m`: enable or disable (default) multiplexing: when enabled clients share a connection, otherwise each client has a separate connection
- `--batch` / `--queue` pipelining should using batching (default) or queueing strategy
- `--basic` : perform basic typical IO operations rather than synthetic benchmarks
- `+m` / `-m`: enable or disable (default) multiplexing: when enabled clients share a connection, otherwise each client has a separate connection.
- `--batch` / `--queue` pipelining should using batching (default) or queueing strategy.
- `--basic` : perform basic typical IO operations rather than synthetic benchmarks.


## Internal options

These exist mostly for Marc's benefit:

- `-w <mode>` Specify the internal write-mode
- `+x` / `-x`: enable or disable (default) cancellation support (irrelevant until later v3 tranche)
- `-w <mode>` Specify the internal write-mode.
- `+x` / `-x`: enable or disable (default) cancellation support (irrelevant until later v3 tranche).

## Local example

To build and run from source, `dotnet run` can be used with everything after `--` being args to the command:

```
dotnet run -p:TargetVer=3 -f net10.0 -c Release -- -q +m --batch -n 500000
dotnet run -p:TargetVer=3 -f net10.0 -c Release -- -q -c 50 -P 100 +m --queue -n 500000 -q -l -t INCR
```
1 change: 1 addition & 0 deletions src/RESPite/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[assembly: CLSCompliant(true)]
26 changes: 8 additions & 18 deletions src/RESPite/Buffers/CycleBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,18 @@ public partial struct CycleBuffer

// note: if someone uses an uninitialized CycleBuffer (via default): that's a skills issue; git gud
public static CycleBuffer Create(
MemoryPool<byte>? pool = null,
int pageSize = DefaultPageSize,
ICycleBufferCallback? callback = null)
{
pool ??= DefaultPool;
if (pageSize <= 0) pageSize = DefaultPageSize;
if (pageSize > pool.MaxBufferSize) pageSize = pool.MaxBufferSize;
return new CycleBuffer(pool, pageSize, callback);
}
CycleBufferPool? pool = null,
ICycleBufferCallback? callback = null) => new(pool, callback);

private CycleBuffer(MemoryPool<byte> pool, int pageSize, ICycleBufferCallback? callback)
private CycleBuffer(CycleBufferPool? pool, ICycleBufferCallback? callback = null)
{
Pool = pool;
PageSize = pageSize;
_pool = pool ?? CycleBufferPool.Default;
_callback = callback;
leasedStart = -1;
}

private const int DefaultPageSize = 8 * 1024;

public int PageSize { get; }
public MemoryPool<byte> Pool { get; }
public CycleBufferPool Pool => _pool;
private readonly CycleBufferPool _pool;
private readonly ICycleBufferCallback? _callback;

private Segment? startSegment, endSegment;
Expand Down Expand Up @@ -412,7 +402,7 @@ private Segment GetNextSegment()
}
}

Segment newSegment = Segment.Create(Pool.Rent(PageSize));
Segment newSegment = Segment.Create(endSegment is null ? _pool.Rent() : _pool.Rent(GetAllCommitted()));
if (endSegment is null)
{
// tabula rasa
Expand Down Expand Up @@ -452,7 +442,7 @@ public Memory<byte> GetUncommittedMemory(int hint = 0)
{
if (!memory.IsEmpty) return MemoryMarshal.AsMemory(memory);
}
else if (memory.Length >= Math.Min(hint, PageSize >> 2)) // respect the hint up to 1/4 of the page size
else if (memory.Length >= Math.Min(hint, 1024)) // respect the hint, or 1k (only relevant when large requests are made)
{
return MemoryMarshal.AsMemory(memory);
}
Expand Down
96 changes: 96 additions & 0 deletions src/RESPite/Buffers/CycleBufferPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;

namespace RESPite.Buffers;

[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
public abstract class CycleBufferPool
{
/// <summary>
/// Create an initial buffer.
/// </summary>
public virtual IMemoryOwner<byte> Rent() => Rent(default);

/// <summary>
/// Create a buffer with knowledge of the existing leased data.
/// </summary>
public abstract IMemoryOwner<byte> Rent(in ReadOnlySequence<byte> existing);

// new MemoryPool(...) would be a non-growing buffer pool.
public static CycleBufferPool Default { get; } = new GrowingMemoryPool(minBytes: 8 * 1024);

private class MemoryPool : CycleBufferPool
{
#if TRACK_MEMORY
private static MemoryPool<byte> DefaultPool => MemoryTrackedPool<byte>.Shared;
#else
private static MemoryPool<byte> DefaultPool => MemoryPool<byte>.Shared;
#endif
private readonly MemoryPool<byte> _pool;
private readonly int _minBytes, _maxBytes;

public MemoryPool(int minBytes, MemoryPool<byte>? pool = null, int maxBytes = int.MaxValue)
{
_pool = pool ?? DefaultPool;
// capture the max bytes, without exceeding the pool's max size
_maxBytes = Math.Min(maxBytes, _pool.MaxBufferSize);
// capture the min bytes, applying a rigid lower bound, and not overlapping the max bytes
_minBytes = Math.Min(Math.Max(minBytes, 16), _maxBytes);
}

/// <summary>
/// Rent a chunk using the specified size as a hint.
/// </summary>
protected IMemoryOwner<byte> Rent(int bytes)
{
#if NET
bytes = Math.Clamp(bytes, _minBytes, _maxBytes);
#else
bytes = Math.Min(Math.Max(bytes, _minBytes), _maxBytes);
#endif
return _pool.Rent(bytes);
}

// by default, use fixed size without reference to the existing data; subclasses can tweak

/// <inheritdoc/>
public override IMemoryOwner<byte> Rent() => Rent(_minBytes);

/// <inheritdoc/>
public override IMemoryOwner<byte> Rent(in ReadOnlySequence<byte> existing) => Rent(_minBytes);
}

private sealed class GrowingMemoryPool(int minBytes, MemoryPool<byte>? pool = null, int maxBytes = int.MaxValue)
: MemoryPool(minBytes, pool, maxBytes)
{
public override IMemoryOwner<byte> Rent(in ReadOnlySequence<byte> existing)
{
if (existing.IsEmpty) return base.Rent(existing);
// use a growth strategy looking at the size of the last segment
int lastChunk;
if (existing.IsSingleSegment)
{
lastChunk = existing.First.Length;
}
else if (existing.End.GetObject() is ReadOnlySequenceSegment<byte> segment)
{
lastChunk = segment.Memory.Length; // note we ignore GetInteger() intentionally
}
else
{
// do it the hard way; note we'll only observe the reserved size, rather
// than the actual buffer size, but that's the best we can do
lastChunk = 0;
foreach (var chunk in existing)
{
if (!chunk.IsEmpty) lastChunk = chunk.Length;
}

if (lastChunk is 0) lastChunk = existing.First.Length; // paranoia
}

// "max" here is to account for overflow - i.e. stop growing if that happens (unlikely)
return Rent(Math.Max(lastChunk, lastChunk << 1));
}
}
}
1 change: 1 addition & 0 deletions src/RESPite/Messages/RespReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ internal readonly T ParseBytes<T, TState>(Parser<byte, TState, T> parser, TState
}
}

[CLSCompliant(false)]
public readonly unsafe bool TryParseScalar<T>(
delegate* managed<ReadOnlySpan<byte>, out T, bool> parser, out T value)
{
Expand Down
3 changes: 0 additions & 3 deletions src/RESPite/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
[SER004]RESPite.Buffers.CycleBuffer.GetCommittedLength() -> long
[SER004]RESPite.Buffers.CycleBuffer.GetUncommittedMemory(int hint = 0) -> System.Memory<byte>
[SER004]RESPite.Buffers.CycleBuffer.GetUncommittedSpan(int hint = 0) -> System.Span<byte>
[SER004]RESPite.Buffers.CycleBuffer.PageSize.get -> int
[SER004]RESPite.Buffers.CycleBuffer.Pool.get -> System.Buffers.MemoryPool<byte>!
[SER004]RESPite.Buffers.CycleBuffer.Release() -> void
[SER004]RESPite.Buffers.CycleBuffer.TryGetCommitted(out System.ReadOnlySpan<byte> span) -> bool
[SER004]RESPite.Buffers.CycleBuffer.TryGetFirstCommittedMemory(int minBytes, out System.ReadOnlyMemory<byte> memory) -> bool
Expand Down Expand Up @@ -203,7 +201,6 @@
[SER004]RESPite.Messages.RespScanState.TryRead(System.ReadOnlySpan<byte> value, out int bytesRead) -> bool
[SER004]RESPite.RespException
[SER004]RESPite.RespException.RespException(string! message) -> void
[SER004]static RESPite.Buffers.CycleBuffer.Create(System.Buffers.MemoryPool<byte>? pool = null, int pageSize = 8192, RESPite.Buffers.ICycleBufferCallback? callback = null) -> RESPite.Buffers.CycleBuffer
[SER004]static RESPite.Messages.RespFrameScanner.Default.get -> RESPite.Messages.RespFrameScanner!
[SER004]static RESPite.Messages.RespFrameScanner.Subscription.get -> RESPite.Messages.RespFrameScanner!
[SER004]virtual RESPite.Messages.RespAttributeReader<T>.Read(ref RESPite.Messages.RespReader reader, ref T value) -> void
Expand Down
7 changes: 7 additions & 0 deletions src/RESPite/PublicAPI/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,10 @@
[SER004]static RESPite.AsciiHash.EqualsCI(System.ReadOnlySpan<char> first, System.ReadOnlySpan<byte> second) -> bool
[SER004]static RESPite.AsciiHash.SequenceEqualsCI(System.ReadOnlySpan<byte> first, System.ReadOnlySpan<char> second) -> bool
[SER004]static RESPite.AsciiHash.SequenceEqualsCI(System.ReadOnlySpan<char> first, System.ReadOnlySpan<byte> second) -> bool
[SER004]abstract RESPite.Buffers.CycleBufferPool.Rent(in System.Buffers.ReadOnlySequence<byte> existing) -> System.Buffers.IMemoryOwner<byte>!
[SER004]RESPite.Buffers.CycleBufferPool
[SER004]RESPite.Buffers.CycleBufferPool.CycleBufferPool() -> void
[SER004]static RESPite.Buffers.CycleBufferPool.Default.get -> RESPite.Buffers.CycleBufferPool!
[SER004]virtual RESPite.Buffers.CycleBufferPool.Rent() -> System.Buffers.IMemoryOwner<byte>!
[SER004]static RESPite.Buffers.CycleBuffer.Create(RESPite.Buffers.CycleBufferPool? pool = null, RESPite.Buffers.ICycleBufferCallback? callback = null) -> RESPite.Buffers.CycleBuffer
[SER004]RESPite.Buffers.CycleBuffer.Pool.get -> RESPite.Buffers.CycleBufferPool!
1 change: 1 addition & 0 deletions src/RESPite/Shared/AsciiHash.Instance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public AsciiHash(string? value) : this(value is null ? [] : Encoding.ASCII.GetBy
public override bool Equals(object? other) => other is AsciiHash hash && Equals(hash);

/// <inheritdoc cref="Equals(object)" />
[CLSCompliant(false)]
public bool Equals(in AsciiHash other)
{
return (_length == other.Length & _hashCS == other._hashCS)
Expand Down
7 changes: 3 additions & 4 deletions src/StackExchange.Redis/AssemblyInfoHack.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Yes, this is embarrassing. However, in .NET Core the including AssemblyInfo (ifdef'd or not) will screw with
// your version numbers. Therefore, we need to move the attribute out into another file...this file.
// When .csproj merges in, this should be able to return to Properties/AssemblyInfo.cs
using System;
using System;
using System.Runtime.CompilerServices;

[assembly: CLSCompliant(true)]
[module: SkipLocalsInit]
5 changes: 3 additions & 2 deletions src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using RESPite.Buffers;

namespace StackExchange.Redis;

Expand All @@ -13,8 +14,8 @@ internal sealed class SwitchableBufferedStreamWriter : CycleBufferStreamWriter,
private ManualResetValueTaskSourceCore<bool> _readerTask;
private bool _syncSignalled;

public SwitchableBufferedStreamWriter(Stream target, CancellationToken cancellationToken, bool initiallySync)
: base(target, cancellationToken, initiallySync ? StateFlags.None : StateFlags.AsyncMode)
public SwitchableBufferedStreamWriter(CycleBufferPool? pool, Stream target, CancellationToken cancellationToken, bool initiallySync)
: base(pool, target, cancellationToken, initiallySync ? StateFlags.None : StateFlags.AsyncMode)
{
_readerTask.RunContinuationsAsynchronously = true; // we never want the flusher to take over the copying
if (initiallySync)
Expand Down
10 changes: 5 additions & 5 deletions src/StackExchange.Redis/BufferedStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public enum WriteMode

public virtual bool IsSync => false;

public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connectionType, Stream target, CancellationToken cancellationToken)
public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connectionType, Stream target, ConfigurationOptions? options, CancellationToken cancellationToken)
{
if (connectionType is ConnectionType.Subscription | mode is WriteMode.Default)
{
Expand All @@ -67,8 +67,8 @@ public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connect
}
return mode switch
{
WriteMode.Sync => new SwitchableBufferedStreamWriter(target, cancellationToken, initiallySync: true),
WriteMode.Async => new SwitchableBufferedStreamWriter(target, cancellationToken, initiallySync: false),
WriteMode.Sync => new SwitchableBufferedStreamWriter(options?.RequestCycleBufferPool, target, cancellationToken, initiallySync: true),
WriteMode.Async => new SwitchableBufferedStreamWriter(options?.RequestCycleBufferPool, target, cancellationToken, initiallySync: false),
WriteMode.Pipe => new PipeStreamWriter(target, cancellationToken),
_ => throw new ArgumentOutOfRangeException(nameof(mode)),
};
Expand Down Expand Up @@ -115,10 +115,10 @@ public virtual void DebugSetLog(Action<string> log) { }

internal abstract class CycleBufferStreamWriter : BufferedStreamWriter, ICycleBufferCallback
{
protected CycleBufferStreamWriter(Stream target, CancellationToken cancellationToken, StateFlags flags = StateFlags.None)
protected CycleBufferStreamWriter(CycleBufferPool? pool, Stream target, CancellationToken cancellationToken, StateFlags flags = StateFlags.None)
: base(target, cancellationToken)
{
_buffer = CycleBuffer.Create(callback: this);
_buffer = CycleBuffer.Create(pool: pool, callback: this);
_stateFlags = flags;
}

Expand Down
25 changes: 25 additions & 0 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net;
using System.Net.Security;
Expand All @@ -12,6 +13,8 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RESPite;
using RESPite.Buffers;
using StackExchange.Redis.Configuration;

namespace StackExchange.Redis
Expand Down Expand Up @@ -1413,5 +1416,27 @@ internal static bool TryParseRedisProtocol(string? value, out RedisProtocol prot
protocol = default;
return false;
}

/// <summary>
/// The buffer pool to use when buffering responses.
/// </summary>
public CycleBufferPool? ResponseCycleBufferPool
{
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
get;
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
set;
}

/// <summary>
/// The buffer pool to use when buffering requests.
/// </summary>
public CycleBufferPool? RequestCycleBufferPool
{
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
get;
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
set;
}
}
}
Loading
Loading