Contents tagged with CLR

  • Stream transforms in Coral8 via .Net

    In the first post on integrating PowerShell and Coral8 I showed how to create a message sink. In C#ish pseudocode we did the following:

    OutputStream<T>  =>  Action<T>

    where the Action<T> was a PowerShell block that sent a message via Gmail.

    Today the goal is to do a transform, something along the following lines:

    OutputStream<T>  =>  Func<T,T2> => InputStream<T2>

    In this example, T will be a pair of stock symbols, and T2 will be the same pair of stock symbols along with the correlation of the log returns of their adjusted closing prices over the past 10 days.

    Let’s get started.
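
    To make "log returns" concrete, here is a minimal PowerShell sketch using made-up prices (the numbers and the variable names $prices and $returns are illustrative assumptions, not data from this post). It uses the same ratio-of-neighbours loop as the Get-LogReturns function shown further down, and shows that N prices yield N-1 returns, so 10 closing prices give 9 data points per symbol.

    ```powershell
    # Hypothetical closing prices (made-up numbers, not market data).
    $prices = 104.0, 102.5, 103.1, 101.0, 100.0

    # Log of each price over the price in the following position.
    $returns = for ($i = 0; $i -lt $prices.Count - 1; $i++) {
        [Math]::Log($prices[$i] / $prices[$i + 1])
    }

    $returns.Count                          # 4: one fewer return than prices
    ($returns | Measure-Object -Sum).Sum    # telescopes to Log(104.0 / 100.0), up to rounding
    ```

    Because log returns add up across periods, their sum over the window equals the log of the first price over the last one, which makes a handy sanity check.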

    CREATE SCHEMA           StockPairSchema( SecA STRING,SecB STRING );
    CREATE SCHEMA           StockPairCorrelationSchema INHERITS from StockPairSchema (CORR  FLOAT) ;
    CREATE OUTPUT   STREAM  StockPairs          SCHEMA StockPairSchema;
    CREATE INPUT    STREAM  StockCorrelations   SCHEMA StockPairCorrelationSchema;
    ATTACH OUTPUT ADAPTER PairstoCorrelationsFunc TYPE PoShAdapter TO STREAM StockPairs
    RESULTSSTREAM = "ccl://localhost:6789/Stream/Default/TestC8/StockCorrelations",

    The process block is very simple:

    foreach ($t in $input) {
        $a = Get-LogReturns $t["SecA"]
        $b = Get-LogReturns $t["SecB"]
        $c = [Demo.Stats]::Correlate($a, $b)
        ,,($t["SecA"],$t["SecB"],$c) # double commas so the values are not flattened
    }
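
    The comma trick deserves a quick illustration. PowerShell unrolls arrays written to the output stream, and each unary comma wraps its operand in a one-element array that absorbs one level of unrolling. The sketch below uses hypothetical helper functions (Get-Flat, Get-Wrapped and Get-DoubleWrapped are made up for this example); it only demonstrates the language behaviour and assumes nothing about how the PoSh adapter consumes the output.

    ```powershell
    function Get-Flat          { (1, 2, 3) }    # array is unrolled: three separate items
    function Get-Wrapped       { ,(1, 2, 3) }   # one item: the three-element array
    function Get-DoubleWrapped { ,,(1, 2, 3) }  # one item: an array holding the array

    @(Get-Flat).Count                  # 3
    @(Get-Wrapped).Count               # 1
    @(Get-Wrapped)[0].Count            # 3
    @(Get-DoubleWrapped)[0].Count      # 1
    @(Get-DoubleWrapped)[0][0].Count   # 3
    ```

    With the double comma, a consumer that unrolls the output one more time still receives the three values together as a single row.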

    The block path defines the Get-LogReturns and Correlate functions, so naturally it’s a bit longer.

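    $wc = New-Object Net.WebClient
    function Get-LogReturns ($sec) {
        $qry = "$sec"
        # download the CSV and keep the 'Adj Close' column of the first 10 rows
        $secAData = ConvertFrom-Csv $wc.DownloadString($qry) | select -First 10 | % { $_.'Adj Close' }
        # emit the log of each value over the value in the following row
        for ($i=0; $i -lt $secAData.Count-1; $i++) { [Math]::Log( $secAData[$i] / $secAData[$i+1] ) }
    }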


    $csCode = @"
    using System;
    using System.Collections.Generic;
    using System.Linq;
    namespace Demo
    {
        public static class Stats
        {
            // Pairs up the elements of two lists, stopping at the shorter one
            private static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(IList<TFirst> first,
                    IList<TSecond> second,
                    Func<TFirst, TSecond, TResult> func)
            {
                for (int ii = 0; ii < Math.Min(first.Count(),second.Count()); ii++)
                    yield return func(first[ii],second[ii]);
            }

            // Overload for the object[] arrays that PowerShell passes in
            public static double Correlate(object[] s1,object[] s2)
            {
                return Correlate(s1.Cast<double>(), s2.Cast<double>());
            }

            // Correlation coefficient computed from running sums
            public static double Correlate(IEnumerable<double> s1,IEnumerable<double> s2)
            {
                var sum1 = s1.Sum();
                var sum2 = s2.Sum();
                var sumSq1 = s1.Select(v=> v*v).Sum();
                var sumSq2 = s2.Select(v=> v*v).Sum();
                var pSum = Zip(s1.ToList(),s2.ToList(), ( a, b) => a*b).Sum();
                var len = s1.Count();
                var num = pSum - ((sum1 * sum2) / len);
                var denom = Math.Sqrt(((sumSq1 - (sum1 * sum1) / len) * (sumSq2 - (sum2 * sum2) / len)));
                return (denom == 0.0) ? 0 : num / denom;
            }
        }
    }
    "@
    Add-Type -TypeDefinition $csCode -Language CSharpVersion3 -PassThru
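
    For reference, the arithmetic in Correlate is the sum-based form of the Pearson correlation coefficient. Written out, with a and b standing for the two series and n for len (the symbols are mine, chosen to mirror the code's variables):

    ```latex
    r = \frac{\sum_i a_i b_i \;-\; \frac{1}{n}\left(\sum_i a_i\right)\left(\sum_i b_i\right)}
             {\sqrt{\left(\sum_i a_i^2 - \frac{1}{n}\left(\sum_i a_i\right)^2\right)
                    \left(\sum_i b_i^2 - \frac{1}{n}\left(\sum_i b_i\right)^2\right)}}
    ```

    Here the numerator is num (pSum, sum1 and sum2), and the two factors under the square root use sumSq1 and sumSq2. The code returns 0 when the denominator is 0, that is, when either series is constant. Note that n is taken from the first series only while Zip stops at the shorter one, so the result is only meaningful when both series have the same length.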

    And in case you missed it, the Correlate function isn't written in PowerShell at all. It is inline C# code, compiled when the adapter starts up and running inside the Coral8 server process.