Boltctl

The main bolt controller

BoltCtl

Class for managing bolts end-to-end from the command line.

__init__(discovered_bolt)

Initialize BoltCtl with a DiscoveredBolt object.

Parameters:

Name Type Description Default
discovered_bolt DiscoveredBolt

DiscoveredBolt object used to create and manage bolts.

required

create_bolt(input_type, output_type, state_type, id, **kwargs)

Create a bolt of a specific type.

Parameters:

Name Type Description Default
input_type str

The type of input ("batch" or "streaming").

required
output_type str

The type of output ("batch" or "streaming").

required
state_type str

The type of state manager ("none", "redis", "postgres", or "dynamodb").

required
**kwargs

Additional keyword arguments for initializing the bolt.

Keyword Arguments:
    Batch input:
    - input_folder (str): The input folder argument.
    - input_s3_bucket (str): The input bucket argument.
    - input_s3_folder (str): The input S3 folder argument.
    Batch output:
    - output_folder (str): The output folder argument.
    - output_s3_bucket (str): The output bucket argument.
    - output_s3_folder (str): The output S3 folder argument.
    Streaming input:
    - input_kafka_cluster_connection_string (str): The input Kafka servers argument.
    - input_kafka_topic (str): The input Kafka topic argument.
    - input_kafka_consumer_group_id (str): The Kafka consumer group id.
    Streaming output:
    - output_kafka_cluster_connection_string (str): The output Kafka servers argument.
    - output_kafka_topic (str): The output Kafka topic argument.
    Redis state manager config:
    - redis_host (str): The Redis host argument.
    - redis_port (str): The Redis port argument.
    - redis_db (str): The Redis database argument.
    Postgres state manager config:
    - postgres_host (str): The PostgreSQL host argument.
    - postgres_port (str): The PostgreSQL port argument.
    - postgres_user (str): The PostgreSQL user argument.
    - postgres_password (str): The PostgreSQL password argument.
    - postgres_database (str): The PostgreSQL database argument.
    - postgres_table (str): The PostgreSQL table argument.
    DynamoDB state manager config:
    - dynamodb_table_name (str): The DynamoDB table name argument.
    - dynamodb_region_name (str): The DynamoDB region name argument.

{}

Returns:

Name Type Description
Bolt Bolt

The created bolt.
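
The keyword arguments above fall into groups selected by `input_type`, `output_type`, and `state_type`. The following is a minimal sketch, not part of BoltCtl, that lists which of the documented keyword names are absent for a given combination. The helper `missing_kwargs` and the table `KWARGS_BY_CHOICE` are hypothetical, and treating every listed name as expected is an assumption; the source does not say which ones are mandatory.

```python
from typing import Dict, List

# Keyword names copied from the list above, grouped by the choice that uses them.
# The grouping table itself is a hypothetical illustration.
KWARGS_BY_CHOICE: Dict[str, Dict[str, List[str]]] = {
    "input": {
        "batch": ["input_folder", "input_s3_bucket", "input_s3_folder"],
        "streaming": [
            "input_kafka_cluster_connection_string",
            "input_kafka_topic",
            "input_kafka_consumer_group_id",
        ],
    },
    "output": {
        "batch": ["output_folder", "output_s3_bucket", "output_s3_folder"],
        "streaming": [
            "output_kafka_cluster_connection_string",
            "output_kafka_topic",
        ],
    },
    "state": {
        "none": [],
        "redis": ["redis_host", "redis_port", "redis_db"],
        "postgres": [
            "postgres_host",
            "postgres_port",
            "postgres_user",
            "postgres_password",
            "postgres_database",
            "postgres_table",
        ],
        "dynamodb": ["dynamodb_table_name", "dynamodb_region_name"],
    },
}


def missing_kwargs(input_type: str, output_type: str, state_type: str, **kwargs) -> List[str]:
    """Return the documented keyword names that were not supplied (hypothetical helper)."""
    expected = (
        KWARGS_BY_CHOICE["input"][input_type]
        + KWARGS_BY_CHOICE["output"][output_type]
        + KWARGS_BY_CHOICE["state"][state_type]
    )
    return [name for name in expected if name not in kwargs]


# Example: batch input and output, no state manager, only one folder supplied.
print(missing_kwargs("batch", "batch", "none", input_folder="/tmp/in"))
```

A check like this could run before calling `create_bolt`, so that a missing setting surfaces as a readable list rather than an error deep inside bolt construction.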

create_parser(parser)

Add arguments to the command-line parser for managing the bolt.

Parameters:

Name Type Description Default
parser argparse.ArgumentParser

Command-line parser.

required
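
To illustrate the general pattern of adding arguments to an existing `argparse.ArgumentParser`, here is a minimal sketch using only the standard library. The function `add_example_arguments`, the program name, and the exact flags are assumptions chosen to mirror the `create_bolt` parameters; the arguments BoltCtl actually registers may differ.

```python
import argparse


def add_example_arguments(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """Attach a few illustrative arguments to an existing parser (hypothetical)."""
    # Positional choices mirroring the documented input, output, and state types.
    parser.add_argument("input_type", choices=["batch", "streaming"])
    parser.add_argument("output_type", choices=["batch", "streaming"])
    parser.add_argument("state_type", choices=["none", "redis", "postgres", "dynamodb"])
    # Optional settings named after the documented keyword arguments.
    parser.add_argument("--input_folder", type=str, default=None)
    parser.add_argument("--output_folder", type=str, default=None)
    return parser


parser = add_example_arguments(argparse.ArgumentParser(prog="example-bolt"))
args = parser.parse_args(["batch", "batch", "none", "--input_folder", "/tmp/in"])
print(args.input_type, args.input_folder)
```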

deploy_bolt(args)

Deploy a bolt of a specific type.

Parameters:

Name Type Description Default
**kwargs

Additional keyword arguments for initializing and deploying the bolt.

Keyword Arguments:
    Batch input:
    - input_folder (str): The input folder argument.
    - input_s3_bucket (str): The input bucket argument.
    - input_s3_folder (str): The input S3 folder argument.
    Batch output:
    - output_folder (str): The output folder argument.
    - output_s3_bucket (str): The output bucket argument.
    - output_s3_folder (str): The output S3 folder argument.
    Streaming input:
    - input_kafka_cluster_connection_string (str): The input Kafka servers argument.
    - input_kafka_topic (str): The input Kafka topic argument.
    - input_kafka_consumer_group_id (str): The Kafka consumer group id.
    Streaming output:
    - output_kafka_cluster_connection_string (str): The output Kafka servers argument.
    - output_kafka_topic (str): The output Kafka topic argument.
    Redis state manager config:
    - redis_host (str): The host address for the Redis server.
    - redis_port (int): The port number for the Redis server.
    - redis_db (int): The Redis database to be used.
    Postgres state manager config:
    - postgres_host (str): The host address for the PostgreSQL server.
    - postgres_port (int): The port number for the PostgreSQL server.
    - postgres_user (str): The username for the PostgreSQL server.
    - postgres_password (str): The password for the PostgreSQL server.
    - postgres_database (str): The PostgreSQL database to be used.
    - postgres_table (str): The PostgreSQL table to be used.
    DynamoDB state manager config:
    - dynamodb_table_name (str): The name of the DynamoDB table.
    - dynamodb_region_name (str): The AWS region for DynamoDB.
    Deployment:
    - k8s_kind (str): Kind of Kubernetes resource to deploy as. Choices are "deployment", "service", "job", and "cron_job".
    - k8s_name (str): Name of the Kubernetes resource.
    - k8s_image (str): Docker image for the Kubernetes resource.
    - k8s_replicas (int): Number of replicas.
    - k8s_env_vars (json): Environment variables as a JSON string.
    - k8s_cpu (str): CPU requirements.
    - k8s_memory (str): Memory requirements.
    - k8s_storage (str): Storage requirements.
    - k8s_gpu (str): GPU requirements.
    - k8s_kube_config_path (str): Path to the local Kubernetes config file.
    - k8s_api_key (str): API key for the Kubernetes cluster.
    - k8s_api_host (str): API host for the Kubernetes cluster.
    - k8s_verify_ssl (str): Whether to verify SSL certificates.
    - k8s_ssl_ca_cert (str): SSL CA certificate for the Kubernetes cluster.
    - k8s_cluster_name (str): Name of the Kubernetes cluster.
    - k8s_context_name (str): Name of the kubeconfig context.
    - k8s_namespace (str): Kubernetes namespace. Defaults to "default".
    - k8s_labels (json): Labels for Kubernetes resources, as a JSON string.
    - k8s_annotations (json): Annotations for Kubernetes resources, as a JSON string.
    - k8s_port (int): Port to run the bolt on as a service.
    - k8s_target_port (int): Port to expose the bolt on as a service.
    - k8s_schedule (str): Schedule to run the bolt on as a cron job.

required
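
Several deployment settings above (`k8s_env_vars`, `k8s_labels`, `k8s_annotations`) are documented as JSON strings rather than dictionaries. A minimal sketch of preparing such values with the standard `json` module follows. The concrete names, image, and values are made up for illustration, and how BoltCtl consumes these settings is not shown here.

```python
import json

# Illustrative values only; the resource name, image, and labels are hypothetical.
deployment_settings = {
    "k8s_kind": "deployment",
    "k8s_name": "example-bolt",
    "k8s_image": "example/image:latest",
    "k8s_replicas": 2,
    "k8s_namespace": "default",
    # JSON-typed settings are serialized to strings before being passed along.
    "k8s_env_vars": json.dumps({"LOG_LEVEL": "info"}),
    "k8s_labels": json.dumps({"app": "example-bolt"}),
    "k8s_annotations": json.dumps({"owner": "data-team"}),
}

# Round-trip check: the strings decode back to the original dictionaries.
decoded_env = json.loads(deployment_settings["k8s_env_vars"])
print(decoded_env["LOG_LEVEL"])
```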

execute_bolt(bolt, method_name, *args, **kwargs)

Execute a method of a bolt.

Parameters:

Name Type Description Default
bolt Bolt

The bolt to execute.

required
method_name str

The name of the method to execute.

required
*args

Positional arguments to pass to the method.

()
**kwargs

Keyword arguments to pass to the method.

{}

Returns:

Name Type Description
Any

The result of the method.
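
Calling a method by name with forwarded positional and keyword arguments is commonly done with `getattr`. The sketch below shows that pattern in isolation; `EchoBolt` and `call_method` are hypothetical stand-ins, and it is an assumption that `execute_bolt` works this way internally.

```python
from typing import Any


class EchoBolt:
    """Hypothetical stand-in for a bolt, used only for this illustration."""

    def shout(self, text: str, times: int = 1) -> str:
        return (text.upper() + "!") * times


def call_method(obj: Any, method_name: str, *args: Any, **kwargs: Any) -> Any:
    """Look up a method by name and call it with the given arguments."""
    method = getattr(obj, method_name, None)
    if not callable(method):
        raise AttributeError(f"{type(obj).__name__} has no callable '{method_name}'")
    return method(*args, **kwargs)


result = call_method(EchoBolt(), "shout", "hi", times=2)
print(result)
```

Raising `AttributeError` for an unknown or non-callable name keeps a mistyped `method_name` from failing silently.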

run(args)

Run the command-line interface.

Parameters:

Name Type Description Default
args argparse.Namespace

Parsed command-line arguments.

required
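
Because `run` receives an `argparse.Namespace` while `create_bolt` accepts keyword arguments, one plausible bridge between the two is converting the namespace to a dictionary. The sketch below shows that conversion with the standard library; the helper `namespace_to_kwargs` is hypothetical, and dropping unset (`None`) values is an assumption rather than documented behavior.

```python
import argparse
from typing import Any, Dict


def namespace_to_kwargs(args: argparse.Namespace) -> Dict[str, Any]:
    """Convert parsed arguments to a dict, skipping options left unset (hypothetical)."""
    return {name: value for name, value in vars(args).items() if value is not None}


# A namespace built by hand here; normally it would come from parser.parse_args().
args = argparse.Namespace(input_type="batch", input_folder="/tmp/in", redis_host=None)
kwargs = namespace_to_kwargs(args)
print(sorted(kwargs))
```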