Stream transforms in Coral8 via .NET
In the first post on integrating PowerShell and Coral8 I showed how to create a message sink. In C#ish pseudocode we did the following:
OutputStream<T> => Action<T>
where the Action<T> was a PowerShell block to send a message via GMail.
Today the goal is to do a transform, something along the following lines:
OutputStream<T> => Func<T,T2> => InputStream<T2>
In this example, T will be a pair of stock symbols, and T2 will be that pair of stock symbols along with the correlation of the log returns of their closing prices for the past 10 days.
Let's get started.
Coral8
CREATE SCHEMA StockPairSchema (SecA STRING, SecB STRING);
CREATE SCHEMA StockPairCorrelationSchema INHERITS FROM StockPairSchema (CORR FLOAT);
CREATE OUTPUT STREAM StockPairs SCHEMA StockPairSchema;
CREATE INPUT STREAM StockCorrelations SCHEMA StockPairCorrelationSchema;
ATTACH OUTPUT ADAPTER PairstoCorrelationsFunc TYPE PoShAdapter TO STREAM StockPairs
    PROPERTIES RESULTSSTREAM = "ccl://localhost:6789/Stream/Default/TestC8/StockCorrelations",
The process block is very simple:
foreach ($t in $input)
{
    $a = Get-LogReturns $t["SecA"]
    $b = Get-LogReturns $t["SecB"]
    $c = [Demo.Stats]::Correlate($a, $b)
    ,,($t["SecA"], $t["SecB"], $c)   # double commas so the values are not flattened
}
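The leading commas are easy to overlook, so here is a minimal sketch, run outside Coral8, of what they do. The function names and sample values are made up for the illustration. PowerShell unrolls an array written to the output stream, and each unary comma wraps the array in one more single-element array so that it survives one round of unrolling. The process block uses two commas, presumably because the adapter unrolls the result once more; that part is an assumption, not something this sketch demonstrates.

```powershell
# Hypothetical helper names, for illustration only.
function Get-FlattenedRows
{
    # No comma: the 3-element array is unrolled into 3 separate items per pass.
    foreach ($i in 1..2) { ("AAA", "BBB", 0.5) }
}

function Get-WrappedRows
{
    # One comma: each pass emits a single item, the 3-element array itself.
    foreach ($i in 1..2) { ,("AAA", "BBB", 0.5) }
}

function Get-DoubleWrappedRows
{
    # Two commas: each pass emits a 1-element array holding the 3-element array.
    foreach ($i in 1..2) { ,,("AAA", "BBB", 0.5) }
}

$flat    = @(Get-FlattenedRows)       # 6 items: the rows have been flattened
$wrapped = @(Get-WrappedRows)         # 2 items, each a 3-element array
$double  = @(Get-DoubleWrappedRows)   # 2 items, each wrapping a 3-element array

"{0} {1} {2}" -f $flat.Count, $wrapped.Count, $double.Count
```

Without the comma the row boundaries are lost, which is why the tuple has to be wrapped before it leaves the block.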
The block path defines the Get-LogReturns and Correlate functions, so naturally it’s a bit longer.
Get-LogReturns:
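$wc = New-Object Net.WebClient
function Get-LogReturns ($sec)
{
    # Download the price history as CSV and keep the adjusted close of the first 10 rows
    $qry = "http://ichart.finance.yahoo.com/table.csv?s=$sec"
    $secAData = ConvertFrom-Csv $wc.DownloadString($qry) | select -First 10 | % { $_.'Adj Close' }
    # Emit the log of the ratio of each price to the next one in the list
    for ($i = 0; $i -lt $secAData.Count - 1; $i++)
    {
        [Math]::Log( $secAData[$i] / $secAData[$i + 1] )
    }
}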
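To see what the loop produces without going to the network, here is a small sketch of the same arithmetic on a hard-coded price list. The function name Get-LogReturnsFromPrices and the prices are invented for the example; it assumes, as the loop above does, that each price is divided by the one that follows it in the list.

```powershell
# Hypothetical helper: same loop as Get-LogReturns, but over prices passed in directly.
function Get-LogReturnsFromPrices ([double[]] $prices)
{
    for ($i = 0; $i -lt $prices.Count - 1; $i++)
    {
        # Natural log of the ratio of each price to the next one in the list
        [Math]::Log($prices[$i] / $prices[$i + 1])
    }
}

# Three made-up prices give two log returns: ln(110/100) and ln(100/90)
$returns = @(Get-LogReturnsFromPrices 110, 100, 90)
$returns
```

N prices always give N-1 returns, so the 10 rows taken from the CSV feed 9 values per symbol into the correlation.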
[Demo.Stats]::Correlate
$csCode = @"
using System;
using System.Collections.Generic;
using System.Linq;

namespace Demo
{
    public static class Stats
    {
        private static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(IList<TFirst> first,
                                                                          IList<TSecond> second,
                                                                          Func<TFirst, TSecond, TResult> func)
        {
            for (int ii = 0; ii < Math.Min(first.Count(), second.Count()); ii++)
                yield return func(first[ii], second[ii]);
        }

        public static double Correlate(object[] s1, object[] s2)
        {
            return Correlate(s1.Cast<double>(), s2.Cast<double>());
        }

        public static double Correlate(IEnumerable<double> s1, IEnumerable<double> s2)
        {
            var sum1 = s1.Sum();
            var sum2 = s2.Sum();
            var sumSq1 = s1.Select(v => v * v).Sum();
            var sumSq2 = s2.Select(v => v * v).Sum();
            var pSum = Zip(s1.ToList(), s2.ToList(), (a, b) => a * b).Sum();
            var len = s1.Count();
            var num = pSum - ((sum1 * sum2) / len);
            var denom = Math.Sqrt(((sumSq1 - (sum1 * sum1) / len) * (sumSq2 - (sum2 * sum2) / len)));
            return (denom == 0.0) ? 0 : num / denom;
        }
    }
}
"@
Add-Type -TypeDefinition $csCode -Language CSharpVersion3 -PassThru
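For reference, the arithmetic in the second Correlate overload is the usual computational form of the Pearson correlation coefficient. Writing x and y for the two series and n for len, the sums line up with the variables in the code as pSum = Σxy, sum1 = Σx, sum2 = Σy, sumSq1 = Σx² and sumSq2 = Σy²:

```latex
r = \frac{\sum_i x_i y_i - \frac{1}{n}\sum_i x_i \sum_i y_i}
         {\sqrt{\left(\sum_i x_i^2 - \frac{1}{n}\Bigl(\sum_i x_i\Bigr)^2\right)
                \left(\sum_i y_i^2 - \frac{1}{n}\Bigl(\sum_i y_i\Bigr)^2\right)}}
```

When the denominator is zero (for example, when one of the series is constant), the code returns 0 rather than dividing.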
And in case you missed it: the Correlate function isn't written in PowerShell at all. It is inline C#, compiled when the adapter starts up, and it runs inside the Coral8 server process.
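If you want to sanity-check the numbers without starting the Coral8 server, the same formula can be written in plain PowerShell. This is only a sketch for comparison, not the code the adapter runs: the name Get-Correlation is made up, and unlike the C# version it uses the length of the shorter series throughout.

```powershell
# Hypothetical pure-PowerShell counterpart of [Demo.Stats]::Correlate, for checking only.
function Get-Correlation ([double[]] $x, [double[]] $y)
{
    # Assumption: only the overlapping part of the two series is used
    $n = [Math]::Min($x.Count, $y.Count)
    $sumX = 0.0; $sumY = 0.0; $sumXX = 0.0; $sumYY = 0.0; $sumXY = 0.0
    for ($i = 0; $i -lt $n; $i++)
    {
        $sumX  += $x[$i]
        $sumY  += $y[$i]
        $sumXX += $x[$i] * $x[$i]
        $sumYY += $y[$i] * $y[$i]
        $sumXY += $x[$i] * $y[$i]
    }
    $num   = $sumXY - ($sumX * $sumY / $n)
    $denom = [Math]::Sqrt(($sumXX - $sumX * $sumX / $n) * ($sumYY - $sumY * $sumY / $n))
    # Same convention as the C# code: report 0 when the denominator is zero
    if ($denom -eq 0.0) { return 0.0 }
    $num / $denom
}

$up   = Get-Correlation (1, 2, 3, 4) (2, 4, 6, 8)   # series move together: 1
$down = Get-Correlation (1, 2, 3, 4) (8, 6, 4, 2)   # series move in opposite directions: -1
$flat = Get-Correlation (1, 2, 3, 4) (5, 5, 5, 5)   # constant series: 0 by convention
"$up $down $flat"
```

Feeding the same arrays to [Demo.Stats]::Correlate should give matching values, which makes for a quick check that the inline C# compiled as intended.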