之前的几篇文章介绍了一下RabbitMQ的概念以及环境的搭建和配置,有了RabbitMQ环境就可以基于其实现一些特殊的任务场景了。RabbitMQ官方有个很好的Tutorials基本覆盖了RabbitMQ的各中常见应用场景,现以代码加注释的方式以其客户端pika为例简单介绍如下。更详尽的信息可参阅: 。
之前的几篇文章:
RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。
应用场景1-“Hello Word”
一个P向queue发送一个message,一个C从该queue接收message并打印。
send.py
producer,连接至RabbitMQ Server,声明队列,发送message,关闭连接,退出。import pika #与RabbitMQ Server建立连接 #连接到的broker在本机-localhost上 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #声明队列以向其发送消息消息 #向不存在的位置发送消息时RabbitMQ将消息丢弃 #queue='hello'指定队列名字 channel.queue_declare(queue='hello', durable=True) #message不能直接发送给queue,需经exchange到达queue,此处使用以空字符串标识的默认的exchange #使用默认exchange时允许通过routing_key明确指定message将被发送给哪个queue #body参数指定了要发送的message内容 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" #关闭与RabbitMq Server间的连接 connection.close()
receive.py
consumer,连接至RabbitMQ Server,声明队列,接收消息并进行处理这里为打印出消息,退出。import pika #建立到达RabbitMQ Server的connection #此处RabbitMQ Server位于本机-localhost connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #声明queue,确认要从中接收message的queue #queue_declare函数是幂等的,可运行多次,但只会创建一次 #若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue #但在producer和consumer中重复声明queue是一个好的习惯 channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' #定义回调函数 #一旦从queue中接收到一个message回调函数将被调用 #ch:channel #method: #properties: #body:message def callback(ch, method, properties, body): print " [x] Received %r" % (body,) #从queue接收message的参数设置 #包括从哪个queue接收message,用于处理message的callback,是否要确认message #默认情况下是要对消息进行确认的,以防止消息丢失。 #此处将no_ack明确指明为True,不对消息进行确认。 channel.basic_consume(callback, queue='hello', no_ack=True) #开始循环从queue中接收message并使用callback进行处理 channel.start_consuming()
测试
python send.py python receive.py
应用场景2-work queues
将耗时的消息处理通过队列分配给多个consumer来处理,我们称此处的consumer为worker,我们将此处的queue称为Task Queue,其目的是为了避免资源密集型的task的同步处理,也即立即处理task并等待完成。相反,调度task使其稍后被处理。也即把task封装进message并发送到task queue,worker进程在后台运行,从task queue取出task并执行job,若运行了多个worker,则task可在多个worker间分配。
new_task.py(生产者)建立连接,声明队列,发送可以模拟耗时任务的message,断开连接、退出。import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #仅仅对message进行确认不能保证message不丢失,比如RabbitMQ崩溃了queue就会丢失 #因此还需使用durable=True声明queue是持久化的,这样即便Rabb崩溃了重启后queue仍然存在 channel.queue_declare(queue='task_queue', durable=True) #从命令行构造将要发送的message message = ' '.join(sys.argv[1:]) or "Hello World!" #除了要声明queue是持久化的外,还需声明message是持久化的 #basic_publish的properties参数指定message的属性 #此处pika.BasicProperties中的delivery_mode=2指明message为持久的 #这样一来RabbitMQ崩溃重启后queue仍然存在其中的message也仍然存在 #需注意的是将message标记为持久的并不能完全保证message不丢失,因为 #从RabbitMQ接收到message到将其存储到disk仍需一段时间,若此时RabbitMQ崩溃则message会丢失 #况且RabbitMQ不会对每条message做fsync动作 #可通过publisher confirms实现更强壮的持久性保证 channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
worker.py(消费者)
建立连接,声明队列,不断的接收message,处理任务,进行确认。import pika import time #默认情况RabbirMQ将message以round-robin方式发送给下一个consumer #每个consumer接收到的平均message量是一样的 #可以同时运行两个或三个该程序进行测试 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #仅仅对message进行确认不能保证message不丢失,比如RabbitMQ崩溃了 #还需使用durable=True声明queue是持久化的,这样即便Rabb崩溃了重启后queue仍然存在其中的message不会丢失 #RabbitMQ中不允许使用不同的参数定义同名queue channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' #回调函数,函数体模拟耗时的任务处理:以message中'.'的数量表示sleep的秒数 def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" #对message进行确认 ch.basic_ack(delivery_tag = method.delivery_tag) #若存在多个consumer每个consumer的负载可能不同,有些处理的快有些处理的慢 #RabbitMQ并不管这些,只是简单的以round-robin的方式分配message #这可能造成某些consumer积压很多任务处理不完而一些consumer长期处于饥饿状态 #可以使用prefetch_count=1的basic_qos方法可告知RabbitMQ只有在consumer处理并确认了上一个message后才分配新的message给他 #否则分给另一个空闲的consumer channel.basic_qos(prefetch_count=1) #这里移除了no_ack=True这个参数,也即需要对message进行确认(默认行为) #否则consumer在偶然down后其正在处理和分配到该consumer还未处理的message可能发生丢失 #因为此时RabbitMQ在发送完message后立即从内存删除该message #假如没有设置no_ack=True则consumer在偶然down掉后其正在处理和分配至该consumer但还未来得及处理的message会重新分配到其他consumer #没有设置no_ack=True则consumer在收到message后会向RabbitMQ反馈已收到并处理了message告诉RabbitMQ可以删除该message #RabbitMQ中没有超时的概念,只有在consumer down掉后重新分发message channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
测试:
python new_task.py "A very hard task which takes two seconds.." python worker.py
应用场景3-Publish/Subscribe
在应用场景2中一个message(task)仅被传递给了一个comsumer(worker)。现在我们设法将一个message传递给多个consumer。这种模式被称为publish/subscribe。此处以一个简单的日志系统为例进行说明。该系统包含一个log发送程序和一个log接收并打印的程序。由log发送者发送到queue的消息可以被所有运行的log接收者接收。因此,我们可以运行一个log接收者直接在屏幕上显示log,同时运行另一个log接收者将log写入磁盘文件。
receive_logs.py日志消息接收者:建立连接,声明exchange,将exchange与queue进行绑定,开始不停的接收log并打印。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #作为好的习惯,在producer和consumer中分别声明一次以保证所要使用的exchange存在 channel.exchange_declare(exchange='logs', type='fanout') #在不同的producer和consumer间共享queue时指明queue的name是重要的 #但某些时候,比如日志系统,需要接收所有的log message而非一个子集 #而且仅对当前的message 流感兴趣,对于过时的message不感兴趣,那么 #可以申请一个临时队列这样,每次连接到RabbitMQ时会以一个随机的名字生成 #一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue result = channel.queue_declare(exclusive=True) #用于获取临时queue的name queue_name = result.method.queue #exchange与queue之间的关系成为binding #binding告诉exchange将message发送该哪些queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) #从指定地queue中consume message且不确认 channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
emit_log.py
日志消息发送者:建立连接,声明fanout类型的exchange,通过exchage向queue发送日志消息,消息被广播给所有接收者,关闭连接,退出。import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #producer只能通过exchange将message发给queue #exchange的类型决定将message路由至哪些queue #可用的exchange类型:direct\topic\headers\fanout #此处定义一个名称为'logs'的'fanout'类型的exchange,'fanout'类型的exchange简单的将message广播到它所知道的所有queue channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" #将message publish到名为log的exchange中 #因为是fanout类型的exchange,这里无需指定routing_key channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()
测试
python receive_logs.py python emit_log.py "info: This is the log message"
应用场景4-Routing
应用场景3中构建了简单的log系统,可以将log message广播至多个receiver。现在我们将考虑只把指定的message类型发送给其subscriber,比如,只把error message写到log file而将所有log message显示在控制台。
receive_logs_direct.pylog message接收者:建立连接,声明direct类型的exchange,声明queue,使用提供的参数作为routing_key将queue绑定到exchange,开始循环接收log message并打印。#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #声明一个名为direct_logs类型为direct的exchange #同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在 channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #从命令行获取参数:routing_key severities = sys.argv[1:] if not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],) sys.exit(1) for severity in severities: #exchange和queue之间的binding可接受routing_key参数 #该参数的意义依赖于exchange的类型 #fanout类型的exchange直接忽略该参数 #direct类型的exchange精确匹配该关键字进行message路由 #对多个queue使用相同的binding_key是合法的 channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
emit_log_direct.py
log message发送者:建立连接,声明direct类型的exchange,生成并发送log message到exchange,关闭连接,退出。import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #声明一个名为direct_logs的direct类型的exchange #direct类型的exchange channel.exchange_declare(exchange='direct_logs', type='direct') #从命令行获取basic_publish的配置参数 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' #向名为direct_logs的exchage按照设置的routing_key发送message channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print " [x] Sent %r:%r" % (severity, message) connection.close()
测试:
python receive_logs_direct.py infopython emit_log_direct.py info "The message"
应用场景5-topic
应用场景4中改进的log系统中用direct类型的exchange替换应用场景3中的fanout类型exchange实现将不同的log message发送给不同的subscriber(也即分别通过不同的routing_key将queue绑定到exchange,这样exchange便可将不同的message根据message内容路由至不同的queue)。但仍然存在限制,不能根据多个规则路由消息,比如接收者要么只能收error类型的log message要么只能收info类型的message。如果我们不仅想根据log的重要级别如info、warning、error等来进行log message路由还想同时根据log message的来源如auth、cron、kern来进行路由。为了达到此目的,需要topic类型的exchange。topic类型的exchange中routing_key中可以包含两个特殊字符:“*”用于替代一个词,“#”用于0个或多个词。
receive_logs_topic.pylog message接收者:建立连接,声明topic类型的exchange,声明queue,根据程序参数构造routing_key,根据routing_key将queue绑定到exchange,循环接收并处理message。#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #声明一个名为direct_logs类型为direct的exchange #同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在 channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #从命令行获取参数:routing_key severities = sys.argv[1:] if not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],) sys.exit(1) for severity in severities: #exchange和queue之间的binding可接受routing_key参数 #该参数的意义依赖于exchange的类型 #fanout类型的exchange直接忽略该参数 #direct类型的exchange精确匹配该关键字进行message路由 #对多个queue使用相同的binding_key是合法的 channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
emit_log_topic.py
log message发送者:建立连接、声明topic类型的exchange、根据程序参数构建routing_key和要发送的message,以构建的routing_key将message发送给topic类型的exchange,关闭连接,退出。#!/usr/bin/env python #encoding:utf8 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #声明一个名为topic_logs的topic类型的exchange #topic类型的exchange可通过通配符对message进行匹配从而路由至不同queue channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print " [x] Sent %r:%r" % (routing_key, message) connection.close()
测试:
python receive_logs_topic.py "*.rabbit" python emit_log_topic.py red.rabbit Hello
应用场景6-PRC
在应用场景2中描述了如何使用work queue将耗时的task分配到不同的worker中。但是,如果我们task是想在远程的计算机上运行一个函数并等待返回结果呢。这根场景2中的描述是一个完全不同的故事。这一模式被称为远程过程调用。现在,我们将构建一个RPC系统,包含一个client和可扩展的RPC server,通过返回斐波那契数来模拟RPC service。
rpc_server.py
RPC server:建立连接,声明queue,定义了一个返回指定数字的斐波那契数的函数,定义了一个回调函数在接收到包含参数的调用请求后调用自己的返回斐波那契数的函数并将结果发送到与接收到message的queue相关联的queue,并进行确认。开始接收调用请求并用回调函数进行请求处理。#!/usr/bin/env python #encoding:utf8 import pika #建立到达RabbitMQ Server的connection connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #声明一个名为rpc_queue的queue channel.queue_declare(queue='rpc_queue') #计算指定数字的斐波那契数 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) #回调函数,从queue接收到message后调用该函数进行处理 def on_request(ch, method, props, body): #由message获取要计算斐波那契数的数字 n = int(body) print " [.] fib(%s)" % (n,) #调用fib函数获得计算结果 response = fib(n) #exchage为空字符串则将message发送个到routing_key指定的queue #这里queue为回调函数参数props中reply_ro指定的queue #要发送的message为计算所得的斐波那契数 #properties中correlation_id指定为回调函数参数props中co的rrelation_id #最后对消息进行确认 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) #只有consumer已经处理并确认了上一条message时queue才分派新的message给它 channel.basic_qos(prefetch_count=1) #设置consumeer参数,即从哪个queue获取消息使用哪个函数进行处理,是否对消息进行确认 channel.basic_consume(on_request, queue='rpc_queue') print " [x] Awaiting RPC requests" #开始接收并处理消息 channel.start_consuming()
rpc_client.py
RPC client:远程过程调用发起者:定义了一个类,类中初始化到RabbitMQ Server的连接、声明回调queue、开始在回调queue上等待接收响应、定义了在回调queue上接收到响应后的处理函数on_response根据响应关联的correlation_id属性作出响应、定义了调用函数并在其中向调用queue发送包含correlation_id等属性的调用请求、初始化一个client实例,以30为参数发起远程过程调用。#!/usr/bin/env python #encoding:utf8 import pika import uuid #在一个类中封装了connection建立、queue声明、consumer配置、回调函数等 class FibonacciRpcClient(object): def __init__(self): #建立到RabbitMQ Server的connection self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #声明一个临时的回调队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue #此处client既是producer又是consumer,因此要配置consume参数 #这里的指明从client自己创建的临时队列中接收消息 #并使用on_response函数处理消息 #不对消息进行确认 self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #定义回调函数 #比较类的corr_id属性与props中corr_id属性的值 #若相同则response属性为接收到的message def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): #初始化response和corr_id属性 self.response = None self.corr_id = str(uuid.uuid4()) #使用默认exchange向server中定义的rpc_queue发送消息 #在properties中指定replay_to属性和correlation_id属性用于告知远程server #correlation_id属性用于匹配request和response self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), #message需为字符串 body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) #生成类的实例 fibonacci_rpc = FibonacciRpcClient() print " [x] Requesting fib(30)" #调用实例的call方法 response = fibonacci_rpc.call(30) print " [.] Got %r" % (response,)
测试:
python rpc_server.py python rpc_client.py