Merge branch 'main' into importer

Scooletz committed Nov 12, 2024
2 parents d895fff + c6bf9d1 commit fbe34dd
Showing 5 changed files with 181 additions and 24 deletions.
61 changes: 61 additions & 0 deletions src/Paprika.Tests/Chain/PrefetchingTests.cs
@@ -70,6 +70,67 @@ public async Task Prefetches_properly_on_not_changed_structure()
}
}

[Test]
public async Task Makes_all_decompression_on_prefetch()
{
using var db = PagedDb.NativeMemoryDb(8 * 1024 * 1024, 2);
var merkle = new ComputeMerkleBehavior(ComputeMerkleBehavior.ParallelismNone);
await using var blockchain = new Blockchain(db, merkle);

// Create one block with some values, commit it and finalize
var hash = Keccak.EmptyTreeHash;

hash = BuildBlock(blockchain, hash, 1);
blockchain.Finalize(hash);
await blockchain.WaitTillFlush(hash);

hash = BuildBlock(blockchain, hash, 2);

return;

static Keccak BuildBlock(Blockchain blockchain, Keccak parent, uint number)
{
var isFirst = number == 1;

byte[] value = isFirst ? [17] : [23];

const int seed = 13;
const int contracts = 10;
const int slots = 10;

using var block = blockchain.StartNew(parent);
var random = new Random(seed);

// Open prefetcher on blocks beyond first
var prefetcher = isFirst == false ? block.OpenPrefetcher() : null;

for (var i = 0; i < contracts; i++)
{
var contract = random.NextKeccak();
prefetcher?.PrefetchAccount(contract);

if (isFirst)
{
block.SetAccount(contract, new Account(1, 1, Keccak.Zero, Keccak.Zero));
}

for (var j = 0; j < slots; j++)
{
var storage = random.NextKeccak();
prefetcher?.PrefetchStorage(contract, storage);
block.SetStorage(contract, storage, value);
}
}

prefetcher?.SpinTillPrefetchDone();

using (RlpMemo.NoDecompression())
{
return block.Commit(number);
}
}
}

private static void Set(Keccak[] accounts, uint account, IWorldState start, UInt256 bigNonce)
{
ref var k = ref accounts[account];
60 changes: 43 additions & 17 deletions src/Paprika/Chain/Blockchain.cs
@@ -100,7 +100,7 @@ public Blockchain(IDb db, IPreCommitBehavior preCommit, TimeSpan? minFlushDelay
_cacheUsagePreCommit = _meter.CreateHistogram<int>("PreCommit transient cache usage per commit", "%",
"How much used was the transient cache");
_prefetchCount = _meter.CreateHistogram<int>("Prefetch count",
"Number of prefetches performed by the prefetcher", "count");
"Key count", "Keys prefetched in the background by the prefetcher");

// pool
_pool = new(1024, true, _meter);
@@ -739,7 +739,7 @@ public void Reset()

private class PreCommitPrefetcher : IDisposable, IPreCommitPrefetcher, IPrefetcherContext, IThreadPoolWorkItem
{
private bool _prefetchPossible = true;
private volatile bool _prefetchPossible = true;

private readonly ConcurrentQueue<(Keccak, Keccak)> _items = new();
private readonly BitFilter _prefetched;
@@ -750,6 +750,7 @@ private class PreCommitPrefetcher : IDisposable, IPreCommitPrefetcher, IPrefetch
private const int Working = 1;
private const int NotWorking = 0;
private volatile int _working = NotWorking;
private readonly Page _workspace;

private static readonly Keccak JustAccount = Keccak.Zero;

@@ -759,9 +760,10 @@ public PreCommitPrefetcher(PooledSpanDictionary cache, BlockState parent, Buffer
_parent = parent;
_pool = pool;
_prefetched = _parent._blockchain.CreateBitFilter();
_workspace = pool.Rent(false);
}

public bool CanPrefetchFurther => Volatile.Read(ref _prefetchPossible);
public bool CanPrefetchFurther => _prefetchPossible;

public void PrefetchAccount(in Keccak account)
{
@@ -790,6 +792,11 @@ private void EnsureRunning()
}
}

public void SpinTillPrefetchDone()
{
SpinWait.SpinUntil(() => _working == NotWorking);
}

private bool ShouldPrefetch(ulong hash) => _prefetched.AddAtomic(hash);

public void PrefetchStorage(in Keccak account, in Keccak storage)
@@ -822,15 +829,25 @@ public void PrefetchStorage(in Keccak account, in Keccak storage)

void IThreadPoolWorkItem.Execute()
{
while (CanPrefetchFurther && _items.TryDequeue(out (Keccak account, Keccak storage) item))
while (_items.TryDequeue(out (Keccak account, Keccak storage) item))
{
if (item.storage.Equals(JustAccount))
{
PreCommit.Prefetch(item.account, this);
}
else
lock (_cache)
{
PreCommit.Prefetch(item.account, item.storage, this);
if (_prefetchPossible == false)
{
// We leave _working set to Working so that subsequent Prefetch calls
// never try to schedule another work item.
return;
}

if (item.storage.Equals(JustAccount))
{
PreCommit.Prefetch(item.account, this);
}
else
{
PreCommit.Prefetch(item.account, item.storage, this);
}
}
}

@@ -843,15 +860,17 @@ void IThreadPoolWorkItem.Execute()

public void BlockFurtherPrefetching()
{
// Mark as not possible to prefetch
Volatile.Write(ref _prefetchPossible, false);

// Spin until worker is done
SpinWait.SpinUntil(() => _working == NotWorking);
lock (_cache)
{
// Just set _prefetchPossible to false and return.
// As every operation in IThreadPoolWorkItem.Execute takes this lock, it's safe.
// An added benefit: there's no need to worry about whether a worker is currently running.
_prefetchPossible = false;
}
}

[SkipLocalsInit]
public ReadOnlySpanOwner<byte> Get(scoped in Key key, SpanFunc<EntryType> entryMapping)
public ReadOnlySpanOwner<byte> Get(scoped in Key key, TransformPrefetchedData transform)
{
if (CanPrefetchFurther == false)
{
@@ -877,7 +896,13 @@ public ReadOnlySpanOwner<byte> Get(scoped in Key key, SpanFunc<EntryType> entryM
var ancestor = _parent.TryGetAncestors(key, keyWritten, hash);

var span = ancestor.Span;
_cache.Set(keyWritten, hash, span, (byte)entryMapping(span));

// Transform the data before storing it in the cache. This is done so that Decompress, for example,
// runs on this thread, not on the one that marks paths as dirty.
var transformed = transform(span, _workspace.Span, out var entryType);

// Store the transformed data so that, if the transform reuses a buffer, the reuse happens before the next call.
_cache.Set(keyWritten, hash, transformed, (byte)entryType);
_parent._filter.AddAtomic(hash);
PrefetchCount++;

@@ -895,6 +920,7 @@ public ReadOnlySpanOwner<byte> Get(scoped in Key key, SpanFunc<EntryType> entryM

public void Dispose()
{
_pool.Return(_workspace);
_prefetched.Return(_pool);
}
}
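Note on the Blockchain.cs hunks above: the previous BlockFurtherPrefetching stopped the worker with a Volatile.Write followed by SpinWait.SpinUntil, while the new version simply flips _prefetchPossible under the _cache lock that IThreadPoolWorkItem.Execute now takes around every prefetch. A minimal sketch of that pattern — hypothetical names, not Paprika's API:

    using System.Collections.Generic;

    class StoppableWorker
    {
        private readonly object _gate = new();
        private readonly Queue<int> _items = new();
        private bool _stopped;

        public void Enqueue(int item)
        {
            lock (_gate) _items.Enqueue(item);
        }

        // Runs on a thread-pool thread.
        public void Execute()
        {
            while (true)
            {
                lock (_gate)
                {
                    // Re-checked under the lock: once Stop() returns, no further item is processed.
                    if (_stopped || !_items.TryDequeue(out var item))
                        return;

                    Process(item); // all work on shared state happens under _gate
                }
            }
        }

        // Once this returns, any in-flight operation has finished and no new one can start —
        // the same guarantee the old spin-wait gave, without spinning.
        public void Stop()
        {
            lock (_gate) _stopped = true;
        }

        private static void Process(int item) { /* work */ }
    }

The per-item lock is presumably cheap relative to the prefetch work itself (ancestor lookups and decompression), which is what makes the simpler scheme attractive here.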
16 changes: 12 additions & 4 deletions src/Paprika/Chain/IPreCommitPrefetcher.cs
@@ -27,6 +27,11 @@ public interface IPreCommitPrefetcher
/// <param name="account">The account to be prefetched.</param>
/// <param name="storage">The storage slot</param>
void PrefetchStorage(in Keccak account, in Keccak storage);

/// <summary>
/// Spins with <see cref="SpinWait.SpinUntil(System.Func{bool})"/> until the prefetching is done.
/// </summary>
void SpinTillPrefetchDone();
}

/// <summary>
Expand All @@ -38,10 +43,13 @@ public interface IPrefetcherContext

/// <summary>
/// Tries to retrieve the result stored under the given key.
/// If it fails to get it from the current state, it will fetch it from the ancestors and store it accordingly to the
/// <paramref name="entryMapping"/>.
/// If it fails to get it from the current state,
/// it will fetch it from the ancestors and store it after transforming it with <paramref name="transform"/>.
/// </summary>
public ReadOnlySpanOwner<byte> Get(scoped in Key key, SpanFunc<EntryType> entryMapping);
public ReadOnlySpanOwner<byte> Get(scoped in Key key, TransformPrefetchedData transform);
}

public delegate TResult SpanFunc<TResult>(in ReadOnlySpan<byte> data);
/// <summary>
/// Transforms the incoming <paramref name="data"/> into the result, optionally using <paramref name="workspace"/>, and reports the <paramref name="type"/> of the entry.
/// </summary>
public delegate ReadOnlySpan<byte> TransformPrefetchedData(in ReadOnlySpan<byte> data, in Span<byte> workspace, out EntryType type);
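
Note: TransformPrefetchedData replaces the former SpanFunc<EntryType>. Instead of merely classifying the fetched bytes, a transform may rewrite them into the caller-provided workspace before they are cached; the contract implied by the call sites is to return either the original data or a slice of workspace that was written to, reporting the entry type either way. A minimal conforming implementation — the name is hypothetical, the real one being ComputeMerkleBehavior.Transform below:

    // Pass-through transform: nothing to rewrite, so return the input bytes
    // and let the cache store them as a one-shot entry.
    private static ReadOnlySpan<byte> PassThrough(in ReadOnlySpan<byte> data, in Span<byte> workspace, out EntryType type)
    {
        type = EntryType.UseOnce;
        return data;
    }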
37 changes: 34 additions & 3 deletions src/Paprika/Merkle/ComputeMerkleBehavior.cs
@@ -757,7 +757,7 @@ public void Set(in Key key, in ReadOnlySpan<byte> payload0, in ReadOnlySpan<byte

public IChildCommit GetChild() => new ChildCommit(parent, commit.GetChild());

public bool Owns(object? actualSpanOwner) => ReferenceEquals(actualSpanOwner, commit);
public bool Owns(object? actualSpanOwner) => ReferenceEquals(actualSpanOwner, parent);

public IReadOnlyDictionary<Keccak, int> Stats =>
throw new NotImplementedException("No stats for the child commit");
@@ -1438,8 +1438,39 @@ public void Prefetch(in Keccak account, in Keccak storage, IPrefetcherContext co
PrefetchImpl(account, storage, context);
}

private static ReadOnlySpan<byte> Transform(in ReadOnlySpan<byte> data, in Span<byte> workspace, out EntryType type)
{
if (data.IsEmpty || Node.Header.GetTypeFrom(data) != Node.Type.Branch)
{
type = EntryType.UseOnce;
return data;
}

Debug.Assert(Node.Header.GetTypeFrom(data) == Node.Type.Branch);

// A branch should always be persistent, already copied and ready to work with.
type = EntryType.Persistent;

var memo = Node.Branch.ReadFrom(data, out var branch);
if (memo.Length == RlpMemo.Size)
{
// The RlpMemo is already decompressed, good to be stored as is.
return data;
}

// The RlpMemo is not decompressed yet.

// Write the branch first
var leftover = branch.WriteToWithLeftover(workspace);

// Decompress the compressed memo (the leftover of data) into the leftover of the workspace
RlpMemo.Decompress(memo, branch.Children, leftover);

return workspace[..(workspace.Length - leftover.Length + RlpMemo.Size)];
}

[SkipLocalsInit]
private static void PrefetchImpl(in Keccak account, in Keccak storage, IPrefetcherContext context)
private void PrefetchImpl(in Keccak account, in Keccak storage, IPrefetcherContext context)
{
var isAccountPrefetch = Unsafe.IsNullRef(in storage);
var accountPath = NibblePath.FromKey(account);
@@ -1461,7 +1492,7 @@ private static void PrefetchImpl(in Keccak account, in Keccak storage, IPrefetch
var leftoverPath = path.SliceFrom(i);

// Query for the node
using var owner = context.Get(key, GetEntryType);
using var owner = context.Get(key, Transform);

if (owner.IsEmpty)
{
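Note on the slice returned at the end of Transform: workspace.Length - leftover.Length is the number of bytes the branch itself occupied, and the freshly decompressed memo fills the next RlpMemo.Size bytes of the leftover. With illustrative sizes (assumed, not Paprika's real constants):

    // Assume workspace.Length == 1024 and the branch writes 40 bytes:
    //   var leftover = branch.WriteToWithLeftover(workspace); // leftover.Length == 984
    //   var branchBytes = workspace.Length - leftover.Length; // 1024 - 984 == 40
    //   RlpMemo.Decompress fills the first RlpMemo.Size bytes of leftover
    // The cached entry is therefore:
    //   workspace[..(branchBytes + RlpMemo.Size)] // the branch immediately followed by its memo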
31 changes: 31 additions & 0 deletions src/Paprika/Merkle/RlpMemo.cs
@@ -1,4 +1,5 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Paprika.Crypto;
using Paprika.Data;
@@ -68,6 +69,11 @@ public bool TryGetKeccak(byte nibble, out ReadOnlySpan<byte> keccak)
public static RlpMemo Decompress(scoped in ReadOnlySpan<byte> leftover, NibbleSet.Readonly children,
scoped in Span<byte> workingSet)
{
if (_decompressionForbidden)
{
ThrowDecompressionForbidden();
}

var span = workingSet[..Size];

if (leftover.IsEmpty)
@@ -115,6 +121,10 @@ public static RlpMemo Decompress(scoped in ReadOnlySpan<byte> leftover, NibbleSe
}

return memo;

[DoesNotReturn]
[MethodImpl(MethodImplOptions.NoInlining)]
static void ThrowDecompressionForbidden() => throw new InvalidOperationException("Decompression is forbidden.");
}

[SkipLocalsInit]
@@ -168,4 +178,25 @@ public static int Compress(in Key key, scoped in ReadOnlySpan<byte> memoizedRlp,
// Return only children that were written
return at * Keccak.Size;
}

/// <summary>
/// Test-only method to forbid decompression.
/// </summary>
/// <returns>A scope that re-enables decompression when disposed.</returns>
public static NoDecompressionScope NoDecompression() => new();

private static volatile bool _decompressionForbidden;

public readonly struct NoDecompressionScope : IDisposable
{
public NoDecompressionScope()
{
_decompressionForbidden = true;
}

public void Dispose()
{
_decompressionForbidden = false;
}
}
}
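
A usage sketch of the new test-only scope, mirroring Makes_all_decompression_on_prefetch above — inside the scope any call to RlpMemo.Decompress throws, so a commit that still encounters a compressed memo fails loudly:

    using (RlpMemo.NoDecompression())
    {
        // Throws InvalidOperationException if the prefetcher left any visited branch compressed.
        var hash = block.Commit(number);
    }

Two design notes: the flag is a single static volatile bool, so the scope is process-wide and non-reentrant — adequate for its stated test-only purpose, but not for nesting or parallel tests. The throw also lives in a [DoesNotReturn], non-inlined local function, keeping the guard on Decompress's hot path down to a single flag check.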
