🎇简单了解 线程和进程
每个程序至少有一个进程,不同进程之间不共享内存
一个进程包含一个或多个线程,线程共享着同一个进程的内存
在操作系统保护模式下,一个进程崩溃不会对其他进程产生影响,而线程没有独立的内存空间,但它有自己的堆栈和局部变量
线程可以通过共享变量的方式实现多线程直接的通信,而进程的通信主要通过消息传递。相对共享变量而言,消息传递会消耗更多资源
同步和异步
同步是指不同的程序单元为了完成某个任务通过某种通信方式协调一致,意味着有序
异步指不同程序单元之间不需要协调也可单独完成任务
并发和并行
来自 Erlang之父 Joe Armstrong 的解释
并发:两个队列和一台咖啡机 并行:两个队列和两台咖啡机
并发表示多个程序可以在同一个时间段内被执行
并行表示多个程序可以在同一时刻 被运行
Threading库
官方文档: https://docs.python.org/zh-cn/3/library/threading.html
简单应用 import timefrom threading import Threaddef wait (n ): time.sleep(n) def main (): t = Thread(target=wait, args=[10 ]) t.start() t.join() print ("Done" ) if __name__ == '__main__' : main()
开发应用 使用继承方式创建线程,可以做一些额外的逻辑处理。
import timefrom threading import Threadclass MyThread (Thread ): def __init__ (self, func, args, tname='' ): Thread.__init__(self) self.tname = tname self.func = func self.args = args def run (self ): self.func(*self.args) def wait (n ): time.sleep(n) def main (): t = MyThread(wait, (10 ,), wait.__name__) t.start() t.join() if __name__ == '__main__' : main()
线程通信
线程间通信强调的是线程之间传递对象引用
Python中有GIL锁,是不是代表线程中的共享变量就一定是安全的?先看一个例子。
from threading import ThreadZERO = 0 def foo (): global ZERO for _ in range (10 **7 ): ZERO += 1 ZERO -= 1 def main (): thread_array = [Thread(target=foo) for _ in range (2 )] [t.start() for t in thread_array] [t.join() for t in thread_array] print (ZERO) if __name__ == '__main__' : main()
执行后会发现,每次ZERO
的结果都不一样,这是每个线程拿到GIL并不会一直执行下去,整个程序是异步并发的,所以函数内对ZERO
的操作并不是原子性。
解决方法:加互斥锁,将重要指令包装成原子操作。
from threading import Lock,ThreadZERO = 0 LOCK = Lock() def foo (): global ZERO for _ in range (10 **7 ): LOCK.acquire() ZERO += 1 ZERO -= 1 LOCK.release() def foo2 (): global ZERO for _ in range (10 **7 ): with LOCK: ZERO += 1 ZERO -= 1 def main (): thread_array = [Thread(target=foo) for _ in range (2 )] [t.start() for t in thread_array] [t.join() for t in thread_array] print (ZERO) if __name__ == '__main__' : main()
Multiprocessing库
Python 中的多进程是通过 multiprocessing 包来实现的,和多线程的 threading.Thread 差不多,它可以利用 multiprocessing.Process 对象来创建一个进程对象,其使用方法与threading库很像。
简单应用 import timefrom multiprocessing import Processdef foo (n ): time.sleep(2 ) print (f'test process: {n} ' ) def main (): process_array = [Process(target=foo, args=(i,)) for i in range (5 )] [p.start() for p in process_array] [p.join() for p in process_array] print ('main process finish' ) if __name__ == '__main__' : main()
开发应用 import randomimport timefrom multiprocessing import Processclass WorkerProcess (Process ): def __init__ (self, name, func ): Process.__init__(self) self.name = name self.func = func def run (self ): """ Run the thread """ self.func(self.name) def worker (name ): print (f'Started worker {name} ' ) worker_time = random.randint(1 , 5 ) time.sleep(worker_time) print (f'{name} worker finished in {worker_time} seconds' ) if __name__ == '__main__' : process_array = [WorkerProcess(name=f'computer_{i} ' , func=worker) for i in range (5 )] [p.start() for p in process_array] [p.join() for p in process_array] print ('main process finish' )
进程池
当需要启动大量的子进程,可以用进程池的方式批量创建子进程
import osimport randomimport timefrom multiprocessing import Pooldef wait_task (name ): print (f'Run task {name} , {os.getpid()} ' ) start = time.time() time.sleep(random.random() * 3 ) end = time.time() print (f'Task {name} runs {end-start} seconds' ) if __name__=='__main__' : print ('Parent process %s.' % os.getpid()) p = Pool(9 ) [p.apply_async(wait_task, args=(i,)) for i in range (5 )] p.close() p.join() print ('All subprocesses done' )
进程锁
对于所有在threading
存在的同步原语,multiprocessing
中都有类似的等价物。
import osimport randomimport timefrom multiprocessing import Lock, ProcessLOCK = Lock() def foo (i ): with LOCK: print (f'{i} :{os.getpid()} is running' ) time.sleep(random.randint(1 ,3 )) print (f'{i} :{os.getpid()} is done' ) if __name__ == '__main__' : [Process(target=foo, args=(i,)).start() for i in range (10 )]