Проблема с многопоточным подключением к приложениям и сокетам Python
Я изучаю проблему с приложением Python, запущенным на машине Ubuntu с 4G ОЗУ. Инструмент будет использоваться для аудита серверов (мы предпочитаем рулон наших собственных инструментов). Он использует потоки для подключения к большому количеству серверов, и многие из TCP-соединений терпят неудачу. Однако, если я добавлю задержку в 1 секунду между началом каждого потока, то большинство соединений будут успешными. Я использовал этот простой script для изучения того, что может произойти:
#!/usr/bin/python
import sys
import socket
import threading
import time
class Scanner(threading.Thread):
def __init__(self, host, port):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.status = ""
def run(self):
self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sk.settimeout(20)
try:
self.sk.connect((self.host, self.port))
except Exception, err:
self.status = str(err)
else:
self.status = "connected"
finally:
self.sk.close()
def get_hostnames_list(filename):
return open(filename).read().splitlines()
if (__name__ == "__main__"):
hostnames_file = sys.argv[1]
hosts_list = get_hostnames_list(hostnames_file)
threads = []
for host in hosts_list:
#time.sleep(1)
thread = Scanner(host, 443)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print "Host: ", thread.host, " : ", thread.status
Если я запустил это с помощью time.sleep(1), прокомментировавшего, скажем, 300 хостов, многие из подключений терпят неудачу с ошибкой тайм-аута, тогда как они не тайм-аут, если я ставлю задержку на одну секунду. я попробовал приложение на другом дистрибутиве Linux, работающем на более мощной машине, и было не так много ошибок подключения? Это связано с ограничением ядра? Есть ли что-нибудь, что я могу сделать, чтобы заставить соединение работать без задержки?
UPDATE
Я также пробовал программу, которая ограничивала количество потоков, доступных в пуле. Уменьшая это до 20, я могу заставить все подключения работать, но он проверяет только 1 хост в секунду. Так что, что бы я ни пытался (помещаясь в сон (1) или ограничивая количество одновременных потоков), я не могу проверять более 1 хоста каждую секунду.
UPDATE
Я нашел этот question, который кажется похожим на то, что я вижу.
UPDATE
Интересно, может ли написать это с помощью скручивания? Может ли кто-нибудь показать, что мой пример будет выглядеть как написанный с использованием скрученного?
Ответы
Ответ 1
Вы можете попробовать gevent
:
from gevent.pool import Pool
from gevent import monkey; monkey.patch_all() # patches stdlib
import sys
import logging
from httplib import HTTPSConnection
from timeit import default_timer as timer
info = logging.getLogger().info
def connect(hostname):
info("connecting %s", hostname)
h = HTTPSConnection(hostname, timeout=2)
try: h.connect()
except IOError, e:
info("error %s reason: %s", hostname, e)
else:
info("done %s", hostname)
finally:
h.close()
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
info("getting hostname list")
hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
hosts_list = open(hosts_file).read().splitlines()
info("spawning jobs")
pool = Pool(20) # limit number of concurrent connections
start = timer()
for _ in pool.imap(connect, hosts_list):
pass
info("%d hosts took us %.2g seconds", len(hosts_list), timer() - start)
if __name__=="__main__":
main()
Он может обрабатывать более одного хоста в секунду.
Выход
2011-01-31 11:08:29,052 getting hostname list
2011-01-31 11:08:29,052 spawning jobs
2011-01-31 11:08:29,053 connecting www.yahoo.com
2011-01-31 11:08:29,053 connecting www.abc.com
2011-01-31 11:08:29,053 connecting www.google.com
2011-01-31 11:08:29,053 connecting stackoverflow.com
2011-01-31 11:08:29,053 connecting facebook.com
2011-01-31 11:08:29,054 connecting youtube.com
2011-01-31 11:08:29,054 connecting live.com
2011-01-31 11:08:29,054 connecting baidu.com
2011-01-31 11:08:29,054 connecting wikipedia.org
2011-01-31 11:08:29,054 connecting blogspot.com
2011-01-31 11:08:29,054 connecting qq.com
2011-01-31 11:08:29,055 connecting twitter.com
2011-01-31 11:08:29,055 connecting msn.com
2011-01-31 11:08:29,055 connecting yahoo.co.jp
2011-01-31 11:08:29,055 connecting taobao.com
2011-01-31 11:08:29,055 connecting google.co.in
2011-01-31 11:08:29,056 connecting sina.com.cn
2011-01-31 11:08:29,056 connecting amazon.com
2011-01-31 11:08:29,056 connecting google.de
2011-01-31 11:08:29,056 connecting google.com.hk
2011-01-31 11:08:29,188 done www.google.com
2011-01-31 11:08:29,189 done google.com.hk
2011-01-31 11:08:29,224 error wikipedia.org reason: [Errno 111] Connection refused
2011-01-31 11:08:29,225 done google.co.in
2011-01-31 11:08:29,227 error msn.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,228 error live.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,250 done google.de
2011-01-31 11:08:29,262 done blogspot.com
2011-01-31 11:08:29,271 error www.abc.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,465 done amazon.com
2011-01-31 11:08:29,467 error sina.com.cn reason: [Errno 111] Connection refused
2011-01-31 11:08:29,496 done www.yahoo.com
2011-01-31 11:08:29,521 done stackoverflow.com
2011-01-31 11:08:29,606 done youtube.com
2011-01-31 11:08:29,939 done twitter.com
2011-01-31 11:08:33,056 error qq.com reason: timed out
2011-01-31 11:08:33,057 error taobao.com reason: timed out
2011-01-31 11:08:33,057 error yahoo.co.jp reason: timed out
2011-01-31 11:08:34,466 done facebook.com
2011-01-31 11:08:35,056 error baidu.com reason: timed out
2011-01-31 11:08:35,057 20 hosts took us 6 seconds
Ответ 2
Интересно, может ли написать это с помощью скручивания? Может ли кто-нибудь показать, что мой пример будет выглядеть как написанный с использованием скрученного?
Этот вариант намного быстрее, чем код, который использует gevent
:
#!/usr/bin/env python
import sys
from timeit import default_timer as timer
from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python import log
info = log.msg
class NoopProtocol(protocol.Protocol):
def makeConnection(self, transport):
transport.loseConnection()
def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
info("connecting %s" % host)
cc = protocol.ClientCreator(reactor, NoopProtocol)
d = cc.connectSSL(host, port, contextFactory, timeout)
d.addCallbacks(lambda _: info("done %s" % host),
lambda f: info("error %s reason: %s" % (host, f.value)))
return d
def n_at_a_time(it, n):
"""Iterate over `it` concurently `n` items at a time.
`it` - an iterator creating Deferreds
`n` - number of concurrent iterations
return a deferred that fires on completion
"""
return defer.DeferredList([task.coiterate(it) for _ in xrange(n)])
def main():
try:
log.startLogging(sys.stderr, setStdout=False)
info("getting hostname list")
hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
hosts_list = open(hosts_file).read().splitlines()
info("spawning jobs")
start = timer()
jobs = (connect(host, 443, timeout=2) for host in hosts_list)
d = n_at_a_time(jobs, n=20) # limit number of simultaneous connections
d.addCallback(lambda _: info("%d hosts took us %.2g seconds" % (
len(hosts_list), timer() - start)))
d.addBoth(lambda _: (info("the end"), reactor.stop()))
except:
log.err()
reactor.stop()
if __name__=="__main__":
reactor.callWhenRunning(main)
reactor.run()
Здесь вариант, который использует t.i.d.inlineCallbacks
. Это требует Python 2.5 или новее. Это позволяет записывать асинхронный код синхронным (блокирующим) образом:
#!/usr/bin/env python
import sys
from timeit import default_timer as timer
from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python import log
info = log.msg
class NoopProtocol(protocol.Protocol):
def makeConnection(self, transport):
transport.loseConnection()
@defer.inlineCallbacks
def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
info("connecting %s" % host)
cc = protocol.ClientCreator(reactor, NoopProtocol)
try:
yield cc.connectSSL(host, port, contextFactory, timeout)
except Exception, e:
info("error %s reason: %s" % (host, e))
else:
info("done %s" % host)
def n_at_a_time(it, n):
"""Iterate over `it` concurently `n` items at a time.
`it` - an iterator creating Deferreds
`n` - number of concurrent iterations
return a deferred that fires on completion
"""
return defer.DeferredList([task.coiterate(it) for _ in xrange(n)])
@defer.inlineCallbacks
def main():
try:
log.startLogging(sys.stderr, setStdout=False)
info("getting hostname list")
hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
hosts_list = open(hosts_file).read().splitlines()
info("spawning jobs")
start = timer()
jobs = (connect(host, 443, timeout=2) for host in hosts_list)
yield n_at_a_time(jobs, n=20) # limit number of simultaneous connections
info("%d hosts took us %.2g seconds" % (len(hosts_list), timer()-start))
info("the end")
except:
log.err()
finally:
reactor.stop()
if __name__=="__main__":
reactor.callWhenRunning(main)
reactor.run()
Ответ 3
Как насчет реального потока?
#!/usr/bin/env python3
# http://code.activestate.com/recipes/577187-python-thread-pool/
from queue import Queue
from threading import Thread
class Worker(Thread):
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try: func(*args, **kargs)
except Exception as exception: print(exception)
self.tasks.task_done()
class ThreadPool:
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads): Worker(self.tasks)
def add_task(self, func, *args, **kargs):
self.tasks.put((func, args, kargs))
def wait_completion(self):
self.tasks.join()
Пример:
import threadpool
pool = threadpool.ThreadPool(20) # 20 threads
pool.add_task(print, "test")
pool.wait_completion()
Это в python 3, но не должно быть слишком сложно преобразовать в 2.x. Я не удивлен, если это устраняет вашу проблему.
Ответ 4
Python 3.4 представляет новый предварительный API для асинхронного ввода-вывода - asyncio
.
Этот подход похож на twisted
на основе ответа:
#!/usr/bin/env python3.4
import asyncio
import logging
from contextlib import closing
class NoopProtocol(asyncio.Protocol):
def connection_made(self, transport):
transport.close()
info = logging.getLogger().info
@asyncio.coroutine
def connect(loop, semaphor, host, port=443, ssl=True, timeout=15):
try:
with (yield from semaphor):
info("connecting %s" % host)
done, pending = yield from asyncio.wait(
[loop.create_connection(NoopProtocol, host, port, ssl=ssl)],
loop=loop, timeout=timeout)
if done:
next(iter(done)).result()
except Exception as e:
info("error %s reason: %s" % (host, e))
else:
if pending:
info("error %s reason: timeout" % (host,))
for ft in pending:
ft.cancel()
else:
info("done %s" % host)
@asyncio.coroutine
def main(loop):
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
limit, timeout, hosts = parse_cmdline()
# connect `limit` concurrent connections
sem = asyncio.BoundedSemaphore(limit)
coros = [connect(loop, sem, host, timeout=timeout) for host in hosts]
if coros:
yield from asyncio.wait(coros, loop=loop)
if __name__=="__main__":
with closing(asyncio.get_event_loop()) as loop:
loop.run_until_complete(main(loop))
Помимо варианта twisted
он использует NoopProtocol
, который ничего не делает, но немедленно отключается при успешном соединении.
Количество параллельных соединений ограничено с помощью семафора.
Код coroutine-based.
Пример
Чтобы узнать, сколько успешных ssl-соединений мы можем сделать для первых 1000 хостов из списка Top миллионов Alexa:
$ curl -O http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
$ unzip *.zip
$ /usr/bin/time perl -nE'say $1 if /\d+,([^\s,]+)$/' top-1m.csv | head -1000 |\
python3.4 asyncio_ssl.py - --timeout 60 |& tee asyncio.log
В результате получается менее половины всех подключений. В среднем он проверяет ~ 20 хостов в секунду. Через несколько минут было отключено множество сайтов. Если хост не соответствует именам хостов из сертификата сервера, соединение также не выполняется. Он включает сравнения example.com
и www.example.com
.
Ответ 5
Прежде всего, попробуйте использовать неблокирующие сокеты.
Другая причина заключается в том, что вы потребляете все эфемерные порты.
Попробуйте снять ограничение на это.