mirror of
https://github.com/BuddyChewChew/xumo-playlist-generator.git
synced 2025-04-29 21:37:42 +02:00
# -*- coding: utf-8 -*-

import requests
import json
import os
import gzip
from datetime import datetime, timedelta, timezone
import xml.etree.ElementTree as ET
import logging
import time
import re
import uuid  # Needed for IFA placeholder
import sys
import math  # For calculating total pages

# --- Configuration ---
ANDROID_TV_ENDPOINT = "https://android-tv-mds.xumo.com/v2"      # Keep for asset/broadcast lookups
VALENCIA_API_ENDPOINT = "https://valencia-app-mds.xumo.com/v2"  # Primary endpoint
GEO_ID = "us"

# --- List IDs ---
PRIMARY_LIST_ID = "10006"  # Use the Valencia/Web list ID for BOTH the primary channel list AND the EPG
# EPG/asset lookup may still rely on the Android TV endpoints internally for data
ASSET_LOOKUP_ENDPOINT = ANDROID_TV_ENDPOINT  # Use the Android TV endpoint for asset/broadcast lookups

# --- Endpoint URLs ---
PRIMARY_CHANNEL_LIST_URL = f"{VALENCIA_API_ENDPOINT}/proxy/channels/list/{PRIMARY_LIST_ID}.json?geoId={GEO_ID}"

# URLs for asset/stream lookup (using the Android TV endpoint, as it appears more stable/direct for this purpose)
BROADCAST_NOW_URL_TEMPLATE = f"{ASSET_LOOKUP_ENDPOINT}/channels/channel/{{channel_id}}/broadcast.json?hour={{hour_num}}"
ASSET_DETAILS_URL_TEMPLATE = f"{ASSET_LOOKUP_ENDPOINT}/assets/asset/{{asset_id}}.json?f=providers"

# EPG URL (using the Valencia endpoint and primary list ID, with hourly iteration)
EPG_FETCH_URL_TEMPLATE = f"{VALENCIA_API_ENDPOINT}/epg/{PRIMARY_LIST_ID}/{{date_str}}/{{hour}}.json?limit=50&offset={{offset}}&f=asset.title&f=asset.descriptions"

XUMO_LOGO_URL_TEMPLATE = "https://image.xumo.com/v1/channels/channel/{channel_id}/168x168.png?type=color_onBlack"
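# Illustrative expansions of the templates above (the channel ID and date are example values, nothing is fetched here):
#   PRIMARY_CHANNEL_LIST_URL -> https://valencia-app-mds.xumo.com/v2/proxy/channels/list/10006.json?geoId=us
#   EPG_FETCH_URL_TEMPLATE with date_str="20250429", hour=0, offset=0 ->
#     https://valencia-app-mds.xumo.com/v2/epg/10006/20250429/0.json?limit=50&offset=0&f=asset.title&f=asset.descriptions
#   XUMO_LOGO_URL_TEMPLATE with channel_id="99991234" ->
#     https://image.xumo.com/v1/channels/channel/99991234/168x168.png?type=color_onBlack
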
# --- Script Settings ---
EPG_FETCH_DAYS = 2
MAX_EPG_OFFSET = 350
HOURS_TO_FETCH = 24
API_DELAY_SECONDS = 0.10
OUTPUT_DIR = "playlists"
PLAYLIST_FILENAME = "xumo_playlist.m3u"
EPG_FILENAME = "xumo_epg.xml.gz"
REQUEST_TIMEOUT = 45

# !!! GitHub Repo Info !!!
GITHUB_USER = "BuddyChewChew"
GITHUB_REPO = "xumo-playlist-generator"
GITHUB_BRANCH = "main"
EPG_RAW_URL = f"https://raw.githubusercontent.com/{GITHUB_USER}/{GITHUB_REPO}/{GITHUB_BRANCH}/{OUTPUT_DIR}/{EPG_FILENAME}"
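# With the values above, EPG_RAW_URL resolves to:
#   https://raw.githubusercontent.com/BuddyChewChew/xumo-playlist-generator/main/playlists/xumo_epg.xml.gz
# This URL is embedded in the M3U header (url-tvg) so players can locate the gzipped XMLTV guide.
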
# --- Headers ---
WEB_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
    'Accept': 'application/json, text/plain, */*',
    'Accept-Language': 'en-US,en;q=0.9',
    'Origin': 'https://play.xumo.com',
    'Referer': 'https://play.xumo.com/',
}
# Use Android TV headers for asset/broadcast lookup calls
ANDROID_TV_HEADERS = {
    'User-Agent': 'okhttp/4.9.3',
}

# --- Logging Setup ---
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - [%(funcName)s] %(message)s', stream=sys.stdout)

# --- Helper Functions ---

def fetch_data(url, params=None, is_json=True, retries=2, delay=2, headers=WEB_HEADERS): # Default to Web Headers
    """Fetches data from a URL, handles JSON parsing and errors, includes retries."""
    logging.debug(f"URL: {url}, Params: {params}")
    # logging.debug(f"Headers: {json.dumps(headers)}") # Can be verbose
    for attempt in range(retries + 1):
        try:
            # Pass specific headers for the call
            response = requests.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT, allow_redirects=True)
            logging.debug(f"Request URL: {response.url} -> Status: {response.status_code}")
            response.raise_for_status()
            if is_json:
                if not response.content: logging.warning(f"Empty response content received from {url}"); return None
                try: return response.json()
                except json.JSONDecodeError as e_final:
                    logging.error(f"Error decoding JSON. Content: {response.text[:500]}... - {e_final}")
                    if logging.getLogger().level == logging.DEBUG: logging.debug(f"Full Text:\n{response.text}")
                    return None
            else:
                try:
                    decoded_text = response.content.decode('utf-8', errors='ignore')
                    # Limit logging of non-JSON text unless debugging a specific text issue
                    # if logging.getLogger().level == logging.DEBUG: logging.debug(f"Raw Text Response:\n{decoded_text[:1500]}...")
                    return decoded_text
                except Exception as decode_ex: logging.error(f"Error decoding text response: {decode_ex}"); return None
        except requests.exceptions.HTTPError as e:
            logging.warning(f"Attempt {attempt+1}/{retries+1} HTTP Error: {e}")
            if response is not None: logging.warning(f"Error Response Content: {response.text[:500]}...")
            # Don't retry auth errors, missing resources or rate limits (401/403/404/429)
            if attempt < retries and response is not None and response.status_code not in [401, 403, 404, 429]:
                time.sleep(delay * (attempt + 1))
            elif attempt == retries: logging.error(f"Final attempt failed with HTTP Error: {e}"); return None
            else: break # Non-retriable HTTP error (e.g. 404); fall through to return None
        except requests.exceptions.RequestException as e:
            logging.warning(f"Attempt {attempt+1}/{retries+1} Network Error: {e}")
            if attempt < retries: time.sleep(delay * (attempt + 1))
            elif attempt == retries: logging.error(f"Final attempt failed with Network Error: {e}"); return None
    return None

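# Illustrative usage (mirrors the call made in get_channels_via_primary_list() below):
#   data = fetch_data(PRIMARY_CHANNEL_LIST_URL, is_json=True, retries=1, headers=WEB_HEADERS)
#   -> parsed dict on success, or None on any failure.
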
def parse_iso_datetime(iso_time_str):
    """Parses ISO 8601 string, handling 'Z', milliseconds, and '+HHMM' timezone format."""
    if not iso_time_str: logging.debug("parse_iso_datetime received empty string."); return None
    try:
        original_str = iso_time_str
        if iso_time_str.endswith('Z'): iso_time_str = iso_time_str[:-1] + '+00:00'
        if '.' in iso_time_str:
            offset_str = ""; plus_index = iso_time_str.rfind('+'); minus_index = iso_time_str.rfind('-'); t_index = iso_time_str.find('T'); offset_index = -1
            if plus_index > t_index: offset_index = plus_index
            if minus_index > t_index: offset_index = max(offset_index, minus_index)
            if offset_index != -1: offset_str = iso_time_str[offset_index:]; iso_time_str = iso_time_str[:offset_index]
            iso_time_str = iso_time_str.split('.', 1)[0]; iso_time_str += offset_str
        if len(iso_time_str) >= 5 and iso_time_str[-5] in ['+', '-'] and iso_time_str[-4:].isdigit():
            if ':' not in iso_time_str[-5:]: iso_time_str = iso_time_str[:-2] + ':' + iso_time_str[-2:]; logging.debug(f"Inserted colon in timezone offset: {iso_time_str}")
        if '+' not in iso_time_str[10:] and '-' not in iso_time_str[10:]: logging.debug(f"Adding default +00:00 offset to '{iso_time_str}'"); iso_time_str += "+00:00"
        dt_obj = datetime.fromisoformat(iso_time_str)
        return dt_obj.astimezone(timezone.utc)
    except Exception as e: logging.warning(f"Parse failed for input '{original_str}' (processed as '{iso_time_str}'): {e}"); return None

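# Illustrative conversions handled by parse_iso_datetime (example timestamps, not exhaustive):
#   "2025-04-29T21:30:00Z"      -> datetime(2025, 4, 29, 21, 30, tzinfo=timezone.utc)
#   "2025-04-29T21:30:00.123Z"  -> same as above (fractional seconds dropped)
#   "2025-04-29T17:30:00-0400"  -> colon inserted into the offset, then normalized to 21:30 UTC
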
def format_xmltv_time(dt_obj):
    """Formats datetime object into XMLTV time (YYYYMMDDHHMMSS +HHMM)."""
    if not isinstance(dt_obj, datetime): logging.warning(f"format_xmltv_time received non-datetime object: {type(dt_obj)}"); return ""
    if not dt_obj.tzinfo: dt_obj_utc = dt_obj.replace(tzinfo=timezone.utc)
    else: dt_obj_utc = dt_obj.astimezone(timezone.utc)
    main_part = dt_obj_utc.strftime('%Y%m%d%H%M%S'); offset_part = dt_obj_utc.strftime('%z')
    offset_part_clean = offset_part.replace(':', ''); full_time_str = f"{main_part} {offset_part_clean}"
    # logging.debug(f"Formatted time: {full_time_str}") # Keep commented unless debugging format
    return full_time_str

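# Illustrative output: datetime(2025, 4, 29, 21, 30, tzinfo=timezone.utc) -> "20250429213000 +0000"
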
def ensure_output_dir():
    if not os.path.exists(OUTPUT_DIR):
        logging.info(f"Creating output directory: {OUTPUT_DIR}")
        try: os.makedirs(OUTPUT_DIR)
        except OSError as e: logging.error(f"Failed to create directory {OUTPUT_DIR}: {e}"); raise

ADD_XMLTV_DOCTYPE = True # Keep DOCTYPE addition

def save_gzipped_xml(tree, filepath):
    """Saves the ElementTree XML to a gzipped file, optionally adding a DOCTYPE."""
    try:
        if ADD_XMLTV_DOCTYPE:
            xml_partial_string = ET.tostring(tree.getroot(), encoding='unicode', method='xml')
            xml_full_string = f'''<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE tv SYSTEM "xmltv.dtd">
{xml_partial_string}'''
            xml_bytes = xml_full_string.encode('utf-8'); logging.debug("Adding DOCTYPE to XML output.")
        else:
            xml_bytes = ET.tostring(tree.getroot(), encoding='UTF-8', xml_declaration=True); logging.debug("Saving XML without DOCTYPE.")
        with gzip.open(filepath, 'wb') as f: f.write(xml_bytes)
        logging.info(f"Gzipped EPG XML file saved: {filepath}")
    except Exception as e: logging.error(f"Error writing gzipped EPG file {filepath}: {e}")

def save_m3u(content, filepath):
    try:
        with open(filepath, 'w', encoding='utf-8') as f: f.write(content)
        logging.info(f"M3U playlist file saved: {filepath}")
    except Exception as e: logging.error(f"Error writing M3U file {filepath}: {e}")

def process_stream_uri(uri):
    if not uri: return None
    try:
        uri = uri.replace('[PLATFORM]', "web"); uri = uri.replace('[APP_VERSION]', "1.0.0"); uri = uri.replace('[timestamp]', str(int(time.time()*1000)))
        uri = uri.replace('[app_bundle]', "web.xumo.com"); uri = uri.replace('[device_make]', "GitHubAction"); uri = uri.replace('[device_model]', "PythonScript")
        uri = uri.replace('[content_language]', "en"); uri = uri.replace('[IS_LAT]', "0"); uri = uri.replace('[IFA]', str(uuid.uuid4()))
        uri = uri.replace('[SESSION_ID]', str(uuid.uuid4())); uri = uri.replace('[DEVICE_ID]', str(uuid.uuid4().hex))
        uri = re.sub(r'\[([^]]+)\]', '', uri)
        return uri
    except Exception as e: logging.error(f"Error processing stream URI '{uri[:50]}...': {e}"); return None

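# Illustrative placeholder substitution (the sample URI below is hypothetical, not a real Xumo stream):
#   process_stream_uri("https://example.com/master.m3u8?ifa=[IFA]&lat=[IS_LAT]&extra=[UNKNOWN]")
#   -> "https://example.com/master.m3u8?ifa=<random uuid>&lat=0&extra="
# Known placeholders get fixed or generated values; any leftover [BRACKETED] tokens are stripped by the final re.sub.
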
# --- Core Logic Functions ---

def get_channels_via_primary_list():
    """Gets channel list from the primary Valencia endpoint."""
    logging.info(f"Attempting Primary Channel List: {PRIMARY_CHANNEL_LIST_URL}")
    # Use WEB_HEADERS for the Valencia endpoint
    data = fetch_data(PRIMARY_CHANNEL_LIST_URL, is_json=True, retries=1, headers=WEB_HEADERS)
    if not data or not isinstance(data, dict): logging.warning("Failed to fetch valid dictionary data from primary list endpoint."); return None
    processed_channels = []; channel_items = []
    if 'channel' in data and isinstance(data['channel'], dict) and 'item' in data['channel'] and isinstance(data['channel']['item'], list):
        channel_items = data['channel']['item']; logging.debug("Found channel list under data['channel']['item']")
    elif 'items' in data and isinstance(data['items'], list):
        channel_items = data['items']; logging.debug("Found channel list under data['items']")
    else: logging.error(f"Could not find channel list in primary list response. Top-level keys: {list(data.keys())}"); return None
    logging.info(f"Found {len(channel_items)} potential channel items in primary list response.")
    if not channel_items: logging.warning("Primary list response contained an empty channel list."); return None
    for item in channel_items:
        callsign = item.get('callsign', ''); properties = item.get('properties', {})
        is_live = properties.get('is_live') == "true"; is_drm = callsign.endswith("-DRM") or callsign.endswith("DRM-CMS")
        if is_drm: logging.debug(f"Skipping potential DRM channel: {item.get('guid', {}).get('value')} ({item.get('title')})"); continue
        if not is_live: logging.debug(f"Skipping non-live channel: {item.get('guid', {}).get('value')} ({item.get('title')})"); continue
        try:
            channel_id = item.get('guid', {}).get('value'); title = item.get('title'); number_str = item.get('number')
            logo_url = item.get('images', {}).get('logo') or item.get('logo'); genre_list = item.get('genre'); genre = 'General'
            if isinstance(genre_list, list) and len(genre_list) > 0 and isinstance(genre_list[0], dict): genre = genre_list[0].get('value', 'General')
            elif isinstance(genre_list, str): genre = genre_list
            if not channel_id or not title: logging.warning(f"Skipping item due to missing ID or title: {item}"); continue
            channel_id_str = str(channel_id); final_logo_url = None
            if logo_url:
                if logo_url.startswith('//'): final_logo_url = 'https:' + logo_url
                elif logo_url.startswith('/'): final_logo_url = 'https://image.xumo.com' + logo_url
                else: final_logo_url = logo_url
            else: final_logo_url = XUMO_LOGO_URL_TEMPLATE.replace("{channel_id}", channel_id_str)
            processed_channels.append({ 'id': channel_id_str, 'name': title, 'number': str(number_str) if number_str else None, 'callsign': callsign, 'logo': final_logo_url, 'group': genre, 'stream_url': None })
        except Exception as e: logging.warning(f"Error processing channel list item {item.get('id', 'N/A')}: {e}", exc_info=True)
    if not processed_channels: logging.warning("Primary channel list endpoint returned data, but no channels could be successfully processed."); return None
    logging.info(f"Successfully processed {len(processed_channels)} live, non-DRM channels from primary list endpoint.")
    return processed_channels

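# Each entry returned by get_channels_via_primary_list() has this shape (values illustrative):
#   {'id': '99991234', 'name': 'Example Channel', 'number': '123', 'callsign': 'EXMPL',
#    'logo': 'https://image.xumo.com/v1/channels/channel/99991234/168x168.png?type=color_onBlack',
#    'group': 'News', 'stream_url': None}
# 'stream_url' stays None until fetch_stream_urls_via_asset_lookup() fills it in below.
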
# <<< FUNCTION ADDED BACK >>>
def fetch_stream_urls_via_asset_lookup(channels_list):
    """Fetches stream URLs via the Android TV asset lookup method for ALL provided channels."""
    # This function assumes it needs to get streams for the entire list passed to it.
    logging.info(f"Attempting Android TV asset lookup for {len(channels_list)} channels...")
    processed_count = 0; channels_with_streams = []

    for i, channel_info in enumerate(channels_list):
        channel_id = channel_info['id']
        # We no longer skip based on an existing stream_url; this function is only called
        # when we *know* we need to fetch streams via this method.
        logging.debug(f"Asset Lookup: Processing {channel_id} ({channel_info['name']}) ({i+1}/{len(channels_list)})")

        # 1. Get the current asset ID (using the Android TV endpoint for broadcast data)
        current_hour = datetime.now(timezone.utc).hour
        broadcast_url = BROADCAST_NOW_URL_TEMPLATE.format(channel_id=channel_id, hour_num=current_hour)
        logging.debug(f"Fetching broadcast info: {broadcast_url}")
        # Use ANDROID_TV_HEADERS for these specific calls
        broadcast_data = fetch_data(broadcast_url, is_json=True, retries=1, headers=ANDROID_TV_HEADERS)

        asset_id = None
        if broadcast_data and 'assets' in broadcast_data and isinstance(broadcast_data['assets'], list) and len(broadcast_data['assets']) > 0:
            now_utc = datetime.now(timezone.utc); current_asset = None
            for asset in broadcast_data['assets']:
                start_time = parse_iso_datetime(asset.get('start')); end_time = parse_iso_datetime(asset.get('end'))
                if start_time and end_time and start_time <= now_utc < end_time: current_asset = asset; break
            if not current_asset and broadcast_data['assets']: current_asset = broadcast_data['assets'][0]
            if current_asset: asset_id = current_asset.get('id')
            if asset_id: logging.debug(f"Found current asset ID {asset_id} for channel {channel_id}")
            else: logging.warning(f"Relevant asset in broadcast data for channel {channel_id} has no ID.")
        else: logging.warning(f"Could not get valid broadcast data or assets for channel {channel_id} (Hour: {current_hour})"); time.sleep(API_DELAY_SECONDS); continue # Skip channel if broadcast fails

        if not asset_id: logging.warning(f"No asset ID found for channel {channel_id}, cannot get stream URL."); time.sleep(API_DELAY_SECONDS); continue # Skip channel

        # 2. Get asset details (using the Android TV endpoint for assets)
        asset_details_url = ASSET_DETAILS_URL_TEMPLATE.format(asset_id=asset_id)
        logging.debug(f"Fetching asset details: {asset_details_url}")
        # Use ANDROID_TV_HEADERS for these specific calls
        asset_data = fetch_data(asset_details_url, is_json=True, headers=ANDROID_TV_HEADERS)

        raw_stream_uri = None
        if asset_data and 'providers' in asset_data and isinstance(asset_data['providers'], list):
            for provider in asset_data['providers']:
                if ('sources' in provider and isinstance(provider['sources'], list)):
                    for source in provider['sources']:
                        if source.get('uri') and (source.get('type') == 'application/x-mpegURL' or source.get('uri', '').endswith('.m3u8')): raw_stream_uri = source['uri']; break
                        elif source.get('uri') and not raw_stream_uri: raw_stream_uri = source['uri']
                if raw_stream_uri: break
        else: logging.warning(f"Could not find providers/sources for asset {asset_id} (Channel {channel_id})")

        if not raw_stream_uri: logging.warning(f"No stream URI found in sources for asset {asset_id} (Channel {channel_id})"); time.sleep(API_DELAY_SECONDS); continue # Skip channel

        # 3. Process the URI
        processed_stream_url = process_stream_uri(raw_stream_uri)
        if processed_stream_url:
            # Copy the dict so the caller's list is not modified in place
            updated_channel_info = channel_info.copy()
            updated_channel_info['stream_url'] = processed_stream_url
            logging.debug(f"Successfully processed stream URL for channel {channel_id} via asset lookup")
            channels_with_streams.append(updated_channel_info)
            processed_count += 1
        else: logging.warning(f"Failed to process stream URI for asset {asset_id} (Channel {channel_id})") # Don't add if processing fails

        time.sleep(API_DELAY_SECONDS)

    logging.info(f"Asset lookup method obtained stream URLs for {processed_count} channels.")
    return channels_with_streams # Only channels where a stream URL was found
# <<< END OF FUNCTION ADDED BACK >>>

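# Assumed response shapes for the two lookups above, inferred from the fields this code reads (not an official schema):
#   broadcast.json              -> {"assets": [{"id": "...", "start": "<ISO 8601>", "end": "<ISO 8601>", ...}, ...]}
#   asset/<id>.json?f=providers -> {"providers": [{"sources": [{"uri": "...m3u8", "type": "application/x-mpegURL", ...}, ...]}, ...]}
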
def fetch_epg_data(channel_list):
    """Fetches EPG data using the Valencia endpoint (10006), iterating through hours and offsets."""
    if not channel_list: return {}
    logging.info(f"Fetching EPG data for {len(channel_list)} channels (using Valencia EPG endpoint {PRIMARY_LIST_ID})...")
    consolidated_epg = {channel['id']: [] for channel in channel_list}; assets_cache = {}
    channel_ids_in_final_list = {ch['id'] for ch in channel_list}
    today = datetime.now(timezone.utc); dates_to_fetch = [today + timedelta(days=d) for d in range(EPG_FETCH_DAYS)]
    total_requests = 0; total_programs_fetched = 0; total_programs_added = 0
    for date_obj in dates_to_fetch:
        date_str = date_obj.strftime('%Y%m%d'); logging.info(f"Fetching EPG for date: {date_str}")
        offset = 0
        while offset <= MAX_EPG_OFFSET:
            logging.debug(f"Processing EPG Offset Block: {offset}"); found_relevant_channel_in_offset_block = False
            for hour in range(HOURS_TO_FETCH):
                total_requests += 1
                fetch_url = EPG_FETCH_URL_TEMPLATE.format(date_str=date_str, hour=hour, offset=offset)
                logging.debug(f"  Fetching EPG - Date={date_str}, Hour={hour}, Offset={offset} -> {fetch_url}")
                page_data = fetch_data(fetch_url, is_json=True, retries=1, delay=1, headers=WEB_HEADERS)
                if page_data and isinstance(page_data, dict):
                    if 'assets' in page_data and isinstance(page_data['assets'], dict): assets_cache.update(page_data['assets'])
                    if 'channels' in page_data and isinstance(page_data['channels'], list):
                        processed_channels_this_page = 0; programs_added_this_page = 0
                        for channel_schedule_data in page_data['channels']:
                            channel_id = str(channel_schedule_data.get('channelId'))
                            if channel_id in channel_ids_in_final_list:
                                found_relevant_channel_in_offset_block = True; processed_channels_this_page += 1
                                if channel_id not in consolidated_epg: consolidated_epg[channel_id] = []
                                for program_schedule in channel_schedule_data.get('schedule', []):
                                    total_programs_fetched += 1; asset_id = program_schedule.get('assetId')
                                    asset_details = assets_cache.get(asset_id)
                                    if asset_details:
                                        program_info = { 'start': program_schedule.get('start'), 'end': program_schedule.get('end'), 'assetId': asset_id, 'title': asset_details.get('title', 'Unknown Program'), 'descriptions': asset_details.get('descriptions', {}), 'episodeTitle': asset_details.get('episodeTitle'), }
                                        if program_info['start'] and program_info['end']: consolidated_epg[channel_id].append(program_info); programs_added_this_page += 1; total_programs_added += 1
                                        else: logging.warning(f"EPG: Program for asset {asset_id} on channel {channel_id} missing start/end time in schedule.")
                                    else: logging.warning(f"EPG: Asset details not found for assetId {asset_id} on channel {channel_id} (Date={date_str}, Hour={hour}, Offset={offset})")
                        if programs_added_this_page > 0: logging.debug(f"  Processed {processed_channels_this_page} relevant channels, added {programs_added_this_page} program entries from Hour {hour}.")
                    else: logging.debug(f"  No 'channels' key found in response for Hour {hour}, Offset {offset}.")
                else: logging.debug(f"  Failed to fetch or invalid data for Hour {hour}, Offset {offset}. Skipping.")
                time.sleep(API_DELAY_SECONDS / 2)
            logging.debug(f"Finished processing all hours for Offset Block: {offset}")
            offset += 50; time.sleep(API_DELAY_SECONDS)
    logging.info(f"Finished fetching EPG data after {total_requests} requests.")
    logging.info(f"Found {total_programs_fetched} raw program entries, successfully stored {total_programs_added} entries.")
    logging.info("Removing duplicate program entries...")
    final_epg = {}; duplicates_removed_total = 0
    for channel_id, programs in consolidated_epg.items():
        seen_programs = set(); unique_programs = []; duplicates_this_channel = 0
        for prog in programs:
            prog_key = (prog.get('start'), prog.get('assetId'))
            if prog_key not in seen_programs: seen_programs.add(prog_key); unique_programs.append(prog)
            else: duplicates_this_channel += 1
        final_epg[channel_id] = unique_programs
        if duplicates_this_channel > 0: logging.debug(f"  Channel {channel_id}: Removed {duplicates_this_channel} duplicates. Final count: {len(unique_programs)}")
        duplicates_removed_total += duplicates_this_channel
    logging.info(f"Finished EPG processing. Total duplicates removed: {duplicates_removed_total}")
    return final_epg

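# Assumed shape of one EPG page, inferred from the keys read above (values illustrative):
#   {"channels": [{"channelId": 99991234, "schedule": [{"assetId": "abc123", "start": "...", "end": "..."}]}],
#    "assets": {"abc123": {"title": "...", "descriptions": {"small": "..."}, "episodeTitle": "..."}}}
# Each request covers one hour; limit=50 and the growing offset appear to page through the channel line-up.
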
# --- Generate M3U and EPG XML ---

def generate_epg_xml(channel_list_with_streams, consolidated_epg_data):
    logging.info("Generating EPG XML structure...")
    tv_element = ET.Element('tv', attrib={'generator-info-name': f'{GITHUB_USER}-{GITHUB_REPO}'})
    programme_count = 0; channel_ids_in_list = {c['id'] for c in channel_list_with_streams}
    logging.debug("Adding channel elements to EPG XML...")
    for channel in channel_list_with_streams:
        chan_el = ET.SubElement(tv_element, 'channel', attrib={'id': channel['id']})
        ET.SubElement(chan_el, 'display-name').text = channel['name'] # Only one display-name
        if channel['logo']: ET.SubElement(chan_el, 'icon', attrib={'src': channel['logo']})
    logging.debug("Adding programme elements to EPG XML...")
    total_programs_in_data = sum(len(progs) for progs in consolidated_epg_data.values())
    logging.debug(f"Total unique program entries before XML generation: {total_programs_in_data}")
    for channel_id, programs in consolidated_epg_data.items():
        if channel_id not in channel_ids_in_list: logging.debug(f"Skipping EPG programs for channel {channel_id} as it's not in the final M3U list."); continue
        program_processed_for_channel = 0
        for program in programs:
            program_asset_id = program.get('assetId', 'N/A'); program_title = program.get('title', 'N/A')
            try:
                start_time_str = program.get('start'); end_time_str = program.get('end')
                start_time = parse_iso_datetime(start_time_str); end_time = parse_iso_datetime(end_time_str)
                if not start_time or not end_time: logging.warning(f"  Skipping program due to failed time parsing: AssetID={program_asset_id}, Title='{program_title}'"); continue
                title = program.get('title', 'Unknown Program'); desc_obj = program.get('descriptions', {})
                desc = desc_obj.get('large') or desc_obj.get('medium') or desc_obj.get('small') or desc_obj.get('tiny')
                episode_title = program.get('episodeTitle'); asset_id = program.get('assetId')
                start_formatted = format_xmltv_time(start_time); stop_formatted = format_xmltv_time(end_time)
                if start_formatted and stop_formatted:
                    prog_el = ET.SubElement(tv_element, 'programme', attrib={'start': start_formatted, 'stop': stop_formatted, 'channel': channel_id})
                    ET.SubElement(prog_el, 'title', attrib={'lang': 'en'}).text = title
                    if desc: ET.SubElement(prog_el, 'desc', attrib={'lang': 'en'}).text = desc
                    if episode_title and episode_title != title: ET.SubElement(prog_el, 'sub-title', attrib={'lang': 'en'}).text = episode_title
                    if asset_id:
                        system_type = "dd_progid" if asset_id.startswith("EP") else "dd_assetid"
                        ET.SubElement(prog_el, 'episode-num', attrib={'system': system_type}).text = asset_id
                    programme_count += 1; program_processed_for_channel += 1
                else: logging.warning(f"  Skipping program due to invalid formatted time: AssetID={program_asset_id}, Title='{title}' (Channel: {channel_id})")
            except Exception as e: logging.exception(f"Error processing EPG program item {program_asset_id} for channel {channel_id}: {e}")
    logging.info(f"Generated XML with {len(channel_list_with_streams)} channels and {programme_count} programmes.")
    if programme_count == 0 and total_programs_in_data > 0: logging.warning("EPG data was fetched, but no valid program entries could be added to the XML. Check time parsing/formatting issues in DEBUG logs.")
    return ET.ElementTree(tv_element)

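# Illustrative <programme> element produced above (values are examples):
#   <programme start="20250429213000 +0000" stop="20250429220000 +0000" channel="99991234">
#     <title lang="en">Example Show</title>
#     <desc lang="en">Example description.</desc>
#     <episode-num system="dd_assetid">abc123</episode-num>
#   </programme>
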
def generate_m3u_playlist(channel_list_with_streams):
    logging.info("Generating M3U playlist...")
    playlist_parts = [f'#EXTM3U url-tvg="{EPG_RAW_URL}"\n']
    added_count = 0
    def sort_key(channel):
        try: num = int(channel.get('number', '999999')); return (num, channel['name'].lower())
        except (ValueError, TypeError): return (999999, channel['name'].lower())
    sorted_channels = sorted(channel_list_with_streams, key=sort_key)
    for channel in sorted_channels:
        stream_url = channel.get('stream_url'); channel_id = channel['id']; display_name = channel['name'].replace(',', ';')
        group_title = channel.get('group', 'General').replace(',', ';'); tvg_name = channel['name'].replace('"', "'"); logo = channel.get("logo", "")
        if stream_url:
            extinf_line = f'#EXTINF:-1 tvg-id="{channel_id}" tvg-name="{tvg_name}" tvg-logo="{logo}" group-title="{group_title}",{display_name}\n'
            playlist_parts.append(extinf_line); playlist_parts.append(f'{stream_url}\n'); added_count += 1
        else: logging.error(f"Channel {channel_id} ('{channel['name']}') reached M3U generation without a stream URL!")
    logging.info(f"Added {added_count} channels with stream URLs to M3U playlist.")
    return "".join(playlist_parts)

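# Illustrative M3U output (channel values are examples):
#   #EXTM3U url-tvg="https://raw.githubusercontent.com/BuddyChewChew/xumo-playlist-generator/main/playlists/xumo_epg.xml.gz"
#   #EXTINF:-1 tvg-id="99991234" tvg-name="Example Channel" tvg-logo="https://image.xumo.com/..." group-title="News",Example Channel
#   https://example.com/master.m3u8?...
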
# --- Main Execution ---
if __name__ == "__main__":
    logging.info(f"--- Starting Xumo Scraper (Primary List ID: {PRIMARY_LIST_ID}) ---")
    try: ensure_output_dir()
    except Exception as e: logging.error(f"Halting script because output directory could not be ensured: {e}"); sys.exit(1)

    primary_channel_list = None; final_channel_list_with_streams = None

    logging.info(f"Step 1: Getting channel metadata via Primary List endpoint ({PRIMARY_LIST_ID})...")
    primary_channel_list = get_channels_via_primary_list()

    if not primary_channel_list: logging.error("Failed to get primary channel list. Aborting."); sys.exit(1)

    # Step 2: Fetch Stream URLs using Asset Lookup method
    logging.info(f"Step 2: Fetching stream URLs via asset lookup for {len(primary_channel_list)} channels...")
    final_channel_list_with_streams = fetch_stream_urls_via_asset_lookup(primary_channel_list) # Pass the list obtained from Step 1

    if not final_channel_list_with_streams:
        logging.warning("No channels with stream URLs found after processing. Generating empty files.")
        save_m3u(f'#EXTM3U url-tvg="{EPG_RAW_URL}"\n', os.path.join(OUTPUT_DIR, PLAYLIST_FILENAME))
        empty_root = ET.Element('tv'); empty_tree = ET.ElementTree(empty_root)
        save_gzipped_xml(empty_tree, os.path.join(OUTPUT_DIR, EPG_FILENAME))
        logging.info("Generated empty playlist and EPG files."); sys.exit(0)

    logging.info(f"Proceeding with {len(final_channel_list_with_streams)} channels confirmed to have stream URLs.")

    # Step 3: Fetch EPG Data (using the hourly method)
    epg_data = fetch_epg_data(final_channel_list_with_streams)

    # Step 4: Generate EPG XML
    epg_tree = generate_epg_xml(final_channel_list_with_streams, epg_data)

    # Step 5: Generate M3U Playlist
    m3u_content = generate_m3u_playlist(final_channel_list_with_streams)

    # Step 6: Save Files
    save_m3u(m3u_content, os.path.join(OUTPUT_DIR, PLAYLIST_FILENAME))
    save_gzipped_xml(epg_tree, os.path.join(OUTPUT_DIR, EPG_FILENAME))

    logging.info("--- Xumo Scraper Finished Successfully ---")