Writing a Spark DataFrame into the Data Lake
blipdataforge is a library that will help you get
your data in the right place.
How To?
If you have a Spark DataFrame containing data from your product, a new API integration,
or some other source, and you want to save it in the Data Lake,
the blipdataforge library can help you get this done.
Inside your Python script or Databricks notebook, you can import the DataPlatform class from the blipdataforge package.
This class provides functionality often needed by Data Engineers, such as reading DataFrames from and writing DataFrames to the Data Lake.
After you've imported the class, you must instantiate it and call the write() method, passing the required parameters:
from datetime import date

from blipdataforge import DataPlatform
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
dp = DataPlatform()

# Sample data: one Row per analysis result
data = [
    Row(id=1, value=28.3, date=date(2021, 1, 1)),
    Row(id=2, value=15.8, date=date(2021, 1, 1)),
    Row(id=3, value=20.1, date=date(2021, 1, 2)),
    Row(id=4, value=12.6, date=date(2021, 1, 3)),
]
df = spark.createDataFrame(data)

# Write DataFrame to table at reference "clients_sandbox.product_analysis.sales"
dp.write(
    df,
    catalog="clients_sandbox",
    database="product_analysis",
    table="sales",
    write_mode="overwrite",
    table_comment="This table contains all the analysis done for product XYZ.",
    column_comments={
        "id": "The identification number of the analysis.",
        "value": "The analysis output.",
        "date": "The date where the product was analyzed.",
    },
)
You're now able to add any DataFrame to the Data Lake. If the operation fails, an error will appear under your notebook cell, or in the terminal when running locally.
For more details on how the write() method works, check out the Facades section of the Library reference.
Write modes
When you write/save some data into the Data Lake, you need to specify how you want to save this
data. More specifically, you need to specify what the write() method should do in case
there is already some existing data in the Data Lake.
For example, if you save your data into a table in the Data Lake called product_analysis.sales,
what if this table already exists in the Data Lake? What should the write() method do in this case?
The write_mode argument answers this question. Set it to one of these modes:
- "overwrite": if the table already exists in the Data Lake, the data present in this table is replaced by the data that you are currently saving. In other words, it is as if the table were created again from scratch;
- "append": if the table already exists in the Data Lake, the data that you are currently saving is simply added to the existing table. This is the easiest mode to use when doing incremental updates to your tables;
- "upsert": uses the merge feature of Delta Lake. If the table already exists in the Data Lake, the data that you are currently saving is added to the existing table through a merge operation. If you use this mode, you must also use the merge_condition argument to specify the merge conditions that the write() method should apply when it performs the merge operation.
In the write example shown earlier, we used write_mode = "overwrite", which means that, if the table already exists in the
Data Lake, the existing version of the table is discarded, and a new version of it is created
with the data present in the df object.
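To make the difference between the three modes concrete, the sketch below simulates them on plain Python lists of dictionaries. It is only an illustration and not how blipdataforge or Delta Lake implement writes: simulate_write and its key parameter are hypothetical names, and matching rows on a single key column is an assumed stand-in for whatever you would pass as merge_condition. The "upsert" branch also assumes the common merge behavior of updating matching rows and inserting the rest.

```python
from typing import Dict, List, Optional

TableRow = Dict[str, object]  # one table row, represented as a plain dict


def simulate_write(
    existing: List[TableRow],
    incoming: List[TableRow],
    write_mode: str,
    key: Optional[str] = None,
) -> List[TableRow]:
    """Return the table contents after writing `incoming` with `write_mode`.

    Hypothetical helper for illustration only; it does not call blipdataforge.
    """
    if write_mode == "overwrite":
        # Existing rows are discarded; only the new data remains.
        return list(incoming)
    if write_mode == "append":
        # New rows are added after the existing ones, duplicates included.
        return existing + incoming
    if write_mode == "upsert":
        if key is None:
            raise ValueError("upsert needs a key column to match rows on")
        # Assumed merge rule: update rows whose key matches, insert the others.
        merged = {row[key]: row for row in existing}
        for row in incoming:
            merged[row[key]] = row
        return list(merged.values())
    raise ValueError(f"unknown write_mode: {write_mode!r}")


existing_rows = [{"id": 1, "value": 28.3}, {"id": 2, "value": 15.8}]
incoming_rows = [{"id": 2, "value": 99.0}, {"id": 3, "value": 20.1}]

overwritten = simulate_write(existing_rows, incoming_rows, "overwrite")
appended = simulate_write(existing_rows, incoming_rows, "append")
upserted = simulate_write(existing_rows, incoming_rows, "upsert", key="id")
```

With these inputs, "overwrite" leaves two rows, "append" leaves four rows (id 2 appears twice), and "upsert" leaves three rows, with id 2 carrying the new value.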
Adding metadata to the table
Note
Adding metadata to a table through the write() method is only supported on clusters running DBR 16.4 LTS or higher.
As of version 1.25.0 of the BlipDataForge library, you can add table and column descriptions through the table_comment and column_comments arguments. table_comment accepts a string that is used as the table description, and column_comments accepts a dictionary whose keys are column names and whose values are the respective column descriptions, like {'column_1_name': 'Column 1 description.', 'column_2_name': 'Column 2 description.'}.
Also, starting from version 1.25.0, every write execution adds the notebook path as the table's userMetadata. To see this information, run the query DESCRIBE HISTORY <CATALOG>.<DATABASE>.<TABLE> and look at the userMetadata column.
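Because column_comments is keyed by column name, a typo in a key is easy to miss. The sketch below shows one way to check the dictionary against the DataFrame columns before writing, and to build the DESCRIBE HISTORY query for a given table. Both helpers (unknown_comment_columns and describe_history_query) are hypothetical and not part of blipdataforge; how the library itself reacts to an unknown column name is not covered here.

```python
from typing import Dict, Iterable, List


def unknown_comment_columns(
    columns: Iterable[str], column_comments: Dict[str, str]
) -> List[str]:
    """Return the keys of `column_comments` that are not DataFrame columns."""
    known = set(columns)
    return [name for name in column_comments if name not in known]


def describe_history_query(catalog: str, database: str, table: str) -> str:
    """Build the DESCRIBE HISTORY query for a fully qualified table."""
    return f"DESCRIBE HISTORY {catalog}.{database}.{table}"


# In a notebook you would pass df.columns; a literal list keeps this runnable.
columns = ["id", "value", "date"]
column_comments = {
    "id": "The identification number of the analysis.",
    "vlaue": "The analysis output.",  # deliberate typo to show the check
}

typos = unknown_comment_columns(columns, column_comments)
query = describe_history_query("clients_sandbox", "product_analysis", "sales")

# In a Spark session, the history (including userMetadata) could be read with:
#   spark.sql(query).select("version", "userMetadata").show(truncate=False)
```

Here typos contains the misspelled key "vlaue", so the mistake can be fixed before calling write().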
About catalogs and environments
As described in the Identifying the current environment (DEV or PRD) section,
we have a different Databricks workspace for each environment (dev or prd).
But we have not talked about catalogs yet.
In essence, we also have different catalogs to store your data in each environment. In other words, the data
that you produce in the dev environment is stored in a different catalog than
the data that you produce in the prd environment.
At least, that should be a core standard for you and your workflow.
The data that you produce in the dev environment is, most of the time, highly experimental and unstable.
That is why you should reserve a separate location for these "experiments": the sandbox catalogs.
In contrast, the data produced in the prd environment is much more stable, and much more trustworthy.
That is why we use the trustedzone catalogs in the prd environment.
A different catalog for each environment
So, the basic idea is: when you run your code in the dev environment, you should save
your data inside the sandbox catalog of your domain. But if you are running your code
in the prd environment, then, you should save the data inside the trustedzone catalog
of your domain.
Domains are: clients, dageneral, rdproduct, etc. So, we could list the available catalogs for each domain as:
- Catalogs for clients: clients_sandbox and clients_trustedzone.
- Catalogs for dageneral: dageneral_sandbox and dageneral_trustedzone.
- etc.
This helps you to save only validated and high-quality data into the trustedzone catalog.
And, as a consequence, the users of your service will have a higher trust in using this catalog for operational purposes.
-- {DOMAIN_NAME}_trustedzone
   +-- DATABASE1
       +-- TABLE1
       +-- TABLE2
   +-- DATABASE2
       +-- TABLE1
       +-- TABLE2
Both the production (prd) and development (dev) environments have the same structure of databases and tables; only the catalog where the data belongs changes.
So, for example, if you write a notebook that creates a new
table through the DataPlatform.write() method, and you name this table product_monitoring.sales, then,
when you run this notebook in the dev environment, the full reference to the table created will be
{DOMAIN-NAME}_sandbox.product_monitoring.sales. In contrast, if you run this
same notebook in the prd environment, then, the full reference to the table becomes
{DOMAIN-NAME}_trustedzone.product_monitoring.sales.
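The naming rule above can be sketched as a small function that builds the full table reference from the domain and the environment. This is a hypothetical helper, not part of blipdataforge: full_table_reference and its parameters are assumed names, and the environment is passed in explicitly rather than detected.

```python
def full_table_reference(
    domain: str, environment: str, database: str, table: str
) -> str:
    """Build '<domain>_<zone>.<database>.<table>' for the given environment.

    Hypothetical helper: 'dev' maps to the sandbox catalog and 'prd' maps to
    the trustedzone catalog, following the convention described above.
    """
    zones = {"dev": "sandbox", "prd": "trustedzone"}
    if environment not in zones:
        raise ValueError(f"unknown environment: {environment!r}")
    catalog = f"{domain}_{zones[environment]}"
    return f"{catalog}.{database}.{table}"


dev_ref = full_table_reference("clients", "dev", "product_monitoring", "sales")
prd_ref = full_table_reference("clients", "prd", "product_monitoring", "sales")
```

For the clients domain, dev_ref is clients_sandbox.product_monitoring.sales and prd_ref is clients_trustedzone.product_monitoring.sales: the database and table parts stay the same, and only the catalog changes.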
The blipdataforge library must be used for every write and drop operation.
Using the library ensures that data is stored in a standardized manner, which makes data governance policies easier to manage and maintain. It also helps prevent data leaks, reduces the risk of accidental data loss or misplacement, and promotes a more efficient and organized data handling process.
The requirements and restrictions of each environment are described below.
Development (dev) environment
All sandbox catalogs are considered development environments, and they can be effectively used to test your ingestion scripts and other data-related operations.
In this setup, each domain has its own sandbox catalog, which follows the format {DOMAIN-NAME}_sandbox. This organization ensures that each domain can work in isolation without interfering with each other's data and operations.
Production (prd) environment
To run the Blip Data Forge library, each domain must have a designated service account. This service account will be responsible for executing all pipelines that update production tables. It is essential to ensure that proper access controls and permissions are granted to the service account, allowing it to perform the necessary operations while maintaining data security and governance. This approach helps to centralize and manage pipeline executions and ensures accountability for any changes made to production tables by using the Blip Data Forge library.
1. Domain service account.
2. Domain write permission to External Location.
3. Service account PAT (personal access token) to run pipelines.
4. Azure Data Factory (ADF) linked service with PAT created in item 3.
The production tables will be found under the catalog {DOMAIN-NAME}_trustedzone.