Skip to content

Data contract

Provides classes for working with data contracts in Blip's data ecosystem.

Data contracts are the source of truth for data products in Blip's data ecosystem. They keep information about domain ownership, column metadata, update frequency, the availability period of the data and more. All this information is necessary to ensure the governance of our data ecosystem and is used by the BlipDataForge library to enforce governance rules during data reads, writes and transformations.

DataContract dataclass

Base class to work with data contracts.

Data contracts are the basic output of the communication with the Data Contract API. They contain metadata about data products used in governance enforcement.

Attributes:

Name Type Description
product_name str

Identification name of the data contract.

catalog str

The data product catalog.

database str

The data product schema / database.

table str

The data product table.

columns list

Raw metadata about table columns.

owner_email str

Email of the data product owner.

support_email str

Email of the person directly responsible for the freshness of the data product.

lifecycle_time Optional[int]

Number of days to store the data.

Source code in blipdataforge/data_contract.py
@dataclass
class DataContract():
    """Base class to work with data contracts.

    Data contracts are the basic output of the communication
    with Data Contract API. They contain metadata about data products
    used in governance enforcement.

    Attributes:
        product_name:
            Identification name of the data contract.
        catalog:
            The data product catalog.
        database:
            The data product schema / database.
        table:
            The data product table.
        columns:
            Raw metadata about table columns. Each item is read as a
            mapping with the keys `columnName` and `columnType` and,
            optionally, `columnExpectedValues` (a list of mappings
            with `label` and `value` keys).
        owner_email:
            Email of the data product owner.
        support_email:
            Email of the person directly responsible for the
            freshness of the data product.
        lifecycle_time:
            Number of days to store the data. May be `None`.
    """

    product_name: str
    catalog: str
    database: str
    table: str
    columns: list
    owner_email: str
    support_email: str
    lifecycle_time: Optional[int]

    def get_quality_checks(self) -> str:
        """Returns a quality check string based on Data Contract attributes.

        The quality checks generated by this function cover schema and
        expected values.

        Returns:
            A check string compatible with data platform quality engine.
        """
        header = f"checks for {self.catalog}.{self.database}.{self.table}:"
        return header + self._schema_check() + self._validity_check()

    def _schema_check(self) -> str:
        """Builds the two schema checks (wrong type / missing column).

        Returns:
            The check fragment, starting with a line break so it can be
            appended directly after the `checks for ...:` header.
        """
        columns_names = [column['columnName'] for column in self.columns]
        columns_with_types = {
            column['columnName']: column['columnType']
            for column in self.columns
        }

        return (
            "\n    - schema:"
            "\n        name: Columns with wrong type"
            "\n        fail:"
            "\n            when wrong column type:"
            f"\n                {columns_with_types}"
            "\n    - schema:"
            "\n        name: Missing any column"
            "\n        fail:"
            f"\n            when required column missing: {columns_names}"
        )

    def _validity_check(self) -> str:
        """Builds one validity check per expected value of each column.

        Columns without expected values (key missing, `None` or empty)
        contribute nothing to the result.

        Returns:
            The check fragment, or an empty string when no column
            declares expected values.
        """
        fragments = []

        for column in self.columns:
            column_name = column['columnName']
            # A column may carry no expectations at all; treat a missing
            # or null entry the same as an empty list instead of failing.
            expected_values = column.get('columnExpectedValues') or []

            for expected_value in expected_values:
                metric = expected_value['label']
                value = expected_value['value']

                # "valid values" arrive as a comma-separated string and are
                # rendered as a list; values that are already a sequence
                # are left untouched.
                if "valid values" in metric and isinstance(value, str):
                    value = value.split(',')

                fragments.append(
                    f"\n    - invalid_count({column_name}) = 0:"
                    f"\n        name: Check {metric}"
                    f"\n        {metric}: {str(value)}"
                )

        return "".join(fragments)

