Skip to content

Latest commit

 

History

History
906 lines (832 loc) · 32.5 KB

File metadata and controls

906 lines (832 loc) · 32.5 KB

在第一部分主要是介绍Python的语法、数据结构以及OOP概念,接下来则是对其进行实际的使用,用于安全开发等

2.1 文件和目录基本操作

在文件的基本操作中包含文件的打开、关闭、读、写、重命名以及删除等,分别对应openclosereadwrite几个方法,想要对一个文件进行操作,要先使用open方法获取文件的访问权限,定义如下所示

open(file,mode='r',buffering=-1,encoding=None,errors=None,newline=None,closefd=True,opener=None)

其中file为必须,即文件的相对或绝对路径;mode可选,即选择何种方式打开;buffering设置缓冲;encoding,编码方式,一般使用utf8errors报错级别;newline区分换行符;closefd传入的file参数类型;opener打开文件的一个回调函数,接接收被打开文件的文件描述符,其中mode的参数可选值如下所示

mode 描述
t 默认,文本模式
x 写模式,新建文件(如果存在则会报错)
b 二进制模式
+ 打开一个文件进行更新(可读可写)
U 通用换行模式
r 只读模式
rb 以二进制格式打开以一个文件用于只读
r+ 打开一个文件用于读写
rb+ 以二进制格式打开一个文件用于读写
w 打开一个文件只用于写,如果存在文件,则打开并删除内容进行覆写
wb 以二进制格式打开一个文件只用于写入
w+ 打开一个文件用于读写
wb+ 以二进制格式打开一个文件用于读写
a 打开一个文件用于追加,如果文件存在,新写的内容将会追加到原有内容之后
ab 以二进制格式打开进行追加
a+ 打开一个文件用于读写
ab+ 二进制格式追加读写

2.1.1 文件读取

通常在文件的操作中常用参数为file以及mode,如果操作时发现存在乱码情况,则要考虑使用encoding,创建一个测试文件test.txt,然后使用下面示列代码进行读取

data = open('./test.txt', 'r').read()
print(data)

对代码进行调整,观察读取数量以及其他模式的数据是怎么样的,示例代码如下所示

print(10*"-" + "常规读取开始" + 10*"-")
file = open('test.txt','r')
print(file.read(3))
file.close()
print(10*"-" + "常规读取完成" + 10*"-")
print()
print(10*"-" + "二进制读取开始" + 10*"-")
file = open('test.txt','rb')
print(file.read(3))
file.close()
print(10*"-" + "二进制读取完成" + 10*"-")

最终的输出结果如下所示

----------常规读取开始----------
a
b
----------常规读取完成----------

----------二进制读取开始----------
b'a\nb'
----------二进制读取完成----------

除此之外,还可以在读取时使用for循环来逐行读取,如下所示

file = open('test.txt','r')
for c in file:
    print(c)
file.close()

这种方法适用于较大文件的读取,如果以read方法读取大文件,则示列代码如下所示

with open('./test.txt', 'r') as f:
    while True:
        c = f.read(1)
        if not c:
            break
        print(c)

上面的代码是进行逐字符读取

2.1.2 文件写入

对于文件的写入通常为aw模式,下面展示两种模式

with open('test.txt', 'a') as f:
    f.write('Hello, World!\n') # 原有内容中进行追加

with open('test.txt', 'w') as f:
    f.write('Hello, World!\n') # 覆盖原有内容

因此当对文件准备进行写入操作时需要留意相关的使用

2.1.3 文件的创建、删除、重命名

在文件的创建中(未存在时)可以使用w+,对于删除则用os.remove()os.rename()则用于对文件进行重命名,具体定义如下所示

os.remove(path) # path指待删除文件的路径
os.rename(src,dst) # 其中src是指要修改的目录名,dst表示修改后的目录名

os.remove('./test.txt')
os.rename('./test.txt','./abc.txt')

2.2 目录操作

在目录操作中通常涉及到目录和目录下的文件遍历,包括枚举目录和文件的相关属性、创建和删除目录以及对目录重命名,先创建一个testDir,结构如下所示

├── 1
│   ├── 1.1
│   └── 1.2
├── 2
│   ├── 2.1
│   │   └── 21.txt
│   └── 2.2
├── 3
│   ├── 3.1
│   │   ├── 3.1.1
│   │   └── 3.1.2
│   │       └── 312.txt
│   └── 3.2
└── 4.txt

