This commit is contained in:
2025-10-12 18:05:31 +02:00
parent 57d49bcf78
commit 7a71715183
130 changed files with 30010 additions and 50631 deletions

View File

@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
class Provider(ABC):
@abstractmethod
def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> (str, [str]):
pass
from abc import ABC, abstractmethod
class Provider(ABC):
@abstractmethod
def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> (str, [str]):
pass

View File

@@ -1,59 +1,59 @@
import re
import random
import time
from fake_useragent import UserAgent
import requests
from .Provider import Provider
class Doodstream(Provider):
def __init__(self):
self.RANDOM_USER_AGENT = UserAgent().random
def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> str:
headers = {
'User-Agent': self.RANDOM_USER_AGENT,
'Referer': 'https://dood.li/'
}
def extract_data(pattern, content):
match = re.search(pattern, content)
return match.group(1) if match else None
def generate_random_string(length=10):
characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
return ''.join(random.choice(characters) for _ in range(length))
response = requests.get(
embededLink,
headers=headers,
timeout=DEFAULT_REQUEST_TIMEOUT,
verify=False
)
response.raise_for_status()
pass_md5_pattern = r"\$\.get\('([^']*\/pass_md5\/[^']*)'"
pass_md5_url = extract_data(pass_md5_pattern, response.text)
if not pass_md5_url:
raise ValueError(
f'pass_md5 URL not found using {embededLink}.')
full_md5_url = f"https://dood.li{pass_md5_url}"
token_pattern = r"token=([a-zA-Z0-9]+)"
token = extract_data(token_pattern, response.text)
if not token:
raise ValueError(f'Token not found using {embededLink}.')
md5_response = requests.get(
full_md5_url, headers=headers, timeout=DEFAULT_REQUEST_TIMEOUT, verify=False)
md5_response.raise_for_status()
video_base_url = md5_response.text.strip()
random_string = generate_random_string(10)
expiry = int(time.time())
direct_link = f"{video_base_url}{random_string}?token={token}&expiry={expiry}"
# print(direct_link)
import re
import random
import time
from fake_useragent import UserAgent
import requests
from .Provider import Provider
class Doodstream(Provider):
def __init__(self):
self.RANDOM_USER_AGENT = UserAgent().random
def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> str:
headers = {
'User-Agent': self.RANDOM_USER_AGENT,
'Referer': 'https://dood.li/'
}
def extract_data(pattern, content):
match = re.search(pattern, content)
return match.group(1) if match else None
def generate_random_string(length=10):
characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
return ''.join(random.choice(characters) for _ in range(length))
response = requests.get(
embededLink,
headers=headers,
timeout=DEFAULT_REQUEST_TIMEOUT,
verify=False
)
response.raise_for_status()
pass_md5_pattern = r"\$\.get\('([^']*\/pass_md5\/[^']*)'"
pass_md5_url = extract_data(pass_md5_pattern, response.text)
if not pass_md5_url:
raise ValueError(
f'pass_md5 URL not found using {embededLink}.')
full_md5_url = f"https://dood.li{pass_md5_url}"
token_pattern = r"token=([a-zA-Z0-9]+)"
token = extract_data(token_pattern, response.text)
if not token:
raise ValueError(f'Token not found using {embededLink}.')
md5_response = requests.get(
full_md5_url, headers=headers, timeout=DEFAULT_REQUEST_TIMEOUT, verify=False)
md5_response.raise_for_status()
video_base_url = md5_response.text.strip()
random_string = generate_random_string(10)
expiry = int(time.time())
direct_link = f"{video_base_url}{random_string}?token={token}&expiry={expiry}"
# print(direct_link)
return direct_link

View File

