""" LynxScribe configuration and testing in LynxKite. TODO: all these outputs should contain metadata. So the next task can check the input type, etc. """ from google.cloud import storage from copy import deepcopy from enum import Enum import asyncio import pandas as pd from pydantic import BaseModel, ConfigDict import pathlib from lynxscribe.core.llm.base import get_llm_engine from lynxscribe.core.vector_store.base import get_vector_store from lynxscribe.common.config import load_config from lynxscribe.components.text.embedder import TextEmbedder from lynxscribe.core.models.embedding import Embedding from lynxscribe.components.embedding_clustering import FclusterBasedClustering from lynxscribe.components.rag.rag_graph import RAGGraph from lynxscribe.components.rag.knowledge_base_graph import PandasKnowledgeBaseGraph from lynxscribe.components.rag.rag_chatbot import Scenario, ScenarioSelector, RAGChatbot from lynxscribe.components.chat.processors import ( ChatProcessor, MaskTemplate, TruncateHistory, ) from lynxscribe.components.chat.api import ChatAPI from lynxscribe.core.models.prompts import ChatCompletionPrompt, Message from lynxscribe.components.rag.loaders import FAQTemplateLoader from lynxkite.core import ops import json from lynxkite.core.executors import one_by_one DEFAULT_NEGATIVE_ANSWER = "I'm sorry, but the data I've been trained on does not contain any information related to your question." ENV = "LynxScribe" one_by_one.register(ENV) op = ops.op_registration(ENV) output_on_top = ops.output_position(output="top") # defining the cloud provider enum class CloudProvider(str, Enum): GCP = "gcp" AWS = "aws" AZURE = "azure" class RAGVersion(str, Enum): V1 = "v1" V2 = "v2" class MessageRole(str, Enum): SYSTEM = "system" USER = "user" class RAGTemplate(BaseModel): """ Model for RAG templates consisting of three tables: they are connected via scenario names. One table (FAQs) contains scenario-denoted nodes to upsert into the knowledge base, the other two tables serve as the configuration for the scenario selector. Attributes: faq_data: Table where each row is an FAQ question, and possibly its answer pair. Will be fed into `FAQTemplateLoader.load_nodes_and_edges()`. For configuration of this table see the loader's init arguments. scenario_data: Table where each row is a Scenario, column names are thus scenario attributes. Will be fed into `ScenarioSelector.from_data()`. prompt_codes: Optional helper for the scenario table, may contain prompt code mappings to real prompt messages. It's enough then to use the codes instead of the full messages in the scenarios table. Will be fed into `ScenarioSelector.from_data()`. """ model_config = ConfigDict(arbitrary_types_allowed=True) faq_data: pd.DataFrame scenario_data: pd.DataFrame prompt_codes: dict[str, str] = {} @classmethod def from_excel_path( cls, path: str, faq_data_sheet_name: str, scenario_data_sheet_name: str, prompt_codes_sheet_name: str | None = None, ) -> "RAGTemplate": """Spawn a RAGTemplate from an Excel file containing the two needed (plus one optional) sheets.""" def transform_codes(prompt_codes: pd.DataFrame) -> dict[str, str]: """Check and transform prompt codes table into a code dictionary.""" if (len_columns := len(prompt_codes.columns)) != 2: raise ValueError( f"Prompt codes should contain exactly 2 columns, {len_columns} found." 
                )
            return prompt_codes.set_index(prompt_codes.columns[0])[
                prompt_codes.columns[1]
            ].to_dict()

        return cls(
            faq_data=pd.read_excel(path, sheet_name=faq_data_sheet_name),
            scenario_data=pd.read_excel(path, sheet_name=scenario_data_sheet_name),
            prompt_codes=transform_codes(pd.read_excel(path, sheet_name=prompt_codes_sheet_name))
            if prompt_codes_sheet_name
            else {},
        )


@op("Cloud-sourced File Listing")
def cloud_file_loader(
    *,
    cloud_provider: CloudProvider = CloudProvider.GCP,
    folder_URL: str = "https://storage.googleapis.com/lynxkite_public_data/lynxscribe-images/image-rag-test",
    accepted_file_types: str = ".jpg, .jpeg, .png",
):
    """
    Returns the list of URLs of all the images in a cloud-based folder.
    Currently only GCP storage is supported.
    """
    if folder_URL.endswith("/"):
        folder_URL = folder_URL[:-1]
    accepted_file_types = tuple([t.strip() for t in accepted_file_types.split(",")])

    if cloud_provider == CloudProvider.GCP:
        client = storage.Client()
        url_useful_part = folder_URL.split(".com/")[-1]
        bucket_name = url_useful_part.split("/")[0]
        if bucket_name == url_useful_part:
            prefix = ""
        else:
            prefix = url_useful_part.split(bucket_name + "/")[-1]
        bucket = client.bucket(bucket_name)
        blobs = bucket.list_blobs(prefix=prefix)
        file_urls = [blob.public_url for blob in blobs if blob.name.endswith(accepted_file_types)]
        return {"file_urls": file_urls}
    else:
        raise ValueError(f"Cloud provider '{cloud_provider}' is not supported.")


# @output_on_top
# @op("LynxScribe RAG Graph Vector Store", slow=True)
# def ls_rag_graph(
#     *,
#     name: str = "faiss",
#     num_dimensions: int = 3072,
#     collection_name: str = "lynx",
#     text_embedder_interface: str = "openai",
#     text_embedder_model_name_or_path: str = "text-embedding-3-large",
#     # api_key_name: str = "OPENAI_API_KEY",
# ):
#     """
#     Returns with a vector store instance.
#     """
#     # getting the text embedder instance
#     llm_params = {"name": text_embedder_interface}
#     # if api_key_name:
#     #     llm_params["api_key"] = os.getenv(api_key_name)
#     llm = get_llm_engine(**llm_params)
#     text_embedder = TextEmbedder(llm=llm, model=text_embedder_model_name_or_path)
#
#     # getting the vector store
#     if name == "chromadb":
#         vector_store = get_vector_store(name=name, collection_name=collection_name)
#     elif name == "faiss":
#         vector_store = get_vector_store(name=name, num_dimensions=num_dimensions)
#     else:
#         raise ValueError(f"Vector store name '{name}' is not supported.")
#
#     # building up the RAG graph
#     rag_graph = RAGGraph(
#         PandasKnowledgeBaseGraph(vector_store=vector_store, text_embedder=text_embedder)
#     )
#
#     return {"rag_graph": rag_graph}


@op("LynxScribe Image Describer", slow=True)
async def ls_image_describer(
    file_urls,
    *,
    llm_interface: str = "openai",
    llm_visual_model: str = "gpt-4o",
    llm_prompt_path: str = "uploads/image_description_prompts.yaml",
    llm_prompt_name: str = "cot_picture_descriptor",
    # api_key_name: str = "OPENAI_API_KEY",
):
    """
    Returns image descriptions for a list of image URLs.

    TODO: make the inputs more flexible (e.g. accepting file locations, URLs, binaries, etc.).
    The input dictionary should contain some meta info, e.g. what is in the list...
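
    Illustrative input/output shape (the exact structure of each description depends on the
    prompt used):

        {"file_urls": ["https://.../img1.jpg", ...]}
        -> {"image_descriptions": [{"image_url": "https://.../img1.jpg", "description": {...}}, ...]}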
""" # handling inputs image_urls = file_urls["file_urls"] # loading the LLM llm_params = {"name": llm_interface} # if api_key_name: # llm_params["api_key"] = os.getenv(api_key_name) llm = get_llm_engine(**llm_params) # preparing the prompts prompt_base = load_config(llm_prompt_path)[llm_prompt_name] prompt_list = [] for i in range(len(image_urls)): image = image_urls[i] _prompt = deepcopy(prompt_base) for message in _prompt: if isinstance(message["content"], list): for _message_part in message["content"]: if "image_url" in _message_part: _message_part["image_url"] = {"url": image} prompt_list.append(_prompt) # creating the prompt objects ch_prompt_list = [ ChatCompletionPrompt(model=llm_visual_model, messages=prompt) for prompt in prompt_list ] # get the image descriptions tasks = [llm.acreate_completion(completion_prompt=_prompt) for _prompt in ch_prompt_list] out_completions = await asyncio.gather(*tasks) results = [ dictionary_corrector(result.choices[0].message.content) for result in out_completions ] # getting the image descriptions (list of dictionaries {image_url: URL, description: description}) # TODO: some result class could be a better idea (will be developed in LynxScribe) image_descriptions = [ {"image_url": image_urls[i], "description": results[i]} for i in range(len(image_urls)) ] return {"image_descriptions": image_descriptions} @op("LynxScribe Image RAG Builder", slow=True) async def ls_image_rag_builder( image_descriptions, *, vdb_provider_name: str = "faiss", vdb_num_dimensions: int = 3072, vdb_collection_name: str = "lynx", text_embedder_interface: str = "openai", text_embedder_model_name_or_path: str = "text-embedding-3-large", # api_key_name: str = "OPENAI_API_KEY", ): """ Based on image descriptions, and embedding/VDB parameters, the function builds up an image RAG graph, where the nodes are the descriptions of the images (and of all image objects). In a later phase, synthetic questions and "named entities" will also be added to the graph. 
""" # handling inputs image_descriptions = image_descriptions["image_descriptions"] # Building up the empty RAG graph # a) Define LLM interface and get a text embedder llm_params = {"name": text_embedder_interface} # if api_key_name: # llm_params["api_key"] = os.getenv(api_key_name) llm = get_llm_engine(**llm_params) text_embedder = TextEmbedder(llm=llm, model=text_embedder_model_name_or_path) # b) getting the vector store # TODO: vdb_provider_name should be ENUM, and other parameters should appear accordingly if vdb_provider_name == "chromadb": vector_store = get_vector_store(name=vdb_provider_name, collection_name=vdb_collection_name) elif vdb_provider_name == "faiss": vector_store = get_vector_store(name=vdb_provider_name, num_dimensions=vdb_num_dimensions) else: raise ValueError(f"Vector store name '{vdb_provider_name}' is not supported.") # c) building up the RAG graph rag_graph = RAGGraph( PandasKnowledgeBaseGraph(vector_store=vector_store, text_embedder=text_embedder) ) dict_list_df = [] for image_description_tuple in image_descriptions: image_url = image_description_tuple["image_url"] image_description = image_description_tuple["description"] if "overall description" in image_description: dict_list_df.append( { "image_url": image_url, "description": image_description["overall description"], "source": "overall description", } ) if "details" in image_description: for dkey in image_description["details"].keys(): text = f"The picture's description is: {image_description['overall description']}\n\nThe description of the {dkey} is: {image_description['details'][dkey]}" dict_list_df.append( {"image_url": image_url, "description": text, "source": "details"} ) pdf_descriptions = pd.DataFrame(dict_list_df) pdf_descriptions["embedding_values"] = await text_embedder.acreate_embedding( pdf_descriptions["description"].to_list() ) pdf_descriptions["id"] = "im_" + pdf_descriptions.index.astype(str) # adding the embeddings to the RAG graph with metadata pdf_descriptions["embedding"] = pdf_descriptions.apply( lambda row: Embedding( id=row["id"], value=row["embedding_values"], metadata={ "image_url": row["image_url"], "image_part": row["source"], "type": "image_description", }, document=row["description"], ), axis=1, ) embedding_list = pdf_descriptions["embedding"].tolist() # adding the embeddings to the RAG graph rag_graph.kg_base.vector_store.upsert(embedding_list) # # saving the RAG graph # rag_graph.kg_base.save(image_rag_out_path) return {"rag_graph": rag_graph} @op("LynxScribe RAG Graph Saver") def ls_save_rag_graph( rag_graph, *, image_rag_out_path: str = "image_test_rag_graph.pickle", ): """ Saves the RAG graph to a pickle file. 
""" # reading inputs rag_graph = rag_graph[0]["rag_graph"] rag_graph.kg_base.save(image_rag_out_path) return None @ops.input_position(rag_graph="bottom") @op("LynxScribe Image RAG Query") async def search_context(rag_graph, text, *, top_k=3): """ top_k: which results we are showing (TODO: when the image viewer is updated w pager, change back to top k) """ message = text["text"] rag_graph = rag_graph[0]["rag_graph"] # get all similarities emb_similarities = await rag_graph.search_context( message, max_results=top_k, unique_metadata_key="image_url" ) # get the image urls, scores and descriptions result_list = [] for emb_sim in emb_similarities: image_url = emb_sim.embedding.metadata["image_url"] score = emb_sim.score description = emb_sim.embedding.document result_list.append({"image_url": image_url, "score": score, "description": description}) real_k = min(top_k, len(result_list) - 1) return {"embedding_similarities": [result_list[real_k]]} @op("LynxScribe Image Result Viewer", view="image") def view_image(embedding_similarities): """ Plotting the TOP images (from embedding similarities). TODO: later on, the user can scroll the images and send feedbacks """ embedding_similarities = embedding_similarities["embedding_similarities"] return embedding_similarities[0]["image_url"] @op("LynxScribe Text RAG Loader", slow=True) def ls_text_rag_loader( file_urls, *, input_type: RAGVersion = RAGVersion.V1, vdb_provider_name: str = "faiss", vdb_num_dimensions: int = 3072, vdb_collection_name: str = "lynx", text_embedder_interface: str = "openai", text_embedder_model_name_or_path: str = "text-embedding-3-large", # api_key_name: str = "OPENAI_API_KEY", ): """ Loading a text-based RAG graph from saved files (getting pandas readable links). """ # handling inputs file_urls = file_urls["file_urls"] # getting the text embedder instance llm_params = {"name": text_embedder_interface} # if api_key_name: # llm_params["api_key"] = os.getenv(api_key_name) llm = get_llm_engine(**llm_params) text_embedder = TextEmbedder(llm=llm, model=text_embedder_model_name_or_path) # getting the vector store if vdb_provider_name == "chromadb": vector_store = get_vector_store(name=vdb_provider_name, collection_name=vdb_collection_name) elif vdb_provider_name == "faiss": vector_store = get_vector_store(name=vdb_provider_name, num_dimensions=vdb_num_dimensions) else: raise ValueError(f"Vector store name '{vdb_provider_name}' is not supported.") # building up the RAG graph rag_graph = RAGGraph( PandasKnowledgeBaseGraph(vector_store=vector_store, text_embedder=text_embedder) ) # loading the knowledge base (temporary + TODO: adding v2) if input_type == RAGVersion.V1: node_file = [f for f in file_urls if "nodes.p" in f][0] edge_file = [f for f in file_urls if "edges.p" in f][0] tempcluster_file = [f for f in file_urls if "clusters.p" in f][0] rag_graph.kg_base.load_v1_knowledge_base( nodes_path=node_file, edges_path=edge_file, template_cluster_path=tempcluster_file, ) elif input_type == RAGVersion.V2: raise ValueError("Currently only v1 input type is supported.") else: raise ValueError(f"Input type '{input_type}' is not supported.") return {"rag_graph": rag_graph} @op("LynxScribe FAQ to RAG", slow=True) async def ls_faq_to_rag( *, faq_excel_path: str = "", vdb_provider_name: str = "faiss", vdb_num_dimensions: int = 3072, vdb_collection_name: str = "lynx", text_embedder_interface: str = "openai", text_embedder_model_name_or_path: str = "text-embedding-3-large", scenario_cluster_distance_pct: int = 30, ): """ Loading a text-based RAG graph 
    """
    # getting the text embedder instance
    llm_params = {"name": text_embedder_interface}
    llm = get_llm_engine(**llm_params)
    text_embedder = TextEmbedder(llm=llm, model=text_embedder_model_name_or_path)

    # getting the vector store
    if vdb_provider_name == "chromadb":
        vector_store = get_vector_store(name=vdb_provider_name, collection_name=vdb_collection_name)
    elif vdb_provider_name == "faiss":
        vector_store = get_vector_store(name=vdb_provider_name, num_dimensions=vdb_num_dimensions)
    else:
        raise ValueError(f"Vector store name '{vdb_provider_name}' is not supported.")

    # building up the RAG graph
    rag_graph = RAGGraph(
        PandasKnowledgeBaseGraph(vector_store=vector_store, text_embedder=text_embedder)
    )

    # loading the knowledge base from the FAQ file
    rag_template = RAGTemplate.from_excel_path(
        path=faq_excel_path,
        faq_data_sheet_name="scenario_examples",
        scenario_data_sheet_name="scenario_scripts",
        prompt_codes_sheet_name="prompt_dictionary",
    )

    faq_loader_params = {
        "id_column": "scenario_example_ID",
        "timestamp_column": "last_modified_timestamp",
        "validity_column": "valid_flg",
        "question_type_contents_id": ["faq_question", "faq_question", "q_{id}"],
        "answer_type_contents_id": ["faq_answer", "{faq_question}\n\n{faq_answer}", "a_{id}"],
        "question_to_answer_edge_type_weight": ["qna", 1.0],
    }

    nodes, edges = FAQTemplateLoader(**faq_loader_params).load_nodes_and_edges(
        rag_template.faq_data
    )
    await rag_graph.kg_base.upsert_nodes(*nodes)
    rag_graph.kg_base.upsert_edges(edges)

    # Generating scenario clusters
    question_ids = [_id for _id in nodes[0] if _id.startswith("q_")]
    stored_embeddings = rag_graph.kg_base.vector_store.get(
        question_ids, include=["embeddings", "metadatas"]
    )
    embedding_vals = pd.Series([_emb.value for _emb in stored_embeddings], index=question_ids)
    labels = pd.Series(
        [_emb.metadata["scenario_name"] for _emb in stored_embeddings], index=question_ids
    )

    temp_cls = FclusterBasedClustering(distance_percentile=scenario_cluster_distance_pct)
    temp_cls.fit(embedding_vals, labels)
    df_tempclusters = temp_cls.get_cluster_centers()

    # Adding the scenario clusters to the RAG Graph
    df_tempclusters["template_id"] = "t_" + df_tempclusters.index.astype(str)
    df_tempclusters["embedding"] = df_tempclusters.apply(
        lambda row: Embedding(
            id=row["template_id"],
            value=row["cluster_center"],
            metadata={"scenario_name": row["control_label"], "type": "intent_cluster"},
        ),
        axis=1,
    )
    embedding_list = df_tempclusters["embedding"].tolist()
    rag_graph.kg_base.vector_store.upsert(embedding_list)

    return {"rag_graph": rag_graph}


@output_on_top
@op("LynxScribe RAG Graph Chatbot Builder")
def ls_rag_chatbot_builder(
    rag_graph,
    *,
    scenario_file: str = "uploads/lynx_chatbot_scenario_selector.yaml",
    node_types: str = "intent_cluster",
    scenario_meta_name: str = "",
):
    """
    Builds up a RAG Graph-based chatbot (basically the loaded RAG graph + a scenario selector).

    TODO: Later, the scenario selector could be built synthetically from the input documents,
    or in a semi-automated way, not just from the scenario YAML.
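
    Output shape (consumed by the "LynxScribe RAG Graph Chatbot Backend" op):

        {"knowledge_base": {"rag_graph": <RAGGraph>, "scenario_selector": <ScenarioSelector>}}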
""" scenarios = load_config(scenario_file) node_types = [t.strip() for t in node_types.split(",")] # handling inputs rag_graph = rag_graph["rag_graph"] parameters = { "scenarios": [Scenario(**scenario) for scenario in scenarios], "node_types": node_types, } if len(scenario_meta_name) > 0: parameters["get_scenario_name"] = lambda node: node.metadata[scenario_meta_name] # loading the scenarios scenario_selector = ScenarioSelector(**parameters) # TODO: later we should unify this "knowledge base" object across the functions # this could be always an input of a RAG Chatbot, but also for other apps. return { "knowledge_base": { "rag_graph": rag_graph, "scenario_selector": scenario_selector, } } @output_on_top @ops.input_position(knowledge_base="bottom", chat_processor="bottom") @op("LynxScribe RAG Graph Chatbot Backend") def ls_rag_chatbot_backend( knowledge_base, chat_processor, *, negative_answer=DEFAULT_NEGATIVE_ANSWER, retriever_limits_by_type="{}", retriever_strict_limits=True, retriever_overall_chunk_limit=20, retriever_overall_token_limit=3000, retriever_max_iterations=3, llm_interface: str = "openai", llm_model_name: str = "gpt-4o", # api_key_name: str = "OPENAI_API_KEY", ): """ Returns with a chatbot instance. """ # handling_inputs rag_graph = knowledge_base[0]["knowledge_base"]["rag_graph"] scenario_selector = knowledge_base[0]["knowledge_base"]["scenario_selector"] chat_processor = chat_processor[0]["chat_processor"] limits_by_type = json.loads(retriever_limits_by_type) # connecting to the LLM llm_params = {"name": llm_interface} # if api_key_name: # llm_params["api_key"] = os.getenv(api_key_name) llm = get_llm_engine(**llm_params) # setting the parameters params = { "limits_by_type": limits_by_type, "strict_limits": retriever_strict_limits, "max_results": retriever_overall_chunk_limit, "token_limit": retriever_overall_token_limit, "max_iterations": retriever_max_iterations, } # generating the RAG Chatbot rag_chatbot = RAGChatbot( rag_graph=rag_graph, scenario_selector=scenario_selector, llm=llm, negative_answer=negative_answer, **params, ) # generating the chatbot back-end c = ChatAPI( chatbot=rag_chatbot, chat_processor=chat_processor, model=llm_model_name, ) return {"chat_api": c} @output_on_top @ops.input_position(processor="bottom") @op("Chat processor") def chat_processor(processor, *, _ctx: one_by_one.Context): cfg = _ctx.last_result or { "question_processors": [], "answer_processors": [], "masks": [], } for f in ["question_processor", "answer_processor", "mask"]: if f in processor: cfg[f + "s"].append(processor[f]) question_processors = cfg["question_processors"][:] answer_processors = cfg["answer_processors"][:] masking_templates = {} for mask in cfg["masks"]: masking_templates[mask["name"]] = mask if masking_templates: question_processors.append(MaskTemplate(masking_templates=masking_templates)) answer_processors.append(MaskTemplate(masking_templates=masking_templates)) chat_processor = ChatProcessor( question_processors=question_processors, answer_processors=answer_processors ) return {"chat_processor": chat_processor, **cfg} @output_on_top @op("LynxScribe Message") def lynxscribe_message( *, prompt_role: MessageRole = MessageRole.SYSTEM, prompt_content: ops.LongStr ): return_message = Message(role=prompt_role.value, content=prompt_content.strip()) return {"prompt_message": return_message} @op("Read Excel") def read_excel(*, file_path: str, sheet_name: str = "Sheet1", columns: str = ""): """ Reads an Excel file and returns the content of the specified sheet. 
    TODO: more general: several input/output versions.
    """
    df = pd.read_excel(file_path, sheet_name=sheet_name)
    if columns:
        columns = [c.strip() for c in columns.split(",") if c.strip()]
        columns = [c for c in columns if c in df.columns]
        if len(columns) == 0:
            raise ValueError("No valid columns specified.")
        df = df[columns].copy()
    return {"dataframe": df}


@ops.input_position(system_prompt="bottom", instruction_prompt="bottom", dataframe="left")
@op("LynxScribe Task Solver", slow=True)
async def ls_task_solver(
    system_prompt,
    instruction_prompt,
    dataframe,
    *,
    llm_interface: str = "openai",
    llm_model_name: str = "gpt-4o",
    new_column_names: str = "processed_field",
    # api_key_name: str = "OPENAI_API_KEY",
):
    """
    Solves the described task on a data frame and puts the results into a new column.
    If multiple new_column_names are provided, the structured dictionary output
    will be split into multiple columns.
    """
    # handling inputs
    system_message = system_prompt[0]["prompt_message"]
    instruction_message = instruction_prompt[0]["prompt_message"]
    df = dataframe["dataframe"]

    # preparing output
    out_df = df.copy()

    # connecting to the LLM
    llm_params = {"name": llm_interface}
    # if api_key_name:
    #     llm_params["api_key"] = os.getenv(api_key_name)
    llm = get_llm_engine(**llm_params)

    # getting the list of fieldnames used in the instruction message
    fieldnames = []
    for pot_fieldname in df.columns:
        if "{" + pot_fieldname + "}" in instruction_message.content:
            fieldnames.append(pot_fieldname)

    # generate a list of instruction messages (from fieldnames)
    # each row of the df is a separate instruction message
    # TODO: make it fast for large dataframes
    instruction_messages = []
    for i in range(len(df)):
        instruction_message_i = deepcopy(instruction_message)
        for fieldname in fieldnames:
            instruction_message_i.content = instruction_message_i.content.replace(
                "{" + fieldname + "}", str(df.iloc[i][fieldname])
            )
        instruction_messages.append(instruction_message_i)

    # generate the completion prompts
    completion_prompts = [
        ChatCompletionPrompt(
            model=llm_model_name,
            messages=[system_message, instruction_message_j],
        )
        for instruction_message_j in instruction_messages
    ]

    # get the answers
    tasks = [llm.acreate_completion(completion_prompt=_prompt) for _prompt in completion_prompts]
    out_completions = await asyncio.gather(*tasks)

    # answer post-processing: 1 vs more columns
    col_list = [_c.strip() for _c in new_column_names.split(",") if _c.strip()]
    if len(col_list) == 0:
        raise ValueError("No valid column names specified.")
    elif len(col_list) == 1:
        out_df[col_list[0]] = [result.choices[0].message.content for result in out_completions]
    else:
        answers = [
            dictionary_corrector(result.choices[0].message.content, expected_keys=col_list)
            for result in out_completions
        ]
        for col in col_list:
            out_df[col] = [answer[col] for answer in answers]

    return {"dataframe": out_df}


@output_on_top
@op("Truncate history")
def truncate_history(*, max_tokens=10000):
    return {"question_processor": TruncateHistory(max_tokens=max_tokens)}


@output_on_top
@op("Mask")
def mask(*, name="", regex="", exceptions="", mask_pattern=""):
    exceptions = [e.strip() for e in exceptions.split(",") if e.strip()]
    return {
        "mask": {
            "name": name,
            "regex": regex,
            "exceptions": exceptions,
            "mask_pattern": mask_pattern,
        }
    }
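

# Example mask configuration (illustrative values; the "Chat processor" op above turns these
# masks into MaskTemplate question/answer processors):
#
#   mask(name="email", regex=r"[a-z0-9._%+-]+@[a-z0-9.-]+", exceptions="", mask_pattern="*EMAIL*")
#   -> {"mask": {"name": "email", "regex": "[a-z0-9._%+-]+@[a-z0-9.-]+", "exceptions": [], "mask_pattern": "*EMAIL*"}}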


@ops.input_position(chat_api="bottom")
@op("Test Chat API", slow=True)
async def test_chat_api(message, chat_api, *, show_details=False):
    chat_api = chat_api[0]["chat_api"]
    request = ChatCompletionPrompt(
        model="",
        messages=[{"role": "user", "content": message["text"]}],
    )
    response = await chat_api.answer(request, stream=False)
    if len(response.choices) == 0:
        answer = "The following FAQ items are similar to the question:\n"
        for item in response.sources:
            answer += f"------------------------------------------------------ \n{item.body}\n\n"
    else:
        answer = response.choices[0].message.content
    if show_details:
        return {"answer": answer, **response.__dict__}
    else:
        return {"answer": answer}


@op("Input chat")
def input_chat(*, chat: str):
    return {"text": chat}


@ops.input_position(input="bottom")
@op("View DataFrame", view="table_view")
def view_df(input):
    df = input[0]["dataframe"]
    v = {
        "dataframes": {
            "df": {
                "columns": [str(c) for c in df.columns],
                "data": df.values.tolist(),
            }
        }
    }
    return v


@op("View", view="table_view")
def view(input):
    columns = [str(c) for c in input.keys() if not str(c).startswith("_")]
    v = {
        "dataframes": {
            "df": {
                "columns": columns,
                "data": [[input[c] for c in columns]],
            }
        }
    }
    return v


async def get_chat_api(ws: str):
    from lynxkite.core import workspace

    cwd = pathlib.Path()
    path = cwd / (ws + ".lynxkite.json")
    assert path.is_relative_to(cwd), f"Path '{path}' is invalid"
    assert path.exists(), f"Workspace {path} does not exist"
    ws = workspace.Workspace.load(path)
    # Remove any test nodes.
    ws.nodes = [op for op in ws.nodes if op.data.title != "Test Chat API"]
    ws.normalize()
    executor = ops.EXECUTORS[ENV]
    contexts = await executor(ws)
    nodes = [op for op in ws.nodes if op.data.title == "LynxScribe RAG Graph Chatbot Backend"]
    [node] = nodes
    context = contexts[node.id]
    return context.last_result["chat_api"]


async def stream_chat_api_response(request):
    chat_api = await get_chat_api(request["model"])
    request = ChatCompletionPrompt(**request)
    async for chunk in await chat_api.answer(request, stream=True):
        chunk.sources = []
        yield chunk.model_dump_json()


async def get_chat_api_response(request):
    chat_api = await get_chat_api(request["model"])
    request = ChatCompletionPrompt(**request)
    response = await chat_api.answer(request, stream=False)
    response.sources = []
    return response.model_dump_json()


async def api_service_post(request):
    """
    Serves a chat endpoint that matches LynxScribe's interface.

    To access it, you need to add the "module" and "workspace" parameters.
    The workspace must contain exactly one "LynxScribe RAG Graph Chatbot Backend" node,
    which provides the chat API.

        curl -X POST ${LYNXKITE_URL}/api/service/server.lynxkite_ops \
            -H "Content-Type: application/json" \
            -d '{
                  "model": "LynxScribe demo",
                  "messages": [{"role": "user", "content": "what does the fox say"}]
                }'
    """
    path = "/".join(request.url.path.split("/")[4:])
    request = await request.json()
    if path == "chat/completions":
        if request["stream"]:
            from sse_starlette.sse import EventSourceResponse

            return EventSourceResponse(stream_chat_api_response(request))
        else:
            return await get_chat_api_response(request)
    return {"error": "Not found"}


async def api_service_get(request):
    path = "/".join(request.url.path.split("/")[4:])
    if path == "models":
        return {
            "object": "list",
            "data": [
                {
                    "id": ws.removesuffix(".lynxkite.json"),
                    "object": "model",
                    "created": 0,
                    "owned_by": "lynxkite",
                    "meta": {"profile_image_url": "https://lynxkite.com/favicon.png"},
                }
                for ws in get_lynxscribe_workspaces()
            ],
        }
    return {"error": "Not found"}


def get_lynxscribe_workspaces() -> list[str]:
    from lynxkite.core import workspace

    workspaces = []
    for p in pathlib.Path().glob("**/*"):
        if p.is_file():
            try:
                ws = workspace.Workspace.load(p)
                if ws.env == ENV:
                    workspaces.append(str(p))
            except Exception:
                pass  # Ignore files that are not valid workspaces.
    workspaces.sort()
    return workspaces


def dictionary_corrector(dict_string: str, expected_keys: list | None = None) -> dict:
    """
    Processes LLM outputs where the LLM returns a dictionary in string format.

    It optionally cross-checks the input with the expected keys and returns a dictionary
    with the expected keys and their values ('unknown' if a key is not present).
    If there is an error during processing, it returns a dictionary of the expected keys,
    all with 'error' as the value (or an empty dictionary if no keys were expected).
    Currently the function does not delete extra key-value pairs.
    """
    out_dict = {}
    if len(dict_string) == 0:
        return out_dict

    # deleting the optional text before the first and after the last curly brackets
    dstring_prc = dict_string
    if dstring_prc[0] != "{":
        dstring_prc = "{" + "{".join(dstring_prc.split("{")[1:])
    if dstring_prc[-1] != "}":
        dstring_prc = "}".join(dstring_prc.split("}")[:-1]) + "}"

    try:
        trf_dict = json.loads(dstring_prc)
        if expected_keys:
            for _key in expected_keys:
                if _key in trf_dict:
                    out_dict[_key] = trf_dict[_key]
                else:
                    out_dict[_key] = "unknown"
        else:
            out_dict = trf_dict
    except Exception:
        if expected_keys:
            for _key in expected_keys:
                out_dict[_key] = "error"

    return out_dict
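

# Illustrative example of `dictionary_corrector` (assuming a typical LLM reply with extra text
# around the JSON payload):
#
#   dictionary_corrector('Sure! {"name": "Lynx"}', expected_keys=["name", "species"])
#   -> {"name": "Lynx", "species": "unknown"}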