""" SRT Generator (Belarusian Edition) – fixed version """ from __future__ import annotations import json import logging import mimetypes import os import re import threading import uuid from datetime import datetime from pathlib import Path from typing import Callable, List, Tuple import gradio as gr from google import genai from google.genai import types from pydub import AudioSegment # --------------------------------------------------------------------------- # LOGGING # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler("transcription.log", encoding="utf-8"), logging.StreamHandler(), ], ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # ENVIRONMENT # --------------------------------------------------------------------------- API_KEY = os.getenv("gembeh") MODEL_NAME = os.getenv("mod") PROMPT_TEXT = os.getenv("p", "") if not API_KEY or not MODEL_NAME: raise EnvironmentError("Set env vars 'gembeh' (API key) and 'mod' (model name)") CLIENT = genai.Client(api_key=API_KEY) # --------------------------------------------------------------------------- # GEMINI CONFIG # --------------------------------------------------------------------------- GEN_CFG = types.GenerateContentConfig( thinking_config=types.ThinkingConfig(thinking_budget=-1), temperature=0.35, response_mime_type="application/json", system_instruction=[types.Part.from_text(text=PROMPT_TEXT)], ) # --------------------------------------------------------------------------- # CONSTANTS # --------------------------------------------------------------------------- MAX_MB = 600 # maximum upload size (MiB) ALLOWED_AUDIO_PREFIX = ("audio/",) ALLOWED_VIDEO_PREFIX = ("video/",) HISTORY = Path("transcripts"); HISTORY.mkdir(exist_ok=True) TEXT_KEYS = ("text", "text_raw") # accepted keys for transcript text # --------------------------------------------------------------------------- # REGEXES FOR TIME PARSING # --------------------------------------------------------------------------- _RE_HMS_MS = re.compile(r"^(?:(\d{1,2}):)?(\d{1,2}):(\d{1,2})[.,](\d{1,3})$") # HH:MM:SS,ms _RE_MS_MS = re.compile(r"^(\d{1,2}):(\d{1,2})[.,](\d{1,3})$") # MM:SS,ms _RE_SECONDS = re.compile(r"^\d+(?:[.,]\d+)?$") # SS[.ms] # --------------------------------------------------------------------------- # HELPERS # --------------------------------------------------------------------------- def _validate(path: str, mime_prefixes: tuple[str, ...]) -> None: """Validate file (existence, size, mime).""" if not path or not os.path.isfile(path): raise ValueError("Файл не знойдзены.") if os.path.getsize(path) / 1_048_576 > MAX_MB: raise ValueError("Файл занадта вялікі.") mime, _ = mimetypes.guess_type(path) if not mime or not mime.startswith(mime_prefixes): raise ValueError(f"Непадтрыманы тып файла: {mime or 'невядомы' }.") def _parse_raw_time(raw: float | int | str) -> float: """Convert supported time formats → seconds (float).""" if isinstance(raw, (int, float)): return float(raw) s = str(raw).strip() if not s: return 0.0 if (m := _RE_HMS_MS.match(s)): h, m_, sec, ms = (int(x or 0) for x in m.groups()) return h * 3600 + m_ * 60 + sec + ms / 1_000 if (m := _RE_MS_MS.match(s)): m_, sec, ms = (int(x) for x in m.groups()) return m_ * 60 + sec + ms / 1_000 if _RE_SECONDS.match(s): return float(s.replace(",", ".")) raise ValueError(f"Невядомы фармат часу: {raw!r}") def _sec_to_ts(raw: float | int | str) -> str: sec = _parse_raw_time(raw) h, rem = divmod(sec, 3600) m, rem = divmod(rem, 60) s_int = int(rem) ms_int = int(round((rem - s_int) * 1000)) return f"{int(h):02d}:{int(m):02d}:{s_int:02d},{ms_int:03d}" # --------------------------------------------------------------------------- # SANITISATION # --------------------------------------------------------------------------- def _sanitize_segments(raw_segments: list[dict]) -> list[dict]: """Ensure segments are consistent: start < end, non‑overlapping.""" fixed: list[dict] = [] prev_end = 0.0 for idx, seg in enumerate(raw_segments, 1): start = _parse_raw_time(seg["start"]) end = _parse_raw_time(seg["end"]) text = seg["text"] # Swap if necessary if end < start: logger.warning("Segment %s: end < start – swapping", idx) start, end = end, start # Shift if overlap if start < prev_end: logger.warning("Segment %s: overlap – shifting", idx) start = prev_end + 0.001 if end <= start: end = start + 1.0 fixed.append({"start": start, "end": end, "text": text}) prev_end = end return fixed # --------------------------------------------------------------------------- # GEMINI TRANSCRIPTION # --------------------------------------------------------------------------- def _transcribe(path: str, status: Callable[[str], None]) -> str: mime, _ = mimetypes.guess_type(path) logger.info("System prompt: %s", PROMPT_TEXT) status("🔍 Пачынаем транскрыпцыю …") with open(path, "rb") as f: audio_bytes = f.read() user_content = types.Content(role="user", parts=[types.Part.from_bytes(data=audio_bytes, mime_type=mime)]) chunks = CLIENT.models.generate_content_stream(model=MODEL_NAME, contents=[user_content], config=GEN_CFG) text = "".join(ch.text or "" for ch in chunks) logger.info("Gemini raw response (first 5k): %s", text[:5000]) return text # --------------------------------------------------------------------------- # PIPELINE # --------------------------------------------------------------------------- def _extract_text(seg: dict) -> str: for key in TEXT_KEYS: if seg.get(key): return str(seg[key]).strip() return "" def transcribe_audio(path: str, status: Callable[[str], None]): _validate(path, ALLOWED_AUDIO_PREFIX) stop = threading.Event() def spinner(): frames = ["⏳", "⏳.", "⏳..", "⏳..."] while not stop.is_set(): for f in frames: status(f"Транскрыпцыя ідзе {f}") if stop.wait(0.6): break threading.Thread(target=spinner, daemon=True).start() try: raw = _transcribe(path, status) finally: stop.set() if not raw: raise RuntimeError("❌ Пусты адказ ад мадэлі.") tag = datetime.now().strftime("%Y%m%d_%H%M%S") (HISTORY / f"response_{tag}.txt").write_text(raw, "utf-8") status("📥 Апрацоўка транскрыпцыі …") try: segments_json = json.loads(raw) except json.JSONDecodeError as exc: raise ValueError("Gemini response is not valid JSON – see logs.") from exc raw_segments: list[dict] = [] for idx, seg in enumerate(segments_json, 1): if not {"start", "end"}.issubset(seg): logger.warning("Segment #%s missing timing – skipped", idx) continue txt = _extract_text(seg) if not txt: logger.warning("Segment #%s empty text – skipped", idx) continue raw_segments.append({"start": seg["start"], "end": seg["end"], "text": txt}) if not raw_segments: raise ValueError("Gemini returned no usable segments – cannot build SRT.") # --- NEW: sanitise timings --- return _sanitize_segments(raw_segments) def transcripts_to_srt(segments: List[dict]) -> Tuple[str, str]: lines: list[str] = [] for idx, seg in enumerate(segments, 1): lines.append( f"{idx}\n{_sec_to_ts(seg['start'])} --> {_sec_to_ts(seg['end'])}\n{seg['text']}\n" ) content = "\n".join(lines) out_path = HISTORY / f"subtitles_{datetime.now().strftime('%Y%m%d_%H%M%S')}.srt" out_path.write_text(content, "utf-8") return content, str(out_path) # --------------------------------------------------------------------------- # AUDIO / VIDEO HELPERS # --------------------------------------------------------------------------- def extract_audio(video: str, status: Callable[[str], None]): _validate(video, ALLOWED_VIDEO_PREFIX) status("🎞 Вылучаем аўдыё з відэа …") audio = AudioSegment.from_file(video) path = f"extracted_{uuid.uuid4().hex}.mp3" audio.export(path, format="mp3") status("✅ Аўдыё вылучана.") return path def process_audio(path: str, status: Callable[[str], None]): segments = transcribe_audio(path, status) status("📝 Канвертацыя ў SRT …") return transcripts_to_srt(segments) def process_video(path: str, status: Callable[[str], None]): return process_audio(extract_audio(path, status), status) def handle_file(audio: str | None, video: str | None, status: Callable[[str], None]): status("🔄 Пачатак апрацоўкі …") if audio: return process_audio(audio, status) if video: return process_video(video, status) raise ValueError("Ні адзін файл не загружаны.") # --------------------------------------------------------------------------- # GRADIO UI # --------------------------------------------------------------------------- def build_ui(): with gr.Blocks(title="Gemini SRT Generator (Belarusian Edition)") as demo: gr.Markdown( """ ## Загрузіце аўдыё або відэа — атрымаеце SRT‑субцітры [Суполка беларускага ШІ](https://t.me/belarusai) • [Buy Me A Coffee](https://buymeacoffee.com/tuteishygpt) """ ) with gr.Row(): audio_in = gr.Audio(type="filepath", label="🎙 Аўдыёфайл") video_in = gr.Video(label="🎥 Відэафайл") btn = gr.Button("🚀 Апрацаваць") with gr.Row(): txt_out = gr.Textbox(label="📄 SRT‑транскрыпцыя", lines=14, autoscroll=True) file_out = gr.File(label="⬇️ SRT‑файл") status_box = gr.Textbox(label="🛠️ Статус", interactive=False, autoscroll=True) def runner(a: str | None, v: str | None): def set_status(msg: str): status_box.value = msg return handle_file(a, v, set_status) btn.click(runner, [audio_in, video_in], [txt_out, file_out]) demo.queue(max_size=40, default_concurrency_limit=8) return demo def main(): build_ui().launch() if __name__ == "__main__": main()