Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 99 additions & 4 deletions src/Dan.Plugin.Tilda/Functions/Timers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@
using Dan.Plugin.Tilda.Clients;
using Dan.Plugin.Tilda.Exceptions;
using Dan.Plugin.Tilda.Interfaces;
using Dan.Plugin.Tilda.Models.AlertMessages;
using Dan.Plugin.Tilda.Services;
using Dan.Plugin.Tilda.Utils;
using Dan.Tilda.Models.Alerts;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace Dan.Plugin.Tilda.Functions;

public class Timers(
ITildaSourceProvider tildaSourceProvider,
IMtamCounterClient mtamCounterClient,
IAlertMessageSender alertMessageSender)
IAlertMessageSender alertMessageSender,
IConnectionMultiplexer connectionMultiplexer,
IBrregService brregService,
ILogger<Timers> logger)
{
// MTAM - Melding til annen myndighet (message to other authority/auditor)
[Function("MtamTimer")]
Expand All @@ -36,7 +39,7 @@ public async Task MessageToOtherAuditorsTimer(
{
messages = await mtamSource.GetAlertMessagesAsync(from);
}
catch (FailedToFetchDataException e)
catch (FailedToFetchDataException)
{
// If we fail to fetch alert messages from a source, we should continue with the rest of sources
// but not update this source's counter so we'll try to fetch from the same time again next attempt
Expand All @@ -54,4 +57,96 @@ public async Task MessageToOtherAuditorsTimer(
await mtamCounterClient.UpsertMtamCounter(mtamCounter);
}
}

[Function("CacheRefreshTimer")]
public async Task CacheRefreshTimer(
//[HttpTrigger(AuthorizationLevel.Function, "get", "post")] HttpRequestData req, // easier to test locally using http trigger
[TimerTrigger("%CacheRefreshCron%")] TimerInfo timerInfo,
FunctionContext context)
{
var db = connectionMultiplexer.GetDatabase();
var now = DateTime.UtcNow;
Comment thread
SondreJDigdir marked this conversation as resolved.
var mainunitKeys = new List<string>();
var subunitKeys = new List<string>();
const string mainKeyPrefix = "Tilda-Cache_Absolute_GET_https://data.brreg.no/enhetsregisteret/api/enheter/";
const string subunitKeyPrefix = "Tilda-Cache_Absolute_GET_https://data.brreg.no/enhetsregisteret/api/enheter/?overordnetEnhet=";

var keys = GetKeysAsync("Tilda-Cache_Absolute*");
await foreach (var key in keys)
{
var expiry = await db.KeyExpireTimeAsync(key);
if (expiry is null)
{
logger.LogInformation("Expiry for key {key} is null", key);
continue;
}
var timeLeft = expiry - now;
if (!(timeLeft <= TimeSpan.FromMinutes(11)))
{
continue;
}

if (key.StartsWith(subunitKeyPrefix))
{
var subKey = key.Replace(subunitKeyPrefix, "");
subunitKeys.Add(subKey);
}
else if (key.StartsWith(mainKeyPrefix))
{
var mainKey = key.Replace(mainKeyPrefix, "");
mainunitKeys.Add(mainKey);
}
}

if (mainunitKeys.Count == 0 && subunitKeys.Count == 0)
{
return;
}

logger.LogInformation("Refreshing {mainKeys} main keys and {subunitKeys} subunit keys", mainunitKeys.Count, subunitKeys.Count);

var tasks = new List<Task>();
foreach (var key in mainunitKeys)
{
tasks.Add(RefreshOrganisationEntry(key, includeSubunits: false));
}

foreach (var key in subunitKeys)
{
tasks.Add(RefreshOrganisationEntry(key, includeSubunits:true));
}

await Task.WhenAll(tasks);
Comment on lines +108 to +119
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n src/Dan.Plugin.Tilda/Functions/Timers.cs | sed -n '100,130p'

Repository: data-altinn-no/plugin-tilda

Length of output: 1213


🏁 Script executed:

cat -n src/Dan.Plugin.Tilda/Functions/Timers.cs | sed -n '80,120p'

Repository: data-altinn-no/plugin-tilda

Length of output: 1602


🏁 Script executed:

cat -n src/Dan.Plugin.Tilda/Functions/Timers.cs | sed -n '60,85p'

Repository: data-altinn-no/plugin-tilda

Length of output: 1445


🏁 Script executed:

find src/Dan.Plugin.Tilda -name "*.csproj" -o -name "*.fsproj" | xargs cat | grep -A 5 "TargetFramework"

Repository: data-altinn-no/plugin-tilda

Length of output: 281


🏁 Script executed:

rg -A 10 "async.*RefreshOrganisationEntry" src/Dan.Plugin.Tilda/Functions/Timers.cs

Repository: data-altinn-no/plugin-tilda

Length of output: 679


🏁 Script executed:

rg "MaxDegreeOfParallelism|ParallelOptions|Parallel.ForEachAsync" src/Dan.Plugin.Tilda/

Repository: data-altinn-no/plugin-tilda

Length of output: 53


🏁 Script executed:

find src/Dan.Plugin.Tilda -name "appsettings*" -o -name "*.json" | head -5

Repository: data-altinn-no/plugin-tilda

Length of output: 399


🏁 Script executed:

rg "IConfiguration|ConfigurationBuilder|options\." src/Dan.Plugin.Tilda/Functions/Timers.cs | head -10

Repository: data-altinn-no/plugin-tilda

Length of output: 53


🏁 Script executed:

find src/Dan.Plugin.Tilda -name "Program.cs" -o -name "Startup.cs" | xargs cat | head -80

Repository: data-altinn-no/plugin-tilda

Length of output: 3067


🏁 Script executed:

find src/Dan.Plugin.Tilda -name "local.settings.json" -o -name "appsettings.json"

Repository: data-altinn-no/plugin-tilda

Length of output: 53


🏁 Script executed:

find src/Dan.Plugin.Tilda -name "Settings.cs" -o -path "*/Config/*" | head -5

Repository: data-altinn-no/plugin-tilda

Length of output: 197


🏁 Script executed:

rg -l "class Settings" src/Dan.Plugin.Tilda/ | head -3

Repository: data-altinn-no/plugin-tilda

Length of output: 110


🏁 Script executed:

cat -n src/Dan.Plugin.Tilda/Config/Settings.cs

Repository: data-altinn-no/plugin-tilda

Length of output: 2138


Bound refresh concurrency to avoid BRREG thundering herd.

The code at lines 108-119 launches one concurrent task per expired key with no cap. When many cache entries approach expiry together (within the 11-minute window), this can spike outbound traffic to BRREG and spike worker resource usage. Implement concurrency limiting with Parallel.ForEachAsync and MaxDegreeOfParallelism to control the refresh rate.

🛡️ Suggested change
-        var tasks = new List<Task>();
-        foreach (var key in mainunitKeys)
-        {
-            tasks.Add(RefreshOrganisationEntry(key, includeSubunits: false));
-        }
-
-        foreach (var key in subunitKeys)
-        {
-            tasks.Add(RefreshOrganisationEntry(key, includeSubunits:true));
-        }
-
-        await Task.WhenAll(tasks);
+        var maxParallelRefresh = 10; // consider moving to configuration
+        var refreshTargets = mainunitKeys.Select(k => (Org: k, IncludeSubunits: false))
+            .Concat(subunitKeys.Select(k => (Org: k, IncludeSubunits: true)));
+
+        await Parallel.ForEachAsync(
+            refreshTargets,
+            new ParallelOptions { MaxDegreeOfParallelism = maxParallelRefresh },
+            async (target, _) => await RefreshOrganisationEntry(target.Org, target.IncludeSubunits));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Dan.Plugin.Tilda/Functions/Timers.cs` around lines 108 - 119, The loop
creates an unbounded number of concurrent RefreshOrganisationEntry tasks (for
mainunitKeys and subunitKeys) which can cause a thundering herd; replace the
manual tasks list with bounded concurrency using Parallel.ForEachAsync (or
Parallel.ForEachAsync combined over mainunitKeys and subunitKeys) and set
MaxDegreeOfParallelism to a sensible limit (e.g., a configurable value) so
RefreshOrganisationEntry is invoked with includeSubunits false for mainunitKeys
and true for subunitKeys under controlled concurrency; ensure you await the
Parallel.ForEachAsync calls and propagate exceptions as before.

}

private async IAsyncEnumerable<string> GetKeysAsync(string pattern)
{
if (string.IsNullOrWhiteSpace(pattern))
{
throw new ArgumentException("Value cannot be null or whitespace.", nameof(pattern));
}

foreach (var endpoint in connectionMultiplexer.GetEndPoints())
{
var server = connectionMultiplexer.GetServer(endpoint);
await foreach (var key in server.KeysAsync(pattern: pattern))
{
yield return key.ToString();
}
}
}

private async Task RefreshOrganisationEntry(string org, bool includeSubunits)
{
try
{
await brregService.GetFromBr(org, includeSubunits, skipCache: true);
}
catch (Exception e)
{
// Logging warning would be flooding the alerts/dashboards a bit too much, it's not really an issue if
// something fails to refresh, this is just a feature to prevent regular calls being too long too often
logger.LogInformation("Failed to refresh organisation cache entry for {org}, exception message: {exMessage}", org, e.Message);
}
}
}
20 changes: 10 additions & 10 deletions src/Dan.Plugin.Tilda/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Threading.Tasks;
using Altinn.ApiClients.Maskinporten.Interfaces;
using Altinn.ApiClients.Maskinporten.Services;
using Azure.Identity;
Expand Down Expand Up @@ -52,21 +53,20 @@
{
option.Configuration = settings.RedisConnectionString;
});
services.AddSingleton<IConnectionMultiplexer>(ConnectionMultiplexer.Connect(settings.RedisConnectionString));
}
else
{
var configurationOptions = ConfigurationOptions
.Parse(settings.RedisConnectionString)
.ConfigureForAzureWithTokenCredentialAsync(credentials)
.GetAwaiter().GetResult();

IConnectionMultiplexer connectionMultiplexer = ConnectionMultiplexer.Connect(configurationOptions);
services.AddSingleton(connectionMultiplexer);
services.AddStackExchangeRedisCache(option =>
{
option.ConnectionMultiplexerFactory = async () =>
{
var configurationOptions = await ConfigurationOptions
.Parse(settings.RedisConnectionString)
.ConfigureForAzureWithTokenCredentialAsync(credentials);

var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(configurationOptions);

return connectionMultiplexer;
};
option.ConnectionMultiplexerFactory = () => Task.FromResult(connectionMultiplexer);
});
}

Expand Down
58 changes: 32 additions & 26 deletions src/Dan.Plugin.Tilda/Services/BrregService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface IBrregService
{
Task<AccountsInformation> GetAnnualTurnoverFromBr(string organizationNumber);

Task<List<BREntityRegisterEntry>> GetFromBr(string organization, bool? includeSubunits);
Task<List<BREntityRegisterEntry>> GetFromBr(string organization, bool? includeSubunits, bool skipCache = false);

Task<List<string>> GetKofuviAddresses(string organizationNumber);
}
Expand Down Expand Up @@ -92,7 +92,7 @@ public async Task<AccountsInformation> GetAnnualTurnoverFromBr(string organizati
return result;
}

public async Task<List<BREntityRegisterEntry>> GetFromBr(string organization, bool? includeSubunits)
public async Task<List<BREntityRegisterEntry>> GetFromBr(string organization, bool? includeSubunits, bool skipCache = false)
{
var result = new List<BREntityRegisterEntry>();

Expand All @@ -110,11 +110,11 @@ public async Task<List<BREntityRegisterEntry>> GetFromBr(string organization, bo

if (includeSubunits == true)
{
result.AddRange(await GetAllUnitsFromBr(organization));
result.AddRange(await GetAllUnitsFromBr(organization, skipCache));
}
else
{
result.Add(await GetOrganizationInfoFromBr(organization));
result.Add(await GetOrganizationInfoFromBr(organization, skipCache));
}

return result;
Expand Down Expand Up @@ -207,7 +207,7 @@ public async Task<List<string>> GetKofuviAddresses(string organizationNumber)
}
}

private async Task<BREntityRegisterEntry> GetOrganizationInfoFromBr(string organizationNumber)
private async Task<BREntityRegisterEntry> GetOrganizationInfoFromBr(string organizationNumber, bool skipCache = false)
{
var mainUnitUrl = $"https://data.brreg.no/enhetsregisteret/api/enheter/{organizationNumber}";
var subUnitUrl = $"https://data.brreg.no/enhetsregisteret/api/underenheter/{organizationNumber}";
Expand All @@ -217,19 +217,22 @@ private async Task<BREntityRegisterEntry> GetOrganizationInfoFromBr(string organ
string rawResult;
try
{
try
if (!skipCache)
{
result = await cache.GetValueAsync<BREntityRegisterEntry>(cacheKey);
if (result is not null)
try
{
return result;
result = await cache.GetValueAsync<BREntityRegisterEntry>(cacheKey);
if (result is not null)
{
return result;
}
}
catch (Exception e)
{
logger.LogWarning("Failed to get GetOrganizationInfoFromBr from cache for {orgNumber}, fetching from source directly. Exception message: {message}",
organizationNumber,
e.Message);
}
}
catch (Exception e)
{
logger.LogWarning("Failed to get GetOrganizationInfoFromBr from cache for {orgNumber}, fetching from source directly. Exception message: {message}",
organizationNumber,
e.Message);
}

var response = await _erClient.GetAsync(mainUnitUrl);
Expand Down Expand Up @@ -270,7 +273,7 @@ private async Task<BREntityRegisterEntry> GetOrganizationInfoFromBr(string organ
return result;
}

private async Task<List<BREntityRegisterEntry>> GetAllUnitsFromBr(string organizationNumber)
private async Task<List<BREntityRegisterEntry>> GetAllUnitsFromBr(string organizationNumber, bool skipCache = false)
{
List<BREntityRegisterEntry> result;
string rawResult;
Expand All @@ -279,19 +282,22 @@ private async Task<List<BREntityRegisterEntry>> GetAllUnitsFromBr(string organiz

try
{
try
if (!skipCache)
{
result = await cache.GetValueAsync<List<BREntityRegisterEntry>>(cacheKey);
if (result is not null)
try
{
return result;
result = await cache.GetValueAsync<List<BREntityRegisterEntry>>(cacheKey);
if (result is not null)
{
return result;
}
}
catch (Exception e)
{
logger.LogWarning("Failed to get GetAllUnitsFromBr from cache for {orgNumber}, fetching from source directly. Exception message: {message}",
organizationNumber,
e.Message);
}
}
catch (Exception e)
{
logger.LogWarning("Failed to get GetAllUnitsFromBr from cache for {orgNumber}, fetching from source directly. Exception message: {message}",
organizationNumber,
e.Message);
}

result = [];
Expand Down
Loading