
Streaming data output

Streaming output manager

StreamingOutput

Bases: Output

📡 StreamingOutput: Manages streaming output data.

Attributes:

Name          Type           Description
output_topic  str            Kafka topic that data is written to.
producer      KafkaProducer  Kafka producer used to send data to the topic.

Usage:

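# Requires a Kafka broker reachable at localhost:9094.
output = StreamingOutput("my_topic", "localhost:9094")
# save() takes extra options only as keyword arguments; filename is ignored.
output.save({"key": "value"}, filename="ignored_filename")
output.flush()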

Note: Ensure the Kafka cluster is running and accessible.

__init__(output_topic, kafka_servers)

Initialize a new streaming output.

Parameters:

Name           Type  Description                                      Default
output_topic   str   Kafka topic that data is written to.             required
kafka_servers  str   Kafka bootstrap servers, e.g. "localhost:9094".  required
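The reference does not show how `kafka_servers` reaches the producer. The following is a minimal sketch, assuming the class builds a kafka-python `KafkaProducer` and serializes message values as UTF-8 JSON; the serialization format is an assumption, and `build_producer_config` and `json_serializer` are hypothetical helpers. The sketch only assembles keyword arguments, so it runs without a broker.

```python
import json
from typing import Any, Dict


def json_serializer(value: Any) -> bytes:
    """Hypothetical value serializer: encode a message value as UTF-8 JSON (assumed format)."""
    return json.dumps(value).encode("utf-8")


def build_producer_config(kafka_servers: str) -> Dict[str, Any]:
    """Hypothetical helper: keyword arguments intended for kafka.KafkaProducer(**config)."""
    return {
        # kafka-python accepts a single "host:port" string or a list of them.
        "bootstrap_servers": kafka_servers,
        # The producer calls this on every value passed to send().
        "value_serializer": json_serializer,
    }


config = build_producer_config("localhost:9094")
print(config["value_serializer"]({"key": "value"}))  # b'{"key": "value"}'
```

With kafka-python installed and a broker reachable, the producer would then be created with `KafkaProducer(**config)`.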

close()

🚪 Close the Kafka producer.

Raises:

Type       Description
Exception  If no Kafka producer is available.

flush()

🔄 Flush the output by flushing the Kafka producer, which blocks until all buffered messages have been sent.

Raises:

Type       Description
Exception  If no Kafka producer is available.
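`close()` and `flush()` both raise when no producer is available. A minimal sketch of that guard, assuming the wrapper keeps the producer in a `producer` attribute that may be `None`; `GuardedOutput` and `RecordingProducer` are hypothetical stand-ins so the example runs without Kafka, and the exception message is illustrative.

```python
from typing import Any, List, Optional


class RecordingProducer:
    """Hypothetical stand-in for kafka.KafkaProducer that only records calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def flush(self, timeout: Optional[float] = None) -> None:
        self.calls.append("flush")

    def close(self, timeout: Optional[float] = None) -> None:
        self.calls.append("close")


class GuardedOutput:
    """Hypothetical wrapper illustrating the 'no producer' check in flush() and close()."""

    def __init__(self, producer: Optional[Any]) -> None:
        self.producer = producer

    def _require_producer(self) -> Any:
        # Both flush() and close() raise when no producer is available.
        if self.producer is None:
            raise Exception("No Kafka producer available.")
        return self.producer

    def flush(self) -> None:
        # A real KafkaProducer.flush() blocks until buffered records are sent.
        self._require_producer().flush()

    def close(self) -> None:
        self._require_producer().close()


producer = RecordingProducer()
output = GuardedOutput(producer)
output.flush()
output.close()
print(producer.calls)  # ['flush', 'close']

try:
    GuardedOutput(None).flush()
except Exception as exc:
    print(exc)  # No Kafka producer available.
```

Flushing before closing ensures that messages still in the producer's buffer are delivered before the connection is released.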

partition_available(partition)

🧐 Check if a partition is available in the Kafka topic.

Parameters:

Name       Type  Description              Default
partition  int   The partition to check.  required

Returns:

Type  Description
bool  True if the partition is available, False otherwise.

Raises:

Type       Description
Exception  If no Kafka producer is available.
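One way to implement this check is kafka-python's `KafkaProducer.partitions_for(topic)`, which returns the set of partition ids for a topic. The sketch below assumes that approach; whether `StreamingOutput` uses it is not stated here. `MetadataProducer` is a hypothetical stand-in so the example runs without a broker.

```python
from typing import Any, Optional, Set


class MetadataProducer:
    """Hypothetical stand-in exposing kafka-python's partitions_for(topic)."""

    def __init__(self, partitions: Set[int]) -> None:
        self._partitions = partitions

    def partitions_for(self, topic: str) -> Set[int]:
        return self._partitions


def partition_available(producer: Optional[Any], topic: str, partition: int) -> bool:
    """Return True if `partition` is one of the topic's known partition ids."""
    if producer is None:
        raise Exception("No Kafka producer available.")
    # KafkaProducer.partitions_for() returns the set of partition ids for the topic.
    partitions = producer.partitions_for(topic)
    return partitions is not None and partition in partitions


producer = MetadataProducer({0, 1, 2})
print(partition_available(producer, "my_topic", 1))  # True
print(partition_available(producer, "my_topic", 5))  # False
```

With a real producer, `partitions_for()` may block while it fetches topic metadata from the cluster.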

save(data, **kwargs)

📤 Ingest data into the Kafka topic.

Parameters:

Name      Type  Description                                                  Default
data      Any   The data to ingest.                                          required
filename  str   Ignored for streaming outputs; accepted only through **kwargs.  optional

Raises:

Type       Description
Exception  If no Kafka producer is available or an error occurs.
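A minimal sketch of `save()` behavior, assuming it sends `data` to the output topic and wraps any send error in a new `Exception`. `ListProducer` and the free function `save` are hypothetical stand-ins, and the error message is illustrative. Any extra keyword arguments, such as `filename`, are accepted and ignored.

```python
from typing import Any, List, Optional, Tuple


class ListProducer:
    """Hypothetical stand-in for kafka.KafkaProducer that keeps sent messages in memory."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    def send(self, topic: str, value: Any = None) -> None:
        self.sent.append((topic, value))


def save(producer: Optional[Any], topic: str, data: Any, **kwargs: Any) -> None:
    """Hypothetical save(): send `data` to `topic`; kwargs such as filename are ignored."""
    if producer is None:
        raise Exception("No Kafka producer available.")
    try:
        producer.send(topic, value=data)
    except Exception as exc:
        # Re-raise with context so callers can see which topic failed.
        raise Exception(f"Failed to ingest data into topic {topic!r}: {exc}") from exc


producer = ListProducer()
save(producer, "my_topic", {"key": "value"}, filename="ignored_filename")
print(producer.sent)  # [('my_topic', {'key': 'value'})]
```

A real `KafkaProducer.send()` is asynchronous and returns a future. Delivery errors may therefore surface only when the future is resolved or when `flush()` is called, not inside this `try` block.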

save_to_partition(value, partition)

🎯 Send a message to a specific partition in the Kafka topic.

Parameters:

Name       Type  Description                            Default
value      Any   The value of the message.              required
partition  int   The partition to send the message to.  required

Raises:

Type       Description
Exception  If no Kafka producer is available or an error occurs.
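kafka-python's `KafkaProducer.send()` accepts an explicit `partition` keyword, which is the natural way to target one partition. The sketch below assumes that call. It also adds a partition pre-check, which the reference does not say `save_to_partition` performs. `PartitionedProducer` and the free function `save_to_partition` are hypothetical stand-ins that run without Kafka.

```python
from typing import Any, List, Optional, Set, Tuple


class PartitionedProducer:
    """Hypothetical stand-in for kafka.KafkaProducer with a fixed set of partitions."""

    def __init__(self, partitions: Set[int]) -> None:
        self.partitions = partitions
        self.sent: List[Tuple[str, Optional[int], Any]] = []

    def partitions_for(self, topic: str) -> Set[int]:
        return self.partitions

    def send(self, topic: str, value: Any = None, partition: Optional[int] = None) -> None:
        self.sent.append((topic, partition, value))


def save_to_partition(producer: Optional[Any], topic: str, value: Any, partition: int) -> None:
    """Hypothetical save_to_partition(): send `value` to one partition of `topic`."""
    if producer is None:
        raise Exception("No Kafka producer available.")
    # Assumed pre-check: reject partition ids the topic does not have.
    if partition not in (producer.partitions_for(topic) or set()):
        raise Exception(f"Partition {partition} is not available in topic {topic!r}.")
    # kafka-python's send() takes the target partition as a keyword argument.
    producer.send(topic, value=value, partition=partition)


producer = PartitionedProducer({0, 1})
save_to_partition(producer, "my_topic", {"key": "value"}, partition=1)
print(producer.sent)  # [('my_topic', 1, {'key': 'value'})]
```

The pre-check makes an invalid partition id fail immediately with a clear message, rather than later inside the producer.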