diff --git a/src/Simpleverse.Repository.Db.Test/SqlServer/AcquireLocksTests.cs b/src/Simpleverse.Repository.Db.Test/SqlServer/AcquireLocksTests.cs new file mode 100644 index 0000000..bda705f --- /dev/null +++ b/src/Simpleverse.Repository.Db.Test/SqlServer/AcquireLocksTests.cs @@ -0,0 +1,146 @@ +using Microsoft.Data.SqlClient; +using Simpleverse.Repository.Db.SqlServer; +using StackExchange.Profiling.Data; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace Simpleverse.Repository.Db.Test.SqlServer +{ + [Collection("SqlServerCollection")] + public class AcquireLocksTests : DatabaseTestFixture + { + public AcquireLocksTests(DatabaseFixture fixture, ITestOutputHelper output) + : base(fixture, output) + { + } + + [Fact] + public async Task TryGetAcquireLocks_AllLocksAcquired_ReturnsTrue() + { + using (var connection = _fixture.GetProfiledConnection()) + { + connection.Open(); + var sqlConnection = (SqlConnection)((ProfiledDbConnection)connection).WrappedConnection; + using (var transaction = sqlConnection.BeginTransaction()) + { + var keys = new List { "101", "102", "103" }; + + // act + var result = await sqlConnection.TryGetAppLockAsync(keys, transaction: transaction); + + // assert + Assert.True(result); + + transaction.Commit(); + } + } + } + + [Fact] + public async Task TryGetAcquireLocks_ParallelThreads_ContentionTest() + { + var keys = new List { "201", "202", "203" }; + + async Task TryAcquireLocksAsync() + { + using var connection = _fixture.GetProfiledConnection(); + connection.Open(); + var sqlConnection = (SqlConnection)((ProfiledDbConnection)connection).WrappedConnection; + using var transaction = sqlConnection.BeginTransaction(); + var result = await sqlConnection.TryGetAppLockAsync(keys, transaction: transaction, lockTimeout: TimeSpan.FromMilliseconds(500)); + if (result) + transaction.Commit(); + else + transaction.Rollback(); + return result; + } + + // Run two tasks in parallel + var task1 = Task.Run(TryAcquireLocksAsync); + var task2 = Task.Run(TryAcquireLocksAsync); + + var results = await Task.WhenAll(task1, task2); + + // Only one should succeed in acquiring all locks + Assert.Equal(2, results.Count(r => r)); + } + + [Fact] + public async Task TryGetAcquireLocks_NullKeys_ThrowsArgumentException() + { + using var connection = _fixture.GetProfiledConnection(); + connection.Open(); + var sqlConnection = (SqlConnection)((ProfiledDbConnection)connection).WrappedConnection; + using var transaction = sqlConnection.BeginTransaction(); + + await Assert.ThrowsAsync(async () => + { + await sqlConnection.TryGetAppLockAsync(null, transaction: transaction); + }); + + transaction.Rollback(); + } + + [Fact] + public async Task TryGetAcquireLocks_EmptyKeys() + { + using var connection = _fixture.GetProfiledConnection(); + connection.Open(); + var sqlConnection = (SqlConnection)((ProfiledDbConnection)connection).WrappedConnection; + using var transaction = sqlConnection.BeginTransaction(); + + try + { + await sqlConnection.TryGetAppLockAsync(new List { }, transaction: transaction); + } + catch (ArgumentException ex) when (ex.Message.Contains("Keys collection must not be null or empty.", StringComparison.OrdinalIgnoreCase)) + { + transaction.Rollback(); + return; + } + } + + [Fact] + public async Task TryGetAcquireLocks_DuplicateKeys_ReturnsTrue() + { + using var connection = _fixture.GetProfiledConnection(); + connection.Open(); + var sqlConnection = (SqlConnection)((ProfiledDbConnection)connection).WrappedConnection; + using var transaction = sqlConnection.BeginTransaction(); + + var keys = new List { "301", "301", "302" }; + var result = await sqlConnection.TryGetAppLockAsync(keys, transaction: transaction); + + Assert.True(result); + + transaction.Commit(); + } + + [Fact] + public async Task TryGetAcquireLocks_LockTimeout_ReturnsFalse() + { + var keys = new List { "401", "402" }; + + using var connection1 = _fixture.GetProfiledConnection(); + connection1.Open(); + var sqlConnection1 = (SqlConnection)((ProfiledDbConnection)connection1).WrappedConnection; + using var transaction1 = sqlConnection1.BeginTransaction(); + await sqlConnection1.TryGetAppLockAsync(keys, transaction: transaction1); + + using var connection2 = _fixture.GetProfiledConnection(); + connection2.Open(); + var sqlConnection2 = (SqlConnection)((ProfiledDbConnection)connection2).WrappedConnection; + using var transaction2 = sqlConnection2.BeginTransaction(); + var result = await sqlConnection2.TryGetAppLockAsync(keys, transaction: transaction2, lockTimeout: TimeSpan.FromMilliseconds(100)); + + Assert.False(result); + + transaction1.Rollback(); + transaction2.Rollback(); + } + } +} diff --git a/src/Simpleverse.Repository.Db/Simpleverse.Repository.Db.csproj b/src/Simpleverse.Repository.Db/Simpleverse.Repository.Db.csproj index 64100be..aedbe08 100644 --- a/src/Simpleverse.Repository.Db/Simpleverse.Repository.Db.csproj +++ b/src/Simpleverse.Repository.Db/Simpleverse.Repository.Db.csproj @@ -13,10 +13,10 @@ true Dapper, Bulk, Merge, Upsert, Delete, Insert, Update, Repository LICENSE - 2.1.30 + 2.1.31 High performance operation for MS SQL Server built for Dapper ORM. Including bulk operations Insert, Update, Delete, Get as well as Upsert both single and bulk. - 2.1.30.0 - 2.1.30.0 + 2.1.31.0 + 2.1.31.0 https://github.com/lukaferlez/Simpleverse.Repository README.md true diff --git a/src/Simpleverse.Repository.Db/SqlServer/SqlConnectionExtensions.cs b/src/Simpleverse.Repository.Db/SqlServer/SqlConnectionExtensions.cs index 450afd1..3a53d4d 100644 --- a/src/Simpleverse.Repository.Db/SqlServer/SqlConnectionExtensions.cs +++ b/src/Simpleverse.Repository.Db/SqlServer/SqlConnectionExtensions.cs @@ -3,7 +3,9 @@ using System; using System.Collections.Generic; using System.Data; +using System.Linq; using System.Reflection; +using System.Threading; using System.Threading.Tasks; namespace Simpleverse.Repository.Db.SqlServer @@ -88,6 +90,59 @@ select @result return result == 0; } + public static async Task ReleaseAppLockAsync(this SqlConnection connection, IEnumerable keys, IDbTransaction transaction = null) + { + bool allReleased = true; + + foreach (var key in keys) + { + var released = await connection.ReleaseAppLockAsync(key, transaction); + if (!released) + allReleased = false; + } + + return allReleased; + } + + public static async Task TryGetAppLockAsync(this SqlConnection connection, IEnumerable keys, int retryTimeout = 100, int numberOfRetries = 3, IDbTransaction transaction = null, TimeSpan? lockTimeout = null) + { + if (keys == null || !keys.Any()) + throw new ArgumentException("Keys collection must not be null or empty.", nameof(keys)); + if (numberOfRetries < 1) + throw new ArgumentOutOfRangeException(nameof(numberOfRetries), "Number of retries must be at least 1."); + if (retryTimeout < 0) + throw new ArgumentOutOfRangeException(nameof(retryTimeout), "Retry timeout must be non-negative."); + + bool allLocked = true; + + for (int retry = 0; retry < numberOfRetries; retry++) + { + int lastAttemptedIndex = -1; + allLocked = true; + + foreach (var key in keys) + { + var locked = await connection.GetAppLockAsync(key, transaction, lockTimeout); + + if (!locked) + { + allLocked = false; + break; + } + + lastAttemptedIndex++; + } + + if (allLocked) + break; + + await connection.ReleaseAppLockAsync(keys.Take(lastAttemptedIndex + 1), transaction); + await Task.Delay(retryTimeout); + } + + return allLocked; + } + public static Task ExecuteWithAppLockAsync( this SqlConnection conn, string resourceIdentifier,