"""Streamlit app: cinema scheduling-efficiency analysis and TMS server content lookup.

The script has two independent features:

1. Efficiency analysis of an uploaded daily screening report (Excel/CSV).
2. A query against the TMS server that lists which movie content is stored
   in which hall.
"""
import time
from collections import defaultdict

import numpy as np
import pandas as pd
import requests
import streamlit as st

# Set page layout to wide mode and set page title
st.set_page_config(layout="wide", page_title="影城效率与内容分析工具")

# Timeout (seconds) applied to every HTTP request so a stalled server cannot
# hang the app indefinitely.
REQUEST_TIMEOUT_SECONDS = 10

# How long (seconds) a successful server query is served from cache before the
# server is contacted again.
SERVER_CACHE_TTL_SECONDS = 300

# Unicode provides consecutive circled numbers from U+2460 (1) to U+2473 (20).
_CIRCLED_ONE_CODEPOINT = 0x2460
_MAX_CIRCLED_NUMBER = 20


# --- Efficiency Analysis Functions ---
def clean_movie_title(title):
    """Return the part of *title* before its first space.

    Non-string values are returned unchanged.
    """
    if not isinstance(title, str):
        return title
    return title.split(' ', 1)[0]


def style_efficiency(row):
    """Return one CSS style string per cell of *row* for a pandas Styler.

    The whole row is green when either efficiency exceeds 1.5, red when either
    is below 0.5 (green wins when both conditions hold), and unstyled
    otherwise.
    """
    green = 'background-color: #E6F5E6;'  # Light Green
    red = 'background-color: #FFE5E5;'  # Light Red
    default = ''

    seat_efficiency = row.get('座次效率', 0)
    session_efficiency = row.get('场次效率', 0)

    if seat_efficiency > 1.5 or session_efficiency > 1.5:
        return [green] * len(row)
    if seat_efficiency < 0.5 or session_efficiency < 0.5:
        return [red] * len(row)
    return [default] * len(row)


def _safe_ratio(numerator, denominator):
    """Divide element-wise, mapping every undefined result to 0.

    Both 0/0 (NaN) and x/0 (+/-inf) become 0, so a film with zero seats or
    zero attendance never shows up as "inf" in the tables.
    """
    with np.errstate(divide='ignore', invalid='ignore'):
        ratio = np.divide(numerator, denominator)
    return ratio.replace([np.inf, -np.inf], np.nan).fillna(0)


def process_and_analyze_data(df):
    """Aggregate screening rows per movie and compute efficiency metrics.

    Args:
        df: DataFrame with columns '影片名称_清理后', '座位数', '总收入' and
            '总人次', one row per screening.

    Returns:
        A DataFrame with one row per movie, sorted by box office descending,
        containing totals, average price, share-of-total ratios and the two
        efficiency ratios (box-office share divided by seat share / session
        share). An empty DataFrame is returned for empty input.
    """
    if df.empty:
        return pd.DataFrame()

    analysis_df = df.groupby('影片名称_清理后').agg(
        座位数=('座位数', 'sum'),
        场次=('影片名称_清理后', 'size'),
        票房=('总收入', 'sum'),
        人次=('总人次', 'sum'),
    ).reset_index()
    analysis_df.rename(columns={'影片名称_清理后': '影片'}, inplace=True)
    analysis_df = analysis_df.sort_values(by='票房', ascending=False).reset_index(drop=True)

    total_seats = analysis_df['座位数'].sum()
    total_sessions = analysis_df['场次'].sum()
    total_revenue = analysis_df['票房'].sum()

    analysis_df['均价'] = _safe_ratio(analysis_df['票房'], analysis_df['人次'])
    analysis_df['座次比'] = _safe_ratio(analysis_df['座位数'], total_seats)
    analysis_df['场次比'] = _safe_ratio(analysis_df['场次'], total_sessions)
    analysis_df['票房比'] = _safe_ratio(analysis_df['票房'], total_revenue)
    analysis_df['座次效率'] = _safe_ratio(analysis_df['票房比'], analysis_df['座次比'])
    analysis_df['场次效率'] = _safe_ratio(analysis_df['票房比'], analysis_df['场次比'])

    final_columns = ['影片', '座位数', '场次', '票房', '人次', '均价',
                     '座次比', '场次比', '票房比', '座次效率', '场次效率']
    return analysis_df[final_columns]


