Multi Processing and Threading

進程和線程最主要的區別是:多進程中每個進程都有一份獨立的變量副本(一個進程修改變量不會影響其他進程),而多線程中變量被所有線程共享。

多進程模塊 multiprocessing,進程間的數據交互可以使用 Queue,它在多個進程中共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process, Queue

def func_write(q):
    """Push the demo items "A" and then "B" onto queue *q*."""
    items = ("A", "B")
    for item in items:
        q.put(item)
def func_read(q):
    """Print every item taken from *q* until the sentinel ``None`` arrives.

    ``q.get`` blocks while the queue is empty, so without a sentinel this
    loop would never end. The main program below sends ``None`` once the
    writer has finished.
    """
    # iter(callable, sentinel) keeps calling q.get() until it returns None.
    for data in iter(q.get, None):
        print(data)


if __name__ == "__main__":
    # The guard is required when children are started with "spawn" (the
    # default on Windows and macOS). Each child re-imports this module, and
    # without the guard it would try to start processes itself.
    q = Queue()  # process-safe queue shared by writer and reader (items are sent between processes)
    pw = Process(target=func_write, args=(q,))
    pr = Process(target=func_read, args=(q,))
    pw.start()
    pr.start()
    pw.join()    # main process waits for the writer to finish
    q.put(None)  # sentinel: tells the reader there is nothing more to read
    # Let the reader exit on its own. pr.terminate() could kill it before it
    # has printed the items still sitting in the queue.
    pr.join()

外部進程模塊 subprocess: status, output = subprocess.getstatusoutput(cmd)

多線程模塊 threading,啟動方法與多進程一致。多進程和多線程 class 都有一個 .join() 方法,會使主進程/線程阻塞在 join 這一行;也都有一個 .daemon 屬性,設置為 True 時主進程/線程運行完成時會直接結束並結束掉這個子進程/線程,否則會在運行到最後時等待子進程/線程結束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import os, time
import threading, multiprocessing

def loop():
    """Print three progress lines two seconds apart, tagged with pid/ppid/thread name."""
    def ident():
        # (pid, parent pid, current thread name) of whoever is running loop()
        return os.getpid(), os.getppid(), threading.current_thread().name

    print('pid=%d, ppid=%d, thread %s is running...' % ident())
    for step in range(1, 4):
        print('thread %s >>> %s' % (threading.current_thread().name, step))
        time.sleep(2)
    print('pid=%d, ppid=%d, thread %s end' % ident())

if __name__ == "__main__":
    # Required for multiprocessing under the "spawn" start method (Windows/macOS).
    # The child re-imports this module and must not re-run this section.
    print('pid=%d, ppid=%d, thread %s is running...' % (
        os.getpid(), os.getppid(), threading.current_thread().name
    ))

    # Thread variant: same start()/join()/daemon API, runs inside this process.
    # t = threading.Thread(target=loop, name='LoopThread')
    # t.daemon = True

    # Process variant: loop() runs in a child process whose ppid is our pid.
    t = multiprocessing.Process(target=loop)
    # t.daemon = True  # if set, the child is terminated when the main process exits

    t.start()
    # t.join()  # uncomment to block here until the child has finished

    print('pid=%d, ppid=%d, main thread %s finished.' % (
        os.getpid(), os.getppid(), threading.current_thread().name
    ))

使用進程池批量創建進程處理任務並得到返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Pool


def run_in_pool(target_func, data, n_processes):
    """Split *data* into chunks and process each chunk in a worker pool.

    Each chunk ``data[i:i + chunk_size]`` is submitted as one task via
    ``apply_async``. The return values are collected in chunk order.

    Args:
        target_func: picklable callable taking one chunk (a slice of *data*).
        data: sliceable sequence to split.
        n_processes: number of worker processes (also the maximum number of chunks).

    Returns:
        List of ``target_func`` results, one per chunk; ``[]`` for empty *data*.
    """
    # The +1 rounds up, so at most n_processes chunks are created and the
    # range() step is never 0.
    chunk_size = len(data) // n_processes + 1
    with Pool(n_processes) as pool:
        pending = [
            pool.apply_async(target_func, args=(data[i:i + chunk_size],))
            for i in range(0, len(data), chunk_size)
        ]
        pool.close()
        pool.join()
        # .get() re-raises any exception thrown inside a worker. Results must
        # be fetched before leaving the with-block, which terminates the pool.
        return [res.get() for res in pending]


if __name__ == "__main__":
    # Placeholders from the notes: define N_PROCESSES, data and target_func first.
    results = run_in_pool(target_func, data, N_PROCESSES)

Flask

響應 HTTP 請求,構造返回:

  • 請求:[GET] URL、地址欄參數 | [POST] 表單、文本、JSON、二進制文件
  • 返回:文本(HTML)、JSON、base64 二進制

基本的使用方法是創建一個 Flask 對象,然後通過 @app.route() 將函數綁定到 url 上,函數的 return 即為 HTTP 請求的響應(例如返回一個 HTML 文本,那麼響應的 Header 中 Content-Type: text/html),最後通過 app.run() 啟動。

請求時的一些參數或 post body 或文件 等等 的內容通過 flask.request 獲取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from flask import Flask, request

app = Flask(__name__)


# @app.route is a decorator: it registers the function below as the handler
# for the given URL rule and HTTP methods.
@app.route('/', methods=['GET', 'POST'])
def home():
    """Landing page; a returned str is sent with Content-Type text/html."""
    return '<h1>Home</h1>'


@app.route('/signin', methods=['GET'])
def signin_form():
    """Serve the sign-in form; clicking "Sign In" POSTs the fields to /signin."""
    return ('<form action="/signin" method="post">'
            '<p><input name="username"></p>'
            '<p><input name="password" type="password"></p>'
            '<p><button type="submit">Sign In</button></p>'
            '</form>')


@app.route('/signin', methods=['POST'])
def signin():
    """Check the form-encoded credentials posted by the sign-in form.

    ``request.form.get`` is used so that a request missing a field gets the
    "bad username or password" page instead of a 400 Bad Request.
    """
    # Demo only: hard-coded plaintext credentials.
    if (request.form.get('username') == 'admin'
            and request.form.get('password') == 'pwd'):
        return '<h3>Hello, admin!</h3>'
    return '<h3>Bad username or password.</h3>'


if __name__ == "__main__":
    # app.run() blocks. The guard keeps an import of this module (or code
    # registering more routes after this point) from starting the server early.
    app.run()

上述帳號信息通過 POST 傳輸,地址欄中不會展示傳輸的數據,可以通過 request.form 拿到 body 中的內容(當請求的 Content-Type: application/x-www-form-urlencoded 時)。信息也可以通過 GET 傳輸,GET 請求沒有 body,通過地址傳輸信息,類似 curl "127.0.0.1:5000/api/login?username=admin&password=pwd",通過 request.args.get() 得到信息。

除了裝飾器,還可以通過 add_url_rule 將函數與 url 綁定,app.run 時可設置 ip 為 "0.0.0.0" 啟動局域網訪問(http://127.0.0.1:5000/ -> http://10.x.x.x:5000/)。

1
2
3
4
5
6
7
8
9
10
def login():
    """GET /api/login?username=...&password=... : check query-string credentials."""
    args = request.args
    if args.get('username', type=str) != "admin":
        return '<h3>Bad username or password.</h3>'
    if args.get('password', type=str) != "pwd":
        return '<h3>Bad username or password.</h3>'
    return '<h3>Hello, admin!</h3>'


# Register the view without a decorator: add_url_rule(rule, endpoint, view_func).
# endpoint is the name used by url_for() (None falls back to the function name);
# methods defaults to GET only, pass methods=['POST', 'GET'] to accept both.
app.add_url_rule("/api/login", "login", login)

# Host "0.0.0.0" listens on all interfaces so other machines on the LAN can connect.
app.run(host="0.0.0.0", port=5000)

url 中也可直接設置參數,這些參數則可作為綁定的函數的參數傳入。

對於信息的返回,除了 HTML 文本外經常還會返回 JSON:使用 jsonify 返回時 Content-Type 會設置為 application/json,而直接返回 json.dumps() 得到的字符串則為 text/html(Flask 對 str 返回值的默認類型)。

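對於信息的讀取:二進制文件可以通過 Content-Type: multipart/form-data 的 POST 上傳,用 request.files 獲取;JSON 可以通過 request.get_json() 獲取(較新的 Werkzeug 在 Content-Type 不是 JSON 時會直接返回 415 錯誤,可用 request.get_json(silent=True) 讓它返回 None)。如果 header 中沒有正確設置 Content-Type,可以通過 request.get_data(as_text=True) 讀取原始 body 再 json.loads()。任何以字符串形式 POST 的 body(text/html 等)都可以通過 request.get_data() 得到 bytes,再 decode('utf-8')。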

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import html
import json
from flask import jsonify, Response, abort


# The <int:...> / <string:...> converters are optional; they validate and
# convert the URL segment before it is passed to the view as an argument.
@app.route('/hello/<int:userid>/<string:username>', methods=['GET'])
def hello(userid, username):
    """Echo the URL parameters back as a JSON object."""
    return_dict = {"id": userid, "name": username}
    # return json.dumps(return_dict) would be sent as text/html;
    # jsonify sets Content-Type: application/json.
    return jsonify(return_dict)


@app.route('/api/json', methods=['POST'])
def json2html():
    """Render the top-level keys/values of a posted JSON object as HTML paragraphs.

    Accepts a body declared as application/json. It also accepts a JSON body
    sent with another Content-Type, which is read raw via get_data(). Returns
    400 when the body is not valid JSON or is not a JSON object.
    """
    print("json api post content type:", request.content_type)
    # silent=True returns None instead of raising. Newer Werkzeug (>= 2.1)
    # answers 415 for a non-JSON Content-Type, and a malformed body raises 400.
    json_data = request.get_json(silent=True)
    if json_data is not None:
        page = "<p>content type is application/json</p>"
    else:
        page = "<p>content type is text/html</p>"
        try:
            json_data = json.loads(request.get_data(as_text=True))
        except ValueError:  # json.JSONDecodeError is a ValueError
            return "<p>request body is not valid JSON</p>", 400
    if not isinstance(json_data, dict):
        return "<p>JSON body must be an object</p>", 400
    # Escape client-supplied text so it cannot inject markup into the page.
    page += "".join(
        f"<p>{html.escape(str(key))}: {html.escape(str(value))}</p>"
        for key, value in json_data.items()
    )
    return page


import io
import base64
from PIL import Image


@app.route('/api/image_base64', methods=['POST'])
def rotate_image():
    """Rotate an uploaded image by 90 degrees and return it base64-encoded in JSON.

    Expects a multipart/form-data upload in field ``image``. The response
    always contains ``"successed"``. ``"base64_image"`` (JPEG bytes as
    base64 text) is present only when an image was received.
    """
    print("image api post content type:", request.content_type)
    data = {"successed": False}
    upload = request.files.get('image')
    if upload:
        image = Image.open(io.BytesIO(upload.read())).convert("RGB")
        image = image.rotate(90)  # PIL rotates counter-clockwise

        buffer = io.BytesIO()
        image.save(buffer, format="JPEG")  # JPEG cannot store alpha, hence convert("RGB")
        data["base64_image"] = base64.b64encode(buffer.getvalue()).decode('ascii')
        data["successed"] = True
    return jsonify(data)


@app.route('/api/image_binary/<string:image_id>', methods=['GET'])
def load_image(image_id):
    """Serve ``<image_dir><image_id>.jpg`` as raw image/jpeg (viewable directly in a browser).

    Returns 404 when the id contains a path separator or the file does not exist.
    """
    image_dir = ""  # prefix joined by plain concatenation; "" = current working directory
    # The string converter already rejects "/", but "\\" is a separator on
    # Windows. Refuse both so the id cannot point outside image_dir.
    if "/" in image_id or "\\" in image_id:
        abort(404)
    try:
        with open(image_dir + image_id + ".jpg", 'rb') as f:
            image = f.read()
    except FileNotFoundError:
        abort(404)
    # Equivalent tuple form: return image, 200, {'Content-Type': 'image/jpeg'}
    return Response(image, status=200, mimetype="image/jpeg")


if __name__ == "__main__":
    app.run("0.0.0.0", 5000)

urllib and requests

發送 HTTP 請求到指定的頁面,構造請求的內容,解析返回。

urllib

使用 urllib.request.urlopen(url) 可直接發送一個 GET 請求到指定 url。返回的內容無論是文本、HTML、JSON 還是 base64,f.read() 得到的都是 bytes,需要自行 decode('utf-8');JSON 再用 json.loads() 解析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from urllib import request

with request.urlopen('http://127.0.0.1:5000/') as resp:
    data = resp.read()  # raw bytes, whatever the Content-Type is
    # data = json.loads(data)  # when the response body is JSON
    print('Status:', resp.status, resp.reason)  # resp.status is the same as resp.getcode()
    for name, value in resp.getheaders():
        print('%s: %s' % (name, value))
    print('Data:', data.decode('utf-8'))

# Sample output against the Flask app's "/" route:
# Status: 200 OK
# Content-Type: text/html; charset=utf-8
# Content-Length: 13
# Server: Werkzeug/2.0.0 Python/3.7.9
# Date: Mon, 15 Feb 2021 06:00:00 GMT
# Data: <h1>Home</h1>

請求中需要包含 HTTP Header 就需要先構造一個 Request。

1
2
3
from urllib import request

# Build a Request object to attach headers; with no data= the method is GET.
req = request.Request('http://www.douban.com/')
req.add_header('User-Agent', 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1 Mobile/15E148 Safari/604.1')
# The with-block closes the connection instead of leaking the response object.
with request.urlopen(req) as f:
    data = f.read()

POST 請求需要用到 urlopen 的 data= 參數(傳入 bytes)。默認 Content-Type 為 application/x-www-form-urlencoded(form)。另一種格式是 JSON,需要先構造一個 Request 來設置 Content-Type header。只要傳入了 data(不為 None),默認 method 就是 POST,否則為 GET。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from urllib import request, parse
import json

# --- POST a form ---
# Passing data= (bytes) turns the request into a POST; urllib adds
# Content-Type: application/x-www-form-urlencoded when none is set.
login_data = parse.urlencode([
    ('username', 'admin'),
    ('password', 'pwd'),
])  # -> 'username=admin&password=pwd'

with request.urlopen('http://127.0.0.1:5000/signin', data=login_data.encode('utf-8')) as f:
    data = f.read()
    print('Status:', f.status, f.reason)
    for k, v in f.getheaders():
        print('%s: %s' % (k, v))
    print('Data:', data.decode('utf-8'))

# --- POST JSON ---
# A JSON body needs its own Content-Type header, so build a Request first.
login_data = json.dumps({
    'username': 'admin',
    'password': 'pwd',
}, ensure_ascii=False)  # keep non-ASCII characters as-is; encoded as UTF-8 below
login_data = bytes(login_data, 'utf-8')  # same as login_data.encode('utf-8')
headers = {'Content-Type': 'application/json'}
req = request.Request("http://127.0.0.1:5000/api/json",
                      headers=headers, data=login_data, method='POST')
with request.urlopen(req) as f:
    data = f.read()
    print('Data:', data.decode('utf-8'))

requests

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import base64
import io
import json

import requests
from PIL import Image

# GET with custom headers (headers= is optional)
r = requests.get(
    'https://www.douban.com/',
    headers={'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1 Mobile/15E148 Safari/604.1'},
)
r.status_code, r.headers  # status code and response headers
r.text     # body decoded to str, e.g. '<h3>Hello, admin!</h3>'
r.content  # raw body bytes of any response, e.g. b'<h3>Hello, admin!</h3>'

# GET a binary image (response Content-Type: image/jpeg).
# Placeholder URL: fill in a real image name.
r = requests.get("https://i0.hdslb.com/bfs/archive/.jpg")
r.raise_for_status()  # an error page is not an image; fail clearly instead
image = Image.open(io.BytesIO(r.content))

# GET with query-string params
r = requests.get('http://127.0.0.1:5000/api/login', params={'username': 'admin', 'password': 'pwd'})
r.url  # 'http://127.0.0.1:5000/api/login?username=admin&password=pwd'

# GET a JSON response.
# (The original notes also queried Yahoo's public YQL endpoint, which has
#  since been retired:
#  https://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20weather.forecast%20where%20woeid%20%3D%202151330&format=json)
r = requests.get('http://127.0.0.1:5000/hello/39/admin')
r.json()  # {'id': 39, 'name': 'admin'}, same as json.loads(r.content.decode("utf-8"))

# POST a form (application/x-www-form-urlencoded)
r = requests.post('http://127.0.0.1:5000/signin', data={'username': 'admin', 'password': 'pwd'})

# POST JSON: either serialize yourself and set the header, or let json= do both
login_data = {'username': 'admin', 'password': 'pwd'}
r1 = requests.post('http://127.0.0.1:5000/api/json', data=json.dumps(login_data), headers={'Content-Type': 'application/json; charset=utf-8'})
r2 = requests.post('http://127.0.0.1:5000/api/json', json=login_data)

# POST a binary file (multipart/form-data); image_path is a placeholder path
with open(image_path, 'rb') as fp:
    image = fp.read()
payload = {'image': image}
r = requests.post('http://127.0.0.1:5000/api/image_base64', files=payload).json()
rotated_image = base64.b64decode(r['base64_image'])
Image.open(io.BytesIO(rotated_image))

curl

http://www.ruanyifeng.com/blog/2019/09/curl-reference.html

HTTP API

一個 Data Class 來定時拉取、處理、更新數據,將結果調用 HTTP Class 的更新函數保存到其 .data 中。HTTP Class 更新和返回數據的方法需要有線程鎖,保證這兩個函數在兩個 Class(處於兩個線程)中不會同時調用(改變和獲取數據不同時發生)。由於兩個 Class 都需要 serve forever,且更新的頻率會遠低於查詢的,為了不互相阻塞需要在兩個線程中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import time
import datetime
import threading
from flask import Flask, request, jsonify

class RunSearch:
    """Flask HTTP front end answering lookups into a dict replaced by another thread.

    ``GET /api/search?key=<k>`` returns ``{"code": c, "result": r}`` where
    ``c`` is 0 (found, ``r`` is ``data[k]``), -1 (no dict loaded yet, or an
    error), or -2 (key not present). ``self.mutex`` makes lookups in
    ``get_data`` and the replacement in ``update_data`` mutually exclusive.
    """

    def __init__(self):
        self.app = Flask(__name__)
        self.data = None  # replaced wholesale by update_data()
        # A plain Lock cannot be acquired again until it is released.
        self.mutex = threading.Lock()
        # Let jsonify emit non-ASCII characters as-is. Flask < 2.3 reads the
        # JSON_AS_ASCII config key (removed in 2.3); Flask >= 2.2 uses the
        # app.json provider's ensure_ascii attribute instead.
        self.app.config['JSON_AS_ASCII'] = False
        if hasattr(self.app, "json") and hasattr(self.app.json, "ensure_ascii"):
            self.app.json.ensure_ascii = False
        self.app.add_url_rule('/api/search', 'search', self.http_server)

    def http_server(self):
        """View for /api/search: look up the ``key`` query parameter and reply with JSON."""
        try:
            query_key = request.args.get("key", type=str)
            ret, result = self.get_data(query_key)
            result = {"code": ret, "result": result}
        except Exception as e:
            print(e)
            result = {"code": -1, "result": []}
        return jsonify(result)

    def get_data(self, query_key):
        """Return ``(code, result)`` for *query_key*; see the class docstring for codes."""
        # Hold the lock so update_data cannot swap self.data during the lookup.
        with self.mutex:
            try:
                if not isinstance(self.data, dict):
                    return -1, []
                if query_key not in self.data:
                    return -2, []
                return 0, self.data[query_key]
            except Exception as e:  # e.g. an unhashable key: report a generic failure
                print(e)
                return -1, []

    def update_data(self, new_data):
        """Replace the whole dataset; never interleaves with a get_data lookup."""
        with self.mutex:
            self.data = new_data
            print("data update successful")

class Data(threading.Thread):
    """Background thread that pushes fresh data to the HTTP server once per log date.

    Every 10 seconds it computes yesterday's date as ``YYYYMMDD``. When that
    differs from the last date it published, it builds new data with
    ``generate_new_data`` and hands it to ``http_server.update_data``.
    """

    def __init__(self, http_server):
        threading.Thread.__init__(self)  # same as super().__init__()
        self.http_server = http_server  # object exposing update_data(new_data)
        self.last_update_logdate = None  # "YYYYMMDD" string of the last published data
        # self.daemon = True

    def run(self):
        """Thread body (invoked by start()): poll forever, refreshing when the date changes."""
        while True:
            logdate = datetime.datetime.now() - datetime.timedelta(days=1)
            stamp = logdate.strftime("%Y%m%d")
            if stamp == self.last_update_logdate:
                print("already have latest data")
            else:
                self.http_server.update_data(self.generate_new_data(logdate))
                self.last_update_logdate = stamp
            time.sleep(10)

    def generate_new_data(self, logdate):
        """Build the dataset for *logdate* (placeholder: a one-key dict)."""
        return {"example": logdate}

if __name__ == "__main__":
    http_server = RunSearch()
    data = Data(http_server)
    data.start()  # refresh loop runs in its own thread; start() returns immediately
    # Listen on all interfaces, port 5000; blocks until Ctrl+C.
    http_server.app.run("0.0.0.0", 5000)
    # Ctrl+C stops Flask and the main thread ends. A daemon Data thread would
    # end with it; the default non-daemon one keeps the process alive, so a
    # second Ctrl+C is needed.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time, threading
from flask import Flask, request

class Executor:
    """Worker loop that runs until its ``shutdown`` flag is set, possibly from another thread.

    ``shutdown`` is backed by a ``threading.Event``. Setting it wakes
    ``serve_forever`` immediately instead of after the current sleep, while
    keeping the original ``executor.shutdown = True`` API.
    """

    def __init__(self):
        self._stop = threading.Event()

    @property
    def shutdown(self):
        """True once a shutdown has been requested."""
        return self._stop.is_set()

    @shutdown.setter
    def shutdown(self, value):
        if value:
            self._stop.set()
        else:
            self._stop.clear()

    def serve_forever(self, interval=10):
        """Print "running" every *interval* seconds until shutdown, then print "end"."""
        while not self._stop.is_set():
            print("running")
            # Like time.sleep(interval), but returns as soon as shutdown is set.
            self._stop.wait(interval)
        print("end")

class ExecutorHttpThread(threading.Thread):
    """Daemon thread serving a Flask app whose /shutdown route stops the executor."""

    def __init__(self, executor):
        super().__init__()
        self.executor = executor
        # Daemon: the main thread will not wait for this thread when it ends.
        self.daemon = True
        self.app = Flask("ExecutorHttp")
        # endpoint None: Flask names the endpoint after the view function.
        self.app.add_url_rule('/shutdown', None, self.shutdown)

    def shutdown(self):
        """View for /shutdown: flag the executor to stop, then stop the dev server if possible."""
        stop_server = request.environ.get('werkzeug.server.shutdown')
        self.executor.shutdown = True
        # The 'werkzeug.server.shutdown' hook was removed in Werkzeug 2.1. When
        # it is missing the HTTP server keeps running, but as a daemon thread
        # it ends with the process once the executor loop returns.
        if stop_server is not None:
            stop_server()
        return "server shutdown"

    def run(self):
        """Thread body (invoked by start()): serve the Flask app, blocking this thread."""
        self.app.run()

if __name__ == "__main__":
    executor = Executor()
    http = ExecutorHttpThread(executor)
    http.start()  # runs ExecutorHttpThread.run() in a new thread
    # Blocks the main thread until /shutdown is requested. When it returns the
    # process exits and the daemon HTTP thread is stopped with it.
    executor.serve_forever()