""" | |
LynxScribe configuration and testing in LynxKite. | |
""" | |

from lynxscribe.core.llm.base import get_llm_engine
from lynxscribe.core.vector_store.base import get_vector_store
from lynxscribe.common.config import load_config
from lynxscribe.components.text_embedder import TextEmbedder
from lynxscribe.components.rag.rag_graph import RAGGraph
from lynxscribe.components.rag.knowledge_base_graph import PandasKnowledgeBaseGraph
from lynxscribe.components.rag.rag_chatbot import Scenario, ScenarioSelector, RAGChatbot
from lynxscribe.components.chat_processor.base import ChatProcessor
from lynxscribe.components.chat_processor.processors import (
    MaskTemplate,
    TruncateHistory,
)
from lynxscribe.components.chat_api import ChatAPI, ChatAPIRequest, ChatAPIResponse

import asyncio
import json

from . import ops
from .executors import one_by_one

ENV = "LynxScribe"
one_by_one.register(ENV)
op = ops.op_registration(ENV)
output_on_top = ops.output_position(output="top")
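
# In the "one by one" executor each op input arrives as a list of the results
# of the upstream ops, which is why the functions below unpack their inputs as
# e.g. `llm[0]["llm"]`. A rough sketch of the convention (direct calls shown
# for illustration only; on the canvas the executor does this wiring):
#
#   engine = llm(name="openai")        # -> {"llm": <engine>}
#   emb = text_embedder([engine])      # the input arrives wrapped in a list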


@output_on_top
@op("Vector store")
def vector_store(*, name="chromadb", collection_name="lynx"):
    vector_store = get_vector_store(name=name, collection_name=collection_name)
    return {"vector_store": vector_store}


@output_on_top
@op("LLM")
def llm(*, name="openai"):
    llm = get_llm_engine(name=name)
    return {"llm": llm}


@output_on_top
@op("Text embedder")
def text_embedder(llm, *, model="text-embedding-ada-002"):
    llm = llm[0]["llm"]
    text_embedder = TextEmbedder(llm=llm, model=model)
    return {"text_embedder": text_embedder}


@output_on_top
@op("RAG graph")
def rag_graph(vector_store, text_embedder):
    vector_store = vector_store[0]["vector_store"]
    text_embedder = text_embedder[0]["text_embedder"]
    rag_graph = RAGGraph(
        PandasKnowledgeBaseGraph(vector_store=vector_store, text_embedder=text_embedder)
    )
    return {"rag_graph": rag_graph}


@output_on_top
@op("Scenario selector")
def scenario_selector(*, scenario_file: str, node_types="intent_cluster"):
    scenarios = load_config(scenario_file)
    node_types = [t.strip() for t in node_types.split(",")]
    scenario_selector = ScenarioSelector(
        scenarios=[Scenario(**scenario) for scenario in scenarios],
        node_types=node_types,
    )
    return {"scenario_selector": scenario_selector}


DEFAULT_NEGATIVE_ANSWER = "I'm sorry, but the data I've been trained on does not contain any information related to your question."


@output_on_top
@op("RAG chatbot")
def rag_chatbot(
    rag_graph,
    scenario_selector,
    llm,
    *,
    negative_answer=DEFAULT_NEGATIVE_ANSWER,
    limits_by_type="{}",
    strict_limits=True,
    max_results=5,
):
    rag_graph = rag_graph[0]["rag_graph"]
    scenario_selector = scenario_selector[0]["scenario_selector"]
    llm = llm[0]["llm"]
    limits_by_type = json.loads(limits_by_type)
    rag_chatbot = RAGChatbot(
        rag_graph=rag_graph,
        scenario_selector=scenario_selector,
        llm=llm,
        negative_answer=negative_answer,
        limits_by_type=limits_by_type,
        strict_limits=strict_limits,
        max_results=max_results,
    )
    return {"chatbot": rag_chatbot}


@output_on_top
@op("Chat processor")
def chat_processor(processor, *, _ctx: one_by_one.Context):
    # Called once per connected processor op; `_ctx.last_result` carries the
    # config accumulated from the previous invocations.
    cfg = _ctx.last_result or {
        "question_processors": [],
        "answer_processors": [],
        "masks": [],
    }
    for f in ["question_processor", "answer_processor", "mask"]:
        if f in processor:
            cfg[f + "s"].append(processor[f])
    question_processors = cfg["question_processors"][:]
    answer_processors = cfg["answer_processors"][:]
    masking_templates = {}
    for mask in cfg["masks"]:
        masking_templates[mask["name"]] = mask
    if masking_templates:
        question_processors.append(MaskTemplate(masking_templates=masking_templates))
        answer_processors.append(MaskTemplate(masking_templates=masking_templates))
    chat_processor = ChatProcessor(
        question_processors=question_processors, answer_processors=answer_processors
    )
    return {"chat_processor": chat_processor, **cfg}


@output_on_top
@op("Truncate history")
def truncate_history(*, max_tokens=10000, language="English"):
    return {
        "question_processor": TruncateHistory(
            max_tokens=max_tokens, language=language.lower()
        )
    }


@output_on_top
@op("Mask")
def mask(*, name="", regex="", exceptions="", mask_pattern=""):
    exceptions = [e.strip() for e in exceptions.split(",") if e.strip()]
    return {
        "mask": {
            "name": name,
            "regex": regex,
            "exceptions": exceptions,
            "mask_pattern": mask_pattern,
        }
    }
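
# Example of an email mask (values illustrative; the exact semantics of
# `mask_pattern` are defined by MaskTemplate):
#
#   mask(
#       name="email",
#       regex=r"[\w.+-]+@[\w-]+\.[\w.]+",
#       exceptions="info@lynxanalytics.com",
#       mask_pattern="email_{}",
#   )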


@op("Test Chat API")
async def test_chat_api(message, chat_api, *, show_details=False):
    chat_api = chat_api[0]["chat_api"]
    request = ChatAPIRequest(
        session_id="b43215a0-428f-11ef-9454-0242ac120002",
        question=message["text"],
        history=[],
    )
    response = await chat_api.answer(request)
    if show_details:
        return {**response.__dict__}
    else:
        return {"answer": response.answer}


@op("Input chat")
def input_chat(*, chat: str):
    return {"text": chat}


@output_on_top
@op("Chat API")
def chat_api(chatbot, chat_processor, knowledge_base, *, model="gpt-4o-mini"):
    chatbot = chatbot[0]["chatbot"]
    chat_processor = chat_processor[0]["chat_processor"]
    knowledge_base = knowledge_base[0]
    c = ChatAPI(
        chatbot=chatbot,
        chat_processor=chat_processor,
        model=model,
    )
    if knowledge_base:
        c.chatbot.rag_graph.kg_base.load_v1_knowledge_base(**knowledge_base)
        c.chatbot.scenario_selector.check_compatibility(c.chatbot.rag_graph)
    return {"chat_api": c}


@output_on_top
@op("Knowledge base")
def knowledge_base(
    *,
    nodes_path="nodes.pickle",
    edges_path="edges.pickle",
    template_cluster_path="tempclusters.pickle",
):
    return {
        "nodes_path": nodes_path,
        "edges_path": edges_path,
        "template_cluster_path": template_cluster_path,
    }
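
# Putting it together, a workspace for this module typically wires up
# (hypothetical direct calls, mirroring the "LynxScribe demo" workspace):
#
#   kb = knowledge_base()                  # paths to the pickled graph dumps
#   bot = rag_chatbot([graph], [sel], [engine])
#   proc = chat_processor(...)             # fed by Mask / Truncate history ops
#   api = chat_api([bot], [proc], [kb])    # -> {"chat_api": ChatAPI(...)}
#   answer = await test_chat_api(input_chat(chat="hi"), [api])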


@op("View", view="table_view")
def view(input):
    columns = [str(c) for c in input.keys() if not str(c).startswith("_")]
    v = {
        "dataframes": {
            "df": {
                "columns": columns,
                "data": [[input[c] for c in columns]],
            }
        }
    }
    return v
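
# For example, view({"answer": "Hi!", "_debug": ...}) returns
#
#   {"dataframes": {"df": {"columns": ["answer"], "data": [["Hi!"]]}}}
#
# Keys starting with "_" are treated as hidden and left out of the table.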


async def api_service(request):
    """
    Serves a chat endpoint that matches LynxScribe's interface.

    To access it you need to add the "module" and "workspace" parameters.
    The workspace must contain exactly one "Chat API" node.

        curl -X POST ${LYNXKITE_URL}/api/service \
          -H "Content-Type: application/json" \
          -d '{
            "module": "server.lynxscribe_ops",
            "workspace": "LynxScribe demo",
            "session_id": "b43215a0-428f-11ef-9454-0242ac120002",
            "question": "what does the fox say",
            "history": [],
            "user_id": "x",
            "meta_inputs": {}
          }'
    """
    import pathlib

    from . import workspace

    DATA_PATH = pathlib.Path.cwd() / "data"
    path = DATA_PATH / request["workspace"]
    assert path.is_relative_to(DATA_PATH), "Workspace path must stay under the data directory"
    assert path.exists(), f"Workspace {path} does not exist"
    ws = workspace.load(path)
    contexts = ops.EXECUTORS[ENV](ws)
    # Find the single "Chat API" node and reuse the ChatAPI it produced.
    nodes = [op for op in ws.nodes if op.data.title == "Chat API"]
    [node] = nodes
    context = contexts[node.id]
    chat_api = context.last_result["chat_api"]
    request = ChatAPIRequest(
        session_id=request["session_id"],
        question=request["question"],
        history=request["history"],
    )
    response = await chat_api.answer(request)
    return response