多进程

一、关于多进程的介绍

  • 1、python由于GIL的存在,python中的多线程其实并不是真正的多线程(多个线程在抢占资源),如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
  • 2、multiprocessing包是Python中的多进程管理包,实现方式及APIthreading.Thread类似。

二、创建多进程的方式(和多线程的方式差不多一致)

  • 1、使用函数的方式创建多进程

    from multiprocessing import Process
    import time
    
    def foo(name):
        time.sleep(1)
        print('hello', name, time.time())
    
    if __name__ == "__main__":
        process_list = []
        for i in range(5):
            p = Process(target=foo, args=('张三',))
            process_list.append(p)
            p.start()
    
        for i in process_list:
            p.join()
    
        print('结束进程')
    
  • 2、使用类的方式创建多进程

    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self):
            super().__init__(name='进程')
    
        def run(self):
            time.sleep(1)
            print('hello', self.name, self.pid, self.is_alive(), time.ctime())
    
    if __name__ == "__main__":
        process_list = []
        for i in range(5):
            p = MyProcess()
            process_list.append(p)
            p.start()
    
        for i in process_list:
            p.join()
    
        print('结束进程')
    

三、关于Process类的常用方法(可以查看上面使用类的方式创建的进程)

  • 1、构造方法:Process([group [, target [, name [, args [, kwargs]]]]])

    • group: 线程组,目前还没有实现,库引用中提示必须是None;
    • target: 要执行的方法;
    • name: 进程名;
    • args/kwargs: 要传入方法的参数。
  • 2、实例方法:

    • is_alive():返回进程是否在运行。
    • join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
    • start():进程准备就绪,等待CPU调度
    • run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
    • terminate():不管任务是否完成,立即停止工作进程
  • 3、属性:

    • daemon: 和线程的setDeamon功能一样
    • name: 进程名字。
    • pid: 进程号。

四、进程之间的通讯

  • 方式一、使用进程队列(注意不是线程队列)

    from multiprocessing import Process, Queue
    import time
    
    class MyProcess(Process):
        """
        自定义一个进程
        """
    
        def __init__(self, queue, index):
            super().__init__(name='自定义进程')
            self.queue = queue
            self.index = index
    
        def run(self):
            time.sleep(2)
            self.queue.put(self.index)
            print(self.name, self.pid)
    
    if __name__ == "__main__":
        q = Queue()
        print('主进程', id(q))
    
        process_list = []
    
        for i in range(5):
            p = MyProcess(q, i)
            p.start()
            process_list.append(p)
    
        for i in process_list:
            p.join()
    
        print(q.get())
        print(q.get())
        print(q.get())
        print('结束进程')
    
  • 方式二、使用管道(双向通讯)

    from multiprocessing import Process, Pipe
    import time
    
    class MyProcess(Process):
        """
        自定义进程
        """
    
        def __init__(self, conn):
            super().__init__(name='子进程')
            self.conn = conn
    
        def run(self):
            time.sleep(3)
            self.conn.send('子进程发送数据')
            response = self.conn.recv()
            print('{}接收数据:{}'.format(self.name, response))
            self.conn.close()
    
    if __name__ == "__main__":
        parent_conn, child_conn = Pipe()
    
        p = MyProcess(child_conn)
        p.start()
    
        # 接收子进程数据
        print('主进程接收的数据:', parent_conn.recv())
        # 给子进程发送数据
        parent_conn.send('儿子你好')
    
        p.join()
        print('结束进程')
    
  • 方式三、使用Managers共享数据(即一个进程去更改另一个进程的数据)

    from multiprocessing import Process, Manager
    
    class MyProcess(Process):
        def __init__(self, p_dict, k, v):
            super().__init__()
            self.p_dict = p_dict
            self.k = k
            self.v = v
    
        def run(self):
            self.p_dict[self.k] = self.v
    
    if __name__ == "__main__":
        process_dict = Manager().dict()
        p1 = MyProcess(process_dict, 'name', '张三')
        p1.start()
        p1.join()
        print(process_dict)
        print('结束进程')
    

五、进程池

  • 1、进程池主要的两个方法

    • apply
    • apply_async
  • 2、创建进程池

    import time
    import multiprocessing
    
    def get_html(n):
        time.sleep(1)
        print('http://www.baidu.com/{}页面请求成功'.format(n))
        return 'http://www.baidu.com/{}'.format(n)
    
    if __name__ == "__main__":
        # 创建一个进程池,如果里面没写值就会默认你CPU的多少
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
        result = pool.apply_async(get_html, args=(1,))
    
        pool.close()
        pool.join()
        print('主进程中:', result.get())
    
  • 3、使用imap

    import time
    import multiprocessing
    
    def get_html(n):
        time.sleep(1)
        print('http://www.baidu.com/{}页面请求成功'.format(n))
        return 'http://www.baidu.com/{}'.format(n)
    
    if __name__ == "__main__":
        # 创建一个进程池,如果里面没写值就会默认你CPU的多少
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
    
        for result in pool.imap(get_html, [1, 5, 4, 7, 2, 9, 7]):
            print('主进程中:{}'.format(result))
    
  • 4、imap_unordered的使用

    import time
    import multiprocessing
    
    def get_html(n):
        time.sleep(1)
        print('http://www.baidu.com/{}页面请求成功'.format(n))
        return 'http://www.baidu.com/{}'.format(n)
    
    if __name__ == "__main__":
        # 创建一个进程池,如果里面没写值就会默认你CPU的多少
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
    
        for result in pool.imap_unordered(get_html, [1, 5, 4, 7, 2, 9, 7]):
            print('主进程中:{}'.format(result))
    
  • 5、多进程回调函数

    import time
    import multiprocessing
    
    def get_html(n):
        time.sleep(1)
        print('http://www.baidu.com/{}页面请求成功'.format(n))
        return 'http://www.baidu.com/{}'.format(n)
    
    def get_detail(arg):
        print('get_detail函数', arg)
    
    if __name__ == "__main__":
        # 创建一个进程池,如果里面没写值就会默认你CPU的多少
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
        result = pool.apply_async(func=get_html, args=(1,), callback=get_detail)
    
        pool.close()
        pool.join()
        print('主进程中:', result.get())
    

六、进程池的另外一种方式

from multiprocessing import Process, Pool
import time
import os

def foo(i):
    time.sleep(1)
    print(i)
    return i + 100

def bar(arg):
    print(os.getpid())
    print('bar函数', arg)

if __name__ == "__main__":
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(func=foo, args=(i,), callback=bar)

    pool.close()
    pool.join()
    print('结束进程')

results matching ""

    No results matching ""