YamlCtl

Control spouts and bolts defined in a YAML file

Command-line interface for managing spouts and bolts based on a YAML configuration.

The YamlCtl class provides methods to run specific or all spouts and bolts defined in a YAML file. The YAML file's structure is defined by the Geniusfile schema.

Example YAML structures:

version: 1

spouts:
  http_listener:
    name: WebhookListener
    method: listen
    args:
      port: 8081
    state:
      type: redis
      args:
        redis_host: "127.0.0.1"
        redis_port: 6379
        redis_db: 0
    output:
      type: batch
      args:
        bucket: geniusrise-test
        folder: train
    deploy:
      type: k8s
      args:
        kind: deployment
        name: webhook-listener
        context_name: arn:aws:eks:us-east-1:genius-dev:cluster/geniusrise-dev
        namespace: geniusrise
        image: geniusrise/geniusrise
        kube_config_path: ~/.kube/config

bolts:
  text_classifier:
    name: TextClassifier
    method: classify
    args:
      model_name: bert-base-uncased
    state:
      type: none
    input:
      type: batch
      args:
        bucket: geniusrise-test
        folder: train
    output:
      type: batch
      args:
        bucket: geniusrise-test
        folder: model
    deploy:
      type: k8s
      args:
        kind: deployment
        name: text-classifier
        context_name: arn:aws:eks:us-east-1:genius-dev:cluster/geniusrise-dev
        namespace: geniusrise
        image: geniusrise/geniusrise
        kube_config_path: ~/.kube/config

version: 1

spouts:
  twitter_stream:
    name: TwitterStream
    method: stream
    args:
      api_key: "your_twitter_api_key"
      hashtags: ["#AI", "#ML"]
    state:
      type: postgres
      args:
        postgres_host: "127.0.0.1"
        postgres_port: 5432
        postgres_user: "postgres"
        postgres_password: "postgres"
        postgres_database: "geniusrise"
        postgres_table: "twitter_data"
    output:
      type: streaming
      args:
        output_topic: twitter_topic
        kafka_servers: "localhost:9092"
    deploy:
      type: k8s
      args:
        kind: deployment
        name: twitter-stream
        context_name: arn:aws:eks:us-east-1:genius-dev:cluster/geniusrise-dev
        namespace: geniusrise
        image: geniusrise/geniusrise
        kube_config_path: ~/.kube/config

bolts:
  sentiment_analyzer:
    name: SentimentAnalyzer
    method: analyze
    args:
      model_name: "sentiment-model"
    state:
      type: dynamodb
      args:
        dynamodb_table_name: "SentimentAnalysis"
        dynamodb_region_name: "us-east-1"
    input:
      type: streaming
      args:
        input_topic: twitter_topic
        kafka_servers: "localhost:9092"
        group_id: "sentiment-group"
    output:
      type: batch
      args:
        bucket: geniusrise-test
        folder: sentiment_results
    deploy:
      type: k8s
      args:
        kind: deployment
        name: sentiment-analyzer
        context_name: arn:aws:eks:us-east-1:genius-dev:cluster/geniusrise-dev
        namespace: geniusrise
        image: geniusrise/geniusrise
        kube_config_path: ~/.kube/config
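
To make the nesting of the example structures concrete, the sketch below walks a parsed configuration and lists each component with the class and method it names. It is only an illustration: the `geniusfile` dictionary is a hand-written, abbreviated stand-in for what a YAML loader might return for the first example, and `summarize` is a hypothetical helper, not part of YamlCtl.

```python
from typing import Any, Dict, List

# Hypothetical, abbreviated stand-in for the parsed form of the first example.
# Only a few keys are reproduced; this is not the full Geniusfile schema.
geniusfile: Dict[str, Any] = {
    "version": 1,
    "spouts": {
        "http_listener": {
            "name": "WebhookListener",
            "method": "listen",
            "output": {"type": "batch", "args": {"bucket": "geniusrise-test", "folder": "train"}},
        }
    },
    "bolts": {
        "text_classifier": {
            "name": "TextClassifier",
            "method": "classify",
            "input": {"type": "batch", "args": {"bucket": "geniusrise-test", "folder": "train"}},
            "output": {"type": "batch", "args": {"bucket": "geniusrise-test", "folder": "model"}},
        }
    },
}


def summarize(config: Dict[str, Any]) -> List[str]:
    """Return one 'kind:key -> Class.method' line per spout and bolt."""
    lines = []
    for kind in ("spouts", "bolts"):
        for key, spec in config.get(kind, {}).items():
            lines.append(f"{kind}:{key} -> {spec['name']}.{spec['method']}")
    return lines


for line in summarize(geniusfile):
    print(line)
```

The keys directly under `spouts` and `bolts` (here `http_listener` and `text_classifier`) are the names that the per-component methods documented below take as `spout_name` and `bolt_name`.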

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| geniusfile | Geniusfile | Parsed YAML configuration. |
| spout_ctls | Dict[str, SpoutCtl] | Dictionary of SpoutCtl instances. |
| bolt_ctls | Dict[str, BoltCtl] | Dictionary of BoltCtl instances. |

__init__(spout_ctls, bolt_ctls)

Initialize YamlCtl with control instances for spouts and bolts.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| spout_ctls | Dict[str, SpoutCtl] | Dictionary of SpoutCtl instances. | required |
| bolt_ctls | Dict[str, BoltCtl] | Dictionary of BoltCtl instances. | required |

create_parser(parser)

Create and return the command-line parser for managing spouts and bolts.
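
As a rough illustration of a method that receives a parser, adds options to it, and returns it, here is a minimal `argparse` sketch. The option names (`--file`, `--spout`, `--bolt`) and the `genius.yml` default are assumptions made for this example; they are not taken from YamlCtl and may not match its actual flags.

```python
import argparse


def create_parser(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """Attach illustrative options to an existing parser and return it."""
    # All option names and defaults below are hypothetical, chosen for this sketch.
    parser.add_argument("--file", default="genius.yml", help="Path to the YAML file.")
    parser.add_argument("--spout", help="Name of a single spout to act on.")
    parser.add_argument("--bolt", help="Name of a single bolt to act on.")
    return parser


parser = create_parser(argparse.ArgumentParser(prog="yamlctl-sketch"))
args = parser.parse_args(["--spout", "http_listener"])
print(args.spout, args.bolt, args.file)
```

Passing the parser in, instead of building it inside the method, lets a parent command-line tool own the top-level parser and hand each controller its own subparser.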

deploy_bolt(bolt_name)

Deploy a specific bolt based on its name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| bolt_name | str | Name of the bolt to deploy. | required |

deploy_bolts()

Deploy all bolts defined in the YAML configuration.

deploy_spout(spout_name)

Deploy a specific spout based on its name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| spout_name | str | Name of the spout to deploy. | required |

deploy_spouts()

Deploy all spouts defined in the YAML configuration.

resolve_reference(input_type, ref_name)

Resolve the reference of a bolt's input based on the input type (spout or bolt).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_type | str | Type of the input ("spout" or "bolt"). | required |
| ref_name | str | Name of the spout or bolt to refer to. | required |

Returns:

| Type | Description |
| --- | --- |
| Output | The output data of the referred spout or bolt. |
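
The lookup described here can be sketched over a plain dictionary. This is a simplified assumption of how such a resolution might work: the real method returns an Output object, whereas this hypothetical version returns the raw `output` mapping of the referenced component, and the `config` layout mirrors the example YAML structures.

```python
from typing import Any, Dict

# Hypothetical parsed configuration, abbreviated from the second example structure.
config: Dict[str, Any] = {
    "spouts": {
        "twitter_stream": {
            "output": {"type": "streaming", "args": {"output_topic": "twitter_topic"}},
        }
    },
    "bolts": {
        "sentiment_analyzer": {
            "output": {"type": "batch", "args": {"folder": "sentiment_results"}},
        }
    },
}


def resolve_reference(cfg: Dict[str, Any], input_type: str, ref_name: str) -> Dict[str, Any]:
    """Return the 'output' mapping of the referenced spout or bolt (sketch only)."""
    if input_type not in ("spout", "bolt"):
        raise ValueError(f"Unknown input type: {input_type!r}")
    components = cfg.get(input_type + "s", {})
    if ref_name not in components:
        raise KeyError(f"No {input_type} named {ref_name!r}")
    return components[ref_name]["output"]


upstream = resolve_reference(config, "spout", "twitter_stream")
print(upstream["type"])
```

Rejecting unknown input types and missing names early keeps a typo in the configuration from surfacing later as a confusing error inside the bolt.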

run(args)

Run the command-line interface for managing spouts and bolts based on the provided arguments. Note that the spouts and bolts in the YAML configuration are not ordered: each spout and bolt is an independent entity, even when they are connected together.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | argparse.Namespace | Parsed command-line arguments. | required |
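
One way to picture this dispatch is the sketch below, which runs a single named component when one is given and otherwise runs every component. It is an assumption-laden illustration, not YamlCtl's implementation: `RecordingCtl` and its `start` method are hypothetical stand-ins for SpoutCtl and BoltCtl, and the `spout` and `bolt` attributes on the namespace are invented for the example.

```python
from argparse import Namespace
from typing import Dict, List


class RecordingCtl:
    """Hypothetical stand-in for SpoutCtl/BoltCtl that only records that it ran."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    def start(self) -> None:  # hypothetical method name
        self.log.append(self.name)


def run(args: Namespace, spout_ctls: Dict[str, RecordingCtl], bolt_ctls: Dict[str, RecordingCtl]) -> None:
    """Run the one component named in args, or every component if none is named."""
    if args.spout:
        spout_ctls[args.spout].start()
    elif args.bolt:
        bolt_ctls[args.bolt].start()
    else:
        # No ordering is implied: each component is started on its own.
        for ctl in list(spout_ctls.values()) + list(bolt_ctls.values()):
            ctl.start()


log: List[str] = []
spouts = {"http_listener": RecordingCtl("http_listener", log)}
bolts = {"text_classifier": RecordingCtl("text_classifier", log)}
run(Namespace(spout=None, bolt="text_classifier"), spouts, bolts)
print(log)
```

Because components are independent, running only the bolt does not start the spout it reads from; the bolt simply consumes whatever is already available at its configured input.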

run_bolt(bolt_name)

Run a specific bolt based on its name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| bolt_name | str | Name of the bolt to run. | required |

run_bolts(executor)

Run all bolts defined in the YAML configuration.

run_spout(spout_name)

Run a specific spout based on its name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| spout_name | str | Name of the spout to run. | required |

run_spouts(executor)

Run all spouts defined in the YAML configuration.
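
The `executor` parameter of `run_spouts` and `run_bolts` is not described here. Assuming it behaves like a `concurrent.futures` executor, running all components could look roughly like the sketch below, where `run_all` is a hypothetical helper and `results.append` stands in for the per-component run call.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, List


def run_all(names: Iterable[str], run_one: Callable[[str], None], executor: Executor) -> List[Future]:
    """Submit one task per component name and return the resulting futures."""
    return [executor.submit(run_one, name) for name in names]


results: List[str] = []
with ThreadPoolExecutor(max_workers=2) as pool:
    # results.append is a placeholder for something like running one spout by name.
    futures = run_all(["http_listener", "twitter_stream"], results.append, pool)
    wait(futures)

print(sorted(results))
```

Submitting each component as its own task matches the note under `run(args)` that spouts and bolts are independent: completion order is not guaranteed, so the example sorts the names before printing.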