Batch data output

Batch output manager

BatchOutput

Bases: Output

📁 BatchOutput: Manages batch output data.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| output_folder | str | Folder to save output files. |
| bucket | str | S3 bucket name. |
| s3_folder | str | Folder within the S3 bucket. |
| partition_scheme | Optional[str] | Partitioning scheme for S3, e.g., "year/month/day". |

Raises:

| Type | Description |
|------|-------------|
| FileNotExistError | If the output folder does not exist. |

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| output_folder | str | Folder to save output files. | required |
| bucket | str | S3 bucket name. | required |
| s3_folder | str | Folder within the S3 bucket. | required |
| partition_scheme | Optional[str] | Partitioning scheme for S3, e.g., "year/month/day". | None |
Usage

```python
# Initialize the BatchOutput instance
config = BatchOutput("/path/to/output", "my_bucket", "s3/folder", partition_scheme="%Y/%m/%d")

# Save data to a file
config.save({"key": "value"}, "example.json")

# Compose multiple BatchOutput instances
result = config1.compose(config2, config3)

# Convert output to a Spark DataFrame
spark_df = config.to_spark(spark_session)

# Copy files to a remote S3 bucket
config.to_s3()

# Flush the output to S3
config.flush()

# Collect metrics
metrics = config.collect_metrics()
```

__init__(output_folder, bucket, s3_folder, partition_scheme=None)

Initialize a new BatchOutput instance.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| output_folder | str | Folder to save output files. | required |
| bucket | str | S3 bucket name. | required |
| s3_folder | str | Folder within the S3 bucket. | required |
| partition_scheme | Optional[str] | Partitioning scheme for S3, e.g., "year/month/day". | None |
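
For example, a date-based partition scheme groups uploads under time-stamped prefixes. A minimal sketch, assuming a hypothetical import path (adjust to your installation; the folder and bucket names are placeholders):

```python
# Hypothetical import path; locate BatchOutput in your package layout
from mypackage.data import BatchOutput

# Placeholder folder and bucket; "%Y/%m/%d" matches the strftime-style
# scheme shown in the Usage section above
output = BatchOutput(
    "/tmp/batch-output",
    "my-data-bucket",
    "exports/daily",
    partition_scheme="%Y/%m/%d",
)
```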

collect_metrics()

Collect and return metrics, then clear them for future collection.

Returns:

| Type | Description |
|------|-------------|
| Dict[str, float] | Dictionary containing metrics. |
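
Metrics are cleared on each call, so capture the returned dict before reporting. A brief sketch, reusing the instance from the sketch above:

```python
# collect_metrics() returns Dict[str, float] and resets the counters afterwards
metrics = output.collect_metrics()
for name, value in metrics.items():
    print(f"{name}: {value}")
```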

compose(*outputs)

Compose multiple BatchOutput instances by merging their output folders.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| outputs | Output | Variable number of BatchOutput instances. | () |

Returns:

| Type | Description |
|------|-------------|
| Union[bool, str] | True if successful, error message otherwise. |
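
Because the return type is Union[bool, str], treat anything other than True as an error message. A sketch with two hypothetical sibling instances:

```python
# part_a and part_b are hypothetical BatchOutput instances for other folders
result = output.compose(part_a, part_b)
if result is not True:
    print(f"compose failed: {result}")
```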

flush()

🔄 Flush the output by copying all files and directories from the output folder to a given S3 bucket and folder.
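
In a typical batch job, flush() runs last, once every record has been saved. A minimal sketch:

```python
# Write a final record locally, then push the whole output folder to S3
output.save({"status": "done"}, "summary.json")
output.flush()
```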

save(data, filename=None, **kwargs)

💾 Save data to a file in the output folder.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | Any | The data to save. | required |
| filename | Optional[str] | The filename to use when saving the data to a file. | None |
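
A short sketch showing both forms of the call (the record contents are made up):

```python
# With an explicit filename
output.save({"user_id": 42, "event": "signup"}, "events.json")

# Without a filename, the method falls back to its default naming
output.save({"user_id": 43, "event": "login"})
```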

to_kafka(output_topic, kafka_cluster_connection_string)

Produce messages to a Kafka topic from the files in the output folder.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| output_topic | str | Kafka topic to produce data to. | required |
| kafka_cluster_connection_string | str | Connection string for the Kafka cluster. | required |
| key_serializer | Optional[str] | Serializer for message keys. | None |

Raises:

| Type | Description |
|------|-------------|
| KafkaConnectionError | If unable to connect to Kafka. |
| Exception | If any other error occurs during processing. |
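
Since a connection failure raises KafkaConnectionError, wrap the call accordingly. A sketch with a placeholder topic and broker address (KafkaConnectionError is this module's own exception, documented below):

```python
# "batch-events" and "localhost:9094" are placeholders
try:
    output.to_kafka("batch-events", "localhost:9094")
except KafkaConnectionError as e:
    print(f"could not reach Kafka: {e}")
```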

to_s3()

☁️ Recursively copy all files and directories from the output folder to a given S3 bucket and folder.

to_spark(spark)

Get a Spark DataFrame from the output folder.

Returns:

| Type | Description |
|------|-------------|
| pyspark.sql.DataFrame | A Spark DataFrame where each row corresponds to a file in the output folder. |

Raises:

| Type | Description |
|------|-------------|
| FileNotExistError | If the output folder does not exist. |
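
A sketch assuming a local SparkSession; the builder settings are placeholders:

```python
from pyspark.sql import SparkSession

# Local session for illustration; set master/appName for your cluster
spark = SparkSession.builder.master("local[*]").appName("batch-output").getOrCreate()
df = output.to_spark(spark)
df.show()
```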

FileNotExistError

Bases: Exception

❌ Custom exception for file not existing.

KafkaConnectionError

Bases: Exception

❌ Custom exception for Kafka connection problems.