Skip to content

Commit

Permalink
Merge pull request #376 from hangfire-postgres/features/333-aggregate…
Browse files Browse the repository at this point in the history
…d-stats

Read aggregated counter table too for hourly stats
  • Loading branch information
azygis authored Oct 14, 2024
2 parents e69f90d + 93324ec commit 6966c4e
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 29 deletions.
38 changes: 18 additions & 20 deletions src/Hangfire.PostgreSql/CountersAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,28 +82,26 @@ private string GetAggregationQuery()
{
string schemaName = _storage.Options.SchemaName;
return
$@"BEGIN;
$"""
BEGIN;
INSERT INTO ""{schemaName}"".""aggregatedcounter"" (""key"", ""value"", ""expireat"")
SELECT
""key"",
SUM(""value""),
MAX(""expireat"")
FROM ""{schemaName}"".""counter""
GROUP BY
""key""
ON CONFLICT(""key"") DO
UPDATE
SET
""value"" = ""aggregatedcounter"".""value"" + EXCLUDED.""value"",
""expireat"" = EXCLUDED.""expireat"";
INSERT INTO "{schemaName}"."aggregatedcounter" ("key", "value", "expireat")
SELECT
"key",
SUM("value"),
MAX("expireat")
FROM "{schemaName}"."counter"
GROUP BY "key"
ON CONFLICT("key") DO UPDATE
SET "value" = "aggregatedcounter"."value" + EXCLUDED."value", "expireat" = EXCLUDED."expireat";
DELETE FROM "{schemaName}"."counter"
WHERE "key" IN (
SELECT "key" FROM "{schemaName}"."aggregatedcounter"
);
DELETE FROM ""{schemaName}"".""counter""
WHERE
""key"" IN (SELECT ""key"" FROM ""{schemaName}"".""aggregatedcounter"" );
COMMIT;
";
COMMIT;
""";
}
}
}
31 changes: 22 additions & 9 deletions src/Hangfire.PostgreSql/PostgreSqlMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,25 @@ private Dictionary<DateTime, long> GetTimelineStats(string type)

