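"""Doc-MCP: Documentation RAG System.

Gradio app that ingests markdown documentation from GitHub repositories,
stores vector embeddings in MongoDB Atlas, and answers queries over the
indexed docs. Launching with mcp_server=True also exposes the query
endpoints as MCP (Model Context Protocol) tools for AI agents.
"""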
import asyncio
import os
import time
from typing import Dict, List
import gradio as gr
from dotenv import load_dotenv
from llama_index.core import Settings
from llama_index.core.text_splitter import SentenceSplitter
from rag.config import embed_model, get_available_repos, llm
from rag.github_file_loader import (
    fetch_markdown_files as fetch_files_with_loader,
    load_github_files,
)
from rag.ingest import ingest_documents_async
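# Load environment variables and set the global LlamaIndex defaults
# (LLM, embedding model, and a SentenceSplitter with chunk size 3072).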
load_dotenv()
Settings.llm = llm
Settings.embed_model = embed_model
Settings.node_parser = SentenceSplitter(chunk_size=3072)
def get_available_repositories():
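    """Thin wrapper around rag.config.get_available_repos()."""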
return get_available_repos()
def start_file_loading(
repo_url: str, selected_files: List[str], current_progress: Dict
):
"""Step 1: Load files from GitHub"""
print("\nπŸ”„ STARTING FILE LOADING STEP")
print(f"πŸ“ Repository: {repo_url}")
print(f"πŸ“‹ Selected files: {selected_files}")
if not selected_files:
return {
"status": "error",
"message": "❌ No files selected for loading",
"progress": 0,
"details": "",
"step": "file_loading",
}
total_files = len(selected_files)
start_time = time.time()
# Parse repo name from URL
if "github.com" in repo_url:
repo_name = (
repo_url.replace("https://github.com/", "")
.replace("http://github.com/", "")
.strip("/")
)
if "/" not in repo_name:
return {
"status": "error",
"message": "❌ Invalid repository URL format",
"progress": 0,
"details": "",
"step": "file_loading",
}
else:
repo_name = repo_url.strip()
try:
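        # Load files in batches; a failure in one batch is recorded in all_failed and the remaining batches continue.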
batch_size = 25
all_documents = []
all_failed = []
current_progress.update(
{
"status": "loading",
"message": f"πŸš€ Loading files from {repo_name}",
"progress": 0,
"total_files": total_files,
"processed_files": 0,
"phase": "File Loading",
"details": f"Processing {total_files} files in batches...",
"step": "file_loading",
}
)
for i in range(0, len(selected_files), batch_size):
batch = selected_files[i : i + batch_size]
print(f"\nπŸ“¦ PROCESSING BATCH {i // batch_size + 1}")
print(f" Files: {batch}")
# Update progress for current batch
progress_percentage = (i / total_files) * 100
current_progress.update(
{
"progress": progress_percentage,
"processed_files": i,
"current_batch": i // batch_size + 1,
"details": f"Loading batch {i // batch_size + 1}: {', '.join([f.split('/')[-1] for f in batch])}",
}
)
try:
documents, failed = load_github_files(
repo_name=repo_name,
file_paths=batch,
branch="main",
concurrent_requests=10,
github_token=os.getenv("GITHUB_API_KEY"),
)
print("βœ… Load results:")
print(f" - Documents: {len(documents)}")
print(f" - Failed: {len(failed)}")
if documents:
for j, doc in enumerate(documents):
print(f" πŸ“„ Doc {j + 1}: {doc.doc_id}")
print(f" Size: {len(doc.text)} chars")
# Ensure repo metadata is set
if "repo" not in doc.metadata:
doc.metadata["repo"] = repo_name
print(f" βœ… Added repo metadata: {repo_name}")
all_documents.extend(documents)
all_failed.extend(failed)
except Exception as batch_error:
print(f"❌ Batch processing error: {batch_error}")
all_failed.extend(batch)
loading_time = time.time() - start_time
# Store loaded documents in progress state for next step
current_progress.update(
{
"status": "loaded",
"message": f"βœ… File Loading Complete! Loaded {len(all_documents)} documents",
"progress": 100,
"phase": "Files Loaded",
"details": f"Successfully loaded {len(all_documents)} documents in {loading_time:.1f}s",
"step": "file_loading_complete",
"loaded_documents": all_documents, # Store documents for next step
"failed_files": all_failed,
"loading_time": loading_time,
"repo_name": repo_name,
}
)
return current_progress
except Exception as e:
total_time = time.time() - start_time
error_msg = f"❌ File loading error after {total_time:.1f}s: {str(e)}"
print(error_msg)
current_progress.update(
{
"status": "error",
"message": error_msg,
"progress": 0,
"phase": "Failed",
"details": str(e),
"error": str(e),
"step": "file_loading",
}
)
return current_progress
def start_vector_ingestion(current_progress: Dict):
"""Step 2: Ingest loaded documents into vector store"""
print("\nπŸ”„ STARTING VECTOR INGESTION STEP")
# Check if we have loaded documents from previous step
if current_progress.get("step") != "file_loading_complete":
return {
"status": "error",
"message": "❌ No loaded documents found. Please load files first.",
"progress": 0,
"details": "",
"step": "vector_ingestion",
}
all_documents = current_progress.get("loaded_documents", [])
repo_name = current_progress.get("repo_name", "")
if not all_documents:
return {
"status": "error",
"message": "❌ No documents available for vector ingestion",
"progress": 0,
"details": "",
"step": "vector_ingestion",
}
vector_start_time = time.time()
# Update state for vector store phase
current_progress.update(
{
"status": "vectorizing",
"message": "πŸ”„ Generating embeddings and storing in vector database",
"progress": 0,
"phase": "Vector Store Ingestion",
"details": f"Processing {len(all_documents)} documents for embedding...",
"step": "vector_ingestion",
}
)
try:
print("πŸ”„ STARTING VECTOR STORE INGESTION")
print(f" Repository: {repo_name}")
print(f" Documents to process: {len(all_documents)}")
# Call the async ingestion function with repo name
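        # A dedicated event loop is used because this synchronous Gradio callback has no running loop.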
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(ingest_documents_async(all_documents, repo_name))
finally:
loop.close()
vector_time = time.time() - vector_start_time
loading_time = current_progress.get("loading_time", 0)
total_time = loading_time + vector_time
print(f"βœ… Vector ingestion completed in {vector_time:.2f} seconds")
failed_files_data = current_progress.get("failed_files", [])
if isinstance(failed_files_data, list):
failed_files_count = len(failed_files_data)
else:
failed_files_count = failed_files_data if isinstance(failed_files_data, int) else 0
# Update final success state with repository update flag
current_progress.update(
{
"status": "complete",
"message": "βœ… Complete Ingestion Pipeline Finished!",
"progress": 100,
"phase": "Complete",
"details": f"Successfully processed {len(all_documents)} documents for {repo_name}",
"step": "complete",
"total_time": total_time,
"documents_processed": len(all_documents),
"failed_files_count": failed_files_count, # Use count instead of trying len()
"failed_files": failed_files_data, # Keep original data
"vector_time": vector_time,
"loading_time": loading_time,
"repo_name": repo_name,
"repository_updated": True, # Flag to trigger repo list refresh
}
)
return current_progress
except Exception as ingest_error:
vector_time = time.time() - vector_start_time
print(f"❌ Vector ingestion failed after {vector_time:.2f} seconds")
print(f"❌ Error: {ingest_error}")
# Get failed files data safely
failed_files_data = current_progress.get("failed_files", [])
if isinstance(failed_files_data, list):
failed_files_count = len(failed_files_data)
else:
failed_files_count = failed_files_data if isinstance(failed_files_data, int) else 0
current_progress.update(
{
"status": "error",
"message": "❌ Vector Store Ingestion Failed",
"progress": 0,
"phase": "Failed",
"details": f"Error: {str(ingest_error)}",
"error": str(ingest_error),
"step": "vector_ingestion",
"failed_files_count": failed_files_count,
"failed_files": failed_files_data,
}
)
return current_progress
def start_file_loading_generator(
repo_url: str, selected_files: List[str], current_progress: Dict
):
"""Step 1: Load files from GitHub with yield-based real-time updates"""
print("\nπŸ”„ STARTING FILE LOADING STEP")
print(f"πŸ“ Repository: {repo_url}")
print(f"πŸ“‹ Selected files: {len(selected_files)} files")
if not selected_files:
error_progress = {
"status": "error",
"message": "❌ No files selected for loading",
"progress": 0,
"details": "Please select at least one file to proceed.",
"step": "file_loading",
}
yield error_progress
return error_progress
total_files = len(selected_files)
start_time = time.time()
# Parse repo name from URL
if "github.com" in repo_url:
repo_name = (
repo_url.replace("https://github.com/", "")
.replace("http://github.com/", "")
.strip("/")
)
if "/" not in repo_name:
error_progress = {
"status": "error",
"message": "❌ Invalid repository URL format",
"progress": 0,
"details": "Expected format: owner/repo or https://github.com/owner/repo",
"step": "file_loading",
}
yield error_progress
return error_progress
else:
repo_name = repo_url.strip()
try:
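        # Batches of 10 here (vs. 25 in start_file_loading); each completed batch yields a progress update to the UI.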
batch_size = 10
all_documents = []
all_failed = []
# Initial progress update
initial_progress = {
"status": "loading",
"message": f"πŸš€ Starting file loading from {repo_name}",
"progress": 0,
"total_files": total_files,
"processed_files": 0,
"successful_files": 0,
"failed_files": 0,
"phase": "File Loading",
"details": f"Preparing to load {total_files} files in batches of {batch_size}...",
"step": "file_loading",
"current_batch": 0,
"total_batches": (len(selected_files) + batch_size - 1) // batch_size,
"repo_name": repo_name,
}
yield initial_progress
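        # Short pause between yields gives the Gradio UI time to render the streamed update.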
time.sleep(0.5)
for i in range(0, len(selected_files), batch_size):
batch = selected_files[i : i + batch_size]
current_batch_num = i // batch_size + 1
total_batches = (len(selected_files) + batch_size - 1) // batch_size
# Update progress at batch start
batch_start_progress = {
"status": "loading",
"message": f"πŸ”„ Loading batch {current_batch_num}/{total_batches}",
"progress": (i / total_files) * 90,
"processed_files": i,
"successful_files": len(all_documents),
"failed_files": len(all_failed),
"current_batch": current_batch_num,
"total_batches": total_batches,
"phase": "File Loading",
"details": f"Processing batch {current_batch_num}: {', '.join([f.split('/')[-1] for f in batch[:3]])}{'...' if len(batch) > 3 else ''}",
"step": "file_loading",
"repo_name": repo_name,
}
yield batch_start_progress
try:
print(f"\nπŸ“¦ PROCESSING BATCH {current_batch_num}/{total_batches}")
print(f" Files: {[f.split('/')[-1] for f in batch]}")
documents, failed = load_github_files(
repo_name=repo_name,
file_paths=batch,
branch="main",
concurrent_requests=10,
github_token=os.getenv("GITHUB_API_KEY"),
)
print("βœ… Load results:")
print(f" - Documents: {len(documents)}")
print(f" - Failed: {len(failed)}")
# Process documents
for j, doc in enumerate(documents):
print(f" πŸ“„ Doc {j + 1}: {doc.doc_id}")
print(f" Size: {len(doc.text)} chars")
if "repo" not in doc.metadata:
doc.metadata["repo"] = repo_name
print(f" βœ… Added repo metadata: {repo_name}")
all_documents.extend(documents)
all_failed.extend(failed)
# Update progress after batch completion
batch_complete_progress = {
"status": "loading",
"message": f"βœ… Completed batch {current_batch_num}/{total_batches}",
"progress": ((i + len(batch)) / total_files) * 90,
"processed_files": i + len(batch),
"successful_files": len(all_documents),
"failed_files": len(all_failed),
"current_batch": current_batch_num,
"total_batches": total_batches,
"phase": "File Loading",
"details": f"βœ… Batch {current_batch_num} complete: {len(documents)} loaded, {len(failed)} failed. Total progress: {len(all_documents)} documents loaded.",
"step": "file_loading",
"repo_name": repo_name,
}
yield batch_complete_progress
time.sleep(0.3)
except Exception as batch_error:
print(f"❌ Batch processing error: {batch_error}")
all_failed.extend(batch)
error_progress = {
"status": "loading",
"message": f"⚠️ Error in batch {current_batch_num}",
"progress": ((i + len(batch)) / total_files) * 90,
"processed_files": i + len(batch),
"successful_files": len(all_documents),
"failed_files": len(all_failed),
"current_batch": current_batch_num,
"phase": "File Loading",
"details": f"❌ Batch {current_batch_num} error: {str(batch_error)[:100]}... Continuing with next batch.",
"step": "file_loading",
"repo_name": repo_name,
}
yield error_progress
loading_time = time.time() - start_time
# Final completion update
completion_progress = {
"status": "loaded",
"message": f"βœ… File Loading Complete! Loaded {len(all_documents)} documents",
"progress": 100,
"phase": "Files Loaded Successfully",
"details": f"🎯 Final Results:\nβœ… Successfully loaded: {len(all_documents)} documents\n❌ Failed files: {len(all_failed)}\n⏱️ Total time: {loading_time:.1f}s\nπŸ“Š Success rate: {(len(all_documents)/(len(all_documents)+len(all_failed))*100):.1f}%",
"step": "file_loading_complete",
"loaded_documents": all_documents,
"failed_files": all_failed,
"loading_time": loading_time,
"repo_name": repo_name,
"total_files": total_files,
"processed_files": total_files,
"successful_files": len(all_documents),
}
yield completion_progress
return completion_progress
except Exception as e:
total_time = time.time() - start_time
error_msg = f"❌ File loading error after {total_time:.1f}s: {str(e)}"
print(error_msg)
error_progress = {
"status": "error",
"message": error_msg,
"progress": 0,
"phase": "Loading Failed",
"details": f"Critical error during file loading:\n{str(e)}",
"error": str(e),
"step": "file_loading",
}
yield error_progress
return error_progress
# Progress display formatting helper
def format_progress_display(progress_state: Dict) -> str:
"""Format progress state into readable display with enhanced details"""
if not progress_state:
return "πŸš€ Ready to start ingestion...\n\nπŸ“‹ **Two-Step Process:**\n1️⃣ Load files from GitHub repository\n2️⃣ Generate embeddings and store in vector database"
status = progress_state.get("status", "unknown")
message = progress_state.get("message", "")
progress = progress_state.get("progress", 0)
phase = progress_state.get("phase", "")
details = progress_state.get("details", "")
# Enhanced progress bar
filled = int(progress / 2.5) # 40 chars total
progress_bar = "β–ˆ" * filled + "β–‘" * (40 - filled)
# Status emoji mapping
status_emoji = {
"loading": "⏳",
"loaded": "βœ…",
"vectorizing": "🧠",
"complete": "πŸŽ‰",
"error": "❌"
}
emoji = status_emoji.get(status, "πŸ”„")
output = f"{emoji} **{message}**\n\n"
# Phase and progress section
output += f"πŸ“Š **Current Phase:** {phase}\n"
output += f"πŸ“ˆ **Progress:** {progress:.1f}%\n"
output += f"[{progress_bar}] {progress:.1f}%\n\n"
# Step-specific details for file loading
if progress_state.get("step") == "file_loading":
processed = progress_state.get("processed_files", 0)
total = progress_state.get("total_files", 0)
successful = progress_state.get("successful_files", 0)
failed = progress_state.get("failed_files", 0)
if total > 0:
output += "πŸ“ **File Processing Status:**\n"
output += f" β€’ Total files: {total}\n"
output += f" β€’ Processed: {processed}/{total}\n"
output += f" β€’ βœ… Successful: {successful}\n"
output += f" β€’ ❌ Failed: {failed}\n"
if "current_batch" in progress_state and "total_batches" in progress_state:
output += f" β€’ πŸ“¦ Current batch: {progress_state['current_batch']}/{progress_state['total_batches']}\n"
output += "\n"
# Step-specific details for vector ingestion
elif progress_state.get("step") == "vector_ingestion":
docs_count = progress_state.get("documents_count", 0)
repo_name = progress_state.get("repo_name", "Unknown")
if docs_count > 0:
output += "🧠 **Vector Processing Status:**\n"
output += f" β€’ Repository: {repo_name}\n"
output += f" β€’ Documents: {docs_count:,}\n"
output += f" β€’ Stage: {phase}\n\n"
# Detailed information
output += f"πŸ“ **Details:**\n{details}\n"
# Final summary for completion
if status == "complete":
total_time = progress_state.get("total_time", 0)
docs_processed = progress_state.get("documents_processed", 0)
failed_files = progress_state.get("failed_files", 0)
vector_time = progress_state.get("vector_time", 0)
loading_time = progress_state.get("loading_time", 0)
repo_name = progress_state.get("repo_name", "Unknown")
output += "\n🎊 **INGESTION COMPLETED SUCCESSFULLY!**\n"
output += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
output += f"🎯 **Repository:** {repo_name}\n"
output += f"πŸ“„ **Documents processed:** {docs_processed:,}\n"
output += f"❌ **Failed files:** {len(failed_files) if isinstance(failed_files, list) else failed_files}\n"
output += f"⏱️ **Total time:** {total_time:.1f} seconds\n"
output += f" β”œβ”€ File loading: {loading_time:.1f}s\n"
output += f" └─ Vector processing: {vector_time:.1f}s\n"
output += f"πŸ“Š **Processing rate:** {docs_processed/total_time:.1f} docs/second\n\n"
output += "πŸš€ **Next Step:** Go to the 'Query Interface' tab to start asking questions!"
elif status == "error":
error = progress_state.get("error", "Unknown error")
output += "\nπŸ’₯ **ERROR OCCURRED**\n"
output += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
output += f"❌ **Error Details:** {error[:300]}{'...' if len(error) > 300 else ''}\n"
output += "\nπŸ”§ **Troubleshooting Tips:**\n"
output += " β€’ Check your GitHub token permissions\n"
output += " β€’ Verify repository URL format\n"
output += " β€’ Ensure selected files exist\n"
output += " β€’ Check network connectivity\n"
return output
# Create the main Gradio interface
with gr.Blocks(title="Doc-MCP") as demo:
gr.Markdown("# πŸ“šDoc-MCP: Documentation RAG System")
gr.Markdown(
"Transform GitHub documentation repositories into accessible MCP (Model Context Protocol) servers for AI agents. Upload documentation, generate vector embeddings, and query with intelligent context retrieval."
)
# State variables
files_state = gr.State([])
progress_state = gr.State({})
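    # files_state holds the discovered markdown file paths; progress_state carries pipeline status between the two steps.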
with gr.Tabs():
with gr.TabItem("πŸ“₯ Documentation Ingestion"):
gr.Markdown("### πŸš€ Two-Step Documentation Processing Pipeline")
gr.Markdown(
"**Step 1:** Fetch markdown files from GitHub repository β†’ **Step 2:** Generate vector embeddings and store in MongoDB Atlas"
)
with gr.Row():
with gr.Column(scale=2):
repo_input = gr.Textbox(
label="πŸ“‚ GitHub Repository URL",
placeholder="Enter: owner/repo or https://github.com/owner/repo (e.g., gradio-app/gradio)",
value="",
info="Enter any GitHub repository containing markdown documentation"
)
load_btn = gr.Button("πŸ” Discover Documentation Files", variant="secondary")
with gr.Column(scale=1):
status_output = gr.Textbox(
label="Repository Discovery Status", interactive=False, lines=4,
placeholder="Repository scanning results will appear here..."
)
with gr.Row():
select_all_btn = gr.Button("πŸ“‹ Select All Documents", variant="secondary")
clear_all_btn = gr.Button("πŸ—‘οΈ Clear Selection", variant="secondary")
# File selection
with gr.Accordion(label="Available Documentation Files"):
file_selector = gr.CheckboxGroup(
choices=[], label="Select Markdown Files for RAG Processing", visible=False
)
# Two-step ingestion controls
gr.Markdown("### πŸ”„ RAG Pipeline Execution")
gr.Markdown("Process your documentation through our advanced RAG pipeline using Nebius AI embeddings and MongoDB Atlas vector storage.")
with gr.Row():
with gr.Column():
step1_btn = gr.Button(
"πŸ“₯ Step 1: Load Files from GitHub",
variant="primary",
size="lg",
interactive=False,
)
with gr.Column():
step2_btn = gr.Button(
"πŸ”„ Step 2: Start Ingestion",
variant="primary",
size="lg",
interactive=False,
)
with gr.Row():
refresh_btn = gr.Button("πŸ”„ Refresh Progress", variant="secondary")
reset_btn = gr.Button("πŸ—‘οΈ Reset Progress", variant="secondary")
# Progress display
progress_display = gr.Textbox(
label="πŸ“Š Real-time Ingestion Progress",
interactive=False,
lines=25,
value="πŸš€ Ready to start two-step ingestion process...\n\nπŸ“‹ Steps:\n1️⃣ Load files from GitHub repository\n2️⃣ Generate embeddings and store in vector database",
max_lines=30,
show_copy_button=True,
)
# Event handlers
def load_files_handler(repo_url: str):
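                """Discover markdown files in the repository and enable/disable the step buttons accordingly."""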
if not repo_url.strip():
return (
gr.CheckboxGroup(choices=[], visible=False),
"Please enter a repository URL",
[],
gr.Button(interactive=False),
gr.Button(interactive=False),
)
files, message = fetch_files_with_loader(repo_url)
if files:
return (
gr.CheckboxGroup(
choices=files,
value=[],
label=f"Select Files from {repo_url} ({len(files)} files)",
visible=True,
),
message,
files,
gr.Button(interactive=True), # Enable step 1 button
gr.Button(interactive=False), # Keep step 2 disabled
)
else:
return (
gr.CheckboxGroup(choices=[], visible=False),
message,
[],
gr.Button(interactive=False),
gr.Button(interactive=False),
)
def start_step1_generator(repo_url: str, selected_files: List[str], current_progress: Dict):
"""Start Step 1 with generator-based real-time progress updates"""
for progress_update in start_file_loading_generator(repo_url, selected_files, current_progress.copy()):
progress_text = format_progress_display(progress_update)
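                    # Enable Step 2 only once file loading reports completion.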
step2_enabled = progress_update.get("step") == "file_loading_complete"
yield (
progress_update,
progress_text,
gr.Button(interactive=step2_enabled),
)
def start_step2(current_progress: Dict):
"""Start Step 2: Vector Ingestion"""
new_progress = start_vector_ingestion(current_progress.copy())
progress_text = format_progress_display(new_progress)
return new_progress, progress_text
def refresh_progress(current_progress: Dict):
"""Refresh the progress display"""
progress_text = format_progress_display(current_progress)
return progress_text
def reset_progress():
"""Reset all progress"""
return (
{},
"Ready to start two-step ingestion process...",
gr.Button(interactive=False),
)
def select_all_handler(available_files):
if available_files:
return gr.CheckboxGroup(value=available_files)
return gr.CheckboxGroup(value=[])
def clear_all_handler():
return gr.CheckboxGroup(value=[])
# Wire up events
load_btn.click(
fn=load_files_handler,
inputs=[repo_input],
outputs=[
file_selector,
status_output,
files_state,
step1_btn,
step2_btn,
],
show_api=False,
)
select_all_btn.click(
fn=select_all_handler,
inputs=[files_state],
outputs=[file_selector],
show_api=False,
)
clear_all_btn.click(
fn=clear_all_handler, outputs=[file_selector], show_api=False
)
step1_btn.click(
fn=start_step1_generator,
inputs=[repo_input, file_selector, progress_state],
outputs=[progress_state, progress_display, step2_btn],
show_api=False,
)
step2_btn.click(
fn=start_step2,
inputs=[progress_state],
outputs=[progress_state, progress_display],
show_api=False,
)
refresh_btn.click(
fn=refresh_progress,
inputs=[progress_state],
outputs=[progress_display],
show_api=False,
)
reset_btn.click(
fn=reset_progress,
outputs=[progress_state, progress_display, step2_btn],
show_api=False,
)
# ================================
# Tab 2: Query Interface
# ================================
with gr.TabItem("πŸ€– AI Documentation Assistant"):
gr.Markdown("### πŸ’¬ Intelligent Documentation Q&A")
gr.Markdown(
"Query your processed documentation using advanced semantic search. Get contextual answers with source citations powered by Nebius LLM and vector similarity search."
)
with gr.Row():
with gr.Column(scale=2):
# Repository selection
repo_dropdown = gr.Dropdown(
choices=get_available_repositories(),
label="Select Documentation Repository",
value=None,
interactive=True,
allow_custom_value=False,
)
refresh_repos_btn = gr.Button(
"πŸ”„ Refresh Repositories", variant="secondary", size="sm"
)
# Query mode selection
query_mode = gr.Radio(
choices=["default", "text_search", "hybrid"],
label="Query Mode",
value="default",
info="default: semantic similarity, text_search: keyword-based, hybrid: combines both",
)
# Query input
query_input = gr.Textbox(
label="Your Query",
placeholder="Ask about the documentation...",
lines=3,
)
query_btn = gr.Button("πŸ” Search", variant="primary", size="lg")
# Response display as text area
response_output = gr.Textbox(
label="Response",
value="Your query response will appear here...",
lines=10,
interactive=False,
)
with gr.Column(scale=2):
gr.Markdown("### Source Nodes (JSON)")
# Source nodes display as JSON
sources_output = gr.JSON(
label="Source Nodes",
value={
"message": "Source nodes will appear here after querying..."
},
)
def get_available_docs_repo():
"""
List the available docs of repositories
Returns:
List of repo names
"""
try:
repos = get_available_repositories()
return gr.Dropdown(choices=repos, value=repos[0] if repos else None)
except Exception as e:
print(f"Error refreshing repository list: {e}")
return gr.Dropdown(choices=[], value=None)
# Simple query handler
def handle_query(repo: str, mode: str, query: str):
"""
Handle query request - returns raw data from retriever
Args:
repo: Selected repository
mode: Query mode (default, text_search, hybrid)
query: User's query
Returns:
Raw result dict from QueryRetriever.make_query()
"""
if not query.strip():
return {"error": "Please enter a query."}
if not repo:
return {"error": "Please select a repository."}
try:
# Import QueryRetriever here to avoid circular imports
from rag.query import QueryRetriever
# Create query retriever for the selected repo
retriever = QueryRetriever(repo)
# Make the query and return raw result
result = retriever.make_query(query, mode)
return result
except Exception as e:
print(f"Query error: {e}")
import traceback
traceback.print_exc()
return {"error": f"Query failed: {str(e)}"}
def make_query(repo: str, mode: str, query: str):
"""
                Retrieve relevant documentation context for a given query using the specified retrieval mode.
This function is designed to support Retrieval-Augmented Generation (RAG) by extracting
the most relevant context chunks from indexed documentation sources.
Args:
repo: Selected repository
mode: Query mode
query: User's query
Returns:
Tuple of (response_text, source_nodes_json)
"""
# Get raw result
result = handle_query(repo, mode, query)
# Extract response text
if "error" in result:
response_text = f"Error: {result['error']}"
source_nodes = {"error": result["error"]}
else:
response_text = result.get("response", "No response available")
source_nodes = result.get("source_nodes", [])
return response_text, source_nodes
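            # api_name makes these two endpoints part of the public API and, with mcp_server=True, MCP tools.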
refresh_repos_btn.click(
fn=get_available_docs_repo,
outputs=[repo_dropdown],
api_name="List available docs",
)
# Simple event wiring - single button click
query_btn.click(
fn=make_query,
inputs=[repo_dropdown, query_mode, query_input],
outputs=[response_output, sources_output],
api_name="Query docs",
)
# Also allow Enter key to trigger query
query_input.submit(
fn=make_query,
inputs=[repo_dropdown, query_mode, query_input],
outputs=[response_output, sources_output],
show_api=False,
)
# ================================
# Tab 3: Repository Management
# ================================
with gr.TabItem("πŸ—‚οΈ Repository Management"):
gr.Markdown("Manage your ingested repositories - view details and delete repositories when needed.")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### πŸ“Š Repository Statistics")
stats_display = gr.JSON(
label="Database Statistics",
value={"message": "Click refresh to load statistics..."}
)
refresh_stats_btn = gr.Button("πŸ”„ Refresh Statistics", variant="secondary")
with gr.Column(scale=2):
gr.Markdown("### πŸ“‹ Repository Details")
repos_table = gr.Dataframe(
headers=["Repository", "Files", "Last Updated"],
datatype=["str", "number", "str"],
label="Ingested Repositories",
interactive=False,
wrap=True
)
refresh_repos_btn = gr.Button("πŸ”„ Refresh Repository List", variant="secondary")
gr.Markdown("### πŸ—‘οΈ Delete Repository")
gr.Markdown("**⚠️ Warning:** This will permanently delete all documents and metadata for the selected repository.")
with gr.Row():
with gr.Column(scale=2):
delete_repo_dropdown = gr.Dropdown(
choices=[],
label="Select Repository to Delete",
value=None,
interactive=True,
allow_custom_value=False,
)
# Confirmation checkbox
confirm_delete = gr.Checkbox(
label="I understand this action cannot be undone",
value=False
)
delete_btn = gr.Button(
"πŸ—‘οΈ Delete Repository",
variant="stop",
size="lg",
interactive=False
)
with gr.Column(scale=1):
deletion_status = gr.Textbox(
label="Deletion Status",
value="Select a repository and confirm to enable deletion.",
interactive=False,
lines=6
)
# Management functions
def load_repository_stats():
"""Load overall repository statistics"""
try:
from rag.config import get_repository_stats
stats = get_repository_stats()
return stats
except Exception as e:
return {"error": f"Failed to load statistics: {str(e)}"}
def load_repository_details():
"""Load detailed repository information as a table"""
try:
from rag.config import get_repo_details
details = get_repo_details()
if not details:
return [["No repositories found", 0, "N/A"]]
# Format for dataframe
table_data = []
for repo in details:
last_updated = repo.get("last_updated", "Unknown")
if hasattr(last_updated, 'strftime'):
last_updated = last_updated.strftime("%Y-%m-%d %H:%M")
elif last_updated != "Unknown":
last_updated = str(last_updated)
table_data.append([
repo.get("repo_name", "Unknown"),
repo.get("file_count", 0),
last_updated
])
return table_data
except Exception as e:
return [["Error loading repositories", 0, str(e)]]
def update_delete_dropdown():
"""Update the dropdown with available repositories"""
try:
repos = get_available_repositories()
return gr.Dropdown(choices=repos, value=None)
except Exception as e:
print(f"Error updating delete dropdown: {e}")
return gr.Dropdown(choices=[], value=None)
def check_delete_button_state(repo_selected, confirmation_checked):
"""Enable/disable delete button based on selection and confirmation"""
if repo_selected and confirmation_checked:
return gr.Button(interactive=True)
else:
return gr.Button(interactive=False)
def delete_repository(repo_name: str, confirmed: bool):
"""Delete the selected repository"""
if not repo_name:
return "❌ No repository selected.", gr.Dropdown(choices=[]), gr.Checkbox(value=False)
if not confirmed:
return "❌ Please confirm deletion by checking the checkbox.", gr.Dropdown(choices=[]), gr.Checkbox(value=False)
try:
from rag.config import delete_repository_data
# Perform deletion
result = delete_repository_data(repo_name)
# Prepare status message
status_msg = result["message"]
if result["success"]:
status_msg += "\n\nπŸ“Š Deletion Summary:"
status_msg += f"\n- Vector documents removed: {result['vector_docs_deleted']}"
status_msg += f"\n- Repository record deleted: {'Yes' if result['repo_record_deleted'] else 'No'}"
status_msg += f"\n\nβœ… Repository '{repo_name}' has been completely removed."
# Update dropdown (remove deleted repo)
updated_dropdown = update_delete_dropdown()
# Reset confirmation checkbox
reset_checkbox = gr.Checkbox(value=False)
return status_msg, updated_dropdown, reset_checkbox
except Exception as e:
error_msg = f"❌ Error deleting repository: {str(e)}"
return error_msg, gr.Dropdown(choices=[]), gr.Checkbox(value=False)
# Wire up management events
refresh_stats_btn.click(
fn=load_repository_stats,
outputs=[stats_display],
show_api=False
)
refresh_repos_btn.click(
fn=load_repository_details,
outputs=[repos_table],
show_api=False
)
# Update delete dropdown when refreshing repos
refresh_repos_btn.click(
fn=update_delete_dropdown,
outputs=[delete_repo_dropdown],
show_api=False
)
# Enable/disable delete button based on selection and confirmation
delete_repo_dropdown.change(
fn=check_delete_button_state,
inputs=[delete_repo_dropdown, confirm_delete],
outputs=[delete_btn],
show_api=False
)
confirm_delete.change(
fn=check_delete_button_state,
inputs=[delete_repo_dropdown, confirm_delete],
outputs=[delete_btn],
show_api=False
)
# Delete repository
delete_btn.click(
fn=delete_repository,
inputs=[delete_repo_dropdown, confirm_delete],
outputs=[deletion_status, delete_repo_dropdown, confirm_delete],
show_api=False
)
# Load data on tab load
demo.load(
fn=load_repository_stats,
outputs=[stats_display],
show_api=False
)
demo.load(
fn=load_repository_details,
outputs=[repos_table],
show_api=False
)
demo.load(
fn=update_delete_dropdown,
outputs=[delete_repo_dropdown],
show_api=False
)
if __name__ == "__main__":
demo.launch(mcp_server=True)