from transformers import pipeline from rcsbsearchapi import TextQuery, AttributeQuery, Query from rcsbsearchapi.search import Sort, SequenceQuery import os from dotenv import load_dotenv from shiny import App, render, ui, reactive import pandas as pd import warnings import re from UniprotKB_P_Sequence_RCSB_API_test import ProteinQuery, ProteinSearchEngine import plotly.graph_objects as go from shinywidgets import output_widget, render_widget import requests import asyncio from Bio import PDB from Bio.PDB.PDBList import PDBList from Bio.PDB.Polypeptide import protein_letters_3to1 import shutil warnings.filterwarnings('ignore') # Load environment variables from .env file load_dotenv() # os.environ["TRANSFORMERS_CACHE"] = "./transformers_cache" # os.makedirs("./transformers_cache", exist_ok=True) class PDBSearchAssistant: def __init__(self, model_name="google/flan-t5-large"): # Set up HuggingFace pipeline with better model self.pipe = pipeline( "text2text-generation", model=model_name, max_new_tokens=1024, temperature=0.1, torch_dtype="auto", device="cpu" ) self.prompt_template = """ Extract specific search parameters from the protein-related query: 1. Protein name or type 2. Resolution cutoff (in Å) 3. Protein sequence information 4. Specific PDB ID 5. Experimental method (X-RAY, EM, NMR) 6. Organism/Species information 7. Sequence similarity (in %) Format: Protein: [protein name or type] Organism: [organism/species if mentioned] Resolution: [maximum resolution in Å, if mentioned] Sequence: [any sequence mentioned] PDB_ID: [specific PDB ID if mentioned] Method: [experimental method if mentioned] Examples: Query: "Find human insulin structures with X-ray better than 2.5Å resolution" Protein: insulin Organism: Homo sapiens Resolution: 2.5 Sequence: none PDB_ID: none Method: X-RAY Query: "Find structures containing sequence with similarity 90% MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRNTNGVITKDEAEKLFNQDVDAAVRGILRNAKLKPVYDSLDAVRRAALINMVFQMGETGVAGFTNSLRMLQQKRWDEAAVNLAKSRWYNQTPNRAKRVITTFRTGTWDAYKNL" Protein: none Organism: none Resolution: none Sequence: MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRNTNGVITKDEAEKLFNQDVDAAVRGILRNAKLKPVYDSLDAVRRAALINMVFQMGETGVAGFTNSLRMLQQKRWDEAAVNLAKSRWYNQTPNRAKRVITTFRTGTWDAYKNL PDB_ID: none Method: none Similarity: 90 Query: "Get sequence of PDB ID 8ET6" Protein: none Organism: none Resolution: none Sequence: none PDB_ID: 8ET6 Method: none Query: "Find mouse lysozyme structures" Protein: lysozyme Organism: Mus musculus Resolution: none Sequence: none PDB_ID: none Method: none Now analyze: Query: {query} """ self.pdb_dir = "pdb_tmp" # 임시 PDB 파일 저장 디렉토리 os.makedirs(self.pdb_dir, exist_ok=True) self.pdbl = PDBList() def search_pdb(self, query): try: # Get search parameters from LLM formatted_prompt = self.prompt_template.format(query=query) response = self.pipe(formatted_prompt)[0]['generated_text'] print("Generated parameters:", response) # Parse LLM response resolution_limit = None pdb_id = None sequence = None method = None organism = None has_resolution_query = False resolution_direction = "less" similarity = None # Initialize similarity print("Raw LLM response:", response) # Debug print # Parse LLM response first to get similarity value for line in response.split('\n'): line = line.strip().lower() # Convert to lowercase if 'similarity:' in line: try: similarity_str = line.split('similarity:')[1].strip() if similarity_str.lower() not in ['none', 'n/a']: similarity = float(similarity_str) print(f"Successfully extracted similarity: {similarity}%") except (ValueError, IndexError) as e: print(f"Error parsing similarity: {e}") continue # If similarity is still None, try to extract from original query if similarity is None: # Case insensitive search for similarity pattern similarity_match = re.search(r'similarity\s+(\d+(?:\.\d+)?)\s*%', query.lower()) if similarity_match: try: similarity = float(similarity_match.group(1)) print(f"Extracted similarity from query: {similarity}%") except ValueError as e: print(f"Error parsing similarity from query: {e}") # Check if query contains resolution-related terms resolution_terms = { 'better': 'less', 'best': 'less', 'highest': 'less', 'good': 'less', 'fine': 'less', 'worse': 'greater', 'worst': 'greater', 'lowest': 'greater', 'poor': 'greater', 'resolution': None, 'å': None, 'angstrom': None, 'than': None, 'under': 'less', 'below': 'less', 'above': 'greater', 'over': 'greater' } # Check if the original query mentions resolution query_lower = query.lower() # Determine resolution direction from query for term, direction in resolution_terms.items(): if term in query_lower: has_resolution_query = True if direction: # if not None resolution_direction = direction # Also check for numerical values with Å resolution_match = re.search(r'(\d+\.?\d*)\s*å?.*resolution', query_lower) if resolution_match: has_resolution_query = True try: resolution_limit = float(resolution_match.group(1)) except ValueError: pass # Clean and parse LLM response for line in response.split('\n'): if 'Resolution:' in line: value = line.split('Resolution:')[1].strip() if value.lower() not in ['none', 'n/a'] and has_resolution_query: try: # Extract just the number res_value = ''.join(c for c in value if c.isdigit() or c == '.') resolution_limit = float(res_value) except ValueError: pass elif 'Method:' in line: value = line.split('Method:')[1].strip() if value.lower() not in ['none', 'n/a']: method = value.upper() elif 'Sequence:' in line: value = line.split('Sequence:')[1].strip() if value.lower() not in ['none', 'n/a']: sequence = value elif 'PDB_ID:' in line: value = line.split('PDB_ID:')[1].strip() if value.lower() not in ['none', 'n/a']: pdb_id = value elif 'Organism:' in line: value = line.split('Organism:')[1].strip() if value.lower() not in ['none', 'n/a']: organism = value # Build search query queries = [] # Check if the query contains a protein sequence pattern # Check for amino acid sequence (minimum 25 residues) query_words = query.split() for word in query_words: # Check if the word consists of valid amino acid letters if (len(word) >= 25 and # minimum 25 residues requirement all(c in 'ACDEFGHIKLMNPQRSTVWY' for c in word.upper()) and sum(c.isupper() for c in word) / len(word) > 0.8): sequence = word break # If sequence is found, use SequenceQuery if sequence: if len(sequence) < 25: print("Warning: Sequence must be at least 25 residues long. Skipping sequence search.") sequence = None else: # Use the previously extracted similarity value if similarity is None: similarity = 100 # default value print("No similarity specified, using default 100%") identity_cutoff = similarity / 100.0 # Convert percentage to decimal print(f"Adding sequence search with identity {similarity}% (cutoff: {identity_cutoff}) for sequence: {sequence}") sequence_query = SequenceQuery( sequence, identity_cutoff=identity_cutoff, evalue_cutoff=1, sequence_type="protein" ) queries.append(sequence_query) print(f"Created sequence query with parameters: {sequence_query.params}") # If no sequence, proceed with text search else: # Clean the original query and add text search clean_query = query.lower() # Remove resolution numbers and terms if they exist if has_resolution_query: clean_query = re.sub(r'\d+\.?\d*\s*å?', '', clean_query) for term in resolution_terms: clean_query = clean_query.replace(term, '') # Clean up extra spaces and trim clean_query = ' '.join(clean_query.split()) print("Cleaned query:", clean_query) # Add text search if query is not empty if clean_query.strip(): text_query = AttributeQuery( attribute="struct.title", operator="contains_phrase", value=clean_query ) queries.append(text_query) # Add resolution filter if specified if resolution_limit and has_resolution_query: operator = "less_or_equal" if resolution_direction == "less" else "greater_or_equal" print(f"Adding resolution filter: {operator} {resolution_limit}Å") resolution_query = AttributeQuery( attribute="rcsb_entry_info.resolution_combined", operator=operator, value=resolution_limit ) queries.append(resolution_query) # Add PDB ID search if specified if pdb_id: print(f"Searching for specific PDB ID: {pdb_id}") id_query = AttributeQuery( attribute="rcsb_id", operator="exact_match", value=pdb_id.upper() ) queries = [id_query] # Override other queries for direct PDB ID search # Add experimental method filter if specified if method: print(f"Adding experimental method filter: {method}") method_query = AttributeQuery( attribute="exptl.method", operator="exact_match", value=method ) queries.append(method_query) # Add organism filter if specified if organism: print(f"Adding organism filter: {organism}") organism_query = AttributeQuery( attribute="rcsb_entity_source_organism.taxonomy_lineage.name", operator="exact_match", value=organism ) queries.append(organism_query) # Combine queries with AND operator if queries: final_query = queries[0] for q in queries[1:]: final_query = final_query & q print("Final query:", final_query) # Execute search session = final_query.exec() results = [] # Process results with additional information search_engine = ProteinSearchEngine() try: for entry in session: try: # PDB ID 추출 방식 개선 if isinstance(entry, dict): pdb_id = entry.get('identifier') elif hasattr(entry, 'identifier'): pdb_id = entry.identifier else: pdb_id = str(entry) pdb_id = pdb_id.upper() # PDB ID는 항상 대문자 if not pdb_id or len(pdb_id) != 4: # PDB ID는 항상 4자리 continue # RCSB PDB REST API를 직접 사용하여 구조 정보 가져오기 structure_url = f"https://data.rcsb.org/rest/v1/core/entry/{pdb_id}" response = requests.get(structure_url) if response.status_code != 200: continue structure_data = response.json() # 결과 구성 result = { 'PDB ID': pdb_id, 'Title': structure_data.get('struct', {}).get('title', 'N/A'), '# of total residues': structure_data.get('refine_hist', [{}])[0].get('pdbx_number_residues_total', 'N/A'), '# of atoms of protein': structure_data.get('refine_hist', [{}])[0].get('pdbx_number_atoms_protein', 'N/A'), 'Resolution': f"{structure_data.get('rcsb_entry_info', {}).get('resolution_combined', [0.0])[0]:.2f}Å", 'Method': structure_data.get('exptl', [{}])[0].get('method', 'Unknown'), 'Release Date': structure_data.get('rcsb_accession_info', {}).get('initial_release_date', 'N/A') } results.append(result) # Limit to top 10 results if len(results) >= 10: break except Exception as e: print(f"Error processing entry: {str(e)}") continue except Exception as e: print(f"Error processing results: {str(e)}") print(f"Error type: {type(e)}") print(f"Found {len(results)} structures") return results return [] except Exception as e: print(f"Error during search: {str(e)}") print(f"Error type: {type(e)}") return [] def get_sequences_by_pdb_id(self, pdb_id): """Get sequences for all chains in a PDB structure using Biopython""" try: # Download PDB file pdb_path = self.pdbl.retrieve_pdb_file( pdb_id, pdir=self.pdb_dir, file_format="pdb" ) if not pdb_path or not os.path.exists(pdb_path): print(f"Failed to download PDB file for {pdb_id}") return [] # Parse structure parser = PDB.PDBParser(QUIET=True) structure = parser.get_structure(pdb_id, pdb_path) # Get structure info from RCSB API for additional details structure_url = f"https://data.rcsb.org/rest/v1/core/entry/{pdb_id}" response = requests.get(structure_url) structure_data = response.json() if response.status_code == 200 else {} sequences = [] # Extract sequences from each chain for model in structure: for chain in model: sequence = "" for residue in chain: if PDB.is_aa(residue, standard=True): try: # 3글자 아미노산 코드를 1글자로 변환 resname = residue.get_resname() if resname in protein_letters_3to1: sequence += protein_letters_3to1[resname] except: continue if sequence: # Only add if sequence is not empty chain_info = { 'chain_id': chain.id, 'entity_id': '1', # Default entity ID 'description': structure_data.get('struct', {}).get('title', 'N/A'), 'sequence': sequence, 'length': len(sequence), 'resolution': structure_data.get('rcsb_entry_info', {}).get('resolution_combined', [0.0])[0], 'method': structure_data.get('exptl', [{}])[0].get('method', 'Unknown'), 'release_date': structure_data.get('rcsb_accession_info', {}).get('initial_release_date', 'N/A') } sequences.append(chain_info) # Cleanup downloaded file if os.path.exists(pdb_path): os.remove(pdb_path) return sequences except Exception as e: print(f"Error getting sequences for PDB ID {pdb_id}: {str(e)}") return [] def __del__(self): """Cleanup temporary directory on object destruction""" if hasattr(self, 'pdb_dir') and os.path.exists(self.pdb_dir): shutil.rmtree(self.pdb_dir) def process_query(self, query): """Process query and return results""" try: # Get search parameters from LLM formatted_prompt = self.prompt_template.format(query=query) response = self.pipe(formatted_prompt)[0]['generated_text'] print("Generated parameters:", response) # Parse LLM response for PDB ID pdb_id = None for line in response.split('\n'): if 'PDB_ID:' in line: value = line.split('PDB_ID:')[1].strip() if value.lower() not in ['none', 'n/a']: pdb_id = value.upper() break # Check if query is asking for sequence sequence_keywords = ['sequence', 'seq'] is_sequence_query = any(keyword in query.lower() for keyword in sequence_keywords) if is_sequence_query and pdb_id: # Get sequences for the PDB ID sequences = self.get_sequences_by_pdb_id(pdb_id) return { "type": "sequence", "results": sequences } # If not a sequence query or no PDB ID found, proceed with normal structure search return { "type": "structure", "results": self.search_pdb(query) } except Exception as e: print(f"Error processing query: {str(e)}") return {"type": "structure", "results": []} def pdbsummary(name): search_engine = ProteinSearchEngine() query = ProteinQuery( name, max_resolution= 5.0 ) results = search_engine.search(query) answer = "" for i, structure in enumerate(results, 1): answer += f"\n{i}. PDB ID : {structure.pdb_id}\n" answer += f"\nResolution : {structure.resolution:.2f} A \n" answer += f"Method : {structure.method}\n Title : {structure.title}\n" answer += f"Release Date : {structure.release_date}\n Sequence length: {len(structure.sequence)} aa\n" answer += f" Sequence:\n {structure.sequence}\n" return answer def render_html(pdb_id): if pdb_id is None: return "" html_content = f"""