File size: 4,845 Bytes
b134386 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# /core.py
"""
Core business logic for the code generation application.
This module orchestrates the entire process from receiving a user query to
generating the final code. It interacts with the services, extractors, and
utility modules to fulfill the request.
"""
from typing import Dict, List, Optional, Tuple, Generator, Any
from config import (
HTML_SYSTEM_PROMPT, GENERIC_SYSTEM_PROMPT,
HTML_SYSTEM_PROMPT_WITH_SEARCH, GENERIC_SYSTEM_PROMPT_WITH_SEARCH,
FOLLOW_UP_SYSTEM_PROMPT, SEARCH_START
)
from services import llm_service, search_service
from extractor import extract_text_from_file, extract_website_content
from utils import (
history_to_messages, remove_code_block, process_image_for_model,
apply_search_replace
)
# --- Type Definitions ---
History = List[Tuple[Optional[str], Optional[str]]]
def _determine_system_prompt(language: str, enable_search: bool, history: History) -> Tuple[str, bool]:
"""Determines the appropriate system prompt based on context."""
is_follow_up = False
if history and history[-1][1] and ("<!DOCTYPE html>" in history[-1][1] or "<html" in history[-1][1]):
is_follow_up = True
return FOLLOW_UP_SYSTEM_PROMPT, is_follow_up
if language == "html":
prompt = HTML_SYSTEM_PROMPT_WITH_SEARCH if enable_search else HTML_SYSTEM_PROMPT
else:
base_prompt = GENERIC_SYSTEM_PROMPT_WITH_SEARCH if enable_search else GENERIC_SYSTEM_PROMPT
prompt = base_prompt.format(language=language)
return prompt, is_follow_up
def _prepare_user_content(
query: str, image_data: Optional[Any], file_path: Optional[str],
website_url: Optional[str], enable_search: bool
) -> any:
"""Constructs the final user prompt including context from files, web, and search."""
context_parts = [query]
if file_path:
file_text = extract_text_from_file(file_path)
context_parts.append(f"\n\n--- Reference File Content ---\n{file_text[:8000]}")
if website_url:
web_text = extract_website_content(website_url)
context_parts.append(f"\n\n--- Website Content for Redesign ---\n{web_text[:8000]}")
full_query = "".join(context_parts)
if enable_search and search_service.is_available():
search_results = search_service.search(full_query)
full_query += f"\n\n--- Web Search Results ---\n{search_results}"
if image_data is not None:
return [
{"type": "text", "text": full_query},
{"type": "image_url", "image_url": {"url": process_image_for_model(image_data)}}
]
return full_query
def generate_code(
query: str,
image_data: Optional[Any],
file_path: Optional[str],
website_url: Optional[str],
history: History,
model_config: Dict[str, str],
enable_search: bool,
language: str
) -> Generator[Dict[str, Any], None, None]:
"""
Main generator function to handle a user request and stream responses.
"""
system_prompt, is_follow_up = _determine_system_prompt(language, enable_search, history)
messages = history_to_messages(history, system_prompt)
user_content = _prepare_user_content(query, image_data, file_path, website_url, enable_search)
messages.append({'role': 'user', 'content': user_content})
content_stream = ""
stream = llm_service.generate_code_stream(model_config['id'], messages)
for chunk in stream:
content_stream += chunk
if is_follow_up:
# For follow-ups, we apply changes incrementally
# This logic assumes the model sends complete change blocks.
last_html = history[-1][1] if history and history[-1][1] else ""
# Simple check to see if a full replacement block is present
if SEARCH_START in content_stream and content_stream.count(SEARCH_START) == content_stream.count(DIVIDER):
modified_html = apply_search_replace(last_html, content_stream)
processed_code = modified_html
else:
# If not a full block, show the raw diff for user to see progress
processed_code = f"Applying Changes:\n\n{content_stream}"
else:
processed_code = remove_code_block(content_stream)
yield {"code_output": processed_code}
# Final processing after stream ends
final_content = content_stream
if is_follow_up:
last_html = history[-1][1] if history and history[-1][1] else ""
final_code = apply_search_replace(last_html, final_content)
else:
final_code = remove_code_block(final_content)
# Add the interaction to history
# For user content, use the original query, not the enhanced one, for cleaner history display
history.append((query, final_code))
yield {"code_output": final_code, "history": history} |