Skip to content


The main bolt controller


Class for managing bolts end-to-end from the command line.


Initialize BoltCtl with a DiscoveredBolt object.


Name Type Description Default
discovered_bolt DiscoveredBolt

DiscoveredBolt object used to create and manage bolts.


create_bolt(input_type, output_type, state_type, id, **kwargs)

Create a bolt of a specific type.


Name Type Description Default
input_type str

The type of input ("batch" or "streaming").

output_type str

The type of output ("batch" or "streaming").

state_type str

The type of state manager ("none", "redis", "postgres", or "dynamodb").


Additional keyword arguments for initializing the bolt.

Keyword Arguments:
    Batch input:
    - input_folder (str): The input folder argument.
    - input_s3_bucket (str): The input bucket argument.
    - input_s3_folder (str): The input S3 folder argument.
    Batch output:
    - output_folder (str): The output folder argument.
    - output_s3_bucket (str): The output bucket argument.
    - output_s3_folder (str): The output S3 folder argument.
    Streaming input:
    - input_kafka_cluster_connection_string (str): The input Kafka servers argument.
    - input_kafka_topic (str): The input kafka topic argument.
    - input_kafka_consumer_group_id (str): The Kafka consumer group id.
    Streaming output:
    - output_kafka_cluster_connection_string (str): The output Kafka servers argument.
    - output_kafka_topic (str): The output kafka topic argument.
    Redis state manager config:
    - redis_host (str): The Redis host argument.
    - redis_port (str): The Redis port argument.
    - redis_db (str): The Redis database argument.
    Postgres state manager config:
    - postgres_host (str): The PostgreSQL host argument.
    - postgres_port (str): The PostgreSQL port argument.
    - postgres_user (str): The PostgreSQL user argument.
    - postgres_password (str): The PostgreSQL password argument.
    - postgres_database (str): The PostgreSQL database argument.
    - postgres_table (str): The PostgreSQL table argument.
    DynamoDB state manager config:
    - dynamodb_table_name (str): The DynamoDB table name argument.
    - dynamodb_region_name (str): The DynamoDB region name argument.



Name Type Description
Bolt Bolt

The created bolt.


Add arguments to the command-line parser for managing the bolt.


Name Type Description Default
parser argparse.ArgumentParser

Command-line parser.



Deploy a spout of a specific type.


Name Type Description Default

Additional keyword arguments for initializing the spout.

Keyword Arguments:
    Batch input:
    - input_folder (str): The input folder argument.
    - input_s3_bucket (str): The input bucket argument.
    - input_s3_folder (str): The input S3 folder argument.
    Batch outupt:
    - output_folder (str): The output folder argument.
    - output_s3_bucket (str): The output bucket argument.
    - output_s3_folder (str): The output S3 folder argument.
    Streaming input:
    - input_kafka_cluster_connection_string (str): The input Kafka servers argument.
    - input_kafka_topic (str): The input kafka topic argument.
    - input_kafka_consumer_group_id (str): The Kafka consumer group id.
    Streaming output:
    - output_kafka_cluster_connection_string (str): The output Kafka servers argument.
    - output_kafka_topic (str): The output kafka topic argument.
    Redis state manager config:
    - redis_host (str): The host address for the Redis server.
    - redis_port (int): The port number for the Redis server.
    - redis_db (int): The Redis database to be used.
    Postgres state manager config:
    - postgres_host (str): The host address for the PostgreSQL server.
    - postgres_port (int): The port number for the PostgreSQL server.
    - postgres_user (str): The username for the PostgreSQL server.
    - postgres_password (str): The password for the PostgreSQL server.
    - postgres_database (str): The PostgreSQL database to be used.
    - postgres_table (str): The PostgreSQL table to be used.
    DynamoDB state manager config:
    - dynamodb_table_name (str): The name of the DynamoDB table.
    - dynamodb_region_name (str): The AWS region for DynamoDB.
    - k8s_kind (str): Kind opf kubernetes resource to be deployed as, choices are "deployment", "service", "job", "cron_job"
    - k8s_name (str): Name of the Kubernetes resource.
    - k8s_image (str): Docker image for the Kubernetes resource.
    - k8s_replicas (int): Number of replicas.
    - k8s_env_vars (json): Environment variables as a JSON string.
    - k8s_cpu (str): CPU requirements.
    - k8s_memory (str): Memory requirements.
    - k8s_storage (str): Storage requirements.
    - k8s_gpu (str): GPU requirements.
    - k8s_kube_config_path (str): Name of the Kubernetes cluster local config.
    - k8s_api_key (str): GPU requirements.
    - k8s_api_host (str): GPU requirements.
    - k8s_verify_ssl (str): GPU requirements.
    - k8s_ssl_ca_cert (str): GPU requirements.
    - k8s_cluster_name (str): Name of the Kubernetes cluster.
    - k8s_context_name (str): Name of the kubeconfig context.
    - k8s_namespace (str): Kubernetes namespace.", default="default
    - k8s_labels (json): Labels for Kubernetes resources, as a JSON string.
    - k8s_annotations (json): Annotations for Kubernetes resources, as a JSON string.
    - k8s_port (int): Port to run the spout on as a service.
    - k8s_target_port (int): Port to expose the spout on as a service.
    - k8s_schedule (str): Schedule to run the spout on as a cron job.


execute_bolt(bolt, method_name, *args, **kwargs)

Execute a method of a bolt.


Name Type Description Default
bolt Bolt

The bolt to execute.

method_name str

The name of the method to execute.


Positional arguments to pass to the method.


Keyword arguments to pass to the method.



Name Type Description

The result of the method.


Run the command-line interface.


Name Type Description Default
args argparse.Namespace

Parsed command-line arguments.
