# Monday, June 10, 2013

A "Mini" workflow engine. Part 4: "out-of-order" composite activities, internal bookmarks, and execution.

Last time, we have seen how a sequence of activities works with internal bookmarks to schedule the execution of the next step, introducing a queue to hold these bookmarks.
We left with a question: what if execution is not sequential?
In fact, one of the advantages of an extensible workflow framework is the ability to define custom statements; we are not limited to Sequence, IfThenElse, While, and so on but we can invent our own, like Parallel:

public class Parallel : CompositeActivity
{
    protected internal override ActivityExecutionStatus Execute(WorkflowInstanceContext context) {
        foreach (var activity in children)        
           context.RunProgramStatement(activity, this.ContinueAt);       
        return ActivityExecutionStatus.Executing;
    }

    private void ContinueAt(WorkflowInstanceContext context, object arg) {
        foreach (var activity in children)        
            //Check every activity is closes...
            if (activity.ExecutionStatus != ActivityExecutionStatus.Closed)
                return;
        
         CloseActivity(context);
     }
}

And even have different behaviours here (like speculative execution: try several alternative, choose the one that complete first).

Clearly, a queue does not work with such a statement. We can again use delegates to solve this issue, using a pattern that is used in several other scenarios (e.g. forms management).
When a composite activity schedules a child activity for execution, it subscribes to the activity, to get notified of its completion (when the activity will finish and become "closed"). Upon completion, it can then resume execution (the "internal bookmark" mechanism).

child.Close += this.ContinueAt;
Basically, we are moving the bookmark and (above all) its continuation to the activity itself. It will be the role of CloseActivity to find the delegate and fire it. This is exactly how WF works...

context.CloseActivity() {
  ...
  if (currentExecutingActivity.Close != null)
     currentExecutingActivity.Close(this, null);

}
... and it is the reason why WF needs to keep explicit track of the current executing activity.

Personally, I find the "currentExecutingActivity" part a little unnerving. I would rather say explicitly which activity is closing:

context.CloseActivity(this);
Moreover, I would enforce a call to CloseActivity with the correct argument (this)

