Spaces:
Runtime error
Runtime error
"""Meta-learning reasoning implementation with advanced adaptation capabilities.""" | |
import logging | |
from typing import Dict, Any, List, Optional, Set, Tuple, Callable | |
import json | |
from dataclasses import dataclass, field | |
from enum import Enum | |
from collections import defaultdict | |
import numpy as np | |
from datetime import datetime | |
from .base import ReasoningStrategy | |
class MetaFeatureType(Enum): | |
"""Types of meta-features for learning.""" | |
PROBLEM_STRUCTURE = "problem_structure" | |
SOLUTION_PATTERN = "solution_pattern" | |
REASONING_STYLE = "reasoning_style" | |
ERROR_PATTERN = "error_pattern" | |
PERFORMANCE_METRIC = "performance_metric" | |
ADAPTATION_SIGNAL = "adaptation_signal" | |
class MetaFeature: | |
"""Represents a meta-feature for learning.""" | |
type: MetaFeatureType | |
name: str | |
value: Any | |
confidence: float | |
timestamp: datetime | |
metadata: Dict[str, Any] = field(default_factory=dict) | |
class LearningEpisode: | |
"""Represents a learning episode.""" | |
id: str | |
query: str | |
features: List[MetaFeature] | |
outcome: Dict[str, Any] | |
performance: float | |
timestamp: datetime | |
metadata: Dict[str, Any] = field(default_factory=dict) | |
class MetaLearningStrategy(ReasoningStrategy): | |
""" | |
Advanced Meta-Learning reasoning implementation with: | |
- Dynamic strategy adaptation | |
- Performance tracking | |
- Pattern recognition | |
- Automated optimization | |
- Cross-episode learning | |
""" | |
def __init__(self, | |
learning_rate: float = 0.1, | |
memory_size: int = 1000, | |
adaptation_threshold: float = 0.7, | |
exploration_rate: float = 0.2): | |
self.learning_rate = learning_rate | |
self.memory_size = memory_size | |
self.adaptation_threshold = adaptation_threshold | |
self.exploration_rate = exploration_rate | |
# Learning components | |
self.episode_memory: List[LearningEpisode] = [] | |
self.feature_patterns: Dict[str, Dict[str, float]] = defaultdict(lambda: defaultdict(float)) | |
self.strategy_performance: Dict[str, List[float]] = defaultdict(list) | |
self.adaptation_history: List[Dict[str, Any]] = [] | |
# Performance tracking | |
self.success_rate: float = 0.0 | |
self.adaptation_rate: float = 0.0 | |
self.exploration_count: int = 0 | |
async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Main reasoning method implementing meta-learning.""" | |
try: | |
# Extract meta-features | |
features = await self._extract_meta_features(query, context) | |
# Select optimal strategy | |
strategy = await self._select_strategy(features, context) | |
# Apply strategy with adaptation | |
result = await self._apply_strategy(strategy, query, features, context) | |
# Learn from episode | |
episode = self._create_episode(query, features, result) | |
self._learn_from_episode(episode) | |
# Optimize performance | |
self._optimize_performance() | |
return { | |
"success": True, | |
"answer": result["answer"], | |
"confidence": result["confidence"], | |
"meta_features": [self._feature_to_dict(f) for f in features], | |
"selected_strategy": strategy, | |
"adaptations": result["adaptations"], | |
"performance_metrics": result["performance_metrics"], | |
"meta_insights": result["meta_insights"] | |
} | |
except Exception as e: | |
logging.error(f"Error in meta-learning reasoning: {str(e)}") | |
return {"success": False, "error": str(e)} | |
async def _extract_meta_features(self, query: str, context: Dict[str, Any]) -> List[MetaFeature]: | |
"""Extract meta-features from query and context.""" | |
prompt = f""" | |
Extract meta-features for learning: | |
Query: {query} | |
Context: {json.dumps(context)} | |
For each feature type: | |
1. Problem Structure | |
2. Solution Patterns | |
3. Reasoning Style | |
4. Error Patterns | |
5. Performance Metrics | |
6. Adaptation Signals | |
Format as: | |
[Type1] | |
Name: ... | |
Value: ... | |
Confidence: ... | |
Metadata: ... | |
[Type2] | |
... | |
""" | |
response = await context["groq_api"].predict(prompt) | |
return self._parse_meta_features(response["answer"]) | |
async def _select_strategy(self, features: List[MetaFeature], context: Dict[str, Any]) -> str: | |
"""Select optimal reasoning strategy based on meta-features.""" | |
prompt = f""" | |
Select optimal reasoning strategy: | |
Features: {json.dumps([self._feature_to_dict(f) for f in features])} | |
Context: {json.dumps(context)} | |
Consider: | |
1. Past performance patterns | |
2. Feature relevance | |
3. Adaptation potential | |
4. Resource constraints | |
Format as: | |
[Selection] | |
Strategy: ... | |
Rationale: ... | |
Confidence: ... | |
Adaptations: ... | |
""" | |
response = await context["groq_api"].predict(prompt) | |
return self._parse_strategy_selection(response["answer"]) | |
async def _apply_strategy(self, strategy: str, query: str, features: List[MetaFeature], context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Apply selected strategy with dynamic adaptation.""" | |
prompt = f""" | |
Apply strategy with meta-learning: | |
Strategy: {strategy} | |
Query: {query} | |
Features: {json.dumps([self._feature_to_dict(f) for f in features])} | |
Context: {json.dumps(context)} | |
Provide: | |
1. Main reasoning steps | |
2. Adaptation points | |
3. Performance metrics | |
4. Meta-insights | |
Format as: | |
[Application] | |
Steps: ... | |
Adaptations: ... | |
Metrics: ... | |
Insights: ... | |
[Result] | |
Answer: ... | |
Confidence: ... | |
""" | |
response = await context["groq_api"].predict(prompt) | |
return self._parse_strategy_application(response["answer"]) | |
def _create_episode(self, query: str, features: List[MetaFeature], result: Dict[str, Any]) -> LearningEpisode: | |
"""Create a learning episode from the current interaction.""" | |
return LearningEpisode( | |
id=f"episode_{len(self.episode_memory)}", | |
query=query, | |
features=features, | |
outcome=result, | |
performance=result.get("confidence", 0.0), | |
timestamp=datetime.now(), | |
metadata={ | |
"adaptations": result.get("adaptations", []), | |
"metrics": result.get("performance_metrics", {}) | |
} | |
) | |
def _learn_from_episode(self, episode: LearningEpisode): | |
"""Learn from a completed episode.""" | |
# Update episode memory | |
self.episode_memory.append(episode) | |
if len(self.episode_memory) > self.memory_size: | |
self.episode_memory.pop(0) | |
# Update feature patterns | |
for feature in episode.features: | |
pattern_key = f"{feature.type.value}:{feature.name}" | |
self.feature_patterns[pattern_key]["count"] += 1 | |
self.feature_patterns[pattern_key]["success"] += episode.performance | |
# Update strategy performance | |
strategy = episode.metadata.get("selected_strategy", "default") | |
self.strategy_performance[strategy].append(episode.performance) | |
# Track adaptations | |
self.adaptation_history.append({ | |
"timestamp": episode.timestamp, | |
"adaptations": episode.metadata.get("adaptations", []), | |
"performance": episode.performance | |
}) | |
# Update performance metrics | |
self._update_performance_metrics(episode) | |
def _optimize_performance(self): | |
"""Optimize meta-learning performance.""" | |
# Adjust learning rate | |
recent_performance = [e.performance for e in self.episode_memory[-10:]] | |
if recent_performance: | |
avg_performance = sum(recent_performance) / len(recent_performance) | |
if avg_performance > 0.8: | |
self.learning_rate *= 0.9 # Reduce learning rate when performing well | |
elif avg_performance < 0.5: | |
self.learning_rate *= 1.1 # Increase learning rate when performing poorly | |
# Adjust exploration rate | |
self.exploration_rate = max(0.1, self.exploration_rate * 0.995) # Gradually reduce exploration | |
# Prune ineffective patterns | |
for pattern, stats in list(self.feature_patterns.items()): | |
if stats["count"] > 10 and stats["success"] / stats["count"] < 0.3: | |
del self.feature_patterns[pattern] | |
# Update adaptation threshold | |
recent_adaptations = [a["performance"] for a in self.adaptation_history[-10:]] | |
if recent_adaptations: | |
self.adaptation_threshold = sum(recent_adaptations) / len(recent_adaptations) | |
def _update_performance_metrics(self, episode: LearningEpisode): | |
"""Update performance tracking metrics.""" | |
# Update success rate | |
self.success_rate = (self.success_rate * len(self.episode_memory) + episode.performance) / (len(self.episode_memory) + 1) | |
# Update adaptation rate | |
adaptations = len(episode.metadata.get("adaptations", [])) | |
self.adaptation_rate = (self.adaptation_rate * len(self.adaptation_history) + (adaptations > 0)) / (len(self.adaptation_history) + 1) | |
# Track exploration | |
if episode.metadata.get("exploration", False): | |
self.exploration_count += 1 | |
def _parse_meta_features(self, response: str) -> List[MetaFeature]: | |
"""Parse meta-features from response.""" | |
features = [] | |
current_type = None | |
current_feature = None | |
for line in response.split('\n'): | |
line = line.strip() | |
if not line: | |
continue | |
if line.startswith('[Type'): | |
if current_feature: | |
features.append(current_feature) | |
current_feature = None | |
try: | |
type_str = line[1:-1].lower() | |
current_type = MetaFeatureType(type_str) | |
except ValueError: | |
current_type = None | |
elif current_type and line.startswith('Name:'): | |
current_feature = MetaFeature( | |
type=current_type, | |
name=line[5:].strip(), | |
value=None, | |
confidence=0.0, | |
timestamp=datetime.now(), | |
metadata={} | |
) | |
elif current_feature: | |
if line.startswith('Value:'): | |
current_feature.value = line[6:].strip() | |
elif line.startswith('Confidence:'): | |
try: | |
current_feature.confidence = float(line[11:].strip()) | |
except: | |
pass | |
elif line.startswith('Metadata:'): | |
try: | |
current_feature.metadata = json.loads(line[9:].strip()) | |
except: | |
pass | |
if current_feature: | |
features.append(current_feature) | |
return features | |
def _parse_strategy_selection(self, response: str) -> str: | |
"""Parse strategy selection from response.""" | |
lines = response.split('\n') | |
strategy = "default" | |
for line in lines: | |
if line.startswith('Strategy:'): | |
strategy = line[9:].strip() | |
break | |
return strategy | |
def _parse_strategy_application(self, response: str) -> Dict[str, Any]: | |
"""Parse strategy application results.""" | |
result = { | |
"answer": "", | |
"confidence": 0.0, | |
"steps": [], | |
"adaptations": [], | |
"performance_metrics": {}, | |
"meta_insights": [] | |
} | |
section = None | |
for line in response.split('\n'): | |
line = line.strip() | |
if not line: | |
continue | |
if line.startswith('[Application]'): | |
section = "application" | |
elif line.startswith('[Result]'): | |
section = "result" | |
elif section == "application": | |
if line.startswith('Steps:'): | |
result["steps"] = [s.strip() for s in line[6:].split(',')] | |
elif line.startswith('Adaptations:'): | |
result["adaptations"] = [a.strip() for a in line[12:].split(',')] | |
elif line.startswith('Metrics:'): | |
try: | |
result["performance_metrics"] = json.loads(line[8:].strip()) | |
except: | |
pass | |
elif line.startswith('Insights:'): | |
result["meta_insights"] = [i.strip() for i in line[9:].split(',')] | |
elif section == "result": | |
if line.startswith('Answer:'): | |
result["answer"] = line[7:].strip() | |
elif line.startswith('Confidence:'): | |
try: | |
result["confidence"] = float(line[11:].strip()) | |
except: | |
result["confidence"] = 0.5 | |
return result | |
def _feature_to_dict(self, feature: MetaFeature) -> Dict[str, Any]: | |
"""Convert feature to dictionary for serialization.""" | |
return { | |
"type": feature.type.value, | |
"name": feature.name, | |
"value": feature.value, | |
"confidence": feature.confidence, | |
"timestamp": feature.timestamp.isoformat(), | |
"metadata": feature.metadata | |
} | |
def get_performance_metrics(self) -> Dict[str, Any]: | |
"""Get current performance metrics.""" | |
return { | |
"success_rate": self.success_rate, | |
"adaptation_rate": self.adaptation_rate, | |
"exploration_count": self.exploration_count, | |
"episode_count": len(self.episode_memory), | |
"pattern_count": len(self.feature_patterns), | |
"learning_rate": self.learning_rate, | |
"exploration_rate": self.exploration_rate | |
} | |
def get_top_patterns(self, n: int = 10) -> List[Tuple[str, float]]: | |
"""Get top performing patterns.""" | |
pattern_scores = [] | |
for pattern, stats in self.feature_patterns.items(): | |
if stats["count"] > 0: | |
score = stats["success"] / stats["count"] | |
pattern_scores.append((pattern, score)) | |
return sorted(pattern_scores, key=lambda x: x[1], reverse=True)[:n] | |
def clear_memory(self): | |
"""Clear learning memory.""" | |
self.episode_memory.clear() | |
self.feature_patterns.clear() | |
self.strategy_performance.clear() | |
self.adaptation_history.clear() | |