How to reuse lambda parameter across multiple combined expressions

If you have multiple separately defined expressions that share a common lambda parameter, like these:

Expression<Func<User, bool>> pre1 = u => u.FirstName != null;
Expression<Func<User, bool>> pre2 = u => u.MiddleName != null;
Expression<Func<User, bool>> pre3 = u => u.LastName != null;

And you try to combine them into one, reusing said parameter, like this:

ParameterExpression param = Expression.Parameter(typeof(User), "u");

var predicates = GetPredicates();
var body = predicates.Select(exp => exp.Body)
                     .Aggregate((left, right) => Expression.AndAlso(left, right));
Expression<Func<User, bool>> lambda = Expression.Lambda<Func<User, bool>>(body, param);
Func<User, bool> func = lambda.Compile();

Then you’ll likely get an exception:

Unhandled Exception: System.InvalidOperationException: variable 'u' of type 'User' referenced from scope '', but it is not defined.

The reason is that even though the lambda parameters have the same type and name, they're defined in different expressions and thus in different scopes, so they can't be reused as one.

What you need to do is use a single parameter across all expressions; in other words, unify/merge/replace their parameters:

class ParameterReplacer : ExpressionVisitor
{
  private readonly ParameterExpression _param;

  private ParameterReplacer(ParameterExpression param)
  {
    _param = param;
  }

  protected override Expression VisitParameter(ParameterExpression node)
  {
    return node.Type == _param.Type ? // if types match on both ends
      base.VisitParameter(_param) : // replace
      node; // ignore
  }

  public static T Replace<T>(ParameterExpression param, T exp) where T : Expression
  {
    return (T)new ParameterReplacer(param).Visit(exp);
  }
}

And then:

var body = predicates.Select(exp => exp.Body)
                     .Select(exp => ParameterReplacer.Replace(param, exp))
                     .Aggregate((left, right) => Expression.AndAlso(left, right));

var lambda = Expression.Lambda<Func<User, bool>>(body, param);

That’s it, now compilation will work fine and produce the desired predicate.
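
For completeness, here's a quick usage sketch of the compiled predicate; the sample User instances below assume settable properties and are purely illustrative:

Func<User, bool> func = lambda.Compile();

var users = new[]
{
  new User { FirstName = "John", MiddleName = "Quincy", LastName = "Public" },
  new User { FirstName = "Jane", MiddleName = null, LastName = "Doe" }
};

// Only the first user passes all three null checks.
var completeUsers = users.Where(func).ToList();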

Happy expression building!


Durable queue in Azure Service Fabric using WebJobs, Part 2: Web API

This is the second post in a series about Durable queue in Azure Service Fabric using WebJobs:

  • Part 1: Introduction to Asynchronous Operations
  • Part 2: Web API

Now let's create another stateless service, which will host the Web API. I used the default Visual Studio template called “Stateless ASP.NET Core”. You can find how it configures Kestrel as the underlying application web server here.

This service/Web API will have a controller responsible for converting requests into queue messages and enqueuing them. For the sake of demonstration, this will be an incident management system:

  • Read requests go through a cache, so high volume can be handled. The cache is populated from storage during a cold start.
  • Write requests go through the cache as well and update it, then update storage. They are implemented using the HTTP verb PATCH to better support concurrency.

namespace DurableQueueSample.StatelessWebApiService.Controllers
{
  // _jsonConverter, _urlBuilder and _queueClient are injected dependencies (omitted for brevity);
  // IIncidentActor is the actor interface hosted by the stateful service (see the contract sketch further below).
  public sealed class IncidentController : Controller
  {
    public async Task<IActionResult> Get(Guid incidentId)
    {
      var actorId = new ActorId($"GET+{incidentId}");
      var actor = ActorProxy.Create<IIncidentActor>(
        actorId,
        new Uri("fabric:/DurableQueueSampleApplication/ActorStatefulService"));

      var incident = await actor.GetIncident(incidentId);
      return Ok(incident);
    }

    public async Task<IActionResult> Patch(Guid incidentId, string name, string value)
    {
      var patchId = Guid.NewGuid();
      var patch = new IncidentPatch(patchId, incidentId, name, value);

      // Enqueue the patch via the queue client abstraction instead of processing it inline.
      var messageContent = _jsonConverter.Convert(patch);
      await _queueClient.EnqueueAsync(queueName, messageContent);

      var url = _urlBuilder.BuildUrl(StatusController.GetStatusRouteName, new { patchId });
      return Accepted(url);
    }
  }
}

In contrast to ASP.NET Web API, which doesn't have a built-in method to return 202 Accepted (so you would need to roll out a custom extension method), ASP.NET Core does have a whole family of them. We need the overload which accepts a Uri without an object.

Method Get() routes the request directly to the stateful service hosting the request-processing actors. Method Patch(), on the other hand, enqueues the request, doesn't wait for its processing to complete, and returns control immediately after the Azure Queue client acknowledges message reception. This way the client browser doesn't wait either and can check the status by following the provided url, e.g. /api/status/?patchId={patchId} (a minimal sketch of such a status endpoint follows below). However, other clients that try to retrieve the incident before the patch has been applied might see outdated values. This is the price of processing update requests asynchronously.
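
The status endpoint itself is outside the scope of this post, but here's a minimal sketch of what it could look like; the route name, the returned status model and the _statusStore dependency are my assumptions, not part of the original sample:

// A sketch of the status endpoint the 202 response points to.
// _statusStore is an assumed injected dependency that tracks patch processing.
public sealed class StatusController : Controller
{
  public const string GetStatusRouteName = "GetStatus";

  [HttpGet("api/status", Name = GetStatusRouteName)]
  public async Task<IActionResult> Get(Guid patchId)
  {
    var status = await _statusStore.GetStatus(patchId);
    if (status == null)
    {
      return NotFound();
    }

    // E.g. "Pending" while the queue message is still being processed,
    // "Completed" once the actor has applied the patch.
    return Ok(status);
  }
}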

Here's what the queue trigger function looks like:

namespace DurableQueueSample.StatelessWebJobsService.Functions
{
  public sealed class IncidentPatchFunction
  {
    // Singleton scoped to the incident id: patches for the same incident are never applied concurrently.
    [Singleton("{IncidentId}")]
    public async Task Handle(
      [QueueTrigger(queueName)] IncidentPatch patch,
      TextWriter dashboardLogger)
    {
      var actorId = new ActorId($"PATCH+{patch.IncidentId}");
      var actor = ActorProxy.Create<IIncidentActor>(
        actorId,
        new Uri("fabric:/DurableQueueSampleApplication/ActorStatefulService"));
      await actor.UpdateIncident(patch.IncidentId, patch.PropertyName, patch.PropertyValue);
    }
  }
}
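
For reference, here's a sketch of the contracts the two snippets above rely on. The actual actor interface will live in the stateful service covered next time; these shapes are inferred from the calls above, so treat them as assumptions rather than the real definitions:

// Assumed actor interface (Microsoft.ServiceFabric.Actors); Incident is the read model returned to clients.
public interface IIncidentActor : IActor
{
  Task<Incident> GetIncident(Guid incidentId);
  Task UpdateIncident(Guid incidentId, string propertyName, string propertyValue);
}

// Queue message payload; the WebJobs SDK deserializes the JSON message body into this type.
public sealed class IncidentPatch
{
  public IncidentPatch(Guid patchId, Guid incidentId, string propertyName, string propertyValue)
  {
    PatchId = patchId;
    IncidentId = incidentId;
    PropertyName = propertyName;
    PropertyValue = propertyValue;
  }

  public Guid PatchId { get; }
  public Guid IncidentId { get; }
  public string PropertyName { get; }
  public string PropertyValue { get; }
}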

Next time we'll create another service, this time a stateful one, which will host the actors.


Durable queue in Azure Service Fabric using WebJobs, Part 1: Introduction to Asynchronous Operations

This is the first post in a series about Durable queue in Azure Service Fabric using WebJobs:

  • Part 1: Introduction to Asynchronous Operations
  • Part 2: Web API

The actor model built into Azure Service Fabric (ASF for short, or just SF) and its queuing/dispatching mechanism has one serious disadvantage, besides any others: if an actor is created to process each incoming HTTP request (for a given id), then, since actors are single-threaded by design, all subsequent requests won't be processed until the actor is done with the first one. Then again only one request is picked up for processing while all the others basically wait in a simple in-memory queue, and so on. This is unacceptable under heavy load, because all incoming HTTP requests are blocked and waiting, and so are the users in the browsers behind them.

Before switching to ASF, I was successfully using Azure Web Apps with WebJobs in the background to off-load computation-heavy HTTP requests which would otherwise be dropped by timeout. The workflow is as follows:

  • The Web API accepts an HTTP request, converts it into a queue message payload, enqueues this message into an Azure Queue, and returns status code 202 Accepted, which means that the request hasn't been processed yet but rather accepted for future processing. According to RFC 7231 – HTTP/1.1 Semantics and Content, the RESTful Cookbook, or the Microsoft docs for HDInsight, the response should also include the Location header, which the client can follow to check the status of the operation.
  • The WebJobs host polls the queue at a configurable interval and notifies a so-called queue trigger which, from the SDK consumer's perspective, looks like an asynchronous method invocation. One of the advantages is that the message payload can be read as either a string or a custom strongly-typed model class (see the sketch after this list).
  • If the trigger succeeded (didn't throw an exception), the message is considered processed and disappears from the queue.
  • Otherwise the trigger is invoked again. If the unsuccessful invocation threshold has been reached, the message becomes poison and is moved into a separate, so-called poison message queue which can be monitored like any other, regular queue.
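
To illustrate the point about payload binding, here's a minimal sketch of both flavors of a queue trigger; the queue name and the OrderMessage type are made up for this example, and the two methods are alternatives rather than something you'd run side by side:

public sealed class SampleFunctions
{
  // Payload read as a raw string.
  public void HandleRaw([QueueTrigger("sample-queue")] string message, TextWriter log)
  {
    log.WriteLine($"Received: {message}");
  }

  // The same payload bound to a strongly-typed model (deserialized from JSON by the SDK).
  public void HandleTyped([QueueTrigger("sample-queue")] OrderMessage order, TextWriter log)
  {
    log.WriteLine($"Received order {order.OrderId}");
  }
}

public sealed class OrderMessage
{
  public Guid OrderId { get; set; }
}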

To enable the same workflow on an ASF cluster, start the WebJobs host within a stateless service:

namespace DurableQueueSample.StatelessWebJobsService
{
  internal sealed class StatelessWebJobsService : StatelessService
  {
    public StatelessWebJobsService(StatelessServiceContext context)
      : base(context)
    {
    }

    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
      try
      {
        var config = new JobHostConfiguration
        {
          DashboardConnectionString = connectionString,
          StorageConnectionString = connectionString,
          Queues =
          {
            BatchSize = 5,
            MaxDequeueCount = 3,
            MaxPollingInterval = TimeSpan.FromSeconds(30)
          }
        };
        var host = new JobHost(config);
        await host.StartAsync(cancellationToken);
      }
      catch (Exception ex)
      {
        ServiceEventSource.Current.ServiceStartupFailedEvent(ex.ToString());
        throw;
      }
    }
  }
}

I omitted most of the host configuration for brevity; you can find more in its source code, which luckily is fully open sourced. I usually set the following properties, whose values are provided by a Dependency Injection (DI) container such as Simple Injector (see the activator sketch after this list):

  • DashboardConnectionString
  • StorageConnectionString
  • JobActivator
  • NameResolver
  • TypeLocator
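
For example, JobActivator is what lets the host build function classes from your container. A minimal sketch of a Simple Injector-backed activator (my sketch under that assumption, not code from this sample) could look like this:

// IJobActivator lives in Microsoft.Azure.WebJobs.Host; Container comes from SimpleInjector.
internal sealed class SimpleInjectorJobActivator : IJobActivator
{
  private readonly Container _container;

  public SimpleInjectorJobActivator(Container container)
  {
    _container = container;
  }

  // The WebJobs host calls this to create each function class instance,
  // so all of its dependencies are resolved from the container.
  public T CreateInstance<T>()
  {
    return (T)_container.GetInstance(typeof(T));
  }
}

It is then wired up via config.JobActivator = new SimpleInjectorJobActivator(container); before the JobHost is constructed.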

Next time: we’ll create another stateless service which will host the Web API.


Documentation as a Feature, Documentation is a Feature

TL;DR: documentation should be treated (i.e. planned, estimated and maintained) like any other functional part of a project.

I don't always write documentation, but when I do, I believe it's more important for it to be up to date than to merely exist. That is, it's better to have no documentation at all than to have an out-of-date one, because outdated documentation is misleading, misinforms your clients, and might do more harm than good.


How to move data from SQL Server to ADLA table using ADF

On the Universal Store team, in the Universal Payout Platform – Earnings Calculations project, we need to move data from on-prem SQL Server, as well as from SQL Server within an Azure vnet and from SQL Azure, to the cloud. As our cloud storage we chose Azure Data Lake Store (ADLS), and as the data movement vehicle we chose Azure Data Factory (ADF). We then use Azure Data Lake Analytics (ADLA) as our Big Data processing platform.

Here are a few simple steps showing how we've managed to copy data from SQL to ADLS and invoke a U-SQL script to insert this data into an ADLA table.

Prerequisites:

  • You have an Azure subscription
  • You have ADF, ADLS and ADLA accounts set up
  • ADF is authorized to access ADLS and ADLA
    • As of today, the ADLS linked service supports both a personal OAuth user token and an Azure service principal for authorization
    • The ADLA linked service supports only OAuth; however, support for the latter (the service principal) is coming. Please vote to expedite it.

Steps:

  1. Create a new pipeline. It will contain 2 sequential activities:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
      "name": "Pipeline-CopyDataFromSqlToAzureDataLakeStore",
      "properties": {
        "start": "{start}",
        "end": "{end}",
        "activities": [
        ]
      }
    }
    
  2. Add an activity to move the data. It will execute the T-SQL stored procedure dbo.GetData(), which accepts 2 parameters of type DateTime2(0) representing slice start and end:
    {
      "name": "CopyData",
      "type": "Copy",
      "inputs": [
        {
          "name": "Dataset-Source-Sql_data"
        }
      ],
      "outputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_data"
        }
      ],
      "typeProperties": {
        "source": {
          "type": "SqlSource",
          "sqlReaderQuery": "$$Text.Format('exec dbo.GetData \\'{0:yyyy-MM-dd HH:00:00}\\', \\'{1:yyyy-MM-dd HH:00:00}\\'', SliceStart, SliceEnd)"
        },
        "sink": {
          "type": "AzureDataLakeStoreSink",
          "writeBatchSize": 0,
          "writeBatchTimeout": "00:00:00"
        }
      },
      "policy": {
        "concurrency": 3,
        "executionPriorityOrder": "OldestFirst",
        "retry": 3,
        "timeout": "01:00:00"
      },
      "scheduler": {
        "frequency": "Hour",
        "interval": 1
      }
    }
    
  3. It needs the corresponding dataset which represents the actual activity/pipeline output:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Destination-AzureDataLakeStore_data",
      "properties": {
        "type": "AzureDataLakeStore",
        "linkedServiceName": "LinkedService-Destination-AzureDataLakeStore",
        "typeProperties": {
          "folderPath": "data",
          "fileName": "{Slice}.json",
          "partitionedBy": [
            {
              "name": "Slice",
              "value": {
                "type": "DateTime",
                "date": "SliceStart",
                "format": "yyyy-MM-dd_HH-mm"
              }
            }
          ],
          "format": {
            "type": "JsonFormat",
            "filePattern": "arrayOfObjects",
            "encodingName": "UTF-8"
          }
        },
        "external": false,
        "availability": {
          "frequency": "Hour",
          "interval": 1
        },
        "policy": {
        }
      }
    }
    
  4. Add an activity to insert the data into the table. It will execute the U-SQL procedure mydb.dbo.InsertData(), again accepting 2 parameters of type System.DateTime which represent slice start and end. Please note that to run the activities sequentially, the output of the former must be the input of the latter:
    {
      "name": "RunScript",
      "type": "DataLakeAnalyticsU-SQL",
      "linkedServiceName": "LinkedService-Destination-AzureDataLakeAnalytics",
      "inputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_data"
        }
      ],
      "outputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_dummy"
        }
      ],
      "typeProperties": {
        "script": "mydb.dbo.InsertData (Convert.ToDateTime(@sliceStart), Convert.ToDateTime(@sliceEnd));",
        "parameters": {
          "sliceStart": "$$SliceStart",
          "sliceEnd": "$$SliceEnd"
        },
        "priority": 1,
        "degreeOfParallelism": 3
      }
    }
    
  5. Since all activities, even those which don't produce any output, need a corresponding output dataset, create a dummy one:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Destination-AzureDataLakeStore_dummy",
      "properties": {
        "type": "AzureDataLakeStore",
        "linkedServiceName": "LinkedService-Destination-AzureDataLakeStore",
        "typeProperties": {
          "folderPath": ""
        },
        "external": false,
        "availability": {
          "frequency": "Hour",
          "interval": 1
        },
        "policy": {
        }
      }
    }
    
  6. Create a U-SQL script to create the destination ADLA table. You only need to run it once:
    CREATE TABLE IF NOT EXISTS MyDataTable
    (
      Id long,
      Value string,
      CreatedDateTime DateTime,
    
      INDEX IX_MyDataTable_Id
      CLUSTERED (Id ASC)
    )
    DISTRIBUTED BY DIRECT HASH (Id);
    
  7. Create a U-SQL script to create an ADLA procedure which will read the newly created file, parse it, and insert its content into the destination table:
    DROP PROCEDURE IF EXISTS InsertData;
    CREATE PROCEDURE InsertData (@sliceStart DateTime, @sliceEnd DateTime)
    AS
    BEGIN
      REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];
      REFERENCE ASSEMBLY [Newtonsoft.Json];
    
      USING Microsoft.Analytics.Samples.Formats.Json;
    
      DECLARE @input string = "/data/{CreatedDateTime:yyyy}-{CreatedDateTime:MM}-{CreatedDateTime:dd}_{CreatedDateTime:HH}-{CreatedDateTime:mm}_{CreatedDateTime:ss}.json";
    
      @data =
        EXTRACT Id long,
            Value string,
            CreatedDateTime DateTime
        FROM  @input
        USING new JsonExtractor();
    
      @filtereddata = 
        SELECT Id, Value, CreatedDateTime
        FROM @data
        WHERE CreatedDateTime >= @sliceStart AND CreatedDateTime < @sliceEnd;
    
      INSERT INTO MyDataTable
      (
           Id, Value, CreatedDateTime
      )
      SELECT Id, Value, CreatedDateTime
      FROM @filtereddata;
    END;
    

Conclusion

That's it. Next time the pipeline runs, the first activity will move the data within the specified time window into a file in ADLS, and the second activity will insert the data into the table by invoking a script in ADLA.

Happy data movement!


How to deploy Azure Data Factory pipeline and its dependencies programmatically using PowerShell

Since ADF unfortunately doesn't provide a built-in way to automate deployment, you have to write a custom script. Then you can run it on a build server such as VSTS or TeamCity.

If you're in interactive mode, you need to log in and select the corresponding subscription. If you run the script on a build server, the subscription should be selected for you automatically.

Login-AzureRmAccount
Select-AzureRmSubscription -SubscriptionName YourSubName

Here are the parameters you can't get programmatically and so need to specify explicitly. The last parameter is the path to the JSON files that represent the resources to be deployed: linked services, datasets, and pipelines.

param(
    [Parameter(Mandatory=$true)][string]$ResourceGroupName,
    [Parameter(Mandatory=$true)][string]$Location,
    [Parameter(Mandatory=$true)][string]$DataFactoryName,
    [Parameter(Mandatory=$true)][string]$InputFolderPath
)

First, create the factory itself. The -Force flag helps to continue on error if it already exists. Then load it into a variable:

New-AzureRmDataFactory -ResourceGroupName $ResourceGroupName -Name $DataFactoryName -Location $Location -Force -ErrorAction Stop
$dataFactory = Get-AzureRmDataFactory -ResourceGroupName $ResourceGroupName -Name $DataFactoryName -ErrorAction Stop

The next three loops read files based on the following naming convention:

  • Linked services start with LinkedService, e.g. LinkedService-SQL.json
  • Datasets start with Dataset, e.g. Dataset-Orders.json
  • Pipelines start with Pipeline, e.g. Pipeline-CopyOrders.json

First, create the linked services. If any of them references a gateway, create it as well. Since the gateway cmdlet doesn't support the -Force flag, we can use -ErrorAction Continue to continue on error if it already exists:

$files = Get-ChildItem $InputFolderPath -Recurse -Include *.json -Filter LinkedService* -ErrorAction Stop
foreach ($file in $files)  
{
    Write-Output "Creating linked services from $($file.FullName)"
    New-AzureRmDataFactoryLinkedService -DataFactory $dataFactory -File $file.FullName -Force -ErrorAction Stop

    $json = Get-Content $file.FullName -Raw -ErrorAction Stop
    $svc = $json | ConvertFrom-Json
    $gwName = $svc.properties.typeProperties.gatewayName
    if ($gwName)
    {
        Write-Output "Creating gateway $($gwName) from $($file.FullName)" 
        New-AzureRmDataFactoryGateway -DataFactory $dataFactory -Name $gwName -ErrorAction Continue
    }
}

Then create datasets:

$files = Get-ChildItem $InputFolderPath -Recurse -Include *.json -Filter Dataset* -ErrorAction Stop
foreach ($file in $files)  
{
    Write-Output "Creating dataset from $($file.FullName)"
    New-AzureRmDataFactoryDataset -DataFactory $dataFactory -File $file.FullName -Force -ErrorAction Stop 
}

And finally, pipelines:

$files = Get-ChildItem $InputFolderPath -Recurse -Include *.json -Filter Pipeline* -ErrorAction Stop
foreach ($file in $files)  
{
    Write-Output "Creating pipeline from $($file.FullName)"
    New-AzureRmDataFactoryPipeline -DataFactory $dataFactory -File $file.FullName -Force -ErrorAction Stop 
}

That’s it. By this time all pipelines should be deployed, verified and started. Happy data movement!


How to enable tracing in ASP.NET Web API using Application Insights

If you were wondering the other day what this line from the default template does:

    config.EnableSystemDiagnosticsTracing();

Then know that it does exactly what it says: it enables tracing using the System.Diagnostics API.

To forward this tracing to Application Insights, you need to install these two packages:

Install-Package Microsoft.AspNet.WebApi.Tracing
Install-Package Microsoft.ApplicationInsights.TraceListener

and add a trace listener:

<system.diagnostics>
  <trace autoflush="true">
    <listeners>
      <add name="AppInsightsListener" type="Microsoft.ApplicationInsights.TraceListener.ApplicationInsightsTraceListener, Microsoft.ApplicationInsights.TraceListener" />
    </listeners>
  </trace>
</system.diagnostics>

That’s it! The Trace category is now being populated:

(Screenshot: the Trace category in Application Insights populated with ASP.NET Web API traces.)
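
For completeness, here's a sketch of where the config.EnableSystemDiagnosticsTracing() call from the default template typically lives; the rest of WebApiConfig.Register is elided:

public static class WebApiConfig
{
  public static void Register(HttpConfiguration config)
  {
    // Routes Web API's trace output through System.Diagnostics,
    // which the ApplicationInsightsTraceListener above then forwards to Application Insights.
    config.EnableSystemDiagnosticsTracing();

    // ... routes, formatters, etc.
  }
}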

Happy tracing!


The ObjectContext instance has been disposed and can no longer be used for operations that require a connection

TL;DR: don't return the task before disposing the context; await it instead

If you’re getting an Entity Framework exception:

System.ObjectDisposedException: The ObjectContext instance has been disposed and can no longer be used for operations that require a connection
at System.Data.Entity.Core.Objects.ObjectContext.ReleaseConnection()
at System.Data.Entity.Core.Objects.ObjectContext.d__3d`1.MoveNext()
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Data.Entity.Core.Objects.ObjectContext.d__39.MoveNext()

in the code such as this:

public Task<Order[]> GetOrders()
{
    using (var dbContext = new MyDbContext(connectionString))
    {
        return dbContext.Orders.ToArrayAsync();
    }
}

then it means that you're disposing the context before the task has completed. Await it instead:

public async Task<Order[]> GetOrders()
{
    using (var dbContext = new MyDbContext(connectionString))
    {
        return await dbContext.Orders.ToArrayAsync();
    }
}

Happy awaiting!


Could not load file or assembly ‘Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed’ or one of its dependencies. The located assembly’s manifest definition does not match the assembly reference

TL;DR: add a reference to Newtonsoft.Json.dll in every project

If you’re getting a runtime exception:

System.IO.FileLoadException: Could not load file or assembly ‘Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed’ or one of its dependencies. The located assembly’s manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040).

which, by the way, is almost impossible to troubleshoot by searching Google or StackOverflow, because it's way too generic and has millions of possible causes and solutions.

In our particular case, we had to make sure that every project that directly or (more likely) indirectly depends on this assembly references the version you're actually using, i.e. has the NuGet package installed.

Checking that you have the correct assembly binding redirects in place won't hurt either:

<configuration>
  <runtime>
    <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
      <dependentAssembly>
        <assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
        <bindingRedirect oldVersion="0.0.0.0-8.0.0.0" newVersion="8.0.0.0" />
      </dependentAssembly>
    </assemblyBinding>
  </runtime>
</configuration>

Happy building!


How to generate ids programmatically using SQL Server and Entity Framework

TL;DR: create a sequence and execute stored proc to read from it

Generating ids in the database used to be the default and nearly the only approach, but nowadays this trend is fading away. Here are some scenarios I've discovered recently where it doesn't play well:
– CQRS (see the comments, also Steven's blog)
– bulk insert into a many-to-many mapping table (see my previous post)

So here’s a solution. First, create a sequence (available in SQL Server 2012 or later, and in SQL Azure):

create sequence dbo.OrderId as int
start with 1
increment by 1

Then, read from it using the built-in stored procedure:

exec sys.sp_sequence_get_range @sequence_name, @range_size, @range_first_value OUT

And finally, execute the stored procedure programmatically. In this particular example, using Entity Framework:

public async Task<int> GetSequenceStart(string sequenceName, int count)
{
    var sequenceNameParam = new SqlParameter
    {
        ParameterName = "@sequence_name",
        SqlDbType = SqlDbType.NVarChar,
        Direction = ParameterDirection.Input,
        Value = sequenceName
    };
    var countParam = new SqlParameter
    {
        ParameterName = "@range_size",
        SqlDbType = SqlDbType.Int,
        Direction = ParameterDirection.Input,
        Value = count
    };
    var outputParam = new SqlParameter
    {
        ParameterName = "@range_first_value",
        SqlDbType = SqlDbType.Variant,
        Direction = ParameterDirection.Output
    };

    await _dbContext.Database.ExecuteSqlCommandAsync(
        "exec sys.sp_sequence_get_range @sequence_name, @range_size, @range_first_value OUT",
        sequenceNameParam,
        countParam,
        outputParam);

    return (int)outputParam.Value;
}
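
For illustration, here's a minimal usage sketch that reserves a range and assigns ids client-side before saving; the Order entity with an int Id and the _dbContext field are assumptions based on the snippet above:

// Reserve a contiguous range of ids and assign them before the bulk insert
// (assumes the list is not empty).
var orders = new List<Order>(); // ... populated with new orders that have no ids yet ...

int firstId = await GetSequenceStart("dbo.OrderId", orders.Count);
for (int i = 0; i < orders.Count; i++)
{
    orders[i].Id = firstId + i;
}

_dbContext.Orders.AddRange(orders);
await _dbContext.SaveChangesAsync();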

The drawback of this solution is that it doesn't scale all that well:
– if you're using more than one database, you have to designate one of them as the master, and it will receive a higher load.
– each application instance must be fully aware of such a hierarchy, which goes against horizontal scaling (aka scaling out).

Happy sequencing!