@@ -1,51 +1,51 @@
import re
import requests
# import jsbeautifier.unpackers.packer as packer
from aniworld import config
REDIRECT_REGEX = re.compile(
r'<iframe *(?:[^>]+ )?src=(?:\'([^\']+)\'|"([^"]+)")[^>]*>')
SCRIPT_REGEX = re.compile(
r'(?s)<script\s+[^>]*?data-cfasync=["\']?false["\']?[^>]*>(.+?)</script>')
VIDEO_URL_REGEX = re.compile(r'file:\s*"([^"]+\.m3u8[^"]*)"')
# TODO Implement this script fully
def get_direct_link_from_filemoon(embeded_filemoon_link: str):
session = requests.Session()
session.verify = False
headers = {
"User-Agent": config.RANDOM_USER_AGENT,
"Referer": embeded_filemoon_link,
}
response = session.get(embeded_filemoon_link, headers=headers)
source = response.text
match = REDIRECT_REGEX.search(source)
if match:
redirect_url = match.group(1) or match.group(2)
response = session.get(redirect_url, headers=headers)
source = response.text
for script_match in SCRIPT_REGEX.finditer(source):
script_content = script_match.group(1).strip()
if not script_content.startswith("eval("):
continue
if packer.detect(script_content):
unpacked = packer.unpack(script_content)
video_match = VIDEO_URL_REGEX.search(unpacked)
if video_match:
return video_match.group(1)
raise Exception("No Video link found!")
if __name__ == '__main__':
url = input("Enter Filemoon Link: ")
print(get_direct_link_from_filemoon(url))
import re
import requests
# import jsbeautifier.unpackers.packer as packer
from aniworld import config
REDIRECT_REGEX = re.compile(
r'<iframe *(?:[^>]+ )?src=(?:\'([^\']+)\'|"([^"]+)")[^>]*>')
SCRIPT_REGEX = re.compile(
r'(?s)<script\s+[^>]*?data-cfasync=["\']?false["\']?[^>]*>(.+?)</script>')
VIDEO_URL_REGEX = re.compile(r'file:\s*"([^"]+\.m3u8[^"]*)"')
# TODO Implement this script fully
def get_direct_link_from_filemoon(embeded_filemoon_link: str):
session = requests.Session()
session.verify = False
headers = {
"User-Agent": config.RANDOM_USER_AGENT,
"Referer": embeded_filemoon_link,
}
response = session.get(embeded_filemoon_link, headers=headers)
source = response.text
match = REDIRECT_REGEX.search(source)
if match:
redirect_url = match.group(1) or match.group(2)
response = session.get(redirect_url, headers=headers)
source = response.text
for script_match in SCRIPT_REGEX.finditer(source):
script_content = script_match.group(1).strip()
if not script_content.startswith("eval("):
continue
if packer.detect(script_content):
unpacked = packer.unpack(script_content)
video_match = VIDEO_URL_REGEX.search(unpacked)
if video_match:
return video_match.group(1)
raise Exception("No Video link found!")
if __name__ == '__main__':
url = input("Enter Filemoon Link: ")
print(get_direct_link_from_filemoon(url))

View File

