1 回答
TA贡献1842条经验 获得超12个赞
在开始讨论之前,我想强调一下 Azure 服务总线和 Celery 之间的差异。
Azure服务总线:
Microsoft Azure 服务总线是完全托管的企业集成消息代理。
芹菜 :
分布式任务队列。Celery是一个基于分布式消息传递的异步任务队列/作业队列。
对于你的情况我可以想到两种可能性:
您希望将 Service Bus 与 Celery 一起使用来代替其他消息代理。
用服务总线替换 Celery
1:您希望将 Service Bus 与 Celery 一起使用来代替其他消息代理。
2:完全用Service Bus替换Celery 以满足您的要求:
考虑
消息发送者是生产者
消息接收者是消费者
这是您必须处理的两个不同的应用程序。
解释 :
每次您想要执行操作时,您都可以从生产者客户端向主题发送消息。
消费者客户端 - 正在侦听的应用程序将接收消息并处理该消息。您可以将自定义流程附加到它 - 这样,只要消费者客户端收到消息,您的自定义流程就会执行。
以下是接收客户端的示例:
from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()
Receiving = True
#Topic 1 receiver :
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver = SubsClient.get_receiver()
async def receive_message_from1():
await receiver.open()
print("Opening the Receiver for Topic1")
async with receiver:
while(Receiving):
msgs = await receiver.fetch_next()
for m in msgs:
print("Received the message from topic 1.....")
##### - Your code to execute when a message is received - ########
print(str(m))
##### - Your code to execute when a message is received - ########
await m.complete()
loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
下面一行之间的部分是每次收到消息时将执行的指令。
##### - Your code to execute when a message is received - ########
添加回答
举报