|
import concurrent.futures |
|
import threading |
|
import asyncio, json, os |
|
from llama_index.core.workflow import ( |
|
Context, |
|
Workflow, |
|
StartEvent, |
|
StopEvent, |
|
step, |
|
) |
|
from datetime import datetime |
|
from llama_index.llms.ollama import Ollama |
|
from llama_index.core.storage.chat_store import SimpleChatStore |
|
from llama_index.core.llms import ChatMessage |
|
from llama_index.core import PromptTemplate |
|
from workflow.events import * |
|
from milvusDB.retriever import MilvusRetriever |
|
from prompts.default_prompts import ( |
|
QUERY_REWRITE_PROMPT, |
|
FINAL_RESPONSE_PROMPT, |
|
INTENT_EXTRACT_PROMPT, |
|
CASUAL_CHAT_PROMPT, |
|
KEYWORDS_EXTRACTION_PROMPT, |
|
RELATED_SEARCH_PROMPT, |
|
ALIGNMENT_PROMPT, |
|
REFUSE_PROMPT, |
|
TRANSLATE_PROMPT, |
|
WEBSIT_PROMPT, |
|
TERM_PROMPT |
|
) |
|
|
|
from workflow.modules import ( |
|
ProcessStatus, |
|
ExtraStatus, |
|
MySQLChatStore, |
|
parse_image_content, |
|
parse_video_content, |
|
parse_web_search_content, |
|
video_search, |
|
image_search, |
|
general_search, |
|
web_reader, |
|
) |
|
from workflow.vllm_model import MyVllm |
|
from dotenv import load_dotenv |
|
|
|
# Load settings from the dotenv file (if any) before the environment is read.
load_dotenv()

# Connection / storage settings; each is None when the variable is not set.
MILVUS_URI = os.environ.get("MILVUS_URI")
CHAT_STORE_PATH = os.environ.get("CHAT_STORE_PATH")
|
|
|
# Natural-language schema summaries, keyed by table name.
# NOTE(review): the models summary lists update_time twice and has unbalanced
# "VARCHAR(n" parentheses; the text is reproduced verbatim here.
TABLE_SUMMARY = {
    "t_sur_media_sync_es": (
        "This table is about Porn video information:\n\n"
        "t_sur_media_sync_es: Columns:"
        "id (integer), web_url (string), duration (integer), pattern_per (integer), like_count (integer), dislike_count (integer), view_count (integer), cover_picture (string), title (string), upload_date (datetime), uploader (string), create_time (datetime), update_time (datetime), categories (list of strings), abbreviate_video_url (string), abbreviate_mp4_video_url (string), resource_type (integer), like_count_show (string), stat_version (string), tags (list of strings), model_name (string), publisher_type (string), period (string), sexual_preference (string), country (string), type (string), rank_number (integer), rank_rate (string), has_pattern (boolean), trace (string), manifest_url (string), is_delete (boolean), web_url_md5 (string), view_key (string)"
    ),
    "t_sur_models_info": (
        "This table is about Stripchat models' information:\n\n"
        "t_sur_models_info: Columns:"
        "id (INTEGER), username (VARCHAR(100), image (VARCHAR(500), num_users (INTEGER), pf (VARCHAR(50), pf_model_unite (VARCHAR(50), use_plugin (INTEGER), create_time (DATETIME), update_time (DATETIME), update_time (DATETIME), gender (VARCHAR(50), broadcast_type (VARCHAR(50), common_gender (VARCHAR(50), avatar (VARCHAR(512), age (INTEGER) "
    ),
}

# Intent labels offered to the intent-extraction prompt.
INTENTS = [
    "Casual Chat",
    "Ask for specific Videos",
    "Ask for specific Images",
    "Ask for specific Website",
    "A variety of resources search",
    "Specific Knowledge",
]

# Disclaimer stored in the workflow context when a query is labelled "medical advice".
START_PHRASE = (
    "The AI-provided content is for reference only. "
    "For concerns, we encourage you to consult with qualified professionals."
)

# Queries that match one of these exactly take the term-definition path.
# NOTE: "Futa" is listed twice; harmless for membership tests.
SPECIAL_TERMS = [
    "BDSM",
    "JOI",
    "Goon",
    "Futa",
    "Hentai",
    "Furry(Porn)",
    "Sissy(Porn)",
    "Pegging",
    "Cock Hero",
    "Femdom",
    "PMV",
    "Pornhub",
    "Femboy(Porn)",
    "Hypno(Porn)",
    "XXX",
    "Anal",
    "POV",
    "ASMR",
    "Futa",
]

