# Source: Hugging Face file view of app.py (raw / history / blame page chrome removed)
# Uploaded by dwb2023 - commit e8d9eb9 (verified): "Update app.py" - 17.2 kB
import gradio as gr
from transformers import pipeline
import json
from datetime import datetime
import sqlite3
import asyncio
from concurrent.futures import ThreadPoolExecutor
import re
# Initialize NLP pipelines
# Both pipelines are built once, at import time, and shared module-wide.
# NER: no aggregation strategy is passed, so the code that consumes the
# results reads the per-item "entity" and "word" keys.
ner_pipeline = pipeline("ner", model="dbmdz/bert-large-cased-finetuned-conll03-english")
# Zero-shot classifier with no explicit model argument; it is not referenced
# by any other code visible in this file.
classifier = pipeline("zero-shot-classification")
class OntologyRegistry:
    """Regex vocabulary for temporal and location phrases, plus NER tag names.

    Attributes:
        temporal_patterns: Regexes for clock times, month-name dates,
            ``tomorrow`` and relative offsets such as ``in 3 days``.
        location_patterns: Regexes for place names introduced by a
            preposition, ``<Name> Base``, ``Headquarters`` and ``HQ``.
        entity_types: Maps short NER tags (``PER``, ``ORG``, ...) to
            readable type names.
    """

    def __init__(self):
        self.temporal_patterns = [
            r'\b\d{1,2}:\d{2}\s*(?:AM|PM|am|pm)?\b',
            r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]* \d{1,2}(?:st|nd|rd|th)?,? \d{4}\b',
            r'\btomorrow\b',
            r'\bin \d+ (?:days?|weeks?|months?)\b',
        ]
        self.location_patterns = [
            # Group 1 captures the place name without the leading preposition.
            r'\b(?:in|at|from|to) ([A-Z][a-zA-Z]+(,? [A-Z]{2})?)\b',
            r'\b[A-Z][a-zA-Z]+ Base\b',
            r'\bHeadquarters\b',
            r'\bHQ\b',
        ]
        self.entity_types = {
            'PER': 'person',
            'ORG': 'organization',
            'LOC': 'location',
            'MISC': 'miscellaneous',
        }

    def validate_pattern(self, text, pattern_type):
        """Return every substring of ``text`` matched by a pattern family.

        Args:
            text: The text to scan.
            pattern_type: Prefix of the ``<prefix>_patterns`` attribute to use
                (``'temporal'`` or ``'location'``). An unknown prefix yields
                an empty list.

        Returns:
            Matched strings, grouped by pattern in declaration order and, within
            one pattern, in order of appearance. When a pattern defines a
            capture group, the first group is returned instead of the whole
            match, so ``"in Washington, DC"`` yields ``"Washington, DC"``.
        """
        patterns = getattr(self, f"{pattern_type}_patterns", [])
        found = []
        for pattern in patterns:
            for match in re.finditer(pattern, text):
                captured = match.group(1) if match.re.groups else None
                # Fall back to the whole match when there is no group or the
                # group did not take part in the match.
                found.append(captured if captured is not None else match.group())
        return found
