Ответ 1
import threading
def printit():
threading.Timer(5.0, printit).start()
print "Hello, World!"
printit()
# continue with the rest of your code
https://docs.python.org/3/library/threading.html#timer-objects
Есть ли способ, например, напечатать Hello World!
каждые n секунд?
Например, программа будет проходить через любой код, который у меня был, а затем, когда это было 5 секунд (с time.sleep()
), он выполнил бы этот код. Я бы использовал это, чтобы обновить файл, но не печатать Hello World.
Например:
startrepeat("print('Hello World')", .01) # Repeats print('Hello World') ever .01 seconds
for i in range(5):
print(i)
>> Hello World!
>> 0
>> 1
>> 2
>> Hello World!
>> 3
>> Hello World!
>> 4
import threading
def printit():
threading.Timer(5.0, printit).start()
print "Hello, World!"
printit()
# continue with the rest of your code
https://docs.python.org/3/library/threading.html#timer-objects
Мой скромный подход к предмету, обобщение ответа Алекса Мартелли, с помощью функции start() и stop():
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
Использование:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
Особенности:
start()
и stop()
безопасны для вызова несколько раз, даже если таймер уже запущен/остановленinterval
в любое время, это будет эффективно после следующего прогона. То же самое для args
, kwargs
и даже function
!Спасите себя эпизодом шизофреника и используйте планировщик Advanced Python: http://pythonhosted.org/APScheduler
Код настолько прост:
from apscheduler.scheduler import Scheduler
sched = Scheduler()
sched.start()
def some_job():
print "Every 10 seconds"
sched.add_interval_job(some_job, seconds = 10)
....
sched.shutdown()
def update():
import time
while True:
print 'Hello World!'
time.sleep(5)
Это будет работать как функция. while True:
заставляет его работать вечно. Вы всегда можете извлечь его из функции, если вам нужно.
Вот простой пример, совместимый с APScheduler 3.00+:
# note that there are many other schedulers available
from apscheduler.schedulers.background import BackgroundScheduler
sched = BackgroundScheduler()
def some_job():
print('Every 10 seconds')
# seconds can be replaced with minutes, hours, or days
sched.add_job(some_job, 'interval', seconds=10)
sched.start()
...
sched.shutdown()
В качестве альтернативы вы можете использовать следующее. В отличие от многих альтернатив, этот таймер будет выполнять нужный код каждые n секунд точно (независимо от времени, которое требуется для выполнения кода). Так что это отличный вариант, если вы не можете позволить себе какой-либо дрейф.
import time
from threading import Event, Thread
class RepeatedTimer:
"""Repeat `function` every `interval` seconds."""
def __init__(self, interval, function, *args, **kwargs):
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.start = time.time()
self.event = Event()
self.thread = Thread(target=self._target)
self.thread.start()
def _target(self):
while not self.event.wait(self._time):
self.function(*self.args, **self.kwargs)
@property
def _time(self):
return self.interval - ((time.time() - self.start) % self.interval)
def stop(self):
self.event.set()
self.thread.join()
# start timer
timer = RepeatedTimer(10, print, 'Hello world')
# stop timer
timer.stop()
Здесь версия, которая не создает новый поток каждые n
секунд:
from threading import Event, Thread
def call_repeatedly(interval, func, *args):
stopped = Event()
def loop():
while not stopped.wait(interval): # the first call is in `interval` secs
func(*args)
Thread(target=loop).start()
return stopped.set
Событие используется для остановки повторений:
cancel_future_calls = call_repeatedly(5, print, "Hello, World")
# do something else here...
cancel_future_calls() # stop future calls
Вы можете запустить отдельный поток, единственная обязанность которого - подсчитать в течение 5 секунд, обновить файл, повторить. Вы не хотите, чтобы этот отдельный поток мешал вашему основному потоку.