Python中的协程、线程和进程

通过一个简单的生产-消费模型,来体会一下Python中协程、线程及进程的使用。

首先是协程。分为Python2.5中yield/send实现的生成器和Python3.5中async/await进一步完善的协程。看代码和输出:

class YieldTest(object):
    @staticmethod
    def consumer(name):
        n = ''
        while True:
            n = yield n
            print(f"消费者{name}: 处理消息--{n}"
            sleep(1)
            n = f"消费者{name}处理完毕"

    @staticmethod
    def producer(cs, n):
        for i in range(n):
            print(f"生产者: 产生消息--{i}")
            r = cs[i % len(cs)].send(i)
            print(f"生产者: 收到回复--{r}")

    @classmethod
    def run(cls):
        start = time()
        cs = [cls.consumer(c) for c in range(2)]
        for c in cs: c.send(None)
        cls.producer(cs, 6)
        for c in cs: c.close()
        end = time()
        print(f"YieldTest共耗时:{end-start:.2f}s")
生产者: 产生消息--0
消费者0: 处理消息--0
生产者: 收到回复--消费者0处理完毕
生产者: 产生消息--1
消费者1: 处理消息--1
生产者: 收到回复--消费者1处理完毕
生产者: 产生消息--2
消费者0: 处理消息--2
生产者: 收到回复--消费者0处理完毕
生产者: 产生消息--3
消费者1: 处理消息--3
生产者: 收到回复--消费者1处理完毕
生产者: 产生消息--4
消费者0: 处理消息--4
生产者: 收到回复--消费者0处理完毕
生产者: 产生消息--5
消费者1: 处理消息--5
生产者: 收到回复--消费者1处理完毕
YieldTest共耗时:6.02s

所以说yield/send是个伪协程,它只是提供了在yield处保存现场并切换控制权,仍然需要其他函数调用send/next来触发,另外它并没有提供并发的能力,主线程需要等待生成器调用,包括IO等待,从结果上看耗时没有任何降低。生成器只能降低内存开销,无法提供并发能力,上面的生产-消费代码也是强耦合。如果想要通过yield/send提高并发,需要自己实现事件驱动的异步IO,比较麻烦。因此Python后续进一步完善,提供了asyncio和其他异步库。async/await代码及结果如下:

class AsyncTest(object):

    @staticmethod
    async def consumer(name, queue):
        while True:
            n = await queue.get()
            print(f"消费者{name}: 处理消息--{n}")
            await asyncio.sleep(1)
            queue.task_done()

    @staticmethod
    async def producer(queue, n):
        for i in range(n):
            print(f"生产者: 产生消息--{i}")
            await queue.put(i)

    @staticmethod
    async def main(n):
        q = asyncio.Queue()
        cs = [asyncio.ensure_future(AsyncTest.consumer(c, q)) for c in range(2)]
        await AsyncTest.producer(q, n)
        await q.join()
        for c in cs: c.cancel()

    @classmethod
    def run(cls):
        start = time()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(cls.main(6))
        loop.close()
        end = time()
        print(f"AsyncTest共耗时:{end-start:.2f}s")
生产者: 产生消息--0
生产者: 产生消息--1
生产者: 产生消息--2
生产者: 产生消息--3
生产者: 产生消息--4
生产者: 产生消息--5
消费者0: 处理消息--0
消费者1: 处理消息--1
消费者0: 处理消息--2
消费者1: 处理消息--3
消费者0: 处理消息--4
消费者1: 处理消息--5
AsyncTest共耗时:3.01s

在这里通过队列解耦了生产者和消费者,通过异步IO提供并发,两个消费者使得处理时间减半。下面再看看多线程:

class ThreadTest(object):

    @staticmethod
    def consumer(name, queue):
        while True:
            n = queue.get()
            if n is None:
                break
            print(f"消费者{name}: 处理消息--{n}")
            sleep(1)

    @staticmethod
    def producer(queue, n, size):
        for i in range(n):
            print(f"生产者: 产生消息--{i}")
            queue.put(i)
        # 发送None结束consumer线程
        for i in range(size):
            queue.put(None)

    @classmethod
    def run(cls):
        start = time()
        q = queue.Queue()
        cs = [threading.Thread(target=ThreadTest.consumer, args=(c, q)) for c in range(2)]
        for c in cs: c.start()
        p = threading.Thread(target=ThreadTest.producer, args=(q, 6, len(cs)))
        p.start()
        for c in cs: c.join()
        end = time()
        print(f"ThreadTest共耗时:{end-start:.2f}s")
生产者: 产生消息--0
生产者: 产生消息--1
消费者0: 处理消息--0
生产者: 产生消息--2
生产者: 产生消息--3
生产者: 产生消息--4
生产者: 产生消息--5
消费者1: 处理消息--1
消费者0: 处理消息--2
消费者1: 处理消息--3
消费者0: 处理消息--4
消费者1: 处理消息--5
ThreadTest共耗时:3.01s

由于GIL的存在,Python多线程也是阉割版的,跑IO密集程序还可以,跑CPU密集就是浪费切换时间了。在此处也和协程一样,有效降低了处理时间。

最后是多进程,结构和多线程类似,最终处理时间增加也说明了新建进程开销还是很大滴:

class ProcessTest(object):

    @classmethod
    def run(cls):
        start = time()
        q = multiprocessing.Queue()
        cs = [multiprocessing.Process(target=ThreadTest.consumer, args=(c, q)) for c in range(2)]
        for c in cs: c.start()
        p = multiprocessing.Process(target=ThreadTest.producer, args=(q, 6, len(cs)))
        p.start()
        for c in cs: c.join()
        end = time()
        print(f"ProcessTest共耗时:{end-start:.2f}s")
生产者: 产生消息--0
生产者: 产生消息--1
生产者: 产生消息--2
生产者: 产生消息--3
生产者: 产生消息--4
消费者0: 处理消息--0
消费者1: 处理消息--1
生产者: 产生消息--5
消费者1: 处理消息--2
消费者0: 处理消息--3
消费者0: 处理消息--4
消费者1: 处理消息--5
ProcessTest共耗时:3.04s
Loading Disqus comments...
Table of Contents