2.2.1 目录枚举

可以通过os以及sys模块来列出当前目录下的文件和子目录,示例代码如下所示

import os,sys

os.chdir('./testDir')

def listCurrentDirectory(path):
    files = os.listdir(path)
    for name in files:
        print(name)

listCurrentDirectory('.')

其中os.chdir()方法是将程序上下文的当前路径指向testDir,接着利用定义的函数来循环打印该路径下的子目录以及文件的名称,最终输出结果如下所示

1
4.txt
3
2

可以对代码进行进一步的改动,以获取更多的信息,代码如下所示

import os,sys

os.chdir('./testDir')

def listCurrentDirectory(path):
    files = os.listdir(path)
    for name in files:
        pathName = os.path.join(path,name)
        print(os.stat(pathName).st_mode) # inode模式
        print(os.stat(pathName).st_ino)  # inode节点号
        print(os.stat(pathName).st_dev)  # 设备号
        print(os.stat(pathName).st_nlink) # 链接数
        print(os.stat(pathName).st_uid)  # 所有者用户id
        print(os.stat(pathName).st_gid)  # 所有者组id
        print(os.stat(pathName).st_size) # 文件大小
        print(os.stat(pathName).st_atime) # 最后访问时间
        print(os.stat(pathName).st_mtime) # 最后修改时间
        print(os.stat(pathName).st_ctime) # 最后状态改变时间
        print("-"*20)

listCurrentDirectory('.')

输出结果如下所示

16877
23057958
16777230
4
501
20
128
1767769703.2751656
1767769604.1379273
1767769604.1379273
--------------------
33188
23058903
16777230
1
501
20
0
1767769787.7183774
1767769787.7183774
1767769787.718489
--------------------
16877
23057981
16777230
4
501
20
128
1767769703.2751584
1767769635.4250436
1767769635.4250436
--------------------
16877
23057960
16777230
4
501
20
128
1767769703.2751505
1767769624.392783
1767769624.392783
--------------------

这些只是相应的详细信息,接下来还要知道文件的类型,因此对代码进行改动,如下所示

import os,sys,stat

os.chdir('./testDir')

def listCurrentDirectory(path):
    files = os.listdir(path)
    for name in files:
        pathName = os.path.join(path,name)
        mode = os.stat(pathName).st_mode
        if stat.S_ISDIR(mode):
            print(f"{pathName} 是目录")
        elif stat.S_ISREG(mode):
            print(f"{pathName} 是文件")
        else:
            print(f"{pathName} 其他类型")
        print("-"*20)

listCurrentDirectory('.')

输出结果如下所示

./1 是目录
--------------------
./4.txt 是文件
--------------------
./3 是目录
--------------------
./2 是目录
--------------------

除此之外还有其他的方法,如下所示

stat.S_ISLNK(mode) #判断是否链接文件
stat.S_ISSOCK(mode) #判断是否套接字文件
stat.S_ISFIFO(mode) #判断是否命名管道
stat.S_ISBLK(mode) #判断是否块设备
stat.S_ISCHR(mode) #判断是否字符设置

2.2.2 目录遍历

前面的主要是当前目录下的一级目录进行枚举,接下来进行多级目录的枚举,前面已经知道了怎么判断是目录还是文件,因此可以通过判断来实现多级的遍历,实现递归调用,这种方法可以通过一个简单的os.walk()方法进行实现,语法如下所示

os.walk(top[, topdown=True[, onerror=None[, followlinks=False]]])

其中top是你所要遍历的目录的地址,返回的是一个三元组(root,dirs,files),root是指当前正在遍历的这个文件夹本身的地址,dirs是一个list,内容是该文件夹中所有的目录的名字(不包括子目录),files同样是list,内容是该文件夹中所有的文件(不包括子目录);topdown为可选参数,值为True则优先遍历top目录,否则优先遍历top的子目录(默认为开启);对于参数onerror也为可选,需要一个callable对象,当walk需要异常时,会调用;followlinks可选,如果为True,则会遍历目录下的快捷方式(linux下是软连接symbolic link)实际所指的目录(默认关闭),如果为False,则优先遍历top的子目录,测试代码如下所示

import os,sys,stat

os.chdir('./testDir')

def walkDir(path):
    for dirname, subdirList, fileList in os.walk(path):
        print(f'发现目录: {dirname}')
        for fname in fileList:
            print(f'\t{fname}')

