Have you ever wanted to process new files added to your Azure Storage account (BLOB) in near real time? Have you tried using Azure Event Hubs, only to find your files are too large for that to be a practical solution?
Let me introduce you to the ABS-AQS file source…

Optimized Azure Blob Storage File Source with Azure Queue Storage
Databricks has added support for near-real-time processing of changes by consuming Storage Account events delivered through Storage queues.
As detailed in their documentation, you can set up a Databricks readStream to monitor the Azure Storage queue that tracks all the changes.
In order to make this work, you will need a few things as detailed here:
- An Azure Storage Account (BLOB)
- Create a storage queue
- Set up events using the Storage Queue as the endpoint
- Generate a connection string for Databricks to use and note a storage account key.
Doing step #2 will ask you to create an Event Grid Subscription. This part was confusing to me, so here’s how it should be done.

I recommend unchecking “Subscribe to all event types”. This lets you pick only the events you want to process; for example, you likely don’t care about files that are deleted.
You will need to select Storage Queues for the Endpoint Type.

Doing the above adds a new Endpoint configuration, which you will need to point to the storage queue created earlier.
Finally, select Event Grid Schema, which is detailed here, for the Event Schema.

Testing your setup
If you did everything correctly, you should see records created in the queue when you drop files into your BLOB storage account.

Processing Events in Databricks
Now that the nuts and bolts are done, you can process the events in Databricks.
Setting up WASB access to blob
spark.conf.set("fs.azure.account.key.<STORAGE ACCOUNT NAME>.blob.core.windows.net", "<STORAGE ACCOUNT KEY>")
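If you prefer not to hard-code the key in the notebook, here is a minimal sketch that pulls it from a Databricks secret scope instead (the scope name "storage-secrets" and key name "storage-account-key" are hypothetical; create them first with the Databricks CLI):
// Hypothetical secret scope and key names
val storageAccountKey = dbutils.secrets.get(scope = "storage-secrets", key = "storage-account-key")
spark.conf.set("fs.azure.account.key.<STORAGE ACCOUNT NAME>.blob.core.windows.net", storageAccountKey)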
Specifying the layout of the files to process
In this example, I will process JSON deposited in the BLOB Storage Account. Doing so will require me to specify a schema. This can be defined using StructType.
For example, the following JSON:
{
  "Amounts": {
    "amount": "14131441179.6617",
    "currencyCode": "EU"
  }
}
Will be represented as the following schema:
import org.apache.spark.sql.types._

val schema =
  StructType(List(
    StructField("Amounts",
      StructType(List(
        StructField("amount", StringType, true),
        StructField("currencyCode", StringType, true)
      ))
    )
  ))
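Before wiring up the stream, you can sanity-check the schema with a plain batch read of a sample file. This is just a sketch; the container and file path below are placeholders:
// Batch read of one sample file using the same schema, to catch schema mistakes early
val sample = spark.read
  .schema(schema)
  .option("multiLine", true)
  .json("wasbs://<CONTAINER>@<STORAGE ACCOUNT NAME>.blob.core.windows.net/path/to/sample.json")
sample.printSchema()
sample.show(false)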
Setting up the Structured Stream
When setting up the input stream, you will need to specify a few things:
- format: this should be set to abs-aqs
- fileFormat: the format of the files, such as parquet, json, csv, text, and so on
- queueName: the name of the Storage Account queue created earlier
- multiLine: allows your JSON to be formatted on multiple lines
- ignoreDeletes: optional; ignores delete events
- schema: the structured schema defined for the JSON
See this link for complete details on all the options available.
You can now execute the following Scala code to prep the stream:
val inputStream = spark
  .readStream.format("abs-aqs")
  .option("fileFormat", "json")
  .option("queueName", "<QUEUE NAME CREATED>")
  .option("multiLine", true)
  .option("ignoreDeletes", "true")
  .option("connectionString", "<STORAGE ACCOUNT CONNECTION STRING>")
  .schema(schema)
  .load()
Starting the stream
You are now ready to capture the streaming events coming from your Azure Storage. The following will simply output to the screen, but you could do much more, like saving to a Delta table.
display(inputStream)
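For instance, here is a minimal sketch of writing the same stream to a Delta table instead; the checkpoint location and output path are hypothetical:
// Hypothetical checkpoint and output locations
inputStream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/checkpoints/abs-aqs-demo")
  .start("/mnt/delta/amounts")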
Thanks!
Hi Benjamin,
Is there a way to readStream the json message that is added to the queue instead of the file itself? So I want my readStream to return the json that EventGrid adds to the queue (topic, subject, eventType, eventTime, id, data…)
Thanks in advance.
Mego
The JSON is stored in an Azure table, which you could read, or you could push the table entry to an Event Hub.
Hi Benjamin, Thanks for the article.
Is "abs-aqs" supported only when running the script on a Databricks cluster?
fileStreamDF = sparkSession
.readStream.format("abs-aqs")
Not sure what you mean by scripts on the DB cluster. But Databricks has the necessary libraries to readStream from the tables in Azure storage. Not sure those libraries are available outside ADB.
Thanks for the very informative article. I have a question on the schema requirements.
In your example, say your input randomly changes to:
{
  "Amounts": {
    "country": "Germany",
    "currencyCode": "EU"
  }
}
Would you be able to capture the common field "currencyCode"?
Even with the same columns, I may be wrong, but I find it fails when the input order does not match the schema. Say the input is like:
{
  "Amounts": {
    "currencyCode": "EU",
    "amount": "14131441179.6617"
  }
}
Any way to handle scenarios like these?
Thanks in advance! Much appreciate any feedback.
-Jay
Position shouldn’t really matter for JSON, as it’s made to evolve. If you define the schema properly, it should be able to reconcile based on the name and not the position.
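A quick way to convince yourself, assuming Spark 2.2+ where spark.read.json accepts a Dataset[String]:
import spark.implicits._
// Two JSON documents with the same fields in a different order
val docs = Seq(
  """{"Amounts": {"amount": "14131441179.6617", "currencyCode": "EU"}}""",
  """{"Amounts": {"currencyCode": "EU", "amount": "14131441179.6617"}}"""
).toDS()
// Both rows parse identically because fields are matched by name, not position
spark.read.schema(schema).json(docs).show(false)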
Can we use the same queue to run two different streaming pipelines?
You would need to use Event Grid to fan out to separate Storage queues. But yes, it’s possible.
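Assuming this means one Event Grid subscription and one queue per pipeline, a sketch of the two resulting streams would look like this (the queue names are hypothetical):
// Each pipeline consumes its own queue, so neither steals the other's messages
val pipelineA = spark.readStream.format("abs-aqs")
  .option("fileFormat", "json")
  .option("queueName", "events-pipeline-a")
  .option("connectionString", "<STORAGE ACCOUNT CONNECTION STRING>")
  .schema(schema)
  .load()
val pipelineB = spark.readStream.format("abs-aqs")
  .option("fileFormat", "json")
  .option("queueName", "events-pipeline-b")
  .option("connectionString", "<STORAGE ACCOUNT CONNECTION STRING>")
  .schema(schema)
  .load()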