Batch data input¶
Batch input manager
BatchInput¶
Bases: Input
📁 BatchInput: Manages batch input data.
Attributes:

Name | Type | Description
---|---|---
`input_folder` | `str` | Folder to read input files from.
`bucket` | `str` | S3 bucket name.
`s3_folder` | `str` | Folder within the S3 bucket.
`partition_scheme` | `Optional[str]` | Partitioning scheme for S3, e.g., "year/month/day".
Raises:

Type | Description
---|---
`FileNotExistError` | If the file does not exist.
Parameters:

Name | Type | Description | Default
---|---|---|---
`input_folder` | `str` | Folder to read input files from. | *required*
`bucket` | `str` | S3 bucket name. | *required*
`s3_folder` | `str` | Folder within the S3 bucket. | *required*
`partition_scheme` | `Optional[str]` | Partitioning scheme for S3, e.g., "year/month/day". | `None`
Usage¶
Initialize BatchInput¶
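A minimal sketch of constructing a `BatchInput` from the signature above; the import path, bucket name, and folder paths are assumptions, not verified values:

```python
# Assumed import path; adjust to match your geniusrise installation.
from geniusrise.core.data import BatchInput

# Local folder to read from, backed by a (placeholder) S3 bucket and folder.
batch_input = BatchInput(
    input_folder="./input",
    bucket="my-bucket",
    s3_folder="data/input",
    partition_scheme="year/month/day",  # optional, defaults to None
)
```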
Get the input folder¶
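A sketch using `get()` (documented below), reusing the `batch_input` instance from the initialization example:

```python
# get() returns the input folder path supplied at construction time.
folder_path = batch_input.get()
print(folder_path)  # e.g. "./input"
```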
Save a Spark DataFrame to the input folder¶
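A sketch of `from_spark()` with a toy DataFrame; the schema and Spark session settings are illustrative only:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch-input-demo").getOrCreate()
df = spark.createDataFrame([("alice", 1), ("bob", 2)], ["name", "value"])

# Saves the DataFrame's contents into the input folder;
# raises FileNotExistError if the input folder does not exist.
batch_input.from_spark(df)
```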
Compose multiple BatchInput instances¶
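A sketch of `compose()`; `other_input_1` and `other_input_2` are hypothetical `BatchInput` instances created the same way as `batch_input`:

```python
# Merge the input folders of the other instances into batch_input's folder.
result = batch_input.compose(other_input_1, other_input_2)
if result is not True:
    # On failure, compose() returns an error message string instead of True.
    print(f"Compose failed: {result}")
```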
Copy files from S3 to the input folder¶
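A sketch of `from_s3()`; the bucket and folder are placeholders:

```python
# Copy objects from the given S3 location into the local input folder.
batch_input.from_s3(bucket="my-bucket", s3_folder="data/input")
```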
Collect metrics¶
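A sketch of `collect_metrics()`, which returns the accumulated metrics and clears them for the next collection:

```python
# metrics is a Dict[str, float]; the internal store is cleared afterwards.
metrics = batch_input.collect_metrics()
for name, value in metrics.items():
    print(f"{name}: {value}")
```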
`__init__(input_folder, bucket, s3_folder, partition_scheme=None)`¶
Initialize a new BatchInput instance.
`collect_metrics()`¶
Collect and return metrics, then clear them for future collection.
Returns:

Type | Description
---|---
`Dict[str, float]` | Dictionary containing metrics.
`compose(*inputs)`¶
Compose multiple BatchInput instances by merging their input folders.
Parameters:

Name | Type | Description | Default
---|---|---|---
`inputs` | `Input` | Variable number of BatchInput instances. | `()`
Returns:

Type | Description
---|---
`Union[bool, str]` | True if successful, error message otherwise.
`from_kafka(input_topic, kafka_cluster_connection_string, nr_messages=1000, group_id='geniusrise', partition_scheme=None)`¶
Consume messages from a Kafka topic and save them as JSON files in the input folder. Stops consuming after reaching the latest message or the specified number of messages.
Parameters:

Name | Type | Description | Default
---|---|---|---
`input_topic` | `str` | Kafka topic to consume data from. | *required*
`kafka_cluster_connection_string` | `str` | Connection string for the Kafka cluster. | *required*
`nr_messages` | `int` | Number of messages to consume. | `1000`
`group_id` | `str` | Kafka consumer group ID. | `'geniusrise'`
`partition_scheme` | `Optional[str]` | Optional partitioning scheme for Kafka, e.g., "year/month/day". | `None`
Returns:

Type | Description
---|---
`str` | The path to the folder where the consumed messages are saved as JSON files.
Raises:

Type | Description
---|---
`KafkaConnectionError` | If unable to connect to Kafka.
`Exception` | If any other error occurs during processing.
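A sketch of consuming from Kafka into the input folder; the topic name and broker address are placeholders:

```python
# Consume up to 1000 messages from the topic and save them as JSON files
# under the input folder, partitioned by "year/month/day".
path = batch_input.from_kafka(
    input_topic="my-topic",
    kafka_cluster_connection_string="localhost:9094",
    nr_messages=1000,             # default
    group_id="geniusrise",        # default consumer group ID
    partition_scheme="year/month/day",
)
print(path)  # folder containing the saved JSON files
```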
`from_s3(bucket=None, s3_folder=None)`¶
Copy contents from a given S3 bucket and location to the input folder.
Raises:

Type | Description
---|---
`Exception` | If the input folder is not specified.
`from_spark(df)`¶
Save the contents of a Spark DataFrame to the input folder with optional partitioning.
Parameters:

Name | Type | Description | Default
---|---|---|---
`df` | `DataFrame` | The Spark DataFrame to save. | *required*
Raises:

Type | Description
---|---
`FileNotExistError` | If the input folder does not exist.
`get()`¶
Get the input folder path.
Returns:

Type | Description
---|---
`str` | The path to the input folder.
FileNotExistError¶
Bases: Exception
❌ Custom exception raised when a file does not exist.
KafkaConnectionError¶
Bases: Exception
❌ Custom exception for Kafka connection problems.