jiguang / app.py
Ethscriptions's picture
Update app.py
f9efdb9 verified
raw
history blame
13.4 kB
import streamlit as st
import pandas as pd
import numpy as np
import requests
import time
from collections import defaultdict
# Set page layout to wide mode and set page title
st.set_page_config(layout="wide", page_title="影城效率与内容分析工具")
# --- Efficiency Analysis Functions ---
def clean_movie_title(title):
if not isinstance(title, str):
return title
return title.split(' ', 1)[0]
def style_efficiency(row):
green = 'background-color: #E6F5E6;' # Light Green
red = 'background-color: #FFE5E5;' # Light Red
default = ''
styles = [default] * len(row)
seat_efficiency = row.get('座次效率', 0)
session_efficiency = row.get('场次效率', 0)
if seat_efficiency > 1.5 or session_efficiency > 1.5:
styles = [green] * len(row)
elif seat_efficiency < 0.5 or session_efficiency < 0.5:
styles = [red] * len(row)
return styles
def process_and_analyze_data(df):
if df.empty:
return pd.DataFrame()
analysis_df = df.groupby('影片名称_清理后').agg(
座位数=('座位数', 'sum'),
场次=('影片名称_清理后', 'size'),
票房=('总收入', 'sum'),
人次=('总人次', 'sum')
).reset_index()
analysis_df.rename(columns={'影片名称_清理后': '影片'}, inplace=True)
analysis_df = analysis_df.sort_values(by='票房', ascending=False).reset_index(drop=True)
total_seats = analysis_df['座位数'].sum()
total_sessions = analysis_df['场次'].sum()
total_revenue = analysis_df['票房'].sum()
analysis_df['均价'] = np.divide(analysis_df['票房'], analysis_df['人次']).fillna(0)
analysis_df['座次比'] = np.divide(analysis_df['座位数'], total_seats).fillna(0)
analysis_df['场次比'] = np.divide(analysis_df['场次'], total_sessions).fillna(0)
analysis_df['票房比'] = np.divide(analysis_df['票房'], total_revenue).fillna(0)
analysis_df['座次效率'] = np.divide(analysis_df['票房比'], analysis_df['座次比']).fillna(0)
analysis_df['场次效率'] = np.divide(analysis_df['票房比'], analysis_df['场次比']).fillna(0)
final_columns = ['影片', '座位数', '场次', '票房', '人次', '均价', '座次比', '场次比', '票房比', '座次效率',
'场次效率']
analysis_df = analysis_df[final_columns]
return analysis_df
# --- New Feature: Server Movie Content Inquiry ---
@st.cache_data(show_spinner=False)
def fetch_and_process_server_movies(priority_movie_titles=None):
if priority_movie_titles is None:
priority_movie_titles = []
# 1. Get Token
token_headers = {
'Host': 'oa.hengdianfilm.com:7080', 'Content-Type': 'application/json',
'Origin': 'http://115.239.253.233:7080', 'Connection': 'keep-alive',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_5_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/138.0.7204.156 Mobile/15E148 Safari/604.1',
'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
}
token_json_data = {'appId': 'hd', 'appSecret': 'ad761f8578cc6170', 'timeStamp': int(time.time() * 1000)}
token_url = 'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket=-1495916529737643774'
response = requests.post(token_url, headers=token_headers, json=token_json_data, timeout=10)
response.raise_for_status()
token_data = response.json()
if token_data.get('error_code') != '0000':
raise Exception(f"获取Token失败: {token_data.get('error_desc')}")
auth_token = token_data['param']
# 2. Fetch movie list (with pagination and delay)
all_movies = []
page_index = 1
while True:
list_headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Content-Type': 'application/json; charset=UTF-8',
'Origin': 'http://115.239.253.233:7080', 'Proxy-Connection': 'keep-alive', 'Token': auth_token,
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
'X-SESSIONID': 'PQ0J3K85GJEDVYIGZE1KEG1K80USDAP4',
}
list_params = {'token': 'hd', 'murl': 'ContentMovie'}
list_json_data = {'THEATER_ID': 38205954, 'SOURCE': 'SERVER', 'ASSERT_TYPE': 2, 'PAGE_CAPACITY': 20,
'PAGE_INDEX': page_index}
list_url = 'http://oa.hengdianfilm.com:7080/cinema-api/cinema/server/dcp/list'
response = requests.post(list_url, params=list_params, headers=list_headers, json=list_json_data, verify=False)
response.raise_for_status()
movie_data = response.json()
if movie_data.get("RSPCD") != "000000":
raise Exception(f"获取影片列表失败: {movie_data.get('RSPMSG')}")
body = movie_data.get("BODY", {})
movies_on_page = body.get("LIST", [])
if not movies_on_page: break
all_movies.extend(movies_on_page)
if len(all_movies) >= body.get("COUNT", 0): break
page_index += 1
time.sleep(1) # Add 1-second delay between requests
# 3. Process data into a central, detailed structure
movie_details = {}
for movie in all_movies:
content_name = movie.get('CONTENT_NAME')
if not content_name: continue
movie_details[content_name] = {
'assert_name': movie.get('ASSERT_NAME'),
'halls': sorted([h.get('HALL_NAME') for h in movie.get('HALL_INFO', [])]),
'play_time': movie.get('PLAY_TIME')
}
# 4. Prepare data for the two display views
by_hall = defaultdict(list)
for content_name, details in movie_details.items():
for hall_name in details['halls']:
by_hall[hall_name].append({'content_name': content_name, 'details': details})
for hall_name in by_hall:
by_hall[hall_name].sort(key=lambda item: (
item['details']['assert_name'] is None or item['details']['assert_name'] == '',
item['details']['assert_name'] or item['content_name']
))
view2_list = []
for content_name, details in movie_details.items():
if details.get('assert_name'):
view2_list.append({
'assert_name': details['assert_name'],
'content_name': content_name,
'halls': details['halls'],
'play_time': details['play_time']
})
priority_list = [item for item in view2_list if
any(p_title in item['assert_name'] for p_title in priority_movie_titles)]
other_list_items = [item for item in view2_list if item not in priority_list]
priority_list.sort(key=lambda x: x['assert_name'])
other_list_items.sort(key=lambda x: x['assert_name'])
final_sorted_list = priority_list + other_list_items
return dict(sorted(by_hall.items())), final_sorted_list
def get_circled_number(hall_name):
mapping = {'1': '①', '2': '②', '3': '③', '4': '④', '5': '⑤', '6': '⑥', '7': '⑦', '8': '⑧', '9': '⑨'}
num_str = ''.join(filter(str.isdigit, hall_name))
return mapping.get(num_str, '')
def format_play_time(time_str):
if not time_str or not isinstance(time_str, str): return None
try:
parts = time_str.split(':');
hours = int(parts[0]);
minutes = int(parts[1])
return hours * 60 + minutes
except (ValueError, IndexError):
return None
# --- NEW Helper function to add TMS location column ---
def add_tms_locations_to_analysis(analysis_df, tms_movie_list):
locations = []
for index, row in analysis_df.iterrows():
movie_title = row['影片']
found_versions = []
for tms_movie in tms_movie_list:
if movie_title in tms_movie['assert_name']:
# Extract version name by removing the base title
version_name = tms_movie['assert_name'].replace(movie_title, '').strip()
circled_halls = " ".join(sorted([get_circled_number(h) for h in tms_movie['halls']]))
found_versions.append(f"{version_name}{circled_halls}")
locations.append('|'.join(found_versions))
analysis_df['影片所在影厅位置'] = locations
return analysis_df
# --- Streamlit Main UI ---
st.title('影城排片效率与内容分析工具')
st.write("上传 `影片映出日累计报表.xlsx` 进行效率分析,或点击下方按钮查询 TMS 服务器影片内容。")
uploaded_file = st.file_uploader("请在此处上传 Excel 文件", type=['xlsx', 'xls', 'csv'])
# NEW: Checkbox for the new feature
query_tms_for_location = st.checkbox("查询 TMS 找影片所在影厅")
if uploaded_file is not None:
try:
df = pd.read_excel(uploaded_file, skiprows=3, header=None)
df.rename(columns={0: '影片名称', 2: '放映时间', 5: '总人次', 6: '总收入', 7: '座位数'}, inplace=True)
required_cols = ['影片名称', '放映时间', '座位数', '总收入', '总人次']
df = df[required_cols]
df.dropna(subset=['影片名称', '放映时间'], inplace=True)
for col in ['座位数', '总收入', '总人次']:
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M:%S', errors='coerce').dt.time
df.dropna(subset=['放映时间'], inplace=True)
df['影片名称_清理后'] = df['影片名称'].apply(clean_movie_title)
st.toast("文件上传成功,效率分析已生成!", icon="🎉")
format_config = {'座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}', '票房': '{:,.2f}', '均价': '{:.2f}',
'座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}', '座次效率': '{:.2f}',
'场次效率': '{:.2f}'}
full_day_analysis = process_and_analyze_data(df.copy())
prime_time_analysis = process_and_analyze_data(
df[df['放映时间'].between(pd.to_datetime('14:00:00').time(), pd.to_datetime('21:00:00').time())].copy())
# --- NEW LOGIC: If checkbox is ticked, fetch data and modify dataframes ---
if query_tms_for_location:
with st.spinner("正在关联查询 TMS 服务器..."):
_, tms_movie_list = fetch_and_process_server_movies()
full_day_analysis = add_tms_locations_to_analysis(full_day_analysis, tms_movie_list)
prime_time_analysis = add_tms_locations_to_analysis(prime_time_analysis, tms_movie_list)
st.toast("TMS 影片位置关联成功!", icon="🔗")
st.markdown("### 全天排片效率分析")
if not full_day_analysis.empty:
st.dataframe(
full_day_analysis.style.format(format_config),
use_container_width=True, hide_index=True)
st.markdown("#### 黄金时段排片效率分析 (14:00-21:00)")
if not prime_time_analysis.empty:
st.dataframe(
prime_time_analysis.style.format(format_config),
use_container_width=True, hide_index=True)
if not full_day_analysis.empty:
st.markdown("##### 复制当日排片列表")
movie_titles = full_day_analysis['影片'].tolist()
formatted_titles = ''.join([f'《{title}》' for title in movie_titles])
st.code(formatted_titles, language='text')
except Exception as e:
st.error(f"处理文件时出错: {e}")
st.divider()
st.markdown("### TMS 服务器影片内容查询")
if st.button('点击查询 TMS 服务器'):
with st.spinner("正在从 TMS 服务器获取数据中..."):
try:
halls_data, movie_list_sorted = fetch_and_process_server_movies()
st.toast("TMS 服务器数据获取成功!", icon="🎉")
st.markdown("#### 按影片查看所在影厅")
view2_data = [{'影片名称': item['assert_name'],
'所在影厅': " ".join(sorted([get_circled_number(h) for h in item['halls']])),
'文件名': item['content_name'], '时长': format_play_time(item['play_time'])} for item in
movie_list_sorted]
df_view2 = pd.DataFrame(view2_data)
st.dataframe(df_view2, hide_index=True, use_container_width=True)
st.markdown("#### 按影厅查看影片内容")
hall_tabs = st.tabs(halls_data.keys())
for tab, hall_name in zip(hall_tabs, halls_data.keys()):
with tab:
view1_data_for_tab = [{'影片名称': item['details']['assert_name'], '所在影厅': " ".join(
sorted([get_circled_number(h) for h in item['details']['halls']])),
'文件名': item['content_name'],
'时长': format_play_time(item['details']['play_time'])} for item in
halls_data[hall_name]]
df_view1_tab = pd.DataFrame(view1_data_for_tab)
st.dataframe(df_view1_tab, hide_index=True, use_container_width=True)
except Exception as e:
st.error(f"查询服务器时出错: {e}")