Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added basic batch support #226

Merged
merged 12 commits into from
Mar 9, 2022
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) IEvangelist. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Net;
using Microsoft.Azure.Cosmos;

namespace Microsoft.Azure.CosmosRepository.Exceptions
{
/// <summary>
/// Details an error when performing a batch operation for a given TItem
/// </summary>
/// <typeparam name="TItem"></typeparam>
public class BatchOperationException<TItem> : Exception
where TItem : IItem
{
/// <summary>
/// The response from the batch operation.
/// </summary>
public TransactionalBatchResponse Response { get; }

/// <summary>
/// The status code return from the <see cref="TransactionalBatchResponse"/>
/// </summary>
public HttpStatusCode StatusCode => Response.StatusCode;

/// <summary>
/// Creates <see cref="BatchOperationException{TItem}"/>
/// </summary>
/// <param name="response"></param>
public BatchOperationException(TransactionalBatchResponse response) : base(
$"Failed to execute the batch operation for {typeof(TItem).Name}")
{
Response = response;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static IServiceCollection AddCosmosRepository(
.AddSingleton<ICosmosUniqueKeyPolicyProvider, DefaultCosmosUniqueKeyPolicyProvider>()
.AddSingleton(typeof(IReadOnlyRepository<>), typeof(DefaultRepository<>))
.AddSingleton(typeof(IWriteOnlyRepository<>), typeof(DefaultRepository<>))
.AddSingleton(typeof(IBatchRepository<>), typeof(DefaultRepository<>))
.AddSingleton(typeof(IRepository<>), typeof(DefaultRepository<>))
.AddSingleton<IRepositoryFactory, DefaultRepositoryFactory>()
.AddSingleton<ICosmosItemConfigurationProvider, DefaultCosmosItemConfigurationProvider>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) IEvangelist. All rights reserved.
// Licensed under the MIT License.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Microsoft.Azure.CosmosRepository.Extensions
{
/// <summary>
/// A set of useful extension methods for a <see cref="ValueTask{TResult}"/>
/// </summary>
public static class ValueTaskExtensions
{
/// <summary>
/// Converts a <see cref="ValueTask"/> of <see cref="IEnumerable{T}"/> to a <see cref="List{T}"/>
/// </summary>
/// <param name="valueTask">The value task</param>
/// <typeparam name="T">The type of <see cref="IEnumerable{T}"/></typeparam>
/// <returns></returns>
mumby0168 marked this conversation as resolved.
Show resolved Hide resolved
public static async ValueTask<List<T>> ToListAsync<T>(this ValueTask<IEnumerable<T>> valueTask)
{
IEnumerable<T> e = await valueTask;
return e.ToList();
mumby0168 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) IEvangelist. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.CosmosRepository.Exceptions;

// ReSharper disable once CheckNamespace
namespace Microsoft.Azure.CosmosRepository
{
internal partial class DefaultRepository<TItem>
{
/// <inheritdoc />
public async ValueTask UpdateAsBatchAsync(
IEnumerable<TItem> items,
CancellationToken cancellationToken = default)
{
List<TItem> list = items.ToList();

string partitionKey = GetPartitionKeyValue(list);

Container container = await _containerProvider.GetContainerAsync();

TransactionalBatch batch = container.CreateTransactionalBatch(new PartitionKey(partitionKey));

foreach (TItem item in list)
{
TransactionalBatchItemRequestOptions options = new();

if (item is IItemWithEtag itemWithEtag)
{
options.IfMatchEtag = itemWithEtag.Etag;
}

batch.UpsertItem(item, options);
}

using TransactionalBatchResponse response = await batch.ExecuteAsync(cancellationToken);

if (!response.IsSuccessStatusCode)
{
throw new BatchOperationException<TItem>(response);
}
}

/// <inheritdoc />
public async ValueTask CreateAsBatchAsync(
IEnumerable<TItem> items,
CancellationToken cancellationToken = default)
{
List<TItem> list = items.ToList();

string partitionKey = GetPartitionKeyValue(list);

Container container = await _containerProvider.GetContainerAsync();

TransactionalBatch batch = container.CreateTransactionalBatch(new PartitionKey(partitionKey));

foreach (TItem item in list)
{
batch.CreateItem(item);
}

using TransactionalBatchResponse response = await batch.ExecuteAsync(cancellationToken);

if (!response.IsSuccessStatusCode)
{
throw new BatchOperationException<TItem>(response);
}
}

public async ValueTask DeleteAsBatchAsync(
IEnumerable<TItem> items,
CancellationToken cancellationToken = default)
{
List<TItem> list = items.ToList();

string partitionKey = GetPartitionKeyValue(list);

Container container = await _containerProvider.GetContainerAsync();

TransactionalBatch batch = container.CreateTransactionalBatch(new PartitionKey(partitionKey));

foreach (TItem item in list)
{
batch.DeleteItem(item.Id);
}

using TransactionalBatchResponse response = await batch.ExecuteAsync(cancellationToken);

if (!response.IsSuccessStatusCode)
{
throw new BatchOperationException<TItem>(response);
}
}

private static string GetPartitionKeyValue(List<TItem> items)
{
if (!items.Any())
{
throw new ArgumentException(
"Unable to perform batch operation with no items",
nameof(items));
}

return items[0].PartitionKey;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) IEvangelist. All rights reserved.
// Licensed under the MIT License.

// ReSharper disable once CheckNamespace

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.CosmosRepository.Exceptions;

namespace Microsoft.Azure.CosmosRepository
{
/// <summary>
/// This is the batch enabled repository interface for any implementation of
/// <typeparamref name="TItem"/>, exposing asynchronous batch update and create functionality.
/// </summary>
/// <typeparam name="TItem">The <see cref="IItem"/> implementation class type.</typeparam>
/// <example>
/// With DI, use .ctor injection to require any implementation of <see cref="IItem"/>:
/// <code language="c#">
/// <![CDATA[
/// public class ConsumingService
/// {
/// readonly IBatchRepository<SomePoco> _pocoRepository;
///
/// public ConsumingService(
/// IBatchRepository<SomePoco> pocoRepository) =>
/// _pocoRepository = pocoRepository;
/// }
/// ]]>
/// </code>
/// </example>
public interface IBatchRepository<in TItem>
where TItem : IItem
{
/// <summary>
/// Updates an <see cref="IEnumerable{TItem}"/> as a batch.
/// </summary>
/// <param name="items">The items to update.</param>
/// <param name="cancellationToken">A token to cancel the async operation.</param>
/// <exception cref="BatchOperationException{TItem}">Thrown when the batch operation fails</exception>
/// <returns>An <see cref="ValueTask"/> that represents the async batch operation.</returns>
ValueTask UpdateAsBatchAsync(
IEnumerable<TItem> items,
CancellationToken cancellationToken = default);

/// <summary>
/// Creates an <see cref="IEnumerable{TItem}"/> as a batch.
/// </summary>
/// <param name="items">The items to create.</param>
/// <param name="cancellationToken">A token to cancel the async operation.</param>
/// <exception cref="BatchOperationException{TItem}">Thrown when the batch operation fails</exception>
/// <returns>An <see cref="ValueTask"/> that represents the async batch operation.</returns>
ValueTask CreateAsBatchAsync(
IEnumerable<TItem> items,
CancellationToken cancellationToken = default);

/// <summary>
/// Deletes an <see cref="IEnumerable{TItem}"/> as a batch.
/// </summary>
/// <param name="items">The items to create.</param>
/// <param name="cancellationToken">A token to cancel the async operation.</param>
/// <exception cref="BatchOperationException{TItem}">Thrown when the batch operation fails</exception>
/// <returns>An <see cref="ValueTask"/> that represents the async batch operation.</returns>
ValueTask DeleteAsBatchAsync(
IEnumerable<TItem> items,
CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ namespace Microsoft.Azure.CosmosRepository
/// </example>
public interface IRepository<TItem> :
IReadOnlyRepository<TItem>,
IWriteOnlyRepository<TItem>
IWriteOnlyRepository<TItem>,
IBatchRepository<TItem>
where TItem : IItem
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// ReSharper disable once CheckNamespace
namespace Microsoft.Azure.CosmosRepository
{
/// <inheritdoc/>
Expand Down Expand Up @@ -447,5 +448,26 @@ public async ValueTask<TResult> QueryAsync<TResult>(

private void MismatchedEtags() =>
throw new CosmosException(string.Empty, HttpStatusCode.PreconditionFailed, 0, string.Empty, 0);

public ValueTask UpdateAsBatchAsync(
IEnumerable<TItem> items,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
mumby0168 marked this conversation as resolved.
Show resolved Hide resolved

public ValueTask CreateAsBatchAsync(
IEnumerable<TItem> items,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
mumby0168 marked this conversation as resolved.
Show resolved Hide resolved

public ValueTask DeleteAsBatchAsync(
IEnumerable<TItem> items,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
mumby0168 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public async Task Create_Rating_For_Product_Is_Replicated_To_Be_Partitioned_By_C

await _ratingsRepository.CreateAsync(tvRating);

await WaitFor(1);
await WaitFor(3);

IEnumerable<RatingByCategory> results = await _ratingsByCategoryRepository
.GetAsync(x => x.PartitionKey == TechnologyCategoryId &&
Expand Down
Loading