Implement autocreation of proxy.json, removing tempout after execution

Signed-off-by: hax <hax@lainlounge.xyz>
This commit is contained in:
h@x 2025-02-09 03:04:49 +00:00
parent b86abb1ddb
commit 9b7f13988d

137
main.py
View file

@ -6,90 +6,118 @@ import time
import sys
import subprocess
import json
import importlib
import inspect
from concurrent.futures import ThreadPoolExecutor, as_completed
from proxy_provider import ProxyProvider
from proxy_providers import *
from tqdm import tqdm
PROXIES_LIST_URL = "https://nnp.nnchan.ru/mahoproxy.php?u=https://api.sandvpn.com/fetch-free-proxys"
SPEEDTEST_URL = "http://212.183.159.230/5MB.zip"
def fetch_proxies():
"""Fetch the list of proxies from the defined URL."""
try:
response = requests.get(PROXIES_LIST_URL)
response.raise_for_status()
return response.json()
except (requests.RequestException, json.JSONDecodeError) as e:
print(f"Failed to fetch proxies: {e}")
return []
def is_valid_proxy(proxy):
"""Check if the proxy is valid and not from Russia (youtube is slowed down there)."""
return proxy["host"] and proxy["country"] != "Russia"
"""Check if the proxy is valid."""
return proxy.get("host") is not None and proxy.get("country") != "Russia"
def construct_proxy_string(proxy):
"""Construct a proxy string from the proxy dictionary."""
if proxy.get("username"):
return f'{proxy["username"]}:{proxy["password"]}@{proxy["host"]}:{proxy["port"]}'
return (
f'{proxy["username"]}:{proxy["password"]}@{proxy["host"]}:{proxy["port"]}'
)
return f'{proxy["host"]}:{proxy["port"]}'
def test_proxy(proxy):
"""Test the proxy by measuring the download time."""
proxy_str = construct_proxy_string(proxy)
print(f'Testing {proxy_str}')
start_time = time.perf_counter()
try:
response = requests.get(SPEEDTEST_URL, stream=True, proxies={"http": proxy_str}, timeout=5)
response.raise_for_status()
response = requests.get(
SPEEDTEST_URL,
stream=True,
proxies={"http": f"http://{proxy_str}"},
timeout=5,
)
response.raise_for_status() # Ensure we raise an error for bad responses
total_length = response.headers.get('content-length')
total_length = response.headers.get("content-length")
if total_length is None or int(total_length) != 5242880:
print("No content or unexpected content size.")
return None
with io.BytesIO() as f:
download_time, downloaded_bytes = download_with_progress(response, f, total_length, start_time)
return {"time": download_time, **proxy}
download_time, _ = download_with_progress(
response, f, total_length, start_time
)
return {"time": download_time, **proxy} # Include original proxy info
except requests.RequestException:
print("Proxy is dead, skipping...")
return None
def download_with_progress(response, f, total_length, start_time):
"""Download content with progress tracking."""
"""Download content from the response with progress tracking."""
downloaded_bytes = 0
for chunk in response.iter_content(1024):
downloaded_bytes += len(chunk)
f.write(chunk)
done = int(30 * downloaded_bytes / int(total_length))
speed = downloaded_bytes / (time.perf_counter() - start_time) / 100000
# Check if download speed is too low and skip if necessary
if done > 3 and speed < 1.0:
print("\nProxy is too slow, skipping...")
return float('inf'), downloaded_bytes
sys.stdout.write(f"\r[{'=' * done}{' ' * (30 - done)}] {speed:.2f} Mbps")
sys.stdout.write("\n")
if done == 6:
break
if (
done > 3
and (downloaded_bytes // (time.perf_counter() - start_time) / 100000) < 1.0
):
return float("inf"), downloaded_bytes
return round(time.perf_counter() - start_time, 2), downloaded_bytes
def get_best_proxies(proxies):
"""Return the top five proxies based on speed."""
proxy_times = [test_proxy(proxy) for proxy in proxies if is_valid_proxy(proxy)]
return sorted(filter(None, proxy_times), key=lambda x: x['time'])[:5]
def save_proxies_to_file(proxies, filename="proxy.json"):
"""Save the best proxies to a JSON file."""
try:
with open(os.path.join(os.path.dirname(__file__), filename), "w") as f:
json.dump(proxies, f, indent=4)
except IOError as e:
print(f"Failed to save proxies to file: {e}")
with open(os.path.join(os.path.dirname(__file__), filename), "w") as f:
json.dump(proxies, f, indent=4)
def get_best_proxies(providers):
"""Return the top five proxies based on speed from all providers."""
all_proxies = []
proxies = None
for provider in providers:
try:
print(f"Fetching proxies from {provider.__class__.__name__}")
proxies = provider.fetch_proxies()
all_proxies.extend([proxy for proxy in proxies if is_valid_proxy(proxy)])
except Exception as e:
print(f"Failed to fetch proxies from {provider.__class__.__name__}: {e}")
best_proxies = []
with ThreadPoolExecutor(max_workers=2) as executor:
futures = {executor.submit(test_proxy, proxy): proxy for proxy in all_proxies}
for future in tqdm(as_completed(futures), total=len(futures), desc="Testing proxies", bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_noinv_fmt}]", unit=' proxies', unit_scale=True, ncols=80):
result = future.result()
if result is not None:
best_proxies.append(result)
return sorted(best_proxies, key=lambda x: x["time"])[:5]
def update_proxies():
"""Update the proxies list and save the best ones."""
proxies = fetch_proxies()
best_proxies = get_best_proxies(proxies)
providers = []
for filename in os.listdir(os.path.join(os.path.dirname(__file__), "proxy_providers")):
# Check if the file is a Python module
if filename.endswith(".py") and filename != "__init__.py":
module_name = filename[:-3] # Remove the '.py' suffix
module_path = f'{"proxy_providers"}.{module_name}'
module = importlib.import_module(module_path)
classes = inspect.getmembers(module, inspect.isclass)
providers.append(
[classs[-1]() for classs in classes if classs[0] != "ProxyProvider"][0]
)
best_proxies = get_best_proxies(providers)
save_proxies_to_file(best_proxies)
print("All done.")
def run_yt_dlp():
"""Run yt-dlp with a randomly selected proxy."""
while True:
@ -98,22 +126,24 @@ def run_yt_dlp():
proxy = random.choice(json.load(f))
proxy_str = construct_proxy_string(proxy)
print(f"Using proxy from {proxy['city']}, {proxy['country']}")
if execute_yt_dlp_command(proxy_str):
os.remove("tempout")
break # Exit loop if command was successful
break # Exit loop if command was successful
print("Got 'Sign in to confirm' error. Trying again with another proxy...")
except FileNotFoundError as e:
print("'proxy.json' not found. Starting proxy list update...")
update_proxies()
def execute_yt_dlp_command(proxy_str):
"""Execute the yt-dlp command with the given proxy."""
command = f"yt-dlp --color always --proxy '{proxy_str}' {' '.join([str(arg) for arg in sys.argv])} 2>&1 | tee tempout"
command = f"yt-dlp --color always --proxy http://{proxy_str} {' '.join([str(arg) for arg in sys.argv])} 2>&1 | tee tempout"
subprocess.run(command, shell=True)
with open("tempout", 'r') as log_fl:
result = 'Sign in to' not in log_fl.read()
os.remove("tempout") # Clean up after checking log file
return result
with open("tempout", "r") as log_fl:
log_text = log_fl.read()
return "Sign in to" not in log_text and "403" not in log_text
def main():
"""Main function to handle script arguments and execute the appropriate command."""
@ -121,12 +151,15 @@ def main():
if "update" in sys.argv:
update_proxies()
elif len(sys.argv) < 2:
print("usage: main.py update | <yt-dlp args> \nScript for starting yt-dlp with best free proxy\nCommands:\n update Update best proxy")
print(
"usage: main.py update | <yt-dlp args> \nScript for starting yt-dlp with best free proxy\nCommands:\n update Update best proxy"
)
else:
sys.argv.pop(0)
run_yt_dlp()
except KeyboardInterrupt:
print("Canceled by user")
if __name__ == "__main__":
main()
main()