# SexBot / milvusDB / update_milvus.py
# Pew404's picture
# Upload folder using huggingface_hub
# 318db6e verified
from fastapi import FastAPI, Request
from langchain_ollama import OllamaEmbeddings
from llama_index.core.node_parser import SentenceSplitter
from pymilvus import MilvusClient
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import html2text
import logging
# Logging configuration: records are appended to a file and only ERROR and
# above are written.
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='/home/purui/update_milvus.log',  # log file path
    filemode='a',  # append mode
)
# Module-level logger.
logger = logging.getLogger(__name__)

# Thread pool (10 workers) shared by the whole module.
executor = ThreadPoolExecutor(max_workers=10)

# HTML -> text converter; hyperlinks are left out of the output.
h = html2text.HTML2Text()
h.ignore_links = True

# Embedding model served through Ollama.
embed_model = OllamaEmbeddings(model="bge-m3")

app = FastAPI(title="update_milvus", description="update_milvus")

# Address of the Milvus server.
URI = "http://localhost:19530"
def update_article(data):
    """Re-embed scraped articles and replace their chunks in Milvus.

    For every row the HTML ``content`` is converted to text, split into
    chunks and embedded.  One entity per chunk is stored under the key
    ``"<article id>_<chunk index>"``.  Entities already stored for the same
    article are deleted first, so a re-sync replaces the previous chunks
    instead of adding duplicates.

    Args:
        data: Iterable of dict rows with the keys ``id``, ``title``,
            ``content``, ``tags``, ``link`` and ``category2``.  Chunk keys
            are strings derived from ``id``.
    """
    splitter = SentenceSplitter(chunk_size=200, chunk_overlap=20)
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_sex_ed_article_spider"
    # Lazy %-args: the payload is only rendered if INFO is actually enabled.
    logger.info("Starting Embedding: %s", data)
    inserted = 0
    for row in tqdm(data, desc="Updating Milvus"):
        article_id = row["id"]

        # Replace data stored under the same ID.  Entities are keyed
        # "<id>_<n>", so an exact-ID lookup never finds them.  Fetch the
        # candidates by prefix and confirm the match in Python, so article
        # "1" does not also wipe the chunks of article "10".
        # NOTE(review): the ID is interpolated into the filter expression
        # unescaped; confirm upstream IDs cannot contain quotes or wildcards.
        chunk_prefix = f"{article_id}_"
        hits = client.query(
            collection_name=collection_name,
            filter=f'id like "{article_id}%"',
            output_fields=["id"],
        )
        stale_ids = [
            hit["id"]
            for hit in hits
            if hit["id"] == article_id or hit["id"].startswith(chunk_prefix)
        ]
        if stale_ids:
            client.delete(collection_name=collection_name, ids=stale_ids)

        title = row["title"]
        tags = row["tags"]
        link = row["link"]
        category = row["category2"]
        if category is None:
            category = " "

        chunks = splitter.split_text(h.handle(row["content"]))  # List[str]
        if not chunks:
            # Nothing to embed; skip rather than send an empty batch.
            continue
        chunk_vectors = embed_model.embed_documents(chunks)  # List[List[float]]
        title_vector = embed_model.embed_query(title)
        tags_vector = embed_model.embed_query(tags)

        # One entity per chunk.  The key is always derived from the article
        # ID itself, never from the previous chunk's key.
        insert_data = [
            {
                "id": f"{article_id}_{idx}",
                "title": title,
                "title_vector": title_vector,
                "chunk": sentence,
                "chunk_vector": vector,
                "tags": tags_vector,
                "link": link,
                "category": category,
            }
            for idx, (sentence, vector) in enumerate(zip(chunks, chunk_vectors))
        ]
        client.insert(collection_name=collection_name, data=insert_data)
        inserted += len(insert_data)
    # Emitted at ERROR level because logging in this file is configured with
    # level=logging.ERROR; a lower level would not reach the log file.
    logger.error("Insert complete. %s chunks inserted.", inserted)
