import concurrent.futures
import threading
import asyncio
import json
import os
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from datetime import datetime
from llama_index.llms.ollama import Ollama
from llama_index.core.storage.chat_store import SimpleChatStore
from llama_index.core.llms import ChatMessage
from llama_index.core import PromptTemplate
from workflow.events import *
from milvusDB.retriever import MilvusRetriever
from prompts.default_prompts import (
    QUERY_REWRITE_PROMPT,
    FINAL_RESPONSE_PROMPT,
    INTENT_EXTRACT_PROMPT,
    CASUAL_CHAT_PROMPT,
    KEYWORDS_EXTRACTION_PROMPT,
    RELATED_SEARCH_PROMPT,
    ALIGNMENT_PROMPT,
    REFUSE_PROMPT,
    TRANSLATE_PROMPT,
    WEBSIT_PROMPT,
    TERM_PROMPT,
)
from workflow.modules import (
    ProcessStatus,
    ExtraStatus,
    MySQLChatStore,
    parse_image_content,
    parse_video_content,
    parse_web_search_content,
    video_search,
    image_search,
    general_search,
    web_reader,
)
from workflow.vllm_model import MyVllm
from dotenv import load_dotenv

load_dotenv()

MILVUS_URI = os.getenv("MILVUS_URI")
CHAT_STORE_PATH = os.getenv("CHAT_STORE_PATH")

TABLE_SUMMARY = {
    "t_sur_media_sync_es": "This table is about Porn video information:\n\nt_sur_media_sync_es: Columns:id (integer), web_url (string), duration (integer), pattern_per (integer), like_count (integer), dislike_count (integer), view_count (integer), cover_picture (string), title (string), upload_date (datetime), uploader (string), create_time (datetime), update_time (datetime), categories (list of strings), abbreviate_video_url (string), abbreviate_mp4_video_url (string), resource_type (integer), like_count_show (string), stat_version (string), tags (list of strings), model_name (string), publisher_type (string), period (string), sexual_preference (string), country (string), type (string), rank_number (integer), rank_rate (string), has_pattern (boolean), trace (string), manifest_url (string), is_delete (boolean), web_url_md5 (string), view_key (string)",
    "t_sur_models_info": "This table is about Stripchat models' information:\n\nt_sur_models_info: Columns:id (INTEGER), username (VARCHAR(100), image (VARCHAR(500), num_users (INTEGER), pf (VARCHAR(50), pf_model_unite (VARCHAR(50), use_plugin (INTEGER), create_time (DATETIME), update_time (DATETIME), update_time (DATETIME), gender (VARCHAR(50), broadcast_type (VARCHAR(50), common_gender (VARCHAR(50), avatar (VARCHAR(512), age (INTEGER) ",
}

INTENTS = [
    "Casual Chat",
    "Ask for specific Videos",
    "Ask for specific Images",
    "Ask for specific Website",
    "A variety of resources search",
    "Specific Knowledge",
]

START_PHRASE = "The AI-provided content is for reference only. For concerns, we encourage you to consult with qualified professionals."

# Queries that exactly match one of these get the "Definition of Term" flow.
SPECIAL_TERMS = [
    "BDSM", "JOI", "Goon", "Futa", "Hentai", "Furry(Porn)", "Sissy(Porn)",
    "Pegging", "Cock Hero", "Femdom", "PMV", "Pornhub", "Femboy(Porn)",
    "Hypno(Porn)", "XXX", "Anal", "POV", "ASMR",
]

REFUSE_INTENTS = [
    "medical advice", "Overdose medication", "child pornography", "self-harm",
    "political bias", "hate speech", "illegal drugs", "not harmful",
    "violent tendencies", "weaponry", "religious hate", "Theft", "Robbery",
    "Body Disposal", "Forgery", "Smuggling", "Money laundering", "Extortion",
    "Terrorism", "Explosion", "Cyberattack & Hacking", "illegal stalking",
    "Arms trafficking", "make people vanish",
]

# Maximum number of times an unparsable LLM classification is retried.
MAX_RETRIES = 3

# Intent pair for which image + video results are streamed and no text answer is generated.
_MEDIA_ONLY_INTENTS = {"Ask for specific Videos", "Ask for specific Images"}

