Как отключить длительную программу с помощью rxpython?

Скажем, у меня есть длинная функция python, которая выглядит примерно так?

import random
import time
from rx import Observable
def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x

Я хочу установить тайм-аут 1000ms.

Так что я донг что-то вроде, создавая наблюдаемый и сопоставляя его с помощью вышеупомянутого интенсивного вычисления.

a = Observable.repeat(1).map(lambda x: intns(x))

Теперь для каждого испускаемого значения, если требуется больше 1000 мс, я хочу закончить наблюдаемый, как только я достигнет 1000ms, используя on_error или on_completed

a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))

инструкция выше получает тайм-аут и вызывает on_error, но она продолжает заканчивать вычисление интенсивного вычисления и только затем возвращается к следующим операторам. Есть ли лучший способ сделать это?

Последний оператор печатает следующие

8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends

Идея заключается в том, что если какой-то конкретный тайм-аут функции, я хочу иметь возможность продолжить свою программу и игнорировать результат.

Мне было интересно, была ли функция intns выполнена в отдельном потоке, я думаю, что основной поток продолжает выполнение после таймаута, но я все же хочу прекратить вычислять функцию intns в потоке или как-то убить ее.

Ответы

Ответ 1

Вы можете сделать это частично, используя threading Хотя нет конкретного способа убить поток в python, вы можете реализовать метод, чтобы помечать поток до конца.

Это не будет работать, если поток ожидает других ресурсов (в вашем случае вы имитировали "длинный" запущенный код случайным ожиданием)

См. также Есть ли способ убить нить в Python?

Ответ 2

Таким образом, он работает:

import random
import time
import threading
import os

def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x


thr = threading.Thread(target=intns, args=([10]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
    if(time.clock() - st > 9):
        os._exit(0)

Ответ 3

Вот пример тайм-аута

import random
import time
import threading

_timeout = 0

def intns(loops=1):
    print('begin')
    processing = 0
    for i in range(loops):
        y = random.randint(5,10)
        time.sleep(y)
        if _timeout == 1:
            print('timedout end')
            return
        print('keep processing')
    return

# this will timeout
timeout_seconds = 10
loops = 10

# this will complete
#timeout_seconds = 30.0
#loops = 1

thr = threading.Thread(target=intns, args=([loops]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
    if(time.clock() - st > timeout_seconds):
        _timeout = 1

thr.join()
if _timeout == 0:
    print ("completed")
else:
    print ("timed-out")

Ответ 4

Ниже приведен класс, который можно вызвать с помощью with timeout() :

Если блок под кодом работает дольше, чем указанное время, создается TimeoutError.

import signal

class timeout:
    # Default value is 1 second (1000ms)
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)
    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
    def __exit__(self, type, value, traceback):
        signal.alarm(0)

# example usage
with timeout() :
    # infinite while loop so timeout is reached
    while True :
        pass

Если я понимаю вашу функцию, вот как выглядит ваша реализация:

def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    with timeout() :
        time.sleep(y)
    print('end')
    return x

Ответ 5

Вы можете использовать time.sleep() и создать цикл while для time.clock()