Spaces:
Sleeping
Sleeping
File size: 3,670 Bytes
9390ea2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
import logging
from fastapi import HTTPException
from sqlalchemy.orm import Session
from components.dbo.models.log import Log as LogSQL
from schemas.log import LogCreateSchema, LogFilterSchema, LogSchema, PaginatedLogResponse
# Module-level logger, named after this module, used by every LogService method.
logger = logging.getLogger(__name__)
class LogService:
    """
    Service for creating, listing, reading, updating and deleting log records.

    Every method opens its own short-lived session via ``self.db()`` and
    closes it when the ``with`` block exits.
    """

    def __init__(self, db: Session):
        """
        Store the session provider used by all other methods.

        Args:
            db: Object that is *called* (``self.db()``) to obtain a session
                usable as a context manager. Despite the ``Session``
                annotation it is therefore used as a session factory
                (e.g. a ``sessionmaker``), not as a live session.
        """
        logger.info("LogService initializing")
        self.db = db

    def create(self, log_schema: LogCreateSchema) -> LogSchema:
        """
        Persist a new log record.

        Args:
            log_schema: Payload whose dumped fields are passed as keyword
                arguments to the ``LogSQL`` constructor.

        Returns:
            The stored record, re-read from the database after commit,
            converted to ``LogSchema``.
        """
        logger.info("Creating a new log")
        with self.db() as session:
            new_log: LogSQL = LogSQL(**log_schema.model_dump())
            session.add(new_log)
            session.commit()
            # Reload so database-generated values are present on the instance.
            session.refresh(new_log)
            return LogSchema(**new_log.to_dict())

    def get_list(self, filters: LogFilterSchema) -> PaginatedLogResponse:
        """
        Return one page of log records matching the given filters.

        Args:
            filters: Optional ``user_name``, ``date_from`` and ``date_to``
                criteria plus ``page`` / ``page_size`` pagination settings.

        Returns:
            A ``PaginatedLogResponse`` with the page's records, the total
            number of matching records and the derived page count.
        """
        logger.info(
            "Fetching logs with filters: %s",
            filters.model_dump(exclude_none=True),
        )
        with self.db() as session:
            query = session.query(LogSQL)

            # Filter by user name.
            if filters.user_name:
                query = query.filter(LogSQL.user_name == filters.user_name)

            # Filter by creation-date range (both bounds inclusive).
            if filters.date_from:
                query = query.filter(LogSQL.date_created >= filters.date_from)
            if filters.date_to:
                query = query.filter(LogSQL.date_created <= filters.date_to)

            # Count before pagination so the total covers every match.
            total = query.count()

            # Pages are 1-based: page 1 starts at offset 0.
            # NOTE(review): no ORDER BY is applied, so page contents depend on
            # the database's default row order — confirm this is acceptable.
            offset = (filters.page - 1) * filters.page_size
            logs = query.offset(offset).limit(filters.page_size).all()

            # Ceiling division: a partially filled last page still counts.
            total_pages = (total + filters.page_size - 1) // filters.page_size

            return PaginatedLogResponse(
                data=[LogSchema(**log.to_dict()) for log in logs],
                total=total,
                page=filters.page,
                page_size=filters.page_size,
                total_pages=total_pages,
            )

    def get_by_id(self, id: int) -> LogSchema:
        """
        Fetch a single log record by primary key.

        Args:
            id: Identifier of the record to load.

        Returns:
            The record converted to ``LogSchema``.

        Raises:
            HTTPException: With status 400 when no record has this id.
        """
        with self.db() as session:
            log: LogSQL = session.query(LogSQL).filter(LogSQL.id == id).first()
            if not log:
                raise HTTPException(
                    status_code=400, detail=f"Item with id {id} not found"
                )
            return LogSchema(**log.to_dict())

    def update(self, id: int, new_log: LogSchema) -> LogSQL:
        """
        Apply the explicitly-set fields of ``new_log`` to an existing record.

        Args:
            id: Identifier of the record to update.
            new_log: Source of new values; only fields that were set on it
                are applied, and only if the ORM object has such an attribute.

        Returns:
            The refreshed ORM object (not a ``LogSchema``).

        Raises:
            HTTPException: With status 400 when no record has this id.
        """
        logger.info("Updating log")
        with self.db() as session:
            log: LogSQL = session.query(LogSQL).filter(LogSQL.id == id).first()
            if not log:
                raise HTTPException(
                    status_code=400, detail=f"Item with id {id} not found"
                )

            update_data = new_log.model_dump(exclude_unset=True)
            for key, value in update_data.items():
                # Skip schema fields that have no counterpart on the model.
                if hasattr(log, key):
                    setattr(log, key, value)

            session.commit()
            session.refresh(log)
            return log

    def delete(self, id: int) -> None:
        """
        Delete a log record by primary key.

        Args:
            id: Identifier of the record to delete.

        Raises:
            HTTPException: With status 400 when no record has this id.
        """
        logger.info("Deleting log: %s", id)
        with self.db() as session:
            log_to_del: LogSQL = (
                session.query(LogSQL).filter(LogSQL.id == id).first()
            )
            # Report a missing row the same way get_by_id/update do instead of
            # handing None to session.delete(), which fails inside the ORM.
            if not log_to_del:
                raise HTTPException(
                    status_code=400, detail=f"Item with id {id} not found"
                )
            session.delete(log_to_del)
            session.commit()
|