TV-BOX/tvbox/江湖/py/upurl.py
yuanwangokk-1 7fb1457e88 main
2025-10-14 22:54:33 +08:00

591 lines
20 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import requests
import warnings
import re
import os
import time
from urllib3.exceptions import InsecureRequestWarning
from copy import deepcopy
from concurrent.futures import ThreadPoolExecutor
# 自定义 jsm.json 的路径或网络地址,留空则使用当前目录下的 jsm.json
jsm_file_path = ""
# 读取 jsm.json 文件
jsm_data = {}
if jsm_file_path:
if jsm_file_path.startswith(("http://", "https://")):
try:
response = requests.get(jsm_file_path)
jsm_data = response.json()
except Exception as e:
print(f"从网络读取 jsm.json 配置文件失败: {str(e)}")
else:
if os.path.exists(jsm_file_path):
try:
with open(jsm_file_path, 'r', encoding='utf-8') as f:
jsm_data = json.load(f)
except Exception as e:
print(f"读取本地 jsm.json 配置文件失败: {str(e)}")
else:
print(f"本地 jsm.json 文件 {jsm_file_path} 不存在")
else:
local_path = os.path.join(os.getcwd(), 'jsm.json')
if os.path.exists(local_path):
try:
with open(local_path, 'r', encoding='utf-8') as f:
jsm_data = json.load(f)
except Exception as e:
print(f"读取默认 jsm.json 配置文件失败: {str(e)}")
else:
print("默认的 jsm.json 文件不存在")
# 站点映射关系
site_mappings = {
'立播': 'libo', '闪电':'shandian', '欧哥': 'ouge', '小米': 'xiaomi', '多多': 'duoduo',
'蜡笔': 'labi', '至臻': 'zhizhen', '木偶':'mogg', '六趣': 'liuqu', '虎斑': 'huban',
'下饭': 'xiafan', '玩偶': 'wogg', '星剧社':'star2', '二小': 'xhww'
}
# 代理配置
proxy_config = {
"enabled": False,
"proxies": {
"http": "http://127.0.0.1:7890",
"https": "http://127.0.0.1:7890"
}
}
# 文件路径配置
file_path_config = {
"input_dir": "",
"output_dir": ""
}
# 新增jsm映射配置
jsm_mapping = {
"Libvio": "libo",
"Xiaomi": "xiaomi",
"yydsys": "duoduo",
"蜡笔网盘": "labi",
"玩偶 | 蜡笔": "labi",
"至臻|网盘": "zhizhen",
"Huban": "huban",
"Wogg": "wogg",
"Mogg": "mogg",
"玩偶 | 闪电uc": "shandian",
"玩偶 | 二小": "xhww",
"玩偶 | 小米": "xiaomi",
"玩偶 | 多多": "duoduo",
"玩偶 | 木偶": "mogg",
"玩偶gg": "wogg",
"星剧社": "star2"
}
# 需要拼接搜索路径的站点配置
search_path_config = {
'闪电': '/index.php/vod/search.html?wd=仙台有树',
'欧哥': '/index.php/vod/search.html?wd=仙台有树',
'小米': '/index.php/vod/search.html?wd=仙台有树',
'多多': '/index.php/vod/search.html?wd=仙台有树',
'蜡笔': '/index.php/vod/search.html?wd=仙台有树',
'至臻': '/index.php/vod/search.html?wd=仙台有树',
'六趣': '/index.php/vod/search.html?wd=仙台有树',
'虎斑': '/index.php/vod/search.html?wd=仙台有树',
'下饭': '/index.php/vod/search.html?wd=仙台有树',
'玩偶': '/vodsearch/-------------.html?wd=仙台有树',
'木偶': '/index.php/vod/search.html?wd=仙台有树',
'二小': '/index.php/vod/search.html?wd=仙台有树',
'立播': '/search/-------------.html?wd=仙台有树&submit='
}
# 定义需要校验关键字的站点及其关键字
keyword_required_sites = {
'闪电': 'class="search-stat"',
'欧哥': 'class="search-stat"',
'小米': 'class="search-stat"',
'多多': 'class="search-stat"',
'蜡笔': 'class="search-stat"',
'至臻': 'class="search-stat"',
'六趣': 'class="search-stat"',
'虎斑': 'class="search-stat"',
'下饭': 'class="search-stat"',
'玩偶': 'class="search-stat"',
'木偶': 'class="search-stat"',
'二小': 'class="search-stat"',
'立播': 'class="stui-screen"'
}
# 新增可选的URL加权配置默认权重为50
url_weight_config = {
"木偶": {
"https://aliii.deno.dev": 60,
"http://149.88.87.72:5666": 60
},
"至臻": {
"http://www.xhww.net": 10,
"http://xhww.net": 10
},
"立播": {
"https://libvio.mov": 60,
"https://www.libvio.cc": 60
}
}
# 兜底URL配置
fallback_url_config = {
"立播": [
"https://libvio.mov",
"https://www.libvio.cc",
"https://libvio.la",
"https://libvio.pro",
"https://libvio.fun",
"https://libvio.me",
"https://libvio.in",
"https://libvio.site",
"https://libvio.art",
"https://libvio.com",
"https://libvio.vip",
"https://libvio.pw",
"https://libvio.link"
],
"闪电": [
"http://1.95.79.193",
"http://1.95.79.193:666"
],
"欧哥": [
"https://woog.nxog.eu.org"
],
"小米": [
"http://www.54271.fun",
"https://www.milvdou.fun",
"http://www.54271.fun",
"https://www.mucpan.cc",
"https://mucpan.cc",
"http://milvdou.fun"
],
"多多": [
"https://tv.yydsys.top",
"https://tv.yydsys.cc",
"https://tv.214521.xyz",
"http://155.248.200.65"
],
"蜡笔": [
"http://feimaoai.site",
"https://feimao666.fun",
"http://feimao888.fun"
],
"至臻": [
"https://mihdr.top",
"http://www.miqk.cc",
"http://www.xhww.net",
"http://xhww.net",
"https://xiaomiai.site"
],
"六趣": [
"https://wp.0v.fit"
],
"虎斑": [
"http://103.45.162.207:20720"
],
"下饭": [
"http://txfpan.top",
"http://www.xn--ghqy10g1w0a.xyz"
],
"玩偶": [
"https://wogg.xxooo.cf",
"https://wogg.333232.xyz",
"https://www.wogg.one",
"https://www.wogg.lol",
"https://www.wogg.net"
],
"木偶": [
"https://tv.91muou.icu",
"https://mo.666291.xyz",
"https://mo.muouso.fun",
"https://aliii.deno.dev",
"http://149.88.87.72:5666"
],
"星剧社": [
"https://mlink.cc/520TV"
],
"二小": [
"https://xhww.net",
"https://www.xhww.net"
]
}
# 全局状态
last_site = None
def log_message(message, site_name=None, step="", max_error_length=80):
"""格式化日志打印"""
global last_site
status_emojis = {
'[开始]': '🚀', '[成功]': '', '[完成]': '🎉', '[失败]': '',
'[超时]': '', '[警告]': '⚠️', '[错误]': '🚨', '[信息]': '',
'[选择]': '🔍', '[连接失败]': '🔌'
}
if site_name and site_name != last_site:
print(f"\n{'' + '=' * 38 + ''}")
print(f"🌐 [站点: {site_name}]")
print(f"{'' + '=' * 38 + ''}")
last_site = site_name
for status, emoji in status_emojis.items():
if status in message:
message = message.replace(status, f"{status} {emoji}")
break
else:
message = f"{message} 📢"
# 截断过长的错误信息
if "[连接失败]" in message or "[错误]" in message:
if len(message) > max_error_length:
message = message[:max_error_length] + "..."
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [{step}] {message}") if step else print(message)
def test_url(url, site_name=None):
"""增强版URL测试函数"""
search_path = search_path_config.get(site_name)
test_url = url.strip() + search_path if search_path else url.strip()
keyword = keyword_required_sites.get(site_name)
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(max_retries=2)
session.mount('http://', adapter)
session.mount('https://', adapter)
try:
# 直接请求测试
response = session.get(
test_url,
timeout=7,
verify=False,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}
)
if response.status_code == 200:
latency = response.elapsed.total_seconds()
has_keyword = keyword in response.text if keyword else True
log_msg = f"直接访问成功 | 延迟: {latency:.2f}s"
if keyword:
log_msg += f" | 关键字: {'' if has_keyword else ''}"
log_message(f"[成功] {test_url} {log_msg}", site_name, "URL测试")
return latency, has_keyword
log_message(f"[失败] HTTP状态码 {response.status_code}", site_name, "URL测试")
return None, None
except requests.RequestException as e:
error_type = "[超时]" if isinstance(e, requests.Timeout) else "[连接失败]"
log_message(f"{error_type} {str(e)}", site_name, "URL测试")
# 代理重试逻辑
if proxy_config["enabled"]:
try:
response = session.get(
test_url,
timeout=7,
verify=False,
proxies=proxy_config["proxies"],
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}
)
if response.status_code == 200:
latency = response.elapsed.total_seconds()
has_keyword = keyword in response.text if keyword else True
log_message(f"[成功] 代理访问成功 | 延迟: {latency:.2f}s | 关键字: {'' if has_keyword else ''}",
site_name, "URL测试")
return latency, has_keyword
except Exception as proxy_e:
log_message(f"[失败] 代理访问错误: {str(proxy_e)}", site_name, "URL测试")
return None, None
def get_best_url(urls, site_name=None, existing_url=None):
"""优化后的URL选择算法"""
if not isinstance(urls, list):
return urls
weights = url_weight_config.get(site_name, {})
default_weight = 50
sorted_urls = sorted([(url, weights.get(url, default_weight)) for url in urls],
key=lambda x: -x[1])
def test_single_url(url_weight):
url, weight = url_weight
latency, has_keyword = test_url(url, site_name)
if latency is not None:
return {
"url": url,
"latency": latency,
"has_keyword": has_keyword,
"weight": weight,
"score": (weight * 0.6) + ((1 / (latency + 0.1)) * 40)
}
return None
with ThreadPoolExecutor() as executor:
candidates = [result for result in executor.map(test_single_url, sorted_urls) if result]
if not candidates:
log_message(f"[警告] 无可用URL使用现有配置: {existing_url}" if existing_url else
"[错误] 无可用URL且无历史配置", site_name, "URL选择")
return existing_url if existing_url else None
# 按评分排序:关键字存在 > 评分 > 延迟
sorted_candidates = sorted(candidates,
key=lambda x: (-x['has_keyword'], -x['score'], x['latency']))
log_message("候选URL评估结果:\n" + "\n".join(
[f"{item['url']} | 权重:{item['weight']} 延迟:{item['latency']:.2f}s 评分:{item['score']:.1f}"
for item in sorted_candidates]), site_name, "URL选择")
best = sorted_candidates[0]
log_message(f"[选择] 最优URL: {best['url']} (评分: {best['score']:.1f})", site_name, "URL选择")
return best['url']
def get_star2_real_url(source_url):
"""改进的星剧社真实URL提取"""
try:
response = requests.get(
source_url,
timeout=8,
verify=False,
headers={'Referer': 'https://mlink.cc/'}
)
if response.status_code == 200:
# 增强版正则匹配
match = re.search(
r'''(?i)(?:href|src|data-?url)=["'](https?://[^"']*?star2\.cn[^"']*)["']''',
response.text
)
if match:
real_url = match.group(1).strip().rstrip('/')
log_message(f"[成功] 提取真实链接: {real_url}", "星剧社", "链接解析")
return real_url
log_message("[失败] 未找到有效链接", "星剧社", "链接解析")
except Exception as e:
log_message(f"[错误] 解析失败: {str(e)}", "星剧社", "链接解析")
return None
def merge_url_data(*dicts):
"""数据合并去重"""
merged = {}
for d in dicts:
if not d: continue
for site, urls in d.items():
merged.setdefault(site, []).extend(urls if isinstance(urls, list) else [urls])
return {k: list(dict.fromkeys(v)) for k, v in merged.items()}
def get_file_path(filename, is_input=True):
"""路径处理函数"""
base_dir = file_path_config.get("input_dir" if is_input else "output_dir", "")
return os.path.join(base_dir or os.getcwd(), filename)
def load_existing_config():
"""加载现有url.json配置"""
url_path = get_file_path('url.json')
if os.path.exists(url_path):
try:
with open(url_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
log_message(f"[错误] 读取现有配置失败: {str(e)}", step="配置加载")
return {}
def get_api_urls():
"""从本地文件获取链接"""
API_FILE_PATH = get_file_path('url.json')
try:
with open(API_FILE_PATH, 'r', encoding='utf-8') as f:
api_data = json.load(f)
print("成功读取 url.json 文件")
# 基于 jsm_mapping 生成 url_mapping
url_mapping = {key: api_data.get(value) for key, value in jsm_mapping.items()}
print("生成的 url_mapping:", url_mapping)
return url_mapping
except FileNotFoundError:
print("未找到 url.json 文件,请检查文件路径。")
except json.JSONDecodeError:
print("url.json 文件格式错误,请检查文件内容。")
return {}
def replace_urls(data, urls):
"""替换 JSON 数据中的 URL"""
# 根据 jsm_mapping 转换 api_urls
api_urls = {
jsm_key: urls.get(jsm_value)
for jsm_key, jsm_value in jsm_mapping.items()
}
sites = data.get('sites', [])
replaced_count = 0
for item in sites:
if isinstance(item, dict):
key = item.get('key')
ext = item.get('ext')
new_url = api_urls.get(key)
old_url = None
if new_url and isinstance(ext, str):
parts = ext.split('$$$')
if len(parts) > 1 and parts[1].strip().startswith('http'):
old_url = parts[1]
parts[1] = new_url
item['ext'] = '$$$'.join(parts)
replaced_count += 1
print(f"成功替换 {key} 的链接: {old_url} -> {new_url}")
if 'url' in item:
del item['url'] # 删除 url 字段
if old_url and not new_url:
print(f"未成功替换 {key} 的链接,原链接: {old_url}")
else:
print(f"跳过非字典类型的 item: {item}")
print(f"总共替换了 {replaced_count} 个链接。")
return data
def update_jsm_config(urls):
"""更新jsm.json配置文件中的URL"""
global jsm_data
if not jsm_data:
log_message("[错误] jsm_data 为空,无法更新配置", step="配置更新")
return False
updated_jsm_data = replace_urls(deepcopy(jsm_data), urls)
try:
jsm_output_path = get_file_path('jsm.json', is_input=False)
os.makedirs(os.path.dirname(jsm_output_path), exist_ok=True)
with open(jsm_output_path, 'w', encoding='utf-8') as f:
json.dump(updated_jsm_data, f, ensure_ascii=False, indent=4)
log_message("[完成] jsm.json 配置文件更新成功", step="配置更新")
return True
except Exception as e:
log_message(f"[错误] 更新 jsm.json 配置文件失败: {str(e)}", step="配置更新")
return False
def process_urls():
"""核心处理流程"""
log_message("[开始] 启动URL更新流程", step="主流程")
# 加载现有配置
existing_config = load_existing_config()
reverse_site_mapping = {v: k for k, v in site_mappings.items()}
# 数据源处理
data_sources = []
try:
remote_data = requests.get(
'https://github.catvod.com/https://raw.githubusercontent.com/celin1286/xiaosa/main/yuan.json',
timeout=10
).json()
data_sources.append(remote_data)
log_message("[成功] 远程数据加载完成", step="数据收集")
except Exception as e:
log_message(f"[错误] 远程数据获取失败: {str(e)}", step="数据收集")
local_path = get_file_path('yuan.json')
if os.path.exists(local_path):
try:
with open(local_path, 'r', encoding='utf-8') as f:
data_sources.append(json.load(f))
log_message("[成功] 本地数据加载完成", step="数据收集")
except Exception as e:
log_message(f"[错误] 本地数据读取失败: {str(e)}", step="数据收集")
data_sources.append(fallback_url_config)
merged_data = merge_url_data(*data_sources)
# 结果存储
result = {'url': {}}
stats = {'total': 0,'success': 0, 'failed': [], 'changed': []}
for cn_name, urls in merged_data.items():
stats['total'] += 1
site_key = site_mappings.get(cn_name)
existing_url = existing_config.get(site_key, '')
if cn_name == '星剧社':
best_source = get_best_url(urls, cn_name, existing_url)
final_url = get_star2_real_url(best_source) if best_source else existing_url
else:
final_url = get_best_url(urls, cn_name, existing_url) or existing_url
if final_url:
result['url'][site_key] = final_url
if existing_url and existing_url != final_url:
stats['changed'].append(f"{cn_name}: {existing_url}{final_url}")
log_message(f"[更新] 配置变更检测", cn_name, "结果处理")
stats['success'] += 1
else:
stats['failed'].append(cn_name)
log_message("[警告] 无可用URL", cn_name, "结果处理")
# 文件保存
output_files = {
'yuan.json': merged_data,
'url.json': result['url']
}
for filename, data in output_files.items():
try:
path = get_file_path(filename, is_input=False)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
log_message(f"[成功] 保存文件: {path}", step="数据持久化")
except Exception as e:
log_message(f"[错误] 文件保存失败: {str(e)}", step="数据持久化")
# 新增jsm更新流程
log_message("[开始] 启动jsm配置更新", step="主流程")
update_success = update_jsm_config(result['url'])
log_message(
f"[{'成功' if update_success else '失败'}] jsm配置更新完成",
step="主流程"
)
# 统计报告
log_message(
f"[完成] 处理结果: {stats['success']}/{stats['total']} 成功\n"
f"url.json变更项 ({len(stats['changed'])}):\n" + "\n".join(stats['changed']) + "\n"
f"url.json失败项 ({len(stats['failed'])}): {', '.join(stats['failed']) if stats['failed'] else ''}",
step="统计报告"
)
return stats['success'] > 0
def main():
warnings.simplefilter('ignore', InsecureRequestWarning)
process_urls()
if __name__ == "__main__":
start_time = time.time()
main()
elapsed = time.time() - start_time
print(f"总耗时: {elapsed:.2f}")