Python threading.timer - повторять функцию каждые n секунд
У меня возникают трудности с таймером python и я бы очень признателен за некоторые советы или помощь: D
Я не слишком разбираюсь в том, как работают потоки, но я просто хочу отключить функцию каждые 0,5 секунды и иметь возможность запускать и останавливать и reset таймер.
Тем не менее, я продолжаю получать RuntimeError: threads can only be started once
, когда я выполняю threading.timer.start()
дважды. Есть ли для этого работа? Я попытался применить threading.timer.cancel()
перед каждым запуском.
Псевдокод:
t=threading.timer(0.5,function)
while True:
t.cancel()
t.start()
Ответы
Ответ 1
Лучший способ - запустить поток таймера один раз. Внутри потока таймера вы бы указали следующие
class MyThread(Thread):
def __init__(self, event):
Thread.__init__(self)
self.stopped = event
def run(self):
while not self.stopped.wait(0.5):
print("my thread")
# call a function
В коде, который запустил таймер, вы можете set
остановить событие, чтобы остановить таймер.
stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
Ответ 2
Использование потоков таймера -
from threading import Timer,Thread,Event
class perpetualTimer():
def __init__(self,t,hFunction):
self.t=t
self.hFunction = hFunction
self.thread = Timer(self.t,self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t,self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
print 'ipsem lorem'
t = perpetualTimer(5,printer)
t.start()
это может быть остановлено на t.cancel()
Ответ 3
Из Эквивалент setInterval в python:
import threading
def setInterval(interval):
def decorator(function):
def wrapper(*args, **kwargs):
stopped = threading.Event()
def loop(): # executed in another thread
while not stopped.wait(interval): # until stopped
function(*args, **kwargs)
t = threading.Thread(target=loop)
t.daemon = True # stop if the program exits
t.start()
return stopped
return wrapper
return decorator
Использование:
@setInterval(.5)
def function():
"..."
stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set()
Или здесь ту же функциональность, но как отдельную функцию вместо декоратора:
cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls()
Здесь как это сделать без использования потоков.
Ответ 4
Небольшое улучшение по Гансу Тогда ответьте, мы можем просто создать подкласс функции Timer. Следующее становится всем нашим кодом "таймера повтора", и его можно использовать в качестве замены для threading.Timer со всеми одинаковыми аргументами:
from threading import Timer
class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
Пример использования:
def dummyfn(msg="foo"):
print(msg)
timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()
производит следующий вывод:
foo
foo
foo
foo
а также
timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()
производит
bar
bar
bar
bar
Ответ 5
В интересах предоставления правильного ответа с использованием таймера в соответствии с запросом OP, я улучшу ответ swapnil jariwala:
from threading import Timer
import time
class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""
def __init__(self, seconds, target):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.thread = None
def _handle_target(self):
self.is_running = True
self.target()
self.is_running = False
self._start_timer()
def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(self.seconds, self._handle_target)
self.thread.start()
def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print("Timer already started or running, please wait if you're restarting.")
def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")
def tick():
print('ipsem lorem')
# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()
Время импорта является необязательным без использования примера.
Ответ 6
Я изменил некоторый код в коде swapnil-jariwala, чтобы сделать немного консольных часов.
from threading import Timer, Thread, Event
from datetime import datetime
class PT():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def printer():
tempo = datetime.today()
h,m,s = tempo.hour, tempo.minute, tempo.second
print(f"{h}:{m}:{s}")
t = PT(1, printer)
t.start()
ВЫХОД
>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...
Таймер с графическим интерфейсом tkinter
Этот код помещает таймер часов в маленькое окно с tkinter
from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk
app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
tempo = datetime.today()
clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
try:
lab['text'] = clock
except RuntimeError:
exit()
t = perpetualTimer(1, printer)
t.start()
app.mainloop()
Пример игры с карточками (вроде)
from threading import Timer, Thread, Event
from datetime import datetime
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
x = datetime.today()
start = x.second
def printer():
global questions, counter, start
x = datetime.today()
tempo = x.second
if tempo - 3 > start:
show_ans()
#print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
print()
print("-" + questions[counter])
counter += 1
if counter == len(answers):
counter = 0
def show_ans():
global answers, c2
print("It is {}".format(answers[c2]))
c2 += 1
if c2 == len(answers):
c2 = 0
questions = ["What is the capital of Italy?",
"What is the capital of France?",
"What is the capital of England?",
"What is the capital of Spain?"]
answers = "Rome", "Paris", "London", "Madrid"
counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()
выход:
Get ready to answer
>>>
-What is the capital of Italy?
It is Rome
-What is the capital of France?
It is Paris
-What is the capital of England?
...
Ответ 7
Я должен был сделать это для проекта. То, что я закончил, это запустить отдельный поток для функции
t = threading.Thread(target =heartbeat, args=(worker,))
t.start()
**** heartbeat - это моя функция, рабочий - один из моих аргументов ****
внутри моей функции heartbeat:
def heartbeat(worker):
while True:
time.sleep(5)
#all of my code
Поэтому, когда я запускаю поток, функция будет многократно ждать 5 секунд, запускать весь мой код и делать это бесконечно. Если вы хотите убить процесс, просто уничтожьте поток.
Ответ 8
Я реализовал класс, который работает как таймер.
Я оставляю ссылку здесь, если кому-то это нужно:
https://github.com/ivanhalencp/python/tree/master/xTimer
Ответ 9
from threading import Timer
def TaskManager():
#do stuff
t = Timer( 1, TaskManager )
t.start()
TaskManager()
Вот небольшой пример, он поможет лучше понять, как он работает. Функция taskManager() в конце создает отложенный вызов функции для себя.
Попробуйте изменить переменную "dalay", и вы сможете увидеть разницу
from threading import Timer, _sleep
# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True
def taskManager():
global counter, DATA, delay, allow_run
counter += 1
if len(DATA) > 0:
if FIFO:
print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
else:
print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")
else:
print("["+str(counter)+"] no data")
if allow_run:
#delayed method/function call to it self
t = Timer( dalay, taskManager )
t.start()
else:
print(" END task-manager: disabled")
# ------------------------------------------
def main():
DATA.append("data from main(): 0")
_sleep(2)
DATA.append("data from main(): 1")
_sleep(2)
# ------------------------------------------
print(" START task-manager:")
taskManager()
_sleep(2)
DATA.append("first data")
_sleep(2)
DATA.append("second data")
print(" START main():")
main()
print(" END main():")
_sleep(2)
DATA.append("last data")
allow_run = False
Ответ 10
Мне нравится правильный ответ right2clicky, особенно в том, что он не требует разрушения потока и создания нового при каждом срабатывании таймера. Кроме того, легко переопределить создание класса с обратным вызовом таймера, который вызывается периодически. Это мой нормальный вариант использования:
class MyClass(RepeatTimer):
def __init__(self, period):
super().__init__(period, self.on_timer)
def on_timer(self):
print("Tick")
if __name__ == "__main__":
mc = MyClass(1)
mc.start()
time.sleep(5)
mc.cancel()
Ответ 11
Это альтернативная реализация, использующая функцию вместо класса. Вдохновленный @Эндрю Уилкинс выше.
Потому что ожидание точнее сна (учитывает время выполнения функции):
import threading
PING_ON = threading.Event()
def ping():
while not PING_ON.wait(1):
print("my thread %s" % str(threading.current_thread().ident))
t = threading.Thread(target=ping)
t.start()
sleep(5)
PING_ON.set()
Ответ 12
У меня есть проблема, когда я пытаюсь отменить поток с другим потоком:
Код:
from threading import Timer, Thread, Event
import time
import RPi.GPIO as GPIO
import time
class perpetualTimer():
def __init__(self,t,hFunction):
self.t=t
self.hFunction = hFunction
self.thread = Timer(self.t,self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t,self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def apagar():
print ('ipsem lorem')
def encender():
s.calcel()
t.calcel()
t = perpetualTimer(0.5, apagar)
s = perpetualTimer(5, encender)
t.start()
s.start()
Ошибка:
[email protected]:~/proyecto $ python hilo_infinito_fallo.py
ipsem lorem
ipsem lorem
ipsem lorem
ipsem lorem
ipsem lorem
ipsem lorem
ipsem lorem
ipsem lorem
ipsem lorem
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 1073, in run
self.function(*self.args, **self.kwargs)
File "hilo_infinito_fallo.py", line 15, in handle_function
self.hFunction()
File "hilo_infinito_fallo.py", line 29, in encender
s.calcel()
AttributeError: perpetualTimer instance has no attribute 'calcel'
ipsem lorem
ipsem lorem
ipsem lorem
ipsem lorem
Как я могу отменить тему в определенное время?