"""Shiny application for natural-language searches against the RCSB PDB.

A HuggingFace text2text model extracts structured search parameters
(organism, resolution, sequence, PDB ID, method) from a free-text query.
Those parameters are turned into an RCSB search query, and the hits are
shown in an interactive table or, for sequence requests, as per-chain
sequences.
"""

from transformers import pipeline
from rcsbsearchapi import TextQuery, AttributeQuery, Query
from rcsbsearchapi.search import Sort, SequenceQuery
import os
from dotenv import load_dotenv
from shiny import App, render, ui, reactive
import pandas as pd
import warnings
import re
from UniprotKB_P_Sequence_RCSB_API_test import ProteinQuery, ProteinSearchEngine
import plotly.graph_objects as go
from shinywidgets import output_widget, render_widget
import requests
import asyncio
from Bio import PDB
from Bio.PDB.PDBList import PDBList
from Bio.PDB.Polypeptide import protein_letters_3to1
import shutil

warnings.filterwarnings('ignore')

# Load environment variables from .env file
load_dotenv()

# os.environ["TRANSFORMERS_CACHE"] = "./transformers_cache"
# os.makedirs("./transformers_cache", exist_ok=True)

# Seconds to wait for an RCSB REST response before giving up.
REQUEST_TIMEOUT = 10
# Shortest sequence that is submitted as a sequence search.
MIN_SEQUENCE_LENGTH = 25
# Maximum number of structures returned by a search.
MAX_RESULTS = 10
RCSB_ENTRY_URL = "https://data.rcsb.org/rest/v1/core/entry/{pdb_id}"

PROMPT_TEMPLATE = """
Extract specific search parameters from the protein-related query:
1. Protein name or type
2. Resolution cutoff (in Å)
3. Protein sequence information
4. Specific PDB ID
5. Experimental method (X-RAY, EM, NMR)
6. Organism/Species information

Format:
Protein: [protein name or type]
Organism: [organism/species if mentioned]
Resolution: [maximum resolution in Å, if mentioned]
Sequence: [any sequence mentioned]
PDB_ID: [specific PDB ID if mentioned]
Method: [experimental method if mentioned]

Examples:
Query: "Find human insulin structures with X-ray better than 2.5Å resolution"
Protein: insulin
Organism: Homo sapiens
Resolution: 2.5
Sequence: none
PDB_ID: none
Method: X-RAY

Query: "Find structures containing sequence MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRNTNGVITKDEAEKLFNQDVDAAVRGILRNAKLKPVYDSLDAVRRAALINMVFQMGETGVAGFTNSLRMLQQKRWDEAAVNLAKSRWYNQTPNRAKRVITTFRTGTWDAYKNL"
Protein: none
Organism: none
Resolution: none
Sequence: MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRNTNGVITKDEAEKLFNQDVDAAVRGILRNAKLKPVYDSLDAVRRAALINMVFQMGETGVAGFTNSLRMLQQKRWDEAAVNLAKSRWYNQTPNRAKRVITTFRTGTWDAYKNL
PDB_ID: none
Method: none

Query: "Get sequence of PDB ID 8ET6"
Protein: none
Organism: none
Resolution: none
Sequence: none
PDB_ID: 8ET6
Method: none

Query: "Find mouse lysozyme structures"
Protein: lysozyme
Organism: Mus musculus
Resolution: none
Sequence: none
PDB_ID: none
Method: none

Query: "Show me E. coli protein structures solved by Cryo-EM"
Protein: none
Organism: Escherichia coli
Resolution: none
Sequence: none
PDB_ID: none
Method: EM

Query: "Find S. cerevisiae structures with resolution better than 1.8Å"
Protein: none
Organism: Saccharomyces cerevisiae
Resolution: 1.8
Sequence: none
PDB_ID: none
Method: none

Query: "Sequence of 7BZ5"
Protein: none
Organism: none
Resolution: none
Sequence: none
PDB_ID: 7BZ5
Method: none

Now analyze:
Query: {query}
"""

