diff --git a/lib/relay/servers/socksplugins/http.py b/lib/relay/servers/socksplugins/http.py index 767ca9b..6ec3d33 100644 --- a/lib/relay/servers/socksplugins/http.py +++ b/lib/relay/servers/socksplugins/http.py @@ -599,6 +599,44 @@ def shouldProbeAnonymous(self): relayClient = self.activeRelays[self.username]['protocolClient'] return relayClient.serverConfig.kernelAuth + def _sendBrowserError(self, code, reason): + try: + msg = ('HTTP/1.1 %d %s\r\nContent-Length: 0\r\nConnection: close\r\n\r\n' % (code, reason)).encode() + self.socksSocket.sendall(msg) + except Exception: + pass + + def _sendViaRelay(self, tosend, socketLock, protocol): + """Acquire the relay lock, forward the request, transfer the response. + On lock-acquire timeout: 503 to browser, session left intact. + On relay socket timeout/error: 504 to browser, relay socket killed so + future connections fail isConnectionAlive cleanly instead of reading + stale response data.""" + if not socketLock.acquire(timeout=15): + LOG.error('%s: Lock timeout waiting for relay socket (%s)' % (protocol, self.username)) + self._sendBrowserError(503, 'Service Unavailable') + return False + try: + try: + self.relaySocket.settimeout(10) + self.relaySocket.sendall(tosend) + self.transferResponse() + return True + except (socket.timeout, ConnectionResetError, BrokenPipeError, OSError) as e: + LOG.error('%s: Relay socket error, killing session: %s' % (protocol, str(e))) + self._sendBrowserError(504, 'Gateway Timeout') + try: + self.relaySocket.close() + except Exception: + pass + return False + finally: + try: + self.relaySocket.settimeout(None) + except Exception: + pass + socketLock.release() + def _processRequestWithProbe(self, buffer, socketLock, protocol='HTTP'): """ Process request with try-anonymous-first, fallback-to-auth strategy. @@ -607,10 +645,8 @@ def _processRequestWithProbe(self, buffer, socketLock, protocol='HTTP'): """ if not self.shouldProbeAnonymous(): # Kernel auth mode not enabled, use authenticated relay normally - with socketLock: - tosend = self.prepareRequest(buffer) - self.relaySocket.sendall(tosend) - self.transferResponse() + tosend = self.prepareRequest(buffer) + self._sendViaRelay(tosend, socketLock, protocol) return relayClient = self.activeRelays[self.username]['protocolClient'] @@ -625,9 +661,7 @@ def _processRequestWithProbe(self, buffer, socketLock, protocol='HTTP'): if cache_key in authCache and authCache[cache_key]: # Cached as needs auth - use authenticated relay directly LOG.debug('%s: Cache HIT (auth) %s' % (protocol, path)) - with socketLock: - self.relaySocket.sendall(tosend) - self.transferResponse() + self._sendViaRelay(tosend, socketLock, protocol) return # Try anonymous connection @@ -640,9 +674,7 @@ def _processRequestWithProbe(self, buffer, socketLock, protocol='HTTP'): anonConn.connect() except Exception as e: LOG.error('%s: Anon connection failed: %s, using auth relay' % (protocol, str(e))) - with socketLock: - self.relaySocket.sendall(tosend) - self.transferResponse() + self._sendViaRelay(tosend, socketLock, protocol) return try: @@ -658,9 +690,7 @@ def _processRequestWithProbe(self, buffer, socketLock, protocol='HTTP'): anonConn.close() except Exception: pass - with socketLock: - self.relaySocket.sendall(tosend) - self.transferResponse() + self._sendViaRelay(tosend, socketLock, protocol) return # Decision: is it a 401? @@ -673,9 +703,7 @@ def _processRequestWithProbe(self, buffer, socketLock, protocol='HTTP'): except Exception: pass - with socketLock: - self.relaySocket.sendall(tosend) - self.transferResponse() + self._sendViaRelay(tosend, socketLock, protocol) else: # Success - forward response with initial data we already read LOG.debug('%s: Path %s OK anonymously (cached)' % (protocol, path)) @@ -696,9 +724,7 @@ def _processRequestWithProbe(self, buffer, socketLock, protocol='HTTP'): anonConn.close() except Exception: pass - with socketLock: - self.relaySocket.sendall(tosend) - self.transferResponse() + self._sendViaRelay(tosend, socketLock, protocol) def prepareRequest(self, data): # Parse the HTTP data, removing headers that break stuff diff --git a/lib/relay/servers/socksserver.py b/lib/relay/servers/socksserver.py index fb128b3..1fc778a 100644 --- a/lib/relay/servers/socksserver.py +++ b/lib/relay/servers/socksserver.py @@ -22,6 +22,7 @@ from __future__ import print_function import socketserver import socket +import select import time import logging from queue import Queue @@ -361,22 +362,8 @@ def handle(self): LOG.debug('SOCKS: Target is %s(%s)' % (self.targetHost, self.targetPort)) - if self.targetPort != 53: - # Do we have an active connection for the target host/port asked? - # Still don't know the username, but it's a start - if self.targetHost in self.__socksServer.activeRelays: - if self.targetPort not in self.__socksServer.activeRelays[self.targetHost]: - LOG.error('SOCKS: Don\'t have a relay for %s(%s)' % (self.targetHost, self.targetPort)) - self.sendReplyError(replyField.CONNECTION_REFUSED) - return - else: - LOG.error('SOCKS: Don\'t have a relay for %s(%s)' % (self.targetHost, self.targetPort)) - self.sendReplyError(replyField.CONNECTION_REFUSED) - return - - # Now let's get into the loops + # DNS requests get a direct passthrough if self.targetPort == 53: - # Somebody wanting a DNS request. Should we handle this? s = socket.socket() try: LOG.debug('SOCKS: Connecting to %s(%s)' %(self.targetHost, self.targetPort)) @@ -469,7 +456,48 @@ def handle(self): if relay.username is not None: self.__socksServer.activeRelays[self.targetHost][self.targetPort][relay.username]['inUse'] = False else: - LOG.error('SOCKS: I don\'t have a handler for this port') + # No relay session — forward directly to the target + LOG.debug('SOCKS: No relay for %s(%s), forwarding directly' % (self.targetHost, self.targetPort)) + s = socket.socket() + try: + s.connect((self.targetHost, self.targetPort)) + except Exception as e: + LOG.debug("Exception:", exc_info=True) + LOG.error('SOCKS: %s' % str(e)) + s.close() + self.sendReplyError(replyField.CONNECTION_REFUSED) + return + + try: + if self.__socksVersion == 5: + reply = SOCKS5_REPLY() + reply['REP'] = replyField.SUCCEEDED.value + addr, port = s.getsockname() + reply['PAYLOAD'] = socket.inet_aton(addr) + pack('>H', port) + else: + reply = SOCKS4_REPLY() + self.__connSocket.sendall(reply.getData()) + + while True: + try: + readable, _, _ = select.select([self.__connSocket, s], [], [], 60) + if not readable: + break + for sock in readable: + data = sock.recv(8192) + if not data: + return + if sock is self.__connSocket: + s.sendall(data) + else: + self.__connSocket.sendall(data) + except Exception as e: + LOG.debug("Exception:", exc_info=True) + LOG.debug('SOCKS: %s' % str(e)) + break + finally: + s.close() + return LOG.debug('SOCKS: Shutting down connection') try: