Introduction to the Reactive Extensions for JavaScript – Custom Schedulers

In the previous post, I talked a little bit about asynchronous method chaining and extending jQuery in a fluent style to fetch tweets from Twitter and append them to a given element.  This time I want to expand upon that post, instead of taking only one tweet, I want to take one hundred of them and then cycle through each of them at a given interval.  In order to do that, I must make use of something that I alluded to in earlier posts, and that is a custom scheduler.

Before we get started, let’s get caught up to where we are today:

Custom Schedulers

Another variation on a theme from our last post could be to take instead of just one tweet, but instead, take 100 of them and then fade each one out in turn by a specified interval.  To do this, we’ll have to make use of a custom scheduler.  To create a scheduler, we need three parameters, a schedule, a schedule with time and a now parameter.  The schedule parameter provides an implementation function to the scheduler that runs an action at the scheduler’s convenience.  The schedule with time parameter provides an implementation function to the scheduler to schedule the given action at the specified time from now.  Finally, we provide a now function which provides an implementation function to the scheduler with the timer’s notion of the current time.

// schedule : action -> disposable
// scheduleWithTime : action -> due time -> disposable
// now : () -> time

Rx.Scheduler = function(
    schedule,
    scheduleWithTime,
    now);

With this new found knowledge, we can create our own custom scheduler which takes a delay in milliseconds to create a scheduler.  We can create our Rx.Scheduler by passing in our three functions, the schedule, the schedule with time and the notion of now.  Let’s break each function parameter down one by one.

The schedule function takes a given action and then makes a call to window.setTimeout with our action and our delay which allows us to evaluate the action after the given span.  We then return a Disposable which then gives us the ability to cancel the timeout with our given id from our setTimeout should we need to.

function(action) {
    var id = window.setTimeout(action, delay);
    return Rx.Disposable.Create(function() { 
        window.clearTimeout(id); });
},

The schedule with time function takes given action and a due time, and like before, it makes a call to window.setTimeout but this time with the action and the given due time.  And just as before, we create a Disposable to allow us to clean up and cancel the timeout should we need to. 

function(action, dueTime) {
    var id = window.setTimeout(action, dueTime);
    return Rx.Disposable.Create(function() { 
        window.clearTimeout(id); });
},

Finally, the now parameter takes no arguments and we’ll simply return the current time. 

function() {
      return new Date().getTime();
});

When we put these together we can create our custom DelayedScheduler such as the following:

var delayedScheduler = Rx.DelayedScheduler = function(delay) {

     return new Rx.Scheduler(
        function(action) {
            var id = window.setTimeout(action, delay);
            return Rx.Disposable.Create(function() { 
                window.clearTimeout(id); });
        },
        function(action, dueTime) {
            var id = window.setTimeout(action, dueTime);
            return Rx.Disposable.Create(function() { 
                window.clearTimeout(id); });
        },
        function() {
            return new Date().getTime();
        });
};

Now that we’ve defined our custom scheduler, it’s time to put it to use.  What we’re going to do is take the array of our tweets from Twitter and put the delay between them.  To do that, we’ll make use of the Rx.Observable.FromArray method which takes not only the array we want to turn into an observable, but also a custom scheduler should we need one.  We’ll create an extension to the Array class which allows us to turn our arrays easily into observables.

Array.prototype.toObservable = function(scheduler) {
    return Rx.Observable.FromArray(this, scheduler);
}

Once the extension has been defined, let’s do something useful with it.  We’ll take the approach once again from the previous post to get tweets, but this time, we’ll take the maximum we can, which is 100 and then space each one out accordingly, and then rinse and repeat after we’ve exhausted our 100 tweets.  In our subscription, we’ll fade in and fade out our text as we append it to the content to give ourselves a rolling feel to it.

 

var url = "http://search.twitter.com/search.json?q=4sq.com&rpp=100";
var customScheduler = new Rx.DelayedScheduler(5000);

$.ajaxAsObservable({
            url: url,
            type: "get",
            dataType: "jsonp",
        })
    .SelectMany(
        function(d) {
            return d.data.results.toObservable(customScheduler);
        })
    .Repeat()
    .Subscribe(
        function(result) {
            $("<p/>")
                .fadeTo(1000, 1.0)
                .html(result.text)
                .fadeTo(1000, 0.1)
                .appendTo("#content");
        });

When we fire up our browser of choice we can see the fruits of our labor such as the following with proper spacing between the two and nice effects fading in and out.

image

And there you have it, using a custom scheduler to take data we already have and spread it out over time.  The idea here is to not tax the remote server, but instead query the system once, display all the results and then hit the server again once we’ve exhausted the data source.  You can find the complete example of this code here.

Conclusion

Dealing with asynchronous programming has been in the forefront of many minds in the JavaScript community.  At JSConf, there were several examples of frameworks trying to get around the idea of callbacks and instead lean more towards composition.  By utilizing the Reactive Extensions for JavaScript, we’re able to compose together asynchronous and event-based operations together and transform the results in interesting ways.  For example, we can query a system for the data, space it out over time and then requery the system for more as we need it.  That’s just one of the many things you can do with RxJS.

So with that, download it, and give the team feedback!

No Comments