@@ -1,90 +1,90 @@
import re
import json
import sys
import requests
from aniworld.config import DEFAULT_REQUEST_TIMEOUT
def fetch_page_content(url):
try:
response = requests.get(url, timeout=DEFAULT_REQUEST_TIMEOUT)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"Failed to fetch the page content: {e}")
return None
def extract_video_data(page_content):
match = re.search(r'^.*videos_manifest.*$', page_content, re.MULTILINE)
if not match:
raise ValueError("Failed to extract video manifest from the response.")
json_str = match.group(0)[match.group(0).find(
'{'):match.group(0).rfind('}') + 1]
return json.loads(json_str)
def get_streams(url):
page_content = fetch_page_content(url)
data = extract_video_data(page_content)
video_info = data['state']['data']['video']
name = video_info['hentai_video']['name']
streams = video_info['videos_manifest']['servers'][0]['streams']
return {"name": name, "streams": streams}
def display_streams(streams):
if not streams:
print("No streams available.")
return
print("Available qualities:")
for i, stream in enumerate(streams, 1):
premium_tag = "(Premium)" if not stream['is_guest_allowed'] else ""
print(
f"{i}. {stream['width']}x{stream['height']}\t"
f"({stream['filesize_mbs']}MB) {premium_tag}")
def get_user_selection(streams):
try:
selected_index = int(input("Select a stream: ").strip()) - 1
if 0 <= selected_index < len(streams):
return selected_index
print("Invalid selection.")
return None
except ValueError:
print("Invalid input.")
return None
def get_direct_link_from_hanime(url=None):
try:
if url is None:
if len(sys.argv) > 1:
url = sys.argv[1]
else:
url = input("Please enter the hanime.tv video URL: ").strip()
try:
video_data = get_streams(url)
print(f"Video: {video_data['name']}")
print('*' * 40)
display_streams(video_data['streams'])
selected_index = None
while selected_index is None:
selected_index = get_user_selection(video_data['streams'])
print(f"M3U8 URL: {video_data['streams'][selected_index]['url']}")
except ValueError as e:
print(f"Error: {e}")
except KeyboardInterrupt:
pass
if __name__ == "__main__":
get_direct_link_from_hanime()
import re
import json
import sys
import requests
from aniworld.config import DEFAULT_REQUEST_TIMEOUT
def fetch_page_content(url):
try:
response = requests.get(url, timeout=DEFAULT_REQUEST_TIMEOUT)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"Failed to fetch the page content: {e}")
return None
def extract_video_data(page_content):
match = re.search(r'^.*videos_manifest.*$', page_content, re.MULTILINE)
if not match:
raise ValueError("Failed to extract video manifest from the response.")
json_str = match.group(0)[match.group(0).find(
'{'):match.group(0).rfind('}') + 1]
return json.loads(json_str)
def get_streams(url):
page_content = fetch_page_content(url)
data = extract_video_data(page_content)
video_info = data['state']['data']['video']
name = video_info['hentai_video']['name']
streams = video_info['videos_manifest']['servers'][0]['streams']
return {"name": name, "streams": streams}
def display_streams(streams):
if not streams:
print("No streams available.")
return
print("Available qualities:")
for i, stream in enumerate(streams, 1):
premium_tag = "(Premium)" if not stream['is_guest_allowed'] else ""
print(
f"{i}. {stream['width']}x{stream['height']}\t"
f"({stream['filesize_mbs']}MB) {premium_tag}")
def get_user_selection(streams):
try:
selected_index = int(input("Select a stream: ").strip()) - 1
if 0 <= selected_index < len(streams):
return selected_index
print("Invalid selection.")
return None
except ValueError:
print("Invalid input.")
return None
def get_direct_link_from_hanime(url=None):
try:
if url is None:
if len(sys.argv) > 1:
url = sys.argv[1]
else:
url = input("Please enter the hanime.tv video URL: ").strip()
try:
video_data = get_streams(url)
print(f"Video: {video_data['name']}")
print('*' * 40)
display_streams(video_data['streams'])
selected_index = None
while selected_index is None:
selected_index = get_user_selection(video_data['streams'])
print(f"M3U8 URL: {video_data['streams'][selected_index]['url']}")
except ValueError as e:
print(f"Error: {e}")
except KeyboardInterrupt:
pass
if __name__ == "__main__":
get_direct_link_from_hanime()

View File

@@ -1,35 +1,35 @@
import requests
import json
from urllib.parse import urlparse
# TODO Doesn't work on download yet and has to be implemented
def get_direct_link_from_loadx(embeded_loadx_link: str):
response = requests.head(
embeded_loadx_link, allow_redirects=True, verify=False)
parsed_url = urlparse(response.url)
path_parts = parsed_url.path.split("/")
if len(path_parts) < 3:
raise ValueError("Invalid path!")
id_hash = path_parts[2]
host = parsed_url.netloc
post_url = f"https://{host}/player/index.php?data={id_hash}&do=getVideo"
headers = {"X-Requested-With": "XMLHttpRequest"}
response = requests.post(post_url, headers=headers, verify=False)
data = json.loads(response.text)
print(data)
video_url = data.get("videoSource")
if not video_url:
raise ValueError("No Video link found!")
return video_url
if __name__ == '__main__':
url = input("Enter Loadx Link: ")
print(get_direct_link_from_loadx(url))
import requests
import json
from urllib.parse import urlparse
# TODO Doesn't work on download yet and has to be implemented
def get_direct_link_from_loadx(embeded_loadx_link: str):
response = requests.head(
embeded_loadx_link, allow_redirects=True, verify=False)
parsed_url = urlparse(response.url)
path_parts = parsed_url.path.split("/")
if len(path_parts) < 3:
raise ValueError("Invalid path!")
id_hash = path_parts[2]
host = parsed_url.netloc
post_url = f"https://{host}/player/index.php?data={id_hash}&do=getVideo"
headers = {"X-Requested-With": "XMLHttpRequest"}
response = requests.post(post_url, headers=headers, verify=False)
data = json.loads(response.text)
print(data)
video_url = data.get("videoSource")
if not video_url:
raise ValueError("No Video link found!")
return video_url
if __name__ == '__main__':
url = input("Enter Loadx Link: ")
print(get_direct_link_from_loadx(url))