class RelationshipEngine:
    """SQLite-backed store for events, entities and the links between them.

    Tables: ``events``, ``entities``, ``event_entities`` (which entity occurs
    in which event) and ``entity_relationships`` (co-occurrence links between
    entities of different types).
    """

    def __init__(self, db_path=':memory:'):
        """Open (or create) the database at ``db_path`` and ensure the schema.

        Args:
            db_path: SQLite database path; defaults to a private in-memory DB.
        """
        # check_same_thread=False: the engine is typically constructed on one
        # thread and then used from another (for example a web server's
        # event-loop thread). With the default setting sqlite3 raises
        # ProgrammingError on any cross-thread use. This class does no locking
        # of its own, so callers must not use it from several threads at once.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.setup_database()

    def setup_database(self):
        """Create the four tables if they do not exist yet, then commit."""
        # Events table
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY,
                text TEXT,
                timestamp DATETIME,
                confidence REAL
            )
        ''')
        # Entities table
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS entities (
                id INTEGER PRIMARY KEY,
                entity_text TEXT,
                entity_type TEXT, -- person, organization, location, hashtag, temporal
                first_seen DATETIME,
                last_seen DATETIME,
                frequency INTEGER DEFAULT 1,
                confidence REAL
            )
        ''')
        # Event-Entity relationships
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS event_entities (
                event_id INTEGER,
                entity_id INTEGER,
                FOREIGN KEY (event_id) REFERENCES events(id),
                FOREIGN KEY (entity_id) REFERENCES entities(id),
                PRIMARY KEY (event_id, entity_id)
            )
        ''')
        # Entity relationships (e.g., person-organization affiliations)
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS entity_relationships (
                id INTEGER PRIMARY KEY,
                source_entity_id INTEGER,
                target_entity_id INTEGER,
                relationship_type TEXT,
                confidence REAL,
                first_seen DATETIME,
                last_seen DATETIME,
                FOREIGN KEY (source_entity_id) REFERENCES entities(id),
                FOREIGN KEY (target_entity_id) REFERENCES entities(id)
            )
        ''')
        self.conn.commit()

    def store_entities(self, event_id, entities_dict):
        """Upsert the entities of one event and link them to that event.

        Args:
            event_id: Row id of the event in ``events``.
            entities_dict: Maps an entity type (the dict key is stored as
                ``entity_type``) to a list of entity strings. Values that are
                not lists are skipped.
        """
        now = datetime.now().isoformat()
        for entity_type, entities in entities_dict.items():
            if not isinstance(entities, list):
                continue
            for entity_text in entities:
                row = self.conn.execute(
                    'SELECT id FROM entities WHERE entity_text = ? AND entity_type = ?',
                    (entity_text, entity_type),
                ).fetchone()
                if row:
                    # Known entity: bump the counter in SQL so the increment
                    # does not depend on a value read earlier.
                    entity_id = row[0]
                    self.conn.execute(
                        'UPDATE entities SET frequency = frequency + 1, last_seen = ? WHERE id = ?',
                        (now, entity_id),
                    )
                else:
                    cursor = self.conn.execute(
                        '''
                        INSERT INTO entities (entity_text, entity_type, first_seen, last_seen, confidence)
                        VALUES (?, ?, ?, ?, ?)
                        ''',
                        (entity_text, entity_type, now, now, 1.0),
                    )
                    entity_id = cursor.lastrowid
                # The composite primary key makes a repeated link a no-op.
                self.conn.execute(
                    'INSERT OR IGNORE INTO event_entities (event_id, entity_id) VALUES (?, ?)',
                    (event_id, entity_id),
                )
        self.conn.commit()

    def find_related_events(self, event_data):
        """Return up to five stored events that share entity strings.

        Args:
            event_data: Dict whose ``'entities'`` value maps entity types to
                lists of strings; non-list values are ignored.

        Returns:
            Rows ``(id, text, timestamp, confidence, shared_entities)`` ordered
            by number of shared entities, then most recent first. Empty list
            when ``event_data`` carries no entity strings.
        """
        entity_texts = []
        for entities in event_data.get('entities', {}).values():
            if isinstance(entities, list):
                entity_texts.extend(entities)
        # Duplicates add nothing to an IN (...) test; drop them to keep the
        # number of bound parameters small.
        entity_texts = list(dict.fromkeys(entity_texts))
        if not entity_texts:
            return []
        placeholders = ','.join('?' * len(entity_texts))
        query = f'''
            SELECT e.id, e.text, e.timestamp, e.confidence,
                   COUNT(ee.entity_id) AS shared_entities
            FROM events e
            JOIN event_entities ee ON e.id = ee.event_id
            JOIN entities ent ON ee.entity_id = ent.id
            WHERE ent.entity_text IN ({placeholders})
            GROUP BY e.id
            ORDER BY shared_entities DESC, e.timestamp DESC
            LIMIT 5
        '''
        return self.conn.execute(query, entity_texts).fetchall()

    def find_entity_relationships(self, entity_id):
        """Return every relationship row in which ``entity_id`` takes part.

        Returns:
            Rows of ``entity_relationships`` columns followed by
            ``source_text, source_type, target_text, target_type``.
        """
        query = '''
            SELECT er.*,
                   e1.entity_text as source_text, e1.entity_type as source_type,
                   e2.entity_text as target_text, e2.entity_type as target_type
            FROM entity_relationships er
            JOIN entities e1 ON er.source_entity_id = e1.id
            JOIN entities e2 ON er.target_entity_id = e2.id
            WHERE er.source_entity_id = ? OR er.target_entity_id = ?
        '''
        return self.conn.execute(query, (entity_id, entity_id)).fetchall()

    def update_entity_relationships(self, event_id):
        """Create or reinforce links between entities that share an event.

        Every pair of entities of *different* types linked to ``event_id``
        gets a relationship row. A new row starts at confidence 0.5; an
        existing one gains 0.1 per call, capped at 1.0.
        """
        entities = self.conn.execute(
            '''
            SELECT e.id, e.entity_text, e.entity_type
            FROM entities e
            JOIN event_entities ee ON e.id = ee.entity_id
            WHERE ee.event_id = ?
            ORDER BY e.id
            ''',
            (event_id,),
        ).fetchall()
        now = datetime.now().isoformat()
        for i, (first_id, _, first_type) in enumerate(entities):
            for second_id, _, second_type in entities[i + 1:]:
                # Skip same entity type relationships
                if first_type == second_type:
                    continue
                # Relationships are undirected: look the pair up both ways.
                existing = self.conn.execute(
                    '''
                    SELECT id FROM entity_relationships
                    WHERE (source_entity_id = ? AND target_entity_id = ?)
                       OR (source_entity_id = ? AND target_entity_id = ?)
                    ''',
                    (first_id, second_id, second_id, first_id),
                ).fetchone()
                if existing:
                    # Cap at 1.0 so repeated sightings cannot push the value
                    # outside the confidence range.
                    self.conn.execute(
                        '''
                        UPDATE entity_relationships
                        SET last_seen = ?, confidence = MIN(1.0, confidence + 0.1)
                        WHERE id = ?
                        ''',
                        (now, existing[0]),
                    )
                else:
                    self.conn.execute(
                        '''
                        INSERT INTO entity_relationships
                        (source_entity_id, target_entity_id, relationship_type, confidence, first_seen, last_seen)
                        VALUES (?, ?, ?, ?, ?, ?)
                        ''',
                        (first_id, second_id, f"{first_type}_to_{second_type}", 0.5, now, now),
                    )
        self.conn.commit()

    def calculate_relationship_confidence(self, event1, event2):
        """Score how strongly ``event2``'s text reflects ``event1``'s entities.

        Args:
            event1: Dict with an ``'entities'`` mapping of type -> list of
                strings (non-list values are ignored).
            event2: Dict with a ``'text'`` string.

        Returns:
            A float in ``[0.0, 1.0]``: the fraction of ``event1``'s distinct
            entity strings that occur, case-insensitively, in ``event2``'s
            text. ``0.0`` when either side is empty.
        """
        other_text = (event2.get('text') or '').casefold()
        terms = set()
        for values in event1.get('entities', {}).values():
            if isinstance(values, list):
                terms.update(str(value).casefold() for value in values if value)
        if not terms or not other_text:
            return 0.0
        hits = sum(1 for term in terms if term in other_text)
        return hits / len(terms)
