Spaces:
Sleeping
Sleeping
"""Service layer for creating, querying, updating and deleting log records."""

import logging

from fastapi import HTTPException
from sqlalchemy.orm import Session

from components.dbo.models.log import Log as LogSQL
from schemas.log import LogCreateSchema, LogFilterSchema, LogSchema, PaginatedLogResponse

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class LogService:
    """CRUD service for log records persisted through SQLAlchemy.

    Every public method opens its own session via ``with self.db() as session``
    and performs all database work inside that context.

    NOTE(review): ``db`` is annotated as ``Session`` but is *called* to obtain
    a context-managed session, so it is presumably a session factory (e.g. a
    ``sessionmaker``) -- confirm against the dependency-injection setup.
    """

    def __init__(self, db: Session):
        """Store the session factory used by all other methods."""
        logger.info("LogService initializing")
        self.db = db

    @staticmethod
    def _get_or_raise(session, id: int) -> LogSQL:
        """Return the log row with primary key ``id`` from ``session``.

        Raises:
            HTTPException: status 400 when no row with that id exists.
        """
        log = session.query(LogSQL).filter(LogSQL.id == id).first()
        if log is None:
            raise HTTPException(
                status_code=400, detail=f"Item with id {id} not found"
            )
        return log

    def create(self, log_schema: LogCreateSchema):
        """Insert a new log row built from ``log_schema``.

        Returns:
            A ``LogSchema`` built from the stored row's ``to_dict()`` after it
            has been committed and refreshed.
        """
        logger.info("Creating a new log")
        with self.db() as session:
            new_log: LogSQL = LogSQL(**log_schema.model_dump())
            session.add(new_log)
            session.commit()
            # Reload the row so database-populated columns are available.
            session.refresh(new_log)
            return LogSchema(**new_log.to_dict())

    def get_list(self, filters: LogFilterSchema) -> PaginatedLogResponse:
        """Return one page of logs matching ``filters``.

        Supported filters: exact ``user_name`` and an inclusive
        ``date_from`` / ``date_to`` range on ``date_created``. Falsy filter
        values are ignored.

        NOTE(review): assumes ``filters.page >= 1`` and
        ``filters.page_size >= 1`` (presumably enforced by the schema);
        a ``page_size`` of 0 would raise ``ZeroDivisionError`` below.
        """
        logger.info(
            "Fetching logs with filters: %s",
            filters.model_dump(exclude_none=True),
        )
        with self.db() as session:
            query = session.query(LogSQL)

            # Filter by user name.
            if filters.user_name:
                query = query.filter(LogSQL.user_name == filters.user_name)

            # Filter by creation-date range (both bounds inclusive).
            if filters.date_from:
                query = query.filter(LogSQL.date_created >= filters.date_from)
            if filters.date_to:
                query = query.filter(LogSQL.date_created <= filters.date_to)

            # Count before pagination so ``total`` covers every matching row.
            total = query.count()

            # Pagination. An explicit ORDER BY is required for OFFSET/LIMIT
            # to yield stable, non-overlapping pages.
            offset = (filters.page - 1) * filters.page_size
            logs = (
                query.order_by(LogSQL.id)
                .offset(offset)
                .limit(filters.page_size)
                .all()
            )

            # Ceiling division: number of pages needed to hold ``total`` rows.
            total_pages = (total + filters.page_size - 1) // filters.page_size

            return PaginatedLogResponse(
                data=[LogSchema(**log.to_dict()) for log in logs],
                total=total,
                page=filters.page,
                page_size=filters.page_size,
                total_pages=total_pages,
            )

    def get_by_id(self, id: int) -> LogSchema:
        """Return the log with primary key ``id`` as a ``LogSchema``.

        Raises:
            HTTPException: status 400 when no such log exists.
        """
        with self.db() as session:
            log = self._get_or_raise(session, id)
            return LogSchema(**log.to_dict())

    def update(self, id: int, new_log: LogSchema):
        """Apply the explicitly-set fields of ``new_log`` to the log ``id``.

        Only fields present in ``new_log.model_dump(exclude_unset=True)`` that
        are also attributes of the stored row are written.

        Returns:
            The refreshed ORM object (not a ``LogSchema``).

        Raises:
            HTTPException: status 400 when no such log exists.
        """
        logger.info("Updating log")
        with self.db() as session:
            log = self._get_or_raise(session, id)

            update_data = new_log.model_dump(exclude_unset=True)
            for key, value in update_data.items():
                # Skip schema fields that have no counterpart on the model.
                if hasattr(log, key):
                    setattr(log, key, value)

            session.commit()
            session.refresh(log)
            return log

    def delete(self, id: int):
        """Delete the log with primary key ``id``.

        Raises:
            HTTPException: status 400 when no such log exists (previously a
                missing row was passed to ``session.delete`` as ``None``).
        """
        logger.info("Deleting log: %s", id)
        with self.db() as session:
            log_to_del = self._get_or_raise(session, id)
            session.delete(log_to_del)
            session.commit()