Spaces:
Running
Running
from flask import Flask, request, jsonify, render_template, send_from_directory | |
import base64 | |
from pydub import AudioSegment | |
import os | |
import shutil | |
import requests | |
import tempfile | |
import json | |
from process import AudioProcessor | |
from transcription import TranscriptionMaker | |
from analyze import TextAnalyzer | |
from flask_cors import CORS | |
process = AudioProcessor() | |
transcripter = TranscriptionMaker() | |
app = Flask(__name__) | |
# CORS設定: すべてのオリジンからのリクエストを許可 | |
# 必要であれば、特定のオリジンやメソッド、ヘッダーをより厳密に指定できます | |
# 例: CORS(app, resources={r"/api/*": {"origins": "http://localhost:3000"}}, supports_credentials=True) | |
CORS(app, origins="*", methods=["GET", "POST", "DELETE", "OPTIONS"], headers=["Content-Type", "Authorization"]) | |
# GASのエンドポイントURL | |
GAS_URL = "https://script.google.com/macros/s/AKfycbwR2cnMKVU1AxoT9NDaeZaNUaTiwRX64Ul0sH0AU4ccP49Byph-TpxtM_Lwm4G9zLnuYA/exec" | |
users = [] # 選択されたユーザーのリスト | |
all_users = [] # 利用可能なすべてのユーザーのリスト | |
transcription_text = "" | |
harassment_keywords = [ | |
"バカ", "馬鹿", "アホ", "死ね", "クソ", "うざい", | |
"きもい", "キモい", "ブス", "デブ", "ハゲ", | |
"セクハラ", "パワハラ", "モラハラ" | |
] | |
total_audio = "" | |
def index(): | |
return render_template('index.html', users=users) | |
# フィードバック画面(テンプレート: feedback.html) | |
def feedback(): | |
return render_template('feedback.html') | |
# 会話詳細画面(テンプレート: talkDetail.html) | |
def talk_detail(): | |
return render_template('talkDetail.html') | |
# 音声登録画面(テンプレート: userRegister.html) | |
def userregister(): | |
return render_template('userRegister.html') | |
# 人数確認 | |
def confirm(): | |
global all_users | |
# 最新のユーザーリストを取得 | |
try: | |
update_all_users() | |
except Exception as e: | |
print(f"ユーザーリストの更新エラー: {str(e)}") | |
return jsonify({'members': users, 'all_members': all_users}), 200 | |
# リセット画面(テンプレート: reset.html) | |
def reset_html(): | |
return render_template('reset.html') | |
# メンバー削除&累積音声削除 | |
def reset_member(): | |
global users | |
global total_audio | |
global transcription_text | |
# 一時ディレクトリのクリーンアップ | |
if total_audio: | |
process.delete_files_in_directory(total_audio) | |
process.delete_files_in_directory('/tmp/data/transcription_audio') | |
# 書き起こしテキストの削除 | |
if os.path.exists(transcription_text): | |
try: | |
os.remove(transcription_text) | |
print(f"{transcription_text} を削除しました。") | |
except Exception as e: | |
print(f"ファイル削除中にエラーが発生しました: {e}") | |
transcription_text = "" | |
try: | |
data = request.get_json() | |
if not data or "names" not in data: | |
return jsonify({"status": "error", "message": "Invalid request body"}), 400 | |
names = data.get("names", []) | |
# GASからファイルを削除 | |
for name in names: | |
try: | |
delete_from_cloud(f"{name}.wav") | |
print(f"クラウドから {name}.wav を削除しました。") | |
except Exception as e: | |
print(f"クラウド削除中にエラーが発生しました: {e}") | |
return jsonify({"status": "error", "message": f"Failed to delete {name} from cloud: {e}"}), 500 | |
# usersリストから削除するユーザーを除外 | |
users = [u for u in users if u not in names] | |
# 全ユーザーリストの更新 | |
update_all_users() | |
return jsonify({"status": "success", "message": "Members deleted successfully", "users": users}), 200 | |
except Exception as e: | |
print(f"An unexpected error occurred: {e}") | |
return jsonify({"status": "error", "message": f"Internal server error: {e}"}), 500 | |
# 書き起こし作成エンドポイント | |
def transcription(): | |
global transcription_text | |
global total_audio | |
if not os.path.exists(transcription_text) or not transcription_text: | |
try: | |
if not total_audio or not os.path.exists(total_audio): | |
return jsonify({"error": "No audio segments provided"}), 400 | |
transcription_text = transcripter.create_transcription(total_audio) | |
print("transcription") | |
print(transcription_text) | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
try: | |
with open(transcription_text, 'r', encoding='utf-8') as file: | |
file_content = file.read() | |
print(file_content) | |
return jsonify({'transcription': file_content}), 200 | |
except FileNotFoundError: | |
return jsonify({"error": "Transcription file not found"}), 404 | |
except Exception as e: | |
return jsonify({"error": f"Unexpected error: {str(e)}"}), 500 | |
# AI分析エンドポイント | |
def analyze(): | |
global transcription_text | |
global total_audio | |
if not os.path.exists(transcription_text) or not transcription_text: | |
try: | |
if not total_audio: | |
return jsonify({"error": "No audio segments provided"}), 400 | |
transcription_text = transcripter.create_transcription(total_audio) | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
analyzer = TextAnalyzer(transcription_text, harassment_keywords) | |
api_key = os.environ.get("DEEPSEEK") | |
if api_key is None: | |
raise ValueError("DEEPSEEK_API_KEY が設定されていません。") | |
results = analyzer.analyze(api_key=api_key) | |
print(json.dumps(results, ensure_ascii=False, indent=2)) | |
if "deepseek_analysis" in results and results["deepseek_analysis"]: | |
deepseek_data = results["deepseek_analysis"] | |
conversation_level = deepseek_data.get("conversationLevel") | |
harassment_present = deepseek_data.get("harassmentPresent") | |
harassment_type = deepseek_data.get("harassmentType") | |
repetition = deepseek_data.get("repetition") | |
pleasantConversation = deepseek_data.get("pleasantConversation") | |
blameOrHarassment = deepseek_data.get("blameOrHarassment") | |
print("\n--- DeepSeek 分析結果 ---") | |
print(f"会話レベル: {conversation_level}") | |
print(f"ハラスメントの有無: {harassment_present}") | |
print(f"ハラスメントの種類: {harassment_type}") | |
print(f"繰り返しの程度: {repetition}") | |
print(f"会話の心地よさ: {pleasantConversation}") | |
print(f"非難またはハラスメントの程度: {blameOrHarassment}") | |
return jsonify({"results": results}), 200 | |
# クラウドから音声を取得してローカルに保存する関数 | |
def download_from_cloud(filename, local_path): | |
try: | |
payload = { | |
"action": "download", | |
"fileName": filename | |
} | |
print(f"クラウドから {filename} をダウンロード中...") | |
response = requests.post(GAS_URL, json=payload) | |
if response.status_code != 200: | |
print(f"ダウンロードエラー: ステータスコード {response.status_code}") | |
print(f"レスポンス: {response.text}") | |
raise Exception(f"クラウドからのダウンロードに失敗しました: {response.text}") | |
try: | |
res_json = response.json() | |
except: | |
print("JSONデコードエラー、レスポンス内容:") | |
print(response.text[:500]) # 最初の500文字だけ表示 | |
raise Exception("サーバーからの応答をJSONとして解析できませんでした") | |
if res_json.get("status") != "success": | |
print(f"ダウンロードステータスエラー: {res_json.get('message')}") | |
raise Exception(f"クラウドからのダウンロードに失敗しました: {res_json.get('message')}") | |
# Base64文字列をデコード | |
base64_data = res_json.get("base64Data") | |
if not base64_data: | |
print("Base64データが存在しません") | |
raise Exception("応答にBase64データが含まれていません") | |
try: | |
audio_binary = base64.b64decode(base64_data) | |
except Exception as e: | |
print(f"Base64デコードエラー: {str(e)}") | |
raise Exception(f"音声データのデコードに失敗しました: {str(e)}") | |
# 指定パスに保存 | |
os.makedirs(os.path.dirname(local_path), exist_ok=True) | |
with open(local_path, 'wb') as f: | |
f.write(audio_binary) | |
print(f"{filename} をローカルに保存しました: {local_path}") | |
# データの整合性チェック(ファイルサイズが0より大きいかなど) | |
if os.path.getsize(local_path) <= 0: | |
raise Exception(f"保存されたファイル {local_path} のサイズが0バイトです") | |
return local_path | |
except Exception as e: | |
print(f"ダウンロード中にエラーが発生しました: {str(e)}") | |
# エラーを上位に伝播させる | |
raise | |
# クラウドからファイルを削除する関数 | |
def delete_from_cloud(filename): | |
payload = { | |
"action": "delete", | |
"fileName": filename | |
} | |
response = requests.post(GAS_URL, json=payload) | |
if response.status_code != 200: | |
raise Exception(f"クラウドからの削除に失敗しました: {response.text}") | |
res_json = response.json() | |
if res_json.get("status") != "success": | |
raise Exception(f"クラウドからの削除に失敗しました: {res_json.get('message')}") | |
return True | |
# すべてのベース音声ユーザーリストを更新する関数 | |
def update_all_users(): | |
global all_users | |
payload = {"action": "list"} | |
response = requests.post(GAS_URL, json=payload) | |
if response.status_code != 200: | |
raise Exception(f"GAS一覧取得エラー: {response.text}") | |
res_json = response.json() | |
if res_json.get("status") != "success": | |
raise Exception(f"GAS一覧取得失敗: {res_json.get('message')}") | |
# ファイル名から拡張子を除去してユーザーリストを作成 | |
all_users = [os.path.splitext(filename)[0] for filename in res_json.get("fileNames", [])] | |
return all_users | |
# 音声アップロード&解析エンドポイント | |
def upload_audio(): | |
global total_audio | |
global users # グローバル変数のusersを更新する場合 | |
try: | |
data = request.get_json() | |
if not data or 'audio_data' not in data: | |
print("エラー: リクエストに audio_data が含まれていません。") | |
return jsonify({"error": "音声データがありません"}), 400 | |
# リクエストからユーザーリストを取得(指定がなければ現在のusersを使用) | |
# リクエストごとにユーザーを指定する方が安全 | |
request_users = data.get('selected_users', []) # .getでキーが存在しない場合も安全に | |
if request_users: | |
current_users = request_users # リクエストで指定されたユーザーを使用 | |
print(f"リクエストから選択されたユーザー: {current_users}") | |
else: | |
# グローバル変数を使う場合(非推奨) | |
# current_users = users | |
# print(f"グローバル変数からユーザーを使用: {current_users}") | |
# グローバル変数ではなく、エラーにする方が安全 | |
print("エラー: リクエストに selected_users が指定されていません。") | |
return jsonify({"error": "選択されたユーザーが指定されていません"}), 400 | |
if not current_users: | |
print("エラー: 処理対象のユーザーがいません。") | |
return jsonify({"error": "選択されたユーザーがいません"}), 400 | |
# Base64デコードして音声バイナリを取得 | |
audio_binary = base64.b64decode(data['audio_data']) | |
# 一時ファイルに音声を保存 | |
upload_name = 'uploaded_audio_segment' # 一時ファイル名 | |
audio_dir = "/tmp/justalk_audio_data" # 一時ディレクトリ名(環境に合わせて変更可) | |
os.makedirs(audio_dir, exist_ok=True) | |
audio_path = os.path.join(audio_dir, f"{upload_name}.wav") | |
with open(audio_path, 'wb') as f: | |
f.write(audio_binary) | |
print(f"一時音声ファイルを保存: {audio_path}") | |
print(f"処理を実行するユーザー: {current_users}") | |
# 参照音声用の一時ディレクトリ | |
temp_ref_dir = os.path.join(audio_dir, "base_audio") | |
os.makedirs(temp_ref_dir, exist_ok=True) | |
# 各ユーザーの参照音声ファイルのパスをリストに格納 | |
reference_paths = [] | |
missing_files = [] | |
for user in current_users: | |
try: | |
ref_path = os.path.join(temp_ref_dir, f"{user}.wav") | |
# 参照ファイルがローカルになければクラウドから取得試行 | |
if not os.path.exists(ref_path): | |
print(f"参照音声 {ref_path} がローカルにありません。ダウンロードを試みます...") | |
if not download_from_cloud(f"{user}.wav", ref_path): | |
print(f"エラー: {user}.wav のダウンロードに失敗しました。") | |
missing_files.append(user) | |
continue # 次のユーザーへ | |
else: | |
print(f"クラウドから {user}.wav を {ref_path} にダウンロードしました") | |
# 再度存在確認 (ダウンロード成功したか) | |
if os.path.exists(ref_path): | |
reference_paths.append(ref_path) | |
else: | |
# 最終的にファイルが見つからなかった場合 | |
print(f"エラー: ユーザー '{user}' の参照音声ファイルが見つかりません: {ref_path}") | |
missing_files.append(user) | |
except Exception as e: | |
print(f"エラー: ユーザー '{user}' の参照音声準備中にエラーが発生しました: {e}") | |
# エラーが発生したユーザーをリストに追加 | |
missing_files.append(user) | |
# エラーの詳細をログに出力 | |
traceback.print_exc() | |
# 必要な参照ファイルが不足している場合はエラーを返す | |
if missing_files: | |
return jsonify({"error": f"一部ユーザーの参照音声が見つかりません: {', '.join(missing_files)}"}), 500 | |
# 処理に必要なユーザー数と参照ファイル数が一致しない場合もエラー | |
if len(reference_paths) != len(current_users): | |
return jsonify({"error": f"参照音声ファイルの数({len(reference_paths)})がユーザー数({len(current_users)})と一致しません"}), 500 | |
# --- ユーザー数に応じて処理分岐 --- | |
if len(current_users) > 1: | |
# --- 複数人処理 --- | |
print(f"複数人 ({len(current_users)}人) の音声処理を開始します。") | |
try: | |
matched_times, merged_segments = process.process_multi_audio( | |
reference_paths, audio_path, current_users, threshold=0.05 | |
) | |
# total_audio = transcripter.save_marged_segments(merged_segments) # 必要なら有効化 | |
except Exception as proc_e: | |
print(f"エラー: process_multi_audio でエラーが発生しました: {proc_e}") | |
traceback.print_exc() | |
return jsonify({"error": "音声処理中にエラーが発生しました(multi)", "details": str(proc_e)}), 500 | |
# 各メンバーのrateを計算 | |
total_matched_time = sum(matched_times) # 発話時間の合計 | |
user_rates = {} # { 'ユーザー名': rate } 形式の辞書 | |
print(f"各ユーザーの発話時間 (秒): {dict(zip(current_users, matched_times))}") | |
print(f"発話時間の合計 (秒): {total_matched_time:.2f}") | |
# 各ユーザーの割合を計算 | |
for i in range(len(current_users)): | |
user = current_users[i] | |
time = matched_times[i] | |
# 発話合計時間が0より大きい場合のみ割合計算 | |
rate = (time / total_matched_time) * 100 if total_matched_time > 0 else 0 | |
# 念のため rate が 0 未満にならないようにする | |
user_rates[user] = max(0, rate) | |
print(f"計算直後の user_rates: {user_rates}") | |
# --- 'その他' の計算と追加 --- | |
current_total_rate = sum(user_rates.values()) # 計算されたレートの合計 | |
print(f"計算後の合計レート: {current_total_rate:.2f}%") | |
# 合計が100%未満の場合 (浮動小数点誤差を考慮) | |
# かつ合計が負でないことを確認(通常ありえないが念のため) | |
if current_total_rate < 99.99 and current_total_rate >= 0: | |
other_rate = 100.0 - current_total_rate | |
user_rates['その他'] = other_rate # 'その他' を追加 | |
print(f"'その他' ({other_rate:.2f}%) を追加しました。") | |
# オプション: 合計が100%をわずかに超える場合の正規化 (必要に応じてコメント解除) | |
elif current_total_rate > 100.01: | |
print(f"警告: 合計レートが {current_total_rate:.2f}% で100%を超えました。正規化します。") | |
factor = 100.0 / current_total_rate | |
normalized_rates = {} | |
temp_sum = 0 | |
keys = list(user_rates.keys()) | |
for i, user in enumerate(keys): | |
if i < len(keys) - 1: | |
normalized_rate = user_rates[user] * factor | |
normalized_rates[user] = normalized_rate | |
temp_sum += normalized_rate | |
else: | |
normalized_rates[user] = max(0, 100.0 - temp_sum) # 最後の要素で調整、0未満防止 | |
user_rates = normalized_rates | |
print(f"正規化後の user_rates: {user_rates}") | |
print(f"最終的に返す user_rates: {user_rates}") | |
# React側が扱いやすい user_rates 形式で返す | |
return jsonify({"user_rates": user_rates}), 200 | |
else: | |
# --- 単一ユーザー処理 --- | |
print(f"単一ユーザー ({current_users[0]}) の音声処理を開始します。") | |
try: | |
matched_time, unmatched_time, merged_segments = process.process_audio( | |
reference_paths[0], audio_path, current_users[0], threshold=0.05 | |
) | |
# total_audio = transcripter.save_marged_segments(merged_segments) # 必要なら有効化 | |
except Exception as proc_e: | |
print(f"エラー: process_audio でエラーが発生しました: {proc_e}") | |
traceback.print_exc() | |
return jsonify({"error": "音声処理中にエラーが発生しました(single)", "details": str(proc_e)}), 500 | |
total_time = matched_time + unmatched_time | |
rate = (matched_time / total_time) * 100 if total_time > 0 else 0 | |
# レートを 0-100 の範囲に収める | |
rate = max(0, min(100, rate)) | |
silent_rate = 100.0 - rate | |
# シングルユーザーでも user_rates 形式で統一して返す | |
user_rates = {current_users[0]: rate, '無音': silent_rate} | |
print(f"単一ユーザー、user_rates形式で返す: {user_rates}") | |
return jsonify({"user_rates": user_rates}), 200 | |
except Exception as e: | |
print(f"エラー: /upload_audio の処理中に予期せぬエラーが発生しました: {e}") | |
# エラーの詳細をスタックトレース付きでログに出力 | |
traceback.print_exc() | |
return jsonify({"error": "サーバー内部エラーが発生しました", "details": str(e)}), 500 | |
# ユーザー選択画面(テンプレート: userSelect.html) | |
def userselect(): | |
return render_template('userSelect.html') | |
# 選択したユーザーを設定するエンドポイント | |
def select_users(): | |
global users | |
try: | |
data = request.get_json() | |
if not data or 'users' not in data: | |
return jsonify({"error": "ユーザーリストがありません"}), 400 | |
users = data['users'] | |
print(f"選択されたユーザー: {users}") | |
return jsonify({"status": "success", "selected_users": users}), 200 | |
except Exception as e: | |
print("Error in /select_users:", str(e)) | |
return jsonify({"error": "サーバーエラー", "details": str(e)}), 500 | |
def reset(): | |
global users | |
users = [] | |
global total_audio | |
global transcription_text | |
# 一時ディレクトリのクリーンアップ | |
if total_audio: | |
process.delete_files_in_directory(total_audio) | |
process.delete_files_in_directory('/tmp/data/transcription_audio') | |
# 書き起こしテキストの削除 | |
if os.path.exists(transcription_text): | |
try: | |
os.remove(transcription_text) | |
print(f"{transcription_text} を削除しました。") | |
except Exception as e: | |
print(f"ファイル削除中にエラーが発生しました: {e}") | |
transcription_text = "" | |
return jsonify({"status": "success", "message": "Users reset"}), 200 | |
def copy_selected_files(): | |
try: | |
data = request.get_json() | |
if not data or "names" not in data: | |
return jsonify({"error": "namesパラメータが存在しません"}), 400 | |
names = data["names"] | |
dest_dir = "/tmp/data/selected_audio" # コピー先のフォルダ | |
os.makedirs(dest_dir, exist_ok=True) | |
copied_files = [] | |
for name in names: | |
dest_path = os.path.join(dest_dir, f"{name}.wav") | |
try: | |
# クラウドから直接ダウンロード | |
download_from_cloud(f"{name}.wav", dest_path) | |
copied_files.append(name) | |
print(f"{name}.wav を {dest_path} にダウンロードしました。") | |
except Exception as e: | |
print(f"ダウンロード中にエラーが発生しました: {e}") | |
continue | |
return jsonify({"status": "success", "copied": copied_files}), 200 | |
except Exception as e: | |
print("Error in /copy_selected_files:", str(e)) | |
return jsonify({"error": "サーバー内部エラー", "details": str(e)}), 500 | |
def clear_tmp(): | |
try: | |
tmp_dir = "/tmp/data" # アプリケーションが使用しているtmpフォルダ | |
# ファイルのみの削除 | |
process.delete_files_in_directory(tmp_dir) | |
# フォルダがあれば再帰的に削除 | |
for item in os.listdir(tmp_dir): | |
item_path = os.path.join(tmp_dir, item) | |
if os.path.isdir(item_path): | |
shutil.rmtree(item_path) | |
print(f"ディレクトリを削除しました: {item_path}") | |
return jsonify({"status": "success", "message": "tmp配下がすべて削除されました"}), 200 | |
except Exception as e: | |
print("Error in /clear_tmp:", str(e)) | |
return jsonify({"error": "サーバー内部エラー", "details": str(e)}), 500 | |
def upload_base_audio(): | |
global all_users | |
try: | |
data = request.get_json() | |
if not data or 'audio_data' not in data or 'name' not in data: | |
return jsonify({"error": "音声データまたは名前がありません"}), 400 | |
name = data['name'] | |
print(f"登録名: {name}") | |
# GASのアップロードエンドポイントにリクエスト | |
payload = { | |
"action": "upload", | |
"fileName": f"{name}.wav", | |
"base64Data": data['audio_data'] | |
} | |
response = requests.post(GAS_URL, json=payload) | |
if response.status_code != 200: | |
return jsonify({"error": "GASアップロードエラー", "details": response.text}), 500 | |
res_json = response.json() | |
if res_json.get("status") != "success": | |
return jsonify({"error": "GASアップロード失敗", "details": res_json.get("message")}), 500 | |
# 全ユーザーリストを更新 | |
update_all_users() | |
return jsonify({"state": "Registration Success!", "driveFileId": res_json.get("fileId")}), 200 | |
except Exception as e: | |
print("Error in /upload_base_audio:", str(e)) | |
return jsonify({"error": "サーバーエラー", "details": str(e)}), 500 | |
def list_base_audio(): | |
try: | |
global all_users | |
all_users = update_all_users() | |
return jsonify({"status": "success", "id": all_users}), 200 | |
except Exception as e: | |
print("Error in /list_base_audio:", str(e)) | |
return jsonify({"error": "サーバーエラー", "details": str(e)}), 500 | |
if __name__ == '__main__': | |
port = int(os.environ.get("PORT", 7860)) | |
app.run(debug=True, host="0.0.0.0", port=port) |