Files
zvk2_public/tests/zvk/event/test_event_loop.py
2019-03-15 15:02:19 +04:00

36 lines
691 B
Python

import asyncio
import pytest
from zvk.event.consumer import event_consumer
from zvk.event.event import Event
from zvk.event.queue import EventQueue
@event_consumer(consumes=['event_loop'])
async def consumer_loop(event, counter):
counter[0] += 1
await asyncio.sleep(0.1)
yield event
@pytest.mark.asyncio
async def test_event_cancellation():
event_queue = EventQueue()
counter = [0]
event_queue.register_consumer(consumer_loop)
starting_events = [Event('event_loop', counter=counter)]
queue_task = asyncio.create_task(event_queue.run(starting_events))
await asyncio.sleep(0.25)
event_queue.omae_wa_mou_shindeiru()
await queue_task
assert counter[0] == 3