mgbam committed on
Commit
b134386
·
verified ·
1 Parent(s): bf0d7be

Rename chat_processing.py to core.py

Browse files
Files changed (2) hide show
  1. chat_processing.py +0 -213
  2. core.py +122 -0
chat_processing.py DELETED
@@ -1,213 +0,0 @@
1
- import re
2
- from typing import Dict, List, Optional, Tuple
3
- import base64
4
- import numpy as np
5
- from PIL import Image
6
- import gradio as gr
7
-
8
- from config import GRADIO_SUPPORTED_LANGUAGES, SEARCH_START, DIVIDER, REPLACE_END
9
-
10
# Conversation history as (user_message, assistant_message) pairs.
History = List[Tuple[str, str]]
# OpenAI-style chat messages: dicts with 'role' and 'content' keys.
Messages = List[Dict[str, str]]
12
-
13
def get_gradio_language(language):
    """Return *language* when Gradio can syntax-highlight it, else None."""
    if language in GRADIO_SUPPORTED_LANGUAGES:
        return language
    return None
15
-
16
def history_to_messages(history: History, system: str) -> Messages:
    """Convert (user, assistant) history tuples into an OpenAI-style message list.

    A system message is prepended. Multimodal user turns (lists of typed
    content parts) are flattened to their concatenated text parts; when no
    text part is present the raw list is stringified instead.
    """
    messages: Messages = [{'role': 'system', 'content': system}]
    for user_turn, assistant_turn in history:
        content = user_turn
        if isinstance(content, list):
            joined = "".join(
                part.get("text", "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
            content = joined if joined else str(user_turn)
        messages.append({'role': 'user', 'content': content})
        messages.append({'role': 'assistant', 'content': assistant_turn})
    return messages
32
-
33
def messages_to_history(messages: Messages) -> History:
    """Rebuild (user, assistant) tuples from an OpenAI-style message list.

    The leading system message is required and dropped. Multimodal user
    payloads are reduced to their concatenated text parts, falling back to
    the stringified payload when no text part exists.
    """
    assert messages[0]['role'] == 'system'
    pairs: History = []
    user_msgs = messages[1::2]
    assistant_msgs = messages[2::2]
    for question, reply in zip(user_msgs, assistant_msgs):
        content = question['content']
        if isinstance(content, list):
            joined = "".join(
                part.get("text", "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
            content = joined if joined else str(question['content'])
        pairs.append((content, reply['content']))
    return pairs
48
-
49
def history_to_chatbot_messages(history: History) -> List[Dict[str, str]]:
    """Flatten (user, assistant) tuples into Gradio chatbot message dicts."""
    chat: List[Dict[str, str]] = []
    for user_msg, assistant_msg in history:
        # Multimodal user turns carry a list of typed parts; keep only text.
        if isinstance(user_msg, list):
            text = "".join(
                part.get("text", "")
                for part in user_msg
                if isinstance(part, dict) and part.get("type") == "text"
            )
            user_msg = text if text else str(user_msg)
        chat.append({"role": "user", "content": user_msg})
        chat.append({"role": "assistant", "content": assistant_msg})
    return chat
64
-
65
def remove_code_block(text):
    """Extract the payload from a fenced markdown code block in *text*.

    Fence styles are tried in priority order: ```html/```HTML blocks, plain
    ``` blocks with surrounding newlines, then inline ``` fences. When no
    fence matches, the stripped input is returned as-is (callers rely on raw
    HTML responses passing through unchanged).
    """
    patterns = [
        r'```(?:html|HTML)\n([\s\S]+?)\n```',  # fenced block labelled html/HTML
        r'```\n([\s\S]+?)\n```',               # unlabelled fenced block
        r'```([\s\S]+?)```',                   # fence without surrounding newlines
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            return match.group(1).strip()
    # The original ended with `if <looks like HTML>: return text.strip()`
    # followed by `return text.strip()` — both branches were identical, so
    # the dead conditional is collapsed into a single fallback.
    return text.strip()
81
-
82
def clear_history():
    """Reset UI state: tuple history, chatbot messages, file input, URL box."""
    empty_history = []
    empty_chat = []
    # None clears the file component; "" clears the website-URL textbox.
    return empty_history, empty_chat, None, ""
84
-
85
def update_image_input_visibility(model):
    """Show the image input only when the selected model supports vision."""
    vision_model_ids = {
        "baidu/ERNIE-4.5-VL-424B-A47B-Base-PT",  # ERNIE vision-language model
        "THUDM/GLM-4.1V-9B-Thinking",            # GLM vision-language model
    }
    return gr.update(visible=model.get("id") in vision_model_ids)
90
-
91
def update_submit_button(query):
    """Enable the submit button only when the query box is non-empty."""
    has_text = bool(query)
    return gr.update(interactive=has_text)
94
-
95
def create_multimodal_message(text, image=None):
    """Build a user chat message; attach *image* as an image_url part when given."""
    if image is None:
        return {"role": "user", "content": text}

    # Imported lazily (as in the original) so text-only calls never touch
    # the image-processing module.
    from file_processing import process_image_for_model

    parts = [
        {"type": "text", "text": text},
        {"type": "image_url", "image_url": {"url": process_image_for_model(image)}},
    ]
    return {"role": "user", "content": parts}
115
def apply_search_replace_changes(original_html: str, changes_text: str) -> str:
    """Apply SEARCH/REPLACE diff blocks from *changes_text* to *original_html*.

    Blocks are delimited by the SEARCH_START / DIVIDER / REPLACE_END markers
    from config. Each block's search text is replaced (all occurrences) in
    the HTML; a search text that does not occur is reported and skipped.
    """
    if not changes_text.strip():
        return original_html

    # --- Stage 1: split the raw text into marker-delimited blocks. ---
    blocks = []
    pending = ""
    for raw_line in changes_text.split('\n'):
        marker = raw_line.strip()
        if marker == SEARCH_START:
            # A new block begins; flush any accumulated one first.
            if pending.strip():
                blocks.append(pending.strip())
            pending = raw_line + '\n'
        elif marker == REPLACE_END:
            pending += raw_line + '\n'
            blocks.append(pending.strip())
            pending = ""
        else:
            pending += raw_line + '\n'
    if pending.strip():
        blocks.append(pending.strip())

    # --- Stage 2: parse each block and apply its substitution. ---
    modified_html = original_html
    for block in blocks:
        if not block.strip():
            continue

        search_lines = []
        replace_lines = []
        collecting_search = False
        collecting_replace = False
        for raw_line in block.split('\n'):
            marker = raw_line.strip()
            if marker == SEARCH_START:
                collecting_search, collecting_replace = True, False
            elif marker == DIVIDER:
                collecting_search, collecting_replace = False, True
            elif marker == REPLACE_END:
                collecting_replace = False
            elif collecting_search:
                search_lines.append(raw_line)
            elif collecting_replace:
                replace_lines.append(raw_line)

        if not search_lines:
            continue
        search_text = '\n'.join(search_lines).strip()
        replace_text = '\n'.join(replace_lines).strip()
        if search_text in modified_html:
            modified_html = modified_html.replace(search_text, replace_text)
        else:
            print(f"Warning: Search text not found in HTML: {search_text[:100]}...")

    return modified_html
178
-
179
def send_to_sandbox(code):
    """Return an <iframe> HTML snippet that previews *code* in a sandbox.

    The generated fragment is wrapped in a full HTML document, base64-encoded
    into a data: URI, and embedded in a sandboxed iframe so it can execute
    scripts without touching the host page.
    """
    # Add a wrapper to inject necessary permissions and ensure full HTML
    # NOTE(review): the inline <script> replaces window.localStorage with an
    # in-memory shim — data: URI iframes have an opaque origin where real
    # localStorage access would throw — and installs window.onerror so script
    # errors are logged instead of propagating. Doubled braces {{ }} are
    # f-string escapes for literal JS braces.
    wrapped_code = f"""
    <!DOCTYPE html>
    <html>
    <head>
    <meta charset=\"UTF-8\">
    <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">
    <script>
    // Safe localStorage polyfill
    const safeStorage = {{
    _data: {{}},
    getItem: function(key) {{ return this._data[key] || null; }},
    setItem: function(key, value) {{ this._data[key] = value; }},
    removeItem: function(key) {{ delete this._data[key]; }},
    clear: function() {{ this._data = {{}}; }}
    }};
    Object.defineProperty(window, 'localStorage', {{
    value: safeStorage,
    writable: false
    }});
    window.onerror = function(message, source, lineno, colno, error) {{
    console.error('Error:', message);
    }};
    </script>
    </head>
    <body>
    {code}
    </body>
    </html>
    """
    # Base64-encode so the whole document travels inside one data: URI.
    encoded_html = base64.b64encode(wrapped_code.encode('utf-8')).decode('utf-8')
    data_uri = f"data:text/html;charset=utf-8;base64,{encoded_html}"
    iframe = f'<iframe src="{data_uri}" width="100%" height="920px" sandbox="allow-scripts allow-same-origin allow-forms allow-popups allow-modals allow-presentation" allow="display-capture"></iframe>'
    return iframe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
core.py ADDED
@@ -0,0 +1,122 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # /core.py
2
+
3
+ """
4
+ Core business logic for the code generation application.
5
+
6
+ This module orchestrates the entire process from receiving a user query to
7
+ generating the final code. It interacts with the services, extractors, and
8
+ utility modules to fulfill the request.
9
+ """
10
+ from typing import Dict, List, Optional, Tuple, Generator, Any
11
+
12
+ from config import (
13
+ HTML_SYSTEM_PROMPT, GENERIC_SYSTEM_PROMPT,
14
+ HTML_SYSTEM_PROMPT_WITH_SEARCH, GENERIC_SYSTEM_PROMPT_WITH_SEARCH,
15
+ FOLLOW_UP_SYSTEM_PROMPT, SEARCH_START
16
+ )
17
+ from services import llm_service, search_service
18
+ from extractor import extract_text_from_file, extract_website_content
19
+ from utils import (
20
+ history_to_messages, remove_code_block, process_image_for_model,
21
+ apply_search_replace
22
+ )
23
+
24
# --- Type Definitions ---
# One chat turn: (user_message, assistant_reply); either side may be None.
History = List[Tuple[Optional[str], Optional[str]]]
26
+
27
def _determine_system_prompt(language: str, enable_search: bool, history: History) -> Tuple[str, bool]:
    """Pick the system prompt for this request.

    Returns (prompt, is_follow_up). The request counts as a follow-up when
    the previous assistant reply already contains an HTML document, in which
    case the search/replace follow-up prompt is selected.
    """
    last_reply = history[-1][1] if history else None
    if last_reply and ("<!DOCTYPE html>" in last_reply or "<html" in last_reply):
        return FOLLOW_UP_SYSTEM_PROMPT, True

    if language == "html":
        prompt = HTML_SYSTEM_PROMPT_WITH_SEARCH if enable_search else HTML_SYSTEM_PROMPT
    else:
        template = GENERIC_SYSTEM_PROMPT_WITH_SEARCH if enable_search else GENERIC_SYSTEM_PROMPT
        prompt = template.format(language=language)
    return prompt, False
40
+
41
+ def _prepare_user_content(
42
+ query: str, image_data: Optional[Any], file_path: Optional[str],
43
+ website_url: Optional[str], enable_search: bool
44
+ ) -> any:
45
+ """Constructs the final user prompt including context from files, web, and search."""
46
+ context_parts = [query]
47
+
48
+ if file_path:
49
+ file_text = extract_text_from_file(file_path)
50
+ context_parts.append(f"\n\n--- Reference File Content ---\n{file_text[:8000]}")
51
+
52
+ if website_url:
53
+ web_text = extract_website_content(website_url)
54
+ context_parts.append(f"\n\n--- Website Content for Redesign ---\n{web_text[:8000]}")
55
+
56
+ full_query = "".join(context_parts)
57
+
58
+ if enable_search and search_service.is_available():
59
+ search_results = search_service.search(full_query)
60
+ full_query += f"\n\n--- Web Search Results ---\n{search_results}"
61
+
62
+ if image_data is not None:
63
+ return [
64
+ {"type": "text", "text": full_query},
65
+ {"type": "image_url", "image_url": {"url": process_image_for_model(image_data)}}
66
+ ]
67
+ return full_query
68
+
69
+
70
def generate_code(
    query: str,
    image_data: Optional[Any],
    file_path: Optional[str],
    website_url: Optional[str],
    history: History,
    model_config: Dict[str, str],
    enable_search: bool,
    language: str
) -> Generator[Dict[str, Any], None, None]:
    """
    Main generator function to handle a user request and stream responses.

    Yields dicts with a "code_output" key while streaming; the final yield
    also carries the updated "history". Note: *history* is mutated in place.
    """
    # BUG FIX: DIVIDER is referenced below but the module-level config import
    # only brings in SEARCH_START, so every follow-up stream raised NameError.
    from config import DIVIDER

    system_prompt, is_follow_up = _determine_system_prompt(language, enable_search, history)
    messages = history_to_messages(history, system_prompt)
    user_content = _prepare_user_content(query, image_data, file_path, website_url, enable_search)
    messages.append({'role': 'user', 'content': user_content})

    content_stream = ""
    stream = llm_service.generate_code_stream(model_config['id'], messages)

    for chunk in stream:
        content_stream += chunk

        if is_follow_up:
            # Follow-ups stream SEARCH/REPLACE blocks against the previous
            # HTML; only apply them once the blocks look complete (equal
            # counts of start markers and dividers).
            last_html = history[-1][1] if history and history[-1][1] else ""
            if SEARCH_START in content_stream and content_stream.count(SEARCH_START) == content_stream.count(DIVIDER):
                processed_code = apply_search_replace(last_html, content_stream)
            else:
                # Incomplete block: surface the raw diff so the user sees progress.
                processed_code = f"Applying Changes:\n\n{content_stream}"
        else:
            processed_code = remove_code_block(content_stream)

        yield {"code_output": processed_code}

    # Final processing after stream ends
    final_content = content_stream
    if is_follow_up:
        last_html = history[-1][1] if history and history[-1][1] else ""
        final_code = apply_search_replace(last_html, final_content)
    else:
        final_code = remove_code_block(final_content)

    # Record the original query (not the context-enhanced one) so history
    # displays cleanly.
    history.append((query, final_code))

    yield {"code_output": final_code, "history": history}