異步 Python:基礎

2019-06-17

本文簡單介紹異步(asynchronous)的基本概念。

為何需要 async?

許多操作是會等待 I/O 的,為何不把等待的時間拿去做別的事情?異步主要就是把正在等待 I/O 的行程先暫停掉,讓其他可以跑的去跑,最大化 CPU 使用率。

協程(coroutine)代表可以被打斷執行的行程,這裡說的行程不是作業系統的 process,是指一個執行流程,例如一段 code。

async 程式不平行執行,透過 "event loop" 安排誰要執行

雖說是異步,但它是跑在一個 thread 上的,並沒有多 thread 會遇到的搶資源問題(multi-threading 是系統層面的平行)。因為一次只做一件事情,只要你的 atomic 操作中沒有夾雜 await,執行就絕對不會打斷。JavaScript 骨子裡就是 async,因為網頁載入需要做大量 request I/O 所以非常適合。不過這也造成許多問題,如 "callback hell",但現在已經有許多解決方法了。雖然看起來好像它「同時」做很多事情,但事實上一個時間點只會有一個東東在跑,不信的話你在 script 裡插入 while(1);,會發現整個網頁停擺。

其實裡面有個 loop 在管理一堆協程,有協程開始等待 I/O 就換下一個執行,那個 loop 就是一直重複做著這件事情。那如何知道某個 coroutine 在等待 I/O?在 Python 可以使用關鍵字 await 去「等待」某個未完成的 I/O,如此 loop 會知道這個 coroutine 準備要睡覺了,就換下一個跑,

所以常常聽到的 async/await,就是在說這種模式:定義可被打斷的 async 行程,使用 await 指出哪裏會等待 I/O(可以打斷)。

JavaScript 的異步

JavaScript 的 async/await 其實底層是使用 Promise 實作的。JS 因為原生就有異步,所以語法比較單純。因為我一開始是拿 JS 來理解的,所以這裡很硬的插入 JS 講解。

function sleep(t) {
  return new Promise((resolve) => setTimeout(resolve, t));
}

async function main(n) {
  let indent = '\t'.repeat(n);

  console.log(`${indent}[A]`);
  await sleep(0);
  console.log(`${indent}[B]`);
  await sleep(0);
  console.log(`${indent}[C]`);
}

console.log("(0)\t(1)\t(2)\n");
main(0);
main(1);
main(2);

不同行程(async function)顯示的位置會不一樣,如此可看出這三個 async function 的執行過程:

(0)     (1)     (2)

[A]
        [A]
                [A]
[B]
        [B]
                [B]
[C]
        [C]
                [C]

就算 await 的時間是 0,執行也會被切斷。那如果把 await 拔掉呢?

function sleep(t) {
  return new Promise((resolve) => setTimeout(resolve, t));
}

async function main(n) {
  let indent = '\t'.repeat(n);

  console.log(`${indent}[A]`);
  // await sleep(0);
  console.log(`${indent}[B]`);
  await sleep(0);
  console.log(`${indent}[C]`);
}

console.log("(0)\t(1)\t(2)\n");
main(0);
main(1);
main(2);

當然就不會被中斷了啊。

(0)     (1)     (2)

[A]
[B]
        [A]
        [B]
                [A]
                [B]
[C]
        [C]
                [C]

在 JavaScript 中,呼叫 async function 會建立 Promise,就是直接丟進 event loop 執行,反正遇到 await 中斷掉就對了。

async function anyway(n) {
  console.log(`async${n} before await`);
  await new Promise((r) => setTimeout(r, 0));
  console.log(`async${n} after await`);
}

anyway(1);
console.log('junk1');
anyway(2);
console.log('junk2');
anyway(3);
console.log('junk3');

輸出:

async1 before await
junk1
async2 before await
junk2
async3 before await
junk3
async1 after await
async2 after await
async3 after await

歐,講太多 JavaScript 了。

Python 的異步

如果 Python 有如上的能力,就會產生一些效能很誇張的東西。例如有人用異步加上 uvloop(高效的 event loop 實作),寫了一個號稱打趴所有 web server 的 japronto:https://github.com/squeaky-pl/japronto

Python 3.7 加入了 asyncio,支援很多 non-blocking I/O 操作,讓你很簡單的寫出異步程式。

import asyncio

async def main():
    print(123)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

因為 Python 不是 JavaScript 所以你要自己開個 loop 來用。main() 是一個 coroutine 物件,呼叫的時候不會執行,只有你把它丟到 event loop 去跑才會真的跑。corountine 類似 generator,類比起來就是把 await 想成 yield。你可以用 send(...) 控制 generator 或者 coroutine 的執行。

Coroutine / Task / Future 差在哪裏?

這三個東西都可以被 await,都必須在 event loop 裡面作用。

  • Coroutine 是最基本單元
    • await 一個 Coroutine 會等它 return
      • 如果把它想成 generator 就是等他跑完,拿回傳值
    • 定義 coroutine 可以用 async def@asyncio.coroutine
    • Coroutine 可以被暫停
      • async def: 利用 await 來說哪裡可以被暫停
      • @asyncio.coroutine: 利用 yield from 來說哪裡可以被暫停(也是等待 awaitable 結束)
  • Future
    • 未來會跑出的結果
    • 建立
      • future = asyncio.Future(loop=???)
      • future = loop.create_future()
    • result = await future: 等到結果被寫入才會繼續
      • 可以在其他 coroutine 中利用 future.set_result(result) 寫入結果
      • 也可以寫入例外 future.set_exception(e), await 的那行就會拋出 e
    • 可以用 future.result() 拿值、future.exception() 拿例外(假如有設)
  • Task 繼承自 Future,只是裡面多包了一個 coroutine
    • 建立
      • task = loop.create_task(coroutine)
      • task = asyncio.Task(coroutine, loop=???)
    • 跟 coroutine 的差異:如果 loop 是跑著的狀態,task 一被建立就會直接開始執行裡面的 coroutine
    • 不是跑在 task 裡的 coroutine 在 await 時都會卡住 main thread

