How to do incremental ingests into your table?
A significant part of working with Big Data is about doing small/incremental tasks. Instead of dealing with all the data at once, we normally deal with just small parts of it, one part at a time.
The blipdataforge library offers the get_latest_record_date() method, which
helps you do incremental ingests (that is, incrementally add new data) into your table.
This method is similar to the initial_info() function from the TakeSpark
library.
The sales_per_day table
As an example, let's use the sales table below, which represents the sales of a
fictitious product. Notice that I am saving this table at the
location dageneral_sandbox.blipdataforge.sales
in the Data Lake, using the write() method of the DataPlatform class:
from pyspark.sql import SparkSession
from blipdataforge.facades import DataPlatform
from datetime import datetime, date
spark = SparkSession.builder.getOrCreate()
dp = DataPlatform()
data = [
    {'id': 1, 'value': 2311, 'date': date(2024,6,11), 'timestamp': datetime(2024,6,11,14,23,1)},
    {'id': 2, 'value': 1095, 'date': date(2024,6,11), 'timestamp': datetime(2024,6,11,17,5,15)},
    {'id': 3, 'value': 1544, 'date': date(2024,6,11), 'timestamp': datetime(2024,6,11,19,23,9)},
    {'id': 4, 'value': 1987, 'date': date(2024,6,11), 'timestamp': datetime(2024,6,11,21,8,46)}
]
sales = spark.createDataFrame(data)
dp.write(
    sales,
    catalog="dageneral_sandbox",
    database="blipdataforge",
    table="sales",
    write_mode="overwrite"
)
Let's suppose that new sales are made every day, in large volumes (say, a thousand sales per day).
So this sales table is regularly fed with new data: new records arrive and are inserted into
the table every day.
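For illustration, a daily ingestion job for the sales table could look like the sketch below. This is just a minimal sketch: the new_sales_data records are made-up values for 2024-06-12, and it reuses the spark session and dp object created above, together with the same append write mode used later in this article.
from datetime import datetime, date

# Hypothetical batch of records that arrived on 2024-06-12 (made-up values)
new_sales_data = [
    {'id': 5, 'value': 1750, 'date': date(2024,6,12), 'timestamp': datetime(2024,6,12,9,12,33)},
    {'id': 6, 'value': 2088, 'date': date(2024,6,12), 'timestamp': datetime(2024,6,12,11,47,2)}
]
new_sales = spark.createDataFrame(new_sales_data)

# Append only the new records to the existing sales table
dp.write(
    new_sales,
    catalog="dageneral_sandbox",
    database="blipdataforge",
    table="sales",
    write_mode="append"
)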
Now, let's suppose you have a second, more refined table called sales_per_day,
which contains the aggregate count of sales made per day. So, every time
the sales table gets updated, you also need to update the data in the
sales_per_day table. Here is the code that creates this table:
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, count
sales = spark.table("dageneral_sandbox.blipdataforge.sales")
def sales_per_day(sales_df: DataFrame) -> DataFrame:
    return sales_df\
        .groupBy('date')\
        .agg(count(col('date')).alias('n_sales'))

dp.write(
    sales_per_day(sales),
    catalog="dageneral_sandbox",
    database="blipdataforge",
    table="sales_per_day",
    write_mode="overwrite"
)
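Just to make the aggregation concrete: if you apply sales_per_day() to the four example records above (all made on 2024-06-11), you should get a single row counting those four sales, roughly like this:
# Inspect the aggregated result (all four example records share the date 2024-06-11)
sales_per_day(sales).show()
# +----------+-------+
# |      date|n_sales|
# +----------+-------+
# |2024-06-11|      4|
# +----------+-------+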
How does get_latest_record_date() help you to update your tables?
With the get_latest_record_date() method, you can get the
maximum (i.e. the latest) date among the records currently stored in your table.
This helps you determine whether your table already
contains data that describes the events of a particular date.
When you are incrementally updating your table, you are always
working with the latest dates in the calendar. With the get_latest_record_date() method,
you can check whether your table already contains the data for
these latest dates, and then apply strategies to avoid duplicating
the data in your table.
As an example, if I run the get_latest_record_date() method over the sales_per_day table,
I get a datetime.date object as the result, containing the most recent
date present in this table. In the result below
we can see that the latest records written into the sales_per_day table
describe sales made on 2024-06-11.
dp.get_latest_record_date(
    catalog="dageneral_sandbox",
    database="blipdataforge",
    table="sales_per_day",
    date_column="date"
)
# datetime.date(2024, 6, 11)
What does this information give us? It tells us that we already have data about
2024-06-11 written into the table, and that
the table is up to date until this precise date. So, at this moment,
the table does not contain data about later dates such as 2024-06-12, 2024-06-13, and so on.
Therefore, if the sales table now receives data about a later date like 2024-06-12,
I know I can safely append this new data into sales_per_day: since
sales_per_day does not have any data about 2024-06-12 yet,
appending it will not create duplicates in the table.
The code example below demonstrates this logic of using the result of get_latest_record_date()
as a safeguard in your data pipeline.
from datetime import date
from pyspark.sql.functions import col

today = date.today() # Suppose it returns "2024-06-12"
latest_record_date = dp.get_latest_record_date(
    catalog="dageneral_sandbox",
    database="blipdataforge",
    table="sales_per_day",
    date_column="date"
)

if latest_record_date >= today:
    message = (
        "The table already has data about this particular date. "
        "Raising this exception to avoid inserting duplicated "
        "data into the table."
    )
    raise Exception(message)

sales = spark.table("dageneral_sandbox.blipdataforge.sales")\
    .filter(col('date') == today)

dp.write(
    sales_per_day(sales),
    catalog="dageneral_sandbox",
    database="blipdataforge",
    table="sales_per_day",
    write_mode="append"
)
So, the code example above is one strategy you can use in conjunction
with get_latest_record_date() to correctly update your tables in the Data Lake.
But you can use the result of get_latest_record_date() in multiple ways to
apply different strategies in your data pipeline.
Instead of raising an Exception to halt the execution of your data pipeline, you
might want, for example, to change the write mode in the write() method.
When latest_record_date >= today you might prefer to use
a merge operation instead of a simple append, like this:
import warnings
from datetime import date
from pyspark.sql.functions import col

today = date.today() # Suppose it returns "2024-06-12"
latest_record_date = dp.get_latest_record_date(
    catalog="dageneral_sandbox",
    database="blipdataforge",
    table="sales_per_day",
    date_column="date"
)

sales = spark.table("dageneral_sandbox.blipdataforge.sales")\
    .filter(col('date') == today)

if latest_record_date >= today:
    message = (
        "[WARNING]: The table already has data about this particular date. "
        "Changing the strategy to merge!"
    )
    warnings.warn(message)
    cond = "target.date = source.date"
    dp.write(
        sales_per_day(sales),
        catalog="dageneral_sandbox",
        database="blipdataforge",
        table="sales_per_day",
        write_mode="upsert",
        merge_condition=cond
    )
else:
    dp.write(
        sales_per_day(sales),
        catalog="dageneral_sandbox",
        database="blipdataforge",
        table="sales_per_day",
        write_mode="append"
    )