Programming with C# asynchronous sequences [C#]
Tomas Petricek, in his latest blog post titled “Programming with F# asynchronous sequences,” presents an F# implementation of something called asynchronous sequences. In this post, I will show you how the same concept can be implemented in C#. Let’s look at the sample code below to better understand what an asynchronous sequence is:
IEnumerable<...> AsyncSeq()
{
yield return "Hello";
await TaskEx.Delay(100);
yield return "world!";
}
An asynchronous sequence is code that produces a sequence of values on demand (this is how the IEnumerable interface can be interpreted), but additionally does some asynchronous work during evaluation (the await keyword). Every time the client of the asynchronous sequence calls the MoveNext method, the next value is evaluated. The key feature here is that the client decides when to produce the next value and when to stop processing.
There are two problems with such an implementation of an asynchronous sequence. First, sequences in the .NET world are represented by the IEnumerable interface, which allows only synchronous processing. Since the MoveNext method returns a bool value, we need to decide immediately whether the next value can be produced or not, whereas in asynchronous processing obtaining that information can take a few seconds or even minutes. The second problem is that we cannot mix the await keyword (Async CTP) with the yield return/yield break keywords inside the same method body. My solution resolves these two problems, and the above sequence can be implemented in the following way:
IEnumerable<AsyncSeqItem<string>> AsyncSeq()
{
yield return "Hello";
yield return TaskEx.Delay(100);
yield return "world!";
}
public enum AsyncSeqItemMode
{
Value, Task, Sequence
}
public class AsyncSeqItem<T>
{
public AsyncSeqItemMode Mode { get; private set; }
public T Value { get; private set; }
public Task Task { get; private set; }
public IEnumerable<AsyncSeqItem<T>> Seq { get; private set; }
public AsyncSeqItem(T value)
{
Value = value;
Mode = AsyncSeqItemMode.Value;
}
public AsyncSeqItem(Task task)
{
Task = task;
Mode = AsyncSeqItemMode.Task;
}
public AsyncSeqItem(IEnumerable<AsyncSeqItem<T>> seq)
{
Seq = seq;
Mode = AsyncSeqItemMode.Sequence;
}
public static implicit operator AsyncSeqItem<T>(T value)
{
return new AsyncSeqItem<T>(value);
}
public static implicit operator AsyncSeqItem<T>(Task task)
{
return new AsyncSeqItem<T>(task);
}
}
AsyncSeqItem represents one of the following three values:
- Value – the next value generated by the sequence
- Task – some asynchronous work that needs to be awaited before going forward
- Sequence – another sequence to continue with; it is used for recursive calls and means that we want tail recursion
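To make the three modes concrete, here is a minimal sketch rather than the code from this post. It redeclares a trimmed-down AsyncSeqItem<T> so that it compiles on its own, uses Task.Delay from the released .NET Framework where the Async CTP has TaskEx.Delay, and the CountDown and Modes names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public enum AsyncSeqItemMode { Value, Task, Sequence }

// Trimmed-down copy of the AsyncSeqItem<T> class, so the sketch compiles on its own.
public class AsyncSeqItem<T>
{
    public AsyncSeqItemMode Mode { get; private set; }
    public T Value { get; private set; }
    public Task Task { get; private set; }
    public IEnumerable<AsyncSeqItem<T>> Seq { get; private set; }

    public AsyncSeqItem(T value) { Value = value; Mode = AsyncSeqItemMode.Value; }
    public AsyncSeqItem(Task task) { Task = task; Mode = AsyncSeqItemMode.Task; }
    public AsyncSeqItem(IEnumerable<AsyncSeqItem<T>> seq) { Seq = seq; Mode = AsyncSeqItemMode.Sequence; }

    public static implicit operator AsyncSeqItem<T>(T value) { return new AsyncSeqItem<T>(value); }
    public static implicit operator AsyncSeqItem<T>(Task task) { return new AsyncSeqItem<T>(task); }
}

public static class ModesSketch
{
    // Hypothetical sequence: yields n, waits, then hands over to CountDown(n - 1).
    public static IEnumerable<AsyncSeqItem<int>> CountDown(int n)
    {
        if (n == 0) yield break;
        yield return n;               // Value mode (implicit conversion from T)
        yield return Task.Delay(10);  // Task mode (implicit conversion from Task)
        // Sequence mode: C# does not allow user-defined conversions from interface
        // types, so this item has to be constructed explicitly.
        yield return new AsyncSeqItem<int>(CountDown(n - 1));
    }

    // Lists the modes of the items produced by the outermost iterator only;
    // the nested sequence is not enumerated here.
    public static List<AsyncSeqItemMode> Modes(IEnumerable<AsyncSeqItem<int>> seq)
    {
        var modes = new List<AsyncSeqItemMode>();
        foreach (var item in seq) modes.Add(item.Mode);
        return modes;
    }
}
```

Enumerating CountDown(2) directly yields just three items, one per mode. It is up to the code driving the sequence to await the task and to switch to the nested sequence when it meets a Sequence item.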
There are two ways of consuming such a sequence in the client:
public static class AsyncSeqExtensions
{
public static IEnumerable<Task<Option<T>>> ToTaskEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
{ ... }
public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
{ ... }
}
public class Option<T>
{
public T Value { get; private set; }
public bool HasValue { get; private set; }
public Option()
{
HasValue = false;
}
public Option(T value)
{
Value = value;
HasValue = true;
}
public static implicit operator Option<T>(T value)
{
return new Option<T>(value);
}
}
In the first approach, we call the ToTaskEnumerable extension method, which returns a sequence of tasks. Each task wraps a special type called Option<T>, which can be used similarly to the Nullable<T> type except that it works with both value and reference types. A task returning an option object without a value means that we have reached the end of the sequence. I also provide a few standard LINQ operators built on top of these sequence semantics:
public static class AsyncSeqExtensions
{
async private static Task ForEachTaskImpl<T>(this IEnumerable<Task<Option<T>>> seq, Action<Task<Option<T>>> action)
{
foreach (var task in seq)
{
await task;
action(task);
}
}
public static Task ForEachTask<T>(this IEnumerable<Task<Option<T>>> seq, Action<Task<Option<T>>> action)
{
return ForEachTaskImpl(seq, action);
}
public static Task ForEach<T>(this IEnumerable<Task<Option<T>>> seq, Action<T> action)
{
return seq.ForEachTask(task =>
{
if(task.Result.HasValue)
action(task.Result.Value);
});
}
async private static Task<T[]> ToArrayImpl<T>(IEnumerable<Task<Option<T>>> seq)
{
var list = new List<T>();
await seq.ForEach(v => list.Add(v));
return list.ToArray();
}
public static Task<T[]> ToArray<T>(this IEnumerable<Task<Option<T>>> seq)
{
return ToArrayImpl(seq);
}
public static IEnumerable<Task<Option<TResult>>> Select<T, TResult>(this IEnumerable<Task<Option<T>>> source,
Func<T,TResult> selector) { ... }
public static IEnumerable<Task<Option<T>>> Where<T>(this IEnumerable<Task<Option<T>>> source,
Func<T, bool> predicate) { ... }
public static IEnumerable<Task<Option<T>>> Take<T>(this IEnumerable<Task<Option<T>>> source,
int count) { ... }
...
}
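The end-of-sequence convention used by the task-based representation can also be consumed by hand. The following is a rough sketch under a few assumptions: Option<T> is redeclared in shortened form so the snippet stands alone, Numbers and DelayedValue are hypothetical producers written only for this illustration, and Task.Delay and Task.FromResult come from the released .NET Framework, not from the Async CTP.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Shortened copy of Option<T>, so the sketch compiles on its own.
public class Option<T>
{
    public T Value { get; private set; }
    public bool HasValue { get; private set; }
    public Option() { HasValue = false; }
    public Option(T value) { Value = value; HasValue = true; }
}

public static class OptionSketch
{
    // Hypothetical helper: completes with the given value after a short delay.
    private static async Task<Option<int>> DelayedValue(int value)
    {
        await Task.Delay(10);
        return new Option<int>(value);
    }

    // Hypothetical producer: two values followed by the empty end-of-sequence marker.
    public static IEnumerable<Task<Option<int>>> Numbers()
    {
        yield return DelayedValue(1);
        yield return DelayedValue(2);
        yield return Task.FromResult(new Option<int>());
    }

    // Awaits each task in turn and stops at the first option without a value.
    public static async Task<List<int>> Collect(IEnumerable<Task<Option<int>>> seq)
    {
        var result = new List<int>();
        foreach (var task in seq)
        {
            var option = await task;
            if (!option.HasValue) break;  // end-of-sequence marker
            result.Add(option.Value);
        }
        return result;
    }
}
```

Awaiting OptionSketch.Collect(OptionSketch.Numbers()) gives a list containing 1 and 2. The consumer has to check HasValue on every element, which is the bookkeeping that the ForEach and ToArray operators hide.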
Returning an additional task object at the end of a sequence with a special value allows us to use the standard IEnumerable<T> interface, but it’s inconvenient. In the second approach, we use the IAsyncEnumerable interface from the Reactive Extensions (Rx) library released some time ago.
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetEnumerator();
}
public interface IAsyncEnumerator<out T> : IDisposable
{
Task<bool> MoveNext();
T Current { get; }
}
public static class AsyncEnumerable
{
// Extension methods (note the "this" modifier), so they can be chained in LINQ style
public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source,
Func<TSource, TResult> selector) { ... }
public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source,
Func<TSource, bool> predicate) { ... }
public static IAsyncEnumerable<TSource> Take<TSource>(this IAsyncEnumerable<TSource> source,
int n) { ... }
}
This interface perfectly represents the semantics of an asynchronous sequence. The Rx library also provides many standard LINQ operations, such as Where, Select, Take, Sum, First, and so on. This allows us to write almost any LINQ query executing on top of an asynchronous sequence.
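For illustration, this is roughly what consuming such an enumerator by hand looks like. It is a sketch, not Rx code: the two interfaces are redeclared with the same shape as above so the snippet compiles on its own, DelayedRange is a hypothetical source invented for this example, and Task.Delay is the released .NET Framework counterpart of TaskEx.Delay.

```csharp
using System;
using System.Threading.Tasks;

// Same shape as the interfaces quoted above, redeclared so the sketch compiles on its own.
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}

public interface IAsyncEnumerator<out T> : IDisposable
{
    Task<bool> MoveNext();
    T Current { get; }
}

// Hypothetical source: produces 1..count, waiting briefly before each element.
public sealed class DelayedRange : IAsyncEnumerable<int>, IAsyncEnumerator<int>
{
    private readonly int count;
    public DelayedRange(int count) { this.count = count; }

    public int Current { get; private set; }

    // Each enumeration gets its own, independent enumerator.
    public IAsyncEnumerator<int> GetEnumerator() { return new DelayedRange(count); }

    public async Task<bool> MoveNext()
    {
        if (Current >= count) return false;  // end of sequence, reported through the task
        await Task.Delay(10);                // the asynchronous work between elements
        Current++;
        return true;
    }

    public void Dispose() { }
}

public static class AsyncEnumeratorSketch
{
    // Asynchronous counterpart of a foreach loop that sums the elements.
    public static async Task<int> Sum(IAsyncEnumerable<int> source)
    {
        var sum = 0;
        using (var e = source.GetEnumerator())
        {
            while (await e.MoveNext())
                sum += e.Current;
        }
        return sum;
    }
}
```

Because MoveNext returns Task<bool>, the decision whether another element exists is itself awaited, so no end-of-sequence marker such as an empty Option<T> is needed.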
Now, let’s summarize what we have achieved so far. We can write imperative code implementing an asynchronous sequence. We can use an extension method to create one of two asynchronous sequence representations. Finally, we can iterate through all items in such a sequence, or build a new sequence object using LINQ operators.
The C# version of the web crawler presented in Tomas Petricek’s blog post could look like this:
public static class AsyncSeqSample
{
async public static Task CrawlBingUsingAsyncEnumerable()
{
await RandomCrawl("http://news.bing.com")
.ToAsyncEnumerable()
.Where(t => !t.Item1.Contains("bing.com"))
.Select(t => t.Item2)
.Take(10)
.ForEach(Console.WriteLine);
Console.WriteLine("the end...");
}
async public static Task CrawlBingUsingTaskEnumerable()
{
await RandomCrawl("http://news.bing.com")
.ToTaskEnumerable()
.Where(t => !t.Item1.Contains("bing.com"))
.Select(t => t.Item2)
.Take(10)
.ForEach(Console.WriteLine);
Console.WriteLine("the end...");
}
public static IEnumerable<AsyncSeqItem<Tuple<string, string>>> RandomCrawl(string url)
{
return RandomCrawlLoop(url, new HashSet<string>());
}
private static IEnumerable<AsyncSeqItem<Tuple<string,string>>> RandomCrawlLoop(string url, HashSet<string> visited)
{
if (visited.Add(url))
{
var downloadTask = DownloadDocument(url);
yield return downloadTask;
if (downloadTask.Result.HasValue)
{
var doc = downloadTask.Result.Value;
yield return Tuple.Create(url, GetTitle(doc));
foreach (var link in ExtractLinks(doc))
{
foreach (var l in RandomCrawlLoop(link, visited))
{
yield return l;
}
}
}
}
}
private static string[] ExtractLinks(HtmlDocument doc)
{
try
{
var q = from a in doc.DocumentNode.SelectNodes("//a")
where a.Attributes.Contains("href")
let href = a.Attributes["href"].Value
where href.StartsWith("http://")
let endl = href.IndexOf('?')
select endl > 0 ? href.Substring(0, endl) : href;
return q.ToArray();
}
catch
{
return new string[0];
}
}
async private static Task<Option<HtmlDocument>> DownloadDocument(string url)
{
try
{
var client = new WebClient();
var html = await client.DownloadStringTaskAsync(url);
var doc = new HtmlDocument();
doc.LoadHtml(html);
return new Option<HtmlDocument>(doc);
}
catch (Exception)
{
return new Option<HtmlDocument>();
}
}
private static string GetTitle(HtmlDocument doc)
{
var title = doc.DocumentNode.SelectSingleNode("//title");
return title != null ? title.InnerText.Trim() : "Untitled";
}
}
Now let’s see how the ToAsyncEnumerable and ToTaskEnumerable methods are implemented:
public static class AsyncSeqExtensions
{
public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
{
if (seq == null) throw new ArgumentNullException("seq");
return new AnonymousAsyncEnumerable<T>(() =>
{
var enumerator = seq.ToTaskEnumerable(continueOnCapturedContext).GetEnumerator();
seq = null; // holding reference to seq parameter introduces memory leaks when asynchronous sequence uses recursive calls
TaskCompletionSource<bool> currentTcs = null;
Task<Option<T>> currentTask = null;
return new AnonymousAsyncEnumerator<T>(
() =>
{
currentTcs = new TaskCompletionSource<bool>();
if (CheckEndOfSeq(currentTask) == false)
{
currentTcs.SetResult(false);
return currentTcs.Task;
}
enumerator.MoveNext();
enumerator.Current.ContinueWith(t =>
{
if (t.IsFaulted)
{
currentTcs.SetException(t.Exception);
}
else
{
if (!t.Result.HasValue)
{
currentTcs.SetResult(false);
}
else
{
currentTask = t;
currentTcs.SetResult(true);
}
}
});
return currentTcs.Task;
},
() => currentTask.Result.Value
);
});
}
public static IEnumerable<Task<Option<T>>> ToTaskEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
{
if (seq == null) throw new ArgumentNullException("seq");
return new AnonymousEnumerable<Task<Option<T>>>(() =>
{
var synchronizationContext = continueOnCapturedContext ? SynchronizationContext.Current : null;
var enumerator = seq.GetEnumerator();
seq = null; // holding reference to seq parameter introduces memory leaks when asynchronous sequence uses recursive calls
TaskCompletionSource<Option<T>> currentTcs = null;
return new AnonymousEnumerator<Task<Option<T>>>(
() =>
{
if (CheckEndOfSeq(currentTcs) == false)
return false;
currentTcs = new TaskCompletionSource<Option<T>>();
Action moveNext = null;
moveNext = () =>
{
Start:
bool b;
try
{
b = enumerator.MoveNext();
}
catch (Exception exception)
{
currentTcs.SetException(exception);
return;
}
if (b == false)
{
currentTcs.SetResult(new Option<T>());
}
else
{
var c = enumerator.Current;
if (c.Mode == AsyncSeqItemMode.Value)
{
currentTcs.SetResult(c.Value);
}
else if (c.Mode == AsyncSeqItemMode.Task)
{
if (synchronizationContext != null)
c.Task.ContinueWith(_ => synchronizationContext.Post(s => ((Action)s)(), moveNext));
else
c.Task.ContinueWith(_ => moveNext());
}
else if (c.Mode == AsyncSeqItemMode.Sequence)
{
enumerator = c.Seq.GetEnumerator();
goto Start;
}
}
};
moveNext();
return true;
},
() => currentTcs.Task
);
});
}
}
As you can see, the implementation is straightforward, but the whole concept of asynchronous sequences is compelling.