Cancellation with Async F#, C# and the Reactive Extensions

By jay at April 16, 2013 20:25

TL;DR: C# 5.0 async/await does not include implicit support for cancellation, so a CancellationToken instance has to be passed explicitly to every async method. F# and the Reactive Extensions offer solutions to this problem, with both implicit and explicit support for cancellation.

 

My development style has slowly shifted to a more functional approach over the past year. I’ve been peeking at F# for a while, and that shift to a more functional mindset in C# has led me to understand much better the concepts behind core features of F#, and more specifically the async “support” in F#.

It’s well known that F# heavily inspired the implementation of C# async, but having looked at the way it’s implemented in F# gives me a few more points against the “unfinished” implementation in C#.

 

Recently, now that people are effectively using async in real-world scenarios, problems are starting to bubble up, some of them a bit laughable: async void, async void lambdas, the fact that continuations run mostly on the UI thread when not taken care of properly, obscure exception handling scenarios, the “magic” relation to the SynchronizationContext, the fact that it does not address parallelism, and one that’s been pretty low-key, cancellation.

Cancellation is pretty important, particularly when operations can take a long time. If the user is waiting for an async result, they should be able to cancel the operation, either because it took too long, or because the rest of the processing is no longer needed, as the user has gone away. This avoids wasting precious CPU cycles, and sometimes network or I/O resources, on computations whose result will never be used.

 

Cancelling a C# async method

Now, consider the following code sample, that tries to cancel the execution of an async method:

public static void Run()
{
    var cts = new CancellationTokenSource();
    Task.Run(async () => await Test(), cts.Token);
    Console.ReadLine();
    cts.Cancel();
    Console.WriteLine("Cancel...");
    Console.ReadLine();
}

private static async Task Test()
{
    while (true)
    {
        await Task.Delay(1000);
        Console.WriteLine("Test...");
    }
}

This produces the following output:

Test...
Test...
Test...

Cancel...
Test...
Test...
Test...
Test...

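The problem here is pretty simple: the cancellation token only applies to the task created by Task.Run(), where it mainly prevents the task from starting if cancellation has already been requested. It is never passed to Test(), so the loop keeps going. This can be problematic, particularly if the sub-tasks are computationally intensive, or are making external calls (e.g. an HTTP web request) that effectively need to be cancelled.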
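To make the scope of that token more concrete, here is a minimal sketch (the class and method names are made up for this illustration). It assumes nothing beyond the TPL itself: the first method starts a task with an already-cancelled token, the second cancels the token once the delegate is known to be running.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original sample.
public static class TaskRunTokenDemo
{
    // Starts a task with a token that is already cancelled and returns its final status.
    public static TaskStatus StartWithCancelledToken()
    {
        var cts = new CancellationTokenSource();
        cts.Cancel();

        bool ran = false;
        var task = Task.Run(() => { ran = true; }, cts.Token);

        try { task.Wait(); }
        catch (AggregateException) { /* wraps the cancellation exception */ }

        Console.WriteLine("Delegate ran: " + ran);
        return task.Status;
    }

    // Cancels the token after the delegate has started; returns true if the
    // inner loop still ran all of its iterations.
    public static bool InnerLoopSurvivesCancel()
    {
        var cts = new CancellationTokenSource();
        var started = new ManualResetEventSlim();
        int iterations = 0;

        var task = Task.Run(async () =>
        {
            started.Set();
            for (int i = 0; i < 5; i++)
            {
                await Task.Delay(50); // no token here, as in Test() above
                Interlocked.Increment(ref iterations);
            }
        }, cts.Token);

        started.Wait(); // make sure the delegate is running before cancelling
        cts.Cancel();
        task.Wait();

        return task.Status == TaskStatus.RanToCompletion && iterations == 5;
    }
}
```

In the first case the delegate is never invoked; in the second, cancelling the source has no visible effect on the loop, which is the behavior seen in the output above.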

With the TPL, cancellation is expressed through the CancellationToken type. A cancellation token is basically a glorified thread-safe boolean value that tells whether its creator, a CancellationTokenSource, has been cancelled.

So, there are two ways to support cancellation in an async method.

The first is to pass a CancellationToken explicitly and poll it frequently:

public static void Run()
{
    var cts = new CancellationTokenSource();
    Task.Run(async () => await Test(cts.Token), cts.Token);
    Console.ReadLine();
    cts.Cancel();
    Console.WriteLine("Cancel...");
    Console.ReadLine();
}

private static async Task Test(CancellationToken token)
{
    while (true)
    {
        await Task.Delay(1000);
        Console.WriteLine("Test...");

        if (token.IsCancellationRequested)
        {
            break;
        }
    }
    Console.WriteLine("Test cancelled");
}
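In the polling version above, cancellation is only noticed once the current Task.Delay(1000) has elapsed. A possible variation, assuming the awaited API offers an overload that accepts a token (Task.Delay does), is to hand the token to the awaited call as well. The class name and the returned iteration count are only there for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original sample.
public static class CancellableDelayDemo
{
    // Loops until the token is cancelled; returns the number of completed iterations.
    public static async Task<int> TestAsync(CancellationToken token)
    {
        int iterations = 0;
        try
        {
            while (true)
            {
                // Task.Delay observes the token: the await throws a
                // TaskCanceledException as soon as cancellation is requested.
                await Task.Delay(1000, token);
                Console.WriteLine("Test...");
                iterations++;
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Test cancelled");
        }
        return iterations;
    }
}
```

The trade-off is that cancellation now surfaces as an exception rather than a boolean check, so the caller or the method itself has to catch OperationCanceledException.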

The second is to register a delegate through CancellationToken.Register, which is invoked when cancellation is requested:

public static void Run()
{
    var cts = new CancellationTokenSource();
    Task.Run(async () => await Test(cts.Token), cts.Token);
    Console.ReadLine();
    cts.Cancel();
    Console.WriteLine("Cancel...");
    Console.ReadLine();
}

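private static async Task Test(CancellationToken token)
{
    var wr = HttpWebRequest.Create("http://1.2.3.4");

    // Abort the pending request when cancellation is requested. Disposing the
    // registration removes the callback once the request is over.
    using (token.Register(() => { 
        Console.WriteLine("Query cancelled");
        wr.Abort(); 
    }))
    {
        try
        {
            var r = await Task<WebResponse>.Factory.FromAsync(wr.BeginGetResponse, wr.EndGetResponse, null);
        }
        catch (WebException)
        {
            // Abort() makes the pending call fail with a WebException;
            // rethrow only if the failure was not caused by the cancellation.
            if (!token.IsCancellationRequested) throw;
            return;
        }
    }

    if (token.IsCancellationRequested)
    {
        return;
    }

    Console.WriteLine("Got a result");
}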

There are multiple problems with this approach. The first is that the cancellation token parameter needs to appear explicitly in every single method of the call tree. If you have multiple layers of code and abstractions, cancellation becomes very prominent in every signature.

The second is that, to be thorough, the token has to be checked as often as possible, including right after an awaited call returns, to avoid continuing to do work that is no longer necessary.
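As a rough sketch of what this looks like once a few layers are involved, consider the following hypothetical call tree. None of these method names come from a real library, and Task.Delay merely stands in for real I/O.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical three-layer call tree: every layer has to accept
// and forward the token, even when it does nothing else with it.
public static class LayeredCancellationDemo
{
    public static async Task<int> HandleRequestAsync(CancellationToken token)
    {
        return await LoadDataAsync(token);
    }

    private static async Task<int> LoadDataAsync(CancellationToken token)
    {
        int total = 0;
        for (int page = 0; page < 3; page++)
        {
            // Explicit check between units of work.
            token.ThrowIfCancellationRequested();
            total += await FetchPageAsync(page, token);
        }
        return total;
    }

    private static async Task<int> FetchPageAsync(int page, CancellationToken token)
    {
        await Task.Delay(10, token); // stands in for a real network call
        return page;
    }
}
```

The token adds nothing to the logic of HandleRequestAsync or LoadDataAsync, yet it has to be part of both signatures for the innermost call to be cancellable.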

 

Cancelling an F# async function

With F#, async is actually a “side-effect” of a more powerful feature called Computation Expressions. Naively, I see this as a super-feature that allows the creation of some of the syntactic-sugar-based features of C#, such as async and iterators.

Async support takes the form of an Async type that allows the creation of an “async” computation expression, which in turn allows for the manipulation of Async<T>-returning expressions. If this sounds familiar, you’re spot-on: this is roughly the same as Task<T> and C# async/await.

Now, the interesting thing about these Async<T> expressions is that they respect the cancellation token that was provided when the async expression was started:

open System
open System.Threading
[<EntryPoint>]
let main argv = 

    let myAsync = 
        async {
            while true do 
                do! Async.Sleep(1000)
                Console.WriteLine(DateTime.Now)
        }

    let tokenSource1 = new System.Threading.CancellationTokenSource()
    let val1 = Async.Start(myAsync, cancellationToken=tokenSource1.Token)    
    Console.ReadLine() |> ignore
    tokenSource1.Cancel()
    Console.ReadLine() |> ignore
    0

When cancelled, the async expression stops processing its content, without any explicit handling of the token.

But the most interesting part is that this token flows through to the other async expressions!

To better demonstrate this, there is the ability to register a function that will be called when the async expression is cancelled:

[<EntryPoint>]
let main argv = 

    let r2 = 
        async {
            use! c = Async.OnCancel(fun () -> Console.WriteLine("Cancelled 1"))

            // In case there is a non async computation that needs 
            // to check on cancellation
            let! token = Async.CancellationToken
            
            while true do 
                do! Async.Sleep(1000)
                Console.WriteLine(DateTime.Now)

            return 42
        }

    let r = 
        async {
            use! c = Async.OnCancel(fun () -> Console.WriteLine("Cancelled 2"))

            let! test = r2

            while true do 
                do! Async.Sleep(100)

            return 42
        }

    let tokenSource1 = new System.Threading.CancellationTokenSource()
    let res3 = Async.StartAsTask(r, cancellationToken=tokenSource1.Token)    
    Console.ReadLine() |> ignore
    tokenSource1.Cancel()
    Console.ReadLine() |> ignore
    0

This way, there is no need to pass a cancellation token around explicitly, and if it is needed, it can still be obtained and used explicitly. The ambient CancellationToken is retrieved asynchronously (Async.CancellationToken is itself an async value!), which is useful when a CPU-bound computation needs to check for cancellation. Otherwise, any ! (bang) operation, such as let! or do!, automatically checks for cancellation.

Pretty powerful!

Cancelling an Rx query

On the same topic of cancellation, the Reactive Extensions also have the notion of cancellation built into the flow, through the disposal of subscriptions:

public static void Run()
{
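    // The element type is stated explicitly, as it cannot be inferred from the lambda.
    var s = Observable.Create<WebResponse>(o =>
        {
            var wr = HttpWebRequest.Create("http://1.2.3.4");

            var subscriptions = new CompositeDisposable();

            // Runs when the subscription is disposed: abort the pending request.
            subscriptions.Add(Disposable.Create(() => wr.Abort()));

            // Named "inner" because a local in the lambda cannot reuse the name of the outer "s".
            var inner = Observable
                .FromAsyncPattern<WebResponse>(wr.BeginGetResponse, wr.EndGetResponse)()
                .Subscribe(o);

            subscriptions.Add(inner);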

            return subscriptions;
        }
    )
    .Subscribe(_ => Console.WriteLine("Got result"));

    Console.ReadLine();
    s.Dispose();
    Console.ReadLine();
}

Every observable can be subscribed to, and subscribing returns a disposable instance. Whenever the final subscription gets disposed, the whole query gets disposed as well, and the Observable.Create operator makes it possible to intercept that disposal, here to abort the pending web request.
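The contract itself can be sketched without Rx, using only the IObservable<T> and IObserver<T> interfaces from the base class library. The classes below are hand-rolled stand-ins written for this illustration, not Rx types; with Rx, Observable.Create and Disposable.Create play these roles.

```csharp
using System;
using System.Threading;

// Hand-rolled stand-in for an Rx source: Subscribe returns a disposable
// whose disposal runs the teardown logic (e.g. aborting a web request).
public sealed class CancellableSource : IObservable<string>
{
    private readonly Action _onCancel;

    public CancellableSource(Action onCancel)
    {
        _onCancel = onCancel;
    }

    public IDisposable Subscribe(IObserver<string> observer)
    {
        // A real source would start its work here and push results to the observer.
        return new Teardown(_onCancel);
    }

    private sealed class Teardown : IDisposable
    {
        private Action _action;

        public Teardown(Action action)
        {
            _action = action;
        }

        public void Dispose()
        {
            // Run the teardown at most once, even if Dispose is called repeatedly.
            var action = Interlocked.Exchange(ref _action, null);
            if (action != null) action();
        }
    }
}

// Observer that ignores every notification, only needed to subscribe.
public sealed class NoopObserver : IObserver<string>
{
    public void OnNext(string value) { }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Subscribing with new NoopObserver() and disposing the returned object invokes the teardown action once, which is where the request would be aborted.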

 

Final words…

Cancellation is an important topic that has been “forgotten” in C# async. It adds to the list of other issues with the C# 5.0 implementation, which I hope will get fixed in a future release of the language.

For the moment, both Rx and F# approach async in a more thorough manner, and moreover, they attack head-on the concurrency side of asynchrony, which is ignored by C# 5.0...

If you're interested in a more detailed comparison of F# and C# on the subject, read Tomas Petricek's blog post series.


About me

My name is Jerome Laban, I am a Software Architect, C# MVP and .NET enthusiast from Montréal, QC. You will find my blog on this site, where I'm adding my thoughts on current events, or the things I'm working on, such as the Remote Control for Windows Phone.