Bolt

Core Bolt class

Bolt

Bases: Task

Base class for all bolts.

A bolt is a component that consumes streams of data, processes them, and possibly emits new data streams.

__call__(method_name, *args, **kwargs)

Execute a method locally and manage the state.

Parameters:

- method_name (str): The name of the method to execute. Required.
- *args: Positional arguments to pass to the method. Default: ().
- **kwargs: Keyword arguments to pass to the method; these are specific to the method being called. Default: {}.

Returns:

- Any: The result of the method.
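
The dispatch-by-name behaviour described for `__call__` can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `DispatchSketch`, its dictionary-backed `state`, the `"status"` key, and the `scale` method are all hypothetical stand-ins.

```python
from typing import Any, Dict


class DispatchSketch:
    """Hypothetical stand-in for a bolt; not the library's Bolt class."""

    def __init__(self) -> None:
        # Assumption: a plain dict stands in for the real state manager.
        self.state: Dict[str, Any] = {}

    def __call__(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
        # Resolve the method by name; a missing name raises AttributeError.
        method = getattr(self, method_name)
        self.state["status"] = "running"
        try:
            result = method(*args, **kwargs)
        except Exception:
            # Record the failure before re-raising the original error.
            self.state["status"] = "failed"
            raise
        self.state["status"] = "succeeded"
        return result

    def scale(self, value: int, factor: int = 2) -> int:
        # Example method to be invoked through __call__.
        return value * factor


task = DispatchSketch()
result = task("scale", 21, factor=2)
```

Recording the status around the call is one plausible reading of "manage the state"; the real class may track different information.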

__init__(input, output, state, id=None, **kwargs)

Bolt is the base class for all bolts. It inherits from the Task class and provides methods for executing tasks both locally and remotely, as well as for managing their state. State can be kept in memory, in Redis, in PostgreSQL, or in DynamoDB. Input and output data can be batch, streaming, stream-to-batch, or batch-to-streaming.

The Bolt class uses the Input, Output, and State classes, which are abstract base classes for managing input data, output data, and state, respectively. Input has the subclasses StreamingInput and BatchInput, and Output has the subclasses StreamingOutput and BatchOutput, which manage streaming and batch data, respectively. The State class is used to get and set state, and it has several subclasses, one for each type of state manager.
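
The abstract-base-class pattern described above can be sketched for the state manager as follows. This is a hedged illustration only: `SketchState`, `InMemorySketchState`, and the `get_state`/`set_state` method names are assumptions made for this example and may not match the library's actual interface.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class SketchState(ABC):
    """Hypothetical abstract state manager; subclasses choose the backend."""

    @abstractmethod
    def get_state(self, key: str) -> Optional[Dict[str, Any]]:
        """Return the state stored under key, or None if there is none."""

    @abstractmethod
    def set_state(self, key: str, value: Dict[str, Any]) -> None:
        """Store value under key."""


class InMemorySketchState(SketchState):
    """In-memory backend: state lives in a dict and is lost on exit."""

    def __init__(self) -> None:
        self._store: Dict[str, Dict[str, Any]] = {}

    def get_state(self, key: str) -> Optional[Dict[str, Any]]:
        return self._store.get(key)

    def set_state(self, key: str, value: Dict[str, Any]) -> None:
        self._store[key] = value


state = InMemorySketchState()
state.set_state("bolt-1", {"status": "running"})
```

A Redis-, PostgreSQL-, or DynamoDB-backed subclass would implement the same two methods against its own store, so calling code never needs to know which backend is in use.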

In its execute_remote method, the Bolt class also uses the ECSManager and K8sManager classes, which manage tasks on Amazon ECS and Kubernetes, respectively.

Usage
  • Create an instance of the Bolt class by providing an Input object, an Output object and a State object.
  • The Input object specifies the input data for the bolt.
  • The Output object specifies the output data for the bolt.
  • The State object handles the management of the bolt's state.
Example

    input = Input(...)
    output = Output(...)
    state = State(...)
    bolt = Bolt(input, output, state)

Parameters:

- input (Input): The input data. Required.
- output (Output): The output data. Required.
- state (State): The state manager. Required.
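
How a constructor with this signature might store its collaborators can be sketched as below. `WiringSketch` is a hypothetical stand-in, the placeholder strings passed as input, output, and state are illustrative only, and generating an id when `id` is `None` is an assumption rather than documented behaviour.

```python
import uuid
from typing import Any, Optional


class WiringSketch:
    """Hypothetical stand-in showing the shape of the constructor."""

    def __init__(
        self,
        input: Any,
        output: Any,
        state: Any,
        id: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self.input = input
        self.output = output
        self.state = state
        # Assumption: fall back to a generated identifier when none is given.
        self.id = id or f"{type(self).__name__}-{uuid.uuid4().hex[:8]}"
        # Keep any extra keyword arguments for later use.
        self.options = kwargs


named = WiringSketch("in", "out", "state", id="my-bolt")
anonymous = WiringSketch("in", "out", "state", retries=3)
```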

create(klass, input_type, output_type, state_type, id=None, **kwargs) staticmethod

Create a bolt of a specific type.

This static method builds the input, the output, and the state manager from the given input type, output type, and state type, using the additional keyword arguments to configure them. It then creates and returns a bolt of the given class with these components.

Parameters:

- klass (type): The Bolt class to create. Required.
- input_type (str): The type of input ("batch" or "streaming"). Required.
- output_type (str): The type of output ("batch" or "streaming"). Required.
- state_type (str): The type of state manager ("none", "redis", "postgres", or "dynamodb"). Required.
- **kwargs: Additional keyword arguments for initializing the bolt. Default: {}.

Keyword Arguments:
    Batch input:
    - input_folder (str): The input folder.
    - input_s3_bucket (str): The input S3 bucket.
    - input_s3_folder (str): The input S3 folder.
    Batch output:
    - output_folder (str): The output folder.
    - output_s3_bucket (str): The output S3 bucket.
    - output_s3_folder (str): The output S3 folder.
    Streaming input:
    - input_kafka_cluster_connection_string (str): The input Kafka cluster connection string.
    - input_kafka_topic (str): The input Kafka topic.
    - input_kafka_consumer_group_id (str): The Kafka consumer group id.
    Streaming output:
    - output_kafka_cluster_connection_string (str): The output Kafka cluster connection string.
    - output_kafka_topic (str): The output Kafka topic.
    Stream-to-Batch input:
    - buffer_size (int): Number of messages to buffer.
    - input_kafka_cluster_connection_string (str): The input Kafka cluster connection string.
    - input_kafka_topic (str): The input Kafka topic.
    - input_kafka_consumer_group_id (str): The Kafka consumer group id.
    Batch-to-Streaming input:
    - buffer_size (int): Number of messages to buffer.
    - input_folder (str): The input folder.
    - input_s3_bucket (str): The input S3 bucket.
    - input_s3_folder (str): The input S3 folder.
    Stream-to-Batch output:
    - buffer_size (int): Number of messages to buffer.
    - output_folder (str): The output folder.
    - output_s3_bucket (str): The output S3 bucket.
    - output_s3_folder (str): The output S3 folder.
    Redis state manager:
    - redis_host (str): The Redis host.
    - redis_port (str): The Redis port.
    - redis_db (str): The Redis database.
    Postgres state manager:
    - postgres_host (str): The PostgreSQL host.
    - postgres_port (str): The PostgreSQL port.
    - postgres_user (str): The PostgreSQL user.
    - postgres_password (str): The PostgreSQL password.
    - postgres_database (str): The PostgreSQL database.
    - postgres_table (str): The PostgreSQL table.
    DynamoDB state manager:
    - dynamodb_table_name (str): The DynamoDB table name.
    - dynamodb_region_name (str): The DynamoDB region name.
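
Because the relevant keyword arguments depend on the chosen input, output, and state types, it can help to check a configuration before building a bolt. The sketch below groups the documented argument names by type. The helpers `expected_kwargs` and `missing_kwargs` are hypothetical and not part of the library, only the plain "batch" and "streaming" groups are covered, and whether the library treats every listed argument as mandatory is not stated here.

```python
from typing import Any, Dict, List

# Argument names copied from the keyword-argument list above.
INPUT_KWARGS: Dict[str, List[str]] = {
    "batch": ["input_folder", "input_s3_bucket", "input_s3_folder"],
    "streaming": [
        "input_kafka_cluster_connection_string",
        "input_kafka_topic",
        "input_kafka_consumer_group_id",
    ],
}
OUTPUT_KWARGS: Dict[str, List[str]] = {
    "batch": ["output_folder", "output_s3_bucket", "output_s3_folder"],
    "streaming": ["output_kafka_cluster_connection_string", "output_kafka_topic"],
}
STATE_KWARGS: Dict[str, List[str]] = {
    "none": [],
    "redis": ["redis_host", "redis_port", "redis_db"],
    "postgres": [
        "postgres_host",
        "postgres_port",
        "postgres_user",
        "postgres_password",
        "postgres_database",
        "postgres_table",
    ],
    "dynamodb": ["dynamodb_table_name", "dynamodb_region_name"],
}


def expected_kwargs(input_type: str, output_type: str, state_type: str) -> List[str]:
    """List the argument names documented for this combination of types."""
    # An unknown type raises KeyError in this sketch.
    return INPUT_KWARGS[input_type] + OUTPUT_KWARGS[output_type] + STATE_KWARGS[state_type]


def missing_kwargs(
    input_type: str, output_type: str, state_type: str, supplied: Dict[str, Any]
) -> List[str]:
    """Return the documented names that are absent from supplied."""
    names = expected_kwargs(input_type, output_type, state_type)
    return [name for name in names if name not in supplied]


config = {
    "input_folder": "/data/in",
    "input_s3_bucket": "my-bucket",
    "input_s3_folder": "raw",
    "output_kafka_topic": "results",
    "redis_host": "localhost",
    "redis_port": "6379",
}
missing = missing_kwargs("batch", "streaming", "redis", config)
```

All values in `config` are placeholders chosen for illustration.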

Returns:

- Bolt: The created bolt.

Raises:

- ValueError: If an invalid input type, output type, or state type is provided.
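
The validation that leads to this ValueError can be sketched as follows. This is an illustration of the documented contract, not the library's code: `validate_types` is a hypothetical helper, and the accepted values are taken from the parameter descriptions of `create`.

```python
from typing import Tuple

VALID_INPUT_TYPES: Tuple[str, ...] = ("batch", "streaming")
VALID_OUTPUT_TYPES: Tuple[str, ...] = ("batch", "streaming")
VALID_STATE_TYPES: Tuple[str, ...] = ("none", "redis", "postgres", "dynamodb")


def validate_types(input_type: str, output_type: str, state_type: str) -> None:
    """Raise ValueError for the first type that is not an accepted value."""
    checks = (
        ("input", input_type, VALID_INPUT_TYPES),
        ("output", output_type, VALID_OUTPUT_TYPES),
        ("state", state_type, VALID_STATE_TYPES),
    )
    for label, value, allowed in checks:
        if value not in allowed:
            raise ValueError(
                f"Invalid {label} type: {value!r}; expected one of {allowed}"
            )


# A valid combination passes silently.
validate_types("batch", "streaming", "redis")

# An unsupported state type is rejected.
try:
    validate_types("batch", "batch", "sqlite")
except ValueError as exc:
    error_message = str(exc)
```

Failing fast on the type strings, before any input, output, or state object is constructed, keeps a misspelled type from surfacing later as a harder-to-trace connection error.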