mirror of
https://github.com/yuanwangokk-1/TV-BOX.git
synced 2025-10-24 23:11:21 +00:00
591 lines
20 KiB
Python
591 lines
20 KiB
Python
import json
|
||
import requests
|
||
import warnings
|
||
import re
|
||
import os
|
||
import time
|
||
from urllib3.exceptions import InsecureRequestWarning
|
||
from copy import deepcopy
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
|
||
# 自定义 jsm.json 的路径或网络地址,留空则使用当前目录下的 jsm.json
|
||
jsm_file_path = ""
|
||
|
||
# 读取 jsm.json 文件
|
||
jsm_data = {}
|
||
if jsm_file_path:
|
||
if jsm_file_path.startswith(("http://", "https://")):
|
||
try:
|
||
response = requests.get(jsm_file_path)
|
||
jsm_data = response.json()
|
||
except Exception as e:
|
||
print(f"从网络读取 jsm.json 配置文件失败: {str(e)}")
|
||
else:
|
||
if os.path.exists(jsm_file_path):
|
||
try:
|
||
with open(jsm_file_path, 'r', encoding='utf-8') as f:
|
||
jsm_data = json.load(f)
|
||
except Exception as e:
|
||
print(f"读取本地 jsm.json 配置文件失败: {str(e)}")
|
||
else:
|
||
print(f"本地 jsm.json 文件 {jsm_file_path} 不存在")
|
||
else:
|
||
local_path = os.path.join(os.getcwd(), 'jsm.json')
|
||
if os.path.exists(local_path):
|
||
try:
|
||
with open(local_path, 'r', encoding='utf-8') as f:
|
||
jsm_data = json.load(f)
|
||
except Exception as e:
|
||
print(f"读取默认 jsm.json 配置文件失败: {str(e)}")
|
||
else:
|
||
print("默认的 jsm.json 文件不存在")
|
||
|
||
# 站点映射关系
|
||
site_mappings = {
|
||
'立播': 'libo', '闪电':'shandian', '欧哥': 'ouge', '小米': 'xiaomi', '多多': 'duoduo',
|
||
'蜡笔': 'labi', '至臻': 'zhizhen', '木偶':'mogg', '六趣': 'liuqu', '虎斑': 'huban',
|
||
'下饭': 'xiafan', '玩偶': 'wogg', '星剧社':'star2', '二小': 'xhww'
|
||
}
|
||
|
||
# 代理配置
|
||
proxy_config = {
|
||
"enabled": False,
|
||
"proxies": {
|
||
"http": "http://127.0.0.1:7890",
|
||
"https": "http://127.0.0.1:7890"
|
||
}
|
||
}
|
||
|
||
# 文件路径配置
|
||
file_path_config = {
|
||
"input_dir": "",
|
||
"output_dir": ""
|
||
}
|
||
|
||
# 新增jsm映射配置
|
||
jsm_mapping = {
|
||
"Libvio": "libo",
|
||
"Xiaomi": "xiaomi",
|
||
"yydsys": "duoduo",
|
||
"蜡笔网盘": "labi",
|
||
"玩偶 | 蜡笔": "labi",
|
||
"至臻|网盘": "zhizhen",
|
||
"Huban": "huban",
|
||
"Wogg": "wogg",
|
||
"Mogg": "mogg",
|
||
"玩偶 | 闪电uc": "shandian",
|
||
"玩偶 | 二小": "xhww",
|
||
"玩偶 | 小米": "xiaomi",
|
||
"玩偶 | 多多": "duoduo",
|
||
"玩偶 | 木偶": "mogg",
|
||
"玩偶gg": "wogg",
|
||
"星剧社": "star2"
|
||
}
|
||
|
||
# 需要拼接搜索路径的站点配置
|
||
search_path_config = {
|
||
'闪电': '/index.php/vod/search.html?wd=仙台有树',
|
||
'欧哥': '/index.php/vod/search.html?wd=仙台有树',
|
||
'小米': '/index.php/vod/search.html?wd=仙台有树',
|
||
'多多': '/index.php/vod/search.html?wd=仙台有树',
|
||
'蜡笔': '/index.php/vod/search.html?wd=仙台有树',
|
||
'至臻': '/index.php/vod/search.html?wd=仙台有树',
|
||
'六趣': '/index.php/vod/search.html?wd=仙台有树',
|
||
'虎斑': '/index.php/vod/search.html?wd=仙台有树',
|
||
'下饭': '/index.php/vod/search.html?wd=仙台有树',
|
||
'玩偶': '/vodsearch/-------------.html?wd=仙台有树',
|
||
'木偶': '/index.php/vod/search.html?wd=仙台有树',
|
||
'二小': '/index.php/vod/search.html?wd=仙台有树',
|
||
'立播': '/search/-------------.html?wd=仙台有树&submit='
|
||
}
|
||
|
||
# 定义需要校验关键字的站点及其关键字
|
||
keyword_required_sites = {
|
||
'闪电': 'class="search-stat"',
|
||
'欧哥': 'class="search-stat"',
|
||
'小米': 'class="search-stat"',
|
||
'多多': 'class="search-stat"',
|
||
'蜡笔': 'class="search-stat"',
|
||
'至臻': 'class="search-stat"',
|
||
'六趣': 'class="search-stat"',
|
||
'虎斑': 'class="search-stat"',
|
||
'下饭': 'class="search-stat"',
|
||
'玩偶': 'class="search-stat"',
|
||
'木偶': 'class="search-stat"',
|
||
'二小': 'class="search-stat"',
|
||
'立播': 'class="stui-screen"'
|
||
}
|
||
|
||
# 新增可选的URL加权配置,默认权重为50
|
||
url_weight_config = {
|
||
"木偶": {
|
||
"https://aliii.deno.dev": 60,
|
||
"http://149.88.87.72:5666": 60
|
||
},
|
||
"至臻": {
|
||
"http://www.xhww.net": 10,
|
||
"http://xhww.net": 10
|
||
},
|
||
"立播": {
|
||
"https://libvio.mov": 60,
|
||
"https://www.libvio.cc": 60
|
||
}
|
||
}
|
||
|
||
# 兜底URL配置
|
||
fallback_url_config = {
|
||
"立播": [
|
||
"https://libvio.mov",
|
||
"https://www.libvio.cc",
|
||
"https://libvio.la",
|
||
"https://libvio.pro",
|
||
"https://libvio.fun",
|
||
"https://libvio.me",
|
||
"https://libvio.in",
|
||
"https://libvio.site",
|
||
"https://libvio.art",
|
||
"https://libvio.com",
|
||
"https://libvio.vip",
|
||
"https://libvio.pw",
|
||
"https://libvio.link"
|
||
],
|
||
"闪电": [
|
||
"http://1.95.79.193",
|
||
"http://1.95.79.193:666"
|
||
],
|
||
"欧哥": [
|
||
"https://woog.nxog.eu.org"
|
||
],
|
||
"小米": [
|
||
"http://www.54271.fun",
|
||
"https://www.milvdou.fun",
|
||
"http://www.54271.fun",
|
||
"https://www.mucpan.cc",
|
||
"https://mucpan.cc",
|
||
"http://milvdou.fun"
|
||
],
|
||
"多多": [
|
||
"https://tv.yydsys.top",
|
||
"https://tv.yydsys.cc",
|
||
"https://tv.214521.xyz",
|
||
"http://155.248.200.65"
|
||
],
|
||
"蜡笔": [
|
||
"http://feimaoai.site",
|
||
"https://feimao666.fun",
|
||
"http://feimao888.fun"
|
||
],
|
||
"至臻": [
|
||
"https://mihdr.top",
|
||
"http://www.miqk.cc",
|
||
"http://www.xhww.net",
|
||
"http://xhww.net",
|
||
"https://xiaomiai.site"
|
||
],
|
||
"六趣": [
|
||
"https://wp.0v.fit"
|
||
],
|
||
"虎斑": [
|
||
"http://103.45.162.207:20720"
|
||
],
|
||
"下饭": [
|
||
"http://txfpan.top",
|
||
"http://www.xn--ghqy10g1w0a.xyz"
|
||
],
|
||
"玩偶": [
|
||
"https://wogg.xxooo.cf",
|
||
"https://wogg.333232.xyz",
|
||
"https://www.wogg.one",
|
||
"https://www.wogg.lol",
|
||
"https://www.wogg.net"
|
||
],
|
||
"木偶": [
|
||
"https://tv.91muou.icu",
|
||
"https://mo.666291.xyz",
|
||
"https://mo.muouso.fun",
|
||
"https://aliii.deno.dev",
|
||
"http://149.88.87.72:5666"
|
||
],
|
||
"星剧社": [
|
||
"https://mlink.cc/520TV"
|
||
],
|
||
"二小": [
|
||
"https://xhww.net",
|
||
"https://www.xhww.net"
|
||
]
|
||
}
|
||
|
||
# 全局状态
|
||
last_site = None
|
||
|
||
|
||
def log_message(message, site_name=None, step="", max_error_length=80):
|
||
"""格式化日志打印"""
|
||
global last_site
|
||
|
||
status_emojis = {
|
||
'[开始]': '🚀', '[成功]': '✅', '[完成]': '🎉', '[失败]': '❌',
|
||
'[超时]': '⏳', '[警告]': '⚠️', '[错误]': '🚨', '[信息]': 'ℹ️',
|
||
'[选择]': '🔍', '[连接失败]': '🔌'
|
||
}
|
||
|
||
if site_name and site_name != last_site:
|
||
print(f"\n{'✨ ' + '=' * 38 + ' ✨'}")
|
||
print(f"🌐 [站点: {site_name}]")
|
||
print(f"{'✨ ' + '=' * 38 + ' ✨'}")
|
||
last_site = site_name
|
||
|
||
for status, emoji in status_emojis.items():
|
||
if status in message:
|
||
message = message.replace(status, f"{status} {emoji}")
|
||
break
|
||
else:
|
||
message = f"{message} 📢"
|
||
|
||
# 截断过长的错误信息
|
||
if "[连接失败]" in message or "[错误]" in message:
|
||
if len(message) > max_error_length:
|
||
message = message[:max_error_length] + "..."
|
||
|
||
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [{step}] {message}") if step else print(message)
|
||
|
||
|
||
def test_url(url, site_name=None):
|
||
"""增强版URL测试函数"""
|
||
search_path = search_path_config.get(site_name)
|
||
test_url = url.strip() + search_path if search_path else url.strip()
|
||
keyword = keyword_required_sites.get(site_name)
|
||
|
||
session = requests.Session()
|
||
adapter = requests.adapters.HTTPAdapter(max_retries=2)
|
||
session.mount('http://', adapter)
|
||
session.mount('https://', adapter)
|
||
|
||
try:
|
||
# 直接请求测试
|
||
response = session.get(
|
||
test_url,
|
||
timeout=7,
|
||
verify=False,
|
||
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
latency = response.elapsed.total_seconds()
|
||
has_keyword = keyword in response.text if keyword else True
|
||
|
||
log_msg = f"直接访问成功 | 延迟: {latency:.2f}s"
|
||
if keyword:
|
||
log_msg += f" | 关键字: {'✅' if has_keyword else '❌'}"
|
||
|
||
log_message(f"[成功] {test_url} {log_msg}", site_name, "URL测试")
|
||
return latency, has_keyword
|
||
|
||
log_message(f"[失败] HTTP状态码 {response.status_code}", site_name, "URL测试")
|
||
return None, None
|
||
|
||
except requests.RequestException as e:
|
||
error_type = "[超时]" if isinstance(e, requests.Timeout) else "[连接失败]"
|
||
log_message(f"{error_type} {str(e)}", site_name, "URL测试")
|
||
|
||
# 代理重试逻辑
|
||
if proxy_config["enabled"]:
|
||
try:
|
||
response = session.get(
|
||
test_url,
|
||
timeout=7,
|
||
verify=False,
|
||
proxies=proxy_config["proxies"],
|
||
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}
|
||
)
|
||
if response.status_code == 200:
|
||
latency = response.elapsed.total_seconds()
|
||
has_keyword = keyword in response.text if keyword else True
|
||
log_message(f"[成功] 代理访问成功 | 延迟: {latency:.2f}s | 关键字: {'✅' if has_keyword else '❌'}",
|
||
site_name, "URL测试")
|
||
return latency, has_keyword
|
||
except Exception as proxy_e:
|
||
log_message(f"[失败] 代理访问错误: {str(proxy_e)}", site_name, "URL测试")
|
||
|
||
return None, None
|
||
|
||
|
||
def get_best_url(urls, site_name=None, existing_url=None):
|
||
"""优化后的URL选择算法"""
|
||
if not isinstance(urls, list):
|
||
return urls
|
||
|
||
weights = url_weight_config.get(site_name, {})
|
||
default_weight = 50
|
||
sorted_urls = sorted([(url, weights.get(url, default_weight)) for url in urls],
|
||
key=lambda x: -x[1])
|
||
|
||
def test_single_url(url_weight):
|
||
url, weight = url_weight
|
||
latency, has_keyword = test_url(url, site_name)
|
||
if latency is not None:
|
||
return {
|
||
"url": url,
|
||
"latency": latency,
|
||
"has_keyword": has_keyword,
|
||
"weight": weight,
|
||
"score": (weight * 0.6) + ((1 / (latency + 0.1)) * 40)
|
||
}
|
||
return None
|
||
|
||
with ThreadPoolExecutor() as executor:
|
||
candidates = [result for result in executor.map(test_single_url, sorted_urls) if result]
|
||
|
||
if not candidates:
|
||
log_message(f"[警告] 无可用URL,使用现有配置: {existing_url}" if existing_url else
|
||
"[错误] 无可用URL且无历史配置", site_name, "URL选择")
|
||
return existing_url if existing_url else None
|
||
|
||
# 按评分排序:关键字存在 > 评分 > 延迟
|
||
sorted_candidates = sorted(candidates,
|
||
key=lambda x: (-x['has_keyword'], -x['score'], x['latency']))
|
||
|
||
log_message("候选URL评估结果:\n" + "\n".join(
|
||
[f"{item['url']} | 权重:{item['weight']} 延迟:{item['latency']:.2f}s 评分:{item['score']:.1f}"
|
||
for item in sorted_candidates]), site_name, "URL选择")
|
||
|
||
best = sorted_candidates[0]
|
||
log_message(f"[选择] 最优URL: {best['url']} (评分: {best['score']:.1f})", site_name, "URL选择")
|
||
return best['url']
|
||
|
||
|
||
def get_star2_real_url(source_url):
|
||
"""改进的星剧社真实URL提取"""
|
||
try:
|
||
response = requests.get(
|
||
source_url,
|
||
timeout=8,
|
||
verify=False,
|
||
headers={'Referer': 'https://mlink.cc/'}
|
||
)
|
||
if response.status_code == 200:
|
||
# 增强版正则匹配
|
||
match = re.search(
|
||
r'''(?i)(?:href|src|data-?url)=["'](https?://[^"']*?star2\.cn[^"']*)["']''',
|
||
response.text
|
||
)
|
||
if match:
|
||
real_url = match.group(1).strip().rstrip('/')
|
||
log_message(f"[成功] 提取真实链接: {real_url}", "星剧社", "链接解析")
|
||
return real_url
|
||
log_message("[失败] 未找到有效链接", "星剧社", "链接解析")
|
||
except Exception as e:
|
||
log_message(f"[错误] 解析失败: {str(e)}", "星剧社", "链接解析")
|
||
return None
|
||
|
||
|
||
def merge_url_data(*dicts):
|
||
"""数据合并去重"""
|
||
merged = {}
|
||
for d in dicts:
|
||
if not d: continue
|
||
for site, urls in d.items():
|
||
merged.setdefault(site, []).extend(urls if isinstance(urls, list) else [urls])
|
||
return {k: list(dict.fromkeys(v)) for k, v in merged.items()}
|
||
|
||
|
||
def get_file_path(filename, is_input=True):
|
||
"""路径处理函数"""
|
||
base_dir = file_path_config.get("input_dir" if is_input else "output_dir", "")
|
||
return os.path.join(base_dir or os.getcwd(), filename)
|
||
|
||
|
||
def load_existing_config():
|
||
"""加载现有url.json配置"""
|
||
url_path = get_file_path('url.json')
|
||
if os.path.exists(url_path):
|
||
try:
|
||
with open(url_path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
except Exception as e:
|
||
log_message(f"[错误] 读取现有配置失败: {str(e)}", step="配置加载")
|
||
return {}
|
||
|
||
|
||
def get_api_urls():
|
||
"""从本地文件获取链接"""
|
||
API_FILE_PATH = get_file_path('url.json')
|
||
try:
|
||
with open(API_FILE_PATH, 'r', encoding='utf-8') as f:
|
||
api_data = json.load(f)
|
||
print("成功读取 url.json 文件")
|
||
|
||
# 基于 jsm_mapping 生成 url_mapping
|
||
url_mapping = {key: api_data.get(value) for key, value in jsm_mapping.items()}
|
||
|
||
print("生成的 url_mapping:", url_mapping)
|
||
return url_mapping
|
||
except FileNotFoundError:
|
||
print("未找到 url.json 文件,请检查文件路径。")
|
||
except json.JSONDecodeError:
|
||
print("url.json 文件格式错误,请检查文件内容。")
|
||
return {}
|
||
|
||
|
||
def replace_urls(data, urls):
|
||
"""替换 JSON 数据中的 URL"""
|
||
# 根据 jsm_mapping 转换 api_urls
|
||
api_urls = {
|
||
jsm_key: urls.get(jsm_value)
|
||
for jsm_key, jsm_value in jsm_mapping.items()
|
||
}
|
||
|
||
sites = data.get('sites', [])
|
||
replaced_count = 0
|
||
|
||
for item in sites:
|
||
if isinstance(item, dict):
|
||
key = item.get('key')
|
||
ext = item.get('ext')
|
||
new_url = api_urls.get(key)
|
||
old_url = None
|
||
|
||
if new_url and isinstance(ext, str):
|
||
parts = ext.split('$$$')
|
||
if len(parts) > 1 and parts[1].strip().startswith('http'):
|
||
old_url = parts[1]
|
||
parts[1] = new_url
|
||
item['ext'] = '$$$'.join(parts)
|
||
replaced_count += 1
|
||
print(f"成功替换 {key} 的链接: {old_url} -> {new_url}")
|
||
if 'url' in item:
|
||
del item['url'] # 删除 url 字段
|
||
|
||
if old_url and not new_url:
|
||
print(f"未成功替换 {key} 的链接,原链接: {old_url}")
|
||
else:
|
||
print(f"跳过非字典类型的 item: {item}")
|
||
|
||
print(f"总共替换了 {replaced_count} 个链接。")
|
||
return data
|
||
|
||
|
||
def update_jsm_config(urls):
|
||
"""更新jsm.json配置文件中的URL"""
|
||
global jsm_data
|
||
if not jsm_data:
|
||
log_message("[错误] jsm_data 为空,无法更新配置", step="配置更新")
|
||
return False
|
||
|
||
updated_jsm_data = replace_urls(deepcopy(jsm_data), urls)
|
||
|
||
try:
|
||
jsm_output_path = get_file_path('jsm.json', is_input=False)
|
||
os.makedirs(os.path.dirname(jsm_output_path), exist_ok=True)
|
||
with open(jsm_output_path, 'w', encoding='utf-8') as f:
|
||
json.dump(updated_jsm_data, f, ensure_ascii=False, indent=4)
|
||
|
||
log_message("[完成] jsm.json 配置文件更新成功", step="配置更新")
|
||
return True
|
||
except Exception as e:
|
||
log_message(f"[错误] 更新 jsm.json 配置文件失败: {str(e)}", step="配置更新")
|
||
return False
|
||
|
||
|
||
def process_urls():
|
||
"""核心处理流程"""
|
||
log_message("[开始] 启动URL更新流程", step="主流程")
|
||
|
||
# 加载现有配置
|
||
existing_config = load_existing_config()
|
||
reverse_site_mapping = {v: k for k, v in site_mappings.items()}
|
||
|
||
# 数据源处理
|
||
data_sources = []
|
||
try:
|
||
remote_data = requests.get(
|
||
'https://github.catvod.com/https://raw.githubusercontent.com/celin1286/xiaosa/main/yuan.json',
|
||
timeout=10
|
||
).json()
|
||
data_sources.append(remote_data)
|
||
log_message("[成功] 远程数据加载完成", step="数据收集")
|
||
except Exception as e:
|
||
log_message(f"[错误] 远程数据获取失败: {str(e)}", step="数据收集")
|
||
|
||
local_path = get_file_path('yuan.json')
|
||
if os.path.exists(local_path):
|
||
try:
|
||
with open(local_path, 'r', encoding='utf-8') as f:
|
||
data_sources.append(json.load(f))
|
||
log_message("[成功] 本地数据加载完成", step="数据收集")
|
||
except Exception as e:
|
||
log_message(f"[错误] 本地数据读取失败: {str(e)}", step="数据收集")
|
||
|
||
data_sources.append(fallback_url_config)
|
||
merged_data = merge_url_data(*data_sources)
|
||
|
||
# 结果存储
|
||
result = {'url': {}}
|
||
stats = {'total': 0,'success': 0, 'failed': [], 'changed': []}
|
||
|
||
for cn_name, urls in merged_data.items():
|
||
stats['total'] += 1
|
||
site_key = site_mappings.get(cn_name)
|
||
existing_url = existing_config.get(site_key, '')
|
||
|
||
if cn_name == '星剧社':
|
||
best_source = get_best_url(urls, cn_name, existing_url)
|
||
final_url = get_star2_real_url(best_source) if best_source else existing_url
|
||
else:
|
||
final_url = get_best_url(urls, cn_name, existing_url) or existing_url
|
||
|
||
if final_url:
|
||
result['url'][site_key] = final_url
|
||
if existing_url and existing_url != final_url:
|
||
stats['changed'].append(f"{cn_name}: {existing_url} → {final_url}")
|
||
log_message(f"[更新] 配置变更检测", cn_name, "结果处理")
|
||
stats['success'] += 1
|
||
else:
|
||
stats['failed'].append(cn_name)
|
||
log_message("[警告] 无可用URL", cn_name, "结果处理")
|
||
|
||
# 文件保存
|
||
output_files = {
|
||
'yuan.json': merged_data,
|
||
'url.json': result['url']
|
||
}
|
||
|
||
for filename, data in output_files.items():
|
||
try:
|
||
path = get_file_path(filename, is_input=False)
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
with open(path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
log_message(f"[成功] 保存文件: {path}", step="数据持久化")
|
||
except Exception as e:
|
||
log_message(f"[错误] 文件保存失败: {str(e)}", step="数据持久化")
|
||
|
||
# 新增jsm更新流程
|
||
log_message("[开始] 启动jsm配置更新", step="主流程")
|
||
update_success = update_jsm_config(result['url'])
|
||
log_message(
|
||
f"[{'成功' if update_success else '失败'}] jsm配置更新完成",
|
||
step="主流程"
|
||
)
|
||
|
||
# 统计报告
|
||
log_message(
|
||
f"[完成] 处理结果: {stats['success']}/{stats['total']} 成功\n"
|
||
f"url.json变更项 ({len(stats['changed'])}):\n" + "\n".join(stats['changed']) + "\n"
|
||
f"url.json失败项 ({len(stats['failed'])}): {', '.join(stats['failed']) if stats['failed'] else '无'}",
|
||
step="统计报告"
|
||
)
|
||
return stats['success'] > 0
|
||
|
||
|
||
def main():
|
||
warnings.simplefilter('ignore', InsecureRequestWarning)
|
||
process_urls()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
start_time = time.time()
|
||
main()
|
||
elapsed = time.time() - start_time
|
||
print(f"总耗时: {elapsed:.2f}秒")
|
||
|