Прерывать функцию через определенное время

В python для примера с игрушкой:

for x in range(0, 3):
    # call function A(x)

Я хочу продолжить цикл for, если функция A занимает более 5 секунд, пропустив ее, чтобы я не застрял или не потерял время.

Выполняя поиск, я понял, что подпроцесс или поток могут помочь, но я понятия не имею, как реализовать здесь. Любая помощь будет отличной. Благодаря

Ответы

Ответ 1

Я думаю, что создание нового процесса может быть излишним. Если вы используете Mac или систему на базе Unix, вы должны использовать signal.SIGALRM для принудительного выключения функций, которые занимают слишком много времени. Это будет работать на функциях, которые работают на холостом ходу для сетевых или других проблем, которые вы абсолютно не можете обработать, изменив свою функцию. У меня есть пример использования этого в этом ответе:

fooobar.com/questions/381763/...

Редактирование моего ответа здесь, хотя я не уверен, что должен это сделать:

import signal

class TimeoutException(Exception):   # Custom exception class
    pass

def timeout_handler(signum, frame):   # Custom signal handler
    raise TimeoutException

# Change the behavior of SIGALRM
signal.signal(signal.SIGALRM, timeout_handler)

for i in range(3):
    # Start the timer. Once 5 seconds are over, a SIGALRM signal is sent.
    signal.alarm(5)    
    # This try/except loop ensures that 
    #   you'll catch TimeoutException when it sent.
    try:
        A(i) # Whatever your function that might hang
    except TimeoutException:
        continue # continue the for loop if function A takes more than 5 second
    else:
        # Reset the alarm
        signal.alarm(0)

Это в основном устанавливает таймер на 5 секунд, затем пытается выполнить ваш код. Если он не завершится до истечения времени, отправляется SIGALRM, который мы поймаем и превратимся в исключение TimeoutException. Это заставляет вас за исключением блока, где ваша программа может продолжаться.

EDIT: whoops, TimeoutException - это класс, а не функция. Спасибо, abarnert!

Ответ 2

Комментарии верны в том, что вы должны проверить внутри. Вот потенциальное решение. Обратите внимание, что асинхронная функция (например, с использованием потока) отличается от этого решения. Это синхронно, что означает, что он все равно будет работать последовательно.

import time

for x in range(0,3):
    someFunction()

def someFunction():
    start = time.time()
    while (time.time() - start < 5):
        # do your normal function

    return;

Ответ 3

Если вы можете сломать свою работу и проверить все так часто, это почти всегда лучшее решение. Но иногда это невозможно - например, возможно, вы читаете файл с медленным файловым ресурсом, который время от времени просто зависает в течение 30 секунд. Чтобы справиться с этим, вам придется перестроить всю вашу программу вокруг цикла асинхронного ввода-вывода.

Если вам не нужно быть кросс-платформенным, вы можете использовать сигналы на * nix (включая Mac и Linux), APC на Windows и т.д. Но если вам нужно быть кросс-платформенным, это не работает.

Итак, если вам действительно нужно делать это одновременно, вы можете, а иногда и вам. В этом случае вы, вероятно, хотите использовать для этого процесс, а не поток. Вы не можете реально убить поток безопасно, но вы можете убить процесс, и он может быть таким же безопасным, каким вы хотите. Кроме того, если поток занимает 5 секунд, потому что он связан с процессором, вы не хотите сражаться с ним над GIL.

Здесь есть два основных варианта.


Сначала вы можете поместить код в другой script и запустить его с помощью subprocess:

subprocess.check_call([sys.executable, 'other_script.py', arg, other_arg],
                      timeout=5)

Поскольку это происходит через нормальные каналы дочернего процесса, единственное сообщение, которое вы можете использовать, это некоторые строки argv, значение возврата к успеху/неудаче (на самом деле небольшое целое число, но это не намного лучше) и, необязательно, hunk текста и фрагмента текста.


В качестве альтернативы вы можете использовать multiprocessing для создания поточного дочернего процесса:

p = multiprocessing.Process(func, args)
p.start()
p.join(5)
if p.is_alive():
    p.terminate()

Как вы можете видеть, это немного сложнее, но лучше всего несколькими способами:

  • Вы можете передавать произвольные объекты Python (по крайней мере, все, что можно мариновать), а не просто строки.
  • Вместо того, чтобы поставить целевой код в полностью независимый script, вы можете оставить его как функцию в том же script.
  • Это более гибко, например, если вам потребуется, скажем, передать обновления прогресса, очень просто добавить очередь в любом или в обоих направлениях.

Большая проблема с любым видом parallelism заключается в совместном использовании изменяемых данных, например, при выполнении фоновой задачи глобальный словарь как часть его работы (что, по вашему мнению, вы пытаетесь сделать). С потоками вы можете сойти с рук, но условия гонки могут привести к повреждению данных, поэтому вы должны быть очень осторожны с блокировкой. С дочерними процессами вы не можете избежать этого. (Да, вы можете использовать общую память, поскольку "Состояние совместного доступа между процессами" объясняет, но это ограничивается простыми типами, такими как числа, фиксированные массивы и типы вы знаете, как определить как C-структуры, и это возвращает вас к тем же проблемам, что и потоки.)


В идеале вы организуете вещи, поэтому вам не нужно делиться какими-либо данными во время выполнения процесса - вы передаете dict в качестве параметра и получаете обратно dict назад. Это обычно довольно легко организовать, когда у вас есть ранее синхронная функция, которую вы хотите поместить в фоновом режиме.

Но что, если, скажем, частичный результат лучше результата? В этом случае самым простым решением является передача результатов по очереди. Вы можете сделать это с явной очередью, как описано в Обмен объектами между процессами, но есть более простой способ.

Если вы можете разбить монолитный процесс на отдельные задачи, по одному для каждого значения (или группы значений), которое вы хотели вставить в словарь, вы можете запланировать их на Pool -или, еще лучше, a concurrent.futures.Executor. (Если вы находитесь на Python 2.x или 3.1, см. Backport futures в PyPI.)

Скажем, ваша медленная функция выглядела так:

def spam():
    global d
    for meat in get_all_meats():
        count = get_meat_count(meat)
        d.setdefault(meat, 0) += count

Вместо этого вы сделаете следующее:

def spam_one(meat):
    count = get_meat_count(meat)
    return meat, count

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(spam_one, get_canned_meats(), timeout=5)
    for (meat, count) in results:
        d.setdefault(meat, 0) += count

Как много результатов, как вы получаете в течение 5 секунд, добавляются в dict; если это не все из них, остальные оставлены, а TimeoutError поднят (который вы можете обрабатывать, как вы хотите - запишите его, сделайте какой-то быстрый резервный код, что угодно).

И если задачи действительно независимы (поскольку они в моем глупом маленьком примере, но, конечно, они могут быть не в вашем реальном коде, по крайней мере, не без большой редизайна), вы можете распараллелить работу бесплатно удалив это max_workers=1. Затем, если вы запустите его на 8-ядерном компьютере, он начнет 8 рабочих и даст им каждую 1/8 часть работы, и все будет сделано быстрее. (Обычно это не 8x, а быстро, но часто 3-6 раз быстрее, что все еще неплохо.)

Ответ 4

Это кажется лучшей идеей (извините, но не уверен в именах python):

import signal

def signal_handler(signum, frame):
    raise Exception("Timeout!")

signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(3)   # Three seconds
try:
    for x in range(0, 3):
        # call function A(x)
except Exception, msg:
    print "Timeout!"
signal.alarm(0)    # reset