class EventAnalyzer:
    """Extract entities from free text, score confidence and track related events."""

    def __init__(self):
        self.ontology = OntologyRegistry()
        self.relationship_engine = RelationshipEngine()
        # NER inference is blocking; it is pushed to this pool so the event
        # loop stays responsive.
        self.executor = ThreadPoolExecutor(max_workers=3)

    @staticmethod
    def _bucket_ner_results(ner_results, text):
        """Sort raw NER items into people/organizations/locations and add hashtags.

        ``ner_results`` items are read through their ``"entity"`` tag and
        ``"word"`` text; tags ending in PER/ORG/LOC are kept, others dropped.
        """
        entities = {
            "people": [],
            "organizations": [],
            "locations": [],
            "hashtags": [word for word in text.split() if word.startswith('#')],
        }
        suffix_to_bucket = (("PER", "people"), ("ORG", "organizations"), ("LOC", "locations"))
        for item in ner_results:
            tag = item["entity"]
            for suffix, bucket in suffix_to_bucket:
                if tag.endswith(suffix):
                    entities[bucket].append(item["word"])
                    break
        return entities

    @staticmethod
    def _base_confidence(entities):
        """Weight which entity kinds are present: 0.2 people, 0.2 orgs, 0.3 locations, 0.3 temporal."""
        return min(1.0, (
            0.2 * bool(entities["people"]) +
            0.2 * bool(entities["organizations"]) +
            0.3 * bool(entities["locations"]) +
            0.3 * bool(entities["temporal"])
        ))

    @staticmethod
    def _entity_overlap(entities, related_text):
        """Fraction of distinct entity strings found (case-insensitively) in ``related_text``."""
        haystack = (related_text or '').casefold()
        terms = {
            str(value).casefold()
            for values in entities.values()
            if isinstance(values, list)
            for value in values
            if value
        }
        if not terms or not haystack:
            return 0.0
        return sum(term in haystack for term in terms) / len(terms)

    def _relationship_confidence(self, entities, related_text):
        """Score the link to one related event.

        Uses the engine's ``calculate_relationship_confidence`` when the
        engine provides one; otherwise falls back to ``_entity_overlap``.
        """
        scorer = getattr(self.relationship_engine, 'calculate_relationship_confidence', None)
        if callable(scorer):
            return scorer({'entities': entities}, {'text': related_text})
        return self._entity_overlap(entities, related_text)

    def _merge_locations(self, ml_locations, text):
        """Combine NER locations with regex locations, de-duplicated in first-seen order."""
        pattern_locations = self.ontology.validate_pattern(text, 'location')
        return list(dict.fromkeys(ml_locations + pattern_locations))

    def _store_event(self, text, entities, confidence):
        """Persist the event together with its entities and their relationships."""
        engine = self.relationship_engine
        cursor = engine.conn.execute(
            'INSERT INTO events (text, timestamp, confidence) VALUES (?, ?, ?)',
            (text, datetime.now().isoformat(), confidence)
        )
        engine.conn.commit()
        # Without these links a stored event can never be found again by
        # find_related_events, which joins through event_entities.
        engine.store_entities(cursor.lastrowid, entities)
        engine.update_entity_relationships(cursor.lastrowid)

    async def extract_entities(self, text):
        """Run NER on ``text`` in the thread pool and bucket the results.

        Returns:
            Dict with list values under ``people``, ``organizations``,
            ``locations`` and ``hashtags``.
        """
        loop = asyncio.get_running_loop()
        ner_results = await loop.run_in_executor(self.executor, ner_pipeline, text)
        return self._bucket_ner_results(ner_results, text)

    def extract_temporal(self, text):
        """Return the temporal expressions found in ``text`` by the ontology patterns."""
        return self.ontology.validate_pattern(text, 'temporal')

    async def extract_locations(self, text):
        """Return NER locations merged with pattern-based locations for ``text``."""
        entities = await self.extract_entities(text)
        return self._merge_locations(entities.get('locations', []), text)

    async def analyze_event(self, text):
        """Analyze one piece of text end to end.

        Returns:
            On success a dict with ``text``, ``entities``, ``confidence``,
            ``verification_needed`` (confidence below 0.6) and
            ``related_events``. Events at or above 0.6 are stored together
            with their entities. On any failure ``{"error": message}`` is
            returned instead of raising.
        """
        try:
            # NER is the expensive step: run it once and derive the merged
            # location list from the same result.
            entities = await self.extract_entities(text)
            temporal_data = self.extract_temporal(text)
            entities['locations'] = self._merge_locations(entities['locations'], text)
            entities['temporal'] = temporal_data

            confidence = self._base_confidence(entities)

            # Look up related events before storing this one, so an event is
            # never reported as related to itself.
            related_events = self.relationship_engine.find_related_events({
                'text': text,
                'entities': entities
            })
            if related_events:
                relationship_confidence = max(
                    self._relationship_confidence(entities, event[1])
                    for event in related_events
                )
                confidence = (confidence + relationship_confidence) / 2

            result = {
                "text": text,
                "entities": entities,
                "confidence": confidence,
                "verification_needed": confidence < 0.6,
                "related_events": [
                    {
                        "text": event[1],
                        "timestamp": event[2],
                        "confidence": event[3]
                    }
                    for event in related_events
                ]
            }
            # Store event if confidence is sufficient
            if confidence >= 0.6:
                self._store_event(text, entities, confidence)
            return result
        except Exception as e:
            # UI boundary: report the failure to the caller instead of raising.
            return {"error": str(e)}
