Загадочные исключения при создании множества одновременных запросов от urllib.request к HTTPServer
Я пытаюсь сделать эту криптографическую задачу Matasano, которая включает в себя временную атаку на сервер с искусственно замедленной функцией сравнения строк. В нем говорится использовать "веб-структуру по вашему выбору", но мне не хотелось устанавливать веб-фреймворк, поэтому я решил использовать класс HTTPServer, встроенный в http.server
.
Я придумал что-то, что сработало, но оно было очень медленным, поэтому я попытался ускорить его с использованием (плохо документированного) потока пула, встроенного в multiprocessing.dummy
. Это было намного быстрее, но я заметил что-то странное: если я делаю 8 или меньше запросов одновременно, он работает нормально. Если у меня есть нечто большее, это работает некоторое время и дает мне ошибки в кажущиеся случайными временами. Ошибки кажутся непоследовательными и не всегда одинаковыми, но обычно они имеют Connection refused, invalid argument
, OSError: [Errno 22] Invalid argument
, urllib.error.URLError: <urlopen error [Errno 22] Invalid argument>
, BrokenPipeError: [Errno 32] Broken pipe
или urllib.error.URLError: <urlopen error [Errno 61] Connection refused>
.
Есть ли ограничение на количество соединений, которые может обрабатывать сервер? Я не думаю, что количество потоков как таковых является проблемой, потому что я написал простую функцию, которая выполняла замедленное сравнение строк без запуска веб-сервера и называла ее с 500 одновременными потоками, и она работала нормально. Я не думаю, что просто делать запросы из многих потоков - проблема, потому что я сделал сканеры, которые использовали более 100 потоков (все одновременные запросы на один и тот же сайт), и они отлично работали. Похоже, что HTTPServer не предназначен для надежного размещения производственных веб-сайтов, которые получают большой объем трафика, но я удивлен, что это легко заставить его сбой.
Я попытался постепенно удалить материал из моего кода, который выглядел не связанным с проблемой, как я обычно делаю, когда я диагностирую таинственные ошибки, подобные этому, но в этом случае это было не очень полезно. Похоже, что когда я удалял, по-видимому, несвязанный код, количество соединений, которые сервер мог обрабатывать, постепенно увеличивался, но не было явной причины сбоев.
Кто-нибудь знает, как увеличить количество запросов, которые я могу сделать сразу, или, по крайней мере, почему это происходит?
Мой код сложный, но я придумал эту простую программу, которая демонстрирует проблему:
#!/usr/bin/env python3
import os
import random
from http.server import BaseHTTPRequestHandler, HTTPServer
from multiprocessing.dummy import Pool as ThreadPool
from socketserver import ForkingMixIn, ThreadingMixIn
from threading import Thread
from time import sleep
from urllib.error import HTTPError
from urllib.request import urlopen
class FancyHTTPServer(ThreadingMixIn, HTTPServer):
pass
class MyRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
sleep(random.uniform(0, 2))
self.send_response(200)
self.end_headers()
self.wfile.write(b"foo")
def log_request(self, code=None, size=None):
pass
def request_is_ok(number):
try:
urlopen("http://localhost:31415/test" + str(number))
except HTTPError:
return False
else:
return True
server = FancyHTTPServer(("localhost", 31415), MyRequestHandler)
try:
Thread(target=server.serve_forever).start()
with ThreadPool(200) as pool:
for i in range(10):
numbers = [random.randint(0, 99999) for j in range(20000)]
for j, result in enumerate(pool.imap(request_is_ok, numbers)):
if j % 20 == 0:
print(i, j)
finally:
server.shutdown()
server.server_close()
print("done testing server")
По какой-то причине программа выше работает отлично, если у нее не более 100 потоков или около того, но мой реальный код для вызова может обрабатывать только 8 потоков. Если я запускаю его с 9, я обычно получаю ошибки соединения, и с 10 я всегда получаю ошибки подключения. Я попытался использовать concurrent.futures.ThreadPoolExecutor
, concurrent.futures.ProcessPoolExecutor
и multiprocessing.pool
вместо multiprocessing.dummy.pool
, и ни один из них не помог. Я попытался использовать простой объект HTTPServer
(без ThreadingMixIn
), и это просто заставило вещи работать очень медленно и не устранило проблему. Я попытался использовать ForkingMixIn
, и это тоже не исправило.
Что я должен делать? Я запускаю Python 3.5.1 на MacBook Pro конца 2013 года под управлением OS X 10.11.3.
EDIT: Я пробовал еще несколько вещей, включая запуск сервера в процессе вместо потока, как простой HTTPServer
, с ForkingMixIn
и с ThreadingMixIn
, Ни один из них не помог.
EDIT: Эта проблема незнакома, чем я думал. Я попытался сделать один script с сервером, а другой с большим количеством потоков, делающих запросы, и запускал их на разных вкладках моего терминала. Процесс с сервером прошел нормально, но один из запросов делался с ошибкой. Исключениями были сочетание ConnectionResetError: [Errno 54] Connection reset by peer
, urllib.error.URLError: <urlopen error [Errno 54] Connection reset by peer>
, OSError: [Errno 41] Protocol wrong type for socket
, urllib.error.URLError: <urlopen error [Errno 41] Protocol wrong type for socket>
, urllib.error.URLError: <urlopen error [Errno 22] Invalid argument>
.
Я попробовал это с фиктивным сервером, как выше, и если бы я ограничил количество одновременных запросов до 5 или менее, он работал нормально, но с 6 запросами клиентский процесс разбился. Были ошибки на сервере, но он продолжал идти. Клиент разбился, независимо от того, использовал ли я потоки или процессы для выполнения запросов. Затем я попытался помещать замедленную функцию на сервер, и он мог обрабатывать 60 одновременных запросов, но он разбился с 70. Это похоже на то, что это может противоречить доказательствам, что проблема связана с сервером.
РЕДАКТИРОВАТЬ: Я пробовал большую часть вещей, которые я описал, используя requests
вместо urllib.request
и столкнулся с аналогичными проблемами.
EDIT: Теперь я запускаю OS X 10.11.4 и сталкиваюсь с теми же проблемами.
Ответы
Ответ 1
Вы используете значение по умолчанию listen()
, которое, вероятно, является причиной многих из этих ошибок. Это не количество одновременных клиентов с уже установленным соединением, а количество клиентов, ожидающих очереди прослушивания до установления соединения. Измените свой класс сервера на:
class FancyHTTPServer(ThreadingMixIn, HTTPServer):
def server_activate(self):
self.socket.listen(128)
128 является разумным пределом. Возможно, вы захотите проверить socket.SOMAXCONN или вашу ОС somaxconn, если хотите увеличить ее дальше. Если у вас все еще есть случайные ошибки при большой нагрузке, вы должны проверить свои настройки ulimit и при необходимости увеличить.
Я сделал это с вашим примером, и я получил более 1000 потоков, работающих нормально, поэтому я думаю, что это должно решить вашу проблему.
Обновление
Если он улучшится, но он все еще терпит крах 200 одновременных клиентов, то я уверен, что основной проблемой является размер отставания. Имейте в виду, что ваша проблема заключается не в количестве одновременных клиентов, а в количестве одновременных запросов на соединение. Краткое объяснение того, что это значит, не заходя слишком глубоко в внутренние сети TCP.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(BACKLOG)
while running:
conn, addr = s.accept()
do_something(conn, addr)
В этом примере сокет теперь принимает соединения на данном порту, а вызов s.accept()
блокируется до тех пор, пока клиент не подключится. У вас может быть много клиентов, пытающихся подключиться одновременно, и в зависимости от вашего приложения вы не сможете вызвать s.accept()
и отправить клиентское соединение так же быстро, как клиенты пытаются подключиться. Ожидающие клиенты поставлены в очередь, а максимальный размер этой очереди определяется значением BACKLOG. Если очередь заполнена, клиенты будут терпеть неудачу с ошибкой Connection Refused.
Threading не помогает, так как класс ThreadingMixIn выполняет вызов do_something(conn, addr)
в отдельном потоке, поэтому сервер может вернуться к mainloop и вызову s.accept()
.
Вы можете попытаться увеличить отставание дальше, но будет точка, в которой это не поможет, потому что, если очередь становится слишком большой, некоторые клиенты будут таймаутом, прежде чем сервер выполнит вызов s.accept()
.
Итак, как я сказал выше, ваша проблема - это количество одновременных попыток подключения, а не количество одновременных клиентов. Возможно, 128 достаточно для вашего реального приложения, но вы получаете ошибку в своем тесте, потому что пытаетесь подключиться со всеми 200 потоками сразу и наводнять очередь.
Не беспокойтесь о ulimit
, если вы не получите ошибку Too many open files
, но если вы хотите увеличить отставание за пределами 128, выполните некоторые исследования socket.SOMAXCONN
. Это хорошее начало: https://utcc.utoronto.ca/~cks/space/blog/python/AvoidSOMAXCONN
Ответ 2
Я бы сказал, что ваша проблема связана с некоторой блокировкой ввода-вывода, поскольку я успешно выполнил ваш код в NodeJs. Я также заметил, что и сервер, и клиент могут работать индивидуально.
Но можно увеличить количество запросов с несколькими изменениями:
-
Определите количество параллельных подключений:
http.server.HTTPServer.request_queue_size = 500
-
Запустите сервер в другом процессе:
server = multiprocessing.Process(target = RunHTTPServer) server.start()
-
Используйте пул соединений на стороне клиента для выполнения запросов
-
Используйте пул потоков на стороне сервера для обработки запросов
-
Разрешить повторное использование соединения на стороне клиента, установив схему и используя заголовок "keep-alive"
Со всеми этими изменениями мне удалось запустить код с 500 потоками без каких-либо проблем. Поэтому, если вы хотите попробовать, вот полный код:
import random
from time import sleep, clock
from http.server import BaseHTTPRequestHandler, HTTPServer
from multiprocessing import Process
from multiprocessing.pool import ThreadPool
from socketserver import ThreadingMixIn
from concurrent.futures import ThreadPoolExecutor
from urllib3 import HTTPConnectionPool
from urllib.error import HTTPError
class HTTPServerThreaded(HTTPServer):
request_queue_size = 500
allow_reuse_address = True
def serve_forever(self):
executor = ThreadPoolExecutor(max_workers=self.request_queue_size)
while True:
try:
request, client_address = self.get_request()
executor.submit(ThreadingMixIn.process_request_thread, self, request, client_address)
except OSError:
break
self.server_close()
class MyRequestHandler(BaseHTTPRequestHandler):
default_request_version = 'HTTP/1.1'
def do_GET(self):
sleep(random.uniform(0, 1) / 100.0)
data = b"abcdef"
self.send_response(200)
self.send_header("Content-type", 'text/html')
self.send_header("Content-length", len(data))
self.end_headers()
self.wfile.write(data)
def log_request(self, code=None, size=None):
pass
def RunHTTPServer():
server = HTTPServerThreaded(('127.0.0.1', 5674), MyRequestHandler)
server.serve_forever()
client_headers = {
'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
'Content-Type': 'text/plain',
'Connection': 'keep-alive'
}
client_pool = None
def request_is_ok(number):
response = client_pool.request('GET', "/test" + str(number), headers=client_headers)
return response.status == 200 and response.data == b"abcdef"
if __name__ == '__main__':
# start the server in another process
server = Process(target=RunHTTPServer)
server.start()
# start a connection pool for the clients
client_pool = HTTPConnectionPool('127.0.0.1', 5674)
# execute the requests
with ThreadPool(500) as thread_pool:
start = clock()
for i in range(5):
numbers = [random.randint(0, 99999) for j in range(20000)]
for j, result in enumerate(thread_pool.imap(request_is_ok, numbers)):
if j % 1000 == 0:
print(i, j, result)
end = clock()
print("execution time: %s" % (end-start,))
Обновление 1:
Увеличение request_queue_size просто дает вам больше места для хранения запросов, которые не могут быть выполнены в то время, чтобы их можно было выполнить позже.
Таким образом, чем дольше очередь, тем выше дисперсия для времени отклика, что я считаю противоположной вашей цели здесь.
Что касается ThreadingMixIn, он не идеален, так как он создает и уничтожает поток для каждого запроса, и это дорого. Лучшим решением для сокращения очереди ожидания является использование пула повторных потоков для обработки запросов.
Причиной запуска сервера в другом процессе является использование другого ЦП для сокращения времени выполнения.
Для клиентской стороны, использующей HTTPConnectionPool, был единственный способ сохранить постоянный поток запросов, поскольку у меня было какое-то странное поведение с urlopen при анализе соединений.
Ответ 3
В норме используется только столько потоков, сколько ядер, следовательно, потребность в потоке 8 (включая виртуальные ядра). Модель потоковой обработки проще всего работать, но это действительно мусорный способ сделать это. Лучшим способом обработки нескольких соединений является использование асинхронного подхода. Это сложнее, хотя.
С помощью метода потоков вы можете начать с изучения того, остается ли процесс открытым после выхода из программы. Это будет означать, что ваши потоки не закрываются и, очевидно, будут вызывать проблемы.
Попробуйте это...
class FancyHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True
Это обеспечит правильное закрытие ваших потоков. Это может произойти автоматически в пуле потоков, но, вероятно, стоит попробовать.