File size: 44,928 Bytes
318db6e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
import concurrent.futures
import threading
import asyncio, json, os
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from datetime import datetime
from llama_index.llms.ollama import Ollama
from llama_index.core.storage.chat_store import SimpleChatStore
from llama_index.core.llms import ChatMessage
from llama_index.core import PromptTemplate
from workflow.events import *
from milvusDB.retriever import MilvusRetriever
from prompts.default_prompts import ( 
    QUERY_REWRITE_PROMPT,
    FINAL_RESPONSE_PROMPT,
    INTENT_EXTRACT_PROMPT,
    CASUAL_CHAT_PROMPT,
    KEYWORDS_EXTRACTION_PROMPT,
    RELATED_SEARCH_PROMPT,
    ALIGNMENT_PROMPT,
    REFUSE_PROMPT,
    TRANSLATE_PROMPT,
    WEBSIT_PROMPT,
    TERM_PROMPT
)

from workflow.modules import (
    ProcessStatus,
    ExtraStatus,
    MySQLChatStore,
    parse_image_content,
    parse_video_content,
    parse_web_search_content,
    video_search,
    image_search,
    general_search,
    web_reader,
)
from workflow.vllm_model import MyVllm
from dotenv import load_dotenv

load_dotenv()
MILVUS_URI = os.getenv("MILVUS_URI")
CHAT_STORE_PATH = os.getenv("CHAT_STORE_PATH")

# Natural-language schema descriptions, keyed by SQL table name.
TABLE_SUMMARY = {
    "t_sur_media_sync_es": "This table is about Porn video information:\n\nt_sur_media_sync_es: Columns:id (integer), web_url (string), duration (integer), pattern_per (integer), like_count (integer), dislike_count (integer), view_count (integer), cover_picture (string), title (string), upload_date (datetime), uploader (string), create_time (datetime), update_time (datetime), categories (list of strings), abbreviate_video_url (string), abbreviate_mp4_video_url (string), resource_type (integer), like_count_show (string), stat_version (string), tags (list of strings), model_name (string), publisher_type (string), period (string), sexual_preference (string), country (string), type (string), rank_number (integer), rank_rate (string), has_pattern (boolean), trace (string), manifest_url (string), is_delete (boolean), web_url_md5 (string), view_key (string)",
    "t_sur_models_info": "This table is about Stripchat models' information:\n\nt_sur_models_info: Columns:id (INTEGER), username (VARCHAR(100)), image (VARCHAR(500)), num_users (INTEGER), pf (VARCHAR(50)), pf_model_unite (VARCHAR(50)), use_plugin (INTEGER), create_time (DATETIME), update_time (DATETIME), gender (VARCHAR(50)), broadcast_type (VARCHAR(50)), common_gender (VARCHAR(50)), avatar (VARCHAR(512)), age (INTEGER) "
}

# Intent labels offered to INTENT_EXTRACT_PROMPT.
INTENTS = ["Casual Chat", "Ask for specific Videos", "Ask for specific Images", "Ask for specific Website", "A variety of resources search", "Specific Knowledge"]
# Opening disclaimer for medical-advice answers (stored in the context as "start_phrase").
START_PHRASE = "The AI-provided content is for reference only. For concerns, we encourage you to consult with qualified professionals."
# Queries exactly equal to one of these take the "Definition of Term" route.
SPECIAL_TERMS = ["BDSM", "JOI", "Goon", "Futa", "Hentai", "Furry(Porn)", "Sissy(Porn)", "Pegging", "Cock Hero", "Femdom", "PMV", "Pornhub", "Femboy(Porn)", "Hypno(Porn)", "XXX", "Anal", "POV", "ASMR"]
# Labels for the harmful-intent alignment check: "not harmful" passes,
# "medical advice" passes with START_PHRASE, other labels are refused.
REFUSE_INTENTS = [
    "medical advice", "Overdose medication", "child pornography", "self-harm", "political bias", "hate speech", "illegal drugs", "not harmful", "violent tendencies", "weaponry", "religious hate", "Theft", "Robbery", "Body Disposal", "Forgery", "Smuggling", "Money laundering", "Extortion", "Terrorism", "Explosion", "Cyberattack & Hacking", "illegal stalking", "Arms trafficking", "make people vanish"
]

class SQLWorkflow(Workflow):
    """
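        Context keys:
            original_query: the current input query
            refined_query: precise query rewritten from the chat history
            query_event_ct: count of query (search) events produced
            start_phrase: opening sentence for medical-advice answers
            language: response language
            keywords: keywords used for video & image search
            response_mode: which response template to use
            ...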
    """
    def __init__(
        self,
        response_llm: MyVllm,
        response_synthesis_prompt: PromptTemplate,
        chat_store: MySQLChatStore,
        sessionId: str,
        context_flag: int,
        adultMode: bool,
        *args,
        **kwargs
    ):
        """Store collaborators and per-session flags for this workflow.

        Args:
            response_llm: LLM used for the classification and generation calls.
            response_synthesis_prompt: prompt template, stored as-is (not used
                by the steps visible in this chunk).
            chat_store: chat-history store for the session.
            sessionId: key under which chat messages are stored and read.
            context_flag: ``1`` to record/use chat history; any other value
                makes the steps ignore history.
            adultMode: when truthy, searches run with safe search ``"off"``.
            *args, **kwargs: forwarded to ``Workflow.__init__``.
        """
        super().__init__(*args, **kwargs)
        # Injected dependencies.
        self.response_llm = response_llm
        self.response_synthesis_prompt = response_synthesis_prompt
        self.chat_store = chat_store
        # Per-session configuration.
        self.sessionId = sessionId
        self.context_flag = context_flag
        self.adultMode = adultMode
        # Retry budget for malformed LLM output, shared by the retrying steps.
        self.retry_ct = 0
        # Safe-search value handed to the image/video search helpers.
        self.safe_search = "off" if self.adultMode else ""

    @step
    async def alignment(self, ctx: Context, ev: StartEvent) -> SafeStartEvent | RefuseEvent | StartEvent:
        """Safety gate: classify ``ev.query`` and either refuse it or pass it on.

        Streams a "thinking" start status, then runs up to two LLM
        classifications with ``ALIGNMENT_PROMPT``:

        1. Only when adult mode is off: an "adult content"/"erotic" verdict
           streams an ExtraStatus (questionIsSex="1"), closes the "thinking"
           status and returns ``RefuseEvent(adult=True)``.
        2. Always: harmful-intent screening against ``REFUSE_INTENTS``. The
           detected language is stored under ``language`` (Chinese is answered
           in English). "not harmful"/"BDSM content" pass; "medical advice"
           passes with ``start_phrase`` set; anything else is refused.

        Unparseable LLM output is retried via ``_alignment_retry``.
        """
        status = ProcessStatus(type="thinking", status="start")
        status_str = status.to_json()
        ctx.streaming_queue.put_nowait(f'data: {status_str}\n\n')
        await asyncio.sleep(0)
        # Errors meaning "the LLM reply was not the JSON we asked for". A bare
        # ``except`` would also swallow asyncio.CancelledError and real bugs.
        parse_errors = (ValueError, KeyError, TypeError, AttributeError)
        if not self.adultMode:
            # Adult mode is off: refuse adult content before anything else.
            adult_intents = ["adult content", "erotic"]
            normal_intents = ["normal", "Sex Education", "Sexual Health"]
            fmt_message = ALIGNMENT_PROMPT.format_messages(
                user_input=ev.query,
                intent_labels=adult_intents+normal_intents,
            )
            response = self.response_llm.chat(fmt_message)
            try:
                response = json.loads(response.message.content)
                if response["intent"] in adult_intents:
                    print(f"adultMode_0: {response['intent']}")
                    extra_status = ExtraStatus(adultMode=self.adultMode, intentionResult=None, sensitiveResult=[response["intent"]], questionIsSex="1").to_json()
                    ctx.streaming_queue.put_nowait(f"data: {extra_status}\n\n")
                    await asyncio.sleep(0)
                    # Refusing: close the "thinking" status first.
                    status.update("end")
                    status_str = status.to_json()
                    ctx.streaming_queue.put_nowait(f'data: {status_str}\n\n')
                    await asyncio.sleep(0)
                    return RefuseEvent(
                        lang="english",
                        query=ev.query,
                        adult=True,
                    )
            except parse_errors as err:
                return self._alignment_retry(ev.query, err)
        fmt_message = ALIGNMENT_PROMPT.format_messages(
            user_input=ev.query,
            intent_labels=REFUSE_INTENTS,
        )
        response = self.response_llm.chat(fmt_message)
        try:
            response = json.loads(response.message.content)
            intent = response["intent"]
            lang = response["language"]
            print(f"language: {lang}")
            # Chinese input is answered in English by default.
            if lang.lower() in ["zh", "chinese"]:
                lang = "english"
            await ctx.set("language", lang)
            print(f"FILTER STATUS: {intent}")
            if intent in ("not harmful", "BDSM content"):
                return SafeStartEvent(query=ev.query)
            if intent == "medical advice":
                await ctx.set("start_phrase", START_PHRASE)
                return SafeStartEvent(query=ev.query)
            extra_status = ExtraStatus(adultMode=self.adultMode, intentionResult=None, sensitiveResult=[intent], questionIsSex="0").to_json()
            ctx.streaming_queue.put_nowait(f"data: {extra_status}\n\n")
            # Refusing: close the "thinking" status first.
            status.update("end")
            status_str = status.to_json()
            ctx.streaming_queue.put_nowait(f'data: {status_str}\n\n')
            await asyncio.sleep(0)
            return RefuseEvent(lang=lang)
        except parse_errors as err:
            return self._alignment_retry(ev.query, err)

    def _alignment_retry(self, query: str, err: Exception) -> SafeStartEvent | StartEvent:
        """Retry alignment on unparseable LLM output, or fail open once the budget is spent.

        Re-emitting ``StartEvent`` re-runs the alignment step (at most 3 times,
        counted in the shared ``self.retry_ct``); afterwards the query is let
        through as ``SafeStartEvent``, as before.
        """
        if self.retry_ct < 3:
            self.retry_ct += 1
            print(f"retry alignment: {err} - retry: {self.retry_ct}")
            return StartEvent(query=query)
        print(f"alignment output unparseable, letting query through: {err}")
        return SafeStartEvent(query=query)

    @step
    async def refuse(self, ctx: Context, ev: RefuseEvent) -> StopEvent:
        """Stream a refusal to the client and finish the workflow.

        * Adult query while adult mode is off (``ev.adult``): stream a fixed
          notice one character at a time, then the search words and the video
          search results for ``ev.query`` (searched with ``mode="off"``).
        * Any other refusal: stream the LLM's rendering of ``REFUSE_PROMPT`` in
          ``ev.lang``.

        Both paths end the stream with ``data:[DONE]``.
        """
        if not self.adultMode and ev.adult:
            # The module-level search helper blocks; keep the event loop free.
            loop = asyncio.get_running_loop()
            video_result = await loop.run_in_executor(
                None, lambda: video_search(q=ev.query, mode="off")
            )
            video_str = json.dumps(video_result)
            response = "I cannot provide details about this topic at the moment. Below are the links found for you by the search engine.\n\n"
            # Stream the fixed notice character by character, like LLM tokens.
            for c in response:
                content = json.dumps({"content": c})
                ctx.write_event_to_stream(TokenEvent(token=f"data:{content}\n\n"))
                await asyncio.sleep(0)
            search_words = ev.query.strip().replace(" ", "+")
            # json.dumps escapes quotes/backslashes from the user query; compact
            # separators + ensure_ascii=False keep ordinary payloads unchanged.
            words_json = json.dumps({"video_searchWords": search_words}, ensure_ascii=False, separators=(",", ":"))
            ctx.streaming_queue.put_nowait(f"data: {words_json}\n\n")
            ctx.streaming_queue.put_nowait(f'data: {{"videoResults":{video_str}}}\n\n')
            await asyncio.sleep(0)
            ctx.write_event_to_stream("data:[DONE]\n\n")
        else:
            tokens = []
            response = self.response_llm.stream(REFUSE_PROMPT, language=ev.lang)
            for token in response:
                tokens.append(token)
                content = json.dumps({"content": token})
                ctx.write_event_to_stream(TokenEvent(token=f"data:{content}\n\n"))
                await asyncio.sleep(0)
            ctx.write_event_to_stream("data:[DONE]\n\n")
            response_str = "".join(tokens)
            print(f"reponse_str: {response_str}")
        return StopEvent(result="success")

    @step
    async def intend_recognize(self, ctx: Context, ev: SafeStartEvent) -> CasualChatEvent | VideoResourceEvent | ImageResourceEvent | GeneralSearchEvent | GeneralSearch | SafeStartEvent | FullContextEvent:
        """Route a query that passed alignment to the matching search/chat steps.

        Shortcuts checked before any LLM call:

        * ``ev.query`` exactly equals one of ``SPECIAL_TERMS``: use the
          "Definition of Term" template and fan out a video, an image and a
          "Specific Knowledge" web search.
        * The query names one of the advertised models: stream the fixed ad
          image and go straight to ``FullContextEvent``.

        Otherwise ``INTENT_EXTRACT_PROMPT`` must yield
        ``{"adult": ..., "intentions": [...]}``. The intents (passed through
        ``reorder``) are stored under ``intention`` and routed:

        * only "Casual Chat" -> ``CasualChatEvent``;
        * "Ask for specific Website" -> ``GeneralSearchEvent`` with that tag;
        * "A variety of resources search" -> image + video resource events and
          a ``GeneralSearchEvent``;
        * otherwise one event per videos / images / specific-knowledge intent,
          or ``CasualChatEvent`` when none of those remain.

        Malformed LLM output or unknown intent labels are retried through
        ``SafeStartEvent`` while ``self.retry_ct < 3``; after that the step
        falls back to ``CasualChatEvent``.
        """
        if ev.query in SPECIAL_TERMS:
            await self._begin_shortcut(ctx, ev.query, "Definition of Term")
            ctx.send_event(VideoSearch(query=ev.query + " Porn"))
            ctx.send_event(ImageSearch(query=ev.query + " Porn"))
            return GeneralSearch(query=f"What is {ev.query} in sexual context", tag="Specific Knowledge")

        # 25-1-15 special ads
        ad = self._ad_for_query(ev.query)
        if ad is not None:
            response_mode, title, image_file = ad
            await self._begin_shortcut(ctx, ev.query, response_mode)
            image_str = json.dumps(self._ad_image_result(title, image_file))
            ctx.streaming_queue.put_nowait(f'data: {{"imageResults":{image_str}}}\n\n')
            return FullContextEvent(context_str="")

        all_intents = INTENTS
        await ctx.set("original_query", ev.query)
        await ctx.set("query_event_ct", 0)
        if self.context_flag == 1:
            # Store the user turn once per run. Retries re-enter this step, and
            # retry_ct may already be non-zero from alignment retries, so it
            # cannot tell whether the message was saved.
            if not await ctx.get("user_query_saved", False):
                self.chat_store.add_message(user_id=self.sessionId, role="user", content=ev.query)
                await ctx.set("user_query_saved", True)
            self.chat_history = self.chat_store.get_chat_history(self.sessionId)
        else:
            self.chat_history = " "
        fmt_message = INTENT_EXTRACT_PROMPT.format_messages(
            user_input=ev.query,
            chat_history=self.chat_history,
            possible_intentions=all_intents,
        )
        response = self.response_llm.chat(fmt_message).message.content
        try:
            response = json.loads(response)
            is_sex = str(response["adult"])
            intents = response["intentions"]
            intents = reorder(intents, "Ask for specific Images", "Ask for specific Videos")
            await ctx.set("intention", intents)
            await self._stream_thinking_end(ctx)
            print(f"intention: {intents}")
            if len(intents) == 1 and intents[0] == "Casual Chat":
                await self._stream_extra_status(ctx, intents, is_sex)
                return CasualChatEvent(query=ev.query)
            if "Ask for specific Website" in intents:
                await self._stream_extra_status(ctx, intents, is_sex)
                return GeneralSearchEvent(query=ev.query, tag="Ask for specific Website")
            if "A variety of resources search" in intents:
                await self._stream_extra_status(ctx, intents, is_sex)
                ctx.send_event(ImageResourceEvent(query=ev.query))
                ctx.send_event(VideoResourceEvent(query=ev.query))
                return GeneralSearchEvent(query=ev.query, tag="A variety of resources search")
            # NOTE: removal happens after ctx.set on the same list object; if the
            # context stores it by reference (presumably), "intention" as read
            # by image_search changes too — keep this ordering.
            if "Casual Chat" in intents:
                intents.remove("Casual Chat")
            unknown = [intent for intent in intents if intent not in all_intents]
            if unknown:
                # Validate before dispatching anything so a retry cannot duplicate
                # searches, and charge it to the retry budget (an unconditional
                # SafeStartEvent could loop forever).
                raise ValueError(f"unknown intentions: {unknown}")
            await self._stream_extra_status(ctx, intents, is_sex)
            dispatchable = ("Ask for specific Videos", "Ask for specific Images", "Specific Knowledge")
            if not any(intent in dispatchable for intent in intents):
                # Nothing to search for (e.g. an empty list): answer conversationally
                # instead of producing no follow-up event at all.
                return CasualChatEvent(query=ev.query)
            for intent in intents:
                if intent == "Ask for specific Videos":
                    status_str = ProcessStatus(type="videoResults", status="start").to_json()
                    ctx.streaming_queue.put_nowait(f'data: {status_str}\n\n')
                    await asyncio.sleep(0)
                    ctx.send_event(VideoResourceEvent(query=ev.query))
                elif intent == "Ask for specific Images":
                    status_str = ProcessStatus(type="imageResults", status="start").to_json()
                    ctx.streaming_queue.put_nowait(f'data: {status_str}\n\n')
                    await asyncio.sleep(0)
                    ctx.send_event(ImageResourceEvent(query=ev.query))
                elif intent == "Specific Knowledge":
                    ctx.send_event(GeneralSearchEvent(query=ev.query, tag="Specific Knowledge"))
            return None
        except Exception as e:
            if self.retry_ct < 3:
                self.retry_ct += 1
                print(f"retry intention recognization: {e} - retry: {self.retry_ct}")
                return SafeStartEvent(query=ev.query)
            return CasualChatEvent(query=ev.query)

    async def _stream_thinking_end(self, ctx: Context) -> None:
        """Stream the "thinking" end status and yield to the event loop."""
        status = ProcessStatus(type="thinking", status="end").to_json()
        ctx.streaming_queue.put_nowait(f"data: {status}\n\n")
        await asyncio.sleep(0)

    async def _stream_extra_status(self, ctx: Context, intents, is_sex: str) -> None:
        """Stream the ExtraStatus payload for the recognized intents."""
        extra_status = ExtraStatus(adultMode=self.adultMode, intentionResult=intents, sensitiveResult=None, questionIsSex=is_sex).to_json()
        ctx.streaming_queue.put_nowait(f"data: {extra_status}\n\n")
        await asyncio.sleep(0)

    async def _begin_shortcut(self, ctx: Context, query: str, response_mode: str) -> None:
        """Shared prologue of the no-LLM routes (special terms and ads).

        Closes the "thinking" status, stores the user turn, records the
        response template and sets ``query_event_ct`` to 1 — image and video
        results are not used as references for the final reply on these routes.
        """
        await self._stream_thinking_end(ctx)
        self.chat_store.add_message(user_id=self.sessionId, role="user", content=query)
        await ctx.set("response_mode", response_mode)
        await ctx.set("original_query", query)
        await ctx.set("query_event_ct", 1)

    @staticmethod
    def _ad_for_query(query: str) -> tuple[str, str, str] | None:
        """Return ``(response_mode, title, image_file)`` if the query names an advertised model."""
        lowered = query.lower()
        if "snakeysmut" in lowered:  # also covers "victoria snakeysmut"
            return "ads-victoria", "Victoria SnakeySmut", "SnakeySmut.png"
        if "chanell-heart" in lowered or "chanell heart" in lowered:
            return "ads-chanell-heart", "Chanell Heart", "chanell-heart.png"
        return None

    @staticmethod
    def _ad_image_result(title: str, image_file: str) -> list[dict]:
        """Build the single-item image result streamed for an ad route."""
        url = f"https://cdn.lovense-api.com/UploadFiles/surfease/x3/{image_file}"
        return [{
            "position": 1,
            "thumbnail": url,
            "related_content_id": "WkNzSFNndkhqVlBrOU1cIixcIk16bG1veURtUndJemZN",
            "serpapi_related_content_link": url,
            "source": "http://www.vibemate.com",
            "source_logo": "",
            "title": title,
            "link": url,
            "original": url,
            "original_width": 2160,
            "original_height": 2700,
            "is_product": False,
        }]

    @step
    async def query_rewrite(self, ctx: Context, ev: GeneralSearchEvent) -> GeneralSearch | MilvusDBSearchEvent:
        """Rewrite ``ev.query`` into a standalone search query and fan out searches.

        The LLM (``QUERY_REWRITE_PROMPT``) gets the chat history (only when
        ``context_flag == 1``), the query, the intent tag and the language, and
        must answer ``{"query": "<non-empty string>"}``. The result is stored
        as ``refined_query`` and a ``GeneralSearch`` is sent; for the
        "Specific Knowledge" tag a ``MilvusDBSearchEvent`` is sent as well.
        ``query_event_ct`` grows by the number of events sent.

        Unparseable output is retried up to 3 times per run (tracked in the
        ``query_rewrite_retry_ct`` context key); after that the original query
        is searched as-is instead of retrying forever.
        """
        chat_history = self.chat_history if self.context_flag == 1 else " "
        language = await ctx.get("language", "en")
        intention = ev.tag
        format_input = QUERY_REWRITE_PROMPT.format_messages(
            chat_history=chat_history,
            query_str=ev.query,
            intention=intention,
            language=language,
        )
        response = self.response_llm.chat(format_input).message.content
        try:
            query = json.loads(response)["query"]
            if not isinstance(query, str) or not query.strip():
                raise ValueError(f"invalid rewritten query: {query!r}")
        except (ValueError, KeyError, TypeError) as err:
            retries = await ctx.get("query_rewrite_retry_ct", 0)
            if retries < 3:
                await ctx.set("query_rewrite_retry_ct", retries + 1)
                print(f"retry query rewrite: {err} - retry: {retries + 1}")
                return GeneralSearchEvent(query=ev.query, tag=ev.tag)
            print(f"query rewrite failed, searching original query: {err}")
            query = ev.query
        print(f"rewrite query: {query}")
        await ctx.set("refined_query", query)
        # Count the search events before sending them so no consumer can
        # observe a stale total.
        curr_ct = await ctx.get("query_event_ct", 0) + 1
        if intention == "Specific Knowledge":
            curr_ct += 1
        await ctx.set("query_event_ct", curr_ct)
        ctx.send_event(GeneralSearch(query=query, tag=intention))
        if intention == "Specific Knowledge":
            ctx.send_event(MilvusDBSearchEvent(query=query))
        return None

    @step
    async def keywords_extraction(self, ctx: Context, ev: ImageResourceEvent | VideoResourceEvent) -> ImageSearch | VideoSearch | GeneralSearch:
        """Turn an image/video resource request into a keyword search event.

        Keywords are extracted once per run with ``KEYWORDS_EXTRACTION_PROMPT``
        (at most two, joined by a space) and cached under ``keywords`` so a
        combined image + video request reuses them. Returns ``VideoSearch``
        for video requests and ``ImageSearch`` otherwise.

        Best effort: if extraction fails for any reason, ``query_event_ct`` is
        incremented and a ``GeneralSearch`` (empty tag) on the raw query is
        returned instead.
        """
        chat_history = self.chat_history if self.context_flag == 1 else " "
        format_input = KEYWORDS_EXTRACTION_PROMPT.format_messages(
            chat_history=chat_history,
            query_str=ev.query,
        )
        try:
            # The string "None" marks "not extracted yet in this run".
            keywords = await ctx.get("keywords", "None")
            if keywords == "None":
                response = self.response_llm.chat(format_input).message.content
                extracted = json.loads(response)["keywords"]
                if isinstance(extracted, str):
                    # A bare string would otherwise be sliced/joined char by char.
                    extracted = [extracted]
                extracted = extracted[:2]
                print(f"extrated_keywords: {extracted}")
                joined = " ".join(extracted)
                # Never search for an empty string; fall back to the raw query.
                keywords = joined if joined.strip() else ev.query
                await ctx.set("keywords", keywords)
            if isinstance(ev, VideoResourceEvent):
                return VideoSearch(query=keywords)
            return ImageSearch(query=keywords)
        except Exception as err:
            # LLM/JSON failures degrade to a plain web search (not BaseException,
            # so task cancellation still propagates).
            print(f"keywords extraction failed, using web search: {err}")
            curr_ct = await ctx.get("query_event_ct", 0)
            await ctx.set("query_event_ct", curr_ct + 1)
            return GeneralSearch(query=ev.query, tag="")

    @step
    async def vector_search(self, ctx: Context, ev: MilvusDBSearchEvent) -> RetrieveContextEvent:
        """Retrieve sex-education articles and Q&As from Milvus for ``ev.query``.

        Phase 1, per collection: stream a "start" status and keep the top-5
        hits whose ``distance`` is at least 0.7. Phase 2, per collection: stream
        the hit entities (``sex_ed_article`` / ``sex_ed_qa``), then that
        collection's own "end" status, and append the hits to the context.
        Returns the joined context as ``RetrieveContextEvent``.
        """
        retriever = MilvusRetriever(uri=MILVUS_URI)
        # collection -> (stream/status key, context heading, entity body field)
        collections = {
            "t_sur_sex_ed_article_spider": ("sex_ed_article", "Sex Education Articles:", "chunk"),
            "t_sur_sex_ed_question_answer_spider": ("sex_ed_qa", "Sex Education Q&As:", "content"),
        }
        min_distance = 0.7
        hits_by_collection = {}
        # One status object per collection; previously the last one created was
        # reused for every "end", so articles were closed with the Q&A type.
        statuses = {}
        for collection_name, (process_type, _heading, _field) in collections.items():
            status = ProcessStatus(type=process_type, status="start")
            statuses[collection_name] = status
            ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
            await asyncio.sleep(0)
            res = retriever.search(query=ev.query, collection_name=collection_name, top_k=5)
            hits_by_collection[collection_name] = [record for record in res if record["distance"] >= min_distance]

        context_parts = []
        for collection_name, (process_type, heading, body_field) in collections.items():
            hits = hits_by_collection[collection_name]
            entities = json.dumps([record["entity"] for record in hits])
            ctx.streaming_queue.put_nowait(f'data: {{"{process_type}":{entities}}}\n\n')
            await asyncio.sleep(0)
            status = statuses[collection_name]
            status.update(status="end")
            ctx.streaming_queue.put_nowait(f'data: {status.to_json()}\n\n')
            await asyncio.sleep(0)
            context_parts.append(heading)
            context_parts.extend(
                record["entity"]["title"] + "\n" + record["entity"][body_field]
                for record in hits
            )
        return RetrieveContextEvent(context_str="\n".join(context_parts))

    @step
    async def image_search(self, ctx: Context, ev: ImageSearch) -> StopEvent | RetrieveContextEvent:
        """Run the image search for ``ev.query`` and stream the results.

        Streams the "+"-joined search words, then (if any) the ``imageResults``
        payload, then an "imageResults" end status. With results, the outcome
        depends on the ``intention`` list in the context:

        * images only: optionally persist a summary to the chat store in a
          background thread, send ``data:[DONE]`` and stop the workflow;
        * exactly videos + images: set ``query_event_ct`` to 2 and return an
          empty ``RetrieveContextEvent``;
        * anything else: return ``None`` (no follow-up event).

        Without results, an empty ``RetrieveContextEvent`` is returned.
        """
        search_words = ev.query.strip().replace(" ", "+")
        # json.dumps escapes quotes/backslashes from the user query; compact
        # separators + ensure_ascii=False keep ordinary payloads unchanged.
        words_json = json.dumps({"image_searchWords": search_words}, ensure_ascii=False, separators=(",", ":"))
        ctx.streaming_queue.put_nowait(f"data: {words_json}\n\n")
        await asyncio.sleep(0)
        # ``image_search`` here is the module-level search helper (method names
        # are not in scope inside methods). It blocks, so run it on the loop's
        # default executor rather than a new thread pool per call.
        loop = asyncio.get_running_loop()
        image_result = await loop.run_in_executor(None, image_search, ev.query, self.safe_search)
        if not image_result:
            status = ProcessStatus(type="imageResults", status="end").to_json()
            ctx.streaming_queue.put_nowait(f'data: {status}\n\n')
            await asyncio.sleep(0)
            return RetrieveContextEvent(context_str="")

        image_str = json.dumps(image_result)
        ctx.streaming_queue.put_nowait(f'data: {{"imageResults":{image_str}}}\n\n')
        await asyncio.sleep(0)
        status = ProcessStatus(type="imageResults", status="end").to_json()
        ctx.streaming_queue.put_nowait(f'data: {status}\n\n')
        await asyncio.sleep(0)
        intents = await ctx.get("intention", " ")
        if len(intents) == 1 and intents[0] == "Ask for specific Images":
            if self.context_flag == 1:
                # Fire-and-forget write of the assistant turn.
                t = threading.Thread(target=self.chat_store.add_message, args=(self.sessionId, "assistant", parse_image_content(image_result)))
                t.start()
            ctx.streaming_queue.put_nowait("data:[DONE]\n\n")
            await asyncio.sleep(0)
            return StopEvent(result="success")
        if set(intents) == {"Ask for specific Videos", "Ask for specific Images"}:
            # Both media searches were requested: the event count is fixed at 2.
            await ctx.set("query_event_ct", 2)
            return RetrieveContextEvent(context_str="")
        # Other intent combinations: no follow-up event from this step.
        return None
    
    @step
    async def video_search(self, ctx: Context, ev: VideoSearch) -> StopEvent | RetrieveContextEvent:
        """Run a video search for ``ev.query`` and stream the results as SSE frames.

        Streams a ``video_searchWords`` frame, runs the blocking module-level
        ``video_search`` helper in a worker thread, then streams ``videoResults``
        and a ``videoResults``/``end`` status frame.

        Returns:
            ``StopEvent`` when videos are the only intent (the reply is complete and
            ``data:[DONE]`` has been sent); ``RetrieveContextEvent`` with empty
            context when both images and videos were requested or when the search
            returned nothing; ``None`` (no event) for any other intent mix.
        """
        loop = asyncio.get_running_loop()

        search_words = ev.query.strip().replace(" ", "+")
        # Serialize with json.dumps so quotes/backslashes in the query cannot break
        # the SSE JSON payload; compact separators and ensure_ascii=False keep the
        # previous wire format for ordinary queries.
        words_json = json.dumps(
            {"video_searchWords": search_words}, separators=(",", ":"), ensure_ascii=False
        )
        ctx.streaming_queue.put_nowait(f"data: {words_json}\n\n")
        await asyncio.sleep(0)
        # video_search is a blocking call; run it off the event loop.
        with concurrent.futures.ThreadPoolExecutor() as pool:
            video_result = await loop.run_in_executor(pool, video_search, ev.query, self.safe_search)

        if not video_result:
            # Status update --> end
            status = ProcessStatus(type="videoResults", status="end").to_json()
            ctx.streaming_queue.put_nowait(f'data: {status}\n\n')
            await asyncio.sleep(0)
            return RetrieveContextEvent(context_str="")

        video_str = json.dumps(video_result)
        ctx.streaming_queue.put_nowait(f'data: {{"videoResults":{video_str}}}\n\n')
        await asyncio.sleep(0)
        # Status update --> end
        status = ProcessStatus(type="videoResults", status="end").to_json()
        ctx.streaming_queue.put_nowait(f'data: {status}\n\n')
        await asyncio.sleep(0)

        intents = await ctx.get("intention", " ")
        if len(intents) == 1 and intents[0] == "Ask for specific Videos":
            if self.context_flag == 1:
                # Save the reply on a background thread so chat-store work
                # (possibly I/O) does not stall the stream.
                t = threading.Thread(
                    target=self.chat_store.add_message,
                    args=(self.sessionId, "assistant", parse_video_content(video_result)),
                )
                t.start()
            ctx.streaming_queue.put_nowait("data:[DONE]\n\n")
            await asyncio.sleep(0)
            return StopEvent(result="success")
        if set(intents) == {"Ask for specific Videos", "Ask for specific Images"}:
            # Image and video searches both feed gather_context, which must
            # therefore wait for two RetrieveContextEvents.
            await ctx.set("query_event_ct", 2)
            return RetrieveContextEvent(context_str="")
        # NOTE(review): videos were found but the intents are neither videos-only
        # nor images+videos; no event is emitted here (unchanged behavior). If
        # query_event_ct counts this step, gather_context will wait for an event
        # that never arrives — confirm this is intended.
        return None

    @step
    async def general_search(self, ctx: Context, ev: GeneralSearch) -> RetrieveContextEvent:
        """Run a web search for ``ev.query`` and return its content as context.

        Streams ``webResults`` start/end status frames to the client. Two
        hard-coded ad keys in ``ev.ads`` short-circuit the normal search.

        For the tags 'A variety of resources search' and 'Specific Knowledge':
        - the top two result pages are read with ``web_reader``;
        - the raw results and ``web_summary`` status frames are streamed;
        - the page texts are returned as context.

        For any other tag:
        - the raw results are returned as JSON context;
        - ``response_mode`` is set to ``"WebsiteResponse"``.
        """
        loop = asyncio.get_running_loop()
        queue = ctx.streaming_queue

        # Status update --> start (search)
        status = ProcessStatus(type="webResults", status="start")
        queue.put_nowait(f'data: {status.to_json()}\n\n')
        await asyncio.sleep(0)

        # Temporary ads: these keys bypass the user's query entirely.
        if ev.ads in ("Victoria SnakeySmut", "Chanell Heart"):
            status.update(status="end")
            queue.put_nowait(f'data: {status.to_json()}\n\n')
            await asyncio.sleep(0)
            if ev.ads == "Victoria SnakeySmut":
                web_result = [{'position': 1, 'title': 'Victoria SnakeySmut | Fansly', 'link': 'https://fansly.com/SnakeySmut', 'displayed_link': 'fansly.com/SnakeySmut', 'snippet': 'SnakeySmut conjures audio roleplays. Like the little noises I make with my mouth? Come see everything here! 18+ ONLY.'}, {'position': 2, 'title': 'Victoria (u/SnakeySmut) - Reddit', 'link': 'https://www.reddit.com/user/SnakeySmut/', 'displayed_link': 'www.reddit.com › user › SnakeySmut', 'snippet': 'u/SnakeySmut: The witchiest little treat ❤︎ Enjoy me as all dark things are to be loved, in secret and the shadows. Spooky girl with a proclivity for…'}, {'position': 3, 'title': 'victoria malfoy (@SnakeySmut) / X', 'link': 'https://x.com/snakeysmut?lang=en', 'displayed_link': 'x.com › snakeysmut', 'snippet': "victoria malfoy · @SnakeySmut. ·. Jan 2. Hi there, I'm Victoria A witch of many talents, specializing in fantasy fulfillment of aural and visual magic Links ..."}, {'position': 4, 'title': 'Highlights by victoria malfoy (@SnakeySmut) / X', 'link': 'https://twitter.com/SnakeySmut/highlights', 'displayed_link': 'twitter.com › SnakeySmut › highlights', 'snippet': "Posts · Replies · Highlights · Media. victoria malfoy's Highlights. victoria malfoy · @SnakeySmut. ·. Jan 13. you don't actually want to watch a movie, right?"}, {'position': 5, 'title': 'about - SnakeySmut', 'link': 'https://www.snakeysmut.com/aboutsnakeysmut', 'displayed_link': 'www.snakeysmut.com › aboutsnakeysmut', 'snippet': "Greetings, I'm Victoria. you know me best as snakeysmut."}]
            else:
                # general_search is blocking; previously it ran directly on the
                # event loop here. The lambda preserves the `mode=` keyword call.
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    web_result = await loop.run_in_executor(
                        executor, lambda: general_search("Chanell Heart", mode=self.safe_search)
                    )
            queue.put_nowait(f'data: {{"webResults":{json.dumps(web_result)}}}\n\n')
            await asyncio.sleep(0)
            return RetrieveContextEvent(context_str=parse_web_search_content(web_result))

        # general_search is blocking; run it off the event loop.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            web_result = await loop.run_in_executor(
                executor, general_search, ev.query, self.safe_search
            )

        # Status update --> end (search); sent whether or not anything was found.
        status.update(status="end")
        queue.put_nowait(f'data: {status.to_json()}\n\n')
        await asyncio.sleep(0)

        if not web_result:
            return RetrieveContextEvent(context_str=" ")

        if ev.tag not in ('A variety of resources search', 'Specific Knowledge'):
            await ctx.set("response_mode", "WebsiteResponse")
            return RetrieveContextEvent(context_str=json.dumps(web_result))

        # Summarize page content. Status update --> start (summary)
        substatus = ProcessStatus(type="web_summary", status="start")
        queue.put_nowait(f'data: {substatus.to_json()}\n\n')
        await asyncio.sleep(0)

        # Read the top two pages concurrently. Awaiting the executor futures keeps
        # the event loop free; the previous as_completed() loop blocked it.
        urls = [web["link"] for web in web_result if web.get("link")][:2]
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            pages = await asyncio.gather(
                *(loop.run_in_executor(executor, web_reader, url) for url in urls),
                return_exceptions=True,
            )
        summaries = []
        for url, page in zip(urls, pages):  # keeps result-rank order
            if isinstance(page, BaseException):
                # Best effort: one unreadable page must not fail the whole search.
                print(f"web_reader failed for {url}: {page}")
                continue
            if page:
                summaries.append(page)

        queue.put_nowait(f'data: {{"webResults":{json.dumps(web_result)}}}\n\n')
        await asyncio.sleep(0)

        # Status update --> end (summary); keyword form matches the other updates.
        substatus.update(status="end")
        queue.put_nowait(f'data: {substatus.to_json()}\n\n')
        await asyncio.sleep(0)
        return RetrieveContextEvent(context_str="General Search Result:\n" + "\n\n".join(summaries))

    @step
    async def gather_context(self, ctx: Context, ev: RetrieveContextEvent) -> FullContextEvent | StopEvent:
        """Join the context strings of all expected ``RetrieveContextEvent``s.

        The number of expected events is read from ``query_event_ct``. When
        ``collect_events`` yields nothing, the step returns ``None`` and emits no
        event; presumably collect_events keeps buffering until every expected
        event has arrived.

        When the intent is exactly images+videos and at least two events were
        collected, the media frames have already been streamed. In that case a
        ``data:[DONE]`` token is written and the workflow stops.

        Otherwise the joined context, truncated to 10000 characters, is handed
        on as a ``FullContextEvent``.
        """
        expected_count = await ctx.get("query_event_ct", 0)
        print(f"event_cts: {expected_count}")
        collected = ctx.collect_events(ev, [RetrieveContextEvent] * expected_count)
        intents = await ctx.get("intention", " ")

        media_only = set(intents) == {"Ask for specific Videos", "Ask for specific Images"}
        if media_only and collected and len(collected) > 1:
            ctx.write_event_to_stream(TokenEvent(token='data:[DONE]\n\n'))
            await asyncio.sleep(0)
            return StopEvent(result="success")

        if not collected:
            return None

        print(f"recevived {len(collected)} events")
        merged = "\n\n".join(item.context_str for item in collected)
        return FullContextEvent(context_str=merged[:10000])

    @step
    async def casual_response(self, ctx: Context, ev: CasualChatEvent) -> StopEvent:
        """Stream a free-form chat reply to ``ev.query`` token by token.

        An optional ``start_phrase`` stored in the context is first translated
        into the session language via ``TRANSLATE_PROMPT``. It is wrapped in
        ``*...*`` and streamed as the first frame.

        Each frame is written as ``data:<json>``, and the stream is closed with
        ``data:[DONE]``.

        When ``context_flag == 1``:
        - the stored chat history is passed to the prompt;
        - the full reply is saved to the chat store on a background thread.
        """
        history = self.chat_history if self.context_flag == 1 else ""
        language = await ctx.get("language", "en")
        reply = ""

        def push(payload: dict) -> None:
            # One SSE frame per payload.
            serialized = json.dumps(payload)
            ctx.write_event_to_stream(TokenEvent(token=(f"data:{serialized}\n\n")))

        opening = await ctx.get("start_phrase", "")
        if opening != "":
            translate_msgs = TRANSLATE_PROMPT.format_messages(user_input=opening, language=language)
            translated = self.response_llm.chat(translate_msgs).message.content
            opening = "*" + translated + "*\n\n"
            reply += opening
            push({"content": opening})
            await asyncio.sleep(0)

        token_stream = self.response_llm.stream(
            CASUAL_CHAT_PROMPT, user_input=ev.query, chat_history=history, language=language
        )
        for piece in token_stream:
            if piece != "":
                stamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
                reply += piece
                push({"content": piece, "time": stamp})
                await asyncio.sleep(0)

        ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))
        print(f"reponse_str: {reply}")
        if self.context_flag == 1:
            saver = threading.Thread(
                target=self.chat_store.add_message,
                args=(self.sessionId, "assistant", reply),
            )
            saver.start()
        return StopEvent(result="success")

    @step
    async def response_synthesis(self, ctx: Context, ev: FullContextEvent) -> StopEvent:
        """Stream the final answer built from the gathered search context.

        Reads these workflow-context keys:
        - ``refined_query``, falling back to ``original_query`` when it is " ";
        - ``language``, ``keywords``, ``start_phrase`` and ``response_mode``.

        ``response_mode`` selects either one of the temporary hard-coded ad
        replies or a prompt: ``WebsiteResponse``, ``Definition of Term``, or the
        default ``self.response_synthesis_prompt``.

        After the answer, up to three tags and three related searches are
        extracted with ``RELATED_SEARCH_PROMPT`` and streamed. Every path
        (including a streaming failure) ends the SSE stream with ``data:[DONE]``.

        Returns:
            ``StopEvent(result="success")``; streaming errors are printed, not raised.
        """
        def send_done() -> None:
            # End-of-stream marker, as emitted by the other terminal steps.
            ctx.write_event_to_stream(TokenEvent(token="data:[DONE]\n\n"))

        response_str = ""
        query_str = await ctx.get("refined_query", " ")
        original_query = await ctx.get("original_query")
        language = await ctx.get("language", "en")
        keywords = await ctx.get("keywords", " ")
        if query_str == " ":
            query_str = original_query

        # Optional opening sentence (e.g. a medical-information notice),
        # translated into the reply language and streamed in italics.
        start_phrase = await ctx.get("start_phrase", "")
        response_mode = await ctx.get("response_mode", "")
        if start_phrase != "":
            start_phrase = self.response_llm.chat(
                TRANSLATE_PROMPT.format_messages(user_input=start_phrase, language=language)
            ).message.content
            start_phrase = "*" + start_phrase + "*\n\n"
            response_str += start_phrase
            content = json.dumps({"content": start_phrase})
            ctx.write_event_to_stream(TokenEvent(token=(f"data:{content}\n\n")))
            await asyncio.sleep(0)

        # Temporary ads: stream a fixed reply one character at a time.
        ad_replies = {
            "ads-victoria": "**Victoria SnakeySmut is going to join X3 show, she is a famous model on Fansly! you can open her page on vibemate to sync with her content.**\n\n",
            "ads-chanell-heart": "**Chanell-heart is joining the X3 show! She streams on Stripchat and is also popular on Pornhub. On VibeMate, connect your toys to sync with her during her cam shows and videos.**\n\n",
        }
        if response_mode in ad_replies:
            for c in ad_replies[response_mode]:
                response_str += c
                content = json.dumps({"content": c})
                ctx.write_event_to_stream(TokenEvent(token=f"data:{content}\n\n"))
                await asyncio.sleep(0)
            # Previously the ad paths ended without the [DONE] marker that every
            # other terminal path sends.
            send_done()
            print(f"response_str:\n{response_str}\n")
            return StopEvent(result="success")

        # Choose the response template.
        if response_mode == "WebsiteResponse":
            chat_response = self.response_llm.stream(
                prompt=WEBSIT_PROMPT,
                user_input=query_str,
                search_result=ev.context_str,
                language=language
            )
        elif response_mode == "Definition of Term":
            chat_response = self.response_llm.stream(
                prompt=TERM_PROMPT,
                user_input=query_str,
                search_result=ev.context_str,
                language='english'
            )
        else:
            chat_response = self.response_llm.stream(
                prompt=self.response_synthesis_prompt,
                search_keyword=query_str,
                search_result=ev.context_str,
                language=language
            )

        done_sent = False
        try:
            for token in chat_response:
                time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
                response_str += token
                content = json.dumps({"content": token, "time": time})
                ctx.write_event_to_stream(TokenEvent(token=f"data:{content}\n\n"))
                await asyncio.sleep(0)

            # Related search terms and tags.
            extraction = self.response_llm.chat(
                RELATED_SEARCH_PROMPT.format_messages(
                    keywords=keywords,
                    retrieved_content=ev.context_str
                )
            ).message.content
            try:
                extraction = json.loads(extraction)
                # Slicing already caps the length; no separate len() check needed.
                related_search = extraction["related_searches"][:3]
                tags = extraction["tags"][:3]
                y_related_search = json.dumps({"related_searches": related_search})
                y_tags = json.dumps({"tags": tags})
                ctx.write_event_to_stream(TokenEvent(token=f"data:{y_tags}\n\n"))
                ctx.write_event_to_stream(TokenEvent(token=f"data:{y_related_search}\n\n"))
                await asyncio.sleep(0)
            except Exception as e:
                # Best effort: malformed LLM output only drops tags/related searches.
                print(f"Related searchs & tags JSONDecode ERROR: {e}")

            send_done()
            done_sent = True
            print(f"response_str:\n{response_str}\n")
            if self.context_flag == 1:
                # Save the reply on a background thread so chat-store work
                # (possibly I/O) does not delay the step.
                t = threading.Thread(target=self.chat_store.add_message, args=(self.sessionId, "assistant", response_str))
                t.start()
        except Exception as e:
            print(f"Streaming Exception: {e}")
            if not done_sent:
                # Still terminate the stream so the client is not left open.
                send_done()

        return StopEvent(result="success")

async def sql_workflow(query: str, chat_store: SimpleChatStore, sessionId: str, llm: Ollama, context_flag: int, adultMode: bool):
    """Build an ``SQLWorkflow`` for one request and return its event stream.

    The workflow always uses ``FINAL_RESPONSE_PROMPT`` as its synthesis prompt,
    and runs verbosely with a 60-second timeout. The caller iterates the
    returned ``stream_events()`` result to receive the streamed events.
    """
    workflow = SQLWorkflow(
        response_llm=llm,
        response_synthesis_prompt=FINAL_RESPONSE_PROMPT,
        chat_store=chat_store,
        sessionId=sessionId,
        context_flag=context_flag,
        adultMode=adultMode,
        verbose=True,
        timeout=60,
    )
    run_handler = workflow.run(query=query)
    return run_handler.stream_events()
    
def reorder(l: list, former: str, latter: str):
    """Make *former* come before *latter* in *l*, swapping the two in place.

    Only the first occurrence of each value is considered. The list is left
    untouched if either value is missing or the order is already correct.
    The same list object is returned for convenience.
    """
    if former not in l or latter not in l:
        return l
    pos_former, pos_latter = l.index(former), l.index(latter)
    if pos_former > pos_latter:
        l[pos_former], l[pos_latter] = l[pos_latter], l[pos_former]
    return l