python多线程&多进程

Reference:

https://www.cnblogs.com/kaituorensheng/p/4465768.html

https://zhuanlan.zhihu.com/p/46368084

https://www.runoob.com/python3/python3-multithreading.html

  1. 名词

    • 进程(process)和线程(thread)

      • cpu在处理任务时,把时间分成若干个小时间段,这些时间段很小的,系统中有很多进程,每个进程中又包含很多线程,在同一时间段 内,电脑CPU只能处理一个线程,下一个时间段,可能又去执行别的线程了(时间片轮转,从而实现伪多任务),具体顺序取决于其调度逻辑
      • 多核cpu可以实现真正的并行,同一个时刻每个cpu上都可以跑一个任务
      • 多进程:每个进程分别执行指定任务,进程间互相独立,每个时刻并行的实际进程数取决于cpu数量

      • 多线程:单个cpu同一时刻只能处理一个线程,一个任务可能由多个工人来完成,工人们相互协同,这则是多线程

    • python的多进程:multiprocess模块

    • python的多线程:threading模块

    • 每个进程在执行过程中拥有独立的内存单元,而一个进程的多个线程在执行过程中共享内存。

  2. 多进程multiprocess

    • 母进程:当我们执行一个python脚本,if main下面实际运行的主体就是母进程
    • 子进程:我们使用multiprocess显式创建的进程,都是子进程
    • join()方法:用来让母进程阻塞,等待所有子进程执行完成再结束
  • 使用multiprocess的多进程,可以通过process方法和pool方法
    • process方法:适用进程较少时候,无法批量开启/关闭
    • pool方法:批量管理
    • 参数:输入参数都差不多,第一个是要执行的函数方法target/func,第二个是输入参数args

🌰Process方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Process
import os
import time


def long_time_task(i):
print('子进程: {} - 任务{}'.format(os.getpid(), i))
time.sleep(2)
print("结果: {}".format(8 ** 20))


if __name__=='__main__':
print('当前母进程: {}'.format(os.getpid()))
start = time.time()
p1 = Process(target=long_time_task, args=(1,))
p2 = Process(target=long_time_task, args=(2,))
print('等待所有子进程完成。')
p1.start()
p2.start()
p1.join()
p2.join()
end = time.time()
print("总共用时{}秒".format((end - start)))
  • process方法使用Process实例化一个进程对象,然后调用它的start方法开启进程

    🌰Pool方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    from multiprocessing import Pool, cpu_count
    import os
    import time


    def long_time_task(i):
    print('子进程: {} - 任务{}'.format(os.getpid(), i))
    time.sleep(2)
    print("结果: {}".format(8 ** 20))
    return True # 用于演示pool适用于有返回值


    if __name__=='__main__':
    print("CPU内核数:{}".format(cpu_count())) # 4
    print('当前母进程: {}'.format(os.getpid()))
    start = time.time()
    p = Pool(4)
    results = []
    for i in range(5):
    # p.apply_async(long_time_task, args=(i,))
    results.append(p.apply_async(long_time_task, args=(i,)))
    print('等待所有子进程完成。')
    p.close()
    p.join()
    end = time.time()
    print("总共用时{}秒".format((send - start)))

    # 查看返回值
    for res in results:
    print(res.get())
  • apply_async(func, args=(), kwds={}, callback=None):向进程池提交需要执行的函数及参数,各个进程采用非阻塞(异步)的调用方式,即每个子进程只管运行自己的,不管其它进程是否已经完成。

  • close():关闭进程池(pool),不再接受新的任务。
  • join():主进程阻塞等待子进程的退出, 调用join()之前必须先调用close()或terminate()方法,使其不再接受新的Process。
  1. 多线程threading

    • python的多线程是伪多线程,因为主进程只有一个,所以只用了单核,只是通过碎片化进程、调度、全局锁等操作,cpu利用率提升了
    • 所以我想并行处理百万量级的数据入库操作时,多进程的效率明显高于多线程
    • 【问题】从我观察上看多线程基本就是串行??

      🌰threading

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      import threading
      import time


      def long_time_task():
      print('当子线程: {}'.format(threading.current_thread().name))
      time.sleep(2)
      print("结果: {}".format(8 ** 20))


      if __name__=='__main__':
      start = time.time()
      print('这是主线程:{}'.format(threading.current_thread().name))
      for i in range(5):
      t = threading.Thread(target=long_time_task, args=())
      t.setDaemon(True)
      t.start()
      t.join()

      end = time.time()
      print("总共用时{}秒".format((end - start)))


      # 继承&有返回值的写法
      def long_time_task(i):
      time.sleep(2)
      return 8**20


      class MyThread(threading.Thread):
      def __init__(self, func, args , name='', ):
      threading.Thread.__init__(self)
      self.func = func
      self.args = args
      self.name = name
      self.result = None

      def run(self):
      print('开始子进程{}'.format(self.name))
      self.result = self.func(self.args[0],)
      print("结果: {}".format(self.result))
      print('结束子进程{}'.format(self.name))
      def get_result(self):
      threading.Thread.join(self) # 等待线程执行完毕
      return self.result

      if __name__=='__main__':
      start = time.time()
      threads = []
      for i in range(1, 3):
      t = MyThread(long_time_task, (i,), str(i))
      threads.append(t)

      for t in threads:
      t.start()
      for t in threads:
      t.join()

      end = time.time()
      print("总共用时{}秒".format((end - start)))
    • join方法:等待所有进程执行完,主进程再执行完

    • setDaemon(True):主线程执行完就退出