from typing import TypedDict, Annotated

from dotenv import load_dotenv
from langgraph.graph import START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langchain_core.messages import AnyMessage
from langchain_core.tools import Tool
# from langchain_community.llms.ollama import Ollama  # optional local model
# from langchain_community.tools import DuckDuckGoSearchRun  # optional fallback search
from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi
from urllib.parse import parse_qs, urlparse
from openai import OpenAI
import pandas as pd
import chess
import chess.engine
import os
#:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::#
load_dotenv()

google_key = os.getenv("GOOGLE_SECRET_KEY")
my_search_engine_id = os.getenv("Google_WebSearch_Engine")
openai_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=openai_key)
# youtube-transcript-api >= 1.0 exposes an instance-based API
yt_ap = YouTubeTranscriptApi()
#:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::#
api_key = os.getenv("HF_TOKEN")
# search_tool = DuckDuckGoSearchRun()

# Chat model served through the Hugging Face Inference API
# (swap repo_id for another endpoint, e.g. "deepseek-ai/DeepSeek-R1-0528", if needed)
llm = HuggingFaceEndpoint(
    repo_id="deepseek-ai/DeepSeek-V3.1-Terminus",
    huggingfacehub_api_token=api_key,
    timeout=500,
)

# Alternatively, run a local Ollama model:
# llm = Ollama(model="qwen2.5-coder", base_url="http://127.0.0.1:11434")
#:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::#
def custom_multiply(arg: str) -> int:
    """Multiply two integers passed as a comma-separated string, e.g. "5,3"."""
    a, b = map(int, arg.split(","))
    return a * b

custom_multiply_tool = Tool(
    name="custom_multiply",
    func=custom_multiply,
    description="Multiplies two numbers extracted from a comma-separated string and returns the result.",
)
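# Quick sanity check (illustrative):
#   custom_multiply("5,3")   -> 15
#   custom_multiply(" 5, 3") -> 15 (int() tolerates surrounding whitespace)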
def web_search(query: str) -> str:
    """Search Google Custom Search for a query and return up to 4 results.

    Args:
        query: The search query."""
    service = build("customsearch", "v1", developerKey=google_key)
    result = service.cse().list(q=query, cx=my_search_engine_id, num=4).execute()
    formatted_docs = []
    for doc in result.get("items", []):
        content = doc.get("snippet", "No content available.")
        source = doc.get("link", "No URL available.")
        # Wrap each result in an XML-like <Document> block
        formatted_doc = (
            f'<Document source="{source}">\n'
            f'{content}\n'
            f'</Document>'
        )
        formatted_docs.append(formatted_doc)
    return "\n\n---\n\n".join(formatted_docs)

web_search_tool = Tool(
    name="web_search",
    func=web_search,
    description="Useful for searching the web for relevant information to answer questions.",
)
def extract_video_id(url: str) -> str:
    """
    Extract the video ID from a YouTube URL.

    Args:
        url (str): The full YouTube video URL.

    Returns:
        str: The extracted video ID. For youtu.be links or raw IDs,
        the URL path itself is returned as a fallback.
    """
    parsed = urlparse(url)
    if parsed.hostname in {"www.youtube.com", "youtube.com"}:
        qs = parse_qs(parsed.query)
        if "v" in qs:
            return qs["v"][0]
    # Fallback for youtu.be links or raw IDs
    return parsed.path.lstrip("/")
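# Illustrative inputs:
#   extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ") -> "dQw4w9WgXcQ"
#   extract_video_id("https://youtu.be/dQw4w9WgXcQ")                -> "dQw4w9WgXcQ"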
def fetch_youtube_details(video_url: str) -> str:
    """
    Fetches the transcript text for a given YouTube video.
    Use the extracted transcript to answer questions about the video.

    Args:
        video_url (str): The YouTube video URL.

    Returns:
        str: Combined transcript text or an error message.
    """
    video_id = extract_video_id(video_url)
    try:
        # fetch() is an instance method in youtube-transcript-api >= 1.0
        transcript_data = yt_ap.fetch(
            video_id=video_id,
            languages=["en"],  # add more languages as needed; yt_ap.list(video_id) shows what is available
        )
        # Each snippet carries text plus timing metadata; only the text is needed here
        return " ".join(snippet.text for snippet in transcript_data)
    except Exception as e:
        return f"Error fetching video details: {str(e)}"

fetch_youtube_details_tool = Tool(
    name="fetch_youtube_details",
    func=fetch_youtube_details,
    description="Fetches details from a YouTube video, including its transcript.",
)
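# Illustrative call (assumes the video has an English transcript):
#   fetch_youtube_details("https://www.youtube.com/watch?v=dQw4w9WgXcQ")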
def transcribe_audio(audio_file_path: str) -> str:
    """
    Transcribes speech from an audio file using OpenAI Whisper.
    Use the resulting transcript to answer questions about the audio.

    Args:
        audio_file_path (str): Path to the audio file (e.g., a .wav or .mp3).

    Returns:
        str: The transcript text.
    """
    with open(audio_file_path, "rb") as audio_file:
        response = client.audio.transcriptions.create(
            model="whisper-1",  # or "gpt-4o-transcribe" if available
            file=audio_file
        )
    return response.text

transcribe_audio_tool = Tool(
    name="transcribe_audio",
    func=transcribe_audio,
    description="Transcribes audio from a file using OpenAI Whisper.",
)
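# Illustrative call (the path is hypothetical):
#   transcribe_audio("recordings/question.mp3")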
def df_to_column_row_map(df):
    """
    Convert a pandas DataFrame into the format:
    [
        {
            column1: {row1: value, row2: value, ...},
            column2: {row1: value, row2: value, ...},
            ...
        }
    ]
    """
    result = {}
    for col in df.columns:
        # Build a row mapping like {"row1": val1, "row2": val2, ...}
        col_dict = {f"row{i+1}": df.iloc[i][col] for i in range(len(df))}
        result[col] = col_dict
    return [result]
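# Illustrative transformation:
#   pd.DataFrame({"name": ["a", "b"], "qty": [1, 2]}) becomes
#   [{"name": {"row1": "a", "row2": "b"}, "qty": {"row1": 1, "row2": 2}}]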
def excel_csv_reader(file_path: str, query: str = ""):
    """
    Reads a CSV or Excel file and returns its contents as a dictionary array.
    If no file path or filename is provided, the agent must:
        1. Infer whether the user is referring to a CSV or Excel file based on context.
        2. Search the workspace for a matching file (e.g., any .csv or .xlsx file).
        3. If multiple matches exist, choose the most relevant one based on the query.

    Default fallback paths:
        - /<inferred>.csv
        - /<inferred>.xlsx

    Args:
        file_path (str, optional): The full path or filename. If omitted, the agent must auto-detect the correct file.
        query (str, optional): The user question, used to pick the most relevant file.

    Returns:
        The parsed content as a dictionary array, or an explicit error message if the file cannot be read.
    """
    try:
        _, ext = os.path.splitext(file_path.lower())
        if ext == ".csv":
            df = pd.read_csv(file_path)
        elif ext in [".xls", ".xlsx"]:
            df = pd.read_excel(file_path)
        else:
            return "Unsupported file format. Please upload CSV or Excel."
        if df.empty:
            return "The file is empty or unreadable."
        return df_to_column_row_map(df)
    except Exception as e:
        return f"Error reading file: {str(e)}"

excel_csv_reader_tool = Tool(
    name="excel_csv_reader",
    func=excel_csv_reader,
    description="Reads and summarizes data from Excel or CSV files.",
)
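# Illustrative call (the filename is hypothetical):
#   excel_csv_reader("sales.xlsx", query="total revenue")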
# Path to a local Stockfish binary; override via the STOCKFISH_PATH env var if yours lives elsewhere
STOCKFISH_PATH = os.getenv("STOCKFISH_PATH", "/usr/local/Cellar/stockfish/17.1/bin/stockfish")

def analyze_position_from_fen(fen: str, time_limit: float = 1.0) -> str:
    """
    Uses Stockfish to analyze the best move from a given FEN string.

    Args:
        fen (str): Forsyth–Edwards Notation of the board.
        time_limit (float): Time, in seconds, to let Stockfish think.

    Returns:
        str: Best move in standard algebraic notation.
    """
    try:
        board = chess.Board(fen)
        engine = chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH)
        result = engine.play(board, chess.engine.Limit(time=time_limit))
        engine.quit()
        return board.san(result.move)
    except Exception as e:
        return f"Stockfish error: {e}"
def solve_chess_image(image_path: str) -> str:
    """
    Stub for image-to-FEN conversion. Replace with actual OCR/vision logic.

    Args:
        image_path (str): Path to a chessboard image.

    Returns:
        str: Best move or an error message.
    """
    # Placeholder FEN for development (black to move)
    sample_fen = "6k1/5ppp/8/8/8/8/5PPP/6K1 b - - 0 1"
    try:
        print(f"Simulating FEN extraction from image: {image_path}")
        # Replace the above with actual OCR image-to-FEN logic
        best_move = analyze_position_from_fen(sample_fen)
        return f"Detected FEN: {sample_fen}\nBest move for Black: {best_move}"
    except Exception as e:
        return f"Image analysis error: {e}"

solve_chess_image_tool = Tool(
    name="solve_chess_image",
    func=solve_chess_image,
    description="Analyzes a chess position from an image and suggests the best move.",
)
#:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::#
chat = ChatHuggingFace(llm=llm, verbose=True)

tools = [
    custom_multiply_tool,
    web_search_tool,
    fetch_youtube_details_tool,
    transcribe_audio_tool,
    excel_csv_reader_tool,
    solve_chess_image_tool,
]

# Agent state: the running message list, merged with add_messages
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

def build_graph():
    chat_with_tools = chat.bind_tools(tools)

    def assistant(state: AgentState):
        return {
            "messages": [chat_with_tools.invoke(state["messages"])],
        }

    # The graph
    builder = StateGraph(AgentState)

    # Define nodes: these do the work
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))

    # Define edges: these determine how the control flow moves
    builder.add_edge(START, "assistant")
    builder.add_conditional_edges(
        "assistant",
        # If the latest message requests a tool, route to tools;
        # otherwise, end with a direct response
        tools_condition,
    )
    builder.add_edge("tools", "assistant")

    return builder.compile()
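
# A minimal smoke test, assuming HF_TOKEN is set and the endpoint is reachable;
# the question below is only illustrative.
if __name__ == "__main__":
    from langchain_core.messages import HumanMessage

    graph = build_graph()
    out = graph.invoke({"messages": [HumanMessage(content="What is 6 times 7? Use the custom_multiply tool.")]})
    print(out["messages"][-1].content)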