Поиск записей mongoDB партиями (с использованием адаптера для рубинового монтирования)
Используя rails 3 и mongoDB с адаптером mongoid, как я могу найти партию mongo? Мне нужно захватить все записи в конкретной коллекции БД Монго и индексировать их в solr (начальный индекс данных для поиска).
Проблема, с которой я сталкиваюсь, заключается в том, что Model.all захватывает все записи и сохраняет их в памяти. Затем, когда я обрабатываю их и индексирую в solr, моя память съедается, и процесс умирает.
То, что я пытаюсь сделать, это пакетная находка в монго, чтобы я мог перебирать более 1000 записей за один раз, передавать их в solr для индексации, а затем обрабатывать следующую 1000 и т.д.
Код, который у меня есть в настоящее время, имеет следующее:
Model.all.each do |r|
Sunspot.index(r)
end
Для коллекции, насчитывающей около 1,5 миллионов записей, она потребляет 8 + ГБ памяти и убивает процесс. В ActiveRecord существует метод find_in_batches, который позволяет мне обрабатывать запросы в управляемые партии, которые не позволяют памяти выходить из-под контроля. Тем не менее, я не могу найти ничего подобного для mongoDB/mongoid.
Мне хотелось бы сделать что-то вроде этого:
Model.all.in_batches_of(1000) do |batch|
Sunpot.index(batch)
end
Это облегчит проблемы с памятью и проблемы с запросами, только каждый раз задавая управляемую проблему. Однако документация разрежена при выполнении пакетных находок в mongoDB. Я вижу много документации по выполнению пакетных вставок, но не для пакетного поиска.
Ответы
Ответ 1
С помощью Mongoid вам не нужно вручную выполнять запрос.
В Mongoid Model.all
возвращает экземпляр Mongoid::Criteria
. При вызове #each
по этим критериям создается экземпляр курсора Mongo и используется для перебора записей. Этот основной указатель драйвера Mongo уже загружает все записи. По умолчанию значение batch_size
равно 100.
Для получения дополнительной информации по этой теме прочитайте этот комментарий от автора и сопровождающего сайта Mongoid.
В общем, вы можете просто сделать это:
Model.all.each do |r|
Sunspot.index(r)
end
Ответ 2
Если вы выполняете итерирование по коллекции, в которой для каждой записи требуется большая обработка (например, при запросе внешнего API для каждого элемента), для курсора можно использовать тайм-аут. В этом случае вам нужно выполнить несколько запросов, чтобы не оставить курсор открытым.
require 'mongoid'
module Mongoid
class Criteria
def in_batches_of(count = 100)
Enumerator.new do |y|
total = 0
loop do
batch = 0
self.limit(count).skip(total).each do |item|
total += 1
batch += 1
y << item
end
break if batch == 0
end
end
end
end
end
Вот вспомогательный метод, который вы можете использовать для добавления функциональности пакетной обработки. Его можно использовать так:
Post.all.order_by(:id => 1).in_batches_of(7).each_with_index do |post, index|
# call external slow API
end
Просто убедитесь, что вы ВСЕГДА имеете order_by по вашему запросу. В противном случае пейджинг может не делать то, что вы хотите. Также я буду придерживаться партий по 100 или меньше. Как сказано в принятом ответе на монгольские запросы в партиях по 100, поэтому вы никогда не захотите оставить курсор открытым во время обработки.
Ответ 3
Быстрее отправлять партии на солнечные пятна.
Вот как я это делаю:
records = []
Model.batch_size(1000).no_timeout.only(:your_text_field, :_id).all.each do |r|
records << r
if records.size > 1000
Sunspot.index! records
records.clear
end
end
Sunspot.index! records
no_timeout
: предотвращает отключение курсора (через 10 минут по умолчанию)
only
: выбирает только идентификатор и поля, которые фактически индексируются
batch_size
: выберите 1000 записей вместо 100
Ответ 4
Я не уверен в пакетной обработке, но вы можете сделать это
current_page = 0
item_count = Model.count
while item_count > 0
Model.all.skip(current_page * 1000).limit(1000).each do |item|
Sunpot.index(item)
end
item_count-=1000
current_page+=1
end
Но если вы ищете идеальное решение долгого времени, я бы не рекомендовал это. Позвольте мне объяснить, как я применил тот же сценарий в своем приложении. Вместо выполнения пакетных заданий
-
Я создал resque задание, которое обновляет индекс solr
class SolrUpdator
@queue = :solr_updator
def self.perform(item_id)
item = Model.find(item_id)
#i have used RSolr, u can change the below code to handle sunspot
solr = RSolr.connect :url => Rails.application.config.solr_path
js = JSON.parse(item.to_json)
solr.add js
end
конец
-
После добавления элемента я просто помещаю запись в очередь resque
Resque.enqueue(SolrUpdator, item.id.to_s)
- Вот и все, запустите resque, и он позаботится обо всем.
Ответ 5
Следующие действия будут работать для вас, просто попробуйте
Model.all.in_groups_of(1000, false) do |r|
Sunspot.index! r
end
Ответ 6
Как сказал @RyanMcGeary, вам не нужно беспокоиться о пакетной обработке запроса. Однако индексирование объектов по одному намного медленнее, чем их доработка.
Model.all.to_a.in_groups_of(1000, false) do |records|
Sunspot.index! records
end