View File

@@ -1,39 +1,39 @@
import re
import requests
from aniworld import config
def get_direct_link_from_luluvdo(embeded_luluvdo_link, arguments=None):
luluvdo_id = embeded_luluvdo_link.split('/')[-1]
filelink = (
f"https://luluvdo.com/dl?op=embed&file_code={luluvdo_id}&embed=1&referer=luluvdo.com&adb=0"
)
# The User-Agent needs to be the same as the direct-link ones to work
headers = {
"Origin": "https://luluvdo.com",
"Referer": "https://luluvdo.com/",
"User-Agent": config.LULUVDO_USER_AGENT
}
if arguments.action == "Download":
headers["Accept-Language"] = "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7"
response = requests.get(filelink, headers=headers,
timeout=config.DEFAULT_REQUEST_TIMEOUT)
if response.status_code == 200:
pattern = r'file:\s*"([^"]+)"'
matches = re.findall(pattern, str(response.text))
if matches:
return matches[0]
raise ValueError("No match found")
if __name__ == '__main__':
url = input("Enter Luluvdo Link: ")
print(get_direct_link_from_luluvdo(url))
import re
import requests
from aniworld import config
def get_direct_link_from_luluvdo(embeded_luluvdo_link, arguments=None):
luluvdo_id = embeded_luluvdo_link.split('/')[-1]
filelink = (
f"https://luluvdo.com/dl?op=embed&file_code={luluvdo_id}&embed=1&referer=luluvdo.com&adb=0"
)
# The User-Agent needs to be the same as the direct-link ones to work
headers = {
"Origin": "https://luluvdo.com",
"Referer": "https://luluvdo.com/",
"User-Agent": config.LULUVDO_USER_AGENT
}
if arguments.action == "Download":
headers["Accept-Language"] = "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7"
response = requests.get(filelink, headers=headers,
timeout=config.DEFAULT_REQUEST_TIMEOUT)
if response.status_code == 200:
pattern = r'file:\s*"([^"]+)"'
matches = re.findall(pattern, str(response.text))
if matches:
return matches[0]
raise ValueError("No match found")
if __name__ == '__main__':
url = input("Enter Luluvdo Link: ")
print(get_direct_link_from_luluvdo(url))

View File