def update_qa(data):
    """Re-embed scraped question/answer rows and replace them in Milvus.

    The HTML ``content`` of each row is converted to text, split into chunks
    and embedded.  One entity per chunk is stored under the key
    ``"<row id>_<chunk index>"``.  Entities already stored for the same row
    are deleted first, so a re-sync replaces them instead of duplicating.

    Args:
        data: Iterable of dict rows with the keys ``id``, ``url``, ``title``,
            ``content``, ``type``, ``likeCount``, ``dislikeCount``,
            ``authorName`` and ``avatarUrl``.  Chunk keys are strings derived
            from ``id``.
    """
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_sex_ed_question_answer_spider"
    splitter = SentenceSplitter(chunk_size=200, chunk_overlap=20)
    # Lazy %-args: the payload is only rendered if INFO is actually enabled.
    logger.info("Starting Embedding: %s", data)
    inserted = 0
    for row in tqdm(data, desc="Updating Milvus"):
        qa_id = row["id"]

        # Replace data stored under the same ID.  Entities are keyed
        # "<id>_<n>", so an exact-ID lookup never finds them.  Fetch the
        # candidates by prefix and confirm the match in Python, so row "1"
        # does not also wipe the chunks of row "10".
        # NOTE(review): the ID is interpolated into the filter expression
        # unescaped; confirm upstream IDs cannot contain quotes or wildcards.
        chunk_prefix = f"{qa_id}_"
        hits = client.query(
            collection_name=collection_name,
            filter=f'id like "{qa_id}%"',
            output_fields=["id"],
        )
        stale_ids = [
            hit["id"]
            for hit in hits
            if hit["id"] == qa_id or hit["id"].startswith(chunk_prefix)
        ]
        if stale_ids:
            client.delete(collection_name=collection_name, ids=stale_ids)

        url = row["url"]
        title = row["title"]
        content_type = row["type"]
        likes = row["likeCount"]
        dislikes = row["dislikeCount"]
        author = row["authorName"]
        avatar_url = row["avatarUrl"]
        if avatar_url is None:
            avatar_url = " "

        chunks = splitter.split_text(h.handle(row["content"]))
        if not chunks:
            # Nothing to embed; skip rather than send an empty batch.
            continue
        title_vector = embed_model.embed_query(title)
        chunk_vectors = embed_model.embed_documents(chunks)

        # One entity per chunk, sent as a single batch per source row.
        insert_data = [
            {
                "id": f"{qa_id}_{idx}",
                "url": url,
                "title_vector": title_vector,
                "content_vector": vector,
                "title": title,
                "content": sentence,
                "content_type": content_type,
                "author": author,
                "avatar_url": avatar_url,
                "likes": likes,
                "dislikes": dislikes,
            }
            for idx, (sentence, vector) in enumerate(zip(chunks, chunk_vectors))
        ]
        client.insert(collection_name=collection_name, data=insert_data)
        inserted += len(insert_data)
    # Emitted at ERROR level because logging in this file is configured with
    # level=logging.ERROR; a lower level would not reach the log file.
    logger.error("Insert complete. %s data inserted.", inserted)
def update_video(data):
    """Embed scraped video rows and upsert them into Milvus.

    Any entity already stored under the row's ``id`` is deleted.  Rows
    flagged as deleted are then skipped, which removes them from the
    collection.  All other rows are embedded (title and tag) and inserted
    under the same ``id``.

    Args:
        data: Iterable of dict rows with the keys ``id``, ``videoLink``,
            ``videoTitle``, ``videoViews``, ``videoAuthor``,
            ``videoPicture``, ``videoLikes``, ``videoDuration``, ``tag`` and
            ``deleteStatus``.
    """
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_sex_ed_youtube_spider"
    # Lazy %-args: the payload is only rendered if INFO is actually enabled.
    logger.info("Starting Embedding: %s", data)
    inserted = 0
    for row in tqdm(data, desc="Updating Milvus"):
        video_id = row["id"]

        # Replace data stored under the same ID.  Videos are keyed by the ID
        # itself, so delete by primary key, not by a string-pattern filter.
        if client.get(collection_name=collection_name, ids=[video_id]):
            client.delete(collection_name=collection_name, ids=[video_id])

        delete_status = row["deleteStatus"]
        # Skip deleted rows before paying for embeddings.  Both "1" and 1
        # are accepted because the flag arrives through JSON.
        if delete_status in (1, "1"):
            continue

        title = row["videoTitle"]
        tag = row["tag"]
        title_vector = embed_model.embed_query(title)
        tag_vector = embed_model.embed_query(tag)
        views = row["videoViews"].replace(",", "")  # 599,450 -> 599450

        row_data = {
            "id": video_id,
            "link": row["videoLink"],
            "title": title,
            "title_vector": title_vector,
            "tag": tag,
            "tag_vector": tag_vector,
            "author": row["videoAuthor"],
            "picture": row["videoPicture"],
            "likes": row["videoLikes"],
            "duration": row["videoDuration"],
            "views": views,
            "delete_status": delete_status,
        }
        client.insert(collection_name=collection_name, data=row_data)
        inserted += 1
    # Emitted at ERROR level because logging in this file is configured with
    # level=logging.ERROR; a lower level would not reach the log file.
    logger.error("Insert complete. %s data inserted.", inserted)