walkDir('.')

输出结果如下所示

发现目录: .
        4.txt
发现目录: ./1
发现目录: ./1/1.1
发现目录: ./1/1.2
发现目录: ./3
发现目录: ./3/3.2
发现目录: ./3/3.1
发现目录: ./3/3.1/3.1.2
        312.txt
发现目录: ./3/3.1/3.1.1
发现目录: ./2
发现目录: ./2/2.1
        21.txt
发现目录: ./2/2.2

2.2.3 修改权限

对于文件的权限修改则使用os.chmod(),其语法如下所示

os.chmod(path,mode)

其中path是指文件名路径货目录路径,接着可以使用flag来按位或操作生成,目录的读取权限表示可以获取目录里文件名列表,执行权限表示可以把工作目录切换到此目录,删除添加目录里的文件必须同时有写和执行权限,文件权限以用户id->id->其它顺序检验,最先匹配的允许或禁止权限被应用,如下所示

stat.S_IXOTH: 其他用户有执行权限0o001
stat.S_IWOTH: 其他用户有写权限0o002
stat.S_IROTH: 其他用户有读权限0o004
stat.S_IRWXO: 其他用户有全部权限权限掩码0o007
stat.S_IXGRP: 组用户有执行权限0o010
stat.S_IWGRP: 组用户有写权限0o020
stat.S_IRGRP: 组用户有读权限0o040
stat.S_IRWXG: 组用户有全部权限权限掩码0o070
stat.S_IXUSR: 拥有者具有执行权限0o100
stat.S_IWUSR: 拥有者具有写权限0o200
stat.S_IRUSR: 拥有者具有读权限0o400
stat.S_IRWXU: 拥有者有全部权限权限掩码0o700
stat.S_ISVTX: 目录里文件目录只有拥有者才可删除更改0o1000
stat.S_ISGID: 执行此文件其进程有效组为文件所在组0o2000
stat.S_ISUID: 执行此文件其进程有效用户为文件所有者0o4000
stat.S_IREAD: windows下设为只读
stat.S_IWRITE: windows下取消只读

下面的代码用来测试修改文件的权限

os.chmod('test.txt', stat.S_IXGRP) # 设置文件可以通过用户组执行
os.chmod('test.txt', stat.S_IWOTH) # 设置文件可以被其他用户写入

2.2.4 目录的重命名、创建和删除

在文件的操作中还有对名称的更改等行为

2.2.4.1 目录重命名

前面介绍过,使用os.rename方法可以实现对文件和目录的重命名,相似的方法还有os.renames方法,下面是两种的使用

os.rename('1.txt','2.txt')
os.renames('./a/3.txt', './b/4.txt') # 实现递归重命名

2.2.4.2 目录创建

在目录创建时可以使用os.mkdir,具体的语法定义如下所示

os.mkdir('Dir')

这种方法只能在单层目录进行创建,可以使用os.makedirs方法实现递归创建目录,如下所示

os.makedirs('/A/B/C')

2.2.4.3 目录删除

对于目录的删除,则可以使用os.rmdir方法,如果要递归删除,则使用os.removedirs方法,如下所示

os.rmdir(path)

2.2.5 作业

根据文件类型来实现多级目录的枚举

import os,sys,stat

os.chdir('./testDir')

def enumDir(path):
    files = os.listdir(path)
    for name in files:
        pathName = os.path.join(path,name)
        mode = os.stat(pathName).st_mode
        if stat.S_ISDIR(mode):
            print(f"Directory: {pathName}")
            enumDir(pathName)
        else:
            print(pathName)

enumDir('.')

2.3 多线程编程

对于Python的多线程来说,和其他的语言是有很大区别的,原则上讲是假的多线程,具体解释如下所示

Python代码的执行由Python虚拟机(解释器)来控制。Python在设计之初就考虑要在主循环中,同时只有一个线程在执行,就像单CPU的系统中运行多个进程那样,内存中可以存放多个程序,但任意时刻,只有一个程序在CPU中运行。同样地,虽然Python解释器可以运行多个线程,只有一个线程在解释器中运行。对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同时只有一个线程在运行。在多线程环境中,Python虚拟机按照以下方式执行:
1.设置GIL
2.切换到一个线程去执行
3.运行
4.把线程设置为睡眠状态
5.解锁GIL
6.再次重复1-5

