Bulk load to Azure SQL Database with Structured Streaming

As you may have noticed, my go-to language is Scala when it comes to Azure Databricks and Spark. With that said, Scala has a great library to read and write data to Azure SQL Database. Among its many options, the one that stands out the most is the ability to bulk load.

See here for details on this library.
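The library referenced above is presumably the Spark connector for Azure SQL Database and SQL Server (azure-sqldb-spark). As a rough sketch of a bulk load with it (server, database, table and credential values are placeholders; check the connector's documentation for the exact options):

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import spark.implicits._

// Illustrative DataFrame; in practice this is whatever data you need to load
val df = Seq((1, "alpha"), (2, "beta")).toDF("Id", "Name")

// Placeholder connection and bulk copy settings
val bulkCopyConfig = Config(Map(
  "url"               -> "myserver.database.windows.net",
  "databaseName"      -> "mydatabase",
  "user"              -> "myuser",
  "password"          -> "mypassword",
  "dbTable"           -> "dbo.MyTable",
  "bulkCopyBatchSize" -> "100000",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

// Bulk load the DataFrame into the target table
df.bulkCopyToSqlDB(bulkCopyConfig)

The bulk copy options are what differentiate this path from a plain JDBC append.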

I’ve been asked a few times about doing the same in Python; easier said than done. You basically have two options (that I’ve found): JDBC or PyODBC. PyODBC is great, and it also allows you to execute stored procedures, but it’s a Python library and not PySpark. The difference? One is distributed and made to take advantage of Databricks clusters. As for JDBC, it works fairly well, but I struggled with how to push batches at a time in a Structured Streaming setting.

Structured Streaming? Why?

Structured Streaming is a nice feature in Azure Databricks that allows you to perform computation incrementally and continuously update the result as streaming data arrives. This approach has become more popular than traditional Spark Streaming.

Here’s a good example for both Scala and Python: https://docs.microsoft.com/en-us/azure/databricks/_static/notebooks/structured-streaming-python.html
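As a minimal sketch of that incremental model (the socket source, host and port here are purely illustrative), the classic streaming word count keeps a result table that is continuously updated as new lines arrive:

import org.apache.spark.sql.functions._
import spark.implicits._

// Read lines from a socket (illustrative source)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split lines into words and keep a running count per word
val wordCounts = lines
  .select(explode(split($"value", " ")).as("word"))
  .groupBy("word")
  .count()

// The in-memory result table is updated as each micro-batch arrives
wordCounts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("word_counts")
  .start()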

So, what’s this blog post about?

Glad you asked 🙂 I will be showing how to take a structured stream and push it to Azure SQL Database in batches.

Prerequisites

In order to execute the code in this blog post, the following are required:

An Azure SQL Database: you can create a small singleton DB
Server / database properties:
– Server name
– Database name
– SQL user name
– SQL user password
An Azure Databricks workspace with a cluster: the cluster can have 1 worker for this example

My Notebook

Below is the notebook I used in Databricks.
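The gist of it: stream a rate source and push each micro-batch to Azure SQL Database. A minimal sketch of that pattern (not the exact notebook; server, database, table and credential values are placeholders):

import org.apache.spark.sql.{DataFrame, SaveMode}

// Placeholder connection details
val jdbcUrl = "jdbc:sqlserver://myserver.database.windows.net:1433;database=mydatabase"
val connectionProperties = new java.util.Properties()
connectionProperties.put("user", "myuser")
connectionProperties.put("password", "mypassword")

// Rate source: rowsPerSecond drives the batch size, numPartitions the parallelism
val streamDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 100000)
  .option("numPartitions", 8)
  .load()

// Push each micro-batch to Azure SQL Database as one JDBC append
val query = streamDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .mode(SaveMode.Append)
      .jdbc(jdbcUrl, "dbo.StreamedRows", connectionProperties)
  }
  .start()

Each call to foreachBatch hands you a regular DataFrame, so anything you can do with a batch write (including the bulk copy connector shown earlier) works here.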


Conclusion

As this notebook runs, you will notice your SQL DB table grow in batches of 100k records.

You can play with the partition count to increase parallelism and/or the rows per second to increase the batch size. Monitor as you scale up, as you may saturate the Spark JVM processes or the Azure SQL Database throughput.
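For reference, assuming a rate source like in the sketch above, those two knobs look like this (values purely illustrative):

spark.readStream
  .format("rate")
  .option("rowsPerSecond", 200000)  // bigger micro-batches
  .option("numPartitions", 16)      // more parallel writers
  .load()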

Hope this helps!

CI/CD with Databricks and Azure DevOps

So you’ve created notebooks in your Databricks workspace, collaborated with your peers, and now you’re ready to operationalize your work. This is a simple process if you only need to copy to another folder within the same workspace. But what if you need to separate your DEV and PROD environments?

Things get a little more complicated when it comes to automating code deployment; hence CI/CD.

Managing passwords and secrets in Azure Databricks with KeyVault

When it comes to securing passwords, exchanging secrets between users sharing the same notebooks, or simply keeping them out of clear text, changes in the integration between Databricks and Azure have made things even easier… I’m referring to secrets within Databricks and Azure KeyVault.
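As a minimal sketch (the scope and key names are hypothetical, and the scope is assumed to be backed by an Azure Key Vault):

// Retrieve a secret from a Key Vault-backed secret scope (names are hypothetical)
val sqlPassword = dbutils.secrets.get("my-keyvault-scope", "sql-password")

// The value is redacted if displayed in notebook output and can be used like any other string
val jdbcUrl = s"jdbc:sqlserver://myserver.database.windows.net:1433;database=mydb;user=myuser;password=$sqlPassword"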

Managing passwords and connection strings with Databricks

(Update) At the time of writing this article, integration with Azure KeyVault didn’t exist. Things have evolved and the suggestions proposed in this post are no longer best practice. See my newer blog post on how to leverage secrets to manage passwords in a more secure way:

https://thedataguy.blog/2018/12/05/managing-passwords-and-secrets-in-azure-databricks-with-keyvault/

Ever had to connect to a database or any other system that requires a username and password? Ever wonder how to best manage this type of sensitive information?

Although hardcoding them in your notebook works when working solo, things can get complicated when working in a collaborative mode or if you wish to share your work with others.

There are many ways this can be tackled, but the best way I’ve found is to store the sensitive information on remote storage: an Azure Storage account (WASB) or Azure Data Lake.

The below outlines how to handle database connection strings with credentials stored in WASB using JSON.

First, create the JSON file containing the connection strings. Here’s an example of a format that can be used:

{  
  "Connections":[  
     {  
        "Name":"DataBricksLab",
        "Type":"jdbc",
        "Properties":{ 
           "ConnectionString":"jdbc:sqlserver://***.database.windows.net:1433;database=***;user=***;password=***;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
        }
     },
     {  
        "Name":"ContosoRetailDW",
        "Type":"jdbc",
        "Properties":{  
           "ConnectionString":"jdbc:sqlserver://***.database.windows.net:1433;database=***;user=***;password=***;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
        }
     }
  ]
}

Then you will need to upload this file to your Azure Blob Storage account. See this link for instructions on how to create a new Azure Storage account: Create a storage account

The file needs to go into a container. There are multiple ways of transferring data to your blob: Azure Portal, PowerShell, CLI, Import/Export tool, Azure Storage Explorer or AzCopy.

Now comes the fun stuff! Create a setup folder under your user folder in the workspace; in it, place some Scala code to read, parse and make available your connection strings. This approach allows you to share any work you’ve done without giving away your secrets, and makes the setup reusable.

Example of folder structure:

Let’s go through the UserSecretSetup notebook, which contains all the code used to get and parse the JSON file.

First, I’ve set up some parameters which allow me to specify the connection details to my blob at runtime:

dbutils.widgets.text("inputConfigWASBName","","WASB Storage Account Name")
val inputConfigWASBName = dbutils.widgets.get("inputConfigWASBName")

dbutils.widgets.text("inputConfigWASBContainerName","","WASB Container Name")
val inputConfigWASBContainerName = dbutils.widgets.get("inputConfigWASBContainerName")

dbutils.widgets.text("inputConfigWASBSAS","","WASB SAS token")
val inputConfigWASBSAS = dbutils.widgets.get("inputConfigWASBSAS")

dbutils.widgets.text("inputConfigFile","", "Name of JSON Config File")
val inputConfigFile = dbutils.widgets.get("inputConfigFile").trim

Then I set up Spark with the necessary configuration information for my Blob storage:

spark.conf.set(
  s"fs.azure.sas.$inputConfigWASBContainerName.$inputConfigWASBName.blob.core.windows.net",
  inputConfigWASBSAS)

*HINT: notice how I dynamically set the storage account name, container name, etc.? In Scala you prefix your string with “s” and then insert your variable name prefixed with “$”, for example $inputConfigWASBName.

Once that is done I’m ready to read my JSON file from my Blob storage and store it in a dataframe:

val json_df = spark.read
    .option("multiline", true)
    .json(s"wasbs://$inputConfigWASBContainerName@$inputConfigWASBName.blob.core.windows.net/$inputConfigFile")

*HINT: when reading JSON, by default spark.read.json expects each record to be on a single line.

Example:

{"MLrate":"31","Nrout":"0","up":null,"Crate":"2"}
{"MLrate":"30","Nrout":"5","up":null,"Crate":"2"}

Adding .option("multiline", true) allows formatted (multi-line) JSON documents, like the config file above, to be read.

Finally, I extract the connection string information from the dataframe and create a Spark SQL temp view, which can then be read from other languages; e.g. Python:

import org.apache.spark.sql.functions._

val connections = json_df.select(explode($"Connections").as("Connection"))
val connection = connections.select("connection.Name", "connection.Type", "connection.Properties.ConnectionString")

connection.createOrReplaceTempView("conn_df")

Now you have all the connection strings loaded in a Spark SQL temp view which can be read from any other notebook.

Here’s an example of executing the setup notebook while passing parameters:

%run ../setup/UserSecretSetup $inputConfigWASBName="storage account name" $inputConfigWASBContainerName="container name" $inputConfigWASBSAS="SAS container token" $inputConfigFile="configFile.json"

Once executed, you can get the connection string you want:

Scala:

%scala
val connString = spark.sql("SELECT ConnectionString FROM conn_df WHERE Name = 'ConnectionName'").first().getString(0)

Python:

%python
connString = spark.sql("SELECT ConnectionString FROM conn_df WHERE Name = 'ConnectionName'").collect()[0][0]

Link to my setup notebook, which can be imported into your workspace: