-
-
Notifications
You must be signed in to change notification settings - Fork 237
Expand file tree
/
Copy pathconftest.py
More file actions
530 lines (426 loc) · 14.5 KB
/
conftest.py
File metadata and controls
530 lines (426 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
import asyncio
import uuid
from typing import Any, AsyncGenerator
from unittest.mock import Mock
import pytest
from fastapi import FastAPI
from httpx import AsyncClient, ASGITransport
{%- if cookiecutter.enable_redis == "True" %}
from fakeredis import FakeServer
from fakeredis.aioredis import FakeConnection
from redis.asyncio import ConnectionPool
from {{cookiecutter.project_name}}.services.redis.dependency import get_redis_pool
{%- endif %}
{%- if cookiecutter.enable_rmq == "True" %}
from aio_pika import Channel
from aio_pika.abc import AbstractExchange, AbstractQueue
from aio_pika.pool import Pool
from {{cookiecutter.project_name}}.services.rabbit.dependencies import \
get_rmq_channel_pool
from {{cookiecutter.project_name}}.services.rabbit.lifespan import (init_rabbit,
shutdown_rabbit)
{%- endif %}
{%- if cookiecutter.enable_kafka == "True" %}
from aiokafka import AIOKafkaProducer
from {{cookiecutter.project_name}}.services.kafka.dependencies import get_kafka_producer
from {{cookiecutter.project_name}}.services.kafka.lifespan import (init_kafka,
shutdown_kafka)
{%- endif %}
from {{cookiecutter.project_name}}.settings import settings
from {{cookiecutter.project_name}}.web.application import get_app
{%- if cookiecutter.orm == "sqlalchemy" %}
from sqlalchemy.ext.asyncio import (AsyncConnection, AsyncEngine, AsyncSession,
async_sessionmaker, create_async_engine)
from {{cookiecutter.project_name}}.db.dependencies import get_db_session
from {{cookiecutter.project_name}}.db.utils import create_database, drop_database
{%- elif cookiecutter.orm == "tortoise" %}
import nest_asyncio
from tortoise import Tortoise
from tortoise.contrib.test import finalizer, initializer
from {{cookiecutter.project_name}}.db.config import MODELS_MODULES, TORTOISE_CONFIG
nest_asyncio.apply()
{%- elif cookiecutter.orm == "ormar" %}
from sqlalchemy.engine import create_engine
from {{cookiecutter.project_name}}.db.base import database
from {{cookiecutter.project_name}}.db.utils import create_database, drop_database
{%- elif cookiecutter.orm == "psycopg" %}
from psycopg import AsyncConnection
from psycopg_pool import AsyncConnectionPool
from {{cookiecutter.project_name}}.db.dependencies import get_db_pool
{%- elif cookiecutter.orm == "piccolo" %}
{%- if cookiecutter.db_info.name == "postgresql" %}
from piccolo.engine.postgres import PostgresEngine
{%- endif %}
from piccolo.conf.apps import Finder
from piccolo.table import create_tables, drop_tables
{%- elif cookiecutter.orm == "beanie" %}
import beanie
from pymongo import AsyncMongoClient
{%- endif %}
@pytest.fixture(scope="session")
def anyio_backend() -> str:
"""
Backend for anyio pytest plugin.
:return: backend name.
"""
return 'asyncio'
{%- if cookiecutter.orm == "sqlalchemy" %}
@pytest.fixture(scope="session")
async def _engine(anyio_backend: Any) -> AsyncGenerator[AsyncEngine, None]:
"""
Create engine and databases.
:yield: new engine.
"""
from {{cookiecutter.project_name}}.db.meta import meta
from {{cookiecutter.project_name}}.db.models import load_all_models
load_all_models()
await create_database()
engine = create_async_engine(str(settings.db_url))
async with engine.begin() as conn:
await conn.run_sync(meta.create_all)
try:
yield engine
finally:
await engine.dispose()
await drop_database()
@pytest.fixture
async def dbsession(
_engine: AsyncEngine,
) -> AsyncGenerator[AsyncSession, None]:
"""
Get session to database.
Fixture that returns a SQLAlchemy session with a SAVEPOINT, and the rollback to it
after the test completes.
:param _engine: current engine.
:yields: async session.
"""
connection = await _engine.connect()
trans = await connection.begin()
session_maker = async_sessionmaker(
connection,
expire_on_commit=False,
)
session = session_maker()
try:
yield session
finally:
await session.close()
await trans.rollback()
await connection.close()
{%- elif cookiecutter.orm == "tortoise" %}
@pytest.fixture(autouse=True)
async def initialize_db() -> AsyncGenerator[None, None]:
"""
Initialize models and database.
:yields: Nothing.
"""
initializer(
MODELS_MODULES,
db_url=str(settings.db_url),
app_label="models",
)
await Tortoise.init(config=TORTOISE_CONFIG)
yield
await Tortoise.close_connections()
finalizer()
{%- elif cookiecutter.orm == "ormar" %}
@pytest.fixture(autouse=True, scope="function")
async def initialize_db() -> AsyncGenerator[None, None]:
"""
Create models and databases.
:yield: new engine.
"""
from {{cookiecutter.project_name}}.db.base import meta
from {{cookiecutter.project_name}}.db.models import load_all_models
load_all_models()
create_database()
engine = create_engine(str(settings.db_url))
with engine.begin() as conn:
meta.create_all(conn)
engine.dispose()
await database.connect()
yield
await database.disconnect()
engine = create_engine(str(settings.db_url))
with engine.begin() as conn:
meta.drop_all(conn)
engine.dispose()
drop_database()
{%- elif cookiecutter.orm == "psycopg" %}
async def drop_db() -> None:
"""Drops database after tests."""
pool = AsyncConnectionPool(conninfo=str(settings.db_url.with_path("/postgres")), open=False)
await pool.open(wait=True)
async with pool.connection() as conn:
await conn.set_autocommit(True)
await conn.execute(
"SELECT pg_terminate_backend(pg_stat_activity.pid) " # noqa: S608
"FROM pg_stat_activity "
"WHERE pg_stat_activity.datname = %(dbname)s "
"AND pid <> pg_backend_pid();",
params={
"dbname": settings.db_base,
}
)
await conn.execute(
f"DROP DATABASE {settings.db_base}",
)
await pool.close()
async def create_db() -> None:
"""Creates database for tests."""
pool = AsyncConnectionPool(conninfo=str(settings.db_url.with_path("/postgres")), open=False)
await pool.open(wait=True)
async with pool.connection() as conn_check:
res = await conn_check.execute(
"SELECT 1 FROM pg_database WHERE datname=%(dbname)s",
params={
"dbname": settings.db_base,
}
)
db_exists = False
row = await res.fetchone()
if row is not None:
db_exists = row[0]
if db_exists:
await drop_db()
async with pool.connection() as conn_create:
await conn_create.set_autocommit(True)
await conn_create.execute(
f"CREATE DATABASE {settings.db_base};",
)
await pool.close()
async def create_tables(connection: AsyncConnection[Any]) -> None:
"""
Create tables for your database.
Since psycopg doesn't have migration tool,
you must create your tables for tests.
:param connection: connection to database.
"""
{%- if cookiecutter.add_dummy == 'True' %}
await connection.execute(
"CREATE TABLE dummy ("
"id SERIAL primary key,"
"name VARCHAR(200)"
");"
)
{%- endif %}
pass
@pytest.fixture
async def dbpool() -> AsyncGenerator[AsyncConnectionPool[Any], None]:
"""
Creates database connections pool to test database.
This connection must be used in tests and for application.
:yield: database connections pool.
"""
await create_db()
pool = AsyncConnectionPool(conninfo=str(settings.db_url), open=False)
await pool.open(wait=True)
async with pool.connection() as create_conn:
await create_tables(create_conn)
try:
yield pool
finally:
await pool.close()
await drop_db()
{%- elif cookiecutter.orm == "piccolo" %}
{%- if cookiecutter.db_info.name == "postgresql" %}
async def drop_database(engine: PostgresEngine) -> None:
"""
Drops test database.
:param engine: engine connected to postgres database.
"""
await engine.run_ddl(
"SELECT pg_terminate_backend(pg_stat_activity.pid) " # noqa: S608
"FROM pg_stat_activity "
f"WHERE pg_stat_activity.datname = '{settings.db_base}' "
"AND pid <> pg_backend_pid();",
)
await engine.run_ddl(
f"DROP DATABASE {settings.db_base};",
)
{%- endif %}
@pytest.fixture(autouse=True)
async def setup_db() -> AsyncGenerator[None, None]:
"""
Fixture to create all tables before test and drop them after.
:yield: nothing.
"""
{%- if cookiecutter.db_info.name == "postgresql" %}
engine = PostgresEngine(
config={
"database": "postgres",
"user": settings.db_user,
"password": settings.db_pass,
"host": settings.db_host,
"port": settings.db_port,
},
)
await engine.start_connection_pool()
db_exists = await engine.run_ddl(
f"SELECT 1 FROM pg_database WHERE datname='{settings.db_base}'" # noqa: S608
)
if db_exists:
await drop_database(engine)
await engine.run_ddl(f"CREATE DATABASE {settings.db_base}")
{%- endif %}
tables = Finder().get_table_classes()
create_tables(*tables, if_not_exists=True)
yield
drop_tables(*tables)
{%- if cookiecutter.db_info.name == "postgresql" %}
await drop_database(engine)
{%- endif %}
{%- elif cookiecutter.orm == "beanie" %}
@pytest.fixture(autouse=True)
async def setup_db() -> AsyncGenerator[None, None]:
"""
Fixture to create database connection.
:yield: nothing.
"""
client = AsyncMongoClient(settings.db_url.human_repr()) # type: ignore
from {{cookiecutter.project_name}}.db.models import load_all_models
await beanie.init_beanie(
database=client[settings.db_base],
document_models=load_all_models(), # type: ignore
)
yield
{%- endif %}
{%- if cookiecutter.enable_rmq == 'True' %}
@pytest.fixture
async def test_rmq_pool() -> AsyncGenerator[Channel, None]:
"""
Create rabbitMQ pool.
:yield: channel pool.
"""
app_mock = Mock()
init_rabbit(app_mock)
yield app_mock.state.rmq_channel_pool
await shutdown_rabbit(app_mock)
@pytest.fixture
async def test_exchange_name() -> str:
"""
Name of an exchange to use in tests.
:return: name of an exchange.
"""
return uuid.uuid4().hex
@pytest.fixture
async def test_routing_key() -> str:
"""
Name of routing key to use while binding test queue.
:return: key string.
"""
return uuid.uuid4().hex
@pytest.fixture
async def test_exchange(
test_exchange_name: str,
test_rmq_pool: Pool[Channel],
) -> AsyncGenerator[AbstractExchange, None]:
"""
Creates test exchange.
:param test_exchange_name: name of an exchange to create.
:param test_rmq_pool: channel pool for rabbitmq.
:yield: created exchange.
"""
async with test_rmq_pool.acquire() as conn:
exchange = await conn.declare_exchange(
name=test_exchange_name,
auto_delete=True,
)
yield exchange
await exchange.delete(if_unused=False)
@pytest.fixture
async def test_queue(
test_exchange: AbstractExchange,
test_rmq_pool: Pool[Channel],
test_routing_key: str,
) -> AsyncGenerator[AbstractQueue, None]:
"""
Creates queue connected to exchange.
:param test_exchange: exchange to bind queue to.
:param test_rmq_pool: channel pool for rabbitmq.
:param test_routing_key: routing key to use while binding.
:yield: queue binded to test exchange.
"""
async with test_rmq_pool.acquire() as conn:
queue = await conn.declare_queue(name=uuid.uuid4().hex)
await queue.bind(
exchange=test_exchange,
routing_key=test_routing_key,
)
yield queue
await queue.delete(if_unused=False, if_empty=False)
{%- endif %}
{%- if cookiecutter.enable_kafka == "True" %}
@pytest.fixture
async def test_kafka_producer() -> AsyncGenerator[AIOKafkaProducer, None]:
"""
Creates kafka's producer.
:yields: kafka's producer.
"""
app_mock = Mock()
await init_kafka(app_mock)
yield app_mock.state.kafka_producer
await shutdown_kafka(app_mock)
{%- endif %}
{% if cookiecutter.enable_redis == "True" -%}
@pytest.fixture
async def fake_redis_pool() -> AsyncGenerator[ConnectionPool, None]:
"""
Get instance of a fake redis.
:yield: FakeRedis instance.
"""
server = FakeServer()
server.connected = True
pool = ConnectionPool(connection_class=FakeConnection, server=server)
yield pool
await pool.disconnect()
{%- endif %}
@pytest.fixture
def fastapi_app(
{%- if cookiecutter.orm == "sqlalchemy" %}
dbsession: AsyncSession,
{%- elif cookiecutter.orm == "psycopg" %}
dbpool: AsyncConnectionPool[Any],
{%- endif %}
{% if cookiecutter.enable_redis == "True" -%}
fake_redis_pool: ConnectionPool,
{%- endif %}
{%- if cookiecutter.enable_rmq == 'True' %}
test_rmq_pool: Pool[Channel],
{%- endif %}
{%- if cookiecutter.enable_kafka == "True" %}
test_kafka_producer: AIOKafkaProducer,
{%- endif %}
) -> FastAPI:
"""
Fixture for creating FastAPI app.
:return: fastapi app with mocked dependencies.
"""
application = get_app()
{%- if cookiecutter.orm == "sqlalchemy" %}
application.dependency_overrides[get_db_session] = lambda: dbsession
{%- elif cookiecutter.orm == "psycopg" %}
application.dependency_overrides[get_db_pool] = lambda: dbpool
{%- endif %}
{%- if cookiecutter.enable_redis == "True" %}
application.dependency_overrides[get_redis_pool] = lambda: fake_redis_pool
{%- endif %}
{%- if cookiecutter.enable_rmq == 'True' %}
application.dependency_overrides[get_rmq_channel_pool] = lambda: test_rmq_pool
{%- endif %}
{%- if cookiecutter.enable_kafka == "True" %}
application.dependency_overrides[get_kafka_producer] = lambda: test_kafka_producer
{%- endif %}
return application # noqa: RET504
@pytest.fixture
async def client(
fastapi_app: FastAPI,
anyio_backend: Any
) -> AsyncGenerator[AsyncClient, None]:
"""
Fixture that creates client for requesting server.
:param fastapi_app: the application.
:yield: client for the app.
"""
async with AsyncClient(transport=ASGITransport(fastapi_app), base_url="http://test", timeout=2.0) as ac:
yield ac