
Facades

Facades to use Blip's Data Platform.

This module provides Facades that abstract Blip's Data Platform functions and features.

DataPlatform

An interface for communication with Blip's Data Platform.

This interface contains the most popular functions used by Data Engineers in ETL pipelines. The methods in this class encapsulate methods of other classes in this library and are intended to simplify usage of the library.
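
For orientation, the sketch below shows one way this facade might be used from a notebook. It is a minimal, hedged example: my_spark_df and the catalog, database and table names are hypothetical placeholders, not values defined by this library.

```python
# Minimal usage sketch. Assumptions: my_spark_df is an existing Spark DataFrame,
# and the catalog/database/table names are hypothetical placeholders.
from blipdataforge import DataPlatform

dp = DataPlatform()

# Append the dataframe to a lake table. Per the write() docs, the database and
# table are created if they do not exist when using "append" or "overwrite".
dp.write(
    df=my_spark_df,
    write_mode="append",
    catalog="dataplatform_sandbox",
    database="blipdataforge",
    table="my_table",
    partition_by=["StorageDateDayBR"],
)
```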

Source code in blipdataforge/facades.py
class DataPlatform():
    """An interface for communication with Blip's Data Platform.

    This interface contains the most popular functions used by Data Engineers
    in ETL pipelines. The methods in this class encapsulate methods of other
    classes in this library and are intended to simplify usage of the library.

    """
    def __init__(self) -> None:
        """Initializes the class with current execution context."""
        self.context = get_context()
        self.logger = get_lib_logger(context=self.context)
        self._confs = BlipDataForgeConfigs("/blipdataforge/conf.ini")

    def write(
        self,
        df: DataFrame,
        write_mode: str,
        catalog: str,
        database: str,
        table: str,
        path: Optional[str] = None,
        schema_mode: Optional[str] = None,
        partition_by: Optional[List[str]] = None,
        merge_fields: Optional[List[str]] = None,
        merge_condition: Optional[Union[str, List[str]]] = None,
        merge_schema: Optional[bool] = False,
        table_comment: Optional[str] = None,
        columns_comments: Optional[Dict[str, str]] = None,
    ) -> None:
        """Writes a spark `DataFrame` on data lake.

        If there is a data contract registered for the table being written,
        this method automatically performs a data quality check based on the
        data contract information.

        Args:
            df:
                A spark dataframe to be written into data lake.
            write_mode:
                Specifies the write behavior. Accepts values `overwrite`,
                `append` and `upsert`.

                If using `overwrite` or `append`, the database and table
                are created if they do not exist.

                If using `upsert`, define `merge_condition` and `merge_fields`
                parameters to configure upsert execution.
            catalog:
                The target catalog to be used.
            database:
                The target database (also called schema) where the dataframe
                will be saved. The database is created if it does not exist.
            table:
                The target table that will receive the dataframe. The table is
                created if it does not exist.
            table_comment:
                The table's commentary, also called table description.
                The commentary should contain information regarding the meaning
                of the data for the business.
            columns_comments:
                The commentaries of columns, also called columns descriptions.
                A column description gives a brief explanation on the meaning of
                that column regarding the table context.
            path:
                DISCLAIMER: This parameter exists only for the DSLab migration
                and will be deprecated soon.

                The output path to write the table data. If not defined,
                the path will be defined using governance rules.
            schema_mode:
                The schema mode used to write the table schema. Accepts modes
                `mergeSchema` and `overwriteSchema`.
                This parameter takes precedence over `merge_schema`, so if both
                parameters are set, `schema_mode` will be used and
                `merge_schema` will be ignored.
            partition_by:
                List of column names to be used as partitions.
            merge_fields:
                List of column names to insert or update on upsert operation.
                The columns on source dataframe must have the same name of the
                columns on target table.

                If using `write_mode='upsert'` and `merge_fields` is not
                defined, the upsert operation will try to update all columns
                of the source dataframe into the target table.
            merge_condition:
                The condition that upsert operation will evaluate to decide
                whether a source record should be updated or inserted on
                target table. This parameter is considered mandatory if
                using `write_mode='upsert'`.

                This parameter accepts a `str` containing the condition or a
                `list` of column names that will be compared on equality.

                Example using `list`: to upsert a dataframe into a table if
                columns "id" and "name" of source dataframe matches with
                columns "id" and "name" of the target table, the list
                `["id", "name"]` must be passed to `merge_condition`
                parameter. In this case, the upsert operation will be called
                using the condition
                `"target.id = source.id AND target.name = source.name"`.

                Example using `str`: to upsert a dataframe into a table using
                a custom condition, a string with the condition can be passed
                to `merge_condition`. In this case the user must provide the
                column names preceded by "target." and "source.", like
                `target.Id = source.InternalId AND
                target.StorageDateDayBR >= 2023-01-01`.
            merge_schema:
                `DEPRECATED`. This parameter is set to be removed in future
                releases. Use `schema_mode` instead.
                Enables delta lake merge schema. See
                https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/
                for details.
        """
        data_writer = DataWriter(
            df=df,
            write_mode=write_mode,
            catalog=catalog,
            database=database,
            table=table,
            table_comment=table_comment,
            columns_comments=columns_comments,
            path=path,
            context=self.context,
            schema_mode=schema_mode,
            partition_by=partition_by,
            merge_fields=merge_fields,
            merge_condition=merge_condition,
            merge_schema=merge_schema,
            logger=self.logger
        )

        self._automated_quality_checks(
            catalog=catalog,
            database=database,
            table=table,
            df=df
        )

        data_writer.write()
        try:
            data_writer.write_metadata()
        except Exception as e:
            self.logger.error(f"Failed to add metadata: {str(e)}")

    def delete(
        self,
        catalog: str,
        database: str,
        table: str,
        predicate: str = "",
        dry_run: bool = False
    ) -> None:
        """Deletes the rows that match a predicate.

        The query executed to delete the records follows the format
        `DELETE FROM database.table [WHERE predicate]`, where `database`,
        `table` and `predicate` are set by the arguments passed to this
        function.

        Args:
            catalog:
                The catalog where the target table is.
            database:
                The database/schema where the target table is.
            table:
                The target table from which records will be deleted.
            predicate:
                The condition used to find the rows to delete. When no
                predicate is provided, deletes all rows.
            dry_run:
                If True, performs a trial run without making actual changes.
                Defaults to False.
        """
        delete(
            context=self.context,
            catalog=catalog,
            database=database,
            table=table,
            predicate=predicate,
            dry_run=dry_run
        )

    def get_latest_record_date(
        self,
        catalog: str,
        database: str,
        table: str,
        date_column: str,
        date_format: str = "",
        offset: int = 0
    ) -> Union[str, date, datetime, Any]:
        """Returns the date of the latest record in a table.

        Optionally, an offset can be applied to the result to get an
        earlier date.

        Disclaimer: This function is an alternative to the old `initial_info`
        from the `TakeSpark` library.

        Args:
            catalog:
                The target catalog to be used.
            database:
                The target database to be used.
            table:
                The target table to be used.
            date_column:
                The column which contains the date info of the latest record.
            date_format:
                The format of the date, according to the `datetime` format
                directives. This parameter is used to apply the offset when
                the date column is of `str` type. If this parameter is not
                informed, the offset will not be applied.

                Example: if the dates on the `date_column` are provided like
                "2023-11-21", the `date_format` parameter must be `%Y-%m-%d`.
                Refer to Format Codes in the datetime python class docs to
                understand how to build this parameter.
            offset:
                The number of days to subtract from the latest date. If the
                column that contains the date is of `str` type, the
                `date_format` parameter must be provided in order to apply the
                offset.

        Returns:
            The latest date in the informed column, with the offset applied
            when applicable. The type of the return value depends on the type
            of the provided date column on the source table.
        """
        return get_latest_record_date(
            context=self.context,
            catalog=catalog,
            database=database,
            table=table,
            date_column=date_column,
            date_format=date_format,
            offset=offset
        )

    def run_quality_checks(
        self,
        checks: str,
        scan_definition_name: str,
        df: Optional[DataFrame] = None,
        api_key_id: str = "",
        api_key_secret: str = "",
    ) -> dict:
        """Run quality checks on a table or dataframe.

        Args:
            checks:
                A string with Soda quality checks.

                If the dataset name in the check string is not in the format
                `catalog.database.table`, a `MalformedDatasetNameException`
                will be raised.

                Example: to run a check over a table, define the check string
                as follows:
                checks for catalog.database.table:
                    - row_count > 0
            scan_definition_name:
                This parameter is used to aggregate results of checks from the
                same context. This parameter is also used as partition column
                of checks result table on Data Lake.
            df:
                A Spark dataframe to be checked.
            api_key_id:
                The Soda Cloud API key id to send the results to the cloud.
            api_key_secret:
                The Soda Cloud API key secret to send the results to the cloud.

        Returns:
            This function returns a dict with all results reported by Soda in
            the quality checks.
        """
        soda_checker = SodaChecker(
            context=self.context,
            checks=checks,
            scan_definition_name=scan_definition_name,
            df=df,
            api_key_id=api_key_id,
            api_key_secret=api_key_secret
        )
        checks_results = soda_checker.run_checks()
        self._report_quality_checks(checks_results)

        data_writer = DataWriter(
            df=soda_checker.get_result_as_spark_dataframe(),
            catalog=get_domain_catalog(self.context),
            database="blipdataforge",
            table="quality_checks_results",
            context=self.context,
            write_mode="append",
            schema_mode="mergeSchema",
            partition_by=["definitionName"],
            logger=self.logger
        )

        data_writer.write()
        return soda_checker.checks_result

    def get_data_contract(
        self,
        catalog: str,
        database: str,
        table: str,
        api_auth_token: Optional[str] = ""
    ) -> Union[DataContract, None]:
        """Retrieve a data contract from Blip's Data Contract API.

        Args:
            catalog:
                The desired catalog
            database:
                The desired database
            table:
                The desired table
            api_auth_token:
                The token to authenticate on the API. It is possible to
                specify the API token through environment variable
                `DATA_CONTRACT_API_TOKEN`. If both `api_auth_token` param
                and `DATA_CONTRACT_API_TOKEN` env var are set, the
                `api_auth_token` param will be used.

        Returns:
            A DataContract object if data contract exists or `None` otherwise.
        """
        api_token = (
            api_auth_token
            if api_auth_token
            else self._confs.DATA_CONTRACT_API_TOKEN
        )
        dc_repo = DataContractRepository(
            api_url=self._confs.DATA_CONTRACT_API_URL,
            api_auth_token=api_token
        )

        data_contract = dc_repo.get_contract(
            catalog=catalog,
            schema=database,
            table=table
        )

        if api_auth_token and not data_contract:
            api_response = dc_repo.last_response
            self.logger.warn(
                "Data Contract API responded with "
                f"'{api_response.status_code}: {api_response.reason}'."
            )

        return data_contract

    def send_data(
        self,
        payload: dict,
        data_routing_id: str,
        data_routing_token: Optional[str] = None
    ) -> dict:
        """Send data to various destinations using Data Routing layer.

        The Data Routing layer uses Delta Sharing as the default
        output method for sharing data.

        The `payload` parameter must use the format:
        `{
            "since_datetime_value": start_date,
            "until_datetime_value": end_date
        }`
        where `since_datetime_value` and `until_datetime_value` must be in
        the format "%Y-%m-%d", like "2023-12-27".

        If the `data_routing_token` parameter is not set, the environment
        variable `DATA_ROUTING_TOKEN` will be used instead.

        Args:
            payload:
                The configuration to be used in the date filter to select the
                records that will be sent to the client.
            data_routing_id:
                The ID of the Data Routing pipeline created previously.
            data_routing_token:
                The Data Routing API token created previously.
        """
        token = (
            data_routing_token
            if data_routing_token
            else self._confs.DATA_ROUTING_TOKEN
        )

        data_routing_interface = DataRoutingInterface(
            url=self._confs.DATA_ROUTING_FIREHOSE_URL,
            data_routing_token=token
        )

        self.logger.info(
            f"Sending data through routing ID '{data_routing_id}' using "
            f"payload '{payload}'"
        )

        try:
            return data_routing_interface.send_data(
                pipeline_id=data_routing_id,
                payload=payload
            )
        except Exception as e:
            self.logger.error(
                f"Error occurred in send_data function: {str(e)}. "
                f"Payload used in request: {payload}")
            raise

    def ingest_data(
        self,
        payload: dict,
        data_routing_id: str,
        data_routing_token: Optional[str] = None,
    ) -> dict:
        """Ingest data from various sources using Data Routing layer.

        If the `data_routing_token` parameter is not set, the environment
        variable `DATA_ROUTING_TOKEN` will be used instead.

        Args:
            payload:
                The payload that will be sent to Data Routing API.
            data_routing_id:
                The ID of the Data Routing pipeline created previously.
            data_routing_token:
                The Data Routing API token created previously.
        """
        token = (
            data_routing_token
            if data_routing_token
            else self._confs.DATA_ROUTING_TOKEN
        )

        data_routing_interface = DataRoutingInterface(
            url=self._confs.DATA_ROUTING_FIREHOSE_URL,
            data_routing_token=token
        )

        self.logger.info(
            f"Ingesting data through routing ID '{data_routing_id}' using "
            f"payload '{payload}'"
        )

        try:
            return data_routing_interface.ingest_data(
                pipeline_id=data_routing_id,
                payload=payload
            )
        except Exception as e:
            self.logger.error(
                f"Error occurred in ingest_data function: {str(e)}. "
                f"Payload used in request: {payload}"
            )
            raise

    def _automated_quality_checks(
        self,
        catalog: str,
        database: str,
        table: str,
        df: DataFrame
    ) -> None:
        data_contract = self.get_data_contract(
            catalog=catalog,
            database=database,
            table=table,
            api_auth_token=self._confs.DATA_CONTRACT_API_TOKEN
        )

        if data_contract:
            self.logger.info(
                "Executig automated quality checks for "
                f"{catalog}.{database}.{table}."
            )

            scan_definition_name = f"automated__{catalog}__{database}__{table}"
            self.run_quality_checks(
                scan_definition_name=scan_definition_name,
                checks=data_contract.get_quality_checks(),
                df=df
            )

    def _report_quality_checks(self, check_results: dict) -> None:
        """Evaluate and report quality checks from Soda."""
        if len(check_results.keys()) == 0:
            return

        if check_results["hasErrors"]:
            self.logger.error(
                "Soda Quality Checks returned errors! "
                "Checkout logs for more info."
            )
        elif check_results["hasWarnings"] and not check_results["hasFailures"]:
            self.logger.warning(
                "Some Quality Checks raised warnings! "
                "Checkout logs for more info."
            )
        elif check_results["hasFailures"]:
            self.logger.warning(
                "Some Quality Checks raised failures! "
                "Checkout logs for more info."
            )
        else:
            self.logger.info("Quality checks finished. All good!")

        results_parser = SodaCheckResultsParser(check_results)
        scan_overview = results_parser._collect_scan_results()
        self.logger.info(json.dumps(scan_overview))

        results_per_check = results_parser._collect_check_results()
        for check in results_per_check:
            self.logger.info(json.dumps(check))

    def check_delta_update(
        self,
        tbl_list: List[str],
        date: Optional[str] = None,
        raise_error: Optional[bool] = True
    ) -> None:
        """Verify if one or more tables are up-to-date.

        This function checks if tables from `tbl_list` were updated later than
        `date`, using the expression `last_update_date > date`. If the check is
        `False`, an exception is raised unless `raise_error` is set to `False`.

        If `date` is not informed, this function uses today's date.

        Args:
            tbl_list:
                A list of string with table names using the format
                `catalog.database.table`.

            date:
                A date using ISO format that will be compared against last
                change date of each table in `tbl_list`.

            raise_error:
                If `True`, an exception is raised when a table is not
                up-to-date. Defaults to `True`.

        Raises:
            Exception:
                When `date` is greater than one or more table update date.
        """
        self.logger.info(f"Checking delta update for tables '{tbl_list}'")
        check_delta_update(
            context=self.context,
            tbl_list=tbl_list,
            date=date,
            raise_error=raise_error
        )

    def ingest_google_sheet(
        self,
        sheet_key: str,
        catalog: str,
        database: str,
        table: str,
        index: str = ""
    ) -> None:
        """Ingest google sheet through Gaia Data Routing.

        NOTE: To ingest data from a Google Sheet, you must
        share this Google Sheet with the following email: data-platform@datarouting-api.iam.gserviceaccount.com.
        If you don't do this, the service will have no access to the Sheet.
        For more details, read the docs: https://dataforgedocs.blip.tools/user_guide/ingest_google_sheet/

        This function creates an ingestion job by calling the Gaia Data Routing
        service. The ingestion job executes in another job cluster, and the link
        to the job run is logged in the terminal.

        Note that the ingestion job does not block the execution of the current
        job (i.e. the job that called this function), so the notebook will
        continue to execute right after this function is called.

        By default, this function reads all the data from the first sheet (or "page") of
        the Google Sheet. If you want to ingest data from another sheet/page of the Google
        Sheet, or from a very specific range/region of it, you can provide an A1 notation
        value to the `index` argument to select the specific range/region of the Google
        Sheet that you want to ingest/read.

        Args:
            sheet_key:
                The key of the target sheet to be ingested.
            catalog:
                The target catalog.
            database:
                The target database.
            table:
                The target table.
            index:
                A string (following the A1 notation) that specifies the range
                or region of the Google Sheet to read and ingest.
        """
        if get_domain_catalog(self.context) != catalog:
            raise ValueError((
                "You provided a catalog that does not belong "
                "to your current environment."
            ))

        routing = DataRoutingAPIClient(
            url=self._confs.DATA_ROUTING_API_URL,
            auth_token=self._confs.DATA_ROUTING_API_TOKEN,
            context=self.context,
            logger=self.logger
        )

        routing.ingest_google_sheet(
            sheet_key=sheet_key,
            catalog=catalog,
            database=database,
            table=table,
            index=index
        )

    def persist_files(
        self,
        database: str,
        volume_name: str,
        destination_folder: str,
        files: List[str],
        catalog: Optional[str] = ""
    ) -> None:
        """Persist local files into domain's storage accont.

        You can use this function to persist/save files into the
        domain's storage account. This function always persist the files
        that you provide in the `files` argument inside the landingzone container
        of the domain's storage account.

        These files can then be accessed later through a path like this:
        `/Volumes/{catalog}/{database}/{volume_name}/{destination_folder}`.
        For more details, you should read more about Databricks Volumes at:
        https://learn.microsoft.com/pt-br/azure/databricks/volumes/

        When defining the volume in which the files will be saved, you can
        provide either a volume that already exists or one that doesn't
        exist yet. If the provided volume does not exist in the environment,
        the `persist_files()` function will automatically create it for you.

        If you provide a volume that already exists in the current
        environment, this volume must be connected to the landingzone
        container of your domain's storage account. If the volume is connected
        to a different container, this function will raise an exception.

        Args:
            database:
                The volume's database. The database will be created if it does
                not exist.
            volume_name:
                The name of the volume to persist the files. The volume will be
                created if it does not exist.
            destination_folder:
                The folder inside the volume where the files must be saved. The
                folder and its subfolders are created if they don't exist.
            files:
                A list containing the absolute paths of the files to be
                saved into the domain's landingzone. An example would be
                `['/local_disk/tmp/file_1.csv', '/local_disk/tmp/file_2.txt']`
            catalog:
                The target catalog. If not informed, the function uses the
                default domain catalog.

        """
        _catalog = catalog if catalog else get_domain_catalog(context=self.context)

        persist_files(
            context=self.context,
            catalog=_catalog,
            database=database,
            volume_name=volume_name,
            destination_folder=destination_folder,
            files=files,
            logger=self.logger
        )

    def share_files(
        self,
        receiver_email: str,
        cc_email: str,
        database: str,
        volume_name: str,
        folder: str,
        files: Union[str, List[str]],
        catalog: Optional[str] = "",
        email_title_opts: Optional[EmailTitleOpts] = None,
    ) -> None:
        """Share data lake files.

        Currently this function only supports sharing files through download
        links sent via email.

        Args:
            receiver_email:
                The email address of the person who will receive the files.
            cc_email:
                The email address that will receive a copy of the email.
            database:
                The database of the files.
            volume_name:
                The name of the volume where the files are.
            folder:
                The full path inside the volume where the files are. This
                parameter must not include the volume name or the file names.
            files:
                A file or a list of file names to be sent. Must include the
                file extension.
            catalog:
                The files catalog. If not informed, the current domain catalog
                will be used.
            email_title_opts:
                Optional. You can provide an `EmailTitleOpts` object in this argument
                to change the behaviour of the title of the email that will be sent by the service.
                For more details, see the documentation of the function
                [`create_email_title_opts()`][blipdataforge.DataPlatform.create_email_title_opts].
                If none is specified, the default email title is used instead.
        """
        _catalog = catalog if catalog else get_domain_catalog(context=self.context)

        routing = DataRoutingAPIClient(
            url=self._confs.DATA_ROUTING_API_URL,
            auth_token=self._confs.DATA_ROUTING_API_TOKEN,
            context=self.context,
            logger=self.logger
        )

        routing.share_files(
            receiver_email=receiver_email,
            cc_email=cc_email,
            files=files,
            catalog=_catalog,
            database=database,
            volume_name=volume_name,
            folder=folder,
            email_title_opts=email_title_opts
        )

    def write_eventhub(
        self,
        df: DataFrame,
        namespace: str,
        topic: str,
        conn_str: str,
        port: int = 9093,
        batch_size: int = 524288,
        request_timeout: int = 300000,
        linger_ms: int = 2,
        buffer_memory: int = 33554432,
        max_block_ms: int = 60000
    ) -> None:
        """Write Spark DataFrame to EventHub topic.

        You can use this method to write the data from a Spark DataFrame into
        an existing EventHub topic. For now, this method uses the normal/batch
        backend of spark (spark.write).

        EventHubs events need to be in a specific format, so this method adds a
        column "value" to the dataframe, containing the dataframe data as a
        JSON string.

        Args:
            df:
                A Spark DataFrame with the data that you want to write.
            namespace:
                The EventHub namespace to be used.
            topic:
                The name of the EventHub topic to write to.
            conn_str:
                The connection string to be used to connect and send
                data to eventhub. The connection string must be in the format:
                `Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;
                SharedAccessKeyName=<KEY NAME>;SharedAccessKey=<KEY VALUE>`.
            port:
                The port that will be used to connect with the EventHub server.
                If not informed, the default port 9093 is used.
            batch_size:
                This configuration controls the default batch size in bytes,
                and will be passed directly to the `kafka.batch.size` config.
            request_timeout:
                Controls the maximum amount of time the EventHub producer will
                wait for the response of a request.
            linger_ms:
                Controls how long the producer will wait for new records before
                sending the batch to the broker. This configuration can improve
                the transmission by reducing the number of incomplete batch
                requests sent to the broker.
            buffer_memory:
                The total bytes of memory the producer can use to buffer
                records waiting to be sent to the server. If records are sent
                faster than they can be delivered to the server the producer
                will block for max.block.ms after which it will throw an
                exception.
            max_block_ms:
                It is used by producers and defines the maximum time
                (in milliseconds) that a synchronous send() call can be blocked
                while Kafka attempts to obtain metadata from the cluster or
                space in the internal buffer for the message.
        """
        connector = EventHubConnector(self.logger)
        eventhub_conf = EventHubProducerConf(
            topic=topic,
            namespace=namespace,
            conn_str=conn_str,
            port=port,
            batch_size=batch_size,
            request_timeout=request_timeout,
            linger_ms=linger_ms,
            buffer_memory=buffer_memory,
            max_block_ms=max_block_ms
        )

        if "value" in df.columns:
            raise ValueError(
                "The dataframe cannot contain a column named 'value'."
            )

        df_eventhub = df.withColumn(
            "value", to_json(struct([df[col] for col in df.columns]))
        )
        connector.write_eventhub(df=df_eventhub, eventhub=eventhub_conf)

    def create_email_title_opts(
        self,
        include_filename: Optional[bool] = None,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
        labels: Optional[List[str]] = None
    ) -> EmailTitleOpts:
        """Create a EmailTitleOpts object.

        You can use this function to create/build a `EmailTitleOpts`
        object, which is used by the `share_files()` facade. With this
        object, you can control/change the behaviour of the title of the
        email that is sent by `share_files()`.

        The arguments in this function specify which specific information
        you want to add to (or include in) the title of the email. You
        can include the following types of information in the title of
        the email: the name of the file that is being shared; a set of dates to
        specify the start and end of the period of data that is being shared;
        a set of small labels that can be used to filter the email more easily.

        All arguments in this function are optional, meaning that you can
        use all arguments at once to include as much information as possible
        in the title of the email. But you can also use only one specific
        argument to include just one extra piece of information in the email
        title.

        For example, if you want to include just the name of the file that
        is being shared through `share_files()`, then you only need to set
        the `include_filename` argument in this function, and nothing more.

        But if you want to include the range of dates in the title of the
        email, to describe the period of observations that are being
        exposed in the shared file, then you need to set both the
        `start_date` and `end_date` arguments in this function.

        For example, consider the following snippet of code:

        ```python
        from datetime import date

        from blipdataforge import DataPlatform
        dp = DataPlatform()
        email_title = dp.create_email_title_opts(
            include_filename=True,
            start_date=date(2024,12,9),
            end_date=date(2024,12,10)
        )

        dp.share_files(
            receiver_email="pedro.duarte@blip.ai",
            cc_email="pedro.duarte@blip.ai",
            catalog="dataplatform_sandbox",
            database="blipdataforge",
            volume_name="sales_files",
            files=csvs,
            folder="ingest/sales",
            email_title_opts=email_title
        )
        ```

        With this snippet, the title of the email that is sent by `share_files()`,
        will be rendered in a format similar to this:

        ```
        Your data is ready for download - sales.csv - From 2024-12-09 to 2024-12-10
        ```

        Args:
            include_filename:
                A boolean (True/False) to indicate if you want (or not)
                to include in the title the name of the file that is
                being shared in the email. If multiple files are being
                shared in the email, then, the name of the first file
                is used.
            start_date:
                A `datetime.date` object, which is the start date of
                the period of data that is being shared in the email.
            end_date:
                A `datetime.date` object, which is the end date of
                the period of data that is being shared in the email.
            labels:
                A set of labels, or, in other words, a set of small strings
                to include in the email title. Each label (i.e. each string)
                in this list should have 20 characters max. If you include
                a string that has more than 20 characters in this list,
                an exception is raised. You can use this set of
                labels as a mechanism to quickly and easily filter a
                set of emails through the email search service of Gmail.
        """
        return EmailTitleOpts(
            include_filename=include_filename,
            start_date=start_date,
            end_date=end_date,
            labels=labels
        )

    def get_streaming_engine(self) -> Union[StreamingEngine, None]:
        """Get streaming engine.

        This method returns a `StreamingEngine` object, used to create or
        update streaming structures like tables and views.
        """
        return get_streaming_engine()

    def write_to_opensearch(
        self,
        df: DataFrame,
        host: str,
        port: int,
        user: str,
        password: str,
        index: str,
        chunk_size: int,
        write_mode: str = "append",
        op_type: str = "upsert",
        id_column: Optional[str] = None,
        use_spark_native_writer: bool = True,
    ) -> List[dict]:
        """Write Spark DataFrame to Opensearch database.

        You can use this function to write a Spark DataFrame to an Opensearch database.
        Opensearch is a NoSQL database (i.e. it's a database of JSON Documents). This
        function will basically: 1) collect all rows from your Spark DataFrame; 2) transform them into
        "dicts", which are the "JSON documents" equivalents in Python; 3) insert the
        necessary Opensearch operation directives into this list; 4) then, it will start
        sending this data to the Opensearch server in batches/chunks.

        The `user` and `password` arguments are related to the authentication process in the
        server, while the `host` and `port` arguments define where the Opensearch server is,
        and where the connection is established.

        The `chunk_size` argument defines the maximum number of rows from your Spark DataFrame
        that are included in each chunk that is sent to the Opensearch server. For example,
        if your DataFrame contains 10000 rows, and you set `chunk_size` to 1000, then, this
        function will split your DataFrame into 10 chunks, and send each chunk sequentially
        to the Opensearch server. By controlling the size of the chunk that is sent to the
        server, you can also control how much data (or, how much "stuff") the server needs to
        process in each request. This is useful if your Opensearch server has a rate limit, or
        if it does not have powerful enough resources to process huge amounts of data in a
        single request.

        This function has two different backends: one is the Opensearch Spark native driver,
        and the other one is the Opensearch Python SDK. By default, this function uses the
        Opensearch Spark native driver to write the data, because this native driver is much more
        scalable and appropriate for our big data environment. However, in order to use the native
        driver backend you must have a specific Maven package installed in your environment/cluster:

        https://central.sonatype.com/artifact/org.opensearch.client/opensearch-spark-30_2.12

        The function will check for the presence of the native driver in your environment. If it
        does not find this native driver, you likely do not have this Maven package installed
        in your environment, and therefore an `AssertionError` is raised by the function, warning
        you about this problem.

        On the other side, we have the Python SDK backend, which does not scale very well. But it
        handles moderate DataFrames (< 400k rows) decently well and offers much better visibility
        of what is happening behind the scenes. The Python SDK has much better and more useful error
        messages that you can use to debug your application, and it also returns a list containing
        the raw HTTP responses of the server, which gives you a lot of information about the write
        operations performed in the server. Therefore, you can easily see if any of your records was
        not successfully recorded in the server, for whatever reason.

        Unfortunately, you do not have such visibility in the Spark native driver backend. In fact,
        the Spark native driver, although it scales very well, does not provide very useful or
        informative error messages for the users. Therefore, if there is something wrong in your
        code (e.g. you may have forgotten to change the hostname of your server, you are using the
        wrong credentials, or your data has a different schema than the documents already present
        in your index, etc.) you will probably have a hard time debugging if you are using the
        Spark native driver backend.

        Args:
            df:
                A Spark DataFrame with the data that you want to write.
            host:
                A string containing a hostname, which identifies where your Opensearch
                database is hosted.
            port:
                A port number to use in the connection with the database.
            user:
                The username to use in the connection with the database.
            password:
                The password to use in conjunction with the username provided at `user`
                in the connection with the database.
            index:
                The name of the existing index in the database to use.
            chunk_size:
                Sets the size of each chunk of data that is sent to the Opensearch server. This chunk size
                should be the number of entries/rows to be included in each chunk.
            write_mode:
                The Spark write mode to use when writing the data (e.g. append, overwrite, etc.).
            op_type:
                Default to "upsert". The name of the Opensearch operation to use while writing
                the data to the database. Supported values are: "upsert" (for an upsert operation)
                "index" (for an index operation).
            id_column:
                Optional. The name of the column that contains the "document id" to use in the database.
            use_spark_native_writer:
                Defaults to True. Whether or not to use the Spark native writer class to write
                the data. If False, the Python SDK for Opensearch is used instead
                to write the data. However, the Python SDK writer is much slower and is
                not capable of handling huge amounts of data, so be careful with it.

        Returns:
            A list of responses. In more detail, each element is the response returned by the
            Opensearch server for each chunk/batch of data from your Spark DataFrame that was
            sent to the server. You can use this list of responses to investigate whether an
            error occurred during the ingest process.
        """
        conn = OpensearchConnector(
            host=host,
            port=port,
            user=user,
            password=password,
            write_mode=write_mode,
            logger=self.logger,
            id_column=id_column,
            use_spark_native_writer=use_spark_native_writer
        )
        return conn.write_to_opensearch(
            df=df,
            index=index,
            chunk_size=chunk_size,
            op_type=op_type
        )

    def write_mongodb(
        self,
        conn_str: str,
        df: DataFrame,
        database: str,
        collection: str,
        write_mode: str,
        operation_type: str = "replace",
        upsert_document: bool = False,
        convert_json: Union[bool, str] = False,
        ignore_null: bool = False,
        id_field_list: List[str] = ["_id"],
        max_batch_size: int = 512,
        comment: str = ""
    ) -> None:
        """Write a dataframe to a MongoDB collection.

        Note: This function works only in Single User clusters, otherwise an
        AnalysisException will be raised. Also, this function depends on the
        'org.mongodb.spark:mongo-spark-connector_2.12:10.4.1' maven library,
        so please add it to your ADF pipeline activity or install it in your
        Single User cluster.

        Args:
            conn_str:
                The connection string used to establish connection with MongoDB
                server. Must be in the format
                `mongodb+srv://<db_username>:<db_password>@<mongo_server>.mongodb.net/`
            df:
                The PySpark dataframe with the data to be written to the
                collection.
            database:
                The database where the target collection resides.
            collection:
                The target collection to write data to.
            write_mode:
                Write modes supported: `overwrite` and `append`. If you specify
                the `overwrite` write mode, the connector drops the target
                collection and creates a new collection that uses the default
                collection options. This behavior can affect collections that
                don't use the default options, such as sharded collections,
                collections with nondefault collations and time-series
                collections.
            operation_type:
                Specifies how the write operation must save the documents.
                Accepts `insert`, `replace` and `update`. Each behavior is
                described as follows:

                - `insert`: Insert the data, even if there is a match with
                `id_field_list`.
                - `replace`: Replace an existing document that matches the
                `id_field_list` value with the new data. If no match exists,
                the value of `upsert_document` indicates whether the connector
                inserts a new document.
                - `update`: Update an existing document that matches the
                `id_field_list` value with the new data. If no match exists,
                the value of `upsert_document` indicates whether the connector
                inserts a new document.
            upsert_document:
                When true, replace and update operations will insert the data
                if no match exists. For time series collections, you must set
                `upsert_document` to false.
            convert_json:
                Specifies whether the connector parses the string and converts
                extended JSON into BSON. The default value is False, which
                means that the connector leaves all values as strings.
                This function accepts `any`, `objectOrArrayOnly` or `False`,
                which the behaviors are as follows:

                - `any` (str): The connector converts all JSON values to BSON.
                    - "{a: 1}" becomes {a: 1}.
                    - "[1, 2, 3]" becomes [1, 2, 3].
                    - "true" becomes true.
                    - "01234" becomes 1234.
                    - "{a:b:c}" doesn't change.

                - `objectOrArrayOnly` (str): The connector converts only JSON
                objects and arrays to BSON.
                    - "{a: 1}" becomes {a: 1}.
                    - "[1, 2, 3]" becomes [1, 2, 3].
                    - "true" doesn't change.
                    - "01234" doesn't change.
                    - "{a:b:c}" doesn't change.

                - False (bool): The connector leaves all values as strings.
            ignore_null:
                When `True`, the connector ignores any null values when
                writing, including null values in arrays and nested documents,
                so if you have a document with a field with `null` value, just
                this field will NOT be written to mongo, but the document will
                be created with other fields that are not `null`. Defaults to
                `False`.
            id_field_list:
                 List of fields by which to split the collection data.
                 Defaults to `["_id"]`.
            max_batch_size:
                Specifies the maximum number of operations to batch in bulk
                operations. Defaults to `512`
            comment:
                The comment to append to the write operations. Comments appear
                in the output of the [Database profiler](https://www.mongodb.com/docs/manual/reference/database-profiler/).
        """
        mongo_conn = MongoDBConnector(
            conn_str=conn_str,
            logger=self.logger
        )

        try:
            mongo_conn.write(
                df=df,
                database=database,
                collection=collection,
                write_mode=write_mode,
                operation_type=operation_type,
                upsert_document=upsert_document,
                convert_json=convert_json,
                ignore_null=ignore_null,
                id_field_list=id_field_list,
                max_batch_size=max_batch_size,
                comment=comment
            )
        except SparkConnectGrpcException as e:
            self.logger.error(
                "The 'org.mongodb.spark:mongo-spark-connector_2.12:10.4.1' "
                "library must be installed in the cluster to interact with "
                "MongoDB. Please, add this library to you cluster and execute "
                "the job again."
            )
            raise e

        except AnalysisException as e:
            self.logger.error(
                "The write_mongodb function works only in Single User clusters."
                "Please use another cluster or execute your job from an ADF "
                "pipeline."
            )
            raise e

    def write_elasticsearch(
        self,
        host: str,
        port: int,
        index: str,
        df: DataFrame,
        write_mode: str = "append",
        op_type: str = "index",
        chunk_size: int = 1000,
        id_column: Optional[str] = None,
        api_key: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None
    ) -> None:
        """Write a dataframe to an Elasticsearch database.

        Note: This function depends on the
        'org.elasticsearch:elasticsearch-spark-30_2.12:8.17.3' maven
        library, so please add it to your ADF pipeline activity or install it
        in your cluster.

        Args:
            host:
                A string containing a hostname, which identifies where your
                Elasticsearch database is hosted. Usually in the form
                `https://my-project.es.eastus.azure.elastic.cloud`.
            port:
                A port number to use in the connection with the Elasticsearch.
            index:
                The name of the existing index in the database to use.
            df:
                The PySpark dataframe with the data to be written to the
                Elasticsearch.
            write_mode:
                The spark write mode. This parameter controls how to perform
                the write operation. Can be any of:

                - `overwrite`: executes a `delete` operation on index before
                writing the incoming data

                - `append`: just add the new data, without deletion.
            op_type:
                The operation elasticsearch should perform. Can be any of:

                - `index`: new data is added while existing data (based on its
                id) is replaced (reindexed).

                - `create`: adds new data - if the data already exists (based
                on its id), an exception is thrown.

                - `update`: updates existing data (based on its id). If no data
                is found, an exception is thrown.

                - `upsert`: known as merge or insert if the data does not
                exist, updates if the data exists (based on its id).

                - `delete`: deletes existing data (based on its id). If no data
                is found, an exception is thrown.
            chunk_size:
                Size (in entries) for batch writes. This setting is per task
                instance; it gets multiplied at runtime by the total number
                of running tasks/workers.
            id_column:
                The column in your pyspark DataFrame that contains the
                document ID.
            api_key:
                The API key to use in the connection with Elasticsearch. If
                both API key and user/pass are provided, the API key will be
                used.
            user:
                The username to use in the connection with the Elasticsearch.
            password:
                The password to use in conjunction with the username provided
                at `user` in the connection with the database.
        """
        connector = ElasticsearchConnector(
            host=host,
            port=port,
            logger=self.logger,
            api_key=api_key,
            user=user,
            password=password
        )

        connector.write(
            df=df,
            write_mode=write_mode,
            op_type=op_type,
            index=index,
            chunk_size=chunk_size,
            id_column=id_column
        )

__init__()

Initializes the class with current execution context.

Source code in blipdataforge/facades.py
def __init__(self) -> None:
    """Initializes the class with current execution context."""
    self.context = get_context()
    self.logger = get_lib_logger(context=self.context)
    self._confs = BlipDataForgeConfigs("/blipdataforge/conf.ini")

check_delta_update(tbl_list, date=None, raise_error=True)

Verify if one or more tables are up-to-date.

This function checks if tables from tbl_list were updated later than date, using the expression last_update_date > date. If the expression is False, an exception is raised unless raise_error is set to False.

If date is not informed, this function uses today's date.

Parameters:

Name Type Description Default
tbl_list List[str]

A list of strings with table names using the format catalog.database.table.

required
date Optional[str]

A date using ISO format that will be compared against last change date of each table in tbl_list.

None
raise_error Optional[bool]

If True, an exception is raised when a table is not up-to-date. Defaults to True.

True

Raises:

Type Description
Exception

When date is greater than the update date of one or more tables.

Source code in blipdataforge/facades.py
def check_delta_update(
    self,
    tbl_list: List[str],
    date: Optional[str] = None,
    raise_error: Optional[bool] = True
) -> None:
    """Verify if one or more tables are up-to-date.

    This function checks if tables from `tbl_list` were updated later than
    `date`, using the expression `last_update_date > date`. If the
    expression is `False`, an exception is raised unless `raise_error`
    is set to `False`.

    If `date` is not informed, this function uses today's date.

    Args:
        tbl_list:
            A list of strings with table names using the format
            `catalog.database.table`.

        date:
            A date using ISO format that will be compared against last
            change date of each table in `tbl_list`.

        raise_error:
            If `True`, an exception is raised when a table is not
            up-to-date. Defaults to `True`.

    Raises:
        Exception:
            When `date` is greater than the update date of one or more tables.
    """
    self.logger.info(f"Checking delta update for tables '{tbl_list}'")
    check_delta_update(
        context=self.context,
        tbl_list=tbl_list,
        date=date,
        raise_error=raise_error
    )
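
For a quick illustration, a minimal usage sketch follows; the table names and the date are hypothetical, and raise_error=False is used so stale tables are only reported instead of aborting the job:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

# Verify that both (hypothetical) tables were updated after 2024-01-15.
dp.check_delta_update(
    tbl_list=["catalog.sales.orders", "catalog.sales.customers"],
    date="2024-01-15",
    raise_error=False
)
```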

create_email_title_opts(include_filename=None, start_date=None, end_date=None, labels=None)

Create an EmailTitleOpts object.

You can use this function to create/build a EmailTitleOpts object, which is used by the share_files() facade. With this object, you can control/change the behaviour of the title of the email that is sent by share_files().

The arguments in this function specify which specific information you want to add to (or include in) the title of the email. You can include the following types of information in the title of the email: the name of the file that is being shared; a set of dates to specify the start and end of the period of data that is being shared; a set of small labels that can be used to filter the email more easily.

All arguments in this function are optional, meaning you can use all of them at once, to include as much information as possible in the title of the email, or use only one specific argument to include a single extra piece of information in the email title.

For example, if you want to include just the name of the file that is being shared through share_files(), you only need to set the include_filename argument in this function, and nothing more.

But if you want to include the range of dates in the title of the email, to describe the period of observations exposed in the shared file, you need to set both the start_date and end_date arguments in this function.

For example, consider the following snippet of code:

from blipdataforge import DataPlatform
dp = DataPlatform()
email_title = dp.create_email_title_opts(
    include_filename=True,
    start_date=date(2024,12,9),
    end_date=date(2024,12,10)
)

dp.share_files(
    receiver_email="pedro.duarte@blip.ai",
    cc_email="pedro.duarte@blip.ai",
    catalog="dataplatform_sandbox",
    database="blipdataforge",
    volume_name="sales_files",
    files=csvs,
    folder="ingest/sales",
    email_title_opts=email_title
)

With this snippet, the title of the email that is sent by share_files(), will be rendered in a format similar to this:

Your data is ready for download - sales.csv - From 2024-12-09 to 2024-12-10

Parameters:

Name Type Description Default
include_filename Optional[bool]

A boolean (True/False) to indicate if you want (or not) to include in the title the name of the file that is being shared in the email. If multiple files are being shared in the email, then, the name of the first file is used.

None
start_date Optional[date]

A datetime.date object, which is the start date of the period of data that is being shared in the email.

None
end_date Optional[date]

A datetime.date object, which is the end date of the period of data that is being shared in the email.

None
labels Optional[List[str]]

A set of labels, in other words, a set of small strings to include in the email title. Each label (i.e. each string) in this list should have 20 characters max. If you include a string that has more than 20 characters in this list, an exception is raised. You can use this set of labels as a mechanism to quickly and easily filter a set of emails through Gmail's search.

None
Source code in blipdataforge/facades.py
def create_email_title_opts(
    self,
    include_filename: Optional[bool] = None,
    start_date: Optional[date] = None,
    end_date: Optional[date] = None,
    labels: Optional[List[str]] = None
) -> EmailTitleOpts:
    """Create a EmailTitleOpts object.

    You can use this function to create/build a `EmailTitleOpts`
    object, which is used by the `share_files()` facade. With this
    object, you can control/change the behaviour of the title of the
    email that is sent by `share_files()`.

    The arguments in this function specify which specific information
    you want to add to (or include in) the title of the email. You
    can include the following types of information in the title of
    the email: the name of the file that is being shared; a set of dates to
    specify the start and end of the period of data that is being shared;
    a set of small labels that can be used to filter the email more easily.

    All arguments in this function are optional, meaning you can use
    all of them at once, to include as much information as possible
    in the title of the email, or use only one specific argument to
    include a single extra piece of information in the email title.

    For example, if you want to include just the name of the file that
    is being shared through `share_files()`, you only need to set
    the `include_filename` argument in this function, and nothing more.

    But if you want to include the range of dates in the title of the
    email, to describe the period of observations exposed in the
    shared file, you need to set both the `start_date` and `end_date`
    arguments in this function.

    For example, consider the following snippet of code:

    ```python
    from blipdataforge import DataPlatform
    dp = DataPlatform()
    email_title = dp.create_email_title_opts(
        include_filename=True,
        start_date=date(2024,12,9),
        end_date=date(2024,12,10)
    )

    dp.share_files(
        receiver_email="pedro.duarte@blip.ai",
        cc_email="pedro.duarte@blip.ai",
        catalog="dataplatform_sandbox",
        database="blipdataforge",
        volume_name="sales_files",
        files=csvs,
        folder="ingest/sales",
        email_title_opts=email_title
    )
    ```

    With this snippet, the title of the email that is sent by `share_files()`,
    will be rendered in a format similar to this:

    ```
    Your data is ready for download - sales.csv - From 2024-12-09 to 2024-12-10
    ```

    Args:
        include_filename:
            A boolean (True/False) to indicate if you want (or not)
            to include in the title the name of the file that is
            being shared in the email. If multiple files are being
            shared in the email, then, the name of the first file
            is used.
        start_date:
            A `datetime.date` object, which is the start date of
            the period of data that is being shared in the email.
        end_date:
            A `datetime.date` object, which is the end date of
            the period of data that is being shared in the email.
        labels:
            A set of labels, in other words, a set of small strings
            to include in the email title. Each label (i.e. each string)
            in this list should have 20 characters max. If you include
            a string that has more than 20 characters in this list,
            an exception is raised. You can use this set of
            labels as a mechanism to quickly and easily filter a
            set of emails through Gmail's search.
    """
    return EmailTitleOpts(
        include_filename=include_filename,
        start_date=start_date,
        end_date=end_date,
        labels=labels
    )

delete(catalog, database, table, predicate='', dry_run=False)

Deletes the rows that match a predicate.

The query executed to delete the records follows the format DELETE FROM database.table [WHERE predicate], where database, table and predicate are set by the arguments passed to this function.

Parameters:

Name Type Description Default
catalog str

The catalog where the target table is.

required
database str

The database/schema where the target table is.

required
table str

The target table from which records will be deleted.

required
predicate str

The condition used to find the rows to delete. When no predicate is provided, deletes all rows.

''
dry_run bool

If True, performs a trial run without making actual changes. Defaults to False.

False
Source code in blipdataforge/facades.py
def delete(
    self,
    catalog: str,
    database: str,
    table: str,
    predicate: str = "",
    dry_run: bool = False
) -> None:
    """Deletes the rows that match a predicate.

    The query executed to delete the records follows the format
    `DELETE FROM database.table [WHERE predicate]`, where `database`,
    `table` and `predicate` are set by the arguments passed to this
    function.

    Args:
        catalog:
            The catalog where the target table is.
        database:
            The database/schema where the target table is.
        table:
            The target table from which records will be deleted.
        predicate:
            The condition used to find the rows to delete. When no
            predicate is provided, deletes all rows.
        dry_run:
            If True, performs a trial run without making actual changes.
            Defaults to False.
    """
    delete(
        context=self.context,
        catalog=catalog,
        database=database,
        table=table,
        predicate=predicate,
        dry_run=dry_run
    )
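
As an illustration, the sketch below removes one day of data from a hypothetical table, previewing the operation first with dry_run; the catalog, database, table and predicate are all assumptions:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

# Preview which rows the predicate would match, without deleting anything.
dp.delete(
    catalog="dataplatform_sandbox",   # hypothetical catalog
    database="blipdataforge",         # hypothetical database
    table="sales",                    # hypothetical table
    predicate="StorageDateDayBR = '2024-01-15'",
    dry_run=True
)

# Run the actual DELETE FROM ... WHERE ... once the preview looks right.
dp.delete(
    catalog="dataplatform_sandbox",
    database="blipdataforge",
    table="sales",
    predicate="StorageDateDayBR = '2024-01-15'"
)
```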

get_data_contract(catalog, database, table, api_auth_token='')

Retrieve a data contract from Blip's Data Contract API.

Parameters:

Name Type Description Default
catalog str

The desired catalog

required
database str

The desired database

required
table str

The desired table

required
api_auth_token Optional[str]

The token to authenticate on the API. It is possible to specify the API token through environment variable DATA_CONTRACT_API_TOKEN. If both api_auth_token param and DATA_CONTRACT_API_TOKEN env var are set, the api_auth_token param will be used.

''

Returns:

Type Description
Union[DataContract, None]

A DataContract object if data contract exists or None otherwise.

Source code in blipdataforge/facades.py
def get_data_contract(
    self,
    catalog: str,
    database: str,
    table: str,
    api_auth_token: Optional[str] = ""
) -> Union[DataContract, None]:
    """Retrieve a data contract from Blip's Data Contract API.

    Args:
        catalog:
            The desired catalog
        database:
            The desired database
        table:
            The desired table
        api_auth_token:
            The token to authenticate on the API. It is possible to
            specify the API token through environment variable
            `DATA_CONTRACT_API_TOKEN`. If both `api_auth_token` param
            and `DATA_CONTRACT_API_TOKEN` env var are set, the
            `api_auth_token` param will be used.

    Returns:
        A DataContract object if data contract exists or `None` otherwise.
    """
    api_token = (
        api_auth_token
        if api_auth_token
        else self._confs.DATA_CONTRACT_API_TOKEN
    )
    dc_repo = DataContractRepository(
        api_url=self._confs.DATA_CONTRACT_API_URL,
        api_auth_token=api_token
    )

    data_contract = dc_repo.get_contract(
        catalog=catalog,
        schema=database,
        table=table
    )

    if api_auth_token and not data_contract:
        api_response = dc_repo.last_response
        self.logger.warn(
            "Data Contract API responded with "
            f"'{api_response.status_code}: {api_response.reason}'."
        )

    return data_contract
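
A short sketch of the call, assuming a hypothetical table and that the API token comes from the DATA_CONTRACT_API_TOKEN environment variable:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

# Returns a DataContract object, or None when no contract is registered.
contract = dp.get_data_contract(
    catalog="dataplatform_sandbox",   # hypothetical catalog
    database="blipdataforge",         # hypothetical database
    table="sales"                     # hypothetical table
)

if contract is None:
    print("No data contract registered for this table.")
```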

get_latest_record_date(catalog, database, table, date_column, date_format='', offset=0)

Returns the date of the latest record in a table.

Optionally, an offset can be applied to the result to get an earlier date.

Disclaimer: This function is an alternative to the old initial_info from the TakeSpark library.

Parameters:

Name Type Description Default
catalog str

The target catalog to be used.

required
database str

The target database to be used.

required
table str

The target table to be used.

required
date_column str

The column which contains the date info of the latest record.

required
date_format str

The format of the date, according to the datetime format directives. This parameter is used to apply the offset when the date column is of str type. If this parameter is not informed, the offset will not be applied.

Example: if the dates on the date_column are provided like "2023-11-21", the date_format parameter must be %Y-%m-%d. Refer to Format Codes in the datetime python class docs to understand how to build this parameter.

''
offset int

The number of days to subtract from the latest date. If the column that contains the date is of str type, the date_format parameter must be provided in order to apply the offset.

0

Returns:

Type Description
Union[str, date, datetime, Any]

The latest date in the informed column, with the offset applied if applicable. The return type depends on the type of the provided date column on the source table.

Source code in blipdataforge/facades.py
def get_latest_record_date(
    self,
    catalog: str,
    database: str,
    table: str,
    date_column: str,
    date_format: str = "",
    offset: int = 0
) -> Union[str, date, datetime, Any]:
    """Returns the date of the latest record in a table.

    Optionally, an offset can be applied to the result to get an
    earlier date.

    Disclaimer: This function is an alternative to the old `initial_info`
    from the `TakeSpark` library.

    Args:
        catalog:
            The target catalog to be used.
        database:
            The target database to be used.
        table:
            The target table to be used.
        date_column:
            The column which contains the date info of the latest record.
        date_format:
            The format of the date, according to the `datetime` format
            directives. This parameter is used to apply the offset when
            the date column is of `str` type. If this parameter is not
            informed, the offset will not be applied.

            Example: if the dates on the `date_column` are provided like
            "2023-11-21", the `date_format` parameter must be `%Y-%m-%d`.
            Refer to Format Codes in the datetime python class docs to
            understand how to build this parameter.
        offset:
            The number of days to subtract from the latest date. If the
            column that contains the date is of `str` type, the
            `date_format` parameter must be provided in order to apply the
            offset.

    Returns:
        The latest date in the informed column, with the offset applied
        if applicable. The return type depends on the type of
        the provided date column on the source table.
    """
    return get_latest_record_date(
        context=self.context,
        catalog=catalog,
        database=database,
        table=table,
        date_column=date_column,
        date_format=date_format,
        offset=offset
    )
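
A sketch of two typical calls, assuming a hypothetical table whose StorageDateDayBR column stores dates as strings like "2023-11-21":

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

# Latest date present in the column, exactly as stored in the table.
latest = dp.get_latest_record_date(
    catalog="dataplatform_sandbox",   # hypothetical catalog
    database="blipdataforge",         # hypothetical database
    table="sales",                    # hypothetical table
    date_column="StorageDateDayBR"
)

# Same call, going 3 days back; date_format is required here because the
# column is a string, otherwise the offset would not be applied.
start_date = dp.get_latest_record_date(
    catalog="dataplatform_sandbox",
    database="blipdataforge",
    table="sales",
    date_column="StorageDateDayBR",
    date_format="%Y-%m-%d",
    offset=3
)
```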

get_streaming_engine()

Get streaming engine.

This method returns a StreamingEngine object, used to create or update streaming structures like tables and views.

Source code in blipdataforge/facades.py
def get_streaming_engine(self) -> Union[StreamingEngine, None]:
    """Get streaming engine.

    This method returns a `StreamingEngine` object, used to create or update
    streaming structures like tables and views.
    """
    return get_streaming_engine()

ingest_data(payload, data_routing_id, data_routing_token=None)

Ingest data from various sources using the Data Routing layer.

If the data_routing_token parameter is not set, the environment variable DATA_ROUTING_TOKEN will be used instead.

Parameters:

Name Type Description Default
payload dict

The payload that will be sent to Data Routing API.

required
data_routing_id str

The ID of the Data Routing pipeline created previously.

required
data_routing_token Optional[str]

The Data Routing API token created previously.

None
Source code in blipdataforge/facades.py
def ingest_data(
    self,
    payload: dict,
    data_routing_id: str,
    data_routing_token: Optional[str] = None,
) -> dict:
    """Ingest data from various sources using Data Routing layer.

    If the `data_routing_token` parameter is not set, the environment
    variable `DATA_ROUTING_TOKEN` will be used instead.

    Args:
        payload:
            The payload that will be sent to Data Routing API.
        data_routing_id:
            The ID of the Data Routing pipeline created previously.
        data_routing_token:
            The Data Routing API token created previously.
    """
    token = (
        data_routing_token
        if data_routing_token
        else self._confs.DATA_ROUTING_TOKEN
    )

    data_routing_inteface = DataRoutingInterface(
        url=self._confs.DATA_ROUTING_FIREHOSE_URL,
        data_routing_token=token
    )

    self.logger.info(
        f"Ingesting data through routing ID '{data_routing_id}' using "
        f"payload '{payload}'"
    )

    try:
        return data_routing_inteface.ingest_data(
            pipeline_id=data_routing_id,
            payload=payload
        )
    except Exception as e:
        self.logger.error(
            f"Error occurred in ingest_data function: {str(e)}. "
            f"Payload used in request: {payload}"
        )
        raise
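
A minimal sketch; the routing ID and the payload structure are hypothetical (the actual payload is defined by the Data Routing pipeline), and the token is resolved from DATA_ROUTING_TOKEN when omitted:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

response = dp.ingest_data(
    payload={"source": "crm", "entity": "contacts"},  # hypothetical payload
    data_routing_id="my-routing-id"                   # hypothetical ID
)
print(response)
```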

ingest_google_sheet(sheet_key, catalog, database, table, index='')

Ingest google sheet through Gaia Data Routing.

NOTE: To ingest data from a Google Sheet, you must share this Google Sheet with the following email: data-platform@datarouting-api.iam.gserviceaccount.com. If you don't do this, the service will have no access to the Sheet. For more details, read the docs: https://dataforgedocs.blip.tools/user_guide/ingest_google_sheet/

This function creates an ingestion job by calling the Gaia Data Routing service. The ingestion job executes in another job cluster, and the link to the job run is logged in the terminal.

Note that the ingestion job does not block the execution of the current job (i.e. the job that called this function), so the notebook will continue to execute right after this function is called.

By default, this function reads all the data from the first sheet (or "page") of the Google Sheet. If you want to ingest data from another sheet/page of the Google Sheet, or from a very specific range/region of the Google Sheet, you can provide an A1 notation value to the index argument, to select the specific range/region of the Google Sheet that you want to ingest/read.

Parameters:

Name Type Description Default
sheet_key str

The key of the target sheet to be ingested.

required
catalog str

The target catalog.

required
database str

The target database.

required
table str

The target table.

required
index str

A string (following the A1 notation) that specifies the range, or, the region of the Google Sheet to read and ingest.

''
Source code in blipdataforge/facades.py
def ingest_google_sheet(
    self,
    sheet_key: str,
    catalog: str,
    database: str,
    table: str,
    index: str = ""
) -> None:
    """Ingest google sheet through Gaia Data Routing.

    NOTE: To ingest data from a Google Sheet, you must
    share this Google Sheet with the following email: data-platform@datarouting-api.iam.gserviceaccount.com.
    If you don't do this, the service will have no access to the Sheet.
    For more details, read the docs: https://dataforgedocs.blip.tools/user_guide/ingest_google_sheet/

    This function creates an ingestion job by calling the Gaia Data Routing
    service. The ingestion job executes in another job cluster, and the link
    to the job run is logged in the terminal.

    Note that the ingestion job does not block the execution of the current
    job (i.e. the job that called this function), so the notebook will
    continue to execute right after this function is called.

    By default, this function reads all the data from the first sheet (or "page") of
    the Google Sheet. If you want to ingest data from another sheet/page of the Google
    Sheet, or, from a very specific range/region of the Google Sheet, you can provide
    an A1 notation value to the `index` argument, to select the specific range/region
    of the Google Sheet that you want to ingest/read.

    Args:
        sheet_key:
            The key of the target sheet to be ingested.
        catalog:
            The target catalog.
        database:
            The target database.
        table:
            The target table.
        index:
            A string (following the A1 notation) that specifies the range,
            or, the region of the Google Sheet to read and ingest.
    """
    if get_domain_catalog(self.context) != catalog:
        raise ValueError((
            "You provided a catalog that does not belong "
            "to your current environment."
        ))

    routing = DataRoutingAPIClient(
        url=self._confs.DATA_ROUTING_API_URL,
        auth_token=self._confs.DATA_ROUTING_API_TOKEN,
        context=self.context,
        logger=self.logger
    )

    routing.ingest_google_sheet(
        sheet_key=sheet_key,
        catalog=catalog,
        database=database,
        table=table,
        index=index
    )
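
A sketch of the call, assuming a hypothetical sheet key and target table, and reading only a specific range of a sheet named "Sheet2" via A1 notation:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

# The sheet must have been shared with
# data-platform@datarouting-api.iam.gserviceaccount.com beforehand.
dp.ingest_google_sheet(
    sheet_key="1AbCdEfGhIjKlMnOpQrStUvWxYz",  # hypothetical sheet key
    catalog="dataplatform_sandbox",           # must be your domain catalog
    database="blipdataforge",                 # hypothetical database
    table="sales_targets",                    # hypothetical table
    index="Sheet2!A1:D100"                    # optional A1-notation range
)
```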

persist_files(database, volume_name, destination_folder, files, catalog='')

Persist local files into the domain's storage account.

You can use this function to persist/save files into the domain's storage account. This function always persists the files that you provide in the files argument inside the landingzone container of the domain's storage account.

These files can then be accessed later through a path like this: /Volumes/{catalog}/{database}/{volume_name}/{destination_folder}. For more details, you should read more about Databricks Volumes at: https://learn.microsoft.com/pt-br/azure/databricks/volumes/

When defining the volume in which the files will be saved, you can either provide a volume that already exists or a volume that doesn't exist yet. If the provided volume does not exist yet in the environment, the persist_files() function will automatically create this volume for you.

But if you want to provide a volume that already exists in the current environment, this volume must be connected to the landingzone container of your domain's storage account. If this volume is connected to a different container, this function will raise an exception.

Parameters:

Name Type Description Default
database str

The volume's database. The database will be created if it does not exist.

required
volume_name str

The name of the volume to persist the files. The volume will be created if it does not exist.

required
destination_folder str

The folder inside the volume where the files must be saved. The folder and its subfolders are created if they don't exist.

required
files List[str]

A list containing the absolute paths for the files to be saved into the domain's landingzone. An example would be ['/local_disk/tmp/file_1.csv', '/local_disk/tmp/file_2.txt']

required
catalog Optional[str]

The target catalog. If not informed, the function uses the default domain catalog.

''
Source code in blipdataforge/facades.py
def persist_files(
    self,
    database: str,
    volume_name: str,
    destination_folder: str,
    files: List[str],
    catalog: Optional[str] = ""
) -> None:
    """Persist local files into domain's storage accont.

    You can use this function to persist/save files into the
    domain's storage account. This function always persists the files
    that you provide in the `files` argument inside the landingzone container
    of the domain's storage account.

    These files can then be accessed later through a path like this:
    `/Volumes/{catalog}/{database}/{volume_name}/{destination_folder}`.
    For more details, you should read more about Databricks Volumes at:
    https://learn.microsoft.com/pt-br/azure/databricks/volumes/

    When defining the volume in which the files will be saved, you can
    either provide a volume that already exists or a volume that
    doesn't exist yet. If the provided volume does not exist yet in
    the environment, the `persist_files()` function will automatically
    create this volume for you.

    But if you want to provide a volume that already exists in the
    current environment, this volume must be connected to the landingzone
    container of your domain's storage account. If this volume is connected
    to a different container, this function will raise an exception.

    Args:
        database:
            The volume's database. The database will be created if it does
            not exist.
        volume_name:
            The name of the volume to persist the files. The volume will be
            created if it does not exist.
        destination_folder:
            The folder inside the volume where the files must be saved. The
            folder and its subfolders are created if they don't exist.
        files:
            A list containing the absolute paths for the files to be
            saved into the domain's landingzone. An example would be
            `['/local_disk/tmp/file_1.csv', '/local_disk/tmp/file_2.txt']`
        catalog:
            The target catalog. If not informed, the function uses the
            default domain catalog.

    """
    _catalog = catalog if catalog else get_domain_catalog(context=self.context)

    persist_files(
        context=self.context,
        catalog=_catalog,
        database=database,
        volume_name=volume_name,
        destination_folder=destination_folder,
        files=files,
        logger=self.logger
    )
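
A sketch that persists two local files into the landingzone; the local paths, database and volume names are assumptions:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

# After this call the files become available under
# /Volumes/<domain catalog>/blipdataforge/sales_files/ingest/sales/
dp.persist_files(
    database="blipdataforge",          # hypothetical database
    volume_name="sales_files",         # hypothetical volume
    destination_folder="ingest/sales",
    files=[
        "/local_disk/tmp/file_1.csv",
        "/local_disk/tmp/file_2.txt"
    ]
)
```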

run_quality_checks(checks, scan_definition_name, df=None, api_key_id='', api_key_secret='')

Run quality checks on a table or dataframe.

Parameters:

Name Type Description Default
checks str

A string with Soda quality checks.

If the dataset name in the check string is not in the format catalog.database.table, a MalformedDatasetNameException exception will be raised.

Example: to run a check over a table, define the check string as follows: checks for catalog.database.table: - row_count > 0

required
scan_definition_name str

This parameter is used to aggregate results of checks from the same context. This parameter is also used as partition column of checks result table on Data Lake.

required
df Optional[DataFrame]

A Spark dataframe to be checked.

None
api_key_id str

The Soda Cloud API key id to send the results to the cloud.

''
api_key_secret str

The Soda Cloud API key secret to send the results to the cloud.

''

Returns:

Type Description
dict

This function returns a dict with all results reported by Soda in the quality checks.

Source code in blipdataforge/facades.py
def run_quality_checks(
    self,
    checks: str,
    scan_definition_name: str,
    df: Optional[DataFrame] = None,
    api_key_id: str = "",
    api_key_secret: str = "",
) -> dict:
    """Run quality checks on a table or dataframe.

    Args:
        checks:
            A string with Soda quality checks.

            If the dataset name in the check string is not in the format
            `catalog.database.table`, a MalformedDatasetNameException
            exception will be raised.

            Example: to run a check over a table, define the check string
            as follows:
            checks for catalog.database.table:
                - row_count > 0
        scan_definition_name:
            This parameter is used to aggregate results of checks from the
            same context. This parameter is also used as partition column
            of checks result table on Data Lake.
        df:
            A Spark dataframe to be checked.
        api_key_id:
            The Soda Cloud API key id to send the results to the cloud.
        api_key_secret:
            The Soda Cloud API key secret to send the results to the cloud.

    Returns:
        This function returns a dict with all results reported by Soda in
        the quality checks.
    """
    soda_checker = SodaChecker(
        context=self.context,
        checks=checks,
        scan_definition_name=scan_definition_name,
        df=df,
        api_key_id=api_key_id,
        api_key_secret=api_key_secret
    )
    checks_results = soda_checker.run_checks()
    self._report_quality_checks(checks_results)

    data_writer = DataWriter(
        df=soda_checker.get_result_as_spark_dataframe(),
        catalog=get_domain_catalog(self.context),
        database="blipdataforge",
        table="quality_checks_results",
        context=self.context,
        write_mode="append",
        schema_mode="mergeSchema",
        partition_by=["definitionName"],
        logger=self.logger
    )

    data_writer.write()
    return soda_checker.checks_result
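
A sketch of a table-level check, assuming a hypothetical table and column; the check string follows the Soda checks syntax, with the dataset name in the catalog.database.table format:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

checks = """
checks for dataplatform_sandbox.blipdataforge.sales:
  - row_count > 0
  - missing_count(order_id) = 0
"""

results = dp.run_quality_checks(
    checks=checks,
    scan_definition_name="sales_daily_checks"  # hypothetical scan name
)
print(results)
```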

send_data(payload, data_routing_id, data_routing_token=None)

Send data to various destinations using the Data Routing layer.

The Data Routing layer uses Delta Sharing as the default output method for sharing data.

The payload parameter must use the format: { "since_datetime_value": start_date, "until_datetime_value": end_date } where since_datetime_value and until_datetime_value must be in the format "%Y-%m-%d", like "2023-12-27".

If the data_routing_token parameter is not set, the environment variable DATA_ROUTING_TOKEN will be used instead.

Parameters:

Name Type Description Default
payload dict

The configuration to be used in the date filter to select the records that will be sent to the client.

required
data_routing_id str

The ID of the Data Routing pipeline created previously.

required
data_routing_token Optional[str]

The Data Routing API token created previously.

None
Source code in blipdataforge/facades.py
def send_data(
    self,
    payload: dict,
    data_routing_id: str,
    data_routing_token: Optional[str] = None
) -> dict:
    """Send data to various destinations using Data Routing layer.

    The Data Routing layer uses Delta Sharing as the default
    output method for sharing data.

    The `payload` parameter must use the format:
    `{
        "since_datetime_value": start_date,
        "until_datetime_value": end_date
    }`
    where `since_datetime_value` and `until_datetime_value` must be in
    the format "%Y-%m-%d", like "2023-12-27".

    If the `data_routing_token` parameter is not set, the environment
    variable `DATA_ROUTING_TOKEN` will be used instead.

    Args:
        payload:
            The configuration to be used in the date filter to select the
            records that will be sent to the client.
        data_routing_id:
            The ID of the Data Routing pipeline created previously.
        data_routing_token:
            The Data Routing API token created previously.
    """
    token = (
        data_routing_token
        if data_routing_token
        else self._confs.DATA_ROUTING_TOKEN
    )

    data_routing_inteface = DataRoutingInterface(
        url=self._confs.DATA_ROUTING_FIREHOSE_URL,
        data_routing_token=token
    )

    self.logger.info(
        f"Sending data through routing ID '{data_routing_id}' using "
        f"payload '{payload}'"
    )

    try:
        return data_routing_inteface.send_data(
            pipeline_id=data_routing_id,
            payload=payload
        )
    except Exception as e:
        self.logger.error(
            f"Error occurred in send_data function: {str(e)}. "
            f"Payload used in request: {payload}")
        raise
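
A sketch, assuming a hypothetical routing ID and using the payload format described above:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

# Select the records between the two dates ("%Y-%m-%d") to be shared
# through the pipeline's Delta Sharing output.
response = dp.send_data(
    payload={
        "since_datetime_value": "2023-12-01",
        "until_datetime_value": "2023-12-27"
    },
    data_routing_id="my-routing-id"  # hypothetical ID
)
print(response)
```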

share_files(receiver_email, cc_email, database, volume_name, folder, files, catalog='', email_title_opts=None)

Share data lake files.

Currently this function only supports sharing files through download links sent via email.

Parameters:

Name Type Description Default
receiver_email str

The e-mail of who will receive the files

required
cc_email str

The email of who will get a copy of the email

required
database str

The files database

required
volume_name str

The name of the volume where the files are.

required
folder str

The full path inside the volume where the files are. This parameter must not include the volume name or the file name.

required
files Union[str, List[str]]

A file or a list of file names to be sent. Must include the file extension.

required
catalog Optional[str]

The files catalog. If not informed, the current domain catalog will be used.

''
email_title_opts Optional[EmailTitleOpts]

Optional. You can provide an EmailTitleOpts object in this argument to change the behaviour of the title of the email that will be sent by the service. For more details, see the documentation of the function create_email_title_opts(). If none is specified, the default email title is used instead.

None
Source code in blipdataforge/facades.py
def share_files(
    self,
    receiver_email: str,
    cc_email: str,
    database: str,
    volume_name: str,
    folder: str,
    files: Union[str, List[str]],
    catalog: Optional[str] = "",
    email_title_opts: Optional[EmailTitleOpts] = None,
) -> None:
    """Share data lake files.

    Currently this function only supports sharing files through download
    links sent via email.

    Args:
        receiver_email:
            The e-mail of who will receive the files
        cc_email:
            The email of who will get a copy of the email
        database:
            The files database
        volume_name:
            The name of the volume where the files are.
        folder:
            The full path inside the volume where the files are. This
            parameter must not include the volume name or the file name.
        files:
            A file or a list of file names to be sent. Must include the
            file extension.
        catalog:
            The files catalog. If not informed, the current domain catalog
            will be used.
        email_title_opts:
            Optional. You can provide an `EmailTitleOpts` object in this argument
            to change the behaviour of the title of the email that will be sent by the service.
            For more details, see the documentation of the function
            [`create_email_title_opts()`][blipdataforge.DataPlatform.create_email_title_opts].
            If none is specified, the default email title is used instead.
    """
    _catalog = catalog if catalog else get_domain_catalog(context=self.context)

    routing = DataRoutingAPIClient(
        url=self._confs.DATA_ROUTING_API_URL,
        auth_token=self._confs.DATA_ROUTING_API_TOKEN,
        context=self.context,
        logger=self.logger
    )

    routing.share_files(
        receiver_email=receiver_email,
        cc_email=cc_email,
        files=files,
        catalog=_catalog,
        database=database,
        volume_name=volume_name,
        folder=folder,
        email_title_opts=email_title_opts
    )
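
A sketch that shares two files from a volume by email; the addresses, database, volume and file names are assumptions:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

dp.share_files(
    receiver_email="someone@blip.ai",   # hypothetical recipient
    cc_email="team@blip.ai",            # hypothetical copy
    database="blipdataforge",           # hypothetical database
    volume_name="sales_files",          # hypothetical volume
    folder="ingest/sales",              # path inside the volume
    files=["file_1.csv", "file_2.txt"]  # file names with extension
)
```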

write(df, write_mode, catalog, database, table, path=None, schema_mode=None, partition_by=None, merge_fields=None, merge_condition=None, merge_schema=False, table_comment=None, columns_comments=None)

Writes a spark DataFrame on data lake.

If there is a data contract registered for the table being written, this method automatically performs an automated data quality check based on the data contract information.

Parameters:

Name Type Description Default
df DataFrame

A spark dataframe to be written into data lake.

required
write_mode str

Specifies the write behavior. Accepts values overwrite, append and upsert.

If using overwrite or append, the database and table are created if they do not exist.

If using upsert, define merge_condition and merge_fields parameters to configure upsert execution.

required
catalog str

The target catalog to be used.

required
database str

The target database (also called schema) to save the dataframe. It creates the database if it does not exist.

required
table str

The target table that will receive the dataframe. It creates the table if it does not exist.

required
table_comment Optional[str]

The table's commentary, also called table description. The commentary should contain information regarding the meaning of the data for the business.

None
columns_comments Optional[Dict[str, str]]

The commentaries of columns, also called columns descriptions. A column description gives a brief explanation on the meaning of that column regarding the table context.

None
path Optional[str]

DISCLAIMER: This parameter exists only for the DSLab migration and will be deprecated soon.

The output path to write the table data. If not defined, the path will be defined using governance rules.

None
schema_mode Optional[str]

The schema mode used to write the table schema. Accepts modes mergeSchema and overwriteSchema. This parameter takes precedence over merge_schema, so if both parameters are set, the schema_mode will be used and merge_schema will be ignored.

None
partition_by Optional[List[str]]

List of column names to be used as partitions.

None
merge_fields Optional[List[str]]

List of column names to insert or update on upsert operation. The columns on source dataframe must have the same name of the columns on target table.

If using write_mode='upsert' and merge_fields is not defined, the upsert operation will try to update all columns of the source dataframe into the target table.

None
merge_condition Optional[Union[str, List[str]]]

The condition that upsert operation will evaluate to decide whether a source record should be updated or inserted on target table. This parameter is considered mandatory if using write_mode='upsert'.

This parameter accepts a str containing the condition or a list of column names that will be compared on equality.

Example using list: to upsert a dataframe into a table if columns "id" and "name" of the source dataframe match columns "id" and "name" of the target table, the list ["id", "name"] must be passed to the merge_condition parameter. In this case, the upsert operation will be called using the condition "target.id = source.id AND target.name = source.name".

Example using str: to upsert a dataframe into a table using a custom condition, a string with the condition can be passed to merge_condition. In this case the user must provide the column names preceded by "target." and "source.", like target.Id = source.InternalId AND target.StorageDateDayBR >= 2023-01-01.

None
merge_schema Optional[bool]

DEPRECATED. This parameter is set to be removed in future releases. Use schema_mode instead. Enables delta lake merge schema. See https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/ for details.

False
Source code in blipdataforge/facades.py
def write(
    self,
    df: DataFrame,
    write_mode: str,
    catalog: str,
    database: str,
    table: str,
    path: Optional[str] = None,
    schema_mode: Optional[str] = None,
    partition_by: Optional[List[str]] = None,
    merge_fields: Optional[List[str]] = None,
    merge_condition: Optional[Union[str, List[str]]] = None,
    merge_schema: Optional[bool] = False,
    table_comment: Optional[str] = None,
    columns_comments: Optional[Dict[str, str]] = None,
) -> None:
    """Writes a spark `DataFrame` on data lake.

    If there is a data contract registered for the table being written,
    this method automatically performs an automated data quality check
    based on the data contract information.

    Args:
        df:
            A spark dataframe to be written into data lake.
        write_mode:
            Specifies the write behavior. Accepts values `overwrite`,
            `append` and `upsert`.

            If using `overwrite` or `append`, the database and table
            are created if they do not exist.

            If using `upsert`, define `merge_condition` and `merge_fields`
            parameters to configure upsert execution.
        catalog:
            The target catalog to be used.
        database:
            The target database (also called schema) to save the dataframe.
            It creates the database if it does not exist.
        table:
            The target table that will receive the dataframe. It creates
            the table if it does not exist.
        table_comment:
            The table's commentary, also called table description.
            The commentary should contain information regarding the meaning
            of the data for the business.
        columns_comments:
            The commentaries of columns, also called columns descriptions.
            A column description gives a brief explanation on the meaning of
            that column regarding the table context.
        path:
            DISCLAIMER: This parameter exists only for the DSLab migration
            and will be deprecated soon.

            The output path to write the table data. If not defined,
            the path will be defined using governance rules.
        schema_mode:
            The schema mode used to write the table schema. Accepts modes
            `mergeSchema` and `overwriteSchema`.
            This parameter takes precedence over `merge_schema`, so if both
            parameters are set, the `schema_mode` will be used and
            `merge_schema` will be ignored.
        partition_by:
            List of column names to be used as partitions.
        merge_fields:
            List of column names to insert or update on upsert operation.
            The columns on source dataframe must have the same name of the
            columns on target table.

            If using `write_mode='upsert'` and `merge_fields` is not
            defined, the upsert operation will try to update all columns
            of the source dataframe into the target table.
        merge_condition:
            The condition that upsert operation will evaluate to decide
            whether a source record should be updated or inserted on
            target table. This parameter is considered mandatory if
            using `write_mode='upsert'`.

            This parameter accepts a `str` containing the condition or a
            `list` of column names that will be compared on equality.

            Example using `list`: to upsert a dataframe into a table if
            columns "id" and "name" of source dataframe matches with
            columns "id" and "name" of the target table, the list
            `["id", "name"]` must be passed to `merge_condition`
            parameter. In this case, the upsert operation will be called
            using the condition
            `"target.id = source.id AND target.name = source.name"`.

            Example using `str`: to upsert a dataframe into a table using
            a custom condition, a string with the condition can be passed
            to `merge_condition`. In this case the user must provide the
            column names preceded by "target." and "source.", like
            `target.Id = source.InternalId AND
            target.StorageDateDayBR >= 2023-01-01`.
        merge_schema:
            `DEPRECATED`. This parameter is set to be removed in future
            releases. Use `schema_mode` instead.
            Enables delta lake merge schema. See
            https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/
            for details.
    """
    data_writer = DataWriter(
        df=df,
        write_mode=write_mode,
        catalog=catalog,
        database=database,
        table=table,
        table_comment=table_comment,
        columns_comments=columns_comments,
        path=path,
        context=self.context,
        schema_mode=schema_mode,
        partition_by=partition_by,
        merge_fields=merge_fields,
        merge_condition=merge_condition,
        merge_schema=merge_schema,
        logger=self.logger
    )

    self._automated_quality_checks(
        catalog=catalog,
        database=database,
        table=table,
        df=df
    )

    data_writer.write()
    try:
        data_writer.write_metadata()
    except Exception as e:
        self.logger.error(f"Failed to add metadata: {str(e)}")

write_elasticsearch(host, port, index, df, write_mode='append', op_type='index', chunk_size=1000, id_column=None, api_key=None, user=None, password=None)

Write a dataframe to an Elasticsearch database.

Note: This function depends on the 'org.elasticsearch:elasticsearch-spark-30_2.12:8.17.3' maven library, so please add it to your ADF pipeline activity or install it in your cluster.

Parameters:

Name Type Description Default
host str

A string containing a hostname, which identifies where your Elasticsearch database is hosted. Usually in the form https://my-project.es.eastus.azure.elastic.cloud.

required
port int

A port number to use in the connection with the Elasticsearch.

required
index str

The name of the existing index in the database to use.

required
df DataFrame

The PySpark dataframe with the data to be written to the Elasticsearch.

required
write_mode str

The spark write mode. This parameter controls how to perform the write operation. Can be any of:

  • overwrite: executes a delete operation on index before writing the incoming data

  • append: just add the new data, without deletion.

'append'
op_type str

The operation elasticsearch should perform. Can be any of:

  • index: new data is added while existing data (based on its id) is replaced (reindexed).

  • create: adds new data - if the data already exists (based on its id), an exception is thrown.

  • update: updates existing data (based on its id). If no data is found, an exception is thrown.

  • upsert: known as merge or insert if the data does not exist, updates if the data exists (based on its id).

  • delete: deletes existing data (based on its id). If no data is found, an exception is thrown.

'index'
chunk_size int

Size (in entries) for batch writes. This setting is per task instance; it gets multiplied at runtime by the total number of running tasks/workers.

1000
id_column Optional[str]

The column in your pyspark DataFrame that contains the document ID.

None
api_key Optional[str]

The API key to use in the connection with Elasticsearch. If both API key and user/pass are provided, the API key will be used.

None
user Optional[str]

The username to use in the connection with the Elasticsearch.

None
password Optional[str]

The password to use in conjunction with the username provided at user in the connection with the database.

None
Source code in blipdataforge/facades.py
def write_elasticsearch(
    self,
    host: str,
    port: int,
    index: str,
    df: DataFrame,
    write_mode: str = "append",
    op_type: str = "index",
    chunk_size: int = 1000,
    id_column: Optional[str] = None,
    api_key: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None
) -> None:
    """Write a dataframe to an Elasticsearch database.

    Note: This function depends on the
    'org.elasticsearch:elasticsearch-spark-30_2.12:8.17.3' maven
    library, so please add it to your ADF pipeline activity or install it
    in your cluster.

    Args:
        host:
            A string containing a hostname, which identifies where your
            Elasticsearch database is hosted. Usually in the form
            `https://my-project.es.eastus.azure.elastic.cloud`.
        port:
            A port number to use in the connection with the Elasticsearch.
        index:
            The name of the existing index in the database to use.
        df:
            The PySpark dataframe with the data to be written to the
            Elasticsearch.
        write_mode:
            The spark write mode. This parameter controls how to perform
            the write operation. Can be any of:

            - `overwrite`: executes a `delete` operation on index before
            writing the incoming data

            - `append`: just add the new data, without deletion.
        op_type:
            The operation elasticsearch should perform. Can be any of:

            - `index`: new data is added while existing data (based on its
            id) is replaced (reindexed).

            - `create`: adds new data - if the data already exists (based
            on its id), an exception is thrown.

            - `update`: updates existing data (based on its id). If no data
            is found, an exception is thrown.

            - `upsert`: known as merge or insert if the data does not
            exist, updates if the data exists (based on its id).

            - `delete`: deletes existing data (based on its id). If no data
            is found, an exception is thrown.
        chunk_size:
            Size (in entries) for batch writes. This setting is per task
            instance; it gets multiplied at runtime by the total number
            of running tasks/workers.
        id_column:
            The column in your pyspark DataFrame that contains the
            document ID.
        api_key:
            The API key to use in the connection with Elasticsearch. If
            both API key and user/pass are provided, the API key will be
            used.
        user:
            The username to use in the connection with the Elasticsearch.
        password:
            The password to use in conjunction with the username provided
            at `user` in the connection with the database.
    """
    connector = ElasticsearchConnector(
        host=host,
        port=port,
        logger=self.logger,
        api_key=api_key,
        user=user,
        password=password
    )

    connector.write(
        df=df,
        write_mode=write_mode,
        op_type=op_type,
        index=index,
        chunk_size=chunk_size,
        id_column=id_column
    )
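
A sketch, assuming a hypothetical index, document schema and API-key authentication; spark is the active SparkSession available on Databricks clusters:

```python
from blipdataforge import DataPlatform

dp = DataPlatform()

# Hypothetical data; the "doc_id" column holds the Elasticsearch document IDs.
df = spark.createDataFrame(
    [("1", "first document"), ("2", "second document")],
    ["doc_id", "text"]
)

dp.write_elasticsearch(
    host="https://my-project.es.eastus.azure.elastic.cloud",
    port=9243,                     # hypothetical port
    index="documents",             # existing index (hypothetical name)
    df=df,
    write_mode="append",
    op_type="upsert",
    id_column="doc_id",
    api_key="<your-api-key>"       # hypothetical credential placeholder
)
```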

write_eventhub(df, namespace, topic, conn_str, port=9093, batch_size=524288, request_timeout=300000, linger_ms=2, buffer_memory=33554432, max_block_ms=60000)

Write Spark DataFrame to EventHub topic.

You can use this method to write the data from a Spark DataFrame into an existing EventHub topic. For now, this method uses the normal/batch backend of spark (spark.write).

EventHubs events need to be in a specific format, so this method adds a column "value" to the dataframe, containing the dataframe data as a JSON string.

Parameters:

Name Type Description Default
df DataFrame

A Spark DataFrame with the data that you want to write.

required
namespace str

The EventHub namespace to be used.

required
topic str

The name of the EventHub topic to write to.

required
conn_str str

The connection string to be used to connect and send data to eventhub. The connection string must be in the format: Endpoint=sb://<NAMESPACE>.servicebus.windows.net/; SharedAccessKeyName=<KEY NAME>;SharedAccessKey=<KEY VALUE>.

required
port int

The port that will be used to connect with EventHub server. If not informed, use default port 9093.

9093
batch_size int

This configuration controls the default batch size in bytes, and will be passed directly to the kafka.batch.size config.

524288
request_timeout int

Controls the maximum amount of time the EventHub producer will wait for the response of a request.

300000
linger_ms int

Controls how long the producer will wait for new records before sending the batch to the broker. This configuration can improve the transmission by reducing the amount of "uncompleted" batch requests sent to broker.

2
buffer_memory int

The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.

33554432
max_block_ms int

It is used by producers and defines the maximum time (in milliseconds) that a synchronous send() call can be blocked while Kafka attempts to obtain metadata from the cluster or space in the internal buffer for the message.

60000
Source code in blipdataforge/facades.py
def write_eventhub(
    self,
    df: DataFrame,
    namespace: str,
    topic: str,
    conn_str: str,
    port: int = 9093,
    batch_size: int = 524288,
    request_timeout: int = 300000,
    linger_ms: int = 2,
    buffer_memory: int = 33554432,
    max_block_ms: int = 60000
) -> None:
    """Write Spark DataFrame to EventHub topic.

    You can use this method to write the data from a Spark DataFrame into
    an existing EventHub topic. For now, this method uses the normal/batch
    backend of Spark (spark.write).

    EventHub events need to be in a specific format, so this method adds a
    column "value" to the DataFrame, containing the DataFrame data as a
    JSON string. Because of this, the input DataFrame must not already
    contain a column named "value"; if it does, a ValueError is raised.

    Args:
        df:
            A Spark DataFrame with the data that you want to write.
        namespace:
            The EventHub namespace to be used.
        topic:
            The name of the EventHub topic to write to.
        conn_str:
            The connection string to be used to connect and send
            data to eventhub. The connection string must be in the format:
            `Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;
            SharedAccessKeyName=<KEY NAME>;SharedAccessKey=<KEY VALUE>`.
        port:
            The port that will be used to connect to the EventHub server.
            Defaults to 9093.
        batch_size:
            This configuration controls the default batch size in bytes,
            and will be passed directly to the `kafka.batch.size` config.
        request_timeout:
            Controls the maximum amount of time the EventHub producer will
            wait for the response of a request.
        linger_ms:
            Controls how long the producer will wait for new records before
            sending the batch to the broker. This configuration can improve
            throughput by reducing the number of incomplete batch requests
            sent to the broker.
        buffer_memory:
            The total bytes of memory the producer can use to buffer
            records waiting to be sent to the server. If records are sent
            faster than they can be delivered to the server the producer
            will block for max.block.ms after which it will throw an
            exception.
        max_block_ms:
            It is used by producers and defines the maximum time
            (in milliseconds) that a synchronous send() call can be blocked
            while Kafka attempts to obtain metadata from the cluster or
            space in the internal buffer for the message.
    """
    connector = EventHubConnector(self.logger)
    eventhub_conf = EventHubProducerConf(
        topic=topic,
        namespace=namespace,
        conn_str=conn_str,
        port=port,
        batch_size=batch_size,
        request_timeout=request_timeout,
        linger_ms=linger_ms,
        buffer_memory=buffer_memory,
        max_block_ms=max_block_ms
    )

    if "value" in df.columns:
        raise ValueError(
            "The dataframe cannot contain a column named 'value'."
        )

    df_eventhub = df.withColumn(
        "value", to_json(struct([df[col] for col in df.columns]))
    )
    connector.write_eventhub(df=df_eventhub, eventhub=eventhub_conf)
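
A minimal usage sketch of this method is shown below; dp is assumed to be an already-constructed DataPlatform facade, df an existing Spark DataFrame without a "value" column, and the namespace, topic and connection string are placeholders.

# Hedged usage sketch: `dp` (DataPlatform facade) and `df` (a Spark DataFrame
# without a "value" column) are assumed to exist; the namespace, topic and
# connection string are placeholders.
dp.write_eventhub(
    df=df,
    namespace="my-eventhub-namespace",
    topic="my-topic",
    conn_str=(
        "Endpoint=sb://my-eventhub-namespace.servicebus.windows.net/;"
        "SharedAccessKeyName=<KEY NAME>;SharedAccessKey=<KEY VALUE>"
    ),
    # The remaining producer settings keep their defaults (port=9093,
    # batch_size=524288, linger_ms=2, ...); tune them only if needed.
)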

write_mongodb(conn_str, df, database, collection, write_mode, operation_type='replace', upsert_document=False, convert_json=False, ignore_null=False, id_field_list=['_id'], max_batch_size=512, comment='')

Write a dataframe to a MongoDB collection.

Note: This function works only in Single User clusters; otherwise an AnalysisException will be raised. Also, this function depends on the 'org.mongodb.spark:mongo-spark-connector_2.12:10.4.1' Maven library, so please add it to your ADF pipeline activity or install it in your Single User cluster.
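
As a rough illustration of the library requirement above: on a Databricks Single User cluster or an ADF activity you would normally install the connector as a cluster/activity library, but in a plain Spark setup the same Maven coordinates could be pulled at session creation time. This is a sketch of that general Spark mechanism, not blipdataforge's own setup code.

from pyspark.sql import SparkSession

# Illustrative only: asks Spark to resolve the MongoDB Spark connector from
# Maven when the session is created. On Databricks/ADF, install the library
# on the cluster or pipeline activity instead of relying on this setting.
spark = (
    SparkSession.builder
    .config(
        "spark.jars.packages",
        "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1",
    )
    .getOrCreate()
)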

Parameters:

Name Type Description Default
conn_str str

The connection string used to establish connection with MongoDB server. Must be in the format mongodb+srv://<db_username>:<db_password>@<mongo_server>.mongodb.net/

required
df DataFrame

The PySpark dataframe with the data to be written to the collection.

required
database str

The database where the target collection resides.

required
collection str

The target collection to write data to.

required
write_mode str

Write modes supported: overwrite and append. If you specify the overwrite write mode, the connector drops the target collection and creates a new collection that uses the default collection options. This behavior can affect collections that don't use the default options, such as sharded collections, collections with nondefault collations and time-series collections.

required
operation_type str

Specifies how the write operation must save the documents. Accepts insert, replace and update. Each behavior is described as follows:

  • insert: Insert the data, even if there is a match with id_field_list.
  • replace: Replace an existing document that matches the id_field_list value with the new data. If no match exists, the value of upsert_document indicates whether the connector inserts a new document.
  • update: Update an existing document that matches the id_field_list value with the new data. If no match exists, the value of upsert_document indicates whether the connector inserts a new document.
'replace'
upsert_document bool

When true, replace and update operations will insert the data if no match exists. For time series collections, you must set upsert_document to false.

False
convert_json Union[bool, str]

Specifies whether the connector parses string values and converts extended JSON into BSON. The default value is False, which means that the connector leaves all values as strings. This parameter accepts any, objectOrArrayOnly or False, with the following behaviors:

  • any (str): The connector converts all JSON values to BSON.

    • "{a: 1}" becomes {a: 1}.
    • "[1, 2, 3]" becomes [1, 2, 3].
    • "true" becomes true.
    • "01234" becomes 1234.
    • "{a:b:c}" doesn't change.
  • objectOrArrayOnly (str): The connector converts only JSON objects and arrays to BSON.

    • "{a: 1}" becomes {a: 1}.
    • "[1, 2, 3]" becomes [1, 2, 3].
    • "true" doesn't change.
    • "01234" doesn't change.
    • "{a:b:c}" doesn't change.
  • False (bool): The connector leaves all values as strings.

False
ignore_null bool

When True, the connector ignores any null values when writing, including null values in arrays and nested documents. If a document has a field with a null value, only that field will NOT be written to MongoDB; the document will still be created with its non-null fields. Defaults to False.

False
id_field_list List[str]

List of fields by which to split the collection data. Defaults to ["_id"].

['_id']
max_batch_size int

Specifies the maximum number of operations to batch in bulk operations. Defaults to 512.

512
comment str

The comment to append to the write operations. Comments appear in the output of the Database profiler.

''
Source code in blipdataforge/facades.py
def write_mongodb(
    self,
    conn_str: str,
    df: DataFrame,
    database: str,
    collection: str,
    write_mode: str,
    operation_type: str = "replace",
    upsert_document: bool = False,
    convert_json: Union[bool, str] = False,
    ignore_null: bool = False,
    id_field_list: List[str] = ["_id"],
    max_batch_size: int = 512,
    comment: str = ""
) -> None:
    """Write a dataframe to a MongoDB collection.

    Note: This function works only in Single User clusters; otherwise an
    AnalysisException will be raised. Also, this function depends on the
    'org.mongodb.spark:mongo-spark-connector_2.12:10.4.1' Maven library,
    so please add it to your ADF pipeline activity or install it in your
    Single User cluster.

    Args:
        conn_str:
            The connection string used to establish connection with MongoDB
            server. Must be in the format
            `mongodb+srv://<db_username>:<db_password>@<mongo_server>.mongodb.net/`
        df:
            The PySpark dataframe with the data to be written to the
            collection.
        database:
            The database where the target collection resides.
        collection:
            The target collection to write data to.
        write_mode:
            Write modes supported: `overwrite` and `append`. If you specify
            the `overwrite` write mode, the connector drops the target
            collection and creates a new collection that uses the default
            collection options. This behavior can affect collections that
            don't use the default options, such as sharded collections,
            collections with nondefault collations and time-series
            collections.
        operation_type:
            Specifies how the write operation must save the documents.
            Accepts `insert`, `replace` and `update`. Each behavior is
            described as follows:

            - `insert`: Insert the data, even if there is a match with
            `id_field_list`.
            - `replace`: Replace an existing document that matches the
            `id_field_list` value with the new data. If no match exists,
            the value of `upsert_document` indicates whether the connector
            inserts a new document.
            - `update`: Update an existing document that matches the
            `id_field_list` value with the new data. If no match exists,
            the value of `upsert_document` indicates whether the connector
            inserts a new document.
        upsert_document:
            When true, replace and update operations will insert the data
            if no match exists. For time series collections, you must set
            `upsert_document` to false.
        convert_json:
            Specifies whether the connector parses string values and
            converts extended JSON into BSON. The default value is False,
            which means that the connector leaves all values as strings.
            This parameter accepts `any`, `objectOrArrayOnly` or `False`,
            with the following behaviors:

            - `any` (str): The connector converts all JSON values to BSON.
                - "{a: 1}" becomes {a: 1}.
                - "[1, 2, 3]" becomes [1, 2, 3].
                - "true" becomes true.
                - "01234" becomes 1234.
                - "{a:b:c}" doesn't change.

            - `objectOrArrayOnly` (str): The connector converts only JSON
            objects and arrays to BSON.
                - "{a: 1}" becomes {a: 1}.
                - "[1, 2, 3]" becomes [1, 2, 3].
                - "true" doesn't change.
                - "01234" doesn't change.
                - "{a:b:c}" doesn't change.

            - False (bool): The connector leaves all values as strings.
        ignore_null:
            When `True`, the connector ignores any null values when
            writing, including null values in arrays and nested documents.
            If a document has a field with a `null` value, only that field
            will NOT be written to MongoDB; the document will still be
            created with its non-`null` fields. Defaults to `False`.
        id_field_list:
            List of fields by which to split the collection data.
            Defaults to `["_id"]`.
        max_batch_size:
            Specifies the maximum number of operations to batch in bulk
            operations. Defaults to `512`.
        comment:
            The comment to append to the write operations. Comments appear
            in the output of the [Database profiler](https://www.mongodb.com/docs/manual/reference/database-profiler/).
    """
    mongo_conn = MongoDBConnector(
        conn_str=conn_str,
        logger=self.logger
    )

    try:
        mongo_conn.write(
            df=df,
            database=database,
            collection=collection,
            write_mode=write_mode,
            operation_type=operation_type,
            upsert_document=upsert_document,
            convert_json=convert_json,
            ignore_null=ignore_null,
            id_field_list=id_field_list,
            max_batch_size=max_batch_size,
            comment=comment
        )
    except SparkConnectGrpcException as e:
        self.logger.error(
            "The 'org.mongodb.spark:mongo-spark-connector_2.12:10.4.1' "
            "library must be installed in the cluster to interact with "
            "MongoDB. Please, add this library to you cluster and execute "
            "the job again."
        )
        raise e

    except AnalysisException as e:
        self.logger.error(
            "The write_mongodb function works only in Single User clusters."
            "Please use another cluster or execute your job from an ADF "
            "pipeline."
        )
        raise e
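
A minimal usage sketch, assuming dp is an already-constructed DataPlatform facade and df an existing PySpark DataFrame; the connection string, database and collection names are placeholders.

# Hedged usage sketch: `dp` and `df` are assumed to exist; the connection
# string, database and collection names are placeholders.
dp.write_mongodb(
    conn_str="mongodb+srv://<db_username>:<db_password>@<mongo_server>.mongodb.net/",
    df=df,
    database="analytics",
    collection="daily_metrics",
    write_mode="append",
    operation_type="update",        # update documents matching id_field_list...
    upsert_document=True,           # ...and insert them when no match exists
    id_field_list=["customer_id"],  # field(s) used to match existing documents
)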

write_to_opensearch(df, host, port, user, password, index, chunk_size, write_mode='append', op_type='upsert', id_column=None, use_spark_native_writer=True)

Write Spark DataFrame to Opensearch database.

You can use this function to write a Spark DataFrame to an Opensearch database. Opensearch is a NoSQL database (i.e. a database of JSON documents). This function will basically: 1) collect all rows from your Spark DataFrame; 2) transform them into "dicts", the Python equivalent of JSON documents; 3) insert the necessary Opensearch operation directives into this list; and 4) send this data to the Opensearch server in batches/chunks.

The user and password arguments are related to the authentication process in the server, while the host and port arguments define where the Opensearch server is, and where the connection is established.

The chunk_size argument defines the maximum number of rows from your Spark DataFrame that are included in each chunk that is sent to the Opensearch server. For example, if your DataFrame contains 10000 rows and you set chunk_size to 1000, this function will split your DataFrame into 10 chunks and send each chunk sequentially to the Opensearch server. By controlling the size of the chunk that is sent to the server, you can also control how much data the server needs to process in each request. This is useful if your Opensearch server has a rate limit, or if it does not have enough resources to process huge amounts of data in a single request.

This function has two different backends: the Opensearch Spark native driver and the Opensearch Python SDK. By default, this function uses the Opensearch Spark native driver to write the data, because this native driver is much more scalable and appropriate for our big data environment. However, in order to use the native driver backend you must have a specific Maven package installed in your environment/cluster:

https://central.sonatype.com/artifact/org.opensearch.client/opensearch-spark-30_2.12

The function checks for the presence of the native driver in your environment. If it does not find the native driver, you likely do not have this Maven package installed in your environment, and the function raises an AssertionError warning you about the problem.

On the other hand, the Python SDK backend does not scale very well, but it handles moderate DataFrames (< 400k rows) decently and offers much better visibility of what is happening behind the scenes. The Python SDK has much better and more useful error messages that you can use to debug your application, and it returns a list containing the raw HTTP responses from the server, which gives you a lot of information about the write operations performed on the server. Therefore, you can easily see if any of your records was not successfully recorded in the server, for whatever reason.

Unfortunately, you do not have such visibility in the Spark native driver backend. Although the Spark native driver scales very well, it does not provide very useful or informative error messages. Therefore, if there is something wrong in your code (e.g. you forgot to change the hostname of your server, you are using the wrong credentials, or your data has a different schema than the documents already present in your index), you will probably have a hard time debugging if you are using the Spark native driver backend.

Parameters:

Name Type Description Default
df DataFrame

A Spark DataFrame with the data that you want to write.

required
host str

A string containing a hostname, which identifies where your Opensearch database is hosted.

required
port int

A port number to use in the connection with the database.

required
user str

The username to use in the connection with the database.

required
password str

The password to use in conjunction with the username provided at user in the connection with the database.

required
index str

The name of the existing index in the database to use.

required
chunk_size int

Sets the size of each chunk of data that is sent to the Opensearch server. This chunk size should be the number of entries/rows to be included in each chunk.

required
write_mode str

The Spark write mode to use when writing the data (e.g. append, overwrite, etc.).

'append'
op_type str

Default to "upsert". The name of the Opensearch operation to use while writing the data to the database. Supported values are: "upsert" (for an upsert operation) "index" (for an index operation).

'upsert'
id_column Optional[str]

Optional. The name of the column that contains the "document id" to use in the database.

None
use_spark_native_writer bool

Defaults to True. Whether or not to use the Spark native writer class to write the data. If False, the Python SDK for Opensearch is used instead. However, the Python SDK writer is much slower and not capable of handling huge amounts of data, so be careful with it.

True

Returns:

Type Description
List[dict]

A list of responses: each element is the response returned by the Opensearch server for one chunk/batch of data from your Spark DataFrame that was sent to the server. You can use this list of responses to investigate whether any error occurred during the ingest process.

Source code in blipdataforge/facades.py
def write_to_opensearch(
    self,
    df: DataFrame,
    host: str,
    port: int,
    user: str,
    password: str,
    index: str,
    chunk_size: int,
    write_mode: str = "append",
    op_type: str = "upsert",
    id_column: Optional[str] = None,
    use_spark_native_writer: bool = True,
) -> List[dict]:
    """Write Spark DataFrame to Opensearch database.

    You can use this function to write a Spark DataFrame to an Opensearch database.
    Opensearch is a NoSQL database (i.e. a database of JSON documents). This
    function will basically: 1) collect all rows from your Spark DataFrame;
    2) transform them into "dicts", the Python equivalent of JSON documents;
    3) insert the necessary Opensearch operation directives into this list;
    and 4) send this data to the Opensearch server in batches/chunks.

    The `user` and `password` arguments are related to the authentication process in the
    server, while the `host` and `port` arguments define where the Opensearch server is,
    and where the connection is established.

    The `chunk_size` argument defines the maximum number of rows from your Spark DataFrame
    that are included in each chunk that is sent to the Opensearch server. For example,
    if your DataFrame contains 10000 rows and you set `chunk_size` to 1000, this
    function will split your DataFrame into 10 chunks and send each chunk sequentially
    to the Opensearch server. By controlling the size of the chunk that is sent to the
    server, you can also control how much data the server needs to process in each
    request. This is useful if your Opensearch server has a rate limit, or if it does
    not have enough resources to process huge amounts of data in a single request.

    This function has two different backends: the Opensearch Spark native driver
    and the Opensearch Python SDK. By default, this function uses the Opensearch
    Spark native driver to write the data, because this native driver is much more
    scalable and appropriate for our big data environment. However, in order to use
    the native driver backend you must have a specific Maven package installed in
    your environment/cluster:

    https://central.sonatype.com/artifact/org.opensearch.client/opensearch-spark-30_2.12

    The function checks for the presence of the native driver in your environment. If it
    does not find the native driver, you likely do not have this Maven package installed
    in your environment, and the function raises an `AssertionError` warning you about
    the problem.

    On the other hand, the Python SDK backend does not scale very well, but it handles
    moderate DataFrames (< 400k rows) decently and offers much better visibility of what
    is happening behind the scenes. The Python SDK has much better and more useful error
    messages that you can use to debug your application, and it returns a list containing
    the raw HTTP responses from the server, which gives you a lot of information about the
    write operations performed on the server. Therefore, you can easily see if any of your
    records was not successfully recorded in the server, for whatever reason.

    Unfortunately, you do not have such visibility in the Spark native driver backend.
    Although the Spark native driver scales very well, it does not provide very useful
    or informative error messages. Therefore, if there is something wrong in your code
    (e.g. you forgot to change the hostname of your server, you are using the wrong
    credentials, or your data has a different schema than the documents already present
    in your index), you will probably have a hard time debugging if you are using the
    Spark native driver backend.

    Args:
        df:
            A Spark DataFrame with the data that you want to write.
        host:
            A string containing a hostname, which identifies where your Opensearch
            database is hosted.
        port:
            A port number to use in the connection with the database.
        user:
            The username to use in the connection with the database.
        password:
            The password to use in conjunction with the username provided at `user`
            in the connection with the database.
        index:
            The name of the existing index in the database to use.
        chunk_size:
            Sets the size of each chunk of data that is sent to the Opensearch server. This chunk size
            should be the number of entries/rows to be included in each chunk.
        write_mode:
            The Spark write mode to use when writing the data (e.g. append, overwrite, etc.).
        op_type:
            Default to "upsert". The name of the Opensearch operation to use while writing
            the data to the database. Supported values are: "upsert" (for an upsert operation)
            "index" (for an index operation).
        id_column:
            Optional. The name of the column that contains the "document id" to use in the database.
        use_spark_native_writer:
            Defaults to True. Whether or not to use the Spark native writer class to write
            the data. If False, the Python SDK for Opensearch is used instead.
            However, the Python SDK writer is much slower and not capable of handling
            huge amounts of data, so be careful with it.

    Returns:
        A list of responses: each element is the response returned by the Opensearch
        server for one chunk/batch of data from your Spark DataFrame that was sent to
        the server. You can use this list of responses to investigate whether any error
        occurred during the ingest process.
    """
    conn = OpensearchConnector(
        host=host,
        port=port,
        user=user,
        password=password,
        write_mode=write_mode,
        logger=self.logger,
        id_column=id_column,
        use_spark_native_writer=use_spark_native_writer
    )
    return conn.write_to_opensearch(
        df=df,
        index=index,
        chunk_size=chunk_size,
        op_type=op_type
    )
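
A minimal usage sketch; dp is assumed to be an already-constructed DataPlatform facade and df an existing Spark DataFrame, while the host, credentials and index are placeholders. The Python SDK backend is selected here (use_spark_native_writer=False) so that the returned server responses can be inspected.

# Hedged usage sketch: `dp`, `df`, host, credentials and index are
# assumptions/placeholders. The Python SDK backend is selected to get the
# list of raw server responses back for inspection.
responses = dp.write_to_opensearch(
    df=df,
    host="my-opensearch-host.example.com",
    port=9200,
    user="<USER>",
    password="<PASSWORD>",
    index="my_index",
    chunk_size=1000,                  # rows per chunk sent to the server
    write_mode="append",
    op_type="upsert",
    id_column="document_id",
    use_spark_native_writer=False,    # use the Python SDK backend
)

# Inspect the server response for each chunk to spot ingestion errors.
for response in responses:
    print(response)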