from transformers import pipeline
from rcsbsearchapi import TextQuery, AttributeQuery, Query
from rcsbsearchapi.search import Sort, SequenceQuery
import os
from dotenv import load_dotenv
from shiny import App, render, ui, reactive
import pandas as pd
import warnings
import re
from UniprotKB_P_Sequence_RCSB_API_test import ProteinQuery, ProteinSearchEngine
import plotly.graph_objects as go
from shinywidgets import output_widget, render_widget
import requests
import asyncio
warnings.filterwarnings('ignore')
# Load environment variables from .env file
load_dotenv()
# os.environ["TRANSFORMERS_CACHE"] = "./transformers_cache"
# os.makedirs("./transformers_cache", exist_ok=True)
class PDBSearchAssistant:
    """Turn free-text protein questions into RCSB PDB searches.

    A HuggingFace ``text2text-generation`` pipeline is prompted to extract
    structured parameters (PDB ID, resolution, sequence, method) from the
    user's query. Those parameters are combined with keyword heuristics on the
    raw query to build an ``rcsbsearchapi`` query, and each hit is enriched
    with metadata fetched from the RCSB Data API.
    """

    # Labels the prompt asks the model to emit; used to split its answer.
    _FIELD_PATTERN = re.compile(
        r"(Protein|Organism|Resolution|Sequence|PDB_ID|Method):"
    )
    # Values the model uses to say "not mentioned".
    _EMPTY_VALUES = ("", "none", "n/a")

    # (keyword, direction) pairs. Direction is "less" / "greater" for words
    # that imply a comparison and None for words that only signal that the
    # query talks about resolution. Order matters: the last matching keyword
    # that carries a direction wins.
    _RESOLUTION_TERMS = (
        ("better", "less"),
        ("best", "less"),
        ("highest", "less"),
        ("good", "less"),
        ("fine", "less"),
        ("worse", "greater"),
        ("worst", "greater"),
        ("lowest", "greater"),
        ("poor", "greater"),
        ("resolution", None),
        ("å", None),
        ("angstrom", None),
        ("angstroms", None),
        ("than", None),
        ("under", "less"),
        ("below", "less"),
        ("above", "greater"),
        ("over", "greater"),
    )

    # A number that stands on its own (optionally followed by an Å unit).
    # Digits embedded in identifiers such as "7bz5" or "p53" do not match.
    _STANDALONE_NUMBER = re.compile(
        r"(?<![\w.])\d+(?:\.\d+)?\s*(?:angstroms?|å)?(?!\w|\.\d)"
    )

    _AMINO_ACIDS = frozenset("ACDEFGHIKLMNPQRSTVWY")
    _MIN_SEQUENCE_LENGTH = 25  # shortest sequence accepted for sequence search
    _MAX_RESULTS = 10  # number of hits returned by search_pdb
    _REQUEST_TIMEOUT = 10  # seconds, applied to every Data API request
    _DATA_API = "https://data.rcsb.org/rest/v1/core"

    # The prompt asks the model for short method names, while the
    # ``exptl.method`` attribute is matched exactly against full names.
    # NOTE(review): "NMR" is mapped to solution NMR only; solid-state NMR
    # entries will not match -- confirm this is acceptable.
    _METHOD_ALIASES = {
        "X-RAY": "X-RAY DIFFRACTION",
        "XRAY": "X-RAY DIFFRACTION",
        "EM": "ELECTRON MICROSCOPY",
        "CRYO-EM": "ELECTRON MICROSCOPY",
        "NMR": "SOLUTION NMR",
    }

    def __init__(self, model_name="google/flan-t5-large"):
        """Load the text2text pipeline and define the extraction prompt.

        Args:
            model_name: HuggingFace model identifier passed to ``pipeline``.
        """
        self.pipe = pipeline(
            "text2text-generation",
            model=model_name,
            max_new_tokens=512,
            temperature=0.3,
            torch_dtype="auto",
            device="cpu"
        )
        # Few-shot prompt; ``{query}`` is filled in with the user's question.
        self.prompt_template = """
Extract specific search parameters from the protein-related query:
1. Protein name or type
2. Resolution cutoff (in Å)
3. Protein sequence information
4. Specific PDB ID
5. Experimental method (X-RAY, EM, NMR)
6. Organism/Species information
Format:
Protein: [protein name or type]
Organism: [organism/species if mentioned]
Resolution: [maximum resolution in Å, if mentioned]
Sequence: [any sequence mentioned]
PDB_ID: [specific PDB ID if mentioned]
Method: [experimental method if mentioned]
Examples:
Query: "Find human insulin structures with X-ray better than 2.5Å resolution"
Protein: insulin
Organism: human
Resolution: 2.5
Sequence: none
PDB_ID: none
Method: X-RAY
Query: "Get sequence of PDB ID 8ET6"
Protein: none
Organism: none
Resolution: none
Sequence: none
PDB_ID: 8ET6
Method: none
Query: "Sequence of 7BZ5"
Protein: none
Organism: none
Resolution: none
Sequence: none
PDB_ID: 7BZ5
Method: none
Query: "7BZ5"
Protein: none
Organism: none
Resolution: none
Sequence: none
PDB_ID: 7BZ5
Method: none
Query: "6KAO"
Protein: none
Organism: none
Resolution: none
Sequence: none
PDB_ID: 6KAO
Method: none
Query: "Find structures containing sequence MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRNTNGVITKDEAEKLFNQDVDAAVRGILRNAKLKPVYDSLDAVRRAALINMVFQMGETGVAGFTNSLRMLQQKRWDEAAVNLAKSRWYNQTPNRAKRVITTFRTGTWDAYKNL"
Protein: none
Organism: none
Resolution: none
Sequence: MNIFEMLRIDEGLRLKIYKDTEGYYTIGIGHLLTKSPSLNAAKSELDKAIGRNTNGVITKDEAEKLFNQDVDAAVRGILRNAKLKPVYDSLDAVRRAALINMVFQMGETGVAGFTNSLRMLQQKRWDEAAVNLAKSRWYNQTPNRAKRVITTFRTGTWDAYKNL
PDB_ID: none
Method: none
Now analyze:
Query: {query}
"""

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------
    def _ask_model(self, query):
        """Run the extraction prompt for *query* and return the raw answer."""
        formatted_prompt = self.prompt_template.format(query=query)
        response = self.pipe(formatted_prompt)[0]['generated_text']
        print("Generated parameters:", response)
        return response

    @classmethod
    def _parse_llm_fields(cls, response):
        """Split the model answer into a ``{label: value}`` dict.

        Works whether the labels are separated by newlines or all sit on one
        line. Labels whose value is empty, ``none`` or ``n/a`` are omitted;
        if a label occurs more than once the first meaningful value is kept.
        """
        fields = {}
        hits = list(cls._FIELD_PATTERN.finditer(response))
        for index, hit in enumerate(hits):
            # A value runs until the next label (or the end of the answer),
            # but never past the end of its own line.
            stop = hits[index + 1].start() if index + 1 < len(hits) else len(response)
            value = response[hit.end():stop].split("\n", 1)[0].strip()
            if value.lower() not in cls._EMPTY_VALUES:
                fields.setdefault(hit.group(1), value)
        return fields

    @classmethod
    def _resolution_intent(cls, query_lower):
        """Return ``(mentions_resolution, direction)`` for a lower-cased query.

        ``direction`` is ``"less"`` (the default) or ``"greater"``. Keywords
        are matched as whole words so that e.g. "over" inside "discover" is
        ignored.
        """
        mentioned = False
        direction = "less"
        for term, term_direction in cls._RESOLUTION_TERMS:
            if re.search(rf"\b{re.escape(term)}\b", query_lower):
                mentioned = True
                if term_direction:
                    direction = term_direction
        if cls._STANDALONE_NUMBER.search(query_lower):
            mentioned = True
        return mentioned, direction

    @classmethod
    def _strip_resolution_text(cls, query_lower):
        """Remove resolution numbers and keywords from a lower-cased query."""
        cleaned = cls._STANDALONE_NUMBER.sub("", query_lower)
        for term, _ in cls._RESOLUTION_TERMS:
            cleaned = re.sub(rf"\b{re.escape(term)}\b", "", cleaned)
        return " ".join(cleaned.split())

    @staticmethod
    def _parse_resolution(value):
        """Return the first number found in *value* as a float, else None."""
        match = re.search(r"\d+(?:\.\d+)?", value)
        return float(match.group()) if match else None

    @classmethod
    def _normalize_method(cls, value):
        """Upper-case *value* and expand known short method names."""
        method = value.strip().upper()
        return cls._METHOD_ALIASES.get(method, method)

    @classmethod
    def _find_sequence_in_query(cls, query):
        """Return the first word of *query* that looks like a protein sequence.

        A word qualifies when it has at least ``_MIN_SEQUENCE_LENGTH``
        characters, consists only of the 20 standard amino-acid letters and
        is more than 80% upper-case. Surrounding quotes and punctuation are
        ignored. Returns None when no word qualifies.
        """
        for word in query.split():
            candidate = word.strip("\"'.,;:()[]")
            if len(candidate) < cls._MIN_SEQUENCE_LENGTH:
                continue
            if not all(c in cls._AMINO_ACIDS for c in candidate.upper()):
                continue
            if sum(c.isupper() for c in candidate) / len(candidate) > 0.8:
                return candidate
        return None

    @staticmethod
    def _entry_identifier(entry):
        """Extract an upper-cased identifier from a search hit, or None."""
        if isinstance(entry, dict):
            identifier = entry.get('identifier')
        elif hasattr(entry, 'identifier'):
            identifier = entry.identifier
        else:
            identifier = str(entry)
        return identifier.upper() if identifier else None

    def _fetch_entry_summary(self, pdb_id):
        """Fetch display metadata for one entry from the RCSB Data API.

        Returns a dict with the keys ``PDB ID``, ``Resolution``, ``Method``,
        ``Title`` and ``Release Date``, or None when the request does not
        return HTTP 200.
        """
        reply = requests.get(
            f"https://data.rcsb.org/rest/v1/core/entry/{pdb_id}",
            timeout=self._REQUEST_TIMEOUT,
        )
        if reply.status_code != 200:
            return None
        data = reply.json()
        # ``or`` also covers keys that are present but null, which would
        # otherwise raise and drop the whole entry.
        resolutions = (data.get('rcsb_entry_info') or {}).get('resolution_combined') or [0.0]
        experiments = data.get('exptl') or [{}]
        return {
            'PDB ID': pdb_id,
            'Resolution': f"{resolutions[0]:.2f}Å",
            'Method': experiments[0].get('method', 'Unknown'),
            'Title': (data.get('struct') or {}).get('title', 'N/A'),
            'Release Date': (data.get('rcsb_accession_info') or {}).get('initial_release_date', 'N/A')
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def search_pdb(self, query, llm_response=None):
        """Search the PDB for structures matching a free-text *query*.

        Args:
            query: The user's question.
            llm_response: Optional raw model answer for *query*. When given,
                the model is not run again; when None the model is invoked.

        Returns:
            A list of at most ``_MAX_RESULTS`` dicts with the keys ``PDB ID``,
            ``Resolution``, ``Method``, ``Title`` and ``Release Date``. An
            empty list is returned when nothing matches or any error occurs
            (errors are printed, not raised).
        """
        try:
            if llm_response is None:
                llm_response = self._ask_model(query)
            fields = self._parse_llm_fields(llm_response)

            query_lower = query.lower()
            has_resolution_query, resolution_direction = self._resolution_intent(query_lower)

            # Only trust the model's resolution if the query itself hints at one.
            resolution_limit = None
            if has_resolution_query and "Resolution" in fields:
                resolution_limit = self._parse_resolution(fields["Resolution"])

            method = self._normalize_method(fields["Method"]) if "Method" in fields else None
            pdb_id = fields.get("PDB_ID")

            # A sequence typed in the query takes precedence over the model's.
            sequence = self._find_sequence_in_query(query) or fields.get("Sequence")
            if sequence and len(sequence) < self._MIN_SEQUENCE_LENGTH:
                print("Warning: Sequence must be at least 25 residues long. Skipping sequence search.")
                sequence = None

            queries = []
            if sequence:
                print(f"Adding sequence search with identity 100% for sequence: {sequence}")
                queries.append(SequenceQuery(
                    sequence,
                    identity_cutoff=1.0,  # 100% identity
                    evalue_cutoff=1,
                    sequence_type="protein"
                ))
            else:
                # NOTE(review): the title search uses the whole cleaned query,
                # not the "Protein" field extracted by the model -- confirm
                # that this is intended.
                clean_query = query_lower
                if has_resolution_query:
                    clean_query = self._strip_resolution_text(clean_query)
                clean_query = ' '.join(clean_query.split())
                print("Cleaned query:", clean_query)
                if clean_query:
                    queries.append(AttributeQuery(
                        attribute="struct.title",
                        operator="contains_phrase",
                        value=clean_query
                    ))

            if resolution_limit:
                operator = "less_or_equal" if resolution_direction == "less" else "greater_or_equal"
                print(f"Adding resolution filter: {operator} {resolution_limit}Å")
                queries.append(AttributeQuery(
                    attribute="rcsb_entry_info.resolution_combined",
                    operator=operator,
                    value=resolution_limit
                ))

            if pdb_id:
                print(f"Searching for specific PDB ID: {pdb_id}")
                # A direct ID lookup overrides every query collected so far.
                queries = [AttributeQuery(
                    attribute="rcsb_id",
                    operator="exact_match",
                    value=pdb_id.upper()
                )]

            if method:
                print(f"Adding experimental method filter: {method}")
                queries.append(AttributeQuery(
                    attribute="exptl.method",
                    operator="exact_match",
                    value=method
                ))

            if not queries:
                return []

            # Combine all sub-queries with AND.
            final_query = queries[0]
            for sub_query in queries[1:]:
                final_query = final_query & sub_query
            print("Final query:", final_query)

            session = final_query.exec()
            results = []
            try:
                for entry in session:
                    try:
                        entry_id = self._entry_identifier(entry)
                        # Only 4-character entry identifiers are accepted.
                        if not entry_id or len(entry_id) != 4:
                            continue
                        summary = self._fetch_entry_summary(entry_id)
                        if summary is None:
                            continue
                        results.append(summary)
                        if len(results) >= self._MAX_RESULTS:
                            break
                    except Exception as e:
                        # Best effort: one bad entry must not abort the search.
                        print(f"Error processing entry: {str(e)}")
            except Exception as e:
                print(f"Error processing results: {str(e)}")
                print(f"Error type: {type(e)}")
            print(f"Found {len(results)} structures")
            return results
        except Exception as e:
            print(f"Error during search: {str(e)}")
            print(f"Error type: {type(e)}")
            return []

    def get_sequences_by_pdb_id(self, pdb_id):
        """Get sequence records for a PDB structure.

        Tries ``ProteinSearchEngine`` first and falls back to direct RCSB
        Data API calls when that yields no exact match or raises.

        Returns:
            A list of dicts describing sequences (possibly empty).
        """
        try:
            search_engine = ProteinSearchEngine()
            # A very high resolution limit so that no structure is filtered out.
            query = ProteinQuery(
                name=pdb_id,
                max_resolution=100.0
            )
            results = search_engine.search(query)
            if not results:
                return []
            sequences = []
            for structure in results:
                if structure.pdb_id.upper() == pdb_id.upper():
                    sequences.append({
                        'chain_id': 'ALL',  # chains are not distinguished here
                        'entity_id': '1',
                        'description': structure.title,
                        'sequence': structure.sequence,
                        'length': len(structure.sequence),
                        'resolution': structure.resolution,
                        'method': structure.method,
                        'release_date': structure.release_date
                    })
                    break  # stop at the first exact PDB ID match
            if not sequences:
                print(f"No results found using ProteinSearchEngine, trying direct API call...")
                return self._get_sequences_by_direct_api(pdb_id)
            return sequences
        except Exception as e:
            print(f"Error in ProteinSearchEngine search for PDB ID {pdb_id}: {str(e)}")
            # Fall back to the direct API on any failure.
            return self._get_sequences_by_direct_api(pdb_id)

    def _get_sequences_by_direct_api(self, pdb_id):
        """Fallback: read per-chain sequences straight from the RCSB Data API.

        The entry record lists its polymer entity IDs; each entity record
        carries the sequence, description and the chains that share it. Each
        entity is fetched once and one record is emitted per chain.

        Returns:
            A list of dicts with ``chain_id``, ``entity_id``, ``description``,
            ``sequence`` and ``length``; empty on any error or non-200 reply.
        """
        try:
            entry_reply = requests.get(
                f"{self._DATA_API}/entry/{pdb_id}",
                timeout=self._REQUEST_TIMEOUT,
            )
            if entry_reply.status_code != 200:
                return []
            identifiers = entry_reply.json().get('rcsb_entry_container_identifiers') or {}
            sequences = []
            for entity_id in identifiers.get('polymer_entity_ids') or []:
                entity_reply = requests.get(
                    f"{self._DATA_API}/polymer_entity/{pdb_id}/{entity_id}",
                    timeout=self._REQUEST_TIMEOUT,
                )
                if entity_reply.status_code != 200:
                    continue
                entity_data = entity_reply.json()
                sequence = (entity_data.get('entity_poly') or {}).get('pdbx_seq_one_letter_code') or ''
                description = (entity_data.get('rcsb_polymer_entity') or {}).get('pdbx_description') or 'N/A'
                containers = entity_data.get('rcsb_polymer_entity_container_identifiers') or {}
                # Prefer author chain IDs; 'ALL' mirrors get_sequences_by_pdb_id
                # when the entity lists no chains at all.
                chain_ids = containers.get('auth_asym_ids') or containers.get('asym_ids') or ['ALL']
                for chain_id in chain_ids:
                    sequences.append({
                        'chain_id': chain_id,
                        'entity_id': entity_id,
                        'description': description,
                        'sequence': sequence,
                        'length': len(sequence)
                    })
            return sequences
        except Exception as e:
            print(f"Error in direct API call for PDB ID {pdb_id}: {str(e)}")
            return []

    def process_query(self, query):
        """Route *query* to a sequence lookup or a structure search.

        Returns:
            ``{"type": "sequence", "results": [...]}`` when the query mentions
            a sequence and the model extracted a PDB ID, otherwise
            ``{"type": "structure", "results": [...]}``. On any error an empty
            structure result is returned.
        """
        try:
            llm_response = self._ask_model(query)
            pdb_id = self._parse_llm_fields(llm_response).get("PDB_ID")
            if pdb_id:
                pdb_id = pdb_id.upper()

            sequence_keywords = ('sequence', 'seq')
            is_sequence_query = any(keyword in query.lower() for keyword in sequence_keywords)
            if is_sequence_query and pdb_id:
                return {
                    "type": "sequence",
                    "results": self.get_sequences_by_pdb_id(pdb_id)
                }
            # Reuse the model answer so the model is not run a second time.
            return {
                "type": "structure",
                "results": self.search_pdb(query, llm_response=llm_response)
            }
        except Exception as e:
            print(f"Error processing query: {str(e)}")
            return {"type": "structure", "results": []}
def pdbsummary(name):
    """Build a plain-text report of the structures found for *name*.

    Runs a ``ProteinSearchEngine`` search with a maximum resolution of 5.0
    and returns one numbered section per hit, listing PDB ID, resolution,
    method, title, release date, sequence length and the sequence itself.
    An empty string is returned when there are no hits.
    """
    engine = ProteinSearchEngine()
    hits = engine.search(ProteinQuery(name, max_resolution=5.0))
    sections = []
    for rank, hit in enumerate(hits, 1):
        sections.append(
            f"\n{rank}. PDB ID : {hit.pdb_id}\n"
            f"\nResolution : {hit.resolution:.2f} A \n"
            f"Method : {hit.method}\n Title : {hit.title}\n"
            f"Release Date : {hit.release_date}\n Sequence length: {len(hit.sequence)} aa\n"
            f" Sequence:\n {hit.sequence}\n"
        )
    return "".join(sections)
def render_html(pdb_id, chain="A"):
if pdb_id is None or chain is None:
return ""
html_content = f"""