# Field labels the prompt asks the model to emit.
_FIELD_LABELS = "Protein|Organism|Resolution|Sequence|PDB_ID|Method"
# A value runs until the next label or the end of the text, so the parser
# works whether the model separates fields with newlines or with spaces.
_FIELD_RE = re.compile(
    rf"({_FIELD_LABELS})\s*:\s*(.*?)\s*(?=(?:{_FIELD_LABELS})\s*:|$)",
    re.DOTALL,
)
_EMPTY_VALUES = frozenset({"", "none", "n/a"})

# Words that mark a query as resolution-related. A non-None value gives the
# comparison direction the word implies ("less" = numerically smaller Å).
_RESOLUTION_TERMS = {
    'better': 'less',
    'best': 'less',
    'highest': 'less',
    'good': 'less',
    'fine': 'less',
    'worse': 'greater',
    'worst': 'greater',
    'lowest': 'greater',
    'poor': 'greater',
    'resolution': None,
    'å': None,
    'angstrom': None,
    'angstroms': None,
    'than': None,
    'under': 'less',
    'below': 'less',
    'above': 'greater',
    'over': 'greater',
}
# Whole-word match only, so "over" does not fire inside "recovery".
_RESOLUTION_TERM_RE = re.compile(
    r"(?<!\w)(" + "|".join(re.escape(term) for term in _RESOLUTION_TERMS) + r")(?!\w)"
)
# A number that carries an explicit Ångström unit, e.g. "2.5å" or "2 angstrom".
_RESOLUTION_VALUE_RE = re.compile(
    r"(?<![\w.])\d+(?:\.\d+)?\s*(?:å|angstroms?)(?!\w)"
)
# A stand-alone number with an optional unit; never part of a token like "8et6".
_STANDALONE_NUMBER_RE = re.compile(
    r"(?<![\w.])\d+(?:\.\d+)?(?:\s*(?:å|angstroms?))?(?!\w)"
)
_FIRST_NUMBER_RE = re.compile(r"\d+(?:\.\d+)?")
_PDB_ID_RE = re.compile(r"[0-9][A-Za-z0-9]{3}")

_AMINO_ACIDS = frozenset('ACDEFGHIKLMNPQRSTVWY')
# Punctuation that may surround a sequence pasted into a sentence.
_SEQUENCE_STRIP_CHARS = "\"'.,;:()[]"

# The model is prompted to answer with short method names, while the RCSB
# `exptl.method` attribute stores the full names.
_METHOD_ALIASES = {
    'X-RAY': 'X-RAY DIFFRACTION',
    'XRAY': 'X-RAY DIFFRACTION',
    'X-RAY DIFFRACTION': 'X-RAY DIFFRACTION',
    'EM': 'ELECTRON MICROSCOPY',
    'CRYO-EM': 'ELECTRON MICROSCOPY',
    'CRYOEM': 'ELECTRON MICROSCOPY',
    'ELECTRON MICROSCOPY': 'ELECTRON MICROSCOPY',
    'NMR': 'SOLUTION NMR',
    'SOLUTION NMR': 'SOLUTION NMR',
}


def _parse_llm_fields(response):
    """Return the ``Label: value`` pairs found in the model output.

    Keys are the lower-cased labels (``protein``, ``organism``,
    ``resolution``, ``sequence``, ``pdb_id``, ``method``). Fields whose value
    is empty, ``none`` or ``n/a`` are omitted. The first occurrence of a
    label wins.
    """
    fields = {}
    for label, raw_value in _FIELD_RE.findall(response or ""):
        value = raw_value.strip()
        if value.lower() in _EMPTY_VALUES:
            continue
        fields.setdefault(label.lower(), value)
    return fields


