
How to move data to an ADLA table using ADF

On the Universal Store team, in the Universal Payout Platform – Earnings Calculations project, we need to move data to the cloud from on-premises SQL Server, from SQL Server running inside an Azure VNet, and from SQL Azure. We chose Azure Data Lake Store (ADLS) as our cloud storage, Azure Data Factory (ADF) as the data movement vehicle, and Azure Data Lake Analytics (ADLA) as our big data processing platform.

Here are a few simple steps describing how we copy data from SQL to ADLS and invoke a U-SQL script to insert that data into an ADLA table.

Prerequisites:

  • You have an Azure subscription
  • You have ADF, ADLS, and ADLA accounts set up
  • ADF is authorized to access ADLS and ADLA
    • As of today, the ADLS linked service supports both a personal OAuth user token and an Azure service principal for authorization (see the sample linked service right after this list)
    • The ADLA linked service still supports only the OAuth user token; service principal support is coming. Please vote to expedite.
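
For reference, here's what the ADLS linked service can look like when it uses service principal authorization. This is only a minimal sketch following the ADF (v1) AzureDataLakeStore linked service schema; the placeholder values in angle brackets are assumptions you need to replace with your own:

    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
      "name": "LinkedService-Destination-AzureDataLakeStore",
      "properties": {
        "type": "AzureDataLakeStore",
        "typeProperties": {
          "dataLakeStoreUri": "https://<adls account name>.azuredatalakestore.net/webhdfs/v1",
          "servicePrincipalId": "<application id>",
          "servicePrincipalKey": "<application key>",
          "tenant": "<tenant id or domain>",
          "subscriptionId": "<subscription id of the ADLS account>",
          "resourceGroupName": "<resource group of the ADLS account>"
        }
      }
    }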

Steps:

  1. Create a new pipeline. It will contain two sequential activities (the activity definitions from steps 2 and 4 go inside the activities array):
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
      "name": "Pipeline-CopyDataFromSqlToAzureDataLakeStore",
      "properties": {
        "start": "{start}",
        "end": "{end}",
        "activities": [
        ]
      }
    }
    
  2. Add an activity to move the data. It executes the T-SQL stored procedure dbo.GetData(), which accepts two parameters of type DateTime2(0) representing the slice start and end (a sketch of the source dataset this activity reads from is shown after the steps):
    {
      "name": "CopyData",
      "type": "Copy",
      "inputs": [
        {
          "name": "Dataset-Source-Sql_data"
        }
      ],
      "outputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_data"
        }
      ],
      "typeProperties": {
        "source": {
          "type": "SqlSource",
          "sqlReaderQuery": "$$Text.Format('exec dbo.GetData \\'{0:yyyy-MM-dd HH:00:00}\\', \\'{1:yyyy-MM-dd HH:00:00}\\'', SliceStart, SliceEnd)"
        },
        "sink": {
          "type": "AzureDataLakeStoreSink",
          "writeBatchSize": 0,
          "writeBatchTimeout": "00:00:00"
        }
      },
      "policy": {
        "concurrency": 3,
        "executionPriorityOrder": "OldestFirst",
        "retry": 3,
        "timeout": "01:00:00"
      },
      "scheduler": {
        "frequency": "Hour",
        "interval": 1
      }
    }
    
  3. The copy activity needs a corresponding output dataset, which represents the actual activity/pipeline output:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Destination-AzureDataLakeStore_data",
      "properties": {
        "type": "AzureDataLakeStore",
        "linkedServiceName": "LinkedService-Destination-AzureDataLakeStore",
        "typeProperties": {
          "folderPath": "data",
          "fileName": "{Slice}.json",
          "partitionedBy": [
            {
              "name": "Slice",
              "value": {
                "type": "DateTime",
                "date": "SliceStart",
                "format": "yyyy-MM-dd_HH-mm"
              }
            }
          ],
          "format": {
            "type": "JsonFormat",
            "filePattern": "arrayOfObjects",
            "encodingName": "UTF-8"
          }
        },
        "external": false,
        "availability": {
          "frequency": "Hour",
          "interval": 1
        },
        "policy": {
        }
      }
    }
    
  4. Add an activity to insert the data into the table. It executes the U-SQL procedure mydb.dbo.InsertData(), which again accepts two parameters, this time of type System.DateTime, representing the slice start and end. Note that to run the activities sequentially, the output of the former must be the input of the latter:
    {
      "name": "RunScript",
      "type": "DataLakeAnalyticsU-SQL",
      "linkedServiceName": "LinkedService-Destination-AzureDataLakeAnalytics",
      "inputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_data"
        }
      ],
      "outputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_dummy"
        }
      ],
      "typeProperties": {
        "script": "mydb.dbo.InsertData (Convert.ToDateTime(@sliceStart), Convert.ToDateTime(@sliceEnd));",
        "parameters": {
          "sliceStart": "$$SliceStart",
          "sliceEnd": "$$SliceEnd"
        },
        "priority": 1,
        "degreeOfParallelism": 3
      }
    }
    
  5. Since every activity, even one that doesn't produce any output, needs a corresponding output dataset, create a dummy one:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Destination-AzureDataLakeStore_dummy",
      "properties": {
        "type": "AzureDataLakeStore",
        "linkedServiceName": "LinkedService-Destination-AzureDataLakeStore",
        "typeProperties": {
          "folderPath": ""
        },
        "external": false,
        "availability": {
          "frequency": "Hour",
          "interval": 1
        },
        "policy": {
        }
      }
    }
    
  6. Create a U-SQL script that creates the destination ADLA table. You only need to run it once:
    // The ADF activity calls mydb.dbo.InsertData, so create the table in that database.
    CREATE DATABASE IF NOT EXISTS mydb;
    USE DATABASE mydb;

    CREATE TABLE IF NOT EXISTS MyDataTable
    (
      Id long,
      Value string,
      CreatedDateTime DateTime,
    
      INDEX IX_MyDataTable_Id
      CLUSTERED (Id ASC)
    )
    DISTRIBUTED BY DIRECT HASH (Id);
    
  7. Create a U-SQL script that creates an ADLA procedure, which reads the newly created file, parses it, and inserts its content into the destination table (it assumes the Microsoft.Analytics.Samples.Formats and Newtonsoft.Json assemblies are already registered in the database):
    // The ADF activity calls mydb.dbo.InsertData, so create the procedure in that database.
    USE DATABASE mydb;

    DROP PROCEDURE IF EXISTS InsertData;
    CREATE PROCEDURE InsertData (@sliceStart DateTime, @sliceEnd DateTime)
    AS
    BEGIN
      REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];
      REFERENCE ASSEMBLY [Newtonsoft.Json];
    
      USING Microsoft.Analytics.Samples.Formats.Json;
    
      // The file name pattern must match the output dataset's fileName format: {Slice}.json with yyyy-MM-dd_HH-mm.
      DECLARE @input = "/data/{CreatedDateTime:yyyy}-{CreatedDateTime:MM}-{CreatedDateTime:dd}_{CreatedDateTime:HH}-{CreatedDateTime:mm}.json";
    
      @data =
        EXTRACT Id long,
            Value string,
            CreatedDateTime DateTime
        FROM  @input
        USING new JsonExtractor();
    
      @filtereddata = 
        SELECT Id, Value, CreatedDateTime
        FROM @data
        WHERE CreatedDateTime >= @sliceStart and CreatedDateTime < @sliceEnd;
    
      INSERT INTO MyDataTable
      (
           Id, Value, CreatedDateTime
      )
      SELECT Id, Value, CreatedDateTime
      FROM @filtereddata;
    END;
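
One piece not shown above is the source dataset, Dataset-Source-Sql_data, that the copy activity in step 2 reads from. Here's a minimal sketch of what it might look like, assuming the source is SQL Server; the linked service name and table name are placeholders, and the tableName only satisfies the dataset schema since the activity supplies its own sqlReaderQuery:

    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Source-Sql_data",
      "properties": {
        "type": "SqlServerTable",
        "linkedServiceName": "LinkedService-Source-SqlServer",
        "typeProperties": {
          "tableName": "dbo.MyData"
        },
        "external": true,
        "availability": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    }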
    

Conclusion

That’s it. The next time the pipeline runs, the first activity will move the data within the specified time window into a file in ADLS, and the second activity will insert that data into the table by invoking a U-SQL script in ADLA.

Happy data movement!

How to deploy an Azure Data Factory pipeline and its dependencies programmatically using PowerShell

Since ADF unfortunately doesn’t provide a built-in way to automate deployment, you have to write a custom script, which you can then run on a build server such as VSTS or TeamCity.

If you’re running interactively, you need to log in and select the corresponding subscription. If you run the script on a build server, the subscription should be selected for you automatically.

Login-AzureRmAccount
Select-AzureRmSubscription -SubscriptionName YourSubName

Here are the parameters you can’t get programmatically and therefore need to specify explicitly. The last parameter is the path to the JSON files that represent the resources to be deployed: linked services, datasets, and pipelines.

param(
    [Parameter(Mandatory=$true)][string]$ResourceGroupName,
    [Parameter(Mandatory=$true)][string]$Location,
    [Parameter(Mandatory=$true)][string]$DataFactoryName,
    [Parameter(Mandatory=$true)][string]$InputFolderPath
)

First, create the factory itself. The -Force flag lets the script continue if the factory already exists. Then load it into a variable:

New-AzureRmDataFactory -ResourceGroupName $ResourceGroupName -Name $DataFactoryName -Location $Location -Force -ErrorAction Stop
$dataFactory = Get-AzureRmDataFactory -ResourceGroupName $ResourceGroupName -Name $DataFactoryName -ErrorAction Stop

The next three loops read files based on the following naming convention:

  • Linked services start with LinkedService, e.g. LinkedService-Dev.json
  • Datasets start with Dataset, e.g. Dataset-Orders.json
  • Pipelines start with Pipeline, e.g. Pipeline-CopyOrders.json

First, create the linked services. If any of them references a gateway, create the gateway as well. Since New-AzureRmDataFactoryGateway doesn’t support the -Force flag, we use -ErrorAction Continue so the script continues if the gateway already exists:

$files = Get-ChildItem $InputFolderPath -Recurse -Include *.json -Filter LinkedService* -ErrorAction Stop
foreach ($file in $files)  
{
    Write-Output "Creating linked services from $($file.FullName)"
    New-AzureRmDataFactoryLinkedService -DataFactory $dataFactory -File $file.FullName -Force -ErrorAction Stop

    $json = Get-Content $file.FullName -Raw -ErrorAction Stop
    $svc = $json | ConvertFrom-Json
    $gwName = $svc.properties.typeProperties.gatewayName
    if ($gwName)
    {
        Write-Output "Creating gateway $($gwName) from $($file.FullName)" 
        New-AzureRmDataFactoryGateway -DataFactory $dataFactory -Name $gwName -ErrorAction Continue
    }
}

Then create datasets:

$files = Get-ChildItem $InputFolderPath -Recurse -Include *.json -Filter Dataset* -ErrorAction Stop
foreach ($file in $files)  
{
    Write-Output "Creating dataset from $($file.FullName)"
    New-AzureRmDataFactoryDataset -DataFactory $dataFactory -File $file.FullName -Force -ErrorAction Stop 
}

And finally, pipelines:

$files = Get-ChildItem $InputFolderPath -Recurse -Include *.json -Filter Pipeline* -ErrorAction Stop
foreach ($file in $files)  
{
    Write-Output "Creating pipeline from $($file.FullName)"
    New-AzureRmDataFactoryPipeline -DataFactory $dataFactory -File $file.FullName -Force -ErrorAction Stop 
}
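
Optionally, you can sanity-check the deployment before wrapping up. This is just a sketch using the same AzureRM.DataFactories module as above; it lists the pipelines in the factory and the recent slices of the destination dataset from the first post (substitute your own dataset name and time window):

# List the pipelines that ended up in the factory.
Get-AzureRmDataFactoryPipeline -DataFactory $dataFactory

# Inspect the last 24 hours of slices for one of the output datasets.
Get-AzureRmDataFactorySlice -DataFactory $dataFactory `
    -DatasetName "Dataset-Destination-AzureDataLakeStore_data" `
    -StartDateTime (Get-Date).AddHours(-24)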

That’s it. By this time, all linked services, datasets, and pipelines should be deployed, and the pipelines should start running on their defined schedules. Happy data movement!