TV-BOX/tvbox/pro/py/两个BT.py
yuanwangokk-1 365cc43aa9 main
2025-08-19 14:44:38 +08:00

599 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
# !/usr/bin/python
import sys
sys.path.append('..')
from base.spider import Spider
import json
import time
import urllib.parse
import re
import base64
class Spider(Spider):
def getName(self):
    """Return the display name of this spider source."""
    source_name = "两个BT"
    return source_name
def init(self, extend=""):
    """Store the site host and default request headers, then log readiness.

    ``extend`` is accepted but not read by this method.
    """
    self.host = "https://www.bttwoo.com"
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
    }
    # The referer is the site host itself.
    browser_headers['Referer'] = self.host
    self.headers = browser_headers
    self.log(f"两个BT爬虫初始化完成主站: {self.host}")
def isVideoFormat(self, url):
    """Placeholder hook: performs no check on ``url`` and returns None."""
    return None
def manualVideoCheck(self):
    """Placeholder hook: does nothing and returns None."""
    return None
def homeContent(self, filter):
    """Build the home payload: category list, filter config and home videos.

    Returns a dict with the keys ``class``, ``filters`` and ``list``. ``list``
    holds up to 50 videos parsed from the home page, or is empty when fetching
    or parsing raises. ``filter`` is not read by this method.
    """
    # (type_id, type_name) pairs for the category list.
    category_pairs = (
        ('movie_bt_tags/xiju', '喜剧'),
        ('movie_bt_tags/aiqing', '爱情'),
        ('movie_bt_tags/adt', '冒险'),
        ('movie_bt_tags/at', '动作'),
        ('movie_bt_tags/donghua', '动画'),
        ('movie_bt_tags/qihuan', '奇幻'),
        ('movie_bt_tags/xuanni', '悬疑'),
        ('movie_bt_tags/kehuan', '科幻'),
        ('movie_bt_tags/juqing', '剧情'),
        ('movie_bt_tags/kongbu', '恐怖'),
        ('meiju', '美剧'),
        ('gf', '高分电影'),
    )
    payload = {}
    payload['class'] = [
        {'type_id': type_id, 'type_name': label}
        for type_id, label in category_pairs
    ]
    payload['filters'] = self._get_filters()

    # Home page recommendations; any failure degrades to an empty list.
    try:
        response = self.fetch(self.host, headers=self.headers)
        page = self.html(response.text)
        payload['list'] = self._get_videos(page, limit=50)
    except Exception as err:
        self.log(f"首页获取出错: {str(err)}")
        payload['list'] = []
    return payload
def homeVideoContent(self):
    """Return the category list and filter config without fetching any page.

    The result has the keys ``class`` and ``filters``.
    """
    # (type_id, type_name) pairs for the category list.
    category_pairs = (
        ('movie_bt_tags/xiju', '喜剧'),
        ('movie_bt_tags/aiqing', '爱情'),
        ('movie_bt_tags/adt', '冒险'),
        ('movie_bt_tags/at', '动作'),
        ('movie_bt_tags/donghua', '动画'),
        ('movie_bt_tags/qihuan', '奇幻'),
        ('movie_bt_tags/xuanni', '悬疑'),
        ('movie_bt_tags/kehuan', '科幻'),
        ('movie_bt_tags/juqing', '剧情'),
        ('movie_bt_tags/kongbu', '恐怖'),
        ('meiju', '美剧'),
        ('gf', '高分电影'),
    )
    categories = [
        {'type_id': type_id, 'type_name': label}
        for type_id, label in category_pairs
    ]
    return {'class': categories, 'filters': self._get_filters()}
def categoryContent(self, tid, pg, filter, extend):
    """Return one page of videos for category ``tid``.

    Args:
        tid: Category id, passed to ``_build_url``.
        pg: Page number; passed to ``_build_url`` and echoed back as ``int(pg)``.
        filter: When this is a non-empty dict its entries are merged over
            ``extend``; any other value is ignored.
        extend: Optional dict of filter selections. It is never modified.

    Returns:
        ``{'list', 'page', 'pagecount', 'limit', 'total'}`` on success, where
        ``pagecount``/``limit``/``total`` are fixed values (999 / 20 / 19980).
        ``{'list': []}`` when no URL could be built or any step raises.
    """
    try:
        if filter and isinstance(filter, dict):
            # Merge into a fresh dict so the caller's ``extend`` object is
            # left untouched (the previous in-place update leaked the filter
            # values back into the caller's dict).
            merged = dict(extend) if extend else {}
            merged.update(filter)
            extend = merged
        self.log(f"分类请求: tid={tid}, pg={pg}, extend={extend}")

        url = self._build_url(tid, pg, extend)
        if not url:
            return {'list': []}

        rsp = self.fetch(url, headers=self.headers)
        doc = self.html(rsp.text)
        videos = self._get_videos(doc, limit=20)
        return {
            'list': videos,
            'page': int(pg),
            'pagecount': 999,
            'limit': 20,
            'total': 19980,
        }
    except Exception as e:
        self.log(f"分类内容获取出错: {str(e)}")
        return {'list': []}
