Coverage for app / core / security.py: 100%
60 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 22:41 +0300
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 22:41 +0300
1import secrets
2from datetime import UTC, datetime, timedelta
3from string import ascii_lowercase, digits
4from typing import Annotated
6from app.core.database import get_db
7import jwt
8from bcrypt import checkpw, gensalt, hashpw
9from fastapi import Depends, HTTPException, status
10from fastapi.security import OAuth2PasswordBearer
11from sqlalchemy.ext.asyncio import AsyncSession
13from app.core.config import settings
14from app.core.exceptions import UniqueCodeGenerationError
15from app.models import User
16from app.repositories.link_repository import LinkRepository
17from app.repositories.user_repository import UserRepository
19oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
20oauth2_scheme_optional = OAuth2PasswordBearer(tokenUrl="/auth/login", auto_error=False)
23def get_password_hash(password: str) -> str:
24 """
25 Добавляет соль к паролю и хэширует его.
26 """
27 salt = gensalt()
28 return hashpw(password.encode("utf-8"), salt).decode("utf-8")
31def verify_password(password: str, hashed_password: str) -> bool:
32 """
33 Проверяет, что введённый пароль соответствует хэшу пароля,
34 который хранится в базе данных.
35 """
36 return checkpw(password.encode("utf-8"), hashed_password.encode("utf-8"))
39def create_jwt_token(user_id: int) -> str:
40 """
41 Создаёт JWT-токен.
42 """
43 now = datetime.now(tz=UTC)
44 expire = now + timedelta(minutes=settings.jwt_access_expire_minutes)
45 payload = {"sub": str(user_id), "exp": expire, "iat": now}
46 token = jwt.encode(
47 payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm
48 )
49 return token
52async def decode_jwt_token(token: str, db: AsyncSession) -> dict:
53 """
54 Получает полезную нагрузку (`payload`) из JWT-токена, извлекает из неё
55 идентификатор пользователя и возвращает соответствующий экземпляр класса `User`.
56 """
57 try:
58 payload = jwt.decode(
59 token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]
60 )
61 user_id = int(payload.get("sub"))
62 user = await UserRepository.get_by_id(db, user_id)
63 if not user:
64 raise HTTPException(
65 status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found"
66 )
67 return user
69 except jwt.ExpiredSignatureError:
70 raise HTTPException(
71 status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired"
72 )
74 except jwt.InvalidTokenError:
75 raise HTTPException(
76 status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
77 )
80async def get_current_user(
81 token: Annotated[str, Depends(oauth2_scheme)],
82 db: Annotated[AsyncSession, Depends(get_db)],
83) -> User:
84 """
85 Используется как зависимость для защищённых эндпоинтов.
86 Если токен не передан в заголовке "Authorization: Bearer <token>",
87 то вызывает ошибку аутентификаци. Иначе возвращает экземпляр класса `User`
88 по id, полученному из декодированной полезной нагрузки JWT-токена.
89 """
90 return await decode_jwt_token(token, db)
93async def get_current_user_optional(
94 token: Annotated[str | None, Depends(oauth2_scheme_optional)],
95 db: Annotated[AsyncSession, Depends(get_db)],
96) -> User | None:
97 """
98 Используется как зависимость для опционально защищённых эндпоинтов.
99 Аналогична функции `get_current_user`, но при отсутствии токена
100 не вызывает ошибку, а возвращает `None`.
101 """
102 if not token:
103 return None
104 try:
105 return await decode_jwt_token(token, db)
106 except HTTPException:
107 return None
110def generate_short_code(size: int = 10) -> str:
111 """
112 Создаёт строку, состоящую из латинских букв в нижнем регистре, символов '_-' и цифр.
113 """
114 chars = ascii_lowercase + "_-" + digits
116 return "".join(secrets.choice(chars) for _ in range(size))
119async def generate_short_code_n_times(db: AsyncSession, n_tries: int = 3) -> str:
120 """
121 Генерирует уникальный короткий код несколько раз.
122 """
123 for _ in range(n_tries):
124 random_short_code = generate_short_code()
125 link = await LinkRepository.get_by_short_code(db, random_short_code)
126 if not link:
127 break
128 else:
129 raise UniqueCodeGenerationError()
130 return random_short_code