def _detect_resolution_intent(query_lower):
    """Return ``(has_resolution_query, direction)`` for a lower-cased query.

    ``direction`` is ``"less"`` or ``"greater"`` and defaults to ``"less"``.
    When several directional words are present, the one listed last in
    ``_RESOLUTION_TERMS`` decides.
    """
    found_terms = set(_RESOLUTION_TERM_RE.findall(query_lower))
    has_resolution_query = bool(found_terms) or bool(
        _RESOLUTION_VALUE_RE.search(query_lower)
    )
    direction = "less"
    for term, term_direction in _RESOLUTION_TERMS.items():
        if term_direction and term in found_terms:
            direction = term_direction
    return has_resolution_query, direction


def _clean_text_query(query, has_resolution_query):
    """Lower-case the query and strip resolution numbers and words from it."""
    cleaned = query.lower()
    if has_resolution_query:
        cleaned = _STANDALONE_NUMBER_RE.sub(' ', cleaned)
        cleaned = _RESOLUTION_TERM_RE.sub(' ', cleaned)
    # Collapse the gaps left behind by the removals.
    return ' '.join(cleaned.split())


def _is_protein_sequence(candidate):
    """Return True if *candidate* is a mostly upper-case amino-acid string."""
    if len(candidate) < MIN_SEQUENCE_LENGTH:
        return False
    if not all(char in _AMINO_ACIDS for char in candidate.upper()):
        return False
    upper_ratio = sum(char.isupper() for char in candidate) / len(candidate)
    return upper_ratio > 0.8


def _find_sequence_in_query(query):
    """Return the first word of the query that is a protein sequence, if any."""
    for word in query.split():
        candidate = word.strip(_SEQUENCE_STRIP_CHARS)
        if _is_protein_sequence(candidate):
            return candidate
    return None


def _extract_pdb_id(fields, query):
    """Return the upper-cased PDB ID from the parsed fields, or None.

    The ID is accepted only if it is a well-formed 4-character identifier
    that also occurs in the user's query. This keeps an ID the model copied
    from the prompt examples from overriding the whole search.
    """
    candidate = fields.get('pdb_id')
    if not candidate or not _PDB_ID_RE.fullmatch(candidate):
        return None
    if candidate.lower() not in query.lower():
        return None
    return candidate.upper()


def _entry_identifier(entry):
    """Return the identifier of a search hit, whatever shape the hit has."""
    if isinstance(entry, dict):
        return entry.get('identifier')
    if hasattr(entry, 'identifier'):
        return entry.identifier
    return str(entry)


def _fetch_entry_data(pdb_id):
    """Fetch the RCSB core entry record; return None on a non-200 response."""
    response = requests.get(
        RCSB_ENTRY_URL.format(pdb_id=pdb_id), timeout=REQUEST_TIMEOUT
    )
    if response.status_code != 200:
        return None
    return response.json()


def _entry_resolution(structure_data):
    """Return the first combined resolution as a float, or None if absent."""
    entry_info = structure_data.get('rcsb_entry_info') or {}
    values = entry_info.get('resolution_combined') or []
    return float(values[0]) if values else None


def _entry_method(structure_data):
    """Return the first experimental method, or 'Unknown' if absent."""
    experiments = structure_data.get('exptl') or [{}]
    return experiments[0].get('method', 'Unknown')


