Writing data to Opensearch servers
Opensearch is a NoSQL database (i.e. a database of JSON documents). From version
1.21.9 onwards, the blipdataforge library offers a new facade named write_to_opensearch(),
which you can use to write data from a Spark DataFrame into an Opensearch server.
Here is a quick example of how to use this facade:
from blipdataforge import DataPlatform

dp = DataPlatform()

# Connection details and target index for the Opensearch server
server = "blip-opensearch.aivencloud.com"
port = 22751
user = "<user>"
password = "<pass>"
index = "cdp-tenant-contacts-gaia"

# Spark DataFrame with the data to be written
df = spark.sql("""
SELECT *
FROM blipraw.raw.messages
LIMIT 5
""")

# Write the DataFrame to the Opensearch index, sending 100 rows per request
responses = dp.write_to_opensearch(
    df,
    host=server,
    port=port,
    user=user,
    password=password,
    index=index,
    chunk_size=100,
)
Different backends with different trade-offs
Currently, the write_to_opensearch() facade supports two backends:
- Opensearch Spark native driver.
- Opensearch Python SDK.
By default, the facade uses the Spark native driver backend cited above. But, if you prefer, you can use
the Python SDK backend by setting the use_spark_native_writer argument of the facade to False.
Opensearch Spark native driver backend
This is the default backend and the recommended one, because it scales much better when dealing with huge amounts of data. In other words, this backend is much more appropriate for our big data environment.
IMPORTANT: in order to use this backend, you need to prepare your environment first.
That is, the native driver backend requires a specific Maven package to be installed in your
environment/cluster. More specifically, the opensearch-spark package:
https://central.sonatype.com/artifact/org.opensearch.client/opensearch-spark-30_2.12
To make things simpler, just add the following Maven package reference to the list of "dependent libraries"
in your cluster or pipeline configuration, and you should be good to go: org.opensearch.client:opensearch-spark-30_2.12:1.3.0.
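If you manage your own Spark session (rather than relying on a cluster or pipeline configuration), one possible way to pull this package in is through the spark.jars.packages option. This is just a sketch, assuming you build the session yourself:

from pyspark.sql import SparkSession

# Sketch: install the native driver when building your own Spark session.
# On a managed cluster or pipeline, prefer adding the package to the
# "dependent libraries" configuration instead.
spark = (
    SparkSession.builder
    .appName("write-to-opensearch")
    .config(
        "spark.jars.packages",
        "org.opensearch.client:opensearch-spark-30_2.12:1.3.0",
    )
    .getOrCreate()
)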
When you use this backend, the function always checks first for the presence of the native driver in your environment. If it
does not find the native driver, you likely do not have the Maven package cited above installed
in your environment, and the function raises an AssertionError warning
you about this problem.
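If you want to surface this problem with a friendlier message in your own pipeline code, a minimal sketch (reusing the dp, df and connection variables from the first example) could be:

# Sketch: translate the AssertionError raised by the facade into a clearer hint
try:
    responses = dp.write_to_opensearch(
        df,
        host=server,
        port=port,
        user=user,
        password=password,
        index=index,
    )
except AssertionError as err:
    raise RuntimeError(
        "Opensearch native driver not found. Add the Maven package "
        "org.opensearch.client:opensearch-spark-30_2.12:1.3.0 to your cluster."
    ) from err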
Warning
Although this backend brings good performance and scalability, it also comes with poor visibility. In other words, the Spark native driver backend does not provide very useful or informative error messages.
Therefore, if there is something wrong in your code (e.g. you forgot to change the hostname of your server, you are using the wrong credentials, or your data has a different schema than the documents already present in your index), you will probably have a hard time debugging it with the Spark native driver backend.
The most likely error you will encounter with this backend is a "403/Forbidden" message, which basically means that, given the inputs you provided, the driver is asking the Opensearch server to perform an operation that you are not authorized to perform. However, this error is quite generic: "forbidden" can mean a lot of things, because there are many different operations in the database that you may or may not be authorized to perform.
For example, the data you provided may not follow the schema of the JSON documents already indexed in the index you are writing to. In that case, you will likely trigger a "refresh the index" operation, which essentially needs to redefine/recreate the index in the database. If you are not authorized to perform this "refresh the index" operation, the result is a "403/Forbidden" error.
This is just one example; there are many other operations that you might be silently triggering through this backend without the authorization to perform them. Unfortunately, the error messages provided by this backend are poor in detail, so you will probably need to do a fair amount of testing until you find the combination of inputs and data that works for your case.
Opensearch Python SDK backend
If you are working only with reasonably small Spark DataFrames, you might
prefer to use the Python SDK backend of the function, instead of the recommended Spark native
driver backend. You can select this backend by setting the use_spark_native_writer argument of the facade to False.
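For example, reusing the variables from the first example, the call would look like this (only the use_spark_native_writer argument changes):

# Same call as before, but forcing the Python SDK backend
responses = dp.write_to_opensearch(
    df,
    host=server,
    port=port,
    user=user,
    password=password,
    index=index,
    chunk_size=100,
    use_spark_native_writer=False,
)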
This backend handles moderately sized DataFrames decently well, but it does not scale to huge amounts of data. If you try to use it with big Spark DataFrames, you will likely get "not enough memory in the cluster" error messages, so use it only with small or moderate Spark DataFrames.
Although it does not scale very well, this backend offers one clear advantage: much better visibility of what is happening behind the scenes. The Python SDK has much more useful error messages that you can use to debug your application, and the facade returns a list containing the raw HTTP responses from the server, which gives you a lot of information about the write operations performed. Therefore, you can easily see if any of your records was not successfully recorded in the server, for whatever reason.
This is an example of the type of return value that you get from the facade when you are using this backend:
[
    {
        'took': 3, 'ingest_took': 0, 'errors': False, 'items': [
            {
                'update': {
                    '_index': 'cdp-tenants',
                    '_id': '0', '_version': 12,
                    'result': 'updated', '_shards': {},
                    'status': 200
                }
            }
        ]
    },
    # ... one item per chunk of data sent to the server
]
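Since each item in this list follows the bulk-response format shown above, you can scan it for failed records. A minimal sketch (assuming responses holds the return value of the facade):

# Flag any record that the server did not accept
for response in responses:
    if not response.get("errors"):
        continue
    for item in response.get("items", []):
        # each item is keyed by the operation type (e.g. "update" or "index")
        for op, details in item.items():
            status = details.get("status", 0)
            if status >= 300:
                print(f"{op} failed for document {details.get('_id')!r}: {details}")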
ID column
Every record in a NoSQL database (such as Opensearch) needs to have an associated ID. Depending on the type of operation you are performing, this ID can be generated at runtime by the database for each record that you ask it to index/register.
However, when you use the "upsert" mode in your write operation, you need to provide the ID of each record upfront. In other words, when you use the "upsert" write mode, your Spark DataFrame must have a column that contains the ID of each record.
You can specify which column contains the IDs of your records through the id_column
argument of the facade. Just pass a string with the name of that column
in this argument, and you should be good to go.
Therefore, if you set the op_type argument to "upsert" but do not set the id_column
argument, the function will raise an error warning you about this mistake.
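As an illustration, a call using the "upsert" operation could look like the sketch below (the contact_id column name is hypothetical; use whatever column holds your record IDs):

# Upsert records, using the values in the "contact_id" column as document IDs
responses = dp.write_to_opensearch(
    df,
    host=server,
    port=port,
    user=user,
    password=password,
    index=index,
    op_type="upsert",
    id_column="contact_id",
)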
Chunk size
The chunk_size argument defines the maximum number of rows from your Spark DataFrame
that are included in each chunk that is sent to the Opensearch server.
For example, if your DataFrame contains 10000 rows and you set chunk_size to 1000, the
function will split your DataFrame into 10 chunks and send each chunk sequentially
to the Opensearch server.
By controlling the size of each chunk sent to the server, you also control how much data the server needs to process in each request. This is useful if your Opensearch server has a rate limit, or if it does not have powerful enough resources to process huge amounts of data in a single request.
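For instance, if your server is rate limited, you might lower chunk_size so each bulk request carries less data (the value below is just an illustration):

# Smaller chunks mean more, but lighter, requests to the server
responses = dp.write_to_opensearch(
    df,
    host=server,
    port=port,
    user=user,
    password=password,
    index=index,
    chunk_size=500,
)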
Authentication
Currently, this facade supports only basic HTTP auth, with a user and a password. You
can set these credentials through the user and password arguments of the facade.
DNS and server location
The host and port arguments of the facade define where the Opensearch server is,
and where the connection is established.
Spark write mode
Usually, you won't have to care about which Spark write mode is being used by
this "write to Opensearch" facade. By default, the write_to_opensearch() facade uses
the Spark write mode "append" when writing your data. But if you want to, you can change
the Spark write mode by setting the write_mode argument of the facade.
It is worth mentioning that, if you are using the Python SDK backend, the Spark write mode is ignored and therefore has no effect.
With the op_type argument you specify which type of operation is used to insert the
new data into the Opensearch database. The Spark write mode, on the other hand,
adds an extra effect on top of that operation. Usually, you want to use the Spark
write mode "append", because this write mode does not introduce any
extra effect into the mix. In other words, the write mode "append" does nothing; it "means
nothing" to the Opensearch Spark native driver.
But if you set the Spark write mode to anything else, you start to introduce new operations into the mix. The list below shows the effect each Spark write mode has on the process:
- overwrite: a delete operation is performed on the Opensearch index before the Spark data is written into it.
- error or errorifexists: if the Opensearch index is empty (i.e. the document count is zero, or the index doesn't even exist), nothing special happens and the Spark data is written into the Opensearch index as expected. Otherwise, an exception is raised.
- ignore: if the Opensearch index is empty (i.e. the document count is zero, or the index doesn't even exist), the Spark data is written into the Opensearch index as expected. Otherwise, the Spark data is completely ignored, that is, it is not written into the Opensearch index.
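For example, to wipe the index contents before writing, a sketch reusing the variables from the first example could look like this (remember that the Python SDK backend ignores this argument):

# Delete the current contents of the index, then write the DataFrame
responses = dp.write_to_opensearch(
    df,
    host=server,
    port=port,
    user=user,
    password=password,
    index=index,
    write_mode="overwrite",
)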