From 8e8d8b3eda3006d3f14676dff2d9f7a466e662f3 Mon Sep 17 00:00:00 2001 From: mikhail Date: Thu, 27 Feb 2025 13:30:02 -0500 Subject: [PATCH] cleaned up code --- main.py | 104 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 67 insertions(+), 37 deletions(-) diff --git a/main.py b/main.py index 7370f73..c3e5575 100644 --- a/main.py +++ b/main.py @@ -4,9 +4,35 @@ from dataclasses import dataclass from pathlib import Path from typing import Any, Final + +@dataclass +class Host: + """Proxy host information.""" + + ip: str + port: str + signature: str + + +DELAY_PER_REQUEST: Final[float] = 1 +"""Delay in seconds between host authentication requests.""" + GET_ACCESS_TOKEN_URL: Final[str] = ( "https://api-pro.ducunt.com/rest/v1/security/tokens/accs" ) +"""URL to get proxy service authentication token for further API calls.""" + +GET_COUNTRIES_URL: Final[str] = ( + "https://stats.ducunt.com/api/rest/v2/entrypoints/countries" +) +"""URL API endpoint.""" + +GET_ACCESS_PROXY_URL: Final[str] = ( + "https://api-pro.ducunt.com/rest/v1/security/tokens/accs-proxy" +) + +# Get authentication token from proxy service +print("Getting authentication token from proxy service") get_access_token_payload: dict[str, str | dict[str, str]] = { "clientApp": {"name": "URBAN_VPN_BROWSER_EXTENSION"}, @@ -16,59 +42,55 @@ get_access_token_payload: dict[str, str | dict[str, str]] = { with requests.post( GET_ACCESS_TOKEN_URL, json=get_access_token_payload, - headers={"Authorization": "Bearer On9GLm0c8B5GFWI9FYtdfPWlXfjhpF0Q"}, -) as request: - security_token_payload: Any = request.json() + headers={ + "Authorization": "Bearer On9GLm0c8B5GFWI9FYtdfPWlXfjhpF0Q", + }, +) as reponse: + authentication_response: dict[str, Any] = reponse.json() -authentication_token: str = security_token_payload["value"] +authentication_token: str = authentication_response["value"] +print("Authentication token recieved") -GET_COUNTRIES_URL: Final[str] = ( - "https://stats.ducunt.com/api/rest/v2/entrypoints/countries" -) +# Get region list from proxy service +print("Getting available regions from proxy service") with requests.get( GET_COUNTRIES_URL, headers={ "X-Client-App": "URBAN_VPN_BROWSER_EXTENSION", "Authorization": f"Bearer {authentication_token}", }, -) as request: - countries_payload: Any = request.json() +) as reponse: + regions_response: dict[str, Any] = reponse.json() -hosts: list[Any] = [ - element["servers"]["elements"] - for element in countries_payload["countries"]["elements"] - if element["servers"]["count"] > 0 +regions: list[Any] = [ + country["servers"]["elements"] + for country in regions_response["countries"]["elements"] + if country["servers"]["count"] > 0 ] - -@dataclass -class Host: - ip: str - port: str - signature: str - - -host_data: list[Host] = [ +hosts: list[Host] = [ Host( - ip=element["address"]["primary"]["host"], - port=element["address"]["primary"]["port"], - signature=element["signature"] if "signature" in element else "", + ip=host["address"]["primary"]["ip"], + port=host["address"]["primary"]["port"], + signature=host["signature"] if "signature" in host else "", ) - for host in hosts - for element in host + for region in regions + for host in region + if host["type"] == "PROXY" and host["accessType"] == "ACCESSIBLE" ] -GET_ACCESS_PROXY_URL: Final[str] = ( - "https://api-pro.ducunt.com/rest/v1/security/tokens/accs-proxy" -) + +print("Successfully acquired regions and available hosts information") +print(f"Preparing to query authentication information for {len(hosts)} hosts") with Path("proxies").open("w", encoding="utf-8") as proxy_file: - for host in host_data: + for i, host in enumerate(hosts): + time.sleep(DELAY_PER_REQUEST) get_access_proxy_token_payload: dict[str, str | dict[str, str]] = { "clientApp": {"name": "URBAN_VPN_BROWSER_EXTENSION"}, "signature": next( - (host.signature for host in host_data if len(host.signature) > 0), "" + (host.signature for host in hosts if len(host.signature) > 0), "" ), "type": "accs", } @@ -76,9 +98,17 @@ with Path("proxies").open("w", encoding="utf-8") as proxy_file: with requests.post( GET_ACCESS_PROXY_URL, json=get_access_proxy_token_payload, - headers={"Authorization": f"Bearer {authentication_token}"}, - ) as request: - response: Any = request.json() - time.sleep(0.25) + headers={ + "Authorization": f"Bearer {authentication_token}", + }, + ) as reponse: + host_information_response: Any = reponse.json() - proxy_file.write(f"http://{response['value']}:1@{host.ip}:{host.port}\n") + proxy_connection_string: str = ( + f"http://{host.signature}:1@{host.ip}:{host.port}\n" + ) + + proxy_file.write(proxy_connection_string) + proxy_file.flush() + + print(f"Acquired information for host {i}")