Coverage for app / core / tasks.py: 37%

19 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-21 22:41 +0300

1from apscheduler.schedulers.asyncio import AsyncIOScheduler 

2from sqlalchemy import delete 

3from datetime import datetime, UTC 

4 

5from app.core.database import AsyncSessionLocal 

6from app.models import Link 

7 

8# асинхронный планировщик задач 

9scheduler = AsyncIOScheduler() 

10 

11async def delete_expired_links(): 

12 """ 

13 Удаляет просроченные ссылки. 

14 """ 

15 async with AsyncSessionLocal() as session: 

16 try: 

17 now = datetime.now(tz=UTC).replace(tzinfo=None) 

18 query = delete(Link).where(Link.expires_at < now) 

19 result = await session.execute(query) 

20 await session.commit() 

21 cntr = result.rowcount 

22 if cntr > 0: 

23 print(f"APScheduler: {cntr} links deleted") 

24 except Exception: 

25 await session.rollback() 

26 print("APScheduler error")