import gradio as gr
import os
import docx
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain_community.llms import HuggingFaceEndpoint
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.prompts import PromptTemplate
import torch
from PIL import Image
from torchvision import transforms, models


class GeometryImageClassifier:
    def __init__(self):
        # Load ResNet50 but only use it for feature extraction
        self.model = models.resnet50(weights='DEFAULT')
        # Remove the final classification layer
        self.model.fc = torch.nn.Identity()
        self.model.eval()
        
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
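        # The normalization constants above are the standard ImageNet mean/std
        # that the pretrained ResNet-50 weights expect; reusing them keeps the
        # extracted features meaningful.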
        
        # Pre-computed embeddings for our 3 reference images with manual labels
        self.reference_embeddings = {
            "flat.png": {
                "embedding": None,  # Will be computed on first run
                "label": "Flat or Sheet-Based"
            },
            "cylindrical.png": {
                "embedding": None,
                "label": "Cylindrical"
            },
            "complex.png": {
                "embedding": None,
                "label": "Complex Multi Axis Geometry"
            }
        }
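        # Classification is zero-shot: an uploaded image is embedded with the
        # same backbone and assigned the label of the most similar reference
        # image (cosine similarity over the 2048-d ResNet features).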
        
    def compute_embedding(self, image_path):
        """Return the 2048-d ResNet-50 feature vector for the image at image_path."""
        img = Image.open(image_path).convert('RGB')
        img_tensor = self.transform(img).unsqueeze(0)

        with torch.no_grad():
            embedding = self.model(img_tensor)
        return embedding.squeeze().numpy()
    
    def initialize_reference_embeddings(self, reference_folder):
        for image_name in self.reference_embeddings.keys():
            image_path = os.path.join(reference_folder, image_name)
            self.reference_embeddings[image_name]["embedding"] = self.compute_embedding(image_path)
    
    def find_closest_geometry(self, query_embedding):
        best_similarity = -1
        best_label = None
        
        for ref_data in self.reference_embeddings.values():
            similarity = cosine_similarity(
                query_embedding.reshape(1, -1),
                ref_data["embedding"].reshape(1, -1)
            )[0][0]
            
            if similarity > best_similarity:
                best_similarity = similarity
                best_label = ref_data["label"]
        
        return best_label
    
    def process_image(self, image_path):
        # Compute embedding for the input image
        query_embedding = self.compute_embedding(image_path)

        # Find the closest matching reference geometry
        return self.find_closest_geometry(query_embedding)
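
# Minimal usage sketch for GeometryImageClassifier (assumes an "images" folder
# containing the three reference files named above; "part.png" is hypothetical):
#   classifier = GeometryImageClassifier()
#   classifier.initialize_reference_embeddings("images")
#   label = classifier.process_image("part.png")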

    
# ✅ Use a strong sentence embedding model
semantic_model = SentenceTransformer("all-MiniLM-L6-v2")


def extract_text_from_docx(file_path):
    """ βœ… Extracts normal text & tables from a .docx file for better retrieval. """
    doc = docx.Document(file_path)
    extracted_text = []

    for para in doc.paragraphs:
        if para.text.strip():
            extracted_text.append(para.text.strip())

    for table in doc.tables:
        extracted_text.append("πŸ“Œ Table Detected:")
        for row in table.rows:
            row_text = [cell.text.strip() for cell in row.cells]
            if any(row_text):
                extracted_text.append(" | ".join(row_text))

    return "\n".join(extracted_text)


def load_documents():
    """ βœ… Loads & processes documents, ensuring table data is properly extracted. """
    file_paths = {
        "Fastener_Types_Manual": "Fastener_Types_Manual.docx",
        "Manufacturing_Expert_Manual": "Manufacturing Expert Manual.docx"
    }

    all_splits = []

    for doc_name, file_path in file_paths.items():
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Document not found: {file_path}")

        print(f"Extracting text from {file_path}...")
        full_text = extract_text_from_docx(file_path)

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=200)
        doc_splits = text_splitter.create_documents([full_text])

        for chunk in doc_splits:
            chunk.metadata = {"source": doc_name}

        all_splits.extend(doc_splits)

    return all_splits
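
# The 200-character overlap in the splitter reduces the chance that a sentence
# or table row is cut in half at a chunk boundary, at the cost of some
# duplicated text between neighboring chunks.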


def create_db(splits):
    """ βœ… Creates a FAISS vector database from document splits. """
    embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5")
    vectordb = FAISS.from_documents(splits, embeddings)
    return vectordb
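
# The FAISS index is rebuilt in memory on every launch; FAISS.save_local() and
# FAISS.load_local() could be used to persist it between runs if startup time
# becomes an issue.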


def retrieve_documents(query, retriever, embeddings):
    """ βœ… Retrieves the most relevant documents & filters out low-relevance ones. """
    query_embedding = np.array(embeddings.embed_query(query)).reshape(1, -1)
    results = retriever.invoke(query)

    if not results:
        return []

    doc_embeddings = np.array([embeddings.embed_query(doc.page_content) for doc in results])
    similarity_scores = cosine_similarity(query_embedding, doc_embeddings)[0]  # βœ… Proper cosine similarity

    MIN_SIMILARITY = 0.5  # πŸ”₯ Increased threshold to improve relevance
    filtered_results = [(doc, sim) for doc, sim in zip(results, similarity_scores) if sim >= MIN_SIMILARITY]

    # βœ… Debugging log
    print(f"πŸ” Query: {query}")
    print(f"πŸ“„ Retrieved Docs (before filtering): {[(doc.metadata.get('source', 'Unknown'), sim) for doc, sim in zip(results, similarity_scores)]}")
    print(f"βœ… Filtered Docs (after threshold {MIN_SIMILARITY}): {[(doc.metadata.get('source', 'Unknown'), sim) for doc, sim in filtered_results]}")

    return [doc for doc, _ in filtered_results] if filtered_results else []


def validate_query_semantically(query, retrieved_docs):
    """ βœ… Ensures the query meaning is covered in the retrieved documents. """
    if not retrieved_docs:
        return False

    combined_text = " ".join([doc.page_content for doc in retrieved_docs])
    query_embedding = semantic_model.encode(query, normalize_embeddings=True)
    doc_embedding = semantic_model.encode(combined_text, normalize_embeddings=True)

    similarity_score = np.dot(query_embedding, doc_embedding)  # ✅ Dot product of unit vectors = cosine similarity

    print(f"πŸ” Semantic Similarity Score: {similarity_score}")

    return similarity_score >= 0.3  # 🔥 Stricter threshold to ensure correctness


def handle_query(query, history, retriever, qa_chain, embeddings):
    """ βœ… Handles user queries & prevents hallucination. """
    retrieved_docs = retrieve_documents(query, retriever, embeddings)

    if not retrieved_docs or not validate_query_semantically(query, retrieved_docs):
        return history + [(query, "I couldn't find any relevant information.")], ""

    response = qa_chain.invoke({"question": query, "chat_history": history})
    assistant_response = response['answer'].strip()

    assistant_response += f"\n\n📄 **Source:** {', '.join(set(doc.metadata.get('source', 'Unknown') for doc in retrieved_docs))}"

    # ✅ Debugging log (truncated to 300 characters to keep output readable)
    print(f"🤖 LLM Response: {assistant_response[:300]}")

    history.append((query, assistant_response))
    return history, ""


def initialize_chatbot(vector_db):
    """ βœ… Initializes chatbot with improved retrieval & processing. """
    memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True, output_key='answer')

    embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5")

    # search_type is a top-level argument to as_retriever, not part of search_kwargs
    retriever = vector_db.as_retriever(search_type="similarity", search_kwargs={"k": 5})

    system_prompt = """You are an AI assistant that answers questions **ONLY based on the provided documents**.
- **If no relevant documents are retrieved, respond with: "I couldn't find any relevant information."**
- **If the meaning of the query does not match the retrieved documents, say "I couldn't find any relevant information."**
- **Do NOT attempt to answer from general knowledge.**
"""

    llm = HuggingFaceEndpoint(
        repo_id="mistralai/Mistral-7B-Instruct-v0.2",
        huggingfacehub_api_token=os.environ.get("Another"),
        temperature=0.1,
        max_new_tokens=400,  
        task="text-generation",
        system_prompt=system_prompt
    )

    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=retriever,
        memory=memory,
        return_source_documents=True,
        verbose=False
    )

    return retriever, qa_chain, embeddings


def process_image_and_generate_query(image, classifier):
    """Classify the geometry in an uploaded image and build a suggested query.

    Expects a GeometryImageClassifier whose reference embeddings have already
    been initialized; a freshly constructed classifier would still have
    embedding=None for every reference image.
    """
    geometry_type = classifier.process_image(image)
    query = f"I have a {geometry_type} geometry, which screw should I use and what is the best machine to use for {geometry_type} geometry?"
    return geometry_type, query

def demo():
    # Initialize classifier once at startup
    classifier = GeometryImageClassifier()
    classifier.initialize_reference_embeddings("images")
    
    # Initialize chatbot components
    retriever, qa_chain, embeddings = initialize_chatbot(create_db(load_documents()))
    
    with gr.Blocks() as app:
        gr.Markdown("### πŸ€– **Fastener Agent with Image Recognition** πŸ“š")
        
        with gr.Row():
            with gr.Column(scale=1):
                image_input = gr.Image(type="filepath", label="Upload Geometry Image")
                geometry_label = gr.Textbox(label="Detected Geometry Type", interactive=False)
                
            with gr.Column(scale=2):
                chatbot = gr.Chatbot()
                query_input = gr.Textbox(label="Ask me a question")
                query_btn = gr.Button("Ask")

        def image_upload_handler(image):
            if image is None:
                return "", ""
            # Reuse the classifier initialized at startup
            return process_image_and_generate_query(image, classifier)

        def user_query_handler(query, history):
            return handle_query(query, history, retriever, qa_chain, embeddings)

        image_input.change(
            image_upload_handler,
            inputs=[image_input],
            outputs=[geometry_label, query_input]
        )
        
        query_btn.click(
            user_query_handler,
            inputs=[query_input, chatbot],
            outputs=[chatbot, query_input]
        )
        
        query_input.submit(
            user_query_handler,
            inputs=[query_input, chatbot],
            outputs=[chatbot, query_input]
        )

    app.launch()

if __name__ == "__main__":
    demo()