# /core.py (Corrected)

"""
Core business logic for the code generation application.

This module orchestrates the entire process from receiving a user query to
generating the final code. It interacts with the services, extractors, and
utility modules to fulfill the request.
"""
from typing import Dict, List, Optional, Tuple, Generator, Any

from config import (
    HTML_SYSTEM_PROMPT, GENERIC_SYSTEM_PROMPT,
    HTML_SYSTEM_PROMPT_WITH_SEARCH, GENERIC_SYSTEM_PROMPT_WITH_SEARCH,
    FOLLOW_UP_SYSTEM_PROMPT
)
from services import llm_service, search_service
from extractor import extract_text_from_file, extract_website_content
from utils import (
    history_to_messages,
    remove_code_block,
    process_image_for_model,
    apply_search_replace_changes  # <--- FIX: Corrected the function name here
)

# --- Type Definitions ---
History = List[Tuple[Optional[str], Optional[str]]]

def _determine_system_prompt(language: str, enable_search: bool, history: History) -> Tuple[str, bool]:
    """Determines the appropriate system prompt based on context."""
    is_follow_up = False
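    # A follow-up is detected when the previous assistant turn produced a full
    # HTML document; the model is then asked for targeted edits to that
    # document instead of regenerating it from scratch.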
    if history and history[-1][1] and ("<!DOCTYPE html>" in history[-1][1] or "<html" in history[-1][1]):
        is_follow_up = True
        return FOLLOW_UP_SYSTEM_PROMPT, is_follow_up

    if language == "html":
        prompt = HTML_SYSTEM_PROMPT_WITH_SEARCH if enable_search else HTML_SYSTEM_PROMPT
    else:
        base_prompt = GENERIC_SYSTEM_PROMPT_WITH_SEARCH if enable_search else GENERIC_SYSTEM_PROMPT
        prompt = base_prompt.format(language=language)
    return prompt, is_follow_up

def _prepare_user_content(
    query: str, image_data: Optional[Any], file_path: Optional[str],
    website_url: Optional[str], enable_search: bool
) -> Any:
    """Constructs the final user prompt including context from files, web, and search."""
    context_parts = [query]

    if file_path:
        file_text = extract_text_from_file(file_path)
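        # Cap extracted text at 8,000 characters (here and for the website
        # content below) so reference material cannot crowd out the query.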
        context_parts.append(f"\n\n--- Reference File Content ---\n{file_text[:8000]}")

    if website_url:
        web_text = extract_website_content(website_url)
        context_parts.append(f"\n\n--- Website Content for Redesign ---\n{web_text[:8000]}")

    full_query = "".join(context_parts)

    if enable_search and search_service.is_available():
        search_results = search_service.search(full_query)
        full_query += f"\n\n--- Web Search Results ---\n{search_results}"

    if image_data is not None:
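        # Build OpenAI-style multimodal content: a text part paired with an
        # image_url part produced by process_image_for_model.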
        return [
            {"type": "text", "text": full_query},
            {"type": "image_url", "image_url": {"url": process_image_for_model(image_data)}}
        ]
    return full_query


def generate_code(
    query: str,
    image_data: Optional[Any],
    file_path: Optional[str],
    website_url: Optional[str],
    history: History,
    model_config: Dict[str, str],
    enable_search: bool,
    language: str
) -> Generator[Dict[str, Any], None, None]:
    """
    Main generator function to handle a user request and stream responses.
    """
    system_prompt, is_follow_up = _determine_system_prompt(language, enable_search, history)
    messages = history_to_messages(history, system_prompt)
    user_content = _prepare_user_content(query, image_data, file_path, website_url, enable_search)
    messages.append({'role': 'user', 'content': user_content})

    content_stream = ""
    # For follow-ups the model emits search/replace edits against the previous
    # HTML output, so capture that baseline once rather than on every chunk.
    last_html = history[-1][1] if history and history[-1][1] else ""
    stream = llm_service.generate_code_stream(model_config['id'], messages)

    for chunk in stream:
        content_stream += chunk
        if is_follow_up:
            processed_code = apply_search_replace_changes(last_html, content_stream)
        else:
            processed_code = remove_code_block(content_stream)

        yield {"code_output": processed_code}

    # Final processing after the stream ends.
    if is_follow_up:
        final_code = apply_search_replace_changes(last_html, content_stream)
    else:
        final_code = remove_code_block(content_stream)

    # Use the original query for history display, not the enhanced one.
    history.append((query, final_code))
    
    yield {"code_output": final_code, "history": history}
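

# --- Usage Sketch ---
# A minimal, illustrative driver for generate_code, assuming the backing
# services (llm_service, and optionally search_service) are configured.
# "example-model-id" is a placeholder, not a real model name; only this
# module's own signatures and yield shape are relied on.
if __name__ == "__main__":
    chat_history: History = []
    final_code = ""
    for update in generate_code(
        query="Build a landing page with a pricing section",
        image_data=None,
        file_path=None,
        website_url=None,
        history=chat_history,
        model_config={"id": "example-model-id"},
        enable_search=False,
        language="html",
    ):
        # Intermediate yields carry partial code; the final yield also
        # carries the updated conversation history.
        final_code = update["code_output"]
        if "history" in update:
            chat_history = update["history"]
    print(final_code)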