muryshev's picture
update
9390ea2
raw
history blame
3.67 kB
import logging
from fastapi import HTTPException
from sqlalchemy.orm import Session
from components.dbo.models.log import Log as LogSQL
from schemas.log import LogCreateSchema, LogFilterSchema, LogSchema, PaginatedLogResponse
logger = logging.getLogger(__name__)
class LogService:
    """
    Service for creating, listing, reading, updating and deleting log records.
    """

    def __init__(self, db: Session):
        """
        Store the database handle used by every operation.

        Args:
            db: Invoked as ``db()`` in each method and used as a context
                manager that yields a session. Despite the ``Session``
                annotation this looks like a session factory (for example
                a ``sessionmaker``) -- TODO confirm against the caller.
        """
        logger.info("LogService initializing")
        self.db = db

    @staticmethod
    def _get_or_raise(session, id: int) -> LogSQL:
        """
        Load the log row with the given id or raise ``HTTPException``.

        Shared by ``get_by_id``, ``update`` and ``delete`` so that a missing
        row is reported the same way everywhere.

        Raises:
            HTTPException: status 400 when no row has this id.
        """
        log = session.query(LogSQL).filter(LogSQL.id == id).first()
        if not log:
            raise HTTPException(
                status_code=400, detail=f"Item with id {id} not found"
            )
        return log

    def create(self, log_schema: LogCreateSchema):
        """
        Persist a new log record and return it as a ``LogSchema``.

        Args:
            log_schema: Payload whose dumped fields are passed to the
                ``LogSQL`` constructor as keyword arguments.

        Returns:
            ``LogSchema`` built from the stored row after it was refreshed
            from the database.
        """
        logger.info("Creating a new log")

        with self.db() as session:
            new_log: LogSQL = LogSQL(**log_schema.model_dump())
            session.add(new_log)
            session.commit()
            # Refresh so database-populated columns are loaded before
            # the row is converted to a schema.
            session.refresh(new_log)

            return LogSchema(**new_log.to_dict())

    def get_list(self, filters: LogFilterSchema) -> PaginatedLogResponse:
        """
        Return one page of logs matching the given filters.

        Args:
            filters: Optional ``user_name``, ``date_from`` and ``date_to``
                restrictions plus ``page`` and ``page_size``.

        Returns:
            ``PaginatedLogResponse`` with the page items, the total number of
            matching rows, and the derived page count.
        """
        logger.info(
            "Fetching logs with filters: %s",
            filters.model_dump(exclude_none=True),
        )

        with self.db() as session:
            query = session.query(LogSQL)

            # Filter by user name.
            if filters.user_name:
                query = query.filter(LogSQL.user_name == filters.user_name)

            # Filter by the date_created range (both bounds inclusive).
            if filters.date_from:
                query = query.filter(LogSQL.date_created >= filters.date_from)
            if filters.date_to:
                query = query.filter(LogSQL.date_created <= filters.date_to)

            # Count before pagination so the total covers every matching row.
            total = query.count()

            # NOTE(review): no ORDER BY is applied, so the page contents depend
            # on the database's default row order -- consider adding one.
            offset = (filters.page - 1) * filters.page_size
            logs = query.offset(offset).limit(filters.page_size).all()

            # Ceiling division: a partially filled last page still counts.
            total_pages = (total + filters.page_size - 1) // filters.page_size

            return PaginatedLogResponse(
                data=[LogSchema(**log.to_dict()) for log in logs],
                total=total,
                page=filters.page,
                page_size=filters.page_size,
                total_pages=total_pages,
            )

    def get_by_id(self, id: int) -> LogSchema:
        """
        Return the log with the given id.

        Raises:
            HTTPException: status 400 when no row has this id.
        """
        with self.db() as session:
            log = self._get_or_raise(session, id)
            return LogSchema(**log.to_dict())

    def update(self, id: int, new_log: LogSchema):
        """
        Apply the explicitly set fields of ``new_log`` to an existing row.

        Only fields that were set on ``new_log`` and that exist as attributes
        on the stored row are written.

        Returns:
            The refreshed ``LogSQL`` row (the ORM object, not a schema).

        Raises:
            HTTPException: status 400 when no row has this id.
        """
        logger.info("Updating log")

        with self.db() as session:
            log = self._get_or_raise(session, id)

            update_data = new_log.model_dump(exclude_unset=True)
            for key, value in update_data.items():
                # Skip schema fields that have no counterpart on the row.
                if hasattr(log, key):
                    setattr(log, key, value)

            session.commit()
            session.refresh(log)
            return log

    def delete(self, id: int):
        """
        Delete the log with the given id.

        Raises:
            HTTPException: status 400 when no row has this id.
        """
        logger.info("Deleting log: %s", id)

        with self.db() as session:
            # Fail with the same 400 as the other methods instead of passing
            # None to session.delete when the row does not exist.
            log_to_del = self._get_or_raise(session, id)
            session.delete(log_to_del)
            session.commit()