-
Notifications
You must be signed in to change notification settings - Fork 372
Expand file tree
/
Copy pathusers.py
More file actions
107 lines (93 loc) · 2.36 KB
/
users.py
File metadata and controls
107 lines (93 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from fastapi import APIRouter, Request, Depends, Response, encoders
import typing as t
from app.db.session import get_db
from app.db.users.crud import (
get_users,
get_user,
create_user,
delete_user,
edit_user,
)
from app.db.users.schemas import UserCreate, UserEdit, User, UserOut
from app.core.auth import get_current_active_user, get_current_active_superuser
users_router = r = APIRouter()
@r.get(
"/users",
response_model=t.List[User],
response_model_exclude_none=True,
)
async def users_list(
response: Response,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Get all users
"""
users = get_users(db)
# This is necessary for react-admin to work
response.headers["Content-Range"] = f"0-9/{len(users)}"
return users
@r.get("/users/me", response_model=User, response_model_exclude_none=True)
async def user_me(current_user=Depends(get_current_active_user)):
"""
Get own user
"""
return current_user
@r.get(
"/users/{user_id}",
response_model=User,
response_model_exclude_none=True,
)
async def user_details(
request: Request,
user_id: int,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Get any user details
"""
user = get_user(db, user_id)
return user
# return encoders.jsonable_encoder(
# user, skip_defaults=True, exclude_none=True,
# )
@r.post("/users", response_model=User, response_model_exclude_none=True)
async def user_create(
request: Request,
user: UserCreate,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Create a new user
"""
return create_user(db, user)
@r.put(
"/users/{user_id}", response_model=User, response_model_exclude_none=True
)
async def user_edit(
request: Request,
user_id: int,
user: UserEdit,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Update existing user
"""
return edit_user(db, user_id, user)
@r.delete(
"/users/{user_id}", response_model=User, response_model_exclude_none=True
)
async def user_delete(
request: Request,
user_id: int,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Delete existing user
"""
return delete_user(db, user_id)