@@ -1,43 +1,43 @@
import re
import base64
import requests
from aniworld.config import DEFAULT_REQUEST_TIMEOUT, RANDOM_USER_AGENT
SPEEDFILES_PATTERN = re.compile(r'var _0x5opu234 = "(?P<encoded_data>.*?)";')
def get_direct_link_from_speedfiles(embeded_speedfiles_link):
response = requests.get(
embeded_speedfiles_link,
timeout=DEFAULT_REQUEST_TIMEOUT,
headers={'User-Agent': RANDOM_USER_AGENT}
)
if "<span class=\"inline-block\">Web server is down</span>" in response.text:
raise ValueError(
"The SpeedFiles server is currently down.\n"
"Please try again later or choose a different hoster."
)
match = SPEEDFILES_PATTERN.search(response.text)
if not match:
raise ValueError("Pattern not found in the response.")
encoded_data = match.group("encoded_data")
decoded = base64.b64decode(encoded_data).decode()
decoded = decoded.swapcase()[::-1]
decoded = base64.b64decode(decoded).decode()[::-1]
decoded_hex = ''.join(chr(int(decoded[i:i + 2], 16))
for i in range(0, len(decoded), 2))
shifted = ''.join(chr(ord(char) - 3) for char in decoded_hex)
result = base64.b64decode(shifted.swapcase()[::-1]).decode()
return result
if __name__ == '__main__':
speedfiles_link = input("Enter Speedfiles Link: ")
print(get_direct_link_from_speedfiles(
embeded_speedfiles_link=speedfiles_link))
import re
import base64
import requests
from aniworld.config import DEFAULT_REQUEST_TIMEOUT, RANDOM_USER_AGENT
SPEEDFILES_PATTERN = re.compile(r'var _0x5opu234 = "(?P<encoded_data>.*?)";')
def get_direct_link_from_speedfiles(embeded_speedfiles_link):
response = requests.get(
embeded_speedfiles_link,
timeout=DEFAULT_REQUEST_TIMEOUT,
headers={'User-Agent': RANDOM_USER_AGENT}
)
if "<span class=\"inline-block\">Web server is down</span>" in response.text:
raise ValueError(
"The SpeedFiles server is currently down.\n"
"Please try again later or choose a different hoster."
)
match = SPEEDFILES_PATTERN.search(response.text)
if not match:
raise ValueError("Pattern not found in the response.")
encoded_data = match.group("encoded_data")
decoded = base64.b64decode(encoded_data).decode()
decoded = decoded.swapcase()[::-1]
decoded = base64.b64decode(decoded).decode()[::-1]
decoded_hex = ''.join(chr(int(decoded[i:i + 2], 16))
for i in range(0, len(decoded), 2))
shifted = ''.join(chr(ord(char) - 3) for char in decoded_hex)
result = base64.b64decode(shifted.swapcase()[::-1]).decode()
return result
if __name__ == '__main__':
speedfiles_link = input("Enter Speedfiles Link: ")
print(get_direct_link_from_speedfiles(
embeded_speedfiles_link=speedfiles_link))

View File

@@ -1,2 +1,2 @@
def get_direct_link_from_streamtape(embeded_streamtape_link: str) -> str:
pass
def get_direct_link_from_streamtape(embeded_streamtape_link: str) -> str:
pass

View File

@@ -1,34 +1,34 @@
import re
import requests
from bs4 import BeautifulSoup
from aniworld.config import DEFAULT_REQUEST_TIMEOUT, RANDOM_USER_AGENT
def get_direct_link_from_vidmoly(embeded_vidmoly_link: str):
response = requests.get(
embeded_vidmoly_link,
headers={'User-Agent': RANDOM_USER_AGENT},
timeout=DEFAULT_REQUEST_TIMEOUT
)
html_content = response.text
soup = BeautifulSoup(html_content, 'html.parser')
scripts = soup.find_all('script')
file_link_pattern = r'file:\s*"(https?://.*?)"'
for script in scripts:
if script.string:
match = re.search(file_link_pattern, script.string)
if match:
file_link = match.group(1)
return file_link
raise ValueError("No direct link found.")
if __name__ == '__main__':
link = input("Enter Vidmoly Link: ")
print('Note: --referer "https://vidmoly.to"')
print(get_direct_link_from_vidmoly(embeded_vidmoly_link=link))
import re
import requests
from bs4 import BeautifulSoup
from aniworld.config import DEFAULT_REQUEST_TIMEOUT, RANDOM_USER_AGENT
def get_direct_link_from_vidmoly(embeded_vidmoly_link: str):
response = requests.get(
embeded_vidmoly_link,
headers={'User-Agent': RANDOM_USER_AGENT},
timeout=DEFAULT_REQUEST_TIMEOUT
)
html_content = response.text
soup = BeautifulSoup(html_content, 'html.parser')
scripts = soup.find_all('script')
file_link_pattern = r'file:\s*"(https?://.*?)"'
for script in scripts:
if script.string:
match = re.search(file_link_pattern, script.string)
if match:
file_link = match.group(1)
return file_link
raise ValueError("No direct link found.")
if __name__ == '__main__':
link = input("Enter Vidmoly Link: ")
print('Note: --referer "https://vidmoly.to"')
print(get_direct_link_from_vidmoly(embeded_vidmoly_link=link))

View File

