Streaming sources

Module for defining data sources for streaming and batch data.

This module provides data source implementations, such as EventHubSource, StreamingTableSource, and TableSource, that make it easy to consume streaming and batch data.

EventHubSource dataclass

Bases: DataSource

Represents a data source for consuming messages from Azure Event Hub.

Attributes:

topic (str): The name of the Event Hub topic to subscribe to.

namespace (str): The Event Hub namespace.

connection_string (str): The connection string for authenticating with Event Hub.

context (Context): The Spark context for managing the Spark session.

starting_offsets (str): The starting offsets for consuming messages. Defaults to "latest".

max_offsets_per_trigger (int): The maximum number of offsets to process per trigger. Defaults to 72000.

schema (Optional[StructType]): The schema of the data, if available. If the schema is not set, the target table will be created using the raw schema from EventHub. In both cases, a column named 'value' containing the raw data from the topic will be added to the target table. Defaults to None.

Source code in blipdataforge/streaming/sources.py
@dataclass
class EventHubSource(DataSource):
    """Represents a data source for consuming messages from Azure Event Hub.

    Attributes:
        topic:
            The name of the Event Hub topic to subscribe to.
        namespace:
            The Event Hub namespace.
        connection_string:
            The connection string for authenticating with Event Hub.
        context:
            The Spark context for managing the Spark session.
        starting_offsets:
            The starting offsets for consuming messages. Defaults to "latest".
        max_offsets_per_trigger:
            The maximum number of offsets to process per trigger.
            Defaults to 72000.
        schema:
            The schema of the data, if available. If the schema is not set, the
            target table will be created using the raw schema from EventHub. In
            both cases, a column named 'value' containing the raw data from
            topic will be added to the target table. This parameter defaults to
            None.
    """
    topic: str
    namespace: str
    connection_string: str
    starting_offsets: str = "latest"
    max_offsets_per_trigger: int = 72000
    schema: Optional[StructType] = None

    @property
    def server(self) -> str:
        """Returns the Event Hub server address."""
        return f"{self.namespace}.servicebus.windows.net:9093"

    def load(self) -> DataFrame:
        """Loads data from the Event Hub as a streaming DataFrame.

        Returns:
            A DataFrame representing the data stream.
        """
        df = (
            self.context.spark_session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", self.server)
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.sasl.jaas.config", (
                'kafkashaded.org.apache.kafka.common.security.plain'
                '.PlainLoginModule required username="$ConnectionString" '
                f'password="{self.connection_string}"' + ";"
            ))
            .option("kafka.ssl.endpoint.identification.algorithm", "https")
            .option("subscribe", self.topic)
            .option("maxOffsetsPerTrigger", self.max_offsets_per_trigger)
            .option("startingOffsets", self.starting_offsets)
            .load()
        )
        if self.schema:
            df = (
                df
                .select(
                    from_json(col("value").cast("string"), self.schema)
                    .alias("value"))
                .selectExpr("value.*", "value")
            )

        return df
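
A minimal usage sketch (not part of the module itself). It assumes the DataSource base class accepts a context keyword carrying the Context listed in the attributes above; the schema, topic, namespace, and connection string are placeholders.

from pyspark.sql.types import LongType, StringType, StructField, StructType

from blipdataforge.streaming.sources import EventHubSource

# Hypothetical payload schema; field names are illustrative only.
EVENT_SCHEMA = StructType([
    StructField("user_id", StringType()),
    StructField("event_time", LongType()),
])

def build_source(ctx):
    """ctx: the project's Context; its construction is project-specific."""
    return EventHubSource(
        context=ctx,                            # assumed DataSource keyword
        topic="my-topic",                       # placeholder Event Hub name
        namespace="my-namespace",               # placeholder namespace
        connection_string="Endpoint=sb://...",  # placeholder; keep in a secret store
        starting_offsets="earliest",            # override the "latest" default
        schema=EVENT_SCHEMA,
    )

# df = build_source(ctx).load()  -> columns: user_id, event_time, value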

server property

Returns the Event Hub server address.
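
Continuing the sketch above, the property simply formats the Kafka-compatible Event Hubs endpoint for the configured namespace:

source = build_source(ctx)
source.server  # "my-namespace.servicebus.windows.net:9093"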

load()

Loads data from the Event Hub as a streaming DataFrame.

Returns:

DataFrame: A DataFrame representing the data stream.

Source code in blipdataforge/streaming/sources.py
def load(self) -> DataFrame:
    """Loads data from the Event Hub as a streaming DataFrame.

    Returns:
        A DataFrame representing the data stream.
    """
    df = (
        self.context.spark_session.readStream.format("kafka")
        .option("kafka.bootstrap.servers", self.server)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", (
            'kafkashaded.org.apache.kafka.common.security.plain'
            '.PlainLoginModule required username="$ConnectionString" '
            f'password="{self.connection_string}"' + ";"
        ))
        .option("kafka.ssl.endpoint.identification.algorithm", "https")
        .option("subscribe", self.topic)
        .option("maxOffsetsPerTrigger", self.max_offsets_per_trigger)
        .option("startingOffsets", self.starting_offsets)
        .load()
    )
    if self.schema:
        df = (
            df
            .select(
                from_json(col("value").cast("string"), self.schema)
                .alias("value"))
            .selectExpr("value.*", "value")
        )

    return df
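
A hedged follow-on sketch: the streaming DataFrame returned by load() can be fed to any Structured Streaming sink. The console sink and checkpoint path below are illustrative only, and 'source' is the instance built in the EventHubSource sketch above.

stream = source.load()

query = (
    stream.writeStream
    .format("console")                                       # inspection sink
    .option("checkpointLocation", "/tmp/chk/eventhub-demo")  # placeholder path
    .start()
)
query.awaitTermination()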

StreamingTableSource dataclass

Bases: DataSource

Represents a streaming data source from a Delta Live Table.

Attributes:

name (str): The name of the table.

database (str): The database where the table resides. Defaults to an empty string.

catalog (str): The catalog where the table resides. Defaults to an empty string.

Source code in blipdataforge/streaming/sources.py
@dataclass
class StreamingTableSource(DataSource):
    """Represents a streaming data source from a Delta Live Table.

    Attributes:
        name:
            The name of the table.
        database:
            The database where the table resides. Defaults to an empty string.
        catalog:
            The catalog where the table resides. Defaults to an empty string.
    """
    name: str
    database: str = ""
    catalog: str = ""

    def load(self) -> DataFrame:
        """Loads data from the streaming table as a DataFrame.

        Returns:
            A DataFrame representing the streaming data.

        Raises:
            ValueError: If the catalog is specified but the database is not.
        """
        if self.catalog and not self.database:
            raise ValueError("Database must be set when catalog is specified.")
        if not self.catalog and not self.database:
            self.database = "LIVE"

        target_table = (
            f"{self.catalog}.{self.database}.{self.name}"
            if self.catalog else f"{self.database}.{self.name}")

        return self.context.spark_session.readStream.table(target_table)

load()

Loads data from the streaming table as a DataFrame.

Returns:

DataFrame: A DataFrame representing the streaming data.

Raises:

ValueError: If the catalog is specified but the database is not.

Source code in blipdataforge/streaming/sources.py
def load(self) -> DataFrame:
    """Loads data from the streaming table as a DataFrame.

    Returns:
        A DataFrame representing the streaming data.

    Raises:
        ValueError: If the catalog is specified but the database is not.
    """
    if self.catalog and not self.database:
        raise ValueError("Database must be set when catalog is specified.")
    if not self.catalog and not self.database:
        self.database = "LIVE"

    target_table = (
        f"{self.catalog}.{self.database}.{self.name}"
        if self.catalog else f"{self.database}.{self.name}")

    return self.context.spark_session.readStream.table(target_table)
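
A sketch of the name resolution load() performs. Context wiring (see the EventHubSource sketch) is omitted to keep it short; the comments show the table name each load() call would read.

from blipdataforge.streaming.sources import StreamingTableSource

StreamingTableSource(name="events")                                     # LIVE.events
StreamingTableSource(name="events", database="bronze")                  # bronze.events
StreamingTableSource(name="events", database="bronze", catalog="prod")  # prod.bronze.events

# A catalog without a database raises at load() time:
# ValueError: Database must be set when catalog is specified.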

TableSource dataclass

Bases: DataSource

Represents a batch data source from a Delta table.

Attributes:

name (str): The name of the table.

database (str): The database where the table resides.

catalog (str): The catalog where the table resides.

Source code in blipdataforge/streaming/sources.py
@dataclass
class TableSource(DataSource):
    """Represents a batch data source from a Delta table.

    Attributes:
        name: The name of the table.
        database: The database where the table resides.
        catalog: The catalog where the table resides.
    """
    name: str
    database: str
    catalog: str

    def load(self) -> DataFrame:
        """Loads data from the table as a DataFrame.

        Returns:
            A DataFrame representing the table data.
        """
        return self.context.spark_session.read.table(
            f"{self.catalog}.{self.database}.{self.name}"
        )

load()

Loads data from the table as a DataFrame.

Returns:

DataFrame: A DataFrame representing the table data.

Source code in blipdataforge/streaming/sources.py
def load(self) -> DataFrame:
    """Loads data from the table as a DataFrame.

    Returns:
        A DataFrame representing the table data.
    """
    return self.context.spark_session.read.table(
        f"{self.catalog}.{self.database}.{self.name}"
    )
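
A closing sketch for the batch source, under the same context assumption as above; the table names are placeholders.

from blipdataforge.streaming.sources import TableSource

# All three name parts are required for a fully qualified read.
orders = TableSource(name="orders", database="silver", catalog="prod")
df = orders.load()  # batch read of prod.silver.orders
df.show(5)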