16 Ways To Create IObservables without implementing IObservable
The Reactive Extensions for .NET offer plenty of ways to create IObservables.
Some primitives
IObservable<int> obs = Observable.Empty<int>();
IObservable<int> obs = Observable.Return(0);
IObservable<int> obs = Observable.Throw<int>(new Exception());
Simple streams
IObservable<long> obs = Observable.Interval(new TimeSpan(0, 0, 1));
IObservable<long> obs = Observable.Timer(DateTimeOffset.Now.AddHours(1)); // Plus 7 more overloads
IObservable<int> obs = Observable.Repeat(1); // Plus 7 more overloads
IObservable<int> obs = Observable.Range(0, 1);
From async data
// From an Action or Func
Observable.Start(() => 1);

// From Task
Task.Factory.StartNew(...).ToObservable();

// From AsyncPattern
// typical use case is IO or Web service calls
Func<int, int, double> sampleFunc = (a, b) => 1d;
Func<int, int, IObservable<double>> funcObs =
    Observable.FromAsyncPattern<int, int, double>(sampleFunc.BeginInvoke, sampleFunc.EndInvoke);
IObservable<double> obs = funcObs(1, 0);
From Events
public event EventHandler<EventArgs> AnEvent;

IObservable<IEvent<EventArgs>> fromEventObs = Observable.FromEvent<EventArgs>(
    h => this.AnEvent += h,
    h => this.AnEvent -= h);
From Existing Collections
IEnumerable<int> ie = new int[] {};
IObservable<int> observable = ie.ToObservable();
By Generate()
There are 20 overloads of Generate(). See some prior examples here.
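As a rough illustration of the simplest shape of Generate(), the sketch below unfolds a counter much like a for loop. It assumes a current System.Reactive package; the class and method names (GenerateSketch, Squares, Collect) are made up for this example, and the blocking ToList().Wait() call is only there to make the result easy to inspect.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class, not part of Rx.
public static class GenerateSketch
{
    // Emits 0, 1, 4, 9, 16 and then completes.
    public static IObservable<int> Squares()
    {
        return Observable.Generate(
            0,              // initial state
            i => i < 5,     // keep going while this holds
            i => i + 1,     // next state
            i => i * i);    // value emitted for each state
    }

    // Blocks until the sequence completes and returns everything it emitted.
    public static IList<int> Collect()
    {
        return Squares().ToList().Wait();
    }
}
```

Calling GenerateSketch.Collect() should return the five squares in order. Other overloads add things like a time selector or a scheduler on top of the same four arguments.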
By Create()
This creates a cold stream: the delegate passed to Create() runs again for every new subscriber.
IObservable<int> observable = Observable.Create<int>(o =>
{
    o.OnNext(1);
    o.OnNext(2);
    o.OnCompleted();
    return () => { };
});
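To see what "cold" means in practice, here is a small sketch that subscribes twice to the same Create()-based observable. It assumes a current System.Reactive package; ColdSketch and its members are names invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class, not part of Rx.
public static class ColdSketch
{
    // Same shape as the Create() example: two values, then completion.
    public static IObservable<int> Build()
    {
        return Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnNext(2);
            o.OnCompleted();
            return () => { };
        });
    }

    // Subscribes twice and records every value that either subscriber sees.
    public static List<int> Run()
    {
        var seen = new List<int>();
        var observable = Build();
        observable.Subscribe(x => seen.Add(x));
        observable.Subscribe(x => seen.Add(x));
        return seen; // 1, 2, 1, 2
    }
}
```

Each Subscribe call replays the whole sequence from the start, so the second subscriber misses nothing.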
To make a hot stream via Create()
private readonly List<IObserver<int>> _subscribed = new List<IObserver<int>>();
private IObservable<int> observable;

private void CreateHot()
{
    observable = Observable.Create<int>(o =>
    {
        _subscribed.Add(o);
        return () => _subscribed.Remove(o);
    });
}

// Pushes a value to every current subscriber.
private void onNext(int val)
{
    foreach (var o in _subscribed)
    {
        o.OnNext(val);
    }
}
But rather than using Create(), a Subject provides a cleaner (thread-safe and tested) way of doing the above:
var subj = new Subject<int>();
observable = subj.Hide(); // renamed to AsObservable() in later Rx releases
subj.OnNext(1);
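The hot behaviour can be checked with a sketch like the following, where values pushed while nobody is subscribed are simply lost. It assumes a current System.Reactive package, in which the subject is wrapped with AsObservable() rather than Hide(); HotSketch and the "A"/"B" labels are invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, not part of Rx.
public static class HotSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subj = new Subject<int>();
        IObservable<int> observable = subj.AsObservable();

        subj.OnNext(0); // nobody is subscribed yet, so this value is lost
        using (observable.Subscribe(x => log.Add("A" + x)))
        {
            subj.OnNext(1); // only A is listening
            using (observable.Subscribe(x => log.Add("B" + x)))
            {
                subj.OnNext(2); // A and B are both listening
            }
        }
        subj.OnNext(3); // both subscriptions are disposed
        return log;
    }
}
```

Subscriber B joins late and never sees 1, which is the difference from the cold Create() stream: a subject pushes values as they happen rather than replaying them per subscriber.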