mirror of
https://github.com/yuanwangokk-1/TV-BOX.git
synced 2025-10-24 23:11:21 +00:00
445 lines
No EOL
16 KiB
Python
445 lines
No EOL
16 KiB
Python
import re
|
||
import sys
|
||
import urllib.parse
|
||
import threading
|
||
import time
|
||
import requests
|
||
import base64
|
||
import gzip
|
||
import json
|
||
from io import BytesIO
|
||
from pyquery import PyQuery as pq
|
||
|
||
sys.path.append('..')
|
||
from base.spider import Spider
|
||
|
||
class Spider(Spider):
|
||
def __init__(self):
|
||
self.name = "黄色仓库"
|
||
self.host = self.getDynamicHost()
|
||
self.classes = self.preprocessClasses()
|
||
|
||
def getName(self):
|
||
return self.name
|
||
|
||
def getDynamicHost(self):
|
||
"""动态获取主机地址"""
|
||
try:
|
||
# 解码base64获取初始主机
|
||
initial_host = base64.b64decode('aHR0cHM6Ly9oc2NrLm5ldA==').decode('utf-8')
|
||
|
||
# 获取初始页面
|
||
response = requests.get(initial_host, headers=self.header)
|
||
html = response.text
|
||
|
||
# 匹配strU参数
|
||
strU_match = re.search(r'strU="(.*?)"', html)
|
||
if not strU_match:
|
||
return initial_host
|
||
|
||
strU = strU_match.group(1)
|
||
locationU = strU + initial_host.rstrip('/') + '/&p=/'
|
||
|
||
# 获取重定向地址
|
||
redirect_response = requests.get(locationU, headers=self.header, allow_redirects=False)
|
||
if 'location' in redirect_response.headers:
|
||
return redirect_response.headers['location']
|
||
else:
|
||
# 尝试从JSON响应中获取
|
||
try:
|
||
data = redirect_response.json()
|
||
return data.get('location', initial_host)
|
||
except:
|
||
return initial_host
|
||
|
||
except Exception as e:
|
||
print(f"获取动态主机失败: {e}")
|
||
return "http://6590ck.cc/"
|
||
|
||
def preprocessClasses(self):
|
||
"""预处理分类数据"""
|
||
return [
|
||
{"type_name": "日韩AV", "type_id": "1"},
|
||
{"type_name": "国产系列", "type_id": "2"},
|
||
{"type_name": "欧美", "type_id": "3"},
|
||
{"type_name": "成人动漫", "type_id": "4"},
|
||
{"type_name": "日本有码", "type_id": "7"},
|
||
{"type_name": "一本道高清无码", "type_id": "8"},
|
||
{"type_name": "有码中文字幕", "type_id": "9"},
|
||
{"type_name": "日本无码", "type_id": "10"},
|
||
{"type_name": "国产视频", "type_id": "15"},
|
||
{"type_name": "欧美高清", "type_id": "21"},
|
||
{"type_name": "动漫剧情", "type_id": "22"}
|
||
]
|
||
|
||
def init(self, extend):
|
||
pass
|
||
|
||
def isVideoFormat(self, url):
|
||
pass
|
||
|
||
def manualVideoCheck(self):
|
||
pass
|
||
|
||
def homeContent(self, filter):
|
||
result = {}
|
||
result['class'] = self.classes
|
||
return result
|
||
|
||
def homeVideoContent(self):
|
||
"""推荐内容"""
|
||
result = {}
|
||
try:
|
||
url = f"{self.host.rstrip('/')}/"
|
||
rsp = self.fetch(url)
|
||
root = pq(rsp.text)
|
||
|
||
videos = []
|
||
list_items = root('.stui-vodlist li')
|
||
|
||
for item in list_items.items():
|
||
vid = item.find('a').attr('href')
|
||
if not vid or not vid.startswith('/vodplay/'):
|
||
continue
|
||
|
||
name = item.find('h4').text()
|
||
img = item.find('a').attr('data-original')
|
||
remark = item.find('.pic-text').text()
|
||
|
||
if not name or not img:
|
||
continue
|
||
|
||
videos.append({
|
||
"vod_id": vid, # 只保存相对路径
|
||
"vod_name": name,
|
||
"vod_pic": self.getFullUrl(img),
|
||
"vod_remarks": remark
|
||
})
|
||
|
||
result['list'] = videos
|
||
except Exception as e:
|
||
print(f"获取推荐内容失败: {e}")
|
||
result['list'] = []
|
||
|
||
return result
|
||
|
||
def categoryContent(self, tid, pg, filter, extend):
|
||
result = {}
|
||
try:
|
||
url = f"{self.host.rstrip('/')}/vodtype/{tid}-{pg}.html"
|
||
rsp = self.fetch(url)
|
||
root = pq(rsp.text)
|
||
|
||
videos = []
|
||
list_items = root('.stui-vodlist li')
|
||
|
||
for item in list_items.items():
|
||
vid = item.find('a').attr('href')
|
||
if not vid or not vid.startswith('/vodplay/'):
|
||
continue
|
||
|
||
name = item.find('h4').text()
|
||
img = item.find('a').attr('data-original')
|
||
remark = item.find('.pic-text').text()
|
||
|
||
if not name or not img:
|
||
continue
|
||
|
||
videos.append({
|
||
"vod_id": vid, # 只保存相对路径
|
||
"vod_name": name,
|
||
"vod_pic": self.getFullUrl(img),
|
||
"vod_remarks": remark
|
||
})
|
||
|
||
result['list'] = videos
|
||
result['page'] = int(pg)
|
||
result['pagecount'] = 9999
|
||
result['limit'] = 6
|
||
result['total'] = 999999
|
||
|
||
except Exception as e:
|
||
print(f"获取分类内容失败: {e}")
|
||
result['list'] = []
|
||
result['page'] = 1
|
||
result['pagecount'] = 1
|
||
result['limit'] = 6
|
||
result['total'] = 0
|
||
|
||
return result
|
||
|
||
def extractM3U8Url(self, script_text):
|
||
"""专门提取m3u8播放链接的方法"""
|
||
m3u8_urls = []
|
||
|
||
print("开始提取m3u8链接...")
|
||
|
||
# 方法1: 从player_aaaa JavaScript变量中提取
|
||
player_patterns = [
|
||
r'var\s+player_aaaa\s*=\s*({.*?});',
|
||
r'player_aaaa\s*=\s*({.*?});',
|
||
r'var\s+player_aaaa\s*=\s*({.*?})\s*<\/script>',
|
||
r'player_aaaa\s*=\s*({.*?})\s*<\/script>'
|
||
]
|
||
|
||
for pattern in player_patterns:
|
||
player_match = re.search(pattern, script_text, re.DOTALL)
|
||
if player_match:
|
||
try:
|
||
player_data_str = player_match.group(1)
|
||
print(f"找到player_aaaa数据: {player_data_str[:200]}...")
|
||
|
||
# 修复JSON字符串
|
||
player_data_str = player_data_str.replace('\\/', '/')
|
||
player_data = json.loads(player_data_str)
|
||
|
||
m3u8_url = player_data.get('url')
|
||
if m3u8_url and '.m3u8' in m3u8_url:
|
||
print(f"从player_aaaa提取到m3u8: {m3u8_url}")
|
||
# 确保URL完整
|
||
if not m3u8_url.startswith('http'):
|
||
if m3u8_url.startswith('//'):
|
||
m3u8_url = 'https:' + m3u8_url
|
||
else:
|
||
m3u8_url = self.getFullUrl(m3u8_url)
|
||
m3u8_urls.append(m3u8_url)
|
||
return m3u8_urls # 找到就返回
|
||
|
||
except Exception as e:
|
||
print(f"解析player_aaaa失败: {e}")
|
||
|
||
# 方法2: 直接搜索m3u8链接
|
||
m3u8_patterns = [
|
||
r'"url"\s*:\s*"([^"]+\.m3u8[^"]*)"',
|
||
r'url\s*:\s*"([^"]+\.m3u8[^"]*)"',
|
||
r'src\s*:\s*"([^"]+\.m3u8[^"]*)"',
|
||
r'file\s*:\s*"([^"]+\.m3u8[^"]*)"',
|
||
r'https?://[^\s"\'<>]+\.m3u8[^\s"\'<>]*'
|
||
]
|
||
|
||
for pattern in m3u8_patterns:
|
||
matches = re.findall(pattern, script_text)
|
||
for match in matches:
|
||
if '.m3u8' in match and match not in m3u8_urls:
|
||
print(f"从正则匹配提取到m3u8: {match}")
|
||
# 确保URL完整
|
||
if not match.startswith('http'):
|
||
if match.startswith('//'):
|
||
match = 'https:' + match
|
||
else:
|
||
match = self.getFullUrl(match)
|
||
m3u8_urls.append(match)
|
||
return m3u8_urls # 找到就返回
|
||
|
||
print("未找到m3u8播放链接")
|
||
return m3u8_urls
|
||
|
||
def detailContent(self, array):
|
||
"""二级详情页面解析 - 修复播放链接提取及简介使用标题"""
|
||
result = {}
|
||
try:
|
||
vid = array[0]
|
||
# 确保vid是完整URL
|
||
if not vid.startswith('http'):
|
||
vid = self.getFullUrl(vid)
|
||
|
||
print(f"开始解析详情页面: {vid}")
|
||
rsp = self.fetch(vid)
|
||
root = pq(rsp.text)
|
||
|
||
# 提取基本信息
|
||
title = root('.stui-pannel__head .title').text()
|
||
if not title:
|
||
title = root('title').text().split(' - ')[0]
|
||
|
||
# 提取封面图
|
||
pic = root('.stui-vodlist__thumb').attr('data-original') or root('.stui-vodlist__thumb').attr('src')
|
||
if not pic:
|
||
pic = root('img').attr('src')
|
||
|
||
# 获取所有script内容
|
||
script_text = root('script').text()
|
||
|
||
# 提取m3u8播放链接
|
||
m3u8_urls = self.extractM3U8Url(script_text)
|
||
|
||
# 构建播放链接
|
||
play_urls = []
|
||
if m3u8_urls:
|
||
for i, m3u8_url in enumerate(m3u8_urls):
|
||
play_urls.append(f"线路{i+1}${m3u8_url}")
|
||
else:
|
||
# 如果没有找到m3u8链接,尝试从页面其他位置提取
|
||
print("尝试从页面其他位置提取播放链接...")
|
||
# 从iframe中提取
|
||
iframe_src = root('iframe').attr('src')
|
||
if iframe_src and 'm3u8' in iframe_src:
|
||
if not iframe_src.startswith('http'):
|
||
iframe_src = self.getFullUrl(iframe_src)
|
||
play_urls.append(f"iframe线路${iframe_src}")
|
||
else:
|
||
# 最后使用详情页URL
|
||
play_urls.append(f"详情页线路${vid}")
|
||
|
||
# 用视频标题作为简介,避免没有简介内容
|
||
vod = {
|
||
"vod_id": array[0], # 保持原始ID
|
||
"vod_name": title,
|
||
"vod_pic": self.getFullUrl(pic) if pic else "",
|
||
"vod_content": title, # 用标题当简介
|
||
"vod_play_from": "老僧酿酒",
|
||
"vod_play_url": "#".join(play_urls) # 使用#分隔多个播放源
|
||
}
|
||
|
||
result['list'] = [vod]
|
||
print(f"详情页解析完成,播放链接: {vod['vod_play_url']}")
|
||
|
||
except Exception as e:
|
||
print(f"解析详情页面失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
# 返回基础信息
|
||
result['list'] = [{
|
||
"vod_id": array[0],
|
||
"vod_name": "未知标题",
|
||
"vod_pic": "",
|
||
"vod_content": "",
|
||
"vod_play_from": "默认线路",
|
||
"vod_play_url": f"详情页线路${array[0]}"
|
||
}]
|
||
|
||
return result
|
||
|
||
def searchContent(self, key, quick):
|
||
result = {}
|
||
try:
|
||
# 使用搜索URL
|
||
search_url = f"{self.host.rstrip('/')}/vodsearch/-------------.html?wd={urllib.parse.quote(key)}"
|
||
rsp = self.fetch(search_url)
|
||
root = pq(rsp.text)
|
||
|
||
videos = []
|
||
list_items = root('.stui-vodlist li')
|
||
|
||
for item in list_items.items():
|
||
vid = item.find('a').attr('href')
|
||
if not vid or not vid.startswith('/vodplay/'):
|
||
continue
|
||
|
||
name = item.find('h4').text()
|
||
img = item.find('a').attr('data-original')
|
||
remark = item.find('.pic-text').text()
|
||
|
||
if not name or not img:
|
||
continue
|
||
|
||
videos.append({
|
||
"vod_id": vid, # 只保存相对路径
|
||
"vod_name": name,
|
||
"vod_pic": self.getFullUrl(img),
|
||
"vod_remarks": remark
|
||
})
|
||
|
||
result['list'] = videos
|
||
except Exception as e:
|
||
print(f"搜索失败: {e}")
|
||
result['list'] = []
|
||
|
||
return result
|
||
|
||
def playerContent(self, flag, id, vipFlags):
|
||
"""播放页面解析 - 修复数组越界问题"""
|
||
result = {}
|
||
try:
|
||
print(f"playerContent被调用: flag={flag}, id={id}")
|
||
|
||
# 如果id已经是m3u8链接,直接返回
|
||
if id.startswith('http') and '.m3u8' in id:
|
||
result["parse"] = 0
|
||
result["playUrl"] = ""
|
||
result["url"] = id
|
||
result["header"] = self.header
|
||
print(f"直接返回m3u8链接: {id}")
|
||
return result
|
||
|
||
# 如果id是播放线路格式,提取m3u8链接
|
||
if '#' in id:
|
||
play_sources = id.split('#')
|
||
for source in play_sources:
|
||
if '$' in source:
|
||
_, url = source.split('$', 1)
|
||
if '.m3u8' in url:
|
||
result["parse"] = 0
|
||
result["playUrl"] = ""
|
||
result["url"] = url
|
||
result["header"] = self.header
|
||
print(f"从播放线路提取到m3u8: {url}")
|
||
return result
|
||
|
||
# 如果id是详情页链接,重新解析详情页
|
||
print(f"重新解析详情页获取m3u8: {id}")
|
||
detail_result = self.detailContent([id])
|
||
|
||
if detail_result and 'list' in detail_result and detail_result['list']:
|
||
vod = detail_result['list'][0]
|
||
play_url = vod.get('vod_play_url', '')
|
||
print(f"从详情页获取的播放链接: {play_url}")
|
||
|
||
# 解析播放链接
|
||
if '#' in play_url:
|
||
play_sources = play_url.split('#')
|
||
for source in play_sources:
|
||
if '$' in source:
|
||
_, url = source.split('$', 1)
|
||
if '.m3u8' in url:
|
||
result["parse"] = 0
|
||
result["playUrl"] = ""
|
||
result["url"] = url
|
||
result["header"] = self.header
|
||
print(f"最终提取到m3u8: {url}")
|
||
return result
|
||
|
||
# 如果没有找到m3u8,使用第一个播放源
|
||
if play_sources:
|
||
first_source = play_sources[0]
|
||
if '$' in first_source:
|
||
_, url = first_source.split('$', 1)
|
||
result["parse"] = 0
|
||
result["playUrl"] = ""
|
||
result["url"] = url
|
||
result["header"] = self.header
|
||
print(f"使用第一个播放源: {url}")
|
||
return result
|
||
|
||
# 如果所有方法都失败,返回空结果
|
||
print("无法提取播放链接,返回空结果")
|
||
return {}
|
||
|
||
except Exception as e:
|
||
print(f"解析播放页面失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return {}
|
||
|
||
def getFullUrl(self, url):
|
||
"""获取完整的URL"""
|
||
if not url:
|
||
return ""
|
||
if url.startswith('http'):
|
||
return url
|
||
if url.startswith('//'):
|
||
return f"https:{url}"
|
||
return f"{self.host.rstrip('/')}{url}"
|
||
|
||
config = {
|
||
"player": {},
|
||
"filter": {}
|
||
}
|
||
header = {
|
||
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
|
||
"Referer": "https://hsck123.com/"
|
||
}
|
||
|
||
def localProxy(self, param):
|
||
action = {}
|
||
return action |