Friday, September 30, 2011

Making WCF Web APIs Async

One of the hidden gems you can get today from the WCF Web API source code on CodePlex (wcf.codeplex.com) is support for the task-based asynchronous model introduced in .NET 4.0. The new model greatly simplifies asynchronous programming in .NET, and it is going to get even better with the async support in C# 5.
The idea is that you represent an asynchronous operation with a Task<T> instance, where T is the type of the result. There is no longer any need to call an End method to retrieve the result, because the result can be obtained from the task itself. You also get some other cool features for free, such as support for cancellation, exception handling and operation composition.
With the recent addition of this model to WCF, making a service operation asynchronous is just a matter of returning a Task<T>, and WCF will take care of the rest for us. The following code illustrates what an async operation looks like:
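To make those points concrete, here is a minimal sketch that is not part of the WCF Web API samples. The names TaskBasics, SumAsync and DescribeSumAsync are made up for illustration; the sketch only relies on the .NET 4.0 Task Parallel Library.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskBasics
{
    // Hypothetical helper: computes a sum on a thread-pool thread.
    public static Task<int> SumAsync(int a, int b)
    {
        return Task.Factory.StartNew(() => a + b);
    }

    // Composition: the continuation runs once the first task has completed.
    public static Task<string> DescribeSumAsync(int a, int b)
    {
        return SumAsync(a, b).ContinueWith(t => "Sum is " + t.Result);
    }

    public static void Main()
    {
        // The result is read from the task itself; there is no End method to call.
        Console.WriteLine(DescribeSumAsync(2, 3).Result); // prints "Sum is 5"

        // An exception thrown inside the task surfaces as an AggregateException.
        Task<int> failing = Task.Factory.StartNew<int>(() => { throw new InvalidOperationException("boom"); });
        try
        {
            failing.Wait();
        }
        catch (AggregateException ex)
        {
            Console.WriteLine(ex.InnerException.Message); // prints "boom"
        }
    }
}
```

Blocking on Result or Wait is only done here to keep the console sample short; in a service operation you would return the task instead.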
[WebGet(UriTemplate = "contacts")]
public Task<HttpResponseMessage> Contacts()
{
    // Create an HttpClient (we could also reuse an existing one)
    HttpClient client = new HttpClient();

    // Submit GET requests for contacts and return task directly
    return client.GetAsync(backendAddress + "/contacts");
}
This code is available as part of the TaskAsync sample, which is also included in the source code. In that sample, the operation simply calls an external service over HTTP in an asynchronous manner. That's pretty cool, isn't it? What about running a query against a database and returning the results asynchronously from a service operation? ADO.NET already supports an asynchronous model, but Entity Framework still does not. That is nothing that cannot be solved easily with an extension method on IQueryable<T>:
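using System;
using System.Collections.Generic;
using System.Data.EntityClient;
using System.Data.Objects;
using System.Data.Objects.DataClasses;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncExtensions
{
    public static Task<IEnumerable<T>> AsAsync<T>(this IQueryable<T> source) where T : EntityObject
    {
        // The casts below only succeed for EF object queries that run against SQL Server.
        var query = (ObjectQuery<T>)source;

        // Build a plain ADO.NET command from the SQL text and parameters that EF generated.
        var cmd = new SqlCommand();
        cmd.Connection = (SqlConnection)((EntityConnection)query.Context.Connection).StoreConnection;
        cmd.CommandText = query.ToTraceString();
        cmd.Parameters.AddRange(query.Parameters.Select(x => new SqlParameter(x.Name, x.Value ?? DBNull.Value)).ToArray());

        // BeginExecuteReader needs asynchronous processing enabled on the connection string.
        // The connection string can only be changed while the connection is still closed.
        cmd.Connection.ConnectionString = new SqlConnectionStringBuilder(cmd.Connection.ConnectionString)
        {
            AsynchronousProcessing = true
        }.ToString();
        cmd.Connection.Open();

        var tcs = new TaskCompletionSource<IEnumerable<T>>();

        Task.Factory.FromAsync<SqlDataReader>(cmd.BeginExecuteReader(), cmd.EndExecuteReader).ContinueWith(task =>
        {
            if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
            else if (task.IsCanceled) tcs.TrySetCanceled();
            else
            {
                // Map the reader rows back to entities. If that throws, fault the task instead of leaving it pending.
                try { tcs.TrySetResult(query.Context.Translate<T>(task.Result).Cast<T>()); }
                catch (Exception ex) { tcs.TrySetException(ex); }
            }
        });

        return tcs.Task;
    }
}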
This extension method lets you write things like context.Customers.Where(c => c.Id == 1).AsAsync() to run an asynchronous query through EF that returns the customer whose ID equals 1.
OK, so far we have an extension method that executes a query asynchronously. Now it is just a matter of returning the resulting task from our operation:
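The key step in that method is wrapping a Begin/End pair in a task with Task.Factory.FromAsync. The following sketch shows the same technique without a database, using Stream.BeginRead and Stream.EndRead on a MemoryStream. The FromAsyncSketch class and ReadAsTask helper are hypothetical names used only for this illustration.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class FromAsyncSketch
{
    // Wraps the BeginRead/EndRead pair in a Task<int> that completes with the number of bytes read.
    public static Task<int> ReadAsTask(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
    }

    public static void Main()
    {
        byte[] data = Encoding.UTF8.GetBytes("hello");
        using (var stream = new MemoryStream(data))
        {
            var buffer = new byte[16];

            // Blocking on Result keeps the console sample short; a service would return the task.
            int read = ReadAsTask(stream, buffer).Result;
            Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, read));
        }
    }
}
```

AsAsync uses a different FromAsync overload, one that takes an IAsyncResult that has already been started, but the idea is the same: the End method runs when the operation completes and its return value becomes the task result.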
[ServiceContract]
public class CustomerResource
{
    [WebGet]
    public Task<IEnumerable<CustomerDTO>> GetAllCustomers()
    {
        var model = new NorthwindEntities();

        return model.Customers.AsAsync()
            .ToApm(t => t.Result.Select(c => new CustomerDTO { Id = c.CustomerID, Name = c.ContactName }));
    }
}
Two things to notice in the code above:
  1. I am doing a projection to return a DTO rather than the original entity, which I might not want to serialize completely on the wire.
  2. I am doing a task continuation (or composition) through the “ToApm” extension method. In this example, that means the projection runs right after the query execution completes, and its result is what the operation returns.
The code for the “ToApm” extension method is quite simple as well.
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskExtensions
{
    public static Task<TResult> ToApm<TResult, TSource>(this Task<TSource> task, Func<Task<TSource>, TResult> action)
    {
        var tcs = new TaskCompletionSource<TResult>();

        task.ContinueWith(delegate
        {
            if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
            else if (task.IsCanceled) tcs.TrySetCanceled();
            else
            {
                // If the projection throws, fault the returned task instead of leaving it pending forever.
                try { tcs.TrySetResult(action(task)); }
                catch (Exception ex) { tcs.TrySetException(ex); }
            }
        }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

        return tcs.Task;
    }
}
It attaches a continuation to the original task passed as an argument, and checks whether that task was cancelled or faulted before setting the result.
Go and grab the WCF source on CodePlex to start playing with these cool features today.
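To see how results and failures flow through a continuation like this, here is a small stand-alone sketch. Project is a hypothetical helper that follows the same TaskCompletionSource pattern as ToApm and is repeated here only so the sample compiles on its own. It also assumes one extra safeguard: a try/catch so that a projection that throws faults the returned task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationSketch
{
    // Hypothetical helper following the same pattern as ToApm.
    public static Task<TResult> Project<TSource, TResult>(this Task<TSource> task, Func<Task<TSource>, TResult> projection)
    {
        var tcs = new TaskCompletionSource<TResult>();

        task.ContinueWith(t =>
        {
            if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
            else if (t.IsCanceled) tcs.TrySetCanceled();
            else
            {
                // Assumption for this sketch: a throwing projection faults the returned task.
                try { tcs.TrySetResult(projection(t)); }
                catch (Exception ex) { tcs.TrySetException(ex); }
            }
        }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

        return tcs.Task;
    }

    public static void Main()
    {
        // Success: the projection runs after the source task completes.
        Task<int> ok = Task.Factory.StartNew(() => 21).Project(t => t.Result * 2);
        Console.WriteLine(ok.Result); // prints 42

        // Failure: a faulted source task propagates its exception and the projection never runs.
        var source = new TaskCompletionSource<int>();
        source.SetException(new InvalidOperationException("query failed"));
        Task<int> failed = source.Task.Project(t => t.Result * 2);
        try
        {
            failed.Wait();
        }
        catch (AggregateException ex)
        {
            Console.WriteLine(ex.InnerException.Message); // prints "query failed"
        }
    }
}
```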