class PDBSearchAssistant:
    """Translate free-text queries into RCSB PDB searches via an LLM."""

    def __init__(self, model_name="google/flan-t5-large"):
        """Load the text2text pipeline and prepare the PDB download folder.

        Args:
            model_name: HuggingFace model identifier used for parameter
                extraction.
        """
        # Set up HuggingFace pipeline with better model
        self.pipe = pipeline(
            "text2text-generation",
            model=model_name,
            max_new_tokens=512,
            temperature=0.3,
            torch_dtype="auto",
            device="cpu"
        )
        self.prompt_template = PROMPT_TEMPLATE
        self.pdb_dir = "pdb_tmp"  # Directory for temporarily downloaded PDB files
        os.makedirs(self.pdb_dir, exist_ok=True)
        self.pdbl = PDBList()

    def _generate_parameters(self, query):
        """Run the model on the formatted prompt and return its raw text."""
        formatted_prompt = self.prompt_template.format(query=query)
        response = self.pipe(formatted_prompt)[0]['generated_text']
        print("Generated parameters:", response)
        return response

    def search_pdb(self, query, llm_response=None):
        """Search the RCSB PDB for structures matching a free-text query.

        Args:
            query: The user's natural-language query.
            llm_response: Optional model output already generated for this
                query. When None, the model is invoked here.

        Returns:
            A list of at most ``MAX_RESULTS`` dicts with the keys
            'PDB ID', 'Resolution', 'Method', 'Title' and 'Release Date'.
            An empty list is returned when nothing matches or on any error.
        """
        try:
            # Get search parameters from LLM
            if llm_response is None:
                llm_response = self._generate_parameters(query)
            fields = _parse_llm_fields(llm_response)

            has_resolution_query, resolution_direction = _detect_resolution_intent(
                query.lower()
            )

            resolution_limit = None
            if has_resolution_query and 'resolution' in fields:
                number = _FIRST_NUMBER_RE.search(fields['resolution'])
                if number:
                    resolution_limit = float(number.group())

            method = None
            if 'method' in fields:
                method_key = fields['method'].upper()
                method = _METHOD_ALIASES.get(method_key, method_key)

            organism = fields.get('organism')
            pdb_id = _extract_pdb_id(fields, query)

            # A sequence typed in the query is authoritative; the model's
            # copy is used only as a fallback because long sequences may be
            # reproduced inexactly.
            sequence = _find_sequence_in_query(query)
            if sequence is None and 'sequence' in fields:
                if _is_protein_sequence(fields['sequence']):
                    sequence = fields['sequence']
                else:
                    print("Warning: Sequence must be at least 25 residues long. Skipping sequence search.")

            # Build search query
            queries = []

            if sequence:
                print(f"Adding sequence search with identity 100% for sequence: {sequence}")
                queries.append(SequenceQuery(
                    sequence,
                    identity_cutoff=1.0,  # 100% identity
                    evalue_cutoff=1,
                    sequence_type="protein"
                ))
            else:
                # No usable sequence: fall back to a title text search
                clean_query = _clean_text_query(query, has_resolution_query)
                print("Cleaned query:", clean_query)
                if clean_query:
                    queries.append(AttributeQuery(
                        attribute="struct.title",
                        operator="contains_phrase",
                        value=clean_query
                    ))

            # Add resolution filter if specified
            if resolution_limit and has_resolution_query:
                operator = "less_or_equal" if resolution_direction == "less" else "greater_or_equal"
                print(f"Adding resolution filter: {operator} {resolution_limit}Å")
                queries.append(AttributeQuery(
                    attribute="rcsb_entry_info.resolution_combined",
                    operator=operator,
                    value=resolution_limit
                ))

            # Add PDB ID search if specified
            if pdb_id:
                print(f"Searching for specific PDB ID: {pdb_id}")
                id_query = AttributeQuery(
                    attribute="rcsb_id",
                    operator="exact_match",
                    value=pdb_id
                )
                queries = [id_query]  # Override other queries for direct PDB ID search

            # Add experimental method filter if specified
            if method:
                print(f"Adding experimental method filter: {method}")
                queries.append(AttributeQuery(
                    attribute="exptl.method",
                    operator="exact_match",
                    value=method
                ))

            # Add organism filter if specified
            if organism:
                print(f"Adding organism filter: {organism}")
                queries.append(AttributeQuery(
                    attribute="rcsb_entity_source_organism.taxonomy_lineage.name",
                    operator="exact_match",
                    value=organism
                ))

            if not queries:
                return []

            # Combine queries with AND operator
            final_query = queries[0]
            for extra_query in queries[1:]:
                final_query = final_query & extra_query

            print("Final query:", final_query)

            # Execute search
            session = final_query.exec()
            results = []

            try:
                for entry in session:
                    try:
                        entry_id = _entry_identifier(entry)
                        # PDB IDs are always four characters long
                        if not entry_id or len(entry_id) != 4:
                            continue
                        entry_id = entry_id.upper()  # PDB IDs are always upper case

                        # Fetch the structure details straight from the RCSB REST API
                        structure_data = _fetch_entry_data(entry_id)
                        if structure_data is None:
                            continue

                        resolution = _entry_resolution(structure_data)
                        results.append({
                            'PDB ID': entry_id,
                            # Entries without a resolution (e.g. NMR) show N/A
                            'Resolution': f"{resolution:.2f}Å" if resolution is not None else 'N/A',
                            'Method': _entry_method(structure_data),
                            'Title': structure_data.get('struct', {}).get('title', 'N/A'),
                            'Release Date': structure_data.get('rcsb_accession_info', {}).get('initial_release_date', 'N/A')
                        })

                        # Limit to top 10 results
                        if len(results) >= MAX_RESULTS:
                            break

                    except Exception as e:
                        # One bad entry must not abort the whole listing
                        print(f"Error processing entry: {str(e)}")
                        continue

            except Exception as e:
                print(f"Error processing results: {str(e)}")
                print(f"Error type: {type(e)}")

            print(f"Found {len(results)} structures")
            return results

        except Exception as e:
            print(f"Error during search: {str(e)}")
            print(f"Error type: {type(e)}")
            return []

    def get_sequences_by_pdb_id(self, pdb_id):
        """Get sequences for all chains in a PDB structure using Biopython.

        Only the first model of the structure is read, so multi-model
        entries list each chain once.

        Args:
            pdb_id: Four-character PDB identifier.

        Returns:
            A list of dicts with the keys 'chain_id', 'entity_id',
            'description', 'sequence', 'length', 'resolution' (float, or
            None when the entry reports none), 'method' and 'release_date'.
            An empty list is returned when the download or parsing fails.
        """
        pdb_path = None
        try:
            # Download PDB file
            pdb_path = self.pdbl.retrieve_pdb_file(
                pdb_id,
                pdir=self.pdb_dir,
                file_format="pdb"
            )

            if not pdb_path or not os.path.exists(pdb_path):
                print(f"Failed to download PDB file for {pdb_id}")
                return []

            # Parse structure
            parser = PDB.PDBParser(QUIET=True)
            structure = parser.get_structure(pdb_id, pdb_path)

            # Get structure info from RCSB API for additional details;
            # the sequences are still returned if this lookup fails.
            try:
                structure_data = _fetch_entry_data(pdb_id) or {}
            except (requests.RequestException, ValueError) as e:
                print(f"Could not fetch metadata for PDB ID {pdb_id}: {str(e)}")
                structure_data = {}

            description = structure_data.get('struct', {}).get('title', 'N/A')
            resolution = _entry_resolution(structure_data)
            method = _entry_method(structure_data)
            release_date = structure_data.get('rcsb_accession_info', {}).get('initial_release_date', 'N/A')

            sequences = []
            first_model = next(iter(structure), None)
            if first_model is None:
                return sequences

            # Extract sequences from each chain
            for chain in first_model:
                # Convert three-letter residue names to one-letter codes
                letters = [
                    protein_letters_3to1[residue.get_resname()]
                    for residue in chain
                    if PDB.is_aa(residue, standard=True)
                    and residue.get_resname() in protein_letters_3to1
                ]
                sequence = "".join(letters)

                if sequence:  # Only add if sequence is not empty
                    sequences.append({
                        'chain_id': chain.id,
                        'entity_id': '1',  # Default entity ID
                        'description': description,
                        'sequence': sequence,
                        'length': len(sequence),
                        'resolution': resolution,
                        'method': method,
                        'release_date': release_date
                    })

            return sequences

        except Exception as e:
            print(f"Error getting sequences for PDB ID {pdb_id}: {str(e)}")
            return []

        finally:
            # Cleanup downloaded file on every exit path
            if pdb_path and os.path.exists(pdb_path):
                try:
                    os.remove(pdb_path)
                except OSError as e:
                    print(f"Could not remove temporary file {pdb_path}: {str(e)}")

    def __del__(self):
        """Cleanup temporary directory on object destruction"""
        pdb_dir = getattr(self, 'pdb_dir', None)
        if pdb_dir and os.path.exists(pdb_dir):
            # Errors are ignored: raising inside __del__ only produces noise
            shutil.rmtree(pdb_dir, ignore_errors=True)

    def process_query(self, query):
        """Process query and return results.

        Args:
            query: The user's natural-language query.

        Returns:
            ``{"type": "sequence", "results": [...]}`` when the query asks
            for the sequence of a specific PDB ID, otherwise
            ``{"type": "structure", "results": [...]}``. On error the
            structure form with an empty list is returned.
        """
        try:
            # Get search parameters from LLM
            response = self._generate_parameters(query)

            # Parse LLM response for PDB ID
            pdb_id = _extract_pdb_id(_parse_llm_fields(response), query)

            # Check if query is asking for sequence
            sequence_keywords = ['sequence', 'seq']
            is_sequence_query = any(keyword in query.lower() for keyword in sequence_keywords)

            if is_sequence_query and pdb_id:
                # Get sequences for the PDB ID
                sequences = self.get_sequences_by_pdb_id(pdb_id)
                return {
                    "type": "sequence",
                    "results": sequences
                }

            # If not a sequence query or no PDB ID found, proceed with normal
            # structure search, reusing the model output generated above
            return {
                "type": "structure",
                "results": self.search_pdb(query, llm_response=response)
            }

        except Exception as e:
            print(f"Error processing query: {str(e)}")
            return {"type": "structure", "results": []}


