文章目次
1、消息队列
1.1、界说
消息队列就是底子数据结构中的“先进先出”的一种数据机构,生活中买东西,需要列队,先排的人先买消费,就是范例的“先进先出”
1.2、MQ管理什么问题
- MQ是一直存在,不外随着微服务架构的盛行,成了管理微服务之间问题的常用工具
复制代码 1.2.1、应用解耦
- 以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操纵异常当转酿成基于消息队列的方式后,系统间调用的问题会淘汰许多,好比物流系统因为发生故障,需要几分钟来修复,在这几分钟的时间里,物流系统要处理处罚的内存被缓存在消息队列中,用户的下单操纵可以正常完成,当物流系统规复后,继承处理处罚订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性
复制代码
1.2.2、流量消峰
- 如果订单系统最多能处理处罚一万次订单,这个处理处罚本领应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回效果,但是在高峰期,如果有两万次下单操纵系统是处理处罚不了的,只能限制订单凌驾一万后不允许用户下单使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理处罚,这事有些用户大概在下单十几秒后才气收到下单乐成的操纵,但是比不能下单的体验要好
复制代码 1.2.3、消息分发
多个服务队数据感兴趣,只需要监听同一类消息即可处理处罚
比方A产生数据,B对数据感兴趣。如果没有消息的队列A每次处理处罚完需要调用一下B服务,过了一段时间C对数据也感性,A就需要改代码,调用B服务,调用C服务,只要有服务需要,A服务都要改动代码,很不方便
有了消息队列后,A只管发送一次消息,B对消息感兴趣,只需要监听消息,C感兴趣,C也去监听消息,A服务作为底子服务完全不需要有改动
1.2.4、异步消息
有些服务间调用是异步的,比方A调用B,B需要泯灭很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询,大概A提供一个callback api,B执行完之后调用api通知A服务,这两种方式都不是很优雅
使用消息总线,可以很方便管理这个问题,A调用B服务后,只需要监听B处理处罚完成的消息,当B处理处罚完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务
这样A服务既不消循环调用B的查询api,也不消提供callback api,同样B服务也不消做这些操纵,A服务还能实时的得到异步处理处罚乐成的消息
2、Rabbitmq安装
官网:https://www.rabbitmq.com/getstarted.html
2.1、服务端原生安装
- # 安装设置epel源# 安装erlangyum -y install erlang# 安装RabbitMQyum -y install rabbitmq-server
复制代码 2.2、服务端Docker安装
- docker pull rabbitmq:managementdocker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
复制代码 2.3、 客户端安装
2.4、设置用户和暗码
- rabbitmqctl add_user allen 123# 设置用户为administrator脚色rabbitmqctl set_user_tags allen administrator# 设置权限rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"# 然后重启rabbiMQ服务systemctl reatart rabbitmq-server # 然后可以使用刚才的用户远程毗连rabbitmq server了
复制代码 3、基于Queue实现生产者消费者模子
- import Queueimport threadingmessage = Queue.Queue(10)def producer(i): while True: message.put(i)def consumer(i): while True: msg = message.get()for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start()for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
复制代码 4、根本使用(生产者消费者模子)
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列
生产者
- import pika# 无暗码# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80'))# 有暗码credentials = pika.PlainCredentials("admin", "admin")# 拿到毗连对象connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))# 拿到channel对象channel = connection.channel()# 声明一个队列channel.queue_declare(queue='hello') # 指定队列名称# 生产者向队列中放一条消息channel.basic_publish(exchange='', routing_key='hello', # 消息队列名称 body='Hello Python!')print("Sent 'Hello Python!'")# 关闭毗连connection.close()
复制代码 消费者
- import pika# 无暗码# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80'))# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))def main(): channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) channel.start_consuming()if __name__ == '__main__': main()
复制代码 5、消息确认机制
重要设置参数
- # 通知服务端,消息取走了,如果auto_ack=False,不加下面的设置,消息会一直存在ch.basic_ack(delivery_tag=method.delivery_tag)
复制代码 生产者
- import pika# 无暗码# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80'))# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列)channel.queue_declare(queue='hello2')channel.basic_publish(exchange='', routing_key='hello2', body='Hello Python!')print("Sent 'Hello Python!'")connection.close()
复制代码 消费者
- import pika, sys, os# 无暗码# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80'))# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))def main(): channel = connection.channel() # 声明一个队列(创建一个队列) channel.queue_declare(queue='hello2') def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 真正的消息处理处罚完了,再发确认 # 通知服务端,消息取走了,如果auto_ack=False,不加下面的设置,消息会一直存在 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(queue='hello2', on_message_callback=callback,auto_ack=False) channel.start_consuming()if __name__ == '__main__': main()
复制代码 6、消息安全之durable长期化
重要设置参数
- # 在声明队列时,指定长期化channel.queue_declare(queue='hello3', durable=True)# 在发布消息的时候,指定消息长期化properties=pika.BasicProperties( delivery_mode=2, # make message persistent)
复制代码 生产者
- import pika# 无暗码# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80'))# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列),durable=True支持长期化,队列必须是新的才可以channel.queue_declare(queue='hello3', durable=True)channel.basic_publish(exchange='', routing_key='hello3', body='Hello Python!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent 消息也长期化 ))print("Sent 'Hello Python!'")connection.close()
复制代码 消费者
- import pika, sys, os# 无暗码# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80'))# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))def main(): channel = connection.channel() # 声明一个队列(创建一个队列),durable=True支持长期化,队列必须是新的才可以 channel.queue_declare(queue='hello3', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 真正的消息处理处罚完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(queue='hello3', on_message_callback=callback) channel.start_consuming()if __name__ == '__main__': main()
复制代码 7、闲置消费
说明
- 正常情况如果有多个消费者,是按照顺序第一个消息给第一个消费者,第二个消息给第二个消费者但是大概第一个消息的消费者处理处罚消息很耗时,一直没竣事,就可以让第二个消费者优先得到闲置的消息
复制代码 重要设置参数
- # 谁闲置谁获取,没须要按照顺序一个一个来channel.basic_qos(prefetch_count=1)
复制代码 生产者
- import pika# 无暗码# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80'))# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# 声明一个队列(创建一个队列),durable=True支持长期化,队列必须是新的才可以channel.queue_declare(queue='hello4', durable=True)channel.basic_publish(exchange='', routing_key='hello4', body='Hello Python!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent 消息也长期化 ))print("Sent 'Hello Python!'")connection.close()
复制代码 消费者1
- import pika, sys, os# 无暗码# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80'))# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))def main(): channel = connection.channel() # 声明一个队列(创建一个队列),durable=True支持长期化,队列必须是新的才可以 channel.queue_declare(queue='hello4', durable=True) def callback(ch, method, properties, body): import time time.sleep(50) # 模拟大量延迟,消费者1处于阻塞状态,这种情况下闲置的消费者会获取消息 print(" [x] Received %r" % body) # 真正的消息处理处罚完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 谁闲置谁获取,没须要按照顺序一个一个来 channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='hello4', on_message_callback=callback) channel.start_consuming()if __name__ == '__main__': main()
复制代码 消费者2
- import pika, sys, os# 无暗码# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80'))# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))def main(): channel = connection.channel() # 声明一个队列(创建一个队列),durable=True支持长期化,队列必须是新的才可以 channel.queue_declare(queue='hello4', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 真正的消息处理处罚完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 谁闲置谁获取,没须要按照顺序一个一个来 channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='hello4', on_message_callback=callback) channel.start_consuming()if __name__ == '__main__': main()
复制代码 8、发布订阅
发布者
- import pika# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# 声明队列没有指定名字,指定了exchangechannel.exchange_declare(exchange='logs', exchange_type='fanout')message = "info: Hello World!"channel.basic_publish(exchange='logs', routing_key='', body=message)print(" [x] Sent %r" % message)connection.close()
复制代码 订阅者(启动频频订阅者会生成几个队列)
- import pika# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange='logs', exchange_type='fanout')# queue不能指定名字,随机生成,当前文件执行n遍就会生成n个差别的名字result = channel.queue_declare(queue='', exclusive=True)queue_name = result.method.queueprint(queue_name)channel.queue_bind(exchange='logs', queue=queue_name)print('
- [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()
复制代码 9、发布订阅高级之Routing(按关键字匹配)
发布者
- import pika# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# 声明队列没有指定名字,指定了exchange,exchange必须是新的,不要用已经存在的channel.exchange_declare(exchange='logs1', exchange_type='direct')message = "sports: Hello World!"channel.basic_publish(exchange='logs1', routing_key='sports', # 多个关键字,指定routing_key,订阅者只要设置了当前routing_key,都会收到发布者发送的消息 body=message)print(" [x] Sent %r" % message)connection.close()
复制代码 订阅者1
- import pika# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# exchange='logs1',exchange(秘书)的名称# exchange_type='direct',秘书工作方式将消息发送给差别的关键字channel.exchange_declare(exchange='logs1', exchange_type='direct')# queue不能指定名字,随机生成,当前文件执行n遍就会生成n个差别的名字# 随机生成一个队列result = channel.queue_declare(queue='', exclusive=True)queue_name = result.method.queueprint(queue_name)# 订阅了sports和star# 让exchange和queque举行绑定channel.queue_bind(exchange='logs1', queue=queue_name, routing_key='sports')channel.queue_bind(exchange='logs1', queue=queue_name, routing_key='star')print('
- [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()
复制代码 订阅者2
- import pika# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# exchange='logs1',exchange(秘书)的名称# exchange_type='direct',秘书工作方式将消息发送给差别的关键字channel.exchange_declare(exchange='logs1', exchange_type='direct')# queue不能指定名字,随机生成,当前文件执行n遍就会生成n个差别的名字# 随机生成一个队列result = channel.queue_declare(queue='', exclusive=True)queue_name = result.method.queueprint(queue_name)# 订阅了star# 让exchange和queque举行绑定channel.queue_bind(exchange='logs1', queue=queue_name, routing_key='star')print('
- [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()
复制代码 10、发布订阅高级之Topic(按关键字暗昧匹配)
重要参数
*只能加一个单词
#可以加任意单词字符
发布者
- import pika# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# 声明队列没有指定名字,指定了exchangechannel.exchange_declare(exchange='logs2', exchange_type='topic')message = "hello.java.php.go!"channel.basic_publish(exchange='logs2', # routing_key='hello.java', 这种方式都能收到 routing_key='hello.java.php.go', # 只有hello.#能收到 body=message)print(" [x] Sent %r" % message)connection.close()
复制代码 订阅者1
- import pika# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# exchange='logs2',exchange(秘书)的名称# exchange_type='topic',暗昧匹配channel.exchange_declare(exchange='logs2', exchange_type='topic')# queue不能指定名字,随机生成,当前文件执行n遍就会生成n个差别的名字result = channel.queue_declare(queue='', exclusive=True)queue_name = result.method.queueprint(queue_name)channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='hello.#')print('
- [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()
复制代码 订阅者2
- import pika# 有暗码credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# exchange='logs2',exchange(秘书)的名称# exchange_type='topic',暗昧匹配channel.exchange_declare(exchange='logs2', exchange_type='topic')# queue不能指定名字,随机生成,当前文件执行n遍就会生成n个差别的名字result = channel.queue_declare(queue='', exclusive=True)queue_name = result.method.queueprint(queue_name)channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='hello.*')print('
- [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()
复制代码 11、基于rabbitmq实现rpc
服务端
- import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.80', credentials=credentials))channel = connection.channel()# 监听任务队列channel.queue_declare(queue='rpc_queue')def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2)def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) # props.reply_to 要放效果的队列 # props.correlation_id 任务 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)print(" [x] Awaiting RPC requests")channel.start_consuming()
复制代码 客户端
- import pikaimport uuidclass FibonacciRpcClient(object): def __init__(self): self.credentials = pika.PlainCredentials("admin", "admin") self.connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.12.80', credentials=self.credentials)) self.channel = self.connection.channel() # 随机生成一个消息队列(用于吸收效果) result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue # 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有效果,则执行on_response) self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) # 客户端 给 服务端 发送一个任务: 任务id = corr_id / 任务内容 = '30' / 用于吸收效果的队列名称 self.channel.basic_publish( exchange='', routing_key='rpc_queue', # 服务端吸收任务的队列名称 properties=pika.BasicProperties( reply_to=self.callback_queue, # 用于吸收效果的队列 correlation_id=self.corr_id, # 任务ID ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response)fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)")response = fibonacci_rpc.call(30)print(" [.] Got %r" % response)
复制代码 12、python中的rpc框架
- 自带的: SimpleXMLRPCServer(数据包大,速度慢)第三方: ZeroRPC(底层使用ZeroMQ和MessagePack,速度快,响应时间短,并发高),grpc(谷歌推出支持夸语言)
复制代码 12.1、SimpleXMLRPCServer使用
服务端
- from xmlrpc.server import SimpleXMLRPCServerclass RPCServer(object): def __init__(self): super(RPCServer, self).__init__() print(self) self.send_data = 'hello world' self.recv_data = None def getObj(self): print('get data') return self.send_data def sendObj(self, data): print('send data') self.recv_data = data print(self.recv_data)# SimpleXMLRPCServerserver = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)server.register_introspection_functions()server.register_instance(RPCServer())server.serve_forever()
复制代码 客户端
- import timefrom xmlrpc.client import ServerProxy# SimpleXMLRPCServerdef xmlrpc_client(): print('xmlrpc client') c = ServerProxy('http://localhost:4242') data = {'client:' + str(i): i for i in range(100)} start = time.perf_counter() for i in range(50): a = c.getObj() print(a) for i in range(50): c.sendObj(data) print('xmlrpc total time %s' % (time.perf_counter() - start))if __name__ == '__main__': xmlrpc_client()
复制代码 12.2、ZeroRPC使用
服务端
- import zerorpcclass RPCServer(object): def __init__(self): super(RPCServer, self).__init__() print(self) self.send_data = 'hello world' self.recv_data = None def getObj(self): print('get data') return self.send_data def sendObj(self, data): print('send data') self.recv_data = data print(self.recv_data)# zerorpcs = zerorpc.Server(RPCServer())s.bind('tcp://0.0.0.0:4243')s.run()
复制代码 客户端
- import zerorpcimport time# zerorpcdef zerorpc_client(): print('zerorpc client') c = zerorpc.Client() c.connect('tcp://127.0.0.1:4243') data = 'hello world' start = time.perf_counter() for i in range(500): a = c.getObj() print(a) for i in range(500): c.sendObj(data) print('total time %s' % (time.perf_counter() - start))if __name__ == '__main__': zerorpc_client()
复制代码 来源:https://blog.csdn.net/wcg920212/article/details/112003947
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |