python使用多线程threading模块长期循环运行内存泄漏问题解决

#编程技术 2022-04-26 15:45:00 | 全文 1818 字,阅读约需 4 分钟 | 加载中... 次浏览

👋 相关阅读


threading 是 python 中一个内置的多线程库,关于模块这里不做过多介绍,本文主要记录一下遇到的问题及解决方法。

一直以来都是用如下方法执行多线程任务,正常情况下是没问题。

import threading, time

## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 将锁内的代码串行化
lock = threading.Lock()
## 多线程 join 用
l = []


def do_something(i):
    '''
    ## 上锁的例子
    :param i:
    :return:
    '''
    with lock:  ## 上锁
        print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    ## 释放信号量,可用信号量加一
    threadmax.release()


def do_something_unlock(i):
    '''
    ## 不上锁的例子
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    ## 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    for i in range(1000):
        ## 增加信号量,可用信号量减一
        threadmax.acquire()
        t = threading.Thread(target=do_something, args=(i,))
        t.start()
        l.append(t)
    for t in l:
        t.join()

    print('END')

但是工作中遇到一个需求,程序需要 24 小时运行,并且一直循环遍历某个列表并发去执行任务,所以就简单的在上边例子的基础上加了个 while 循环,如下:

import threading, time

## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 多线程 join 用
l = []


def do_something(i):
    '''
    ## 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    ## 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
	while 1:
		for i in range(1000):
			## 增加信号量,可用信号量减一
			threadmax.acquire()
			t = threading.Thread(target=do_something, args=(i,))
			t.start()
			l.append(t)
		for t in l:
			t.join()
    	print('end once')

运行几天之后发现问题,内存一直在增加,逐行测试之后,发现问题出在 L 这个变量上。

L 这个变量主要是用来存放需要 join 的任务的,在第一个 for 循环中,每个线程开始执行之后,都会存放到 L 这个列表里,用来支持第二个 for 循环中的 join 调用。

join 本质是用于堵塞当前主线程的类,其作用是阻止全部的线程执行完之前程序继续往下运行,直到被调用的线程全部执行完毕或者超时。

举个例子,do_something 这个函数需要执行一段时间,那么你多线程执行这个函数之后,如果没有 join,那么第一个 for 循环执行完之后就会接着往后执行,不会等待 do_something 这个函数执行完,也就是会直接执行执行 print('end once') 这行代码,代码示例如下:

import threading, time

## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 多线程 join 用
l = []


def do_something(i):
    '''
    ## 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(2)
    print(f"当前第 {i} 个任务执行完成")
    ## 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    for i in range(10):
        ## 增加信号量,可用信号量减一
        threadmax.acquire()
        t = threading.Thread(target=do_something, args=(i,))
        t.start()
    print('end once')


## 控制台输出
当前第 0 个任务正在执行
当前第 1 个任务正在执行
当前第 2 个任务正在执行
当前第 3 个任务正在执行
当前第 4 个任务正在执行
当前第 5 个任务正在执行
当前第 6 个任务正在执行
当前第 7 个任务正在执行
当前第 8 个任务正在执行
当前第 9 个任务正在执行
end once

当前第 9 个任务执行完成
当前第 6 个任务执行完成当前第 7 个任务执行完成
当前第 5 个任务执行完成
当前第 4 个任务执行完成当前第 3 个任务执行完成当前第 2 个任务执行完成
当前第 8 个任务执行完成



当前第 0 个任务执行完成
当前第 1 个任务执行完成

Process finished with exit code 0

可以看到,print('end once') 这行代码并不是在最后执行的,如果想让 print('end once') 等到所有线程都运行完成后再执行,那就需要 join, 代码示例如下:

import threading, time

## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 多线程 join 用
l = []


def do_something(i):
    '''
    ## 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(2)
    print(f"当前第 {i} 个任务执行完成")
    ## 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
    for i in range(10):
        ## 增加信号量,可用信号量减一
        threadmax.acquire()
        t = threading.Thread(target=do_something, args=(i,))
        t.start()
        l.append(t)
    for t in l:
        t.join()
    print('end once')

## 控制台输出
当前第 0 个任务正在执行
当前第 1 个任务正在执行
当前第 2 个任务正在执行
当前第 3 个任务正在执行
当前第 4 个任务正在执行
当前第 5 个任务正在执行
当前第 6 个任务正在执行
当前第 7 个任务正在执行
当前第 8 个任务正在执行
当前第 9 个任务正在执行
当前第 7 个任务执行完成当前第 9 个任务执行完成当前第 2 个任务执行完成当前第 1 个任务执行完成
当前第 4 个任务执行完成当前第 6 个任务执行完成
当前第 3 个任务执行完成

当前第 5 个任务执行完成


当前第 0 个任务执行完成

当前第 8 个任务执行完成
end once

Process finished with exit code 0

这样就达到了我们的目的,这种写法一般情况下没有问题,但是如果在外边又套了一层 while,那就有问题了,因为 L 这个列表一直是在增加的,没有释放,所以导致内存一直增加。

找到问题根本就好说了,给他加一个释放

import threading, time

## 限制线程的最大数量
threadmax = threading.BoundedSemaphore(16)
## 多线程 join 用
l = []


def do_something(i):
    '''
    ## 执行任务
    :param i:
    :return:
    '''
    print(f"当前第 {i} 个任务正在执行")
    time.sleep(5)
    ## 释放信号量,可用信号量加一
    threadmax.release()


if __name__ == '__main__':
	while 1:
		for i in range(1000):
			## 增加信号量,可用信号量减一
			threadmax.acquire()
			t = threading.Thread(target=do_something, args=(i,))
			t.start()
			l.append(t)
		for t in l:
			t.join()
			l.remove(t)
    	print('end once')

加上 l.remove(t) 这一行,经测试内存稳定,如果不需要等待的话,也可以直接不 join。

·




×