Skip to content

Commit 34e00f8

Browse files
Increases underload
1 parent 8458dd3 commit 34e00f8

12 files changed

+287
-209
lines changed

EntityFrameworkCore.Sqlite.Concurrency/EFCore.Sqlite.Concurrency.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,14 @@ Get started in one line. Stop compromising on SQLite reliability and speed.
8686
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="10.0.0" />
8787
<PackageReference Include="Microsoft.Data.Sqlite" Version="10.0.0" />
8888
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
89+
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="10.0.0" />
8990
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
9091
<!-- SourceLink for debugging support -->
9192
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="All" />
9293
</ItemGroup>
9394

9495
<!-- Optional Dependencies (Conditional) -->
9596
<ItemGroup>
96-
<PackageReference Include="EFCore.BulkExtensions.Sqlite" Version="10.0.0" Condition="'$(IncludeBulkExtensions)' == 'true'" />
9797
<PackageReference Include="MemoryPack" Version="2.0.0" Condition="'$(IncludeMemoryPack)' == 'true'" />
9898
<PackageReference Include="Spectre.Console" Version="0.54.0" Condition="'$(IncludeSpectre)' == 'true'" />
9999
</ItemGroup>
Binary file not shown.

EntityFrameworkCore.Sqlite.Concurrency/bin/Debug/net10.0/EFCore.Sqlite.Concurrency.deps.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"EFCore.Sqlite.Concurrency/10.0.2": {
1010
"dependencies": {
1111
"Microsoft.EntityFrameworkCore.Sqlite": "10.0.0",
12+
"Microsoft.Extensions.DependencyInjection": "10.0.0",
1213
"Microsoft.Extensions.Logging.Abstractions": "10.0.0"
1314
},
1415
"runtime": {
Binary file not shown.
Binary file not shown.

EntityFrameworkCore.Sqlite.Concurrency/src/Models/SqliteConcurrencyOptions.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,8 @@ namespace EntityFrameworkCore.Sqlite.Concurrency.Models;
22

33
public class SqliteConcurrencyOptions
44
{
5-
public bool UseWriteQueue { get; set; } = true;
65
public int MaxRetryAttempts { get; set; } = 3;
76
public TimeSpan BusyTimeout { get; set; } = TimeSpan.FromSeconds(30);
8-
public bool EnableWalCheckpointManagement { get; set; } = true;
97
public int CommandTimeout { get; set; } = 300; // 5 minutes
108
public int WalAutoCheckpoint { get; set; } = 1000;
11-
public bool EnableMemoryPack { get; set; } = false;
129
}

EntityFrameworkCore.Sqlite.Concurrency/src/SqliteConcurrencyExtensions.cs

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,8 @@ public static DbContextOptionsBuilder UseSqliteWithConcurrency(
3838
SetBusyTimeout(sqliteConnection, options.BusyTimeout);
3939

4040
// Additional optimizations
41-
if (options.UseWriteQueue)
42-
{
43-
ConfigureForWriteQueue(sqliteConnection);
44-
}
45-
46-
if (options.EnableWalCheckpointManagement)
47-
{
48-
SetWalCheckpoint(sqliteConnection, options.WalAutoCheckpoint);
49-
}
41+
ConfigureForWriteQueue(sqliteConnection);
42+
SetWalCheckpoint(sqliteConnection, options.WalAutoCheckpoint);
5043
}
5144
}
5245
};
@@ -58,11 +51,9 @@ public static DbContextOptionsBuilder UseSqliteWithConcurrency(
5851
sqliteOptions.CommandTimeout(options.CommandTimeout);
5952
});
6053

61-
// Add interceptors if using write queue
62-
if (options.UseWriteQueue)
63-
{
64-
optionsBuilder.AddInterceptors(new SqliteConcurrencyInterceptor(options));
65-
}
54+
// Always add interceptors for write queue
55+
var interceptor = SqliteConnectionEnhancer.GetInterceptor(enhancedConnectionString, options);
56+
optionsBuilder.AddInterceptors(interceptor);
6657

