Опрос с задержкой_job
У меня есть процесс, который занимает обычно несколько секунд, поэтому я пытаюсь использовать delayed_job для обработки асинхронно. Работа сама по себе хорошо работает, мой вопрос заключается в том, как начать опрос задания, чтобы узнать, если это будет сделано.
Я могу получить id от delayed_job, просто присваивая его переменной:
job = Available.delay.dosomething(: var = > 1234)
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| id | priority | attempts | handler | last_error | run_at | locked_at | failed_at | locked_by | created_at | updated_at |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| 4037 | 0 | 0 | --- !ru... | | 2011-04-... | | | | 2011-04... | 2011-04-... |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
Но как только он завершит задание, он удалит его и поиск завершенной записи возвращает ошибку:
@job=Delayed::Job.find(4037)
ActiveRecord::RecordNotFound: Couldn't find Delayed::Backend::ActiveRecord::Job with ID=4037
@job= Delayed::Job.exists?(params[:id])
Должен ли я потрудиться изменить это и, возможно, отложить удаление полных записей? Я не уверен, как еще я могу получить уведомление об этом статусе. Или опрос мертвой записи как подтверждение завершения? Кто-нибудь сталкивается с чем-то похожим?
Ответы
Ответ 1
В итоге я использовал комбинацию Delayed_Job с обратным вызовом after (job), который заполняет memcached-объект с тем же идентификатором, что и созданное задание. Таким образом, я минимизирую количество раз, когда я попал в базу данных с запросом статуса задания, вместо этого опросив memcached-объект. И он содержит весь объект, который мне нужен из завершенной работы, поэтому у меня даже нет запроса на кругосветку. Я получил идею из статьи парней github, которые сделали почти то же самое.
https://github.com/blog/467-smart-js-polling
и использовал плагин jquery для опроса, который опробован реже и сбрасывается после определенного количества попыток
https://github.com/jeremyw/jquery-smart-poll
Кажется, отлично работает.
def after(job)
prices = Room.prices.where("space_id = ? AND bookdate BETWEEN ? AND ?", space_id.to_i, date_from, date_to).to_a
Rails.cache.fetch(job.id) do
bed = Bed.new(:space_id => space_id, :date_from => date_from, :date_to => date_to, :prices => prices)
end
end
Ответ 2
Начнем с API. Я хотел бы иметь что-то вроде следующего.
@available.working? # => true or false, so we know it running
@available.finished? # => true or false, so we know it finished (already ran)
Теперь напишите задание.
class AwesomeJob < Struct.new(:options)
def perform
do_something_with(options[:var])
end
end
Пока все хорошо. У нас есть работа. Теперь давайте напишем логику, которая вставляет ее в очередь. Поскольку "Доступно" является моделью, ответственной за эту работу, дайте ей возможность научить ее, как начать эту работу.
class Available < ActiveRecord::Base
def start_working!
Delayed::Job.enqueue(AwesomeJob.new(options))
end
def working?
# not sure what to put here yet
end
def finished?
# not sure what to put here yet
end
end
Итак, как мы узнаем, работает ли работа или нет? Есть несколько способов, но в рельсах просто кажется, что когда моя модель что-то создает, она обычно ассоциируется с чем-то. Как мы ассоциируем? Использование идентификаторов в базе данных. Добавьте добавление a job_id
в Доступную модель.
Пока мы находимся в этом вопросе, как мы знаем, что работа не работает, потому что она уже завершена или потому, что она еще не началась? Один из способов - фактически проверить, что на самом деле делало. Если он создал файл, проверьте, существует ли файл. Если он вычислил значение, проверьте, что результат написан. Некоторые задания не так легко проверить, так как не может быть ясного проверяемого результата их работы. В таком случае вы можете использовать флаг или временную метку в своей модели. Предположим, что это наш случай, добавьте метку времени job_finished_at
, чтобы отличить еще не выполненную работу от уже готовой.
class AddJobIdToAvailable < ActiveRecord::Migration
def self.up
add_column :available, :job_id, :integer
add_column :available, :job_finished_at, :datetime
end
def self.down
remove_column :available, :job_id
remove_column :available, :job_finished_at
end
end
Хорошо. Итак, теперь пусть фактически связывает Available
с его заданием, как только мы ставим в очередь задание, изменяя метод start_working!
.
def start_working!
job = Delayed::Job.enqueue(AwesomeJob.new(options))
update_attribute(:job_id, job.id)
end
Великий. На этом этапе я мог бы написать belongs_to :job
, но нам это действительно не нужно.
Итак, теперь мы знаем, как легко написать метод working?
.
def working?
job_id.present?
end
Но как мы отмечаем, что работа закончена? Никто не знает, что работа закончилась лучше, чем сама работа. Поэтому передайте available_id
в задание (как один из вариантов) и используйте его в задании. Для этого нам нужно изменить метод start_working!
для передачи id.
def start_working!
job = Delayed::Job.enqueue(AwesomeJob.new(options.merge(:available_id => id))
update_attribute(:job_id, job.id)
end
И мы должны добавить логику в задание, чтобы обновить временную метку job_finished_at
, когда она будет выполнена.
class AwesomeJob < Struct.new(:options)
def perform
available = Available.find(options[:available_id])
do_something_with(options[:var])
# Depending on whether you consider an error'ed job to be finished
# you may want to put this under an ensure. This way the job
# will be deemed finished even if it error'ed out.
available.update_attribute(:job_finished_at, Time.current)
end
end
С помощью этого кода мы знаем, как написать наш метод finished?
.
def finished?
job_finished_at.present?
end
И все готово. Теперь мы можем просто опросить @available.working?
и @available.finished?
. Кроме того, вы получаете удобство знать, какое точное задание было создано для вашего Доступно, проверив @available.job_id
. Вы можете легко превратить его в реальную ассоциацию, сказав belongs_to :job
.
Ответ 3
Я думаю, что лучший способ - использовать обратные вызовы, доступные в delayed_job.
Эти:
: success,: error и: after.
поэтому вы можете поместить код в свою модель с помощью:
class ToBeDelayed
def perform
# do something
end
def after(job)
# do something
end
end
Потому что, если вы настаиваете на использовании obj.delayed.method, тогда вам придется запланировать обезьяну Delayed:: PerformableMethod и добавить туда метод after
.
ИМХО это намного лучше, чем опрос, для некоторой ценности, которая может быть даже специфичной для Бэкэнда (например, ActiveRecord против Mongoid).
Ответ 4
Самый простой способ выполнения этого действия - изменить действие вашего опроса на что-то похожее на следующее:
def poll
@job = Delayed::Job.find_by_id(params[:job_id])
if @job.nil?
# The job has completed and is no longer in the database.
else
if @job.last_error.nil?
# The job is still in the queue and has not been run.
else
# The job has encountered an error.
end
end
end
Почему это работает? Когда Delayed::Job
запускает задание из очереди, оно удаляет его из базы данных в случае успеха. Если задание не удается, запись остается в очереди, которая будет запущена позже, а атрибут last_error
будет установлен на обнаруженную ошибку. Используя две функциональные возможности выше, вы можете проверить удаленные записи, чтобы убедиться, что они были успешными.
Преимущества метода выше:
- Вы получаете эффект опроса, который вы искали в своем оригинальном сообщении.
- Используя простую ветвь логики, вы можете предоставить обратную связь пользователю, если есть ошибка при обработке задания.
Вы можете инкапсулировать эту функциональность в модельный метод, выполнив следующие действия:
# Include this in your initializers somewhere
class Queue < Delayed::Job
def self.status(id)
self.find_by_id(id).nil? ? "success" : (job.last_error.nil? ? "queued" : "failure")
end
end
# Use this method in your poll method like so:
def poll
status = Queue.status(params[:id])
if status == "success"
# Success, notify the user!
elsif status == "failure"
# Failure, notify the user!
end
end
Ответ 5
Я бы предположил, что если важно получить уведомление о завершении задания, напишите собственный объект задания и очередь, вместо того чтобы полагаться на задание по умолчанию, которое ставится в очередь, когда вы вызываете Available.delay.dosomething
. Создайте объект что-то вроде:
class DoSomethingAvailableJob
attr_accessor options
def initialize(options = {})
@options = options
end
def perform
Available.dosomething(@options)
# Do some sort of notification here
# ...
end
end
и введите в него:
Delayed::Job.enqueue DoSomethingAvailableJob.new(:var => 1234)
Ответ 6
Таблица delayed_jobs в вашем приложении предназначена для предоставления статуса только запущенных и поставленных в очередь заданий. Это не постоянная таблица, и на самом деле она должна быть как можно меньше по соображениям производительности. Вот почему задания удаляются сразу после завершения.
Вместо этого вы должны добавить поле в свою модель Available
, что означает, что задание выполнено. Поскольку меня обычно интересует, сколько времени требуется на обработку, я добавляю поля start_time и end_time. Тогда мой метод dosomething
будет выглядеть примерно так:
def self.dosomething(model_id)
model = Model.find(model_id)
begin
model.start!
# do some long work ...
rescue Exception => e
# ...
ensure
model.finish!
end
end
Начало! и закончить! методы просто записывают текущее время и сохраняют модель. Тогда у меня будет метод completed?
, который ваш AJAX может опросить, чтобы узнать, завершено ли задание.
def completed?
return true if start_time and end_time
return false
end
Есть много способов сделать это, но я нахожу этот метод простым и хорошо работает для меня.