Streaming data output
Streaming output manager
StreamingOutput
Bases: Output
📡 StreamingOutput: Manages streaming output data.
Attributes:
Name | Type | Description |
---|---|---|
output_topic | str | Kafka topic to ingest data. |
producer | KafkaProducer | Kafka producer for ingesting data. |
Usage:
config = StreamingOutput("my_topic", "localhost:9094")
config.save({"key": "value"}, filename="ignored_filename")
config.flush()
Note: Ensure the Kafka cluster is running and accessible.
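The save, flush, close sequence shown in the usage can be tried without a running Kafka cluster. The sketch below is only an illustration: `RecordingOutput` and `publish_all` are hypothetical names, not part of this library, and the stand-in class merely mimics the `save`/`flush`/`close` interface documented on this page by keeping messages in memory.

```python
from typing import Any, List


class RecordingOutput:
    """Hypothetical in-memory stand-in exposing save/flush/close like StreamingOutput."""

    def __init__(self) -> None:
        self.pending: List[Any] = []    # saved but not yet flushed
        self.delivered: List[Any] = []  # flushed messages
        self.closed = False

    def save(self, data: Any, **kwargs: Any) -> None:
        # Extra keyword arguments (such as filename) are accepted and ignored.
        self.pending.append(data)

    def flush(self) -> None:
        self.delivered.extend(self.pending)
        self.pending.clear()

    def close(self) -> None:
        self.closed = True


def publish_all(output: Any, records: List[Any]) -> None:
    """Save every record and flush; close the output even if a step fails."""
    try:
        for record in records:
            output.save(record)
        output.flush()
    finally:
        output.close()


out = RecordingOutput()
publish_all(out, [{"key": "value"}, {"key": "other"}])
```

Because `publish_all` only relies on the three method names, a real `StreamingOutput` instance could presumably be passed in place of the stand-in once a broker is reachable.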
__init__(output_topic, kafka_servers)
Initialize a new streaming output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_topic | str | Kafka topic to ingest data. | required |
kafka_servers | str | Kafka bootstrap servers. | required |
close()
🚪 Close the Kafka producer.
Raises:
Type | Description |
---|---|
Exception | If no Kafka producer is available. |
flush()
🔄 Flush the Kafka producer so that any buffered messages are sent.
Raises:
Type | Description |
---|---|
Exception | If no Kafka producer is available. |
partition_available(partition)
🧐 Check if a partition is available in the Kafka topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partition | int | The partition to check. | required |
Returns:
Name | Type | Description |
---|---|---|
bool | bool | True if the partition is available, False otherwise. |
Raises:
Type | Description |
---|---|
Exception | If no Kafka producer is available. |
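One plausible way to implement such a check is to ask the producer which partitions it knows for the topic. The sketch below assumes a producer object with a `partitions_for(topic)` method that returns a set of partition ids (kafka-python's `KafkaProducer` offers a method of that name); whether `StreamingOutput` actually works this way is an assumption. `StubProducer` is a hypothetical stand-in so the example runs without a broker.

```python
from typing import Dict, Optional, Set


class StubProducer:
    """Hypothetical stand-in for a Kafka producer; it only mimics partitions_for()."""

    def __init__(self, partitions: Dict[str, Set[int]]) -> None:
        self._partitions = partitions

    def partitions_for(self, topic: str) -> Optional[Set[int]]:
        # Returns None when the topic is unknown to this stub.
        return self._partitions.get(topic)


def partition_available(producer: Optional[StubProducer], topic: str, partition: int) -> bool:
    """Return True if `partition` is among the partitions known for `topic`."""
    if producer is None:
        raise Exception("No Kafka producer available.")
    known = producer.partitions_for(topic)
    return known is not None and partition in known


stub = StubProducer({"my_topic": {0, 1, 2}})
has_partition_1 = partition_available(stub, "my_topic", 1)
has_partition_5 = partition_available(stub, "my_topic", 5)
```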
save(data, **kwargs)
📤 Ingest data into the Kafka topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | The data to ingest. | required |
filename | str | This argument is ignored for streaming outputs. | required |
Raises:
Type | Description |
---|---|
Exception | If no Kafka producer is available or an error occurs. |
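Since `data` is typed as `Any`, it has to be turned into bytes at some point before it reaches Kafka. The sketch below shows one common approach, JSON-encoding the payload and handing it to the producer's `send` call. JSON is an assumption here, as the page does not say which serialization `StreamingOutput` uses; `StubProducer` and the free function `save` are hypothetical illustrations.

```python
import json
from typing import Any, List, Optional, Tuple


class StubProducer:
    """Hypothetical stand-in for a Kafka producer that records sent messages."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, bytes]] = []

    def send(self, topic: str, value: bytes) -> None:
        self.sent.append((topic, value))


def save(producer: Optional[StubProducer], topic: str, data: Any, **kwargs: Any) -> None:
    """Serialize `data` as UTF-8 JSON and send it to `topic`."""
    # Keyword arguments such as filename are accepted but ignored.
    if producer is None:
        raise Exception("No Kafka producer available.")
    payload = json.dumps(data).encode("utf-8")
    producer.send(topic, value=payload)


producer = StubProducer()
save(producer, "my_topic", {"key": "value"}, filename="ignored_filename")
```

A consumer reading the topic would reverse the step with `json.loads(message_bytes.decode("utf-8"))`.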
save_to_partition(value, partition)
🎯 Send a message to a specific partition in the Kafka topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Any | The value of the message. | required |
partition | int | The partition to send the message to. | required |
Raises:
Type | Description |
---|---|
Exception | If no Kafka producer is available or an error occurs. |
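Choosing the partition explicitly lets the caller control how messages are spread across a topic. As a rough illustration, the sketch below alternates messages between two partitions. `StubProducer` and the free function `save_to_partition` are hypothetical stand-ins written for this example; the `partition` keyword on `send` mirrors kafka-python's `KafkaProducer.send`, but its use inside `StreamingOutput` is an assumption.

```python
from typing import Any, List, Optional, Tuple


class StubProducer:
    """Hypothetical stand-in for a Kafka producer that records what it is asked to send."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any, Optional[int]]] = []

    def send(self, topic: str, value: Any = None, partition: Optional[int] = None) -> None:
        self.sent.append((topic, value, partition))


def save_to_partition(
    producer: Optional[StubProducer], topic: str, value: Any, partition: int
) -> None:
    """Send `value` to one specific partition of `topic`."""
    if producer is None:
        raise Exception("No Kafka producer available.")
    producer.send(topic, value=value, partition=partition)


producer = StubProducer()
# Alternate between partitions 0 and 1 (purely illustrative routing).
for i, event in enumerate(["a", "b", "c"]):
    save_to_partition(producer, "my_topic", event, partition=i % 2)
```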