|
import logging
import os
from concurrent.futures import ThreadPoolExecutor

import html2text
from fastapi import FastAPI, Request
from langchain_ollama import OllamaEmbeddings
from llama_index.core.node_parser import SentenceSplitter
from pymilvus import MilvusClient
from tqdm import tqdm
|
|
|
|
|
# Log file location. Override with UPDATE_MILVUS_LOG so the service can start on
# hosts without this home directory (basicConfig's FileHandler raises at import
# time if the directory does not exist).
LOG_FILE = os.environ.get("UPDATE_MILVUS_LOG", "/home/purui/update_milvus.log")

logging.basicConfig(
    filename=LOG_FILE,
    filemode="a",
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    # Only ERROR and above are written, so logger.info(...) calls in this
    # module produce no output; completion messages are logged at ERROR level.
    level=logging.ERROR,
)

logger = logging.getLogger(__name__)

# Background pool for the blocking Milvus sync jobs, so the HTTP handler can
# return as soon as a job is queued.
executor = ThreadPoolExecutor(max_workers=10)

# Shared HTML -> plain-text converter with links dropped.
# NOTE: HTML2Text is a stateful HTMLParser subclass; sharing one instance across
# the executor's worker threads is not safe. Prefer a fresh instance per job.
h = html2text.HTML2Text()
h.ignore_links = True

# Embedding model served by Ollama; used for both documents and queries.
embed_model = OllamaEmbeddings(model="bge-m3")

app = FastAPI(
    title="update_milvus",
    description="update_milvus",
)

# Milvus server address. Override with MILVUS_URI; the default is unchanged.
URI = os.environ.get("MILVUS_URI", "http://localhost:19530")
|
|
|
def update_article(data):
    """Re-embed spider articles and store one Milvus entity per text chunk.

    For each row:

    - any chunks previously stored for the article are deleted;
    - the HTML ``content`` is converted to text and split with
      ``SentenceSplitter(chunk_size=200, chunk_overlap=20)``;
    - each chunk is inserted with id ``"<article id>_<chunk index>"``.

    Args:
        data: rows handed over by the ``/milvus`` endpoint, presumably a list
            of dicts. The keys read are ``id``, ``title``, ``content``,
            ``tags``, ``link`` and ``category2``.
    """
    splitter = SentenceSplitter(chunk_size=200, chunk_overlap=20)
    # html2text parsers keep state between calls; use a private instance so
    # concurrent executor threads cannot interleave their output.
    converter = html2text.HTML2Text()
    converter.ignore_links = True
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_sex_ed_article_spider"

    # Lazy %-formatting: the payload is only rendered if INFO is enabled.
    logger.info("Starting Embedding: %s", data)
    ct = 0
    try:
        for row in tqdm(data, desc="Updating Milvus"):
            article_id = str(row["id"])

            # Chunks are stored as "<id>_<n>", so the bare id is never a
            # primary key. Fetch prefix matches, then keep only exact
            # "<id>" / "<id>_..." ids, so that e.g. id "12" does not also
            # delete the chunks of article "123".
            candidates = client.query(
                collection_name=collection_name,
                filter=f'id like "{article_id}%"',
                output_fields=["id"],
            )
            stale_ids = [
                hit["id"]
                for hit in candidates
                if hit["id"] == article_id
                or hit["id"].startswith(f"{article_id}_")
            ]
            if stale_ids:
                client.delete(collection_name=collection_name, ids=stale_ids)

            title = row["title"]
            tags = row["tags"]
            link = row["link"]
            category = row["category2"]
            if category is None:
                category = " "

            chunks = splitter.split_text(converter.handle(row["content"]))
            if not chunks:
                # Nothing to index; skip the embedding calls entirely.
                continue

            chunk_vectors = embed_model.embed_documents(chunks)
            title_vector = embed_model.embed_query(title)
            tags_vector = embed_model.embed_query(tags)

            insert_data = [
                {
                    # Per-chunk id derived from the article id; it is not
                    # accumulated across iterations.
                    "id": f"{article_id}_{idx}",
                    "title": title,
                    "title_vector": title_vector,
                    "chunk": chunk,
                    "chunk_vector": vector,
                    "tags": tags_vector,
                    "link": link,
                    "category": category,
                }
                for idx, (chunk, vector) in enumerate(zip(chunks, chunk_vectors))
            ]
            client.insert(collection_name=collection_name, data=insert_data)
            ct += len(insert_data)
    finally:
        client.close()

    # Logged at ERROR only because the file handler drops anything lower.
    logger.error("Insert complete. %d chunks inserted.", ct)
