Skip to content

ZeroMQ

Spout for ZeroMQ

Bases: Spout

__init__(output, state, **kwargs)

Initialize the ZeroMQ class.

Parameters:

Name Type Description Default
output StreamingOutput

An instance of the StreamingOutput class for saving the data.

required
state State

An instance of the State class for maintaining the state.

required
**kwargs

Additional keyword arguments.

{}

Using geniusrise to invoke via command line

genius ZeroMQ rise \
    streaming \
        --output_kafka_topic zmq_test \
        --output_kafka_cluster_connection_string localhost:9094 \
    none \
    listen \
        --args endpoint=tcp://localhost:5555 topic=my_topic syntax=json

Using geniusrise to invoke via YAML file

version: "1"
spouts:
    my_zmq_spout:
        name: "ZeroMQ"
        method: "listen"
        args:
            endpoint: "tcp://localhost:5555"
            topic: "my_topic"
            syntax: "json"
        output:
            type: "streaming"
            args:
                output_topic: "zmq_test"
                kafka_servers: "localhost:9094"

listen(endpoint, topic, syntax, socket_type='SUB')

📖 Start listening for data from the ZeroMQ server.

Parameters:

Name Type Description Default
endpoint str

The endpoint to connect to (e.g., "tcp://localhost:5555").

required
topic str

The topic to subscribe to.

required
syntax str

The syntax to be used (e.g., "json").

required
socket_type Optional[str]

The type of ZeroMQ socket (default is "SUB").

'SUB'

Raises:

Type Description
Exception

If unable to connect to the ZeroMQ server or process messages.