<td id="6000o"><rt id="6000o"></rt></td>
  • <sup id="6000o"><button id="6000o"></button></sup>
  • <optgroup id="6000o"></optgroup>
  • <strong id="6000o"></strong>
  • Python知識分享網 - 專業的Python學習網站 學Python,上Python222
    Python asyncio 庫源碼分析
    匿名網友發布于:2023-06-26 12:33:17
    (侵權舉報)

    前言

    本著 「路漫漫其修遠兮, 吾將上下而求索」 的精神。終于要開始深入研究 Python 中 asyncio 的源碼實現啦。

    本文章可能篇幅較長,因為是逐行分析 asyncio 的實現,也需要讀者具有一定的 asyncio 編碼經驗和功底,推薦剛開始踏上 Python 異步編程之旅的朋友們可以先從官方文檔入手,由淺入深步步為營。

    若在讀的你對此感興趣,那么很開心能與你分享我的學習成果。

    本次源碼分析將在 Python 3.11.3 的版本上進行探索。

    PS: 筆者功力有限,若有不足之處還望及時指正,因為是逐行分析所以過程稍顯枯燥。
    更建議屏幕前的你打開 source code 跟隨整篇文章花費一定的時間一起研究,盡信書不如無書,對此文持以質疑的態度去閱讀將有更大的收獲。

    全局代碼

    在 Python 中,當一個模塊被導入時,Python 解釋器會執行該模塊中的全局代碼。

    而全局代碼則是指在模塊中未被封裝在函數或類中的代碼,它們會在模塊被導入時率先執行。

    這意味著全局代碼可以包括變量的初始化、函數的定義、類的定義、條件語句、循環等。這些代碼在模塊被導入時執行,用于設置模塊的初始狀態或執行一些必要的操作。

    查看源碼時,一定不要忽略全局代碼。

    PS: 一個小技巧,查看全局代碼最好的辦法就是將所有的 fold 都先收起來,vim 中使用 zM 快捷鍵即可。

    導入模塊

    研究任何一個模塊,我們需先從 import 開始,因為那里的代碼會率先執行:

     

    import asyncio
    

     

    點進 asyncio 模塊之后,可以發現它的入口文件 __init__ 篇幅是較為簡短的:

     

    import sys
    
    from .base_events import *
    from .coroutines import *
    from .events import *
    from .exceptions import *
    from .futures import *
    from .locks import *
    from .protocols import *
    from .runners import *
    from .queues import *
    from .streams import *
    from .subprocess import *
    from .tasks import *
    from .taskgroups import *
    from .timeouts import *
    from .threads import *
    from .transports import *
    
    # __all__ 指的是 from asyncio import *
    # 時 * 所包含的資源
    __all__ = (base_events.__all__ +
               coroutines.__all__ +
               events.__all__ +
               exceptions.__all__ +
               futures.__all__ +
               locks.__all__ +
               protocols.__all__ +
               runners.__all__ +
               queues.__all__ +
               streams.__all__ +
               subprocess.__all__ +
               tasks.__all__ +
               threads.__all__ +
               timeouts.__all__ +
               transports.__all__)
    
    # 若是 win32 平臺, 則添加 windows_events 中的 __all__
    if sys.platform == 'win32':
        from .windows_events import *
        __all__ += windows_events.__all__
    # 若是 unix 平臺, 則添加 windows_events 中的 __all__
    else:
        from .unix_events import *
        __all__ += unix_events.__all__
    

     

    base_events

    base_events 是在 asyncio 入口文件中第一個被 import 的模塊,提供了一些基本的類和設置項,如 BaseEventLoop 以及 Server 等等 ...

    base_events 中全局執行的代碼不多,以下是其導入的 build-in package:

     

    import collections
    import collections.abc
    import concurrent.futures
    import functools
    import heapq
    import itertools
    import os
    import socket
    import stat
    import subprocess
    import threading
    import time
    import traceback
    import sys
    import warnings
    import weakref
    try:
        import ssl
    except ImportError:  # pragma: no cover
        ssl = None
    

     

    自定義的 package:

     

    from . import constants
    from . import coroutines
    from . import events
    from . import exceptions
    from . import futures
    from . import protocols
    from . import sslproto
    from . import staggered
    from . import tasks
    from . import transports
    from . import trsock
    from .log import logger
    

     

    關注幾個有用的信息點:

    # 該模塊只允許通過 * 導入 BaseEventLoop 以及 Server 類
    __all__ = 'BaseEventLoop','Server',
    
    
    # 定義異步事件循環中允許的最小計劃定時器句柄數
    # loop.call_later() 以及 loop.call_at() 都是在創建定時器句柄
    # 當計劃定時器句柄的數量低于該值,事件循環可能會采取一些優化措施
    # 例如減少時間片的分配或合并定時器句柄,以提高性能和效率
    _MIN_SCHEDULED_TIMER_HANDLES = 100
    
    # 定義了被取消的定時器句柄數量與總計劃定時器句柄數量之間的最小比例
    # 如果取消的定時器句柄數量超過了計劃定時器句柄數量的這個比例
    # 事件循環可能會采取一些優化措施,例如重新分配或重新排序定時器句柄列表,以提高性能和效率
    _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
    
    # 一個布爾值, 用來判斷當前 socket 是否支持 IPV6
    _HAS_IPv6 = hasattr(socket, 'AF_INET6')
    
    # 事件循環 SELECT 時的等待事件
    MAXIMUM_SELECT_TIMEOUT = 24 * 3600
    

     

    除此之外,還有關于 socket 部分的:

     

    # 當前的 socket 模塊是否具有非延遲特性
    if hasattr(socket, 'TCP_NODELAY'):
        def _set_nodelay(sock):
            if (sock.family in {socket.AF_INET, socket.AF_INET6} and
                    sock.type == socket.SOCK_STREAM and
                    sock.proto == socket.IPPROTO_TCP):
                # 啟用 tcp 協議非延遲特性,即禁用 Nagle 算法
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    else:
        def _set_nodelay(sock):
            pass
    

     

    constants

    constants 是在 base_events 中第一個被 import 的。其作用是定義一些通過 asyncio 進行網絡編程時的常量數據。

    它的源碼雖然簡單但涉及知識面較廣,基本是與網絡編程相關的,若想深入研究還需下一陣苦功夫:

     

    import enum
    
    # 在使用 asyncio 進行網絡編程時
    # 寫入操作多次失?。ㄈ珂溄觼G失的情況下)記錄一條 warning 日志
    LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5
    
    # 在使用 asyncio 進行網絡編程時
    # 若對方斷聯、則在重試 accept 之前等待的秒數
    # 參見 selector_events._accept_connection() 具體實現
    ACCEPT_RETRY_DELAY = 1
    
    # 在 asyncio debug 模式下需捕獲的堆棧條目數量(數量越大,運行越慢)
    # 旨在方便的為開發人員追蹤問題,例如找出事件循環中的協程或回調的調用路徑
    DEBUG_STACK_DEPTH = 10
    
    # 在使用 asyncio 進行網絡編程時
    # SSL/TSL 加密通信握手時可能會產生斷聯或失敗
    # 而該常量是指等待 SSL 握手完成的秒數,他和 Nginx 的 timeout 匹配
    SSL_HANDSHAKE_TIMEOUT = 60.0
    
    # 在使用 asyncio 進行網絡編程時
    # 等待 SSL 關閉完成的秒數
    # 如 asyncio.start_server() 方法以及 asyncio.start_tls() 方法
    # 在鏈接關閉后,會使用 n 秒來進行確認對方已經成功關閉了鏈接
    # 若在 n 秒內未得到確認,則引發 TimeoutError
    SSL_SHUTDOWN_TIMEOUT = 30.0
    
    # 在使用 loop.sendfile() 方法傳輸文件時
    # 后備緩沖區的大?。ㄓ行┪募到y不支持零拷貝,因此需要一個緩沖區)
    SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 256
    
    # 當在 SSL/TSL 握手期間,若 read 的內核緩沖區數據大小
    # 超過了下面設定的值,則會等待其內核緩沖區大小降低后
    # 再次進行 read
    FLOW_CONTROL_HIGH_WATER_SSL_READ = 256  # KiB
    
    # 同上,只不過這個是寫入的上限流量閾值
    FLOW_CONTROL_HIGH_WATER_SSL_WRITE = 512  # KiB
    
    class _SendfileMode(enum.Enum):
        UNSUPPORTED = enum.auto()
        TRY_NATIVE = enum.auto()
        FALLBACK = enum.auto()
    

     

    coroutines

    coroutines 是在 base_events 中第二個被 import 的。其作用是提供一些布爾的判定接口,如判斷對象是否是 coroutine、當前是否是 debug 模式等等。

    其全局代碼不多,暫可不必太過關注:

     

    # 該模塊只允許通過 * 導入 iscoroutinefunction 以及 iscoroutine 函數
    __all__ = 'iscoroutinefunction', 'iscoroutine'
    
    
    # ...
    _is_coroutine = object()
    
    # 優先檢查原生協程以加快速度
    # asyncio.iscoroutine
    _COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
                        collections.abc.Coroutine)
    _iscoroutine_typecache = set()
    

     

    events

    events 是在 base_events 中第三個被 import 的。其作為是定義一些與事件循環相關的高級接口或定義一些事件循環的抽象基類供內部或開發者使用。

    注意他在這里還 import 了自定義模塊 format_helpers,但是 format_helpers 中并未有任何運行的全局代碼,所以后面直接略過了:

     

    from . import format_helpers
    

     

    以下是它的全局代碼:

     

    # __all__ 中難免會看到一些熟悉的身影
    # 比如 get_event_loop get_running_loop 等等
    __all__ = (
        'AbstractEventLoopPolicy',
        'AbstractEventLoop', 'AbstractServer',
        'Handle', 'TimerHandle',
        'get_event_loop_policy', 'set_event_loop_policy',
        'get_event_loop', 'set_event_loop', 'new_event_loop',
        'get_child_watcher', 'set_child_watcher',
        '_set_running_loop', 'get_running_loop',
        '_get_running_loop',
    )
    
    # ...
    
    # 該變量有 2 個作用
    # 分別是決定如何創建和獲取事件循環對象
    #   - 比如一個線程一個事件循環
    #   - 或者一個任務一個事件循環
    # 再者就是獲取事件循環,通過 get_event_loop_policy 方法即可拿到該變量
    _event_loop_policy = None
    
    # 一把線程鎖、用于保護事件循環策略的實例化
    _lock = threading.Lock()
    
    # ...
    
    class _RunningLoop(threading.local):
        loop_pid = (None, None)
    
    # 這個好像是獲取以及設置當前的 running loop,由 _get_running_loop 使用。
    # 它將 loop 和 pid 進行綁定
    _running_loop = _RunningLoop()
    
    # 為了一些測試而取的以 _py 開始的別名
    _py__get_running_loop = _get_running_loop
    _py__set_running_loop = _set_running_loop
    _py_get_running_loop = get_running_loop
    _py_get_event_loop = get_event_loop
    _py__get_event_loop = _get_event_loop
    
    
    try:
        # 純注釋翻譯:
        # get_event_loop() 是最常調用的方法之一
        # 異步函數。純 Python 實現是
        # 大約比 C 加速慢 4 倍。
        # PS: C 語言的部分就先暫時不看了
        from _asyncio import (_get_running_loop, _set_running_loop,
                              get_running_loop, get_event_loop, _get_event_loop)
    except ImportError:
        pass
    else:
        # 為了一些測試而取的以 _c 開始的別名
        _c__get_running_loop = _get_running_loop
        _c__set_running_loop = _set_running_loop
        _c_get_running_loop = get_running_loop
        _c_get_event_loop = get_event_loop
        _c__get_event_loop = _get_event_loop
    

     

    exceptions

    exceptions 是在 base_events 中第四個被 import 的。其主要作用是定義了一些異常類:

     

    __all__ = ('BrokenBarrierError',
               'CancelledError', 'InvalidStateError', 'TimeoutError',
               'IncompleteReadError', 'LimitOverrunError',
               'SendfileNotAvailableError')
    

     

    有些異常類中實現了 __reduce__() 方法。該方法允許自定義對象在被序列化或持久化過程中的狀態和重建方式。

    示例:

     

    import pickle
    from typing import Any
    
    
    def ser_fn(name):
        return name
    
    
    class Example:
    
        def __init__(self, name) -> None:
            self.name = name
    
        def __reduce__(self) -> str | tuple[Any, ...]:
            """
            反序列化時,將調用 ser_fn 并且傳入參數
            下面注釋的第一個例子是重新實例化一下
            第二個例子是更直觀的演示該方法的作用
            """
            # return (__class__, (self.name, ))
            return (ser_fn, ("反序列化結果", ))
    
    
    if __name__ == "__main__":
        obj = Example("instance")
    
        serializer = pickle.dumps(obj)
        deserializer = pickle.loads(serializer)
    
        print(deserializer)
    
    # 反序列化結果
    

     

    futures

    futures 是在 base_events 中第五個被 import 的。其作用是定義了 asyncio 中未來對象的實現方式。

    在看其全局代碼之前,首先推薦閱讀官方文檔:

    asyncio futures 介紹

    該 futures 和 collections 的 futures 有些許區別,futures 也算是 Python 異步編程中比較難以理解的一個點,后續有機會再和大家詳細探討。

    futures 文件中導入了 base_futures 自定義模塊,但 base_futures 中暫時沒有值得關注的點,所以先在此略過:

     

    from . import base_futures
    
    # 下面 3 個都已經粗略看過一次
    from . import events
    from . import exceptions
    from . import format_helpers
    

     

    其全局代碼如下:

     

    
    # 一個函數,用于判斷對象是否是一個未來對象
    isfuture = base_futures.isfuture
    
    
    # 用于表明未來對象的當前一些狀態的標志
    # 分別是 等待執行、取消執行、完成執行
    _PENDING = base_futures._PENDING
    _CANCELLED = base_futures._CANCELLED
    _FINISHED = base_futures._FINISHED
    
    
    # 棧的調試 LOG 級別
    STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
    
    # ...
    
    class Future:
        pass
    
    _PyFuture = Future
    
    
    # ...
    try:
        import _asyncio
    except ImportError:
        pass
    else:
        Future = _CFuture = _asyncio.Future
    

     

    protocols

    protocols 是在 base_events 中第六個被 import 的。其作用主要是定義一些內部協議。

    如 '緩沖區控制流協議'、'接口數據報協議'、'子進程調用接口協議' 等等。

    暫先關注其 __all__ 即可:

     

    __all__ = (
        'BaseProtocol', 'Protocol', 'DatagramProtocol',
        'SubprocessProtocol', 'BufferedProtocol',
    )
    

     

    sslproto

    sslproto 是在 base_events 中第七個被 import 的。其作用是定義和具體實現 SSL/TLS 協議。

    同默認的 socket 模塊不同,asyncio 所提供的流式傳輸是已經實現好了 SSL/TLS 協議功能的。

    下面先對 SSL/TLS 做一個簡短的介紹。

    SSL/TLS 是一個獨立的協議,其功能主要用于網絡通信的加密和安全,如數據加密、身份認證等等。
    TLS 的前身為 SSL 協議,是 SSL 的現代版和改進版。

    在 sslproto 中也導入了一些標準庫以及自定義的模塊:

     

    
    import collections
    import enum
    import warnings
    try:
        import ssl
    except ImportError:  # pragma: no cover
        ssl = None
    
    # 自定義模塊
    from . import constants
    from . import exceptions
    from . import protocols
    
    # 下面 2 個還沒看過,transports 可以大概瞅瞅,但 log 沒必要看
    # 他本質就是使用 logging 模塊獲得一個 log 對象
    # 名字就是當前的 package name,即為 asyncio
    from . import transports
    from .log import logger
    

     

    其全局代碼如下:

     

    
    # ...
    if ssl is not None:
        SSLAgainErrors = (ssl.SSLWantReadError, ssl.SSLSyscallError)
    

     

    除此之外,它還定義了一些類,如 'SSL 協議'、'應用協議狀態' 等等,這里不做細述。

    transports

    transports 在 sslproto 文件中被導入,主要定義一些傳輸類。

    如 '讀傳輸'、'寫傳輸'、'數據報傳輸'、'子進程接口傳輸' 等等,和 protocols 中的協議類關系較為密切。

    一般來說若要基于 asyncio 進行二次開發,如開發 http 協議的 web 服務程序等等,才會關注到這里。

    其下很多代碼看不到具體實現,直接看其 __all__ 變量吧:

     

    __all__ = (
        'BaseTransport', 'ReadTransport', 'WriteTransport',
        'Transport', 'DatagramTransport', 'SubprocessTransport',
    )
    

     

    staggered

    staggered 是在 base_events 中第八個被 import 的。其作用是如何支持正在運行的協程在時間點中錯開(主要針對 socket 網絡編程)。

    他實現了一個協程函數 staggered_race 以及導入了一些內部或自定義模塊:

     

    
    __all__ = 'staggered_race',
    
    import contextlib
    import typing
    
    from . import events
    from . import exceptions as exceptions_mod
    
    from . import locks
    from . import tasks
    
    async def staggered_race(...):
        pass
    

     

    locks

    locks 在 staggered 中被導入,其作用是實現了一些協程鎖。

    官方文檔:協程同步

    具體有 '同步鎖'、'事件鎖'、'條件鎖'、'信號量鎖'、'有界信號量鎖'、'屏障鎖'。相比于 threading 少了 '遞歸鎖' 和一些其他的鎖。

     

    __all__ = ('Lock', 'Event', 'Condition', 'Semaphore',
               'BoundedSemaphore', 'Barrier')
    

     

    mixins

    mixins 在 locks 中被導入,其作用是提供一些工具集功能。

    代碼量較少:

     

    import threading
    from . import events
    
    # 實現一把全局的線程同步鎖
    _global_lock = threading.Lock()
    
    class _LoopBoundMixin:
        _loop = None
    
        def _get_loop(self):
            loop = events._get_running_loop()
    
            if self._loop is None:
                with _global_lock:
                    if self._loop is None:
                        self._loop = loop
            if loop is not self._loop:
                raise RuntimeError(f'{self!r} is bound to a different event loop')
            return loop
    

     

    tasks

    tasks 在 locks 中被導入,其作用是定義 Task 對象、提供一些管理 Task 對象的高級接口。

     

    
    # 有很多熟悉的高級接口,均來自于 tasks 模塊
    __all__ = (
        'Task', 'create_task',
        'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
        'wait', 'wait_for', 'as_completed', 'sleep',
        'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
        'current_task', 'all_tasks',
        '_register_task', '_unregister_task', '_enter_task', '_leave_task',
    )
    

     

    下面是它導入的內置模塊和第三方模塊:

     

    import concurrent.futures
    import contextvars
    import functools
    import inspect
    import itertools
    import types
    import warnings
    import weakref
    from types import GenericAlias
    
    # 除了 base_tasks 其他都已經全部 load 掉了
    from . import base_tasks
    from . import coroutines
    from . import events
    from . import exceptions
    from . import futures
    from .coroutines import _is_coroutine
    

     

    其全局代碼有:

     

    
    # 生成新的 task 時的命名計數器
    # 這里不采用 +=1 的操作是因為協程并非線程安全
    # 通過迭代器不斷的向后計數,可以完美的保證線程安全(Ps: GET 新技能)
    _task_name_counter = itertools.count(1).__next__
    
    # ...
    
    _PyTask = Task
    
    
    FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
    FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
    ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
    
    
    # 包含所有正在活動的任務
    _all_tasks = weakref.WeakSet()
    
    # 一個字典,包含當前正在活動的任務 {loop: task}
    _current_tasks = {}
    
    # ...
    
    _py_register_task = _register_task
    _py_unregister_task = _unregister_task
    _py_enter_task = _enter_task
    _py_leave_task = _leave_task
    
    
    try:
        from _asyncio import (_register_task, _unregister_task,
                              _enter_task, _leave_task,
                              _all_tasks, _current_tasks)
    except ImportError:
        pass
    else:
        _c_register_task = _register_task
        _c_unregister_task = _unregister_task
        _c_enter_task = _enter_task
        _c_leave_task = _leave_task
    

     

    trsock

    trsock 是在 base_events 中第九個被 import 的。其作用是實現了一個 '傳輸套接字' 的類。

    具體是對模塊內的,暫不深究。

    runners

    runners 是在 asyncio 入口文件中第八個被 import 的。其作用為定義 asyncio 的入口方法 run 以及定義管理事件循環聲明周期類的 'Runner'。

     

    __all__ = ('Runner', 'run')
    

     

    Runner 功能為 Python 3.11 新功能。

    queues

    queues 是在 asyncio 入口文件中第九個被 import 的。其作用是定義一些用于協程信息同步的隊列。

     

    __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
    

     

    streams

    streams 是在 asyncio 入口文件中第十個被 import 的。其作用是定義流式傳輸相關的具體實現類,如 '可讀流'、'可寫流' 等等。

     

    __all__ = (
        'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
        'open_connection', 'start_server')
    

     

    如果是在 Unix 平臺下,則 __all__ 會新增一些內容:

     

    if hasattr(socket, 'AF_UNIX'):
        __all__ += ('open_unix_connection', 'start_unix_server')
    
    # 讀寫流操作的緩沖區大小為 64kb
    _DEFAULT_LIMIT = 2 ** 16
    

     

    該文件與 transports 關系較為密切。

    subprocess

    subprocess 是在 asyncio 入口文件中第十一個被 import 的。其作用是定義子進程通信相關的類,如 'SubprocessProtocol' 和 'Protocol' 等等。

     

    __all__ = 'create_subprocess_exec', 'create_subprocess_shell'
    
    # ...
    
    PIPE = subprocess.PIPE
    STDOUT = subprocess.STDOUT
    DEVNULL = subprocess.DEVNULL
    

     

    taskgroups

    taskgroups 是在 asyncio 入口文件中中第十二個被 import 的。其作用是定義了任務組。

     

    __all__ = ["TaskGroup"]
    

     

    此功能為 Python 3.11 新功能。

    timeouts

    timeouts 是在 asyncio 入口文件中中第十三個被 import 的。其作用是定義了超時相關的類和函數。

     

    __all__ = (
        "Timeout",
        "timeout",
        "timeout_at",
    )
    
    
    class _State(enum.Enum):
        CREATED = "created"
        ENTERED = "active"
        EXPIRING = "expiring"
        EXPIRED = "expired"
        EXITED = "finished"
    

     

    threads

    threads 是在 asyncio 入口文件中第十四個被 import 的。其作用是定義了函數 to_thread。

     

    __all__ = "to_thread",
    
    
    async def to_thread(func, /, *args, **kwargs):
        pass
    

     

    模塊導入關系圖

    整個 asyncio 模塊的初始化模塊導入關系圖如下:

    Python asyncio 庫源碼分析  圖1

    由 asyncio.run 引發的故事

    asyncio.run() 作為目前 Python 較為推崇的協程起始方式。研究其內部啟動順序及執行順序是十分有必要的。

    在 Python 3.11 版本后 asyncio 新增了多種協程起始方式,但 asyncio.run 的地位依舊不容置疑。

    如果后續有機會,我們可以再繼續探討研究 Python 3.7 之前的協程起始方式。

    事件循環的初始化過程

    函數基本介紹

    asyncio.run() 位于 asyncio.runners 文件中,其函數簽名如下:

     

    def run(main, *, debug=None):
        pass
    

     

    如同官方文檔所說,該方法如果在沒有 Runner 參與的情況下,應當只調用一次。

    在 Python 3.11 版本后,新加入的 Runner 類使其源碼發生了一定的變化,但其內部邏輯總是萬變不離其宗的。

     

    def run(main, *, debug=None):
        # 若當前的線程已經存在一個正在運行的事件循環、則拋出異常
        if events._get_running_loop() is not None:
            raise RuntimeError(
                "asyncio.run() cannot be called from a running event loop")
    
        with Runner(debug=debug) as runner:
            return runner.run(main)
    

     

    _get_running_loop

    源碼如下:

     

    # ...
    class _RunningLoop(threading.local):
        loop_pid = (None, None)
    
    # ...
    _running_loop = _RunningLoop()
    
    # ...
    def _get_running_loop():
        running_loop, pid = _running_loop.loop_pid
        # 這里條件不滿足,所以返回的必然是 None
        if running_loop is not None and pid == os.getpid():
            return running_loop
    

     

    對于了解過 threading.local 源代碼的同學這里應該比較好理解。

    Ps: threading.local 所實現的功能是讓每一個線程能夠存儲自己獨有的數據,其原理
    大致是維護一個 global dict,其結構為 {"threading_ident": "data"}

    Runner 類

    繼續回到 asyncio.run() 函數中,可以發現它 with 了 Runner 類:

     

    def run(main, *, debug=None):
        # ...
        with Runner(debug=debug) as runner:
            return runner.run(main)
    

     

    先看 Runner 的 __init__() 方法,再看其 __enter__ 方法。

     

    class _State(enum.Enum):
        CREATED = "created"
        INITIALIZED = "initialized"
        CLOSED = "closed"
    
    class Runner:
        def __init__(self, *, debug=None, loop_factory=None):
            self._state = _State.CREATED
            self._debug = debug
            self._loop_factory = loop_factory
            self._loop = None
            self._context = None
            self._interrupt_count = 0
            self._set_event_loop = False
    
        def __enter__(self):
            self._lazy_init()
            return self
    
        def _lazy_init(self):
            # 如果是關閉狀態,則拋出異常
            if self._state is _State.CLOSED:
                raise RuntimeError("Runner is closed")
            # 如果是初始化狀態,則返回
            if self._state is _State.INITIALIZED:
                return
            # 如果 loop 工廠函數是 None
            if self._loop_factory is None:
                # 創建一個新的 loop
                self._loop = events.new_event_loop()
                if not self._set_event_loop:
                    events.set_event_loop(self._loop)
                    self._set_event_loop = True
            else:
                self._loop = self._loop_factory()
    
            if self._debug is not None:
                self._loop.set_debug(self._debug)
    
            self._context = contextvars.copy_context()
            self._state = _State.INITIALIZED
    

     

    events.new_event_loop

    events.new_event_loop 的源碼如下,他通過拿到當前事件循環策略來得到一個新的事件循環:

     

    
    # ...
    _event_loop_policy = None
    _lock = threading.Lock()
    
    # ...
    def new_event_loop():
        return get_event_loop_policy().new_event_loop()
    
    # ...
    def get_event_loop_policy():
        if _event_loop_policy is None:
            _init_event_loop_policy()
        return _event_loop_policy
    
    # ...
    def _init_event_loop_policy():
        global _event_loop_policy
        # 思考點:
        #  這里為何要加線程鎖?
        #  是為了避免多線程多事件循環狀態下 _event_loop_policy 的
        #  數據同步問題嗎?防止同時多次運行 DefaultEventLoopPolicy 實例化嗎?
        with _lock:
            if _event_loop_policy is None:
                from . import DefaultEventLoopPolicy
                _event_loop_policy = DefaultEventLoopPolicy()
    

     

    _UnixDefaultEventLoopPolicy

    接下來我們要繼續看 DefaultEventLoopPolicy 的代碼實現,它位于 unix_events 文件中。

     

    class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    
        _loop_factory = None
    
        class _Local(threading.local):
            _loop = None
            _set_called = False
    
        def __init__(self):
            # 2. 為當前線程生成了一個獨立的 threading location
            self._local = self._Local()
    
        def new_event_loop(self):
            # 3. 實例化 _UnixSelectorEventLoop
            return self._loop_factory()
    
    class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
        _loop_factory = _UnixSelectorEventLoop
    
        # 1. 初始化類
        def __init__(self):
            super().__init__()
            self._watcher = None
    
    DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
    

     

    _UnixSelectorEventLoop

    繼續看 _UnixSelectorEventLoop 的實例化過程:

     

    
    # ---- coroutines ----
    
    def _is_debug_mode():
        return sys.flags.dev_mode or (not sys.flags.ignore_environment and
                                      bool(os.environ.get('PYTHONASYNCIODEBUG')))
    
    # ---- base_events ----
    
    class BaseEventLoop(events.AbstractEventLoop):
        def __init__(self):
            # 3. 實例化對象字典填充
            self._timer_cancelled_count = 0
            self._closed = False
            self._stopping = False
            # deque 雙端隊列
            self._ready = collections.deque()
            self._scheduled = []
            # 默認執行器
            self._default_executor = None
            self._internal_fds = 0
            self._thread_id = None
            # 1e-09
            self._clock_resolution = time.get_clock_info('monotonic').resolution
            # 默認異常處理程序
            self._exception_handler = None
            self.set_debug(coroutines._is_debug_mode())
            self.slow_callback_duration = 0.1
            self._current_handle = None
            self._task_factory = None
            self._coroutine_origin_tracking_enabled = False
            self._coroutine_origin_tracking_saved_depth = None
    
            self._asyncgens = weakref.WeakSet()
            self._asyncgens_shutdown_called = False
            self._executor_shutdown_called = False
    
    # ---- selector_events ----
    
    class BaseSelectorEventLoop(base_events.BaseEventLoop):
        def __init__(self, selector=None):
            # 2. 繼續調用父類 __init__ 方法,填充實例化對象的 __dict__ 字典
            super().__init__()
            # 3. 判斷 selector 是否為 None
            if selector is None:
                # 得到一個默認的 io 復用選擇器
                # select poll epoll
                selector = selectors.DefaultSelector()
            logger.debug('Using selector: %s', selector.__class__.__name__)
            self._selector = selector
            # 4. 調用 _make_self_pipe 方法
            self._make_self_pipe()
            # 10. 通過 weakref 創建出 1 個弱引用映射類
            self._transports = weakref.WeakValueDictionary()
    
    
        def _make_self_pipe(self):
            # 5. 創建 1 個非阻塞的 socket 對象
            self._ssock, self._csock = socket.socketpair()
            self._ssock.setblocking(False)
            self._csock.setblocking(False)
            self._internal_fds += 1
            # 6. 調用 address,傳入當前 sock 對象的文件描述符
            self._add_reader(self._ssock.fileno(), self._read_from_self)
    
    
        def _add_reader(self, fd, callback, *args):
            # 7. 檢查當前類是否是關閉狀態
            self._check_closed()
            # 9. 實例化注冊一個 handle,注意這里的 callback 是
            # self._read_from_self, args 為 ()
            handle = events.Handle(callback, args, self, None)
            try:
                # 第一次運行這里會報錯,返回當前文件對象注冊的 SelectorKey
                key = self._selector.get_key(fd)
            except KeyError:
                # 若報錯則注冊一個讀事件,將 handle 放入
                self._selector.register(fd, selectors.EVENT_READ,
                                        (handle, None))
            else:
                # 如果是第二次運行這個方法,則拿到 event
                # 疑問點(register 時放入的是 handle 和 None)
                # 為何出來就成了可讀流和可寫流?其實是事件循環開啟后的一系列處理
                # 可參照 事件循環的 sele._selector.select 以及 BaseSelectorEventLoop._process_events() 方法
                # 結果中的 reader 表示可讀流的事件處理器對象,而 writer 為 None
                mask, (reader, writer) = key.events, key.data
                # 修改 fd 的注冊事件
                # select 中 1 是讀事件,2 是寫事件。按位或后的結果總是較大值
                # 或兩者的和
                self._selector.modify(fd, mask | selectors.EVENT_READ,
                                      (handle, writer))
                # 如果沒有可讀流,則關閉,說明 except 那里沒有注冊好 handle 或者被 unregister 掉了
                if reader is not None:
                    reader.cancel()
            return handle
    
        def _check_closed(self):
            # 8. 若是關閉狀態則直接拋出異常
            if self._closed:
                raise RuntimeError('Event loop is closed')
    
    
    # ---- unix_events ----
    
    class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
        def __init__(self, selector=None):
            # 1. 調用父類進行實例化數據填充,構建 __dict__ 字典
            super().__init__(selector)
            self._signal_handlers = {}
    

     

    events.Handle

    events.Handle 類的源碼如下,這個 Handle 類是 asyncio 中各類任務的上層封裝,十分重要:

     

    
    # handle = events.Handle(callback, args, self, None)
    # callback = BaseSelectorEventLoop._read_from_self
    # self = _UnixSelectorEventLoop instance
    
    class Handle:
        __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                     '_source_traceback', '_repr', '__weakref__',
                     '_context')
    
        def __init__(self, callback, args, loop, context=None):
            # 若當前上下文為空,則 copy 當前上下文
            if context is None:
                context = contextvars.copy_context()
            self._context = context
            # loop 就是 _UnixSelectorEventLoop 的實例化對象
            self._loop = loop
            self._callback = callback
            # ()
            self._args = args
            self._cancelled = False
            self._repr = None
            # 先不看 debug 模式
            if self._loop.get_debug():
                self._source_traceback = format_helpers.extract_stack(
                    sys._getframe(1))
            else:
                self._source_traceback = None
    

     

    event.set_event_loop

    至此,_loop_factory 已經全部走完了。實際上也沒干太特別的事情,就創建了一個 DefaultSelector 以及實例化了一個 socket 對象并注冊進了 DefaultSelector 中。

    我們要接著看 Runner:

     

    class _State(enum.Enum):
        CREATED = "created"
        INITIALIZED = "initialized"
        CLOSED = "closed"
    
    class Runner:
        def __init__(self, *, debug=None, loop_factory=None):
            self._state = _State.CREATED
            self._debug = debug
            self._loop_factory = loop_factory
            self._loop = None
            self._context = None
            self._interrupt_count = 0
            self._set_event_loop = False
    
        def __enter__(self):
            self._lazy_init()
            return self
    
        def _lazy_init(self):
            # 如果是關閉狀態,則拋出異常
            if self._state is _State.CLOSED:
                raise RuntimeError("Runner is closed")
            # 如果是初始化狀態,則返回
            if self._state is _State.INITIALIZED:
                return
            # 如果 loop 工廠函數是 None
            if self._loop_factory is None:
                # 創建一個新的 loop,這里的返回對象就是 _UnixSelectorEventLoop 的實例化對象
                self._loop = events.new_event_loop()
                if not self._set_event_loop:
                    events.set_event_loop(self._loop)
                    self._set_event_loop = True
            else:
                self._loop = self._loop_factory()
    
            if self._debug is not None:
                self._loop.set_debug(self._debug)
    
            self._context = contextvars.copy_context()
            self._state = _State.INITIALIZED
    

     

    events.set_event_loop 源碼:

     

    def get_event_loop_policy():
        if _event_loop_policy is None:
            _init_event_loop_policy()
        # 1. 應該走這里,實際上 _event_loop_policy 就是 _UnixDefaultEventLoopPolicy 的實例對象
        return _event_loop_policy
    
    def set_event_loop(loop):
        # 2. 運行 _UnixDefaultEventLoopPolicy 實例對象的 set_event_loop
        get_event_loop_policy().set_event_loop(loop)
    

     

    _UnixDefaultEventLoopPolicy 的 set_event_loop 方法:

     

    # ---- events ----
    
    class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
        _loop_factory = None
    
        class _Local(threading.local):
            _loop = None
            _set_called = False
    
        def __init__(self):
            self._local = self._Local()
    
        def set_event_loop(self, loop):
            # 2. 通過 threading lock 設置標志位置
            self._local._set_called = True
            if loop is not None and not isinstance(loop, AbstractEventLoop):
                raise TypeError(f"loop must be an instance of AbstractEventLoop or None, not '{type(loop).__name__}'")
            self._local._loop = loop
    
    # ---- unix_events ----
    
    class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
        _loop_factory = _UnixSelectorEventLoop
    
        # ...
        def set_event_loop(self, loop):
            # 這個 loop 是 _UnixSelectorEventLoop 的實例化對象
    
            # 1. super 父類的同名方法
            super().set_event_loop(loop)
    
            # 3. 實例化的時候這里是 None, 不會運行下面的條件
            if (self._watcher is not None and
                    threading.current_thread() is threading.main_thread()):
                self._watcher.attach_loop(loop)
    

     

    至此 Runner._lazy_init 應該全部走完了:

     

    class _State(enum.Enum):
        CREATED = "created"
        INITIALIZED = "initialized"
        CLOSED = "closed"
    
    class Runner:
        def __init__(self, *, debug=None, loop_factory=None):
            self._state = _State.CREATED
            self._debug = debug
            self._loop_factory = loop_factory
            self._loop = None
            self._context = None
            self._interrupt_count = 0
            self._set_event_loop = False
    
        def __enter__(self):
            self._lazy_init()
            return self
    
        def _lazy_init(self):
            # 如果是關閉狀態,則拋出異常
            if self._state is _State.CLOSED:
                raise RuntimeError("Runner is closed")
            # 如果是初始化狀態,則返回
            if self._state is _State.INITIALIZED:
                return
    
            # 如果 loop 工廠函數是 None
            if self._loop_factory is None:
                # 創建一個新的 loop,這里的返回對象就是 _UnixSelectorEventLoop 的實例化對象
                self._loop = events.new_event_loop()
                # 設置新的標志位,代表事件循環已經初始化成功
                if not self._set_event_loop:
                    events.set_event_loop(self._loop)
                    self._set_event_loop = True
    
            # 不會走這里
            else:
                self._loop = self._loop_factory()
    
            if self._debug is not None:
                self._loop.set_debug(self._debug)
    
    
            # copy 當前上下文
            self._context = contextvars.copy_context()
            # 修改狀態
            self._state = _State.INITIALIZED
    

     

    注意,此時事件循環已經初始化完成了,但還沒有正式啟動。

    事件循環初始化流程圖

    以下是事件循環的初始化流程圖:

    Python asyncio 庫源碼分析 圖2

    事件循環的啟動和任務的執行

    在上面我們大概看了一下事件循環的初始化。接下來應該走到 runner.run() 方法中看他如何運行事件循環。

     

    def run(main, *, debug=None):
        # ...
        with Runner(debug=debug) as runner:
            return runner.run(main)
    

     

    runner.run

    源代碼如下:

     

    
    class _RunningLoop(threading.local):
        loop_pid = (None, None)
    
    # ...
    _running_loop = _RunningLoop()
    
    # ...
    def _get_running_loop():
        running_loop, pid = _running_loop.loop_pid
        if running_loop is not None and pid == os.getpid():
            return running_loop
    
    # -------------
    
    class Runner:
        def __init__(self, *, debug=None, loop_factory=None):
            self._state = _State.CREATED # INITIALIZED
            self._debug = debug
            self._loop_factory = loop_factory  # _UnixSelectorEventLoop 實例對象
            self._loop = None
            self._context = None          # dict
            self._interrupt_count = 0
            self._set_event_loop = False  # True
    
        def _lazy_init(self):
            if self._state is _State.CLOSED:
                raise RuntimeError("Runner is closed")
    
            # 2. 直接返回
            if self._state is _State.INITIALIZED:
                return
    
            # ...
    
        def run(self, coro, *, context=None):
    
            # 若不是一個協程函數,則拋出異常
            if not coroutines.iscoroutine(coro):
                raise ValueError("a coroutine was expected, got {!r}".format(coro))
    
            # 若 event loop 已經運行了,則拋出異常
            # 這里還沒有運行
            if events._get_running_loop() is not None:
                raise RuntimeError(
                    "Runner.run() cannot be called from a running event loop")
    
            # 1. 運行 _lazy_init
            self._lazy_init()
    
            # 3. 不是 None
            if context is None:
                context = self._context
    
            # 4. 創建協程并發任務
            task = self._loop.create_task(coro, context=context)
    
    
            # .. 后面再看
            if (threading.current_thread() is threading.main_thread()
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
            ):
                sigint_handler = functools.partial(self._on_sigint, main_task=task)
                try:
                    signal.signal(signal.SIGINT, sigint_handler)
                except ValueError:
                    sigint_handler = None
            else:
                sigint_handler = None
    
            self._interrupt_count = 0
            try:
                return self._loop.run_until_complete(task)
            except exceptions.CancelledError:
                if self._interrupt_count > 0:
                    uncancel = getattr(task, "uncancel", None)
                    if uncancel is not None and uncancel() == 0:
                        raise KeyboardInterrupt()
                raise  # CancelledError
            finally:
                if (sigint_handler is not None
                    and signal.getsignal(signal.SIGINT) is sigint_handler
                ):
                    signal.signal(signal.SIGINT, signal.default_int_handler)
    

     

    self._loop.create_task

    _UnixSelectorEventLoop 和其父類 BaseSelectorEventLoop 本身沒有實現 create_task() 方法,是在其超類 BaseEventLoop 所實現。

    BaseEventLoop.create_task() 實際上就是 asyncio.create_task() 方法的底層。

     

    
    class BaseEventLoop(events.AbstractEventLoop):
    
        def __init__(self):
            self._timer_cancelled_count = 0
            self._closed = False
            self._stopping = False
            self._ready = collections.deque()
            self._scheduled = []
            self._default_executor = None
            self._internal_fds = 0
            self._thread_id = None
            self._clock_resolution = time.get_clock_info('monotonic').resolution
            self._exception_handler = None
            self.set_debug(coroutines._is_debug_mode())
            self.slow_callback_duration = 0.1
            self._current_handle = None
            self._task_factory = None
            self._coroutine_origin_tracking_enabled = False
            self._coroutine_origin_tracking_saved_depth = None
    
            self._asyncgens = weakref.WeakSet()
            self._asyncgens_shutdown_called = False
            self._executor_shutdown_called = False
    
        def create_task(self, coro, *, name=None, context=None):
            """Schedule a coroutine object.
    
            Return a task object.
            """
            # 先檢查是否關閉,返回的結果必定是 False
            self._check_closed()
    
            # 任務工廠為 None
            if self._task_factory is None:
                task = tasks.Task(coro, loop=self, name=name, context=context)
                if task._source_traceback:
                    del task._source_traceback[-1]
    
            # 若通過 asyncio.get_running_loop().set_task_factory() 設置了任務工廠函數的話
            # 那么就運行 else 的代碼塊
            else:
                if context is None:
                    # Use legacy API if context is not needed
                    task = self._task_factory(self, coro)
                else:
                    task = self._task_factory(self, coro, context=context)
    
                tasks._set_task_name(task, name)
    
            return task
    
        def _check_closed(self):
            # 若是關閉狀態則直接拋出異常
            if self._closed:
                raise RuntimeError('Event loop is closed')
    
    
        def set_task_factory(self, factory):
            if factory is not None and not callable(factory):
                raise TypeError('task factory must be a callable or None')
            self._task_factory = factory
    

     

    tasks.Task

    tasks.Task 的源碼如下:

     

    
    # ----- futures -----
    
    class Future:
        _state = _PENDING
        _result = None
        _exception = None
        _loop = None
        _source_traceback = None
        _cancel_message = None
        _cancelled_exc = None
    
        _asyncio_future_blocking = False
    
        __log_traceback = False
    
        def __init__(self, *, loop=None):
            # 2. loop 傳入的不是 None、所以這里直接走 else
            if loop is None:
                self._loop = events._get_event_loop()
            else:
                self._loop = loop
            self._callbacks = []
            if self._loop.get_debug():
                self._source_traceback = format_helpers.extract_stack(
                    sys._getframe(1))
    
    
    # ...
    _PyFuture = Future
    
    # ----- tasks -----
    
    _task_name_counter = itertools.count(1).__next__
    
    # ...
    class Task(futures._PyFuture):
    
        _log_destroy_pending = True
    
        def __init__(self, coro, *, loop=None, name=None, context=None):
            # 1. 運行 super 也就是 Future 的 __init__ 方法
            super().__init__(loop=loop)
            if self._source_traceback:
                del self._source_traceback[-1]
    
            # 若不是一個 coroutine 則拋出異常
            if not coroutines.iscoroutine(coro):
                self._log_destroy_pending = False
                raise TypeError(f"a coroutine was expected, got {coro!r}")
    
            # 若沒有指定 name 則生成一個 name
            if name is None:
                self._name = f'Task-{_task_name_counter()}'
            else:
                self._name = str(name)
    
            self._num_cancels_requested = 0
            self._must_cancel = False
            self._fut_waiter = None
            self._coro = coro
            if context is None:
                self._context = contextvars.copy_context()
            else:
                self._context = context
    
            # 運行 _UnixSelectorEventLoop 的 call_soon 方法
            self._loop.call_soon(self.__step, context=self._context)
            _register_task(self)
    

     

    BaseEventLoop.call_soon

    _UnixSelectorEventLoop 未實現 call_soon() 方法,而是在其超類 BaseEventLoop 中實現:

     

    class BaseEventLoop(events.AbstractEventLoop):
        def __init__(self):
            # ...
            # deque 雙端隊列
            self._ready = collections.deque()
    
        def _call_soon(self, callback, args, context):
            # handle 的源碼可參照上面初始化 event loop 時的操作
            handle = events.Handle(callback, args, self, context)
            if handle._source_traceback:
                del handle._source_traceback[-1]
            # 將 handle 放入 _ready 中
            self._ready.append(handle)
            return handle
    
        def call_soon(self, callback, *args, context=None):
            # callback: Task.__step 方法
            # args: ()
            # context: dict
            self._check_closed()
            # 不走 debug,沒必要細看
            if self._debug:
                self._check_thread()
                self._check_callback(callback, 'call_soon')
            handle = self._call_soon(callback, args, context)
            if handle._source_traceback:
                del handle._source_traceback[-1]
            return handle
    

     

    _register_task

    在 loop.call_soon() 執行執行完畢后,Task 的 __init__() 方法最后會運行 _register_task() 方法。

     

    
    # 包含所有活動任務的 WeakSet。
    _all_tasks = weakref.WeakSet()
    
    def _register_task(task):
        """在 asyncio 中注冊一個由循環執行的新任務。"""
        _all_tasks.add(task)
    

     

    runner.run

    現在,讓我們繼續回到 runner.run() 方法中。

     

    class Runner:
        def __init__(self, *, debug=None, loop_factory=None):
            self._state = _State.CREATED # INITIALIZED
            self._debug = debug
            self._loop_factory = loop_factory  # _UnixSelectorEventLoop 實例對象
            self._loop = None
            self._context = None          # dict
            self._interrupt_count = 0
            self._set_event_loop = False  # True
    
        def run(self, coro, *, context=None):
    
            if not coroutines.iscoroutine(coro):
                raise ValueError("a coroutine was expected, got {!r}".format(coro))
    
            if events._get_running_loop() is not None:
                raise RuntimeError(
                    "Runner.run() cannot be called from a running event loop")
    
            self._lazy_init()
    
            if context is None:
                context = self._context
    
            task = self._loop.create_task(coro, context=context)
    
            # 如果當前線程是主線程并且當前使用了 SIGNAL 的默認處理程序結果是 True
            # 這里是 ctrl + c 終止程序的信號
            if (threading.current_thread() is threading.main_thread()
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
            ):
                # 則信號處理程序設置為 self._on_sigint 程序, 并將主任務傳遞進去
                sigint_handler = functools.partial(self._on_sigint, main_task=task)
    
                # 嘗試設置當前的信號處理程序
                try:
                    signal.signal(signal.SIGINT, sigint_handler)
                except ValueError:
                    sigint_handler = None
            else:
                sigint_handler = None
    
            self._interrupt_count = 0
    
            try:
                # 核心代碼
                return self._loop.run_until_complete(task)
            except exceptions.CancelledError:
                    # 異常處理邏輯
                    uncancel = getattr(task, "uncancel", None)
                    if uncancel is not None and uncancel() == 0:
                        raise KeyboardInterrupt()
                raise  # CancelledError
            finally:
                # 解綁 ctrl+c 的信號處理
                if (sigint_handler is not None
                    and signal.getsignal(signal.SIGINT) is sigint_handler
                ):
                    signal.signal(signal.SIGINT, signal.default_int_handler)
    
    
        def _on_sigint(self, signum, frame, main_task):
            # 主線程里 +1
            self._interrupt_count += 1
            if self._interrupt_count == 1 and not main_task.done():
                # 取消主任務
                main_task.cancel()
                self._loop.call_soon_threadsafe(lambda: None)
                return
            raise KeyboardInterrupt()
    

     

    BaseEventLoop.run_until_complete

    _UnixSelectorEventLoop 并未實現 run_until_complete() 方法。而是由其超類 BaseEventLoop 所實現。

    BaseEventLoop.run_until_complete() 源碼如下:

     

    class BaseEventLoop(events.AbstractEventLoop):
    
        def __init__(self):
            self._timer_cancelled_count = 0
            self._closed = False
            self._stopping = False
            self._ready = collections.deque()  # 應該塞了一個 handle
            self._scheduled = []
            self._default_executor = None
            self._internal_fds = 0
    
        def _check_closed(self):
            if self._closed:
                raise RuntimeError('Event loop is closed')
    
        def is_running(self):
            """Returns True if the event loop is running."""
            return (self._thread_id is not None)
    
        def _check_running(self):
            if self.is_running():
                raise RuntimeError('This event loop is already running')
            if events._get_running_loop() is not None:
                raise RuntimeError(
                    'Cannot run the event loop while another loop is running')
    
        def run_until_complete(self, future):
            # future 就是 main coroutine 的入口函數的 task
    
            # 1. 未關閉
            self._check_closed()
    
            # 2. self._thread_id 現在是 None,所以這里不會報錯
            self._check_running()
    
            # False
            new_task = not futures.isfuture(future)
    
            # 3. 將 task 傳入,返回一個 future 對象
            future = tasks.ensure_future(future, loop=self)
    
            if new_task:
                future._log_destroy_pending = False
    
            # 5. 給 main coroutine 的入口函數的 task 添加一個回調函數
            future.add_done_callback(_run_until_complete_cb)
    
            try:
                # 6. 開始運行
                self.run_forever()
            except:
                if new_task and future.done() and not future.cancelled():
                    future.exception()
                raise
            finally:
                # 執行完成后,解綁毀回調函數
                future.remove_done_callback(_run_until_complete_cb)
            # 若報錯,則代表事件循環關閉了
            if not future.done():
                raise RuntimeError('Event loop stopped before Future completed.')
            # 返回未來對象的結果
            return future.result()
    
    # ---- tasks ----
    
    def ensure_future(coro_or_future, *, loop=None):
        return _ensure_future(coro_or_future, loop=loop)
    
    def _ensure_future(coro_or_future, *, loop=None):
        # 4. 保證是 future
        # True
        if futures.isfuture(coro_or_future):
            # False
            if loop is not None and loop is not futures._get_loop(coro_or_future):
                raise ValueError('The future belongs to a different loop than '
                                'the one specified as the loop argument')
            # 直接 return
            return coro_or_future
    
        # 如果不是一個 coro 或者 future、則進行其他處理
        called_wrap_awaitable = False
        if not coroutines.iscoroutine(coro_or_future):
            if inspect.isawaitable(coro_or_future):
                coro_or_future = _wrap_awaitable(coro_or_future)
                called_wrap_awaitable = True
            else:
                raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                                'is required')
    
        if loop is None:
            loop = events._get_event_loop(stacklevel=4)
        try:
            return loop.create_task(coro_or_future)
        except RuntimeError:
            if not called_wrap_awaitable:
                coro_or_future.close()
            raise
    

     

    BaseEventLoop.run_forever

    BaseEventLoop.run_forever() 的源碼如下:

    class BaseEventLoop(events.AbstractEventLoop):
        def __init__(self):
            self._timer_cancelled_count = 0
            self._closed = False
            self._stopping = False
            self._ready = collections.deque()  # 應該塞了一個 handle
            self._scheduled = []
            self._default_executor = None
            self._internal_fds = 0
            self._thread_id = 111   # 當前線程 id
    
        def _check_closed(self):
            if self._closed:
                raise RuntimeError('Event loop is closed')
    
        def is_running(self):
            """Returns True if the event loop is running."""
            return (self._thread_id is not None)
    
        def _check_running(self):
            if self.is_running():
                raise RuntimeError('This event loop is already running')
            if events._get_running_loop() is not None:
                raise RuntimeError(
                    'Cannot run the event loop while another loop is running')
    
        def run_forever(self):
            """Run until stop() is called."""
    
            # 1. 未關閉
            self._check_closed()
            # 2. 未運行
            self._check_running()
            # 3. 不重要
            self._set_coroutine_origin_tracking(self._debug)
    
            # 4. 獲取舊的異步生成器鉤子
            old_agen_hooks = sys.get_asyncgen_hooks()
            try:
                # 5. 將當前事件循環的 _thread_id 給賦值
                self._thread_id = threading.get_ident()
                # 6. 設置異步生成器的鉤子函數
                sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                       finalizer=self._asyncgen_finalizer_hook)
    
                # 7. 設置正在運行的 loop
                events._set_running_loop(self)
    
                # 8. 調用 _run_once
                while True:
                    self._run_once()
                    # 9. 如果 _stopping 為 True、則跳出
                    if self._stopping:
                        break
            finally:
                # 恢復標志位、恢復生成器鉤子函數
                self._stopping = False
                self._thread_id = None
                events._set_running_loop(None)
                self._set_coroutine_origin_tracking(False)
                sys.set_asyncgen_hooks(*old_agen_hooks)
    
    
        def _asyncgen_firstiter_hook(self, agen):
            # 在之前調用
            if self._asyncgens_shutdown_called:
                warnings.warn(
                    f"asynchronous generator {agen!r} was scheduled after "
                    f"loop.shutdown_asyncgens() call",
                    ResourceWarning, source=self)
    
            self._asyncgens.add(agen)
    
    
        def _asyncgen_finalizer_hook(self, agen):
            # 在之后調用
            self._asyncgens.discard(agen)
            if not self.is_closed():
                self.call_soon_threadsafe(self.create_task, agen.aclose())
    
    
    # ---- events ----
    
    def _set_running_loop(loop):
        _running_loop.loop_pid = (loop, os.getpid())
    

     

    BaseEventLoop._run_once

    BaseEventLoop._run_once() 方法源碼如下:

     

    class BaseEventLoop(events.AbstractEventLoop):
        def __init__(self):
            self._timer_cancelled_count = 0
            self._closed = False
            self._stopping = False
            self._ready = collections.deque()  # 應該塞了 1 個 handle
            self._scheduled = []
            self._default_executor = None
            self._internal_fds = 0
            self._thread_id = 111   # 當前線程 id
    
        def _run_once(self):
            sched_count = len(self._scheduled)
    
            # 1. 判斷當前需要調度的數量,是否大于 _MIN_SCHEDULED_TIMER_HANDLES
            # 并且已取消的計時器句柄數量除以需要調度的數量大于 _MIN_SCHEDULED_TIMER_HANDLES
            # 這里的條件肯定是不滿足的
            if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
                self._timer_cancelled_count / sched_count >
                    _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
                new_scheduled = []
                for handle in self._scheduled:
                    if handle._cancelled:
                        handle._scheduled = False
                    else:
                        new_scheduled.append(handle)
    
                heapq.heapify(new_scheduled)
                self._scheduled = new_scheduled
                self._timer_cancelled_count = 0
            else:
                # 2. 這里的 while 循環也跑不起來的,因為 self._scheduled 是 []
                while self._scheduled and self._scheduled[0]._cancelled:
                    self._timer_cancelled_count -= 1
                    handle = heapq.heappop(self._scheduled)
                    handle._scheduled = False
    
            # 3. timeout 這里應該是滿足條件的,置 0
            timeout = None
            if self._ready or self._stopping:
                timeout = 0
    
            elif self._scheduled:
                when = self._scheduled[0]._when
                timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
    
            # 4. BaseSelectorEventLoop 子類中有這個 _selector,這里直接開啟監聽。
            # 這里監聽的對象只有 1 個 socket 對象,由于沒有事件觸發,所以這里會直接跳過
            event_list = self._selector.select(timeout)
            # BaseEventLoop._process_events
            self._process_events(event_list)
            event_list = None
    
            end_time = self.time() + self._clock_resolution
    
            # 5. 不會進行循環
            while self._scheduled:
                handle = self._scheduled[0]
                if handle._when >= end_time:
                    break
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False
                self._ready.append(handle)
    
            # 6. self._ready 的長度應該為 1,里面放了一個 handle
            ntodo = len(self._ready)
    
            for i in range(ntodo):
                # 彈出第一個 handle,若沒取消則運行其 _run 方法
                handle = self._ready.popleft()
                if handle._cancelled:
                    continue
                # 若開啟了調試模式,則還需要記錄時間
                if self._debug:
                    try:
                        self._current_handle = handle
                        t0 = self.time()
                        handle._run()
                        dt = self.time() - t0
                        if dt >= self.slow_callback_duration:
                            logger.warning('Executing %s took %.3f seconds',
                                           _format_handle(handle), dt)
                    finally:
                        self._current_handle = None
                else:
                    # 運行其 _run 方法
                    handle._run()
            handle = None
    

     

    handle._run

    handle 對象是用戶所創建的任務對象的抽象層。

    因為 Task 內部實際上是調用了 loop.call_soon() 方法將 coroutine 放在 Task 對象中,而 Task 對象的 __step() 方法又將作為 callback 封裝給 handle. 并 register task 至 _all_tasks 這個 WeakSet 中。

    換而言之、事件循環總是通過 _ready 隊列拿到不同的 handle,并通過 handle 來執行最初的 coroutine 任務。

    以下是 handle._run() 方法的源碼:

     

    
    class Handle:
    
        def __init__(self, callback, args, loop, context=None):
    
            if context is None:
                context = contextvars.copy_context()
    
            self._context = context  # Task 對象創建時的上下文環境
            self._loop = loop  # 當前的 event loop
            self._callback = callback  # 就是 Task 對象的 __step
            self._args = args
            self._cancelled = False
            self._repr = None
            if self._loop.get_debug():
                self._source_traceback = format_helpers.extract_stack(
                    sys._getframe(1))
            else:
                self._source_traceback = None
    
    
        def cancel(self):
            if not self._cancelled:
                self._cancelled = True
                if self._loop.get_debug():
                    self._repr = repr(self)
                self._callback = None
                self._args = None
    
        def cancelled(self):
            return self._cancelled
    
        def _run(self):
            try:
                # 運行 Task 對象的 __step 方法
                self._context.run(self._callback, *self._args)
            # 若有異常,則交由默認的異常處理函數進行處理
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                cb = format_helpers._format_callback_source(
                    self._callback, self._args)
                msg = f'Exception in callback {cb}'
                context = {
                    'message': msg,
                    'exception': exc,
                    'handle': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            self = None  # 發生異常時需要中斷循環。
    

     

    Task.__step

    Task.__step() 中的邏輯是如何運行傳入的協程函數:

     

    
    # 包含所有正在活動的任務
    _all_tasks = weakref.WeakSet()
    
    # 一個字典,包含當前正在活動的任務 {loop: task}
    _current_tasks = {}
    
    def _enter_task(loop, task):
        # 4. 為當前的 loop 添加活動任務
        # 若當前 loop 已經有一個活動任務,則拋出 RuntimeError
        current_task = _current_tasks.get(loop)
        if current_task is not None:
            raise RuntimeError(f"Cannot enter into task {task!r} while another "
                               f"task {current_task!r} is being executed.")
    
        _current_tasks[loop] = task
    
    
    def _leave_task(loop, task):
        # 10. 取消活動任務
        current_task = _current_tasks.get(loop)
        if current_task is not task:
            raise RuntimeError(f"Leaving task {task!r} does not match "
                               f"the current task {current_task!r}.")
        del _current_tasks[loop]
    
    class Task(futures._PyFuture):
    
        _log_destroy_pending = True
    
        def __init__(self, coro, *, loop=None, name=None, context=None):
            super().__init__(loop=loop)
            if self._source_traceback:
                del self._source_traceback[-1]
    
            if not coroutines.iscoroutine(coro):
                self._log_destroy_pending = False
                raise TypeError(f"a coroutine was expected, got {coro!r}")
    
            if name is None:
                self._name = f'Task-{_task_name_counter()}'
            else:
                self._name = str(name)
    
            self._num_cancels_requested = 0
            self._must_cancel = False
            self._fut_waiter = None
            # 當前運行時來看,這里應該是入口函數的 coroutine
            # 即為 asyncio.run(main()) 的 main()
            self._coro = coro
            if context is None:
                self._context = contextvars.copy_context()
            else:
                self._context = context
    
            self._loop.call_soon(self.__step, context=self._context)
            _register_task(self)
    
    
    
        def __step(self, exc=None):
            # 1. 若當前任務已經 done 掉則拋出異常(這里的異常會被 handle._run 捕獲的)
            if self.done():
                raise exceptions.InvalidStateError(
                    f'_step(): already done: {self!r}, {exc!r}')
    
            # 2. 若需要取消,且 exc 不是 CancelledError 類型的異常,則創建一個取消任務
            # 實際上就是將 exc 賦值成一個 CancelledError 的對象
            if self._must_cancel:
                if not isinstance(exc, exceptions.CancelledError):
                    exc = self._make_cancelled_error()
                self._must_cancel = False
            coro = self._coro
            self._fut_waiter = None
    
            # 3. 調用 _enter_task() 函數
            _enter_task(self._loop, self)
    
            # Call either coro.throw(exc) or coro.send(None).
            try:
                # 主動啟動協程對象
                if exc is None:
                    # 我們直接使用 `send` 方法,因為協程
                    # 沒有 __iter__ 和 __next__ 方法。
                    result = coro.send(None)
                else:
                    # 如果有 exc 則通過 throw 向協程函數內部拋出異常
                    result = coro.throw(exc)
    
            except StopIteration as exc:
                # 4. 若協程函數執行完畢則判斷是否需要取消
                if self._must_cancel:
                    # 通過調度后嘗試取消任務(下次事件循環過程中觸發)
                    self._must_cancel = False
                    super().cancel(msg=self._cancel_message)
                else:
                    # 設置結果
                    super().set_result(exc.value)
    
            except exceptions.CancelledError as exc:
                # 5. 保存原始異常,以便我們稍后將其鏈接起來
                self._cancelled_exc = exc
                # 通過調度后嘗試取消任務(下次事件循環過程中觸發)
                super().cancel()  # I.e., Future.cancel(self).
    
            except (KeyboardInterrupt, SystemExit) as exc:
                # 6. 如果是 <c-c> 或者系統推出,則設置異常任務,立即觸發
                super().set_exception(exc)
                raise
    
            except BaseException as exc:
                # 7. 若是其他基本異常,則設置異常任務,立即觸發
                super().set_exception(exc)
    
            else:
                # 8. 沒有異常,對協程結果開始進行判定
                # 首先,查看 result 是否具有 _asyncio_future_blocking 屬性
                blocking = getattr(result, '_asyncio_future_blocking', None)
                if blocking is not None:
                    # 如果 result 對象所屬的事件循環與當前任務的事件循環不一致
                    # 則拋出 RuntimeError 異常(下次事件循環過程中觸發)
                    if futures._get_loop(result) is not self._loop:
                        new_exc = RuntimeError(
                            f'Task {self!r} got Future '
                            f'{result!r} attached to a different loop')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
    
                    # 如果 blocking 為 True
                    elif blocking:
    
                        # 如果返回的結果就是 Task 本身, 則引發 RuntimeError
                        # (下次事件循環過程中觸發)
                        if result is self:
                            new_exc = RuntimeError(
                                f'Task cannot await on itself: {self!r}')
                            self._loop.call_soon(
                                self.__step, new_exc, context=self._context)
    
                        # 將 self.__wakeup 設置為 result 對象的回調函數
                        # 并將 result 對象作為等待者保存在 _fut_waiter 屬性中
                        # 如果此時任務需要取消,并且成功取消了等待者,則將 _must_cancel 標志設置為 False。
                        else:
    
                            result._asyncio_future_blocking = False
                            result.add_done_callback(
                                self.__wakeup, context=self._context)
    
                            self._fut_waiter = result
                            if self._must_cancel:
                                if self._fut_waiter.cancel(
                                        msg=self._cancel_message):
                                    self._must_cancel = False
    
                    # 如果 blocking 值為 False
                    # 則拋出 RuntimeError 異常(下次事件循環過程中觸發)
                    else:
                        new_exc = RuntimeError(
                            f'yield was used instead of yield from '
                            f'in task {self!r} with {result!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
    
                # 如果結果對象 result 為 None
                # 表示協程使用了 yield 語句,它調度一個新的事件循環迭代,即再次調用 __step 方法。
                # 直到 StopIteration 被觸發后,協程函數才真正運行完畢
                elif result is None:
                    self._loop.call_soon(self.__step, context=self._context)
    
                # 如果結果對象 result 是一個生成器對象
                # 則拋出 RuntimeError 異常,表示協程在生成器中使用了錯誤的語法。
                # (下次事件循環過程中觸發)
                elif inspect.isgenerator(result):
                    # Yielding a generator is just wrong.
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from for '
                        f'generator in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                else:
                    # 對于其他類型的結果對象,拋出 RuntimeError 異常,表示協程產生了無效的
                    # 結果(下次事件循環過程中觸發)
                    new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
    
            finally:
                # 9. 最后,使用 _leave_task 取消活動任務
                _leave_task(self._loop, self)
                # 發生異常,需要中斷循環
                self = None
    

     

    關于回調函數的處理

    眾所周知,無論是 task 對象還是 future 未來對象,我們都可以通過 add_done_callback() 方法來為其新增一個回調函數。

    那么在上面 task.__step() 方法運行的過程中,回調函數是在何時運行呢?

    先從 add_done_callback() 方法看起,它其實是由 Task 類的父類 Future 實現:

     

    class Future:
        _state = _PENDING
        _result = None
        _exception = None
        _loop = None
        _source_traceback = None
        _cancel_message = None
        _cancelled_exc = None
    
    
        def __init__(self, *, loop=None):
            """
            if loop is None:
                self._loop = events._get_event_loop()
            else:
                self._loop = loop
            self._callbacks = []
            if self._loop.get_debug():
                self._source_traceback = format_helpers.extract_stack(
                    sys._getframe(1))
    
        # ...
    
        def add_done_callback(self, fn, *, context=None):
            # 若當前對象的狀態不是 peding
            # 則將 callback 放在下次事件循環中運行
            if self._state != _PENDING:
                self._loop.call_soon(fn, self, context=context)
            else:
                # 否則,將回調函數放在列表中
                if context is None:
                    context = contextvars.copy_context()
                self._callbacks.append((fn, context))
    

     

    調用 callback 的方法是由 Future 實現,方法名為 __schedule_callbacks,源碼如下:

     

    
        def __schedule_callbacks(self):
            callbacks = self._callbacks[:]
            if not callbacks:
                return
    
            self._callbacks[:] = []
            # 循環所有回調函數、統一將其安排在下一次循環中按順序執行
            for callback, ctx in callbacks:
                self._loop.call_soon(callback, self, context=ctx)
    

     

    接下來我們只需要找到在那些方法中會調用 __schedule_callbacks 就知道了其執行時機,以下方法均為 Future 類提供:

     

        def set_result(self, result):
            if self._state != _PENDING:
                raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
            self._result = result
            self._state = _FINISHED         # 修改任務狀態
            self.__schedule_callbacks()
    
        def set_exception(self, exception):
            if self._state != _PENDING:
                raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
            if isinstance(exception, type):
                exception = exception()
            if type(exception) is StopIteration:
                raise TypeError("StopIteration interacts badly with generators "
                                "and cannot be raised into a Future")
            self._exception = exception
            self._exception_tb = exception.__traceback__
            self._state = _FINISHED         # 修改任務狀態
            self.__schedule_callbacks()
            self.__log_traceback = True
    
        def cancel(self, msg=None):
            self.__log_traceback = False
            if self._state != _PENDING:
                return False
            self._state = _CANCELLED       # 修改任務狀態
            self._cancel_message = msg
            self.__schedule_callbacks()
            return True
    

     

    以此可見,回調函數的執行會放在事件循環的就緒隊列中,如果 task 或者 future 的 callback 在執行過程中擁有較長的阻塞時長時,將會阻塞整個事件循環!

    除此之外,每一次 callback 的執行必須是在當前主任務運行完畢后執行。舉個例子:

     

    ready = [task1, task2, task3]
    

     

    若第一個 task 有 callback, 則其 callback 會放在最后:

     

    ready = [task2, task3, task1_cb]
    

     

    callback 運行前必須先運行 task2 和 task3。

    主協程任務的結束

    當主協程任務結束后,所有的子協程任務也會結束掉。這是為什么呢?我們繼續從源碼角度進行分析。

    首先在 BaseEventLoop.run_until_complete() 方法中,_ready 隊列會在下次循環中添加 1 個 callback:

     

    future.add_done_callback(_run_until_complete_cb)
    
    
    def add_done_callback(self, fn, *, context=None):
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            # 主協程任務的狀態此時應該是 peding
            # 所以他只會在主協程任務結束后將回調添加到 ready 隊列中
            if context is None:
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))
    

     

    當主協程任務在 BaseEventLoop.__step() 方法中被運行 set_result()、set_exception()、或者 cancel() 任意一個時,base_events._run_until_complete_cb() 都會被添加進 _ready 隊列中。

    而 base_events._run_until_complete_cb() 方法的實現如下:

     

    def _run_until_complete_cb(fut):
        if not fut.cancelled():
            exc = fut.exception()
            if isinstance(exc, (SystemExit, KeyboardInterrupt)):
                # Issue #22429: run_forever() already finished, no need to
                # stop it.
                return
        futures._get_loop(fut).stop()
    

     

    事件循環的 stop() 方法實現、直接看 BaseEventLoop.stop() 即可,因為 _UnixSelectorEventLoop 包括 BaseSelectorEventLoop 都未實現該方法:

     

        def stop(self):
            self._stopping = True
    

    最后再回過頭看 BaseEventLoop.run_forever() 方法,是不是明了了些?:

     

        def run_forever(self):
            # ...
            try:
                self._thread_id = threading.get_ident()
                sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                       finalizer=self._asyncgen_finalizer_hook)
    
                events._set_running_loop(self)
                while True:
                    self._run_once()
                    if self._stopping:
                        break
            # ...
    

     

    總結、在主協程任務運行時,其 callback 方法 base_events._run_until_complete_cb() 并不會馬上添加至 ready 隊列中。

    一但主協程任務運行完畢(調用 cancel()、set_result()、set_exception())時,callback 會立即添加到 ready 隊列中。

    這意味著事件循環即將結束,但在 callback 之前的子任務還可以繼續運行,一旦當 callback 執行完畢,那么就意味著事件循環被關閉掉了。BaseEventLoop._run_once() 方法也不會繼續運行。至此整個事件循環的生命周期才真正結束。

    事件循環啟動和任務執行流程圖

    基本的事件循環啟動和任務執行流程圖如下:

    Python asyncio 庫源碼分析  圖3

     

    本章結語

    由于平時要忙工作什么的,算下來這篇文章總共花了我大概小半個月時間,不過算起來收獲頗豐。

    至少筆者在讀完 asyncio 事件循環后,也有了一些新的感悟:

    • 每一個事件循環都有一個 sock 對象和一個系統選擇器,這也是 loop.create_server() 方法的基礎,在每次運行 BaseEventLoop._run_once() 方法時都會去檢測一遍系統選擇器有沒有準備好的事件描述符,若有則運行其他邏輯(當然這部分還沒有深入研究,但大體上是不會錯的)
    • 事件循環中有很多對 loop 的操作,如 new_event_loop()、set_event_loop()、get_event_loop()、get_running_loop() 等等,通過源碼閱讀可以更好的清楚他們的作用
    • 清楚了 create_task() 以及 call_soon() 方法的關系,明白了 Task 對象和 Future 對象以及 Handle 對象的關系
    • 知道了事件循環是定序執行子任務的,也知道了回調函數的添加以及執行時機,更重要的是明白了事件循環是如何實現的
    • 知曉了一些鉤子函數的真實作用,如 set_task_factory() 等等

    其實 asyncio 不止單單一個事件循環、除此之外還有 socket、流式傳輸、各種鎖的應用等等,事件循環只能說是 asyncio 中的基礎。

    最后的最后,希望大家繼續努力,保持學習,不忘初心。

    還是開篇那句話 「路漫漫其修遠兮, 吾將上下而求索」與諸君共勉之。

    轉載自:https://www.cnblogs.com/hanabi-cnblogs/p/17494522.html
    欧美精品18videosex性欧美,老师的粉嫩小又紧水又多,久久国产高潮流白浆免费观看,国产睡熟迷奷系列网站
    <td id="6000o"><rt id="6000o"></rt></td>
  • <sup id="6000o"><button id="6000o"></button></sup>
  • <optgroup id="6000o"></optgroup>
  • <strong id="6000o"></strong>