
Streaming base

Package defining abstractions and utilities for stream processing.

This package provides interfaces and base classes that standardize how streaming processes are defined in pipelines.

Usage

The transformations parameter in methods like table, view, and append_flow is a key feature of this package. This parameter allows users to apply custom transformations to the data during the ingestion process.

A transformation is expected to be a callable (a function) that accepts at least one argument, a DataFrame, and returns a transformed DataFrame. A transformation can also accept extra parameters. For example:

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

def filter_columns(df: DataFrame, column_name: str) -> DataFrame:
    return df.select(column_name)

def enrich_data(df: DataFrame) -> DataFrame:
    return df.withColumn("new_column", lit("value"))

To pass extra parameters to a transformation, like column_name in the example above, provide a tuple containing the callable, a list of positional arguments, and a dict whose keys are parameter names and whose values are the arguments to use. The code below shows both ways of passing a transformation:

# Passing a list of positional arguments
...
transformations = [
    enrich_data,
    (filter_columns, ["my_column"], {})
]

# Passing a dict of keyword arguments
...
transformations = [
    enrich_data,
    (filter_columns, [], {"column_name": "my_column"})
]

The transformations are applied sequentially to the data source during the load process.
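This sequential application can be sketched as follows; `apply_transformations` is a hypothetical helper shown for illustration, not part of the package's public API:

```python
from typing import Any, Callable, Dict, List, Tuple, Union

# A transformation is either a bare callable or a (function, args, kwargs) tuple.
Transformation = Union[
    Callable[..., Any],
    Tuple[Callable[..., Any], List[Any], Dict[str, Any]],
]

def apply_transformations(df: Any, transformations: List[Transformation]) -> Any:
    """Apply each transformation in order, unpacking tuples into extra arguments."""
    for transformation in transformations:
        if isinstance(transformation, tuple):
            func, args, kwargs = transformation
            df = func(df, *args, **kwargs)
        else:
            df = transformation(df)
    return df
```

Each entry receives the DataFrame produced by the previous one, so order matters.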

DataSource

Bases: ABC

Abstract base class for defining data sources.

Attributes:

context (Context): A shared context instance for managing Spark sessions.

Source code in blipdataforge/streaming/__init__.py
class DataSource(ABC):
    """Abstract base class for defining data sources.

    Attributes:
        context: A shared context instance for managing Spark sessions.
    """
    context: Context = get_context()

    @abstractmethod
    def load(self) -> DataFrame:
        """Abstract method to load data from the data source.

        Returns:
            A DataFrame representing the loaded data.
        """
        pass
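A concrete subclass only needs to implement load(). The sketch below re-declares a minimal stand-in for the interface so it runs standalone; JsonFileSource, its path, and the placeholder return value are all illustrative, not part of the package:

```python
from abc import ABC, abstractmethod


class DataSource(ABC):
    # Minimal stand-in for blipdataforge.streaming.DataSource; the real class
    # also carries a shared `context` attribute for managing Spark sessions.
    @abstractmethod
    def load(self):
        """Load data from the data source."""


class JsonFileSource(DataSource):
    # Hypothetical file-based source. A real implementation would return a
    # Spark DataFrame, e.g. built from a readStream on self.context's session.
    def __init__(self, path: str):
        self.path = path

    def load(self):
        return f"stream<{self.path}>"  # placeholder for a streaming DataFrame


source = JsonFileSource("/mnt/landing/events")
```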

load() abstractmethod

Abstract method to load data from the data source.

Returns:

DataFrame: A DataFrame representing the loaded data.

Source code in blipdataforge/streaming/__init__.py
@abstractmethod
def load(self) -> DataFrame:
    """Abstract method to load data from the data source.

    Returns:
        A DataFrame representing the loaded data.
    """
    pass

StreamingEngine

Bases: ABC

Abstract base class for defining streaming engines.

Source code in blipdataforge/streaming/__init__.py
class StreamingEngine(ABC):
    """Abstract base class for defining streaming engines."""
    @abstractmethod
    def table(
        self,
        name: str,
        comment: str,
        data_source: DataSource,
        temporary: bool = False,
        partition_cols: List[str] = [],
        transformations: List[Union[
            Callable[..., Any],
            Tuple[Callable[..., Any], List[Any], Dict[str, Any]]
        ]] = [],
    ) -> None:
        """Defines a streaming Table.

        Args:
            name:
                The name of the table.
            comment:
                A description of the table.
            data_source:
                The data source providing the table's data.
            temporary:
                Whether the table is temporary. Defaults to False.
            partition_cols:
                List of columns to use for partitioning the table.
            transformations:
                List of transformations to apply to the data. Each
                transformation must be a callable that accepts a DataFrame and
                returns a DataFrame. It may also include additional arguments
                as a tuple (function, args, kwargs).
        """
        pass

    @abstractmethod
    def view(
        self,
        name: str,
        comment: str,
        data_source: DataSource,
        transformations: List[Union[
            Callable[..., Any],
            Tuple[Callable[..., Any], List[Any], Dict[str, Any]]
        ]] = [],
    ) -> None:
        """Defines a streaming View.

        Args:
            name:
                The name of the view.
            comment:
                A description of the view.
            data_source:
                The data source providing the view's data.
            transformations:
                List of transformations to apply to the data. Each
                transformation must be a callable that accepts a DataFrame and
                returns a DataFrame. It may also include additional arguments
                as a tuple (function, args, kwargs).
        """
        pass

    @abstractmethod
    def append_flow(
        self,
        target_table: str,
        data_source: DataSource,
        flow_name: str,
        comment: str,
        transformations: List[Union[
            Callable[..., Any],
            Tuple[Callable[..., Any], List[Any], Dict[str, Any]]
        ]] = [],
    ) -> None:
        """Appends data from a data source to a target streaming table.

        Args:
            target_table:
                The name of the target table.
            data_source:
                The data source providing the data.
            flow_name:
                The name of the flow.
            comment:
                A description of the flow.
            transformations:
                List of transformations to apply to the data. Each
                transformation must be a callable that accepts a DataFrame and
                returns a DataFrame. It may also include additional arguments
                as a tuple (function, args, kwargs).

        """
        pass

    @abstractmethod
    def create_streaming_table(
        self,
        name: str,
        comment: str,
        partition_cols: List[str] = [],
        schema: Optional[Union[StructType]] = None,
        table_properties: Optional[Dict[str, str]] = None,
    ) -> None:
        """Creates a streaming table.

        Args:
            name:
                The name of the streaming table.
            comment:
                A description of the table.
            partition_cols:
                List of columns to use for partitioning the table.
            schema:
                The schema of the table using pyspark `StructType`.
            table_properties:
                Additional properties for the table.
        """
        pass
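To make the call shape of table() concrete, the sketch below uses a hypothetical RecordingEngine that merely stores what was requested; a real StreamingEngine implementation would create the streaming table instead:

```python
class RecordingEngine:
    # Hypothetical stand-in for a StreamingEngine that records table() calls.
    def __init__(self):
        self.tables = []

    def table(self, name, comment, data_source, temporary=False,
              partition_cols=None, transformations=None):
        # Store the request so the call shape can be inspected.
        self.tables.append({
            "name": name,
            "comment": comment,
            "temporary": temporary,
            "partition_cols": partition_cols or [],
            "transformations": transformations or [],
        })


engine = RecordingEngine()
engine.table(
    name="bronze_events",
    comment="Raw events from the landing zone",
    data_source=object(),  # stands in for a DataSource instance
    partition_cols=["event_date"],
)
```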

append_flow(target_table, data_source, flow_name, comment, transformations=[]) abstractmethod

Appends data from a data source to a target streaming table.

Parameters:

target_table (str, required): The name of the target table.
data_source (DataSource, required): The data source providing the data.
flow_name (str, required): The name of the flow.
comment (str, required): A description of the flow.
transformations (List[Union[Callable[..., Any], Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]], default []): List of transformations to apply to the data. Each transformation must be a callable that accepts a DataFrame and returns a DataFrame. It may also include additional arguments as a tuple (function, args, kwargs).
Source code in blipdataforge/streaming/__init__.py
@abstractmethod
def append_flow(
    self,
    target_table: str,
    data_source: DataSource,
    flow_name: str,
    comment: str,
    transformations: List[Union[
        Callable[..., Any],
        Tuple[Callable[..., Any], List[Any], Dict[str, Any]]
    ]] = [],
) -> None:
    """Appends data from a data source to a target streaming table.

    Args:
        target_table:
            The name of the target table.
        data_source:
            The data source providing the data.
        flow_name:
            The name of the flow.
        comment:
            A description of the flow.
        transformations:
            List of transformations to apply to the data. Each
            transformation must be a callable that accepts a DataFrame and
            returns a DataFrame. It may also include additional arguments
            as a tuple (function, args, kwargs).

    """
    pass

create_streaming_table(name, comment, partition_cols=[], schema=None, table_properties=None) abstractmethod

Creates a streaming table.

Parameters:

name (str, required): The name of the streaming table.
comment (str, required): A description of the table.
partition_cols (List[str], default []): List of columns to use for partitioning the table.
schema (Optional[Union[StructType]], default None): The schema of the table using pyspark StructType.
table_properties (Optional[Dict[str, str]], default None): Additional properties for the table.
Source code in blipdataforge/streaming/__init__.py
@abstractmethod
def create_streaming_table(
    self,
    name: str,
    comment: str,
    partition_cols: List[str] = [],
    schema: Optional[Union[StructType]] = None,
    table_properties: Optional[Dict[str, str]] = None,
) -> None:
    """Creates a streaming table.

    Args:
        name:
            The name of the streaming table.
        comment:
            A description of the table.
        partition_cols:
            List of columns to use for partitioning the table.
        schema:
            The schema of the table using pyspark `StructType`.
        table_properties:
            Additional properties for the table.
    """
    pass

table(name, comment, data_source, temporary=False, partition_cols=[], transformations=[]) abstractmethod

Defines a streaming Table.

Parameters:

name (str, required): The name of the table.
comment (str, required): A description of the table.
data_source (DataSource, required): The data source providing the table's data.
temporary (bool, default False): Whether the table is temporary.
partition_cols (List[str], default []): List of columns to use for partitioning the table.
transformations (List[Union[Callable[..., Any], Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]], default []): List of transformations to apply to the data. Each transformation must be a callable that accepts a DataFrame and returns a DataFrame. It may also include additional arguments as a tuple (function, args, kwargs).
Source code in blipdataforge/streaming/__init__.py
@abstractmethod
def table(
    self,
    name: str,
    comment: str,
    data_source: DataSource,
    temporary: bool = False,
    partition_cols: List[str] = [],
    transformations: List[Union[
        Callable[..., Any],
        Tuple[Callable[..., Any], List[Any], Dict[str, Any]]
    ]] = [],
) -> None:
    """Defines a streaming Table.

    Args:
        name:
            The name of the table.
        comment:
            A description of the table.
        data_source:
            The data source providing the table's data.
        temporary:
            Whether the table is temporary. Defaults to False.
        partition_cols:
            List of columns to use for partitioning the table.
        transformations:
            List of transformations to apply to the data. Each
            transformation must be a callable that accepts a DataFrame and
            returns a DataFrame. It may also include additional arguments
            as a tuple (function, args, kwargs).
    """
    pass

view(name, comment, data_source, transformations=[]) abstractmethod

Defines a streaming View.

Parameters:

name (str, required): The name of the view.
comment (str, required): A description of the view.
data_source (DataSource, required): The data source providing the view's data.
transformations (List[Union[Callable[..., Any], Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]], default []): List of transformations to apply to the data. Each transformation must be a callable that accepts a DataFrame and returns a DataFrame. It may also include additional arguments as a tuple (function, args, kwargs).
Source code in blipdataforge/streaming/__init__.py
@abstractmethod
def view(
    self,
    name: str,
    comment: str,
    data_source: DataSource,
    transformations: List[Union[
        Callable[..., Any],
        Tuple[Callable[..., Any], List[Any], Dict[str, Any]]
    ]] = [],
) -> None:
    """Defines a streaming View.

    Args:
        name:
            The name of the view.
        comment:
            A description of the view.
        data_source:
            The data source providing the view's data.
        transformations:
            List of transformations to apply to the data. Each
            transformation must be a callable that accepts a DataFrame and
            returns a DataFrame. It may also include additional arguments
            as a tuple (function, args, kwargs).
    """
    pass

get_streaming_engine()

Get the streaming engine according to the environment.

Source code in blipdataforge/streaming/__init__.py
def get_streaming_engine() -> Union[StreamingEngine, None]:
    """Get the streaming engine according to the environment."""
    if is_running_on_databricks():
        try:
            from .dlt import DeltaLiveTablesEngine
            return DeltaLiveTablesEngine()
        except Exception:
            raise ImportError(
                "Looks like this cluster does not support Delta Live Tables. "
                "Please use a different cluster."
            )

    return None
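The dispatch pattern can be sketched as below. Note that the DATABRICKS_RUNTIME_VERSION probe is an assumption about how is_running_on_databricks might detect the platform, not the package's actual check, and the returned string stands in for a real engine instance:

```python
import os


def is_running_on_databricks() -> bool:
    # Assumed detection heuristic: Databricks clusters export this variable.
    return "DATABRICKS_RUNTIME_VERSION" in os.environ


def get_engine():
    # Mirrors get_streaming_engine's shape: return an engine when the
    # platform supports one, otherwise None so callers can skip streaming.
    if is_running_on_databricks():
        return "DeltaLiveTablesEngine"  # placeholder for the real engine
    return None
```

Callers should handle the None case explicitly, since the function returns no engine outside Databricks.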