import gradio as gr from transformers import pipeline import json from datetime import datetime import sqlite3 import asyncio from concurrent.futures import ThreadPoolExecutor import re # Initialize NLP pipelines ner_pipeline = pipeline("ner", model="dbmdz/bert-large-cased-finetuned-conll03-english") classifier = pipeline("zero-shot-classification") class OntologyRegistry: def __init__(self): self.temporal_patterns = [ r'\b\d{1,2}:\d{2}\s*(?:AM|PM|am|pm)?\b', r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]* \d{1,2}(?:st|nd|rd|th)?,? \d{4}\b', r'\btomorrow\b', r'\bin \d+ (?:days?|weeks?|months?)\b' ] self.location_patterns = [ r'\b(?:in|at|from|to) ([A-Z][a-zA-Z]+(,? [A-Z]{2})?)\b', r'\b[A-Z][a-zA-Z]+ Base\b', r'\bHeadquarters\b', r'\bHQ\b' ] self.entity_types = { 'PER': 'person', 'ORG': 'organization', 'LOC': 'location', 'MISC': 'miscellaneous' } def validate_pattern(self, text, pattern_type): patterns = getattr(self, f"{pattern_type}_patterns", []) matches = [] for pattern in patterns: matches.extend(re.finditer(pattern, text)) return [m.group() for m in matches] class RelationshipEngine: def __init__(self, db_path=':memory:'): self.conn = sqlite3.connect(db_path, check_same_thread=False) # Add this flag self.setup_database() def setup_database(self): # Events table self.conn.execute(''' CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY, text TEXT, timestamp DATETIME, confidence REAL ) ''') # Entities table self.conn.execute(''' CREATE TABLE IF NOT EXISTS entities ( id INTEGER PRIMARY KEY, entity_text TEXT, entity_type TEXT, -- person, organization, location, hashtag, temporal first_seen DATETIME, last_seen DATETIME, frequency INTEGER DEFAULT 1, confidence REAL ) ''') # Event-Entity relationships self.conn.execute(''' CREATE TABLE IF NOT EXISTS event_entities ( event_id INTEGER, entity_id INTEGER, FOREIGN KEY (event_id) REFERENCES events(id), FOREIGN KEY (entity_id) REFERENCES entities(id), PRIMARY KEY (event_id, entity_id) ) ''') # Entity relationships (e.g., person-organization affiliations) self.conn.execute(''' CREATE TABLE IF NOT EXISTS entity_relationships ( id INTEGER PRIMARY KEY, source_entity_id INTEGER, target_entity_id INTEGER, relationship_type TEXT, confidence REAL, first_seen DATETIME, last_seen DATETIME, FOREIGN KEY (source_entity_id) REFERENCES entities(id), FOREIGN KEY (target_entity_id) REFERENCES entities(id) ) ''') self.conn.commit() def store_entities(self, event_id, entities_dict): now = datetime.now().isoformat() for entity_type, entities in entities_dict.items(): if not isinstance(entities, list): continue for entity_text in entities: # Check if entity exists cursor = self.conn.execute( 'SELECT id, frequency FROM entities WHERE entity_text = ? AND entity_type = ?', (entity_text, entity_type) ) result = cursor.fetchone() if result: # Update existing entity entity_id, freq = result self.conn.execute(''' UPDATE entities SET frequency = ?, last_seen = ? WHERE id = ? ''', (freq + 1, now, entity_id)) else: # Insert new entity cursor = self.conn.execute(''' INSERT INTO entities (entity_text, entity_type, first_seen, last_seen, confidence) VALUES (?, ?, ?, ?, ?) ''', (entity_text, entity_type, now, now, 1.0)) entity_id = cursor.lastrowid # Create event-entity relationship self.conn.execute(''' INSERT OR IGNORE INTO event_entities (event_id, entity_id) VALUES (?, ?) ''', (event_id, entity_id)) self.conn.commit() def find_related_events(self, event_data): # Find events sharing entities entity_texts = [] for entity_type, entities in event_data.get('entities', {}).items(): if isinstance(entities, list): entity_texts.extend(entities) if not entity_texts: return [] # Build query using entity relationships query = ''' SELECT DISTINCT e.*, COUNT(ee.entity_id) as shared_entities FROM events e JOIN event_entities ee ON e.id = ee.event_id JOIN entities ent ON ee.entity_id = ent.id WHERE ent.entity_text IN ({}) GROUP BY e.id ORDER BY shared_entities DESC, e.timestamp DESC LIMIT 5 '''.format(','.join('?' * len(entity_texts))) cursor = self.conn.execute(query, entity_texts) return cursor.fetchall() def find_entity_relationships(self, entity_id): # Find direct relationships query = ''' SELECT er.*, e1.entity_text as source_text, e1.entity_type as source_type, e2.entity_text as target_text, e2.entity_type as target_type FROM entity_relationships er JOIN entities e1 ON er.source_entity_id = e1.id JOIN entities e2 ON er.target_entity_id = e2.id WHERE er.source_entity_id = ? OR er.target_entity_id = ? ''' cursor = self.conn.execute(query, (entity_id, entity_id)) return cursor.fetchall() def update_entity_relationships(self, event_id): # Find all entities in the event query = ''' SELECT e.id, e.entity_text, e.entity_type FROM entities e JOIN event_entities ee ON e.id = ee.entity_id WHERE ee.event_id = ? ''' cursor = self.conn.execute(query, (event_id,)) entities = cursor.fetchall() now = datetime.now().isoformat() # Create/update relationships between entities in same event for i, entity1 in enumerate(entities): for entity2 in entities[i+1:]: # Skip same entity type relationships if entity1[2] == entity2[2]: continue relationship_type = f"{entity1[2]}_to_{entity2[2]}" # Check if relationship exists cursor = self.conn.execute(''' SELECT id FROM entity_relationships WHERE (source_entity_id = ? AND target_entity_id = ?) OR (source_entity_id = ? AND target_entity_id = ?) ''', (entity1[0], entity2[0], entity2[0], entity1[0])) result = cursor.fetchone() if result: # Update existing relationship self.conn.execute(''' UPDATE entity_relationships SET last_seen = ?, confidence = confidence + 0.1 WHERE id = ? ''', (now, result[0])) else: # Create new relationship self.conn.execute(''' INSERT INTO entity_relationships (source_entity_id, target_entity_id, relationship_type, confidence, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?) ''', (entity1[0], entity2[0], relationship_type, 0.5, now, now)) self.conn.commit() class EventAnalyzer: def __init__(self): self.ontology = OntologyRegistry() self.relationship_engine = RelationshipEngine() self.executor = ThreadPoolExecutor(max_workers=3) async def extract_entities(self, text): def _extract(): return ner_pipeline(text) # Run NER in thread pool ner_results = await asyncio.get_event_loop().run_in_executor( self.executor, _extract ) entities = { "people": [], "organizations": [], "locations": [], "hashtags": [word for word in text.split() if word.startswith('#')] } for item in ner_results: if item["entity"].endswith("PER"): entities["people"].append(item["word"]) elif item["entity"].endswith("ORG"): entities["organizations"].append(item["word"]) elif item["entity"].endswith("LOC"): entities["locations"].append(item["word"]) return entities def extract_temporal(self, text): return self.ontology.validate_pattern(text, 'temporal') async def extract_locations(self, text): entities = await self.extract_entities(text) ml_locations = entities.get('locations', []) pattern_locations = self.ontology.validate_pattern(text, 'location') return list(set(ml_locations + pattern_locations)) def calculate_confidence(self, entities, temporal_data, related_events): # Base confidence from entity presence base_confidence = min(1.0, ( 0.2 * bool(entities["people"]) + 0.2 * bool(entities["organizations"]) + 0.3 * bool(entities["locations"]) + 0.3 * bool(temporal_data) )) # Adjust confidence based on entity frequency cursor = self.relationship_engine.conn.execute(''' SELECT AVG(frequency) as avg_freq FROM entities WHERE entity_text IN ( SELECT DISTINCT entity_text FROM entities WHERE entity_text IN ({}) ) '''.format(','.join('?' * len([ *entities["people"], *entities["organizations"], *entities["locations"] ])))) avg_frequency = cursor.fetchone()[0] or 1 frequency_boost = min(0.2, (avg_frequency - 1) * 0.05) # Max 0.2 boost for frequency # Adjust confidence based on relationships relationship_confidence = 0 if related_events: relationship_scores = [] for event in related_events: cursor = self.relationship_engine.conn.execute(''' SELECT COUNT(*) as shared_entities FROM event_entities ee1 JOIN event_entities ee2 ON ee1.entity_id = ee2.entity_id WHERE ee1.event_id = ? AND ee2.event_id = ? ''', (event[0], event[0])) # event[0] is the event_id shared_count = cursor.fetchone()[0] relationship_scores.append(min(0.3, shared_count * 0.1)) # Max 0.3 boost per relationship if relationship_scores: relationship_confidence = max(relationship_scores) final_confidence = min(1.0, base_confidence + frequency_boost + relationship_confidence) return final_confidence async def analyze_event(self, text): try: # Parallel extraction entities_future = self.extract_entities(text) temporal_data = self.extract_temporal(text) locations_future = self.extract_locations(text) # Gather async results entities, locations = await asyncio.gather( entities_future, locations_future ) # Add temporal and locations to entities entities['locations'] = locations entities['temporal'] = temporal_data # Find related events related_events = self.relationship_engine.find_related_events({ 'text': text, 'entities': entities }) # Calculate confidence with enhanced logic confidence = self.calculate_confidence(entities, temporal_data, related_events) # Store event if confidence meets threshold cursor = None if confidence >= 0.6: cursor = self.relationship_engine.conn.execute( 'INSERT INTO events (text, timestamp, confidence) VALUES (?, ?, ?)', (text, datetime.now().isoformat(), confidence) ) event_id = cursor.lastrowid # Store entities and their relationships self.relationship_engine.store_entities(event_id, { 'person': entities['people'], 'organization': entities['organizations'], 'location': entities['locations'], 'temporal': temporal_data, 'hashtag': entities['hashtags'] }) # Update entity relationships self.relationship_engine.update_entity_relationships(event_id) self.relationship_engine.conn.commit() # Get entity relationships for rich output entity_relationships = [] if cursor and cursor.lastrowid: query = ''' SELECT DISTINCT er.*, e1.entity_text as source_text, e1.entity_type as source_type, e2.entity_text as target_text, e2.entity_type as target_type FROM event_entities ee JOIN entity_relationships er ON ee.entity_id IN (er.source_entity_id, er.target_entity_id) JOIN entities e1 ON er.source_entity_id = e1.id JOIN entities e2 ON er.target_entity_id = e2.id WHERE ee.event_id = ? ''' entity_relationships = self.relationship_engine.conn.execute(query, (cursor.lastrowid,)).fetchall() result = { "text": text, "entities": entities, "confidence": confidence, "verification_needed": confidence < 0.6, "related_events": [ { "text": event[1], "timestamp": event[2], "confidence": event[3], "shared_entities": event[4] if len(event) > 4 else None } for event in related_events ], "entity_relationships": [ { "type": rel[3], "source": rel[6], "target": rel[8], "confidence": rel[4] } for rel in entity_relationships ] if entity_relationships else [] } return result except Exception as e: return {"error": str(e)} def get_entity_statistics(self): """Get statistics about stored entities and relationships""" stats = {} # Entity counts by type cursor = self.relationship_engine.conn.execute(''' SELECT entity_type, COUNT(*) as count, AVG(frequency) as avg_frequency FROM entities GROUP BY entity_type ''') stats['entity_counts'] = cursor.fetchall() # Most frequent entities cursor = self.relationship_engine.conn.execute(''' SELECT entity_text, entity_type, frequency FROM entities ORDER BY frequency DESC LIMIT 10 ''') stats['frequent_entities'] = cursor.fetchall() # Relationship statistics cursor = self.relationship_engine.conn.execute(''' SELECT relationship_type, COUNT(*) as count, AVG(confidence) as avg_confidence FROM entity_relationships GROUP BY relationship_type ''') stats['relationship_stats'] = cursor.fetchall() return stats # Initialize analyzer analyzer = EventAnalyzer() # Custom CSS for UI css = """ .container { max-width: 1200px; margin: auto; padding: 20px; } .results { padding: 20px; border: 1px solid #ddd; border-radius: 8px; margin-top: 20px; } .confidence-high { color: #22c55e; font-weight: bold; } .confidence-low { color: #f97316; font-weight: bold; } .entity-section { margin: 15px 0; } .alert-warning { background: #fff3cd; padding: 10px; border-radius: 5px; margin: 10px 0; } .alert-success { background: #d1fae5; padding: 10px; border-radius: 5px; margin: 10px 0; } .related-events { background: #f3f4f6; padding: 15px; border-radius: 5px; margin-top: 15px; } """ def format_results(analysis_result): if "error" in analysis_result: return f"
Error: {analysis_result['error']}
" confidence_class = "confidence-high" if analysis_result["confidence"] >= 0.6 else "confidence-low" html = f"""

