threading 是 python 中一个内置的多线程库,关于模块这里不做过多介绍,本文主要记录一下遇到的问题及解决方法。
一直以来都是用如下方法执行多线程任务,正常情况下是没问题。
import threading, time
## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 将锁内的代码串行化
lock = threading.Lock()
## 多线程 join 用
l = []
def do_something(i):
'''
## 上锁的例子
:param i:
:return:
'''
with lock: ## 上锁
print(f"当前第 {i} 个任务正在执行")
time.sleep(5)
## 释放信号量,可用信号量加一
threadmax.release()
def do_something_unlock(i):
'''
## 不上锁的例子
:param i:
:return:
'''
print(f"当前第 {i} 个任务正在执行")
time.sleep(5)
## 释放信号量,可用信号量加一
threadmax.release()
if __name__ == '__main__':
for i in range(1000):
## 增加信号量,可用信号量减一
threadmax.acquire()
t = threading.Thread(target=do_something, args=(i,))
t.start()
l.append(t)
for t in l:
t.join()
print('END')
但是工作中遇到一个需求,程序需要 24 小时运行,并且一直循环遍历某个列表并发去执行任务,所以就简单的在上边例子的基础上加了个 while 循环,如下:
import threading, time
## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 多线程 join 用
l = []
def do_something(i):
'''
## 执行任务
:param i:
:return:
'''
print(f"当前第 {i} 个任务正在执行")
time.sleep(5)
## 释放信号量,可用信号量加一
threadmax.release()
if __name__ == '__main__':
while 1:
for i in range(1000):
## 增加信号量,可用信号量减一
threadmax.acquire()
t = threading.Thread(target=do_something, args=(i,))
t.start()
l.append(t)
for t in l:
t.join()
print('end once')
运行几天之后发现问题,内存一直在增加,逐行测试之后,发现问题出在 L 这个变量上。
L 这个变量主要是用来存放需要 join 的任务的,在第一个 for 循环中,每个线程开始执行之后,都会存放到 L 这个列表里,用来支持第二个 for 循环中的 join 调用。
join 本质是用于堵塞当前主线程的类,其作用是阻止全部的线程执行完之前程序继续往下运行,直到被调用的线程全部执行完毕或者超时。
举个例子,do_something 这个函数需要执行一段时间,那么你多线程执行这个函数之后,如果没有 join,那么第一个 for 循环执行完之后就会接着往后执行,不会等待 do_something 这个函数执行完,也就是会直接执行执行 print('end once')
这行代码,代码示例如下:
import threading, time
## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 多线程 join 用
l = []
def do_something(i):
'''
## 执行任务
:param i:
:return:
'''
print(f"当前第 {i} 个任务正在执行")
time.sleep(2)
print(f"当前第 {i} 个任务执行完成")
## 释放信号量,可用信号量加一
threadmax.release()
if __name__ == '__main__':
for i in range(10):
## 增加信号量,可用信号量减一
threadmax.acquire()
t = threading.Thread(target=do_something, args=(i,))
t.start()
print('end once')
## 控制台输出
当前第 0 个任务正在执行
当前第 1 个任务正在执行
当前第 2 个任务正在执行
当前第 3 个任务正在执行
当前第 4 个任务正在执行
当前第 5 个任务正在执行
当前第 6 个任务正在执行
当前第 7 个任务正在执行
当前第 8 个任务正在执行
当前第 9 个任务正在执行
end once
当前第 9 个任务执行完成
当前第 6 个任务执行完成当前第 7 个任务执行完成
当前第 5 个任务执行完成
当前第 4 个任务执行完成当前第 3 个任务执行完成当前第 2 个任务执行完成
当前第 8 个任务执行完成
当前第 0 个任务执行完成
当前第 1 个任务执行完成
Process finished with exit code 0
可以看到,print('end once')
这行代码并不是在最后执行的,如果想让 print('end once')
等到所有线程都运行完成后再执行,那就需要 join, 代码示例如下:
import threading, time
## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 多线程 join 用
l = []
def do_something(i):
'''
## 执行任务
:param i:
:return:
'''
print(f"当前第 {i} 个任务正在执行")
time.sleep(2)
print(f"当前第 {i} 个任务执行完成")
## 释放信号量,可用信号量加一
threadmax.release()
if __name__ == '__main__':
for i in range(10):
## 增加信号量,可用信号量减一
threadmax.acquire()
t = threading.Thread(target=do_something, args=(i,))
t.start()
l.append(t)
for t in l:
t.join()
print('end once')
## 控制台输出
当前第 0 个任务正在执行
当前第 1 个任务正在执行
当前第 2 个任务正在执行
当前第 3 个任务正在执行
当前第 4 个任务正在执行
当前第 5 个任务正在执行
当前第 6 个任务正在执行
当前第 7 个任务正在执行
当前第 8 个任务正在执行
当前第 9 个任务正在执行
当前第 7 个任务执行完成当前第 9 个任务执行完成当前第 2 个任务执行完成当前第 1 个任务执行完成
当前第 4 个任务执行完成当前第 6 个任务执行完成
当前第 3 个任务执行完成
当前第 5 个任务执行完成
当前第 0 个任务执行完成
当前第 8 个任务执行完成
end once
Process finished with exit code 0
这样就达到了我们的目的,这种写法一般情况下没有问题,但是如果在外边又套了一层 while,那就有问题了,因为 L 这个列表一直是在增加的,没有释放,所以导致内存一直增加。
找到问题根本就好说了,给他加一个释放
import threading, time
## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 多线程 join 用
l = []
def do_something(i):
'''
## 执行任务
:param i:
:return:
'''
print(f"当前第 {i} 个任务正在执行")
time.sleep(5)
## 释放信号量,可用信号量加一
threadmax.release()
if __name__ == '__main__':
while 1:
for i in range(1000):
## 增加信号量,可用信号量减一
threadmax.acquire()
t = threading.Thread(target=do_something, args=(i,))
t.start()
l.append(t)
for t in l:
t.join()
l.remove(t)
print('end once')
加上 l.remove(t)
这一行,经测试内存稳定,如果不需要等待的话,也可以直接不 join。