Queueing Feature Documentation
Process single-consumer work items through in-process or durable queue brokers with retained-message inspection, retry/archive controls, and queue/type pause-resume control.
Overview
Queueing provides an application-level abstraction for background work that must be processed by exactly one logical handler per queued message type. It complements Messaging rather than replacing it:
- Messaging is for pub/sub fan-out where multiple handlers may react to the same event.
- Queueing is for work dispatch where one queued item must be processed once by one handler when a compatible handler is available.
The current queueing implementation ships with several brokers:
- InProcessQueueBroker for local, process-bound work distribution and tests.
- EntityFrameworkQueueBroker<TContext> for durable SQL-backed processing with renewable leases and runtime-safe competing consumers.
- RabbitMQQueueBroker for broker-backed durable queue processing with manual acknowledgement, retry, and dead-letter semantics.
- ServiceBusQueueBroker for Azure Service Bus queue transport with manual complete/abandon/dead-letter semantics.
- AzureQueueStorageQueueBroker for Azure Queue Storage transport with visibility timeout, polling-based consumption, and retry/dead-letter semantics.
The feature also includes an operational web endpoint surface for queue broker summary, subscription inspection, waiting-message inspection, and queue/type pause-resume control.
Challenges
- Single-consumer semantics: one handler per queued message type while still allowing multiple host instances to compete for work.
- Delayed handler availability: work can be enqueued before a handler is registered and should wait instead of failing.
- Durability: persisted messages need retries, leases, expiration, and dead-lettering behavior without depending on the messaging feature.
- Operational control: support engineers need visibility into waiting work, active subscriptions, and queue pause state.
Solution
- Contracts: IQueueMessage, IQueueMessageHandler<TMessage>, IQueueBroker, and IQueueBrokerService provide a queue-specific API.
- Runtime: QueueingService is the single hosted service for the feature and applies subscriptions after host startup.
- Providers: brokers implement QueueBrokerBase; provider-specific background work plugs into the single runtime through IQueueBrokerBackgroundProcessor.
- Operations: IQueueBrokerService and Presentation.Web.Queueing expose broker inspection and operational controls, including retained-message queries, retry/archive actions, waiting-message inspection, purge, and queue/type pause-resume management.
Architecture
sequenceDiagram
actor Producer
participant Broker as IQueueBroker
participant Behaviors as Enqueue Behaviors
participant Transport as Broker Provider
participant Worker as QueueingService / Provider Worker
participant Handler as IQueueMessageHandler<T>
Producer->>Broker: Enqueue(message)
Broker->>Behaviors: Execute enqueue pipeline
Behaviors->>Transport: Persist or dispatch work item
Worker->>Transport: Claim item (if durable)
Worker->>Handler: Handle(message)
alt success
Worker->>Transport: Mark succeeded
else no handler yet
Worker->>Transport: Leave waiting for handler
else failure
Worker->>Transport: Retry or dead-letter
end
Core Contracts
- IQueueBroker (src/Application.Queueing/IQueueBroker.cs)
  - Subscribe/unsubscribe queue handlers.
  - Enqueue messages and optionally wait for provider-specific persistence confirmation.
  - Process messages through the shared queue dispatch pipeline.
- IQueueBrokerService (src/Application.Queueing/IQueueBrokerService.cs)
  - Inspect queue summary, subscriptions, and waiting messages.
  - Pause or resume queues and specific message types.
- QueueBrokerBase (src/Application.Queueing/QueueBrokerBase.cs)
  - Validates messages, runs behaviors, resolves handlers, and enforces queue semantics.
- QueueingService (src/Application.Queueing/QueueingService.cs)
  - The single hosted service for the whole feature.
Getting Started
In-process broker
builder.Services.AddQueueing(builder.Configuration, context =>
        context.WithSubscription<OrderQueuedMessage, OrderQueuedHandler>())
    .WithInProcessBroker(new InProcessQueueBrokerConfiguration
    {
        MaxDegreeOfParallelism = 1,
        EnsureOrdered = true
    })
    .AddEndpoints();
Entity Framework broker
builder.Services.AddDbContext<AppDbContext>(...);
builder.Services.AddQueueing(builder.Configuration, context =>
        context.WithSubscription<OrderQueuedMessage, OrderQueuedHandler>())
    .WithEntityFrameworkBroker<AppDbContext>(new EntityFrameworkQueueBrokerConfiguration
    {
        AutoSave = true,
        ProcessingInterval = TimeSpan.FromSeconds(15),
        LeaseDuration = TimeSpan.FromSeconds(30)
    })
    .AddEndpoints(options => options.RequireAuthorization());
Azure Service Bus broker
builder.Services.AddQueueing(builder.Configuration, context =>
        context.WithSubscription<OrderQueuedMessage, OrderQueuedHandler>())
    .WithServiceBusBroker(new ServiceBusQueueBrokerConfiguration
    {
        // Read the connection string from the same configuration root passed to AddQueueing.
        ConnectionString = builder.Configuration["Queueing:ServiceBus:ConnectionString"],
        QueueNamePrefix = "bit",
        AutoCreateQueue = true,
        MaxConcurrentCalls = 8,
        MaxDeliveryAttempts = 5
    })
    .AddEndpoints(options => options.RequireAuthorization());
Azure Queue Storage broker
builder.Services.AddQueueing(builder.Configuration, context =>
        context.WithSubscription<OrderQueuedMessage, OrderQueuedHandler>())
    .WithAzureQueueStorageBroker(new AzureQueueStorageQueueBrokerConfiguration
    {
        // Read the connection string from the same configuration root passed to AddQueueing.
        ConnectionString = builder.Configuration["Queueing:AzureQueueStorage:ConnectionString"],
        QueueNamePrefix = "bit",
        AutoCreateQueue = true,
        MaxConcurrentCalls = 8,
        MaxDeliveryAttempts = 5,
        VisibilityTimeout = TimeSpan.FromSeconds(30),
        PollingInterval = TimeSpan.FromSeconds(1)
    })
    .AddEndpoints(options => options.RequireAuthorization());
When you use the Entity Framework broker, your DbContext must implement IQueueingContext:
public class AppDbContext : DbContext, IQueueingContext
{
    public DbSet<QueueMessage> QueueMessages { get; set; }
}
Define a queue message and handler
public sealed class OrderQueuedMessage(Guid orderId) : QueueMessageBase
{
    public Guid OrderId { get; } = orderId;
}

public sealed class OrderQueuedHandler : IQueueMessageHandler<OrderQueuedMessage>
{
    public Task Handle(OrderQueuedMessage message, CancellationToken cancellationToken)
    {
        // process one logical work item
        return Task.CompletedTask;
    }
}
Enqueue work
public sealed class OrdersService(IQueueBroker queueBroker)
{
    public Task QueueOrderAsync(Guid orderId, CancellationToken cancellationToken)
    {
        return queueBroker.Enqueue(new OrderQueuedMessage(orderId), cancellationToken);
    }
}
Operational Endpoints
The retained-message operational surface lives in src/Presentation.Web.Queueing/QueueingEndpoints.cs.
When you reference Presentation.Web.Queueing, you can register it directly from the fluent queueing builder:
builder.Services.AddQueueing(builder.Configuration)
    .WithSubscription<OrderQueuedMessage, OrderQueuedHandler>()
    .WithEntityFrameworkBroker<AppDbContext>()
    .AddEndpoints(options => options
        .GroupPath("/api/_system/queueing")
        .GroupTag("_System.Queueing")
        .RequireAuthorization());
If you prefer separate registration, the existing builder.Services.AddQueueingEndpoints(options => options.RequireAuthorization()) helper is also available.
Routes:
- GET /api/_system/queueing/stats
- GET /api/_system/queueing/subscriptions
- GET /api/_system/queueing/messages
- GET /api/_system/queueing/messages/{id}
- GET /api/_system/queueing/messages/{id}/content
- GET /api/_system/queueing/messages/stats
- GET /api/_system/queueing/messages/waiting?take=50
- POST /api/_system/queueing/messages/{id}/retry
- POST /api/_system/queueing/messages/{id}/lease/release
- POST /api/_system/queueing/messages/{id}/archive
- DELETE /api/_system/queueing/messages
- POST /api/_system/queueing/queues/{queueName}/pause
- POST /api/_system/queueing/queues/{queueName}/resume
- POST /api/_system/queueing/types/{type}/pause
- POST /api/_system/queueing/types/{type}/resume
- POST /api/_system/queueing/types/{type}/circuit/reset
In-process semantics
The in-process broker is the simplest queue transport. It uses an in-memory channel per message type and dispatches work within the same process.
Queue topology:
- One in-memory channel is created per registered queue message type.
- There is no external broker or persistence layer.
Competing consumers:
- Only consumers inside the same process can compete for work.
- Multiple application instances do not share the queue; each instance has its own isolated channel.
- This makes the in-process broker suitable for tests and single-instance scenarios only.
Acknowledgement and retry:
- Messages are removed from the channel immediately before the handler is invoked.
- If the handler throws, the message is tracked as failed but is not automatically retried because it has already been removed from the channel.
- There is no dead-letter queue; failed messages are only tracked in the runtime.
Waiting for handler:
- Messages can be enqueued before a handler subscribes; they wait in the channel until a consumer is available.
- When a subscription is added, the consumer starts reading from the channel and processes any backlog.
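The backlog behaviour described above can be illustrated with System.Threading.Channels. This is a minimal sketch, not the broker's code: the ChannelBacklogDemo class and the string payloads are hypothetical, and whether the in-process broker uses this exact channel type is an assumption.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo type; it only illustrates "enqueue first, consume later".
public static class ChannelBacklogDemo
{
    public static async Task<List<string>> RunAsync()
    {
        // One unbounded in-memory channel stands in for one message type.
        var channel = Channel.CreateUnbounded<string>();

        // Work is written before any consumer exists; it simply waits in the channel.
        channel.Writer.TryWrite("order-1");
        channel.Writer.TryWrite("order-2");
        channel.Writer.Complete(); // lets the demo consumer finish after the backlog

        // The consumer starts later and drains the backlog in write order.
        var handled = new List<string>();
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            handled.Add(item);
        }

        return handled;
    }
}
```

Calling ChannelBacklogDemo.RunAsync() returns both items even though the reader started after the writes, which mirrors how a late subscription picks up queued work.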
Pause/resume:
- When paused, the consumer stops reading from the channel.
- Messages remain in the in-memory channel and are processed when consumption resumes.
Expiration:
- The broker checks message.Timestamp + MessageExpiration against UTC now before invoking the handler.
- If expired, the message is skipped and tracked as Expired.
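The expiration check can be sketched as a small pure function. The QueueExpiration helper is hypothetical, and two details are assumptions of this sketch rather than documented behaviour: a missing expiration means the message never expires, and a message is treated as expired once the deadline is reached or passed.

```csharp
using System;

// Hypothetical helper mirroring the "Timestamp + MessageExpiration vs. UTC now" rule.
public static class QueueExpiration
{
    public static bool IsExpired(DateTimeOffset timestamp, TimeSpan? messageExpiration, DateTimeOffset utcNow)
    {
        // Assumption: no configured expiration means the message never expires.
        if (messageExpiration is null)
        {
            return false;
        }

        // Assumption: the deadline itself already counts as expired.
        return timestamp + messageExpiration.Value <= utcNow;
    }
}
```

Passing the current time as a parameter keeps the rule easy to test without depending on the system clock.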
Operational visibility:
- The in-process broker tracks messages in memory for the lifetime of the process.
- GetMessagesAsync, GetSummaryAsync, pause/resume, and purge are supported through this in-memory tracker.
- Restarting the application clears all tracked and queued messages.
Entity Framework semantics
The Entity Framework broker maps queue semantics to a SQL-backed durable store using DbContext and renewable leases.
Queue topology:
- One logical queue is represented by rows in a QueueMessage table filtered by message type.
- The queue name defaults to the message type name, with optional QueueNamePrefix and QueueNameSuffix.
Competing consumers:
- Multiple application instances compete for work by claiming leases on rows in the same database table.
- A worker claims a message by updating LockedBy and LockedUntil; other workers skip rows that are already leased.
- Workers verify LockedBy before finalizing state. If another node took ownership, the older worker skips finalization.
Leases and renewal:
- LeaseDuration controls how long a worker owns a message.
- LeaseRenewalInterval controls how often a healthy worker renews its lease.
- If a worker crashes, the lease expires and another worker can claim the message after LeaseDuration.
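The claim, renew, and ownership rules above can be modelled in memory to make the state transitions concrete. This is a sketch under stated assumptions: LeasedRow and LeaseRules are hypothetical types that borrow only the LockedBy and LockedUntil names from the text, and the real broker performs these steps against database rows rather than in-memory objects.

```csharp
#nullable enable
using System;

// Hypothetical in-memory stand-in for the lease columns of a queue message row.
public sealed class LeasedRow
{
    public string? LockedBy { get; set; }
    public DateTimeOffset? LockedUntil { get; set; }
}

public static class LeaseRules
{
    // A row can be claimed when it has no lease or the existing lease has expired.
    public static bool TryClaim(LeasedRow row, string workerId, TimeSpan leaseDuration, DateTimeOffset utcNow)
    {
        var activelyLeased = row.LockedBy is not null && row.LockedUntil > utcNow;
        if (activelyLeased)
        {
            return false;
        }

        row.LockedBy = workerId;
        row.LockedUntil = utcNow + leaseDuration;
        return true;
    }

    // Only the current owner may extend the lease.
    public static bool TryRenew(LeasedRow row, string workerId, TimeSpan leaseDuration, DateTimeOffset utcNow)
    {
        if (row.LockedBy != workerId)
        {
            return false;
        }

        row.LockedUntil = utcNow + leaseDuration;
        return true;
    }

    // Checked before finalizing so a worker that lost its lease does not overwrite the new owner.
    public static bool OwnsLease(LeasedRow row, string workerId) => row.LockedBy == workerId;
}
```

In a database the claim and the ownership check would need to be a single conditional update so that two workers cannot both succeed; the in-memory version skips that concern for readability.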
Retry and dead-letter:
- The broker increments AttemptCount on each processing attempt.
- On failure: the message is released (lease cleared) and becomes available for the next worker.
- After MaxDeliveryAttempts is exceeded, the message is marked DeadLettered and is no longer eligible for processing.
Waiting for handler:
- Messages can be enqueued before any handler subscribes; they persist in the database table.
- When no handler is registered, the broker returns WaitingForHandler and leaves the message unleased for the next polling cycle.
Pause/resume:
- When a queue or message type is paused, workers stop claiming messages for that queue/type.
- Messages remain in the database table and are picked up when polling resumes.
Expiration:
- The broker checks message.Timestamp + MessageExpiration against UTC now before invoking the handler.
- If expired, the message is marked Expired and is no longer eligible for processing.
Archive:
- Terminal messages (Succeeded, DeadLettered, Expired) can be archived with ArchiveMessageAsync.
- AutoArchiveAfter and AutoArchiveStatuses can be configured to archive terminal messages automatically after a retention period.
Operational visibility:
- The Entity Framework broker provides full durable retained history in the database.
- GetMessagesAsync, GetSummaryAsync, GetMessageStatsAsync, retry, archive, lease release, pause/resume, and purge are all backed by the database.
- Unlike the in-memory brokers, tracked history survives application restarts.
RabbitMQ semantics
The RabbitMQ queue broker maps queue semantics to RabbitMQ work queues using the default exchange and manual acknowledgement.
Queue topology:
- One RabbitMQ queue is created per registered queue message type.
- The queue name defaults to the message type name (e.g., OrderQueuedMessage), with optional QueueNamePrefix and QueueNameSuffix.
- The broker uses the default exchange ("") and publishes with the queue name as the routing key. RabbitMQ routes messages directly to the queue with the matching name.
flowchart LR
P[Publisher] -->|routingKey=OrderQueuedMessage| E[Default Exchange]
E --> Q1[Queue: OrderQueuedMessage]
E --> Q2[Queue: InvoiceQueuedMessage]
Q1 --> C1[Consumer A]
Q1 --> C2[Consumer B <br/>(competing)]
Q2 --> C3[Consumer C]
Competing consumers:
- Multiple application instances that use the same QueueNamePrefix/QueueNameSuffix and subscribe to the same message type consume from the same queue.
- RabbitMQ round-robins messages across all connected consumers on that queue.
- A message is delivered to exactly one consumer at a time.
Acknowledgement and retry:
- Consumption uses manual ack (autoAck: false).
- On success: the broker sends BasicAck and the message is removed from the queue.
- On failure: the broker implements retry by republishing the message with an incremented x-attempt-count header, then acks the original message. If republish fails, it falls back to BasicNack(requeue: true).
- After MaxDeliveryAttempts is exceeded, the broker sends BasicNack(requeue: false) and the message is dropped (dead-lettered). No separate dead-letter exchange is configured by default.
- If a handler throws an unexpected exception, the broker nacks with requeue so the message is retried.
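The republish-with-incremented-header step can be sketched without a RabbitMQ connection by working on a plain header dictionary. The AttemptHeaders helper is hypothetical; only the x-attempt-count header name comes from the text. Treating a missing header as the first attempt is an assumption of this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for reading and advancing the attempt counter carried in message headers.
public static class AttemptHeaders
{
    public const string AttemptCountHeader = "x-attempt-count";

    public static int ReadAttemptCount(IDictionary<string, object> headers)
    {
        if (headers != null && headers.TryGetValue(AttemptCountHeader, out var raw) && raw != null)
        {
            // Header values may arrive as int or long depending on the client, so convert defensively.
            return Convert.ToInt32(raw);
        }

        // Assumption: a message without the header is on its first attempt.
        return 1;
    }

    // Returns a copy of the headers for the republished message; the original is left untouched.
    public static IDictionary<string, object> NextAttempt(IDictionary<string, object> headers)
    {
        var next = headers is null
            ? new Dictionary<string, object>()
            : new Dictionary<string, object>(headers);
        next[AttemptCountHeader] = ReadAttemptCount(headers) + 1;
        return next;
    }
}
```

Copying the headers instead of mutating them keeps the original delivery intact until it has been acknowledged.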
Waiting for handler:
- The broker declares the queue on both publish and subscribe. This means messages can be enqueued before any handler subscribes.
- When no consumer is connected, messages accumulate in the RabbitMQ queue.
- When a consumer connects, RabbitMQ delivers the backlog.
Pause/resume:
- When a queue or message type is paused, the broker detects this in OnMessageAsync and sends BasicNack(requeue: true).
- The message remains in the RabbitMQ queue and will be redelivered when the consumer resumes processing.
Expiration:
- The broker checks message.Timestamp + MessageExpiration against UTC now before invoking the handler.
- If expired, the message is nacked without requeue and tracked as Expired.
- The AMQP Expiration property is also set on publish, so RabbitMQ can drop expired messages that have not yet been delivered.
EnqueueAndWait:
- EnqueueAndWait enables publisher confirms (ConfirmSelect) and calls WaitForConfirmsOrDie with a 30-second timeout. The call returns only after RabbitMQ has confirmed the publish; for persistent messages routed to durable queues, that confirmation means the message has been written to disk.
Operational visibility:
- The RabbitMQ broker service (RabbitMQQueueBrokerService) tracks recent messages in memory (bounded to 10,000 items with LRU eviction).
- GetMessagesAsync, GetSummaryAsync, GetMessageStatsAsync, pause/resume, and purge are supported through this in-memory tracker.
- Unlike the Entity Framework broker, there is no durable retained history. Restarting the application clears the operational tracker (the messages themselves remain in RabbitMQ).
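A bounded tracker with LRU eviction, as mentioned above, can be sketched with a dictionary plus a linked list. BoundedMessageTracker is a hypothetical type written for illustration; it is not thread-safe and makes no claim about how RabbitMQQueueBrokerService is actually implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical bounded tracker: the least recently used entry is evicted when capacity is reached.
public sealed class BoundedMessageTracker<TValue>
{
    private readonly int capacity;
    private readonly Dictionary<string, LinkedListNode<KeyValuePair<string, TValue>>> index = new();
    private readonly LinkedList<KeyValuePair<string, TValue>> order = new();

    public BoundedMessageTracker(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        this.capacity = capacity;
    }

    public int Count => index.Count;

    public void Track(string messageId, TValue value)
    {
        if (index.TryGetValue(messageId, out var existing))
        {
            order.Remove(existing); // re-tracking refreshes the entry
        }
        else if (index.Count >= capacity)
        {
            var oldest = order.First; // head of the list is the least recently used entry
            order.RemoveFirst();
            index.Remove(oldest.Value.Key);
        }

        index[messageId] = order.AddLast(new KeyValuePair<string, TValue>(messageId, value));
    }

    public bool TryGet(string messageId, out TValue value)
    {
        if (index.TryGetValue(messageId, out var node))
        {
            order.Remove(node);
            order.AddLast(node); // reading marks the entry as most recently used
            value = node.Value.Value;
            return true;
        }

        value = default;
        return false;
    }
}
```

With a capacity of 10,000 the tracker keeps memory bounded while still answering queries about recently seen messages.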
Service Bus semantics
The Service Bus broker maps queue semantics to Azure Service Bus queues using peek-lock consumption and manual complete/abandon/dead-letter.
Queue topology:
- One Service Bus queue is created per registered queue message type.
- The queue name defaults to the message type name, with optional QueueNamePrefix and QueueNameSuffix.
- Queue names are sanitized to comply with Service Bus naming rules (alphanumeric, hyphens, underscores, and slashes, 1-260 characters).
Competing consumers:
- Multiple application instances that use the same QueueNamePrefix/QueueNameSuffix and subscribe to the same message type consume from the same queue.
- Service Bus delivers messages to exactly one consumer at a time using peek-lock.
Acknowledgement and retry:
- Consumption uses peek-lock with AutoCompleteMessages = false.
- On success: the broker calls CompleteMessageAsync and the message is removed from the queue.
- On failure: the broker calls AbandonMessageAsync, which increments the Service Bus delivery count and makes the message immediately available for redelivery.
- After MaxDeliveryAttempts is exceeded, the broker calls DeadLetterMessageAsync with reason MaxDeliveryAttemptsExceeded.
Waiting for handler:
- The broker creates the queue at runtime when AutoCreateQueue is enabled.
- Messages can be enqueued before any handler subscribes; they persist in the Service Bus queue.
- When no handler is registered, the broker returns WaitingForHandler, abandons the message, and tracks it accordingly.
Pause/resume:
- When a queue or message type is paused, the broker detects this in OnMessageAsync and calls AbandonMessageAsync.
- The message remains in the Service Bus queue and will be redelivered when the consumer resumes processing.
- Because abandon triggers immediate redelivery, the pause window should be kept short to avoid exhausting MaxDeliveryCount.
Expiration:
- The broker checks message.Timestamp + MessageExpiration against UTC now before invoking the handler.
- If expired, the broker dead-letters the message with reason Expired.
- The Service Bus TimeToLive is also set on enqueue so the service can drop expired messages that have not yet been delivered.
Operational visibility:
- The Service Bus broker service (ServiceBusQueueBrokerService) tracks recent messages in memory (bounded to 10,000 items).
- GetMessagesAsync, GetSummaryAsync, GetMessageStatsAsync, pause/resume, and purge are supported through this in-memory tracker.
- Unlike the Entity Framework broker, there is no durable retained history. Restarting the application clears the operational tracker (the messages themselves remain in Service Bus).
Azure Queue Storage semantics
The Azure Queue Storage broker maps queueing semantics to Azure Queue Storage using polling-based consumption and visibility timeouts.
Queue topology:
- One Azure Queue Storage queue is created per registered queue message type.
- The queue name defaults to the message type name (e.g., orderqueuedmessage), with optional QueueNamePrefix and QueueNameSuffix.
- Queue names are sanitized to comply with Azure Queue Storage naming rules (lowercase alphanumeric and hyphens, 3-63 characters).
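One possible way to sanitize a name to these rules is sketched below. AzureQueueNames is a hypothetical helper, not the broker's sanitizer: replacing invalid characters with a hyphen, collapsing repeated hyphens, and padding short names with '0' are all choices made for this example.

```csharp
using System;
using System.Text;

// Hypothetical sanitizer producing names with lowercase letters, digits, and single hyphens, 3-63 characters long.
public static class AzureQueueNames
{
    public static string Sanitize(string rawName)
    {
        var builder = new StringBuilder(rawName.Length);
        foreach (var c in rawName.ToLowerInvariant())
        {
            var isAllowed = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9');
            if (isAllowed)
            {
                builder.Append(c);
            }
            else if (builder.Length > 0 && builder[builder.Length - 1] != '-')
            {
                // Any run of invalid characters becomes one hyphen; leading runs are dropped.
                builder.Append('-');
            }
        }

        // Names must start and end with a letter or digit.
        var name = builder.ToString().Trim('-');
        if (name.Length > 63)
        {
            name = name.Substring(0, 63).TrimEnd('-');
        }

        // Assumption: short names are padded with '0' to reach the minimum length.
        return name.PadRight(3, '0');
    }
}
```

For example, a prefixed name such as "bit.OrderQueuedMessage" becomes "bit-orderqueuedmessage", and "OrderQueuedMessage" becomes "orderqueuedmessage".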
Polling and visibility timeout:
- The broker starts a background polling loop per subscribed queue.
- Each poll calls ReceiveMessagesAsync with a configurable VisibilityTimeout.
- While a message is invisible, no other consumer can receive it.
- If the message is not deleted within the visibility timeout, it becomes visible again for redelivery.
- When no messages are available, the broker sleeps for PollingInterval before polling again.
Competing consumers:
- Multiple application instances that use the same QueueNamePrefix/QueueNameSuffix and subscribe to the same message type consume from the same queue.
- Azure Queue Storage is pull-based and does not push or round-robin messages; each polling consumer receives whichever messages are visible when it calls ReceiveMessagesAsync.
- A message is delivered to exactly one consumer at a time while it remains within the visibility timeout.
Retry and dead-letter:
- The broker uses the built-in DequeueCount property of the received message to track delivery attempts.
- On success: the broker deletes the message from the queue.
- On failure: if DequeueCount is below MaxDeliveryAttempts, the broker updates the message visibility timeout to RetryDelay so it reappears for retry after a short delay.
- After MaxDeliveryAttempts is exceeded, the broker deletes the message and tracks it as DeadLettered in the runtime.
- If a handler throws an unexpected exception, the message is retried up to MaxDeliveryAttempts.
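The failure branch above reduces to a comparison between DequeueCount and MaxDeliveryAttempts. The sketch below uses hypothetical names (FailureAction, DequeueCountPolicy) and follows the rule as worded: retry while the count is below the limit, otherwise dead-letter.

```csharp
// Hypothetical outcome of a failed handler invocation.
public enum FailureAction
{
    RetryAfterDelay, // make the message visible again after RetryDelay
    DeadLetter       // delete the message and track it as DeadLettered
}

public static class DequeueCountPolicy
{
    // dequeueCount is typed as long to match the DequeueCount property on received Azure queue messages.
    public static FailureAction Decide(long dequeueCount, int maxDeliveryAttempts)
        => dequeueCount < maxDeliveryAttempts
            ? FailureAction.RetryAfterDelay
            : FailureAction.DeadLetter;
}
```

With MaxDeliveryAttempts = 5, a failure on the fourth delivery schedules a retry and a failure on the fifth dead-letters the message.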
Waiting for handler:
- The broker creates the queue on both publish and subscribe when AutoCreateQueue is enabled.
- Messages can be enqueued before any handler subscribes; they remain in the queue until a poller starts.
- When no handler is registered for a message type, the broker returns WaitingForHandler, makes the message immediately visible again, and tracks it accordingly.
Pause/resume:
- When a queue or message type is paused, the polling loop detects this and delays further receives.
- Messages already in the queue remain there and will be picked up when polling resumes.
Expiration:
- The broker checks message.Timestamp + MessageExpiration against UTC now before invoking the handler.
- If expired, the message is deleted from the queue and tracked as Expired.
- The Azure Queue Storage TimeToLive property is also set on enqueue so the service can drop expired messages that have not yet been delivered.
Operational visibility:
- The Azure Queue Storage broker service (AzureQueueStorageQueueBrokerService) tracks recent messages in memory (bounded to 10,000 items).
- GetMessagesAsync, GetSummaryAsync, GetMessageStatsAsync, pause/resume, and purge are supported through this in-memory tracker.
- Unlike the Entity Framework broker, there is no durable retained history. Restarting the application clears the operational tracker (the messages themselves remain in Azure Queue Storage).
All brokers implement the same IQueueBrokerService operational contract. The in-process broker exposes it over runtime-tracked items, the Entity Framework broker adds durable retained history plus archive-aware filtering and lease management, and the RabbitMQ, Service Bus, and Azure Queue Storage brokers provide lightweight in-memory operational tracking.
For Entity Framework, the most relevant broker-specific retention options are:
- AutoArchiveAfter to archive terminal messages automatically after a retention period.
- AutoArchiveStatuses to limit auto-archival to specific terminal states such as Succeeded, DeadLettered, or Expired.
Runtime Behavior
- Duplicate handlers fail fast. A second handler for the same queue message type is rejected.
- Missing handlers produce WaitingForHandler instead of immediate failure.
- Durable providers use at-least-once delivery semantics; handlers should remain idempotent.
- AddQueueing(...) may be called from multiple modules. Registrations accumulate, but queueing still uses one hosted service.
Multi-host Deployment Notes
- EntityFrameworkQueueBroker<TContext> is intended to support multiple host instances competing for work against the same durable store.
- For real multi-host deployments, prefer SQL Server or PostgreSQL so lease claim and renewal can use efficient conditional updates in the database.
- Queueing still provides at-least-once delivery semantics. The goal is one logical owner at a time, not an exactly-once execution guarantee.
- A queued item can be reprocessed if a host crashes after side effects but before finalize, or if lease ownership changes after expiry.
- Queue handlers should therefore be idempotent and safe to execute more than once for the same MessageId.
- Set LeaseDuration longer than normal handler execution time and LeaseRenewalInterval low enough that healthy workers renew ownership before expiry.
- SQLite is suitable for local/dev and lightweight durable scenarios, but it is not the recommended storage engine for distributed multi-host queue processing.
- Workers verify LockedBy before finalizing state. If another node took ownership, the older worker skips finalization rather than overwriting the newer lease owner.
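Because delivery is at-least-once, a handler can guard its side effect with the message id. The sketch below is illustrative only: IdempotencyGuard is a hypothetical helper that keeps processed ids in memory, which protects a single process. A multi-host deployment would need a shared durable record instead, for example a table with a unique key on the message id.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical guard that runs a side effect at most once per message id within this process.
public sealed class IdempotencyGuard
{
    private readonly ConcurrentDictionary<string, bool> processed = new();

    // Returns true when the side effect ran, false when the id was already processed.
    public async Task<bool> RunOnceAsync(
        string messageId,
        Func<CancellationToken, Task> sideEffect,
        CancellationToken cancellationToken = default)
    {
        if (!processed.TryAdd(messageId, true))
        {
            return false; // duplicate delivery: skip the side effect
        }

        try
        {
            await sideEffect(cancellationToken);
            return true;
        }
        catch
        {
            // Forget the id so a later retry can run the side effect again.
            processed.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

A handler would call RunOnceAsync with the queued message's id and place its real work inside the delegate, so a redelivered message completes without repeating the side effect.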
Relation To Messaging
Use Messaging when one event should fan out to many handlers. Use Queueing when one work item should be owned by one handler execution. The APIs are intentionally similar so the developer experience stays familiar, but the runtime semantics are different.