A Simple RabbitMQ Consumer with Python


Here’s a very simple RabbitMQ consumer in Python, deployed in a Kubernetes cluster. I used it as a throwaway sink for testing: it just logs every message it receives.

First, the Python code:

# sink.py
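import logging
import os

import pika

# Without a configured handler, INFO-level messages are silently dropped.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('sink')

# Connection settings come from the pod's environment.
rmqu = os.environ.get('RMQ_USER')
rmqp = os.environ.get('RMQ_PASS')
q = os.environ.get('RMQ_Q')
vhost = os.environ.get('RMQ_VHOST', '/')  # fall back to the default vhost when unset
service = os.environ.get('RMQ_SVC')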

def callback(ch, method, properties, body):
    logger.info(" [x] Received %r", body)

def main():
    credentials = pika.PlainCredentials(rmqu, rmqp)
    params = pika.ConnectionParameters(service, 5672, vhost, credentials=credentials)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.basic_consume(queue=q, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()

if __name__ == "__main__":
    main()
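One thing to watch: os.environ.get returns None for an unset variable, so a missing setting only shows up later as a confusing connection error. Here is a minimal sketch of failing fast instead. The load_settings helper and the REQUIRED tuple are my own hypothetical names, not part of sink.py, and the "/" fallback assumes the broker uses RabbitMQ's default virtual host.

```python
import os

# Hypothetical helper, not part of the original sink.py.
REQUIRED = ("RMQ_USER", "RMQ_PASS", "RMQ_Q", "RMQ_SVC")


def load_settings(env=os.environ):
    """Return the RMQ_* settings as a dict, raising if a required one is unset or empty."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    settings = {name: env[name] for name in REQUIRED}
    # Assumption: fall back to RabbitMQ's default virtual host "/".
    settings["RMQ_VHOST"] = env.get("RMQ_VHOST", "/")
    return settings
```

Calling load_settings() at the top of main() would make the pod exit immediately with a readable message if the manifest forgets a variable.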

The above code can be saved as a ConfigMap in the cluster:

kubectl create cm sink --from-file=sink.py

Then the Kubernetes manifest:

# sink.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: rmq-sink
  name: rmq-sink-some-queue
spec:
  containers:
  - command:
    - bash
    - -c
    - |
      pip3 install pika
      python3 /root/scripts/sink.py
    env:
      - name: RMQ_USER
        value: queue_user
      - name: RMQ_PASS
        value: queue_pass
      - name: RMQ_Q
        value: some.queue
      - name: RMQ_SVC
        value: rmq-service.rmq-namespace.svc
      # sink.py also reads RMQ_VHOST; "/" is an assumption (RabbitMQ's default vhost)
      - name: RMQ_VHOST
        value: "/"
    image: python:3.10
    name: rmq-sink
    resources:
      requests:
        cpu: 100m
        memory: 200Mi
    volumeMounts:
    - mountPath: /root/scripts
      name: script
  volumes:
  - name: script
    configMap:
      name: sink

Finally, deploy it with:

kubectl apply -f sink.yaml
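To check that the sink actually receives something, you need a few messages in the queue. Below is a rough sketch of a publisher, assuming the queue already exists and that the same RMQ_* variables are set wherever you run it. The names publish_test_messages and main are made up for this example. Publishing to the default exchange (the empty string) with the queue name as routing key delivers straight to that queue.

```python
import os


def publish_test_messages(channel, queue, count=5):
    """Publish `count` small messages to `queue` via the default exchange."""
    for i in range(count):
        channel.basic_publish(
            exchange="",        # default exchange routes by queue name
            routing_key=queue,
            body=f"test message {i}".encode(),
        )
    return count


def main():
    # Imported here so the helper above can be used without pika installed.
    import pika

    params = pika.ConnectionParameters(
        host=os.environ["RMQ_SVC"],
        port=5672,
        virtual_host=os.environ.get("RMQ_VHOST", "/"),  # assumption: default vhost
        credentials=pika.PlainCredentials(
            os.environ["RMQ_USER"], os.environ["RMQ_PASS"]
        ),
    )
    connection = pika.BlockingConnection(params)
    try:
        publish_test_messages(connection.channel(), os.environ["RMQ_Q"])
    finally:
        connection.close()
```

Run main() from anywhere that can reach the broker, and the sink pod's log should show one "Received" line per message.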

🙂