76 lines
2.6 KiB
Python
76 lines
2.6 KiB
Python
import logging
|
|
from typing import AsyncGenerator, Optional
|
|
|
|
from environs import Env
|
|
from sqlalchemy.dialects.postgresql import insert
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
|
from sqlalchemy.future import select
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from shopbot.bot.database.models import Base, UserModel
|
|
|
|
# Environment reader; read_env() is called so values from a .env file are
# available in addition to the process environment.
env = Env()
env.read_env()

# Database connection URL. No default is supplied, so the variable is required.
DATABASE_URL = env("DATABASE_URL")

# Module-wide async engine with SQL statement echoing turned off.
engine = create_async_engine(DATABASE_URL, echo=False)

# Session factory: call ``async_session()`` to obtain a new AsyncSession bound
# to ``engine``. ``expire_on_commit=False`` keeps loaded attributes readable
# after a commit.
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
|
|
|
|
|
|
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session created from the module-level factory.

    The session is opened when the generator is first advanced and is closed
    when the surrounding ``async with`` block exits, i.e. once the consumer
    is done with the generator.

    Yields:
        AsyncSession: a new session produced by ``async_session()``.
    """
    # The factory has to be *called* to produce a session: the sessionmaker
    # object itself is not an async context manager, so entering it directly
    # fails at runtime.
    async with async_session() as session:
        yield session
|
|
|
|
|
|
async def initialize_database():
    """Create every table declared on ``Base.metadata``.

    The DDL runs inside a single ``engine.begin()`` transaction. Failures are
    logged with their traceback and are not re-raised, so the caller receives
    ``None`` whether or not the tables were created.
    """
    try:
        async with engine.begin() as connection:
            # create_all is a synchronous API, so it is run through run_sync.
            await connection.run_sync(Base.metadata.create_all)
    except SQLAlchemyError as err:
        logging.exception(f"Database error during initialization: {err}")
    except Exception as err:
        logging.exception(
            f"An unexpected error occurred during initialization: {err}"
        )
|
|
|
|
|
|
async def register_user(tg_id: int, username: Optional[str] = None):
    """Insert a user row with a zero balance unless ``tg_id`` already exists.

    Args:
        tg_id: Identifier stored in the ``tg_id`` column.
        username: Optional value for the ``username`` column.

    Errors are logged with their traceback and are not re-raised.
    """
    try:
        async with async_session() as db:
            new_user = insert(UserModel).values(
                tg_id=tg_id, username=username, balance=0
            )
            # A conflict on tg_id turns the insert into a no-op, so existing
            # rows are left untouched.
            new_user = new_user.on_conflict_do_nothing(index_elements=["tg_id"])

            await db.execute(new_user)
            await db.commit()
            logging.info(f"User with tg_id {tg_id} registered.")
    except SQLAlchemyError as err:
        logging.exception(f"Database error during user registration: {err}")
    except Exception as err:
        logging.exception(
            f"An unexpected error occurred during user registration: {err}"
        )
|
|
|
|
|
|
async def update_balance(tg_id: int, amount: int):
    """Add ``amount`` to the balance of the user identified by ``tg_id``.

    Args:
        tg_id: Value matched against ``UserModel.tg_id``.
        amount: Amount added to the current balance.

    A warning is logged when no matching user is found. Errors are logged
    with their traceback and are not re-raised.
    """
    try:
        # session.begin() commits when the block exits without an error.
        async with async_session() as db, db.begin():
            lookup = select(UserModel).where(UserModel.tg_id == tg_id)
            account = await db.scalar(lookup)

            if not account:
                logging.warning(f"User with tg_id {tg_id} not found.")
                return

            account.balance += amount
            logging.info(f"User {tg_id} balance updated by {amount} rub.")
    except SQLAlchemyError as err:
        logging.exception(f"Database error during balance update: {err}")
    except Exception as err:
        logging.exception(
            f"An unexpected error occurred during balance update: {err}"
        )
|