Use Event Aggregator to make your application more extensible

Recently, in KiGG/DotNetShoutout we have integrated Twitter, nothing complex, very basic thing like when a story is submitted or appears in the front page it will broadcast in Twitter and like our feed it will post the short url of the original story (Cant resist to do some shameless marketing for DotNetShoutout).

Since it is a new requirement, initially I have modified the StoryService constructor to include the twitter client as a new argument and use it after the story is added in the database, the code is something like the following:

//Other codes

//Add it in database
_storyRepository.Add(story);

//Increase User Score
_userScoreService.StorySubmitted(byUser);

// Send Trackback
PingStory(content, story, detailUrl);

//Ping the Feed Servers
PingServers();

//Send Tweet
_twitter.UpdateStatus(story);

result = new StoryCreateResult { NewStory = story, DetailUrl = detailUrl };

return result;

As you can see there are quite a few things that I have do:

  • Increase the User Score.
  • Send Trackback (Thank you for submitting this cool story)
  • Ping Feedburner/Pingomatic/technorati servers.
  • Send Tweet.

So for this new requirement we have to modify it and will again have to do if we add more features in future and we do have a plan to implement the Spy/Log (don’t confuse it with the code logger, it is ajax view which shows what is happening currently in the application) and this obviously not a path we should follow. It violets the OCP completely, tie us to write some transactional script  kind of code. We need to find a way where we can add new features without modifying the StoryService. One option would be to pass an array of IBackgroundService (As these are noting but some background services)in the StoryService constructor which these classes will implement and later on the StoryService will call the Execute method of each background service. But Story Submit, Story Publish is more like a system event which should be globally available.

Enter Event Aggregator

As per Martin Fowler “An Event Aggregator acts as a single source of events for many objects. It registers for all the events of the many objects allowing clients to register with just the aggregator.” Jeremy D Miller also used this pattern in his excellent Build your own CAB series. And recently I was browsing through the CompositeWpf/Prism code and found the EventAggregator which seems a perfect candidate for my situation. According to its documentation:

“The EventAggregator service is primarily a container for events that allow decoupling of publishers and subscribers so they can evolve independently.”

EventAggregator

But it has some dependency on .NET WPF which I would like to remove, and after talking with Glen Block, he confirmed that since it is licensed under MS-PL, I can copy and modify it according to my need. Also most of the example you will find of Event Aggregator demonstrate the UI events, but it is a very powerful pattern and can be used in non UI events too which we will see next.

As mentioned in the above that UserScoreService, PingStory, PingServer, SendTweet is some kind of background task which can be run independently and it will act as subscriber in the above diagram, so lets create an interface:

public interface IBackgroundTask
{
    bool IsRunning
    {
        get;
    }

    void Start();

    void Stop();
}

Next we will create an abstract class which will implement this interface to reduce the duplicate codes:

public abstract class BaseBackgroundTask : IBackgroundTask
{
    private readonly IEventAggregator _eventAggregator;

    protected BaseBackgroundTask(IEventAggregator eventAggregator)
    {
        if (eventAggregator == null)
        {
            throw new ArgumentNullException("eventAggregator");
        }

        _eventAggregator = eventAggregator;
    }

    public bool IsRunning
    {
        get;
        private set;
    }

    protected IEventAggregator EventAggregator
    {
        get
        {
            return _eventAggregator;
        }
    }

    public void Start()
    {
        OnStart();
        IsRunning = true;
    }

    public void Stop()
    {
        OnStop();
        IsRunning = false;
    }

    protected abstract void OnStart();

    protected abstract void OnStop();

    protected SubscriptionToken Subscribe<TEvent, TEventArgs>(Action<TEventArgs> action) where TEvent : BaseEvent<TEventArgs> where TEventArgs : class
    {
        return EventAggregator.GetEvent<TEvent>().Subscribe(action, true);
    }

    protected void Unsubscribe<TEvent>(SubscriptionToken token) where TEvent : BaseEvent
    {
        EventAggregator.GetEvent<TEvent>().Unsubscribe(token);
    }
}

Now lets create some concrete implementation of these background tasks.

First, the Twitter:

public class SendTweet : BaseBackgroundTask
{
    private SubscriptionToken _storySubmitToken;

    public SendTweet(IEventAggregator eventAggregator) : base(eventAggregator)
    {
    }

    protected override void OnStart()
    {
        if (!IsRunning)
        {
            _storySubmitToken = Subscribe<StorySubmitEvent, StorySubmitEventArgs>(StorySubmitted);
        }
    }

    protected override void OnStop()
    {
        if (IsRunning)
        {
            Unsubscribe<StorySubmitEvent>(_storySubmitToken);
        }
    }

    private void StorySubmitted(StorySubmitEventArgs eventArgs)
    {
        UpdateStatus(eventArgs.Story);
    }

