Kinesis

Spout for Kinesis

Bases: Spout

__init__(output, state, **kwargs)

Initialize the Kinesis class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output` | `StreamingOutput` | An instance of the StreamingOutput class for saving the data. | required |
| `state` | `State` | An instance of the State class for maintaining the state. | required |
| `**kwargs` | | Additional keyword arguments. | `{}` |

Using geniusrise to invoke via command line

genius Kinesis rise \
    streaming \
        --output_kafka_topic kinesis_test \
        --output_kafka_cluster_connection_string localhost:9094 \
    none \
    listen \
        --args stream_name=my_stream shard_id=shardId-000000000000

Using geniusrise to invoke via YAML file

version: "1"
spouts:
    my_kinesis_spout:
        name: "Kinesis"
        method: "listen"
        args:
            stream_name: "my_stream"
            shard_id: "shardId-000000000000"
        output:
            type: "streaming"
            args:
                output_topic: "kinesis_test"
                kafka_servers: "localhost:9094"

listen(stream_name, shard_id='shardId-000000000000', region_name=None, aws_access_key_id=None, aws_secret_access_key=None)

📖 Start listening for data from the Kinesis stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream_name` | `str` | The name of the Kinesis stream. | required |
| `shard_id` | `str` | The shard ID to read from. | `'shardId-000000000000'` |
| `region_name` | `str` | The AWS region name. | `None` |
| `aws_access_key_id` | `str` | AWS access key ID for authentication. | `None` |
| `aws_secret_access_key` | `str` | AWS secret access key for authentication. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If there is an error while processing Kinesis records. |
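
To make the `stream_name` and `shard_id` parameters more concrete, the following is a minimal sketch of how a single shard can be polled with the two standard Kinesis client calls, `get_shard_iterator` and `get_records`. It is not the spout's actual implementation. The function name `read_shard`, the `handle` callback, the `max_batches` and `poll_interval` parameters, the `LATEST` iterator type, and the assumption that each record's payload is JSON are all illustrative choices that do not come from this page.

```python
import json
import time
from typing import Any, Callable, Dict, Optional


def read_shard(
    client: Any,
    stream_name: str,
    shard_id: str = "shardId-000000000000",
    handle: Callable[[Dict[str, Any]], None] = print,
    max_batches: Optional[int] = None,
    poll_interval: float = 1.0,
) -> int:
    """Poll one shard and pass each decoded record to `handle`.

    `client` is expected to behave like a boto3 Kinesis client, for example
    boto3.client("kinesis", region_name=...). Returns the number of records
    handled. All names here are hypothetical, not part of the Kinesis spout.
    """
    # Ask Kinesis where to start reading in this shard. "LATEST" (an
    # assumption) means only records written after this call are returned.
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="LATEST",
    )["ShardIterator"]

    handled = 0
    batches = 0
    while iterator and (max_batches is None or batches < max_batches):
        response = client.get_records(ShardIterator=iterator, Limit=100)
        for record in response["Records"]:
            # Assumption: the producer wrote JSON; "Data" arrives as bytes.
            handle(json.loads(record["Data"]))
            handled += 1
        # A missing or empty next iterator means the shard has been closed.
        iterator = response.get("NextShardIterator")
        batches += 1
        # Pause between polls to stay under the per-shard read limits.
        time.sleep(poll_interval)
    return handled
```

With a real client this loop runs until the shard is closed unless `max_batches` is set. Passing the client in as an argument, rather than creating it inside the function, keeps the sketch testable with a stub and leaves credential handling (`region_name`, `aws_access_key_id`, `aws_secret_access_key`) to the caller.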