@@ -1,29 +1,29 @@
import re
import requests
from bs4 import BeautifulSoup
from aniworld.config import DEFAULT_REQUEST_TIMEOUT, RANDOM_USER_AGENT
def get_direct_link_from_vidoza(embeded_vidoza_link: str) -> str:
response = requests.get(
embeded_vidoza_link,
headers={'User-Agent': RANDOM_USER_AGENT},
timeout=DEFAULT_REQUEST_TIMEOUT
)
soup = BeautifulSoup(response.content, "html.parser")
for tag in soup.find_all('script'):
if 'sourcesCode:' in tag.text:
match = re.search(r'src: "(.*?)"', tag.text)
if match:
return match.group(1)
raise ValueError("No direct link found.")
if __name__ == '__main__':
link = input("Enter Vidoza Link: ")
print(get_direct_link_from_vidoza(embeded_vidoza_link=link))
import re
import requests
from bs4 import BeautifulSoup
from aniworld.config import DEFAULT_REQUEST_TIMEOUT, RANDOM_USER_AGENT
def get_direct_link_from_vidoza(embeded_vidoza_link: str) -> str:
response = requests.get(
embeded_vidoza_link,
headers={'User-Agent': RANDOM_USER_AGENT},
timeout=DEFAULT_REQUEST_TIMEOUT
)
soup = BeautifulSoup(response.content, "html.parser")
for tag in soup.find_all('script'):
if 'sourcesCode:' in tag.text:
match = re.search(r'src: "(.*?)"', tag.text)
if match:
return match.group(1)
raise ValueError("No direct link found.")
if __name__ == '__main__':
link = input("Enter Vidoza Link: ")
print(get_direct_link_from_vidoza(embeded_vidoza_link=link))

View File

