Python Tricks

关于 Python 的一些知识收集


语法

迭代器

可以被 for 循环的对象统称为可迭代对象。而迭代器除了可以使用 for 循环,还可以使用 next() 函数调用并返回下一个值。比如生成器同时也是一个迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class FibIter():
def __init__(self, num):
self.num = num
self.a, self.b = 0, 1
self.idx = 0

def __iter__(self):
return self

def __next__(self):
if self.idx < self.num:
self.a, self.b = self.b, self.a + self.b
self.idx += 1
return self.a
raise StopIteration()

fib = FibIter(10)
print(next(fib))
for i in fib:
print(fib)

生成器

将列表生成式中的 [] 更改成 () 便可以得到一个生成器。区别在于列表在生成时已经完成了所有计算并将结果全部储存,而生成器只有在循环时才会计算当前需要的值。

1
2
lis = [m + n for m in 'ABCDEFG' for n in '12345']
gen = (m + n for m in 'ABCDEFG' for n in '12345')

函数中使用 yield 关键词也会产生生成器:

1
2
3
4
5
6
7
8
9
def fib(num):
n, a, b, = 0, 1, 1
while n < num:
yield b
a, b = a+b, a
n += 1

for i in fib(10):
print(i)

函数生成器中单独的 yield 可以用来接收参数,即将生成器作为协程使用:

1
2
3
4
5
6
7
8
9
def f():
while True:
x, y = yield
print(x, y)

a = f()
a.send(None) # first must be None
a.send([1, 2])
a.send([3, 4])

集合

1
2
3
4
5
6
7
8
set1 = {1, 2, 3, 3, 3, 2}
set2 = set(range(2, 11, 2))
print(set1 & set2) # intersection
print(set1 | set2) # union
print(set1 - set2) # difference
print(set1 ^ set2) # symmetric difference
print(set2 <= set1) # subset
print(set1 >= set2) # upperset

装饰器

本质上,装饰器是一个返回函数的高阶函数。Python 中可以使用 @ 来很方便地定义装饰器。下面的代码等价于 say=my_decorator(say)

1
2
3
4
5
6
7
8
9
10
def my_decorator(func):
def wrapper():
print("Before the function is called.")
func()
print("After the function is called.")
return wrapper

@my_decorator
def say():
print("hhh")

带参数+返回值的装饰器:

1
2
3
4
5
6
7
8
9
10
11
def my_decorator(func):
def wrapper(*args, **kwargs):
print("Before the function is called.")
val = func(*args, **kwargs)
print("After the function is called.")
return val
return wrapper

@my_decorator
def add(x, y):
return x + y

内置装饰器

  1. @property: 使得调用类中的成员函数像调用成员变量一样
  2. @staticmethod: 可以由类名直接调用,表明改函数不需要访问该类
  3. @classmethod: 可以由类名直接调用,只能访问类变量。默认第一个参数是类本身。常用来提供额外的构造器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Date
def __init__(self, year, month, day):
self.year = year
self.month = month
self.day = day

@classmethod
def today(cls):
t = time.localtime()
return cls(t.tm_year, t.tm_mon, t.tm_mday)

@property
def year(self):
return self.year

@staticmethod
def subtract(x, y):
return x - y

a = Date(2021, 4, 24)
b = Date.today() # 使用 classmethod 实现另一种初始化方法
b.year - a.year # 而非 b.year()
Date.subtract(2, 1) # 类名直接调用
b.subtract(2, 1) # 实例调用

多重继承

下方的代码 D 类同时继承了两个父类,即同时获得了两个父类的所有功能。注意多重继承时父类的顺序。

super 的工作原理如下:

1
2
3
def super(cls, inst):
mro = inst.__class__.mro()
return mro[mro.index(cls) + 1]

即从 inst 的 mro 列表中寻找 cls 的下一个类。因此下方的代码 super(B, self).foo() 会调用 C 的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class A(object):
def foo(self):
print('foo of A')

class B(A):
pass

class C(A):
def foo(self):
print('foo of C')

class D(B, C):
pass

class E(D):
def foo(self):
print('foo in E')
super().foo()
super(B, self).foo()
super(C, self).foo()

d = D()
d.foo()
e = E()
e.foo()

上下文管理

实现了上下文管理器的对象可以使用 with 语句。在执行时,会依次执行对象的 __enter__ 函数,块内的代码,对象的 __exit__ 函数。因此可以很方便实现自己的上下文管理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class My_Context_Manager():
def __init__(self, name):
self.name = name

def __enter__(self):
print('Begin')
return self

def __exit__(self, exc_type, exc_value, traceback):
print('Exit')

def func(self):
print('Simple function')

with My_Context_Manager('test') as c:
c.func()

