# SexBot / workflow / sql_workflow.py
# Uploaded by Pew404 via huggingface_hub (commit 318db6e).
import concurrent.futures
import threading
import asyncio, json, os
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from datetime import datetime
from llama_index.llms.ollama import Ollama
from llama_index.core.storage.chat_store import SimpleChatStore
from llama_index.core.llms import ChatMessage
from llama_index.core import PromptTemplate
from workflow.events import *
from milvusDB.retriever import MilvusRetriever
from prompts.default_prompts import (
QUERY_REWRITE_PROMPT,
FINAL_RESPONSE_PROMPT,
INTENT_EXTRACT_PROMPT,
CASUAL_CHAT_PROMPT,
KEYWORDS_EXTRACTION_PROMPT,
RELATED_SEARCH_PROMPT,
ALIGNMENT_PROMPT,
REFUSE_PROMPT,
TRANSLATE_PROMPT,
WEBSIT_PROMPT,
TERM_PROMPT
)
from workflow.modules import (
ProcessStatus,
ExtraStatus,
MySQLChatStore,
parse_image_content,
parse_video_content,
parse_web_search_content,
video_search,
image_search,
general_search,
web_reader,
)
from workflow.vllm_model import MyVllm
from dotenv import load_dotenv
# Populate os.environ from a .env file (python-dotenv) before reading settings.
load_dotenv()
# Milvus endpoint used by SQLWorkflow.vector_search.
MILVUS_URI = os.getenv("MILVUS_URI")
# Only referenced by commented-out persistence code in the visible module.
CHAT_STORE_PATH = os.getenv("CHAT_STORE_PATH")
# Human-readable schema summaries keyed by table name.
# NOTE(review): not referenced in this module. The "t_sur_models_info" entry
# lists update_time twice and its VARCHAR(n) types are missing a closing
# parenthesis - confirm before feeding it to a prompt.
TABLE_SUMMARY = {
    "t_sur_media_sync_es": "This table is about Porn video information:\n\nt_sur_media_sync_es: Columns:id (integer), web_url (string), duration (integer), pattern_per (integer), like_count (integer), dislike_count (integer), view_count (integer), cover_picture (string), title (string), upload_date (datetime), uploader (string), create_time (datetime), update_time (datetime), categories (list of strings), abbreviate_video_url (string), abbreviate_mp4_video_url (string), resource_type (integer), like_count_show (string), stat_version (string), tags (list of strings), model_name (string), publisher_type (string), period (string), sexual_preference (string), country (string), type (string), rank_number (integer), rank_rate (string), has_pattern (boolean), trace (string), manifest_url (string), is_delete (boolean), web_url_md5 (string), view_key (string)",
    "t_sur_models_info": "This table is about Stripchat models' information:\n\nt_sur_models_info: Columns:id (INTEGER), username (VARCHAR(100), image (VARCHAR(500), num_users (INTEGER), pf (VARCHAR(50), pf_model_unite (VARCHAR(50), use_plugin (INTEGER), create_time (DATETIME), update_time (DATETIME), update_time (DATETIME), gender (VARCHAR(50), broadcast_type (VARCHAR(50), common_gender (VARCHAR(50), avatar (VARCHAR(512), age (INTEGER) "
}
# Intent labels offered to the intent-extraction prompt.
INTENTS = [
    "Casual Chat",
    "Ask for specific Videos",
    "Ask for specific Images",
    "Ask for specific Website",
    "A variety of resources search",
    "Specific Knowledge",
]
# Disclaimer (translated at runtime) prepended to answers classified as
# medical advice.
START_PHRASE = "The AI-provided content is for reference only. For concerns, we encourage you to consult with qualified professionals."
# Queries that exactly match one of these are answered as a term definition
# plus image/video search.
SPECIAL_TERMS = [
    "BDSM", "JOI", "Goon", "Futa", "Hentai", "Furry(Porn)", "Sissy(Porn)",
    "Pegging", "Cock Hero", "Femdom", "PMV", "Pornhub", "Femboy(Porn)",
    "Hypno(Porn)", "XXX", "Anal", "POV", "ASMR",
]
# Labels for the safety classifier. SQLWorkflow.alignment lets
# "not harmful" and "medical advice" through and refuses the others.
REFUSE_INTENTS = [
    "medical advice", "Overdose medication", "child pornography", "self-harm",
    "political bias", "hate speech", "illegal drugs", "not harmful",
    "violent tendencies", "weaponry", "religious hate", "Theft", "Robbery",
    "Body Disposal", "Forgery", "Smuggling", "Money laundering", "Extortion",
    "Terrorism", "Explosion", "Cyberattack & Hacking", "illegal stalking",
    "Arms trafficking", "make people vanish",
]
class SQLWorkflow(Workflow):
    """Safety-screened search-and-answer chat workflow that streams SSE frames.

    One run handles one user query:

    * ``alignment`` screens the query (adult content in non-adult mode, then
      the REFUSE_INTENTS safety labels) and either refuses or continues.
    * ``intend_recognize`` routes the query: special terms, sponsored profiles,
      casual chat, website search, mixed resource search, or any combination
      of image search, video search and knowledge search.
    * The image/video/web/vector search steps stream their results to the
      client and, where they feed the text answer, emit RetrieveContextEvent.
    * ``gather_context`` waits for the expected number of context events and
      ``response_synthesis`` streams the final answer.

    Frames are pushed either onto ``ctx.streaming_queue`` or through
    ``ctx.write_event_to_stream``; a ``data:[DONE]`` frame marks the end of an
    answer.

    Context keys:
        original_query: the user's query for this turn.
        refined_query: query rewritten from the chat history (query_rewrite).
        query_event_ct: number of RetrieveContextEvent gather_context waits for.
        start_phrase: medical-advice disclaimer to prepend to the answer.
        language: language the answer is written in.
        keywords: keywords shared by the image and video searches.
        intention: intents recognized for this query.
        response_mode: prompt template / canned reply used by response_synthesis.
        rewrite_retries: failed query-rewrite attempts so far.
    """

    # Errors raised when an LLM reply is not the JSON object the prompt asked
    # for (JSONDecodeError is a ValueError).
    _PARSE_ERRORS = (ValueError, KeyError, TypeError, AttributeError)
    # How many times a malformed LLM reply is retried before falling back.
    _MAX_RETRIES = 3
    # Intent set for which the image and video results alone answer the query.
    _BOTH_MEDIA_INTENTS = frozenset({"Ask for specific Videos", "Ask for specific Images"})
    # Milvus collection -> client-facing result key / status type.
    _VECTOR_COLLECTIONS = {
        "t_sur_sex_ed_article_spider": "sex_ed_article",
        "t_sur_sex_ed_question_answer_spider": "sex_ed_qa",
    }
    # Canned replies for sponsored-profile queries, keyed by response_mode.
    _AD_MESSAGES = {
        "ads-victoria": "**Victoria SnakeySmut is going to join X3 show, she is a famous model on Fansly! you can open her page on vibemate to sync with her content.**\n\n",
        "ads-chanell-heart": "**Chanell-heart is joining the X3 show! She streams on Stripchat and is also popular on Pornhub. On VibeMate, connect your toys to sync with her during her cam shows and videos.**\n\n",
    }

    def __init__(
        self,
        response_llm: MyVllm,
        response_synthesis_prompt: PromptTemplate,
        chat_store: MySQLChatStore,
        sessionId: str,
        context_flag: int,
        adultMode: bool,
        *args,
        **kwargs
    ):
        """Create a workflow bound to one chat session.

        Args:
            response_llm: LLM used for every classification and generation call.
            response_synthesis_prompt: default prompt for the final answer.
            chat_store: store that user/assistant messages are written to.
            sessionId: chat session key used with ``chat_store``.
            context_flag: 1 to read and write chat history; any other value
                runs without history.
            adultMode: when False, adult queries are refused and searches do
                not pass safe_search="off".
            *args, **kwargs: forwarded to ``Workflow`` (e.g. ``timeout``).
        """
        super().__init__(*args, **kwargs)
        self.response_llm = response_llm
        self.response_synthesis_prompt = response_synthesis_prompt
        self.chat_store = chat_store
        self.sessionId = sessionId
        self.context_flag = context_flag
        self.adultMode = adultMode
        # Retry budget for malformed LLM replies, shared by alignment and
        # intent recognition.
        self.retry_ct = 0
        # Tracks whether this turn's user query was stored, so retries neither
        # store it twice nor skip it.
        self._user_query_saved = False
        # Chat history used as prompt context; refreshed by intend_recognize.
        self.chat_history = " "
        # "off" disables safe search; "" presumably leaves the search helpers
        # on their default (safe) setting - TODO confirm in workflow.modules.
        self.safe_search = "off" if self.adultMode else ""

    # ------------------------------------------------------------------
    # Streaming / persistence helpers
    # ------------------------------------------------------------------

    @staticmethod
    async def _push_frame(ctx: Context, frame: str) -> None:
        """Queue one raw frame on the streaming queue and yield to flush it."""
        ctx.streaming_queue.put_nowait(frame)
        await asyncio.sleep(0)

    async def _push_status(self, ctx: Context, status) -> None:
        """Queue a ProcessStatus/ExtraStatus as a ``data: <json>`` frame."""
        await self._push_frame(ctx, f"data: {status.to_json()}\n\n")

    @staticmethod
    async def _push_token(ctx: Context, token: str, with_time: bool = False) -> None:
        """Stream one text chunk as a ``data:{"content": ...}`` TokenEvent.

        With ``with_time`` the payload also carries a local
        ``%Y-%m-%d-%H-%M-%S`` timestamp under "time".
        """
        payload = {"content": token}
        if with_time:
            payload["time"] = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        ctx.write_event_to_stream(TokenEvent(token=f"data:{json.dumps(payload)}\n\n"))
        await asyncio.sleep(0)

    async def _push_intent_status(self, ctx: Context, intents, is_sex: str) -> None:
        """Report the recognized intents to the client as an ExtraStatus."""
        await self._push_status(
            ctx,
            ExtraStatus(adultMode=self.adultMode, intentionResult=intents, sensitiveResult=None, questionIsSex=is_sex),
        )

    @staticmethod
    def _search_words_frame(key: str, query: str) -> str:
        """Build the ``{"<key>": "words+joined+by+plus"}`` frame for a search.

        json.dumps keeps the frame valid JSON even when the query contains
        quotes or backslashes.
        """
        words = query.strip().replace(" ", "+")
        body = json.dumps({key: words}, ensure_ascii=False, separators=(",", ":"))
        return f"data: {body}\n\n"

    def _save_user_query(self, query: str) -> None:
        """Write the user's query to the chat store, at most once per run."""
        if not self._user_query_saved:
            self.chat_store.add_message(user_id=self.sessionId, role="user", content=query)
            self._user_query_saved = True

    def _save_assistant_reply_async(self, content: str) -> None:
        """Write an assistant reply to the chat store on a background thread."""
        threading.Thread(
            target=self.chat_store.add_message,
            args=(self.sessionId, "assistant", content),
        ).start()

    async def _stream_start_phrase(self, ctx: Context, language: str) -> str:
        """Translate and stream the medical disclaimer, if alignment set one.

        Returns the streamed text ("" when there is no disclaimer).
        """
        start_phrase = await ctx.get("start_phrase", "")
        if start_phrase == "":
            return ""
        translated = self.response_llm.chat(
            TRANSLATE_PROMPT.format_messages(user_input=start_phrase, language=language)
        ).message.content
        start_phrase = "*" + translated + "*\n\n"
        await self._push_token(ctx, start_phrase)
        return start_phrase

    def _retry_alignment(self, ev: StartEvent):
        """Re-run alignment after a malformed reply, or let the query through."""
        if self.retry_ct < self._MAX_RETRIES:
            self.retry_ct += 1
            return StartEvent(query=ev.query)
        return SafeStartEvent(query=ev.query)

    def _retry_intent(self, ev: SafeStartEvent, error: Exception):
        """Re-run intent recognition, or fall back to casual chat."""
        if self.retry_ct < self._MAX_RETRIES:
            self.retry_ct += 1
            print(f"retry intention recognization: {error} - retry: {self.retry_ct}")
            return SafeStartEvent(query=ev.query)
        return CasualChatEvent(query=ev.query)

    # ------------------------------------------------------------------
    # Steps
    # ------------------------------------------------------------------

    @step
    async def alignment(self, ctx: Context, ev: StartEvent) -> SafeStartEvent | RefuseEvent | StartEvent:
        """Screen the query before any search or generation.

        In non-adult mode the LLM first labels the query adult vs. normal, and
        adult queries are refused. Every query is then labelled against
        REFUSE_INTENTS: "not harmful" (and "BDSM content") pass, "medical
        advice" passes with a disclaimer, anything else is refused. The
        detected language is stored in the context (Chinese input is answered
        in English).

        A malformed LLM reply re-emits the StartEvent (budget shared with
        intent recognition); once the budget is spent the query is let through.
        """
        status = ProcessStatus(type="thinking", status="start")
        await self._push_status(ctx, status)

        if not self.adultMode:
            # Non-adult mode refuses adult content.
            adult_intents = ["adult content", "erotic"]
            normal_intents = ["normal", "Sex Education", "Sexual Health"]
            fmt_message = ALIGNMENT_PROMPT.format_messages(
                user_input=ev.query,
                intent_labels=adult_intents + normal_intents,
            )
            response = self.response_llm.chat(fmt_message)
            try:
                adult_verdict = json.loads(response.message.content)["intent"]
            except self._PARSE_ERRORS:
                return self._retry_alignment(ev)
            if adult_verdict in adult_intents:
                print(f"adultMode_0: {adult_verdict}")
                await self._push_status(
                    ctx,
                    ExtraStatus(adultMode=self.adultMode, intentionResult=None, sensitiveResult=[adult_verdict], questionIsSex="1"),
                )
                # Refusing: close the "thinking" phase.
                status.update("end")
                await self._push_status(ctx, status)
                return RefuseEvent(lang="english", query=ev.query, adult=True)

        fmt_message = ALIGNMENT_PROMPT.format_messages(
            user_input=ev.query,
            intent_labels=REFUSE_INTENTS,
        )
        response = self.response_llm.chat(fmt_message)
        try:
            verdict = json.loads(response.message.content)
            intent = verdict["intent"]
            lang = verdict["language"]
            is_chinese = lang.lower() in ["zh", "chinese"]
        except self._PARSE_ERRORS:
            return self._retry_alignment(ev)
        print(f"language: {lang}")
        # Chinese input is answered in English by default.
        if is_chinese:
            lang = "english"
        await ctx.set("language", lang)
        print(f"FILTER STATUS: {intent}")
        if intent in ("not harmful", "BDSM content"):
            return SafeStartEvent(query=ev.query)
        if intent == "medical advice":
            await ctx.set("start_phrase", START_PHRASE)
            return SafeStartEvent(query=ev.query)
        # Harmful intent: report it, close the "thinking" phase and refuse.
        await self._push_status(
            ctx,
            ExtraStatus(adultMode=self.adultMode, intentionResult=None, sensitiveResult=[intent], questionIsSex="0"),
        )
        status.update("end")
        await self._push_status(ctx, status)
        return RefuseEvent(lang=lang)

    @step
    async def refuse(self, ctx: Context, ev: RefuseEvent) -> StopEvent:
        """Stream a refusal and finish the run.

        Adult queries in non-adult mode get a fixed notice plus video links;
        every other refusal is generated from REFUSE_PROMPT in ``ev.lang``.
        """
        if not self.adultMode and ev.adult:
            # NOTE(review): this searches with mode="off" (safe search off)
            # although adult mode is off - confirm this is intended.
            video_result = await asyncio.to_thread(video_search, q=ev.query, mode="off")
            video_str = json.dumps(video_result)
            notice = "I cannot provide details about this topic at the moment. Below are the links found for you by the search engine.\n\n"
            for char in notice:
                await self._push_token(ctx, char)
            ctx.streaming_queue.put_nowait(self._search_words_frame("video_searchWords", ev.query))
            await self._push_frame(ctx, f'data: {{"videoResults":{video_str}}}\n\n')
            ctx.write_event_to_stream("data:[DONE]\n\n")
        else:
            response_str = ""
            for token in self.response_llm.stream(REFUSE_PROMPT, language=ev.lang):
                response_str += token
                await self._push_token(ctx, token)
            ctx.write_event_to_stream("data:[DONE]\n\n")
            print(f"reponse_str: {response_str}")
        return StopEvent(result="success")

    @staticmethod
    def _ad_image_result(title: str, image_url: str) -> list:
        """Single fake image-search hit used to show a sponsored profile."""
        return [{
            "position": 1,
            "thumbnail": image_url,
            "related_content_id": "WkNzSFNndkhqVlBrOU1cIixcIk16bG1veURtUndJemZN",
            "serpapi_related_content_link": image_url,
            "source": "http://www.vibemate.com",
            "source_logo": "",
            "title": title,
            "link": image_url,
            "original": image_url,
            "original_width": 2160,
            "original_height": 2700,
            "is_product": False,
        }]

    async def _handle_special_term(self, ctx: Context, query: str) -> GeneralSearch:
        """Answer a SPECIAL_TERMS query: definition search plus image/video search."""
        await self._push_status(ctx, ProcessStatus(type="thinking", status="end"))
        self._save_user_query(query)
        await ctx.set("response_mode", "Definition of Term")
        await ctx.set("original_query", query)
        # Only the web search feeds the answer; image/video results are shown
        # to the user but not counted.
        await ctx.set("query_event_ct", 1)
        ctx.send_event(VideoSearch(query=f"{query} Porn"))
        ctx.send_event(ImageSearch(query=f"{query} Porn"))
        return GeneralSearch(query=f"What is {query} in sexual context", tag="Specific Knowledge")

    async def _handle_ad(self, ctx: Context, query: str, response_mode: str, title: str, image_url: str) -> FullContextEvent:
        """Answer a sponsored-profile query with a fixed image and canned reply."""
        await self._push_status(ctx, ProcessStatus(type="thinking", status="end"))
        self._save_user_query(query)
        await ctx.set("response_mode", response_mode)
        await ctx.set("original_query", query)
        await ctx.set("query_event_ct", 1)
        image_str = json.dumps(self._ad_image_result(title, image_url))
        await self._push_frame(ctx, f'data: {{"imageResults":{image_str}}}\n\n')
        return FullContextEvent(context_str="")

    @step
    async def intend_recognize(
        self, ctx: Context, ev: SafeStartEvent
    ) -> CasualChatEvent | VideoResourceEvent | ImageResourceEvent | GeneralSearchEvent | GeneralSearch | SafeStartEvent | FullContextEvent:
        """Route a screened query to the handlers for its intents.

        Special terms and sponsored profiles are handled directly. Otherwise the
        LLM extracts intents from INTENTS and the query is dispatched to casual
        chat, website search, mixed resource search, or any combination of
        image, video and knowledge search. Malformed, empty or unknown intents
        are retried (shared budget) and finally fall back to casual chat.
        """
        if ev.query in SPECIAL_TERMS:
            return await self._handle_special_term(ctx, ev.query)
        lowered = ev.query.lower()
        # Special ads (added 25-1-15).
        if "snakeysmut" in lowered:
            return await self._handle_ad(
                ctx, ev.query, "ads-victoria", "Victoria SnakeySmut",
                "https://cdn.lovense-api.com/UploadFiles/surfease/x3/SnakeySmut.png",
            )
        if "chanell-heart" in lowered or "chanell heart" in lowered:
            return await self._handle_ad(
                ctx, ev.query, "ads-chanell-heart", "Chanell Heart",
                "https://cdn.lovense-api.com/UploadFiles/surfease/x3/chanell-heart.png",
            )

        await ctx.set("original_query", ev.query)
        await ctx.set("query_event_ct", 0)
        if self.context_flag == 1:
            self._save_user_query(ev.query)
            self.chat_history = self.chat_store.get_chat_history(self.sessionId)
        else:
            self.chat_history = " "
        fmt_message = INTENT_EXTRACT_PROMPT.format_messages(
            user_input=ev.query,
            chat_history=self.chat_history,
            possible_intentions=INTENTS,
        )
        response = self.response_llm.chat(fmt_message).message.content
        try:
            parsed = json.loads(response)
            is_sex = str(parsed["adult"])
            intents = parsed["intentions"]
            if not isinstance(intents, list):
                raise TypeError(f"'intentions' must be a list, got {type(intents).__name__}")
        except self._PARSE_ERRORS as e:
            return self._retry_intent(ev, e)

        # Images are dispatched before videos.
        intents = reorder(intents, "Ask for specific Images", "Ask for specific Videos")
        await ctx.set("intention", intents)
        await self._push_status(ctx, ProcessStatus(type="thinking", status="end"))
        print(f"intention: {intents}")

        if intents == ["Casual Chat"]:
            await self._push_intent_status(ctx, intents, is_sex)
            return CasualChatEvent(query=ev.query)
        if "Ask for specific Website" in intents:
            await self._push_intent_status(ctx, intents, is_sex)
            return GeneralSearchEvent(query=ev.query, tag="Ask for specific Website")
        if "A variety of resources search" in intents:
            await self._push_intent_status(ctx, intents, is_sex)
            ctx.send_event(ImageResourceEvent(query=ev.query))
            ctx.send_event(VideoResourceEvent(query=ev.query))
            return GeneralSearchEvent(query=ev.query, tag="A variety of resources search")

        # Remaining intents: any mix of images, videos and knowledge.
        intents = [intent for intent in intents if intent != "Casual Chat"]
        await ctx.set("intention", intents)
        await self._push_intent_status(ctx, intents, is_sex)
        # Validate before dispatching so a retry never duplicates searches.
        if not intents or any(intent not in INTENTS for intent in intents):
            return self._retry_intent(ev, ValueError(f"unusable intentions: {intents}"))
        for intent in intents:
            if intent == "Ask for specific Videos":
                await self._push_status(ctx, ProcessStatus(type="videoResults", status="start"))
                ctx.send_event(VideoResourceEvent(query=ev.query))
            elif intent == "Ask for specific Images":
                await self._push_status(ctx, ProcessStatus(type="imageResults", status="start"))
                ctx.send_event(ImageResourceEvent(query=ev.query))
            elif intent == "Specific Knowledge":
                ctx.send_event(GeneralSearchEvent(query=ev.query, tag="Specific Knowledge"))
        return None

    @step
    async def query_rewrite(self, ctx: Context, ev: GeneralSearchEvent) -> GeneralSearch | MilvusDBSearchEvent | GeneralSearchEvent:
        """Rewrite the query using the chat history, then launch the searches.

        Emits a GeneralSearch, plus a MilvusDBSearchEvent for "Specific
        Knowledge", and adds the matching number to ``query_event_ct``. An
        unparseable rewrite is retried up to ``_MAX_RETRIES`` times, after
        which the user's own wording is searched.
        """
        chat_history = self.chat_history if self.context_flag == 1 else " "
        language = await ctx.get("language", "en")
        intention = ev.tag
        format_input = QUERY_REWRITE_PROMPT.format_messages(
            chat_history=chat_history,
            query_str=ev.query,
            intention=intention,
            language=language,
        )
        response = self.response_llm.chat(format_input).message.content
        try:
            query = json.loads(response)["query"]
        except self._PARSE_ERRORS:
            retries = await ctx.get("rewrite_retries", 0)
            if retries < self._MAX_RETRIES:
                await ctx.set("rewrite_retries", retries + 1)
                return GeneralSearchEvent(query=ev.query, tag=ev.tag)
            query = ev.query
        print(f"rewrite query: {query}")
        await ctx.set("refined_query", query)
        # Record how many context events are coming *before* emitting them,
        # so gather_context never reads a stale count.
        expected = 2 if ev.tag == "Specific Knowledge" else 1
        curr_ct = await ctx.get("query_event_ct", 0)
        await ctx.set("query_event_ct", curr_ct + expected)
        ctx.send_event(GeneralSearch(query=query, tag=intention))
        if ev.tag == "Specific Knowledge":
            ctx.send_event(MilvusDBSearchEvent(query=query))
        return None

    @step
    async def keywords_extraction(self, ctx: Context, ev: ImageResourceEvent | VideoResourceEvent) -> ImageSearch | VideoSearch | GeneralSearch:
        """Extract at most two search keywords and start the image/video search.

        Keywords are extracted once per run and shared by the image and video
        searches. If extraction fails, a plain web search is counted and run
        instead.
        """
        chat_history = self.chat_history if self.context_flag == 1 else " "
        format_input = KEYWORDS_EXTRACTION_PROMPT.format_messages(
            chat_history=chat_history,
            query_str=ev.query,
        )
        try:
            # With both image and video search, extract keywords only once.
            keywords = await ctx.get("keywords", "None")
            if keywords == "None":
                response = self.response_llm.chat(format_input).message.content
                extracted = json.loads(response)["keywords"]
                if isinstance(extracted, str):
                    # A bare string would otherwise be split into characters.
                    extracted = [extracted]
                extracted = extracted[:2]
                print(f"extrated_keywords: {extracted}")
                keywords = " ".join(extracted)
                await ctx.set("keywords", keywords)
        except self._PARSE_ERRORS:
            # Count the fallback web search as one more context event.
            curr_ct = await ctx.get("query_event_ct", 0)
            await ctx.set("query_event_ct", curr_ct + 1)
            return GeneralSearch(query=ev.query, tag="")
        if isinstance(ev, VideoResourceEvent):
            return VideoSearch(query=keywords)
        return ImageSearch(query=keywords)

    @step
    async def vector_search(self, ctx: Context, ev: MilvusDBSearchEvent) -> RetrieveContextEvent:
        """Retrieve sex-education articles and Q&As from Milvus.

        For each collection a start status is streamed, the top 5 hits are
        fetched and those with ``distance >= 0.7`` are kept. Then, per
        collection, the kept entities are streamed followed by that
        collection's own end status, and their title + text become the
        returned context.
        """
        retriever = MilvusRetriever(uri=MILVUS_URI)
        statuses = {}
        hits = {}
        for collection_name, process_type in self._VECTOR_COLLECTIONS.items():
            status = ProcessStatus(type=process_type, status="start")
            statuses[collection_name] = status
            await self._push_status(ctx, status)
            records = retriever.search(query=ev.query, collection_name=collection_name, top_k=5)
            hits[collection_name] = [record for record in records if record["distance"] >= 0.7]

        context_parts = []
        for collection_name, process_type in self._VECTOR_COLLECTIONS.items():
            entities = [record["entity"] for record in hits[collection_name]]
            await self._push_frame(ctx, f'data: {{"{process_type}":{json.dumps(entities)}}}\n\n')
            # End the status belonging to *this* collection.
            status = statuses[collection_name]
            status.update(status="end")
            await self._push_status(ctx, status)
            if process_type == "sex_ed_article":
                context_parts.append("Sex Education Articles:")
                context_parts.extend(entity["title"] + "\n" + entity["chunk"] for entity in entities)
            else:
                context_parts.append("Sex Education Q&As:")
                context_parts.extend(entity["title"] + "\n" + entity["content"] for entity in entities)
        return RetrieveContextEvent(context_str="\n".join(context_parts))

    async def _media_search(self, ctx: Context, query: str, *, search_fn, kind: str, words_key: str, single_intent: str, summarize):
        """Run an image or video search off the event loop and stream the results.

        Args:
            search_fn: ``image_search`` or ``video_search`` from workflow.modules.
            kind: "imageResults" / "videoResults"; result-frame key and status type.
            words_key: key of the search-words frame.
            single_intent: intent for which these results alone answer the query.
            summarize: turns the results into text for the chat history.

        Returns:
            StopEvent when these results alone answer the query;
            RetrieveContextEvent("") when gather_context must account for this
            search (no results, or an image+video request); otherwise None.
        """
        await self._push_frame(ctx, self._search_words_frame(words_key, query))
        results = await asyncio.to_thread(search_fn, query, self.safe_search)
        end_status = ProcessStatus(type=kind, status="end")
        intents = await ctx.get("intention", " ")
        both_media = set(intents) == self._BOTH_MEDIA_INTENTS
        if not results:
            await self._push_status(ctx, end_status)
            if both_media:
                # Keep gather_context waiting for the other media search.
                await ctx.set("query_event_ct", 2)
            return RetrieveContextEvent(context_str="")
        await self._push_frame(ctx, f'data: {{"{kind}":{json.dumps(results)}}}\n\n')
        await self._push_status(ctx, end_status)
        if len(intents) == 1 and intents[0] == single_intent:
            if self.context_flag == 1:
                self._save_assistant_reply_async(summarize(results))
            await self._push_frame(ctx, "data:[DONE]\n\n")
            return StopEvent(result="success")
        if both_media:
            await ctx.set("query_event_ct", 2)
            return RetrieveContextEvent(context_str="")
        # Results are shown to the user but do not feed the text answer.
        return None

    @step
    async def image_search(self, ctx: Context, ev: ImageSearch) -> StopEvent | RetrieveContextEvent:
        """Search images for ``ev.query`` and stream them to the client."""
        return await self._media_search(
            ctx,
            ev.query,
            search_fn=image_search,
            kind="imageResults",
            words_key="image_searchWords",
            single_intent="Ask for specific Images",
            summarize=parse_image_content,
        )

    @step
    async def video_search(self, ctx: Context, ev: VideoSearch) -> StopEvent | RetrieveContextEvent:
        """Search videos for ``ev.query`` and stream them to the client."""
        return await self._media_search(
            ctx,
            ev.query,
            search_fn=video_search,
            kind="videoResults",
            words_key="video_searchWords",
            single_intent="Ask for specific Videos",
            summarize=parse_video_content,
        )

    @step
    async def general_search(self, ctx: Context, ev: GeneralSearch) -> RetrieveContextEvent:
        """Run a web search and turn it into answer context.

        Sponsored profiles (``ev.ads``) use canned / fixed searches. For
        knowledge and mixed-resource searches the top two pages are read via
        ``web_reader`` and their summaries become the context. Any other tag
        passes the raw results as JSON and switches to the website prompt.
        """
        status = ProcessStatus(type="webResults", status="start")
        await self._push_status(ctx, status)
        if ev.ads == "Victoria SnakeySmut":
            status.update(status="end")
            await self._push_status(ctx, status)
            web_result = [{'position': 1, 'title': 'Victoria SnakeySmut | Fansly', 'link': 'https://fansly.com/SnakeySmut', 'displayed_link': 'fansly.com/SnakeySmut', 'snippet': 'SnakeySmut conjures audio roleplays. Like the little noises I make with my mouth? Come see everything here! 18+ ONLY.'}, {'position': 2, 'title': 'Victoria (u/SnakeySmut) - Reddit', 'link': 'https://www.reddit.com/user/SnakeySmut/', 'displayed_link': 'www.reddit.com › user › SnakeySmut', 'snippet': 'u/SnakeySmut: The witchiest little treat ❤︎ Enjoy me as all dark things are to be loved, in secret and the shadows. Spooky girl with a proclivity for…'}, {'position': 3, 'title': 'victoria malfoy (@SnakeySmut) / X', 'link': 'https://x.com/snakeysmut?lang=en', 'displayed_link': 'x.com › snakeysmut', 'snippet': "victoria malfoy · @SnakeySmut. ·. Jan 2. Hi there, I'm Victoria A witch of many talents, specializing in fantasy fulfillment of aural and visual magic Links ..."}, {'position': 4, 'title': 'Highlights by victoria malfoy (@SnakeySmut) / X', 'link': 'https://twitter.com/SnakeySmut/highlights', 'displayed_link': 'twitter.com › SnakeySmut › highlights', 'snippet': "Posts · Replies · Highlights · Media. victoria malfoy's Highlights. victoria malfoy · @SnakeySmut. ·. Jan 13. you don't actually want to watch a movie, right?"}, {'position': 5, 'title': 'about - SnakeySmut', 'link': 'https://www.snakeysmut.com/aboutsnakeysmut', 'displayed_link': 'www.snakeysmut.com › aboutsnakeysmut', 'snippet': "Greetings, I'm Victoria. you know me best as snakeysmut."}]
            await self._push_frame(ctx, f'data: {{"webResults":{json.dumps(web_result)}}}\n\n')
            return RetrieveContextEvent(context_str=parse_web_search_content(web_result))
        if ev.ads == "Chanell Heart":
            status.update(status="end")
            await self._push_status(ctx, status)
            web_result = await asyncio.to_thread(general_search, "Chanell Heart", mode=self.safe_search)
            await self._push_frame(ctx, f'data: {{"webResults":{json.dumps(web_result)}}}\n\n')
            return RetrieveContextEvent(context_str=parse_web_search_content(web_result))

        web_result = await asyncio.to_thread(general_search, ev.query, self.safe_search)
        # Search finished (with or without results).
        status.update(status="end")
        await self._push_status(ctx, status)
        if not web_result:
            return RetrieveContextEvent(context_str=" ")

        if ev.tag in ("A variety of resources search", "Specific Knowledge"):
            # Summarize the two top pages, reading them concurrently off the
            # event loop; pages that fail or yield nothing are skipped.
            substatus = ProcessStatus(type="web_summary", status="start")
            await self._push_status(ctx, substatus)
            urls = [web["link"] for web in web_result if "link" in web][:2]
            summaries = await asyncio.gather(
                *(asyncio.to_thread(web_reader, url) for url in urls),
                return_exceptions=True,
            )
            web_summaries = "\n\n".join(
                summary for summary in summaries
                if not isinstance(summary, BaseException) and summary
            )
            ctx.streaming_queue.put_nowait(f'data: {{"webResults":{json.dumps(web_result)}}}\n\n')
            substatus.update("end")
            await self._push_status(ctx, substatus)
            return RetrieveContextEvent(context_str="General Search Result:\n" + web_summaries)

        await ctx.set("response_mode", "WebsiteResponse")
        return RetrieveContextEvent(context_str=json.dumps(web_result))

    @step
    async def gather_context(self, ctx: Context, ev: RetrieveContextEvent) -> FullContextEvent | StopEvent:
        """Wait for ``query_event_ct`` context events, then hand them on.

        ``collect_events`` returns None until enough events have arrived; the
        step then returns None and waits for the next one. Image+video requests
        end here (their results were already streamed). Otherwise the contexts
        are joined and truncated to 10000 characters.
        """
        event_cts = await ctx.get("query_event_ct", 0)
        print(f"event_cts: {event_cts}")
        events = ctx.collect_events(ev, [RetrieveContextEvent] * event_cts)
        if events is None:
            return None
        intents = await ctx.get("intention", " ")
        if set(intents) == self._BOTH_MEDIA_INTENTS and len(events) >= 2:
            ctx.write_event_to_stream(TokenEvent(token='data:[DONE]\n\n'))
            await asyncio.sleep(0)
            return StopEvent(result="success")
        if events:
            print(f"recevived {len(events)} events")
        full_context = "\n\n".join(event.context_str for event in events)[:10000]
        return FullContextEvent(context_str=full_context)

    @step
    async def casual_response(self, ctx: Context, ev: CasualChatEvent) -> StopEvent:
        """Stream a casual-chat reply (with the disclaimer, if any) and store it."""
        chat_history = self.chat_history if self.context_flag == 1 else ""
        language = await ctx.get("language", "en")
        response_str = await self._stream_start_phrase(ctx, language)
        response = self.response_llm.stream(CASUAL_CHAT_PROMPT, user_input=ev.query, chat_history=chat_history, language=language)
        for token in response:
            if token == "":
                continue
            response_str += token
            await self._push_token(ctx, token, with_time=True)
        ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))
        print(f"reponse_str: {response_str}")
        if self.context_flag == 1:
            self._save_assistant_reply_async(response_str)
        return StopEvent(result="success")

    async def _stream_related_searches(self, ctx: Context, keywords: str, context_str: str) -> None:
        """Stream up to three tags and three related searches (best effort)."""
        extraction = self.response_llm.chat(
            RELATED_SEARCH_PROMPT.format_messages(
                keywords=keywords,
                retrieved_content=context_str,
            )
        ).message.content
        try:
            extraction = json.loads(extraction)
            related_search = extraction["related_searches"][:3]
            tags = extraction["tags"][:3]
        except self._PARSE_ERRORS as e:
            print(f"Related searchs & tags JSONDecode ERROR: {e}")
            return
        ctx.write_event_to_stream(TokenEvent(token=f"data:{json.dumps({'tags': tags})}\n\n"))
        ctx.write_event_to_stream(TokenEvent(token=f"data:{json.dumps({'related_searches': related_search})}\n\n"))
        await asyncio.sleep(0)

    @step
    async def response_synthesis(self, ctx: Context, ev: FullContextEvent) -> StopEvent:
        """Stream the final answer for the gathered context.

        Order: optional medical disclaimer; then either a canned sponsored
        message (ads modes) or an LLM answer using the template chosen by
        ``response_mode``, followed by related searches/tags; then DONE. The
        answer is stored when ``context_flag == 1``. Streaming errors are
        logged and the run still ends with StopEvent.
        """
        query_str = await ctx.get("refined_query", " ")
        original_query = await ctx.get("original_query")
        language = await ctx.get("language", "en")
        keywords = await ctx.get("keywords", " ")
        if query_str == " ":
            query_str = original_query
        response_mode = await ctx.get("response_mode", "")
        # Medical-advice disclaimer, if alignment flagged one.
        response_str = await self._stream_start_phrase(ctx, language)

        ad_message = self._AD_MESSAGES.get(response_mode)
        if ad_message is not None:
            for char in ad_message:
                response_str += char
                await self._push_token(ctx, char)
            ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))
            print(f"response_str:\n{response_str}\n")
            return StopEvent(result="success")

        if response_mode == "WebsiteResponse":
            chat_response = self.response_llm.stream(
                prompt=WEBSIT_PROMPT,
                user_input=query_str,
                search_result=ev.context_str,
                language=language,
            )
        elif response_mode == "Definition of Term":
            chat_response = self.response_llm.stream(
                prompt=TERM_PROMPT,
                user_input=query_str,
                search_result=ev.context_str,
                language='english',
            )
        else:
            chat_response = self.response_llm.stream(
                prompt=self.response_synthesis_prompt,
                search_keyword=query_str,
                search_result=ev.context_str,
                language=language,
            )
        try:
            for token in chat_response:
                response_str += token
                await self._push_token(ctx, token, with_time=True)
            await self._stream_related_searches(ctx, keywords, ev.context_str)
            ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))
            print(f"response_str:\n{response_str}\n")
            if self.context_flag == 1:
                self._save_assistant_reply_async(response_str)
        except Exception as e:
            print(f"Streaming Exception: {e}")
        return StopEvent(result="success")
