# Page-capture header (not part of the program):
#   Spaces status: Runtime error
#   File size: 3,936 Bytes
#   Ref: eb24142
import streamlit as st
import pandas as pd
import pathlib
import whoosh
import whoosh.index
import whoosh.query
import os
from datetime import date as Date
import re
# All data lives in a "Data" directory next to this source file.
DATA_FOLDER = pathlib.Path(__file__).parent.joinpath("Data")
# Transcription CSV files (read by Searcher.get_content).
RAW_FOLDER = DATA_FOLDER.joinpath("Transcription_raw")
# Whoosh index directories (opened by Searcher.make_total_ix).
INDEX_FOLDER = DATA_FOLDER.joinpath("Transcription_index")
class Searcher:
    """Keyword search over the transcription indexes plus video metadata.

    Attributes set by ``__init__``:
        ix: a ``MultiIndexSearcher`` over every ``sub*`` directory found in
            ``INDEX_FOLDER``.
        df_video_links: DataFrame read from ``video_links.csv``; rows are
            accessed by position and must provide ``title`` and ``date``.
    """

    def __init__(self):
        self.ix = self.make_total_ix()
        self.df_video_links = self.get_video_links()

    def make_total_ix(self):
        """Open every ``sub*`` index under ``INDEX_FOLDER`` and combine them."""
        sub_indexes = [
            whoosh.index.open_dir(INDEX_FOLDER / name)
            for name in os.listdir(INDEX_FOLDER)
            if name.startswith("sub")
        ]
        return MultiIndexSearcher(sub_indexes)

    def search(self, date_start, date_end, **kwargs):
        """Return hits whose broadcast date lies in ``[date_start, date_end]``.

        Args:
            date_start: earliest ``datetime.date`` to keep (inclusive).
            date_end: latest ``datetime.date`` to keep (inclusive).
            **kwargs: forwarded unchanged to ``self.ix.search``.

        Returns:
            A list of ``(index, date, title)`` tuples ordered by date, where
            ``date`` is the original ``"Y/M/D"`` string from the CSV.
        """
        dated_rows = []
        for hit_title in self.ix.search(**kwargs):
            # The text before the first "m" in an indexed title is the row
            # position inside video_links.csv.
            index = int(hit_title.split("m")[0])
            row = self.df_video_links.iloc[index]
            title = row["title"]
            date = row["date"]
            date_datetime = Date(*map(int, date.split("/")))
            if not (date_start <= date_datetime <= date_end):
                continue
            # The parsed date goes first so a plain sort orders by date.
            dated_rows.append((date_datetime, index, date, title))
        dated_rows.sort()
        # Drop the parsed date; callers only need the display values.
        return [(index, date, title) for _, index, date, title in dated_rows]

    def get_video_links(self):
        """Load ``video_links.csv`` from ``DATA_FOLDER`` (first column as index)."""
        return pd.read_csv(DATA_FOLDER / "video_links.csv", index_col=0)

    @staticmethod
    def _select_latest(names, index):
        """Pick the ``<index>-<number>.csv`` name with the largest number.

        Only exact matches count: the dot is literal and nothing may follow
        ``.csv``. Numbers are compared as integers, so ``1-10.csv`` wins over
        ``1-9.csv`` (a plain string sort would pick the latter).

        NOTE(review): assumes the highest numeric suffix is the preferred
        file -- confirm against how the raw transcriptions are produced.

        Raises:
            FileNotFoundError: if no name matches the pattern for ``index``.
        """
        pattern = re.compile(r"{}-(\d+)\.csv".format(re.escape(str(index))))
        candidates = []
        for name in names:
            found = pattern.fullmatch(name)
            if found:
                candidates.append((int(found.group(1)), name))
        if not candidates:
            raise FileNotFoundError(
                "no transcription file named '{}-<number>.csv'".format(index)
            )
        return max(candidates)[1]

    def get_content(self, index):
        """Read the transcription CSV for row ``index`` from ``RAW_FOLDER``.

        Files are expected to be named ``<index>-<number>.csv``; when several
        exist, the one with the highest number is loaded.

        Raises:
            FileNotFoundError: if ``RAW_FOLDER`` holds no file for ``index``.
        """
        best = self._select_latest(os.listdir(RAW_FOLDER), index)
        return pd.read_csv(RAW_FOLDER / best)
class MultiIndexSearcher:
    """Run one query against several indexes and pool the matching titles."""

    def __init__(self, ixes):
        # Index objects; each must expose ``searcher()`` usable in a ``with``.
        self.ixes = ixes

    def search(self, **kwargs):
        """Return the ``title`` field of every hit from every index.

        ``kwargs`` are forwarded to each sub-searcher's ``search`` together
        with ``limit=None``. Results keep the order of ``self.ixes`` and,
        within one index, the order the hits are yielded.
        """
        collected = []
        for sub_index in self.ixes:
            with sub_index.searcher() as reader:
                results = reader.search(limit=None, **kwargs)
                # Read the stored fields while the searcher is still open.
                collected.extend(hit["title"] for hit in results)
        return collected
@st.cache_resource
def _load_searcher():
    """Build the shared ``Searcher`` once and reuse it across script reruns.

    Streamlit re-executes the script on every widget interaction; without the
    cache each rerun would reopen every index and re-read the metadata CSV.
    """
    return Searcher()


# Kept at module level under its original name for existing references.
searcher = _load_searcher()


def _build_query(keyword):
    """Turn the raw keyword box content into a Whoosh query.

    Whitespace-separated words are combined with AND on the ``content``
    field. Input with no words at all (empty or only whitespace) matches
    every document, as the input prompt promises for a blank field.
    """
    words = keyword.split()
    if not words:
        return whoosh.query.Every()
    return whoosh.query.And(
        [whoosh.query.Term("content", word) for word in words]
    )


def main():
    """Render the search page: inputs, result table, selected transcription."""
    st.title("KATO DB")
    keyword = st.text_input(
        "検索したいキーワードを入力して、Enterを押してください\n"
        "空欄だと全文書表示します。"
    )
    date_start = st.date_input(
        "検索したい開始日付を入力してください",
        Date(2009, 1, 1)
    )
    date_end = st.date_input(
        "検索したい終了日付を入力してください",
        Date(2050, 12, 31)
    )

    query = _build_query(keyword)
    contents = searcher.search(q=query, date_start=date_start, date_end=date_end)

    st.write("該当件数:{}件".format(len(contents)))
    results = pd.DataFrame(contents, columns=["管理番号", "放送日", "動画タイトル"])
    st.dataframe(results, hide_index=True)

    selected_index = st.selectbox("管理番号を選択して書き起こしを表示", results["管理番号"])
    # Skip the transcription view when there is no selection to show.
    if selected_index is not None:
        df_transcription = searcher.get_content(selected_index)
        st.dataframe(df_transcription, width=1000)


if __name__ == "__main__":
    main()
# (stray "|" table delimiter from the page capture; not part of the program)