16 Ways To Create IObservables without implementing IObservable

The Reactive Extensions for .Net offers plenty of ways to create IObservables

 

Some primitives

IObservable<int> obs = Observable.Empty<int>();
IObservable<int> obs = Observable.Return(0);
IObservable<int> obs = Observable.Throw<int>(new Exception());

Simple streams

IObservable<long> obs = Observable.Interval(new TimeSpan(0, 0, 1));
IObservable<long> obs = Observable.Timer(DateTimeOffset.Now.AddHours(1)); // Plus 7 more overloads
IObservable<int> obs = Observable.Repeat(1); // Plus 7 more overloads
IObservable<int> obs = Observable.Range(0, 1);

 

From async data

//From an Action or Func
Observable.Start(() => 1);

//From Task
Task.Factory.StartNew(...).ToObservable();

//From AsyncPattern
// typical use case is IO or Web service calls
Func<int,int,double> sampleFunc = (a,b) => 1d;
Func<int, int, IObservable<double>> funcObs = 
                  Observable.FromAsyncPattern<int, int, double>(sampleFunc.BeginInvoke, 
                                                                      sampleFunc.EndInvoke);
IObservable<double> obs = funcObs(1, 0);

From Events

public event EventHandler<EventArgs> AnEvent;
IObservable<IEvent<EventArgs>> fromEventObs = 
Observable.FromEvent<EventArgs>(h => this.AnEvent += h, 
                                      h => this.AnEvent -= h);
From Existing Collections
IEnumerable<int> ie = new int[] {};
observable = ie.ToObservable();

By Generate()

There are 20 overloads to generate. See some prior examples here

By Create()

This creates a cold stream

IObservable<int> observable = Observable.Create<int>(o =>
                                {
                                    o.OnNext(1);
                                    o.OnNext(2);
                                    o.OnCompleted();
                                    return () => { };
                                });

To make a hot stream via Create()

List<IObserver<int>> _subscribed = new List<IObserver<int>>();
private CreateHot()
{
    observable = Observable.Create<int>(o =>
    {
        _subscribed.Add(o);
        return () => _subscribed.Remove(o);
    });

}
private void onNext(int val)
{
    foreach (var o in _subscribed)
    {
        o.OnNext(val);
    }
}

But rather than using create, a subject provides a cleaner (thread safe and tested) way of doing the above

var subj = new Subject<int>();
observable = subj.Hide();
subj.OnNext(1);

2 Comments

Comments have been disabled for this content.