def pdbsummary(name):
    """Return a plain-text summary of the structures found for *name*.

    The search runs through ``ProteinSearchEngine`` with a maximum
    resolution of 5.0.
    """
    search_engine = ProteinSearchEngine()

    query = ProteinQuery(
        name,
        max_resolution=5.0
    )

    results = search_engine.search(query)

    parts = []
    for i, structure in enumerate(results, 1):
        parts.append(f"\n{i}. PDB ID : {structure.pdb_id}\n")
        parts.append(f"\nResolution : {structure.resolution:.2f} A \n")
        parts.append(f"Method : {structure.method}\n Title : {structure.title}\n")
        parts.append(f"Release Date : {structure.release_date}\n Sequence length: {len(structure.sequence)} aa\n")
        parts.append(f" Sequence:\n {structure.sequence}\n")

    return "".join(parts)


def create_interactive_table(df):
    """Build a Plotly table figure from the search-result DataFrame.

    Args:
        df: DataFrame with the columns 'PDB ID', 'Resolution', 'Method',
            'Title' and 'Release Date'.

    Returns:
        A ``go.Figure`` holding one table trace, or an empty figure when
        *df* is empty.
    """
    if df.empty:
        return go.Figure()

    # Reorder columns; copy so the caller's frame is never modified
    column_order = ['PDB ID', 'Resolution', 'Method', 'Title', 'Release Date']
    df = df[column_order].copy()

    # Reformat Release Date as YYYY-MM-DD; unparseable values become N/A
    release_dates = pd.to_datetime(df['Release Date'], errors='coerce')
    df['Release Date'] = release_dates.dt.strftime('%Y-%m-%d').fillna('N/A')

    # Create interactive table
    table = go.Figure(data=[go.Table(
        header=dict(
            values=list(df.columns),
            fill_color='paleturquoise',
            align='center',  # Center the header text
            font=dict(size=16),  # Larger header font
        ),
        cells=dict(
            values=[
                [f'{cell}' if i == 0 else cell for cell in df[col]]
                for i, col in enumerate(df.columns)
            ],
            align='center',  # Center the cell contents
            font=dict(size=15),  # Larger cell font
            height=35  # Taller cells
        ),
        columnwidth=[80, 80, 100, 400, 100],
        customdata=[['html'] * len(df) if i == 0 else [''] * len(df)
                    for i in range(len(df.columns))],
        hoverlabel=dict(bgcolor='white')
    )])

    # Update table layout
    table.update_layout(
        margin=dict(l=20, r=20, t=20, b=20),
        height=450,  # Taller overall table
        autosize=True
    )

    return table


