Tag Archives: azure data lake

How to move data to ADLA table using ADF

In Universal Store Team, the Universal Payout Platform – Earnings Calculations project, we need to move data from on-prem SQL Server, as well as SQL Server within an Azure vnet and SQL Azure, to the Cloud. As our storage in the cloud we chose Azure Data Lake storage (ADLS) and as data movement vehicle we chose Azure Data Factory (ADF). Then we use Azure Data Lake analytics (ADLA) as our Big Data processing platform.

Here’s few simple steps how we’ve managed to copy data from SQL to ADLS and invoke U-SQL script to insert this data into ADLA table.

Prerequisites:

  • You have Azure subscription
  • You have ADF, ADLS and ADLA accounts setup
  • ADF is authorized to access ADLS and ADLA
    • As of today, ADLS linked service for authorization supports both personal OAuth user token and Azure service principal
    • ADLA supports only OAuth. However the support for the latter is coming. Please vote to expedite.

Steps:

  1. Create new pipeline. It will contain 2 sequential activities:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
      "name": "Pipeline-CopyDataFromSqlToAzureDataLakeStore",
      "properties": {
        "start": "{start}",
        "end": "{end}",
        "activities": [
        ]
      }
    }
    
  2. Add an activity to move the data. It will execute T-SQL stored procedure dbo.GetData() accepting 2 parameters of type DateTime2(0) which represent slice start and end:
    {
      "name": "CopyData",
      "type": "Copy",
      "inputs": [
        {
          "name": "Dataset-Source-Sql_data"
        }
      ],
      "outputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_data"
        }
      ],
      "typeProperties": {
        "source": {
          "type": "SqlSource",
          "sqlReaderQuery": "$$Text.Format('exec dbo.GetData \\'{0:yyyy-MM-dd HH:00:00}\\', \\'{1:yyyy-MM-dd HH:00:00}\\'', SliceStart, SliceEnd)"
        },
        "sink": {
          "type": "AzureDataLakeStoreSink",
          "writeBatchSize": 0,
          "writeBatchTimeout": "00:00:00"
        }
      },
      "policy": {
        "concurrency": 3,
        "executionPriorityOrder": "OldestFirst",
        "retry": 3,
        "timeout": "01:00:00"
      },
      "scheduler": {
        "frequency": "Hour",
        "interval": 1
      }
    }
    
  3. It needs the corresponding dataset which represents the actual activity/pipeline output:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Destination-AzureDataLakeStore_data",
      "properties": {
        "type": "AzureDataLakeStore",
        "linkedServiceName": "LinkedService-Destination-AzureDataLakeStore",
        "typeProperties": {
          "folderPath": "data",
          "fileName": "{Slice}.json",
          "partitionedBy": [
            {
              "name": "Slice",
              "value": {
                "type": "DateTime",
                "date": "SliceStart",
                "format": "yyyy-MM-dd_HH-mm"
              }
            }
          ],
          "format": {
            "type": "JsonFormat",
            "filePattern": "arrayOfObjects",
            "encodingName": "UTF-8"
          }
        },
        "external": false,
        "availability": {
          "frequency": "Hour",
          "interval": 1
        },
        "policy": {
        }
      }
    }
    
  4. Add an activity to insert data into table. It will execute U-SQL procedure mydb.dbo.InsertData() again accepting 2 parameters of type System.DateTime which represent slice start and end. Please note that to run the activities sequentially the output of the former must be the input of the latter:
    {
      "name": "RunScript",
      "type": "DataLakeAnalyticsU-SQL",
      "linkedServiceName": "LinkedService-Destination-AzureDataLakeAnalytics",
      "inputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_data"
        }
      ],
      "outputs": [
        {
          "name": "Dataset-Destination-AzureDataLakeStore_dummy"
        }
      ],
      "typeProperties": {
        "script": "mydb.dbo.InsertData (Convert.ToDateTime(@sliceStart), Convert.ToDateTime(@sliceEnd));",
        "parameters": {
          "sliceStart": "$$SliceStart",
          "sliceEnd": "$$SliceEnd"
        },
        "priority": 1,
        "degreeOfParallelism": 3
      }
    }
    
  5. Since all activities, even those which don’t produce any output, need the corresponding dataset, create a dummy one:
    {
      "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
      "name": "Dataset-Destination-AzureDataLakeStore_dummy",
      "properties": {
      "type": "AzureDataLakeStore",
      "linkedServiceName": "LinkedService-Destination-AzureDataLakeStore",
      "typeProperties": {
        "folderPath": ""
       },
       "external": false,
       "availability": {
         "frequency": "Hour",
         "interval": 1
       },
       "policy": {
       }
     }
    }
    
  6. Create a U-SQL script to create the destination ADLA table. You can run it once:
    CREATE TABLE IF NOT EXISTS MyDataTable
    (
      Id long,
      Value string,
      CreatedDateTime DateTime,
    
      INDEX IX_MyDataTable_Id
      CLUSTERED (Id ASC)
    )
    DISTRIBUTED BY DIRECT HASH (Id);
    
  7. Create a U-SQL script to create an ADLA procedure which will read newly created file, parse and insert its content into the destination table:
    DROP PROCEDURE IF EXISTS InsertData;
    CREATE PROCEDURE InsertData (@sliceStart DateTime, @sliceEnd DateTime)
    AS
    BEGIN
      REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];
      REFERENCE ASSEMBLY [Newtonsoft.Json];
    
      USING Microsoft.Analytics.Samples.Formats.Json;
    
      DECLARE @input = "/data/{CreatedDateTime:yyyy}-{CreatedDateTime:MM}-{CreatedDateTime:dd}_{CreatedDateTime:HH}-{CreatedDateTime:mm}_{CreatedDateTime:ss}.json";
    
      @data =
        EXTRACT Id long,
            Value string,
            CreatedDateTime DateTime
        FROM  @input
        USING new JsonExtractor();
    
      @filtereddata = 
        SELECT Id, Value, CreatedDateTime
        FROM @data
        WHERE CreatedDateTime >= @sliceStart and CreatedDateTime < @sliceEnd;
    
      INSERT INTO MyDataTable
      (
           Id, Value, CreatedDateTime
      )
      SELECT Id, Value, CreatedDateTime
      FROM @filtereddata;
    END;
    

Conclusion

That’s it. Next time the pipeline runs, first activity will move the data within the specified time window as a file in ADLS, second activity will insert the data into the table by invoking a script in ADLA.

Happy data movement!