Coverage for app / core / security.py: 100%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-21 22:41 +0300

1import secrets 

2from datetime import UTC, datetime, timedelta 

3from string import ascii_lowercase, digits 

4from typing import Annotated 

5 

6from app.core.database import get_db 

7import jwt 

8from bcrypt import checkpw, gensalt, hashpw 

9from fastapi import Depends, HTTPException, status 

10from fastapi.security import OAuth2PasswordBearer 

11from sqlalchemy.ext.asyncio import AsyncSession 

12 

13from app.core.config import settings 

14from app.core.exceptions import UniqueCodeGenerationError 

15from app.models import User 

16from app.repositories.link_repository import LinkRepository 

17from app.repositories.user_repository import UserRepository 

18 

19oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") 

20oauth2_scheme_optional = OAuth2PasswordBearer(tokenUrl="/auth/login", auto_error=False) 

21 

22 

23def get_password_hash(password: str) -> str: 

24 """ 

25 Добавляет соль к паролю и хэширует его. 

26 """ 

27 salt = gensalt() 

28 return hashpw(password.encode("utf-8"), salt).decode("utf-8") 

29 

30 

31def verify_password(password: str, hashed_password: str) -> bool: 

32 """ 

33 Проверяет, что введённый пароль соответствует хэшу пароля, 

34 который хранится в базе данных. 

35 """ 

36 return checkpw(password.encode("utf-8"), hashed_password.encode("utf-8")) 

37 

38 

39def create_jwt_token(user_id: int) -> str: 

40 """ 

41 Создаёт JWT-токен. 

42 """ 

43 now = datetime.now(tz=UTC) 

44 expire = now + timedelta(minutes=settings.jwt_access_expire_minutes) 

45 payload = {"sub": str(user_id), "exp": expire, "iat": now} 

46 token = jwt.encode( 

47 payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm 

48 ) 

49 return token 

50 

51 

52async def decode_jwt_token(token: str, db: AsyncSession) -> dict: 

53 """ 

54 Получает полезную нагрузку (`payload`) из JWT-токена, извлекает из неё 

55 идентификатор пользователя и возвращает соответствующий экземпляр класса `User`. 

56 """ 

57 try: 

58 payload = jwt.decode( 

59 token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm] 

60 ) 

61 user_id = int(payload.get("sub")) 

62 user = await UserRepository.get_by_id(db, user_id) 

63 if not user: 

64 raise HTTPException( 

65 status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found" 

66 ) 

67 return user 

68 

69 except jwt.ExpiredSignatureError: 

70 raise HTTPException( 

71 status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired" 

72 ) 

73 

74 except jwt.InvalidTokenError: 

75 raise HTTPException( 

76 status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token" 

77 ) 

78 

79 

80async def get_current_user( 

81 token: Annotated[str, Depends(oauth2_scheme)], 

82 db: Annotated[AsyncSession, Depends(get_db)], 

83) -> User: 

84 """ 

85 Используется как зависимость для защищённых эндпоинтов. 

86 Если токен не передан в заголовке "Authorization: Bearer <token>", 

87 то вызывает ошибку аутентификаци. Иначе возвращает экземпляр класса `User` 

88 по id, полученному из декодированной полезной нагрузки JWT-токена. 

89 """ 

90 return await decode_jwt_token(token, db) 

91 

92 

93async def get_current_user_optional( 

94 token: Annotated[str | None, Depends(oauth2_scheme_optional)], 

95 db: Annotated[AsyncSession, Depends(get_db)], 

96) -> User | None: 

97 """ 

98 Используется как зависимость для опционально защищённых эндпоинтов. 

99 Аналогична функции `get_current_user`, но при отсутствии токена 

100 не вызывает ошибку, а возвращает `None`. 

101 """ 

102 if not token: 

103 return None 

104 try: 

105 return await decode_jwt_token(token, db) 

106 except HTTPException: 

107 return None 

108 

109 

110def generate_short_code(size: int = 10) -> str: 

111 """ 

112 Создаёт строку, состоящую из латинских букв в нижнем регистре, символов '_-' и цифр. 

113 """ 

114 chars = ascii_lowercase + "_-" + digits 

115 

116 return "".join(secrets.choice(chars) for _ in range(size)) 

117 

118 

119async def generate_short_code_n_times(db: AsyncSession, n_tries: int = 3) -> str: 

120 """ 

121 Генерирует уникальный короткий код несколько раз. 

122 """ 

123 for _ in range(n_tries): 

124 random_short_code = generate_short_code() 

125 link = await LinkRepository.get_by_short_code(db, random_short_code) 

126 if not link: 

127 break 

128 else: 

129 raise UniqueCodeGenerationError() 

130 return random_short_code