# Initialize analyzer
# One module-level instance is created at import time and used by the
# request handler defined further down.
analyzer = EventAnalyzer()
# Custom CSS for UI
# Passed to the Gradio interface below; the results/confidence/entity/alert/
# related-events classes are the ones used in the HTML built by format_results.
css = """
.container { max-width: 1200px; margin: auto; padding: 20px; }
.results { padding: 20px; border: 1px solid #ddd; border-radius: 8px; margin-top: 20px; }
.confidence-high { color: #22c55e; font-weight: bold; }
.confidence-low { color: #f97316; font-weight: bold; }
.entity-section { margin: 15px 0; }
.alert-warning { background: #fff3cd; padding: 10px; border-radius: 5px; margin: 10px 0; }
.alert-success { background: #d1fae5; padding: 10px; border-radius: 5px; margin: 10px 0; }
.related-events { background: #f3f4f6; padding: 15px; border-radius: 5px; margin-top: 15px; }
"""
def format_results(analysis_result):
    """Render an analysis result dict as an HTML fragment for the UI.

    Args:
        analysis_result: Either ``{"error": message}`` or a dict with
            ``confidence`` (float), ``verification_needed`` (bool),
            ``entities`` (lists under ``people``, ``organizations``,
            ``locations``, ``temporal`` and ``hashtags``) and, optionally,
            ``related_events`` (dicts with ``text``, ``timestamp`` and
            ``confidence``).

    Returns:
        An HTML string. Every value that originates from the analyzed text
        (entities, related event text, error message) is HTML-escaped, since
        it is user input that ends up rendered as markup.
    """
    # Function-scope import of the single name needed, so no module-level
    # import has to change.
    from html import escape

    if "error" in analysis_result:
        return f"<div style='color: red'>Error: {escape(str(analysis_result['error']))}</div>"

    def list_items(values):
        # One escaped <li> per value, or a placeholder for an empty list.
        return ''.join(f'<li>{escape(str(value))}</li>' for value in values) or '<li>None detected</li>'

    def entity_section(heading, values):
        return (
            '<div class="entity-section">\n'
            f'<h4>{heading}</h4>\n'
            f'<ul>{list_items(values)}</ul>\n'
            '</div>'
        )

    entities = analysis_result["entities"]
    needs_verification = analysis_result["verification_needed"]
    confidence_class = "confidence-high" if analysis_result["confidence"] >= 0.6 else "confidence-low"
    confidence_pct = int(analysis_result['confidence'] * 100)

    parts = [
        '<div class="results">',
        '<div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 20px;">',
        '<h3 style="margin: 0;">Analysis Results</h3>',
        '<div>',
        f'Confidence Score: <span class="{confidence_class}">{confidence_pct}%</span>',
        '</div>',
        '</div>',
    ]
    if needs_verification:
        parts.append(
            '<div class="alert-warning">\n'
            '⚠️ <strong>Verification Required:</strong> Low confidence score detected. '
            'Please verify the extracted information.\n'
            '</div>'
        )
    # Heading emoji were mis-encoded in the original text and are restored here.
    parts.extend([
        entity_section('👀 People Detected', entities['people']),
        entity_section('🏢 Organizations', entities['organizations']),
        entity_section('📍 Locations', entities['locations']),
        entity_section('🕒 Temporal References', entities['temporal']),
        entity_section('# Hashtags', entities['hashtags']),
    ])
    if not needs_verification:
        parts.append(
            '<div class="alert-success">\n'
            '✅ <strong>Event Validated:</strong> The extracted information meets confidence thresholds.\n'
            '</div>'
        )
    related_events = analysis_result.get('related_events')
    if related_events:
        related_items = ''.join(
            f'<li>{escape(str(event["text"]))} ({escape(str(event["timestamp"]))}) - '
            f'Confidence: {int(event["confidence"] * 100)}%</li>'
            for event in related_events
        )
        parts.append(
            '<div class="related-events">\n'
            '<h4>Related Events</h4>\n'
            f'<ul>\n{related_items}\n</ul>\n'
            '</div>'
        )
    parts.append('</div>')
    return '\n'.join(parts)
