Examples

Start a local RabbitMQ instance with Docker:

docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=amqp -e RABBITMQ_DEFAULT_PASS=amqp -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Open the RabbitMQ management UI (user=amqp, password=amqp) at:

http://localhost:15672/
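
The container needs a few seconds to start, so it can help to check that the AMQP port is reachable before running the examples. The following is a minimal sketch using only the standard library; `wait_for_port` is a hypothetical helper, not part of PikaBus or pika, and an open TCP port only suggests that RabbitMQ is ready, it does not prove it.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.2)  # brief pause before the next attempt
```

Calling `wait_for_port('localhost', 5672)` checks the AMQP port mapped by the docker command above.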

Then run any of the examples below:

Consumer

The following example demonstrates how to run a simple consumer.

import pika
from PikaBus.abstractions.AbstractPikaBus import AbstractPikaBus
from PikaBus.PikaBusSetup import PikaBusSetup
from PikaBus.PikaErrorHandler import PikaErrorHandler


def MessageHandlerMethod(**kwargs):
    """
    A message handler method may simply be a method with some **kwargs.
    The **kwargs will be given all incoming pipeline data, the bus and the incoming payload.
    """
    data: dict = kwargs['data']
    bus: AbstractPikaBus = kwargs['bus']
    payload: dict = kwargs['payload']
    print(payload)
    if payload['reply']:
        payload['reply'] = False
        bus.Reply(payload=payload)


# Use pika connection params to set connection details
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials)

# Create a PikaBusSetup instance with a listener queue, and add the message handler method.
pikaErrorHandler = PikaErrorHandler(errorQueue='error', maxRetries=1)
pikaBusSetup = PikaBusSetup(connParams,
                            defaultListenerQueue='myQueue',
                            defaultSubscriptions='myTopic',
                            pikaErrorHandler=pikaErrorHandler)
pikaBusSetup.AddMessageHandler(MessageHandlerMethod)

# Start consuming messages from the queue.
pikaBusSetup.StartConsumers()

input('Hit enter to stop all consuming channels \n\n')
pikaBusSetup.StopConsumers()
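
The handler above is a plain function, so its reply logic can be exercised without a broker. A minimal sketch, assuming a hypothetical `FakeBus` stub (not part of PikaBus) that only records `Reply` calls; the handler is repeated here so the snippet is self-contained:

```python
class FakeBus:
    """Hypothetical stand-in for a bus; it only records Reply calls."""

    def __init__(self):
        self.replies = []

    def Reply(self, payload):
        self.replies.append(payload)


def MessageHandlerMethod(**kwargs):
    # Same reply logic as the consumer example, without the print.
    bus = kwargs['bus']
    payload = kwargs['payload']
    if payload['reply']:
        payload['reply'] = False
        bus.Reply(payload=payload)


# Call the handler the way the consumer would: everything arrives as keyword arguments.
fake_bus = FakeBus()
MessageHandlerMethod(data={}, bus=fake_bus, payload={'hello': 'world!', 'reply': True})
print(fake_bus.replies)  # [{'hello': 'world!', 'reply': False}]
```

The reply flag is cleared before replying, which keeps two consumers from answering each other forever.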

Publish Message

This example demonstrates how to publish a message in a one-to-many pattern with an at-least-once delivery guarantee. The mandatory flag is turned on by default, so an exception is raised if the topic has no subscribers.

import pika
from PikaBus.PikaBusSetup import PikaBusSetup


# Use pika connection params to set connection details
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials)

# Create a PikaBusSetup instance without a listener queue
pikaBusSetup = PikaBusSetup(connParams)

# Create a temporary bus to publish messages.
bus = pikaBusSetup.CreateBus()
payload = {'hello': 'world!', 'reply': False}

# Publishing delivers the message to every subscriber of the topic.
bus.Publish(payload=payload, topic='myTopic')
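
To make the one-to-many routing and the effect of the mandatory flag concrete, here is a toy in-memory model. It does not use RabbitMQ or PikaBus; `ToyExchange` and `UnroutableError` are hypothetical names, and the sketch only illustrates the idea that every subscribed queue gets a copy and that a mandatory publish with no subscribers fails.

```python
from collections import defaultdict


class UnroutableError(Exception):
    """Raised by the toy exchange when a mandatory message reaches no queue."""


class ToyExchange:
    def __init__(self):
        self.bindings = defaultdict(set)  # topic -> names of subscribed queues
        self.queues = defaultdict(list)   # queue name -> delivered payloads

    def subscribe(self, queue, topic):
        self.bindings[topic].add(queue)

    def publish(self, payload, topic, mandatory=True):
        targets = self.bindings.get(topic, set())
        if mandatory and not targets:
            raise UnroutableError(f'no subscribers on topic {topic!r}')
        for queue in targets:
            self.queues[queue].append(dict(payload))  # each subscriber gets its own copy
        return len(targets)


exchange = ToyExchange()
exchange.subscribe('queueA', 'myTopic')
exchange.subscribe('queueB', 'myTopic')
delivered = exchange.publish({'hello': 'world!'}, topic='myTopic')
print(delivered)  # 2
```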

Send Message

This example demonstrates how to send a message in a one-to-one pattern with an at-least-once delivery guarantee. An exception is raised if the destination queue doesn't exist.

import pika
import datetime
from PikaBus.PikaBusSetup import PikaBusSetup


# Use pika connection params to set connection details
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials)

# Create a PikaBusSetup instance without a listener queue
pikaBusSetup = PikaBusSetup(connParams)

# Create a temporary bus to send messages.
bus = pikaBusSetup.CreateBus()
payload = {'hello': 'world!', 'reply': False}

# Sending delivers the message explicitly to one receiver.
# The send fails if the destination queue `myQueue` doesn't exist.
# If needed, create `myQueue` in the RabbitMQ management UI at http://localhost:15672 (user=amqp, password=amqp).
bus.Send(payload=payload, queue='myQueue')

# To defer a message means sending a message explicitly to one receiver with some delay before it is processed.
bus.Defer(payload=payload, delay=datetime.timedelta(seconds=10), queue='myQueue')

Transaction Handling

This example demonstrates how to send or publish messages in a transaction. The transaction is handled automatically by the with statement: all outgoing messages are transmitted when the transaction commits.

import pika
import json
from PikaBus.PikaBusSetup import PikaBusSetup
from PikaBus.abstractions.AbstractPikaBus import AbstractPikaBus


# Use pika connection params to set connection details.
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials)

# Create a PikaBusSetup instance without a listener queue.
pikaBusSetup = PikaBusSetup(connParams)

# Run Init to create default listener queue, exchanges and subscriptions.
pikaBusSetup.Init(listenerQueue='myQueue', subscriptions='myQueue')

# Create a temporary bus transaction using the `with` statement
# to transmit all outgoing messages at the end of the transaction.
with pikaBusSetup.CreateBus() as bus:
    bus: AbstractPikaBus = bus
    payload = {'hello': 'world!', 'reply': False}
    bus.Send(payload=payload, queue='myQueue')
    bus.Publish(payload=payload, topic='myQueue')

# Fetch and print all messages from the queue synchronously.
with pikaBusSetup.CreateBus() as bus:
    bus: AbstractPikaBus = bus
    message = bus.channel.basic_get('myQueue', auto_ack=True)
    while message[0] is not None:
        print(json.loads(message[2]))
        message = bus.channel.basic_get('myQueue', auto_ack=True)
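
The commit-at-exit behaviour described above can be pictured with a small context manager. This is a conceptual sketch only, not PikaBus's implementation: `ToyTransactionBus` is a hypothetical class, a plain list stands in for the broker, and dropping the buffered messages when the block raises is an assumption made for illustration.

```python
class ToyTransactionBus:
    """Hypothetical bus that buffers outgoing messages until the `with` block exits."""

    def __init__(self, transport):
        self.transport = transport  # list that stands in for the broker
        self.outgoing = []

    def Send(self, payload, queue):
        self.outgoing.append(('send', queue, payload))

    def Publish(self, payload, topic):
        self.outgoing.append(('publish', topic, payload))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            # Commit: hand all buffered messages to the transport at once.
            self.transport.extend(self.outgoing)
        # On error the buffer is dropped, so nothing reaches the transport.
        self.outgoing.clear()
        return False  # never swallow exceptions


broker = []
with ToyTransactionBus(broker) as bus:
    bus.Send(payload={'n': 1}, queue='myQueue')
    bus.Publish(payload={'n': 2}, topic='myQueue')
    sent_before_commit = len(broker)  # nothing has been transmitted yet
print(sent_before_commit, len(broker))  # 0 2
```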

Error Handling

By default, PikaBus implements error handling by forwarding failed messages to a durable queue named error after 5 retry attempts, with a backoff policy between each attempt. The following example demonstrates how to change the error handler settings, or even replace the error handler.

import pika
from PikaBus.abstractions.AbstractPikaBus import AbstractPikaBus
from PikaBus.PikaBusSetup import PikaBusSetup
from PikaBus.PikaErrorHandler import PikaErrorHandler


def failingMessageHandlerMethod(**kwargs):
    """
    This message handler fails every time for some dumb reason ..
    """
    data: dict = kwargs['data']
    bus: AbstractPikaBus = kwargs['bus']
    payload: dict = kwargs['payload']
    print(payload)
    raise Exception("I'm just failing as I'm told ..")


# Use pika connection params to set connection details
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials)

# Create a PikaBusSetup instance with a listener queue and your own PikaErrorHandler definition.
pikaErrorHandler = PikaErrorHandler(errorQueue='error', maxRetries=1)
pikaBusSetup = PikaBusSetup(connParams,
                            defaultListenerQueue='myFailingQueue',
                            pikaErrorHandler=pikaErrorHandler)
pikaBusSetup.AddMessageHandler(failingMessageHandlerMethod)

# Start consuming messages from the queue.
pikaBusSetup.Init()
pikaBusSetup.StartConsumers()

# Create a temporary bus to subscribe on topics and send, defer or publish messages.
bus = pikaBusSetup.CreateBus()
payload = {'hello': 'world!', 'reply': True}

# To send a message means sending a message explicitly to one receiver.
# In this case the message keeps failing and ends up in a dead-letter queue called `error`.
# Locate the failed message in the `error` queue in the RabbitMQ management UI at http://localhost:15672 (user=amqp, password=amqp)
bus.Send(payload=payload, queue='myFailingQueue')

input('Hit enter to stop all consuming channels \n\n')
pikaBusSetup.StopConsumers()
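
The retry-then-park flow can be sketched without a broker. The snippet below is a toy model, not the code behind `PikaErrorHandler`: `process_with_retries` is a hypothetical function, the exponential backoff is an assumed policy (the text above only says that a backoff policy is used), and `max_retries` is assumed to count attempts after the first one.

```python
import time


def process_with_retries(handler, payload, error_queue, max_retries=5,
                         base_delay=0.01, sleep=time.sleep):
    """Call handler(payload); retry on failure, then park the payload in error_queue."""
    attempt = 0
    while True:
        try:
            handler(payload)
            return True
        except Exception as exc:
            if attempt >= max_retries:
                # Retries exhausted: forward the message to the error queue.
                error_queue.append({'payload': payload, 'error': str(exc)})
                return False
            # Assumed policy: exponential backoff, doubling the delay each retry.
            sleep(base_delay * 2 ** attempt)
            attempt += 1


calls = []


def failing_handler(payload):
    calls.append(payload)
    raise Exception("I'm just failing as I'm told ..")


errors = []
ok = process_with_retries(failing_handler, {'hello': 'world!'}, errors,
                          max_retries=1, sleep=lambda seconds: None)
print(ok, len(calls), len(errors))  # False 2 1
```

With `max_retries=1` the handler runs twice (the first attempt plus one retry) before the payload is parked.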

REST API With Flask & PikaBus

The following example demonstrates how to combine a REST API with PikaBus running as a background job. PikaBus handles restarts and downtime since it's fault-tolerant, with auto-reconnect and state recovery. Because it is a self-contained background job, PikaBus can be combined with any other web framework, such as Tornado.

import pika
import logging
from flask import Flask
from PikaBus.abstractions.AbstractPikaBus import AbstractPikaBus
from PikaBus.PikaBusSetup import PikaBusSetup

# Requirements
# - pip install flask

logging.basicConfig(format='[%(levelname)s] %(name)s - %(message)s', level='WARNING')
log = logging.getLogger(__name__)


def MessageHandlerMethod(**kwargs):
    """
    A message handler method may simply be a method with some **kwargs.
    The **kwargs will be given all incoming pipeline data, the bus and the incoming payload.
    """
    data: dict = kwargs['data']
    bus: AbstractPikaBus = kwargs['bus']
    payload: dict = kwargs['payload']
    print(f'Received message: {payload}')


# Use pika connection params to set connection details
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials)

# Create a PikaBusSetup instance with a listener queue, and add the message handler method.
pikaBusSetup = PikaBusSetup(connParams,
                            defaultListenerQueue='myFlaskQueue',
                            defaultSubscriptions='myFlaskTopic')
pikaBusSetup.AddMessageHandler(MessageHandlerMethod)

# Start consuming messages from the queue
pikaBusSetup.StartConsumers()

# Create a flask app
app = Flask(__name__)


# Create an api route that simply publishes a message
@app.route('/')
def Publish():
    with pikaBusSetup.CreateBus() as bus:
        bus: AbstractPikaBus = bus
        payload = {'hello': 'world!', 'reply': True}
        # Publish on the topic this app subscribes to, so the handler above receives the message.
        bus.Publish(payload=payload, topic='myFlaskTopic')
        return 'Payload published :D'


# Run flask app on http://localhost:5005/
app.run(debug=True, host='0.0.0.0', port=5005)