2 回答
TA贡献1780条经验 获得超3个赞
您需要在多线程代码中进行一些可靠的并发控制,您需要确保按顺序发生一些事情:
您需要确保队列不会再次读取列表,直到线程从队列中插入的第一轮中清除为止。
pip
您需要确保线程不会同时改变相同的元素,这将导致其中一个线程引发异常,即它无法删除已删除的元素。
你可以利用 在多线程程序的流上放置一些控制,让我们声明一个名为 的事件,Queue 将等待这个事件得到满足,以便它开始其第二次迭代,以查询列表的所有元素。该事件将在成功从列表中删除 pip 后由您的一个线程设置。Event
first_iteration_processsed
代码示例:
import requests, threading, random, string, json, time, queue, re
from threading import Event
num_worker_threads = int(input("Threads: "))
lista = ['asd', 'asdjk', 'pip', 'lasd', 'lol']
print(str(lista))
iteration_processsed = Event()
iteration_processsed.set()
def do_work(i):
# global lista
try:
print(i.strip())
print(str(lista))
if i == "pip":
lista.remove("pip")
print("Removed pip successfully")
if i == "iter_end":
iteration_processsed.set()
except Exception as e:
print(e)
def worker():
while True:
item = q.get()
if item is not None:
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for i in range(3): # I have only put 2 cycles
iteration_processsed.wait()
print(f"Iteration {i} started")
iteration_processsed.clear()
for line in lista:
q.put(line)
q.put('iter_end')
q.join()
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
让我们试试这个:
Threads: 2
['asd', 'asdjk', 'pip', 'lasd', 'lol']
Iteration 0 started
asd
['asd', 'asdjk', 'pip', 'lasd', 'lol']
asdjk
['asd', 'asdjk', 'pip', 'lasd', 'lol']
pip
['asd', 'asdjk', 'pip', 'lasd', 'lol']
Removed pip successfully
lasd
['asd', 'asdjk', 'lasd', 'lol']
lol
iter_end
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
Iteration 1 started
asd
asdjk
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
lasd
lol
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
iter_end
['asd', 'asdjk', 'lasd', 'lol']
Iteration 2 started
asd
asdjk
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
lasd
lol
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
iter_end
['asd', 'asdjk', 'lasd', 'lol']
现在,正如你所看到的,在删除pip之前,第二次迭代永远不会开始,当然,这里的实现离子非常特定于这种情况,但我想你可以把它调整到你自己的更一般的目的,也许添加更多事件来锁定更多的操作,以按照某种预定义的顺序执行。您可以从文档中阅读有关事件的更多信息,或者本文是一个良好的开端,https://www.bogotobogo.com/python/Multithread/python_multithreading_Event_Objects_between_Threads.php
TA贡献1876条经验 获得超7个赞
您的代码具有争用条件。比赛在以下方面之间进行:
主线程,它将尝试向队列添加两次,每次迭代一次。
"pip"
lista
工作线程,它们集体从队列中提取项目并对其进行处理。如果处理某个条目,则会将其从 中删除。
"pip"
lista
如果第一个条目由工作线程处理,并在迭代并添加第二个条目之前从中删除,则不会再次处理它。不过,从你对这个问题的描述来看,我猜想,由于 GIL 的某种组合和代码不同部分的计时,在工作人员可以对此做任何事情之前,你非常一致地将 两个副本添加到队列中。但这并不能得到严格保证。如果在主线程的迭代之间添加了延迟,则可能会让工作线程有时间按预期赶上并删除:"pip"
lista
"pip"
"pip"
lista
"pip"
for i in range(2):
for line in lista:
q.put(line)
time.sleep(1) # delay for a second
显然,这可能不是你真正想做的,因为它仍然是一场比赛,只是一场现在可能被其他赛车手赢得的比赛(不能保证其他线程也不会花费太长时间)。更好的解决方案是设计代码,使其完全没有争用条件。
一个想法可能是在列表迭代之间的队列中,以便在再次重新添加所有值之前由工作线程处理它们。这与方法非常相似,但它更可靠,因为主线程将等待,只要工人在添加更多项目之前处理队列中的项目。join
time.sleep
另一个想法可能是在将值放入队列之前,在主线程中过滤条目。只是一个检查可能会做到这一点!"pip"
if line != "pip"
添加回答
举报