6758
return optionsBuilder;
6859
}
@@ -95,36 +86,41 @@ public static async Task BulkInsertOptimizedAsync<T>(
9586
IEnumerable<T> entities,
9687
CancellationToken cancellationToken = default) where T : class
9788
{
98-
await using var transaction = await context.Database.BeginTransactionAsync(cancellationToken);
99-
await context.Database.ExecuteSqlRawAsync("BEGIN IMMEDIATE", cancellationToken);
89+
if (SqliteConnectionEnhancer.IsWriteLockHeld.Value)
90+
{
91+
await context.AddRangeAsync(entities, cancellationToken);
92+
await context.SaveChangesAsync(cancellationToken);
93+
return;
94+
}
95+
96+
var connectionString = context.Database.GetDbConnection().ConnectionString;
97+
var enhancedConnectionString = SqliteConnectionEnhancer.GetOptimizedConnectionString(connectionString);
98+
var writeLock = SqliteConnectionEnhancer.GetWriteLock(enhancedConnectionString);
10099

101-
// Check if EFCore.BulkExtensions is available via reflection
102-
var bulkExtensionsType =
103-
Type.GetType("EFCore.BulkExtensions.SqliteBulkExtensions, EFCore.BulkExtensions.Sqlite");
104-
if (bulkExtensionsType != null)
100+
await writeLock.WaitAsync(cancellationToken);
101+
SqliteConnectionEnhancer.IsWriteLockHeld.Value = true;
102+
103+
try
105104
{
106-
var method = bulkExtensionsType.GetMethod("BulkInsertAsync",
107-
new[] { typeof(DbContext), typeof(IEnumerable<T>), typeof(CancellationToken) });
105+
await using var transaction = await context.Database.BeginTransactionAsync(cancellationToken);
106+
107+
// Batch inserts
108+
var batchSize = 1000;
109+
var batches = entities.Chunk(batchSize);
108110

109-
if (method != null)
111+
foreach (var batch in batches)
110112
{
111-
await (Task)method.Invoke(null, new object[] { context, entities, cancellationToken });
112-
await transaction.CommitAsync(cancellationToken);
113-
return;
113+
await context.AddRangeAsync(batch, cancellationToken);
114+
await context.SaveChangesAsync(cancellationToken);
114115
}
115-
}
116-
117-
// Fallback to batch inserts
118-
var batchSize = 1000;
119-
var batches = entities.Chunk(batchSize);
120116

121-
foreach (var batch in batches)
117+
await transaction.CommitAsync(cancellationToken);
118+
}
119+
finally
122120
{
123-
await context.AddRangeAsync(batch, cancellationToken);
124-
await context.SaveChangesAsync(cancellationToken);
121+
SqliteConnectionEnhancer.IsWriteLockHeld.Value = false;
122+
writeLock.Release();
125123
}
126-
127-
await transaction.CommitAsync(cancellationToken);
128124
}
129125

130126
private static void ConfigureForWriteQueue(SqliteConnection connection)

EntityFrameworkCore.Sqlite.Concurrency/src/SqliteConcurrencyInterceptor.cs

Lines changed: 107 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88

99
namespace EntityFrameworkCore.Sqlite.Concurrency;
1010

