# Batch data output

Batch output manager.

## BatchOutput

Bases: `Output`

📁 `BatchOutput`: Manages batch output data.
Attributes:

| Name | Type | Description |
|---|---|---|
| `output_folder` | `str` | Folder to save output files. |
| `bucket` | `str` | S3 bucket name. |
| `s3_folder` | `str` | Folder within the S3 bucket. |
| `partition_scheme` | `Optional[str]` | Partitioning scheme for S3, e.g., `"year/month/day"`. |
Raises:

| Type | Description |
|---|---|
| `FileNotExistError` | If the output folder does not exist. |
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `output_folder` | `str` | Folder to save output files. | *required* |
| `bucket` | `str` | S3 bucket name. | *required* |
| `s3_folder` | `str` | Folder within the S3 bucket. | *required* |
| `partition_scheme` | `Optional[str]` | Partitioning scheme for S3, e.g., `"year/month/day"`. | `None` |
Usage:

```python
# Initialize the BatchOutput instance
config = BatchOutput("/path/to/output", "my_bucket", "s3/folder", partition_scheme="%Y/%m/%d")

# Save data to a file
config.save({"key": "value"}, "example.json")

# Compose multiple BatchOutput instances
result = config1.compose(config2, config3)

# Convert output to a Spark DataFrame
spark_df = config.to_spark(spark_session)

# Copy files to a remote S3 bucket
config.to_s3()

# Flush the output to S3
config.flush()

# Collect metrics
metrics = config.collect_metrics()
```
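The `partition_scheme` in the snippet above uses strftime tokens. As a hedged illustration (the exact key layout is decided by the `flush`/`to_s3` implementation, not by this sketch), such a scheme would typically expand to a dated S3 prefix:

```python
from datetime import datetime, timezone

# Illustrative only: expand a strftime-style partition scheme into the
# dated prefix that uploaded files could land under in S3.
partition_scheme = "%Y/%m/%d"
prefix = datetime.now(timezone.utc).strftime(partition_scheme)
print(f"s3://my_bucket/s3/folder/{prefix}/example.json")
# e.g. s3://my_bucket/s3/folder/2024/01/31/example.json
```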
### `__init__(output_folder, bucket, s3_folder, partition_scheme=None)`

Initialize a new batch output.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `output_folder` | `str` | Folder to save output files. | *required* |
| `bucket` | `str` | S3 bucket name. | *required* |
| `s3_folder` | `str` | Folder within the S3 bucket. | *required* |
| `partition_scheme` | `Optional[str]` | Partitioning scheme for S3, e.g., `"year/month/day"`. | `None` |
### `collect_metrics()`

Collect and return metrics, then clear them for future collection.

Returns:

| Type | Description |
|---|---|
| `Dict[str, float]` | Dictionary containing metrics. |
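A minimal sketch of reading the returned metrics (`config` is the instance from the Usage block; the specific metric names are not documented here):

```python
# Metrics are returned once, then cleared for the next collection cycle.
metrics = config.collect_metrics()
for name, value in metrics.items():
    print(f"{name}: {value:.3f}")
```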
### `compose(*outputs)`

Compose multiple BatchOutput instances by merging their output folders.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `outputs` | `Output` | Variable number of BatchOutput instances. | `()` |

Returns:

| Type | Description |
|---|---|
| `Union[bool, str]` | `True` if successful, an error message otherwise. |
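Because the return type is `Union[bool, str]`, callers should compare against `True` rather than rely on truthiness: a non-empty error string is also truthy. A short sketch, where `part1`, `part2`, and `part3` are assumed `BatchOutput` instances:

```python
# Merge the output folders of several hypothetical BatchOutput stages.
result = part1.compose(part2, part3)
if result is not True:
    # On failure, result holds the error message rather than False.
    print(f"compose failed: {result}")
```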
### `flush()`

🔄 Flush the output by copying all files and directories from the output folder to the configured S3 bucket and folder.
### `save(data, filename=None, **kwargs)`

💾 Save data to a file in the output folder.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Any` | The data to save. | *required* |
| `filename` | `Optional[str]` | The filename to use when saving the data to a file. | `None` |
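A brief sketch of saving a record under an explicit filename (when `filename` is `None`, the implementation chooses a name; the exact naming rule is not documented here):

```python
# Save a small dict; with partition_scheme set, a later flush/to_s3 would
# place this file under the dated S3 prefix.
config.save({"status": "ok", "count": 3}, filename="run_0001.json")
```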
### `to_kafka(output_topic, kafka_cluster_connection_string)`

Produce messages to a Kafka topic from the files in the output folder.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `output_topic` | `str` | Kafka topic to produce data to. | *required* |
| `kafka_cluster_connection_string` | `str` | Connection string for the Kafka cluster. | *required* |
| `key_serializer` | `Optional[str]` | Serializer for message keys. Defaults to `None`. | `None` |

Raises:

| Type | Description |
|---|---|
| `KafkaConnectionError` | If unable to connect to Kafka. |
| `Exception` | If any other error occurs during processing. |
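A hedged sketch of draining the output folder into Kafka. The topic name and broker address are placeholders, and `KafkaConnectionError` is assumed to be importable from the same package as `BatchOutput`:

```python
# Produce every saved file to a Kafka topic; connection details are
# placeholders for illustration.
try:
    config.to_kafka("my-output-topic", "localhost:9092")
except KafkaConnectionError as exc:
    print(f"could not reach Kafka: {exc}")
```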
### `to_s3()`

☁️ Recursively copy all files and directories from the output folder to the configured S3 bucket and folder.
### `to_spark(spark)`

Get a Spark DataFrame from the output folder.

Returns:

| Type | Description |
|---|---|
| `pyspark.sql.DataFrame` | A Spark DataFrame where each row corresponds to a file in the output folder. |

Raises:

| Type | Description |
|---|---|
| `FileNotExistError` | If the output folder does not exist. |
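A minimal sketch of loading the output folder into Spark for inspection, assuming a local PySpark installation; the app name is arbitrary:

```python
from pyspark.sql import SparkSession

# Build (or reuse) a local Spark session and read the batch output back.
spark = SparkSession.builder.appName("batch-output-inspect").getOrCreate()
df = config.to_spark(spark)
df.show(5)
```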
## FileNotExistError

Bases: `Exception`

❌ Custom exception raised when a file or folder does not exist.
## KafkaConnectionError

Bases: `Exception`

❌ Custom exception for Kafka connection problems.