# Simplified Shiny app UI definition
app_ui = ui.page_fluid(
    ui.tags.head(
        ui.tags.style("""
            .container-fluid {
                max-width: 1200px;
                margin: 0 auto;
                padding: 20px;
            }
            .table a {
                color: #0d6efd;
                text-decoration: none;
            }
            .table a:hover {
                color: #0a58ca;
                text-decoration: underline;
            }
            .shiny-input-container {
                max-width: 100%;
                margin: 0 auto;
            }
            #query {
                height: 150px;
                font-size: 16px;
                padding: 15px;
                width: 80%;
                margin: 0 auto;
                display: block;
                white-space: pre-wrap;
                word-wrap: break-word;
                resize: vertical;
                overflow-y: auto;
            }
            .content-wrapper {
                text-align: center;
                max-width: 1000px;
                margin: 0 auto;
            }
            .search-button {
                margin: 20px 0;
            }
            h2, h4 {
                text-align: center;
                margin: 20px 0;
            }
            .example-box {
                background-color: #f8f9fa;
                border-radius: 8px;
                padding: 20px;
                margin: 20px auto;
                width: 80%;
                text-align: left;
            }
            .example-box p {
                font-weight: bold;
                margin-bottom: 10px;
                padding-left: 20px;
            }
            .example-box ul {
                margin: 0;
                padding-left: 40px;
            }
            .example-box li {
                word-wrap: break-word;
                margin: 10px 0;
                line-height: 1.5;
            }
            .query-label {
                display: block;
                text-align: left;
                margin-bottom: 10px;
                margin-left: 10%;
                font-weight: bold;
            }
            .status-box {
                background-color: #f8f9fa;
                border-radius: 8px;
                padding: 15px;
                margin: 20px auto;
                width: 80%;
                text-align: left;
            }
            .status-label {
                font-weight: bold;
                margin-right: 10px;
            }
            .status-ready {
                color: #198754;  /* Bootstrap success color */
                font-weight: bold;
            }
            .sequence-results {
                width: 80%;
                margin: 20px auto;
                text-align: left;
                font-family: monospace;
                white-space: pre-wrap;
                word-wrap: break-word;
                background-color: #f8f9fa;
                border-radius: 8px;
                padding: 20px;
                overflow-x: hidden;
            }
            .sequence-text {
                word-break: break-all;
                margin: 10px 0;
                line-height: 1.5;
            }
            .status-spinner {
                display: none;
                margin-left: 10px;
                vertical-align: middle;
            }
            .status-spinner.active {
                display: inline-block;
            }
        """)
    ),
    ui.div(
        {"class": "content-wrapper"},
        ui.h2("Advanced PDB Structure Search Tool"),
        ui.row(
            ui.column(12,
                ui.tags.label(
                    "Search Query",
                    {"class": "query-label", "for": "query"}
                ),
                ui.input_text_area(
                    "query",
                    "",
                    value="Human insulin",
                    width="100%",
                    resize="vertical"
                ),
            )
        ),
        ui.row(
            ui.column(12,
                ui.div(
                    {"class": "example-box"},
                    ui.p("Example queries:"),
                    ui.tags.ul(
                        ui.tags.li("Human hemoglobin C resolution better than 2.5Å"),
                        ui.tags.li("Find structures containing sequence MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRNTNGVITKDEAEKLFNQDVDAAVRGILRNAKLKPVYDSLDAVRRAALINMVFQMGETGVAGFTNSLRMLQQKRWDEAAVNLAKSRWYNQTPNRAKRVITTFRTGTWDAYKNL"),
                        ui.tags.li("Sequence of PDB ID 8ET6")
                    )
                )
            )
        ),
        ui.row(
            ui.column(12,
                ui.div(
                    {"class": "search-button"},
                    ui.input_action_button("search", "Search",
                                           class_="btn-primary btn-lg")  # Larger button
                )
            )
        ),
        ui.row(
            ui.column(12,
                ui.h4("Search Parameters:"),
                ui.div(
                    {"class": "status-box"},
                    ui.tags.span("Status: ", class_="status-label"),
                    ui.output_text("search_status", inline=True),
                    ui.tags.div(
                        {"class": "status-spinner"},
                        ui.tags.i({"class": "fas fa-spinner fa-spin"})
                    )
                )
            )
        ),
        ui.row(
            ui.column(12,
                ui.h4("Top 10 Results:"),
                output_widget("results_table"),
                ui.download_button("download", "Download Results",
                                   class_="btn btn-info btn-lg")  # Styled download button
            )
        ),
        ui.row(
            ui.column(12,
                ui.div(
                    {"class": "sequence-results", "id": "sequence-results"},
                    ui.h4("Sequences:"),
                    ui.output_text("sequence_output")
                )
            )
        )
    )
)


