Kafka

Spout for Kafka

Bases: Spout

__init__(output, state, **kwargs)

Initialize the Kafka class.

Parameters:

output (StreamingOutput, required): An instance of the StreamingOutput class for saving the data.
state (State, required): An instance of the State class for maintaining the state.
**kwargs (default {}): Additional keyword arguments.

Invoking from the command line with geniusrise

genius Kafka rise \
    streaming \
        --output_kafka_topic kafka_test \
        --output_kafka_cluster_connection_string localhost:9094 \
    none \
    listen \
        --args topic=my_topic group_id=my_group

Invoking from a YAML file with geniusrise

version: "1"
spouts:
    my_kafka_spout:
        name: "Kafka"
        method: "listen"
        args:
            topic: "my_topic"
            group_id: "my_group"
        output:
            type: "streaming"
            args:
                output_topic: "kafka_test"
                kafka_servers: "localhost:9094"

listen(topic, group_id, bootstrap_servers='localhost:9092', username=None, password=None)

📖 Start listening for data from the Kafka topic.

Parameters:

topic (str, required): The Kafka topic to listen to.
group_id (str, required): The Kafka consumer group ID.
bootstrap_servers (str, default 'localhost:9092'): The Kafka bootstrap servers.
username (Optional[str], default None): The username for SASL/PLAIN authentication.
password (Optional[str], default None): The password for SASL/PLAIN authentication.
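
To illustrate how these parameters might translate into consumer settings, here is a minimal sketch. It assumes the kafka-python client (this page does not say which client library the spout uses), and `build_consumer_config` and `consume` are hypothetical helpers, not part of geniusrise. The choice of `SASL_PLAINTEXT` as the security protocol is also an assumption; a TLS-enabled cluster would need `SASL_SSL` instead.

```python
from typing import Any, Dict, Optional


def build_consumer_config(
    group_id: str,
    bootstrap_servers: str = "localhost:9092",
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: map listen()-style arguments to KafkaConsumer kwargs."""
    config: Dict[str, Any] = {
        "bootstrap_servers": bootstrap_servers,
        "group_id": group_id,
    }
    # Enable SASL/PLAIN only when both credentials are supplied.
    if username is not None and password is not None:
        config.update(
            security_protocol="SASL_PLAINTEXT",  # assumption: no TLS on the broker
            sasl_mechanism="PLAIN",
            sasl_plain_username=username,
            sasl_plain_password=password,
        )
    return config


def consume(topic: str, group_id: str, **kwargs: Any) -> None:
    """Hypothetical loop: print every message received on the topic."""
    # Imported lazily so the helper above works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **build_consumer_config(group_id, **kwargs))
    for message in consumer:
        print(message.value)
```

Calling `consume("my_topic", "my_group")` would require `pip install kafka-python` and a broker reachable at `localhost:9092`.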

Raises:

Exception: If unable to connect to the Kafka server.
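
Because a connection failure surfaces only as a generic Exception, a caller may want to retry before giving up. The following is a rough sketch of one way to do that; `call_with_retry` is a hypothetical helper, and the attempt count and delay are arbitrary illustrative values.

```python
import time
from typing import Any, Callable, Optional


def call_with_retry(
    connect: Callable[[], Any],
    attempts: int = 3,
    delay_seconds: float = 1.0,
) -> Any:
    """Call `connect` up to `attempts` times, sleeping between failures."""
    last_error: Optional[Exception] = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except Exception as error:  # only a generic Exception is documented
            last_error = error
            if attempt < attempts:
                time.sleep(delay_seconds)
    # Chain the last failure so the original cause stays visible.
    raise RuntimeError(f"gave up after {attempts} attempts") from last_error
```

With an already constructed spout (here a hypothetical variable `spout`), usage could look like `call_with_retry(lambda: spout.listen(topic="my_topic", group_id="my_group"))`. Note that this retries on any exception raised by the call, not only on connection errors.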