Skip to content

Commit 484047d

Browse files
Datarace fix, Kafka refactoring (#195)
1 parent 1361a58 commit 484047d

17 files changed

Lines changed: 262 additions & 226 deletions

File tree

auth_backend/auth_plugins/email.py

Lines changed: 157 additions & 153 deletions
Large diffs are not rendered by default.

auth_backend/auth_plugins/github.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,11 @@ async def _register(
106106
gh_id = cls.create_auth_method_param('user_id', github_user_id, user.id, db_session=db.session)
107107
new_user[cls.get_name()] = {"user_id": gh_id.value}
108108
userdata = await GithubAuth._convert_data_to_userdata_format(userinfo)
109-
await get_kafka_producer().produce(
109+
background_tasks.add_task(
110+
get_kafka_producer().produce,
110111
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
111112
GithubAuth.generate_kafka_key(user.id),
112113
userdata,
113-
bg_tasks=background_tasks,
114114
)
115115
await AuthPluginMeta.user_updated(new_user, old_user)
116116
return await cls._create_session(
@@ -162,11 +162,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
162162
'No users found for github account', 'Не найдено пользователей для аккаунта GitHub', id_token
163163
)
164164
userdata = await GithubAuth._convert_data_to_userdata_format(userinfo)
165-
await get_kafka_producer().produce(
165+
background_tasks.add_task(
166+
get_kafka_producer().produce,
166167
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
167168
GithubAuth.generate_kafka_key(user.id),
168169
userdata,
169-
bg_tasks=background_tasks,
170170
)
171171
return await cls._create_session(
172172
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name

auth_backend/auth_plugins/google.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,11 @@ async def _register(
114114
google_id = cls.create_auth_method_param('unique_google_id', userinfo['sub'], user.id, db_session=db.session)
115115
new_user = {cls.get_name(): {"unique_google_id": google_id.value}}
116116
userdata = await GoogleAuth._convert_data_to_userdata_format(userinfo)
117-
await get_kafka_producer().produce(
117+
background_tasks.add_task(
118+
get_kafka_producer().produce,
118119
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
119120
GoogleAuth.generate_kafka_key(user.id),
120121
userdata,
121-
bg_tasks=background_tasks,
122122
)
123123
await AuthPluginMeta.user_updated(new_user, old_user)
124124
return await cls._create_session(
@@ -154,11 +154,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
154154
id_token=credentials.get("id_token"),
155155
)
156156
userdata = await GoogleAuth._convert_data_to_userdata_format(userinfo)
157-
await get_kafka_producer().produce(
157+
background_tasks.add_task(
158+
get_kafka_producer().produce,
158159
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
159160
GoogleAuth.generate_kafka_key(user.id),
160161
userdata,
161-
bg_tasks=background_tasks,
162162
)
163163
return await cls._create_session(
164164
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name

auth_backend/auth_plugins/keycloak.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ async def _register(
102102
keycloak_id = cls.create_auth_method_param('user_id', keycloak_user_id, user.id, db_session=db.session)
103103
new_user = {cls.get_name(): {"user_id": keycloak_id.value}}
104104
userdata = await KeycloakAuth._convert_data_to_userdata_format(userinfo)
105-
await get_kafka_producer().produce(
105+
background_tasks.add_task(
106+
get_kafka_producer().produce,
106107
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
107108
KeycloakAuth.generate_kafka_key(user.id),
108109
userdata,
109-
bg_tasks=background_tasks,
110110
)
111111
await AuthPluginMeta.user_updated(new_user, old_user)
112112
return await cls._create_session(
@@ -153,11 +153,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
153153
id_token = jwt.encode(userinfo, cls.settings.ENCRYPTION_KEY, algorithm="HS256")
154154
raise OauthAuthFailed('No users found for keycloak account', id_token)
155155
userdata = await KeycloakAuth._convert_data_to_userdata_format(userinfo)
156-
await get_kafka_producer().produce(
156+
background_tasks.add_task(
157+
get_kafka_producer().produce,
157158
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
158159
KeycloakAuth.generate_kafka_key(user.id),
159160
userdata,
160-
bg_tasks=background_tasks,
161161
)
162162
return await cls._create_session(
163163
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name

auth_backend/auth_plugins/lkmsu.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,11 @@ async def _register(
101101
lk_id = cls.create_auth_method_param('user_id', lk_user_id, user.id, db_session=db.session)
102102
new_user = {cls.get_name(): {"user_id": lk_id.value}}
103103
userdata = await LkmsuAuth._convert_data_to_userdata_format(userinfo)
104-
await get_kafka_producer().produce(
104+
background_tasks.add_task(
105+
get_kafka_producer().produce,
105106
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
106107
LkmsuAuth.generate_kafka_key(user.id),
107108
userdata,
108-
bg_tasks=background_tasks,
109109
)
110110
await AuthPluginMeta.user_updated(new_user, old_user)
111111
return await cls._create_session(
@@ -154,11 +154,11 @@ async def _login(
154154
'No users found for lk msu account', 'Не найдено пользователей с таким аккаунтом LK MSU', id_token
155155
)
156156
userdata = await LkmsuAuth._convert_data_to_userdata_format(userinfo)
157-
await get_kafka_producer().produce(
157+
background_tasks.add_task(
158+
get_kafka_producer().produce,
158159
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
159160
LkmsuAuth.generate_kafka_key(user.id),
160161
userdata,
161-
bg_tasks=background_tasks,
162162
)
163163
return await cls._create_session(
164164
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name

auth_backend/auth_plugins/telegram.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,11 @@ async def _register(
8080
tg_id = cls.create_auth_method_param('user_id', telegram_user_id, user.id, db_session=db.session)
8181
new_user[cls.get_name()]["user_id"] = tg_id.value
8282
userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo)
83-
await get_kafka_producer().produce(
83+
background_tasks.add_task(
84+
get_kafka_producer().produce,
8485
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
8586
TelegramAuth.generate_kafka_key(user.id),
8687
userdata,
87-
bg_tasks=background_tasks,
8888
)
8989
await AuthPluginMeta.user_updated(new_user, old_user)
9090
return await cls._create_session(
@@ -111,11 +111,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
111111
'No users found for Telegram account', 'Не найдено пользователей с таким ТГ аккаунтом', id_token
112112
)
113113
userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo)
114-
await get_kafka_producer().produce(
114+
background_tasks.add_task(
115+
get_kafka_producer().produce,
115116
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
116117
TelegramAuth.generate_kafka_key(user.id),
117118
userdata,
118-
bg_tasks=background_tasks,
119119
)
120120
return await cls._create_session(
121121
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name

auth_backend/auth_plugins/vk.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,11 @@ async def _register(
113113
vk_id = cls.create_auth_method_param('user_id', vk_user_id, user.id, db_session=db.session)
114114
new_user[cls.get_name()]["user_id"] = vk_id.value
115115
userdata = await VkAuth._convert_data_to_userdata_format(userinfo['response'][0])
116-
await get_kafka_producer().produce(
116+
background_tasks.add_task(
117+
get_kafka_producer().produce,
117118
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
118119
VkAuth.generate_kafka_key(user.id),
119120
userdata,
120-
bg_tasks=background_tasks,
121121
)
122122
await AuthPluginMeta.user_updated(new_user, old_user)
123123
return await cls._create_session(
@@ -163,11 +163,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
163163
'No users found for VK account', 'Не найдено пользователей с таким аккаунтом ВК', id_token
164164
)
165165
userdata = await VkAuth._convert_data_to_userdata_format(userinfo['response'][0])
166-
await get_kafka_producer().produce(
166+
background_tasks.add_task(
167+
get_kafka_producer().produce,
167168
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
168169
VkAuth.generate_kafka_key(user.id),
169170
userdata,
170-
bg_tasks=background_tasks,
171171
)
172172
return await cls._create_session(
173173
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name

auth_backend/auth_plugins/yandex.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,11 @@ async def _register(
118118
ya_id = cls.create_auth_method_param('user_id', yandex_user_id, user.id, db_session=db.session)
119119
new_user[cls.get_name()]["user_id"] = ya_id.value
120120
userdata = await YandexAuth._convert_data_to_userdata_format(userinfo)
121-
await get_kafka_producer().produce(
121+
background_tasks.add_task(
122+
get_kafka_producer().produce,
122123
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
123124
YandexAuth.generate_kafka_key(user.id),
124125
userdata,
125-
bg_tasks=background_tasks,
126126
)
127127
await AuthPluginMeta.user_updated(new_user, old_user)
128128
return await cls._create_session(
@@ -167,11 +167,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
167167
'No users found for Yandex account', 'Не найдено пользователей для аккаунт Яндекс', id_token
168168
)
169169
userdata = await YandexAuth._convert_data_to_userdata_format(userinfo)
170-
await get_kafka_producer().produce(
170+
background_tasks.add_task(
171+
get_kafka_producer().produce,
171172
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
172173
YandexAuth.generate_kafka_key(user.id),
173174
userdata,
174-
bg_tasks=background_tasks,
175175
)
176176
return await cls._create_session(
177177
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name

auth_backend/exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def __init__(self, dtime: datetime.timedelta):
6161
self.delay_time = dtime
6262
super().__init__(
6363
f'Too many email requests. Delay: {dtime}',
64-
f'Слишком много запрос к email. Задержка: {dtime}',
64+
f'Слишком много запросов к email. Задержка: {dtime}',
6565
)
6666

6767

auth_backend/kafka/kafka.py

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
from confluent_kafka import KafkaError, KafkaException, Message, Producer
66
from event_schema.auth import UserLogin, UserLoginKey
7-
from fastapi import BackgroundTasks
87

98
from auth_backend import __version__
109
from auth_backend.kafka.kafkameta import KafkaMeta
@@ -14,7 +13,7 @@
1413
log = logging.getLogger(__name__)
1514

1615

17-
class AIOKafka(KafkaMeta):
16+
class Kafka(KafkaMeta):
1817
"""
1918
Класс для работы с Kafka
2019
"""
@@ -59,7 +58,7 @@ def delivery_callback(self, err: KafkaError, msg: Message) -> None:
5958
else:
6059
log.info('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset()))
6160

62-
def _produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None:
61+
def produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None:
6362
"""
6463
Отправляет сообщение в Kafka
6564
Args:
@@ -82,29 +81,15 @@ def _produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None:
8281

8382
self._producer.poll(0)
8483

85-
async def produce(self, topic: str, key: UserLoginKey, value: UserLogin, *, bg_tasks: BackgroundTasks) -> None:
86-
"""
87-
Добавляет отправку сообщения в фоновые задачи
88-
Args:
89-
topic: топик в который будет написано сообщение
90-
key: ключ сообщения
91-
value: значение сообщение
92-
bg_tasks: fastapi background_tasks
93-
94-
Returns:
95-
Ничего
96-
"""
97-
bg_tasks.add_task(self._produce, topic, key, value)
98-
99-
async def close(self) -> None:
84+
def close(self) -> None:
10085
self._producer.flush()
10186

10287

103-
class AIOKafkaMock(KafkaMeta):
104-
async def produce(self, topic: str, key: Any, value: Any, *, bg_tasks: BackgroundTasks) -> Any:
88+
class KafkaMock(KafkaMeta):
89+
def produce(self, topic: str, key: Any, value: Any) -> Any:
10590
log.debug(f"Kafka cluster disabled, debug msg: {topic=}, {key=}, {value=}")
10691

107-
async def close(self) -> None:
92+
def close(self) -> None:
10893
return
10994

11095

@@ -115,5 +100,5 @@ def get_kafka_producer() -> KafkaMeta:
115100
иначе Mock кафки
116101
"""
117102
if get_settings().KAFKA_DSN:
118-
return AIOKafka()
119-
return AIOKafkaMock()
103+
return Kafka()
104+
return KafkaMock()

0 commit comments

Comments
 (0)