1 回答
TA贡献1827条经验 获得超4个赞
您可以运行分布式系统并通过中央排队系统传递数据。采用 Unix 哲学并创建一个单一的应用程序,它可以完成一些任务并且做得很好。创建一个嗅探数据包的应用程序(您可以scapy在此处使用,因为如果您阻止任何内容都无关紧要),然后将它们发送到队列(RabitMQ、Redis、SQS 等)并让另一个应用程序处理来自队列的数据包。这种方法应该给你带来最少的头痛。
如果您需要在单个应用程序中运行所有内容,那么线程/多处理是唯一的选择。但是有一些设计模式你会想要遵循。您还可以将以下代码分解为单独的函数并使用专用的排队系统。
from threading import Thread
from time import sleep
from twisted.internet import defer, reactor
class Sniffer(Thread):
def __init__(self, _reactor, shared_queue):
super().__init__()
self.reactor = _reactor
self.shared_queue = shared_queue
def run(self):
"""
Sniffer logic here
"""
while True:
self.reactor.callFromThread(self.shared_queue.put, 'hello world')
sleep(5)
@defer.inlineCallbacks
def consume_from_queue(_id, _reactor, shared_queue):
item = yield shared_queue.get()
print(str(_id), item)
_reactor.callLater(0, consume_from_queue, _id, _reactor, shared_queue)
def main():
shared_queue = defer.DeferredQueue()
sniffer = Sniffer(reactor, shared_queue)
sniffer.daemon = True
sniffer.start()
workers = 4
for i in range(workers):
consume_from_queue(i+1, reactor, shared_queue)
reactor.run()
main()
该Sniffer班开始扭曲的控制之外。请注意sniffer.daemon = True,这是为了在主线程停止时线程将停止。如果它被设置为False(默认),那么只有当所有线程都结束时,应用程序才会退出。根据手头的任务,这可能总是也可能不总是可能的。如果您可以暂停嗅探以检查线程事件,那么您也许能够以更安全的方式停止线程。
self.reactor.callFromThread(self.shared_queue.put, 'hello world')是必要的,以便放入队列的项目发生在主反应器线程中,而不是Sniffer执行的线程中。这样做的主要好处是来自线程的消息会有某种同步(假设您计划扩展到嗅探多个接口)。另外,我不确定DeferredQueue对象是否是线程安全的 :) 我像对待它们一样对待它们。
由于在这种情况下 Twisted 不管理线程,因此开发人员管理至关重要。注意worker循环和consume_from_queue(i+1, reactor, shared_queue)。此循环确保只有所需数量的工作人员正在处理任务。在consume_from_queue()函数内部,shared_queue.get()将等待(非阻塞),直到将一个项目放入队列,打印该项目,然后安排另一个consume_from_queue().
添加回答
举报