目標(biāo)
用一個(gè)等待過(guò)程理解,能夠?qū)崿F(xiàn)一致的方便,70行代碼
例子說(shuō)明
某方需要2,執(zhí)行req2需要執(zhí)行這個(gè)時(shí)間,最終執(zhí)行時(shí)間是3秒,大約1秒,同時(shí)執(zhí)行;如果最終時(shí)間是秒,如果執(zhí)行的話,具體請(qǐng)參看,友情提示是,yield from 方法后面如果是它的工具,可以進(jìn)入____
import time
from collections import deque
_delay = deque()
class FutureX:
def __init__(self, coro=None, delay_second=None):
self.coro = coro
if delay_second:
self.start = delay_second + time.time()
def step(self):
coro = self.coro
try:
result = coro.send(None)
except StopIteration as e:
print(e.value)
pass
else:
if isinstance(result, FutureX):
_delay.append((self._wakeup, result))
else:
pass
def _wakeup(self):
self.step()
def __iter__(self):
yield self
return None
def coroutine(func):
co = func.__code__
func.__code__ = co.replace(co_flags=co.co_flags | 0x100)
return func
@coroutine
def sleep0(seconds):
future = FutureX(delay_second=seconds)
b = yield from future
return seconds
async def req1(delay_seconds):
resp_time = await sleep0(delay_seconds)
return resp_time
async def req2(delay_seconds):
resp_time = await sleep0(delay_seconds)
return resp_time
t1 = time.time()
f1, f2 = FutureX(req1(2)), FutureX(req2(1))
f1.step()
f2.step()
while _delay:
callback, args = _delay.popleft()
start = args.start
if not start:
continue
while True:
end = time.time()
if start <= end:
try:
callback()
except StopIteration as e:
pass
break
print(f'花費(fèi)的時(shí)間:{round(time.time() - t1,1)}')
'''
結(jié)果:
2
1
花費(fèi)的時(shí)間:2.0
'''
審核編輯:劉清
-
python
+關(guān)注
關(guān)注
58文章
4889瀏覽量
90330
發(fā)布評(píng)論請(qǐng)先 登錄
[VirtualLab] 使用Python運(yùn)行VirtualLab Fusion光學(xué)仿真
如何使用Python讀取不間斷數(shù)據(jù),并將其傳輸?shù)絃abVIEW進(jìn)行處理?
解析Linux的進(jìn)程、線程和協(xié)程
NICE協(xié)處理器接口信號(hào)解讀--以demo為例
利用Verdi調(diào)試協(xié)處理器的實(shí)現(xiàn)步驟
MD5信息摘要算法實(shí)現(xiàn)二(基于蜂鳥(niǎo)E203協(xié)處理器)
基于E203 RISC-V的音頻信號(hào)處理系統(tǒng) -協(xié)處理器的乘累加過(guò)程
NICE指令的完整執(zhí)行過(guò)程
NICE協(xié)處理器demo分析及測(cè)試
基于E203 NICE協(xié)處理器擴(kuò)展指令
基于E203 NICE協(xié)處理器擴(kuò)展指令2.0
CANoe產(chǎn)品體系19版本新功能(下)
python協(xié)程之a(chǎn)wait等待過(guò)程理解
評(píng)論