from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os
from datetime import datetime, timedelta, timezone
from groq import Groq
from dotenv import load_dotenv
import logging
import asyncio
import aiohttp

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Configuration: credentials must come from the environment. Never commit
# real tokens as hardcoded fallback defaults.
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

REPOSITORIES = [
    "falcosecurity/rules",
    "SigmaHQ/sigma",
    "reversinglabs/reversinglabs-yara-rules",
    "elastic/detection-rules",
    "sublime-security/sublime-rules",
    "Yamato-Security/hayabusa-rules",
    "anvilogic-forge/armory",
    "chainguard-dev/osquery-defense-kit",
    "splunk/security_content",
    "Neo23x0/signature-base",
    "SlimKQL/Hunting-Queries-Detection-Rules",
]
DAYS_BACK = 1
TIMEOUT = 10  # seconds for each request

# GitHub API base URL
GITHUB_API_URL = "https://api.github.com"

# Groq client setup
groq_client = Groq(api_key=GROQ_API_KEY)

# FastAPI app
app = FastAPI(docs_url=None, redoc_url=None)


class RepositoryDetails(BaseModel):
    repo_name: str
    repo_url: str
    changes: str
    description: str
    context: str


async def fetch_with_aiohttp(url: str, headers: dict, params: dict, session: aiohttp.ClientSession) -> list:
    """Asynchronous HTTP GET request with a timeout; returns [] on failure."""
    try:
        async with session.get(
            url,
            headers=headers,
            params=params,
            timeout=aiohttp.ClientTimeout(total=TIMEOUT),
        ) as response:
            response.raise_for_status()
            return await response.json()
    except Exception as e:
        logger.error(f"Error fetching {url}: {e}")
        return []


async def fetch_repository_changes(repo: str, days_back: int) -> list[str]:
    """Fetch recent commits and pull requests for a repository asynchronously."""
    try:
        logger.debug(f"Fetching changes for repository: {repo}")
        # GitHub interprets the `since` parameter as UTC, so build the cutoff
        # as a timezone-aware UTC datetime rather than naive local time.
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_back)
        since_date = cutoff_date.isoformat()
        headers = {
            "Authorization": f"token {GITHUB_TOKEN}",
            "Accept": "application/vnd.github.v3+json",
        }
        async with aiohttp.ClientSession() as session:
            # Fetch commits (first page only; limit to 100 per page for efficiency)
            commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits"
            commits_params = {"since": since_date, "per_page": 100}
            logger.debug(f"Fetching commits from: {commits_url}")
            commits = await fetch_with_aiohttp(commits_url, headers, commits_params, session)
            logger.debug(f"Found {len(commits)} commits for {repo}")

            # Fetch pull requests, most recently updated first
            prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls"
            prs_params = {"state": "all", "sort": "updated", "direction": "desc", "per_page": 100}
            logger.debug(f"Fetching pull requests from: {prs_url}")
            prs = await fetch_with_aiohttp(prs_url, headers, prs_params, session)
            logger.debug(f"Found {len(prs)} pull requests for {repo}")

        # Extract changes
        changes = []
        for commit in commits:
            changes.append(f"Commit: {commit['commit']['message']}")
        for pr in prs:
            # Parse GitHub's UTC timestamp as timezone-aware so it can be
            # compared against the aware cutoff_date above.
            updated_at = datetime.strptime(pr["updated_at"], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
            if updated_at >= cutoff_date:
                changes.append(f"PR: {pr['title']} - {pr['body'] or 'No description'}")
        logger.debug(f"Total changes for {repo}: {len(changes)}")
        return changes
    except Exception as e:
        logger.error(f"Error fetching changes for {repo}: {e}")
        return []
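
# NOTE (illustrative sketch, not part of the original flow): the commit and PR
# requests above only fetch the first page of results (at most `per_page=100`
# items). For busy repositories, GitHub paginates further results; a helper
# along these lines could walk the numbered pages. The name `fetch_all_pages`
# and the `max_pages` cap are assumptions added for illustration.
async def fetch_all_pages(url: str, headers: dict, params: dict,
                          session: aiohttp.ClientSession, max_pages: int = 5) -> list:
    """Follow GitHub's page-numbered pagination, up to `max_pages` pages."""
    items: list = []
    for page in range(1, max_pages + 1):
        batch = await fetch_with_aiohttp(url, headers, {**params, "page": page}, session)
        if not batch:
            break
        items.extend(batch)
        if len(batch) < int(params.get("per_page", 30)):
            break  # short page means there are no further results
    return items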
"""Use Groq's DeepSeek model to summarize changes (synchronous due to Groq SDK limitations).""" try: logger.debug(f"Summarizing changes for repository: {repo}") prompt = f""" Analyze the following changes made to detection rules in the GitHub repository {repo}: {', '.join(changes[:50])} # Limit to 50 changes to avoid token overflow Provide a detailed response with two sections: - Description: Summarize what changes were made. - Context: Explain why these changes might be required. """ logger.debug(f"Sending prompt to DeepSeek: {prompt[:100]}...") response = groq_client.chat.completions.create( model="deepseek-chat", messages=[{"role": "user", "content": prompt}], max_tokens=500, temperature=0.7 ) summary = response.choices[0].message.content logger.debug(f"Received summary from DeepSeek: {summary[:100]}...") # Extract description and context with fallback description = "Description not found." context = "Context not found." if "Description:" in summary and "Context:" in summary: description = summary.split("Description:")[1].split("Context:")[0].strip() context = summary.split("Context:")[1].strip() else: description = summary return {"description": description, "context": context} except Exception as e: logger.error(f"Error summarizing changes for {repo}: {e}") return {"description": "Error occurred.", "context": "Error occurred."} async def process_repository(repo: str, days_back: int) -> RepositoryDetails: """Process a single repository and return its details.""" changes = await fetch_repository_changes(repo, days_back) if changes: summary = await asyncio.get_event_loop().run_in_executor( None, partial(summarize_changes_with_deepseek, repo, changes) ) return RepositoryDetails( repo_name=f"{repo} (+{len(changes)})", repo_url=f"https://github.com/{repo}", changes="\n".join(changes), description=summary["description"], context=summary["context"] ) else: return RepositoryDetails( repo_name=f"{repo} (No changes)", repo_url=f"https://github.com/{repo}", changes=f"No changes detected in the last {days_back} day(s).", description="No changes detected.", context="No context available." ) @app.get("/monitor", response_model=list[RepositoryDetails]) async def monitor_repositories(): """Single API endpoint to fetch and summarize changes for all repositories.""" try: logger.debug("Starting to monitor repositories") tasks = [process_repository(repo, DAYS_BACK) for repo in REPOSITORIES] results = await asyncio.gather(*tasks, return_exceptions=True) # Handle any exceptions in results final_results = [] for result in results: if isinstance(result, Exception): logger.error(f"Error processing a repository: {result}") continue final_results.append(result) logger.debug("Finished monitoring repositories") return final_results except Exception as e: logger.error(f"Error in monitor_repositories: {e}") raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)