Coverage for app / core / key_builder.py: 100%

17 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-21 22:41 +0300

1import hashlib 

2 

3from fastapi import Request, Response 

4from typing import Callable, Optional, Any 

5from sqlalchemy.ext.asyncio import AsyncSession 

6from app.models import User 

7 

8def redis_custom_key_builder( 

9 func: Callable[..., Any], 

10 namespace: str = "", 

11 *, 

12 request: Optional[Request] = None, 

13 response: Optional[Response] = None, 

14 args: tuple[Any, ...], 

15 kwargs: dict[str, Any], 

16) -> str: 

17 """ 

18 Создаёт ключи для получения доступа к данным в хранилище `Redis`. 

19 При формировании ключей учитывает только те аргументы функций-обработчиков, 

20 которые необходимы для корректной работы кэша. 

21 """ 

22 # исключаем аргументы, которые не должны влиять на уникальность ключа 

23 types_to_exclude = (AsyncSession,) 

24 kw_cache = {} 

25 

26 for k, v in kwargs.items(): 

27 if isinstance(v, types_to_exclude): 

28 continue 

29 elif isinstance(v, User): 

30 kw_cache["user"] = v.id # извлекаем id пользователя 

31 else: 

32 kw_cache[k] = str(v) 

33 

34 # сортируем словарь по ключам, чтобы порядок ключей не влиял 

35 # на формирование `cache_key` 

36 kw_cache_sorted_lst = sorted(kw_cache.items()) 

37 

38 cache_key = hashlib.md5( 

39 f"{func.__module__}:{func.__name__}:{args}:{kw_cache_sorted_lst}".encode() 

40 ).hexdigest() 

41 return f"{namespace}:{cache_key}"