对所有面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果某线程并未使用很多I/O操作,它会在自己的时间片内一直占用处理器和GIL。也就是说,I/O密集型的Python程序比计算密集型的Python程序更能充分利用多线程的好处。

链接:https://www.zhihu.com/question/23474039/answer/269526476

Python中可以使用thread(_thread)模块和threading模块来创建底层线程,由于threading完全可以替代thread模块,同时提供了更为丰富的功能,所以基本上使用threading模块

2.3.1 创建线程

threading模块中的Thread类代表一个线程,其__init__方法定义如下

def __init__(self,group=None,target=None,name=None,args=(),kwargs=None,*,daemon=None)

先使用最简单的直接初始化的方法来创建线程,代码如下所示

import threading

class SimpleCreator:
    def f(self):
        print('线程执行\n')
    def __init__(self):
        return
    def creatThread(self):
        for i in range(3):
            t = threading.Thread(target=self.f)
            t.start()

if __name__ == "__main__":
    sc = SimpleCreator()
    sc.creatThread()

上面的代码在测试类SimpleCreator定义了一个方法f,该方法被调用时打印相关输出,在另一个方法creatThread方法中循环创建三个Thread类的实例,构造函数中只传入target参数,值为方法f,接下来每个Thread类的实例会调用start方法,该方法的作用是启动线程,在上述代码中的if __name__ == '__main__'是用于判断是否是入口文件。为了让线程能执行更多的任务,我们需要利用args参数给线程传参,修改后的代码如下

import threading

class SimpleCreator:
    def f(self,id):
        print(f'线程执行: {id}')
        return
    def __init__(self):
        return
    def creatThread(self):
        for i in range(3):
            t = threading.Thread(target=self.f,args=(i,))
            t.start()

if __name__ == "__main__":
    sc = SimpleCreator()
    sc.creatThread()

输出如下所示

线程执行: 0
线程执行: 1
线程执行: 2

同样的,可以通过类的继承来实现自定义类,并控制线程的执行,代码如下所示

from threading import Thread

class MyThread(Thread):
    def __init__(self, id):
        super(MyThread, self).__init__()
        self.id = id

    def run(self):
        print(f"Thread {self.id} is running")

if __name__ == "__main__":
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

最终的输出结果如下所示

Thread 1 is running
Thread 2 is running

2.3.2 线程标识

每个线程默认都有唯一的标识符,可以通过ThreadgetName方法获取到,如下代码所示

from threading import Thread,currentThread

class MyThread(Thread):
    def __init__(self, n):
        if n != "":
            super(MyThread, self).__init__(name=n)
        else:
            super(MyThread, self).__init__()
    def run(self):
        print(f"Thread Name: {currentThread().getName()} is running.")

if __name__ == "__main__":
    t1 = MyThread("")
    t2 = MyThread("Thread-2")
    t1.start()
    t2.start()
    print(currentThread().getName())

上面的代码依然是通过自定义线程类来进行的测试,构造函数对传入的参数进行判断,如果值不为空,则赋值给Thread类的name参数,该方法操作会修改默认的线程名称,运行结果如下所示

Thread Name: Thread-1 is running.

Thread Name: Thread-2 is running.
MainThread

2.3.3 setDaemon

在程序的执行中,执行一个主程序,如果主线程又创建了一个子线程,主线程和子线程就兵分两路分别运行,当主线程完成后想退出时,会校验子线程是否完成,如果未完成,则会等待子线程完成后再退出。因此,如果需要在主线程完成后,不管子线程是否完成,都要和主线程一起退出的情况时,则可以使用setDaemon方法了,需要注意的是必须在start()方法调用之前设置,如下代码

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self):
        super(MyThread, self).__init__()
    def run(self):
        time.sleep(5)
        print("我是子线程: "+self.getName())

if __name__ == "__main__":
    t1 = MyThread()
    t1.setDaemon(True)
    t1.start()

print("我是主线程!")

输出结果如下所示

我是主线程!

分析上面的代码可以发现,子线程t1的内容并未打印,是因为t1.setDaemon(True)的操作,这里将父线程设置成了守护线程,在父线程print("我是主线程!")完成后就结束了

2.3.4 join

主线程A中,创建了子线程B,并且在主线程A中调用了B.join(),则主线程A会在调用的地方等待,直到子线程B完成操作后才会接着往下执行,代码如下所示

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self, id):
        super(MyThread, self).__init__()
        self.id = id
    def run(self):
        time.sleep(3)
        print(f"Thread {self.id} finished")

