from transformers import pipeline
from rcsbsearchapi import TextQuery, AttributeQuery, Query
from rcsbsearchapi.search import Sort, SequenceQuery
import os
from dotenv import load_dotenv
from shiny import App, render, ui, reactive
import pandas as pd
import warnings
import re
from UniprotKB_P_Sequence_RCSB_API_test import ProteinQuery, ProteinSearchEngine
import plotly.graph_objects as go
from shinywidgets import output_widget, render_widget
import requests
import asyncio
from Bio import PDB
from Bio.PDB.PDBList import PDBList
from Bio.PDB.Polypeptide import protein_letters_3to1
import shutil
# Suppress all Python warnings raised after this point (no category or module
# filter is given, so every warning is ignored process-wide).
warnings.filterwarnings('ignore')
# Load environment variables from a .env file into the process environment
# before anything below reads them.
load_dotenv()
# os.environ["TRANSFORMERS_CACHE"] = "./transformers_cache"
# os.makedirs("./transformers_cache", exist_ok=True)
class PDBSearchAssistant:
    """Turn free-text protein questions into RCSB PDB searches.

    A text2text HuggingFace pipeline extracts structured parameters
    (protein, organism, resolution, sequence, PDB ID, method) from the user's
    query.  Those parameters are combined into an ``rcsbsearchapi`` query, and
    the matching entries are summarised with metadata fetched from the RCSB
    Data API.  Chain sequences can also be read from a downloaded PDB file.

    Public entry points: ``process_query``, ``search_pdb`` and
    ``get_sequences_by_pdb_id``.  All of them report failures with ``print``
    and return an empty result instead of raising.
    """

    # Shortest protein sequence accepted for a sequence search.
    MIN_SEQUENCE_LENGTH = 25
    # Maximum number of entries returned by ``search_pdb``.
    MAX_RESULTS = 10
    # Seconds to wait for the RCSB Data API before giving up on an entry.
    REQUEST_TIMEOUT = 30
    ENTRY_URL = "https://data.rcsb.org/rest/v1/core/entry/{pdb_id}"

    _AMINO_ACIDS = frozenset('ACDEFGHIKLMNPQRSTVWY')
    # Characters stripped from the edges of a query word before it is tested
    # as a sequence, so quoted or sentence-final sequences are still found.
    _WORD_PUNCTUATION = '"\'`.,;:!?()[]{}<>'
    _EMPTY_VALUES = frozenset({'', 'none', 'n/a'})

    # Query words that signal a resolution constraint.  A non-None value is
    # the comparison direction the word implies ("less" = numerically smaller
    # resolution, i.e. better).
    _RESOLUTION_TERMS = {
        'better': 'less',
        'best': 'less',
        'highest': 'less',
        'good': 'less',
        'fine': 'less',
        'worse': 'greater',
        'worst': 'greater',
        'lowest': 'greater',
        'poor': 'greater',
        'resolution': None,
        'å': None,
        'angstrom': None,
        'angstroms': None,
        'than': None,
        'under': 'less',
        'below': 'less',
        'above': 'greater',
        'over': 'greater',
    }
    # Whole-word match only, so "over" does not fire inside "overexpressed".
    _RESOLUTION_TERM_RE = re.compile(
        r'\b(?:'
        + '|'.join(map(re.escape, sorted(_RESOLUTION_TERMS, key=len, reverse=True)))
        + r')\b'
    )
    _NUMBER = r'(?<![\w.])\d+(?:\.\d+)?'
    _UNIT = r'(?:å|angstroms?)'
    # A number immediately followed by an ångström unit, e.g. "2.5å".
    _RESOLUTION_VALUE_RE = re.compile(_NUMBER + r'\s*' + _UNIT + r'(?!\w)')
    # A free-standing number (unit optional).  Digits embedded in identifiers
    # such as "8et6" are deliberately NOT matched.
    _STANDALONE_NUMBER_RE = re.compile(
        _NUMBER + r'(?:\s*' + _UNIT + r'(?!\w))?(?!\.?\w)'
    )
    _FIRST_NUMBER_RE = re.compile(r'\d+(?:\.\d+)?')
    _PDB_ID_RE = re.compile(r'(?<![A-Za-z0-9])[0-9][A-Za-z0-9]{3}(?![A-Za-z0-9])')

    _LABELS = r'(?:Protein|Organism|Resolution|Sequence|PDB_ID|Method)'
    # "Label: value" pairs; a value ends where the next label starts, so the
    # parser works whether the model separates fields with newlines or spaces.
    _FIELD_RE = re.compile(
        r'\b(' + _LABELS + r')\s*:\s*(.*?)(?=\s*\b' + _LABELS + r'\s*:|\s*$)',
        re.DOTALL,
    )

    # Short method codes used by the prompt -> values stored in the RCSB
    # ``exptl.method`` attribute (which is matched exactly).
    _METHOD_ALIASES = {
        'X-RAY': ('X-RAY DIFFRACTION',),
        'XRAY': ('X-RAY DIFFRACTION',),
        'X-RAY CRYSTALLOGRAPHY': ('X-RAY DIFFRACTION',),
        'EM': ('ELECTRON MICROSCOPY',),
        'CRYO-EM': ('ELECTRON MICROSCOPY',),
        'CRYOEM': ('ELECTRON MICROSCOPY',),
        'NMR': ('SOLUTION NMR', 'SOLID-STATE NMR'),
    }

    def __init__(self, model_name="google/flan-t5-large"):
        """Load the extraction model and prepare the download directory.

        Args:
            model_name: HuggingFace model id passed to ``pipeline``.
        """
        # Set up HuggingFace pipeline with better model
        self.pipe = pipeline(
            "text2text-generation",
            model=model_name,
            max_new_tokens=1024,
            temperature=0.3,
            torch_dtype="auto",
            device="cpu"
        )
        self.prompt_template = """
Extract specific search parameters from the protein-related query:
1. Protein name or type
2. Resolution cutoff (in Å)
3. Protein sequence information
4. Specific PDB ID
5. Experimental method (X-RAY, EM, NMR)
6. Organism/Species information
Format:
Protein: [protein name or type]
Organism: [organism/species if mentioned]
Resolution: [maximum resolution in Å, if mentioned]
Sequence: [any sequence mentioned]
PDB_ID: [specific PDB ID if mentioned]
Method: [experimental method if mentioned]
Examples:
Query: "Find human insulin structures with X-ray better than 2.5Å resolution"
Protein: insulin
Organism: Homo sapiens
Resolution: 2.5
Sequence: none
PDB_ID: none
Method: X-RAY
Query: "Find structures containing sequence MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRNTNGVITKDEAEKLFNQDVDAAVRGILRNAKLKPVYDSLDAVRRAALINMVFQMGETGVAGFTNSLRMLQQKRWDEAAVNLAKSRWYNQTPNRAKRVITTFRTGTWDAYKNL"
Protein: none
Organism: none
Resolution: none
Sequence: MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRNTNGVITKDEAEKLFNQDVDAAVRGILRNAKLKPVYDSLDAVRRAALINMVFQMGETGVAGFTNSLRMLQQKRWDEAAVNLAKSRWYNQTPNRAKRVITTFRTGTWDAYKNL
PDB_ID: none
Method: none
Query: "Get sequence of PDB ID 8ET6"
Protein: none
Organism: none
Resolution: none
Sequence: none
PDB_ID: 8ET6
Method: none
Query: "Find mouse lysozyme structures"
Protein: lysozyme
Organism: Mus musculus
Resolution: none
Sequence: none
PDB_ID: none
Method: none
Query: "Show me E. coli protein structures solved by Cryo-EM"
Protein: none
Organism: Escherichia coli
Resolution: none
Sequence: none
PDB_ID: none
Method: EM
Query: "Find S. cerevisiae structures with resolution better than 1.8Å"
Protein: none
Organism: Saccharomyces cerevisiae
Resolution: 1.8
Sequence: none
PDB_ID: none
Method: none
Query: "Sequence of 7BZ5"
Protein: none
Organism: none
Resolution: none
Sequence: none
PDB_ID: 7BZ5
Method: none
Now analyze:
Query: {query}
"""
        self.pdb_dir = "pdb_tmp"  # directory for temporarily downloaded PDB files
        os.makedirs(self.pdb_dir, exist_ok=True)
        self.pdbl = PDBList()

    # ------------------------------------------------------------------
    # Pure helpers (no network, no model)
    # ------------------------------------------------------------------
    @classmethod
    def _parse_llm_response(cls, response):
        """Extract the labelled fields from the model's reply.

        Returns a dict with keys ``protein``, ``organism``, ``resolution``
        (float), ``sequence``, ``pdb_id`` (upper-case, 4 characters) and
        ``method`` (upper-case).  Missing, "none" or "n/a" fields are ``None``.
        The first usable value of each field wins.
        """
        params = {
            'protein': None,
            'organism': None,
            'resolution': None,
            'sequence': None,
            'pdb_id': None,
            'method': None,
        }
        for match in cls._FIELD_RE.finditer(response or ''):
            key = match.group(1).lower()
            value = match.group(2).strip()
            if value.lower() in cls._EMPTY_VALUES or params[key] is not None:
                continue
            if key == 'resolution':
                number = cls._FIRST_NUMBER_RE.search(value)
                params[key] = float(number.group()) if number else None
            elif key == 'pdb_id':
                # Only accept something shaped like a classic 4-character ID.
                found = cls._PDB_ID_RE.search(value)
                params[key] = found.group().upper() if found else None
            elif key == 'method':
                params[key] = value.upper()
            else:
                params[key] = value
        return params

    @classmethod
    def _detect_resolution_intent(cls, query):
        """Return ``(has_resolution_query, direction)`` for the raw query.

        ``direction`` is ``"less"`` or ``"greater"``; when several directional
        words occur, the one listed last in ``_RESOLUTION_TERMS`` wins.
        """
        query_lower = query.lower()
        present = set(cls._RESOLUTION_TERM_RE.findall(query_lower))
        direction = "less"
        for term, term_direction in cls._RESOLUTION_TERMS.items():
            if term_direction and term in present:
                direction = term_direction
        has_resolution_query = bool(present) or bool(
            cls._RESOLUTION_VALUE_RE.search(query_lower)
        )
        return has_resolution_query, direction

    @classmethod
    def _clean_text_query(cls, query, has_resolution_query):
        """Lower-case the query and drop resolution numbers and words.

        Removal happens only when the query was recognised as a resolution
        query, and only on whole words / free-standing numbers.
        """
        clean_query = query.lower()
        if has_resolution_query:
            clean_query = cls._STANDALONE_NUMBER_RE.sub(' ', clean_query)
            clean_query = cls._RESOLUTION_TERM_RE.sub(' ', clean_query)
        return ' '.join(clean_query.split())

    @classmethod
    def _is_protein_sequence(cls, candidate):
        """True if *candidate* is long enough and uses only amino-acid letters."""
        return (
            len(candidate) >= cls.MIN_SEQUENCE_LENGTH
            and all(c in cls._AMINO_ACIDS for c in candidate.upper())
        )

    @classmethod
    def _find_sequence_in_query(cls, query):
        """Return the first query word that looks like a protein sequence.

        A word qualifies when it has at least ``MIN_SEQUENCE_LENGTH`` letters,
        all of them valid amino-acid codes, and more than 80% upper-case.
        """
        for word in query.split():
            candidate = word.strip(cls._WORD_PUNCTUATION)
            if not cls._is_protein_sequence(candidate):
                continue
            if sum(c.isupper() for c in candidate) / len(candidate) > 0.8:
                return candidate
        return None

    @staticmethod
    def _entry_identifier(entry):
        """Return the upper-case 4-character ID of a search hit, or ``None``."""
        if isinstance(entry, dict):
            identifier = entry.get('identifier')
        elif hasattr(entry, 'identifier'):
            identifier = entry.identifier
        else:
            identifier = str(entry)
        if not identifier:
            return None
        identifier = str(identifier).upper()  # PDB IDs are always upper-case
        return identifier if len(identifier) == 4 else None

    @staticmethod
    def _entry_summary(structure_data):
        """Pick the displayed fields out of a Data API entry document.

        Missing *or null* sections fall back to the same defaults the code
        has always used (0.0 / 'Unknown' / 'N/A') instead of raising.
        """
        entry_info = structure_data.get('rcsb_entry_info') or {}
        resolutions = entry_info.get('resolution_combined') or [0.0]
        experiments = structure_data.get('exptl') or [{}]
        struct = structure_data.get('struct') or {}
        accession = structure_data.get('rcsb_accession_info') or {}
        return {
            'resolution': resolutions[0],
            'method': experiments[0].get('method', 'Unknown'),
            'title': struct.get('title', 'N/A'),
            'release_date': accession.get('initial_release_date', 'N/A'),
        }

    # ------------------------------------------------------------------
    # Helpers that touch the model or the network
    # ------------------------------------------------------------------
    def _run_llm(self, query):
        """Run the extraction prompt for *query* and return the generated text."""
        formatted_prompt = self.prompt_template.format(query=query)
        return self.pipe(formatted_prompt)[0]['generated_text']

    def _fetch_entry_data(self, pdb_id):
        """Fetch the core entry document for *pdb_id*; ``None`` on any failure."""
        url = self.ENTRY_URL.format(pdb_id=pdb_id)
        try:
            # A timeout keeps one stalled request from hanging the whole search.
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                return None
            return response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"Error fetching entry {pdb_id}: {str(e)}")
            return None

    def _method_query(self, method):
        """Build the ``exptl.method`` filter, expanding short method codes."""
        method_query = None
        for value in self._METHOD_ALIASES.get(method, (method,)):
            clause = AttributeQuery(
                attribute="exptl.method",
                operator="exact_match",
                value=value
            )
            method_query = clause if method_query is None else method_query | clause
        return method_query

    def _build_query(self, query, params, has_resolution_query, resolution_direction):
        """Combine all recognised constraints with AND; ``None`` if there are none."""
        queries = []

        # A sequence typed in the query takes precedence over the model's field.
        sequence = self._find_sequence_in_query(query)
        if sequence is None and params['sequence']:
            llm_sequence = ''.join(params['sequence'].split())
            if self._is_protein_sequence(llm_sequence):
                sequence = llm_sequence
            else:
                print("Warning: Sequence must be at least 25 residues long. Skipping sequence search.")

        if sequence:
            print(f"Adding sequence search with identity 100% for sequence: {sequence}")
            queries.append(SequenceQuery(
                sequence,
                identity_cutoff=1.0,  # 100% identity
                evalue_cutoff=1,
                sequence_type="protein"
            ))
        else:
            # Prefer the extracted protein name, but only if the user really
            # typed it; otherwise fall back to the cleaned-up query text.
            protein = params['protein']
            if protein and protein.lower() in query.lower():
                text_value = protein
                print("Using protein name for title search:", text_value)
            else:
                text_value = self._clean_text_query(query, has_resolution_query)
                print("Cleaned query:", text_value)
            if text_value:
                queries.append(AttributeQuery(
                    attribute="struct.title",
                    operator="contains_phrase",
                    value=text_value
                ))

        resolution_limit = params['resolution']
        if resolution_limit and has_resolution_query:
            operator = "less_or_equal" if resolution_direction == "less" else "greater_or_equal"
            print(f"Adding resolution filter: {operator} {resolution_limit}Å")
            queries.append(AttributeQuery(
                attribute="rcsb_entry_info.resolution_combined",
                operator=operator,
                value=resolution_limit
            ))

        if params['pdb_id']:
            print(f"Searching for specific PDB ID: {params['pdb_id']}")
            # A direct ID lookup replaces every constraint collected so far.
            queries = [AttributeQuery(
                attribute="rcsb_id",
                operator="exact_match",
                value=params['pdb_id']
            )]

        if params['method']:
            print(f"Adding experimental method filter: {params['method']}")
            queries.append(self._method_query(params['method']))

        if params['organism']:
            print(f"Adding organism filter: {params['organism']}")
            queries.append(AttributeQuery(
                attribute="rcsb_entity_source_organism.taxonomy_lineage.name",
                operator="exact_match",
                value=params['organism']
            ))

        if not queries:
            return None
        final_query = queries[0]
        for q in queries[1:]:
            final_query = final_query & q
        return final_query

    def _collect_results(self, session):
        """Turn search hits into result rows, stopping at ``MAX_RESULTS``."""
        results = []
        try:
            for entry in session:
                try:
                    entry_id = self._entry_identifier(entry)
                    if entry_id is None:
                        continue
                    structure_data = self._fetch_entry_data(entry_id)
                    if structure_data is None:
                        continue
                    summary = self._entry_summary(structure_data)
                    results.append({
                        'PDB ID': entry_id,
                        'Resolution': f"{summary['resolution']:.2f}Å",
                        'Method': summary['method'],
                        'Title': summary['title'],
                        'Release Date': summary['release_date']
                    })
                    if len(results) >= self.MAX_RESULTS:
                        break
                except Exception as e:
                    # One malformed entry must not abort the whole listing.
                    print(f"Error processing entry: {str(e)}")
                    continue
        except Exception as e:
            print(f"Error processing results: {str(e)}")
            print(f"Error type: {type(e)}")
        return results

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def search_pdb(self, query, llm_response=None):
        """Search RCSB PDB for structures matching a free-text query.

        Args:
            query: The user's natural-language question.
            llm_response: Optional model output already generated for this
                query.  When omitted the model is run here.

        Returns:
            A list of at most ``MAX_RESULTS`` dicts with the keys 'PDB ID',
            'Resolution', 'Method', 'Title' and 'Release Date'.  An empty list
            is returned when nothing matches or when any error occurs.
        """
        try:
            if llm_response is None:
                llm_response = self._run_llm(query)
                print("Generated parameters:", llm_response)
            params = self._parse_llm_response(llm_response)
            has_resolution_query, resolution_direction = self._detect_resolution_intent(query)
            final_query = self._build_query(
                query, params, has_resolution_query, resolution_direction
            )
            if final_query is None:
                return []
            print("Final query:", final_query)
            results = self._collect_results(final_query.exec())
            print(f"Found {len(results)} structures")
            return results
        except Exception as e:
            print(f"Error during search: {str(e)}")
            print(f"Error type: {type(e)}")
            return []

    def get_sequences_by_pdb_id(self, pdb_id):
        """Get sequences for all chains in a PDB structure using Biopython.

        The PDB file is downloaded into ``self.pdb_dir``, parsed, and removed
        again (also when parsing fails).  Only the first model of the file is
        read, so multi-model files do not repeat every chain once per model.

        Returns:
            A list of dicts (chain_id, entity_id, description, sequence,
            length, resolution, method, release_date), one per chain with at
            least one standard amino acid; an empty list on failure.
        """
        pdb_path = None
        try:
            pdb_path = self.pdbl.retrieve_pdb_file(
                pdb_id,
                pdir=self.pdb_dir,
                file_format="pdb"
            )
            if not pdb_path or not os.path.exists(pdb_path):
                print(f"Failed to download PDB file for {pdb_id}")
                return []

            parser = PDB.PDBParser(QUIET=True)
            structure = parser.get_structure(pdb_id, pdb_path)

            # Entry metadata is optional: defaults are used if the call fails.
            summary = self._entry_summary(self._fetch_entry_data(pdb_id) or {})

            sequences = []
            first_model = next(iter(structure), None)
            for chain in (first_model if first_model is not None else ()):
                # Convert three-letter residue names to one-letter codes.
                sequence = "".join(
                    protein_letters_3to1.get(residue.get_resname(), "")
                    for residue in chain
                    if PDB.is_aa(residue, standard=True)
                )
                if not sequence:
                    continue
                sequences.append({
                    'chain_id': chain.id,
                    'entity_id': '1',  # Default entity ID
                    'description': summary['title'],
                    'sequence': sequence,
                    'length': len(sequence),
                    'resolution': summary['resolution'],
                    'method': summary['method'],
                    'release_date': summary['release_date']
                })
            return sequences
        except Exception as e:
            print(f"Error getting sequences for PDB ID {pdb_id}: {str(e)}")
            return []
        finally:
            # Always remove the downloaded file, including on the error path.
            if pdb_path and os.path.exists(pdb_path):
                try:
                    os.remove(pdb_path)
                except OSError as e:
                    print(f"Could not remove temporary file {pdb_path}: {str(e)}")

    def __del__(self):
        """Cleanup temporary directory on object destruction (best effort)."""
        pdb_dir = getattr(self, 'pdb_dir', None)
        if pdb_dir and os.path.isdir(pdb_dir):
            # ignore_errors: a finalizer must not raise, and the directory may
            # already have been removed by another instance.
            shutil.rmtree(pdb_dir, ignore_errors=True)

    def process_query(self, query):
        """Process query and return results.

        Returns ``{"type": "sequence", "results": [...]}`` when the query
        mentions a sequence and the model extracted a PDB ID; otherwise
        ``{"type": "structure", "results": [...]}`` from ``search_pdb``.
        Errors yield an empty structure result.
        """
        try:
            response = self._run_llm(query)
            print("Generated parameters:", response)
            pdb_id = self._parse_llm_response(response)['pdb_id']

            sequence_keywords = ['sequence', 'seq']
            query_lower = query.lower()
            is_sequence_query = any(keyword in query_lower for keyword in sequence_keywords)
            if is_sequence_query and pdb_id:
                return {
                    "type": "sequence",
                    "results": self.get_sequences_by_pdb_id(pdb_id)
                }
            # Reuse the model output instead of generating it a second time.
            return {
                "type": "structure",
                "results": self.search_pdb(query, llm_response=response)
            }
        except Exception as e:
            print(f"Error processing query: {str(e)}")
            return {"type": "structure", "results": []}
def pdbsummary(name):
    """Return a numbered plain-text report of the structures found for *name*.

    Runs a ``ProteinQuery`` for *name* with ``max_resolution=5.0`` through a
    fresh ``ProteinSearchEngine`` and formats each hit (PDB ID, resolution,
    method, title, release date, sequence length and sequence).  The result
    is an empty string when the search yields nothing.
    """
    engine = ProteinSearchEngine()
    hits = engine.search(ProteinQuery(name, max_resolution=5.0))

    # Collect the fragments and join once instead of growing a string.
    report_parts = []
    for position, hit in enumerate(hits, 1):
        report_parts.extend((
            f"\n{position}. PDB ID : {hit.pdb_id}\n",
            f"\nResolution : {hit.resolution:.2f} A \n",
            f"Method : {hit.method}\n Title : {hit.title}\n",
            f"Release Date : {hit.release_date}\n Sequence length: {len(hit.sequence)} aa\n",
            f" Sequence:\n {hit.sequence}\n",
        ))
    return "".join(report_parts)
def render_html(pdb_id, chain="A"):
if pdb_id is None or chain is None:
return ""
html_content = f"""