# Modified to properly handle async
async def process_input(text):
    """Gradio handler: analyze ``text`` and return the rendered HTML report."""
    # Await the analysis first, then hand the resulting dict to the renderer.
    return format_results(await analyzer.analyze_event(text))
# UI wiring: a single multi-line textbox in, rendered HTML out.
_event_text_input = gr.Textbox(
    label="Event Text",
    placeholder="Enter text to analyze (e.g., 'John from Tech Corp. is attending the meeting in Washington, DC tomorrow at 14:30 #tech')",
    lines=3,
)

# Each example is a one-element row because the interface has one input.
_example_inputs = [
    ["John from Tech Corp. is attending the meeting in Washington, DC tomorrow at 14:30 #tech"],
    ["Sarah Johnson and Mike Smith from Defense Systems Inc. are conducting training in Norfolk, VA on June 15th #defense #training"],
    ["Team meeting at headquarters with @commander_smith at 0900 #briefing"],
]

demo = gr.Interface(
    fn=process_input,
    inputs=[_event_text_input],
    outputs=gr.HTML(),
    title="ToY Event Analysis System",
    description="Analyze text to extract entities, assess confidence, and identify key event information with relationship tracking.",
    css=css,
    theme=gr.themes.Soft(),
    examples=_example_inputs,
)

# Start the web server only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()