Streaming data input
Streaming input manager

KafkaConnectionError

Bases: Exception

❌ Custom exception for Kafka connection problems.

StreamingInput

Bases: Input

📡 StreamingInput: Manages streaming input data from Kafka and other streaming sources.
Attributes:

Name | Type | Description |
---|---|---|
input_topic | str | Kafka topic to consume data from. |
kafka_cluster_connection_string | str | Connection string for the Kafka cluster. |
group_id | str | Kafka consumer group ID. |
consumer | KafkaConsumer | Kafka consumer instance. |
Usage

```python
input = StreamingInput("my_topic", "localhost:9094")
for message in input.get():
    print(message.value)
```
Parameters:

Name | Type | Description | Default |
---|---|---|---|
input_topic | str | Kafka topic to consume data from. | required |
kafka_cluster_connection_string | str | Connection string for the Kafka cluster. | required |
group_id | str | Kafka consumer group ID. Defaults to "geniusrise". | 'geniusrise' |
**kwargs | | Additional keyword arguments for KafkaConsumer. | {} |
Raises:

Type | Description |
---|---|
KafkaConnectionError | If unable to connect to Kafka. |
Usage

Using get method to consume from Kafka

```python
input = StreamingInput("my_topic", "localhost:9094")
consumer = input.get()
for message in consumer:
    print(message.value)
```
Using from_streamz method to process a streamz DataFrame

```python
input = StreamingInput("my_topic", "localhost:9094")
streamz_df = ...  # Assume this is a streamz DataFrame
for row in input.from_streamz(streamz_df):
    print(row)
```
Using from_spark method to process a Spark DataFrame

```python
input = StreamingInput("my_topic", "localhost:9094")
spark_df = ...  # Assume this is a Spark DataFrame
map_func = lambda row: {"key": row.key, "value": row.value}
query_or_rdd = input.from_spark(spark_df, map_func)
```
Using compose method to merge multiple StreamingInput instances

```python
input1 = StreamingInput("topic1", "localhost:9094")
input2 = StreamingInput("topic2", "localhost:9094")
result = input1.compose(input2)
```
Using close method to close the Kafka consumer
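A minimal sketch; per the close() documentation below, the method takes no arguments:

```python
input = StreamingInput("my_topic", "localhost:9094")
# Release the underlying Kafka consumer once processing is finished
input.close()
```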
Using seek method to seek to a specific offset
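A minimal sketch; seek's exact signature is not documented on this page, so the single target-offset argument below is an assumption:

```python
input = StreamingInput("my_topic", "localhost:9094")
# Assumed signature: seek(target_offset) — move the consumer to offset 42
input.seek(42)
```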
Using commit method to manually commit offsets
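A minimal sketch; commit() takes no arguments, and handle() below is only a placeholder for application logic:

```python
input = StreamingInput("my_topic", "localhost:9094")
for message in input.get():
    handle(message.value)  # handle() is a placeholder for your own processing
    input.commit()         # manually commit the consumed offsets
```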
Using collect_metrics method to collect Kafka metrics
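A minimal sketch; collect_metrics() returns a dictionary of metrics such as latency:

```python
input = StreamingInput("my_topic", "localhost:9094")
metrics = input.collect_metrics()
print(metrics)  # latency-related values keyed by metric name
```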
__init__(input_topic, kafka_cluster_connection_string, group_id='geniusrise', **kwargs)

💥 Initialize a new streaming input.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
input_topic | str | Kafka topic to consume data from. | required |
kafka_cluster_connection_string | str | Kafka cluster connection string. | required |
group_id | str | Kafka consumer group ID. Defaults to "geniusrise". | 'geniusrise' |
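Additional keyword arguments are passed through to the underlying KafkaConsumer. A minimal sketch, assuming standard kafka-python consumer options such as auto_offset_reset:

```python
input = StreamingInput(
    "my_topic",
    "localhost:9094",
    group_id="my-group",
    auto_offset_reset="earliest",  # kafka-python option: start from the beginning if no committed offset exists
)
```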
close()

🚪 Close the Kafka consumer.

Raises:

Type | Description |
---|---|
Exception | If an error occurs while closing the consumer. |
collect_metrics()

📊 Collect metrics related to the Kafka consumer.

Returns:

Type | Description |
---|---|
Dict[str, Union[int, float]] | A dictionary containing metrics such as latency. |
commit()

✅ Manually commit offsets.

Raises:

Type | Description |
---|---|
Exception | If an error occurs while committing offsets. |
compose(*inputs)

Compose multiple StreamingInput instances by merging their iterators.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
inputs | StreamingInput | Variable number of StreamingInput instances. | () |

Returns:

Type | Description |
---|---|
Union[bool, str] | True if successful, an error message otherwise. |
Caveat

On merging different topics, other operations such as …
from_spark(spark_df, map_func)

Process a Spark DataFrame as a stream, similar to Kafka processing.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
spark_df | DataFrame | The Spark DataFrame to process. | required |
map_func | Callable[[Row], Any] | Function to map each row of the DataFrame. | required |

Returns:

Type | Description |
---|---|
Union[StreamingQuery, RDD[Any]] | A StreamingQuery for streaming DataFrames, or an RDD for batch DataFrames. |

Raises:

Type | Description |
---|---|
Exception | If an error occurs during processing. |
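Because the return type depends on whether the DataFrame is streaming or batch, a caller typically branches on the result. A minimal sketch, continuing the from_spark usage example above and assuming a pyspark environment:

```python
from pyspark.sql.streaming import StreamingQuery

result = input.from_spark(spark_df, lambda row: {"key": row.key, "value": row.value})
if isinstance(result, StreamingQuery):
    result.awaitTermination()  # streaming DataFrame: block until the query stops
else:
    print(result.take(10))     # batch DataFrame: result is an RDD
```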
from_streamz(streamz_df, sentinel=None, timeout=5)

Process a streamz DataFrame as a stream, similar to Kafka processing.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
streamz_df | ZDataFrame | The streamz DataFrame to process. | required |
sentinel | Any | The value that, when received, stops the generator. | None |
timeout | int | The time to wait for an item from the queue before raising an exception. | 5 |

Yields:

Name | Type | Description |
---|---|---|
Any | Any | Yields each row as a dictionary. |
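A minimal sketch of passing sentinel and timeout explicitly; STOP is an arbitrary marker value chosen for illustration:

```python
STOP = object()  # arbitrary sentinel; the generator exits when it receives this value

input = StreamingInput("my_topic", "localhost:9094")
streamz_df = ...  # Assume this is a streamz DataFrame
for row in input.from_streamz(streamz_df, sentinel=STOP, timeout=10):
    print(row)  # each row arrives as a dictionary
```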
get()

📥 Get data from the input topic.

Returns:

Name | Type | Description |
---|---|---|
KafkaConsumer | KafkaConsumer | The Kafka consumer. |

Raises:

Type | Description |
---|---|
Exception | If no input source or consumer is specified. |