# Hard-coded web results streamed for the "Victoria SnakeySmut" ad.
_VICTORIA_WEB_RESULTS = [
    {'position': 1, 'title': 'Victoria SnakeySmut | Fansly', 'link': 'https://fansly.com/SnakeySmut', 'displayed_link': 'fansly.com/SnakeySmut', 'snippet': 'SnakeySmut conjures audio roleplays. Like the little noises I make with my mouth? Come see everything here! 18+ ONLY.'},
    {'position': 2, 'title': 'Victoria (u/SnakeySmut) - Reddit', 'link': 'https://www.reddit.com/user/SnakeySmut/', 'displayed_link': 'www.reddit.com › user › SnakeySmut', 'snippet': 'u/SnakeySmut: The witchiest little treat ❤︎ Enjoy me as all dark things are to be loved, in secret and the shadows. Spooky girl with a proclivity for…'},
    {'position': 3, 'title': 'victoria malfoy (@SnakeySmut) / X', 'link': 'https://x.com/snakeysmut?lang=en', 'displayed_link': 'x.com › snakeysmut', 'snippet': "victoria malfoy · @SnakeySmut. ·. Jan 2. Hi there, I'm Victoria A witch of many talents, specializing in fantasy fulfillment of aural and visual magic Links ..."},
    {'position': 4, 'title': 'Highlights by victoria malfoy (@SnakeySmut) / X', 'link': 'https://twitter.com/SnakeySmut/highlights', 'displayed_link': 'twitter.com › SnakeySmut › highlights', 'snippet': "Posts · Replies · Highlights · Media. victoria malfoy's Highlights. victoria malfoy · @SnakeySmut. ·. Jan 13. you don't actually want to watch a movie, right?"},
    {'position': 5, 'title': 'about - SnakeySmut', 'link': 'https://www.snakeysmut.com/aboutsnakeysmut', 'displayed_link': 'www.snakeysmut.com › aboutsnakeysmut', 'snippet': "Greetings, I'm Victoria. you know me best as snakeysmut."},
]

# Canned answer text streamed by response_synthesis for each ad response_mode.
_AD_RESPONSES = {
    "ads-victoria": "**Victoria SnakeySmut is going to join X3 show, she is a famous model on Fansly! you can open her page on vibemate to sync with her content.**\n\n",
    "ads-chanell-heart": "**Chanell-heart is joining the X3 show! She streams on Stripchat and is also popular on Pornhub. On VibeMate, connect your toys to sync with her during her cam shows and videos.**\n\n",
}


def _sse_json(key: str, value) -> str:
    """Format ``{key: value}`` as an SSE ``data:`` frame.

    ``json.dumps`` escapes quotes and backslashes in *value*, so user-typed
    search words can no longer produce invalid JSON on the client.
    """
    return f"data: {json.dumps({key: value}, separators=(',', ':'))}\n\n"


def _ad_image_result(title: str, image_url: str) -> list:
    """Build the single-item image result list streamed for an ad query."""
    return [{
        "position": 1,
        "thumbnail": image_url,
        "related_content_id": "WkNzSFNndkhqVlBrOU1cIixcIk16bG1veURtUndJemZN",
        "serpapi_related_content_link": image_url,
        "source": "http://www.vibemate.com",
        "source_logo": "",
        "title": title,
        "link": image_url,
        "original": image_url,
        "original_width": 2160,
        "original_height": 2700,
        "is_product": False,
    }]