|
|
|
def update_qa(data):
    """Re-embed spider Q&A answers and store one Milvus entity per text chunk.

    For each row:

    - any chunks previously stored under that id are deleted;
    - the HTML ``content`` is converted to text and split with
      ``SentenceSplitter(chunk_size=200, chunk_overlap=20)``;
    - all chunks of the row are inserted in one batch, each with id
      ``"<row id>_<chunk index>"``.

    Args:
        data: rows handed over by the ``/milvus`` endpoint, presumably a list
            of dicts. The keys read are ``id``, ``url``, ``title``,
            ``content``, ``type``, ``likeCount``, ``dislikeCount``,
            ``authorName`` and ``avatarUrl``.
    """
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_sex_ed_question_answer_spider"
    splitter = SentenceSplitter(chunk_size=200, chunk_overlap=20)
    # html2text parsers keep state between calls; use a private instance so
    # concurrent executor threads cannot interleave their output.
    converter = html2text.HTML2Text()
    converter.ignore_links = True

    logger.info("Starting Embedding: %s", data)
    ct = 0
    try:
        for row in tqdm(data, desc="Updating Milvus"):
            qa_id = str(row["id"])

            # Stored ids are "<id>_<n>", so look up the old chunks by prefix
            # and keep only exact "<id>" / "<id>_..." matches before deleting.
            candidates = client.query(
                collection_name=collection_name,
                filter=f'id like "{qa_id}%"',
                output_fields=["id"],
            )
            stale_ids = [
                hit["id"]
                for hit in candidates
                if hit["id"] == qa_id or hit["id"].startswith(f"{qa_id}_")
            ]
            if stale_ids:
                client.delete(collection_name=collection_name, ids=stale_ids)

            url = row["url"]
            title = row["title"]
            content_type = row["type"]
            likes = row["likeCount"]
            dislikes = row["dislikeCount"]
            author = row["authorName"]
            avatar_url = row["avatarUrl"]
            if avatar_url is None:
                avatar_url = " "

            chunks = splitter.split_text(converter.handle(row["content"]))
            if not chunks:
                # Nothing to index; skip the embedding calls entirely.
                continue

            title_vector = embed_model.embed_query(title)
            content_vectors = embed_model.embed_documents(chunks)

            rows_to_insert = [
                {
                    "id": f"{qa_id}_{idx}",
                    "url": url,
                    "title_vector": title_vector,
                    "content_vector": vector,
                    "title": title,
                    "content": chunk,
                    "content_type": content_type,
                    "author": author,
                    "avatar_url": avatar_url,
                    "likes": likes,
                    "dislikes": dislikes,
                }
                for idx, (chunk, vector) in enumerate(zip(chunks, content_vectors))
            ]
            # One insert per row instead of one round trip per chunk.
            client.insert(collection_name=collection_name, data=rows_to_insert)
            ct += len(rows_to_insert)
    finally:
        client.close()

    # Logged at ERROR only because the file handler drops anything lower.
    logger.error("Insert complete. %d data inserted.", ct)
|
|
|
|
|
def update_video(data):
    """Sync YouTube spider rows into Milvus, one entity per video.

    Any entity already stored under the row's ``id`` is deleted first. Rows
    whose ``deleteStatus`` is 1 (string or number) are then skipped, so a
    deleted video is removed from the collection rather than re-inserted.

    Args:
        data: rows handed over by the ``/milvus`` endpoint, presumably a list
            of dicts. The keys read are ``id``, ``deleteStatus``,
            ``videoLink``, ``videoTitle``, ``videoViews``, ``videoAuthor``,
            ``videoPicture``, ``videoLikes``, ``videoDuration`` and ``tag``.
    """
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_sex_ed_youtube_spider"

    logger.info("Starting Embedding: %s", data)
    ct = 0
    try:
        for row in tqdm(data, desc="Updating Milvus"):
            video_id = row["id"]

            # Each video is stored under its own id, so the old entity can be
            # removed directly by primary key.
            if client.get(collection_name=collection_name, ids=[video_id]):
                client.delete(collection_name=collection_name, ids=[video_id])

            delete_status = row["deleteStatus"]
            # Check before parsing and embedding so deleted rows cost nothing.
            if delete_status in (1, "1"):
                continue

            title = row["videoTitle"]
            tag = row["tag"]
            row_data = {
                "id": video_id,
                "link": row["videoLink"],
                "title": title,
                "title_vector": embed_model.embed_query(title),
                "tag": tag,
                "tag_vector": embed_model.embed_query(tag),
                "author": row["videoAuthor"],
                "picture": row["videoPicture"],
                "likes": row["videoLikes"],
                "duration": row["videoDuration"],
                # videoViews is assumed to be a string such as "1,234";
                # the thousands separators are stripped.
                "views": row["videoViews"].replace(",", ""),
                "delete_status": delete_status,
            }

            client.insert(collection_name=collection_name, data=row_data)
            ct += 1
    finally:
        client.close()

    # Logged at ERROR only because the file handler drops anything lower.
    logger.error("Insert complete. %d data inserted.", ct)
