Category Archives: Programming

Everything about software development, architecture, etc.

How to move data to ADLA table using ADF

In the Universal Store team, on the Universal Payout Platform – Earnings Calculations project, we need to move data from on-premises SQL Server, from SQL Server inside an Azure VNet, and from SQL Azure to the cloud. We chose Azure Data Lake Store (ADLS) as our cloud storage and Azure Data Factory (ADF) as the data movement vehicle. We then use Azure Data Lake Analytics (ADLA) as our Big Data processing platform.

Here are a few simple steps showing how we’ve managed to copy data from SQL to ADLS and invoke a U-SQL script that inserts this data into an ADLA table.

Prerequisites:

  • You have an Azure subscription
  • You have ADF, ADLS and ADLA accounts set up
  • ADF is authorized to access ADLS and ADLA
    • As of today, the ADLS linked service supports authorization via both a personal OAuth user token and an Azure service principal (see the sketch below)
    • The ADLA linked service supports only OAuth; however, service principal support is coming. Please vote to expedite.
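
For reference, here’s a sketch of what an ADLS linked service authorized via a service principal might look like (the account name, ids and key are placeholders to fill in):

{
  "name": "LinkedService-Destination-AzureDataLakeStore",
  "properties": {
    "type": "AzureDataLakeStore",
    "typeProperties": {
      "dataLakeStoreUri": "https://<account>.azuredatalakestore.net/webhdfs/v1",
      "servicePrincipalId": "<application id>",
      "servicePrincipalKey": "<application key>",
      "tenant": "<tenant id>",
      "subscriptionId": "<subscription id>",
      "resourceGroupName": "<resource group>"
    }
  }
}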

Steps:

  1. Create a new pipeline. It will contain two sequential activities:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
      "name": "Pipeline-CopyDataFromSqlToAzureDataLakeStore",
      "properties": {
        "start": "{start}",
        "end": "{end}",
        "activities": [
        ]
      }
    }
    
  2. Add an activity to move the data. It will execute the T-SQL stored procedure dbo.GetData(), which accepts two parameters of type DateTime2(0) representing the slice start and end (a sketch of the procedure follows the activity definition):
    {
      "name": "CopyData",
      "type": "Copy",
      "inputs": [
        {
          "name": "Dataset-Source-Sql_data"
        }
      ],
      "outputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_data"
        }
      ],
      "typeProperties": {
        "source": {
          "type": "SqlSource",
          "sqlReaderQuery": "$$Text.Format('exec dbo.GetData \\'{0:yyyy-MM-dd HH:00:00}\\', \\'{1:yyyy-MM-dd HH:00:00}\\'', SliceStart, SliceEnd)"
        },
        "sink": {
          "type": "AzureDataLakeStoreSink",
          "writeBatchSize": 0,
          "writeBatchTimeout": "00:00:00"
        }
      },
      "policy": {
        "concurrency": 3,
        "executionPriorityOrder": "OldestFirst",
        "retry": 3,
        "timeout": "01:00:00"
      },
      "scheduler": {
        "frequency": "Hour",
        "interval": 1
      }
    }
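
    For reference, a minimal sketch of what dbo.GetData might look like (the source table and columns here are hypothetical):

    create procedure dbo.GetData
        @sliceStart datetime2(0),
        @sliceEnd datetime2(0)
    as
    begin
        -- Return only the rows that belong to the requested slice
        select Id, Value, CreatedDateTime
        from dbo.MyData
        where CreatedDateTime >= @sliceStart and CreatedDateTime < @sliceEnd;
    end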
    
  3. The copy activity needs a corresponding dataset which represents the actual activity/pipeline output:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Destination-AzureDataLakeStore_data",
      "properties": {
        "type": "AzureDataLakeStore",
        "linkedServiceName": "LinkedService-Destination-AzureDataLakeStore",
        "typeProperties": {
          "folderPath": "data",
          "fileName": "{Slice}.json",
          "partitionedBy": [
            {
              "name": "Slice",
              "value": {
                "type": "DateTime",
                "date": "SliceStart",
                "format": "yyyy-MM-dd_HH-mm"
              }
            }
          ],
          "format": {
            "type": "JsonFormat",
            "filePattern": "arrayOfObjects",
            "encodingName": "UTF-8"
          }
        },
        "external": false,
        "availability": {
          "frequency": "Hour",
          "interval": 1
        },
        "policy": {
        }
      }
    }
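
    The input dataset Dataset-Source-Sql_data referenced in step 2 isn’t shown above; a minimal sketch of it might look like this (assuming an on-prem SQL Server source and a linked service name I’ve made up):

    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Source-Sql_data",
      "properties": {
        "type": "SqlServerTable",
        "linkedServiceName": "LinkedService-Source-Sql",
        "typeProperties": {
          "tableName": "MyData"
        },
        "external": true,
        "availability": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    }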
    
  4. Add an activity to insert the data into the table. It will execute the U-SQL procedure mydb.dbo.InsertData(), which again accepts two parameters, of type System.DateTime, representing the slice start and end. Please note that to run the activities sequentially, the output of the former must be the input of the latter:
    {
      "name": "RunScript",
      "type": "DataLakeAnalyticsU-SQL",
      "linkedServiceName": "LinkedService-Destination-AzureDataLakeAnalytics",
      "inputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_data"
        }
      ],
      "outputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_dummy"
        }
      ],
      "typeProperties": {
        "script": "mydb.dbo.InsertData (Convert.ToDateTime(@sliceStart), Convert.ToDateTime(@sliceEnd));",
        "parameters": {
          "sliceStart": "$$SliceStart",
          "sliceEnd": "$$SliceEnd"
        },
        "priority": 1,
        "degreeOfParallelism": 3
      }
    }
    
  5. Since all activities, even those which don’t produce any output, need a corresponding output dataset, create a dummy one:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Destination-AzureDataLakeStore_dummy",
      "properties": {
        "type": "AzureDataLakeStore",
        "linkedServiceName": "LinkedService-Destination-AzureDataLakeStore",
        "typeProperties": {
          "folderPath": ""
        },
        "external": false,
        "availability": {
          "frequency": "Hour",
          "interval": 1
        },
        "policy": {
        }
      }
    }
    
  6. Create a U-SQL script that creates the destination ADLA table. You only need to run it once:
    CREATE TABLE IF NOT EXISTS MyDataTable
    (
      Id long,
      Value string,
      CreatedDateTime DateTime,
    
      INDEX IX_MyDataTable_Id
      CLUSTERED (Id ASC)
    )
    DISTRIBUTED BY DIRECT HASH (Id);
    
  7. Create a U-SQL script that creates an ADLA procedure. The procedure will read the newly created file, parse it, and insert its content into the destination table:
    DROP PROCEDURE IF EXISTS InsertData;
    CREATE PROCEDURE InsertData (@sliceStart DateTime, @sliceEnd DateTime)
    AS
    BEGIN
      REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];
      REFERENCE ASSEMBLY [Newtonsoft.Json];
    
      USING Microsoft.Analytics.Samples.Formats.Json;
    
      DECLARE @input = "/data/{CreatedDateTime:yyyy}-{CreatedDateTime:MM}-{CreatedDateTime:dd}_{CreatedDateTime:HH}-{CreatedDateTime:mm}.json";
    
      @data =
        EXTRACT Id long,
                Value string,
                CreatedDateTime DateTime
        FROM @input
        USING new JsonExtractor();

      @filtereddata =
        SELECT Id, Value, CreatedDateTime
        FROM @data
        WHERE CreatedDateTime >= @sliceStart AND CreatedDateTime < @sliceEnd;
    
      INSERT INTO MyDataTable
      (
           Id, Value, CreatedDateTime
      )
      SELECT Id, Value, CreatedDateTime
      FROM @filtereddata;
    END;
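
    To smoke-test the procedure outside the pipeline, you can invoke it from an ad-hoc U-SQL job (the time window below is just an example):

    mydb.dbo.InsertData(DateTime.Parse("2016-01-01 00:00:00"), DateTime.Parse("2016-01-01 01:00:00"));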
    

Conclusion

That’s it. The next time the pipeline runs, the first activity will move the data for the specified time window into a file in ADLS, and the second activity will insert the data into the table by invoking a script in ADLA.

Happy data movement!

How to deploy Azure Data Factory pipeline and its dependencies programmatically using PowerShell

Since ADF unfortunately doesn’t provide a built-in way to automate deployment, you have to write a custom script and run it on a build server such as VSTS or TeamCity.

If you’re in interactive mode, you need to log in and select the corresponding subscription. If you run the script on a build server, the subscription should be selected for you automatically.

Login-AzureRmAccount
Select-AzureRmSubscription -SubscriptionName YourSubName

Here are the parameters you can’t get programmatically and so need to specify explicitly. The last parameter is a path to the JSON files that represent the resources to be deployed: linked services, datasets, and pipelines.

param(
    [Parameter(Mandatory=$true)][string]$ResourceGroupName,
    [Parameter(Mandatory=$true)][string]$Location,
    [Parameter(Mandatory=$true)][string]$DataFactoryName,
    [Parameter(Mandatory=$true)][string]$InputFolderPath
)
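
Assuming the script is saved as, say, Deploy-DataFactory.ps1 (a file name I’ve made up), a typical invocation looks like this:

.\Deploy-DataFactory.ps1 -ResourceGroupName MyResourceGroup `
    -Location "West US" `
    -DataFactoryName MyDataFactory `
    -InputFolderPath .\DataFactory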

First, create the factory itself. The -Force flag helps to continue on error if the factory already exists. Then load it into a variable:

New-AzureRmDataFactory -ResourceGroupName $ResourceGroupName -Name $DataFactoryName -Location $Location -Force -ErrorAction Stop
$dataFactory = Get-AzureRmDataFactory -ResourceGroupName $ResourceGroupName -Name $DataFactoryName -ErrorAction Stop

The next three loops read files based on the following naming convention:

  • Linked services start with LinkedService, e.g. LinkedService-Dev.json
  • Datasets start with Dataset, e.g. Dataset-Orders.json
  • Pipelines start with Pipeline, e.g. Pipeline-CopyOrders.json

First, create the linked services. If any of them references a gateway, create it as well. Since New-AzureRmDataFactoryGateway doesn’t support the -Force flag, we use -ErrorAction Continue to continue on error if the gateway already exists:

$files = Get-ChildItem $InputFolderPath -Recurse -Include *.json -Filter LinkedService* -ErrorAction Stop
foreach ($file in $files)  
{
    Write-Output "Creating linked services from $($file.FullName)"
    New-AzureRmDataFactoryLinkedService -DataFactory $dataFactory -File $file.FullName -Force -ErrorAction Stop

    $json = Get-Content $file.FullName -Raw -ErrorAction Stop
    $svc = $json | ConvertFrom-Json
    $gwName = $svc.properties.typeProperties.gatewayName
    if ($gwName)
    {
        Write-Output "Creating gateway $($gwName) from $($file.FullName)" 
        New-AzureRmDataFactoryGateway -DataFactory $dataFactory -Name $gwName -ErrorAction Continue
    }
}

Then create datasets:

$files = Get-ChildItem $InputFolderPath -Recurse -Include *.json -Filter Dataset* -ErrorAction Stop
foreach ($file in $files)  
{
    Write-Output "Creating dataset from $($file.FullName)"
    New-AzureRmDataFactoryDataset -DataFactory $dataFactory -File $file.FullName -Force -ErrorAction Stop 
}

And finally, pipelines:

$files = Get-ChildItem $InputFolderPath -Recurse -Include *.json -Filter Pipeline* -ErrorAction Stop
foreach ($file in $files)  
{
    Write-Output "Creating pipeline from $($file.FullName)"
    New-AzureRmDataFactoryPipeline -DataFactory $dataFactory -File $file.FullName -Force -ErrorAction Stop 
}
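
As a quick sanity check (my own addition, not required for the deployment), you can list the pipelines that ended up in the factory:

Get-AzureRmDataFactoryPipeline -DataFactory $dataFactory |
    Select-Object PipelineName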

That’s it. By this time all pipelines should be deployed, verified and started. Happy data movement!

How to enable tracing in ASP.NET Web API using Application Insights

If you were wondering the other day what this line from the default template does:

    config.EnableSystemDiagnosticsTracing();

then know that it does exactly what it says: enables tracing using the System.Diagnostics API.

To forward this tracing to Application Insights, you need to install these two packages:

Install-Package Microsoft.AspNet.WebApi.Tracing
Install-Package Microsoft.ApplicationInsights.TraceListener

and add a trace listener:

<system.diagnostics>
  <trace autoflush="true">
    <listeners>
      <add name="AppInsightsListener" type="Microsoft.ApplicationInsights.TraceListener.ApplicationInsightsTraceListener, Microsoft.ApplicationInsights.TraceListener" />
    </listeners>
  </trace>
</system.diagnostics>
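
For completeness, here’s where that call lives in WebApiConfig (trimmed to the relevant line):

using System.Web.Http;

public static class WebApiConfig
{
    public static void Register(HttpConfiguration config)
    {
        // Routes, formatters, filters, etc. go here.

        // Writes Web API traces via System.Diagnostics, which the
        // listener configured above forwards to Application Insights.
        config.EnableSystemDiagnosticsTracing();
    }
}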

That’s it! The Trace category is now being populated:

[Screenshot: ASP.NET Web API traces in Application Insights]

Happy tracing!

The ObjectContext instance has been disposed and can no longer be used for operations that require a connection

TL;DR: don’t return the task before disposing the context; await it instead

If you’re getting an Entity Framework exception:

System.ObjectDisposedException: The ObjectContext instance has been disposed and can no longer be used for operations that require a connection
at System.Data.Entity.Core.Objects.ObjectContext.ReleaseConnection()
at System.Data.Entity.Core.Objects.ObjectContext.d__3d`1.MoveNext()
— End of stack trace from previous location where exception was thrown —
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Data.Entity.Core.Objects.ObjectContext.d__39.MoveNext()

in the code such as:

public Task<Order[]> GetOrders()
{
    using (var dbContext = new MyDbContext(connectionString))
    {
        return dbContext.Orders.ToArrayAsync();
    }
}

then it means that you’re disposing the context before the task has completed. Await it instead:

public async Task<Order[]> GetOrders()
{
    using (var dbContext = new MyDbContext(connectionString))
    {
        return await dbContext.Orders.ToArrayAsync();
    }
}

Happy awaiting!

Could not load file or assembly ‘Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed’ or one of its dependencies. The located assembly’s manifest definition does not match the assembly reference

TL;DR: add a reference to Newtonsoft.Json.dll in every project that depends on it

If you’re getting a runtime exception:

System.IO.FileLoadException: Could not load file or assembly ‘Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed’ or one of its dependencies. The located assembly’s manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040).

which, by the way, is nearly impossible to troubleshoot by searching Google or StackOverflow, because it’s way too generic and has millions of possible causes and solutions.

In our particular case, we had to make sure that every project that directly or (more likely) indirectly depends on this assembly references the version we’re actually using, i.e. has the NuGet package installed.
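
One way to bring every project in the solution to the same version is to reinstall the package from the Package Manager Console:

Update-Package Newtonsoft.Json -Reinstall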

Checking that you have the correct assembly redirects in place won’t hurt either:

<configuration>
  <runtime>
    <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
      <dependentAssembly>
        <assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" />
        <bindingRedirect oldVersion="0.0.0.0-8.0.0.0" newVersion="8.0.0.0" />
      </dependentAssembly>
    </assemblyBinding>
  </runtime>
</configuration>

Happy building!

How to generate ids programmatically using SQL Server and Entity Framework

TL;DR: create a sequence and execute a stored proc to read from it

Generating ids in the database used to be the default and nearly the only approach, but nowadays this trend is fading away. Here are some scenarios I’ve discovered recently where it doesn’t play well:
– CQRS (see the comments, also Steven’s blog)
– bulk insert into a many-to-many mapping table (see my previous post)

So here’s a solution. First, create a sequence (available in SQL Server 2012 or later, and in SQL Azure):

create sequence dbo.OrderId as int
start with 1
increment by 1
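
You can sanity-check the sequence right away:

select next value for dbo.OrderId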

Then, read from it using the built-in stored procedure:

exec sys.sp_sequence_get_range @sequence_name, @range_size, @range_first_value OUT

And finally, execute the stored procedure programmatically. In this particular example, using Entity Framework:

public async Task<int> GetSequenceStart(string sequenceName, int count)
{
    var sequenceNameParam = new SqlParameter
    {
        ParameterName = "@sequence_name",
        SqlDbType = SqlDbType.NVarChar,
        Direction = ParameterDirection.Input,
        Value = sequenceName
    };
    var countParam = new SqlParameter
    {
        ParameterName = "@range_size",
        SqlDbType = SqlDbType.Int,
        Direction = ParameterDirection.Input,
        Value = count
    };
    var outputParam = new SqlParameter
    {
        ParameterName = "@range_first_value",
        SqlDbType = SqlDbType.Variant,
        Direction = ParameterDirection.Output
    };

    await _dbContext.Database.ExecuteSqlCommandAsync(
        "exec sys.sp_sequence_get_range @sequence_name, @range_size, @range_first_value OUT",
        sequenceNameParam,
        countParam,
        outputParam);

    return (int)outputParam.Value;
}
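
Usage from within the same class then looks something like this (the Order entity and the orders list are hypothetical):

// Reserve a contiguous block of ids and assign them client-side.
var start = await GetSequenceStart("dbo.OrderId", orders.Count);
for (var i = 0; i < orders.Count; i++)
{
    orders[i].Id = start + i;
}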

The drawback of this solution is that it doesn’t scale that well:
– if you’re using more than one database, you have to designate one of them as the master, and it will receive a higher load;
– each application instance must be fully aware of this hierarchy, which crosses the boundaries of horizontal scaling (a.k.a. scaling out).

Happy sequencing!

How to push NuGet package to VSO feed v3

To push a NuGet package to a Visual Studio Online feed (v3), use the following command:

nuget.exe push MyPackage.1.0.0.nupkg
    -Source https://account.pkgs.visualstudio.com/DefaultCollection/_packaging/My_Feed/nuget/v3/index.json
    -ConfigFile nuget.config
    -ApiKey My_Feed

The crucial part here is to include -ApiKey My_Feed, otherwise you’ll get a 404 Not Found error.
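
For reference, the nuget.config passed via -ConfigFile might look something like this (the feed key and URL match the command above; credentials are typically handled separately, e.g. by a credential provider):

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <packageSources>
    <add key="My_Feed" value="https://account.pkgs.visualstudio.com/DefaultCollection/_packaging/My_Feed/nuget/v3/index.json" />
  </packageSources>
</configuration>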

Happy packaging!

How to map linking table for many-to-many relationship using Entity Framework 6

TL;DR: use a synonym.

As I promised myself earlier, I blog about interesting challenges and the ways I solved them.

The other day, my team and I faced a limitation in EF6: you can either configure a linking table for a many-to-many relationship between two entities, or map this table to an entity, but not both simultaneously.

Take, for instance, the relationship between Orders and Addresses, where one order can be linked to multiple addresses (shipping, billing, legal, etc.) and one address can be used for multiple orders:

modelBuilder.Entity<Order>()
            .HasMany(x => x.Addresses)
            .WithMany(x => x.Orders)
            .Map(c => c.ToTable("OrderAddresses")
                       .MapLeftKey("OrderId")
                       .MapRightKey("AddressId"));

At the same time, you’d like the intermediate, linking entity to be mapped too, so that you can query it independently or use it for bulk operations such as insert:

modelBuilder.Entity<OrderAddress>()
            .ToTable("OrderAddresses");

In this case you’ll get an exception during context configuration validation:

System.InvalidOperationException: The navigation property ‘Addresses’ declared on type ‘Order’ has been configured with conflicting mapping information.

The solution is to create a synonym (which in my project we keep in a separate schema called syn):

create schema syn
go
create synonym syn.OrderAddresses for dbo.OrderAddresses

and map the intermediate entity to it:

modelBuilder.Entity<OrderAddress>()
            .ToTable("OrderAddresses", "syn");

Note the overload accepting the schema name.

Now you can bulk insert into this table directly:

using (var scope = new TransactionScope())
{
    dbContext.BulkInsert(orders);
    dbContext.BulkInsert(addresses);
    dbContext.BulkInsert(orderAddresses);

    scope.Complete();
}

We’re using EntityFramework.BulkInsert, but you can also try EntityFramework.Utilities, which supports both bulk insert and delete. However, it has a bug and doesn’t expose an overload accepting SqlBulkCopyOptions.

Happy mapping!

How to convert Google API Service Account certificate to base64

Recently, in the Google Developer Console, I generated a new SSL certificate for my project’s Service Account by mistake, so I had to convert it again from the .p12 file to its base64 representation and store its thumbprint separately.

Here’s how you can do it:

var cert = new X509Certificate2("project-595a8425b1d7.p12", "notasecret", X509KeyStorageFlags.Exportable);
var thumbprint = cert.Thumbprint;
var base64 = Convert.ToBase64String(cert.Export(X509ContentType.Pfx, "notasecret")); // or Pkcs12 which works as well

And just to make sure the output string actually works and the certificate contains the private key:

var test = new X509Certificate2(Convert.FromBase64String(base64), "notasecret");
Debug.Assert(test.PrivateKey != null);

Replacing the for loop with Seq.iter

Here’s the initial function in C#:

public string BuildQuery(IEnumerable<KeyValuePair<string, string>> args)
{
    var coll = HttpUtility.ParseQueryString(String.Empty, _urlEncoder);
    foreach (var arg in args)
    {
        coll.Add(arg.Key, arg.Value);
    }
    return coll.ToString();
}

At first I rewrote it in F# in the following, pretty naïve way, and forgot about it for a while:

member this.BuildQuery(args : IEnumerable<KeyValuePair<string, string>>) : string =
    let coll = HttpUtility.ParseQueryString(String.Empty, urlEncoder)
    for arg in args do
        coll.Add(arg.Key, arg.Value)
    coll.ToString()

But today I returned to it and rewrote it in a better way:

member this.BuildQuery(args : IEnumerable<KeyValuePair<string, string>>) : string =
    let coll = HttpUtility.ParseQueryString(String.Empty, urlEncoder)
    args |> Seq.iter (fun arg -> coll.Add(arg.Key, arg.Value))
    coll.ToString()
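
A quick usage sketch (QueryBuilder is a hypothetical host type for BuildQuery; the exact encoding depends on the encoder passed in):

open System.Collections.Generic

let builder = QueryBuilder()
let query =
    builder.BuildQuery
        [ KeyValuePair<string, string>("q", "f#")
          KeyValuePair<string, string>("page", "1") ]
// query will be something like "q=f%23&page=1"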