Использование многопоточной очереди в python правильно?

Я пытаюсь использовать Queue в python, который будет многопоточным. Я просто хотел знать, что подход, который я использую, является правильным или нет. И если я делаю что-то лишнее или если есть лучший подход, который я должен использовать.

Я пытаюсь получить новые запросы из таблицы и планировать их с помощью некоторой логики для выполнения некоторой операции, например, при выполнении запроса.

Итак, вот из основного потока я создаю отдельный поток для очереди.

if __name__=='__main__':

  request_queue = SetQueue(maxsize=-1)
  worker = Thread(target=request_queue.process_queue)
  worker.setDaemon(True)
  worker.start()


  while True:
    try:
      #Connect to the database get all the new requests to be verified
      db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
      #Get new requests for verification
      verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE     JOB_STATUS='%s' ORDER BY JOB_ID" %
                             (username_testschema, 'INITIATED'))

      #If there are some requests to be verified, put them in the queue.
      if len(verify_these) > 0:
        for row in verify_these:
          print "verifying : %s" % row[0]
          verify_id = row[0]
          request_queue.put(verify_id)
    except Exception as e:
      logger.exception(e)
    finally:
      time.sleep(10)

Теперь в классе Setqueue у меня есть функция process_queue, которая используется для обработки двух верхних запросов в каждом запуске, которые были добавлены в очередь.

'''
Overridding the Queue class to use set as all_items instead of list to ensure unique items added and processed all the time,
'''

class SetQueue(Queue.Queue):
  def _init(self, maxsize):
    Queue.Queue._init(self, maxsize)
    self.all_items = set()

  def _put(self, item):
    if item not in self.all_items:
      Queue.Queue._put(self, item)
      self.all_items.add(item)

  '''
  The Multi threaded queue for verification process. Take the top two items, verifies them in a separate thread and sleeps for 10 sec.
  This way max two requests per run will be processed.
  '''
  def process_queue(self):
    while True:
      scheduler_obj = Scheduler()

      try:
        if self.qsize() > 0:
          for i in range(2):
            job_id = self.get()
            t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
            t.start()

          for i in range(2):
            t.join(timeout=1)
            self.task_done()

      except Exception as e:
        logger.exception(
          "QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
      finally:
        time.sleep(10)

Я хочу узнать, правильно ли я понимаю, и могут ли быть какие-либо проблемы с ним.

Итак, основной поток, выполняющийся в то время, когда True в главном func подключается к базе данных, получает новые запросы и помещает их в очередь. Рабочий поток (daemon) для очереди продолжает получать новые запросы из потоков очереди и fork non-daemon, которые выполняют обработку, и поскольку время ожидания для соединения равно 1, рабочий поток будет продолжать принимать новые запросы без блокировки, а дочерняя нить будет продолжать обрабатываться в фоновом режиме. Правильно?

Итак, если основной выход из этого процесса не будет убит, пока они не закончат свою работу, но поток рабочего демона завершится. Сомнение: если родитель является демоном, а дочерний не является демонами, и если родительские выходы заканчиваются, то ребенок выходит?).


Я также читаю здесь: - Многопроцессорность Дэвида Бэйзли

Давид Бизлей в использовании пула в качестве раздела сопроцессора потока, где он пытается решить аналогичную проблему. Поэтому я должен следовать его шагам: 1. Создайте пул процессов. 2. Откройте поток, как я делаю для request_queue 3. В этой теме

  def process_verification_queue(self):
    while True:
      try:
        if self.qsize() > 0:
          job_id = self.get()
          pool.apply_async(Scheduler.verify_func, args=(job_id,))
      except Exception as e:
        logger.exception("QUEUE EXCEPTION : Exception occured while    processing requests in the VERIFICATION QUEUE")

Используйте процесс из пула и параллельно проверяйте verify_func. Это даст мне больше производительности?

Ответы

Ответ 1

Пока можно создать новый независимый поток для очереди и обработать эти данные по отдельности так, как вы это делаете, я считаю, что для каждого независимого рабочего потока чаще всего отправлять сообщения в очередь, которую они уже знают " около. Затем эта очередь обрабатывается из некоторого другого потока, вытаскивая сообщения из этой очереди.

Идея дизайна

То, как я обращаю ваше приложение, будет три потока. Основной поток и два рабочих потока. 1 рабочий поток будет получать запросы из базы данных и помещать их в очередь. Другой рабочий поток обработает эти данные из очереди

Основной поток просто ждет завершения остальных потоков с помощью функций потока .join()

Вы должны защитить очередь, к которой потоки имеют доступ, и сделать ее потокобезопасной, используя мьютекс. Я видел эту схему во многих других проектах на других языках.

Рекомендуемое чтение

"Эффективный Python" Бретта Слаткина имеет прекрасный пример этого самого вопроса.

Вместо того, чтобы наследовать от Queue, он просто создает обертку в своем классе называется MyQueue и добавляет функцию get() и put (message).

Он даже предоставляет исходный код в своем репозитории Github

https://github.com/bslatkin/effectivepython/blob/master/example_code/item_39.py

Я не связан с книгой или ее автором, но я очень рекомендую ее, поскольку я узнал от нее несколько вещей:)

Ответ 2

Мне нравится это объяснение преимуществ и различий между использованием потоков и процессов - "..... Но есть серебряная подкладка: процессы могут продвигаться одновременно по нескольким потокам исполнения. Поскольку родительский процесс не разделяет GIL с его дочерними процессами, все процессы могут выполняться одновременно (с учетом ограничений аппаратное обеспечение и ОС)...."

У него есть отличные объяснения для того, чтобы обойти GIL и как улучшить производительность.

Подробнее здесь:

http://jeffknupp.com/blog/2013/06/30/pythons-hardest-problem-revisited/