# Labels offered to the alignment prompt. "not harmful" is the pass-through
# label; "medical advice" passes with a disclaimer; the rest lead to a refusal.
REFUSE_INTENTS = [
    "medical advice",
    "Overdose medication",
    "child pornography",
    "self-harm",
    "political bias",
    "hate speech",
    "illegal drugs",
    "not harmful",
    "violent tendencies",
    "weaponry",
    "religious hate",
    "Theft",
    "Robbery",
    "Body Disposal",
    "Forgery",
    "Smuggling",
    "Money laundering",
    "Extortion",
    "Terrorism",
    "Explosion",
    "Cyberattack & Hacking",
    "illegal stalking",
    "Arms trafficking",
    "make people vanish",
]
|
|
|
class SQLWorkflow(Workflow):
    """Chat/search workflow that streams its output as ``data:`` frames.

    Steps, in rough order: ``alignment`` (content filter) ->
    ``intend_recognize`` (intent routing) -> search steps
    (``query_rewrite``, ``keywords_extraction``, ``vector_search``,
    ``image_search``, ``video_search``, ``general_search``) ->
    ``gather_context`` -> ``response_synthesis``. ``refuse`` and
    ``casual_response`` are alternative terminal steps.

    Context fields:
        original_query: the query currently being handled
        refined_query: the precise query rewritten from the chat history
        query_event_ct: number of RetrieveContextEvents gather_context waits for
        start_phrase: opening disclaimer for medical-advice answers
        language: reply language
        keywords: keywords used for the video & image searches
        response_mode: marks which response template to use
        intention: intent labels returned by the intent-extraction prompt
        user_message_stored: True once the user turn was written to the chat store

    NOTE(review): the step return annotations intentionally list events that
    are only emitted through ``ctx.send_event``; they are kept unchanged
    because they may feed the framework's workflow validation.
    """

    # Retry budget shared by every step that re-dispatches its own trigger event.
    _MAX_RETRIES = 3

    # Hard-coded promotional flows. "triggers" are matched as lowercase
    # substrings of the query; "blurb" is streamed character by character.
    _AD_CAMPAIGNS = (
        {
            # "victoria snakeysmut" contains "snakeysmut", so one trigger suffices.
            "triggers": ("snakeysmut",),
            "response_mode": "ads-victoria",
            "title": "Victoria SnakeySmut",
            "image": "https://cdn.lovense-api.com/UploadFiles/surfease/x3/SnakeySmut.png",
            "blurb": "**Victoria SnakeySmut is going to join X3 show, she is a famous model on Fansly! you can open her page on vibemate to sync with her content.**\n\n",
        },
        {
            "triggers": ("chanell-heart", "chanell heart"),
            "response_mode": "ads-chanell-heart",
            "title": "Chanell Heart",
            "image": "https://cdn.lovense-api.com/UploadFiles/surfease/x3/chanell-heart.png",
            "blurb": "**Chanell-heart is joining the X3 show! She streams on Stripchat and is also popular on Pornhub. On VibeMate, connect your toys to sync with her during her cam shows and videos.**\n\n",
        },
    )

    def __init__(
        self,
        response_llm: MyVllm,
        response_synthesis_prompt: PromptTemplate,
        chat_store: MySQLChatStore,
        sessionId: str,
        context_flag: int,
        adultMode: bool,
        *args,
        **kwargs
    ):
        """Store collaborators and per-session settings.

        Args:
            response_llm: LLM used for every ``chat`` / ``stream`` call.
            response_synthesis_prompt: default template for the final answer.
            chat_store: persistent chat history store.
            sessionId: key under which chat history is read and written.
            context_flag: ``1`` enables chat-history context; any other value disables it.
            adultMode: when truthy, the adult-content filter is skipped and
                searches run with safe search ``"off"``.
            *args, **kwargs: forwarded to ``Workflow.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.response_llm = response_llm
        self.response_synthesis_prompt = response_synthesis_prompt
        self.chat_store = chat_store
        self.sessionId = sessionId
        self.context_flag = context_flag
        self.adultMode = adultMode
        self.retry_ct = 0
        # Defined up front so steps reading it never hit an AttributeError.
        self.chat_history = " "
        self.safe_search = "off" if self.adultMode else ""

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    async def _emit(self, ctx: Context, frame: str) -> None:
        """Queue a raw frame and yield once so the stream consumer can run."""
        ctx.streaming_queue.put_nowait(frame)
        await asyncio.sleep(0)

    async def _emit_status(self, ctx: Context, status) -> None:
        """Emit a ProcessStatus / ExtraStatus object as a ``data: <json>`` frame."""
        await self._emit(ctx, f"data: {status.to_json()}\n\n")

    async def _emit_extra_status(self, ctx: Context, *, intents, sensitive, is_sex) -> None:
        """Emit the ExtraStatus frame describing the classification result."""
        extra_status = ExtraStatus(
            adultMode=self.adultMode,
            intentionResult=intents,
            sensitiveResult=sensitive,
            questionIsSex=is_sex,
        )
        await self._emit_status(ctx, extra_status)

    async def _emit_token(self, ctx: Context, payload: dict) -> None:
        """Stream ``payload`` as a JSON ``data:`` frame wrapped in a TokenEvent."""
        content = json.dumps(payload)
        ctx.write_event_to_stream(TokenEvent(token=f"data:{content}\n\n"))
        await asyncio.sleep(0)

    @staticmethod
    def _json_frame(key: str, value, ensure_ascii: bool = True) -> str:
        """Build a ``data: {"<key>":<json value>}`` frame with ``value`` JSON-encoded."""
        return f'data: {{"{key}":{json.dumps(value, ensure_ascii=ensure_ascii)}}}\n\n'

    @classmethod
    def _search_words_frame(cls, key: str, query: str) -> str:
        """Build the ``*_searchWords`` frame for ``query``.

        The query is user input, so it is JSON-encoded instead of being pasted
        between quotes; a quote, backslash or newline would otherwise produce
        an invalid frame. Plain queries yield the same bytes as before.
        """
        search_words = query.strip().replace(" ", "+")
        return cls._json_frame(key, search_words, ensure_ascii=False)

    def _consume_retry(self) -> bool:
        """Take one unit from the shared retry budget; False once it is spent."""
        if self.retry_ct < self._MAX_RETRIES:
            self.retry_ct += 1
            return True
        return False

    def _store_assistant_message(self, content) -> None:
        """Write the assistant turn to the chat store on a background thread."""
        worker = threading.Thread(
            target=self.chat_store.add_message,
            args=(self.sessionId, "assistant", content),
        )
        worker.start()

    async def _begin_canned_flow(self, ctx: Context, query: str, response_mode: str) -> None:
        """Shared preamble of the special-term and ad flows."""
        await self._emit_status(ctx, ProcessStatus(type="thinking", status="end"))
        self.chat_store.add_message(user_id=self.sessionId, role="user", content=query)
        await ctx.set("response_mode", response_mode)
        await ctx.set("original_query", query)
        await ctx.set("query_event_ct", 1)

    @staticmethod
    def _ad_image_card(title: str, url: str) -> list:
        """Return the single-item image result list shown for an ad campaign."""
        return [{
            "position": 1,
            "thumbnail": url,
            "related_content_id": "WkNzSFNndkhqVlBrOU1cIixcIk16bG1veURtUndJemZN",
            "serpapi_related_content_link": url,
            "source": "http://www.vibemate.com",
            "source_logo": "",
            "title": title,
            "link": url,
            "original": url,
            "original_width": 2160,
            "original_height": 2700,
            "is_product": False,
        }]

    @staticmethod
    def _victoria_web_results() -> list:
        """Return a fresh copy of the hard-coded web results for the Victoria ad."""
        return [
            {'position': 1, 'title': 'Victoria SnakeySmut | Fansly', 'link': 'https://fansly.com/SnakeySmut', 'displayed_link': 'fansly.com/SnakeySmut', 'snippet': 'SnakeySmut conjures audio roleplays. Like the little noises I make with my mouth? Come see everything here! 18+ ONLY.'},
            {'position': 2, 'title': 'Victoria (u/SnakeySmut) - Reddit', 'link': 'https://www.reddit.com/user/SnakeySmut/', 'displayed_link': 'www.reddit.com › user › SnakeySmut', 'snippet': 'u/SnakeySmut: The witchiest little treat ❤︎ Enjoy me as all dark things are to be loved, in secret and the shadows. Spooky girl with a proclivity for…'},
            {'position': 3, 'title': 'victoria malfoy (@SnakeySmut) / X', 'link': 'https://x.com/snakeysmut?lang=en', 'displayed_link': 'x.com › snakeysmut', 'snippet': "victoria malfoy · @SnakeySmut. ·. Jan 2. Hi there, I'm Victoria A witch of many talents, specializing in fantasy fulfillment of aural and visual magic Links ..."},
            {'position': 4, 'title': 'Highlights by victoria malfoy (@SnakeySmut) / X', 'link': 'https://twitter.com/SnakeySmut/highlights', 'displayed_link': 'twitter.com › SnakeySmut › highlights', 'snippet': "Posts · Replies · Highlights · Media. victoria malfoy's Highlights. victoria malfoy · @SnakeySmut. ·. Jan 13. you don't actually want to watch a movie, right?"},
            {'position': 5, 'title': 'about - SnakeySmut', 'link': 'https://www.snakeysmut.com/aboutsnakeysmut', 'displayed_link': 'www.snakeysmut.com › aboutsnakeysmut', 'snippet': "Greetings, I'm Victoria. you know me best as snakeysmut."},
        ]

    async def _emit_start_phrase(self, ctx: Context, language) -> str:
        """Translate and stream the stored disclaimer, if any.

        Returns the streamed text (``""`` when no ``start_phrase`` is set) so
        the caller can prepend it to the recorded response.
        """
        start_phrase = await ctx.get("start_phrase", "")
        if start_phrase == "":
            return ""
        translated = self.response_llm.chat(
            TRANSLATE_PROMPT.format_messages(user_input=start_phrase, language=language)
        ).message.content
        banner = "*" + translated + "*\n\n"
        await self._emit_token(ctx, {"content": banner})
        return banner

    def _alignment_fallback(self, ev, exc):
        """Retry alignment while budget remains, otherwise let the query through.

        NOTE(review): once the budget is spent the filter fails open
        (SafeStartEvent), which mirrors the previous behaviour.
        """
        if self._consume_retry():
            print(f"retry alignment: {exc} - retry: {self.retry_ct}")
            return StartEvent(query=ev.query)
        return SafeStartEvent(query=ev.query)

    async def _media_search(self, ctx: Context, ev, *, search_fn, parse_fn, words_key, results_key, own_intent):
        """Shared body of the image_search and video_search steps.

        Args:
            search_fn: blocking search callable taking ``(query, safe_search)``.
            parse_fn: converts the results into the text stored as the assistant turn.
            words_key: frame key for the search words (e.g. ``"image_searchWords"``).
            results_key: frame key for the results; also the ProcessStatus type.
            own_intent: the intent label for which this search is the whole answer.
        """
        await self._emit(ctx, self._search_words_frame(words_key, ev.query))
        # The search is blocking I/O; run it on a worker thread.
        results = await asyncio.to_thread(search_fn, ev.query, self.safe_search)
        if results:
            await self._emit(ctx, self._json_frame(results_key, results))
        # Exactly one "end" status per search, whatever branch follows.
        await self._emit_status(ctx, ProcessStatus(type=results_key, status="end"))

        intents = await ctx.get("intention", " ")
        if len(intents) == 1 and intents[0] == own_intent:
            # This search is the whole answer: record it and finish the stream.
            if self.context_flag == 1:
                self._store_assistant_message(parse_fn(results))
            await self._emit(ctx, "data:[DONE]\n\n")
            return StopEvent(result="success")
        if set(intents) == {"Ask for specific Videos", "Ask for specific Images"}:
            # gather_context must wait for both media searches.
            await ctx.set("query_event_ct", 2)
        return RetrieveContextEvent(context_str="")

    async def _emit_related(self, ctx: Context, keywords, context_str) -> None:
        """Ask the LLM for tags / related searches and stream at most three of each."""
        extraction = self.response_llm.chat(
            RELATED_SEARCH_PROMPT.format_messages(
                keywords=keywords,
                retrieved_content=context_str,
            )
        ).message.content
        try:
            extraction = json.loads(extraction)
            related_search = extraction["related_searches"][:3]
            tags = extraction["tags"][:3]
            await self._emit_token(ctx, {"tags": tags})
            await self._emit_token(ctx, {"related_searches": related_search})
        except Exception as e:
            print(f"Related searchs & tags JSONDecode ERROR: {e}")

    # ------------------------------------------------------------------
    # Steps
    # ------------------------------------------------------------------

    @step
    async def alignment(self, ctx: Context, ev: StartEvent) -> SafeStartEvent | RefuseEvent | StartEvent:
        """Filter the incoming query before any search or chat happens.

        Stage 1 (only when adult mode is off) refuses adult/erotic queries.
        Stage 2 classifies the query against REFUSE_INTENTS, stores the
        detected language, and either lets the query through or refuses it.
        Unparseable LLM output re-dispatches a StartEvent while retry budget
        remains.
        """
        status = ProcessStatus(type="thinking", status="start")
        await self._emit_status(ctx, status)

        if not self.adultMode:
            adult_intents = ["adult content", "erotic"]
            normal_intents = ["normal", "Sex Education", "Sexual Health"]
            fmt_message = ALIGNMENT_PROMPT.format_messages(
                user_input=ev.query,
                intent_labels=adult_intents + normal_intents,
            )
            response = self.response_llm.chat(fmt_message)
            try:
                verdict = json.loads(response.message.content)
                adult_hit = verdict["intent"] in adult_intents
            except Exception as exc:  # not a bare except: cancellation must propagate
                return self._alignment_fallback(ev, exc)
            if adult_hit:
                print(f"adultMode_0: {verdict['intent']}")
                await self._emit_extra_status(
                    ctx, intents=None, sensitive=[verdict["intent"]], is_sex="1"
                )
                status.update("end")
                await self._emit_status(ctx, status)
                return RefuseEvent(
                    lang="english",
                    query=ev.query,
                    adult=True,
                )

        fmt_message = ALIGNMENT_PROMPT.format_messages(
            user_input=ev.query,
            intent_labels=REFUSE_INTENTS,
        )
        response = self.response_llm.chat(fmt_message)
        try:
            verdict = json.loads(response.message.content)
            intent = verdict["intent"]
            lang = verdict["language"]
            print(f"language: {lang}")
            # Chinese input is answered in English.
            if lang.lower() in ["zh", "chinese"]:
                lang = "english"
        except Exception as exc:
            return self._alignment_fallback(ev, exc)

        await ctx.set("language", lang)
        print(f"FILTER STATUS: {intent}")
        if intent == "not harmful" or intent == "BDSM content":
            return SafeStartEvent(query=ev.query)
        if intent == "medical advice":
            # Allowed, but the answer is prefixed with a disclaimer.
            await ctx.set("start_phrase", START_PHRASE)
            return SafeStartEvent(query=ev.query)

        await self._emit_extra_status(ctx, intents=None, sensitive=[intent], is_sex="0")
        status.update("end")
        await self._emit_status(ctx, status)
        return RefuseEvent(lang=lang)

    @step
    async def refuse(self, ctx: Context, ev: RefuseEvent) -> StopEvent:
        """Terminal step for refused queries.

        Adult queries in non-adult mode get a fixed notice plus video search
        results; every other refusal is generated by the LLM from
        REFUSE_PROMPT in the detected language.
        """
        if not self.adultMode and ev.adult:
            # Blocking search; run it off the event loop.
            video_result = await asyncio.to_thread(video_search, q=ev.query, mode="off")
            notice = "I cannot provide details about this topic at the moment. Below are the links found for you by the search engine.\n\n"
            for ch in notice:
                await self._emit_token(ctx, {"content": ch})
            ctx.streaming_queue.put_nowait(self._search_words_frame("video_searchWords", ev.query))
            ctx.streaming_queue.put_nowait(self._json_frame("videoResults", video_result))
            await asyncio.sleep(0)
            ctx.write_event_to_stream("data:[DONE]\n\n")
        else:
            pieces = []
            for token in self.response_llm.stream(REFUSE_PROMPT, language=ev.lang):
                pieces.append(token)
                await self._emit_token(ctx, {"content": token})
            ctx.write_event_to_stream("data:[DONE]\n\n")
            response_str = "".join(pieces)
            print(f"reponse_str: {response_str}")
        return StopEvent(result="success")

    @step
    async def intend_recognize(self, ctx: Context, ev: SafeStartEvent) -> CasualChatEvent | VideoResourceEvent | ImageResourceEvent | GeneralSearchEvent | SafeStartEvent | FullContextEvent:
        """Route a safe query to the steps that will answer it.

        Exact special terms and ad triggers take canned flows. Everything else
        is classified by the LLM against INTENTS and dispatched to the chat,
        website, mixed-resource, video, image or knowledge paths.
        """
        if ev.query in SPECIAL_TERMS:
            await self._begin_canned_flow(ctx, ev.query, "Definition of Term")
            # NOTE(review): the term and "Porn" are joined without a space
            # (e.g. "BDSMPorn"); kept as-is, confirm this is intended.
            ctx.send_event(VideoSearch(query=ev.query + "Porn"))
            ctx.send_event(ImageSearch(query=ev.query + "Porn"))
            return GeneralSearch(
                query=f"What is {ev.query} in sexual context",
                tag="Specific Knowledge",
            )

        lowered = ev.query.lower()
        for campaign in self._AD_CAMPAIGNS:
            if any(trigger in lowered for trigger in campaign["triggers"]):
                await self._begin_canned_flow(ctx, ev.query, campaign["response_mode"])
                card = self._ad_image_card(campaign["title"], campaign["image"])
                await self._emit(ctx, self._json_frame("imageResults", card))
                return FullContextEvent(context_str="")

        await ctx.set("original_query", ev.query)
        await ctx.set("query_event_ct", 0)
        if self.context_flag == 1:
            # Store the user turn once per run. Keying this on retry_ct == 0
            # would drop the message whenever alignment had already retried.
            if not await ctx.get("user_message_stored", False):
                self.chat_store.add_message(user_id=self.sessionId, role="user", content=ev.query)
                await ctx.set("user_message_stored", True)
            self.chat_history = self.chat_store.get_chat_history(self.sessionId)
        else:
            self.chat_history = " "

        fmt_message = INTENT_EXTRACT_PROMPT.format_messages(
            user_input=ev.query,
            chat_history=self.chat_history,
            possible_intentions=INTENTS,
        )
        response = self.response_llm.chat(fmt_message).message.content
        try:
            parsed = json.loads(response)
            is_sex = str(parsed["adult"])
            intents = parsed["intentions"]
            if not isinstance(intents, list):
                raise TypeError(f"'intentions' must be a list, got {type(intents).__name__}")

            intents = reorder(intents, "Ask for specific Images", "Ask for specific Videos")
            await ctx.set("intention", intents)
            await self._emit_status(ctx, ProcessStatus(type="thinking", status="end"))
            print(f"intention: {intents}")

            if len(intents) == 1 and intents[0] == "Casual Chat":
                await self._emit_extra_status(ctx, intents=intents, sensitive=None, is_sex=is_sex)
                return CasualChatEvent(query=ev.query)

            if "Ask for specific Website" in intents:
                await self._emit_extra_status(ctx, intents=intents, sensitive=None, is_sex=is_sex)
                return GeneralSearchEvent(query=ev.query, tag="Ask for specific Website")

            if "A variety of resources search" in intents:
                await self._emit_extra_status(ctx, intents=intents, sensitive=None, is_sex=is_sex)
                ctx.send_event(ImageResourceEvent(query=ev.query))
                ctx.send_event(VideoResourceEvent(query=ev.query))
                return GeneralSearchEvent(query=ev.query, tag="A variety of resources search")

            # In-place removal on purpose: the list object was just handed to
            # ctx.set("intention", ...), so this may also affect what the
            # search steps later read back.
            if "Casual Chat" in intents:
                intents.remove("Casual Chat")
            await self._emit_extra_status(ctx, intents=intents, sensitive=None, is_sex=is_sex)

            # Unknown labels trigger a bounded re-classification *before*
            # anything is dispatched, so a retry cannot duplicate searches.
            unknown = [intent for intent in intents if intent not in INTENTS]
            if unknown and self._consume_retry():
                print(f"retry intention recognization: unknown intents {unknown} - retry: {self.retry_ct}")
                return SafeStartEvent(query=ev.query)

            dispatched = False
            for intent in intents:
                if intent == "Ask for specific Videos":
                    await self._emit_status(ctx, ProcessStatus(type="videoResults", status="start"))
                    ctx.send_event(VideoResourceEvent(query=ev.query))
                    dispatched = True
                elif intent == "Ask for specific Images":
                    await self._emit_status(ctx, ProcessStatus(type="imageResults", status="start"))
                    ctx.send_event(ImageResourceEvent(query=ev.query))
                    dispatched = True
                elif intent == "Specific Knowledge":
                    ctx.send_event(GeneralSearchEvent(query=ev.query, tag="Specific Knowledge"))
                    dispatched = True
            if not dispatched:
                # Nothing routable (e.g. empty list): answer as casual chat
                # rather than leaving the workflow idle until it times out.
                return CasualChatEvent(query=ev.query)
        except Exception as e:
            if self._consume_retry():
                print(f"retry intention recognization: {e} - retry: {self.retry_ct}")
                return SafeStartEvent(query=ev.query)
            return CasualChatEvent(query=ev.query)

    @step
    async def query_rewrite(self, ctx: Context, ev: GeneralSearchEvent) -> GeneralSearch | MilvusDBSearchEvent:
        """Rewrite the query with the LLM and fan out the search events.

        Emits a GeneralSearch, plus a MilvusDBSearchEvent for
        "Specific Knowledge", and adds their count to ``query_event_ct``.
        Unparseable LLM output is retried while budget remains, after which
        the original query is searched as-is.
        """
        chat_history = self.chat_history if self.context_flag == 1 else " "
        language = await ctx.get("language", "en")
        intention = ev.tag
        format_input = QUERY_REWRITE_PROMPT.format_messages(
            chat_history=chat_history,
            query_str=ev.query,
            intention=intention,
            language=language,
        )
        response = self.response_llm.chat(format_input).message.content
        try:
            query = json.loads(response)["query"]
            if not isinstance(query, str):
                raise TypeError(f"'query' must be a string, got {type(query).__name__}")
        except Exception as exc:
            if self._consume_retry():
                print(f"retry query rewrite: {exc} - retry: {self.retry_ct}")
                return GeneralSearchEvent(query=ev.query, tag=ev.tag)
            # Budget spent: search with the user's own wording.
            query = ev.query

        print(f"rewrite query: {query}")
        await ctx.set("refined_query", query)

        searches = [GeneralSearch(query=query, tag=intention)]
        if ev.tag == "Specific Knowledge":
            searches.append(MilvusDBSearchEvent(query=query))

        # Publish the expected count before the events go out, so
        # gather_context cannot read a stale value.
        curr_ct = await ctx.get("query_event_ct", 0)
        await ctx.set("query_event_ct", curr_ct + len(searches))
        for search in searches:
            ctx.send_event(search)

    @step
    async def keywords_extraction(self, ctx: Context, ev: ImageResourceEvent | VideoResourceEvent) -> ImageSearch | VideoSearch:
        """Turn the query into at most two search keywords for media search.

        Keywords are extracted once and cached in the context under
        ``keywords``. On failure the step falls back to a GeneralSearch on the
        raw query and bumps ``query_event_ct``.
        """
        chat_history = self.chat_history if self.context_flag == 1 else " "
        try:
            # The string "None" is the "not extracted yet" sentinel.
            keywords = await ctx.get("keywords", "None")
            if keywords == "None":
                format_input = KEYWORDS_EXTRACTION_PROMPT.format_messages(
                    chat_history=chat_history,
                    query_str=ev.query,
                )
                response = self.response_llm.chat(format_input).message.content
                keywords = json.loads(response)["keywords"][:2]
                print(f"extrated_keywords: {keywords}")
                keywords = " ".join(keywords)
                await ctx.set("keywords", keywords)
            if isinstance(ev, VideoResourceEvent):
                return VideoSearch(query=keywords)
            return ImageSearch(query=keywords)
        except Exception as exc:
            print(f"keywords extraction failed: {exc}")
            curr_ct = await ctx.get("query_event_ct", 0)
            await ctx.set("query_event_ct", curr_ct + 1)
            return GeneralSearch(query=ev.query, tag="")

    @step
    async def vector_search(self, ctx: Context, ev: MilvusDBSearchEvent) -> RetrieveContextEvent:
        """Search both Milvus collections and return the hits as context text.

        Hits with ``distance`` below 0.7 are dropped. Each collection gets its
        own start/end status pair and a results frame.
        """
        retriever = MilvusRetriever(uri=MILVUS_URI)
        # (collection name, status type / frame key, context heading, body field)
        collections = (
            ("t_sur_sex_ed_article_spider", "sex_ed_article", "Sex Education Articles:", "chunk"),
            ("t_sur_sex_ed_question_answer_spider", "sex_ed_qa", "Sex Education Q&As:", "content"),
        )

        searched = []
        for collection_name, process_type, heading, body_field in collections:
            status = ProcessStatus(type=process_type, status="start")
            await self._emit_status(ctx, status)
            hits = retriever.search(query=ev.query, collection_name=collection_name, top_k=5)
            hits = [record for record in hits if record["distance"] >= 0.7]
            # Keep each collection's own status so its "end" closes the right type.
            searched.append((status, process_type, heading, body_field, hits))

        context_lines = []
        for status, process_type, heading, body_field, hits in searched:
            entities = [record["entity"] for record in hits]
            await self._emit(ctx, self._json_frame(process_type, entities))
            status.update(status="end")
            await self._emit_status(ctx, status)
            context_lines.append(heading)
            for record in hits:
                entity = record["entity"]
                context_lines.append(entity["title"] + "\n" + entity[body_field])
        return RetrieveContextEvent(context_str="\n".join(context_lines))

    @step
    async def image_search(self, ctx: Context, ev: ImageSearch) -> StopEvent | RetrieveContextEvent:
        """Run the image search and stream its results.

        Ends the workflow when images were the only intent; otherwise returns
        an empty RetrieveContextEvent for gather_context.
        """
        # Inside a method body the bare names image_search / parse_image_content
        # resolve to the module-level imports, not to this method.
        return await self._media_search(
            ctx,
            ev,
            search_fn=image_search,
            parse_fn=parse_image_content,
            words_key="image_searchWords",
            results_key="imageResults",
            own_intent="Ask for specific Images",
        )

    @step
    async def video_search(self, ctx: Context, ev: VideoSearch) -> StopEvent | RetrieveContextEvent:
        """Run the video search and stream its results.

        Ends the workflow when videos were the only intent; otherwise returns
        an empty RetrieveContextEvent for gather_context.
        """
        return await self._media_search(
            ctx,
            ev,
            search_fn=video_search,
            parse_fn=parse_video_content,
            words_key="video_searchWords",
            results_key="videoResults",
            own_intent="Ask for specific Videos",
        )

    @step
    async def general_search(self, ctx: Context, ev: GeneralSearch) -> RetrieveContextEvent:
        """Run the web search and turn the results into context.

        For the "A variety of resources search" and "Specific Knowledge" tags
        the first two result pages are read with ``web_reader`` and the
        non-empty results are joined into the context. For any other tag the
        raw results become the context and ``response_mode`` is set to
        "WebsiteResponse".
        """
        status = ProcessStatus(type="webResults", status="start")
        await self._emit_status(ctx, status)

        if ev.ads in ("Victoria SnakeySmut", "Chanell Heart"):
            status.update(status="end")
            await self._emit_status(ctx, status)
            if ev.ads == "Victoria SnakeySmut":
                web_result = self._victoria_web_results()
            else:
                web_result = await asyncio.to_thread(general_search, "Chanell Heart", mode=self.safe_search)
            await self._emit(ctx, self._json_frame("webResults", web_result))
            return RetrieveContextEvent(context_str=parse_web_search_content(web_result))

        web_result = await asyncio.to_thread(general_search, ev.query, self.safe_search)
        status.update(status="end")
        await self._emit_status(ctx, status)

        if not web_result:
            return RetrieveContextEvent(context_str=" ")

        if ev.tag not in ('A variety of resources search', 'Specific Knowledge'):
            await ctx.set("response_mode", "WebsiteResponse")
            return RetrieveContextEvent(context_str=json.dumps(web_result))

        substatus = ProcessStatus(type="web_summary", status="start")
        await self._emit_status(ctx, substatus)

        urls = [web["link"] for web in web_result][:2]
        # Read the pages concurrently on worker threads without blocking the
        # event loop; a failed read is skipped. Results keep the URL order.
        outcomes = await asyncio.gather(
            *(asyncio.to_thread(web_reader, url) for url in urls),
            return_exceptions=True,
        )
        web_summaries = "\n\n".join(
            summary for summary in outcomes
            if not isinstance(summary, BaseException) and summary
        )

        ctx.streaming_queue.put_nowait(self._json_frame("webResults", web_result))
        substatus.update("end")
        await self._emit_status(ctx, substatus)
        return RetrieveContextEvent(context_str="General Search Result:\n" + web_summaries)

    @step
    async def gather_context(self, ctx: Context, ev: RetrieveContextEvent) -> FullContextEvent | StopEvent:
        """Collect ``query_event_ct`` RetrieveContextEvents and merge their text.

        Returns None while events are still missing. When the intents are
        exactly videos + images and both arrived, the stream is closed without
        a synthesized answer. Otherwise the contexts are joined and truncated
        to 10000 characters.

        NOTE(review): image_search / video_search also return
        RetrieveContextEvent in mixed-intent flows, but only query_rewrite and
        the keywords_extraction fallback increment ``query_event_ct``; confirm
        the expected count matches the events actually produced.
        """
        event_cts = await ctx.get("query_event_ct", 0)
        print(f"event_cts: {event_cts}")
        events = ctx.collect_events(ev, [RetrieveContextEvent] * event_cts)
        if not events:
            return None

        intents = await ctx.get("intention", " ")
        media_only = set(intents) == {"Ask for specific Videos", "Ask for specific Images"}
        if media_only and len(events) > 1:
            ctx.write_event_to_stream(TokenEvent(token='data:[DONE]\n\n'))
            await asyncio.sleep(0)
            return StopEvent(result="success")

        print(f"recevived {len(events)} events")
        full_context = "\n\n".join(event.context_str for event in events)[:10000]
        return FullContextEvent(context_str=full_context)

    @step
    async def casual_response(self, ctx: Context, ev: CasualChatEvent) -> StopEvent:
        """Stream a chat reply built from CASUAL_CHAT_PROMPT and record it."""
        chat_history = self.chat_history if self.context_flag == 1 else ""
        language = await ctx.get("language", "en")

        pieces = []
        banner = await self._emit_start_phrase(ctx, language)
        if banner:
            pieces.append(banner)

        response = self.response_llm.stream(
            CASUAL_CHAT_PROMPT,
            user_input=ev.query,
            chat_history=chat_history,
            language=language,
        )
        for token in response:
            if token == "":
                continue
            timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
            pieces.append(token)
            await self._emit_token(ctx, {"content": token, "time": timestamp})
        ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))

        response_str = "".join(pieces)
        print(f"reponse_str: {response_str}")
        if self.context_flag == 1:
            self._store_assistant_message(response_str)
        return StopEvent(result="success")

    @step
    async def response_synthesis(self, ctx: Context, ev: FullContextEvent) -> StopEvent:
        """Stream the final answer built from the gathered context.

        The prompt depends on ``response_mode``: ad modes stream a fixed
        blurb; "WebsiteResponse" and "Definition of Term" use their own
        templates; anything else uses ``response_synthesis_prompt``. After the
        answer, tags and related searches are streamed. ``data:[DONE]`` is
        emitted on every path, including ad replies and streaming errors.
        """
        query_str = await ctx.get("refined_query", " ")
        original_query = await ctx.get("original_query")
        language = await ctx.get("language", "en")
        keywords = await ctx.get("keywords", " ")
        if query_str == " ":
            query_str = original_query
        response_mode = await ctx.get("response_mode", "")

        pieces = []
        banner = await self._emit_start_phrase(ctx, language)
        if banner:
            pieces.append(banner)

        for campaign in self._AD_CAMPAIGNS:
            if campaign["response_mode"] != response_mode:
                continue
            for ch in campaign["blurb"]:
                pieces.append(ch)
                await self._emit_token(ctx, {"content": ch})
            ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))
            response_str = "".join(pieces)
            print(f"response_str:\n{response_str}\n")
            return StopEvent(result="success")

        if response_mode == "WebsiteResponse":
            chat_response = self.response_llm.stream(
                prompt=WEBSIT_PROMPT,
                user_input=query_str,
                search_result=ev.context_str,
                language=language,
            )
        elif response_mode == "Definition of Term":
            chat_response = self.response_llm.stream(
                prompt=TERM_PROMPT,
                user_input=query_str,
                search_result=ev.context_str,
                language='english',
            )
        else:
            chat_response = self.response_llm.stream(
                prompt=self.response_synthesis_prompt,
                search_keyword=query_str,
                search_result=ev.context_str,
                language=language,
            )

        streamed_ok = True
        try:
            for token in chat_response:
                timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
                pieces.append(token)
                await self._emit_token(ctx, {"content": token, "time": timestamp})
            await self._emit_related(ctx, keywords, ev.context_str)
        except Exception as e:
            streamed_ok = False
            print(f"Streaming Exception: {e}")

        # Always close the stream, even when generation failed part-way.
        ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))

        if streamed_ok:
            response_str = "".join(pieces)
            print(f"response_str:\n{response_str}\n")
            if self.context_flag == 1:
                self._store_assistant_message(response_str)

        return StopEvent(result="success")
|
|
|
async def sql_workflow(query: str, chat_store: MySQLChatStore, sessionId: str, llm: MyVllm, context_flag: int, adultMode: bool):
    """Start a SQLWorkflow run for ``query`` and return its event stream.

    Args:
        query: the user's input text.
        chat_store: chat history store handed to the workflow. Annotated as
            MySQLChatStore because that is the type SQLWorkflow.__init__
            declares for it.
        sessionId: key under which the chat history is read and written.
        llm: model used for every LLM call in the workflow. Annotated as
            MyVllm to match SQLWorkflow.__init__.
        context_flag: ``1`` to include chat history as context.
        adultMode: whether adult mode is enabled for this session.

    Returns:
        The result of ``handler.stream_events()`` for the started run; the
        caller iterates it to receive the streamed frames.
    """
    wf = SQLWorkflow(
        response_llm=llm,
        response_synthesis_prompt=FINAL_RESPONSE_PROMPT,
        chat_store=chat_store,
        sessionId=sessionId,
        context_flag=context_flag,
        adultMode=adultMode,
        verbose=True,
        timeout=60,
    )
    handler = wf.run(query=query)
    return handler.stream_events()
|
|
|
def reorder(l: list, former: str, latter: str):
    """Make sure ``former`` is placed before ``latter`` in ``l``.

    When both values are present and ``former`` currently sits after
    ``latter``, their first occurrences are swapped in place. In every other
    case the list is left untouched. The same list object is returned.
    """
    # Nothing to do unless both entries are present.
    if former not in l or latter not in l:
        return l

    first_pos = l.index(former)
    second_pos = l.index(latter)
    if first_pos > second_pos:
        l[first_pos], l[second_pos] = l[second_pos], l[first_pos]
    return l