
# Streaming data input

Streaming input manager.

## KafkaConnectionError

Bases: Exception

❌ Custom exception for Kafka connection problems.
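Construction can raise this exception when the cluster is unreachable, so callers may want to guard it. A minimal sketch, assuming the import path `geniusrise.core.data` (adjust to your installation):

```python
# Minimal sketch: guard construction against connection failures.
# The import path below is an assumption; adjust it to your install.
from geniusrise.core.data import StreamingInput, KafkaConnectionError

try:
    input = StreamingInput("my_topic", "localhost:9094")
    consumer = input.get()
except KafkaConnectionError as e:
    print(f"Could not connect to Kafka: {e}")
```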

## StreamingInput

Bases: Input

📡 StreamingInput: Manages streaming input data from Kafka and other streaming sources.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `input_topic` | `str` | Kafka topic to consume data from. |
| `kafka_cluster_connection_string` | `str` | Connection string for the Kafka cluster. |
| `group_id` | `str` | Kafka consumer group ID. |
| `consumer` | `KafkaConsumer` | Kafka consumer instance. |

Usage

```python
input = StreamingInput("my_topic", "localhost:9094")
for message in input.get():
    print(message.value)
```

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_topic` | `str` | Kafka topic to consume data from. | *required* |
| `kafka_cluster_connection_string` | `str` | Connection string for the Kafka cluster. | *required* |
| `group_id` | `str` | Kafka consumer group ID. Defaults to `"geniusrise"`. | `'geniusrise'` |
| `**kwargs` | | Additional keyword arguments for `KafkaConsumer`. | `{}` |

Raises:

| Type | Description |
| --- | --- |
| `KafkaConnectionError` | If unable to connect to Kafka. |

Usage

```python
# Using the get method to consume from Kafka
input = StreamingInput("my_topic", "localhost:9094")
consumer = input.get()
for message in consumer:
    print(message.value)

# Using the from_streamz method to process a streamz DataFrame
input = StreamingInput("my_topic", "localhost:9094")
streamz_df = ...  # Assume this is a streamz DataFrame
for row in input.from_streamz(streamz_df):
    print(row)

# Using the from_spark method to process a Spark DataFrame
input = StreamingInput("my_topic", "localhost:9094")
spark_df = ...  # Assume this is a Spark DataFrame
map_func = lambda row: {"key": row.key, "value": row.value}
query_or_rdd = input.from_spark(spark_df, map_func)

# Using the compose method to merge multiple StreamingInput instances
input1 = StreamingInput("topic1", "localhost:9094")
input2 = StreamingInput("topic2", "localhost:9094")
result = input1.compose(input2)

# Using the close method to close the Kafka consumer
input = StreamingInput("my_topic", "localhost:9094")
input.close()

# Using the seek method to seek to a specific offset
input = StreamingInput("my_topic", "localhost:9094")
input.seek(42)

# Using the commit method to manually commit offsets
input = StreamingInput("my_topic", "localhost:9094")
input.commit()

# Using the collect_metrics method to collect Kafka metrics
input = StreamingInput("my_topic", "localhost:9094")
metrics = input.collect_metrics()
print(metrics)
```

### __init__(input_topic, kafka_cluster_connection_string, group_id='geniusrise', **kwargs)

💥 Initialize a new streaming input data source.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_topic` | `str` | Kafka topic to consume data from. | *required* |
| `kafka_cluster_connection_string` | `str` | Kafka cluster connection string. | *required* |
| `group_id` | `str` | Kafka consumer group ID. Defaults to `"geniusrise"`. | `'geniusrise'` |
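Because extra keyword arguments are forwarded to `KafkaConsumer`, standard kafka-python consumer options can be supplied at construction time. A sketch; the specific options shown are ordinary `KafkaConsumer` settings, not geniusrise-specific ones:

```python
# Sketch: pass kafka-python KafkaConsumer options through **kwargs.
input = StreamingInput(
    "my_topic",
    "localhost:9094",
    group_id="my-group",
    auto_offset_reset="earliest",  # start from the oldest retained offset
    enable_auto_commit=False,      # commit manually via input.commit()
)
```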

### close()

🚪 Close the Kafka consumer.

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If an error occurs while closing the consumer. |

### collect_metrics()

📊 Collect metrics related to the Kafka consumer.

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, Union[int, float]]` | A dictionary containing metrics like latency. |

### commit()

✅ Manually commit offsets.

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If an error occurs while committing offsets. |
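With auto-commit disabled, manual commits give an at-least-once processing loop: commit only after a message has been handled, so a crash replays anything unprocessed. A sketch, where `handle` is a hypothetical processing function:

```python
# Sketch of at-least-once consumption: handle first, commit after.
input = StreamingInput("my_topic", "localhost:9094", enable_auto_commit=False)
for message in input.get():
    handle(message.value)  # hypothetical processing step
    input.commit()         # mark the offset as processed
```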

### compose(*inputs)

Compose multiple StreamingInput instances by merging their iterators.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `inputs` | `StreamingInput` | Variable number of StreamingInput instances. | `()` |

Returns:

| Type | Description |
| --- | --- |
| `Union[bool, str]` | `True` if successful, an error message otherwise. |

Caveat

On merging different topics, other operations such as
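Since `compose` reports failure as an error-message string rather than raising, checking the return value is worthwhile. A minimal sketch under that contract:

```python
# Sketch: compose returns True on success, or an error message string.
input1 = StreamingInput("topic1", "localhost:9094")
input2 = StreamingInput("topic2", "localhost:9094")

result = input1.compose(input2)
if result is not True:
    print(f"compose failed: {result}")  # result holds the error message
```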

### from_spark(spark_df, map_func)

Process a Spark DataFrame as a stream, similar to Kafka processing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `spark_df` | `DataFrame` | The Spark DataFrame to process. | *required* |
| `map_func` | `Callable[[Row], Any]` | Function to map each row of the DataFrame. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Union[StreamingQuery, RDD[Any]]` | A StreamingQuery for streaming DataFrames, an RDD for batch DataFrames. |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If an error occurs during processing. |
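Because the return type depends on whether the DataFrame is streaming, callers can branch on `spark_df.isStreaming`. A sketch, assuming an existing Spark session and a hypothetical batch source:

```python
# Sketch: from_spark yields a StreamingQuery for streaming DataFrames
# and an RDD for batch ones, so handle both cases.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()
spark_df = spark.read.json("events.json")  # hypothetical batch source

input = StreamingInput("my_topic", "localhost:9094")
map_func = lambda row: {"key": row.key, "value": row.value}
result = input.from_spark(spark_df, map_func)

if spark_df.isStreaming:
    result.awaitTermination()  # StreamingQuery: block until the stream ends
else:
    print(result.take(5))      # RDD: inspect a few mapped rows
```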

### from_streamz(streamz_df, sentinel=None, timeout=5)

Process a streamz DataFrame as a stream, similar to Kafka processing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `streamz_df` | `ZDataFrame` | The streamz DataFrame to process. | *required* |
| `sentinel` | `Any` | The value that, when received, stops the generator. | `None` |
| `timeout` | `int` | The time in seconds to wait for an item from the queue before raising an exception. | `5` |

Yields:

| Name | Type | Description |
| --- | --- | --- |
| `Any` | `Any` | Each row as a dictionary. |
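A sketch of driving the generator with a sentinel, where `STOP` is a caller-chosen marker value and the streamz pipeline feeding the DataFrame is assumed to exist elsewhere:

```python
# Sketch: consume rows until the sentinel value arrives; STOP is a
# hypothetical marker that the upstream pipeline emits when done.
STOP = {"eof": True}

input = StreamingInput("my_topic", "localhost:9094")
streamz_df = ...  # assume a streamz DataFrame fed elsewhere

for row in input.from_streamz(streamz_df, sentinel=STOP, timeout=10):
    print(row)  # each row arrives as a dictionary
```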

### get()

📥 Get data from the input topic.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `KafkaConsumer` | `KafkaConsumer` | The Kafka consumer. |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If no input source or consumer is specified. |
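Because `get` returns the underlying `KafkaConsumer`, the full kafka-python consumer API is available on the result. A sketch reading standard `ConsumerRecord` fields:

```python
# Sketch: get() hands back the kafka-python KafkaConsumer directly.
input = StreamingInput("my_topic", "localhost:9094")
consumer = input.get()

for message in consumer:
    # topic, partition, offset, and value are ConsumerRecord fields
    print(message.topic, message.partition, message.offset, message.value)
```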