def searchContent(self, key, quick, pg="1"):
    """Search the site for ``key`` and return de-duplicated, relevant videos.

    Args:
        key: Search keyword; URL-quoted into the ``q`` query parameter.
        quick: Not read by this method.
        pg: Page number (string or int). Anything other than page 1 is
            appended as the ``p`` query parameter.

    Returns:
        ``{'list': [...]}``; the list is empty when any step raises.
    """
    try:
        search_url = f"{self.host}/xssssearch?q={urllib.parse.quote(key)}"
        # Compare as text so an integer 1 is also recognised as the first
        # page (previously ``1 != "1"`` appended a redundant "&p=1").
        if pg and str(pg) != "1":
            search_url += f"&p={pg}"
        self.log(f"搜索URL: {search_url}")

        rsp = self.fetch(search_url, headers=self.headers)
        doc = self.html(rsp.text)

        videos = []
        seen_ids = set()
        # Result items: <li> elements that contain a link to a /movie/ page.
        elements = doc.xpath('//li[contains(@class,"") and .//a[contains(@href,"/movie/")]]')
        self.log(f"找到 {len(elements)} 个搜索结果元素")

        for elem in elements:
            video = self._extract_video_info(elem, is_search=True)
            if not video or video['vod_id'] in seen_ids:
                continue
            # Only ids of accepted results are remembered as seen.
            if self._is_relevant_search_result(video['vod_name'], key):
                videos.append(video)
                seen_ids.add(video['vod_id'])
                self.log(f"✅ 相关视频: {video['vod_name']} (ID: {video['vod_id']})")
            else:
                self.log(f"❌ 过滤无关: {video['vod_name']} (搜索: {key})")

        self.log(f"最终搜索结果: {len(videos)} 个视频")
        return {'list': videos}
    except Exception as e:
        self.log(f"搜索出错: {str(e)}")
        return {'list': []}
def detailContent(self, ids):
    """Fetch the detail page of the first id in ``ids``.

    Returns ``{'list': [info]}`` when ``_get_detail`` yields a value, and
    ``{'list': []}`` otherwise or when any step raises (including empty ``ids``).
    """
    try:
        first_id = ids[0]
        response = self.fetch(f"{self.host}/movie/{first_id}.html", headers=self.headers)
        page = self.html(response.text)
        info = self._get_detail(page, first_id)
        if info:
            return {'list': [info]}
        return {'list': []}
    except Exception as err:
        self.log(f"详情获取出错: {str(err)}")
        return {'list': []}
def playerContent(self, flag, id, vipFlags):
    """Return the play-page URL for play id ``id``.

    The id is Base64-decoded only so the decoded form can be logged; the URL
    is always built from the raw ``id``.

    Args:
        flag: Play source name; only logged.
        id: Play id as taken from the ``/v_play/<id>.html`` links.
        vipFlags: Not read by this method.

    Returns:
        ``{'parse': 1, 'playUrl': '', 'url': '<host>/v_play/<id>.html'}``.
    """
    try:
        self.log(f"获取播放链接: flag={flag}, id={id}")
        try:
            decoded_id = base64.b64decode(id).decode('utf-8')
            self.log(f"解码播放ID: {decoded_id}")
        except (ValueError, TypeError):
            # Best effort: binascii.Error and UnicodeDecodeError are both
            # ValueError subclasses; a non-str/bytes id raises TypeError.
            # The decoded value is informational only, so nothing else to do.
            pass
        play_url = f"{self.host}/v_play/{id}.html"
        # Hand the play page URL back with parse=1.
        return {'parse': 1, 'playUrl': '', 'url': play_url}
    except Exception as e:
        self.log(f"播放链接获取出错: {str(e)}")
        return {'parse': 1, 'playUrl': '', 'url': f"{self.host}/v_play/{id}.html"}
