import asyncio
from typing import List, Dict, Optional, Set
from urllib.parse import urlparse
from langchain_community.tools import TavilySearchResults
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.content_filter_strategy import PruningContentFilter, BM25ContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from dotenv import load_dotenv
load_dotenv()
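# NOTE: TavilySearchResults reads its API key from the environment (TAVILY_API_KEY),
# so the .env file loaded above is expected to define it.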
class DeepWebCrawler:
def __init__(self,
max_search_results: int = 5,
max_external_links: int = 3,
word_count_threshold: int = 50,
content_filter_type: str = 'pruning',
filter_threshold: float = 0.48):
"""
Initialize the Deep Web Crawler with support for one-level deep crawling
Args:
max_search_results (int): Maximum number of search results to process
max_external_links (int): Maximum number of external links to crawl per page
word_count_threshold (int): Minimum word count for crawled content
content_filter_type (str): Type of content filter ('pruning' or 'bm25')
filter_threshold (float): Threshold for content filtering
"""
self.max_search_results = max_search_results
self.max_external_links = max_external_links
self.word_count_threshold = word_count_threshold
self.content_filter_type = content_filter_type
self.filter_threshold = filter_threshold
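        # URLs already fetched by this instance; lets repeat and depth-1 crawls skip duplicates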
self.crawled_urls: Set[str] = set()
def _create_web_search_tool(self):
return TavilySearchResults(max_results=self.max_search_results)
def _create_content_filter(self, user_query: Optional[str] = None):
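        # BM25ContentFilter ranks page blocks by lexical relevance to the query and is
        # therefore only usable when a query is supplied; PruningContentFilter instead
        # drops low-density boilerplate blocks regardless of the query.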
if self.content_filter_type == 'bm25' and user_query:
return BM25ContentFilter(
user_query=user_query,
bm25_threshold=self.filter_threshold
)
return PruningContentFilter(
threshold=self.filter_threshold,
threshold_type="fixed",
min_word_threshold=self.word_count_threshold
)
def _extract_links_from_search_results(self, results: List[Dict]) -> List[str]:
"""Safely extract URLs from search results"""
urls = []
for result in results:
if isinstance(result, dict) and 'url' in result:
urls.append(result['url'])
elif isinstance(result, str):
urls.append(result)
return urls
def _extract_url_from_link(self, link):
"""Extract URL string from link object which might be a dict or string"""
        if isinstance(link, dict):
            # Check 'url' first (original behaviour), then fall back to 'href',
            # the key crawl4ai's link entries typically use
            return link.get('url') or link.get('href', '')
elif isinstance(link, str):
return link
return ''
def _process_crawl_result(self, result) -> Dict:
"""Process individual crawl result into structured format"""
        # metadata/links/media may be None on failed crawls, so default to empty containers
        metadata = result.metadata or {}
        links = result.links or {}
        media = result.media or {}
        return {
            "url": result.url,
            "success": result.success,
            "title": metadata.get('title', 'N/A'),
            "content": result.markdown_v2.raw_markdown if result.success else result.error_message,
            "word_count": len(result.markdown_v2.raw_markdown.split()) if result.success else 0,
            "links": {
                "internal": links.get('internal', []),
                "external": links.get('external', [])
            },
            "images": len(media.get('images', []))
        }
async def crawl_urls(self, urls: List[str], user_query: Optional[str] = None, depth: int = 0):
"""
Crawl URLs with support for external link crawling
Args:
urls (List[str]): List of URLs to crawl
user_query (Optional[str]): Query for content filtering
depth (int): Current crawl depth (0 for initial, 1 for external links)
Returns:
List of crawl results including external link content
"""
if not urls or depth > 1:
return []
# Filter out already crawled URLs
new_urls = [url for url in urls if url not in self.crawled_urls]
if not new_urls:
return []
async with AsyncWebCrawler(
browser_type="chromium",
headless=True,
verbose=True
) as crawler:
content_filter = self._create_content_filter(user_query)
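            # arun_many crawls the whole batch with this single crawler; CacheMode.BYPASS
            # forces fresh fetches, and simulate_user/magic enable crawl4ai's
            # bot-mitigation heuristics.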
results = await crawler.arun_many(
urls=new_urls,
word_count_threshold=self.word_count_threshold,
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(content_filter=content_filter),
exclude_external_links=True,
exclude_social_media_links=True,
remove_overlay_elements=True,
simulate_user=True,
magic=True
)
processed_results = []
external_urls = set()
# Process results and collect external URLs
for result in results:
self.crawled_urls.add(result.url)
processed_result = self._process_crawl_result(result)
processed_results.append(processed_result)
if depth == 0 and result.success:
# Collect unique external URLs for further crawling
external_links = result.links.get('external', [])[:self.max_external_links]
external_urls.update(
self._extract_url_from_link(link)
for link in external_links
if self._extract_url_from_link(link)
and self._extract_url_from_link(link) not in self.crawled_urls
)
# Crawl external links if at depth 0
            if depth == 0 and external_urls:
external_results = await self.crawl_urls(
list(external_urls),
user_query=user_query,
                    depth=1
)
processed_results.extend(external_results)
return processed_results
async def search_and_crawl(self, query: str) -> List[Dict]:
"""
Perform web search and deep crawl of results
Args:
query (str): Search query
Returns:
List of crawled content results including external links
"""
search_tool = self._create_web_search_tool()
search_results = search_tool.invoke(query)
# Handle different types of search results
if isinstance(search_results, str):
urls = [search_results]
elif isinstance(search_results, list):
urls = self._extract_links_from_search_results(search_results)
else:
print(f"Unexpected search results format: {type(search_results)}")
return []
if not urls:
print("No valid URLs found in search results")
return []
print(f"Initial search found {len(urls)} URLs for query: {query}")
print(urls)
crawl_results = await self.crawl_urls(urls, user_query=query)
return crawl_results
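    # Illustrative standalone usage (query string is a placeholder; main() below does the same):
    #     results = asyncio.run(DeepWebCrawler().search_and_crawl("open source LLM tools"))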
class ResourceCollectionAgent:
def __init__(self, max_results_per_query: int = 10):
"""
Initialize the Resource Collection Agent
Args:
max_results_per_query (int): Maximum number of results per search query
"""
self.max_results_per_query = max_results_per_query
self.search_tool = TavilySearchResults(max_results=max_results_per_query)
def _is_valid_domain(self, url: str, valid_domains: List[str]) -> bool:
"""Check if URL belongs to allowed domains"""
try:
domain = urlparse(url).netloc.lower()
return any(valid_domain in domain for valid_domain in valid_domains)
        except Exception:
return False
def _extract_search_result(self, result) -> Optional[Dict]:
"""Safely extract information from a search result"""
try:
if isinstance(result, dict):
return {
"title": result.get("title", "No title"),
"url": result.get("url", ""),
"snippet": result.get("snippet", "No description")
}
elif isinstance(result, str):
return {
"title": "Unknown",
"url": result,
"snippet": "No description available"
}
return None
except Exception as e:
print(f"Error processing search result: {str(e)}")
return None
async def collect_resources(self) -> Dict[str, List[Dict]]:
"""
Collect AI/ML resources from specific platforms
Returns:
Dictionary with categorized resource links
"""
search_queries = {
"datasets": [
("kaggle", "site:kaggle.com/datasets machine learning"),
("huggingface", "site:huggingface.co/datasets artificial intelligence")
],
"repositories": [
("github", "site:github.com AI tools repository")
]
}
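        # Each query uses the "site:" operator so results stay on the intended platform;
        # the valid_domains check below re-verifies that after the search.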
results = {
"kaggle_datasets": [],
"huggingface_datasets": [],
"github_repositories": []
}
for category, queries in search_queries.items():
for platform, query in queries:
try:
search_results = self.search_tool.invoke(query)
# Handle different result formats
if isinstance(search_results, str):
search_results = [search_results]
elif not isinstance(search_results, list):
print(f"Unexpected search results format for {platform}: {type(search_results)}")
continue
# Filter results based on domain
valid_domains = {
"kaggle": ["kaggle.com"],
"huggingface": ["huggingface.co"],
"github": ["github.com"]
}
for result in search_results:
processed_result = self._extract_search_result(result)
if processed_result and self._is_valid_domain(
processed_result["url"],
valid_domains[platform]
):
if platform == "kaggle":
results["kaggle_datasets"].append(processed_result)
elif platform == "huggingface":
results["huggingface_datasets"].append(processed_result)
elif platform == "github":
results["github_repositories"].append(processed_result)
except Exception as e:
print(f"Error collecting {platform} resources: {str(e)}")
continue
return results
def main():
async def run_examples():
# Test DeepWebCrawler
deep_crawler = DeepWebCrawler(
max_search_results=3,
max_external_links=2,
word_count_threshold=50
)
crawl_results = await deep_crawler.search_and_crawl(
"Adani Defence & Aerospace"
)
print("\nDeep Crawler Results:")
for result in crawl_results:
print(f"URL: {result['url']}")
print(f"Title: {result['title']}")
print(f"Word Count: {result['word_count']}")
print(f"External Links: {len(result['links']['external'])}\n")
# Test ResourceCollectionAgent
resource_agent = ResourceCollectionAgent(max_results_per_query=5)
resources = await resource_agent.collect_resources()
print("\nResource Collection Results:")
for category, items in resources.items():
print(f"\n{category.upper()}:")
for item in items:
print(f"Title: {item['title']}")
print(f"URL: {item['url']}")
print("---")
asyncio.run(run_examples())
if __name__ == "__main__":
    main()