@@ -1,113 +1,113 @@
import re
import base64
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from .Provider import Provider
# Compile regex patterns once for better performance
REDIRECT_PATTERN = re.compile(r"https?://[^'\"<>]+")
B64_PATTERN = re.compile(r"var a168c='([^']+)'")
HLS_PATTERN = re.compile(r"'hls': '(?P<hls>[^']+)'")
class VOE(Provider):
def __init__(self):
self.RANDOM_USER_AGENT = UserAgent().random
self.Header = {
"User-Agent": self.RANDOM_USER_AGENT
}
def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> (str, [str]):
self.session = requests.Session()
# Configure retries with backoff
retries = Retry(
total=5, # Number of retries
backoff_factor=1, # Delay multiplier (1s, 2s, 4s, ...)
status_forcelist=[500, 502, 503, 504], # Retry for specific HTTP errors
allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retries)
self.session.mount("https://", adapter)
DEFAULT_REQUEST_TIMEOUT = 30
response = self.session.get(
embededLink,
headers={'User-Agent': self.RANDOM_USER_AGENT},
timeout=DEFAULT_REQUEST_TIMEOUT
)
redirect = re.search(r"https?://[^'\"<>]+", response.text)
if not redirect:
raise ValueError("No redirect found.")
redirect_url = redirect.group(0)
parts = redirect_url.strip().split("/")
self.Header["Referer"] = f"{parts[0]}//{parts[2]}/"
response = self.session.get(redirect_url, headers={'User-Agent': self.RANDOM_USER_AGENT})
html = response.content
# Method 1: Extract from script tag
extracted = self.extract_voe_from_script(html)
if extracted:
return extracted, self.Header
# Method 2: Extract from base64 encoded variable
htmlText = html.decode('utf-8')
b64_match = B64_PATTERN.search(htmlText)
if b64_match:
decoded = base64.b64decode(b64_match.group(1)).decode()[::-1]
source = json.loads(decoded).get("source")
if source:
return source, self.Header
# Method 3: Extract HLS source
hls_match = HLS_PATTERN.search(htmlText)
if hls_match:
return base64.b64decode(hls_match.group("hls")).decode(), self.Header
def shift_letters(self, input_str):
result = ''
for c in input_str:
code = ord(c)
if 65 <= code <= 90:
code = (code - 65 + 13) % 26 + 65
elif 97 <= code <= 122:
code = (code - 97 + 13) % 26 + 97
result += chr(code)
return result
def replace_junk(self, input_str):
junk_parts = ['@$', '^^', '~@', '%?', '*~', '!!', '#&']
for part in junk_parts:
input_str = re.sub(re.escape(part), '_', input_str)
return input_str
def shift_back(self, s, n):
return ''.join(chr(ord(c) - n) for c in s)
def decode_voe_string(self, encoded):
step1 = self.shift_letters(encoded)
step2 = self.replace_junk(step1).replace('_', '')
step3 = base64.b64decode(step2).decode()
step4 = self.shift_back(step3, 3)
step5 = base64.b64decode(step4[::-1]).decode()
return json.loads(step5)
def extract_voe_from_script(self, html):
soup = BeautifulSoup(html, "html.parser")
script = soup.find("script", type="application/json")
return self.decode_voe_string(script.text[2:-2])["source"]
import re
import base64
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from .Provider import Provider
# Compile regex patterns once for better performance
REDIRECT_PATTERN = re.compile(r"https?://[^'\"<>]+")
B64_PATTERN = re.compile(r"var a168c='([^']+)'")
HLS_PATTERN = re.compile(r"'hls': '(?P<hls>[^']+)'")
class VOE(Provider):
def __init__(self):
self.RANDOM_USER_AGENT = UserAgent().random
self.Header = {
"User-Agent": self.RANDOM_USER_AGENT
}
def GetLink(self, embededLink: str, DEFAULT_REQUEST_TIMEOUT: int) -> (str, [str]):
self.session = requests.Session()
# Configure retries with backoff
retries = Retry(
total=5, # Number of retries
backoff_factor=1, # Delay multiplier (1s, 2s, 4s, ...)
status_forcelist=[500, 502, 503, 504], # Retry for specific HTTP errors
allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retries)
self.session.mount("https://", adapter)
DEFAULT_REQUEST_TIMEOUT = 30
response = self.session.get(
embededLink,
headers={'User-Agent': self.RANDOM_USER_AGENT},
timeout=DEFAULT_REQUEST_TIMEOUT
)
redirect = re.search(r"https?://[^'\"<>]+", response.text)
if not redirect:
raise ValueError("No redirect found.")
redirect_url = redirect.group(0)
parts = redirect_url.strip().split("/")
self.Header["Referer"] = f"{parts[0]}//{parts[2]}/"
response = self.session.get(redirect_url, headers={'User-Agent': self.RANDOM_USER_AGENT})
html = response.content
# Method 1: Extract from script tag
extracted = self.extract_voe_from_script(html)
if extracted:
return extracted, self.Header
# Method 2: Extract from base64 encoded variable
htmlText = html.decode('utf-8')
b64_match = B64_PATTERN.search(htmlText)
if b64_match:
decoded = base64.b64decode(b64_match.group(1)).decode()[::-1]
source = json.loads(decoded).get("source")
if source:
return source, self.Header
# Method 3: Extract HLS source
hls_match = HLS_PATTERN.search(htmlText)
if hls_match:
return base64.b64decode(hls_match.group("hls")).decode(), self.Header
def shift_letters(self, input_str):
result = ''
for c in input_str:
code = ord(c)
if 65 <= code <= 90:
code = (code - 65 + 13) % 26 + 65
elif 97 <= code <= 122:
code = (code - 97 + 13) % 26 + 97
result += chr(code)
return result
def replace_junk(self, input_str):
junk_parts = ['@$', '^^', '~@', '%?', '*~', '!!', '#&']
for part in junk_parts:
input_str = re.sub(re.escape(part), '_', input_str)
return input_str
def shift_back(self, s, n):
return ''.join(chr(ord(c) - n) for c in s)
def decode_voe_string(self, encoded):
step1 = self.shift_letters(encoded)
step2 = self.replace_junk(step1).replace('_', '')
step3 = base64.b64decode(step2).decode()
step4 = self.shift_back(step3, 3)
step5 = base64.b64decode(step4[::-1]).decode()
return json.loads(step5)
def extract_voe_from_script(self, html):
soup = BeautifulSoup(html, "html.parser")
script = soup.find("script", type="application/json")
return self.decode_voe_string(script.text[2:-2])["source"]