# ========== Helper methods ==========
def _get_filters(self):
    """Return the filter configuration keyed by category id.

    Every category id maps to the same list object holding two filter groups,
    ``area`` and ``year``. Each group's first option is the catch-all entry
    with an empty value.
    """
    def as_options(labels):
        # Catch-all first, then one option per label whose value is the label.
        return [{'n': '全部', 'v': ''}] + [{'n': text, 'v': text} for text in labels]

    areas = ['中国大陆', '美国', '韩国', '日本', '英国', '法国', '德国', '其他']
    years = ['2025', '2024', '2023', '2022', '2021', '2020', '2019', '2018']
    shared_groups = [
        {'key': 'area', 'name': '地区', 'value': as_options(areas)},
        {'key': 'year', 'name': '年份', 'value': as_options(years)},
    ]

    category_ids = (
        'movie_bt_tags/xiju', 'movie_bt_tags/aiqing', 'movie_bt_tags/adt',
        'movie_bt_tags/at', 'movie_bt_tags/donghua', 'movie_bt_tags/qihuan',
        'movie_bt_tags/xuanni', 'movie_bt_tags/kehuan', 'movie_bt_tags/juqing',
        'movie_bt_tags/kongbu', 'meiju', 'gf',
    )
    # All categories reference the one shared list (not per-category copies).
    return {category_id: shared_groups for category_id in category_ids}
def _build_url(self, tid, pg, extend):
    """Build the listing URL for category ``tid`` and page ``pg``.

    Args:
        tid: Category id string; it is appended to the host as the path.
        pg: Page number (string or int). Pages other than 1 are added as the
            ``paged`` query parameter.
        extend: Filter selections. NOTE(review): not applied to the URL by
            this method.

    Returns:
        The URL string. If building fails (e.g. ``tid`` is not a string) the
        error is logged and the ``movie_bt_tags/xiju`` listing URL is returned.
    """
    try:
        # Tag paths, 'meiju', 'gf' and any other id all map to "<host>/<tid>".
        # String formatting would silently accept a non-string tid, so reject
        # it here and let the handler below return the fallback URL.
        if not isinstance(tid, str):
            raise TypeError(f"tid must be str, got {type(tid).__name__}")
        url = f"{self.host}/{tid}"

        # Compare as text so an integer 1 is also recognised as the first
        # page (previously ``1 != '1'`` appended a redundant "paged=1").
        if pg and str(pg) != '1':
            separator = '&' if '?' in url else '?'
            url += f"{separator}paged={pg}"
        return url
    except Exception as e:
        self.log(f"构建URL出错: {str(e)}")
        return f"{self.host}/movie_bt_tags/xiju"
def _get_videos(self, doc, limit=None):
    """Collect unique videos from a parsed listing page.

    The XPath queries are tried in order and only the first one that matches
    anything is used. Entries are de-duplicated by ``vod_id``. When ``limit``
    is truthy the result is cut to that many items. Returns ``[]`` on error.
    """
    try:
        queries = (
            '//li[.//a[contains(@href,"/movie/")]]',
            '//div[contains(@class,"item")]//li[.//a[contains(@href,"/movie/")]]',
        )
        # Lazily evaluate the queries and keep the first non-empty match list.
        matched = next((hits for hits in map(doc.xpath, queries) if hits), [])

        found = []
        known_ids = set()
        for node in matched:
            info = self._extract_video_info(node)
            if not info or info['vod_id'] in known_ids:
                continue
            known_ids.add(info['vod_id'])
            found.append(info)

        if limit and found:
            return found[:limit]
        return found
    except Exception as err:
        self.log(f"获取视频列表出错: {str(err)}")
        return []
def _extract_video_info(self, element, is_search=False):
    """Extract one video record from a listing element.

    Returns a dict with ``vod_id``, ``vod_name``, ``vod_pic``, ``vod_remarks``
    and an empty ``vod_year``; returns None when the element has no /movie/
    link, no numeric id, no usable title, or when extraction raises.
    ``is_search`` is forwarded to ``_extract_image``.
    """
    try:
        hrefs = element.xpath('.//a[contains(@href,"/movie/")]/@href')
        if not hrefs:
            return None
        href = hrefs[0]
        if href.startswith('/'):
            href = self.host + href

        vod_id = self.regStr(r'/movie/(\d+)\.html', href)
        if not vod_id:
            return None

        # Title sources in priority order; the first text longer than one
        # character (after stripping) wins. Later queries are only evaluated
        # if the earlier ones produced nothing usable.
        title_queries = (
            './/h3/a/text()',
            './/h3/text()',
            './/a/@title',
            './/a/text()',
        )
        candidates = (
            text.strip()
            for query in title_queries
            for text in element.xpath(query)
            if text and text.strip() and len(text.strip()) > 1
        )
        name = next(candidates, '')
        if not name:
            return None

        poster = self._extract_image(element, is_search, vod_id)
        note = self._extract_remarks(element)
        return {
            'vod_id': vod_id,
            'vod_name': name,
            'vod_pic': poster,
            'vod_remarks': note,
            'vod_year': '',
        }
    except Exception as err:
        self.log(f"提取视频信息出错: {str(err)}")
        return None
