-
Notifications
You must be signed in to change notification settings - Fork 69
Expand file tree
/
Copy pathtest_consumer.py
More file actions
62 lines (42 loc) · 1.83 KB
/
test_consumer.py
File metadata and controls
62 lines (42 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from __future__ import annotations
import asyncio
import pytest
from graphql_server.channels.handlers.base import ChannelsConsumer
class DummyChannelLayer:
    """Channel-layer test double that records group add/discard calls.

    Every call is stored as a ``(group, channel)`` tuple, in call order, so
    a test can assert on exactly which memberships were requested.
    """

    def __init__(self) -> None:
        # Both call logs start out empty.
        self.added: list[tuple[str, str]] = []
        self.discarded: list[tuple[str, str]] = []

    async def group_add(self, group: str, channel: str) -> None:
        """Append ``(group, channel)`` to the ``added`` log."""
        membership = (group, channel)
        self.added.append(membership)

    async def group_discard(self, group: str, channel: str) -> None:
        """Append ``(group, channel)`` to the ``discarded`` log."""
        membership = (group, channel)
        self.discarded.append(membership)
@pytest.mark.asyncio
async def test_channel_listen_receives_messages_and_cleans_up() -> None:
    """Check that ``channel_listen`` yields a queued message and cleans up.

    A background task puts one message on a queue taken from
    ``consumer.listen_queues["test.message"]``. The test asserts that the
    first item from the generator is that message, that fetching it emits
    the expected deprecation warning, and that the dummy layer recorded one
    ``group_add`` and, after ``aclose()``, one ``group_discard``.
    """
    consumer = ChannelsConsumer()
    layer = DummyChannelLayer()
    consumer.channel_layer = layer
    consumer.channel_name = "chan"

    gen = consumer.channel_listen("test.message", groups=["g"], timeout=0.1)

    async def send() -> None:
        # Yield control once before looking up the listener's queue.
        await asyncio.sleep(0)
        queue = next(iter(consumer.listen_queues["test.message"]))
        queue.put_nowait({"type": "test.message", "payload": 1})

    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected before it runs.
    sender = asyncio.create_task(send())

    with pytest.deprecated_call(match="Use listen_to_channel instead"):
        message = await gen.__anext__()

    # Awaiting the task surfaces any exception raised inside ``send``.
    await sender

    assert message == {"type": "test.message", "payload": 1}

    await gen.aclose()

    assert layer.added == [("g", "chan")]
    assert layer.discarded == [("g", "chan")]
@pytest.mark.asyncio
async def test_channel_listen_times_out() -> None:
    """Check that ``channel_listen`` ends when no message arrives in time.

    Nothing is ever queued, so the first ``__anext__`` is expected to raise
    ``StopAsyncIteration`` while emitting the deprecation warning. The dummy
    layer must have recorded one ``group_add`` and one ``group_discard``.
    """
    consumer = ChannelsConsumer()
    fake_layer = DummyChannelLayer()
    consumer.channel_layer = fake_layer
    consumer.channel_name = "chan"

    listener = consumer.channel_listen("test.message", groups=["g"], timeout=0.01)

    # Build both expectations up front and enter them in a single ``with``;
    # the deprecation check stays the outer context, as before.
    warns_deprecated = pytest.deprecated_call(match="Use listen_to_channel instead")
    raises_exhausted = pytest.raises(StopAsyncIteration)
    with warns_deprecated, raises_exhausted:
        await listener.__anext__()

    expected_membership = [("g", "chan")]
    assert fake_layer.added == expected_membership
    assert fake_layer.discarded == expected_membership