def server(input, output, session):
    """Wire the search button, status line, table, download and sequences."""
    assistant = PDBSearchAssistant()
    # Last query outcome: {"type": "structure" | "sequence" | None, "results": [...]}
    results_store = reactive.Value({"type": None, "results": []})
    status_store = reactive.Value("Ready")

    @reactive.Effect
    @reactive.event(input.search)
    def _():
        status_store.set("Searching...")

        query_results = assistant.process_query(input.query())
        results_store.set(query_results)

        if query_results["results"]:
            status_store.set("Ready")
        elif query_results["type"] == "sequence":
            status_store.set("No sequences found")
        else:
            status_store.set("No structures found")

    @output
    @render_widget
    def results_table():
        # Read the reactive store so the table re-renders after each search
        current_results = results_store.get()
        if current_results["type"] != "structure":
            return go.Figure()
        return create_interactive_table(pd.DataFrame(current_results["results"]))

    @output
    @render.text
    def search_status():
        return status_store.get()

    @output
    @render.download(filename="pdb_search_results.csv")
    def download():
        current_results = results_store.get()
        df = pd.DataFrame(current_results["results"])
        # A download handler must yield the content; a returned string
        # would be interpreted as the path of a file to send.
        yield df.to_csv(index=False)

    @output
    @render.text
    def sequence_output():
        current_results = results_store.get()
        if current_results["type"] != "sequence":
            return ""

        sequences = current_results["results"]
        if not sequences:
            return "No sequences found"

        output_text = []
        for seq in sequences:
            output_text.append(f"\nChain {seq['chain_id']} (Entity {seq['entity_id']}):")
            output_text.append(f"Description: {seq['description']}")
            output_text.append(f"Length: {seq['length']} residues")
            output_text.append("Sequence:")
            # Wrap the sequence at 60 characters per line
            sequence = seq['sequence']
            formatted_sequence = '\n'.join(
                sequence[i:i + 60] for i in range(0, len(sequence), 60)
            )
            output_text.append(formatted_sequence)
            output_text.append("-" * 60)  # Separator matches the wrap width
        return "\n".join(output_text)


app = App(app_ui, server)

if __name__ == "__main__":
    import nest_asyncio
    nest_asyncio.apply()
    app.run(host="0.0.0.0", port=7862)