def _extract_image(self, element, is_search=False, vod_id=None):
    """Return the poster URL for a listing element.

    The ``data-original``, ``data-src`` and ``src`` attributes of ``<img>``
    are checked in that order. Placeholder values (``blank.gif``, data URIs,
    anything containing ``base64``) are skipped. If nothing usable is found
    and this is a search result with a ``vod_id``, the detail page is
    consulted; otherwise an empty string is returned.
    """
    attribute_queries = (
        './/img/@data-original',
        './/img/@data-src',
        './/img/@src',
    )
    for query in attribute_queries:
        for src in element.xpath(query):
            if not src:
                continue
            # Lazy-load placeholders carry no real image.
            if src.endswith('blank.gif') or src.startswith('data:image/') or 'base64' in src:
                continue
            if src.startswith('//'):
                return 'https:' + src
            if src.startswith('/'):
                return self.host + src
            if src.startswith('http'):
                return src
            # Any other form (e.g. a relative path) is ignored.

    if is_search and vod_id:
        return self._get_image_from_detail(vod_id)
    return ''
def _extract_remarks(self, element):
    """Return the first non-blank remark text found in ``element``.

    The XPath queries are tried in order (rating, status, then generic span
    texts); the stripped text of the first non-blank hit is returned, or an
    empty string when nothing matches.
    """
    queries = (
        './/span[contains(@class,"rating")]/text()',
        './/div[contains(@class,"rating")]/text()',
        './/span[contains(@class,"status")]/text()',
        './/div[contains(@class,"status")]/text()',
        './/span[contains(text(),"")]/text()',
        './/span[contains(text(),"1080p")]/text()',
        './/span[contains(text(),"HD")]/text()',
    )
    # Lazy: later queries run only if earlier ones gave no usable text.
    stripped_hits = (
        text.strip()
        for query in queries
        for text in element.xpath(query)
        if text and text.strip()
    )
    return next(stripped_hits, '')
def _get_image_from_detail(self, vod_id):
    """Fetch the detail page of ``vod_id`` and return its poster URL.

    The XPath queries are tried in order. ``blank.gif`` placeholders are
    skipped, protocol-relative and root-relative URLs are made absolute, and
    the first usable URL is returned. Returns an empty string when nothing
    usable is found or when fetching/parsing fails; failures are logged
    instead of being silently discarded.
    """
    try:
        detail_url = f"{self.host}/movie/{vod_id}.html"
        rsp = self.fetch(detail_url, headers=self.headers)
        doc = self.html(rsp.text)

        pic_selectors = (
            '//img[contains(@class,"poster")]/@src',
            '//div[contains(@class,"poster")]//img/@src',
            '//img[contains(@alt,"")]/@src',
        )
        for selector in pic_selectors:
            for p in doc.xpath(selector):
                if not p or p.endswith('blank.gif'):
                    continue
                if p.startswith('//'):
                    return 'https:' + p
                if p.startswith('/'):
                    return self.host + p
                if p.startswith('http'):
                    return p
    except Exception as e:
        # Best effort: a missing poster must not break the caller, but the
        # reason is recorded like in the other helpers.
        self.log(f"从详情页获取图片出错: {str(e)}")
    return ''
def _is_relevant_search_result(self, title, search_key):
    """Decide whether ``title`` is relevant to ``search_key``.

    Matching is case-insensitive:

    1. A title containing the key as a substring is always relevant.
    2. Keys of one or two characters must match as a substring. This check
       previously ran after the character-overlap test, where it could never
       take effect, so short keys were still matched fuzzily.
    3. Longer keys are relevant when at least 60% of their distinct
       characters (spaces ignored) occur somewhere in the title.

    Returns False when either argument is empty.
    """
    if not title or not search_key:
        return False

    title_lower = title.lower()
    key_lower = search_key.lower()

    if key_lower in title_lower:
        return True

    # Short keys: strict substring match only (already known to have failed).
    if len(key_lower) <= 2:
        return False

    key_chars = set(key_lower.replace(' ', ''))
    if not key_chars:
        return False
    title_chars = set(title_lower.replace(' ', ''))
    return len(key_chars & title_chars) / len(key_chars) >= 0.6
