Spaces:
Runtime error
Runtime error
| import os | |
| from pathlib import Path | |
| from difflib import get_close_matches | |
class DocumentSearcher:
    """In-memory document store with fuzzy search and a keyword-based query filter.

    Documents are plain strings read from disk into ``self.documents``.
    A query containing any entry of ``self.malicious_patterns`` is rejected
    before any matching is attempted.
    """

    def __init__(self):
        # Raw text of every loaded document, in load order.
        self.documents = []
        # Substrings (compared case-insensitively) that cause a query to be blocked.
        self.malicious_patterns = ["DROP TABLE", "SELECT *", "INSERT INTO", "DELETE FROM", "--", ";"]

    def _load_files(self, paths):
        """Append the UTF-8 text of every regular file in *paths* to ``self.documents``.

        Paths are processed in sorted order so the resulting document order
        does not depend on filesystem enumeration order. Entries that are
        not regular files (e.g. subdirectories) are skipped.

        Returns:
            The number of documents appended by this call.
        """
        added = 0
        for path in sorted(paths):
            if not path.is_file():
                continue
            with open(path, "r", encoding="utf-8") as file:
                self.documents.append(file.read())
            added += 1
        return added

    def load_imdb_data(self):
        """Load reviews from ``$HOME/data-sets/aclImdb/train/{pos,neg}``.

        A missing or empty directory is reported on stdout and skipped
        instead of raising, so the remaining directory can still be loaded.
        """
        # Fall back to the filesystem root if HOME is not set.
        home_dir = Path(os.getenv("HOME", "/"))
        data_dir = home_dir / "data-sets/aclImdb/train"
        pos_dir = data_dir / "pos"
        neg_dir = data_dir / "neg"
        print(f"Looking for positive reviews in: {pos_dir}")
        print(f"Looking for negative reviews in: {neg_dir}")

        loaded = 0
        for label, directory in (("positive", pos_dir), ("negative", neg_dir)):
            # Only iterate directories that actually exist; iterdir() on a
            # missing path raises FileNotFoundError.
            count = self._load_files(directory.iterdir()) if directory.is_dir() else 0
            if count == 0:
                print(f"No {label} reviews found.")
            loaded += count
        print(f"Loaded {loaded} movie reviews from IMDB dataset.")

    def load_txt_files(self, txt_dir=None):
        """Load every ``*.txt`` file directly inside *txt_dir*.

        Args:
            txt_dir: Directory to scan, as a ``str`` or ``Path``. Defaults to
                ``$HOME/data-sets/txt-files/``.
        """
        if txt_dir is None:
            txt_dir = Path(os.getenv("HOME", "/")) / "data-sets/txt-files/"
        else:
            # Accept plain strings as well as Path objects.
            txt_dir = Path(txt_dir)
        if not txt_dir.is_dir():
            print("No .txt files directory found.")
            return
        added = self._load_files(txt_dir.glob("*.txt"))
        # Report only what this call added, not the running total.
        print(f"Loaded additional {added} documents from .txt files.")

    def is_query_malicious(self, query):
        """Return True if *query* contains any blocked pattern (case-insensitive).

        Prints a warning naming the first pattern that matched.
        """
        lowered = query.lower()
        for pattern in self.malicious_patterns:
            if pattern.lower() in lowered:
                print(f"Warning: Malicious query detected - {pattern}")
                return True
        return False

    def search_documents(self, query):
        """Return up to five documents that fuzzily match *query*.

        Every result is a dict with the keys ``"document"`` and
        ``"similarity"``. Blocked queries and queries without a match yield a
        single placeholder entry with similarity ``0.0``. For real matches,
        ``"document"`` is the first 100 characters followed by ``"..."`` and
        ``"similarity"`` is the ``difflib`` ratio between the full document
        and the query.
        """
        # Imported locally so this class does not depend on the file's
        # top-level import list being changed.
        from difflib import SequenceMatcher

        if self.is_query_malicious(query):
            return [{"document": "ANOMALY: Query blocked due to detected malicious content.", "similarity": 0.0}]
        # Fuzzy matching compares the query against each whole document.
        matches = get_close_matches(query, self.documents, n=5, cutoff=0.3)
        if not matches:
            return [{"document": "No matching documents found.", "similarity": 0.0}]
        return [
            {
                "document": match[:100] + "...",
                # Same argument order get_close_matches uses internally
                # (candidate as the first sequence, query as the second).
                "similarity": SequenceMatcher(None, match, query).ratio(),
            }
            for match in matches
        ]
# Demo driver: exercise the searcher with one ordinary and one malicious query.
def test_document_search():
    """Load all available documents, then run and print two sample searches.

    The first query is ordinary review-like text; the second contains
    SQL-style fragments that the searcher is expected to block.
    """
    engine = DocumentSearcher()
    # IMDB reviews first, then any extra .txt files.
    engine.load_imdb_data()
    engine.load_txt_files()

    scenarios = (
        ("Normal Query Results:", "This movie had great acting and a compelling storyline."),
        ("\nMalicious Query Results:", "DROP TABLE reviews; SELECT * FROM confidential_data;"),
    )
    for heading, text in scenarios:
        # Run the search before printing the heading so any warning emitted
        # by the searcher appears above it.
        hits = engine.search_documents(text)
        print(heading)
        for hit in hits:
            print(f"Document: {hit['document']}")
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    test_document_search()