-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlinux_client.py
More file actions
139 lines (122 loc) · 5.05 KB
/
linux_client.py
File metadata and controls
139 lines (122 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import requests
import time
import subprocess
from datetime import datetime
import dnsstamps
from dnsstamps import Option
# IPFS API URL and IPNS key
IPFS_API_URL = "http://localhost:5001/api/v0"
IPNS_KEY = "/ipns/k51qzi5uqu5di6najzrxj93bpvomq1zqd37fottriis27yajea2pmrdhoesy2m"
# DNSCrypt-proxy configuration path
DNSCRYPT_CONFIG_PATH = "/etc/dnscrypt-proxy/dnscrypt-proxy.toml"
def get_latest_ipns_content():
"""
Resolve the latest IPFS content using the IPNS key with no cache.
"""
try:
response = requests.post(f"{IPFS_API_URL}/name/resolve", params={"arg": IPNS_KEY, "nocache": "true"})
response.raise_for_status()
cid = response.json().get('Path')
print(f"Resolved IPNS to CID: {cid}")
return cid
except requests.exceptions.RequestException as e:
print(f"Error resolving IPNS: {e}")
return None
def get_ip_from_ipfs(cid):
"""
Retrieve the IP address from the IPFS content using the IPFS API.
"""
try:
# Use the IPFS API to directly fetch the content of the CID
response = requests.post(f"{IPFS_API_URL}/cat", params={"arg": cid})
response.raise_for_status()
content = response.json()
ip_address = content.get('ip')
query_path = content.get('query_path')
print(f"Retrieved IP address from IPFS content: {ip_address}")
print(f"Retrieved query path from IPFS content: {query_path}")
return ip_address, query_path
except requests.exceptions.RequestException as e:
print(f"Error retrieving IPFS content via API: {e}")
return None
def create_dnscrypt_stamp(ip_address, hostname, query_path):
"""
Create a DNSCrypt DoH stamp based on the server's IP address and hostname.
"""
stamp = dnsstamps.create_doh(ip_address, [], hostname, query_path, [Option.NO_LOGS, Option.DNSSEC, Option.NO_FILTERS])
print(f"Generated DNSCrypt stamp: {stamp}")
return stamp
def update_dnscrypt_config(ip_address, hostname, query_path):
"""
Update the dnscrypt-proxy configuration with the new IP address and stamp.
"""
stamp = create_dnscrypt_stamp(ip_address, hostname, query_path)
if not stamp:
print("Error: Unable to generate stamp.")
return
try:
with open(DNSCRYPT_CONFIG_PATH, 'r') as file:
config = file.readlines()
# Update the static section for the custom DoH server
with open(DNSCRYPT_CONFIG_PATH, 'w') as file:
for line in config:
if line.strip().startswith('stamp ='):
file.write(f"stamp = '{stamp}'\n")
else:
file.write(line)
print(f"Updated dnscrypt-proxy configuration with the new DoH server at {ip_address}.")
except FileNotFoundError:
print(f"Configuration file {DNSCRYPT_CONFIG_PATH} not found.")
except Exception as e:
print(f"Error updating dnscrypt-proxy configuration: {e}")
def restart_dnscrypt_proxy():
"""
Restart the dnscrypt-proxy service.
"""
try:
subprocess.run(['sudo', 'systemctl', 'restart', 'dnscrypt-proxy'], check=True)
print("Restarted dnscrypt-proxy service to apply the new configuration.")
print("Sleeping 5 before testing...")
time.sleep(5)
except subprocess.CalledProcessError as e:
print(f"Error restarting dnscrypt-proxy: {e}")
def test_doh_server(ip_address):
"""
Test if the DoH server is reachable at the new IP address using dig.
"""
try:
result = subprocess.run(['dig', '@127.0.0.1', 'google.com'], capture_output=True, text=True)
print(f"Command executed: dig @127.0.0.1 google.com")
print(f"Return code: {result.returncode}")
print(f"stdout: {result.stdout}")
print(f"stderr: {result.stderr}")
if result.returncode == 0:
print(f"DoH server at {ip_address} is reachable. Output:\n{result.stdout}")
else:
print(f"Failed to reach DoH server at {ip_address}. Error:\n{result.stderr}")
except Exception as e:
print(f"Error testing DoH server: {e}")
def main():
while True:
# Get the current time
now = datetime.now()
# Wait until the start of the next minute
wait_time = 60 - now.second
#time.sleep(wait_time)
time.sleep(2)
print(f"\n[{datetime.now()}] Checking IPNS...")
# Resolve the latest content from IPNS
cid = get_latest_ipns_content()
if cid:
# Get the IP address from the IPFS content
ip_address, query_path = get_ip_from_ipfs(cid)
hostname = ip_address+":443"
if ip_address:
# Update dnscrypt-proxy configuration with the new IP address and DoH stamp
update_dnscrypt_config(ip_address, hostname, query_path)
# Restart dnscrypt-proxy to apply the new configuration
restart_dnscrypt_proxy()
# Test the DoH server at the retrieved IP address
test_doh_server(ip_address)
if __name__ == "__main__":
main()