""" | |
LynxScribe configuration and testing in LynxKite. | |
""" | |
from lynxscribe.core.llm.base import get_llm_engine | |
from lynxscribe.core.vector_store.base import get_vector_store | |
from lynxscribe.common.config import load_config | |
from lynxscribe.components.text_embedder import TextEmbedder | |
from lynxscribe.components.rag.rag_graph import RAGGraph | |
from lynxscribe.components.rag.knowledge_base_graph import PandasKnowledgeBaseGraph | |
from lynxscribe.components.rag.rag_chatbot import Scenario, ScenarioSelector, RAGChatbot | |
from lynxscribe.components.chat_processor.base import ChatProcessor | |
from lynxscribe.components.chat_processor.processors import MaskTemplate, TruncateHistory | |
from lynxscribe.components.chat_api import ChatAPI, ChatAPIRequest, ChatAPIResponse | |
from . import ops | |
import asyncio | |
import json | |
from .executors import one_by_one | |
ENV = 'LynxScribe' | |
one_by_one.register(ENV) | |
op = ops.op_registration(ENV) | |
output_on_top = ops.output_position(output="top") | |
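
# Note: with the `one_by_one` executor, every input of an op arrives as a list
# of result dicts from the upstream op, hence the `x[0]['key']` unwrapping
# used throughout this module.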


@output_on_top
@op("Vector store")
def vector_store(*, name='chromadb', collection_name='lynx'):
    vector_store = get_vector_store(name=name, collection_name=collection_name)
    return {'vector_store': vector_store}


@output_on_top
@op("LLM")
def llm(*, name='openai'):
    llm = get_llm_engine(name=name)
    return {'llm': llm}


@output_on_top
@op("Text embedder")
def text_embedder(llm, *, model='text-embedding-ada-002'):
    llm = llm[0]['llm']
    text_embedder = TextEmbedder(llm=llm, model=model)
    return {'text_embedder': text_embedder}


@output_on_top
@op("RAG graph")
def rag_graph(vector_store, text_embedder):
    vector_store = vector_store[0]['vector_store']
    text_embedder = text_embedder[0]['text_embedder']
    rag_graph = RAGGraph(
        PandasKnowledgeBaseGraph(vector_store=vector_store, text_embedder=text_embedder)
    )
    return {'rag_graph': rag_graph}


@output_on_top
@op("Scenario selector")
def scenario_selector(*, scenario_file: str, node_types='intent_cluster'):
    scenarios = load_config(scenario_file)
    node_types = [t.strip() for t in node_types.split(',')]
    scenario_selector = ScenarioSelector(
        scenarios=[Scenario(**scenario) for scenario in scenarios],
        node_types=node_types,
    )
    return {'scenario_selector': scenario_selector}
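
# Hypothetical usage. The scenario file is anything `load_config` can parse
# into a list of `Scenario` kwargs; the file name below is made up, and
# `node_types` is a comma-separated string:
#   scenario_selector(scenario_file='scenarios.yaml', node_types='intent_cluster, template')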


DEFAULT_NEGATIVE_ANSWER = "I'm sorry, but the data I've been trained on does not contain any information related to your question."


@output_on_top
@op("RAG chatbot")
def rag_chatbot(
        rag_graph, scenario_selector, llm, *,
        negative_answer=DEFAULT_NEGATIVE_ANSWER,
        limits_by_type='{}',
        strict_limits=True, max_results=5):
    rag_graph = rag_graph[0]['rag_graph']
    scenario_selector = scenario_selector[0]['scenario_selector']
    llm = llm[0]['llm']
    limits_by_type = json.loads(limits_by_type)
    rag_chatbot = RAGChatbot(
        rag_graph=rag_graph,
        scenario_selector=scenario_selector,
        llm=llm,
        negative_answer=negative_answer,
        limits_by_type=limits_by_type,
        strict_limits=strict_limits,
        max_results=max_results,
    )
    return {'chatbot': rag_chatbot}
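
# `limits_by_type` is passed in as a JSON string. A hypothetical value
# (the node type names here are made up):
#   rag_chatbot(..., limits_by_type='{"intent_cluster": 3, "template": 2}')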


@output_on_top
@op("Chat processor")
def chat_processor(processor, *, _ctx: one_by_one.Context):
    # Chat processor ops are chained: start from the previous call's result
    # (if any) and append whatever the current input carries.
    cfg = _ctx.last_result or {'question_processors': [], 'answer_processors': [], 'masks': []}
    for f in ['question_processor', 'answer_processor', 'mask']:
        if f in processor:
            cfg[f + 's'].append(processor[f])
    question_processors = cfg['question_processors'][:]
    answer_processors = cfg['answer_processors'][:]
    # Masks are collected by name and applied to both questions and answers.
    masking_templates = {}
    for mask in cfg['masks']:
        masking_templates[mask['name']] = mask
    if masking_templates:
        question_processors.append(MaskTemplate(masking_templates=masking_templates))
        answer_processors.append(MaskTemplate(masking_templates=masking_templates))
    chat_processor = ChatProcessor(
        question_processors=question_processors, answer_processors=answer_processors)
    return {'chat_processor': chat_processor, **cfg}


@output_on_top
@op("Truncate history")
def truncate_history(*, max_tokens=10000, language='English'):
    return {'question_processor': TruncateHistory(max_tokens=max_tokens, language=language.lower())}


@output_on_top
@op("Mask")
def mask(*, name='', regex='', exceptions='', mask_pattern=''):
    exceptions = [e.strip() for e in exceptions.split(',') if e.strip()]
    return {'mask': {'name': name, 'regex': regex, 'exceptions': exceptions, 'mask_pattern': mask_pattern}}
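
# Hypothetical example, masking email addresses (the regex and mask pattern
# are illustrative only, not a vetted email matcher):
#   mask(name='email', regex=r'[\w.+-]+@[\w-]+\.[\w.-]+',
#        exceptions='info@lynxanalytics.com', mask_pattern='<EMAIL>')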


@op("Test Chat API")
def test_chat_api(message, chat_api):
    chat_api = chat_api[0]['chat_api']
    # A fixed session ID: each test message is answered without prior history.
    request = ChatAPIRequest(session_id="b43215a0-428f-11ef-9454-0242ac120002", question=message['text'], history=[])
    response = asyncio.run(chat_api.answer(request))
    return {'response': response.answer}


@op("Input chat")
def input_chat(*, chat: str):
    return {'text': chat}


@output_on_top
@op("Chat API")
def chat_api(chatbot, chat_processor, knowledge_base, *, model='gpt-4o-mini'):
    chatbot = chatbot[0]['chatbot']
    chat_processor = chat_processor[0]['chat_processor']
    knowledge_base = knowledge_base[0]
    c = ChatAPI(
        chatbot=chatbot,
        chat_processor=chat_processor,
        model=model,
    )
    if knowledge_base:
        c.chatbot.rag_graph.kg_base.load_v1_knowledge_base(**knowledge_base)
        c.chatbot.scenario_selector.check_compatibility(c.chatbot.rag_graph)
    return {'chat_api': c}
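
# Typical wiring of these ops in a workspace, as implied by the signatures:
#   vector_store + text_embedder -> rag_graph
#   rag_graph + scenario_selector + llm -> rag_chatbot
#   rag_chatbot + chat_processor + knowledge_base -> chat_api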


@output_on_top
@op("Knowledge base")
def knowledge_base(*, nodes_path='nodes.pickle', edges_path='edges.pickle', template_cluster_path='tempclusters.pickle'):
    return {'nodes_path': nodes_path, 'edges_path': edges_path, 'template_cluster_path': template_cluster_path}
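
# The returned keys match the keyword arguments of `load_v1_knowledge_base()`,
# which receives this dict via `**knowledge_base` in the "Chat API" op above.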


@op("View")
def view(input):
    # Shows the input as a single-row table; keys starting with '_' are hidden.
    columns = [str(c) for c in input.keys() if not str(c).startswith('_')]
    v = {
        'dataframes': {'df': {
            'columns': columns,
            'data': [[input[c] for c in columns]],
        }}
    }
    return v
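
# For example, view({'text': 'hi', '_hidden': 1}) returns:
#   {'dataframes': {'df': {'columns': ['text'], 'data': [['hi']]}}}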


async def api_service(request):
    '''
    Serves a chat endpoint that matches LynxScribe's interface.

    To access it, pass the "module" and "workspace" parameters.
    The workspace must contain exactly one "Chat API" node.

    curl -X POST ${LYNXKITE_URL}/api/service \
      -H "Content-Type: application/json" \
      -d '{
        "module": "server.lynxscribe_ops",
        "workspace": "LynxScribe demo",
        "session_id": "b43215a0-428f-11ef-9454-0242ac120002",
        "question": "what does the fox say",
        "history": [],
        "user_id": "x",
        "meta_inputs": {}
      }'
    '''
    import pathlib

    from . import workspace
    DATA_PATH = pathlib.Path.cwd() / 'data'
    path = DATA_PATH / request['workspace']
    # The workspace path must stay inside DATA_PATH.
    assert path.is_relative_to(DATA_PATH)
    assert path.exists(), f'Workspace {path} does not exist'
    ws = workspace.load(path)
    # Execute the workspace, then answer with its single "Chat API" node.
    contexts = ops.EXECUTORS[ENV](ws)
    nodes = [op for op in ws.nodes if op.data.title == 'Chat API']
    [node] = nodes
    context = contexts[node.id]
    chat_api = context.last_result['chat_api']
    request = ChatAPIRequest(session_id=request['session_id'], question=request['question'], history=request['history'])
    response = await chat_api.answer(request)
    return response