if __name__ == "__main__":
    t1 = MyThread(999)
    t1.start()
    for i in range(5):
        print(f"Main thread iteration {i}")

运行结果如下所示

Main thread iteration 0
Main thread iteration 1
Main thread iteration 2
Main thread iteration 3
Main thread iteration 4
Thread 999 finished

当执行到Main thread iteration 4后会有明显的停顿,这是因为在循环执行完成后开始执行t1线程,此时修改代码引入join方法,代码如下所示

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self, id):
        super(MyThread, self).__init__()
        self.id = id
    def run(self):
        time.sleep(3)
        print(f"Thread {self.id} finished")

if __name__ == "__main__":
    t1 = MyThread(999)
    t1.start()
    t1.join() # 引入 join 方法,等待子线程结束
    for i in range(5):
        print(f"Main thread iteration {i}")

执行结果如下所示

Thread 999 finished
Main thread iteration 0
Main thread iteration 1
Main thread iteration 2
Main thread iteration 3
Main thread iteration 4

当代码开始运行时,会有明显的停顿,之后执行了t1线程,在执行结束后再去执行主线程中的循环,这就是join方法的使用和效果

2.3.5 Timer

Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法,代码如下所示

import threading,time

def hello():
    print("Hello, Timer!")

if __name__ == "__main__":
    t = threading.Timer(3.0, hello)
    t.start()

上面的代码中,调用hreading.Timer创建一个线程t,第一个参数3.0代表start之后3秒钟,该线程才开始执行,第二个参数hello是该线下要调用的函数,运行结果如下所示

Hello, Timer!

2.3.6 锁(lock)

由于线程之间是随机调度,当多个线程同时修改同一条数据时可能会出现脏数据,所以出现了线程锁,即同一时刻只允许一个线程执行操作。线程锁用于锁定资源,可以定义多个锁,当只需要独占某一资源时,任何一个锁都可以锁这个资源,互斥锁时一种同一时刻只允许一个线程访问资源的锁,示列代码如下所示

import threading,time

num = 0
def run(n):
    lock.acquire() # 上锁
    global num # 声明使用全局变量num
    print('start:',num)
    num += 1
    print('end:',num)
    lock.release() # 释放锁

lock = threading.Lock() # 创建锁
for i in range(5):
    t = threading.Thread(target=run,args=("t-%s"%i,)) # 创建线程
    t.start() # 启动线程
    t.join() # 阻塞线程,等待线程运行结束

上述代码调用lock.acquire()获取锁,通过lock.release()来释放锁,两者之间的代码同时只能被一个线程访问,当前线程未退出时,起他线程会等待其执行。

2.3.6.1 作业

结合2.1以及2.2的内容,编写一个多线程版本的文件枚举程序,同时输入多个目录,每个线程负责一个目录递归获取该目录下的所有文件

A方案
import os,sys,stat,threading

def enumDir(path: str) -> None:
    try:
        entries = os.listdir(path)
    except (PermissionError, FileNotFoundError) as e:
        print(f"[SKIP] Cannot list: {path} ({e})")
        return

    for name in entries:
        path_name = os.path.join(path, name)
        try:
            mode = os.stat(path_name).st_mode
        except(PermissionError, FileNotFoundError) as e:
            print(f"[SKIP] Cannot stat: {path_name} ({e})")
            continue
        if stat.S_ISDIR(mode):
            print(f"[DIR] {path_name}")
            enumDir(path_name)
        else:
            print(f"[FILE] {path_name}")

