from concurrent.futures import ThreadPoolExecutor
import logging

from fastapi import FastAPI, Request
from langchain_ollama import OllamaEmbeddings
from llama_index.core.node_parser import SentenceSplitter
from pymilvus import MilvusClient
from tqdm import tqdm
import html2text

# Configure logging: append to a fixed file.
# BUG FIX: the original set level=ERROR and then logged success messages via
# logger.error() to make them visible; use INFO level and logger.info() instead.
logging.basicConfig(
    filename='/home/purui/update_milvus.log',  # log file path
    filemode='a',                              # append mode
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Background pool: embedding + insert jobs run outside the request/response cycle.
executor = ThreadPoolExecutor(max_workers=10)

# Shared HTML -> plain-text converter (links stripped).
h = html2text.HTML2Text()
h.ignore_links = True

# Shared embedding model (bge-m3 served by a local Ollama instance).
embed_model = OllamaEmbeddings(model="bge-m3")

app = FastAPI(
    title="update_milvus",
    description="update_milvus",
)

# Milvus server endpoint.
URI = "http://localhost:19530"


def _delete_stale_chunks(client, collection_name, row_id):
    """Delete every row previously inserted for *row_id* (ids look like '<id>' or '<id>_<n>').

    BUG FIX: the original guarded this with client.get(ids=[row_id]) — which can
    never match chunked rows (their ids carry a '_<idx>' suffix) — and passed a
    plain string 'id like "{id}%"' (missing f-prefix), so the literal text
    '{id}%' was sent to Milvus and stale data was never removed.  Deleting by
    prefix unconditionally is safe: it is a no-op when nothing matches.
    """
    client.delete(collection_name=collection_name, filter=f'id like "{row_id}%"')


def update_article(data):
    """Embed spider articles and upsert their chunks into Milvus.

    Each article's HTML content is converted to text, split into ~200-char
    sentence chunks, and every chunk is inserted as one row whose id is
    '<article_id>_<chunk_idx>'.

    :param data: list of dicts with keys id, title, content, tags, link, category2.
    """
    splitter = SentenceSplitter(chunk_size=200, chunk_overlap=20)
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_sex_ed_article_spider"
    logger.info("Starting Embedding: %s", data)
    ct = 0
    for row in tqdm(data, desc="Updating Milvus"):
        row_id = row["id"]
        # Replace any chunks from a previous version of this article.
        _delete_stale_chunks(client, collection_name, row_id)

        title = row["title"]
        tags = row["tags"]
        link = row["link"]
        category = row["category2"]
        if category is None:
            category = " "

        chunks = splitter.split_text(h.handle(row["content"]))     # List[str]
        chunk_vectors = embed_model.embed_documents(chunks)        # List[List[float]]
        title_vector = embed_model.embed_query(title)
        tags_vector = embed_model.embed_query(tags)

        insert_data = []
        for idx, (sentence, embedding) in enumerate(zip(chunks, chunk_vectors)):
            # BUG FIX: the original re-assigned `id = id + "_" + str(idx)` inside
            # this loop, producing ids 'x_0', 'x_0_1', 'x_0_1_2', ... instead of
            # 'x_0', 'x_1', 'x_2', ...
            insert_data.append({
                "id": f"{row_id}_{idx}",
                "title": title,
                "title_vector": title_vector,
                "chunk": sentence,
                "chunk_vector": embedding,
                "tags": tags_vector,
                "link": link,
                "category": category,
            })
            ct += 1
        # One batched insert per article.
        client.insert(collection_name=collection_name, data=insert_data)
    logger.info("Insert complete. %d chunks inserted.", ct)


def update_qa(data):
    """Embed Q&A entries and upsert their content chunks into Milvus.

    :param data: list of dicts with keys id, url, title, content, type,
        likeCount, dislikeCount, authorName, avatarUrl.
    """
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_sex_ed_question_answer_spider"
    splitter = SentenceSplitter(chunk_size=200, chunk_overlap=20)
    logger.info("Starting Embedding: %s", data)
    ct = 0
    for row in tqdm(data, desc="Updating Milvus"):
        row_id = row["id"]
        # Replace any chunks from a previous version of this entry
        # (same f-string/guard bug fix as update_article).
        _delete_stale_chunks(client, collection_name, row_id)

        url = row["url"]
        title = row["title"]
        content_type = row["type"]
        likes = row["likeCount"]
        dislikes = row["dislikeCount"]
        author = row["authorName"]
        avatar_url = row["avatarUrl"]
        if avatar_url is None:
            avatar_url = " "

        chunks = splitter.split_text(h.handle(row["content"]))
        title_vector = embed_model.embed_query(title)
        content_vectors = embed_model.embed_documents(chunks)

        # One row per chunk; id is '<row_id>_<chunk_idx>'.
        for idx, (sentence, embedding) in enumerate(zip(chunks, content_vectors)):
            row_data = {
                "id": f"{row_id}_{idx}",
                "url": url,
                "title_vector": title_vector,
                "content_vector": embedding,
                "title": title,
                "content": sentence,
                "content_type": content_type,
                "author": author,
                "avatar_url": avatar_url,
                "likes": likes,
                "dislikes": dislikes,
            }
            client.insert(collection_name=collection_name, data=row_data)
            ct += 1
    logger.info("Insert complete. %d data inserted.", ct)


def update_video(data):
    """Embed YouTube video metadata and upsert one Milvus row per video.

    Rows flagged deleteStatus == "1" are skipped.

    :param data: list of dicts with keys id, videoLink, videoTitle, videoViews,
        videoAuthor, videoPicture, videoLikes, videoDuration, tag, deleteStatus.
    """
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_sex_ed_youtube_spider"
    logger.info("Starting Embedding: %s", data)
    ct = 0
    for row in tqdm(data, desc="Updating Milvus"):
        row_id = row["id"]
        # Replace the previous version of this row (same bug fix as above).
        _delete_stale_chunks(client, collection_name, row_id)

        # Hoisted before the embedding calls: the original embedded the title
        # and tag first and only then skipped deleted rows, wasting work.
        delete_status = row["deleteStatus"]
        if delete_status == "1":
            continue

        title = row["videoTitle"]
        tag = row["tag"]
        views = "".join(row["videoViews"].split(","))  # "599,450" -> "599450"
        title_vector = embed_model.embed_query(title)
        tag_vector = embed_model.embed_query(tag)

        row_data = {
            "id": row_id,
            "link": row["videoLink"],
            "title": title,
            "title_vector": title_vector,
            "tag": tag,
            "tag_vector": tag_vector,
            "author": row["videoAuthor"],
            "picture": row["videoPicture"],
            "likes": row["videoLikes"],
            "duration": row["videoDuration"],
            "views": views,
            "delete_status": delete_status,
        }
        client.insert(collection_name=collection_name, data=row_data)
        ct += 1
    logger.info("Insert complete. %d data inserted.", ct)


def update_porn(data):
    """Embed adult-video metadata and insert one Milvus row per video.

    Rows with a missing viewCount or isDelete == 1 are skipped.

    :param data: list of dicts with keys webUrl, duration, viewCount,
        coverPicture, title, uploader, categories, resourceType,
        sexualPreference, isDelete.
    """
    client = MilvusClient(uri=URI)
    collection_name = "t_sur_video"
    logger.info("Starting Embedding: %s", data)
    ct = 0
    for row in tqdm(data):
        view_count = row["viewCount"]
        if view_count is None:  # BUG FIX: was `== None`
            continue

        # Hoisted before the embedding calls: the original embedded first and
        # only then skipped deleted rows.
        if row["isDelete"] == 1:
            continue

        uploader = row["uploader"]
        if uploader is None:  # BUG FIX: was `== None`
            uploader = ""

        title = row["title"]
        # Drop the last element of categories, then flatten to one string.
        # NOTE(review): assumes `categories` is a list whose final element is
        # noise (e.g. a trailing empty entry) — confirm against the producer.
        categories = " ".join(row["categories"][:-1])

        title_vector = embed_model.embed_query(title)
        categories_vector = embed_model.embed_query(categories)

        row_data = {
            "url": row["webUrl"],
            "duration": row["duration"],
            "viewCount": view_count,
            "coverPicture": row["coverPicture"],
            "title": title,
            "uploader": uploader,
            "categories": categories,
            "resourceType": row["resourceType"],
            "sexualPreference": int(row["sexualPreference"]),
            "title_vector": title_vector,
            "categories_vector": categories_vector,
        }
        client.insert(collection_name, data=row_data)
        ct += 1
    logger.info("Insert complete. %d data inserted.", ct)


# Dispatch table: request "type" -> background handler.
_TABLE_HANDLERS = {
    "articleSpiderSyncMilVus": update_article,
    "answerSpiderSyncMilVus": update_qa,
    "youtubeSpiderSyncMilVus": update_video,
    "syncToMediaSyncMilVus": update_porn,
}


def _log_worker_failure(future):
    """Done-callback: surface exceptions raised inside background jobs.

    BUG FIX: the original never inspected the submitted Future, so any
    exception raised by a worker thread vanished silently.
    """
    exc = future.exception()
    if exc is not None:
        logger.error("Background update failed", exc_info=exc)


@app.post("/milvus")
async def update_milvus(request: Request):
    """Accept a sync payload and schedule the matching Milvus update in the pool.

    Expected body: {"type": <table key>, "page": {"data": [...]}}.
    Returns {"status": "SUCCESS"} once the job is queued; the embedding work
    itself runs asynchronously on the thread pool.
    """
    body = await request.json()
    data = body["page"]["data"]
    table = body["type"]
    print(f"data len: {len(data)}")
    print(f"table: {table}")

    handler = _TABLE_HANDLERS.get(table)
    if handler is None:
        # BUG FIX: the original fell through and returned None (HTTP null body)
        # for unknown types.
        return {"status": "UNKNOWN_TYPE"}
    try:
        future = executor.submit(handler, data)
        future.add_done_callback(_log_worker_failure)
        return {"status": "SUCCESS"}
    except Exception as e:
        # BUG FIX: the original logged via logging.info on the root logger
        # (below the configured level, so the message was dropped) and
        # returned None.
        logger.error(f"ThreadError: {e}")
        return {"status": "ERROR"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8010, loop="asyncio")