114 lines
3.6 KiB
Python
114 lines
3.6 KiB
Python
import re
|
|
import base64
|
|
import json
|
|
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from fake_useragent import UserAgent
|
|
from .Provider import Provider
|
|
|
|
# Regex patterns are compiled once at import time so repeated lookups reuse
# the same compiled objects.

# First absolute http(s) URL in a page, ending at a quote or angle bracket.
REDIRECT_PATTERN = re.compile(r"https?://[^'\"<>]+")

# Single-quoted value assigned to the JavaScript variable `a168c`.
B64_PATTERN = re.compile(r"var a168c='([^']+)'")

# Single-quoted value of the 'hls' key, exposed as the named group `hls`.
HLS_PATTERN = re.compile(r"'hls': '(?P<hls>[^']+)'")
|
|
class VOE(Provider):
    """Provider that resolves a VOE embed link to a playable source URL.

    `GetLink` downloads the embed page, follows the first URL found in it and
    then tries three extraction strategies on the resulting page, in order:
    an obfuscated JSON ``<script>`` tag, a base64 JavaScript variable, and a
    base64 ``'hls'`` entry.
    """

    # Timeout (seconds) used when the caller passes a falsy timeout value.
    _FALLBACK_TIMEOUT = 30

    # Filler sequences stripped from the obfuscated payload before decoding.
    # Order is significant: replacements are applied one after another.
    _JUNK_PARTS = ('@$', '^^', '~@', '%?', '*~', '!!', '#&')

    # ROT13 translation table restricted to ASCII letters.
    _ROT13 = str.maketrans(
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz",
        "NOPQRSTUVWXYZABCDEFGHIJKLMnopqrstuvwxyzabcdefghijklm",
    )

    def __init__(self):
        # One random User-Agent per instance, reused for every request and
        # handed back to the caller inside `self.Header`.
        self.RANDOM_USER_AGENT = UserAgent().random
        self.Header = {
            "User-Agent": self.RANDOM_USER_AGENT
        }

    def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> "tuple[str, dict] | None":
        """Resolve an embed link to ``(source_url, headers)``.

        Args:
            embededLink: URL of the embed page to fetch.
            DEFAULT_REQUEST_TIMEOUT: Timeout in seconds applied to both HTTP
                requests. A falsy value falls back to 30 seconds.

        Returns:
            A tuple of the extracted source URL and `self.Header` (which by
            then also carries a ``Referer`` for the redirect host), or
            ``None`` when none of the extraction methods finds a source.

        Raises:
            ValueError: If the embed page contains no http(s) URL.
            requests.RequestException: Propagated from the HTTP requests.
        """
        # Honour the caller's timeout instead of overwriting it.
        timeout = DEFAULT_REQUEST_TIMEOUT or self._FALLBACK_TIMEOUT
        request_headers = {'User-Agent': self.RANDOM_USER_AGENT}

        # Configure retries with backoff
        retries = Retry(
            total=5,  # Number of retries
            backoff_factor=1,  # Delay multiplier (1s, 2s, 4s, ...)
            status_forcelist=[500, 502, 503, 504],  # Retry for specific HTTP errors
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retries)

        # The context manager closes the connection pool even on errors.
        with requests.Session() as session:
            # Kept as an attribute because the original code exposed it.
            self.session = session
            # The redirect pattern accepts both schemes, so retry on both.
            session.mount("https://", adapter)
            session.mount("http://", adapter)

            response = session.get(
                embededLink,
                headers=request_headers,
                timeout=timeout
            )

            redirect = REDIRECT_PATTERN.search(response.text)
            if not redirect:
                raise ValueError("No redirect found.")

            redirect_url = redirect.group(0)
            # "https://host/path" -> ["https:", "", "host", ...]; the pattern
            # guarantees at least these three parts exist.
            parts = redirect_url.strip().split("/")
            self.Header["Referer"] = f"{parts[0]}//{parts[2]}/"

            # The second request needs a timeout too, otherwise it can hang.
            response = session.get(
                redirect_url,
                headers=request_headers,
                timeout=timeout
            )
            html = response.content

        # Method 1: Extract from script tag
        extracted = self.extract_voe_from_script(html)
        if extracted:
            return extracted, self.Header

        # The patterns below are pure ASCII, so replacing undecodable bytes
        # cannot hide a match but avoids crashing on non-UTF-8 pages.
        htmlText = html.decode('utf-8', errors='replace')

        # Method 2: Extract from base64 encoded variable
        b64_match = B64_PATTERN.search(htmlText)
        if b64_match:
            try:
                decoded = base64.b64decode(b64_match.group(1)).decode()[::-1]
                payload = json.loads(decoded)
            except ValueError:
                # Bad base64 / text / JSON: fall through to method 3.
                payload = None
            source = payload.get("source") if isinstance(payload, dict) else None
            if source:
                return source, self.Header

        # Method 3: Extract HLS source
        hls_match = HLS_PATTERN.search(htmlText)
        if hls_match:
            return base64.b64decode(hls_match.group("hls")).decode(), self.Header

        # Nothing matched; the caller receives None, as before.
        return None

    def shift_letters(self, input_str):
        """Return `input_str` with ROT13 applied to ASCII letters only."""
        return input_str.translate(self._ROT13)

    def replace_junk(self, input_str):
        """Replace every known junk sequence in `input_str` with ``'_'``."""
        for part in self._JUNK_PARTS:
            input_str = input_str.replace(part, '_')
        return input_str

    def shift_back(self, s, n):
        """Return `s` with every character's code point lowered by `n`.

        Raises:
            ValueError: If a resulting code point would be negative.
        """
        return ''.join(chr(ord(c) - n) for c in s)

    def decode_voe_string(self, encoded):
        """Decode an obfuscated VOE payload and return the parsed JSON.

        Steps: ROT13 the letters, drop the junk sequences, base64-decode,
        shift every character down by 3, reverse, base64-decode again and
        parse the result as JSON.

        Raises:
            ValueError: If any base64, text or JSON decoding step fails
                (binascii.Error, UnicodeDecodeError and json.JSONDecodeError
                are all ValueError subclasses).
        """
        step1 = self.shift_letters(encoded)
        step2 = self.replace_junk(step1).replace('_', '')
        step3 = base64.b64decode(step2).decode()
        step4 = self.shift_back(step3, 3)
        step5 = base64.b64decode(step4[::-1]).decode()
        return json.loads(step5)

    def extract_voe_from_script(self, html):
        """Return the ``source`` URL from the page's JSON script tag.

        Looks up the first ``<script type="application/json">`` element and
        decodes its text with `decode_voe_string`.

        Returns:
            The ``"source"`` value, or ``None`` when the tag is missing, the
            payload cannot be decoded, or it holds no ``"source"`` key, so
            that the caller can try its other extraction methods.
        """
        soup = BeautifulSoup(html, "html.parser")
        script = soup.find("script", type="application/json")
        if script is None:
            return None

        try:
            # Drop two characters at each end of the tag text; presumably the
            # payload is wrapped as ["..."] (TODO confirm against a live page).
            payload = self.decode_voe_string(script.text[2:-2])
        except ValueError:
            return None

        if not isinstance(payload, dict):
            return None
        return payload.get("source")
|
|
|
|
|
|
|