def _get_detail(self, doc, vod_id):
    """Assemble the detail record for ``vod_id`` from a parsed detail page.

    Title, poster, description, cast and director are each taken from the
    first XPath query in their list that yields a usable value. Play sources
    come from ``_parse_play_sources`` and are joined with ``$$$``. Returns the
    record dict, or None when anything raises.
    """
    def labelled_value(queries, label):
        # Per query: take the first non-blank text mentioning ``label`` and
        # remove the label (with or without a trailing colon). Stop at the
        # first query that leaves a non-empty value.
        for query in queries:
            hit = next(
                (text for text in doc.xpath(query)
                 if text and text.strip() and label in text),
                None,
            )
            if hit is None:
                continue
            value = hit.strip().replace(label + ':', '').replace(label, '')
            if value:
                return value
        return ''

    try:
        # Title: first non-blank text among h1, h2, then <title>.
        name = next(
            (text.strip()
             for query in ('//h1/text()', '//h2/text()', '//title/text()')
             for text in doc.xpath(query)
             if text and text.strip()),
            '',
        )

        # Poster: per query only the first non-placeholder src is considered.
        poster = ''
        for query in (
            '//img[contains(@class,"poster")]/@src',
            '//div[contains(@class,"poster")]//img/@src',
            '//img/@src',
        ):
            src = next(
                (p for p in doc.xpath(query) if p and not p.endswith('blank.gif')),
                None,
            )
            if src is None:
                continue
            if src.startswith('//'):
                poster = 'https:' + src
            elif src.startswith('/'):
                poster = self.host + src
            elif src.startswith('http'):
                poster = src
            if poster:
                break

        # Description: all non-blank text pieces of the first matching block.
        summary = ''
        for query in (
            '//div[contains(@class,"intro")]//text()',
            '//div[contains(@class,"description")]//text()',
            '//p[contains(@class,"desc")]//text()',
        ):
            pieces = [chunk.strip() for chunk in doc.xpath(query) if chunk and chunk.strip()]
            if pieces:
                summary = ' '.join(pieces)
                break

        cast = labelled_value(
            (
                '//li[contains(text(),"主演")]/text()',
                '//span[contains(text(),"主演")]/following-sibling::text()',
                '//div[contains(@class,"actor")]//text()',
            ),
            '主演',
        )
        director = labelled_value(
            (
                '//li[contains(text(),"导演")]/text()',
                '//span[contains(text(),"导演")]/following-sibling::text()',
                '//div[contains(@class,"director")]//text()',
            ),
            '导演',
        )

        sources = self._parse_play_sources(doc, vod_id)
        source_names = [entry['name'] for entry in sources]
        source_episodes = [entry['episodes'] for entry in sources]
        return {
            'vod_id': vod_id,
            'vod_name': name,
            'vod_pic': poster,
            'type_name': '',
            'vod_year': '',
            'vod_area': '',
            'vod_remarks': '',
            'vod_actor': cast,
            'vod_director': director,
            'vod_content': summary,
            'vod_play_from': '$$$'.join(source_names),
            'vod_play_url': '$$$'.join(source_episodes),
        }
    except Exception as err:
        self.log(f"获取详情出错: {str(err)}")
        return None
def _parse_play_sources(self, doc, vod_id):
    """Build the play-source list from the links on a detail page.

    The link queries are tried in order and only the first one that matches
    anything is used. Each link with both a text and a ``/v_play/<id>.html``
    href becomes a ``"<title>$<id>"`` episode; episodes are joined with ``#``
    under a single source named ``默认播放``. When no episode is found, or on
    error, a single hard-coded default episode is returned instead.
    ``vod_id`` is not read by this method.
    """
    fallback_episodes = '第1集$bXZfMTM0NTY4LW5tXzE='
    try:
        link_queries = (
            '//a[contains(@href,"/v_play/")]',
            '//div[contains(@class,"play")]//a',
        )
        anchors = next((hits for hits in map(doc.xpath, link_queries) if hits), [])

        collected = []
        for anchor in anchors:
            texts = anchor.xpath('./text()')
            hrefs = anchor.xpath('./@href')
            label = texts[0] if texts else ''
            href = hrefs[0] if hrefs else ''
            if not (label and href):
                continue
            # The play id is the path segment between /v_play/ and .html.
            play_id = self.regStr(r'/v_play/([^.]+)\.html', href)
            if play_id:
                collected.append(f"{label.strip()}${play_id}")

        listing = '#'.join(collected) if collected else fallback_episodes
        return [{'name': '默认播放', 'episodes': listing}]
    except Exception as err:
        self.log(f"解析播放源出错: {str(err)}")
        return [{'name': '默认播放', 'episodes': fallback_episodes}]