|
|
|
def update_porn(data):
    """Embed media rows and insert them into the ``t_sur_video`` collection.

    Rows with no ``viewCount``, or with ``isDelete`` equal to 1, are skipped.
    Every other row is inserted as one entity, with embeddings of its title
    and its space-joined categories.

    NOTE(review): unlike the other sync jobs, nothing is deleted before
    inserting, and the entity carries no id field. Re-syncing the same video
    therefore appears to add a duplicate, and rows flagged ``isDelete`` are
    never removed from Milvus. Confirm whether the collection uses auto_id
    and whether dedup (e.g. by ``url``) is wanted.

    Args:
        data: rows handed over by the ``/milvus`` endpoint, presumably a list
            of dicts. The keys read are ``webUrl``, ``duration``,
            ``viewCount``, ``coverPicture``, ``title``, ``uploader``,
            ``categories``, ``resourceType``, ``sexualPreference`` and
            ``isDelete``.
    """
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_video"

    logger.info("Starting Embedding: %s", data)
    ct = 0
    try:
        for row in tqdm(data):
            view_count = row["viewCount"]
            if view_count is None:
                continue

            # Skip deleted rows before any parsing or embedding work.
            if row["isDelete"] in (1, "1"):
                continue

            uploader = row["uploader"]
            if uploader is None:
                uploader = ""

            # NOTE(review): [:-1] drops the last element (a trailing empty
            # entry?), and " ".join expects a sequence of strings. If
            # categories arrives as a plain str, every character would be
            # space-separated; confirm the payload type.
            categories = " ".join(row["categories"][:-1])
            title = row["title"]

            row_data = {
                "url": row["webUrl"],
                "duration": row["duration"],
                "viewCount": view_count,
                "coverPicture": row["coverPicture"],
                "title": title,
                "uploader": uploader,
                "categories": categories,
                "resourceType": row["resourceType"],
                "sexualPreference": int(row["sexualPreference"]),
                "title_vector": embed_model.embed_query(title),
                "categories_vector": embed_model.embed_query(categories),
            }

            client.insert(collection_name, data=row_data)
            ct += 1
    finally:
        client.close()

    # Logged at ERROR only because the file handler drops anything lower.
    logger.error("Insert complete. %d data inserted.", ct)
|
|
|
|
|
@app.post("/milvus")
async def update_milvus(request: Request):
    """Queue a background Milvus sync job for the posted batch.

    The JSON body must contain ``type``, which selects the sync job, and
    ``page.data``, the rows handed to that job. The job runs on the module's
    thread pool, and this handler returns as soon as the job is queued.
    Failures inside the job are written to the log file when the job
    finishes.

    Returns:
        ``{"status": "SUCCESS"}`` once the job is queued. Returns
        ``{"status": "FAILED"}`` for an unknown ``type`` or when the job
        cannot be queued.
    """
    body = await request.json()
    data = body["page"]["data"]
    table = body["type"]
    print(f"data len: {len(data)}")
    print(f"table: {table}")

    # Sync "type" sent by the caller -> worker that performs the sync.
    handlers = {
        "articleSpiderSyncMilVus": update_article,
        "answerSpiderSyncMilVus": update_qa,
        "youtubeSpiderSyncMilVus": update_video,
        "syncToMediaSyncMilVus": update_porn,
    }
    handler = handlers.get(table)
    if handler is None:
        logger.error("Unknown sync type: %s", table)
        return {"status": "FAILED"}

    def _report_failure(future):
        # Exceptions raised inside a worker are stored on its Future and are
        # otherwise discarded silently; record them in the log file.
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("Sync job %s failed", table, exc_info=exc)

    try:
        future = executor.submit(handler, data)
    except RuntimeError:
        # submit() raises RuntimeError once the executor has been shut down.
        logger.exception("ThreadError")
        return {"status": "FAILED"}

    future.add_done_callback(_report_failure)
    return {"status": "SUCCESS"}
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app with uvicorn on all
    # interfaces, port 8010, using the asyncio event loop.
    from uvicorn import run as serve_app

    serve_app(app, host="0.0.0.0", port=8010, loop="asyncio")