refactor: restructure core→server, split large entity files into database module
- Move src/core/ → src/server/ - Split SerieList.py (531 lines) and series.py (414 lines) into src/server/database/ - Add database/models.py for SQLAlchemy models - Update all test imports to reflect new structure - Remove deprecated test files (test_serie_class.py, test_serie_folder_with_year.py)
This commit is contained in:
139
src/server/providers/streaming/voe.py
Normal file
139
src/server/providers/streaming/voe.py
Normal file
@@ -0,0 +1,139 @@
|
||||
import base64
|
||||
import json
|
||||
import re
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from fake_useragent import UserAgent
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
from .Provider import Provider
|
||||
|
||||
# Regular expressions used while extracting a VOE link. They are compiled
# once at import time so repeated lookups reuse the same pattern objects.

# Matches the first http(s) URL in the bootstrap page, stopping at a quote
# or angle bracket; this is the intermediate redirect handed off by the
# provider.
REDIRECT_PATTERN = re.compile(
    r"https?://[^'\"<>]+"
)

# Captures the base64 payload assigned to the ``a168c`` script variable;
# once decoded it is expected to carry the ``source`` field.
B64_PATTERN = re.compile(
    r"var a168c='([^']+)'"
)

# Captures (as the named group ``hls``) the base64 encoded HLS manifest
# entry, used as a fallback when no direct link is present.
HLS_PATTERN = re.compile(
    r"'hls': '(?P<hls>[^']+)'"
)
|
||||
|
||||
|
||||
class VOE(Provider):
    """VOE video provider implementation.

    Resolves an embedded VOE player URL to a direct media URL. After
    following the redirect found in the bootstrap page, three extraction
    strategies are tried in order:

    1. the obfuscated JSON ``<script type="application/json">`` payload,
    2. the base64 variable matched by ``B64_PATTERN``,
    3. the base64 HLS entry matched by ``HLS_PATTERN``.
    """

    # Timeout (seconds) used when the caller passes a falsy timeout.
    DEFAULT_TIMEOUT = 30

    # Marker sequences that are replaced during decoding (see replace_junk).
    _JUNK_PARTS = ("@$", "^^", "~@", "%?", "*~", "!!", "#&")

    # ROT13 translation table for ASCII letters; other characters are
    # left untouched by str.translate.
    _ROT13_TABLE = str.maketrans(
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz",
        "NOPQRSTUVWXYZABCDEFGHIJKLMnopqrstuvwxyzabcdefghijklm",
    )

    def __init__(self):
        """Pick a random User-Agent and prepare the download headers."""
        self.RANDOM_USER_AGENT = UserAgent().random
        self.Header = {"User-Agent": self.RANDOM_USER_AGENT}

    def get_link(
        self, embedded_link: str, timeout: int
    ) -> tuple[str, dict]:
        """
        Extract direct download link from VOE embedded player.

        Args:
            embedded_link: URL of the embedded VOE player
            timeout: Request timeout in seconds, applied to both HTTP
                requests. A falsy value falls back to ``DEFAULT_TIMEOUT``.

        Returns:
            Tuple of (direct_link, headers)

        Raises:
            ValueError: If no redirect URL is found in the bootstrap page
                or none of the extraction methods yields a link.
        """
        # Honour the caller's timeout instead of overwriting it.
        if not timeout:
            timeout = self.DEFAULT_TIMEOUT

        request_headers = {"User-Agent": self.RANDOM_USER_AGENT}

        # The session stays reachable as ``self.session`` as before, but its
        # connections are now released once both pages have been fetched.
        self.session = requests.Session()
        try:
            # Configure retries with backoff
            retries = Retry(
                total=5,  # Number of retries
                backoff_factor=1,  # Delay multiplier (1s, 2s, 4s, ...)
                status_forcelist=[500, 502, 503, 504],
                allowed_methods=["GET"],
            )
            adapter = HTTPAdapter(max_retries=retries)
            # REDIRECT_PATTERN accepts plain http as well, so cover both.
            self.session.mount("https://", adapter)
            self.session.mount("http://", adapter)

            response = self.session.get(
                embedded_link,
                headers=request_headers,
                timeout=timeout,
            )

            redirect = REDIRECT_PATTERN.search(response.text)
            if not redirect:
                raise ValueError("No redirect found.")

            redirect_url = redirect.group(0).strip()
            # "scheme://host/path" -> ["scheme:", "", "host", ...]
            parts = redirect_url.split("/")
            self.Header["Referer"] = f"{parts[0]}//{parts[2]}/"

            # Without a timeout this request could block indefinitely.
            response = self.session.get(
                redirect_url,
                headers=request_headers,
                timeout=timeout,
            )
            html = response.content
        finally:
            self.session.close()

        # Method 1: Extract from script tag
        extracted = self.extract_voe_from_script(html)
        if extracted:
            return extracted, self.Header

        # The patterns below are pure ASCII, so undecodable bytes can be
        # replaced without affecting the matches.
        html_text = html.decode("utf-8", errors="replace")

        # Method 2: Extract from base64 encoded variable
        b64_match = B64_PATTERN.search(html_text)
        if b64_match:
            try:
                decoded = base64.b64decode(b64_match.group(1)).decode()[::-1]
                source = json.loads(decoded).get("source")
            except (ValueError, AttributeError):
                # Malformed payload: fall through to the HLS fallback.
                source = None
            if source:
                return source, self.Header

        # Method 3: Extract HLS source
        hls_match = HLS_PATTERN.search(html_text)
        if hls_match:
            decoded_hls = base64.b64decode(hls_match.group("hls")).decode()
            return decoded_hls, self.Header

        raise ValueError("Could not extract download link from VOE")

    def shift_letters(self, input_str: str) -> str:
        """Apply ROT13 shift to ASCII letters; other characters are kept."""
        return input_str.translate(self._ROT13_TABLE)

    def replace_junk(self, input_str: str) -> str:
        """Replace each junk character sequence with an underscore."""
        for part in self._JUNK_PARTS:
            input_str = input_str.replace(part, "_")
        return input_str

    def shift_back(self, s: str, n: int) -> str:
        """Shift every character's code point back by n positions."""
        return "".join(chr(ord(c) - n) for c in s)

    def decode_voe_string(self, encoded: str) -> dict:
        """Decode VOE-encoded string to extract video source.

        Steps: ROT13 the letters, drop the junk markers, base64-decode,
        shift each character back by 3, reverse, base64-decode again and
        parse the result as JSON.

        Raises:
            ValueError: If a base64, text or JSON decoding step fails
                (``binascii.Error`` and ``json.JSONDecodeError`` are both
                subclasses of ``ValueError``).
        """
        rotated = self.shift_letters(encoded)
        cleaned = self.replace_junk(rotated).replace("_", "")
        first_pass = base64.b64decode(cleaned).decode()
        shifted = self.shift_back(first_pass, 3)
        payload = base64.b64decode(shifted[::-1]).decode()
        return json.loads(payload)

    def extract_voe_from_script(self, html: bytes) -> "str | None":
        """Extract download link from VOE script tag.

        Args:
            html: Raw HTML of the player page.

        Returns:
            The decoded ``source`` value, or ``None`` when the page has no
            ``application/json`` script tag or its payload cannot be
            decoded, so that the caller can try its fallback methods.
        """
        soup = BeautifulSoup(html, "html.parser")
        script = soup.find("script", type="application/json")
        if script is None:
            return None
        try:
            # The slice drops two wrapper characters at each end of the
            # payload (presumably a JSON ``["`` / ``"]`` envelope; confirm
            # against a live page).
            return self.decode_voe_string(script.text[2:-2])["source"]
        except (ValueError, KeyError, TypeError):
            return None
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user