Streaming base
Package for defining abstractions and utilities for streaming processing.
This package provides interfaces and base classes that standardize how streaming processes are defined in pipelines.
Usage
The transformations parameter in methods like table, view, and
append_flow is a key feature of this package. This parameter allows
users to apply custom transformations to the data during the ingestion
process.
A transformation is expected to be a callable (function) that accepts at least one argument, a DataFrame, and returns a transformed DataFrame. A transformation can accept extra parameters as well. For example:
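from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
# Takes an extra parameter: keeps only the named column.
def filter_columns(df: DataFrame, column_name: str) -> DataFrame:
    return df.select(column_name)
# Takes only the DataFrame: adds a constant column.
def enrich_data(df: DataFrame) -> DataFrame:
    return df.withColumn("new_column", lit("value"))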
To pass extra parameters to a transformation, such as column_name in the
example, wrap the function in a tuple (function, args, kwargs), where args
is a list of positional arguments and kwargs is a dict whose keys are
parameter names and whose values are the values to use for each parameter.
The code below shows both ways of passing a transformation:
# Passing a list of positional arguments
...
transformations = [
    enrich_data,
    (filter_columns, ["my_column"], {}),
]
# Passing a dict of keyword arguments
...
transformations = [
    enrich_data,
    (filter_columns, [], {"column_name": "my_column"}),
]
The transformations are applied sequentially to the data source during the load process.
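The sequential behaviour described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `apply_transformations`, `add_flag`, and `keep_column` are hypothetical names, and a list of dicts stands in for a Spark DataFrame so the sketch runs without a Spark session.

```python
from typing import Any, Callable, Dict, List, Tuple, Union

# Same shape as the documented `transformations` type hint.
Transformation = Union[
    Callable[..., Any],
    Tuple[Callable[..., Any], List[Any], Dict[str, Any]],
]


def apply_transformations(df: Any, transformations: List[Transformation]) -> Any:
    """Apply each transformation to ``df`` in list order (hypothetical helper)."""
    for item in transformations:
        if callable(item):
            # Bare callable: called with the DataFrame only.
            df = item(df)
        else:
            # Tuple form: (function, positional args, keyword args).
            func, args, kwargs = item
            df = func(df, *args, **kwargs)
    return df


# Stand-in "DataFrame": a list of dict rows (assumption, for runnability).
def add_flag(rows):
    return [{**row, "flag": True} for row in rows]


def keep_column(rows, column_name):
    return [{column_name: row[column_name]} for row in rows]


rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
by_position = apply_transformations(rows, [add_flag, (keep_column, ["id"], {})])
by_keyword = apply_transformations(
    rows, [add_flag, (keep_column, [], {"column_name": "id"})]
)
```

Both calls produce the same result, because the positional and keyword forms bind the same parameter. Order matters: each transformation receives the output of the one before it.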
DataSource
Bases: ABC
Abstract base class for defining data sources.
Attributes:
| Name | Type | Description |
|---|---|---|
| context | Context | A shared context instance for managing Spark sessions. |
Source code in blipdataforge/streaming/__init__.py
load()
abstractmethod
Abstract method to load data from the data source.
Returns:
| Type | Description |
|---|---|
| DataFrame | A DataFrame representing the loaded data. |
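A concrete data source only needs to implement load(). The sketch below shows the subclassing pattern under stated assumptions: the `Context` and `DataSource` classes here are stand-ins that mirror the documented interface (they are not imported from the package), the constructor signature is a guess, `InMemorySource` is hypothetical, and a list of dicts stands in for a DataFrame.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List

Rows = List[Dict[str, Any]]  # stand-in for a Spark DataFrame


class Context:
    """Hypothetical stand-in for the package's shared Context."""


class DataSource(ABC):
    """Stand-in mirroring the documented interface; not the real base class."""

    def __init__(self, context: Context) -> None:
        # Assumed constructor: the docs only state that `context` is an attribute.
        self.context = context

    @abstractmethod
    def load(self) -> Rows:
        """Return the loaded data."""


class InMemorySource(DataSource):
    """Hypothetical source that serves rows held in memory."""

    def __init__(self, context: Context, rows: Rows) -> None:
        super().__init__(context)
        self._rows = rows

    def load(self) -> Rows:
        # Return copies so callers cannot mutate the source's rows.
        return [dict(row) for row in self._rows]


source = InMemorySource(Context(), [{"id": 1}, {"id": 2}])
loaded = source.load()
```

Because load() is abstract, instantiating the base class directly raises a TypeError; only subclasses that implement load() can be created.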
StreamingEngine
Bases: ABC
Abstract base class for defining streaming engines.
append_flow(target_table, data_source, flow_name, comment, transformations=[])
abstractmethod
Appends data from a data source to a target streaming table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_table | str | The name of the target table. | required |
| data_source | DataSource | The data source providing the data. | required |
| flow_name | str | The name of the flow. | required |
| comment | str | A description of the flow. | required |
| transformations | List[Union[Callable[..., Any], Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]] | List of transformations to apply to the data. Each transformation must be a callable that accepts a DataFrame and returns a DataFrame. It may also include additional arguments as a tuple (function, args, kwargs). | [] |
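To illustrate what appending from several sources into one target might look like, here is a toy in-memory sketch. It is not the package's engine: `InMemoryEngine` and `ListSource` are hypothetical, only bare-callable transformations are handled, lists of dicts stand in for DataFrames, and it assumes the target table is created first with create_streaming_table.

```python
from typing import Any, Dict, List

Rows = List[Dict[str, Any]]  # stand-in for a streaming DataFrame


class ListSource:
    """Hypothetical data source exposing the documented load() method."""

    def __init__(self, rows: Rows) -> None:
        self._rows = rows

    def load(self) -> Rows:
        return list(self._rows)


class InMemoryEngine:
    """Toy engine; it only mimics the documented method names and parameters."""

    def __init__(self) -> None:
        self.tables: Dict[str, Rows] = {}
        self.flows: Dict[str, str] = {}  # flow_name -> target_table

    def create_streaming_table(self, name: str, comment: str) -> None:
        self.tables[name] = []

    def append_flow(self, target_table, data_source, flow_name, comment,
                    transformations=()):
        # An immutable default is used here instead of the documented [].
        rows = data_source.load()
        for transform in transformations:  # bare callables only, for brevity
            rows = transform(rows)
        self.tables[target_table].extend(rows)
        self.flows[flow_name] = target_table


def tag_origin_b(rows: Rows) -> Rows:
    return [{**row, "origin": "b"} for row in rows]


engine = InMemoryEngine()
engine.create_streaming_table("events", "All events")
engine.append_flow("events", ListSource([{"id": 1}]), "flow_a", "Events from A")
engine.append_flow("events", ListSource([{"id": 2}]), "flow_b", "Events from B",
                   [tag_origin_b])
```

Each flow has its own name and may carry its own transformations, while both write to the same target table.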
create_streaming_table(name, comment, partition_cols=[], schema=None, table_properties=None)
abstractmethod
Creates a streaming table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the streaming table. | required |
| comment | str | A description of the table. | required |
| partition_cols | List[str] | List of columns to use for partitioning the table. | [] |
| schema | Optional[Union[StructType]] | The schema of the table, as a PySpark StructType. | None |
| table_properties | Optional[Dict[str, str]] | Additional properties for the table. | None |
table(name, comment, data_source, temporary=False, partition_cols=[], transformations=[])
abstractmethod
Defines a streaming Table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the table. | required |
| comment | str | A description of the table. | required |
| data_source | DataSource | The data source providing the table's data. | required |
| temporary | bool | Whether the table is temporary. Defaults to False. | False |
| partition_cols | List[str] | List of columns to use for partitioning the table. | [] |
| transformations | List[Union[Callable[..., Any], Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]] | List of transformations to apply to the data. Each transformation must be a callable that accepts a DataFrame and returns a DataFrame. It may also include additional arguments as a tuple (function, args, kwargs). | [] |
view(name, comment, data_source, transformations=[])
abstractmethod
Defines a streaming View.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the view. | required |
| comment | str | A description of the view. | required |
| data_source | DataSource | The data source providing the view's data. | required |
| transformations | List[Union[Callable[..., Any], Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]] | List of transformations to apply to the data. Each transformation must be a callable that accepts a DataFrame and returns a DataFrame. It may also include additional arguments as a tuple (function, args, kwargs). | [] |
get_streaming_engine()
Get the streaming engine according to the environment.