Batch data input

Batch input manager

BatchInput

Bases: Input

📁 BatchInput: Manages batch input data.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| input_folder | str | Folder to read input files from. |
| bucket | str | S3 bucket name. |
| s3_folder | str | Folder within the S3 bucket. |
| partition_scheme | Optional[str] | Partitioning scheme for S3, e.g., "year/month/day". |

Raises:

| Type | Description |
|------|-------------|
| FileNotExistError | If the file does not exist. |

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_folder | str | Folder to read input files from. | required |
| bucket | str | S3 bucket name. | required |
| s3_folder | str | Folder within the S3 bucket. | required |
| partition_scheme | Optional[str] | Partitioning scheme for S3, e.g., "year/month/day". | None |
Usage

```python
# Initialize BatchInput
input = BatchInput("/path/to/input", "my_bucket", "s3/folder")

# Get the input folder
folder = input.get()

# Save a Spark DataFrame to the input folder
input.from_spark(my_dataframe)

# Compose multiple BatchInput instances
composed = input.compose(input1, input2)

# Copy files from S3 to the input folder
input.from_s3()

# Collect metrics
metrics = input.collect_metrics()
```

__init__(input_folder, bucket, s3_folder, partition_scheme=None)

Initialize a new BatchInput instance.
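For example (a minimal sketch; the import path, folder, and bucket values are placeholders, not values taken from this documentation):

```python
from geniusrise.core.data import BatchInput  # import path assumed

batch = BatchInput(
    input_folder="/tmp/input",          # local folder to read input files from
    bucket="my_bucket",                 # placeholder S3 bucket
    s3_folder="data/incoming",          # placeholder prefix within the bucket
    partition_scheme="year/month/day",  # optional partitioning scheme
)
```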

collect_metrics()

Collect and return metrics, then clear them for future collection.

Returns:

| Type | Description |
|------|-------------|
| Dict[str, float] | Dictionary containing metrics. |
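A minimal sketch of draining the metrics after some work; the import path and folder values are assumptions, and the metric keys depend on what the instance recorded:

```python
from geniusrise.core.data import BatchInput  # import path assumed

batch = BatchInput("/tmp/input", "my_bucket", "s3/folder")
batch.from_s3()  # do some work that records metrics

metrics = batch.collect_metrics()
for name, value in metrics.items():
    print(f"{name}: {value}")
# Collection clears the stored metrics, so the next call starts fresh.
```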

compose(*inputs)

Compose multiple BatchInput instances by merging their input folders.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| inputs | Input | Variable number of BatchInput instances. | () |

Returns:

| Type | Description |
|------|-------------|
| Union[bool, str] | True if successful, an error message otherwise. |
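A hedged sketch of merging two secondary inputs into a primary one; the paths, bucket names, and import path are placeholders:

```python
from geniusrise.core.data import BatchInput  # import path assumed

main = BatchInput("/tmp/main", "my_bucket", "s3/main")
extra_a = BatchInput("/tmp/a", "my_bucket", "s3/a")
extra_b = BatchInput("/tmp/b", "my_bucket", "s3/b")

result = main.compose(extra_a, extra_b)
if result is not True:
    # On failure, compose returns the error message rather than raising.
    print(f"compose failed: {result}")
```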

from_kafka(input_topic, kafka_cluster_connection_string, nr_messages=1000, group_id='geniusrise', partition_scheme=None)

Consume messages from a Kafka topic and save them as JSON files in the input folder. Stops consuming after reaching the latest message or the specified number of messages.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_topic | str | Kafka topic to consume data from. | required |
| kafka_cluster_connection_string | str | Connection string for the Kafka cluster. | required |
| nr_messages | int | Number of messages to consume. | 1000 |
| group_id | str | Kafka consumer group ID. | 'geniusrise' |
| partition_scheme | Optional[str] | Optional partitioning scheme for Kafka, e.g., "year/month/day". | None |

Returns:

| Type | Description |
|------|-------------|
| str | The path to the folder where the consumed messages are saved as JSON files. |

Raises:

| Type | Description |
|------|-------------|
| KafkaConnectionError | If unable to connect to Kafka. |
| Exception | If any other error occurs during processing. |
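A minimal sketch of consuming a topic into the input folder; the topic name, broker address, and import paths are assumptions:

```python
from geniusrise.core.data import BatchInput, KafkaConnectionError  # import paths assumed

batch = BatchInput("/tmp/input", "my_bucket", "s3/folder")
try:
    path = batch.from_kafka(
        input_topic="events",                              # placeholder topic
        kafka_cluster_connection_string="localhost:9094",  # placeholder broker
        nr_messages=500,
        group_id="geniusrise",
    )
    print(f"Consumed messages saved as JSON under: {path}")
except KafkaConnectionError:
    print("Unable to connect to the Kafka cluster")
```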

from_s3(bucket=None, s3_folder=None)

Copy contents from a given S3 bucket and location to the input folder.

Raises:

| Type | Description |
|------|-------------|
| Exception | If the input folder is not specified. |
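A hedged sketch; the bucket and prefix are placeholders, and calling with no arguments presumably falls back to the bucket and folder configured at construction time:

```python
from geniusrise.core.data import BatchInput  # import path assumed

batch = BatchInput("/tmp/input", "my_bucket", "s3/folder")
batch.from_s3()  # copy from the configured bucket/folder
batch.from_s3(bucket="other_bucket", s3_folder="other/prefix")  # or override per call
```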

from_spark(df)

Save the contents of a Spark DataFrame to the input folder with optional partitioning.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | The Spark DataFrame to save. | required |

Raises:

| Type | Description |
|------|-------------|
| FileNotExistError | If the input folder does not exist. |
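A minimal sketch, assuming a local SparkSession and an illustrative two-column DataFrame; the import path for BatchInput is an assumption:

```python
from pyspark.sql import SparkSession

from geniusrise.core.data import BatchInput  # import path assumed

spark = SparkSession.builder.appName("batch-input-demo").getOrCreate()
df = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value"])

batch = BatchInput("/tmp/input", "my_bucket", "s3/folder", partition_scheme="year/month/day")
batch.from_spark(df)  # writes into the input folder, partitioned when a scheme is set
```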

get()

Get the input folder path.

Returns:

| Type | Description |
|------|-------------|
| str | The path to the input folder. |

FileNotExistError

Bases: Exception

❌ Custom exception raised when a file does not exist.

KafkaConnectionError

Bases: Exception

❌ Custom exception raised on Kafka connection problems.