Python thread modules: thread and threading

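A thread is the smallest unit of execution that the operating system can schedule. It lives inside a process and is the unit that actually carries out the process's work.

If you are not yet comfortable with the concepts of processes and threads, it is worth reading "CPU, Hyper-Threading, Processes, and Threads" first.

The Python standard library provides two modules: thread and threading. The former is the low-level module and the latter is the high-level one. threading wraps thread, so in the vast majority of cases threading is all we need; it is also what the official Python documentation recommends.

In Python 3, thread has been renamed to _thread.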

thread

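A few key points:

  1. Calling start_new_thread starts the thread. After that, the MainThread must wait; otherwise the child threads are terminated as soon as the main thread exits.
  2. thread.exit() and thread.interrupt_main() are both implemented by raising exceptions (SystemExit in the calling thread and KeyboardInterrupt in the main thread, respectively).
  3. For lock.acquire([waitflag]), a waitflag of 0 means a non-blocking attempt: the call returns immediately even if the lock could not be acquired. So in the demo below, changing lock.acquire(0) in worker to lock.acquire(1) makes the "lock" run take about 10 s longer than it does now, because the thread that loses the race waits for the lock instead of giving up.
  4. A lock's release and acquire do not have to be called from the same thread.
  5. In Py3, lock.acquire gained a timeout parameter, and its value must not exceed TIMEOUT_MAX.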
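Several of the points above can be checked in isolation with a small Python 3 sketch. It is only an illustration, not part of the original demo: the names `child`, `run_demo`, `seen` and `exit_raises_system_exit` are made up here, and the 5-second timeout is an arbitrary choice. It waits for the child thread by re-acquiring a lock rather than sleeping, tries a non-blocking acquire on a held lock, releases the lock from a different thread, and confirms that `_thread.exit()` works by raising `SystemExit`.

```python
import _thread


def child(done_lock, seen):
    """Runs in the new thread and releases a lock acquired by the main thread."""
    seen.append(_thread.get_ident())
    done_lock.release()  # released by a thread other than the one that acquired it


def run_demo():
    seen = []
    done_lock = _thread.allocate_lock()
    done_lock.acquire()  # the main thread now holds the lock

    # Non-blocking attempt on a lock that is already held: returns False at once.
    non_blocking = done_lock.acquire(blocking=False)

    _thread.start_new_thread(child, (done_lock, seen))

    # Wait for the child instead of sleeping for a fixed time.
    # The timeout must not exceed _thread.TIMEOUT_MAX.
    timeout = min(5.0, _thread.TIMEOUT_MAX)
    finished = done_lock.acquire(timeout=timeout)
    return non_blocking, finished, seen


def exit_raises_system_exit():
    """_thread.exit() is implemented by raising SystemExit."""
    try:
        _thread.exit()
    except SystemExit:
        return True
    return False


if __name__ == "__main__":
    print(run_demo())
    print(exit_raises_system_exit())
```

Using a lock as a "done" signal is one way to avoid guessing how long the main thread has to sleep; the threading module's join() offers the same thing at a higher level.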

The demo below was tested on Python 2.7 and Python 3.5.

# coding=utf-8
"""
    Demo running on Py2 and Py3
"""

import sys
import time
from datetime import datetime
import traceback

try:
    import thread
except ImportError:
    import _thread as thread


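def worker(run_num=3, internal=2, max_num=11, lock=None):
    """
    Worker that does the actual work.
    :param run_num: number of loop iterations to run
    :param internal: seconds to sleep between iterations
    :param max_num: iteration cap; reaching it interrupts the main thread and exits
    :param lock: optional lock to acquire before running
    :return:
    """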

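    if isinstance(lock, thread.LockType):
        # Py2: lock.acquire([waitflag])
        #   No argument: block unconditionally until the lock can be acquired.
        #   waitflag == 0: acquire the lock only if no waiting is needed;
        #   any nonzero value behaves like the no-argument form.
        #   Returns True if the lock was acquired, False otherwise.
        # Py3: lock.acquire(waitflag=1, timeout=-1)
        #   When waitflag is nonzero, timeout bounds how long to block waiting
        #   for the lock (it must not exceed _thread.TIMEOUT_MAX).
        #   A timeout cannot be given when waitflag is 0.
        if not lock.acquire(0):
            print(
                "Thread: {0} failed to acquire the lock! Current time: {1}".format(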
                    thread.get_ident(), datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
                )
            )
            return

    for i in range(run_num):
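        if i >= max_num:
            print("Thread {0} has reached the run limit ({1}) and is about to exit".format(thread.get_ident(), max_num))
            if isinstance(lock, thread.LockType) and lock.locked():
                lock.release()  # do not leave the lock held when exiting early
            thread.interrupt_main()  # raises KeyboardInterrupt in the main thread
            thread.exit()  # raises SystemExit in this thread
        print(
            "Thread: {0}, current time: {1}".format(
                thread.get_ident(), datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
            )
        )
        time.sleep(internal)

    if isinstance(lock, thread.LockType) and lock.locked():
        # Releasing an unlocked lock raises an error (RuntimeError on Py3, thread.error on Py2)
        lock.release()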


def demo_no_lock():
    """
    Demo without locks.
    :return:
    """

    work1_ident = thread.start_new_thread(worker, ())
    print("Thread work1 identifier: {0}".format(work1_ident))

    work2_ident = thread.start_new_thread(worker, (), dict(run_num=15, internal=1))
    print("Thread work2 identifier: {0}".format(work2_ident))

    # Sleep long enough for the child threads to finish
    time.sleep(20)


def demo_lock():
    """
    Demo with locks.
    :return:
    """

    # Allocate two mutex locks; a list comprehension returns a list on both
    # Py2 and Py3 (map returns an iterator on Py3)
    locks = [thread.allocate_lock() for _ in range(2)]

    work1_ident = thread.start_new_thread(worker, (), dict(lock=locks[0]))
    print("Thread work1 identifier: {0}".format(work1_ident))

    work2_ident = thread.start_new_thread(
        worker, (), dict(run_num=10, internal=1, lock=locks[1])
    )
    print("Thread work2 identifier: {0}".format(work2_ident))

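    # work3 shares a lock with work2. Because worker uses the non-blocking
    # lock.acquire(0), whichever of the two loses the race fails to get the
    # lock and returns immediately; with lock.acquire(1) it would instead wait
    # until the other thread finishes and releases the lock.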
    work3_ident = thread.start_new_thread(
        worker, (), dict(run_num=10, internal=1, lock=locks[1])
    )
    print("Thread work3 identifier: {0}".format(work3_ident))

    wait = True
    while wait:
        # lock.locked() returns True if the lock is currently held by some
        # thread, otherwise False
        if any(map(lambda x: x.locked(), locks)):
            print("MainThread: some threads have not finished yet, still waiting")
        else:
            wait = False
        time.sleep(1)


if __name__ == "__main__":

    begin_time = time.time()

    version = "unlock"
    if len(sys.argv) > 1:
        version = sys.argv[1]
    try:
        # Version without locks
        if version == "unlock":
            demo_no_lock()

        # Version with locks
        elif version == "lock":
            demo_lock()

    except KeyboardInterrupt:
        # Handle the exception raised by thread.interrupt_main()
        print(traceback.format_exc())

    print(
        "MainThread finished, elapsed: {0}(ms)".format(
            round((time.time() - begin_time) * 1000, 2)
        )
    )

threading

# coding=utf-8

"""
    Demo running on Py2
"""

import sys
import time
import traceback
from datetime import datetime
import thread
import threading
import logging

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)


def worker(run_num=3, internal=2, max_num=11, lock=None):
    """
    Worker that does the actual work.
    """

    if isinstance(lock, thread.LockType) or isinstance(lock, threading._RLock):
        if not lock.acquire(True):
            print(
                "Thread: {0} failed to acquire the lock! Current time: {1}".format(
                    thread.get_ident(), datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
                )
            )
            return

    current_thread = threading.current_thread()
    for i in range(run_num):
        print(
            "Thread: {0}-{1}, current time: {2}".format(
                current_thread.name,
                current_thread.ident,
                datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
            )
        )
        time.sleep(internal)

    if isinstance(lock, thread.LockType) or isinstance(lock, threading._RLock):
        # Releasing an unlocked lock raises an error (thread.error on Py2, RuntimeError on Py3)
        lock.release()


def demo_no_lock():
    """
    Demo without locks.
    :return:
    """

    work1 = threading.Thread(target=worker, name="work1", args=(5, 1))
    work1.setDaemon(True)

    work2 = threading.Thread(
        target=worker, name="work2", kwargs=dict(run_num=5, internal=3)
    )
    work2.setDaemon(False)

    print("Thread work1 start running...")
    work1.start()

    print("Thread work2 start running...")
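    work2.start()

    # A threading thread can be marked as a daemon thread or not.
    # The interpreter waits for non-daemon threads (work2) before exiting but
    # not for daemon threads (work1), so only daemon threads require the main
    # thread to sleep for a while to let them finish.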
    time.sleep(8)


def demo_lock():
    """
    Demo with locks.
    :return:
    """
    # Create two mutex locks (a list, so that locks[0] and locks[1] can be indexed)
    locks = [threading.Lock() for _ in range(2)]

    work3 = threading.Thread(target=worker, name="work3", kwargs=dict(lock=locks[0]))

    work4 = threading.Thread(
        target=worker, name="work4", kwargs=dict(run_num=10, internal=1, lock=locks[1])
    )

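    # work4 and work5 share one lock and acquire it in blocking mode, so the
    # second one to reach the lock starts its work only after the first has
    # finished and released it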
    work5 = threading.Thread(
        target=worker, name="work5", kwargs=dict(run_num=10, internal=1, lock=locks[1])
    )

    print("Thread work3 start running...")
    work3.start()

    print("Thread work4 start running...")
    work4.start()

    print("Thread work5 start running...")
    work5.start()

    work3.join()
    work4.join()
    work5.join()


def demo_monitor():

    work6 = threading.Thread(target=worker, name="work6", kwargs=dict(run_num=5))
    work6.daemon = True
    work6.start()

    while work6.is_alive():
        print("Thread work6 is still running...")
        time.sleep(1)


class CountDownThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        self.times = kwargs.pop("times", 10)
        super(CountDownThread, self).__init__(*args, **kwargs)

    def run(self):
        while self.times > 0:
            print(
                "Thread {0} T-minus {1} seconds".format(
                    threading.current_thread().name, self.times
                )
            )
            self.times -= 1
            time.sleep(1)


def demo_subclass():
    work7 = CountDownThread(name="work7", times=8)
    work7.start()


if __name__ == "__main__":

    begin_time = time.time()

    version = "unlock"
    if len(sys.argv) > 1:
        version = sys.argv[1]

    try:
        # Version without locks
        if version == "unlock":
            demo_no_lock()

        # Version with locks
        elif version == "lock":
            demo_lock()

        # Monitor version
        elif version == "monitor":
            demo_monitor()

        # Subclass version
        elif version == "subclass":
            demo_subclass()

    except:
        print(traceback.format_exc())

    print(
        "MainThread finished, elapsed: {0}(ms)".format(
            round((time.time() - begin_time) * 1000, 2)
        )
    )
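The listing above targets Python 2 (it imports thread and relies on Py2-only names). As a rough Python 3 counterpart of the shared-lock case, the following minimal sketch uses the lock as a context manager, which releases it even if the worker raises, and waits with join() rather than a fixed sleep. The names `worker`, `run_demo` and `log`, and the short 0.01 s interval, are illustrative choices rather than anything taken from the original demo.

```python
import threading
import time


def worker(name, lock, log, run_num=3, interval=0.01):
    """Append (name, i) entries to log while holding the shared lock."""
    # "with lock" acquires the lock (blocking) and always releases it on exit.
    with lock:
        for i in range(run_num):
            log.append((name, i))
            time.sleep(interval)


def run_demo():
    lock = threading.Lock()
    log = []
    threads = [
        threading.Thread(target=worker, name=n, args=(n, lock, log))
        for n in ("work4", "work5")
    ]
    for t in threads:
        t.start()
    # join() blocks until each thread has finished, so no sleep is needed.
    for t in threads:
        t.join()
    return log


if __name__ == "__main__":
    for entry in run_demo():
        print(entry)
```

Because each worker holds the lock for its whole loop, the entries of the two threads do not interleave: whichever thread gets the lock first writes all of its entries before the other starts.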

GIL

Anyone using multithreading in Python needs to know about the GIL.

GIL(Global Interpreter Lock)

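If you start N threads, where N equals the number of CPU cores, and have each of them spin in an infinite loop, a 4-core CPU shows only about 160% CPU utilization, that is, less than two cores in use.

Even with 100 threads, utilization stays at around 170%, still less than two cores.

Rewrite the same infinite loop in C, C++ or Java, however, and it saturates every core right away: 400% on 4 cores, 800% on 8 cores. Why can't Python do the same?

Python threads are real operating-system threads, but the interpreter executes code under a lock, the GIL (Global Interpreter Lock). Any Python thread must acquire the GIL before it can run. In Python 2 the interpreter releases the GIL automatically after every 100 bytecode instructions to give other threads a chance to run; since Python 3.2 the hand-over is driven by a time-based switch interval (5 ms by default) instead. The GIL effectively puts a lock around the execution of all threads, so threads in Python can only take turns executing bytecode: even 100 threads on a 100-core CPU can make use of only about one core.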
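The effect can be observed with a small Python 3 timing sketch like the one below. It is an illustration under assumptions, not a benchmark: `count_down`, `time_sequential` and `time_threaded` are hypothetical names, and the loop size is arbitrary. On a CPython build with the GIL, the threaded run is not expected to be meaningfully faster than the sequential one, although the exact numbers depend on the machine and the Python version.

```python
import sys
import threading
import time


def count_down(n):
    """CPU-bound busy loop made of pure Python bytecode, so it needs the GIL to run."""
    while n > 0:
        n -= 1
    return n


def time_sequential(num_tasks, n):
    """Run count_down(n) num_tasks times, one after another, in the current thread."""
    start = time.perf_counter()
    for _ in range(num_tasks):
        count_down(n)
    return time.perf_counter() - start


def time_threaded(num_tasks, n):
    """Run count_down(n) in num_tasks threads at once and wait for all of them."""
    threads = [
        threading.Thread(target=count_down, args=(n,)) for _ in range(num_tasks)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    # Interval (in seconds) after which a running thread is asked to give up the GIL.
    print("switch interval (s):", sys.getswitchinterval())
    print("sequential (s):", time_sequential(4, 2 * 10**6))
    print("threaded (s):  ", time_threaded(4, 2 * 10**6))
```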

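The GIL is a historical legacy of the interpreter's design. The interpreter we normally use is the official implementation, CPython; to make real use of multiple cores with threads, the interpreter would have to be rewritten without a GIL.

So multithreading is available in Python, but do not expect it to use multiple cores effectively. If you really must use multiple cores through threads, the only way is through C extensions, and that gives up the simplicity and ease of use that Python is known for.

There is no need to worry too much, though. Python cannot use threads for multi-core work, but it can use multiple processes for it. Each Python process has its own independent GIL, so the processes do not affect one another.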
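As a minimal sketch of the multi-process route, the following Python 3 example spreads the same kind of busy loop over a pool of worker processes with the standard multiprocessing module. The function names `count_down` and `run_in_processes` and the task size are illustrative assumptions. Each worker is a separate interpreter process with its own GIL, which is why the loops can run on different cores.

```python
import multiprocessing
import os


def count_down(n):
    """CPU-bound busy loop; returns the PID of the process that ran it."""
    while n > 0:
        n -= 1
    return os.getpid()


def run_in_processes(num_procs, n):
    """Run count_down(n) num_procs times on a pool of num_procs worker processes."""
    with multiprocessing.Pool(processes=num_procs) as pool:
        return pool.map(count_down, [n] * num_procs)


if __name__ == "__main__":
    # The guard matters: on platforms that start workers by re-importing this
    # module, it keeps the children from creating pools of their own.
    pids = run_in_processes(4, 2 * 10**6)
    print("parent PID:", os.getpid())
    print("worker PIDs:", sorted(set(pids)))
```

The trade-off is that processes do not share memory the way threads do: arguments and results are pickled and sent between processes, so this approach suits work that is heavy on computation and light on data exchange.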

