Write Spark DataFrames to Elasticsearch

Elasticsearch is a NoSQL database that stores data as JSON documents. Version 1.23.0 of blipdataforge added the DataPlatform.write_elasticsearch() facade, which lets users write data from a Spark DataFrame into an Elasticsearch database.

Info

This function depends on the Maven library org.elasticsearch:elasticsearch-spark-30_2.12:8.17.3. Add it to your ADF pipeline activity or install it on your cluster before calling the function.
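As a minimal sketch of how that dependency might be declared, the snippet below builds a Maven library entry in the JSON shape that Databricks library specifications generally use. The helper name maven_library_spec is hypothetical, and whether your ADF activity or cluster configuration accepts this exact shape is an assumption to verify against your own setup.

```python
import json

# Maven coordinate taken from the note above.
ES_SPARK_COORDINATES = "org.elasticsearch:elasticsearch-spark-30_2.12:8.17.3"


def maven_library_spec(coordinates: str) -> dict:
    """Build one library entry for a Maven package (hypothetical helper)."""
    return {"maven": {"coordinates": coordinates}}


# A list of library entries, as typically placed under a "libraries" key.
libraries = [maven_library_spec(ES_SPARK_COORDINATES)]
print(json.dumps(libraries, indent=2))
```

The printed JSON can be pasted into the libraries section of a pipeline activity or a cluster library definition, if your environment uses that format.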

Writing data to an Elasticsearch database

The following code shows how to write data from a PySpark DataFrame to an Elasticsearch database using the write_elasticsearch() function.

from blipdataforge import DataPlatform

df = spark.table("catalog.database.table")

# When running in Databricks, retrieve the API key from a Databricks secret scope
api_key = dbutils.secrets.get(scope='your-secret-scope', key='key-to-your-elasticsearch-api-key')

# Apply any transformations to df here.

dp = DataPlatform()
dp.write_elasticsearch(
    df=df,
    host="https://my-project.es.eastus.azure.elastic.cloud",
    port=443,
    api_key=api_key,
    index="my-index"
)

The code above sends each row in the DataFrame as a JSON document to the Elasticsearch database. You can also pass arguments that change how the data is written (write_mode and op_type), name a column to use as the document ID (id_column), and set the batch size (chunk_size). The full list of parameters is available in the function's documentation.
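The optional arguments can be sketched as follows. Only the parameter names come from the text above. Every value shown is an illustrative assumption, the column name customer_id and the wrapper write_with_options are hypothetical, and the accepted values for write_mode and op_type should be checked in the function's documentation.

```python
# Optional arguments named in the text above. All values are placeholders
# chosen for illustration, not documented defaults.
optional_args = {
    "write_mode": "append",       # assumed value
    "op_type": "upsert",          # assumed value
    "id_column": "customer_id",   # hypothetical column used as document ID
    "chunk_size": 1000,           # assumed batch size
}


def write_with_options(dp, df, api_key):
    """Call write_elasticsearch() with the optional arguments above.

    dp is expected to be a blipdataforge DataPlatform instance.
    """
    dp.write_elasticsearch(
        df=df,
        host="https://my-project.es.eastus.azure.elastic.cloud",
        port=443,
        api_key=api_key,
        index="my-index",
        **optional_args,
    )
```

In a notebook this would be called as write_with_options(DataPlatform(), df, api_key), reusing the df and api_key variables from the earlier example.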