Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use exponential backoff for connection retries #65

Merged
merged 3 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 63 additions & 3 deletions kdcproxy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import httplib
import urlparse

logging.basicConfig()
logger = logging.getLogger('kdcproxy')


class HTTPException(Exception):

Expand All @@ -61,17 +64,37 @@ def __str__(self):
return "%d %s" % (self.code, httplib.responses[self.code])


class SocketException(Exception):

def __init__(self, message, sock):
super(Exception, self).__init__(message)
self.sockfno = sock.fileno()


class Application:
MAX_LENGTH = 128 * 1024
SOCKTYPES = {
"tcp": socket.SOCK_STREAM,
"udp": socket.SOCK_DGRAM,
}

def addr2socktypename(self, addr):
ret = None
for name in self.SOCKTYPES:
if self.SOCKTYPES[name] == addr[1]:
ret = name
break
return ret

def __init__(self):
self.__resolver = MetaResolver()

def __await_reply(self, pr, rsocks, wsocks, timeout):
starting_time = time.time()
send_error = None
recv_error = None
failing_sock = None
reactivations = {}
extra = 0
read_buffers = {}
while (timeout + extra) > time.time():
Expand All @@ -92,6 +115,12 @@ def __await_reply(self, pr, rsocks, wsocks, timeout):
pass

for sock in w:
# Fetch reactivation tuple:
# 1st element: reactivation index (-1 = first activation)
# 2nd element: planned reactivation time (0.0 = now)
(rn, rt) = reactivations.get(sock, (-1, 0.0))
if rt > time.time():
continue
try:
if self.sock_type(sock) == socket.SOCK_DGRAM:
# If we proxy over UDP, remove the 4-byte length
Expand All @@ -101,23 +130,44 @@ def __await_reply(self, pr, rsocks, wsocks, timeout):
sock.sendall(pr.request)
extra = 10 # New connections get 10 extra seconds
except Exception as e:
logging.warning("Conection broken while writing (%s)", e)
send_error = e
failing_sock = sock
reactivations[sock] = (rn + 1,
time.time() + 2.0**(rn + 1) / 10)
continue
if sock in reactivations:
del reactivations[sock]
rsocks.append(sock)
wsocks.remove(sock)

for sock in r:
try:
reply = self.__handle_recv(sock, read_buffers)
except Exception as e:
logging.warning("Connection broken while reading (%s)", e)
recv_error = e
failing_sock = sock
if self.sock_type(sock) == socket.SOCK_STREAM:
# Remove broken TCP socket from readers
rsocks.remove(sock)
else:
if reply is not None:
return reply

if reactivations:
raise SocketException("Timeout while sending packets after %.2fs "
"and %d tries: %s" % (
(timeout + extra) - starting_time,
sum(map(lambda r: r[0],
reactivations.values())),
send_error),
failing_sock)
elif recv_error is not None:
raise SocketException("Timeout while receiving packets after "
"%.2fs: %s" % (
(timeout + extra) - starting_time,
recv_error),
failing_sock)

return None

def __handle_recv(self, sock, read_buffers):
Expand Down Expand Up @@ -215,6 +265,7 @@ def __call__(self, env, start_response):
reply = None
wsocks = []
rsocks = []
sockfno2addr = {}
for server in map(urlparse.urlparse, servers):
# Enforce valid, supported URIs
scheme = server.scheme.lower().split("+", 1)
Expand Down Expand Up @@ -261,6 +312,7 @@ def __call__(self, env, start_response):
continue
except io.BlockingIOError:
pass
sockfno2addr[sock.fileno()] = addr
wsocks.append(sock)

# Resend packets to UDP servers
Expand All @@ -271,7 +323,15 @@ def __call__(self, env, start_response):

# Call select()
timeout = time.time() + (15 if addr is None else 2)
reply = self.__await_reply(pr, rsocks, wsocks, timeout)
try:
reply = self.__await_reply(pr, rsocks, wsocks, timeout)
except SocketException as e:
fail_addr = sockfno2addr[e.sockfno]
fail_socktype = self.addr2socktypename(fail_addr)
fail_ip = fail_addr[4][0]
fail_port = fail_addr[4][1]
logger.warning("Exchange with %s:[%s]:%d failed: %s",
fail_socktype, fail_ip, fail_port, e)
if reply is not None:
break

Expand Down
7 changes: 5 additions & 2 deletions kdcproxy/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import dns.rdatatype
import dns.resolver

logging.basicConfig()
logger = logging.getLogger('kdcproxy')


class IResolver(object):

Expand Down Expand Up @@ -60,14 +63,14 @@ def __init__(self, filenames=None):
try:
self.__cp.read(filenames)
except configparser.Error:
logging.error("Unable to read config file(s): %s", filenames)
logger.error("Unable to read config file(s): %s", filenames)

try:
mod = self.__cp.get(self.GLOBAL, "configs")
try:
importlib.import_module("kdcproxy.config." + mod)
except ImportError as e:
logging.log(logging.ERROR, "Error reading config: %s" % e)
logger.log(logging.ERROR, "Error reading config: %s" % e)
except configparser.Error:
pass

Expand Down
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ deps =
doc8
docutils
markdown
rst2html
basepython = python3
commands =
doc8 --allow-long-titles README
python setup.py check --restructuredtext --metadata --strict
rst2html.py README {toxworkdir}/README.html
rst2html README {toxworkdir}/README.html
markdown_py README.md -f {toxworkdir}/README.md.html

[pytest]
Expand Down