    private void UpdateStatus(Story story)
    {
        //Update Twitter status goes here
    }
}

Next the UserScoreService:

public class UserScoreService : BaseBackgroundTask
{
    private SubscriptionToken _storySubmitToken;

    public UserScoreService(IEventAggregator eventAggregator) : base(eventAggregator)
    {
    }

    protected override void OnStart()
    {
        if (!IsRunning)
        {
            _storySubmitToken = Subscribe<StorySubmitEvent, StorySubmitEventArgs>(StorySubmitted);
        }
    }

    protected override void OnStop()
    {
        if (IsRunning)
        {
            Unsubscribe<StorySubmitEvent>(_storySubmitToken);
        }
    }

    private void StorySubmitted(StorySubmitEventArgs eventArgs)
    {
        IncreaseUserScore(eventArgs.Story.PostedBy);
    }

    private void IncreaseUserScore(User user)
    {
        // Increase user score goes here
    }
}

I am skipping the PingStory and PingServer as they are very much same as above.

Next the published part, now the StoryService will forward the StorySubmit event to EventAggregator instead of calling these class individually.

//Other codes

//Add it in database
_storyRepository.Add(story);

//Publish the event
_eventAggregator.GetEvent<StorySubmitEvent>().Publish(new StorySubmitEventArgs(story, detailUrl));

result = new StoryCreateResult { NewStory = story, DetailUrl = detailUrl };

return result;

And now we have to ensure that these background services can start listing this event. So lets create a BootstrapperTask which I have shown in my previous post.

public class StartBackgroundTasks : IBootstrapperTask
{
    private readonly IBackgroundTask[] _tasks;

    public StartBackgroundTasks(IBackgroundTask[] tasks)
    {
        _tasks = tasks;
    }

    public void Execute()
    {
        foreach(IBackgroundTask task in _tasks)
        {
            task.Start();
        }
    }
}

And at last the wiring of Unity container, I am using the fluent version but you can also use the xml configuration if you want.

IUnityContainer container = new UnityContainer();

container.RegisterType<IBackgroundTask, UserScoreService>("userScore", new ContainerControlledLifetimeManager())
         .RegisterType<IBackgroundTask, SendTweet>("sendTweet", new ContainerControlledLifetimeManager())
         .RegisterType<IBackgroundTask, PingStory>("pingStory", new ContainerControlledLifetimeManager())
         .RegisterType<IBackgroundTask, PingServer>("pingServer", new ContainerControlledLifetimeManager())
         .RegisterType<IEventAggregator, EventAggregator>(new ContainerControlledLifetimeManager())
         .RegisterType<IControllerFactory, CommonServiceLocatorControllerFactory>(new ContainerControlledLifetimeManager())
         .RegisterType<IBootstrapperTask, RegisterRoutes>("route", new ContainerControlledLifetimeManager(), new InjectionConstructor(new [] { RouteTable.Routes}))
         .RegisterType<IBootstrapperTask, RegisterControllerFactory>("controllerFactory", new ContainerControlledLifetimeManager())
         .RegisterType<IBootstrapperTask, StartBackgroundTasks>("startBackgroundTasks", new ContainerControlledLifetimeManager())
         .RegisterType<IFormsAuthentication, FormsAuthenticationService>(new ContainerControlledLifetimeManager())
         .RegisterType<IMembershipService, AccountMembershipService>(new InjectionConstructor())
         .RegisterType<AccountController>()
         .RegisterType<HomeController>();

ServiceLocator.SetLocatorProvider(() => new UnityServiceLocator(container));

My initial plan was to refer you the KiGG/DotNetShoutout source code rather than posting a separate version, but as you know that ASP.NET MVC RC2 has released few days ago I would rather upgrade it to that version.

Further Reference:

Comments/Suggestions?

Download: EventAggregator.zip

Shout it

2 Comments

  • I've been playing with this concept some, but have run into a few questions:
    1) How do you manage thread-safety? Not knowing what thread each publisher or subscriber is on made me unsure whether there was a chance for a race condition in my "spike".
    2) Why not just use plain old events and event handlers? Something like:
    //Subscribers
    _eventAggregator.StorySubmitted += this.StorySubmitted;
    ...
    //Publishers
    _eventAggregator.OnStorySubmitted(this, args);

  • @Daniel:

    1. I have mentioned that I am using a modified version for the web app, the EventAggregator.GetEvent() uses ReaderWriterLockSlim for thread safety. And BTW it does not support background/ui threading like the original Prism version, the subscriber uses the same publisher thread.

    2. The benefit is i do not have to modify the signature of this class when I am adding a new event. Also there is a filtering support like the original prism version.

    Hope this will clarify your concerns.

Comments have been disabled for this content.