async def sql_workflow(
    query: str,
    chat_store: MySQLChatStore,
    sessionId: str,
    llm: MyVllm,
    context_flag: int,
    adultMode: bool,
    timeout: float = 60,
):
    """Start a SQLWorkflow for one user turn and return its event stream.

    Args:
        query: the user's message.
        chat_store: chat store exposing ``add_message(user_id=..., role=...,
            content=...)`` and ``get_chat_history``, as SQLWorkflow uses it.
        sessionId: chat session key.
        llm: model used for classification and generation.
        context_flag: 1 to use and persist chat history.
        adultMode: whether adult content is allowed.
        timeout: workflow timeout in seconds (previously fixed at 60).

    Returns:
        The result of ``handler.stream_events()`` for the started run.
    """
    wf = SQLWorkflow(
        response_llm=llm,
        response_synthesis_prompt=FINAL_RESPONSE_PROMPT,
        chat_store=chat_store,
        sessionId=sessionId,
        context_flag=context_flag,
        adultMode=adultMode,
        verbose=True,
        timeout=timeout,
    )
    handler = wf.run(query=query)
    return handler.stream_events()
def reorder(l: list, former: str, latter: str):
    """Make *former* come before *latter* in ``l``, swapping them in place.

    Only the first occurrence of each value is considered. If either value is
    missing, or *former* already precedes *latter*, ``l`` is left unchanged.

    Returns:
        ``l`` itself (mutated), so the call can be used as an expression.
    """
    if former not in l or latter not in l:
        return l
    first, second = l.index(former), l.index(latter)
    if first > second:
        l[first], l[second] = l[second], l[first]
    return l