diff --git a/LICENSE.md b/LICENSE.md index 7afb1a4af..6e6e3d749 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,7 +1,7 @@ License ======== -Copyright © 2022 Hangfire OÜ. +Copyright © 2023 Hangfire OÜ. Hangfire software is an open-source software that is multi-licensed under the terms of the licenses listed in this file. Recipients may choose the terms under which they are want to use or distribute the software, when all the preconditions of a chosen license are satisfied. diff --git a/README.md b/README.md index 3016535c3..ec8849396 100644 --- a/README.md +++ b/README.md @@ -167,7 +167,7 @@ In order to give the community time to respond and upgrade we strongly urge you License -------- -Copyright © 2022 Hangfire OÜ. +Copyright © 2023 Hangfire OÜ. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by diff --git a/nuspecs/Hangfire.Core.nuspec b/nuspecs/Hangfire.Core.nuspec index d4fbabe61..a7ac5633a 100644 --- a/nuspecs/Hangfire.Core.nuspec +++ b/nuspecs/Hangfire.Core.nuspec @@ -16,7 +16,7 @@ An easy and reliable way to perform fire-and-forget, delayed and recurring, long Backed by Redis, SQL Server, SQL Azure or MSMQ. This is a .NET alternative to Sidekiq, Resque and Celery. https://www.hangfire.io/ - Copyright © 2013-2022 Hangfire OÜ + Copyright © 2013-2023 Hangfire OÜ Hangfire OWIN Long-Running Background Fire-And-Forget Delayed Recurring Tasks Jobs Scheduler Threading Queues https://www.hangfire.io/blog/ diff --git a/nuspecs/Hangfire.NetCore.nuspec b/nuspecs/Hangfire.NetCore.nuspec index 8fd663ac6..da399b333 100644 --- a/nuspecs/Hangfire.NetCore.nuspec +++ b/nuspecs/Hangfire.NetCore.nuspec @@ -11,7 +11,7 @@ false https://raw.github.com/HangfireIO/Hangfire/master/LICENSE.md .NET Core's Worker Service host support for Hangfire (background job system for ASP.NET applications). - Copyright © 2019-2022 Hangfire OÜ + Copyright © 2019-2023 Hangfire OÜ hangfire netcore https://www.hangfire.io/blog/ diff --git a/nuspecs/Hangfire.SqlServer.MSMQ.nuspec b/nuspecs/Hangfire.SqlServer.MSMQ.nuspec index f1d27f763..9037de09a 100644 --- a/nuspecs/Hangfire.SqlServer.MSMQ.nuspec +++ b/nuspecs/Hangfire.SqlServer.MSMQ.nuspec @@ -11,7 +11,7 @@ false https://raw.github.com/HangfireIO/Hangfire/master/LICENSE.md MSMQ queues support for SQL Server job storage implementation for Hangfire (background job system for ASP.NET applications). - Copyright © 2014-2022 Hangfire OÜ + Copyright © 2014-2023 Hangfire OÜ Hangfire SqlServer MSMQ https://www.hangfire.io/blog/ diff --git a/src/Hangfire.Core/Common/CancellationTokenExtentions.cs b/src/Hangfire.Core/Common/CancellationTokenExtentions.cs index 2e8adbeaf..4d6c9bfed 100644 --- a/src/Hangfire.Core/Common/CancellationTokenExtentions.cs +++ b/src/Hangfire.Core/Common/CancellationTokenExtentions.cs @@ -22,8 +22,6 @@ namespace Hangfire.Common { public static class CancellationTokenExtentions { - private static readonly ILog Logger = LogProvider.GetLogger(typeof(CancellationTokenExtentions)); - /// /// Returns a class that contains a that is set, when /// the given is canceled. This method is based @@ -72,8 +70,15 @@ public static bool Wait(this CancellationToken cancellationToken, TimeSpan timeo timeout >= timeoutThreshold && stopwatch.Elapsed < elapsedThreshold) { - Logger.Error($"Actual wait time for non-canceled token was '{stopwatch.Elapsed}' instead of '{timeout}', wait result: {waitResult}, using protective wait. Please report this to Hangfire developers."); - Thread.Sleep(protectionTime); + try + { + var logger = LogProvider.GetLogger(typeof(CancellationTokenExtentions)); + logger.Error($"Actual wait time for non-canceled token was '{stopwatch.Elapsed}' instead of '{timeout}', wait result: {waitResult}, using protective wait. Please report this to Hangfire developers."); + } + finally + { + Thread.Sleep(protectionTime); + } } return waitResult; diff --git a/src/Hangfire.Core/Processing/TaskExtensions.cs b/src/Hangfire.Core/Processing/TaskExtensions.cs index bc4b8a939..181f116eb 100644 --- a/src/Hangfire.Core/Processing/TaskExtensions.cs +++ b/src/Hangfire.Core/Processing/TaskExtensions.cs @@ -26,7 +26,6 @@ namespace Hangfire.Processing { internal static class TaskExtensions { - private static readonly ILog Logger = LogProvider.GetLogger(typeof(TaskExtensions)); private static readonly Type[] EmptyTypes = new Type[0]; private static readonly WaitHandle InvalidWaitHandleInstance = new InvalidWaitHandle(); @@ -58,8 +57,15 @@ public static bool WaitOne([NotNull] this WaitHandle waitHandle, TimeSpan timeou timeout >= timeoutThreshold && stopwatch.Elapsed < elapsedThreshold) { - Logger.Error($"Actual wait time for non-canceled token was '{stopwatch.Elapsed}' instead of '{timeout}', wait result: {waitResult}, using protective wait. Please report this to Hangfire developers."); - Thread.Sleep(protectionTime); + try + { + var logger = LogProvider.GetLogger(typeof(TaskExtensions)); + logger.Error($"Actual wait time for non-canceled token was '{stopwatch.Elapsed}' instead of '{timeout}', wait result: {waitResult}, using protective wait. Please report this to Hangfire developers."); + } + finally + { + Thread.Sleep(protectionTime); + } } token.ThrowIfCancellationRequested(); diff --git a/src/Hangfire.Core/Server/AspNetShutdownDetector.cs b/src/Hangfire.Core/Server/AspNetShutdownDetector.cs index c61bf20a9..657ad9a29 100644 --- a/src/Hangfire.Core/Server/AspNetShutdownDetector.cs +++ b/src/Hangfire.Core/Server/AspNetShutdownDetector.cs @@ -23,7 +23,6 @@ namespace Hangfire.Server { internal static class AspNetShutdownDetector { - private static readonly ILog Logger = LogProvider.GetLogger(typeof(AspNetShutdownDetector)); private static readonly TimeSpan CheckForShutdownTimerInterval = TimeSpan.FromMilliseconds(250); private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource(); @@ -121,7 +120,7 @@ private static void EnsureInitialized() } catch (Exception ex) when (ex.IsCatchableExceptionType()) { - Logger.ErrorException("Failed to initialize shutdown triggers for ASP.NET application.", ex); + GetLogger().ErrorException("Failed to initialize shutdown triggers for ASP.NET application.", ex); } } @@ -155,7 +154,7 @@ private static void CheckForAppDomainShutdown(object state) } catch (Exception ex) when (ex.IsCatchableExceptionType()) { - Logger.ErrorException( + GetLogger().ErrorException( "An exception occurred while checking for ASP.NET shutdown, will not able to do the checks properly.", ex); } @@ -163,7 +162,7 @@ private static void CheckForAppDomainShutdown(object state) private static void Cancel(string reason) { - Logger.Info($"ASP.NET application is shutting down: {reason}."); + GetLogger().Info($"ASP.NET application is shutting down: {reason}."); try { @@ -174,7 +173,7 @@ private static void Cancel(string reason) } catch (AggregateException ag) { - Logger.ErrorException("One or more exceptions were thrown during app pool shutdown: ", ag); + GetLogger().ErrorException("One or more exceptions were thrown during app pool shutdown: ", ag); } } @@ -189,12 +188,12 @@ private static void RegisterForStopListeningEvent(ref bool success) if (stopEvent == null) return; stopEvent.AddEventHandler(null, new EventHandler(StopListening)); - Logger.Debug("HostingEnvironment.StopListening shutdown trigger initialized successfully."); + GetLogger().Debug("HostingEnvironment.StopListening shutdown trigger initialized successfully."); success = true; } catch (Exception ex) when (ex.IsCatchableExceptionType()) { - Logger.DebugException("Unable to initialize HostingEnvironment.StopListening shutdown trigger", ex); + GetLogger().DebugException("Unable to initialize HostingEnvironment.StopListening shutdown trigger", ex); } } @@ -215,7 +214,7 @@ private static void InitializeShutdownReason(ref bool success) _shutdownReasonFunc = ShutdownReasonFunc; - Logger.Debug("HostingEnvironment.ShutdownReason shutdown trigger initialized successfully."); + GetLogger().Debug("HostingEnvironment.ShutdownReason shutdown trigger initialized successfully."); success = true; string ShutdownReasonFunc() @@ -231,7 +230,7 @@ string ShutdownReasonFunc() } catch (Exception ex) when (ex.IsCatchableExceptionType()) { - Logger.TraceException("Unable to call the HostingEnvironment.ShutdownReason property due to an exception.", ex); + GetLogger().TraceException("Unable to call the HostingEnvironment.ShutdownReason property due to an exception.", ex); } return null; @@ -239,7 +238,7 @@ string ShutdownReasonFunc() } catch (Exception ex) when (ex.IsCatchableExceptionType()) { - Logger.DebugException("Unable to initialize HostingEnvironment.ShutdownReason shutdown trigger", ex); + GetLogger().DebugException("Unable to initialize HostingEnvironment.ShutdownReason shutdown trigger", ex); } } @@ -255,12 +254,12 @@ private static void InitializeMgdHasConfigChanged(ref bool success) _checkConfigChangedFunc = (Func)Delegate.CreateDelegate(typeof(Func), methodInfo); - Logger.Debug("UnsafeIISMethods.MgdHasConfigChanged shutdown trigger initialized successfully."); + GetLogger().Debug("UnsafeIISMethods.MgdHasConfigChanged shutdown trigger initialized successfully."); success = true; } catch (Exception ex) when (ex.IsCatchableExceptionType()) { - Logger.DebugException("Unable to initialize UnsafeIISMethods.MgdHasConfigChanged shutdown trigger", ex); + GetLogger().DebugException("Unable to initialize UnsafeIISMethods.MgdHasConfigChanged shutdown trigger", ex); } } @@ -282,12 +281,12 @@ private static void InitializeDisposingHttpRuntime(ref bool success) _disposingHttpRuntime = () => disposingHttpRuntime(theRuntime()); - Logger.Debug("HttpRuntime._disposingHttpRuntime shutdown trigger initialized successfully."); + GetLogger().Debug("HttpRuntime._disposingHttpRuntime shutdown trigger initialized successfully."); success = true; } catch (Exception ex) { - Logger.DebugException("Unable to initialize HttpRuntime._disposingHttpRuntime shutdown trigger", ex); + GetLogger().DebugException("Unable to initialize HttpRuntime._disposingHttpRuntime shutdown trigger", ex); } } @@ -305,5 +304,10 @@ private static Func CreateGetFieldDelegate(FieldInfo fieldInfo, Ty return Expression.Lambda>(fieldExp, instExp).Compile(); } #endif + + private static ILog GetLogger() + { + return LogProvider.GetLogger(typeof(AspNetShutdownDetector)); + } } } \ No newline at end of file diff --git a/src/Hangfire.NetCore/AspNetCore/AspNetCoreJobActivatorScope.cs b/src/Hangfire.NetCore/AspNetCore/AspNetCoreJobActivatorScope.cs index 5cb179353..286165a31 100644 --- a/src/Hangfire.NetCore/AspNetCore/AspNetCoreJobActivatorScope.cs +++ b/src/Hangfire.NetCore/AspNetCore/AspNetCoreJobActivatorScope.cs @@ -36,6 +36,16 @@ public override object Resolve(Type type) public override void DisposeScope() { +#if NETCOREAPP3_0_OR_GREATER || NETSTANDARD2_1 + if (_serviceScope is IAsyncDisposable asyncDisposable) + { + // Service scope disposal is triggered inside a dedicated background thread, + // while Task result is being set in CLR's Thread Pool, so no deadlocks on + // wait should happen. + asyncDisposable.DisposeAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + return; + } +#endif _serviceScope.Dispose(); } } diff --git a/src/Hangfire.SqlServer/SqlServerConnection.cs b/src/Hangfire.SqlServer/SqlServerConnection.cs index af68a6798..02dc987d6 100644 --- a/src/Hangfire.SqlServer/SqlServerConnection.cs +++ b/src/Hangfire.SqlServer/SqlServerConnection.cs @@ -472,11 +472,11 @@ public override void AnnounceServer(string serverId, ServerContext context) { connection.Execute( $@";merge [{_storage.SchemaName}].Server with (holdlock) as Target -using (VALUES (@id, @data, @heartbeat)) as Source (Id, Data, Heartbeat) +using (VALUES (@id, @data, sysutcdatetime())) as Source (Id, Data, Heartbeat) on Target.Id = Source.Id when matched then update set Data = Source.Data, LastHeartbeat = Source.Heartbeat when not matched then insert (Id, Data, LastHeartbeat) values (Source.Id, Source.Data, Source.Heartbeat);", - new { id = serverId, data = SerializationHelper.Serialize(data), heartbeat = DateTime.UtcNow }, + new { id = serverId, data = SerializationHelper.Serialize(data) }, commandTimeout: _storage.CommandTimeout); }); } @@ -501,8 +501,8 @@ public override void Heartbeat(string serverId) _storage.UseConnection(_dedicatedConnection, connection => { var affected = connection.Execute( - $@"update [{_storage.SchemaName}].Server set LastHeartbeat = @now where Id = @id", - new { now = DateTime.UtcNow, id = serverId }, + $@"update [{_storage.SchemaName}].Server set LastHeartbeat = sysutcdatetime() where Id = @id", + new { id = serverId }, commandTimeout: _storage.CommandTimeout); if (affected == 0) @@ -520,8 +520,8 @@ public override int RemoveTimedOutServers(TimeSpan timeOut) } return _storage.UseConnection(_dedicatedConnection, connection => connection.Execute( - $@"delete s from [{_storage.SchemaName}].Server s with (readpast, readcommitted) where LastHeartbeat < @timeOutAt", - new { timeOutAt = DateTime.UtcNow.Add(timeOut.Negate()) }, + $@"delete s from [{_storage.SchemaName}].Server s with (readpast, readcommitted) where LastHeartbeat < dateadd(ms, @timeoutMsNeg, sysutcdatetime())", + new { timeoutMsNeg = timeOut.Negate().TotalMilliseconds }, commandTimeout: _storage.CommandTimeout)); } diff --git a/src/Hangfire.SqlServer/SqlServerJobQueue.cs b/src/Hangfire.SqlServer/SqlServerJobQueue.cs index ac4e41fc8..5fc5433ba 100644 --- a/src/Hangfire.SqlServer/SqlServerJobQueue.cs +++ b/src/Hangfire.SqlServer/SqlServerJobQueue.cs @@ -39,7 +39,7 @@ internal class SqlServerJobQueue : IPersistentJobQueue private static readonly TimeSpan LongPollingThreshold = TimeSpan.FromSeconds(1); private static readonly int PollingQuantumMs = 1000; private static readonly int DefaultPollingDelayMs = 200; - private static readonly int MinPollingDelayMs = 50; + private static readonly int MinPollingDelayMs = 100; private static readonly DynamicMutex> DynamicMutex = new DynamicMutex>(); @@ -91,83 +91,87 @@ private SqlServerTimeoutJob DequeueUsingSlidingInvisibilityTimeout(string[] queu { if (queues == null) throw new ArgumentNullException(nameof(queues)); if (queues.Length == 0) throw new ArgumentException("Queue array must be non-empty.", nameof(queues)); + + cancellationToken.ThrowIfCancellationRequested(); + + // First we will check if our queues has any background jobs in it and + // return if any. In this case we don't need any additional logic like + // semaphores or waiting. + var fetchedJob = FetchJob(queues); + if (fetchedJob != null) return fetchedJob; + + // Then we determine whether we should use the long polling feature, + // where only a single worker acquires a semaphore for each queue set + // to avoid excessive load on a database. + var configuredPollInterval = _options.QueuePollInterval; + var useLongPolling = configuredPollInterval < LongPollingThreshold; + + // Then we determine a delay between attempts. For long-polling we use constrained + // sub-second intervals within the [MinPollingDelayMs, PollingQuantumMs] interval. + // For regular polling we just use the interval defined in the QueuePollInterval + // option. + var pollingDelayMs = useLongPolling + ? TimeSpan.FromMilliseconds( + Math.Min( + Math.Max( + configuredPollInterval == TimeSpan.Zero ? DefaultPollingDelayMs : (int)configuredPollInterval.TotalMilliseconds, + MinPollingDelayMs), + PollingQuantumMs)) + : configuredPollInterval; - var useLongPolling = false; var queuesString = String.Join("_", queues.OrderBy(x => x)); var resource = Tuple.Create(_storage, queuesString); - var pollingDelayMs = _options.QueuePollInterval > TimeSpan.Zero - ? (int) _options.QueuePollInterval.TotalMilliseconds - : DefaultPollingDelayMs; - - pollingDelayMs = Math.Min( - Math.Max(pollingDelayMs, MinPollingDelayMs), - PollingQuantumMs); - - SqlServerTimeoutJob fetched = null; - using (var cancellationEvent = cancellationToken.GetCancellationEvent()) { - while (!cancellationToken.IsCancellationRequested) - { - var acquired = false; - - try - { - if (useLongPolling) DynamicMutex.Wait(resource, cancellationToken, out acquired); - - fetched = _storage.UseConnection(null, connection => - { - var parameters = new DynamicParameters(); - parameters.Add("@queues", queues); - parameters.Add("@timeoutSs", (int)_options.SlidingInvisibilityTimeout.Value.Negate().TotalSeconds); - parameters.Add("@delayMs", pollingDelayMs); - parameters.Add("@endMs", PollingQuantumMs); - - var query = useLongPolling ? GetBlockingFetchSql() : GetNonBlockingFetchSql(); - - using (var reader = connection.QueryMultiple(query, parameters, commandTimeout: _storage.CommandTimeout)) - { - while (!reader.IsConsumed) - { - var fetchedJob = reader.Read().SingleOrDefault(x => x != null); - if (fetchedJob != null) - { - return new SqlServerTimeoutJob(_storage, fetchedJob.Id, fetchedJob.JobId.ToString(CultureInfo.InvariantCulture), fetchedJob.Queue, fetchedJob.FetchedAt.Value); - } - } - } - - return null; - }); - } - finally - { - if (acquired) - { - DynamicMutex.Release(resource); - } - } + var waitArray = new WaitHandle[] { cancellationEvent.WaitHandle, NewItemInQueueEvent }; + var acquired = false; - if (fetched != null) - { - break; - } + try + { + if (useLongPolling) DynamicMutex.Wait(resource, cancellationToken, out acquired); - if (_options.QueuePollInterval < LongPollingThreshold) - { - useLongPolling = true; - } - else + while (!cancellationToken.IsCancellationRequested) { - WaitHandle.WaitAny(new WaitHandle[] { cancellationEvent.WaitHandle, NewItemInQueueEvent }, _options.QueuePollInterval); + // For non-first attempts we just trying again and again with + // the determined delay between attempts, until shutdown + // request is received. + fetchedJob = FetchJob(queues); + if (fetchedJob != null) return fetchedJob; + + WaitHandle.WaitAny(waitArray, pollingDelayMs); + cancellationToken.ThrowIfCancellationRequested(); } } + finally + { + if (acquired) DynamicMutex.Release(resource); + } cancellationToken.ThrowIfCancellationRequested(); + return null; } + } - return fetched; + private SqlServerTimeoutJob FetchJob(string[] queues) + { + return _storage.UseConnection(null, connection => + { + var parameters = new DynamicParameters(); + parameters.Add("@queues", queues); + parameters.Add("@timeoutSs", (int)_options.SlidingInvisibilityTimeout.Value.Negate().TotalSeconds); + + var fetchedJob = connection + .Query( + GetNonBlockingFetchSql(), + parameters, + commandTimeout: _storage.CommandTimeout) + .SingleOrDefault(); + + return fetchedJob != null + ? new SqlServerTimeoutJob(_storage, fetchedJob.Id, fetchedJob.JobId.ToString(CultureInfo.InvariantCulture), fetchedJob.Queue, fetchedJob.FetchedAt.Value) + : null; + }); } private string GetNonBlockingFetchSql() @@ -183,26 +187,6 @@ where Queue in @queues and (FetchedAt is null or FetchedAt < DATEADD(second, @timeoutSs, GETUTCDATE()));"; } - private string GetBlockingFetchSql() - { - return $@" -set nocount on;set xact_abort on;set tran isolation level read committed; - -declare @end datetime2 = DATEADD(ms, @endMs, SYSUTCDATETIME()), - @delay datetime = DATEADD(ms, @delayMs, convert(DATETIME, 0)); - -WHILE (SYSUTCDATETIME() < @end) -BEGIN - update top (1) JQ set FetchedAt = GETUTCDATE() - output INSERTED.Id, INSERTED.JobId, INSERTED.Queue, INSERTED.FetchedAt - from [{_storage.SchemaName}].JobQueue JQ with (forceseek, readpast, updlock, rowlock) - where Queue in @queues and (FetchedAt is null or FetchedAt < DATEADD(second, @timeoutSs, GETUTCDATE())); - - IF @@ROWCOUNT > 0 RETURN; - WAITFOR DELAY @delay; -END"; - } - private SqlServerTransactionJob DequeueUsingTransaction(string[] queues, CancellationToken cancellationToken) { FetchedJob fetchedJob = null; diff --git a/tests/Hangfire.SqlServer.Tests/SqlServerConnectionFacts.cs b/tests/Hangfire.SqlServer.Tests/SqlServerConnectionFacts.cs index 45c3dcb00..def729323 100644 --- a/tests/Hangfire.SqlServer.Tests/SqlServerConnectionFacts.cs +++ b/tests/Hangfire.SqlServer.Tests/SqlServerConnectionFacts.cs @@ -1196,6 +1196,8 @@ public void AnnounceServer_CreatesOrUpdatesARecord(bool useMicrosoftDataSqlClien "{\"WorkerCount\":4,\"Queues\":[\"critical\",\"default\"],\"StartedAt\":"), server.Data); Assert.NotNull(server.LastHeartbeat); + Assert.True(DateTime.UtcNow.AddHours(-1) < server.LastHeartbeat && + server.LastHeartbeat < DateTime.UtcNow.AddHours(1)); var context2 = new ServerContext { @@ -1269,6 +1271,9 @@ public void Heartbeat_UpdatesLastHeartbeat_OfTheServerWithGivenId(bool useMicros Assert.NotEqual(2012, servers["server1"].Year); Assert.Equal(2012, servers["server2"].Year); + + Assert.True(DateTime.UtcNow.AddHours(-1) < servers["server1"] && + servers["server1"] < DateTime.UtcNow.AddHours(1)); }, useBatching: false, useMicrosoftDataSqlClient); }