Paramiko Не удается загрузить большие файлы> 1GB
def download():
if os.path.exists( dst_dir_path ) == False:
logger.error( "Cannot access destination folder %s. Please check path and permissions. " % ( dst_dir_path ))
return 1
elif os.path.isdir( dst_dir_path ) == False:
logger.error( "%s is not a folder. Please check path. " % ( dst_dir_path ))
return 1
file_list = None
#transport = paramiko.Transport(( hostname, port))
paramiko.util.log_to_file('paramiko.log')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#transport
try:
ssh.connect( hostname, username=username, password=password, timeout=5.0)
#transport.connect(username=username, password=password )
except Exception, err:
logger.error( "Failed to connect to the remote server. Reason: %s" % ( str(err) ) )
return 1
try:
#sftp = paramiko.SFTPClient.from_transport(transport)
sftp = ssh.open_sftp()
except Exception, err:
logger.error( "Failed to start SFTP session from connection to %s. Check that SFTP service is running and available. Reason: %s" % ( hostname, str(err) ))
return 1
try:
sftp.chdir(src_dir_path)
#file_list = sftp.listdir(path="%s" % ( src_dir_path ) )
file_list = sftp.listdir()
except Exception, err:
logger.error( "Failed to list files in folder %s. Please check path and permissions. Reason: %s" % ( src_dir_path, str(err) ))
return 1
match_text = re.compile( file_mask )
download_count = 0
for file in file_list:
# Here is an item name... but is it a file or directory?
#logger.info( "Downloading file %s." % ( file ) )
if not re.match( file_mask, file ):
continue
else:
logger.info( "File \"%s\" name matched file mask \"%s\". matches %s.Processing file..." % ( file, file_mask, (match_text.match( file_mask ) ) ) )
src_file_path = "./%s" % ( file )
dst_file_path = "/".join( [ dst_dir_path, file] )
retry_count = 0
while True:
try:
logger.info( "Downloading file %s to %s." % ( file, dst_file_path ) )
#sftp.get( file, dst_file_path, callback=printTotals ) #sftp.get( remote file, local file )
sftp.get( file, dst_file_path) #sftp.get( remote file, local file )
logger.info( "Successfully downloaded file %s to %s." % ( file, dst_file_path ) )
download_count += 1
break
except Exception, err:
if retry_count == retry_threshold:
logger.error( "Failed to download %s to %s. Reason: %s." % ( file, dst_file_path, str(err) ) )
sftp.close()
#transport.close()
return 1
else:
logger.error( "Failed to download %s to %s. Reason: %s." % ( file, dst_file_path, str(err) ) )
retry_count +=1
sftp.close()
transport.close()
logger.info( "%d files downloaded." % ( download_count ) )
return 0
Когда я запускаю функцию ниже, она загружает исходный файл примерно на 3 минуты, а затем закрывает сеанс, хотя загружается только 38-41 МБ (изменяется) файла 1-1.6 ГБ.
Из файла журнала Paramiko, похоже, что соединение SSh остается открытым, пока сеанс SFTP закрывается:
DEB [20120913-10: 05: 00.894] thr = 1 paramiko.transport: переключиться на новые ключи... DEB [20120913-10: 05: 06.953] thr = 1 paramiko.transport: Rekeying (получено 401 пакетов, получено 1053444 байт) DEB [20120913-10: 05: 07.391] thr = 1 paramiko.transport: kex algos: ['diffie-hellman-group1-sha1', 'diffie-hellman-group-exchange-sha1'] ключ сервера: ['ssh- dss '] client encrypt: [' aes256-ctr ',' aes192-ctr ',' aes128-ctr ',' aes256-cbc ',' aes192-cbc ',' aes128-cbc ',' twofish-cbc ',' blowfish-cbc ',' 3des-cbc ',' arcfour '] server encrypt: [' aes256-ctr ',' aes192-ctr ',' aes128-ctr ',' aes256-cbc ',' aes192-cbc ',' Mac OS: "hmac-sha1", "hmac-sha1-96", "hmac-md5", "cfc-cbc", "blowfish-cbc", "3ds-cbc", "arcfour" ], 'hmac-md5-96', '[email protected]'] сервер mac: ['hmac-sha1', 'hmac-sha1-96', 'hmac-md5', 'hmac-md5-96', '[email protected]'] клиент: ['[email protected]', 'zlib', 'none'] серверный сжимать: ['[email protected]', 'zlib', 'none' ] client lang: [''] server lang: [''] kex следует? False DEB [20120913-10: 05: 07.421] thr = 1 paramiko.transport: Чиперы согласились: local = aes128-ctr, remote = aes128-ctr DEB [20120913-10: 05: 07.421] thr = 1 paramiko.transport: использование kex diffie-hellman-group1-sha1; тип ключа сервера ssh-dss; шифр: локальный aes128-ctr, удаленный aes128-ctr; mac: локальный hmac-sha1, удаленный hmac-sha1; сжатие: локальный нет, удаленный нет DEB [20120913-10: 05: 07.625] thr = 1 paramiko.transport: перейти на новые ключи... INF [20120913-10: 05: 10.374] thr = 2 paramiko.transport.sftp: [chan 1] sftp session закрыт. DEB [20120913-10: 05: 10.388] thr = 2 paramiko.transport: [chan 1] EOF отправлено (1)
После этой точки script завершает работу с этим исключением (из блока try/except sftp.get())
Недостаточно ресурсов для завершения запроса
Сама система имеет гигабайты свободного места на диске, так что это не проблема.
Та же самая передача, с которой работает parakmiko, работает отлично с FileZilla и Java-приложением, которые я написал много лет назад для передачи SFTP. Поэтому я думаю, что это проблема с парамико.
Это выполняется в Windows XP и Windows Server 2003.
Я пробовал патчи Paramko 1.17, чтобы он чаще обновлял ключи, но передача все равно бросает исключение.
Python 2.7.3
Paramiko 1.7 с патчем
Windows 2003 Sevfer
Идеи?
Дополнительная информация:
Он терпит неудачу на Windows XP SP3 и Windows 2003, точно такие же поведение и сообщения об ошибках.
Информация о sys.version
Windows XP Workstation: "2.7.3 (по умолчанию, 10 апреля 2012, 23:31:26) [MSC v.1500 32 бит (Intel)] '
Windows 2003 Server: "2.7.3 (по умолчанию, 10 апреля 2012, 23:31:26) [MSC v.1500 32 бит (Intel)] '
Я заплатил файл packet.py, чтобы сократить время между продлениями ключей. Это не повлияло на поведение sftp.get().
Ответы
Ответ 1
Протокол SFTP не имеет возможности передавать данные файла; вместо этого у него есть способ запросить блок данных с определенного смещения в открытом файле. Наивный способ загрузки файла состоял в том, чтобы запросить первый блок, записать его на диск, затем запросить второй блок и так далее. Это надежный, но очень медленный.
Вместо этого у Paramiko есть трюк производительности, который он использует: когда вы вызываете .get()
, он немедленно отправляет запрос для каждого блока в файле и запоминает, какое смещение они должны быть записаны. Затем, когда каждый ответ приходит, он гарантирует, что он будет записан в правильное смещение на диске. Для получения дополнительной информации см. Методы SFTPFile.prefetch()
и SFTPFile.readv()
в документации Paramiko. Я подозреваю, что хранящаяся в нем информация о хранении, когда загрузка вашего 1GB файла может вызвать... что-то закончится из-за нехватки ресурсов, создавая сообщение "Недостаточно ресурсов".
Вместо того, чтобы использовать .get()
, если вы просто вызываете .open()
, чтобы получить экземпляр SFTPFile
, затем вызовите .read()
на этом объекте или просто передайте его в стандартную библиотечную функцию Python shutil.copyfileobj()
, чтобы загрузить содержание. Это должно избежать кэша предварительной выборки Paramiko и позволить вам загружать файл, даже если он не так быстро.
i.e:
def lazy_loading_ftp_file(sftp_host_conn, filename):
"""
Lazy loading ftp file when exception simple sftp.get call
:param sftp_host_conn: sftp host
:param filename: filename to be downloaded
:return: None, file will be downloaded current directory
"""
import shutil
try:
with sftp_host_conn() as host:
sftp_file_instance = host.open(filename, 'r')
with open(filename, 'wb') as out_file:
shutil.copyfileobj(sftp_file_instance, out_file)
return {"status": "sucess", "msg": "sucessfully downloaded file: {}".format(filename)}
except Exception as ex:
return {"status": "failed", "msg": "Exception in Lazy reading too: {}".format(ex)}
Ответ 2
У меня была очень похожая проблема, в моем случае файл только ~ 400 МБ, но он будет последовательно терпеть неудачу после загрузки около 35 МБ или около того. Он не всегда терпел неудачу при том же количестве загруженных байтов, но где-то около 35 - 40 МБ, файл прекратил бы передачу, а через минуту я получил бы "Недостаточно ресурсов для завершения запроса".
Загрузка файла через WinSCP или PSFTP работала нормально.
Я попробовал метод Screwtape, и он действительно работал, но был очень медленным. Мой 400-мегабайтный файл был в темпе, чтобы взять что-то вроде 4 часов для загрузки, что было неприемлемым таймфреймом для этого конкретного приложения.
Кроме того, когда-то, когда мы впервые установили это, все работало нормально. Но администратор сервера внес некоторые изменения в SFTP-сервер и что, когда что-то сломалось. Я не уверен, какие изменения были внесены, но поскольку он все еще работал нормально с использованием WinSCP/других методов SFTP, я не думал, что это будет полезно, если вы попытаетесь атаковать это со стороны сервера.
Я не собираюсь притворяться, что понимаю, почему, но вот что в итоге работает для меня:
-
Я загрузил и установил текущую версию Paramiko (1.11.1 в это время). Первоначально это не имело никакого значения, но я решил, что упомянул об этом на всякий случай, если это будет частью решения.
-
Трассировка стека для исключения:
File "C:\Python26\lib\site-packages\paramiko\sftp_client.py", line 676, in get
size = self.getfo(remotepath, fl, callback)
File "C:\Python26\lib\site-packages\paramiko\sftp_client.py", line 645, in getfo
data = fr.read(32768)
File "C:\Python26\lib\site-packages\paramiko\file.py", line 153, in read
new_data = self._read(read_size)
File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 157, in _read
data = self._read_prefetch(size)
File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 138, in _read_prefetch
self._check_exception()
File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 483, in _check_exception
raise x
-
Сотрясая немного в sftp_file.py, я заметил это (строки 43-45 в текущей версии):
# Some sftp servers will choke if you send read/write requests larger than
# this size.
MAX_REQUEST_SIZE = 32768
-
По прихоти, я попытался изменить MAX_REQUEST_SIZE на 1024 и, вот и вот, я смог загрузить весь файл!
-
После того как я заработал, изменив MAX_REQUEST_SIZE на 1024, я попробовал кучу других значений между 1024 и 32768, чтобы увидеть, повлияло ли это на производительность или что-то еще. Но я всегда получал ошибку раньше или позже, когда значение было значительно больше, чем 1024 (1025 было в порядке, но 1048 в итоге не удалось).
Ответ 3
В дополнение к ответу Screwtape также стоит упомянуть, что вам следует, вероятно, ограничить размер блока .read([block size in bytes])
Смотрите ленивый метод чтения большого файла
У меня были реальные проблемы только с file.read()
без размера размера блока в 2.4, однако 2.7 определяет правильный размер блока.
Ответ 4
Я пытаюсь проследить код в paramiko, теперь я уверен, что это проблема сервера.
1. Проделанная предварительная выборка
Чтобы увеличить скорость загрузки, paramiko попытается выполнить предварительную выборку с помощью метода fetch. Когда вызывается метод SFTP_FILE.prefetch()
, создается новый поток и т.е. запрос на выборку отправляется на сервер, используя весь файл.
мы можем найти это в файле paramiko/sftp_file.py вокруг строки 464.
2. Как убедиться в проблеме сервера
Указанный выше запрос запускается в асинхронном режиме. SFTP_FILE._async_response()
используется для получения ответа от сервера async.And отслеживает код, мы можем найти, что это исключение создается в методе SFTP_FILE._async_response()
, который конвертируется из сообщения, отправленного с сервера.
Теперь мы уверены, что это исключение из сервера.
3. Как решить проблему
Поскольку у меня нет доступа к серверу, поэтому использовать sftp в командной строке - это мой лучший выбор. Но, с другой стороны, теперь мы знаем, что слишком много запросов заставляет сервер сбой, поэтому мы можем заснуть при отправке запрос на сервер.
Ответ 5
Я использую этот тип сценария с paramiko для больших файлов, вы можете поиграться с window_size
/packet size
чтобы увидеть, что работает лучше для вас, если вы хотите, чтобы он был более производительным, вы можете запускать параллельные процессы для чтения различных фрагментов файлов в параллельно с использованием второго метода (см. http://docs.paramiko.org/en/latest/api/sftp.html#paramiko.sftp_file.SFTPFile.readv)
import time, paramiko
MAX_RETRIES = 10
ftp_server = "ftp.someserver.com"
port = 22
sftp_file = "/somefolder/somefile.txt"
local_file = "/somefolder/somewhere/here.txt"
ssh_conn = sftp_client = None
username = "username"
password = "password"
start_time = time.time()
for retry in range(MAX_RETRIES):
try:
ssh_conn = paramiko.Transport((ftp_server, port))
ssh_conn.connect(username=username, password=password)
#method 1 using sftpfile.get and settings window_size, max_packet_size
window_size = pow(4, 12)#about ~16MB chunks
max_packet_size = pow(4, 12)
sftp_client = paramiko.SFTPClient.from_transport(ssh_conn, window_size=window_size, max_packet_size=max_packet_size)
sftp_client.get(sftp_file, local_file)
#method 2 breaking up file into chunks to read in parallel
sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
filesize = sftp_client.stat(sftp_file).st_size
chunksize = pow(4, 12)#<-- adjust this and benchmark speed
chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
with sftp_client.open(sftp_file, "rb") as infile:
with open(local_file, "wb") as outfile:
for chunk in infile.readv(chunks):
outfile.write(chunk)
break
except (EOFError, paramiko.ssh_exception.SSHException) as x:
retry += 1
print("%s %s - > retrying %s..." % (type(x), x, retry))
time.sleep(abs(retry - 1) * 10)
#back off in steps of 10, 20.. seconds
finally:
if hasattr(sftp_client, "close") and callable(sftp_client.close):
sftp_client.close()
if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
ssh_conn.close()
print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))
Если вы действительно обеспокоены производительностью, вы можете запустить второй метод и разбить файл на несколько процессов/потоков, здесь пример кода с использованием многопоточности, который записывает несколько частей файла, а затем объединяет их в один файл.
import threading, os, time, paramiko
#you could make the number of threads relative to file size
NUM_THREADS = 4
MAX_RETRIES = 10
def make_filepart_path(file_path, part_number):
'''creates filepart path from filepath'''
return "%s.filepart.%s" % (file_path, part_number+1)
def write_chunks(chunks, tnum, local_file_part, username, password, ftp_server, max_retries):
ssh_conn = sftp_client = None
for retry in range(max_retries):
try:
ssh_conn = paramiko.Transport((ftp_server, port))
ssh_conn.connect(username=username, password=password)
sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
with sftp_client.open(sftp_file, "rb") as infile:
with open(local_file_part, "wb") as outfile:
for chunk in infile.readv(chunks):
outfile.write(chunk)
break
except (EOFError, paramiko.ssh_exception.SSHException) as x:
retry += 1
print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry))
time.sleep(abs(retry - 1) * 10)
finally:
if hasattr(sftp_client, "close") and callable(sftp_client.close):
sftp_client.close()
if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
ssh_conn.close()
start_time = time.time()
for retry in range(MAX_RETRIES):
try:
ssh_conn = paramiko.Transport((ftp_server, port))
ssh_conn.connect(username=username, password=password)
sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
#connect to get the file size in order to calculate chunks
filesize = sftp_client.stat(sftp_file).st_size
sftp_client.close()
ssh_conn.close()
chunksize = pow(4, 12)
chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
#break the chunks into sub lists to hand off to threads
thread_chunks = [chunks[i:i+thread_chunk_size] for i in range(0, len(chunks) - 1, thread_chunk_size)]
threads = []
fileparts = []
for thread_num in range(len(thread_chunks)):
local_file_part = make_filepart_path(local_file, thread_num)
args = (thread_chunks[thread_num], thread_num, local_file_part, username, password, ftp_server, MAX_RETRIES)
threads.append(threading.Thread(target=write_chunks, args=args))
fileparts.append(local_file_part)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
#join file parts into one file, remove fileparts
with open(local_file, "wb") as outfile:
for filepart in fileparts:
with open(filepart, "rb") as infile:
outfile.write(infile.read())
os.remove(filepart)
break
except (EOFError, paramiko.ssh_exception.SSHException) as x:
retry += 1
print("%s %s - > retrying %s..." % (type(x), x, retry))
time.sleep(abs(retry - 1) * 10)
finally:
if hasattr(sftp_client, "close") and callable(sftp_client.close):
sftp_client.close()
if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
ssh_conn.close()
print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))
Ответ 6
Я столкнулся с проблемами при загрузке больших файлов (> 1 ГБ) через SFTP с использованием pysftp. Базовая библиотека - Парамико.
Погуглите насчет проблемы, приведите меня сюда и есть отличные решения. Тем не менее, многие посты являются относительно старыми, и я полагаю, что большинство из этих проблем были решены с течением времени. И это не помогло с моей проблемой.
А именно: Paramiko сталкивается с ошибкой памяти при загрузке фрагментов во время предварительной выборки в sftp_file.py. Список расширяется за пределы и ошибка памяти как-то не блокирует выполнение. Вероятно, он молча использовался в стеке. Загрузка завершается неудачно только при возникновении этой ошибки, и они запускаются в отдельных потоках.
В любом случае, способ контролировать размер списка - установить MAX_REQUEST_SIZE:
paramiko.sftp_file.SFTPFile.MAX_REQUEST_SIZE = pow(2, 22) # 4MB per chunk
Однако, если вы превысите 16 МБ, вы столкнетесь с новой проблемой: paramiko.sftp.SFTPError: Получен пакет мусора. Оказывается, есть проверка sftp.py в методе _read_packet:
# most sftp servers won't accept packets larger than about 32k, so
# anything with the high byte set (> 16MB) is just garbage.
if byte_ord(x[0]):
raise SFTPError("Garbage packet received")
Итак, если порция составляет> 16 МБ, у нас возникает эта ошибка. Я не хотел возиться с самой библиотекой Paramiko, поэтому мне пришлось сохранять размер куска на "приемлемом максимуме" в 4 МБ.
Таким образом, я смог загрузить файлы размером> 30 ГБ.
Надеюсь, что это помогает людям.