Write Spark DataFrames to Azure EventHub
As of version 1.18.0 of blipdataforge, a new facade, DataPlatform.write_eventhub(), allows you to write data from a Spark DataFrame into an Azure EventHub topic. To use this facade, you need to collect the following information:
- the name of the eventhub namespace that contains the topic.
- the name of the topic you want to write to.
- the connection string to connect with EventHub.
- optionally, the port on which the EventHub server listens. If not defined, the function uses the default port 9093.
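For reference, the snippet below is a minimal sketch of gathering these values before calling the facade. The namespace, topic, and secret names are placeholders for your own environment, and the explicit port is only needed if your EventHub does not listen on the default 9093.
namespace = "your-eventhub-namespace"
topic = "your-eventhub-topic"
# In Databricks, keep the connection string in a Secret Scope rather than in code.
conn_str = dbutils.secrets.get(scope='your-secret-scope', key='key-to-your-connection-string')
port = 9093  # optional; this is already the default used by write_eventhub()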
Publisher vs subscriber
Azure EventHub is based on a publisher/subscriber model. If you want to write data to Azure EventHub, you act as a "publisher" (also called a "producer"), which produces messages (or "arbitrary data") to an EventHub topic.
In contrast, if you want to read data from Azure EventHub, you act as a "subscriber": you subscribe to an EventHub topic and start listening to and reading the messages that are produced to that topic.
Writing the data to the EventHub topic
Once you have collected the information mentioned above, you can write the data from your Spark DataFrame into the desired EventHub topic like this:
from blipdataforge import DataPlatform

df = spark.table("catalog.database.table")

# Considering you run your code in Databricks, you should retrieve the
# connection string from a Databricks Secret Scope.
conn_str = dbutils.secrets.get(scope='your-secret-scope', key='key-to-your-connection-string')

# You can do some transformation on df here.

dp = DataPlatform()
dp.write_eventhub(
    df=df,
    namespace="eventhub-namespace",
    topic="eventhub-topic",
    conn_str=conn_str
)
The code above produces one event per row of the DataFrame, so a DataFrame with 100000 rows results in 100000 events being produced to the EventHub topic. The content of each event is a JSON-formatted string containing the column names and their values.
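To illustrate the payload shape (this is only a sketch of the expected event body, not the library's internal implementation), the serialization is roughly equivalent to converting each row to a JSON string, as you could do manually with PySpark's to_json and struct functions:
from pyspark.sql.functions import to_json, struct

# Roughly what each event body looks like: every row becomes one JSON string
# with the column names as keys and the row values as values.
preview = df.select(to_json(struct(*df.columns)).alias("event_body"))
preview.show(truncate=False)
# e.g. a row (id=1, name="Alice") would appear as {"id":1,"name":"Alice"}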
Tuning the EventHub write
Depending on your DataFrame (size, number of records, etc.), you may want to tune the streaming configuration to optimize the data transfer. For this, the write_eventhub() function accepts the parameters batch_size and request_timeout.
The batch_size parameter controls the size (in bytes) of the batch that is sent on each request. It defaults to 524288 (512 KB). Raising this value too much can lead to a BufferExhaustedException.
The request_timeout parameter (expressed in milliseconds) controls how long the producer waits for a response to each request. This value can be tuned to avoid a TimeoutException.
The linger.ms setting controls how long the producer waits for new records before sending a batch to the broker. It can improve throughput by reducing the number of partially filled batches sent to the broker. For more information, please refer to the official Kafka linger.ms documentation.
The code below shows how to set these configurations.
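This sketch assumes batch_size is given in bytes and request_timeout in milliseconds, as described above, and that the linger.ms setting is exposed as a linger_ms parameter; check the write_eventhub() signature in your blipdataforge version if it differs.
dp = DataPlatform()
dp.write_eventhub(
    df=df,
    namespace="eventhub-namespace",
    topic="eventhub-topic",
    conn_str=conn_str,
    batch_size=1048576,     # 1 MB batches instead of the default 512 KB
    request_timeout=60000,  # wait up to 60 seconds for a response to each request
    linger_ms=100           # assumed parameter name for the linger.ms setting
)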