Spoutctl

The main spout controller

SpoutCtl

Class for managing spouts end-to-end from the command line.

__init__(discovered_spout)

Initialize SpoutCtl with a DiscoveredSpout object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| discovered_spout | DiscoveredSpout | DiscoveredSpout object used to create and manage spouts. | required |

create_parser(parser)

Add arguments to the command-line parser for managing the spout.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| parser | argparse.ArgumentParser | Command-line parser. | required |
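As an illustration of what adding arguments to a parser can look like, the sketch below registers a few flags with argparse. The flag names mirror keyword arguments documented on this page, but the selection of flags, the positional layout, and the default values are assumptions, not the parser that create_parser actually builds.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical helper: a small parser resembling a spout CLI."""
    parser = argparse.ArgumentParser(description="Manage a spout (illustrative sketch)")
    # The choices below are the values documented for create_spout.
    parser.add_argument("output_type", choices=["batch", "streaming"])
    parser.add_argument("state_type", choices=["none", "redis", "postgres", "dynamodb"])
    # A few of the documented keyword arguments; defaults here are assumed.
    parser.add_argument("--output_folder", type=str, default=None)
    parser.add_argument("--redis_host", type=str, default="localhost")
    parser.add_argument("--redis_port", type=int, default=6379)
    return parser


# Parse an explicit argument list instead of sys.argv so the sketch is repeatable.
args = build_parser().parse_args(
    ["batch", "redis", "--output_folder", "/tmp/out", "--redis_port", "6380"]
)
```

The resulting argparse.Namespace is the kind of object that run(args) is documented to accept.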

create_spout(output_type, state_type, id, **kwargs)

Create a spout of a specific type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| output_type | str | The type of output ("batch" or "streaming"). | required |
| state_type | str | The type of state manager ("none", "redis", "postgres", or "dynamodb"). | required |
| **kwargs | | Additional keyword arguments for initializing the spout; see Keyword Arguments. | {} |

Keyword Arguments:
    Batch output:
    - output_folder (str): The directory where output files should be stored temporarily.
    - output_s3_bucket (str): The name of the S3 bucket for output storage.
    - output_s3_folder (str): The S3 folder for output storage.
    Streaming output:
    - output_kafka_topic (str): Kafka output topic for streaming spouts.
    - output_kafka_cluster_connection_string (str): Kafka connection string for streaming spouts.
    Stream to Batch output:
    - output_folder (str): The directory where output files should be stored temporarily.
    - output_s3_bucket (str): The name of the S3 bucket for output storage.
    - output_s3_folder (str): The S3 folder for output storage.
    - buffer_size (int): Number of messages to buffer.
    Redis state manager config:
    - redis_host (str): The host address for the Redis server.
    - redis_port (int): The port number for the Redis server.
    - redis_db (int): The Redis database to be used.
    Postgres state manager config:
    - postgres_host (str): The host address for the PostgreSQL server.
    - postgres_port (int): The port number for the PostgreSQL server.
    - postgres_user (str): The username for the PostgreSQL server.
    - postgres_password (str): The password for the PostgreSQL server.
    - postgres_database (str): The PostgreSQL database to be used.
    - postgres_table (str): The PostgreSQL table to be used.
    DynamoDB state manager config:
    - dynamodb_table_name (str): The name of the DynamoDB table.
    - dynamodb_region_name (str): The AWS region for DynamoDB.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Spout | Spout | The created spout. |
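Because the keyword arguments depend on the chosen output_type and state_type, it can help to check them before calling create_spout. The sketch below is a hypothetical pre-flight helper, not part of SpoutCtl: the names OUTPUT_KEYS, STATE_KEYS, and missing_kwargs are invented here, and treating every listed key as mandatory is an assumption, since this page does not say which keys are optional.

```python
# Keys grouped as in the Keyword Arguments list for create_spout.
OUTPUT_KEYS = {
    "batch": ["output_folder", "output_s3_bucket", "output_s3_folder"],
    "streaming": ["output_kafka_topic", "output_kafka_cluster_connection_string"],
}
STATE_KEYS = {
    "none": [],
    "redis": ["redis_host", "redis_port", "redis_db"],
    "postgres": [
        "postgres_host",
        "postgres_port",
        "postgres_user",
        "postgres_password",
        "postgres_database",
        "postgres_table",
    ],
    "dynamodb": ["dynamodb_table_name", "dynamodb_region_name"],
}


def missing_kwargs(output_type: str, state_type: str, **kwargs) -> list:
    """Return the documented keys that were not supplied (assumes all are needed)."""
    expected = OUTPUT_KEYS[output_type] + STATE_KEYS[state_type]
    return [key for key in expected if key not in kwargs]


# A batch spout with Redis state where redis_db was forgotten.
missing = missing_kwargs(
    "batch",
    "redis",
    output_folder="/tmp/out",
    output_s3_bucket="example-bucket",
    output_s3_folder="spout-output",
    redis_host="localhost",
    redis_port=6379,
)
```

An unknown output_type or state_type raises KeyError in this sketch, which surfaces a typo early instead of passing it through.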

deploy_spout(args)

Deploy a spout of a specific type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| **kwargs | | Additional keyword arguments for initializing the spout; see Keyword Arguments. | required |

Keyword Arguments:
    Batch output:
    - output_folder (str): The directory where output files should be stored temporarily.
    - output_s3_bucket (str): The name of the S3 bucket for output storage.
    - output_s3_folder (str): The S3 folder for output storage.
    Streaming output:
    - output_kafka_topic (str): Kafka output topic for streaming spouts.
    - output_kafka_cluster_connection_string (str): Kafka connection string for streaming spouts.
    Stream to Batch output:
    - output_folder (str): The directory where output files should be stored temporarily.
    - output_s3_bucket (str): The name of the S3 bucket for output storage.
    - output_s3_folder (str): The S3 folder for output storage.
    - buffer_size (int): Number of messages to buffer.
    Redis state manager config:
    - redis_host (str): The host address for the Redis server.
    - redis_port (int): The port number for the Redis server.
    - redis_db (int): The Redis database to be used.
    Postgres state manager config:
    - postgres_host (str): The host address for the PostgreSQL server.
    - postgres_port (int): The port number for the PostgreSQL server.
    - postgres_user (str): The username for the PostgreSQL server.
    - postgres_password (str): The password for the PostgreSQL server.
    - postgres_database (str): The PostgreSQL database to be used.
    - postgres_table (str): The PostgreSQL table to be used.
    DynamoDB state manager config:
    - dynamodb_table_name (str): The name of the DynamoDB table.
    - dynamodb_region_name (str): The AWS region for DynamoDB.
    Deployment:
    - k8s_kind (str): Kind of Kubernetes resource to deploy as; choices are "deployment", "service", "job", "cron_job".
    - k8s_name (str): Name of the Kubernetes resource.
    - k8s_image (str): Docker image for the Kubernetes resource.
    - k8s_replicas (int): Number of replicas.
    - k8s_env_vars (json): Environment variables as a JSON string.
    - k8s_cpu (str): CPU requirements.
    - k8s_memory (str): Memory requirements.
    - k8s_storage (str): Storage requirements.
    - k8s_gpu (str): GPU requirements.
    - k8s_kube_config_path (str): Path to the local Kubernetes cluster config (kubeconfig) file.
    - k8s_api_key (str): API key for the Kubernetes API server.
    - k8s_api_host (str): Host of the Kubernetes API server.
    - k8s_verify_ssl (str): Whether to verify SSL certificates when connecting to the Kubernetes API server.
    - k8s_ssl_ca_cert (str): Path to the SSL CA certificate.
    - k8s_cluster_name (str): Name of the Kubernetes cluster.
    - k8s_context_name (str): Name of the kubeconfig context.
    - k8s_namespace (str): Kubernetes namespace (default: "default").
    - k8s_labels (json): Labels for Kubernetes resources, as a JSON string.
    - k8s_annotations (json): Annotations for Kubernetes resources, as a JSON string.
    - k8s_port (int): Port to run the spout on as a service.
    - k8s_target_port (int): Port to expose the spout on as a service.
    - k8s_schedule (str): Schedule to run the spout on as a cron job.
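Several deployment options (k8s_env_vars, k8s_labels, k8s_annotations) are documented as JSON strings rather than dictionaries. The sketch below shows one way to prepare and decode such values with the standard json module. The helper decode_json_fields and all example values are hypothetical, and how deploy_spout itself parses these fields is not stated on this page.

```python
import json

# Example deployment options; every value here is illustrative.
deploy_kwargs = {
    "k8s_kind": "deployment",
    "k8s_name": "example-spout",
    "k8s_replicas": 2,
    "k8s_namespace": "default",
    # JSON-typed options are serialized to strings before being passed on.
    "k8s_env_vars": json.dumps({"LOG_LEVEL": "info"}),
    "k8s_labels": json.dumps({"app": "example-spout"}),
}

JSON_FIELDS = ("k8s_env_vars", "k8s_labels", "k8s_annotations")


def decode_json_fields(kwargs: dict, fields: tuple = JSON_FIELDS) -> dict:
    """Return a copy of kwargs with the JSON-string fields parsed into objects."""
    decoded = dict(kwargs)
    for field in fields:
        value = decoded.get(field)
        if isinstance(value, str):
            decoded[field] = json.loads(value)
    return decoded


decoded = decode_json_fields(deploy_kwargs)
```

Fields that are absent (k8s_annotations in this example) are left out rather than filled with a default, and the original dictionary is not modified.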

execute_spout(spout, method_name, *args, **kwargs)

Execute a method of a spout.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| spout | Spout | The spout to execute. | required |
| method_name | str | The name of the method to execute. | required |
| *args | | Positional arguments to pass to the method. | () |
| **kwargs | | Keyword arguments to pass to the method. | {} |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| | Any | The result of the method. |
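Calling a method by name, with positional and keyword arguments forwarded, is commonly done with getattr. The sketch below illustrates that pattern under stated assumptions: DummySpout and execute are hypothetical stand-ins, and this is not necessarily how execute_spout is implemented.

```python
from typing import Any


class DummySpout:
    """Hypothetical stand-in for a Spout subclass."""

    def fetch(self, limit: int, prefix: str = "item") -> list:
        return [f"{prefix}-{i}" for i in range(limit)]


def execute(spout: Any, method_name: str, *args: Any, **kwargs: Any) -> Any:
    """Look up method_name on the spout and call it with the given arguments."""
    method = getattr(spout, method_name, None)
    if not callable(method):
        raise AttributeError(
            f"{type(spout).__name__} has no callable method {method_name!r}"
        )
    return method(*args, **kwargs)


# Positional and keyword arguments are forwarded unchanged.
result = execute(DummySpout(), "fetch", 2, prefix="row")
```

Raising AttributeError for an unknown name keeps a mistyped method_name from failing silently.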

run(args)

Run the command-line interface.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | argparse.Namespace | Parsed command-line arguments. | required |