File size: 4,845 Bytes
b134386
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# /core.py

"""
Core business logic for the code generation application.

This module orchestrates the entire process from receiving a user query to
generating the final code. It interacts with the services, extractors, and
utility modules to fulfill the request.
"""
from typing import Dict, List, Optional, Tuple, Generator, Any

from config import (
    HTML_SYSTEM_PROMPT, GENERIC_SYSTEM_PROMPT,
    HTML_SYSTEM_PROMPT_WITH_SEARCH, GENERIC_SYSTEM_PROMPT_WITH_SEARCH,
    FOLLOW_UP_SYSTEM_PROMPT, SEARCH_START, DIVIDER
)
from services import llm_service, search_service
from extractor import extract_text_from_file, extract_website_content
from utils import (
    history_to_messages, remove_code_block, process_image_for_model,
    apply_search_replace
)

# --- Type Definitions ---
# Conversation history: a list of (user message, generated output) pairs.
# Either element of a pair may be None. The second element of the last pair
# is what the follow-up logic below inspects and patches.
History = List[Tuple[Optional[str], Optional[str]]]

def _determine_system_prompt(language: str, enable_search: bool, history: History) -> Tuple[str, bool]:
    """
    Pick the system prompt for this request.

    Returns a ``(prompt, is_follow_up)`` pair. The request counts as a
    follow-up when the most recent history entry's output contains an HTML
    document marker; in that case the follow-up prompt is used regardless of
    ``language`` or ``enable_search``.
    """
    previous_output = history[-1][1] if history else None
    if previous_output and any(
        marker in previous_output for marker in ("<!DOCTYPE html>", "<html")
    ):
        return FOLLOW_UP_SYSTEM_PROMPT, True

    if language == "html":
        if enable_search:
            return HTML_SYSTEM_PROMPT_WITH_SEARCH, False
        return HTML_SYSTEM_PROMPT, False

    # Generic prompts are templates with a ``{language}`` placeholder.
    if enable_search:
        template = GENERIC_SYSTEM_PROMPT_WITH_SEARCH
    else:
        template = GENERIC_SYSTEM_PROMPT
    return template.format(language=language), False

def _prepare_user_content(
    query: str, image_data: Optional[Any], file_path: Optional[str],
    website_url: Optional[str], enable_search: bool
) -> Any:
    """
    Construct the final user prompt including context from files, web, and search.

    Args:
        query: The user's original request text.
        image_data: Optional image; when not None it is passed through
            ``process_image_for_model`` and attached to the message.
        file_path: Optional path of a reference file whose extracted text
            (first 8000 characters) is appended to the prompt.
        website_url: Optional URL whose extracted content (first 8000
            characters) is appended to the prompt.
        enable_search: When true and the search service reports itself
            available, search results for the assembled prompt are appended.

    Returns:
        The assembled prompt string, or, when ``image_data`` is given, a
        two-item list of content parts (a ``"text"`` part followed by an
        ``"image_url"`` part).
    """
    context_parts = [query]

    if file_path:
        file_text = extract_text_from_file(file_path)
        # Cap the reference text so a large file cannot dominate the prompt.
        context_parts.append(f"\n\n--- Reference File Content ---\n{file_text[:8000]}")

    if website_url:
        web_text = extract_website_content(website_url)
        context_parts.append(f"\n\n--- Website Content for Redesign ---\n{web_text[:8000]}")

    full_query = "".join(context_parts)

    # The search runs on the fully assembled prompt (query plus any file/web context).
    if enable_search and search_service.is_available():
        search_results = search_service.search(full_query)
        full_query += f"\n\n--- Web Search Results ---\n{search_results}"

    if image_data is not None:
        return [
            {"type": "text", "text": full_query},
            {"type": "image_url", "image_url": {"url": process_image_for_model(image_data)}}
        ]
    return full_query


def generate_code(
    query: str,
    image_data: Optional[Any],
    file_path: Optional[str],
    website_url: Optional[str],
    history: History,
    model_config: Dict[str, str],
    enable_search: bool,
    language: str
) -> Generator[Dict[str, Any], None, None]:
    """
    Main generator function to handle a user request and stream responses.

    Builds the message list (system prompt, prior history, enriched user
    content), streams the model output, and yields a progressively updated
    result after every chunk.

    Args:
        query: The user's request text; this is what gets stored in history.
        image_data: Optional image forwarded to the prompt builder.
        file_path: Optional reference file forwarded to the prompt builder.
        website_url: Optional website URL forwarded to the prompt builder.
        history: Prior (user message, generated output) pairs. The list is
            mutated: the new interaction is appended once the stream ends.
        model_config: Model settings; only ``model_config['id']`` is read here.
        enable_search: Whether web search may be used to enrich the prompt.
        language: Target language used to select the system prompt.

    Yields:
        ``{"code_output": ...}`` after each streamed chunk, then a final
        ``{"code_output": ..., "history": ...}`` once the stream is exhausted.
    """
    system_prompt, is_follow_up = _determine_system_prompt(language, enable_search, history)
    messages = history_to_messages(history, system_prompt)
    user_content = _prepare_user_content(query, image_data, file_path, website_url, enable_search)
    messages.append({'role': 'user', 'content': user_content})

    # The output that a follow-up request patches. History is not modified
    # until after the stream ends, so this can be read once up front.
    last_html = history[-1][1] if history and history[-1][1] else ""

    content_stream = ""
    stream = llm_service.generate_code_stream(model_config['id'], messages)

    for chunk in stream:
        content_stream += chunk

        if is_follow_up:
            # Only try to apply the changes once every SEARCH_START marker
            # seen so far has a matching DIVIDER; this assumes the model
            # sends complete change blocks.
            has_complete_blocks = (
                SEARCH_START in content_stream
                and content_stream.count(SEARCH_START) == content_stream.count(DIVIDER)
            )
            if has_complete_blocks:
                processed_code = apply_search_replace(last_html, content_stream)
            else:
                # Otherwise show the raw diff so the user can see progress.
                processed_code = f"Applying Changes:\n\n{content_stream}"
        else:
            processed_code = remove_code_block(content_stream)

        yield {"code_output": processed_code}

    # Final processing after the stream ends.
    if is_follow_up:
        final_code = apply_search_replace(last_html, content_stream)
    else:
        final_code = remove_code_block(content_stream)

    # Store the original query, not the enriched prompt, for cleaner history display.
    history.append((query, final_code))

    yield {"code_output": final_code, "history": history}