class SQLWorkflow(Workflow):
    """Chat workflow: safety screening, intent routing, search and streamed answers.

    Progress, results and answer tokens are pushed to the client as SSE
    ``data: ...`` frames through ``ctx.streaming_queue`` /
    ``ctx.write_event_to_stream``.

    Keys stored on the workflow ``Context``:
        original_query: the current input query.
        refined_query: a precise query rewritten with the chat history.
        query_event_ct: number of RetrieveContextEvents ``gather_context``
            waits for before answering.
        start_phrase: disclaimer sentence prepended to medical-advice answers.
        language: response language.
        keywords: keywords shared by the video & image searches.
        response_mode: which response template ``response_synthesis`` uses.
        intention: intents extracted by ``intend_recognize``.
        user_message_saved: whether the user's message was already written
            to the chat store during this run.
        rewrite_retry_ct: failed query-rewrite attempts in this run.
    """

    def __init__(
        self,
        response_llm: MyVllm,
        response_synthesis_prompt: PromptTemplate,
        chat_store: MySQLChatStore,
        sessionId: str,
        context_flag: int,
        adultMode: bool,
        *args,
        **kwargs,
    ):
        """Store the per-session dependencies.

        Args:
            response_llm: LLM used for classification and streamed answers.
            response_synthesis_prompt: default template for the final answer.
            chat_store: store the conversation is read from / written to.
            sessionId: chat-store key for this conversation.
            context_flag: ``1`` to read and write chat history, anything else
                to run without history.
            adultMode: when False, adult queries are refused and searches run
                with the default (empty) safe-search mode.
        """
        super().__init__(*args, **kwargs)
        self.response_llm = response_llm
        self.response_synthesis_prompt = response_synthesis_prompt
        self.chat_store = chat_store
        self.sessionId = sessionId
        self.context_flag = context_flag
        self.adultMode = adultMode
        # Retry budget shared by alignment and intent recognition.
        self.retry_ct = 0
        # Placeholder until intend_recognize loads the real history.
        self.chat_history = " "
        self.safe_search = "off" if self.adultMode else ""

    # ------------------------------------------------------------------ helpers

    async def _emit_start_phrase(self, ctx: Context, language: str) -> str:
        """Translate and stream the stored disclaimer, if any.

        Returns:
            The streamed text (``""`` when no ``start_phrase`` is set).
        """
        start_phrase = await ctx.get("start_phrase", "")
        if start_phrase == "":
            return ""
        translated = self.response_llm.chat(
            TRANSLATE_PROMPT.format_messages(user_input=start_phrase, language=language)
        ).message.content
        start_phrase = "*" + translated + "*\n\n"
        content = json.dumps({"content": start_phrase})
        ctx.write_event_to_stream(TokenEvent(token=f"data:{content}\n\n"))
        await asyncio.sleep(0)
        return start_phrase

    async def _stream_chars(self, ctx: Context, text: str) -> None:
        """Stream ``text`` to the client one character per token frame."""
        for char in text:
            payload = json.dumps({"content": char})
            ctx.write_event_to_stream(TokenEvent(token=f"data:{payload}\n\n"))
            await asyncio.sleep(0)

    def _persist_assistant_message(self, content: str) -> None:
        """Write an assistant message to the chat store on a background thread.

        Does nothing unless ``context_flag == 1``. The write is not awaited.
        """
        if self.context_flag != 1:
            return
        threading.Thread(
            target=self.chat_store.add_message,
            args=(self.sessionId, "assistant", content),
        ).start()

    async def _save_user_message_once(self, ctx: Context, query: str) -> None:
        """Store the user's message at most once per workflow run.

        A ctx flag is used instead of ``self.retry_ct`` because that counter
        is also advanced by ``alignment`` retries.
        """
        if await ctx.get("user_message_saved", False):
            return
        self.chat_store.add_message(user_id=self.sessionId, role="user", content=query)
        await ctx.set("user_message_saved", True)

    async def _send_intent_status(self, ctx: Context, intents: list, is_sex: str) -> None:
        """Stream the ExtraStatus frame describing the recognised intents."""
        extra_status = ExtraStatus(
            adultMode=self.adultMode,
            intentionResult=intents,
            sensitiveResult=None,
            questionIsSex=is_sex,
        ).to_json()
        ctx.streaming_queue.put_nowait(f"data: {extra_status}\n\n")
        await asyncio.sleep(0)

    async def _begin_canned_flow(self, ctx: Context, query: str, response_mode: str) -> None:
        """Common setup for the special-term and ad flows that skip intent extraction."""
        status = ProcessStatus(type="thinking", status="end").to_json()
        ctx.streaming_queue.put_nowait(f"data: {status}\n\n")
        await asyncio.sleep(0)
        await self._save_user_message_once(ctx, query)
        await ctx.set("response_mode", response_mode)
        await ctx.set("original_query", query)
        await ctx.set("query_event_ct", 1)

    def _retry_intent(self, query: str, reason) -> "SafeStartEvent | CasualChatEvent":
        """Re-run intent recognition, or fall back to casual chat when out of retries."""
        if self.retry_ct < MAX_RETRIES:
            self.retry_ct += 1
            print(f"retry intention recognization: {reason} - retry: {self.retry_ct}")
            return SafeStartEvent(query=query)
        return CasualChatEvent(query=query)

    async def _run_media_search(
        self,
        ctx: Context,
        query: str,
        *,
        search_fn,
        words_key: str,
        results_key: str,
        only_intent: str,
        parse_content,
    ) -> "StopEvent | RetrieveContextEvent | None":
        """Run an image/video search, stream the results and decide what happens next.

        Returns:
            StopEvent when this media type was the only request (after sending
            ``[DONE]``). RetrieveContextEvent("") for the combined image+video
            request, after setting ``query_event_ct`` to 2. Otherwise None:
            media results are shown to the client but never used as
            answer context.
        """
        search_words = query.strip().replace(" ", "+")
        ctx.streaming_queue.put_nowait(_sse_json(words_key, search_words))
        await asyncio.sleep(0)

        # Run the blocking search off the event loop.
        result = await asyncio.to_thread(search_fn, query, self.safe_search)
        if result:
            result_str = json.dumps(result)
            ctx.streaming_queue.put_nowait(f'data: {{"{results_key}":{result_str}}}\n\n')
            await asyncio.sleep(0)
        # Status --> end (sent exactly once, with or without results).
        status = ProcessStatus(type=results_key, status="end").to_json()
        ctx.streaming_queue.put_nowait(f'data: {status}\n\n')
        await asyncio.sleep(0)

        intents = await ctx.get("intention", [])
        if len(intents) == 1 and intents[0] == only_intent:
            if result and self.context_flag == 1:
                self._persist_assistant_message(parse_content(result))
            # Finish the stream even when nothing was found. Previously this
            # case waited for the workflow timeout.
            ctx.streaming_queue.put_nowait("data:[DONE]\n\n")
            await asyncio.sleep(0)
            return StopEvent(result="success")
        if set(intents) == _MEDIA_ONLY_INTENTS:
            await ctx.set("query_event_ct", 2)
            return RetrieveContextEvent(context_str="")
        # Mixed requests: query_event_ct does not count media searches, so
        # emitting an (empty) context event here would displace real context.
        return None

    async def _emit_related_searches(self, ctx: Context, keywords: str, context_str: str) -> None:
        """Best-effort: stream up to 3 tags and 3 related searches for the answer."""
        try:
            extraction = self.response_llm.chat(
                RELATED_SEARCH_PROMPT.format_messages(
                    keywords=keywords,
                    retrieved_content=context_str,
                )
            ).message.content
            extraction = json.loads(extraction)
            related_search = extraction["related_searches"][:3]
            tags = extraction["tags"][:3]
            y_related_search = json.dumps({"related_searches": related_search})
            y_tags = json.dumps({"tags": tags})
            ctx.write_event_to_stream(TokenEvent(token=f"data:{y_tags}\n\n"))
            ctx.write_event_to_stream(TokenEvent(token=f"data:{y_related_search}\n\n"))
            await asyncio.sleep(0)
        except Exception as e:
            print(f"Related searchs & tags JSONDecode ERROR: {e}")

    # -------------------------------------------------------------------- steps

    @step
    async def alignment(self, ctx: Context, ev: StartEvent) -> SafeStartEvent | RefuseEvent | StartEvent:
        """Screen the query before any search or generation happens.

        Streams a "thinking" start status and asks the LLM to classify the
        query. In non-adult mode, queries classified as adult are refused
        first. Every query is then checked against ``REFUSE_INTENTS``.

        Returns:
            SafeStartEvent to continue, RefuseEvent to refuse, or StartEvent
            to re-run this step when the classifier output cannot be parsed
            (at most ``MAX_RETRIES`` times, budget shared via ``self.retry_ct``).
        """
        status = ProcessStatus(type="thinking", status="start")
        ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
        await asyncio.sleep(0)

        if not self.adultMode:
            # Non-adult mode: refuse adult content.
            adult_intents = ["adult content", "erotic"]
            normal_intents = ["normal", "Sex Education", "Sexual Health"]
            fmt_message = ALIGNMENT_PROMPT.format_messages(
                user_input=ev.query,
                intent_labels=adult_intents + normal_intents,
            )
            response = self.response_llm.chat(fmt_message)
            try:
                adult_intent = json.loads(response.message.content)["intent"]
            except Exception:  # not bare: must not swallow CancelledError
                if self.retry_ct < MAX_RETRIES:
                    self.retry_ct += 1
                    return StartEvent(query=ev.query)
                # Out of retries: still run the harmful-content check below
                # instead of letting the query through unchecked.
                adult_intent = None

            if adult_intent in adult_intents:
                print(f"adultMode_0: {adult_intent}")
                extra_status = ExtraStatus(
                    adultMode=self.adultMode,
                    intentionResult=None,
                    sensitiveResult=[adult_intent],
                    questionIsSex="1",
                ).to_json()
                ctx.streaming_queue.put_nowait(f"data: {extra_status}\n\n")
                await asyncio.sleep(0)
                # Refusing: close the "thinking" phase.
                status.update("end")
                ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
                await asyncio.sleep(0)
                return RefuseEvent(lang="english", query=ev.query, adult=True)

        fmt_message = ALIGNMENT_PROMPT.format_messages(
            user_input=ev.query,
            intent_labels=REFUSE_INTENTS,
        )
        response = self.response_llm.chat(fmt_message)
        # Only parsing is guarded: errors in the routing below should surface,
        # not silently turn a refusal into a retry or a pass-through.
        try:
            parsed = json.loads(response.message.content)
            intent = parsed["intent"]
            lang = parsed["language"]
            is_chinese = lang.lower() in ["zh", "chinese"]
        except Exception:
            if self.retry_ct < MAX_RETRIES:
                self.retry_ct += 1
                return StartEvent(query=ev.query)
            return SafeStartEvent(query=ev.query)

        print(f"language: {lang}")
        # Chinese input is answered in English by default.
        if is_chinese:
            lang = "english"
        await ctx.set("language", lang)
        print(f"FILTER STATUS: {intent}")

        if intent == "not harmful" or intent == "BDSM content":
            return SafeStartEvent(query=ev.query)
        if intent == "medical advice":
            await ctx.set("start_phrase", START_PHRASE)
            return SafeStartEvent(query=ev.query)

        extra_status = ExtraStatus(
            adultMode=self.adultMode,
            intentionResult=None,
            sensitiveResult=[intent],
            questionIsSex="0",
        ).to_json()
        ctx.streaming_queue.put_nowait(f"data: {extra_status}\n\n")
        # Refusing: close the "thinking" phase.
        status.update("end")
        ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
        await asyncio.sleep(0)
        # Pass query/adult explicitly; `refuse` reads `adult` in non-adult mode.
        return RefuseEvent(lang=lang, query=ev.query, adult=False)

    @step
    async def refuse(self, ctx: Context, ev: RefuseEvent) -> StopEvent:
        """Stream a refusal.

        Adult queries in non-adult mode get a fixed notice plus search-engine
        video links. All other refusals are generated with ``REFUSE_PROMPT``
        in ``ev.lang``.
        """
        if not self.adultMode and getattr(ev, "adult", False):
            # `video_search` here is the search helper imported from
            # workflow.modules, not the step method of the same name.
            video_result = await asyncio.to_thread(video_search, q=ev.query, mode="off")
            video_str = json.dumps(video_result)
            await self._stream_chars(
                ctx,
                "I cannot provide details about this topic at the moment. Below are the links found for you by the search engine.\n\n",
            )
            search_words = ev.query.strip().replace(" ", "+")
            ctx.streaming_queue.put_nowait(_sse_json("video_searchWords", search_words))
            ctx.streaming_queue.put_nowait(f'data: {{"videoResults":{video_str}}}\n\n')
            await asyncio.sleep(0)
            ctx.write_event_to_stream("data:[DONE]\n\n")
        else:
            response_str = ""
            for token in self.response_llm.stream(REFUSE_PROMPT, language=ev.lang):
                response_str += token
                content = json.dumps({"content": token})
                ctx.write_event_to_stream(TokenEvent(token=f"data:{content}\n\n"))
                await asyncio.sleep(0)
            ctx.write_event_to_stream("data:[DONE]\n\n")
            print(f"reponse_str: {response_str}")
        return StopEvent(result="success")

    @step
    async def intend_recognize(
        self, ctx: Context, ev: SafeStartEvent
    ) -> CasualChatEvent | VideoResourceEvent | ImageResourceEvent | GeneralSearchEvent | GeneralSearch | SafeStartEvent | FullContextEvent:
        """Route a screened query to the right search/answer steps.

        Special terms and ad names use fixed flows. Otherwise the LLM extracts
        intents, and this step sends the matching resource or search events.
        Unparsable or unknown output is retried up to ``MAX_RETRIES`` times,
        then falls back to casual chat.
        """
        query_lower = ev.query.lower()

        # Special terms: explain the term. Image and video results are shown
        # but not used as answer context.
        if ev.query in SPECIAL_TERMS:
            await self._begin_canned_flow(ctx, ev.query, "Definition of Term")
            ctx.send_event(VideoSearch(query=f"{ev.query} Porn"))
            ctx.send_event(ImageSearch(query=f"{ev.query} Porn"))
            return GeneralSearch(query=f"What is {ev.query} in sexual context", tag="Specific Knowledge")

        # 25-1-15 special ads. NOTE(review): the ad web results in
        # general_search (ev.ads) are not triggered from these branches.
        if "snakeysmut" in query_lower:
            await self._begin_canned_flow(ctx, ev.query, "ads-victoria")
            image_str = json.dumps(_ad_image_result(
                "Victoria SnakeySmut",
                "https://cdn.lovense-api.com/UploadFiles/surfease/x3/SnakeySmut.png",
            ))
            ctx.streaming_queue.put_nowait(f'data: {{"imageResults":{image_str}}}\n\n')
            return FullContextEvent(context_str="")
        if "chanell-heart" in query_lower or "chanell heart" in query_lower:
            await self._begin_canned_flow(ctx, ev.query, "ads-chanell-heart")
            image_str = json.dumps(_ad_image_result(
                "Chanell Heart",
                "https://cdn.lovense-api.com/UploadFiles/surfease/x3/chanell-heart.png",
            ))
            ctx.streaming_queue.put_nowait(f'data: {{"imageResults":{image_str}}}\n\n')
            return FullContextEvent(context_str="")

        # Intent extraction.
        all_intents = INTENTS
        await ctx.set("original_query", ev.query)
        await ctx.set("query_event_ct", 0)
        if self.context_flag == 1:
            await self._save_user_message_once(ctx, ev.query)
            self.chat_history = self.chat_store.get_chat_history(self.sessionId)
        else:
            self.chat_history = " "

        fmt_message = INTENT_EXTRACT_PROMPT.format_messages(
            user_input=ev.query,
            chat_history=self.chat_history,
            possible_intentions=all_intents,
        )
        response = self.response_llm.chat(fmt_message).message.content
        try:
            parsed = json.loads(response)
            isSex = str(parsed["adult"])
            intents = parsed["intentions"]
            if not isinstance(intents, list):
                raise TypeError(f"intentions must be a list, got {type(intents).__name__}")
        except Exception as e:
            return self._retry_intent(ev.query, e)

        intents = reorder(intents, "Ask for specific Images", "Ask for specific Videos")
        await ctx.set("intention", intents)
        status = ProcessStatus(type="thinking", status="end").to_json()
        ctx.streaming_queue.put_nowait(f"data: {status}\n\n")
        await asyncio.sleep(0)
        print(f"intention: {intents}")

        # Empty or chat-only: nothing to search (an empty list used to hang).
        if all(intent == "Casual Chat" for intent in intents):
            await self._send_intent_status(ctx, intents, isSex)
            return CasualChatEvent(query=ev.query)

        if "Ask for specific Website" in intents:
            await self._send_intent_status(ctx, intents, isSex)
            return GeneralSearchEvent(query=ev.query, tag="Ask for specific Website")

        if "A variety of resources search" in intents:
            await self._send_intent_status(ctx, intents, isSex)
            ctx.send_event(ImageResourceEvent(query=ev.query))
            ctx.send_event(VideoResourceEvent(query=ev.query))
            return GeneralSearchEvent(query=ev.query, tag="A variety of resources search")

        # Casual chat alongside real requests is dropped. Store the filtered
        # list so the media steps see e.g. ["Ask for specific Images"].
        intents = [intent for intent in intents if intent != "Casual Chat"]
        await ctx.set("intention", intents)
        await self._send_intent_status(ctx, intents, isSex)

        for intent in intents:
            if intent == "Ask for specific Videos":
                status = ProcessStatus(type="videoResults", status="start").to_json()
                ctx.streaming_queue.put_nowait(f'data: {status}\n\n')
                await asyncio.sleep(0)
                ctx.send_event(VideoResourceEvent(query=ev.query))
            elif intent == "Ask for specific Images":
                status = ProcessStatus(type="imageResults", status="start").to_json()
                ctx.streaming_queue.put_nowait(f'data: {status}\n\n')
                await asyncio.sleep(0)
                ctx.send_event(ImageResourceEvent(query=ev.query))
            elif intent == "Specific Knowledge":
                ctx.send_event(GeneralSearchEvent(query=ev.query, tag="Specific Knowledge"))
            elif intent not in all_intents:
                # Unknown label: retry (bounded) instead of looping forever.
                ctx.send_event(self._retry_intent(ev.query, f"unknown intent {intent!r}"))
                break

    @step
    async def query_rewrite(
        self, ctx: Context, ev: GeneralSearchEvent
    ) -> GeneralSearch | MilvusDBSearchEvent | GeneralSearchEvent:
        """Rewrite the query with chat history and send the search events.

        Sends a GeneralSearch and, for "Specific Knowledge", a
        MilvusDBSearchEvent. ``query_event_ct`` is increased accordingly.
        Unparsable LLM output is retried up to ``MAX_RETRIES`` times, then
        the user's own query is used.
        """
        chat_history = self.chat_history if self.context_flag == 1 else " "
        language = await ctx.get("language", "en")
        intention = ev.tag
        format_input = QUERY_REWRITE_PROMPT.format_messages(
            chat_history=chat_history,
            query_str=ev.query,
            intention=intention,
            language=language,
        )
        response = self.response_llm.chat(format_input).message.content
        try:
            query = json.loads(response)["query"]
        except Exception:
            retries = await ctx.get("rewrite_retry_ct", 0)
            if retries < MAX_RETRIES:
                await ctx.set("rewrite_retry_ct", retries + 1)
                return GeneralSearchEvent(query=ev.query, tag=ev.tag)
            query = ev.query

        print(f"rewrite query: {query}")
        await ctx.set("refined_query", query)

        # Count the retrieval events before sending them, so gather_context
        # never reads a stale count.
        curr_ct = await ctx.get("query_event_ct", 0) + 1
        if ev.tag == "Specific Knowledge":
            curr_ct += 1
        await ctx.set("query_event_ct", curr_ct)

        ctx.send_event(GeneralSearch(query=query, tag=intention))
        if ev.tag == "Specific Knowledge":
            ctx.send_event(MilvusDBSearchEvent(query=query))

    @step
    async def keywords_extraction(
        self, ctx: Context, ev: ImageResourceEvent | VideoResourceEvent
    ) -> ImageSearch | VideoSearch:
        """Extract up to two search keywords and send the media search.

        Keywords are cached in ctx so image and video searches of one run can
        share them. On failure a plain GeneralSearch is sent instead and
        counted in ``query_event_ct``.
        """
        chat_history = self.chat_history if self.context_flag == 1 else " "
        format_input = KEYWORDS_EXTRACTION_PROMPT.format_messages(
            chat_history=chat_history,
            query_str=ev.query,
        )
        try:
            # With both video and image search, extract keywords only once.
            keywords = await ctx.get("keywords", "None")
            if keywords == "None":
                response = self.response_llm.chat(format_input).message.content
                keywords = json.loads(response)["keywords"]
                if isinstance(keywords, str):
                    # Avoid " ".join spacing out the characters of a bare string.
                    keywords = [keywords]
                keywords = keywords[:2]
                print(f"extrated_keywords: {keywords}")
                keywords = " ".join(keywords)
                await ctx.set("keywords", keywords)
            if isinstance(ev, VideoResourceEvent):
                return VideoSearch(query=keywords)
            return ImageSearch(query=keywords)
        except Exception:
            curr_ct = await ctx.get("query_event_ct", 0)
            await ctx.set("query_event_ct", curr_ct + 1)
            return GeneralSearch(query=ev.query, tag="")

    @step
    async def vector_search(self, ctx: Context, ev: MilvusDBSearchEvent) -> RetrieveContextEvent:
        """Search the sex-education collections in Milvus and stream the hits.

        Keeps hits with ``distance >= 0.7`` (top 5 per collection). Sends a
        start/end status per collection and returns the titles and bodies as
        answer context.
        """
        retriever = MilvusRetriever(uri=MILVUS_URI)
        # Milvus collection -> process type reported to the client.
        collections = {
            "t_sur_sex_ed_article_spider": "sex_ed_article",
            "t_sur_sex_ed_question_answer_spider": "sex_ed_qa",
        }
        # One status object per collection: previously the last one created
        # was reused, so "sex_ed_article" never received its "end" status.
        statuses = {}
        hits_by_collection = {}
        for collection_name, process_type in collections.items():
            status = ProcessStatus(type=process_type, status="start")
            statuses[collection_name] = status
            ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
            await asyncio.sleep(0)
            records = await asyncio.to_thread(
                retriever.search,
                query=ev.query,
                collection_name=collection_name,
                top_k=5,
            )
            hits_by_collection[collection_name] = [
                record for record in records if record["distance"] >= 0.7
            ]

        context_parts = []
        for collection_name, records in hits_by_collection.items():
            if collection_name == "t_sur_sex_ed_article_spider":
                payload_key, heading, body_field = "sex_ed_article", "Sex Education Articles:", "chunk"
            else:
                payload_key, heading, body_field = "sex_ed_qa", "Sex Education Q&As:", "content"
            entities = json.dumps([record["entity"] for record in records])
            ctx.streaming_queue.put_nowait(f'data: {{"{payload_key}":{entities}}}\n\n')
            await asyncio.sleep(0)
            status = statuses[collection_name]
            status.update(status="end")
            ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
            await asyncio.sleep(0)
            context_parts.append(heading)
            context_parts.extend(
                record["entity"]["title"] + "\n" + record["entity"][body_field]
                for record in records
            )
        return RetrieveContextEvent(context_str="\n".join(context_parts))

    @step
    async def image_search(self, ctx: Context, ev: ImageSearch) -> StopEvent | RetrieveContextEvent:
        """Run the image search. See ``_run_media_search`` for the routing."""
        # `image_search` here is the helper imported from workflow.modules.
        return await self._run_media_search(
            ctx,
            ev.query,
            search_fn=image_search,
            words_key="image_searchWords",
            results_key="imageResults",
            only_intent="Ask for specific Images",
            parse_content=parse_image_content,
        )

    @step
    async def video_search(self, ctx: Context, ev: VideoSearch) -> StopEvent | RetrieveContextEvent:
        """Run the video search. See ``_run_media_search`` for the routing."""
        # `video_search` here is the helper imported from workflow.modules.
        return await self._run_media_search(
            ctx,
            ev.query,
            search_fn=video_search,
            words_key="video_searchWords",
            results_key="videoResults",
            only_intent="Ask for specific Videos",
            parse_content=parse_video_content,
        )

    @step
    async def general_search(self, ctx: Context, ev: GeneralSearch) -> RetrieveContextEvent:
        """Run the web search and turn the results into answer context.

        For "A variety of resources search" / "Specific Knowledge", the top
        two pages are summarised with ``web_reader``. Other tags pass the raw
        results on and switch ``response_mode`` to "WebsiteResponse".
        """
        status = ProcessStatus(type="webResults", status="start")
        ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
        await asyncio.sleep(0)

        ads = getattr(ev, "ads", None)
        if ads in ("Victoria SnakeySmut", "Chanell Heart"):
            status.update(status="end")
            ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
            await asyncio.sleep(0)
            if ads == "Victoria SnakeySmut":
                web_result = _VICTORIA_WEB_RESULTS
            else:
                web_result = await asyncio.to_thread(general_search, "Chanell Heart", mode=self.safe_search)
            web_str = json.dumps(web_result)
            ctx.streaming_queue.put_nowait(f'data: {{"webResults":{web_str}}}\n\n')
            await asyncio.sleep(0)
            return RetrieveContextEvent(context_str=parse_web_search_content(web_result))

        # `general_search` here is the helper imported from workflow.modules.
        # to_thread keeps the loop responsive and, unlike a `with
        # ThreadPoolExecutor()` block, does not block on shutdown if cancelled.
        web_result = await asyncio.to_thread(general_search, ev.query, self.safe_search)

        # Status --> end (search).
        status.update(status="end")
        ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
        await asyncio.sleep(0)

        if not web_result:
            return RetrieveContextEvent(context_str=" ")

        if ev.tag not in ('A variety of resources search', 'Specific Knowledge'):
            await ctx.set("response_mode", "WebsiteResponse")
            return RetrieveContextEvent(context_str=json.dumps(web_result))

        # Summarise the web pages. Status --> start (summary).
        substatus = ProcessStatus(type="web_summary", status="start")
        ctx.streaming_queue.put_nowait(f'data: {substatus.to_json()}\n\n')
        await asyncio.sleep(0)

        urls = [web["link"] for web in web_result[:2]]
        # Read pages concurrently without blocking the event loop. A failed
        # page is skipped.
        summaries = await asyncio.gather(
            *(asyncio.to_thread(web_reader, url) for url in urls),
            return_exceptions=True,
        )
        web_summaries = "\n\n".join(
            summary for summary in summaries
            if not isinstance(summary, BaseException) and summary
        )

        web_str = json.dumps(web_result)
        ctx.streaming_queue.put_nowait(f'data: {{"webResults":{web_str}}}\n\n')
        # Status --> end (summary).
        substatus.update("end")
        ctx.streaming_queue.put_nowait(f'data: {substatus.to_json()}\n\n')
        await asyncio.sleep(0)
        return RetrieveContextEvent(context_str="General Search Result:\n" + web_summaries)

    @step
    async def gather_context(self, ctx: Context, ev: RetrieveContextEvent) -> FullContextEvent | StopEvent:
        """Wait for ``query_event_ct`` retrieval events, then merge them.

        The image+video-only request ends here with ``[DONE]``. Otherwise the
        contexts are joined and truncated to 10000 characters for the answer.
        """
        event_cts = await ctx.get("query_event_ct", 0)
        print(f"event_cts: {event_cts}")
        events = ctx.collect_events(ev, [RetrieveContextEvent] * event_cts)
        if not events:
            # Still waiting for the remaining retrieval events.
            return None

        intents = await ctx.get("intention", [])
        if set(intents) == _MEDIA_ONLY_INTENTS:
            ctx.write_event_to_stream(TokenEvent(token='data:[DONE]\n\n'))
            await asyncio.sleep(0)
            return StopEvent(result="success")

        print(f"recevived {len(events)} events")
        full_context = "\n\n".join(event.context_str for event in events)[:10000]
        return FullContextEvent(context_str=full_context)

    @step
    async def casual_response(self, ctx: Context, ev: CasualChatEvent) -> StopEvent:
        """Stream a casual-chat reply (no search) and persist it."""
        chat_history = self.chat_history if self.context_flag == 1 else ""
        language = await ctx.get("language", "en")
        response_str = await self._emit_start_phrase(ctx, language)

        tokens = self.response_llm.stream(
            CASUAL_CHAT_PROMPT,
            user_input=ev.query,
            chat_history=chat_history,
            language=language,
        )
        for token in tokens:
            if token == "":
                continue
            timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
            response_str += token
            content = json.dumps({"content": token, "time": timestamp})
            ctx.write_event_to_stream(TokenEvent(token=f"data:{content}\n\n"))
            await asyncio.sleep(0)

        ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))
        print(f"reponse_str: {response_str}")
        self._persist_assistant_message(response_str)
        return StopEvent(result="success")

    @step
    async def response_synthesis(self, ctx: Context, ev: FullContextEvent) -> StopEvent:
        """Stream the final answer built from the gathered context.

        Picks the template from ``response_mode``. Ad modes stream a canned
        text instead. After the answer, tags and related searches are sent
        on a best-effort basis. ``[DONE]`` is always sent, even when
        streaming fails.
        """
        query_str = await ctx.get("refined_query", " ")
        original_query = await ctx.get("original_query")
        language = await ctx.get("language", "en")
        keywords = await ctx.get("keywords", " ")
        if query_str == " ":
            query_str = original_query
        response_mode = await ctx.get("response_mode", "")

        # Disclaimer for medical advice, if any.
        response_str = await self._emit_start_phrase(ctx, language)

        # Temporary ads.
        ad_text = _AD_RESPONSES.get(response_mode)
        if ad_text is not None:
            response_str += ad_text
            await self._stream_chars(ctx, ad_text)
            ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))
            print(f"response_str:\n{response_str}\n")
            return StopEvent(result="success")

        # Choose the response template.
        if response_mode == "WebsiteResponse":
            chat_response = self.response_llm.stream(
                prompt=WEBSIT_PROMPT,
                user_input=query_str,
                search_result=ev.context_str,
                language=language,
            )
        elif response_mode == "Definition of Term":
            chat_response = self.response_llm.stream(
                prompt=TERM_PROMPT,
                user_input=query_str,
                search_result=ev.context_str,
                language='english',
            )
        else:
            chat_response = self.response_llm.stream(
                prompt=self.response_synthesis_prompt,
                search_keyword=query_str,
                search_result=ev.context_str,
                language=language,
            )

        completed = False
        try:
            for token in chat_response:
                timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
                response_str += token
                content = json.dumps({"content": token, "time": timestamp})
                ctx.write_event_to_stream(TokenEvent(token=f"data:{content}\n\n"))
                await asyncio.sleep(0)
            completed = True
        except Exception as e:
            print(f"Streaming Exception: {e}")

        if completed:
            # Related search terms and tags (best-effort; errors are printed).
            await self._emit_related_searches(ctx, keywords, ev.context_str)

        # Always close the stream so the client does not wait for a terminator.
        ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))
        print(f"response_str:\n{response_str}\n")
        if completed:
            self._persist_assistant_message(response_str)
        return StopEvent(result="success")


