import re import base64 import json from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import requests from bs4 import BeautifulSoup from fake_useragent import UserAgent from src.Loaders.provider.Provider import Provider # Compile regex patterns once for better performance REDIRECT_PATTERN = re.compile(r"https?://[^'\"<>]+") B64_PATTERN = re.compile(r"var a168c='([^']+)'") HLS_PATTERN = re.compile(r"'hls': '(?P[^']+)'") class VOE(Provider): def __init__(self): self.RANDOM_USER_AGENT = UserAgent().random self.Header = { "User-Agent": self.RANDOM_USER_AGENT } def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> (str, [str]): self.session = requests.Session() # Configure retries with backoff retries = Retry( total=5, # Number of retries backoff_factor=1, # Delay multiplier (1s, 2s, 4s, ...) status_forcelist=[500, 502, 503, 504], # Retry for specific HTTP errors allowed_methods=["GET"] ) adapter = HTTPAdapter(max_retries=retries) self.session.mount("https://", adapter) DEFAULT_REQUEST_TIMEOUT = 30 response = self.session.get( embededLink, headers={'User-Agent': self.RANDOM_USER_AGENT}, timeout=DEFAULT_REQUEST_TIMEOUT ) redirect = re.search(r"https?://[^'\"<>]+", response.text) if not redirect: raise ValueError("No redirect found.") redirect_url = redirect.group(0) parts = redirect_url.strip().split("/") self.Header["Referer"] = f"{parts[0]}//{parts[2]}/" response = self.session.get(redirect_url, headers={'User-Agent': self.RANDOM_USER_AGENT}) html = response.content # Method 1: Extract from script tag extracted = self.extract_voe_from_script(html) if extracted: return extracted, self.Header # Method 2: Extract from base64 encoded variable htmlText = html.decode('utf-8') b64_match = B64_PATTERN.search(htmlText) if b64_match: decoded = base64.b64decode(b64_match.group(1)).decode()[::-1] source = json.loads(decoded).get("source") if source: return source, self.Header # Method 3: Extract HLS source hls_match = HLS_PATTERN.search(htmlText) if hls_match: return base64.b64decode(hls_match.group("hls")).decode(), self.Header def shift_letters(self, input_str): result = '' for c in input_str: code = ord(c) if 65 <= code <= 90: code = (code - 65 + 13) % 26 + 65 elif 97 <= code <= 122: code = (code - 97 + 13) % 26 + 97 result += chr(code) return result def replace_junk(self, input_str): junk_parts = ['@$', '^^', '~@', '%?', '*~', '!!', '#&'] for part in junk_parts: input_str = re.sub(re.escape(part), '_', input_str) return input_str def shift_back(self, s, n): return ''.join(chr(ord(c) - n) for c in s) def decode_voe_string(self, encoded): step1 = self.shift_letters(encoded) step2 = self.replace_junk(step1).replace('_', '') step3 = base64.b64decode(step2).decode() step4 = self.shift_back(step3, 3) step5 = base64.b64decode(step4[::-1]).decode() return json.loads(step5) def extract_voe_from_script(self, html): soup = BeautifulSoup(html, "html.parser") script = soup.find("script", type="application/json") return self.decode_voe_string(script.text[2:-2])["source"]