Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 35 additions & 17 deletions mtprotoproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ def init_config():
# fallback to direct DC connection if all middle proxies fail
conf_dict.setdefault("MIDDLE_PROXY_DIRECT_FALLBACK", False)

# if IPv6 avaliable, use it by default
conf_dict.setdefault("PREFER_IPV6", socket.has_ipv6)
# if IPv6 available, use it by default, IPv6 with middle proxies is unstable now
conf_dict.setdefault("PREFER_IPV6", socket.has_ipv6 and not conf_dict["USE_MIDDLE_PROXY"])

# disables tg->client trafic reencryption, faster but less secure
# disables tg->client traffic reencryption, faster but less secure
conf_dict.setdefault("FAST_MODE", True)

# enables some working modes
Expand Down Expand Up @@ -281,6 +281,9 @@ def init_config():
# telegram servers connect timeout in seconds
conf_dict.setdefault("TG_CONNECT_TIMEOUT", 10)

# drop connection if no data from telegram server for this many seconds
conf_dict.setdefault("TG_READ_TIMEOUT", 60)

# listen address for IPv4
conf_dict.setdefault("LISTEN_ADDR_IPV4", "0.0.0.0")

Expand Down Expand Up @@ -486,7 +489,7 @@ def getrandbytes(self, n):


class TgConnectionPool:
MAX_CONNS_IN_POOL = 64
MAX_CONNS_IN_POOL = 16

def __init__(self):
self.pools = {}
Expand All @@ -503,6 +506,16 @@ async def open_tg_connection(self, host, port, init_func=None):
timeout=config.TG_CONNECT_TIMEOUT)
return reader_tgt, writer_tgt

def is_conn_dead(self, reader, writer):
if writer.transport.is_closing():
return True
raw_reader = reader
while hasattr(raw_reader, 'upstream'):
raw_reader = raw_reader.upstream
if raw_reader.at_eof():
return True
return False

def register_host_port(self, host, port, init_func):
if (host, port, init_func) not in self.pools:
self.pools[(host, port, init_func)] = []
Expand All @@ -515,15 +528,16 @@ async def get_connection(self, host, port, init_func=None):
self.register_host_port(host, port, init_func)

ret = None
for task in self.pools[(host, port, init_func)][::]:
for task in self.pools[(host, port, init_func)][:]:
if task.done():
if task.exception():
self.pools[(host, port, init_func)].remove(task)
continue

reader, writer, *other = task.result()
if writer.transport.is_closing():
if self.is_conn_dead(reader, writer):
self.pools[(host, port, init_func)].remove(task)
writer.transport.abort()
continue

if not ret:
Expand Down Expand Up @@ -785,7 +799,7 @@ class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase):

def write(self, data, extra={}):
SMALL_PKT_BORDER = 0x7f
LARGE_PKT_BORGER = 256 ** 3
LARGE_PKT_BORDER = 256 ** 3

if len(data) % 4 != 0:
print_err("BUG: MTProtoFrameStreamWriter attempted to send msg with len", len(data))
Expand All @@ -798,7 +812,7 @@ def write(self, data, extra={}):

if len_div_four < SMALL_PKT_BORDER:
return self.upstream.write(bytes([len_div_four]) + data)
elif len_div_four < LARGE_PKT_BORGER:
elif len_div_four < LARGE_PKT_BORDER:
return self.upstream.write(b'\x7f' + int.to_bytes(len_div_four, 3, 'little') + data)
else:
print_err("Attempted to send too large pkt len =", len(data))
Expand Down Expand Up @@ -1372,7 +1386,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
return False
except ConnectionAbortedError as E:
print_err("The Telegram server connection is bad: %d %s" % (dc_idx, E))
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, dc, TG_DATACENTER_PORT, E))
return False
except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
Expand Down Expand Up @@ -1596,7 +1610,11 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def tg_connect_reader_to_writer(rd, wr, user, rd_buf_size, is_upstream):
try:
while True:
data = await rd.read(rd_buf_size)
if not is_upstream:
data = await asyncio.wait_for(rd.read(rd_buf_size),
timeout=config.TG_READ_TIMEOUT)
else:
data = await rd.read(rd_buf_size)
if isinstance(data, tuple):
data, extra = data
else:
Expand All @@ -1617,7 +1635,7 @@ async def tg_connect_reader_to_writer(rd, wr, user, rd_buf_size, is_upstream):

wr.write(data, extra)
await wr.drain()
except (OSError, asyncio.IncompleteReadError) as e:
except (OSError, asyncio.IncompleteReadError, asyncio.TimeoutError) as e:
# print_err(e)
pass

Expand Down Expand Up @@ -2187,15 +2205,15 @@ def print_tg_info():
for ip in ip_addrs:
if config.MODES["classic"]:
params = {"server": ip, "port": config.PORT, "secret": secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
classic_link = "tg://proxy?{}".format(params_encodeded)
params_encoded = urllib.parse.urlencode(params, safe=':')
classic_link = "tg://proxy?{}".format(params_encoded)
proxy_links.append({"user": user, "link": classic_link})
print("{}: {}".format(user, classic_link), flush=True)

if config.MODES["secure"]:
params = {"server": ip, "port": config.PORT, "secret": "dd" + secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
dd_link = "tg://proxy?{}".format(params_encodeded)
params_encoded = urllib.parse.urlencode(params, safe=':')
dd_link = "tg://proxy?{}".format(params_encoded)
proxy_links.append({"user": user, "link": dd_link})
print("{}: {}".format(user, dd_link), flush=True)

Expand All @@ -2205,8 +2223,8 @@ def print_tg_info():
# tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode()
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
tls_link = "tg://proxy?{}".format(params_encodeded)
params_encoded = urllib.parse.urlencode(params, safe=':')
tls_link = "tg://proxy?{}".format(params_encoded)
proxy_links.append({"user": user, "link": tls_link})
print("{}: {}".format(user, tls_link), flush=True)

Expand Down
Loading