shopbot/bot/database/db.py
2024-12-26 20:43:03 +03:00

76 lines
2.6 KiB
Python

import logging
from typing import AsyncGenerator, Optional
from environs import Env
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker
from shopbot.bot.database.models import Base, UserModel
# Settings are read through environs; read_env() is called before any lookup.
env = Env()
env.read_env()

# Connection string handed to the async engine below.
DATABASE_URL = env("DATABASE_URL")

# One engine for the whole module; SQL statement echoing is switched off.
engine = create_async_engine(DATABASE_URL, echo=False)

# Session factory: call ``async_session()`` to obtain a new AsyncSession.
# expire_on_commit=False is passed so the factory's sessions are created
# with attribute expiry on commit turned off.
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a new ``AsyncSession`` created by the module's session factory.

    The session is opened when the generator is first advanced and is closed
    by the ``async with`` block once the consumer finishes with the generator.

    Yields:
        AsyncSession: a session produced by ``async_session()``.
    """
    # The factory has to be *called* to produce a session: the sessionmaker
    # object itself is not an async context manager, so entering it directly
    # fails at runtime.
    async with async_session() as session:
        yield session
async def initialize_database():
    """Run ``Base.metadata.create_all`` against the module's engine.

    Any exception is logged together with its traceback and is not
    re-raised, so the coroutine always completes and returns ``None``.
    """
    try:
        async with engine.begin() as connection:
            # create_all is handed to run_sync rather than awaited directly.
            await connection.run_sync(Base.metadata.create_all)
    except SQLAlchemyError as db_error:
        # logging.exception logs at ERROR level and attaches the traceback.
        logging.exception(f"Database error during initialization: {db_error}")
    except Exception as error:
        logging.exception(
            f"An unexpected error occurred during initialization: {error}"
        )
async def register_user(tg_id: int, username: Optional[str] = None):
    """Insert a user row with a zero balance unless ``tg_id`` already exists.

    Args:
        tg_id: value stored in the ``tg_id`` column; also the conflict target.
        username: optional value stored in the ``username`` column.

    The "registered" message is logged after every successful commit,
    including when the insert was skipped because of a ``tg_id`` conflict.
    Any exception is logged with its traceback and is not re-raised.
    """
    try:
        new_row = {"tg_id": tg_id, "username": username, "balance": 0}
        insert_stmt = insert(UserModel).values(**new_row)
        # On a tg_id conflict the statement does nothing instead of failing.
        insert_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["tg_id"])

        async with async_session() as db:
            await db.execute(insert_stmt)
            await db.commit()
            logging.info(f"User with tg_id {tg_id} registered.")
    except SQLAlchemyError as db_error:
        logging.exception(f"Database error during user registration: {db_error}")
    except Exception as error:
        logging.exception(
            f"An unexpected error occurred during user registration: {error}"
        )
async def update_balance(tg_id: int, amount: int):
    """Add ``amount`` to the balance of the user identified by ``tg_id``.

    Args:
        tg_id: value matched against ``UserModel.tg_id``.
        amount: value added to ``balance`` (the log message reports it in rub).

    If no matching user is found, a warning is logged and nothing is changed.
    Any exception is logged with its traceback and is not re-raised.
    """
    try:
        # Open the session and its transaction together; the work below runs
        # inside the ``session.begin()`` block.
        async with async_session() as db, db.begin():
            lookup = select(UserModel).where(UserModel.tg_id == tg_id)
            account = await db.scalar(lookup)

            if not account:
                logging.warning(f"User with tg_id {tg_id} not found.")
                return

            account.balance += amount
            # This message is emitted before the begin() block exits.
            logging.info(f"User {tg_id} balance updated by {amount} rub.")
    except SQLAlchemyError as db_error:
        logging.exception(f"Database error during balance update: {db_error}")
    except Exception as error:
        logging.exception(
            f"An unexpected error occurred during balance update: {error}"
        )