Introduction
Azure Durable Functions is an extension that allows for stateful workflows to be executed in the Azure Functions serverless compute environment. This extension handles the state, checkpoints and restarts of the workflows for you.
This post supplements the Application Patterns guidance, specifically around starting an arbitrary number of parallel tasks with the Fan out/fan in pattern, and assumes working knowledge of Azure Durable Functions.
First, we will go through the documented pattern, and then we will make some real-world tweaks to keep your application from being overwhelmed by the raw scaling power of Azure Durable Functions. With large datasets, an unthrottled fan-out can crush your backend system or cause all or most of your activities to fail.
Finally, we will go through an example of how to recover from a transient error and rerun the orchestration. In this example, a CosmosException with status code 429 (Too Many Requests) represents the transient failure in F2.
Fan out/fan in pattern
(taken from docs.microsoft.com)
[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}
The fan-out work is distributed to multiple instances of the F2 function. The work is tracked by using a dynamic list of tasks. Task.WhenAll is called to wait for all the called functions to finish. Then, the F2 function outputs are aggregated from the dynamic task list and passed to the F3 function.
The automatic checkpointing that happens at the await call on Task.WhenAll ensures that a potential midway crash or reboot doesn't require restarting an already completed task.
Now, what if you have 1,000, 5,000, 10,000 or 1,000,000 tasks (each an instance of the F2 activity) in parallelTasks? If F2 performs a database insert, update or query, or makes an HTTP request, you had better have an awesome backend system, or more than likely you will start getting the dreaded 429 (Too Many Requests) errors. Not only that, but your load will spike and then trail off in a long tail as error handling and backoffs kick in, which only makes the workflow take longer to complete. Compare that with the alternative fan out/in, which keeps a steady pool of executing tasks and adds new tasks as others finish. That keeps the function CPUs highly utilized, and ultimately your job completes faster with fewer errors.
Fan Out/In Alternative
Ideally, you size a task pool by how many parallel F2 tasks your backend can safely handle, and only add a task when another has completed. We can force the orchestration to pause adding tasks to the pool by using a Durable Task timer. We also await the tasks that have already completed, so their results are checkpointed, before the timer pauses the tight for loop.
[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // Define your task batch size
    int batchSize = 200;
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);

        // Pause the orchestration while the number of running tasks is
        // greater than or equal to batchSize
        while (parallelTasks.Count(t => !t.IsCompleted) >= batchSize)
        {
            // Await the tasks that have already completed
            await Task.WhenAll(parallelTasks.Where(t => t.IsCompleted));

            // Put the orchestration to bed for 15 seconds
            DateTime deadline = context.CurrentUtcDateTime.Add(TimeSpan.FromSeconds(15));

            // Create the durable task timer and wait
            await context.CreateTimer(deadline, CancellationToken.None);
        }
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}
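The loop above polls on a 15-second timer. As an alternative sketch (not the approach used in this post), the same cap could be enforced by awaiting Task.WhenAny, so the orchestration resumes as soon as any one task finishes. ThrottledFanOut and RunAsync are hypothetical names; the helper uses only Task APIs and assumes the caller supplies the Durable Functions call as a delegate.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThrottledFanOut
{
    // Hypothetical helper: starts one task per item and never lets more than
    // maxInFlight of them be pending at once. Results keep the input order.
    public static async Task<int[]> RunAsync<T>(
        IReadOnlyList<T> items, Func<T, Task<int>> start, int maxInFlight)
    {
        if (maxInFlight < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxInFlight));
        }

        var all = new List<Task<int>>(items.Count);
        var running = new List<Task<int>>(maxInFlight);

        foreach (T item in items)
        {
            if (running.Count >= maxInFlight)
            {
                // Resume as soon as any running task finishes; no timer involved.
                Task<int> finished = await Task.WhenAny(running);
                running.Remove(finished);
            }

            Task<int> task = start(item);
            all.Add(task);
            running.Add(task);
        }

        // Wait for the stragglers; a failed task surfaces its exception here.
        return await Task.WhenAll(all);
    }
}
```

Inside the orchestrator, it might be called as `int[] results = await ThrottledFanOut.RunAsync(workBatch, item => context.CallActivityAsync<int>("F2", item), batchSize);`, with the sum for F3 computed from `results`.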
Note
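Ideally, you could put the batchSize in the application configuration so it can be updated without needing a new deployment. Because orchestrator code must behave the same on every replay, change the value only while no orchestration is in flight, or read it in an activity function and pass it to the orchestrator.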
//Define your task batch size
int batchSize = int.Parse(Environment.GetEnvironmentVariable("batchSize"));
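Note that int.Parse throws if the setting is missing or is not a number. A minimal sketch of a more forgiving read is shown below; BatchSettings and ParseBatchSize are hypothetical names, and the fallback of 200 simply mirrors the hard-coded value used earlier.

```csharp
using System;

public static class BatchSettings
{
    // Hypothetical helper: returns the configured batch size, or the fallback
    // when the setting is missing, not a number, or not positive.
    public static int ParseBatchSize(string raw, int fallback = 200)
    {
        return int.TryParse(raw, out int parsed) && parsed > 0 ? parsed : fallback;
    }
}
```

It could then replace the line above with `int batchSize = BatchSettings.ParseBatchSize(Environment.GetEnvironmentVariable("batchSize"));`.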
Replay Transient Errors (like 429s)
Let's say your load isn't very predictable, and on high-volume days you still experience throttling on the backend, causing F2 to fail with a 429 (Too Many Requests) error from Cosmos DB.
Warning
This one is a little tricky. We can introduce a timer and the ContinueAsNew method into a refactored F2. F2 now becomes an orchestration instead of an activity, and the Cosmos DB code moves into a new activity named CosmosFunction. ContinueAsNew lets the F2 orchestration discard its current result and history and start over after a backoff timer has completed.
Changes in the FanOutFanIn Function code
//Original
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
//Change to call the F2 Orchestration
Task<int> task = context.CallSubOrchestrationAsync<int>("F2", workBatch[i]);
Introducing the F2 Orchestration
The F2 orchestration now calls an activity function named CosmosFunction that, you guessed it, calls Cosmos DB and returns the integer. The call to CosmosFunction is wrapped in a try/catch: on a 429 error, F2 sets a durable timer with a random backoff interval in seconds, then uses the ContinueAsNew method to clear the orchestration history and run the F2 orchestration again.
Note
Notice we needed to refactor F2 into an Orchestration from an activity. Only Orchestrations can be replayed with ContinueAsNew.
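[FunctionName("F2")]
public static async Task<int> RunF2(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    object input = context.GetInput<object>();
    try
    {
        return await context.CallActivityAsync<int>("CosmosFunction", input);
    }
    // Activity failures reach the orchestrator wrapped in FunctionFailedException;
    // the original exception, when it can be deserialized, is in InnerException.
    catch (FunctionFailedException ex) when (
        ex.InnerException is CosmosException cosmosEx &&
        cosmosEx.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
    {
        // Seed from context.NewGuid() so the interval is the same on every replay
        Random random = new Random(context.NewGuid().GetHashCode());
        int next = random.Next(5, 90);
        DateTime backoff = context.CurrentUtcDateTime.Add(TimeSpan.FromSeconds(next));
        await context.CreateTimer(backoff, CancellationToken.None);

        // Restart F2 with the same input and a fresh history
        context.ContinueAsNew(input);

        // This value is discarded; the restarted instance supplies the real result
        return 0;
    }
}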
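As written, F2 retries indefinitely for as long as the backend keeps returning 429. One way to bound that, sketched here under stated assumptions, is to carry an attempt counter in the orchestration input (ContinueAsNew starts with a fresh history, so the count has to travel with the input) and to widen the backoff window on each attempt. F2Input, Backoff, MaxAttempts and the doubling schedule are all hypothetical and not part of the original design.

```csharp
using System;

// Hypothetical input wrapper: the attempt count travels with the work item.
public sealed class F2Input
{
    public object Payload { get; set; }
    public int Attempt { get; set; }
}

public static class Backoff
{
    public const int MaxAttempts = 5;   // assumed limit, tune for your workload
    private const int MinSeconds = 5;   // bounds echo the 5 to 90 second range in F2
    private const int MaxSeconds = 90;

    // The upper bound doubles with each attempt (5 s, 10 s, 20 s, ...) until it
    // reaches MaxSeconds. The same seed always yields the same delay.
    public static TimeSpan Next(int attempt, Guid seed)
    {
        int shift = Math.Min(Math.Max(attempt, 0), 10);
        int upper = Math.Min(MaxSeconds, MinSeconds * (1 << shift));
        var random = new Random(seed.GetHashCode());
        return TimeSpan.FromSeconds(random.Next(MinSeconds, upper + 1));
    }

    // True while another attempt is still allowed after the current one.
    public static bool ShouldRetry(int attempt)
    {
        return attempt + 1 < MaxAttempts;
    }
}
```

In the F2 catch block, this could mean rethrowing when `Backoff.ShouldRetry(input.Attempt)` is false, and otherwise waiting for `Backoff.Next(input.Attempt, context.NewGuid())` before calling ContinueAsNew with `Attempt + 1`. The parent orchestration would then need to pass an F2Input rather than the raw work item.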
Code Complete
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // Define your task batch size
    int batchSize = 200;
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallSubOrchestratorAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);

        // Pause the orchestration while the number of running tasks is
        // greater than or equal to batchSize
        while (parallelTasks.Count(t => !t.IsCompleted) >= batchSize)
        {
            // Await the tasks that have already completed
            await Task.WhenAll(parallelTasks.Where(t => t.IsCompleted));

            // Put the orchestration to bed for 15 seconds
            DateTime deadline = context.CurrentUtcDateTime.Add(TimeSpan.FromSeconds(15));

            // Create the durable task timer and wait
            await context.CreateTimer(deadline, CancellationToken.None);
        }
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}
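[FunctionName("F2")]
public static async Task<int> RunF2(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    object input = context.GetInput<object>();
    try
    {
        return await context.CallActivityAsync<int>("CosmosFunction", input);
    }
    // Activity failures reach the orchestrator wrapped in FunctionFailedException;
    // the original exception, when it can be deserialized, is in InnerException.
    catch (FunctionFailedException ex) when (
        ex.InnerException is CosmosException cosmosEx &&
        cosmosEx.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
    {
        // Seed from context.NewGuid() so the interval is the same on every replay
        Random random = new Random(context.NewGuid().GetHashCode());
        int next = random.Next(5, 90);
        DateTime backoff = context.CurrentUtcDateTime.Add(TimeSpan.FromSeconds(next));
        await context.CreateTimer(backoff, CancellationToken.None);

        // Restart F2 with the same input and a fresh history
        context.ContinueAsNew(input);

        // This value is discarded; the restarted instance supplies the real result
        return 0;
    }
}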
Summary
To recap: the controlling orchestration, FanOutFanIn, fans out to the sub-orchestration F2, fans the results back in, and passes the total to F3. F2 calls the CosmosFunction activity; if that call fails with a 429 (Too Many Requests) error, F2 waits for a random backoff interval and then restarts itself with ContinueAsNew. The two techniques covered were:
- Alternate fan out/in method that produces a more consistent workload.
- Transient error handling via orchestration with durable timer backoff and orchestration replay using ContinueAsNew