get_quality_checks()

Returns a quality check string based on Data Contract attributes.

The quality checks generated by this function cover schema and expected values.

Returns:

Type Description
str

A check string compatible with data platform quality engine.

Source code in blipdataforge/data_contract.py
def get_quality_checks(self) -> str:
    """Assemble the quality check string for this data contract.

    The result is the `checks for <catalog>.<database>.<table>:` header
    followed by the schema checks and then the expected-value checks.

    Returns:
        A check string compatible with data platform quality engine.
    """
    sections = [
        f"checks for {self.catalog}.{self.database}.{self.table}:",
        self._schema_check(),
        self._validity_check(),
    ]

    return "".join(sections)

DataContractRepository

The entrypoint for communication with Data Contract API.

This class implements the communication with Data Contract API endpoints.

Source code in blipdataforge/data_contract.py
class DataContractRepository():
    """The entrypoint for communication with Data Contract API.

    This class implements the communication with Data Contract API endpoints.

    Attributes:
        base_url:
            Base URL of the API, exactly as received by `__init__`.
        headers:
            HTTP headers sent on every request; carries the auth token.
        logger:
            Library logger used to report the outcome of contract lookups.
        last_response:
            The raw response of the most recent request, or `None` while
            no request has been made yet.
    """

    # Class-level default: `last_response` is readable (as None) before the
    # first request instead of raising AttributeError.
    last_response: Optional[requests.Response] = None

    def __init__(self, api_url: str, api_auth_token: str,) -> None:
        """Initializes an instance to communicate with Data Contract API.

        The base URL is taken verbatim from `api_url`; this class does not
        read any environment variable itself, so resolving a value such as
        `API_DATACONTRACT_URL` is up to the caller.

        Args:
            api_url:
                The URL used to communicate with Data Contract API.
            api_auth_token:
                The token to authenticate on the API.
        """
        self.base_url = api_url
        self.headers = {"x-blip-platform-auth-token": f"{api_auth_token}"}
        self.logger = get_lib_logger(context=get_context())

    def get_contract(
        self,
        catalog: str,
        schema: str,
        table: str
    ) -> Union[DataContract, None]:
        """Request a specific data contract.

        Args:
            catalog:
                Target catalog.
            schema:
                Target schema / database.
            table:
                Target table.

        Returns:
            The object with data contract information, or `None` when the
            API answers with a non-2xx status code.
        """
        url = f"{self.base_url}/{catalog}/{schema}/{table}"

        raw_response = self._request(method="GET", url=url)
        self._log_response(
            catalog=catalog,
            database=schema,
            table=table,
            response=raw_response
        )

        if not self._is_valid_response(raw_response):
            return None

        json_response = raw_response.json()
        return DataContract(
            product_name=json_response["productName"],
            catalog=json_response["catalog"],
            database=json_response["schema"],
            table=json_response["tableName"],
            columns=json_response["columns"],
            owner_email=json_response["ownerEmail"],
            support_email=json_response["supportEmail"],
            # The only field read with .get(): absent key yields None.
            lifecycle_time=json_response.get("lifecycleTimeInDays")
        )

    def list_contracts(self) -> Union[list, None]:
        """List all data contracts.

        Returns:
            The decoded JSON body of the listing endpoint (a list of
            dicts, each one a summary of a data contract), or `None`
            when the API answers with a non-2xx status code.
        """
        raw_response = self._request(method="GET", url=self.base_url)

        if not self._is_valid_response(raw_response):
            return None

        return raw_response.json()

    def _request(self, method: str, url: str) -> requests.Response:
        """Sends a request with the auth headers and remembers the response.

        Args:
            method:
                HTTP method name, e.g. `"GET"`.
            url:
                Full URL to call.

        Returns:
            The raw response, also stored in `self.last_response`.
        """
        # NOTE(review): no `timeout` is passed, so an unresponsive server
        # can block this call indefinitely - consider adding one.
        raw_response = requests.request(
            method=method,
            url=url,
            headers=self.headers
        )

        self.last_response = raw_response

        return raw_response

    def _is_valid_response(self, response: requests.Response) -> bool:
        """Tells whether the response status code is in the 2xx range."""
        return 200 <= response.status_code <= 299

    def _log_response(
        self,
        catalog: str,
        database: str,
        table: str,
        response: requests.Response
    ) -> None:
        """Logs whether the contract lookup for a table succeeded.

        Any non-2xx status is reported as "not found" at error level.
        """
        table_address = f"{catalog}.{database}.{table}"
        if self._is_valid_response(response):
            self.logger.info(f"Data Contract for {table_address} found!")
        else:
            self.logger.error(f"Data Contract for {table_address} not found!")

