python之上下文管理、redis的发布订阅、rabbitmq

it2024-10-13  26

使用with打开文件的方式,是调用了上下文管理的功能

1 #打开文件的两种方法: 2 3 f = open('a.txt','r') 4 5 with open('a.txt','r') as f 6 7 实现使用with关闭socket 8 import contextlib 9 import socket 10 11 @contextlib.contextmanage 12 def Sock(ip,port): 13 socket = socket.socket() 14 socket.bind((ip,port)) 15 socket.listen(5) 16 try: 17 yield socket 18 finally: 19 socket.close() 20 21 #执行Sock函数传入参数,执行到yield socket返回值给s,执行with语句体,执行finally后面的语句 22 with Sock('127.0.0.1',8000) as s: 23 print(s)

redis的发布订阅

class RedisHelper: def __init__(self): #调用类时自动连接redis self.__conn = redis.Redis(host='192.168.1.100') def public(self, msg, chan): self.__conn.publish(chan, msg) return True def subscribe(self, chan): pub = self.__conn.pubsub() pub.subscribe(chan) pub.parse_response() return pub #订阅者 import s3 obj = s3.RedisHelper() data = obj.subscribe('fm111.7') print(data.parse_response()) #发布者 import s3 obj = s3.RedisHelper() obj.public('alex db', 'fm111.7')

 RabbitMQ

1 #消费者 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1')) 5 channel = connection.channel()#创建对象 6 7 channel.queue_declare(queue = 'wocao') 8 def callback(ch,method,properties,body): 9 print("[x] Received %r"%body) 10 11 channel.basic_consume(callback,queue = 'wocao',no_ack = True) 12 print('[*] Waiting for messages. To exit press CTRL+C') 13 channel.start_consuming() 14 15 #生产者 16 import pika 17 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1')) 18 channel = connection.channel() 19 channel.queue_declare(queue = 'wocao')#指定一个队列,不存在此队列则创建 20 channel.basic_publish(exchange = '',routing_key = 'wocao',body = 'hello world!') 21 print("[x] Sent 'hello world!") 22 connection.close()

 exchange type类型

#生产者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.11.87')) channel = connection.channel() #fanout类型,对绑定该exchange的队列实行广播 channel.exchange_declare(exchange='logs_fanout', type='fanout') # 随机创建队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 绑定exchange channel.queue_bind(exchange='logs_fanout', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() #消费者 import pika #发送方 connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='logs_fanout', type='fanout') message = "what's the fuck" #设置exchange的名 channel.basic_publish(exchange='logs_fanout', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close() 1 #根据关键字发送指定队列 2 #生产者(发布者) 3 import pika 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host = '127.0.0.1')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='direct_logs_1', 9 type='direct') # 关键字发送到队列 10 #对error关键字队列发送指令 11 severity = 'error' 12 message = '123' 13 channel.basic_publish(exchange = 'direct_logs_1', 14 routing_key = severity, 15 body = message) 16 print('[x] Sent %r:%r'%(severity,message)) 17 connection.close() 18 #消费者(订阅者) 19 import pika 20 #消费者 21 connection = pika.BlockingConnection(pika.ConnectionParameters( 22 host = '127.0.0.1')) 23 channel = connection.channel() 24 channel.exchange_declare(exchange='direct_logs_1', 25 type = 'direct')#关键字发送到队列 26 27 result = channel.queue_declare(exclusive=True) 28 queue_name = result.method.queue 29 serverities = ['error','info','warning'] 30 for severity in serverities: 31 channel.queue_bind(exchange='direct_logs_1', 32 queue = queue_name, 33 routing_key = severity) 34 def callback(ch,method,properties,body): 35 print('[x] %r:%r'%(method.routing_key,body)) 36 37 channel.basic_consume(callback, 38 queue = queue_name, 39 no_ack = True) 40 channel.start_consuming() 1 #实现消息不丢失接收方 2 import pika 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.211.55.4')) 4 channel = connection.channel() 5 channel.queue_declare(queue = 'hello') 6 7 def callback(ch,method,properties,body): 8 print('redeived %s'%body) 9 import time 10 time.sleep(10) 11 print('ok') 12 ch.basic_ack(delivery_tag= method.delivery_tag) 13 #no_ack = False接收方接受完请求后发送给对方一个接受成功的信号,如果没收到mq会重新将任务放到队列 14 channel.basic_consume(callback,queue = 'hello',no_ack=False) 15 print(' Waiting for messages.To exit press CTRL+C') 16 channel.start_consuming() 1 #发送方 2 #实现消息不丢失 3 import pika 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.211.55.4')) 5 channel = connection.channel() 6 channel.queue_declare(queue = 'hello',durable = True) 7 channel.basic_publish(exchange = '',routing_key = 'hello world', 8 properties = pika.BasicProperties( 9 delivery_mode=2, 10 ))#发送方不丢失,发送方保持持久化 11 print(' Waiting for messages.To exit press CTRL+C') 12 channel.start_consuming() 1 #接收方 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100')) 5 channel = connection.channel() 6 7 8 channel.queue_declare(queue='hello', durable=True) 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 import time 12 time.sleep(10) 13 print 'ok' 14 ch.basic_ack(delivery_tag = method.delivery_tag) 15 channel.basic_consume(callback, 16 queue='hello', 17 no_ack=False) 18 channel.start_consuming()

RabbitMQ队列中默认情况下,接收方从队列中获取消息是顺序的,例如:接收方1只从队列中获取奇数的任务,接收方2只从队列中获取偶数任务

import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) #表示队列不分奇偶分配,谁来取任务就给谁 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()

 

 

 

 

 

 

 

 

RabbitMQ会重新将该任务添加到队列中

转载于:https://www.cnblogs.com/liguangxu/p/5704390.html

最新回复(0)