注意 __exit__ 中的参数是用来做异常处理的,不能省略。

Python 的 contextmanager 装饰器提供了更简单的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
from contextlib import contextmanager
import time
from time import perf_counter

@contextmanager
def count(sec):
start = perf_counter()
yield sec
end = perf_counter()
print(f'{end - start} s')

with count(1) as c:
time.sleep(c)

在执行时,遇到 yield 语句后会执行 with 语句内的内容,最后执行 yield 之后的部分。

魔术方法

比较

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Student():
def __init__(self, name, age):
self.name = name
self.age = age

def __eq__(self, other): # ==
return self.name == other.name and self.age == other.age
def __ne__(self, other): # !=
return self.name != other.name or self.age != other.age
def __gt__(self, other): # >
if self.age > other.age:
return True
elif self.age < other.age:
return False
else:
return self.name > other.name
def __lt__(self, other): # <
return not self.__gt__(other)

a = Student('a', 1)
b = Student('b', 2)
c = Student('c', 2)
print(a < b, b > c)

表示

  • 如果想要将自定义类放入 set 或作为 dict 的键,则必须重写 __hash___eq__ 两个魔术方法
  • __str__ 在使用 print 函数时被调用,__repr__print 一个 list/set/dict 时被调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Student():
def __init__(self, name, age):
self.name = name
self.age = age

def __hash__(self):
return hash(self.name) + hash(self.age)

def __eq__(self, other):
return self.name == other.name and self.age == other.age

def __str__(self):
return f'{self.name}: {self.age}'

def __repr__(self):
return self.__str__()

stu = set()
stu.add(Student('a', 1))
stu.add(Student('a', 1))
stu.add(Student('b', 2))
print(stu)

文档

在文件开头或函数开头的 string 会被赋值到 __doc__ 这一变量中

1
2
3
4
5
6
7
8
9
'''脚本介绍'''

def main():
'''函数介绍'''
print(__doc__)
print(main.__doc__)

if __name__ == '__main__':
main()

并发编程

  • 并发:一个时间段内,有多个程序在同一个 CPU 上运行,但是任意时刻只有一个程序在 CPU 上运行。
  • 并行:在任意时刻,有多个程序运行在多个 CPU 上

多线程

  • 由于 Python 全局锁 GIL 的存在,多线程无法利用多核优势,不适合计算密集型任务。
  • 线程之间可以使用全局变量进行通信。

以下情况考虑多线程:

  • 程序需要维护很多共享状态时(list/dict/set)
  • 程序的大量时间花费在 I/O 操作上,如爬虫
1
2
3
4
5
6
7
8
9
10
11
12
import threading
import time

def sleeper(n, name):
print(f'{name} start')
time.sleep(n)
print(f'{name} finished')

t = threading.Thread(target=sleeper, args=(2, 'thread1'))
# t.daemon = True # 主程序停止则线程停止,默认为 False
t.start()
print('All done')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading
import time

def sleeper(n, name):
print(f'{name} start')
time.sleep(n)
print(f'{name} finished')

threads = []
for i in range(5):
t = threading.Thread(target=sleeper, args=(2, f'thread-{i}'))
threads.append(t)
t.start()
for t in threads:
t.join() # 等待该线程执行完毕

print('All done')

多个线程竞争一个资源:使用锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
from concurrent.futures import ThreadPoolExecutor

class Counter():
def __init__(self):
self.lock = threading.Lock()
self.count = 0

def increment(self):
with self.lock: # 加锁
self.count += 1

def worker(c):
for _ in range(100000):
c.increment()

c = Counter()
with ThreadPoolExecutor() as pool:
for _ in range(5):
pool.submit(worker, c)
print(c.count)

多个线程的调度:使用 Condition 来暂停/唤醒线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from concurrent.futures import ThreadPoolExecutor
from random import randint
from time import sleep
import threading

class Account():
def __init__(self, balance=0):
self.balance = balance
lock = threading.Lock()
self.condition = threading.Condition(lock)

def withdraw(self, money):
with self.condition:
while money > self.balance:
self.condition.wait() # 等待 notify 解锁
self.balance -= money

def deposit(self, money):
with self.condition:
self.balance += money
self.condition.notify_all()

def add_money(account):
while True:
money = randint(5, 10)
account.deposit(money)
print(threading.current_thread().name, ':', money, '====>', account.balance)
sleep(0.5)

def sub_money(account):
while True:
money = randint(10, 30)
account.withdraw(money)
print(threading.current_thread().name, ':', money, '<====', account.balance)
sleep(1)

account = Account()
with ThreadPoolExecutor(max_workers=10) as pool:
for _ in range(5):
pool.submit(add_money, account)
pool.submit(sub_money, account)

多进程

进程间的数据是完全隔离的

