Python 线程进程相关库的简单应用

了解线程和进程、同步和异步、并发和并行和相关库的使用。

0%

🎇简单了解

线程和进程

  • 每个程序至少有一个进程,不同进程之间不共享内存
  • 一个进程包含一个或多个线程,线程共享着同一个进程的内存
  • 在操作系统保护模式下,一个进程崩溃不会对其他进程产生影响,而线程没有独立的内存空间,但它有自己的堆栈和局部变量
  • 线程可以通过共享变量的方式实现多线程直接的通信,而进程的通信主要通过消息传递。相对共享变量而言,消息传递会消耗更多资源

同步和异步

  • 同步是指不同的程序单元为了完成某个任务通过某种通信方式协调一致,意味着有序
  • 异步指不同程序单元之间不需要协调也可单独完成任务

并发和并行

来自 Erlang之父 Joe Armstrong 的解释

并发:两个队列和一台咖啡机
并行:两个队列和两台咖啡机

  • 并发表示多个程序可以在同一个时间段内被执行
  • 并行表示多个程序可以在同一时刻被运行

Threading库

官方文档: https://docs.python.org/zh-cn/3/library/threading.html

简单应用

import time
from threading import Thread


def wait(n): # 需要被线程执行的函数
time.sleep(n)

def main():
# 实例化线程
t = Thread(target=wait, args=[10])
t.start() # 启动线程, 线程开始执行
t.join() # 会将主线程挂起,直到子线程运行结束
print("Done")

if __name__ == '__main__':
main()

开发应用

使用继承方式创建线程,可以做一些额外的逻辑处理。

import time
from threading import Thread


class MyThread(Thread):
def __init__(self, func, args, tname=''):
# 调用父类构造函数
Thread.__init__(self)
self.tname = tname
self.func = func
self.args = args

def run(self):
# 线程执行的具体逻辑
self.func(*self.args)

def wait(n):
time.sleep(n)

def main():
# 实例化线程
t = MyThread(wait, (10,), wait.__name__)
t.start()
t.join()

if __name__ == '__main__':
main()

线程通信

线程间通信强调的是线程之间传递对象引用

Python中有GIL锁,是不是代表线程中的共享变量就一定是安全的?先看一个例子。

from threading import Thread

ZERO = 0 # 共享变量

def foo():
global ZERO
for _ in range(10**7): # 多次操作
ZERO += 1
ZERO -= 1

def main():
thread_array = [Thread(target=foo) for _ in range(2)]

[t.start() for t in thread_array]
[t.join() for t in thread_array]

print(ZERO)

if __name__ == '__main__':
main()

执行后会发现,每次ZERO的结果都不一样,这是每个线程拿到GIL并不会一直执行下去,整个程序是异步并发的,所以函数内对ZERO的操作并不是原子性。

解决方法:加互斥锁,将重要指令包装成原子操作。

from threading import Lock,Thread

ZERO = 0 # 共享变量
LOCK = Lock() # 创建lock对象

def foo():
global ZERO
for _ in range(10**7): # 多次操作
LOCK.acquire() # 获得lock对象,lock状态变为locked,并且阻塞其他线程获取lock对象
ZERO += 1
ZERO -= 1
LOCK.release() # 释放lock对象

def foo2():
global ZERO
for _ in range(10**7): # 多次操作
with LOCK: # 另一种使用方法:使用 with 简化操作
ZERO += 1
ZERO -= 1

def main():
thread_array = [Thread(target=foo) for _ in range(2)]

[t.start() for t in thread_array]
[t.join() for t in thread_array]

print(ZERO)

if __name__ == '__main__':
main()

Multiprocessing库

Python 中的多进程是通过 multiprocessing 包来实现的,和多线程的 threading.Thread 差不多,它可以利用 multiprocessing.Process 对象来创建一个进程对象,其使用方法与threading库很像。

简单应用

import time
from multiprocessing import Process


def foo(n):
time.sleep(2)
print(f'test process: {n}')

def main():
process_array = [Process(target=foo, args=(i,)) for i in range(5)]

[p.start() for p in process_array]
[p.join() for p in process_array]

print('main process finish')

if __name__ == '__main__':
main()

开发应用

import random
import time
from multiprocessing import Process


class WorkerProcess(Process):

def __init__(self, name, func):
Process.__init__(self)
self.name = name
self.func = func

def run(self):
""" Run the thread """
self.func(self.name)

def worker(name):
print(f'Started worker {name}')
worker_time = random.randint(1, 5)
time.sleep(worker_time)
print(f'{name} worker finished in {worker_time} seconds')

if __name__ == '__main__':
process_array = [WorkerProcess(name=f'computer_{i}', func=worker) for i in range(5)]

[p.start() for p in process_array]
[p.join() for p in process_array]

print('main process finish')

进程池

当需要启动大量的子进程,可以用进程池的方式批量创建子进程

import os
import random
import time
from multiprocessing import Pool


def wait_task(name):
print(f'Run task {name}, {os.getpid()}')
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print(f'Task {name} runs {end-start} seconds')

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(9) # 默认大小是CPU的核数

[p.apply_async(wait_task, args=(i,)) for i in range(5)]

p.close()
p.join()
print('All subprocesses done')

进程锁

对于所有在threading存在的同步原语,multiprocessing中都有类似的等价物。

import os
import random
import time
from multiprocessing import Lock, Process

LOCK = Lock()

def foo(i):
with LOCK:
print(f'{i}:{os.getpid()} is running')
time.sleep(random.randint(1,3))
print(f'{i}:{os.getpid()} is done')

if __name__ == '__main__':
[Process(target=foo, args=(i,)).start() for i in range(10)]
------------ 已触及底线了 感谢您的阅读 ------------
  • 本文作者: OWQ
  • 本文链接: https://www.owq.world/f89f9ea4/
  • 版权声明: 本站所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处( ̄︶ ̄)↗