__init__(api_url, api_auth_token)

Initializes an instance to communicate with Data Contract API.

If the environment variable API_DATACONTRACT_URL exists, it will be used as the base URL of the API. If it does not exist, a predefined value will be used.

Parameters:

Name Type Description Default
api_url str

The URL used to communicate with the Data Contract API.

required
api_auth_token str

The token to authenticate on the API.

required
Source code in blipdataforge/data_contract.py
def __init__(self, api_url: str, api_auth_token: str,) -> None:
    """Set up the state needed to talk to the Data Contract API.

    Stores the base URL as given, prepares the authentication header
    sent with every request and obtains the library logger.

    Args:
        api_url:
            The URL used to communicate with Data Contract API.
        api_auth_token:
            The token to authenticate on the API.
    """
    auth_headers = {"x-blip-platform-auth-token": f"{api_auth_token}"}

    self.base_url = api_url
    self.headers = auth_headers

    # The logger is bound to whatever context the library currently reports.
    current_context = get_context()
    self.logger = get_lib_logger(context=current_context)

get_contract(catalog, schema, table)

Request a specific data contract.

Parameters:

Name Type Description Default
catalog str

Target catalog.

required
schema str

Target schema / database.

required
table str

Target table.

required

Returns:

Type Description
Union[DataContract, None]

The object with data contract information.

Source code in blipdataforge/data_contract.py
def get_contract(
    self,
    catalog: str,
    schema: str,
    table: str
) -> Union[DataContract, None]:
    """Fetch the data contract of a single table.

    Args:
        catalog:
            Target catalog.
        schema:
            Target schema / database.
        table:
            Target table.

    Returns:
        The object with data contract information, or `None` when the
        response is not considered valid.
    """
    endpoint = f"{self.base_url}/{catalog}/{schema}/{table}"
    response = self._request(method="GET", url=endpoint)

    # The outcome is logged before the status is checked, for both paths.
    self._log_response(
        catalog=catalog,
        database=schema,
        table=table,
        response=response,
    )

    if not self._is_valid_response(response):
        return None

    payload = response.json()
    contract_fields = {
        "product_name": payload["productName"],
        "catalog": payload["catalog"],
        "database": payload["schema"],
        "table": payload["tableName"],
        "columns": payload["columns"],
        "owner_email": payload["ownerEmail"],
        "support_email": payload["supportEmail"],
        # Optional in the payload: falls back to None when absent.
        "lifecycle_time": payload.get("lifecycleTimeInDays"),
    }
    return DataContract(**contract_fields)

list_contracts()

List all data contracts.

Returns:

Type Description
Union[list, None]

A list of dicts, each dict containing a summary of a data contract.

Source code in blipdataforge/data_contract.py
def list_contracts(self) -> Union[list, None]:
    """Fetch the listing of every data contract.

    Returns:
        The decoded JSON body (a list of dicts, each one summarizing a
        data contract) when the response is valid, otherwise `None`.
    """
    response = self._request(method="GET", url=self.base_url)
    succeeded = self._is_valid_response(response)

    return response.json() if succeeded else None