cleaned up code
This commit is contained in:
parent
e12d10a1b1
commit
8e8d8b3eda
104
main.py
104
main.py
@ -4,9 +4,35 @@ from dataclasses import dataclass
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Final
|
from typing import Any, Final
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Host:
|
||||||
|
"""Proxy host information."""
|
||||||
|
|
||||||
|
ip: str
|
||||||
|
port: str
|
||||||
|
signature: str
|
||||||
|
|
||||||
|
|
||||||
|
DELAY_PER_REQUEST: Final[float] = 1
|
||||||
|
"""Delay in seconds between host authentication requests."""
|
||||||
|
|
||||||
GET_ACCESS_TOKEN_URL: Final[str] = (
|
GET_ACCESS_TOKEN_URL: Final[str] = (
|
||||||
"https://api-pro.ducunt.com/rest/v1/security/tokens/accs"
|
"https://api-pro.ducunt.com/rest/v1/security/tokens/accs"
|
||||||
)
|
)
|
||||||
|
"""URL to get proxy service authentication token for further API calls."""
|
||||||
|
|
||||||
|
GET_COUNTRIES_URL: Final[str] = (
|
||||||
|
"https://stats.ducunt.com/api/rest/v2/entrypoints/countries"
|
||||||
|
)
|
||||||
|
"""URL API endpoint."""
|
||||||
|
|
||||||
|
GET_ACCESS_PROXY_URL: Final[str] = (
|
||||||
|
"https://api-pro.ducunt.com/rest/v1/security/tokens/accs-proxy"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get authentication token from proxy service
|
||||||
|
print("Getting authentication token from proxy service")
|
||||||
|
|
||||||
get_access_token_payload: dict[str, str | dict[str, str]] = {
|
get_access_token_payload: dict[str, str | dict[str, str]] = {
|
||||||
"clientApp": {"name": "URBAN_VPN_BROWSER_EXTENSION"},
|
"clientApp": {"name": "URBAN_VPN_BROWSER_EXTENSION"},
|
||||||
@ -16,59 +42,55 @@ get_access_token_payload: dict[str, str | dict[str, str]] = {
|
|||||||
with requests.post(
|
with requests.post(
|
||||||
GET_ACCESS_TOKEN_URL,
|
GET_ACCESS_TOKEN_URL,
|
||||||
json=get_access_token_payload,
|
json=get_access_token_payload,
|
||||||
headers={"Authorization": "Bearer On9GLm0c8B5GFWI9FYtdfPWlXfjhpF0Q"},
|
headers={
|
||||||
) as request:
|
"Authorization": "Bearer On9GLm0c8B5GFWI9FYtdfPWlXfjhpF0Q",
|
||||||
security_token_payload: Any = request.json()
|
},
|
||||||
|
) as reponse:
|
||||||
|
authentication_response: dict[str, Any] = reponse.json()
|
||||||
|
|
||||||
authentication_token: str = security_token_payload["value"]
|
authentication_token: str = authentication_response["value"]
|
||||||
|
print("Authentication token recieved")
|
||||||
|
|
||||||
GET_COUNTRIES_URL: Final[str] = (
|
# Get region list from proxy service
|
||||||
"https://stats.ducunt.com/api/rest/v2/entrypoints/countries"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
print("Getting available regions from proxy service")
|
||||||
with requests.get(
|
with requests.get(
|
||||||
GET_COUNTRIES_URL,
|
GET_COUNTRIES_URL,
|
||||||
headers={
|
headers={
|
||||||
"X-Client-App": "URBAN_VPN_BROWSER_EXTENSION",
|
"X-Client-App": "URBAN_VPN_BROWSER_EXTENSION",
|
||||||
"Authorization": f"Bearer {authentication_token}",
|
"Authorization": f"Bearer {authentication_token}",
|
||||||
},
|
},
|
||||||
) as request:
|
) as reponse:
|
||||||
countries_payload: Any = request.json()
|
regions_response: dict[str, Any] = reponse.json()
|
||||||
|
|
||||||
hosts: list[Any] = [
|
regions: list[Any] = [
|
||||||
element["servers"]["elements"]
|
country["servers"]["elements"]
|
||||||
for element in countries_payload["countries"]["elements"]
|
for country in regions_response["countries"]["elements"]
|
||||||
if element["servers"]["count"] > 0
|
if country["servers"]["count"] > 0
|
||||||
]
|
]
|
||||||
|
|
||||||
|
hosts: list[Host] = [
|
||||||
@dataclass
|
|
||||||
class Host:
|
|
||||||
ip: str
|
|
||||||
port: str
|
|
||||||
signature: str
|
|
||||||
|
|
||||||
|
|
||||||
host_data: list[Host] = [
|
|
||||||
Host(
|
Host(
|
||||||
ip=element["address"]["primary"]["host"],
|
ip=host["address"]["primary"]["ip"],
|
||||||
port=element["address"]["primary"]["port"],
|
port=host["address"]["primary"]["port"],
|
||||||
signature=element["signature"] if "signature" in element else "",
|
signature=host["signature"] if "signature" in host else "",
|
||||||
)
|
)
|
||||||
for host in hosts
|
for region in regions
|
||||||
for element in host
|
for host in region
|
||||||
|
if host["type"] == "PROXY" and host["accessType"] == "ACCESSIBLE"
|
||||||
]
|
]
|
||||||
GET_ACCESS_PROXY_URL: Final[str] = (
|
|
||||||
"https://api-pro.ducunt.com/rest/v1/security/tokens/accs-proxy"
|
print("Successfully acquired regions and available hosts information")
|
||||||
)
|
print(f"Preparing to query authentication information for {len(hosts)} hosts")
|
||||||
|
|
||||||
with Path("proxies").open("w", encoding="utf-8") as proxy_file:
|
with Path("proxies").open("w", encoding="utf-8") as proxy_file:
|
||||||
for host in host_data:
|
for i, host in enumerate(hosts):
|
||||||
|
time.sleep(DELAY_PER_REQUEST)
|
||||||
|
|
||||||
get_access_proxy_token_payload: dict[str, str | dict[str, str]] = {
|
get_access_proxy_token_payload: dict[str, str | dict[str, str]] = {
|
||||||
"clientApp": {"name": "URBAN_VPN_BROWSER_EXTENSION"},
|
"clientApp": {"name": "URBAN_VPN_BROWSER_EXTENSION"},
|
||||||
"signature": next(
|
"signature": next(
|
||||||
(host.signature for host in host_data if len(host.signature) > 0), ""
|
(host.signature for host in hosts if len(host.signature) > 0), ""
|
||||||
),
|
),
|
||||||
"type": "accs",
|
"type": "accs",
|
||||||
}
|
}
|
||||||
@ -76,9 +98,17 @@ with Path("proxies").open("w", encoding="utf-8") as proxy_file:
|
|||||||
with requests.post(
|
with requests.post(
|
||||||
GET_ACCESS_PROXY_URL,
|
GET_ACCESS_PROXY_URL,
|
||||||
json=get_access_proxy_token_payload,
|
json=get_access_proxy_token_payload,
|
||||||
headers={"Authorization": f"Bearer {authentication_token}"},
|
headers={
|
||||||
) as request:
|
"Authorization": f"Bearer {authentication_token}",
|
||||||
response: Any = request.json()
|
},
|
||||||
time.sleep(0.25)
|
) as reponse:
|
||||||
|
host_information_response: Any = reponse.json()
|
||||||
|
|
||||||
proxy_file.write(f"http://{response['value']}:1@{host.ip}:{host.port}\n")
|
proxy_connection_string: str = (
|
||||||
|
f"http://{host.signature}:1@{host.ip}:{host.port}\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
proxy_file.write(proxy_connection_string)
|
||||||
|
proxy_file.flush()
|
||||||
|
|
||||||
|
print(f"Acquired information for host {i}")
|
||||||
|
Loading…
Reference in New Issue
Block a user