- Move src/core/ → src/server/ - Split SerieList.py (531 lines) and series.py (414 lines) into src/server/database/ - Add database/models.py for SQLAlchemy models - Update all test imports to reflect new structure - Remove deprecated test files (test_serie_class.py, test_serie_folder_with_year.py)
140 lines
4.8 KiB
Python
140 lines
4.8 KiB
Python
import base64
import json
import re
from typing import Optional

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from .Provider import Provider
|
|
|
|
# Regular expressions used during extraction, compiled once at import time.

# Matches an absolute http(s) URL up to the next quote or angle bracket.
# Applied to the bootstrap script to pull the intermediate redirect URL so
# the provider's hand-off can be followed.
REDIRECT_PATTERN = re.compile(r"https?://[^'\"<>]+")

# Captures (group 1) the base64 encoded payload assigned to ``a168c``; the
# decoded payload contains the ``source`` field.
B64_PATTERN = re.compile(r"var a168c='([^']+)'")

# Captures (named group ``hls``) the base64 encoded HLS manifest, used as a
# fallback when no direct MP4 link is present.
HLS_PATTERN = re.compile(r"'hls': '(?P<hls>[^']+)'")
|
|
|
|
|
|
class VOE(Provider):
    """VOE video provider implementation.

    Resolves the URL of an embedded VOE player to a direct media URL by
    following the redirect found in the embed page and then trying three
    extraction strategies in order (obfuscated JSON script tag, base64
    encoded variable, base64 encoded HLS entry).
    """

    # Request timeout in seconds used when the caller passes a falsy timeout
    # (``None`` or ``0``).
    DEFAULT_TIMEOUT = 30

    # Filler sequences inserted into the obfuscated payload; they are removed
    # before base64 decoding. Order matters because replacements are applied
    # sequentially.
    _JUNK_PARTS = ("@$", "^^", "~@", "%?", "*~", "!!", "#&")

    # Translation table implementing ROT13 for ASCII letters only.
    _ROT13_TABLE = str.maketrans(
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz",
        "NOPQRSTUVWXYZABCDEFGHIJKLMnopqrstuvwxyzabcdefghijklm",
    )

    def __init__(self):
        """Pick a random User-Agent and prepare the headers handed to callers."""
        self.RANDOM_USER_AGENT = UserAgent().random
        self.Header = {"User-Agent": self.RANDOM_USER_AGENT}

    def _new_session(self) -> requests.Session:
        """Create a session that retries GET requests on 5xx responses."""
        retries = Retry(
            total=5,  # Number of retries
            backoff_factor=1,  # Exponential backoff between attempts
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["GET"],
        )
        adapter = HTTPAdapter(max_retries=retries)
        session = requests.Session()
        # The redirect pattern accepts both schemes, so both get the retry
        # policy.
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def get_link(
        self, embedded_link: str, timeout: int
    ) -> tuple[str, dict]:
        """
        Extract direct download link from VOE embedded player.

        Args:
            embedded_link: URL of the embedded VOE player
            timeout: Request timeout in seconds; a falsy value falls back
                to ``DEFAULT_TIMEOUT``

        Returns:
            Tuple of (direct_link, headers)

        Raises:
            ValueError: If the embed page contains no redirect URL or none
                of the extraction methods yields a link.
        """
        # Honour the caller's timeout instead of overwriting it.
        request_timeout = timeout if timeout else self.DEFAULT_TIMEOUT
        user_agent = {"User-Agent": self.RANDOM_USER_AGENT}

        # The context manager releases the pooled connections once both
        # pages have been fetched.
        with self._new_session() as session:
            # Kept as an attribute because the original implementation
            # exposed the session this way.
            self.session = session

            response = session.get(
                embedded_link,
                headers=user_agent,
                timeout=request_timeout,
            )

            redirect = REDIRECT_PATTERN.search(response.text)
            if not redirect:
                raise ValueError("No redirect found.")

            redirect_url = redirect.group(0)
            # "https://host/path" splits into ["https:", "", "host", ...];
            # the Referer is the scheme plus host of the redirect target.
            parts = redirect_url.strip().split("/")
            self.Header["Referer"] = f"{parts[0]}//{parts[2]}/"

            response = session.get(
                redirect_url,
                headers=user_agent,
                timeout=request_timeout,
            )
            html = response.content

        # Method 1: Extract from script tag
        extracted = self.extract_voe_from_script(html)
        if extracted:
            return extracted, self.Header

        # The patterns below are pure ASCII, so undecodable bytes can be
        # replaced without affecting the matches.
        html_text = html.decode("utf-8", errors="replace")

        # Method 2: Extract from base64 encoded variable
        b64_match = B64_PATTERN.search(html_text)
        if b64_match:
            try:
                decoded = base64.b64decode(b64_match.group(1)).decode()[::-1]
                payload = json.loads(decoded)
            except ValueError:
                # Covers bad base64, bad UTF-8 and bad JSON; fall through
                # to the HLS method instead of aborting.
                payload = None
            source = payload.get("source") if isinstance(payload, dict) else None
            if source:
                return source, self.Header

        # Method 3: Extract HLS source
        hls_match = HLS_PATTERN.search(html_text)
        if hls_match:
            try:
                decoded_hls = base64.b64decode(hls_match.group("hls")).decode()
            except ValueError as err:
                raise ValueError(
                    "Could not extract download link from VOE"
                ) from err
            return decoded_hls, self.Header

        raise ValueError("Could not extract download link from VOE")

    def shift_letters(self, input_str: str) -> str:
        """Apply ROT13 shift to ASCII letters; other characters are kept."""
        return input_str.translate(self._ROT13_TABLE)

    def replace_junk(self, input_str: str) -> str:
        """Replace each junk character sequence with an underscore."""
        for part in self._JUNK_PARTS:
            input_str = input_str.replace(part, "_")
        return input_str

    def shift_back(self, s: str, n: int) -> str:
        """Shift every character's code point back by n positions."""
        return "".join(chr(ord(c) - n) for c in s)

    def decode_voe_string(self, encoded: str) -> dict:
        """Decode VOE-encoded string to extract video source.

        The steps are: ROT13, strip the junk sequences, base64 decode, shift
        each character back by 3, reverse, base64 decode again, parse JSON.

        Raises:
            ValueError: If any decoding step fails (invalid base64, invalid
                text encoding, out-of-range character shift or invalid JSON).
        """
        step1 = self.shift_letters(encoded)
        step2 = self.replace_junk(step1).replace("_", "")
        step3 = base64.b64decode(step2).decode()
        step4 = self.shift_back(step3, 3)
        step5 = base64.b64decode(step4[::-1]).decode()
        return json.loads(step5)

    def extract_voe_from_script(self, html: bytes) -> Optional[str]:
        """Extract download link from VOE script tag.

        Args:
            html: Raw HTML of the VOE player page

        Returns:
            The ``source`` value of the decoded payload, or ``None`` when
            the page has no ``application/json`` script tag, the payload
            cannot be decoded, or it carries no ``source`` entry.
        """
        soup = BeautifulSoup(html, "html.parser")
        script = soup.find("script", type="application/json")
        if script is None:
            return None

        try:
            # Drop the first and last two characters wrapping the payload.
            payload = self.decode_voe_string(script.text[2:-2])
        except ValueError:
            # Undecodable payload: report "not found" so the caller can try
            # its other extraction methods.
            return None

        if not isinstance(payload, dict):
            return None
        return payload.get("source")
|
|
|
|
|
|
|