private Dictionary<DateTime, long> GetTimelineStats(IDictionary<string, DateTime> keyMaps)
{
string query = $@"
SELECT ""key"", COUNT(""value"") AS ""count""
FROM ""{_storage.Options.SchemaName}"".""counter""
WHERE ""key"" = ANY (@Keys)
GROUP BY ""key"";
";
string query =
$"""
WITH "aggregated_counters" AS (
SELECT "key", "value"
FROM "{_storage.Options.SchemaName}"."aggregatedcounter"
WHERE "key" = ANY(@Keys)
), "regular_counters" AS (
SELECT "key", "value"
FROM "{_storage.Options.SchemaName}"."counter"
WHERE "key" = ANY(@Keys)
), "all_counters" AS (
SELECT * FROM "aggregated_counters"
UNION ALL
SELECT * FROM "regular_counters"
)
SELECT "key", COALESCE(SUM("value"), 0) AS "count"
FROM "all_counters"
GROUP BY "key"
""";

Dictionary<string, long> valuesMap = UseConnection(connection => connection.Query<(string Key, long Count)>(query,
new { Keys = keyMaps.Keys.ToList() })
Expand All @@ -400,10 +413,10 @@ private Dictionary<DateTime, long> GetTimelineStats(IDictionary<string, DateTime
}

Dictionary<DateTime, long> result = new();
for (int i = 0; i < keyMaps.Count; i++)
foreach (KeyValuePair<string, DateTime> keyMap in keyMaps)
{
long value = valuesMap[keyMaps.ElementAt(i).Key];
result.Add(keyMaps.ElementAt(i).Value, value);
long value = valuesMap[keyMap.Key];
result.Add(keyMap.Value, value);
}

return result;
Expand Down
89 changes: 89 additions & 0 deletions tests/Hangfire.PostgreSql.Tests/CountersAggregatorFacts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System;
using System.Threading;
using Dapper;
using Hangfire.PostgreSql.Tests.Utils;
using Npgsql;
using Xunit;

namespace Hangfire.PostgreSql.Tests;

public class CountersAggregatorFacts : IClassFixture<PostgreSqlStorageFixture>
{
private static readonly string _schemaName = ConnectionUtils.GetSchemaName();

private readonly CancellationToken _token;
private readonly PostgreSqlStorageFixture _fixture;

public CountersAggregatorFacts(PostgreSqlStorageFixture fixture)
{
CancellationTokenSource cts = new();
_token = cts.Token;
_fixture = fixture;
_fixture.SetupOptions(o => o.CountersAggregateInterval = TimeSpan.FromMinutes(5));
}

[Fact]
[CleanDatabase]
public void Execute_AggregatesCounters()
{
UseConnection((connection, manager) => {
CreateEntry(1);
CreateEntry(5);
CreateEntry(15);
CreateEntry(5, "key2");
CreateEntry(10, "key2");

manager.Execute(_token);

Assert.Equal(21, GetAggregatedCounters(connection));
Assert.Equal(15, GetAggregatedCounters(connection, "key2"));
Assert.Null(GetRegularCounters(connection));
Assert.Null(GetRegularCounters(connection, "key2"));
return;

void CreateEntry(long value, string key = "key")
{
CreateCounterEntry(connection, value, key);
}
});
}

private void UseConnection(Action<NpgsqlConnection, CountersAggregator> action)
{
PostgreSqlStorage storage = _fixture.SafeInit();
CountersAggregator aggregator = new(storage, TimeSpan.Zero);
action(storage.CreateAndOpenConnection(), aggregator);
}

private static void CreateCounterEntry(NpgsqlConnection connection, long? value, string key = "key")
{
value ??= 1;
string insertSql =
$"""
INSERT INTO "{_schemaName}"."counter"("key", "value", "expireat")
VALUES (@Key, @Value, null)
""";

connection.Execute(insertSql, new { Key = key, Value = value });
}

private static long GetAggregatedCounters(NpgsqlConnection connection, string key = "key")
{
return connection.QuerySingle<long>(
$"""
SELECT "value"
FROM {_schemaName}."aggregatedcounter"
WHERE "key" = @Key
""", new { Key = key });
}

private static long? GetRegularCounters(NpgsqlConnection connection, string key = "key")
{
return connection.QuerySingle<long?>(
$"""
SELECT SUM("value")
FROM {_schemaName}."counter"
WHERE "key" = @Key
""", new { Key = key });
}
}
29 changes: 29 additions & 0 deletions tests/Hangfire.PostgreSql.Tests/PostgreSqlMonitoringApiFacts.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Dapper;
using Hangfire.Common;
using Hangfire.PostgreSql.Tests.Utils;
Expand Down Expand Up @@ -58,6 +59,34 @@ public void GetJobs_MixedCasing_ReturnsJob()
});
}

[Fact]
[CleanDatabase]
public void HourlySucceededJobs_ReturnsAggregatedStats()
{
DateTime now = DateTime.UtcNow;
string schemaName = ConnectionUtils.GetSchemaName();
string key = $"stats:succeeded:{now.ToString("yyyy-MM-dd-HH", CultureInfo.InvariantCulture)}";
string arrangeSql =
$"""
BEGIN;
INSERT INTO "{schemaName}"."counter"("key", "value")
VALUES (@Key, 5);
INSERT INTO "{schemaName}"."aggregatedcounter"("key", "value")
VALUES (@Key, 7);
COMMIT;
""";
UseConnection(connection => {
connection.Execute(arrangeSql, new { Key = key });

IMonitoringApi monitoringApi = _fixture.Storage.GetMonitoringApi();
IDictionary<DateTime, long> stats = monitoringApi.HourlySucceededJobs();
Assert.Equal(24, stats.Count);

long actualCounter = Assert.Single(stats.Where(x => x.Key.Hour == now.Hour).Select(x => x.Value));
Assert.Equal(12, actualCounter);
});
}

private void UseConnection(Action<NpgsqlConnection> action)
{
PostgreSqlStorage storage = _fixture.SafeInit();
Expand Down
1 change: 1 addition & 0 deletions tests/Hangfire.PostgreSql.Tests/Scripts/Clean.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
SET search_path = 'hangfire';

DELETE FROM hangfire."aggregatedcounter";
DELETE FROM hangfire."counter";
DELETE FROM hangfire."hash";
DELETE FROM hangfire."job";
Expand Down

0 comments on commit 6966c4e

Please sign in to comment.