def update_porn(data):
    """Embed media rows and insert them into the ``t_sur_video`` collection.

    Rows without a ``viewCount`` and rows flagged as deleted are skipped.
    For the rest, the title and the joined categories are embedded and the
    row is inserted.

    Args:
        data: Iterable of dict rows with the keys ``webUrl``, ``duration``,
            ``viewCount``, ``coverPicture``, ``title``, ``uploader``,
            ``categories``, ``resourceType``, ``sexualPreference`` and
            ``isDelete``.
    """
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_video"
    # Lazy %-args: the payload is only rendered if INFO is actually enabled.
    logger.info("Starting Embedding: %s", data)
    inserted = 0
    for row in tqdm(data):
        view_count = row["viewCount"]
        if view_count is None:
            continue
        # Skip deleted rows before paying for embeddings.  Both 1 and "1"
        # are accepted because the flag arrives through JSON.
        if row["isDelete"] in (1, "1"):
            continue

        title = row["title"]
        uploader = row["uploader"]
        if uploader is None:
            uploader = ""
        # Drop the last element of categories, then join with spaces.
        # NOTE(review): assumes categories is a list of strings - confirm
        # against the upstream payload.
        categories = " ".join(row["categories"][:-1])

        title_vector = embed_model.embed_query(title)
        categories_vector = embed_model.embed_query(categories)

        row_data = {
            "url": row["webUrl"],
            "duration": row["duration"],
            "viewCount": view_count,
            "coverPicture": row["coverPicture"],
            "title": title,
            "uploader": uploader,
            "categories": categories,
            "resourceType": row["resourceType"],
            "sexualPreference": int(row["sexualPreference"]),
            "title_vector": title_vector,
            "categories_vector": categories_vector,
        }
        client.insert(collection_name, data=row_data)
        inserted += 1
    # Emitted at ERROR level because logging in this file is configured with
    # level=logging.ERROR; a lower level would not reach the log file.
    logger.error("Insert complete. %s data inserted.", inserted)
def _log_sync_failure(future):
    """Log the exception, if any, raised by a finished background sync job."""
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        logger.error("Milvus sync job failed", exc_info=error)


@app.post("/milvus")
async def update_milvus(request: Request):
    """Queue a Milvus sync job for the rows carried in the request body.

    The JSON body must provide ``page.data`` (the rows) and ``type`` (which
    sync routine to run).  The job is handed to the module's thread pool and
    the response is returned without waiting for it to finish.

    Returns:
        ``{"status": "SUCCESS"}`` once the job is queued, or a dict with
        ``"status": "FAILED"`` and a ``"message"`` when the type is unknown
        or the job could not be queued.
    """
    body = await request.json()
    data = body["page"]["data"]
    table = body["type"]
    print(f"data len: {len(data)}")
    print(f"table: {table}")

    # Sync type -> worker routine.
    handlers = {
        "articleSpiderSyncMilVus": update_article,
        "answerSpiderSyncMilVus": update_qa,
        "youtubeSpiderSyncMilVus": update_video,
        "syncToMediaSyncMilVus": update_porn,
    }
    handler = handlers.get(table)
    if handler is None:
        logger.error("Unknown sync type: %s", table)
        return {"status": "FAILED", "message": f"unknown type: {table}"}

    try:
        future = executor.submit(handler, data)
    except Exception as e:
        # Logged with traceback at ERROR level so it reaches the log file.
        logger.exception("ThreadError: %s", e)
        return {"status": "FAILED", "message": str(e)}
    # Without this callback an exception raised inside the worker would be
    # stored on the future and never seen.
    future.add_done_callback(_log_sync_failure)
    return {"status": "SUCCESS"}
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app on every interface, port 8010.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8010, loop="asyncio")