Spaces:
Sleeping
Sleeping
File size: 12,044 Bytes
e522499 cdf0803 79f26df cdf0803 79f26df cdf0803 15c01f8 79f26df cdf0803 1ce52fb cdf0803 15c01f8 cdf0803 79f26df cdf0803 01ac828 cdf0803 01ac828 cdf0803 15c01f8 cdf0803 79f26df cdf0803 15c01f8 79f26df cdf0803 79f26df cdf0803 79f26df cdf0803 a1d5478 79f26df cdf0803 79f26df cdf0803 79f26df cdf0803 79f26df cdf0803 79f26df cdf0803 79f26df cdf0803 79f26df 15c01f8 79f26df cdf0803 e522499 cdf0803 79f26df |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
import streamlit as st
import pandas as pd
import numpy as np
import requests
import time
from collections import defaultdict
# Set page layout to wide mode and set page title
st.set_page_config(layout="wide", page_title="影城效率与内容分析工具")
# --- Efficiency Analysis Functions ---
def clean_movie_title(title):
if not isinstance(title, str):
return title
return title.split(' ', 1)[0]
def style_efficiency(row):
green = 'background-color: #E6F5E6;' # Light Green
red = 'background-color: #FFE5E5;' # Light Red
default = ''
styles = [default] * len(row)
seat_efficiency = row.get('座次效率', 0)
session_efficiency = row.get('场次效率', 0)
if seat_efficiency > 1.5 or session_efficiency > 1.5:
styles = [green] * len(row)
elif seat_efficiency < 0.5 or session_efficiency < 0.5:
styles = [red] * len(row)
return styles
def process_and_analyze_data(df):
if df.empty:
return pd.DataFrame()
analysis_df = df.groupby('影片名称_清理后').agg(
座位数=('座位数', 'sum'),
场次=('影片名称_清理后', 'size'),
票房=('总收入', 'sum'),
人次=('总人次', 'sum')
).reset_index()
analysis_df.rename(columns={'影片名称_清理后': '影片'}, inplace=True)
analysis_df = analysis_df.sort_values(by='票房', ascending=False).reset_index(drop=True)
total_seats = analysis_df['座位数'].sum()
total_sessions = analysis_df['场次'].sum()
total_revenue = analysis_df['票房'].sum()
analysis_df['均价'] = np.divide(analysis_df['票房'], analysis_df['人次']).fillna(0)
analysis_df['座次比'] = np.divide(analysis_df['座位数'], total_seats).fillna(0)
analysis_df['场次比'] = np.divide(analysis_df['场次'], total_sessions).fillna(0)
analysis_df['票房比'] = np.divide(analysis_df['票房'], total_revenue).fillna(0)
analysis_df['座次效率'] = np.divide(analysis_df['票房比'], analysis_df['座次比']).fillna(0)
analysis_df['场次效率'] = np.divide(analysis_df['票房比'], analysis_df['场次比']).fillna(0)
final_columns = ['影片', '座位数', '场次', '票房', '人次', '均价', '座次比', '场次比', '票房比', '座次效率',
'场次效率']
analysis_df = analysis_df[final_columns]
return analysis_df
# --- New Feature: Server Movie Content Inquiry ---
@st.cache_data(show_spinner=False)
def fetch_and_process_server_movies(priority_movie_titles=None):
if priority_movie_titles is None:
priority_movie_titles = []
# 1. Get Token
token_headers = {
'Host': 'oa.hengdianfilm.com:7080', 'Content-Type': 'application/json',
'Origin': 'http://115.239.253.233:7080', 'Connection': 'keep-alive',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_5_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/138.0.7204.156 Mobile/15E148 Safari/604.1',
'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
}
token_json_data = {'appId': 'hd', 'appSecret': 'ad761f8578cc6170', 'timeStamp': int(time.time() * 1000)}
token_url = 'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket=-1495916529737643774'
response = requests.post(token_url, headers=token_headers, json=token_json_data, timeout=10)
response.raise_for_status()
token_data = response.json()
if token_data.get('error_code') != '0000':
raise Exception(f"获取Token失败: {token_data.get('error_desc')}")
auth_token = token_data['param']
# 2. Fetch movie list (with pagination and delay)
all_movies = []
page_index = 1
while True:
list_headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Content-Type': 'application/json; charset=UTF-8',
'Origin': 'http://115.239.253.233:7080', 'Proxy-Connection': 'keep-alive', 'Token': auth_token,
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
'X-SESSIONID': 'PQ0J3K85GJEDVYIGZE1KEG1K80USDAP4',
}
list_params = {'token': 'hd', 'murl': 'ContentMovie'}
list_json_data = {'THEATER_ID': 38205954, 'SOURCE': 'SERVER', 'ASSERT_TYPE': 2, 'PAGE_CAPACITY': 20,
'PAGE_INDEX': page_index}
list_url = 'http://oa.hengdianfilm.com:7080/cinema-api/cinema/server/dcp/list'
response = requests.post(list_url, params=list_params, headers=list_headers, json=list_json_data, verify=False)
response.raise_for_status()
movie_data = response.json()
if movie_data.get("RSPCD") != "000000":
raise Exception(f"获取影片列表失败: {movie_data.get('RSPMSG')}")
body = movie_data.get("BODY", {})
movies_on_page = body.get("LIST", [])
if not movies_on_page: break
all_movies.extend(movies_on_page)
if len(all_movies) >= body.get("COUNT", 0): break
page_index += 1
time.sleep(1) # Add 1-second delay between requests
# 3. Process data into a central, detailed structure
movie_details = {}
for movie in all_movies:
content_name = movie.get('CONTENT_NAME')
if not content_name: continue
movie_details[content_name] = {
'assert_name': movie.get('ASSERT_NAME'),
'halls': sorted([h.get('HALL_NAME') for h in movie.get('HALL_INFO', [])])
}
# 4. Prepare data for the two display views
# For View by Hall
by_hall = defaultdict(list)
for content_name, details in movie_details.items():
for hall_name in details['halls']:
by_hall[hall_name].append({'content_name': content_name, 'details': details})
for hall_name in by_hall:
by_hall[hall_name].sort(key=lambda item: (
item['details']['assert_name'] is None or item['details']['assert_name'] == '',
item['details']['assert_name'] or item['content_name']
))
# For View by Movie
view2_list = []
for content_name, details in movie_details.items():
if details.get('assert_name'):
view2_list.append({
'assert_name': details['assert_name'],
'content_name': content_name,
'halls': details['halls']
})
priority_list = [item for item in view2_list if
any(p_title in item['assert_name'] for p_title in priority_movie_titles)]
other_list_items = [item for item in view2_list if item not in priority_list]
priority_list.sort(key=lambda x: x['assert_name'])
other_list_items.sort(key=lambda x: x['assert_name'])
final_sorted_list = priority_list + other_list_items
return dict(sorted(by_hall.items())), final_sorted_list
def get_circled_number(hall_name):
mapping = {'1': '①', '2': '②', '3': '③', '4': '④', '5': '⑤', '6': '⑥', '7': '⑦', '8': '⑧', '9': '⑨'}
num_str = ''.join(filter(str.isdigit, hall_name))
return mapping.get(num_str, '')
# --- Streamlit Main UI ---
st.title('影城排片效率与内容分析工具')
st.write("上传 `影片映出日累计报表.xlsx` 进行效率分析,或点击下方按钮查询 TMS 服务器影片内容。")
uploaded_file = st.file_uploader("请在此处上传 Excel 文件", type=['xlsx', 'xls', 'csv'])
full_day_analysis = pd.DataFrame()
if uploaded_file is not None:
try:
# Efficiency analysis part
df = pd.read_excel(uploaded_file, skiprows=3, header=None)
df.rename(columns={0: '影片名称', 2: '放映时间', 5: '总人次', 6: '总收入', 7: '座位数'}, inplace=True)
required_cols = ['影片名称', '放映时间', '座位数', '总收入', '总人次']
df = df[required_cols]
df.dropna(subset=['影片名称', '放映时间'], inplace=True)
for col in ['座位数', '总收入', '总人次']:
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M:%S', errors='coerce').dt.time
df.dropna(subset=['放映时间'], inplace=True)
df['影片名称_清理后'] = df['影片名称'].apply(clean_movie_title)
st.toast("文件上传成功,效率分析已生成!", icon="🎉")
format_config = {'座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}', '票房': '{:,.2f}', '均价': '{:.2f}',
'座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}', '座次效率': '{:.2f}',
'场次效率': '{:.2f}'}
st.markdown("### 全天排片效率分析")
full_day_analysis = process_and_analyze_data(df.copy())
if not full_day_analysis.empty:
table_height = (len(full_day_analysis) + 1) * 35 + 3
st.dataframe(
full_day_analysis.style.format(format_config).apply(style_efficiency, axis=1).hide(axis="index"),
height=table_height, use_container_width=True)
st.markdown("#### 黄金时段排片效率分析 (14:00-21:00)")
start_time, end_time = pd.to_datetime('14:00:00').time(), pd.to_datetime('21:00:00').time()
prime_time_df = df[df['放映时间'].between(start_time, end_time)]
prime_time_analysis = process_and_analyze_data(prime_time_df.copy())
if not prime_time_analysis.empty:
table_height_prime = (len(prime_time_analysis) + 1) * 35 + 3
st.dataframe(
prime_time_analysis.style.format(format_config).apply(style_efficiency, axis=1).hide(axis="index"),
height=table_height_prime, use_container_width=True)
if not full_day_analysis.empty:
st.markdown("##### 复制当日排片列表")
movie_titles = full_day_analysis['影片'].tolist()
formatted_titles = ''.join([f'《{title}》' for title in movie_titles])
st.code(formatted_titles, language='text')
except Exception as e:
st.error(f"处理文件时出错: {e}")
# --- New Feature Module ---
st.markdown("### TMS 服务器影片内容查询")
if st.button('点击查询 TMS 服务器'):
with st.spinner("正在从 TMS 服务器获取数据中,请稍候..."):
try:
priority_titles = full_day_analysis['影片'].tolist() if not full_day_analysis.empty else []
halls_data, movie_list_sorted = fetch_and_process_server_movies(priority_titles)
st.toast("TMS 服务器数据获取成功!", icon="🎉")
# --- View by Movie (in a single expander) ---
st.markdown("#### 按影片查看所在影厅")
with st.expander("点击展开 / 折叠影片列表", expanded = True):
for item in movie_list_sorted:
circled_halls = " ".join(sorted([get_circled_number(h) for h in item['halls']]))
st.markdown(f"**{item['assert_name']}** - {circled_halls} - `{item['content_name']}`")
# --- View by Hall ---
st.markdown("#### 按影厅查看影片内容")
hall_tabs = st.tabs(halls_data.keys())
for tab, hall_name in zip(hall_tabs, halls_data.keys()):
with tab:
for movie_item in halls_data[hall_name]:
details = movie_item['details']
content_name = movie_item['content_name']
assert_name = details['assert_name']
display_name = assert_name if assert_name else content_name
circled_halls = " ".join(sorted([get_circled_number(h) for h in details['halls']]))
st.markdown(f"- **{display_name}** - {circled_halls} - `{content_name}`")
except Exception as e:
st.error(f"查询服务器时出错: {e}") |