# --- New Feature: Server Movie Content Inquiry ---
def _request_auth_token():
    """Request an API token from the server and return it.

    Raises:
        requests.RequestException: on network failure or HTTP error status.
        RuntimeError: when the server answers with a non-'0000' error code.
    """
    token_headers = {
        'Host': 'oa.hengdianfilm.com:7080',
        'Content-Type': 'application/json',
        'Origin': 'http://115.239.253.233:7080',
        'Connection': 'keep-alive',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_5_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/138.0.7204.156 Mobile/15E148 Safari/604.1',
        'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
    }
    # NOTE(review): the app secret is hard-coded in source; consider moving it
    # to st.secrets or an environment variable.
    token_json_data = {
        'appId': 'hd',
        'appSecret': 'ad761f8578cc6170',
        'timeStamp': int(time.time() * 1000),
    }
    token_url = 'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket=-1495916529737643774'

    response = requests.post(token_url, headers=token_headers, json=token_json_data,
                             timeout=REQUEST_TIMEOUT_SECONDS)
    response.raise_for_status()
    token_data = response.json()
    if token_data.get('error_code') != '0000':
        raise RuntimeError(f"获取Token失败: {token_data.get('error_desc')}")
    return token_data['param']


def _fetch_all_server_movies(auth_token):
    """Download every page of the server's movie list and return the raw items.

    Pages are requested one at a time with a 1-second pause between requests;
    paging stops on an empty page or once the reported COUNT has been reached.

    Raises:
        requests.RequestException: on network failure or HTTP error status.
        RuntimeError: when the server answers with a non-'000000' RSPCD.
    """
    list_headers = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Content-Type': 'application/json; charset=UTF-8',
        'Origin': 'http://115.239.253.233:7080',
        'Proxy-Connection': 'keep-alive',
        'Token': auth_token,
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'X-SESSIONID': 'PQ0J3K85GJEDVYIGZE1KEG1K80USDAP4',
    }
    list_params = {'token': 'hd', 'murl': 'ContentMovie'}
    list_url = 'http://oa.hengdianfilm.com:7080/cinema-api/cinema/server/dcp/list'

    all_movies = []
    page_index = 1
    while True:
        list_json_data = {
            'THEATER_ID': 38205954,
            'SOURCE': 'SERVER',
            'ASSERT_TYPE': 2,
            'PAGE_CAPACITY': 20,
            'PAGE_INDEX': page_index,
        }
        # NOTE(review): verify=False is kept from the original; the URL is
        # plain HTTP, so it only matters if the endpoint moves to HTTPS.
        response = requests.post(list_url, params=list_params, headers=list_headers,
                                 json=list_json_data, verify=False,
                                 timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        movie_data = response.json()
        if movie_data.get("RSPCD") != "000000":
            raise RuntimeError(f"获取影片列表失败: {movie_data.get('RSPMSG')}")

        body = movie_data.get("BODY", {})
        movies_on_page = body.get("LIST", [])
        if not movies_on_page:
            break
        all_movies.extend(movies_on_page)
        if len(all_movies) >= body.get("COUNT", 0):
            break

        page_index += 1
        time.sleep(1)  # Add 1-second delay between requests
    return all_movies


def _hall_sort_key(item):
    """Sort entries with a display name first, then alphabetically by name."""
    assert_name = item['details']['assert_name']
    return (assert_name is None or assert_name == '',
            assert_name or item['content_name'])


# The TTL makes a later click on the query button hit the server again instead
# of replaying the first result for the lifetime of the app process.
@st.cache_data(show_spinner=False, ttl=SERVER_CACHE_TTL_SECONDS)
def fetch_and_process_server_movies(priority_movie_titles=None):
    """Fetch the server's movie content and prepare it for the two UI views.

    Args:
        priority_movie_titles: optional titles; movies whose display name
            contains any of them are listed first in the by-movie view.
            Values are converted to ``str``; empty values are ignored.

    Returns:
        A tuple ``(by_hall, by_movie)``:

        * ``by_hall``: dict mapping hall name (sorted) to a list of
          ``{'content_name', 'details'}`` entries, where ``details`` holds
          ``'assert_name'`` and the sorted ``'halls'`` list.
        * ``by_movie``: list of ``{'assert_name', 'content_name', 'halls'}``
          dicts for movies that have a display name, priority matches first,
          each group sorted by display name.

    Raises:
        requests.RequestException: on network failure or HTTP error status.
        RuntimeError: when the server reports an application-level error.
    """
    # Non-string titles (e.g. a numeric film name parsed from Excel) would make
    # the substring test below raise TypeError, and an empty title would match
    # every movie.
    priority_titles = [
        str(title) for title in (priority_movie_titles or [])
        if title is not None and str(title)
    ]

    # 1. Get Token
    auth_token = _request_auth_token()

    # 2. Fetch movie list (with pagination and delay)
    all_movies = _fetch_all_server_movies(auth_token)

    # 3. Process data into a central, detailed structure
    movie_details = {}
    for movie in all_movies:
        content_name = movie.get('CONTENT_NAME')
        if not content_name:
            continue
        # Tolerate a null HALL_INFO and halls without a name; None values
        # cannot be sorted together with strings.
        hall_names = [hall.get('HALL_NAME') for hall in (movie.get('HALL_INFO') or [])]
        movie_details[content_name] = {
            'assert_name': movie.get('ASSERT_NAME'),
            'halls': sorted(name for name in hall_names if name),
        }

    # 4. Prepare data for the two display views
    # For View by Hall
    by_hall = defaultdict(list)
    for content_name, details in movie_details.items():
        for hall_name in details['halls']:
            by_hall[hall_name].append({'content_name': content_name, 'details': details})
    for hall_movies in by_hall.values():
        hall_movies.sort(key=_hall_sort_key)

    # For View by Movie
    priority_list = []
    other_list_items = []
    for content_name, details in movie_details.items():
        assert_name = details.get('assert_name')
        if not assert_name:
            continue
        entry = {
            'assert_name': assert_name,
            'content_name': content_name,
            'halls': details['halls'],
        }
        if any(title in assert_name for title in priority_titles):
            priority_list.append(entry)
        else:
            other_list_items.append(entry)

    priority_list.sort(key=lambda entry: entry['assert_name'])
    other_list_items.sort(key=lambda entry: entry['assert_name'])
    final_sorted_list = priority_list + other_list_items

    return dict(sorted(by_hall.items())), final_sorted_list


def get_circled_number(hall_name):
    """Return the circled-number glyph for the number embedded in *hall_name*.

    All digit characters of the name are concatenated and read as one number;
    numbers 1-20 map to '①'-'⑳'. Any other number, a name without digits, or
    a non-string value yields ''.
    """
    if not isinstance(hall_name, str):
        return ''
    num_str = ''.join(filter(str.isdigit, hall_name))
    # str.isdigit also accepts characters such as '²' that int() rejects;
    # isdecimal guards the conversion (and is False for the empty string).
    if not num_str.isdecimal():
        return ''
    number = int(num_str)
    if 1 <= number <= _MAX_CIRCLED_NUMBER:
        return chr(_CIRCLED_ONE_CODEPOINT + number - 1)
    return ''


def _format_circled_halls(halls):
    """Return the sorted circled numbers of *halls* joined by spaces."""
    return " ".join(sorted(get_circled_number(hall) for hall in halls))


def _read_report(uploaded_file):
    """Read the uploaded report, choosing the parser from the file extension.

    The uploader accepts CSV as well as Excel files; ``pd.read_excel`` cannot
    parse CSV, so '.csv' uploads go through ``pd.read_csv``.
    """
    file_name = str(getattr(uploaded_file, 'name', '') or '')
    if file_name.lower().endswith('.csv'):
        return pd.read_csv(uploaded_file, skiprows=3, header=None)
    return pd.read_excel(uploaded_file, skiprows=3, header=None)


def _render_analysis_table(analysis_df, format_config):
    """Show *analysis_df* as a styled table sized to display all of its rows."""
    table_height = (len(analysis_df) + 1) * 35 + 3
    st.dataframe(
        analysis_df.style.format(format_config).apply(style_efficiency, axis=1).hide(axis="index"),
        height=table_height,
        use_container_width=True,
    )


# --- Streamlit Main UI ---
st.title('影城排片效率与内容分析工具')
st.write("上传 `影片映出日累计报表.xlsx` 进行效率分析,或点击下方按钮查询 TMS 服务器影片内容。")

uploaded_file = st.file_uploader("请在此处上传 Excel 文件", type=['xlsx', 'xls', 'csv'])
full_day_analysis = pd.DataFrame()

if uploaded_file is not None:
    try:
        # Efficiency analysis part
        df = _read_report(uploaded_file)
        df.rename(columns={0: '影片名称', 2: '放映时间', 5: '总人次', 6: '总收入', 7: '座位数'},
                  inplace=True)
        required_cols = ['影片名称', '放映时间', '座位数', '总收入', '总人次']
        # Copy so the in-place operations below act on an independent frame
        # rather than on a slice of the raw sheet.
        df = df[required_cols].copy()
        df.dropna(subset=['影片名称', '放映时间'], inplace=True)
        for col in ['座位数', '总收入', '总人次']:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
        df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M:%S', errors='coerce').dt.time
        df.dropna(subset=['放映时间'], inplace=True)
        df['影片名称_清理后'] = df['影片名称'].apply(clean_movie_title)

        st.toast("文件上传成功,效率分析已生成!", icon="🎉")

        format_config = {
            '座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}',
            '票房': '{:,.2f}', '均价': '{:.2f}',
            '座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}',
            '座次效率': '{:.2f}', '场次效率': '{:.2f}',
        }

        st.markdown("### 全天排片效率分析")
        full_day_analysis = process_and_analyze_data(df.copy())
        if not full_day_analysis.empty:
            _render_analysis_table(full_day_analysis, format_config)

        st.markdown("#### 黄金时段排片效率分析 (14:00-21:00)")
        start_time = pd.to_datetime('14:00:00').time()
        end_time = pd.to_datetime('21:00:00').time()
        prime_time_df = df[df['放映时间'].between(start_time, end_time)]
        prime_time_analysis = process_and_analyze_data(prime_time_df.copy())
        if not prime_time_analysis.empty:
            _render_analysis_table(prime_time_analysis, format_config)

        if not full_day_analysis.empty:
            st.markdown("##### 复制当日排片列表")
            movie_titles = full_day_analysis['影片'].tolist()
            formatted_titles = ''.join(f'《{title}》' for title in movie_titles)
            st.code(formatted_titles, language='text')

    except Exception as e:  # top-level UI boundary: report instead of crashing
        st.error(f"处理文件时出错: {e}")

# --- New Feature Module ---
st.markdown("### TMS 服务器影片内容查询")
if st.button('点击查询 TMS 服务器'):
    with st.spinner("正在从 TMS 服务器获取数据中,请稍候..."):
        try:
            priority_titles = full_day_analysis['影片'].tolist() if not full_day_analysis.empty else []
            halls_data, movie_list_sorted = fetch_and_process_server_movies(priority_titles)
            st.toast("TMS 服务器数据获取成功!", icon="🎉")

            # --- View by Movie (in a single expander) ---
            st.markdown("#### 按影片查看所在影厅")
            with st.expander("点击展开 / 折叠影片列表", expanded=True):
                for item in movie_list_sorted:
                    circled_halls = _format_circled_halls(item['halls'])
                    st.markdown(f"**{item['assert_name']}** - {circled_halls} - `{item['content_name']}`")

            # --- View by Hall ---
            st.markdown("#### 按影厅查看影片内容")
            if halls_data:
                # st.tabs needs a non-empty list of labels.
                hall_names = list(halls_data)
                hall_tabs = st.tabs(hall_names)
                for tab, hall_name in zip(hall_tabs, hall_names):
                    with tab:
                        for movie_item in halls_data[hall_name]:
                            details = movie_item['details']
                            content_name = movie_item['content_name']
                            assert_name = details['assert_name']
                            display_name = assert_name if assert_name else content_name
                            circled_halls = _format_circled_halls(details['halls'])
                            st.markdown(f"- **{display_name}** - {circled_halls} - `{content_name}`")
            else:
                st.info("服务器未返回任何影厅内容。")
        except Exception as e:  # top-level UI boundary: report instead of crashing
            st.error(f"查询服务器时出错: {e}")