Stream transforms in Coral8 via .Net

In the first post on integrating PowerShell and Coral8 I showed how to create a message sink. In C#ish pseudocode we did the following:

OutputStream<T>  =>  Action<T>

where the Action<T> was a PowerShell block to send a message via GMail.

Today the goal is to do a transform, something along the following

OutputStream<T>  =>  Func<T,T2> => InputStream<T2>

In this example, T will be a pair of stock symbols, T2 will be a pair stock symbols along with the correlation of their log normal closing prices for the past 10 days.

let’s get started.

Coral8
CREATE SCHEMA           StockPairSchema( SecA STRING,SecB STRING );
CREATE SCHEMA           StockPairCorrelationSchema INHERITS from StockPairSchema (CORR  FLOAT) ;
CREATE OUTPUT   STREAM  StockPairs          SCHEMA StockPairSchema;
CREATE INPUT    STREAM  StockCorrelations   SCHEMA StockPairCorrelationSchema;
ATTACH OUTPUT ADAPTER PairstoCorrelationsFunc TYPE PoShAdapter TO STREAM StockPairs
PROPERTIES
RESULTSSTREAM = "ccl://localhost:6789/Stream/Default/TestC8/StockCorrelations",

The process block is very simple:

foreach ($t in $input)
{
    $a = Get-LogReturns $t["SecA"]
    $b = Get-LogReturns $t["SecB"]
    $c = [Demo.Stats]::Correlate($a, $b)
    ,,($t["SecA"],$t["SecB"],$c) #// double commas so the values are not flattened
}

The block path defines the Get-LogReturns and Correlate function, so naturally it’s a bit longer.
Get-LogReturns:

$wc = New-Object Net.WebClient    
function Get-LogReturns ($sec)
{
    $qry = "http://ichart.finance.yahoo.com/table.csv?s=$sec"
    $secAData = ConvertFrom-Csv $wc.DownloadString($qry) | select  -First 10 | % { $_.'Adj Close' }
    for ($i=0; $i -lt $secAData.Count-1; $i++) { [Math]::Log( $secAData[$i] / $secAData[$i+1] )  }
}

[Demo.Stats]::Correlate

$csCode = @"
using System;
using System.Collections.Generic;
using System.Linq;
namespace Demo 
{
    public static class Stats
    {
        private static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(IList<TFirst> first, 
				IList<TSecond> second, 
				Func<TFirst, TSecond, TResult> func)
        {
            for (int ii = 0; ii < Math.Min(first.Count(),second.Count()); ii++)
                yield return func(first[ii],second[ii]);
        }
        public static double Correlate(object[] s1,object[] s2)
        {
            return Correlate(s1.Cast<double>(), s2.Cast<double>());
        }
        public static double Correlate(IEnumerable<double> s1,IEnumerable<double> s2)
        {
            var sum1 = s1.Sum();
            var sum2 = s2.Sum();
            var sumSq1 = s1.Select(v=> v*v).Sum();
            var sumSq2 = s2.Select(v=> v*v).Sum();
            var pSum = Zip(s1.ToList(),s2.ToList(), ( a, b) => a*b).Sum();
            var len = s1.Count();
            var num = pSum - ((sum1 * sum2) / len);
            var denom = Math.Sqrt(((sumSq1 - (sum1 * sum1) / len) * (sumSq2 - (sum2 * sum2) / len)));
            return (denom == 0.0) ? 0 : num / denom;
        }
    }
}
"@
Add-Type -TypeDefinition $csCode -Language CSharpVersion3 -PassThru 

And in case you missed it, the correlate function isn't in PowerShell at all, but rather coded up via in-line C# code, compiled at the startup of the adaptor, and running in the Coral8 server process.

1 Comment

  • Hi there,

    I'm new to the CEP world. I've created a basic Coral8 stream from a CSV file. Do you possibly have some sample code where I can bind the stream to a DataGridView in C# or VB.Net?

    Any help would be greatly appreciated

    Andi

Comments have been disabled for this content.