def enumDirs_MultiThread(paths: list[str]) -> None:
    threads = []
    for p in paths:
        t = threading.Thread(target=enumDir, args=(p,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

if __name__ == "__main__":
    paths = ["./testDir/1","./testDir/2","./testDir/3"]
    enumDirs_MultiThread(paths)
B方案
import os,sys,stat,threading

def enumDir(path: str) -> None:
    thread_name = threading.current_thread().name # 获取当前线程名称
    try:
        entries = os.listdir(path)
    except (PermissionError, FileNotFoundError) as e:
        print(f"[{thread_name}][SKIP] Cannot list: {paths} ({e})")
        return
    
    for name in entries:
        path_name = os.path.join(path, name)
        try:
            mode = os.stat(path_name).st_mode
        except(PermissionError, FileNotFoundError) as e:
            print(f"[{thread_name}] [SKIP] Cannot stat: {path_name} ({e})")
            continue
        if stat.S_ISDIR(mode):
            print(f"[{thread_name}] [DIR] {path_name}")
            enumDir(path_name)
        else:
            print(f"[{thread_name}] [FILE] {path_name}")

def enumDirs_MultiThread(paths: list[str]) -> None:
    threads = []
    for p in paths:
        t = threading.Thread(target=enumDir, args=(p,), name=f"scan-{p}", daemon=True)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

if __name__ == "__main__":
    paths = ["./testDir/1","./testDir/2","./testDir/3"]
    enumDirs_MultiThread(paths)

2.4 多进程编程

前面提到Python并不能真正的实现并行编程,但是多进程的模式是可以实现的。进程之间是无法直接共享数据,需要通过QueuePipe或者Manager方式进行进程之间的通信

2.4.1 创建进程

multiprocessing模块提供了类似threading模块中多线程编程的模式,辅助我们进行多进程开发,示例代码如下所示

import os
from multiprocessing import Process

def info(title):
    print(title)
    print('Module name:', __name__)
    print('Parent process:', os.getppid())
    print('Process id:', os.getpid())

def f(name):
    info('Function f')
    print(f'Hello, {name}')

if __name__ == '__main__':
    print('Main line')
    p = Process(target=f, args=('Bob',)) # 创建进程
    p.start() # 启动进程
    p.join() # 等待子进程结束

Process类定义在multiprocess模块中,使用方法和Thread类基本类似,参数也是,因此可以创建多个进程来实现真正的并行任务,上面的代码输出如下所示

Main line
Function f
Module name: __mp_main__
Parent process: 41357
Process id: 41360
Hello, Bob

2.4.2 进程间通信

多进程之间的通信通过Queue()Pipe()来实现

2.4.2.1 Queue

Queue类提供了put方法存放数据,get方法获取数据,get方法获取数据的同时会清空队列,示例代码如下所示

import os,time
from multiprocessing import Process,Queue

def f(q, data):
    q.put(data) # 将数据放入队列

def out(q):
    time.sleep(4) # 模拟等待数据准备
    print('从队列获取数据:', q.get()) # 从队列获取数据

if __name__ == '__main__':
    q = Queue() # 创建队列
    p = Process(target=f, args=(q, [1, 2, 3, 4, 5])) # 创建生产者进程
    p.start() # 启动生产者进程
    p.join() # 等待生产者进程结束

    p1 = Process(target=out, args=(q,)) # 创建消费者进程
    p1.start() # 启动消费者进程
    p1.join() # 等待消费者进程结束

上述代码首先用Queue()创建一个队列,之后进程p绑定的函数为f,进行数据的添加;然后进程p1绑定函数out,进行数据的读取,最终的运行结果如下所示

从队列获取数据: [1, 2, 3, 4, 5]

从输出可以看出,两个进程之间实现了数据的共享

2.4.2.2 Pipe

Pipe()即管道模式,调用Pipe()返回管道的两端的Connection,本质是进程之间的数据传递,而不是数据共享,返回两个连接对象,分别表示管道的两端,每端都有send()recv()方法,如果两进程试图在同一时间的同一端进行读取和写入,那么可能会损坏管道中的数据,示例代码如下所示

import multiprocessing

def consumer(pipe):
    output_p, input_p = pipe
    input_p.close() # 关闭输入端
    while True:
        try:
            item = output_p.recv()
            print(f"Consumed: {item}")
        except EOFError:
            break
# 生产项目并将其放置到队列上, sequence是一个可迭代对象
def producer(sequence, input_p):
    for item in sequence:
        input_p.send(item)
        print(f"Produced: {item}")

if __name__ == "__main__":
    (output_p,input_p) = multiprocessing.Pipe(True) # 创建一个管道, 双向通信
    cons_p = multiprocessing.Process(target=consumer,args=((output_p,input_p),))
    cons_p.start()
    output_p.close() # 关闭输出端
    print("生产者关闭输出")

    # 生产项目
    sequence = [1,2,3,4]
    producer(sequence,input_p)
    input_p.close() # 关闭输入端
    print("生产者关闭输入")
    cons_p.join() # 等待消费者进程结束

输出结果如下所示

生产者关闭输出
Produced: 1
Produced: 2
Produced: 3
Produced: 4
生产者关闭输入
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4

上面的代码在主进程中调用multiprocessing.Pipe(True)创建了管道,元组(conn1,conn2)作为返回值,其中conn1conn2是表示管道两端的Connection对象。Pipe接收一个布尔型参数,默认为true,表示全双工通信,如果设置为falseconn1只能用于接收,而conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法。 当主进程创建Pipe的时候,其两个Connections的连接都是主进程,当主进程创建子进程后,同时拷贝Connections,此时有了四个Connections。此后,关闭主进程的一个Out Connection,关闭一个子进程的In Connection,那么就建立好了一个输入在主进程,输出在子进程的管道,原理图如下所示

2.4.3 进程锁

和多线程编程一样,当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突,示例代码如下所示

import multiprocessing,time

def job(v,num,l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1)
        v.value += num # 共享内存变量
        print(v.value)
    print(f"进程 {num} 完成操作")
    l.release() # 释放锁

def multicore():
    l = multiprocessing.Lock() # 创建锁
    v = multiprocessing.Value('i',0) # 创建共享内存变量
    p1 = multiprocessing.Process(target=job, args=(v,1,l)) # 创建进程1
    p2 = multiprocessing.Process(target=job, args=(v,3,l)) # 创建进程2
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == "__main__":
    multicore()

上面的代码通过multiprocessing.Lock()来创建一个进程锁,然后调用multiprocessing.Value来定义一个共享内存变量v,初始值为0。接下来启动两个线程p1p2同时对变量v进行操作,为了保证二者互不影响,在方法job中调用了acquire方法和release方法,对其间的代码块进行加锁,以此保证同时只能有一个进程修改v的值,运行结果如下所示

1
2
3
4
5
进程 1 完成操作
8
11
14
17
20
进程 3 完成操作

2.4.4 进程池

由于进程启动的开销比较大,使用多线程的时候会导致大量内存空间被消耗,为了防止这种情况发生,可以使用进程池。进程池会缓存一些进程在池子中,使用的时候直接拿来用,使用完毕回收到池子中,常用方法如下所示

apply() 同步执行(串行)
apply_async() 异步执行(并行)
terminate() 立刻关闭进程池
join() 主进程等待所有子进程执行完毕,必须在close()或terminate()之后
close() 等待所有进程结束后,才关闭进程池

对于进程池的使用,示例代码如下所示

from multiprocessing import Process,Pool
import time

def Foo(i):
    time.sleep(2)
    return i+100

def Bar(arg):
    print(f'--> 执行完毕: {arg}')

def main():
    pool = Pool(5) # 创建一个包含5个进程的进程池

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar) # func子进程执行完毕后,调用callback指定的回调函数,否则不会执行(callback由父进程执行)

    print('----start----')
    pool.close() # 关闭进程池,表示不能再往进程池中添加进程
    pool.join() # 等待进程池中的所有进程执行完毕

if __name__ == '__main__':
    main()

最终的输出如下所示

end
--> 执行完毕: 101
--> 执行完毕: 100
--> 执行完毕: 102
--> 执行完毕: 103
--> 执行完毕: 104
--> 执行完毕: 105
--> 执行完毕: 106
--> 执行完毕: 108
--> 执行完毕: 109
--> 执行完毕: 107

2.4.5 作业

通过多进程编程以及前面的文件操作的内容编写一个多进程实现文件扫描枚举

import multiprocessing,os,stat

def enumDir(path: str) -> None:
    try:
        entries = os.listdir(path)
    except (PermissionError, FileNotFoundError) as e:
        print(f"[SKIP] Cannot list: {path} ({e})")
        return
    for name in entries:
        path_name = os.path.join(path, name)
        try:
            mode = os.stat(path_name).st_mode
        except(PermissionError, FileNotFoundError) as e:
            print(f"[SKIP] Cannot stat: {path_name} ({e})")
            continue
        if stat.S_ISDIR(mode):
            print(f"[DIR] {path_name}")
            enumDir(path_name)
        else:
            print(f"[FILE] {path_name}")

def enumDirs_MultiProcess(paths: list[str]) -> None:
    pool = multiprocessing.Pool(processes=len(paths))
    for p in paths:
        pool.apply_async(func=enumDir, args=(p,))
    pool.close()
    pool.join()

if __name__ == "__main__":
    paths = ["../testDir/1","../testDir/2","../testDir/3"]
    enumDirs_MultiProcess(paths)