async def sql_workflow(query: str, chat_store: SimpleChatStore, sessionId: str, llm: Ollama, context_flag: int, adultMode: bool):
    """Start an ``SQLWorkflow`` run for *query* and return its event stream.

    The workflow runs with ``verbose=True`` and ``timeout=60``. The returned
    object is ``handler.stream_events()`` from the run handler.
    """
    wf = SQLWorkflow(
        response_llm=llm,
        response_synthesis_prompt=FINAL_RESPONSE_PROMPT,
        chat_store=chat_store,
        sessionId=sessionId,
        context_flag=context_flag,
        adultMode=adultMode,
        verbose=True,
        timeout=60,
    )
    handler = wf.run(query=query)
    return handler.stream_events()


def reorder(l: list, former: str, latter: str):
    """Ensure the first *former* precedes the first *latter* in *l*.

    When both are present and *former* comes later, their positions are
    swapped in place. The (same) list is returned.

    >>> reorder(["b", "a"], "a", "b")
    ['a', 'b']
    """
    former_idx = l.index(former) if former in l else -1
    latter_idx = l.index(latter) if latter in l else -1
    if former_idx > latter_idx and former_idx != -1 and latter_idx != -1:
        l[former_idx], l[latter_idx] = l[latter_idx], l[former_idx]
    return l