-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgdpy_extensions.py
More file actions
97 lines (70 loc) · 2.6 KB
/
gdpy_extensions.py
File metadata and controls
97 lines (70 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""
GDPY Extensions
---------------
An add-on library for gd.py that allows for
socks4 and socks5 proxy support through `aiohttp_socks`
::
from gdpy_extensions import ProxyClient
async def get_daily():
client = ProxyClient(proxy_url="socks5://localhost:9050")
daily_level = await client.get_daily()
...
"""
from aiohttp import ClientSession
from aiohttp_socks import (
ProxyConnector,
ProxyTimeoutError,
ProxyConnectionError,
ProxyError,
)
from gd import Client, Session, HTTPClient
from attrs import field, define, frozen
__license__ = "MIT"
__version__ = "0.0.2"
__author__ = "Calloc"
COMMON_PROXY_ERRORS = ProxyTimeoutError, ProxyConnectionError, ProxyError
"""Makes it very simple to write an exception since it is common
for proxies to fail a request"""
@define(slots=False)
class ProxyHTTPClient(HTTPClient):
"""Enforces gdpy to go through any proxy and that has support for
socks4 , socks5 and http"""
proxy_url:str = field(factory=str)
rdns: bool = field(default=False)
transport: ProxyConnector = field(init=False)
def __attrs_post_init__(self) -> None:
self.transport = ProxyConnector.from_url(self.proxy_url, rdns=self.rdns)
return super().__attrs_post_init__()
def __hash__(self) -> int:
return id(self)
async def create_session(self) -> ClientSession:
return ClientSession(
skip_auto_headers=self.SKIP_HEADERS, connector=self.transport, timeout=30
)
def rotate_proxy(self, proxy_url: str, rdns: bool = True):
"""Used to rotate a proxy. This is good from when
one of several things have happened
- The Server has banned your proxy (403 Forbidden)
- The Server has Rate-limited your proxy (429 Too Many Requests)
- The Proxy itself is dead (Connection Failure)...
"""
self.rdns = rdns
self.proxy_url = proxy_url
self.transport = ProxyConnector.from_url(proxy_url, rdns=self.rdns)
@frozen()
class ProxySession(Session):
proxy_url: str
http: ProxyHTTPClient = field(init=False)
def __attrs_post_init__(self):
# see: https://www.attrs.org/en/stable/init.html#post-init
object.__setattr__(self, "http", ProxyHTTPClient(proxy_url=self.proxy_url))
@define(slots=False)
class ProxyClient(Client):
proxy_url: str = field(factory=str)
session: ProxySession = field(init=False)
def __attrs_post_init__(self):
self.session = ProxySession(self.proxy_url)
self.rotate_proxy = self.http.rotate_proxy
@property
def http(self):
return self.session.http