Запланировать повторное событие в Python 3
Я пытаюсь запланировать повторяющееся событие для запуска каждую минуту в Python 3.
Я видел класс sched.scheduler
, но мне интересно, есть ли другой способ сделать это. Я слышал упоминания, что я мог бы использовать несколько потоков для этого, что я бы не прочь сделать.
Я в основном запрашиваю JSON, а затем разбираю его; его значение изменяется со временем.
Чтобы использовать sched.scheduler
, мне нужно создать цикл, чтобы запросить его, чтобы запланировать четное выполнение в течение одного часа:
scheduler = sched.scheduler(time.time, time.sleep)
# Schedule the event. THIS IS UGLY!
for i in range(60):
scheduler.enter(3600 * i, 1, query_rate_limit, ())
scheduler.run()
Какие еще способы сделать это?
Ответы
Ответ 1
Вы можете использовать threading.Timer
, но также планируете одноразовое событие, аналогично методу .enter
объектов планировщика.
Нормальный шаблон (на любом языке) для преобразования одноразового планировщика в периодический планировщик должен состоять в том, чтобы каждое событие перепланировало себя с заданным интервалом. Например, с sched
, я бы не использовал цикл, как вы делаете, а что-то вроде:
def periodic(scheduler, interval, action, actionargs=()):
scheduler.enter(interval, 1, periodic,
(scheduler, interval, action, actionargs))
action(*actionargs)
и инициировать весь "вечный периодический график" с помощью вызова
periodic(scheduler, 3600, query_rate_limit)
Или, я мог бы использовать threading.Timer
вместо scheduler.enter
, но шаблон довольно схож.
Если вам нужна более четкая вариация (например, прекратите периодическую перепланировку в определенный момент времени или при определенных условиях), это не слишком сложно для размещения с несколькими дополнительными параметрами.
Ответ 2
Мой скромный подход к теме:
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.function = function
self.interval = interval
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
Использование:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
Особенности:
- Стандартная библиотека, без внешних зависимостей
- Использует шаблон, предложенный Алексом Мартнелли
-
start()
и stop()
безопасны для вызова несколько раз, даже если таймер уже запущен/остановлен
- Функция, которая должна быть вызвана, может иметь позиционные и именованные аргументы
- Вы можете изменить
interval
в любое время, это будет эффективно после следующего прогона. То же самое для args
, kwargs
и даже function
!
Ответ 3
Вы можете использовать schedule. Он работает на Python 2.7 и 3.3 и довольно легкий:
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
while 1:
schedule.run_pending()
time.sleep(1)
Ответ 4
Вы можете использовать Advanced Python Scheduler. Он даже имеет cron-подобный интерфейс.
Ответ 5
Основываясь на ответе MestreLion, он решает небольшую проблему при многопоточности:
from threading import Timer, Lock
class Periodic(object):
"""
A periodic task running in threading.Timers
"""
def __init__(self, interval, function, *args, **kwargs):
self._lock = Lock()
self._timer = None
self.function = function
self.interval = interval
self.args = args
self.kwargs = kwargs
self._stopped = True
if kwargs.pop('autostart', True):
self.start()
def start(self, from_run=False):
self._lock.acquire()
if from_run or self._stopped:
self._stopped = False
self._timer = Timer(self.interval, self._run)
self._timer.start()
self._lock.release()
def _run(self):
self.start(from_run=True)
self.function(*self.args, **self.kwargs)
def stop(self):
self._lock.acquire()
self._stopped = True
self._timer.cancel()
self._lock.release()
Ответ 6
Используйте сельдерей.
from celery.task import PeriodicTask
from datetime import timedelta
class ProcessClicksTask(PeriodicTask):
run_every = timedelta(minutes=30)
def run(self, **kwargs):
#do something
Ответ 7
Основываясь на ответе Алекса Мартелли, я внедрил версию декоратора, которая проще интегрировать.
import sched
import time
import datetime
from functools import wraps
from threading import Thread
def async(func):
@wraps(func)
def async_func(*args, **kwargs):
func_hl = Thread(target=func, args=args, kwargs=kwargs)
func_hl.start()
return func_hl
return async_func
def schedule(interval):
def decorator(func):
def periodic(scheduler, interval, action, actionargs=()):
scheduler.enter(interval, 1, periodic,
(scheduler, interval, action, actionargs))
action(*actionargs)
@wraps(func)
def wrap(*args, **kwargs):
scheduler = sched.scheduler(time.time, time.sleep)
periodic(scheduler, interval, func)
scheduler.run()
return wrap
return decorator
@async
@schedule(1)
def periodic_event():
print(datetime.datetime.now())
if __name__ == '__main__':
print('start')
periodic_event()
print('end')
Ответ 8
Здесь быстрый и грязный неблокирующий цикл с Thread
:
#!/usr/bin/env python3
import threading,time
def worker():
print(time.time())
time.sleep(5)
t = threading.Thread(target=worker)
t.start()
threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")
Там нет ничего особенного, worker
создает новый поток сам с задержкой. Не может быть наиболее эффективным, но достаточно простым. Ответ Northtree был бы правильным, если вам нужно более сложное решение.
И исходя из этого, мы можем сделать то же самое, только с Timer
:
#!/usr/bin/env python3
import threading,time
def hello():
t = threading.Timer(10.0, hello)
t.start()
print( "hello, world",time.time() )
t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How it going?",time.time())
Ответ 9
См. Мой пример
import sched, time
def myTask(m,n):
print n+' '+m
def periodic_queue(interval,func,args=(),priority=1):
s = sched.scheduler(time.time, time.sleep)
periodic_task(s,interval,func,args,priority)
s.run()
def periodic_task(scheduler,interval,func,args,priority):
func(*args)
scheduler.enter(interval,priority,periodic_task,
(scheduler,interval,func,args,priority))
periodic_queue(1,myTask,('world','hello'))
Ответ 10
task-scheduler - это планировщик процесса, который периодически организует и запускает задачу в соответствии с конфигурационным файлом YAML.
См. https://github.com/tzutalin/task-scheduler