My-Streams/update_stream1.py
2025-08-26 15:03:33 -04:00

85 lines
2.9 KiB
Python

import requests
import os
import re
import tempfile
import shutil
from datetime import datetime
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
def update_playlist():
# Get M3U URL from environment variable
m3u_url = os.getenv('M3U_SOURCE_URL')
if not m3u_url:
raise ValueError("M3U_SOURCE_URL environment variable not set")
# Configure retry strategy
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)
try:
# Fetch the M3U content with timeout
response = http.get(
m3u_url,
timeout=30, # 30 seconds timeout
headers={'User-Agent': 'M3U-Playlist-Updater/1.0'}
)
response.raise_for_status() # Raise an exception for bad status codes
# Get the content and remove the original #EXTM3U header if it exists
content = response.text
if content.startswith("#EXTM3U"):
content = content[content.find('\n') + 1:]
# Add M3U header with EPG URL, timestamp, and original content
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
m3u_content = "#EXTM3U x-tvg-url=\"https://epgshare01.online/epgshare01/epg_ripper_ALL_SOURCES1.xml.gz\"\n" \
f"# Last Updated: {timestamp}\n" \
+ content
# Write to the repository root
output_filename = "stream1.m3u"
# Use the current working directory (repository root)
output_path = os.path.join(os.getcwd(), output_filename)
print(f"Writing to: {output_path}")
# Write directly to the output file
try:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(m3u_content)
# Verify the file was written
if not os.path.exists(output_path):
raise Exception(f"Failed to create {output_filename}")
file_size = os.path.getsize(output_path)
if file_size == 0:
raise Exception(f"Created empty file {output_filename}")
print(f"Successfully updated {output_filename}")
print(f"File size: {file_size} bytes")
except Exception as e:
print(f"Error writing to {output_path}: {str(e)}")
if os.path.exists(output_path):
os.remove(output_path)
raise
return True # Success
except Exception as e:
print(f"Error updating M3U playlist: {str(e)}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
update_playlist()