.NET Heisenbug Mystery Theater: How Did an Exception Escape its Catch Block?

A painful lesson on atomicity and the assignment of structs.


Over the past several months the Akka.NET team has received reports of the following Exception popping up unexpectedly throughout many of our plugins and end-user applications that use the Akka.Streams[1] SelectAsync stage - such as Akka.Streams.Kafka and Akka.Persistence.Sql:

ArgumentException stemming from the cancellation of an Akka.NET stream without a cause

That error message seems simple enough - it comes from here inside GraphStage.cs:

[InternalApi]
public void InternalOnDownstreamFinish(Exception cause)
{
    try
    {
        if (cause == null)
            throw new ArgumentException("Cancellation cause must not be null", nameof(cause));

        // ... remainder of the try block elided
    }
    // ... catch block elided
}

In Akka.Streams parlance, a stream is cancelled when an unhandled Exception is thrown, and that error should be propagated all the way down to this GraphStage.InternalOnDownstreamFinish method so we can log why the stream is being cancelled or terminated. A null cause means that propagation broke somewhere along the way.

Here’s the mystery - this is the code that “threw” the Exception inside Akka.Persistence.Sql for instance:

.SelectAsync(
    JournalConfig.DaoConfig.Parallelism,
    async promisesAndRows =>
    {
        try
        {
            await WriteJournalRows(promisesAndRows.Rows);
            foreach (var taskCompletionSource in promisesAndRows.Tcs)
                taskCompletionSource.TrySetResult(NotUsed.Instance);
        }
        catch (Exception e)
        {
            foreach (var taskCompletionSource in promisesAndRows.Tcs)
                taskCompletionSource.TrySetException(e);
        }

        return NotUsed.Instance;
    })

How on Earth did the Exception escape the catch block? And why do we have no record of it by the time the stream completes? That is the mystery we are going to explore today.

SelectAsync Exceptions

Over several months we attempted to trap this issue so we could accurately locate where it was happening inside our code. The original ArgumentException was very broad, so we needed better data and added more detailed error logging.

Eventually, those changes produced logs that looked like this:

[ERROR][03/14/2025 06:26:33.434Z][Thread 0029][FlowShape`2([SelectAsync.in] [SelectAsync.out])(akka://KafkaUnexpectedRecordsConsumer2/user/kafka-unexpected-records-100-no-PollKafka-actor/worker-0/StreamSupervisor-2)] An exception occured inside SelectAsync while processing message [Akka.Streams.Kafka.Messages.CommittableMessage`2[Confluent.Kafka.Ignore,System.Byte[]]]. Supervision strategy: Stop

Different users of different plugins, both of which use SelectAsync internally, reported similar versions of this new error too.

OK, so now we know where the Exception is coming from: SelectAsync. That narrows our search area significantly. The next question is: how can an Exception escape this code inside SelectAsync?

SelectAsync was protected by a try..catch

The Plumbing

If we peel back the covers on SelectAsync, we find the following constructor:

public SelectAsync(int parallelism, Func<TIn, Task<TOut>> mapFunc)
{
    _parallelism = parallelism;
    _mapFunc = mapFunc;
    Shape = new FlowShape<TIn, TOut>(In, Out);
}

Akka.Streams’ syntax looks a lot like LINQ, but the differences are:

  1. It’s asynchronous, all run by Akka.NET actors under the covers; and
  2. It’s backpressure-aware - if a stage is blocked as a result of an I/O or compute constraint, we stop the upstream stages from emitting new events.

The int parallelism is one of the tools SelectAsync uses to create backpressure: it limits the number of in-flight tasks that can be executing at any given time. The mapFunc is the user-defined function that produces the Task<TOut> we are going to await.
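Here is a minimal sketch of the "cap the in-flight tasks" idea outside of Akka.Streams. It uses a SemaphoreSlim as the throttle. This is an illustration under assumptions, not how SelectAsync is implemented: Akka.Streams signals demand by pulling from upstream rather than by using a semaphore. BoundedParallelism and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Akka.NET: caps the number of in-flight tasks.
public static class BoundedParallelism
{
    // Runs mapFunc over every input with at most `parallelism` tasks running at once
    // and reports the highest concurrency that was actually observed.
    public static async Task<(List<TOut> Results, int PeakInFlight)> RunAsync<TIn, TOut>(
        IEnumerable<TIn> inputs, int parallelism, Func<TIn, Task<TOut>> mapFunc)
    {
        using var slots = new SemaphoreSlim(parallelism, parallelism);
        var gate = new object();
        var inFlight = 0;
        var peak = 0;
        var tasks = new List<Task<TOut>>();

        foreach (var input in inputs)
        {
            // Backpressure: do not take the next input until a slot is free.
            await slots.WaitAsync();
            tasks.Add(RunOneAsync(input));
        }

        // Task.WhenAll returns results in the order the tasks were started.
        var results = await Task.WhenAll(tasks);
        return (new List<TOut>(results), peak);

        async Task<TOut> RunOneAsync(TIn input)
        {
            lock (gate)
            {
                inFlight++;
                peak = Math.Max(peak, inFlight);
            }
            try
            {
                return await mapFunc(input);
            }
            finally
            {
                lock (gate) { inFlight--; }
                slots.Release(); // frees a slot so the loop can accept another input
            }
        }
    }
}
```

The loop awaits WaitAsync before starting each task. As a result, a slow mapFunc slows down how quickly inputs are consumed, which is the same effect backpressure has on upstream stages.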

The other thing that is special about SelectAsync is that it preserves invocation order: even if the parallel Task<TOut>s complete in arbitrary order (which they almost certainly will), we always deliver the results of SelectAsync in the original invocation order of the TIn inputs sent to us[2].
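The ordering guarantee can be sketched with a FIFO queue of pending tasks. Every task starts immediately, but results are only emitted from the head of the queue. This is a simplified, hypothetical model of the role SelectAsync's buffer of Holder<T> objects plays; OrderedResults is not an Akka.NET type. The real stage also enforces the parallelism limit and applies supervision to failures.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not part of Akka.NET: emits results in invocation order.
public static class OrderedResults
{
    public static async Task<(List<TOut> Emitted, List<TOut> CompletedOrder)> RunAsync<TIn, TOut>(
        IEnumerable<TIn> inputs, Func<TIn, Task<TOut>> mapFunc)
    {
        var completed = new ConcurrentQueue<TOut>(); // the order tasks actually finish in
        var pending = new Queue<Task<TOut>>();        // the order tasks were started in

        foreach (var input in inputs)
            pending.Enqueue(TrackAsync(input));

        var emitted = new List<TOut>();
        while (pending.Count > 0)
        {
            // Only the oldest pending task may be emitted; later results wait
            // their turn even if they finished first.
            emitted.Add(await pending.Dequeue());
        }

        return (emitted, new List<TOut>(completed));

        async Task<TOut> TrackAsync(TIn input)
        {
            var result = await mapFunc(input);
            completed.Enqueue(result);
            return result;
        }
    }
}
```

For example, running it over the inputs 3, 2, 1 with a delay proportional to each input typically finishes the tasks in the order 1, 2, 3, yet Emitted is still 3, 2, 1.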

Therefore, SelectAsync needs a data structure that stores each pending Task<TOut>’s eventual output until it is that element's turn to be emitted:

private class Holder<T>
{
    private readonly Action<Holder<T>> _callback;

    public Holder(object message, Result<T> element, Action<Holder<T>> callback)
    {
        _callback = callback;
        Message = message;
        Element = element;
    }

    public Result<T> Element { get; private set; }
    public object Message { get; }

    public void SetElement(Result<T> result)
    {
        Element = result.IsSuccess && result.Value == null
            ? Result.Failure<T>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
            : result;
    }

    public void Invoke(Result<T> result)
    {
        SetElement(result);
        _callback(this);
    }
}

That’s what Holder<T> does: it eventually accepts a Result<T> once the Task<T> completes. Result<T> looks like this:

public struct Result<T> : IEquatable<Result<T>>
{
    public readonly bool IsSuccess;

    public readonly T Value;

    public readonly Exception Exception;

    public Result(T value) : this()
    {
        IsSuccess = true;
        Value = value;
    }

    public Result(Exception exception) : this()
    {
        IsSuccess = false;
        Exception = exception;
    }

    // IEquatable<Result<T>> members (Equals, GetHashCode, ==, !=) elided
}

And this is where all of the magic happens inside SelectAsync:

public override void OnPush()
{
    var message = Grab(_stage.In);
    try
    {
        var task = _stage._mapFunc(message);
        var holder = new Holder<TOut>(message, NotYetThere, _taskCallback);
        _buffer.Enqueue(holder);

        // We dispatch the task if it's ready to optimize away
        // scheduling it to an execution context
        if (task.IsCompleted)
        {
            holder.SetElement(Result.FromTask(task));
            HolderCompleted(holder);
        }
        else
            task.ContinueWith(t => holder.Invoke(Result.FromTask(t)),
                TaskContinuationOptions.ExecuteSynchronously);
    }
    catch (Exception e)
    {
        // ... error handling
    }
}

  1. Once a new TIn is “pushed” to us from upstream, we invoke the user-defined function and receive a new Task<TOut>;
  2. We check whether this Task has already completed and, if it has, we immediately call Holder<TOut>.SetElement (followed by HolderCompleted) without scheduling a continuation; and
  3. If the Task is still running, we schedule a continuation on it and have that continuation call Holder<TOut>.Invoke.
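Here is a self-contained sketch of those three steps. It uses a simplified SketchResult<T> in place of Akka.NET's Result<T> and a hypothetical FromTask conversion; the real Result.FromTask may differ in details such as how AggregateExceptions are unwrapped. The point is the split between the two paths. A completed task is handled inline on the calling thread. An unfinished one delivers its result later, from whichever thread completes it, and that is where the cross-thread hazard enters.

```csharp
using System;
using System.Threading.Tasks;

// Simplified, hypothetical stand-in for Akka.NET's Result<T> struct.
public readonly struct SketchResult<T>
{
    public readonly bool IsSuccess;
    public readonly T Value;
    public readonly Exception Exception;

    private SketchResult(bool isSuccess, T value, Exception exception)
    {
        IsSuccess = isSuccess;
        Value = value;
        Exception = exception;
    }

    public static SketchResult<T> Success(T value) => new SketchResult<T>(true, value, null);
    public static SketchResult<T> Failure(Exception e) => new SketchResult<T>(false, default, e);

    // Hypothetical equivalent of Result.FromTask; only valid for a completed task.
    public static SketchResult<T> FromTask(Task<T> task)
    {
        if (task.IsCanceled) return Failure(new TaskCanceledException(task));
        if (task.IsFaulted) return Failure(task.Exception?.InnerException ?? task.Exception);
        return Success(task.Result);
    }
}

public static class FastPath
{
    // Mirrors the OnPush steps: handle an already-completed task inline,
    // otherwise attach a continuation that delivers the result later.
    // Returns true when the synchronous fast path was taken.
    public static bool Dispatch<T>(Task<T> task, Action<SketchResult<T>> onResult)
    {
        if (task.IsCompleted)
        {
            onResult(SketchResult<T>.FromTask(task)); // runs on the caller's thread
            return true;
        }

        // Runs on whichever thread completes the task, not on the caller's thread.
        task.ContinueWith(t => onResult(SketchResult<T>.FromTask(t)),
            TaskContinuationOptions.ExecuteSynchronously);
        return false;
    }
}
```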

Holder<T>.Invoke ended up being the key detail in this whole piece. Remember, the Tasks returned by mapFunc are run as detached Tasks, so when they complete we rely on a little bit of Akka.Streams infrastructure to marshal the results back into our actors in a thread-safe way:

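private readonly Action<Holder<TOut>> _taskCallback;

// assigned in the stage logic's constructor (GetAsyncCallback is an instance method)
_taskCallback = GetAsyncCallback<Holder<TOut>>(HolderCompleted);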

private void HolderCompleted(Holder<TOut> holder)
{
    var element = holder.Element;
    if (element.IsSuccess)
    {
        if (IsAvailable(_stage.Out))
            PushOne();
        return;
    }
    
    var exception = element.Exception;
    var strategy = _decider(exception);
    Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy);
    switch (strategy)
    {
        case Directive.Stop:
            FailStage(exception); // THIS IS WHERE THE PROBLEMS OCCURRED
            break;
        
        case Directive.Resume:
        case Directive.Restart:
            if (IsAvailable(_stage.Out))
                PushOne();
            break;
        
        default:
            throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", exception);
    }
}

The GetAsyncCallback method wraps our Action<Holder<TOut>> handler so that each invocation is turned into an actor message. That message is then processed normally inside the Akka.Streams stage actor's thread-safe context, which allows the callback to safely access the stage's state without locks or any other synchronization mechanisms.
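As a rough mental model of what GetAsyncCallback provides, here is a hypothetical MiniStage built on System.Threading.Channels. Callers on any thread only enqueue work, and a single consumer loop runs the handlers one at a time (though not necessarily on the same OS thread), so stage state can be mutated without locks. This is a simplification for illustration only. Akka.Streams delivers these callbacks through the stage actor's mailbox, not through a Channel<T>.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical, heavily simplified model of an async callback; not Akka.NET's implementation.
public sealed class MiniStage
{
    private readonly Channel<Action> _mailbox =
        Channel.CreateUnbounded<Action>(new UnboundedChannelOptions { SingleReader = true });
    private readonly Task _loop;

    public MiniStage() => _loop = Task.Run(() => RunLoopAsync());

    // Stage state: only ever read or written from inside the consumer loop.
    public int Counter { get; private set; }

    // Returns a delegate that is safe to call from any thread, because all it does
    // is enqueue a message; the handler itself runs later in the consumer loop.
    public Action<T> GetAsyncCallback<T>(Action<T> handler) =>
        arg => _mailbox.Writer.TryWrite(() => handler(arg));

    public void Increment(int amount) => Counter += amount; // no lock needed

    // Stops accepting messages and waits for the loop to drain the mailbox.
    public Task CompleteAsync()
    {
        _mailbox.Writer.Complete();
        return _loop;
    }

    private async Task RunLoopAsync()
    {
        // Messages are processed strictly one after another, never concurrently.
        await foreach (var work in _mailbox.Reader.ReadAllAsync())
            work();
    }
}
```

Calling the callback returned by GetAsyncCallback(stage.Increment) from many threads at once still yields an exact count. No handler ever runs concurrently with another.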

That callback function is where the exception was getting detected - but here’s the catch: the Exception was null here. This is what created our original ArgumentException error at the start of the post. But how could this occur?

Tasks, “Atomic” struct Assignments, and Lies

There are two critical details at work here:

  1. There is something very fishy about the Holder<T>.Invoke method.
  2. Result<T> is a user-defined struct.

First, let’s take a look at Holder<T>.Invoke with some extra comments:

public void Invoke(Result<T> result)
{
    SetElement(result); // happens inside the `ContinueWith` function's context
    _callback(this); // happens inside the SelectAsync `StageActor` context
    // VERY, VERY LIKELY THAT THESE ARE TWO DIFFERENT THREADS
}

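This method's work is actually split across two different threads of execution, as my comments highlight. Element is written on the thread running the ContinueWith continuation, while the stage actor reads it on another. Nothing synchronizes the two, so this is a potentially unsafe assignment. A no-no.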

private static readonly Result<TOut> NotYetThere = Result.Failure<TOut>(new Exception());

var holder = new Holder<TOut>(message, NotYetThere, _taskCallback); 

When we create a brand new Holder<TOut> we initialize its result with a placeholder NotYetThere value, which we will check for whenever the SelectAsync stage gets pulled for output by the stages below it:

else if (_buffer.Peek().Element == NotYetThere)
{
    if (Todo < _stage._parallelism && !HasBeenPulled(inlet))
        TryPull(inlet);
}
else
{
    var holder = _buffer.Dequeue();
    var result = holder.Element;
    if (!result.IsSuccess)
    {
        // this could happen if we are looping in PushOne and end up on a failed Task before the
        // HolderCompleted callback has run
        var strategy = _decider(result.Exception);
        Log.Error(result.Exception, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy);
        switch (strategy)
        {
            case Directive.Stop:
                FailStage(result.Exception); // FAILURE CAN HAPPEN HERE TOO
                return;
            // ... remaining directives elided
        }
    }
}

The reason I point this out: we can’t dequeue and emit a Holder<TOut> whose Element has not been assigned yet. The NotYetThere check is supposed to ensure this.

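HOWEVER: struct assignment does not work the same way class (“heap”) assignment does. Assigning a class instance copies a single reference, while assigning a struct copies every one of its fields.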

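Here’s the rub: remember how I said Result<T> is a struct? Well, when you assign a user-defined struct, that operation is not atomic! The CLI only guarantees atomic reads and writes for properly aligned values no larger than the native word size. That covers object references, but not a three-field struct like Result<T>. In the words of the great Eric Lippert: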

I also talked a bit about how making fields of a struct readonly has no effect on atomicity; when the struct is copied around, it may be copied around four bytes at a time regardless of whether its fields are marked as readonly or not.

There is a larger problem though with reasoning about readonly fields in a struct beyond their non-atomicity. Yes, when you read from readonly fields in a struct on multiple threads without any locking, you can get inconsistent results due to race conditions. But the situation is actually worse than that; readonly fields need not give you results that you think are consistent even on one thread! Basically, readonly fields in a struct are the moral equivalent of the struct author writing a cheque without having the funds to back it.[3]
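The difference between copying a multi-field struct and swapping a single reference can be observed, nondeterministically, with a small experiment. In this hypothetical stress test (ResultLike, ResultLikeBox and TearingDemo are illustrative names), a writer thread keeps flipping a shared slot between two internally consistent values. Meanwhile the reader counts snapshots that mix fields from both. Whether any struct tears show up depends on the runtime, JIT, hardware and timing, so a count of zero on a given run proves nothing. The class-based count, however, should always be zero, because reference-sized writes are atomic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative three-field struct, shaped like Result<T>.
public readonly struct ResultLike
{
    public readonly bool IsSuccess;
    public readonly object Value;
    public readonly Exception Exception;

    public ResultLike(bool isSuccess, object value, Exception exception)
    {
        IsSuccess = isSuccess;
        Value = value;
        Exception = exception;
    }

    // A well-formed value is either a success with no exception or a failure with one.
    public bool IsConsistent => IsSuccess == (Exception == null);
}

// The same data behind an immutable reference: publishing it is one pointer write.
public sealed class ResultLikeBox
{
    public readonly ResultLike Inner;
    public ResultLikeBox(ResultLike inner) => Inner = inner;
}

public static class TearingDemo
{
    private static ResultLike _structSlot;  // copied field by field: reads may tear
    private static ResultLikeBox _boxSlot;  // single reference write: reads are atomic

    public static (int StructTears, int BoxTears) Run(int iterations)
    {
        var success = new ResultLike(true, "value", null);
        var failure = new ResultLike(false, null, new Exception("failed"));
        var successBox = new ResultLikeBox(success);
        var failureBox = new ResultLikeBox(failure);
        _structSlot = failure;
        Volatile.Write(ref _boxSlot, failureBox);

        using var stop = new CancellationTokenSource();
        var writer = Task.Run(() =>
        {
            var flip = false;
            while (!stop.IsCancellationRequested)
            {
                flip = !flip;
                _structSlot = flip ? success : failure;                        // multi-field copy
                Volatile.Write(ref _boxSlot, flip ? successBox : failureBox);  // atomic publish
            }
        });

        int structTears = 0, boxTears = 0;
        for (var i = 0; i < iterations; i++)
        {
            var s = _structSlot;                  // unsynchronized copy of the whole struct
            if (!s.IsConsistent) structTears++;
            var b = Volatile.Read(ref _boxSlot);  // always one of the two complete boxes
            if (!b.Inner.IsConsistent) boxTears++;
        }

        stop.Cancel();
        writer.Wait();
        return (structTears, boxTears);
    }
}
```

Publishing an immutable object through a reference is one general-purpose way to get atomic hand-off. The Akka.NET fix takes the simpler route of never sharing the struct across threads at all.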

So here’s how we ended up with a null Exception that appeared to escape the catch block. The default value of a Result<T> looks like this:

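public struct Result<T> : IEquatable<Result<T>>
{
    public readonly bool IsSuccess; // default: false

    public readonly T Value; // default(T): null for reference types

    public readonly Exception Exception; // default: null

    // default(Result<T>) runs no constructor at all: every field is simply zeroed
}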

In some instances, while the Result<T> value was being assigned on one thread, it was being read dirtily (unsafely) on another. The reader then observed a value that looked like default(Result<T>): a null Exception with IsSuccess == false. That combination of conditions takes us exactly to this error state:
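A deterministic way to see how such a state can arise is to simulate the non-atomic copy by hand. In this hypothetical sketch (ResultSlot and TornReadSimulation are illustrative names), the three fields are overwritten one at a time, and we record what a racing reader would observe between the individual writes. The field order used here, last field first, is an assumption purely for illustration. The runtime does not specify the order in which it copies struct fields.

```csharp
using System;
using System.Collections.Generic;

// Mutable stand-in for the three fields of Result<T>; hypothetical, for illustration only.
public sealed class ResultSlot
{
    public bool IsSuccess;
    public object Value;
    public Exception Exception;

    public (bool IsSuccess, object Value, Exception Exception) Snapshot() =>
        (IsSuccess, Value, Exception);

    // Simulates a non-atomic struct copy, one field at a time, reporting the
    // intermediate states. The last-field-first order is an assumption.
    public void CopyFieldByField(bool isSuccess, object value, Exception exception,
                                 Action<(bool, object, Exception)> observeMidCopy)
    {
        Exception = exception;
        observeMidCopy(Snapshot()); // a racing reader could see this state...
        Value = value;
        observeMidCopy(Snapshot()); // ...or this one
        IsSuccess = isSuccess;
    }
}

public static class TornReadSimulation
{
    public static List<(bool IsSuccess, object Value, Exception Exception)> Run()
    {
        // Start from a NotYetThere-style placeholder: a failure carrying a dummy exception.
        var slot = new ResultSlot { IsSuccess = false, Value = null, Exception = new Exception("not yet there") };
        var observed = new List<(bool IsSuccess, object Value, Exception Exception)>();

        // Overwrite the placeholder with a successful result.
        slot.CopyFieldByField(true, "hello", null, observed.Add);
        return observed;
    }
}
```

The first intermediate snapshot is field-for-field identical to default(Result<T>), and both snapshots pair IsSuccess == false with a null Exception. Assuming Result<T>'s equality compares its fields, neither equals the NotYetThere placeholder, so a reader would treat it as a finished failure.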

if (!result.IsSuccess) // we pass here because `IsSuccess == false`
{
    var strategy = _decider(result.Exception); // decider does not care if Exception is `null`
    Log.Error(result.Exception, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy);
    switch (strategy)
    {
        case Directive.Stop:
            // `Exception` is null and gets passed all the way down to InternalOnDownstreamFinish, which blows up
            FailStage(result.Exception);
            return;
        // ... remaining directives elided
    }
}

The punchline: this “exception” could AND DID occur without a real Exception ever being thrown inside SelectAsync. It was created entirely by the unsynchronized assignment and read of the Result<T> struct.

Fixing the Heisenbug

This bug was a true Heisenbug in the sense that it was elusive to observe via traditional debugging. The debugger introduced just enough delay that the write to the Result<T> struct finished before the other thread read it. Both threads therefore saw the same fully assigned value, and the race never showed up.

However, fixing it was quite simple: pass the Result<T> along with the Holder<TOut> through the async callback, and do both the assignment and the read inside it:

private sealed class Holder<T>(object? message, Result<T> element)
{
    public object? Message { get; private set; } = message;
    
    public Result<T> Element { get; private set; } = element;

    public void SetElement(Result<T> result)
    {
        Element = result.IsSuccess && result.Value == null
            ? Result.Failure<T>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
            : result;
    }
}

private readonly Action<(Holder<TOut>, Result<TOut>)> _taskCallback;

// assigned in the stage logic's constructor
_taskCallback = GetAsyncCallback<(Holder<TOut> holder, Result<TOut> result)>(
    t => HolderCompleted(t.holder, t.result));

private void HolderCompleted(Holder<TOut> holder, Result<TOut> result)
{
    // we may not be at the front of the line right now, so save the result for later
    holder.SetElement(result);
    if (result.IsSuccess)
    {
        if (IsAvailable(_stage.Out))
            PushOne();
        return;
    }

    // rest of method
}

If your first impulse upon seeing an issue like this is to build some sort of Rube-Goldberg machine out of synchronization code: stop!

In many cases these types of problems can be solved via simplification. In this instance, that meant moving both the assignment and the read of the struct onto the same thread.

  1. If you want to learn more about Akka.NET Streams, check out “Why and When to Use Akka.Streams” 

  2. If you don’t care about preserving invocation order and instead would prefer higher throughput, we have a SelectAsyncUnordered stage that does exactly this in Akka.NET streams. 

  3. If this sort of low-level CLI/CLR stuff fascinates you, all three of Eric’s posts in his “Atomicity, volatility and immutability are different” series are excellent. Part 1. Part 2. Part 3

Written by Aaron Stannard on March 14, 2025

 

 
