In this episode, we’ll implement the outbox publisher, or better yet, two versions of it: one better suited for lower latency and another for reliability. As we continue our event-driven path, this will be a good opportunity to introduce a couple of interesting .NET Core features: channels and background services.
Note: depending on your preference, you can check out the following video; otherwise, skip to the written version below.
The playlist for the whole series is here.
In the previous episode, we implemented the outbox, as well as storing the messages in it transactionally. In this episode, we’ll implement the outbox publisher (two versions in fact) that’s responsible for reading the messages from the table, pushing them to the event bus and deleting them after they’re published successfully.
Something the outbox publisher takes into consideration is that multiple instances might be running concurrently. Due to this, the publisher is coded in a way that tries to avoid publishing the same message multiple times, “try” being the keyword here, as it’s not a guarantee we can achieve with this kind of solution.
As briefly pointed out, we’ll in fact implement two versions of the outbox publisher, the first geared towards reducing event publishing latency, while the second is aimed at reliability, ensuring all the events are published even in the face of transient failures. As you might be suspecting from this quick intro, we could live with just the second one, simplifying our work, but the first allows us to play with a .NET Core feature we haven’t used so far: System.Threading.Channels.
Another .NET Core feature we’ll use in this episode is running tasks in the background, by implementing
IHostedService, which can be done directly or by inheriting from the
BackgroundService base class.
Before getting on with business, to situate ourselves in the event-driven integration path, we can take a look at the diagram introduced in episode 40:
Let’s start with our main outbox publisher, which is triggered every time a new message is stored. This is the implementation that gives us lower event publishing latency, as it doesn’t rely on polling, but on listening for new work.
As introduced, this implementation is not completely reliable by itself. The reason for this is that from the time the publisher is triggered to the time it publishes the events, something might go wrong, like the server going down, and the event that caused the publisher execution would remain in the outbox, pending publishing. Due to this, we need additional strategies to ensure all events are published, regardless of transient failures. As a result, this outbox publisher implementation becomes more of an optimization, to try to publish the events as soon as possible, as well as an opportunity to play with Channels 🙂.
Notify when a new message is stored
Picking up where we left in the previous episode, in terms of implementation, we had a comment in the
SaveChangesAsync method to “publish the events persisted in the outbox”.
What we’ll do is not actually publish the events, as the comment mentioned, but instead notify some interested component that messages were persisted and it can proceed to publish them. As publishing the event is not required to fulfill the user’s request, this reduces the time spent waiting for the request to complete.
We could create an interface to represent this notification behavior, but lately I’ve been more inclined to create delegates instead of single-method interfaces (depending on the scenario, of course). With this in mind, we can create an
OnNewOutboxMessages delegate.
We could also achieve the same with a
Func, but not only does giving it a name make it easier to understand, it also helps when configuring things in the dependency injection container, as we could have different
Funcs with the same signature but different purposes.
Now we can inject the delegate in the
AuthDbContext and use it when new messages are persisted in the outbox table.
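As a rough sketch of what this could look like (the exact delegate signature, the shape of the OutboxMessage entity, and the details of collecting the added entries are assumptions for illustration, not the exact code from the repository):

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

// the notification delegate (a collection of ids is assumed, to cover
// saves that persist multiple outbox messages at once)
public delegate void OnNewOutboxMessages(IReadOnlyCollection<long> messageIds);

// assumed outbox entity shape
public class OutboxMessage
{
    public long Id { get; set; }
    // payload, event type, created at, etc. omitted
}

public class AuthDbContext : DbContext
{
    private readonly OnNewOutboxMessages _onNewOutboxMessages;

    public AuthDbContext(
        DbContextOptions<AuthDbContext> options,
        OnNewOutboxMessages onNewOutboxMessages)
        : base(options)
    {
        _onNewOutboxMessages = onNewOutboxMessages;
    }

    public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
    {
        // grab the outbox entries being added, before saving
        var newMessages = ChangeTracker
            .Entries<OutboxMessage>()
            .Where(entry => entry.State == EntityState.Added)
            .Select(entry => entry.Entity)
            .ToList();

        var result = await base.SaveChangesAsync(cancellationToken);

        // the ids are generated by the database, so they're available after saving;
        // notifying doesn't publish anything, so the request doesn't wait for the event bus
        if (newMessages.Count > 0)
        {
            _onNewOutboxMessages(newMessages.Select(m => m.Id).ToList());
        }

        return result;
    }
}
```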
AuthDbContext is ready to notify when a new message is added to the outbox, now we need to create the glue between said notification and some component that runs in the background and actually publishes things to the event bus.
This is where we’ll make use of System.Threading.Channels.
Channels help us implement in-memory producer/consumer scenarios, optimized for async code. This fits our problem very nicely, as we want to notify (produce) when a new message is available in persistence, while having another component listening (consume) to that notification to act on it.
To encapsulate this, we can create a class
OutboxListener (not very happy with the name, but it’ll do for now 😛).
Creating a channel
Firstly, let’s look at the constructor. In there, we’re creating the channel we’ll use to publish the id of the message stored in the outbox, hence the
Channel<long> type, meaning we’ll have a channel that can contain
longs, the type of our message ids.
We can have bounded and unbounded channels, where the first is limited in size and we should elect a strategy to handle a full channel (e.g. wait for space or drop new items), while the latter doesn’t have a size restriction. An unbounded channel can be a bit dangerous because if the consumer is slow to process items, memory will grow indefinitely. We’ll go with unbounded for now, but keep in mind bounded is likely a better idea.
When creating a channel, we can provide some options, in the unbounded channel case, through the
UnboundedChannelOptions class. In this case, we’re indicating that we’ll have a single reader/consumer and multiple writers/producers. With these options, the channel instance we’ll get can be optimized for our use case. If we were using a bounded channel, it would be through these options (using the
BoundedChannelOptions class) that we would be able to set the capacity and the behavior of the channel when full.
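As a sketch, the constructor could look like the following minimal version, with the bounded alternative shown in a comment (the capacity and full mode values are illustrative assumptions):

```csharp
using System.Threading.Channels;

public class OutboxListener
{
    private readonly Channel<long> _channel;

    public OutboxListener()
    {
        // single reader (the background service) and multiple writers
        // (any number of concurrent requests persisting outbox messages)
        _channel = Channel.CreateUnbounded<long>(new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = false
        });

        // bounded alternative, with a capacity and a full-channel strategy:
        // _channel = Channel.CreateBounded<long>(new BoundedChannelOptions(capacity: 1000)
        // {
        //     FullMode = BoundedChannelFullMode.DropWrite,
        //     SingleReader = true,
        //     SingleWriter = false
        // });
    }
}
```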
Writing to a channel
With a channel instance in hand, we can start writing to it. This is done in the
OnNewMessages method. Notice the method signature matches the delegate we created for
AuthDbContext to use. This is no coincidence, as this method will be configured in the dependency injection container to be provided to the
AuthDbContext as the delegate implementation.
A channel exposes two properties,
Writer and
Reader (of types
ChannelWriter<T> and
ChannelReader<T> respectively), which provide the methods to write to/read from it. In either case we have multiple options, not a single method for writing and reading, to adapt to our needs.
Starting with the
ChannelWriter, the methods available are:
- TryWrite, as the name implies, tries to write to the channel, returning a boolean to indicate whether it wrote or not. Reasons for not writing may be that the channel is full or completed (no longer accepting new writes).
- WaitToWriteAsync doesn’t actually write to the channel, instead returning a ValueTask<bool> that can be awaited to know when space is available to write. If the boolean returned is false, it means it isn’t possible to write anymore.
- WriteAsync is a mix between TryWrite and WaitToWriteAsync. If there is space to write, it writes, otherwise it waits for space to be available.
- TryComplete is used when we don’t want to write to the channel anymore, be it because we have nothing more to write or because an exception happened and we want to stop everything.
Looking at the
OutboxListener code, we’re simply using
TryWrite. There are a couple of factors for this decision.
The most immediate explanation is that, being an unbounded channel,
TryWrite will always succeed because there are no space issues (the only way for it to return false is if the channel is completed).
Additionally, even if we were using a bounded channel, we could still ignore not being able to write because, as introduced in the beginning of the post, we’ll have a fallback publishing any pending messages. If we didn’t have this fallback, then we’d need to approach things differently. In this case we’d be making a tradeoff between the time a user needs to wait for a request to complete and the latency of event publishing.
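As a sketch, the writing side of the OutboxListener could look like this (a member of the class sketched above; the collection-of-ids signature is an assumption):

```csharp
// part of OutboxListener: the signature matches the notification delegate,
// so it can be wired up in the dependency injection container
public void OnNewMessages(IReadOnlyCollection<long> messageIds)
{
    foreach (var messageId in messageIds)
    {
        // with an unbounded channel this only fails if the channel is completed;
        // even if a write were ever dropped, the fallback publisher picks it up later
        _channel.Writer.TryWrite(messageId);
    }
}
```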
Reading from a channel
ChannelReader also exposes a number of methods with similar behavior, just applied to reading:
- TryRead reads an item from the channel if there is one available, returning true in such a case, otherwise returning false.
- WaitToReadAsync doesn’t actually read, instead returning a ValueTask<bool> that can be awaited to know when an item is available to read. If the boolean returned is false, it means it isn’t possible to read anymore (channel completed).
- ReadAsync is a mix between TryRead and WaitToReadAsync. If there is an item to read, it reads, otherwise it waits for an item to be available.
If you look at our
OutboxListener code, you’ll notice we’re not using any of these.
Besides the methods previously mentioned,
ChannelReader also exposes a
ReadAllAsync, used above, which returns an
IAsyncEnumerable. If you’ve never seen an
IAsyncEnumerable, it wouldn’t be surprising, as it’s a recent feature (introduced with .NET Core 3.0). Like the name implies, it’s like an
IEnumerable but tailored for async scenarios. With it we can use a feature introduced in C# 8,
await foreach, which allows us to handle async streams in a similar way to traditional iteration on collections. There’s a section in “What’s new in C# 8.0” about asynchronous streams.
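To make the mechanics concrete, here’s a tiny self-contained example of ReadAllAsync paired with await foreach, unrelated to the outbox types, just the channel pattern itself:

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    public static async Task<List<long>> ConsumeAsync()
    {
        var channel = Channel.CreateUnbounded<long>();

        // producer: write a few ids, then complete the channel
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);
        channel.Writer.TryWrite(3);
        channel.Writer.TryComplete();

        var consumed = new List<long>();

        // consumer: ReadAllAsync returns an IAsyncEnumerable<long>,
        // which we iterate with await foreach until the channel completes
        await foreach (var id in channel.Reader.ReadAllAsync())
        {
            consumed.Add(id);
        }

        return consumed; // [1, 2, 3]
    }
}
```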
Running in the background
With the
OutboxListener ready, we can now use it to be notified when new messages are stored in the outbox. To do this, we’ll create a background task that starts the process of listening to these notifications. In .NET Core, we can create these kinds of background tasks by implementing an
IHostedService, either directly or by inheriting from the
BackgroundService base class.
The responsibility of this component, a class named
OutboxPublisherBackgroundService, will be to listen for notifications and forward to an
OutboxPublisher class that’ll implement the remaining logic.
As we can see, we’re inheriting from
BackgroundService, which means we have a single method we need to implement,
ExecuteAsync. This method returns a
Task that, when completed, means the service has finished its job. In our case, we want it to be running during the whole lifetime of the application, but in other cases we might just want to run some things asynchronously when starting the application.
As for the implementation of
ExecuteAsync, we’re doing the
await foreach mentioned earlier, handling each message id as it arrives. As noted in the comment, executing the event publishing logic one by one will likely hurt event publishing throughput, so we should consider batching, but we’ll keep it simple for now.
For each iteration, we make use of the
OutboxPublisher class (which we’ll see in the next section) to handle the event publishing logic.
Besides that, we catch and log exceptions, because we don’t want the service to stop while the application keeps running. Depending on the type of error though, we could probably improve this.
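A sketch of what the background service could look like (it assumes the OutboxListener exposes the channel’s reader through a GetAllMessageIdsAsync method returning IAsyncEnumerable<long>, and the OutboxPublisher exposes a PublishAsync method; both names are assumptions of this sketch):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class OutboxPublisherBackgroundService : BackgroundService
{
    private readonly OutboxListener _listener;
    private readonly OutboxPublisher _publisher;
    private readonly ILogger<OutboxPublisherBackgroundService> _logger;

    public OutboxPublisherBackgroundService(
        OutboxListener listener,
        OutboxPublisher publisher,
        ILogger<OutboxPublisherBackgroundService> logger)
    {
        _listener = listener;
        _publisher = publisher;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // handling the ids one by one keeps things simple,
        // but batching would likely improve throughput
        await foreach (var messageId in _listener.GetAllMessageIdsAsync(stoppingToken))
        {
            try
            {
                await _publisher.PublishAsync(messageId, stoppingToken);
            }
            catch (Exception ex)
            {
                // log and carry on, so the service keeps running with the application
                _logger.LogError(ex, "Unexpected error while publishing outbox message {messageId}.", messageId);
            }
        }
    }
}
```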
Publish an event
Publishing an event happens in the previously mentioned
OutboxPublisher class, whose logic consists of:
- Reading the message for the given id from the outbox.
- Publishing the event to the event bus.
- Deleting the message pertaining to the published event from the outbox.
The code to implement this logic is a bit more complex than we would expect from this description, as we want to take some precautions due to the fact that multiple publishers might be running concurrently: not in this service, where we have a single one, but because we might have multiple instances of the auth service running (e.g. multiple servers or multiple containers).
Let’s go through the implementation.
The first thing that comes up is actually not logic related, but needed nonetheless: creating a dependency injection scope and getting a
DbContext instance from there. We need to do this because we passed the
OutboxPublisher to the
OutboxPublisherBackgroundService through the constructor, and
OutboxPublisherBackgroundService will live for as long as the application lives. As a
DbContext shouldn’t live for that long (e.g. the change tracker keeps things in memory), we need to control its lifetime manually.
As for the actual publishing logic, the first thing we do is start a transaction. As you might be suspecting, this is due to the precautions I mentioned regarding concurrent publishing.
Immediately after querying the database to get the message with the provided id, we call a
TryDeleteMessageAsync method that not only tells the
DbContext the message should be removed, it actually calls
SaveChangesAsync to make it so in the database, not just in-memory. Remember though, that we’re in a transaction, so even if the deletion is done in the database, it’s not committed yet. We do this because if there’s a concurrent publisher executing, which for some reason tries to delete the same message, it will be locked until the current transaction is committed or rolled back. This way we minimize the likelihood of publishing the same event multiple times.
TryDeleteMessageAsync returns a boolean, where true means the message was successfully deleted and we can proceed with publishing the event, while false is returned when deletion wasn’t successful, as we can see in the code, due to a
DbUpdateConcurrencyException. This is the exception that’s thrown when a change fails in the database due to another happening concurrently; in this case, another component beat the currently executing code to deleting the outbox message.
When deletion of the message is successful, we can publish the event and commit the changes to the database. In the code above there’s a log representing the actual publishing to the event bus, as we’ll implement that in the coming episodes using Apache Kafka.
If the message wasn’t successfully deleted (or if an unexpected exception occurs), we rollback the transaction.
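A sketch of how this flow could be implemented (the scope creation, query and deletion details follow the description above, but member names and the entity shape are assumptions; the actual event bus publish is represented by a log, as it will only be implemented in the coming episodes):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

public class OutboxPublisher
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<OutboxPublisher> _logger;

    public OutboxPublisher(IServiceScopeFactory scopeFactory, ILogger<OutboxPublisher> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    public async Task PublishAsync(long messageId, CancellationToken ct)
    {
        // create a scope so the DbContext has a short, controlled lifetime
        using var scope = _scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AuthDbContext>();

        await using var transaction = await db.Database.BeginTransactionAsync(ct);
        try
        {
            var message = await db.Set<OutboxMessage>()
                .SingleOrDefaultAsync(m => m.Id == messageId, ct);

            if (message is null || !await TryDeleteMessageAsync(db, message, ct))
            {
                // a concurrent publisher got to it first (or it's gone), so nothing to do
                await transaction.RollbackAsync(ct);
                return;
            }

            // stand-in for the actual event bus publish (Kafka, in coming episodes)
            _logger.LogInformation("Publishing event for outbox message {messageId}.", messageId);

            await transaction.CommitAsync(ct);
        }
        catch (Exception)
        {
            await transaction.RollbackAsync(ct);
            throw;
        }
    }

    private static async Task<bool> TryDeleteMessageAsync(
        AuthDbContext db, OutboxMessage message, CancellationToken ct)
    {
        try
        {
            db.Remove(message);
            // executes the DELETE now, locking the row for concurrent publishers,
            // but it's only visible to them after the commit
            await db.SaveChangesAsync(ct);
            return true;
        }
        catch (DbUpdateConcurrencyException)
        {
            // a concurrent publisher already deleted this message
            return false;
        }
    }
}
```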
With this, we wrap up the latency-oriented outbox publisher implementation, and we can proceed to the reliability-oriented version.
Fallback outbox publisher
Before getting into the implementation details, let’s review why we need to have a fallback for the outbox publisher we just implemented.
The most important reason is to handle cases where a transient failure makes us lose the message ids that were written to the in-memory channel used in the outbox publisher flow. An example of such a failure is the server (or container) going down.
Additionally, having this fallback allows us, as we saw, to have a more naive implementation. Examples of this are:
- If we used a bounded channel and items were dropped, we wouldn’t need to worry, because the fallback would pick them up.
- If the event bus is temporarily down, causing an error to occur when publishing an event, we don’t need to worry about retries and related patterns; the fallback will pick things up.
This is not to say that the current implementation couldn’t use some extra improvements (it likely could), but having this fallback lets us get away with some less thought-out approaches.
Read and publish events
The
OutboxFallbackPublisher class, which implements the logic to publish any events that got left behind, has many similarities to the
OutboxPublisher seen previously, the major difference being that it looks for any messages left on the outbox table, instead of just a given message id.
Let’s start with the core logic.
As we want to publish all pending events, not just some,
PublishPendingAsync, which is the only public method of the class, keeps looping while there are pending messages in the outbox, moving the batch publishing logic to
PublishBatchAsync. This method is very similar to what we saw in the original
OutboxPublisher. The main differences we can spot are a call to
GetMessageBatchAsync, which provides a number of messages rather than a single specific one, and the fact that it returns a boolean indicating whether there are more messages available to publish.
Let’s now drill down into the methods used to support this logic.
We define a
MessageBatchQuery to have the base query to obtain pending messages. The rationale I used was: if the message has been there for more than 30 seconds, it probably means it got left behind, so we should publish it. Using this base query,
GetMessageBatchAsync fetches a batch of messages, while
IsNewBatchAvailableAsync simply checks if there are any messages pending that match the defined criteria.
TryDeleteMessagesAsync is the same as we saw in the
OutboxPublisher, differing just in that it deletes multiple rows, not just one.
GetMinimumMessageAgeToBatch is a helper method to calculate the minimum age a message should be to qualify as pending (side note, using
DateTime.UtcNow directly is not great for unit testing).
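Pulling the pieces together, the fallback publisher could be sketched as follows (it assumes the OutboxMessage entity has a CreatedAt column, and the batch size of 100 is an illustrative value, not taken from the post):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

public class OutboxFallbackPublisher
{
    private const int MaxBatchSize = 100; // illustrative batch size

    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<OutboxFallbackPublisher> _logger;

    public OutboxFallbackPublisher(IServiceScopeFactory scopeFactory, ILogger<OutboxFallbackPublisher> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    public async Task PublishPendingAsync(CancellationToken ct)
    {
        // keep going while batches keep finding pending messages
        while (await PublishBatchAsync(ct) && !ct.IsCancellationRequested)
        {
        }
    }

    // returns true if a new batch is available after this one
    private async Task<bool> PublishBatchAsync(CancellationToken ct)
    {
        using var scope = _scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AuthDbContext>();

        await using var transaction = await db.Database.BeginTransactionAsync(ct);
        try
        {
            var messages = await GetMessageBatchAsync(db, ct);

            if (messages.Count == 0 || !await TryDeleteMessagesAsync(db, messages, ct))
            {
                await transaction.RollbackAsync(ct);
                return false;
            }

            // stand-in for the actual event bus publish
            _logger.LogInformation("Publishing events for {count} outbox messages.", messages.Count);

            await transaction.CommitAsync(ct);

            return await IsNewBatchAvailableAsync(db, ct);
        }
        catch (Exception)
        {
            await transaction.RollbackAsync(ct);
            throw;
        }
    }

    // base query: messages sitting in the outbox past the minimum age
    // probably got left behind by the main publisher
    private static IQueryable<OutboxMessage> MessageBatchQuery(AuthDbContext db)
    {
        var maxCreatedAt = GetMinimumMessageAgeToBatch();
        return db.Set<OutboxMessage>().Where(m => m.CreatedAt <= maxCreatedAt);
    }

    private static Task<List<OutboxMessage>> GetMessageBatchAsync(AuthDbContext db, CancellationToken ct)
        => MessageBatchQuery(db).Take(MaxBatchSize).ToListAsync(ct);

    private static Task<bool> IsNewBatchAvailableAsync(AuthDbContext db, CancellationToken ct)
        => MessageBatchQuery(db).AnyAsync(ct);

    private static async Task<bool> TryDeleteMessagesAsync(
        AuthDbContext db, List<OutboxMessage> messages, CancellationToken ct)
    {
        try
        {
            db.RemoveRange(messages);
            await db.SaveChangesAsync(ct);
            return true;
        }
        catch (DbUpdateConcurrencyException)
        {
            // a concurrent publisher got to (some of) these messages first
            return false;
        }
    }

    // side note from the post: using DateTime.UtcNow directly is not great for unit testing
    private static DateTime GetMinimumMessageAgeToBatch()
        => DateTime.UtcNow.AddSeconds(-30);
}
```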
To wrap things up about the
OutboxFallbackPublisher, we need to schedule its execution. To do this, we can again resort to a
BackgroundService.
Similarly to the
OutboxPublisherBackgroundService, we just want to get the publisher to do its work. In this case, as we don’t subscribe to anything, we take a polling approach. We call the publisher to process any pending messages, and when it’s done, we “sleep” for 30 seconds, instead of hammering the database continuously.
Wiring everything together
To get everything working together, what’s left is setting things up in the dependency injection container. This is done in an
EventExtensions class created to keep the
Startup class clean.
The scan for event mappers was already there, from previous episodes, so the new stuff is what comes after.
The
OutboxListener,
OutboxPublisher and
OutboxFallbackPublisher are registered as usual. They’re all singletons.
OutboxListener really needs to be, because we need to keep using the same channel to notify of new messages.
OutboxPublisher and
OutboxFallbackPublisher don’t need to be singletons by themselves, but as they’ll be used by the background services, which have the same lifetime as the application, as we already discussed, it makes sense to make them singletons as well.
The registration of
OnNewOutboxMessages might be slightly different from what’s common, because we want to associate a specific instance method with the delegate. That’s why we’re making use of the overload that accepts a
Func, where we get an
IServiceProvider to obtain the
OutboxListener, from which we bind the
OnNewMessages method to the delegate used by the
AuthDbContext.
The
OutboxPublisherBackgroundService and
OutboxPublisherFallbackBackgroundService are registered using the
AddHostedService method, which internally registers each background service as a singleton.
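A sketch of these registrations (the extension method name is an assumption, and the event mapper scanning from previous episodes is omitted):

```csharp
using Microsoft.Extensions.DependencyInjection;

public static class EventExtensions
{
    public static IServiceCollection AddOutboxPublishing(this IServiceCollection services)
    {
        // (event mapper scanning from previous episodes omitted)

        services.AddSingleton<OutboxListener>();
        services.AddSingleton<OutboxPublisher>();
        services.AddSingleton<OutboxFallbackPublisher>();

        // bind the delegate to the OutboxListener instance method,
        // so the AuthDbContext can notify it of new messages
        services.AddSingleton<OnNewOutboxMessages>(
            serviceProvider => serviceProvider.GetRequiredService<OutboxListener>().OnNewMessages);

        services.AddHostedService<OutboxPublisherBackgroundService>();
        services.AddHostedService<OutboxPublisherFallbackBackgroundService>();

        return services;
    }
}
```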
That does it for this episode. We implemented the outbox publisher, two versions of it to be more precise, while playing with some interesting features of .NET Core - channels and background services.
Summarizing, the main topics we looked at were:
- Using channels to implement in-memory producer/consumer scenarios, optimized for async code.
- Implementing background tasks using IHostedService and BackgroundService.
- Reading and publishing messages from the outbox, taking concurrent execution into consideration.
As a quick reminder, the achieved solution might be a bit overkill, as we could get away with just the polling solution, but we wouldn’t have the opportunity to play with all the things we did 🙂.
In the next episodes, we’ll introduce Apache Kafka and implement event publishing/subscription on top of it.
Links in the post:
- An Introduction to System.Threading.Channels
- Background tasks with hosted services in ASP.NET Core
- “What’s new in C# 8.0”
- Event-driven integration #1 - Intro to the transactional outbox pattern [ASPF02O|E040]
- Event-driven integration #3 - Storing events in the outbox table [ASPF02O|E042]
The source code for this post is in the Auth repository, tagged as
Sharing and feedback always appreciated!
Thanks for stopping by, cyaz!