public class Activity {
   private CloseActivity() {
      context.CloseActivity(this); //internal
Much better!

Now we almost everything sorted out; the only piece missing is about recursion. Remember, when we call the continuation directly, we end up with a recursive behaviour. This may (or may not) be a problem; but what can we do to make the two pieces really independent?

We can introduce a simple execution queue:

public class ExecutionQueue
{
    private readonly BlockingCollection<Action> workItemQueue = new BlockingCollection<Action>();

    public void Post(Action workItem) {
           ...
    }
}
Now, each time we have a continuation (from an internal or external bookmark), we post it to the execution queue. This makes the code for RunActivity much simpler:

internal void RunProgramStatement(Activity activity, Action<WorkflowContext, object> continueAt)
{
    logger.Debug("Context::RunProgramStatement");

    // Add the "bookmark"
    activity.Closed = continueAt;

    // Execute the a activity
    Debug.Assert(activity.ExecutionStatus == ActivityExecutionStatus.Initialized);
    ExecuteActivity(activity);
} 
As you may have noticed, we are giving away a little optimization; a direct call when the activity just completes should be much faster.
Also, why would we need a queue? We could do just fine using Tasks:

Task.Factory.
    StartNew(() => activity.Execute(this)).
    ContinueWith((result) =>
    {
        // The activity already completed?
        if (result.Result == ActivityExecutionStatus.Closed)
        continueAt(this, null);
        else
        {
        // if it didn't complete, an external bookmark was created
        // and will be resumed. When this will happen, be ready!
        activity.OnClose = continueAt;
        }
    });


internal void ResumeBookmark(string bookmarkName, object payload) {            
    ...
    Task.Factory.
        StartNew(() => bookmark.ContinueAt(this, payload));
}

internal void CloseActivity(Activity activity) {
    if (activity.OnClose != null)
    Task.Factory.
        StartNew(() => activity.OnClose(this, null));
}
Which makes things nicer (and potentially parallel too). Potentially, because in a sequential composite activity like Sequence, a new Task will be fired only upon completion of another. Things for the Parallel activity, however, will be different. We will need to be careful with shared state and input and output arguments though, as we will see in a future post.

Are there other alternatives to a (single threaded) execution queue or to Tasks?

Well, every time I see this kind of pause-and-resume-from-here kind of execution, I see an IEnumerable. For example, I was really fond of the Microsoft CCR and how it used IEnumerable to make concurrent, data-flow style asynchronous execution easier to read:

    int totalSum = 0;
    var portInt = new Port<int>();

    // using CCR iterators we can write traditional loops
    // and still yield to asynchronous I/O !
    for (int i = 0; i < 10; i++)
    {
        // post current iteration value to a port
        portInt.Post(i);

        // yield until the receive is satisifed. No thread is blocked
        yield return portInt.Receive();               
        // retrieve value using simple assignment
        totalSum += portInt;
    }
    Console.WriteLine("Total:" + totalSum);

(Example code from MSDN)
I used the CCR and I have to say that it is initially awkward, but then it is a style that grows on you, it is a very efficient, and it was way of representing asynchronous with a "sequential" look, well before async/await.

The code for ReadLine would become much clearer:

public class ReadLine : Activity
{
    public OutArgument<string> Text = new OutArgument<string>();       

    protected override IEnumerator<Bookmark> Execute(WorkflowInstanceContext context, object value)
    {
        // We need to wait? We just yield control to our "executor"
        yield return new Bookmark(this);

        this.Text.Value = (string)value;

        // Finish!
        yield break;
        // This is like returning ActivityExecutionStatus.Close,
        // or calling context.CloseActivity();            
    }
}
Code for composite activities like sequence could be even cleaner:

protected internal override IEnumerator<Bookmark> Execute(WorkflowInstanceContext context)
{
    // Empty statement block
    if (children.Count == 0)
    {
        yield break;
    }
    else
    {
        foreach (var child in children)
        {
            foreach (var bookmark in context.RunProgramStatement(child))
                yield return bookmark;
        }                
    }
}

Or, with some LINQ goodness:

return children.SelectMany(child => context.RunProgramStatement(child));
The IEnumerable/yield pair have a very simple, clever implementation; it is basically a compiler-generated state machine. The compiler transforms your class to include scaffolding and memorize in which state the code is, and jump execution to the right code when the method is invoked.

But remember, our purpose is exactly to save and persist this information: will this state machine be serialized as well?
According to this StackOverflow question, the answer seem to be positive, with the appropriate precautions; also, it seems that the idea is nothing new (actually, it is 5 years old!)
http://dotnet.agilekiwi.com/blog/2007/05/implementing-workflow-with-persistent.html

...and it also seems that the general thought is that you cannot do reliably.

In fact, I would say you can: even if you do not consider that 5 years passed and the implementation of yield is still the same, the CLR will not break existing code. At IL level, the CLR just see some classes, one (or more) variables that holds the state, and a method with conditions based on those variables. It is not worse (or better) that serializing delegates, something we are already doing.

The iterator-based workflow code sits in a branch of my local git, and I will give it a more serious try in the future; for now, let's play on the safe side and stick to the Task-based execution.

Until now, we mimicked quite closely the design choices of WF; I personally like the approach based on continuations, as I really like the code-as-data philosophy. However the serialization of a delegate can hardly be seen as "code-as-data": the code is there, in the assembly; it's only status information (the closure) that is serialized, and the problem is that you have little or no control over these compiler generated classes.

Therefore, even without IEnumerable, serialization of our MiniWorkflow looks problematic: at the moment, it is tied to BinarySerializer. I want to break free from this dependency, so I will move towards a design without delegates.

Next time: bookmarks without delegates

#    Comments [0] |