YamlCtl
Control spouts and bolts defined in a YAML file
Command-line interface for managing spouts and bolts based on a YAML configuration.
The YamlCtl class provides methods to run or deploy a single named spout or bolt, or all of them, as defined in a YAML file. The file's structure is defined by the Geniusfile schema.
Example YAML structures:

    version: 1
    spouts:
      http_listener:
        name: WebhookListener
        method: listen
        args:
          port: 8081
        state:
          type: redis
          args:
            redis_host: "127.0.0.1"
            redis_port: 6379
            redis_db: 0
        output:
          type: batch
          args:
            bucket: geniusrise-test
            folder: train
        deploy:
          type: k8s
          args:
            kind: deployment
            name: webhook-listener
            context_name: arn:aws:eks:us-east-1:genius-dev:cluster/geniusrise-dev
            namespace: geniusrise
            image: geniusrise/geniusrise
            kube_config_path: ~/.kube/config
    bolts:
      text_classifier:
        name: TextClassifier
        method: classify
        args:
          model_name: bert-base-uncased
        state:
          type: none
        input:
          type: batch
          args:
            bucket: geniusrise-test
            folder: train
        output:
          type: batch
          args:
            bucket: geniusrise-test
            folder: model
        deploy:
          type: k8s
          args:
            kind: deployment
            name: text-classifier
            context_name: arn:aws:eks:us-east-1:genius-dev:cluster/geniusrise-dev
            namespace: geniusrise
            image: geniusrise/geniusrise
            kube_config_path: ~/.kube/config

A second example, with a streaming spout feeding a bolt over Kafka:

    version: 1
    spouts:
      twitter_stream:
        name: TwitterStream
        method: stream
        args:
          api_key: "your_twitter_api_key"
          hashtags: ["#AI", "#ML"]
        state:
          type: postgres
          args:
            postgres_host: "127.0.0.1"
            postgres_port: 5432
            postgres_user: "postgres"
            postgres_password: "postgres"
            postgres_database: "geniusrise"
            postgres_table: "twitter_data"
        output:
          type: streaming
          args:
            output_topic: twitter_topic
            kafka_servers: "localhost:9092"
        deploy:
          type: k8s
          args:
            kind: deployment
            name: twitter-stream
            context_name: arn:aws:eks:us-east-1:genius-dev:cluster/geniusrise-dev
            namespace: geniusrise
            image: geniusrise/geniusrise
            kube_config_path: ~/.kube/config
    bolts:
      sentiment_analyzer:
        name: SentimentAnalyzer
        method: analyze
        args:
          model_name: "sentiment-model"
        state:
          type: dynamodb
          args:
            dynamodb_table_name: "SentimentAnalysis"
            dynamodb_region_name: "us-east-1"
        input:
          type: streaming
          args:
            input_topic: twitter_topic
            kafka_servers: "localhost:9092"
            group_id: "sentiment-group"
        output:
          type: batch
          args:
            bucket: geniusrise-test
            folder: sentiment_results
        deploy:
          type: k8s
          args:
            kind: deployment
            name: sentiment-analyzer
            context_name: arn:aws:eks:us-east-1:genius-dev:cluster/geniusrise-dev
            namespace: geniusrise
            image: geniusrise/geniusrise
            kube_config_path: ~/.kube/config

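
To make the nesting of these files concrete, the sketch below walks a plain Python dict shaped like a trimmed version of the first example and prints one summary line per component. The dict literal and the `summarize` helper are illustrative assumptions, not part of YamlCtl; in practice the structure would come from parsing the YAML file against the Geniusfile schema.

```python
from typing import Any, Dict, List

# Hand-written stand-in for a parsed Geniusfile (trimmed from the first example).
config: Dict[str, Any] = {
    "version": 1,
    "spouts": {
        "http_listener": {
            "name": "WebhookListener",
            "method": "listen",
            "args": {"port": 8081},
            "state": {"type": "redis"},
            "output": {"type": "batch"},
            "deploy": {"type": "k8s"},
        }
    },
    "bolts": {
        "text_classifier": {
            "name": "TextClassifier",
            "method": "classify",
            "args": {"model_name": "bert-base-uncased"},
            "state": {"type": "none"},
            "input": {"type": "batch"},
            "output": {"type": "batch"},
            "deploy": {"type": "k8s"},
        }
    },
}


def summarize(cfg: Dict[str, Any]) -> List[str]:
    """Return one summary line per spout and bolt (hypothetical helper)."""
    lines = []
    for kind in ("spouts", "bolts"):
        for key, comp in cfg.get(kind, {}).items():
            # Spouts have no "input" section, so fall back to "-".
            input_type = comp.get("input", {}).get("type", "-")
            lines.append(
                f"{kind[:-1]} {key}: {comp['name']}.{comp['method']} "
                f"state={comp['state']['type']} input={input_type} "
                f"output={comp['output']['type']} deploy={comp['deploy']['type']}"
            )
    return lines


for line in summarize(config):
    print(line)
```

Each top-level key under `spouts` or `bolts` is the name that the per-component methods on this page (`run_spout`, `deploy_bolt`, and so on) take as their argument.
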
Attributes:

Name | Type | Description |
---|---|---|
geniusfile | Geniusfile | Parsed YAML configuration. |
spout_ctls | Dict[str, SpoutCtl] | Dictionary of SpoutCtl instances. |
bolt_ctls | Dict[str, BoltCtl] | Dictionary of BoltCtl instances. |

__init__(spout_ctls, bolt_ctls)
Initialize YamlCtl with the control instances for spouts and bolts. The signature does not take the path to the YAML file.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
spout_ctls | Dict[str, SpoutCtl] | Dictionary of SpoutCtl instances. | required |
bolt_ctls | Dict[str, BoltCtl] | Dictionary of BoltCtl instances. | required |

create_parser(parser)
Create and return the command-line parser for managing spouts and bolts.
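
As a rough illustration of what a method like `create_parser(parser)` might set up, the following sketch attaches subcommands to an existing `argparse` parser. The subcommand names (`run`, `deploy`), the flags (`--file`, `--spout`, `--bolt`), and the default file name `genius.yml` are all assumptions made for the example; this page does not document the actual options.

```python
import argparse


def create_parser(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """Attach hypothetical run/deploy subcommands to an existing parser."""
    subparsers = parser.add_subparsers(dest="command", required=True)
    for command in ("run", "deploy"):
        sub = subparsers.add_parser(command, help=f"{command} spouts and bolts")
        # Assumed flags: the real YamlCtl options may be named differently.
        sub.add_argument("--file", default="genius.yml", help="path to the YAML file")
        sub.add_argument("--spout", help="only this spout")
        sub.add_argument("--bolt", help="only this bolt")
    return parser


parser = create_parser(argparse.ArgumentParser(prog="yamlctl-sketch"))
args = parser.parse_args(["run", "--spout", "http_listener"])
print(args.command, args.file, args.spout, args.bolt)
```
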
deploy_bolt(bolt_name)
Deploy a specific bolt based on its name.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
bolt_name | str | Name of the bolt to deploy. | required |

deploy_bolts()
Deploy all bolts defined in the YAML configuration.
deploy_spout(spout_name)
Deploy a specific spout based on its name.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
spout_name | str | Name of the spout to deploy. | required |

deploy_spouts()
Deploy all spouts defined in the YAML configuration.
resolve_reference(input_type, ref_name)
Resolve the reference of a bolt's input based on the input type (spout or bolt).

Parameters:

Name | Type | Description | Default |
---|---|---|---|
input_type | str | Type of the input ("spout" or "bolt"). | required |
ref_name | str | Name of the spout or bolt to refer to. | required |

Returns:

Name | Type | Description |
---|---|---|
Output | | The output data of the referred spout or bolt. |

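
The lookup described for `resolve_reference` can be pictured with a small sketch. It is a simplification under stated assumptions: the two dictionaries stand in for the parsed configuration, and the function returns the referred component's `output` section as a plain dict rather than an output object as the real method is documented to do.

```python
from typing import Any, Dict

# Stand-ins for parsed configuration: component name -> its "output" section.
spout_outputs: Dict[str, Dict[str, Any]] = {
    "twitter_stream": {
        "type": "streaming",
        "args": {"output_topic": "twitter_topic"},
    },
}
bolt_outputs: Dict[str, Dict[str, Any]] = {
    "sentiment_analyzer": {
        "type": "batch",
        "args": {"bucket": "geniusrise-test", "folder": "sentiment_results"},
    },
}


def resolve_reference(input_type: str, ref_name: str) -> Dict[str, Any]:
    """Look up the output of the referred spout or bolt (illustrative only)."""
    if input_type == "spout":
        table = spout_outputs
    elif input_type == "bolt":
        table = bolt_outputs
    else:
        raise ValueError(f"Unknown input type: {input_type!r}")
    if ref_name not in table:
        raise ValueError(f"No {input_type} named {ref_name!r}")
    return table[ref_name]


print(resolve_reference("spout", "twitter_stream")["type"])  # prints "streaming"
```
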
run(args)
Run the command-line interface for managing spouts and bolts based on the provided arguments. The YAML configuration imposes no ordering on spouts and bolts: each one is an independent entity, even when it is connected to others.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
args | argparse.Namespace | Parsed command-line arguments. | required |

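
One way to picture how `run(args)` could route parsed arguments to the per-component methods is the sketch below. The namespace attributes `spout` and `bolt`, the `FakeCtl` stand-in class, and the `dispatch` helper are hypothetical; the real method may branch differently.

```python
import argparse
from typing import List


class FakeCtl:
    """Stand-in that records calls instead of running anything."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def run_spout(self, name: str) -> None:
        self.calls.append(f"run_spout:{name}")

    def run_bolt(self, name: str) -> None:
        self.calls.append(f"run_bolt:{name}")

    def run_all(self) -> None:
        self.calls.append("run_all")


def dispatch(ctl: FakeCtl, args: argparse.Namespace) -> None:
    # Assumed attribute names on the namespace: `spout` and `bolt`.
    if args.spout:
        ctl.run_spout(args.spout)
    if args.bolt:
        ctl.run_bolt(args.bolt)
    # With no component named, fall back to running everything.
    if not args.spout and not args.bolt:
        ctl.run_all()


ctl = FakeCtl()
dispatch(ctl, argparse.Namespace(spout="http_listener", bolt=None))
dispatch(ctl, argparse.Namespace(spout=None, bolt=None))
print(ctl.calls)
```
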
run_bolt(bolt_name)
Run a specific bolt based on its name.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
bolt_name | str | Name of the bolt to run. | required |

run_bolts(executor)
Run all bolts defined in the YAML configuration.
run_spout(spout_name)
Run a specific spout based on its name.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
spout_name | str | Name of the spout to run. | required |

run_spouts(executor)
Run all spouts defined in the YAML configuration.
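
The type of the `executor` parameter is not documented on this page. Assuming it behaves like a `concurrent.futures.Executor`, running all components could look roughly like the sketch below, where each component is submitted independently and no ordering between them is implied. The `run_all` helper and the lambda components are placeholders, not YamlCtl code.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, Dict, List


def run_all(components: Dict[str, Callable[[], str]], executor: Executor) -> List[str]:
    """Submit every component to the executor and collect the results."""
    futures = {name: executor.submit(fn) for name, fn in components.items()}
    # Completion order is not guaranteed, so results are sorted by name.
    return sorted(f"{name}:{future.result()}" for name, future in futures.items())


# Placeholder components; a real spout would run until stopped.
spouts: Dict[str, Callable[[], str]] = {
    "http_listener": lambda: "done",
    "twitter_stream": lambda: "done",
}

with ThreadPoolExecutor(max_workers=2) as pool:
    results = run_all(spouts, pool)
print(results)
```
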