Analysis Results

Confidence Score: {int(analysis_result['confidence'] * 100)}%
{f'''
⚠️ Verification Required: Low confidence score detected. Please verify the extracted information.
''' if analysis_result["verification_needed"] else ''}

👤 People Detected

🏢 Organizations

📍 Locations

🕒 Temporal References

# Hashtags

{f'''
Event Validated: The extracted information meets confidence thresholds.
''' if not analysis_result["verification_needed"] else ''} {f''' ''' if analysis_result.get('related_events') else ''}
""" return html # Modified to properly handle async async def process_input(text): result = await analyzer.analyze_event(text) return format_results(result) demo = gr.Interface( fn=process_input, inputs=[ gr.Textbox( label="Event Text", placeholder="Enter text to analyze (e.g., 'John from Tech Corp. is attending the meeting in Washington, DC tomorrow at 14:30 #tech')", lines=3 ) ], outputs=gr.HTML(), title="ToY Event Analysis System", description="Analyze text to extract entities, assess confidence, and identify key event information with relationship tracking.", css=css, theme=gr.themes.Soft(), examples=[ ["John from Tech Corp. is attending the meeting in Washington, DC tomorrow at 14:30 #tech"], ["Sarah Johnson and Mike Smith from Defense Systems Inc. are conducting training in Norfolk, VA on June 15th #defense #training"], ["Team meeting at headquarters with @commander_smith at 0900 #briefing"] ] ) if __name__ == "__main__": demo.launch()