Introduction to the Reactive Extensions for JavaScript – Going Parallel with ForkJoin

This past week there was a new release of the Reactive Extensions for JavaScript which includes many of the changes I’ve been talking about lately including the third-party library integration, aggregates and joins which I covered in the previous posts.  This time, I’m going to look at how we can run some observable sequence using the fork/join pattern in JavaScript, to attain some cooperative multi-tasking.

Before we get started, let’s get caught up to where we are today:

Going Parallel with ForkJoin

One common asynchronous pattern that is often used is called fork/join where we start with a specified number of tasks and later we join together all the work.  Let’s first take a look at a simple model of what we’re trying to do.  Say we want to have the ability to load a few scripts in JavaScript?  Typically, we’d have an array of sources we want to visit and then we’d process each in turn and then return the overall result as an array.

var results = [];
var urls = [url1, url2, url3];

for(var task in tasks) {
    results.push(loadScript(tasks[task]));
}

The problem with this approach is that it is a blocking scenario in that we must wait for each one to execute in turn, so the browser can do nothing else in the mean time.  Just as well, each of our operations do not depend on the other, so we could, hypothetically, run them in cooperation with each other if we had a good way of doing so.  This would easily qualify as an embarrassingly parallel solution.

But how do we do it?  After all, we want to run these as non-blocking calls and we want to run them in parallel.  With the asynchronous behavior, that brings any number of issues such as error handling, cancellation checking and so forth.  Luckily, with the Reactive Extensions for JavaScript, we have that part covered with the ability to compose asynchronous operations together.  But, what about running them in parallel?  That’s where the function ForkJoin comes into play.

The ForkJoin function in the Reactive Extensions for JavaScript allows us to take all observable sequences and run them in a cooperative fashion and return their first values in an array.  This function takes a variable amount of arguments, depending on how many observable sequences you have.

Rx.Observable.ForkJoin = function(
    obs1,
    ...
    obsx);

Just as well, in a future version that is not out yet as of this writing will also support an array as the first argument such as:

Rx.Observable.ForkJoin = function(
    observableArray);

Now, let’s run through a quick example of using ForkJoin.  In this example, I’m going to check the flight status of a given number of flights, in case I have to coordinate a bunch of people coming into town.  To do this, I can use the Bing Instant Answer API to do that and return the answers in JSON format. 

function flightStatus(flight) {
    var serviceUrl = "http://api.bing.net/json.aspx";

    return $.getJSONAsObservable(
        serviceUrl,
        { appId : appId,
          Query : flight,
          Sources : "InstantAnswer",
          Market : "en-us",
          Version : "2.2" })
        .SelectMany(
            function(d) {
                return d.data.SearchResponse.InstantAnswer.Results.toObservable();
            });

}

Once we’ve defined our function to get the flight status, let’s start with a few flight numbers.

var flights = ["UA123","UA124","UA125"];

Now, we need to take those flight numbers and then turn them into observable sequences by using the jQuery map function which takes the sequence and applies a function to each element.

var flightsAsObservables = $.map(flights, function(flight) { 
    return flightStatus(flight); })

And now we can call ForkJoin to run these in parallel and return them in the order in which we sent them.  We’ll then iterate over the array of results and output the departure and arrival airport and associated times.

Rx.Observable.ForkJoin(flightsAsObservables)
    .Subscribe(function(results) {

        $.each(results, function(_,result) {
            var status = result.InstantAnswerSpecificData.FlightStatus;

            $("#translatedText")
                .append("<li>Flight: "+ status.FlightName +
                    " from " + status.OriginAirport.Name +
                    " to " + status.DestinationAirport.Name +
                    " departs at " + status.ScheduledDeparture +
                    " arrives at " + status.ScheduledArrival + "</li>");

        });
    });

And there you have it, we’re now able to run computations in a cooperative multi-tasking environment and return the result as an array, and in this case, getting the flight schedules for a number of flights we wish to track.

Conclusion

Using the Reactive Extensions for JavaScript, we’re able to run observable sequences together and get the result as an arry using a well defined pattern of fork/join in JavaScript.  That’s just one of the many things we can do with it that I’ll hopefully cover more in the near future.  So, download it, and give the team feedback!

3 Comments

Comments have been disabled for this content.