from transformers import pipeline from rcsbsearchapi import AttributeQuery from rcsbsearchapi.search import SequenceQuery, SeqMotifQuery import os from dotenv import load_dotenv from shiny import App, render, ui, reactive from itables.shiny import DT import pandas as pd import warnings import re import time # from UniprotKB_P_Sequence_RCSB_API_test import ProteinSearchEngine # import plotly.graph_objects as go from shinywidgets import output_widget, render_widget import requests # import asyncio from Bio import PDB from Bio.PDB.PDBList import PDBList from Bio.PDB.Polypeptide import protein_letters_3to1 import shutil warnings.filterwarnings('ignore') # Load environment variables from .env file load_dotenv() class PDBSearchAssistant: def __init__(self, model_name="google/flan-t5-large"): # google/flan-t5-large or Rostlab/prot_t5_xl_uniref50 11GB # Set up HuggingFace pipeline with better model self.pipe = pipeline( "text2text-generation", model=model_name, max_new_tokens=1024, temperature=0.1, torch_dtype="auto", device="cpu" # cuda or cpu ) self.prompt_template = """ Extract specific search parameters from the protein-related query: 1. Protein name or type 2. Resolution cutoff (in Å) 3. Protein sequence information 4. Specific PDB ID 5. Experimental method (X-RAY, EM, NMR) 6. Organism/Species information 7. Sequence similarity (in %) Format: Protein: [protein name or type] Resolution: [maximum resolution in Å, if mentioned] Sequence: [any sequence mentioned] PDB_ID: [specific PDB ID if mentioned] Method: [experimental method if mentioned] Organism: [organism/species if mentioned] Similarity: [similarity percentage if mentioned] Examples: Query: "Find structures with sequence MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRN and resolution better than 2.5Å" Protein: none Resolution: 2.5 Sequence: MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRN PDB_ID: none Method: none Organism: none Similarity: 100 Query: "human insulin" Protein: insulin Resolution: none Sequence: none PDB_ID: none Method: none Organism: Homo sapiens Similarity: none Query: "mouse insulin" Protein: insulin Resolution: none Sequence: none PDB_ID: none Method: none Organism: Mus musculus Similarity: none Query: "Spike protein" Protein: Spike protein Resolution: none Sequence: none PDB_ID: none Method: none Organism: none Similarity: none Query: "Human hemoglobin C resolution better than 2.5Å" Protein: hemoglobin C Resolution: 2.5 Sequence: none PDB_ID: none Method: none Organism: Homo sapiens Similarity: none Query: "Find structures containing sequence with similarity 90% MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRN" Protein: none Resolution: none Sequence: MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRN PDB_ID: none Method: none Organism: none Similarity: 90 Query: "Get sequence of PDB ID 8ET6" Protein: none Organism: none Resolution: none Sequence: none PDB_ID: 8ET6 Method: none Now analyze: Query: {query} """ self.pdb_dir = "pdb_tmp" # 임시 PDB 파일 저장 디렉토리 os.makedirs(self.pdb_dir, exist_ok=True) self.pdbl = PDBList() def search_pdb(self, query): try: # Get search parameters from LLM formatted_prompt = self.prompt_template.format(query=query) response = self.pipe(formatted_prompt)[0]['generated_text'] print("Generated parameters:", response) # Parse LLM response resolution_limit = None pdb_id = None sequence = None method = None organism = None has_resolution_query = False resolution_direction = "less" similarity = None print("Raw LLM response:", response) # Extract resolution with improved pattern matching # Look for the first valid resolution value (non-zero) resolution_matches = re.finditer(r'[Rr]esolution:\s*(\d+(?:\.\d+)?)', response) for match in resolution_matches: try: value = float(match.group(1)) if value > 0: # Only accept positive resolution values resolution_limit = value has_resolution_query = True print(f"Extracted resolution: {resolution_limit}Å") break # Stop after finding the first valid resolution except ValueError: continue # Clean and normalize remaining response # Remove all resolution entries to avoid confusion cleaned_response = re.sub(r'[Rr]esolution:\s*\d+(?:\.\d+)?(?:\s*Å?)?\s*', '', response) print("cleaned_responese :", cleaned_response) # Split remaining response into clean key-value pairs response_pairs = {} for pair in re.finditer(r'(\w+):\s*([^:]+?)(?=\s+\w+:|$)', cleaned_response): key, value = pair.groups() print(key, value) key = key.lower() value = value.strip() if value.lower() not in ['none', 'n/a']: response_pairs[key] = value print("Parsed response pairs:", response_pairs) # Debug print # case LLM remove all input, if input has any param word -> replace input to value if not response_pairs: if 'protein' in response: response_pairs['protein'] = response print("Replaced response pairs:", response_pairs) # Debug print # Extract sequence and similarity from cleaned pairs if 'sequence' in response_pairs: sequence = response_pairs['sequence'] if len(sequence) >= 25: print(f"Extracted sequence: {sequence}") if 'similarity' in response_pairs: try: similarity_str = response_pairs['similarity'].replace('%', '') similarity = float(similarity_str) print(f"Extracted similarity: {similarity}%") except ValueError: pass if 'pdb_id' in response_pairs: pdb_id = response_pairs['pdb_id'].upper() if 'method' in response_pairs: method = response_pairs['method'].upper() if 'organism' in response_pairs: organism = response_pairs['organism'] # If similarity not found in LLM response, try query if similarity is None: similarity_match = re.search(r'similarity\s+(\d+(?:\.\d+)?)\s*%', query.lower()) if similarity_match: try: similarity = float(similarity_match.group(1)) print(f"Extracted similarity from query: {similarity}%") except ValueError: pass # If still no similarity specified and sequence exists, use default if similarity is None and sequence: similarity = 100 print("No similarity specified, using default 100%") # Parse resolution from query if not found in LLM response if not has_resolution_query: resolution_pattern = r'resolution (?:better|worse|less|greater) than (\d+\.?\d*)(?:\s*Å|A)?' resolution_match = re.search(resolution_pattern, query.lower()) if resolution_match: resolution_limit = float(resolution_match.group(1)) has_resolution_query = True print(f"Extracted resolution from query: {resolution_limit}Å") # Add protein name extraction from response pairs protein_name = None if 'protein' in response_pairs: protein_name = response_pairs['protein'] print(f"Extracted protein name: {protein_name}") # Build queries list queries = [] # Add protein name query if specified if protein_name: print(f"Adding protein name filter: {protein_name}") try: protein_query = AttributeQuery( attribute="struct.title", operator="contains_words", value=protein_name ) queries.append(protein_query) protein_entity_query = AttributeQuery( attribute="rcsb_entity_container_identifiers.entity_names.value", operator="contains_words", value=protein_name ) queries.append(protein_entity_query) print(f"Created protein queries successfully: {protein_query}, {protein_entity_query}") except Exception as e: print(f"Error creating protein queries: {str(e)}") # Add sequence query if present query_words = query.split() for word in query_words: if (len(word) >= 25 and all(c in 'ACDEFGHIKLMNPQRSTVWY' for c in word.upper()) and sum(c.isupper() for c in word) / len(word) > 0.8): sequence = word break if sequence: if len(sequence) < 25: print("Warning: Sequence must be at least 25 residues long. Skipping sequence search.") else: if similarity is None: similarity = 100 print("No similarity specified, using default 100%") identity_cutoff = similarity / 100.0 print(f"Adding sequence search with identity {similarity}% (cutoff: {identity_cutoff})") sequence_query = SequenceQuery( sequence, identity_cutoff=identity_cutoff, evalue_cutoff=1, sequence_type="protein" ) queries.append(sequence_query) print(f"Created sequence query with parameters: {sequence_query.params}") # Add resolution query if present if resolution_limit and has_resolution_query: operator = "less_or_equal" if resolution_direction == "less" else "greater_or_equal" print(f"Adding resolution filter: {operator} {resolution_limit}Å") resolution_query = AttributeQuery( attribute="rcsb_entry_info.resolution_combined", operator=operator, value=resolution_limit ) queries.append(resolution_query) print(f"Created resolution query with cutoff: {resolution_limit}Å") # Add PDB ID search if specified if pdb_id: print(f"Searching for specific PDB ID: {pdb_id}") id_query = AttributeQuery( attribute="rcsb_id", operator="exact_match", value=pdb_id.upper() ) queries = [id_query] # Override other queries for direct PDB ID search # Add experimental method filter if specified if method: print(f"Adding experimental method filter: {method}") method_query = AttributeQuery( attribute="exptl.method", operator="exact_match", value=method ) queries.append(method_query) # Add organism filter if specified if organism: print(f"Adding organism filter: {organism}") organism_query = AttributeQuery( attribute="rcsb_entity_source_organism.taxonomy_lineage.name", operator="exact_match", value=organism ) queries.append(organism_query) # Combine queries with improved error handling if queries: try: if protein_name and len(queries) >= 2: print("Combining protein queries with OR") protein_queries = queries[0] | queries[1] print("Successfully combined protein queries") if len(queries) > 2: print("Combining with additional queries using AND") final_query = queries[0] & queries[1] # final_query = protein_queries # for q in queries[2:]: # final_query = final_query & q else: final_query = protein_queries else: final_query = queries[0] for q in queries[1:]: final_query = final_query & q print("Final query:", final_query) # Execute search session = final_query.exec(results_verbosity="minimal") # query return identifier, score results = [] # Process results with additional information # search_engine = ProteinSearchEngine() try: for entry in session: try: # PDB ID 추출 방식 개선 if isinstance(entry, dict): if entry.get('score') > 0.75: pdb_id = entry.get('identifier') elif hasattr(entry, 'identifier'): pdb_id = entry.identifier else: pdb_id = str(entry) pdb_id = pdb_id.upper() # PDB ID는 항상 대문자 if not pdb_id or len(pdb_id) != 4: # PDB ID는 항상 4자리 continue # thresh hold if len(results) > 1 and results[-1]["PDB ID"] == pdb_id: break # RCSB PDB REST API를 직접 사용하여 구조 정보 가져오기 structure_url = f"https://data.rcsb.org/rest/v1/core/entry/{pdb_id}" response = requests.get(structure_url) if response.status_code != 200: continue structure_data = response.json() # 결과 구성 result = { 'PDB ID': pdb_id, 'Title': structure_data.get('struct', {}).get('title', 'N/A'), '# of total residues': structure_data.get('refine_hist', [{}])[0].get('pdbx_number_residues_total', 'N/A'), '# of atoms of protein': structure_data.get('refine_hist', [{}])[0].get('pdbx_number_atoms_protein', 'N/A'), 'Resolution': f"{structure_data.get('rcsb_entry_info', {}).get('resolution_combined', [0.0])[0]:.2f}Å", 'Method': structure_data.get('exptl', [{}])[0].get('method', 'Unknown'), 'Release Date': structure_data.get('rcsb_accession_info', {}).get('initial_release_date', 'N/A') } results.append(result) # Limit to max 500 if len(results) >= 500: break except Exception as e: print(f"Error processing entry: {str(e)}") continue except Exception as e: print(f"Error processing results: {str(e)}") print(f"Error type: {type(e)}") print(f"Found {len(results)} structures") return results except Exception as e: print(f"Error combining queries: {str(e)}") print(f"Query state: {queries}") return [] return [] except Exception as e: print(f"Error during search: {str(e)}") print(f"Error type: {type(e)}") return [] def get_sequences_by_pdb_id(self, pdb_id): """Get sequences for all chains in a PDB structure using Biopython""" try: # Download PDB file pdb_path = self.pdbl.retrieve_pdb_file( pdb_id, pdir=self.pdb_dir, file_format="pdb" ) # Get structure info from RCSB API for additional details structure_url = f"https://data.rcsb.org/rest/v1/core/entry/{pdb_id}" response = requests.get(structure_url) structure_data = response.json() if response.status_code == 200 else {} if not pdb_path or not os.path.exists(pdb_path): print(f"Failed to download PDB file for {pdb_id}") sequences = [] entity_ids = structure_data.get('rcsb_entry_container_identifiers', {}).get('polymer_entity_ids', {}) for i in entity_ids: sequence_url = f"https://data.rcsb.org/rest/v1/core/polymer_entity/{pdb_id}/{i}" seq_response = requests.get(sequence_url) seq_data = seq_response.json() if response.status_code == 200 else {} sequence = seq_data.get('entity_poly', {}).get('pdbx_seq_one_letter_code_can', 'N/A') # pdbx_seq_one_letter_code chain_info = { 'chain_id': seq_data.get('entity_poly', {}).get('pdbx_strand_id', 'N/A'), # chain.id 'entity_id': i, # Default entity ID 'description': structure_data.get('struct', {}).get('title', 'N/A'), 'sequence': sequence, 'length': len(sequence), 'resolution': structure_data.get('rcsb_entry_info', {}).get('resolution_combined', [0.0])[0], 'method': structure_data.get('exptl', [{}])[0].get('method', 'Unknown'), 'release_date': structure_data.get('rcsb_accession_info', {}).get('initial_release_date', 'N/A') } sequences.append(chain_info) print("not Bio pdb list") return sequences # Parse structure parser = PDB.PDBParser(QUIET=True) structure = parser.get_structure(pdb_id, pdb_path) sequences = [] # Extract sequences from each chain for model in structure: for chain in model: sequence = "" for residue in chain: if PDB.is_aa(residue, standard=True): try: # 3글자 아미노산 코드를 1글자로 변환 resname = residue.get_resname() if resname in protein_letters_3to1: sequence += protein_letters_3to1[resname] except: continue if sequence: # Only add if sequence is not empty chain_info = { 'chain_id': chain.id, 'entity_id': '1', # Default entity ID 'description': structure_data.get('struct', {}).get('title', 'N/A'), 'sequence': sequence, 'length': len(sequence), 'resolution': structure_data.get('rcsb_entry_info', {}).get('resolution_combined', [0.0])[0], 'method': structure_data.get('exptl', [{}])[0].get('method', 'Unknown'), 'release_date': structure_data.get('rcsb_accession_info', {}).get('initial_release_date', 'N/A') } sequences.append(chain_info) # Cleanup downloaded file if os.path.exists(pdb_path): os.remove(pdb_path) return sequences except Exception as e: print(f"Error getting sequences for PDB ID {pdb_id}: {str(e)}") return [] def __del__(self): """Cleanup temporary directory on object destruction""" if hasattr(self, 'pdb_dir') and os.path.exists(self.pdb_dir): shutil.rmtree(self.pdb_dir) def process_query(self, query): """Process query and return results""" try: # Get search parameters from LLM formatted_prompt = self.prompt_template.format(query=query) response = self.pipe(formatted_prompt)[0]['generated_text'] print("Generated parameters:", response) # Parse LLM response for PDB ID pdb_id = None for line in response.split('\n'): if 'PDB_ID:' in line: value = line.split('PDB_ID:')[1].strip() if value.lower() not in ['none', 'n/a']: pdb_id = value.upper() break # Check if query is asking for sequence sequence_keywords = ['sequence', 'seq'] is_sequence_query = any(keyword in query.lower() for keyword in sequence_keywords) if is_sequence_query and pdb_id: # Get sequences for the PDB ID sequences = self.get_sequences_by_pdb_id(pdb_id) return { "type": "sequence", "results": sequences } # If not a sequence query or no PDB ID found, proceed with normal structure search return { "type": "structure", "results": self.search_pdb(query) } except Exception as e: print(f"Error processing query: {str(e)}") return {"type": "structure", "results": []} def render_html(pdb_id, chain_count): if pdb_id is None or chain_count <= 0: return "" chains = [chr(65 + i) for i in range(chain_count)] # chain block chain_html_blocks = "".join([ f"""