以下情况考虑多进程:

  • 计算密集型任务
  • 程序的输入可以并行的分块,并且运算结果可合并
1
2
3
4
5
6
7
8
9
10
from multiprocessing import Pool, cpu_count

def f(x):
for _ in range(10000):
x = 1 / (1 + x)
return x

if __name__ == '__main__':
with Pool(cpu_count()) as p:
res = p.map(f, range(cpu_count()*1000))

异步 I/O

异步 I/O 是一种单线程、单进程设计,但仍能用作多任务处理(aiohttp 异步 HTTP 网络访问)

  • async def: 定义一个协程函数,内部可以使用 await 语句
  • await: 暂停该协程的执行,同时让其它协程运行,直到 await 后面的语句执行完毕
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import random

async def makerandom(i, threshold):
print(f'{i} start')
k = random.randint(0, 10)
while k <= threshold:
print(f'{k} too low')
await asyncio.sleep(i + 1)
k = random.randint(0, 10)
print(f'---> Finished with {k}')
return k

async def main():
res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
return res

if __name__ == "__main__":
r1, r2, r3 = asyncio.run(main())

密码学

  • AES:对称加密算法
  • RSA:非对称加密算法
  • md5:加密哈希函数。可以将字符串转换为 32 字节字符串
  • base64:基于 64 个可打印字符编码二进制数据。常用在网页中传递少量二进制数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import base64
from hashlib import md5
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto import Random
from Crypto.PublicKey import RSA

# AES
message = '中文'
# 生成密钥
key = md5(b'1qaz2wsx').digest()
iv = Random.new().read(AES.block_size)
# 加密
cipher = AES.new(key, AES.MODE_CFB, iv)
data = cipher.encrypt(message.encode())
print(data)
# 解密
cipher = AES.new(key, AES.MODE_CFB, iv)
message = cipher.decrypt(data)
print(message.decode())

# RSA
message = 'hello, world!'
# 生成密钥
key_pair = RSA.generate(1024)
pub_key = RSA.importKey(key_pair.publickey().exportKey())
pri_key = RSA.importKey(key_pair.exportKey())
# 加密
encryptor = PKCS1_OAEP.new(pub_key)
data = encryptor.encrypt(message.encode())
print(data)
# base64 编码
print(base64.b64encode(data))
# 解密
decryptor = PKCS1_OAEP.new(pri_key)
message = decryptor.decrypt(data)
print(message.decode())

测试

单元测试

使用 pytest 来做单元测试。pytest 会自动寻找当前目录及子目录下所有 test_*.py 文件,并运行其中所有以 test_ 开头的函数。

1
2
3
4
5
6
7
# 使用 assert 语句
def test_example():
assert 1 == 2

# 如果没有抛出指定错误则 fail
with pytest.raise(ExpectedError):
...

覆盖率测试

使用 coverge 来做代码覆盖率测试

  • 命令行使用 coverage run -m pytest [args],会在当前目录下生成 .coverage 文件

  • coverage report -m:显示生成的报告
  • coverage html:以网页的形式展示报告,可以互动

Python 包

一般的 python 包的结构如下:

1
2
3
4
5
6
pkg_name
├── pkg_name
│ ├── module1.py
│ └── module2.py
├── README.md
└── setup.py

打包

使用 setuptools 进行打包,创建 setup.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from setuptools import setup

setup(
name='<name>',
version='<version>',
author='<author>',
packages=['<dir1>', ...]
install_requires=[
'<package1>',
...
],
entry_points={
'console_scripts': [
'<name> = <package>.<module>:<function>',
...
]
})
  • packages:需要被打包的代码所在的文件夹
  • install_requires:项目依赖项
  • console_scripts:将项目中的某个函数注册为可执行命令

版本编号

一般使用三位数字的编号,例如 0.1.2。分别对应大版本、小版本和补丁。增加版本号的规则如下:

  • 增加大版本编号:存在与之前版本不兼容的情况
  • 增加小版本编号:增加新功能,但不影响之前的版本
  • 增加补丁编号:修改 bug

安装

pip install -e .:在当前目录下寻找 setup.py 文件并以编辑模式安装包,即修改代码后不需要重新安装包。

简单来说,当安装包时,文件内容会被复制到一个会被 python 搜索的文件夹中。大部分情况是复制到 python 安装目录下的 site-packages 文件夹。可以使用如下方法查看 python 的搜索路径:

1
2
import sys
sys.path

默认参数

默认参数语句总在 def 定义函数时被求值,且仅执行一次。因此在将可变对象作为函数的默认参数时需要注意:

1
2
3
4
5
6
def func(data=[]):
data.append(1)
return data

func()
func()

上述代码返回值分别为 [1][1, 1]