11-
public class SqliteConcurrencyInterceptor : DbCommandInterceptor, IAsyncDisposable
11+
public class SqliteConcurrencyInterceptor : DbCommandInterceptor
1212
{
1313
private readonly SqliteConcurrencyOptions _options;
14-
private readonly SemaphoreSlim _writeLock = new(1, 1);
14+
private readonly SemaphoreSlim _writeLock;
1515
private readonly Channel<Func<ValueTask>> _writeQueue;
1616
private readonly Task _queueProcessor;
17-
private bool _disposed;
1817

19-
public SqliteConcurrencyInterceptor(SqliteConcurrencyOptions options)
18+
public SqliteConcurrencyInterceptor(SqliteConcurrencyOptions options, string connectionString)
2019
{
2120
_options = options;
21+
_writeLock = SqliteConnectionEnhancer.GetWriteLock(connectionString);
2222
_writeQueue = Channel.CreateUnbounded<Func<ValueTask>>(new UnboundedChannelOptions
2323
{
2424
SingleReader = true,
@@ -36,41 +36,131 @@ public override async ValueTask<InterceptionResult<DbDataReader>> ReaderExecutin
3636
{
3737
if (IsWriteCommand(command.CommandText))
3838
{
39+
if (SqliteConnectionEnhancer.IsWriteLockHeld.Value)
40+
{
41+
UpgradeToBeginImmediate(command);
42+
return await base.ReaderExecutingAsync(command, eventData, result, cancellationToken);
43+
}
44+
3945
var tcs = new TaskCompletionSource<InterceptionResult<DbDataReader>>();
4046
await _writeQueue.Writer.WriteAsync(async () =>
4147
{
4248
try
4349
{
44-
// NO BEGIN IMMEDIATE - let EF Core handle transactions
45-
result = await base.ReaderExecutingAsync(command, eventData, result, cancellationToken);
46-
tcs.SetResult(result);
50+
UpgradeToBeginImmediate(command);
51+
var r = await base.ReaderExecutingAsync(command, eventData, result, cancellationToken);
52+
tcs.SetResult(r);
4753
}
4854
catch (Exception ex)
4955
{
5056
tcs.SetException(ex);
5157
}
5258
}, cancellationToken);
5359

54-
await tcs.Task;
55-
return result;
60+
return await tcs.Task;
5661
}
5762

5863
return await base.ReaderExecutingAsync(command, eventData, result, cancellationToken);
5964
}
6065

61-
// Same pattern for NonQueryExecutingAsync and ScalarExecutingAsync...
66+
public override async ValueTask<InterceptionResult<int>> NonQueryExecutingAsync(
67+
DbCommand command,
68+
CommandEventData eventData,
69+
InterceptionResult<int> result,
70+
CancellationToken cancellationToken = default)
71+
{
72+
if (IsWriteCommand(command.CommandText))
73+
{
74+
if (SqliteConnectionEnhancer.IsWriteLockHeld.Value)
75+
{
76+
UpgradeToBeginImmediate(command);
77+
return await base.NonQueryExecutingAsync(command, eventData, result, cancellationToken);
78+
}
79+
80+
var tcs = new TaskCompletionSource<InterceptionResult<int>>();
81+
await _writeQueue.Writer.WriteAsync(async () =>
82+
{
83+
try
84+
{
85+
UpgradeToBeginImmediate(command);
86+
var r = await base.NonQueryExecutingAsync(command, eventData, result, cancellationToken);
87+
tcs.SetResult(r);
88+
}
89+
catch (Exception ex)
90+
{
91+
tcs.SetException(ex);
92+
}
93+
}, cancellationToken);
94+
95+
return await tcs.Task;
96+
}
97+
98+
return await base.NonQueryExecutingAsync(command, eventData, result, cancellationToken);
99+
}
100+
101+
public override async ValueTask<InterceptionResult<object>> ScalarExecutingAsync(
102+
DbCommand command,
103+
CommandEventData eventData,
104+
InterceptionResult<object> result,
105+
CancellationToken cancellationToken = default)
106+
{
107+
if (IsWriteCommand(command.CommandText))
108+
{
109+
if (SqliteConnectionEnhancer.IsWriteLockHeld.Value)
110+
{
111+
UpgradeToBeginImmediate(command);
112+
return await base.ScalarExecutingAsync(command, eventData, result, cancellationToken);
113+
}
114+
115+
var tcs = new TaskCompletionSource<InterceptionResult<object>>();
116+
await _writeQueue.Writer.WriteAsync(async () =>
117+
{
118+
try
119+
{
120+
UpgradeToBeginImmediate(command);
121+
var r = await base.ScalarExecutingAsync(command, eventData, result, cancellationToken);
122+
tcs.SetResult(r);
123+
}
124+
catch (Exception ex)
125+
{
126+
tcs.SetException(ex);
127+
}
128+
}, cancellationToken);
129+
130+
return await tcs.Task;
131+
}
132+
133+
return await base.ScalarExecutingAsync(command, eventData, result, cancellationToken);
134+
}
135+
136+
private static void UpgradeToBeginImmediate(DbCommand command)
137+
{
138+
var text = command.CommandText.Trim();
139+
if (text.StartsWith("BEGIN", StringComparison.OrdinalIgnoreCase) &&
140+
!text.Contains("IMMEDIATE", StringComparison.OrdinalIgnoreCase) &&
141+
!text.Contains("EXCLUSIVE", StringComparison.OrdinalIgnoreCase))
142+
{
143+
if (text.Equals("BEGIN", StringComparison.OrdinalIgnoreCase) ||
144+
text.Equals("BEGIN TRANSACTION", StringComparison.OrdinalIgnoreCase))
145+
{
146+
command.CommandText = "BEGIN IMMEDIATE";
147+
}
148+
}
149+
}
62150

63151
private async Task ProcessWriteQueue()
64152
{
65153
await foreach (var writeOperation in _writeQueue.Reader.ReadAllAsync())
66154
{
67155
await _writeLock.WaitAsync();
156+
SqliteConnectionEnhancer.IsWriteLockHeld.Value = true;
68157
try
69158
{
70159
await writeOperation();
71160
}
72161
finally
73162
{
163+
SqliteConnectionEnhancer.IsWriteLockHeld.Value = false;
74164
_writeLock.Release();
75165
}
76166
}
@@ -85,25 +175,20 @@ private static bool IsWriteCommand(string commandText)
85175

86176
// Skip SELECT and read-only PRAGMAs
87177
if (normalized.StartsWith("SELECT") ||
88-
(normalized.StartsWith("PRAGMA") && normalized.Contains("TABLE_INFO")))
178+
(normalized.StartsWith("PRAGMA") && (normalized.Contains("TABLE_INFO") ||
179+
normalized.Contains("INDEX_LIST") ||
180+
normalized.Contains("INDEX_INFO") ||
181+
normalized.Contains("FOREIGN_KEY_LIST"))))
89182
return false;
90183

91184
return normalized.StartsWith("INSERT") ||
92185
normalized.StartsWith("UPDATE") ||
93186
normalized.StartsWith("DELETE") ||
94187
normalized.StartsWith("CREATE") ||
95188
normalized.StartsWith("DROP") ||
96-
normalized.StartsWith("ALTER");
189+
normalized.StartsWith("ALTER") ||
190+
normalized.StartsWith("BEGIN") ||
191+
normalized.StartsWith("PRAGMA");
97192
}
98193

99-
public async ValueTask DisposeAsync()
100-
{
101-
if (_disposed) return;
102-
103-
_writeQueue.Writer.Complete();
104-
await _queueProcessor;
105-
_writeLock.Dispose();
106-
107-
_disposed = true;
108-
}
109194
}

0 commit comments

Comments
 (0)