The code samples and PowerPoint deck from my presentation on the RX to the New York ALT.NET group are available on MSDN Code samples:
http://code.msdn.microsoft.com/RxDemos
Recently my building has been having issues with its boilers, and the heat has been going out for longer than is comfortable. The superintendent makes a habit of periodically checking on the status of each of the boilers. A workable approach, certainly, but I figured this would be ideal for a technology assist.
For $9, I purchased a USB thermometer. Word on the web is that the software it comes with is fairly miserable (and it crashed immediately on my machine), but with some Googling and Reflector, I was able to come up with a polling-based API to read the temperature:
public interface IUsbTEMPer
{
double Temperature { get; }
}
With the RX, converting the API into a stream of data is just one line:
IObservable<double> ts = Observable.Generate(
Scheduler.Later,
() => new Notification<double>.OnNext(usbTempReader.Temperature)
).Publish();
And getting some simple alerts is easy too:
ts.Buffer(new TimeSpan(0, 5, 0)) // five-minute buffers (hours, minutes, seconds)
.Select(fiveminOfTemp => fiveminOfTemp.Average())
.Where(avgtemp => avgtemp < 65)
.Subscribe(cold => ToTwiter("buildingstatus account..."));
There’s a pattern used in the Reactive Extensions that I’m calling the Anonymous Implementation.
You can see it in use on IObservable’s one method
IDisposable Subscribe(IObserver<T> observer);
Given an Observable which you want to subscribe to, the brute force, naive (or pedantic) approach would be to create a new class implementing IObserver<T> and then subscribe it.
public class BasicConsoleWriteLineObserver<T>: IObserver<T>
{
public void OnNext(T value)
{
Console.WriteLine(value);
}
public void OnError(Exception ex)
{
throw ex;
}
public void OnCompleted() { }
}
IObservable<int> stream = ...;
stream.Subscribe(new BasicConsoleWriteLineObserver<int>());
But a simpler method, one that dispenses with all the ceremony of creating a new Type, is to use one of the Observer factory methods, like so:
IObserver<int> basicCWLobserver = Observer.Create((int value) => Console.WriteLine(value));
stream.Subscribe(basicCWLobserver);
Observer.Create (and its 4 overloads) will create an anonymous implementation of the interface needed. Of course you can skip a step or two and use the extension methods on IObservable to get even shorter versions of the same:
stream.Subscribe(Console.WriteLine);
This approach is great. For all but the most complex implementations, there’s no need to ever implement IObserver<T>. And as they say, less code is better code.
There are a number of other interfaces that I wish used this approach. IValueConverter, and IComparer<T> come to mind first.
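To make that wish concrete, here is a minimal sketch of what an anonymous implementation of IComparer<T> could look like. AnonymousComparer<T> and ComparerFactory are hypothetical names of my own, not part of Rx or the BCL; later versions of .NET (4.5 and up) ship a similar factory as Comparer<T>.Create.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): wraps a delegate in an IComparer<T>,
// the same trick Observer.Create plays for IObserver<T>.
public sealed class AnonymousComparer<T> : IComparer<T>
{
    private readonly Func<T, T, int> _compare;

    public AnonymousComparer(Func<T, T, int> compare)
    {
        if (compare == null) throw new ArgumentNullException("compare");
        _compare = compare;
    }

    // The only interface member simply forwards to the delegate.
    public int Compare(T x, T y)
    {
        return _compare(x, y);
    }
}

public static class ComparerFactory
{
    // Factory method so callers never name the implementing class.
    public static IComparer<T> Create<T>(Func<T, T, int> compare)
    {
        return new AnonymousComparer<T>(compare);
    }
}
```

Sorting a List<string> by length then becomes words.Sort(ComparerFactory.Create((string a, string b) => a.Length.CompareTo(b.Length))), with no named comparer class in sight.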
The Reactive Extensions for .Net offers plenty of ways to create IObservables
Some primitives
IObservable<int> obs = Observable.Empty<int>();
IObservable<int> obs = Observable.Return(0);
IObservable<int> obs = Observable.Throw<int>(new Exception());
Simple streams
IObservable<long> obs = Observable.Interval(new TimeSpan(0, 0, 1));
IObservable<long> obs = Observable.Timer(DateTimeOffset.Now.AddHours(1)); // Plus 7 more overloads
IObservable<int> obs = Observable.Repeat(1); // Plus 7 more overloads
IObservable<int> obs = Observable.Range(0, 1);
From async data
//From an Action or Func
Observable.Start(() => 1);
//From Task
Task.Factory.StartNew(...).ToObservable();
//From AsyncPattern
// typical use case is IO or Web service calls
Func<int,int,double> sampleFunc = (a,b) => 1d;
Func<int, int, IObservable<double>> funcObs =
Observable.FromAsyncPattern<int, int, double>(sampleFunc.BeginInvoke,
sampleFunc.EndInvoke);
IObservable<double> obs = funcObs(1, 0);
From Events
public event EventHandler<EventArgs> AnEvent;
IObservable<IEvent<EventArgs>> fromEventObs =
Observable.FromEvent<EventArgs>(h => this.AnEvent += h,
h => this.AnEvent -= h);
From Existing Collections
IEnumerable<int> ie = new int[] {};
observable = ie.ToObservable();
By Generate()
There are 20 overloads of Generate(). See some prior examples here
By Create()
This creates a cold stream
IObservable<int> observable = Observable.Create<int>(o =>
{
o.OnNext(1);
o.OnNext(2);
o.OnCompleted();
return () => { };
});
To make a hot stream via Create()
List<IObserver<int>> _subscribed = new List<IObserver<int>>();
private void CreateHot()
{
observable = Observable.Create<int>(o =>
{
_subscribed.Add(o);
return () => _subscribed.Remove(o);
});
}
private void onNext(int val)
{
foreach (var o in _subscribed)
{
o.OnNext(val);
}
}
But rather than using create, a subject provides a cleaner (thread safe and tested) way of doing the above
var subj = new Subject<int>();
observable = subj.Hide();
subj.OnNext(1);
Streaming OLAP is something that comes up over and over again in the “CEP space” – using the Reactive Extensions for .Net this demo shows the basics; filtering, grouping, aggregates, and concurrent queries.
To set the context, the idea here is to focus on the “query” side, so the UI is aggressively simple: no nifty visualizations here, just enough to show working code. I chose a non-financial domain, filesystem changes in this case, simply because the simplified equities example is a bit overused.
Here’s a screenshot and download link http://code.msdn.microsoft.com/FSOlapRxDemo
Let’s walk through the code:
Getting the data
The .Net class FileSystemWatcher will report changes to the filesystem as events, so to get a stream of filesystem changes we only need do the following:
- get all the fixed drives
- create a new FileSystemWatcher for each
- convert the Changed, Deleted, and Created events to IObservables using FromEvent()
- Merge() those into a single IObservable
- map the EventArgs to a more query-friendly class
public static IObservable<FileChangeFact> GetFileSystemStream()
{
var fsEventTypes = new string[] { "Changed", "Deleted", "Created" };
IEnumerable<IObservable<IEvent<FileSystemEventArgs>>> fsEventsAsObservables =
DriveInfo.GetDrives()
.Where(di => di.DriveType == DriveType.Fixed)
.Select(drive => new FileSystemWatcher(drive.RootDirectory.FullName) { … })
.SelectMany(fsw => fsEventTypes.Select(eventType => Observable.FromEvent…;
return Observable.Merge(fsEventsAsObservables)
.Select(fsea =>
{
var fi = new FileInfo(fsea.EventArgs.FullPath);
return new FileChangeFact
{
ChangeType = fsea.EventArgs.ChangeType,
Path = fsea.EventArgs.FullPath,
IsContainer = !fi.Exists,
Length = fi.Exists ? fi.Length : 0,
Extension = String.IsNullOrEmpty(fi.Extension) ? "(none)" : fi.Extension
};
});
}
Calculating Aggregates, take 1
The Scan() operator is ideal for computing aggregates in a streaming OLAP scenario. Unlike traditional queries where vectors are aggregated into a single value, we want to compute running values.
So to compute a few of the most common query aggregates (Count, Sum, Mean, and StdDev), we can do the following:
public static IObservable<double> Mean(this IObservable<double> source)
{
var temp = new { N = 0, Mean = 0d };
return source.Scan(temp, (cur, next) =>
{
var n = cur.N + 1;
var delta = next - cur.Mean;
var meanp = cur.Mean + delta / n;
return new
{
N = n,
Mean = meanp,
};
}).Select(it => it.Mean);
}
public static IObservable<double> StdDev(this IObservable<double> source)
{
var temp = new { N = 0, Mean = 0d, M2 = 0d };
return source.Scan(temp, (cur, next) =>
{
var n = cur.N + 1;
var delta = next - cur.Mean;
var meanp = cur.Mean + delta / n;
return new
{
N = n,
Mean = meanp,
M2 = cur.M2 + delta * (next - meanp)
};
}).Select(it => Math.Sqrt(it.M2 / (it.N)));
}
var fss = GetFileSystemStream();
var lenxs = fss.Select(fcf => (double)fcf.Length); // stream of file lengths shared by the aggregates below
lenxs.Scan(0,(c, _) => c + 1) // Count
.Zip(lenxs.Scan(0d,(c, n) => c + n), (cnt, sum) => new FileChangeAggregate(){Sum=sum,Count...
.Zip(lenxs.Mean(), (fca, mean) => { fca.Mean = mean; return fca; })
.Zip(lenxs.StdDev(), (fca, stddev) => { fca.StdDev = stddev; return fca; })
//... subscribe()...
The first two methods are just wrappers around Scan(), and to compute multiple aggregates on a single stream, the Zip() operator comes in handy, letting us essentially stitch together multiple computations.
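For reference, the recurrences that the Mean() and StdDev() accumulators above implement can be written out as follows. This is the single-pass update commonly attributed to Welford; the notation ($x_n$ for the n-th value, $\mu_n$ for the running mean, $M_{2,n}$ for the running sum of squared deviations) is my own, with $\mu_0 = 0$ and $M_{2,0} = 0$ matching the seed values.

```latex
\begin{aligned}
\delta_n &= x_n - \mu_{n-1} \\
\mu_n    &= \mu_{n-1} + \frac{\delta_n}{n} \\
M_{2,n}  &= M_{2,n-1} + \delta_n \,(x_n - \mu_n) \\
\sigma_n &= \sqrt{\frac{M_{2,n}}{n}}
\end{aligned}
```

Note the division by $n$ rather than $n-1$, so this is the population standard deviation. As a quick check with the values 2 and 4: after the first value, $\mu_1 = 2$ and $M_{2,1} = 0$; after the second, $\delta_2 = 2$, $\mu_2 = 3$, $M_{2,2} = 0 + 2\,(4 - 3) = 2$, and $\sigma_2 = \sqrt{2/2} = 1$.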
One nice aspect of this approach is that the code for each of the aggregates is cohesive, loosely coupled, and composable. Those are all nice attributes, but it’s verbose and redundant (in that the Mean is calculated twice), and I suspect not as performant. So that takes us to
Calculating Aggregates, take 2
public static IObservable<StatInfoItem<T>> ToCommonAggregates<T, TSrc>(
this IObservable<TSrc> source,
Func<TSrc, double> dataSelector,
Func<TSrc, T> itemSelector)
{
return source.Scan(new StatInfoItem<T>(), (cur, next) =>
{
double data = dataSelector(next);
T itemp = itemSelector(next);
var n = cur.Count + 1;
var delta = data - cur.Mean;
var meanp = cur.Mean + delta / n;
var m2 = cur.M2 + delta * (data - meanp);
var stdDevp = Math.Sqrt(m2 / n);
return new StatInfoItem<T>()
{
Item = itemp,
Sum = data + cur.Sum,
Count = n,
Mean = meanp,
M2 = m2,
StdDev = stdDevp,
Min = Math.Min(data, cur.Min),
Max = Math.Max(data, cur.Max),
};
})
.Skip(1); // need a seed, but don't want to include seed value in the output
}
Which also adds Min() & Max(). Perhaps not as elegant, but using the above is quite easy:
IObservable<StatInfoItem<string>> aggstream =
fss.ToCommonAggregates(fcf => fcf.Length, _ => "Label");
Filtering and Multiple Queries
For the demo, each query will be a “drill down” into the filesystem, implemented as a filter on the full path of the file that changed. This is done with the same Where() operator used in normal Linq. Adding multiple queries isn’t as trivial. Unlike IEnumerables, which contain their data, Observables only have a promise of future data. So if we want to, say, find out how much has changed in c:\windows, we need to query a store of past data as well as include future data. Here’s one approach using the StartWith() operator and a ConcurrentQueue:
private IObservable<FileChangeFact> _fss;
private ConcurrentQueue<FileChangeFact> _store = new …
private void NewQuery()
{
_fss = GetFileSystemStream();
_fss.Subscribe(fsi => _store.Enqueue(fsi));
var newQuery = _fss.StartWith(_store.ToArray()).Where(fsi => {...} );
}
A cleaner approach is to take advantage of the RX’s subjects, in particular the ReplaySubject, which stores history for us.
private ReplaySubject<FileChangeFact> _storeSubject;
_storeSubject = GetFileSystemStream().Replay();
var drillDownQry = _storeSubject.Where(fltr);
Grouping
Thanks to the GroupBy() and SelectMany() operators, grouping turns out to be quite easy. First we group by the file extension, selecting the Length property. Then, for each group, we compute the aggregates.
IObservable<StatInfoItem<string>> grouped =
newQuerystream.GroupBy(fsi => fsi.Extension, fsi => (double)fsi.Length)
.SelectMany(grp => grp.ToCommonAggregates(x => x, _ => grp.Key));
Updating the UI
Merging a stream of data into an ObservableCollection<T> is such a common operation. I’ve found myself wishing that MS offered a KeyedObservableCollection<T>, and I’ve considered writing such a thing, but instead I used a smaller extension method, MergeInsert(), which does the job without all the additional ceremony of a new class:
public static IDisposable MergeInsert<T, TKey>(
this ObservableCollection<T> col,
IObservable<T> stream,
Func<T, TKey> keySelector)
{
col.Clear();
Dictionary<TKey, int> lookupTable = new Dictionary<TKey, int>();
return stream.Subscribe(item =>
{
var key = keySelector(item);
if (!lookupTable.ContainsKey(key))
{
lookupTable[key] = col.Count;
col.Add(item);
}
else
{
col[lookupTable[key]] = item;
}
});
}
Use it like so:
GroupedByExtentionCollection = new ObservableCollection<StatInfoItem<string>>();
IDisposable _disp = GroupedByExtentionCollection.MergeInsert(grouped.ObserveOnDispatcher(),
sii => sii.Item);
The bug I mentioned in my first attempt at a sliding window was the minor issue that the aggregates never went down to 0, even if the window had emptied out.
The problem line of code was cur.GroupBy(tsst => tsst.Value.Symbol) – if the window is empty, there is nothing to group – and as a result the aggregates don’t get computed.
Here’s the fix:
public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
var existingWindows = new ConcurrentDictionary<string,int>();
return oticks
.ToSlidingWindow(new TimeSpan(0, 0, 0, 30), new TimeSpan(0, 0, 0, 0, 500))
.Select(sl => sl.Current)
.SelectMany(cur =>
{
IEnumerable<VWAPItem> grouped = cur.GroupBy(tsst => tsst.Value.Symbol)
.Select(grp =>
{
IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
var totalVolume = ticks.Sum(tk => tk.Size);
return new VWAPItem(grp.Key, totalAmount,
totalVolume,
totalAmount / totalVolume);
});
foreach (var grpd in grouped)
{
existingWindows[grpd.Symbol] = 1;
}
IEnumerable< IEnumerable<VWAPItem>> outerJoin = existingWindows
.GroupJoin(grouped,
key => key.Key,
grped => grped.Symbol,
(key, item) =>
item.DefaultIfEmpty(new VWAPItem(key.Key, 0, 0, 0)));
return outerJoin.SelectMany(x => x);
});
}
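To see the outer-join idea in isolation, here is a stripped-down sketch in plain LINQ to Objects, with no Rx involved. The names OuterJoinSketch and VolumeBySymbol are hypothetical, and ticks are reduced to (symbol, size) pairs purely for illustration; the GroupJoin plus DefaultIfEmpty combination is the same one used in the fix above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OuterJoinSketch
{
    // Returns one (symbol, volume) pair per known symbol. Symbols with no
    // ticks in the current window get a volume of 0 instead of vanishing.
    public static List<KeyValuePair<string, long>> VolumeBySymbol(
        IEnumerable<string> knownSymbols,
        IEnumerable<KeyValuePair<string, long>> windowTicks)
    {
        // Grouping alone: an empty window produces no groups at all.
        var grouped = windowTicks
            .GroupBy(t => t.Key)
            .Select(g => new KeyValuePair<string, long>(g.Key, g.Sum(t => t.Value)));

        // Left outer join: every known symbol appears, matched or not.
        return knownSymbols
            .GroupJoin(grouped,
                       sym => sym,
                       g => g.Key,
                       (sym, matches) =>
                           matches.DefaultIfEmpty(new KeyValuePair<string, long>(sym, 0L)))
            .SelectMany(x => x)
            .ToList();
    }
}
```

Calling VolumeBySymbol with an empty window still yields one zero-volume entry per known symbol, which is the behavior the original GroupBy-only version was missing.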
A few months ago, playing with CTP 2 of StreamInsight, I created a small VWAP demo on a sliding window. Now that a proper CTP of the RX is available, I wanted to see how much effort the same demo would be without the CEP infrastructure of StreamInsight. I’ll admit that this was a little bit harder to write than I expected, and there’s still at least one bug remaining (updated), but the code for actually computing the VWAPs feels much cleaner in the RX version than it did in the StreamInsight version. The debuggability (which is really about transparency) of RX is a welcome difference from most CEP systems.
So here’s the code:
The generation of stock ticks remained nearly identical; however, instead of timestamping by hand, I used the Timestamp() extension method. And to allow multiple observers to the same IObservable, the ticks are routed to a Subject.
public IObservable<Timestamped<StockTick>> GetTicks()
{
var subj = new Subject<Timestamped<StockTick>>();
var gen = Observable.Generate(
0
, ii => ii < 1000 // produce 1000 ticks
, ii => new StockTick() // next value
...
)
.Timestamp();
gen.Subscribe(tsst => subj.OnNext(tsst));
return subj;
}
Compute VWAP on a 10 second sliding window
public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
return oticks
.ToSlidingWindow(new TimeSpan(0, 0, 0,10), new TimeSpan(0, 0, 0, 0, 500))
.Select(sl => sl.Current)
.SelectMany(cur =>
cur.GroupBy(tsst => tsst.Value.Symbol)
.Select(grp =>
{
IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
var totalVolume = ticks.Sum(tk => tk.Size);
return new VWAPItem(grp.Key, totalAmount, totalVolume,
totalAmount / totalVolume);
}));
}
And the code for ToSlidingWindow()
public static IObservable<SlidingWindow<Timestamped<T>>> ToSlidingWindow<T>(
this IObservable<Timestamped<T>> source,
TimeSpan size, TimeSpan resolution)
{
Func<SlidingWindow<Timestamped<T>>, TimeoutJoinItem<T>, SlidingWindow<Timestamped<T>>>
windowing = (window, item) =>
{
Func<Timestamped<T>, bool> checkTimestamp =
cwi => cwi.Timestamp.Add(size) <= item.ComparisonTimestamp;
var newCurrent = window.Current.SkipWhile(checkTimestamp);
var removed = window.Current.TakeWhile(checkTimestamp);
var added = Enumerable.Repeat(item.TSItem, (item.IsTimeout) ? 0 : 1);
return
new SlidingWindow<Timestamped<T>>(newCurrent.Concat(added), added, removed);
};
DateTime priorleft = DateTime.MinValue;
return source.CombineLatest(Observable.Timer(resolution, resolution).Timestamp(),
(left, right) =>
{
bool isTimeout = left.Timestamp == priorleft;
priorleft = left.Timestamp;
return new TimeoutJoinItem<T>(left,
(isTimeout)? right.Timestamp: left.Timestamp,
isTimeout);
}).Scan(new SlidingWindow<Timestamped<T>>(), windowing)
.Where(sl => sl.Added.Count() > 0 || sl.Removed.Count() > 0);
}
The key elements in the above are
- Observable.Timer – this is our heartbeat which allows us to detect passage of time without new events
- CombineLatest – Join two IObservables – the data stream and the time stream
- Scan – this is Aggregate() for Observables, except that it emits every intermediate value. The windowing function takes the current window and computes the new window based on which elements have expired and been added
- And finally reduce noise by removing SlidingWindows which have not changed
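If Scan is unfamiliar, a pull-based analogue over IEnumerable may help. The sketch below is a hypothetical helper of my own (EnumerableScanSketch is not part of Rx or LINQ), and unlike the CTP operator used in these posts it never emits the seed itself.

```csharp
using System;
using System.Collections.Generic;

public static class EnumerableScanSketch
{
    // Like Aggregate(), but yields each intermediate accumulator value
    // instead of only the final one. The seed itself is not emitted.
    public static IEnumerable<TAcc> Scan<TSource, TAcc>(
        this IEnumerable<TSource> source,
        TAcc seed,
        Func<TAcc, TSource, TAcc> accumulator)
    {
        TAcc acc = seed;
        foreach (var item in source)
        {
            acc = accumulator(acc, item); // fold the next item into the state
            yield return acc;             // and publish the new state
        }
    }
}
```

For example, new[] { 1, 2, 3 }.Scan(0, (sum, x) => sum + x) yields the running totals 1, 3, 6. In ToSlidingWindow() the accumulator state is the window itself rather than a number.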
The code to wire it up to a window is standard stuff, just
var ticks = _model.GetTicks();
ticks
.ObserveOnDispatcher()
.Subscribe(tst => TickCollection.Add(tst));
var vwapDict= new Dictionary<string,VWAPItem>();
_model.GetVWAPWS(ticks)
.ObserveOnDispatcher()
.Subscribe(vwap =>
{
if (vwapDict.ContainsKey(vwap.Symbol))
VWAPCollection.Remove(vwapDict[vwap.Symbol]);
vwapDict[vwap.Symbol] = vwap;
VWAPCollection.Add(vwap);
});
And of course the required screenshot

Downloads of the Reactive Framework (RX) can now be found at MS DevLabs. Versions for 3.5 SP1, 4.0 Beta, and Silverlight 3 are available. Interestingly, the API size appears to be substantially larger than the preview which was leaked as part of the Silverlight 3 Toolkit. That DLL was all of 84KB; the current release weighs in at 283KB.
In regards to CEP, the comparisons between StreamInsight and RX are interesting
| RX | | StreamInsight |
| Low – it’s managed code all the way down | Leaking abstraction | High; Linq2SI is like Linq2SQL, except the underlying SI implementation isn’t well understood |
| Limited by CLR and GC | Performance | High b/c of native code |
| None out of box | Windowing support | Explicit |
| Easy | Adaptor support | Not hard, but not trivial |
In my last post I showed how to send StreamInsight output streams to a UI via the ReactiveFramework. Here we’ll do the reverse, by sending an RX stream into a CEP stream. Instead of a partial example, I’ll use an end-to-end example showing simulated stock ticks, computing the 5 min rolling VWAP, and showing the results on a UI.
First we’ll generate the ticks:
System.Collections.Generic.IObservable<StockTick> stockTicks =
System.Linq.Observable.Generate(
new Random() // initial state
, rnd => true // continue
, rnd => new StockTick() // next value
{
Price = rnd.NextDouble() * 1000,
Side = Sides[rnd.Next(Sides.Length)], // Next(max) excludes max, so every element can be picked
Size = (long)(rnd.NextDouble() * 1000),
Symbol = Symbols[rnd.Next(Symbols.Length)],
Timestamp = DateTime.Now
}
, rnd => (int)(rnd.NextDouble() * 2000) // waitInterval
, rnd => rnd // iterate
);
And now convert to a CEP stream:
var cepInput = stockTicks.ToCEP()
.ToCepStream(tick => tick.Timestamp);
Where ToCEP() is just the inverse of ToRX(), defined previously.
public static S.IObservable<T> ToCEP<T>(this RX.IObservable<T> rxSource)
{
return new CEPAnonymousObservable<T>(
(S.IObserver<T> cepObserver) =>
RX.ObservableExtensions.Subscribe(rxSource, nextVal=> cepObserver.OnNext(nextVal)));
}
Computing the rolling 5 min VWAP (grouped by symbol) takes some effort:
var alteredDurationStream = cepInput
.ToPointEventStream()
.AlterEventDuration(tick => TimeSpan.FromMinutes(5));
var fiveMinVWaps = from fivemin in
alteredDurationStream
group fivemin by fivemin.Symbol into fGrp
from evwindow in fGrp.Snapshot()
select new
{
Symbol = fGrp.Key,
TotalAmount = evwindow.Sum(fmin => fmin.Size * fmin.Price),
TotalVolume = evwindow.Sum(fmin => fmin.Size),
};
var fiveMinVWaps2 = from fivemin in fiveMinVWaps
select new VWAPItem()
{
Symbol = fivemin.Symbol,
VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
Timestamp = DateTime.Now,
};
Although this nearly looks like conventional .Net Linq code, it isn’t. Think Linq2SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint, nor are any arbitrary .Net computations allowed. The reason for the additional fiveMinVWaps2 projection is that it’s not possible to compute anything but the Sum or Avg in a Snapshot().
Now that we have the data, we can convert it back to an RX stream:
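The difference between a lambda and an expression is easy to see in plain C#, independent of StreamInsight. In the sketch below (ExpressionVsDelegate and its members are hypothetical names for illustration), the same VWAP division is written once as a delegate and once as an expression tree. How the StreamInsight provider actually consumes such trees is not something I can show here; the sketch only illustrates the general mechanism.

```csharp
using System;
using System.Linq.Expressions;

public static class ExpressionVsDelegate
{
    // An ordinary delegate: compiled code. It can have a statement body,
    // and a breakpoint on the division will be hit.
    public static readonly Func<double, double, double> AsDelegate =
        (amount, volume) =>
        {
            var vwap = amount / volume;
            return vwap;
        };

    // An expression tree: data describing the computation, which a query
    // provider can inspect and translate. The compiler rejects a
    // statement body (braces) for this kind of lambda.
    public static readonly Expression<Func<double, double, double>> AsExpression =
        (amount, volume) => amount / volume;

    // The tree can be examined like any other object graph.
    public static string Describe()
    {
        return AsExpression.Body.NodeType.ToString();
    }
}
```

Describe() returns "Divide", the node type of the lambda body, and AsExpression.Compile() turns the tree back into a callable delegate. A provider is free to support only the node types it knows how to translate, which would explain why only simple aggregates are accepted inside the query.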
PropertyInfo tsProp = typeof(VWAPItem).GetProperty("Timestamp");
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();
And update an ObservableCollection
vwaps.Post(_sc).Subscribe(item =>
{
var exists = CEPOS1.Where(vw => vw.Symbol == item.Symbol).FirstOrDefault();
if (exists == null)
CEPOS1.Add(item);
else
exists.CopyFrom(item);
});
Which displays on a UI

One compelling feature of StreamInsight is its in-process hosting model. In addition to reducing the complexity of server side installs, it’s now possible to have a CEP engine in the client UI.
The simplest way of getting CEP streams onto the UI would be the Reactive Framework methods. Something like
queryOutputStream
.ToObservable(...)
.Post(syncContext)
.Subscribe(item=> collection.Add(item) );
But in the CTP that won’t work. As I discovered a few days ago, the IObservable used in StreamInsight is defined in a different namespace and assembly than the IObservable in System.Reactive. Furthermore, the StreamInsight API lacks the base classes and extension methods defined in System.Reactive.
I didn’t want to go the normal route of creating an implementation of IObserver on, say, a ViewModel and routing the data through the dispatcher onto a collection. While that would have the benefit of simplicity and would work, it would mean giving up on all the goodness in System.Reactive.
The first method I tried in an effort to convert a CEP IObservable into an RX IObservable didn’t work, but was instructive nonetheless.
Using StreamInsight’s own I/O adapter API, I would create an “Eventing” adapter which would raise a conventional .Net event on an object of my choosing, then, using the ReactiveFramework, convert that event to an RX IObservable.
But it’s not easy (or possible) to do. Instances of output adapters are created by OutputAdapterFactories, which in turn are created by Factory methods. You can send in a configuration object, but it needs to be XML serializable, so there’s no sending in of Action<> delegates.
But it turns out that it’s not hard to convert from a CEP IObservable to an RX IObservable.
First you need a CEP AnonymousObserver<T>
using S = System;
internal class AnonymousObserver<T> : S.IObserver<T>
{
private bool isStopped;
private S.Action _onCompleted;
private S.Action<S.Exception> _onError;
private S.Action<T> _onNext;
public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError)
: this(onNext, onError, () => { })
{
}
public AnonymousObserver(S.Action<T> onNext, S.Action<S.Exception> onError, S.Action onCompleted)
{
_onNext = onNext;
_onError = onError;
_onCompleted = onCompleted;
}
public void OnCompleted()
{
if (!isStopped)
{
isStopped = true;
_onCompleted();
}
}
public void OnError(S.Exception exception)
{
if (!isStopped)
{
isStopped = true;
_onError(exception);
}
}
public void OnNext(T value)
{
if (!isStopped)
_onNext(value);
}
}
Then add an extension method that takes a CEP IObservable and returns an RX AnonymousObservable<T>. When an RX observer subscribes, it subscribes to the source via the CEP AnonymousObserver and, on each OnNext, calls the RX observer’s OnNext.
Like so:
using RX = System.Collections.Generic;
using S = System;
public static class CEPExtMethods
{
public static RX.IObservable<T> ToRX<T>(this S.IObservable<T> source)
{
return new AnonymousObservable<T>(
(RX.IObserver<T> rxObserver) =>
source.Subscribe(new AnonymousObserver<T>(
nextVal => rxObserver.OnNext(nextVal),
rxObserver.OnError
)));
}
}
To use it:
var queryOutputStream = CreateQueryTemplate(input);
var queryOutput = queryOutputStream.ToObservable(typeof(EventTypeCount).GetField("Time"));
queryOutput.ToRX().Send(_sc).Subscribe(v => this.CEPOS1.Add(v));
And results from the Observable sample on screen
