diff --git a/src/Hangfire.PostgreSql/CountersAggregator.cs b/src/Hangfire.PostgreSql/CountersAggregator.cs index 2474bd08..31f4844b 100644 --- a/src/Hangfire.PostgreSql/CountersAggregator.cs +++ b/src/Hangfire.PostgreSql/CountersAggregator.cs @@ -82,28 +82,26 @@ private string GetAggregationQuery() { string schemaName = _storage.Options.SchemaName; return - $@"BEGIN; + $""" + BEGIN; -INSERT INTO ""{schemaName}"".""aggregatedcounter"" (""key"", ""value"", ""expireat"") - SELECT - ""key"", - SUM(""value""), - MAX(""expireat"") - FROM ""{schemaName}"".""counter"" - GROUP BY - ""key"" - ON CONFLICT(""key"") DO - UPDATE - SET - ""value"" = ""aggregatedcounter"".""value"" + EXCLUDED.""value"", - ""expireat"" = EXCLUDED.""expireat""; + INSERT INTO "{schemaName}"."aggregatedcounter" ("key", "value", "expireat") + SELECT + "key", + SUM("value"), + MAX("expireat") + FROM "{schemaName}"."counter" + GROUP BY "key" + ON CONFLICT("key") DO UPDATE + SET "value" = "aggregatedcounter"."value" + EXCLUDED."value", "expireat" = EXCLUDED."expireat"; + + DELETE FROM "{schemaName}"."counter" + WHERE "key" IN ( + SELECT "key" FROM "{schemaName}"."aggregatedcounter" + ); - DELETE FROM ""{schemaName}"".""counter"" - WHERE - ""key"" IN (SELECT ""key"" FROM ""{schemaName}"".""aggregatedcounter"" ); - - COMMIT; - "; + COMMIT; + """; } } } diff --git a/src/Hangfire.PostgreSql/PostgreSqlMonitoringApi.cs b/src/Hangfire.PostgreSql/PostgreSqlMonitoringApi.cs index 7ea78145..63a5e4c6 100644 --- a/src/Hangfire.PostgreSql/PostgreSqlMonitoringApi.cs +++ b/src/Hangfire.PostgreSql/PostgreSqlMonitoringApi.cs @@ -379,12 +379,25 @@ private Dictionary GetTimelineStats(string type) private Dictionary GetTimelineStats(IDictionary keyMaps) { - string query = $@" - SELECT ""key"", COUNT(""value"") AS ""count"" - FROM ""{_storage.Options.SchemaName}"".""counter"" - WHERE ""key"" = ANY (@Keys) - GROUP BY ""key""; - "; + string query = + $""" + WITH "aggregated_counters" AS ( + SELECT "key", "value" + FROM "{_storage.Options.SchemaName}"."aggregatedcounter" + WHERE "key" = ANY(@Keys) + ), "regular_counters" AS ( + SELECT "key", "value" + FROM "{_storage.Options.SchemaName}"."counter" + WHERE "key" = ANY(@Keys) + ), "all_counters" AS ( + SELECT * FROM "aggregated_counters" + UNION ALL + SELECT * FROM "regular_counters" + ) + SELECT "key", COALESCE(SUM("value"), 0) AS "count" + FROM "all_counters" + GROUP BY "key" + """; Dictionary valuesMap = UseConnection(connection => connection.Query<(string Key, long Count)>(query, new { Keys = keyMaps.Keys.ToList() }) @@ -400,10 +413,10 @@ private Dictionary GetTimelineStats(IDictionary result = new(); - for (int i = 0; i < keyMaps.Count; i++) + foreach (KeyValuePair keyMap in keyMaps) { - long value = valuesMap[keyMaps.ElementAt(i).Key]; - result.Add(keyMaps.ElementAt(i).Value, value); + long value = valuesMap[keyMap.Key]; + result.Add(keyMap.Value, value); } return result; diff --git a/tests/Hangfire.PostgreSql.Tests/CountersAggregatorFacts.cs b/tests/Hangfire.PostgreSql.Tests/CountersAggregatorFacts.cs new file mode 100644 index 00000000..53402025 --- /dev/null +++ b/tests/Hangfire.PostgreSql.Tests/CountersAggregatorFacts.cs @@ -0,0 +1,89 @@ +using System; +using System.Threading; +using Dapper; +using Hangfire.PostgreSql.Tests.Utils; +using Npgsql; +using Xunit; + +namespace Hangfire.PostgreSql.Tests; + +public class CountersAggregatorFacts : IClassFixture +{ + private static readonly string _schemaName = ConnectionUtils.GetSchemaName(); + + private readonly CancellationToken _token; + private readonly PostgreSqlStorageFixture _fixture; + + public CountersAggregatorFacts(PostgreSqlStorageFixture fixture) + { + CancellationTokenSource cts = new(); + _token = cts.Token; + _fixture = fixture; + _fixture.SetupOptions(o => o.CountersAggregateInterval = TimeSpan.FromMinutes(5)); + } + + [Fact] + [CleanDatabase] + public void Execute_AggregatesCounters() + { + UseConnection((connection, manager) => { + CreateEntry(1); + CreateEntry(5); + CreateEntry(15); + CreateEntry(5, "key2"); + CreateEntry(10, "key2"); + + manager.Execute(_token); + + Assert.Equal(21, GetAggregatedCounters(connection)); + Assert.Equal(15, GetAggregatedCounters(connection, "key2")); + Assert.Null(GetRegularCounters(connection)); + Assert.Null(GetRegularCounters(connection, "key2")); + return; + + void CreateEntry(long value, string key = "key") + { + CreateCounterEntry(connection, value, key); + } + }); + } + + private void UseConnection(Action action) + { + PostgreSqlStorage storage = _fixture.SafeInit(); + CountersAggregator aggregator = new(storage, TimeSpan.Zero); + action(storage.CreateAndOpenConnection(), aggregator); + } + + private static void CreateCounterEntry(NpgsqlConnection connection, long? value, string key = "key") + { + value ??= 1; + string insertSql = + $""" + INSERT INTO "{_schemaName}"."counter"("key", "value", "expireat") + VALUES (@Key, @Value, null) + """; + + connection.Execute(insertSql, new { Key = key, Value = value }); + } + + private static long GetAggregatedCounters(NpgsqlConnection connection, string key = "key") + { + return connection.QuerySingle( + $""" + SELECT "value" + FROM {_schemaName}."aggregatedcounter" + WHERE "key" = @Key + """, new { Key = key }); + } + + private static long? GetRegularCounters(NpgsqlConnection connection, string key = "key") + { + return connection.QuerySingle( + $""" + SELECT SUM("value") + FROM {_schemaName}."counter" + WHERE "key" = @Key + """, new { Key = key }); + } +} diff --git a/tests/Hangfire.PostgreSql.Tests/PostgreSqlMonitoringApiFacts.cs b/tests/Hangfire.PostgreSql.Tests/PostgreSqlMonitoringApiFacts.cs index f5258b21..4f6f217a 100644 --- a/tests/Hangfire.PostgreSql.Tests/PostgreSqlMonitoringApiFacts.cs +++ b/tests/Hangfire.PostgreSql.Tests/PostgreSqlMonitoringApiFacts.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Globalization; +using System.Linq; using Dapper; using Hangfire.Common; using Hangfire.PostgreSql.Tests.Utils; @@ -58,6 +59,34 @@ public void GetJobs_MixedCasing_ReturnsJob() }); } + [Fact] + [CleanDatabase] + public void HourlySucceededJobs_ReturnsAggregatedStats() + { + DateTime now = DateTime.UtcNow; + string schemaName = ConnectionUtils.GetSchemaName(); + string key = $"stats:succeeded:{now.ToString("yyyy-MM-dd-HH", CultureInfo.InvariantCulture)}"; + string arrangeSql = + $""" + BEGIN; + INSERT INTO "{schemaName}"."counter"("key", "value") + VALUES (@Key, 5); + INSERT INTO "{schemaName}"."aggregatedcounter"("key", "value") + VALUES (@Key, 7); + COMMIT; + """; + UseConnection(connection => { + connection.Execute(arrangeSql, new { Key = key }); + + IMonitoringApi monitoringApi = _fixture.Storage.GetMonitoringApi(); + IDictionary stats = monitoringApi.HourlySucceededJobs(); + Assert.Equal(24, stats.Count); + + long actualCounter = Assert.Single(stats.Where(x => x.Key.Hour == now.Hour).Select(x => x.Value)); + Assert.Equal(12, actualCounter); + }); + } + private void UseConnection(Action action) { PostgreSqlStorage storage = _fixture.SafeInit(); diff --git a/tests/Hangfire.PostgreSql.Tests/Scripts/Clean.sql b/tests/Hangfire.PostgreSql.Tests/Scripts/Clean.sql index 68c38dc6..8efe8ea0 100644 --- a/tests/Hangfire.PostgreSql.Tests/Scripts/Clean.sql +++ b/tests/Hangfire.PostgreSql.Tests/Scripts/Clean.sql @@ -1,5 +1,6 @@ SET search_path = 'hangfire'; +DELETE FROM hangfire."aggregatedcounter"; DELETE FROM hangfire."counter"; DELETE FROM hangfire."hash"; DELETE FROM hangfire."job";