Async Socket

異步的出現其實主要就是用來最小化 IO 操作的無意義等待,像是網路連線。asyncio 裡面有些異步 socket 的方法可以用,寫起來甚至比原本的 socket 還單純。

  • Client

    import asyncio
    
    async def main():
        reader, writer = await asyncio.open_connection('127.0.0.1', 9487)
        writer.write(b'shit')
        print(await reader.read(4))
        writer.close() # 關閉 writer 就等於關閉連線
    
    asyncio.run(main())
    
  • Server

    import asyncio
    
    async def main():
        async def handler(reader, writer):
            writer.write(b'fuck')
            print(await reader.read(4))
            writer.close()
              
        server = await asyncio.start_server(handler, '127.0.0.1', 9487)
        await server.serve_forever()
    
    asyncio.run(main())
    

除此之外還有很多函式庫可以用:https://github.com/timofurrer/awesome-asyncio

異步執行 blocking 程式

一般來說可以這樣開 Thread 來跑 I/O blocking 程式

from threading import Thread
from time import sleep

threads = []

for i in range(10):
    thread = Thread(target=sleep, args=[1])
    threads.append(thread)

for t in threads:
    # 同時下去跑
    t.start()

for t in threads:
    t.join() # [卡住1秒]

或是這樣

from multiprocessing import Pool
from time import sleep

p = Pool(20) # 因為是 process, sleep() 要宣告在這行前面

async_results = [p.apply_async(sleep, [1]) for _ in range(40)]
for ar in async_results:
    ar.wait() # [卡住1秒] 因為用 20 個 process 跑 40 個 sleep(1)

p.map(sleep, [1] * 10) # [卡住1秒]

其實還可以這樣:

from concurrent.futures import (
    ThreadPoolExecutor,
    ProcessPoolExecutor)

from time import sleep

with ThreadPoolExecutor(10) as executor:
    list(executor.map(sleep, [1] * 10)) # [卡住1秒] 要用 list() 開啟 generator 才會開始跑

    futures = [executor.submit(sleep, 1) for _ in range(10)] # 這個跟 asyncio.Future 不一樣
    results = [future.result() for future in futures] # [卡住1秒]

以上只要等待執行結果時,main thread 都會卡住,這段時間 main thread 不能做任何事情。其實也沒必要卡住,可以把它丟進 event loop,這樣就可以 switch 去做別的事情了:

import asyncio

from concurrent.futures import ThreadPoolExecutor
from time import sleep

async def main():
    async def sleeps(n):
        print('start sleep')
        executor = ThreadPoolExecutor()
        loop = asyncio.get_running_loop()
        futures = [loop.run_in_executor(executor, sleep, 1) for _ in range(n)]
        await asyncio.gather(*futures) # 全部跑完之前 loop 還可以去跑其他 coroutine
        print('end sleep')
    
    wakeup = asyncio.Task(sleeps(10)) # 不會卡住
    await asyncio.sleep(0) # 這行會讓 loop 換到 sleeps(10) 執行,此時會 "start sleep"
    
    # 你可以同時做其他事情
    print('asyncio 好好用')

    await wakeup # [等他sleep完] 等的過程中 loop 還是可以去跑別的東西
    print('早安安')

asyncio.run(main()) # [卡]
  • 形如 loop.run_in_executor(None, func, ...) 不指定 executor 預設就是用 ThreadPoolExecutor

不同 thread 之間

  • 一個 thread 只會有一個 event loop 在跑
  • 考慮兩個 loop 在不同 thread 上跑,兩個不同的 loop 沒有辦法互相 await 各自的 Future,因為 Future 只會屬於其中一個 loop,別的 loop 不會知道我的 Future 被 set_result,因為是我這個 loop 去 set_result 的。
  • 可以在別的 thread 呼叫 loop 中 Future 的 set_result,但是不安全

所以有 call_soon_threadsafe 可以用

import asyncio
import time
from threading import Thread

loop = asyncio.get_event_loop()
future = loop.create_future()

async def main():
    await future
    print('done')

# 這個會被塞進 thread 跑
def set_future():
    print('sleep')
    time.sleep(1)
    print('set_result')
    # future.set_result(None) # 不好,因為 future 的 loop 不是跑在本 thread
    loop.call_soon_threadsafe(future.set_result, None) # 比較安全

Thread(target=set_future).start() # 這樣比較好

loop.run_until_complete(main())

還有一個是 asyncio.run_coroutine_threadsafe(...) 讓你在別的 thread 可以塞 coroutine 去跑

import asyncio
import time
from threading import Thread

loop = asyncio.get_event_loop()
future = loop.create_future()

async def main():
    await future
    print('done')

async def sleep_print(t, s):
    await asyncio.sleep(t)
    print(s)

# 這個會被塞進 thread 跑
def run_coro_after(t):
    print('sleep')
    time.sleep(1)
    print('run coroutine')
    asyncio.run_coroutine_threadsafe(
        sleep_print,
        1, 'message from other thread',
        loop=loop)

Thread(target=run_coro_after, args=[1]).start()
loop.run_until_complete(main())