Безопасен ли фильтр
У меня есть поток, который обновляет список под названием l
. Правильно ли я говорю, что это поточно-безопасное выполнение следующего из другого потока?
filter(lambda x: x[0] == "in", l)
Если это не безопасный поток, то это правильный подход:
import threading
import time
import Queue
class Logger(threading.Thread):
def __init__(self, log):
super(Logger, self).__init__()
self.log = log
self.data = []
self.finished = False
self.data_lock = threading.Lock()
def run(self):
while not self.finished:
try:
with self.data_lock:
self.data.append(self.log.get(block=True, timeout=0.1))
except Queue.Empty:
pass
def get_data(self, cond):
with self.data_lock:
d = filter(cond, self.data)
return d
def stop(self):
self.finished = True
self.join()
print("Logger stopped")
где метод get_data(self, cond)
используется для извлечения небольшого подмножества данных в self.data безопасным потоком.
Ответы
Ответ 1
Во-первых, чтобы ответить на ваш вопрос в заголовке: filter
- это просто функция. Следовательно, его безопасность потока будет опираться на структуру данных, с которой вы ее используете.
Как уже отмечалось в комментариях, сами операции с списками являются потокобезопасными в CPython и защищены GIL, но это, возможно, только деталь реализации CPython, на которую вы не должны положиться. Даже если вы можете положиться на это, безопасность потоков некоторых из их операций, вероятно, не означает вид безопасности потока, который вы имеете в виду:
Проблема состоит в том, что итерация по последовательности с filter
вообще не является атомной операцией. Последовательность может быть изменена во время итерации. В зависимости от структуры данных, лежащей в основе вашего итератора, это может вызвать более или менее странные эффекты. Одним из способов преодоления этой проблемы является повторение копии последовательности, созданной с помощью одного атомарного действия. Самый простой способ сделать это для стандартных последовательностей, таких как tuple
, list
, string
, заключается в следующем:
filter(lambda x: x[0] == "in", l[:])
Помимо этого, не обязательно быть потокобезопасным для других типов данных, есть одна проблема с этим: это только мелкая копия. Поскольку элементы списка, похоже, также похожи на список, другой поток может параллельно del l[1000][:]
удалять один из внутренних списков (на которые также указывают и ваши мелкие копии). Это приведет к отказу вашего фильтра с помощью IndexError
.
Все, что сказал, не стыдно использовать блокировку для защиты доступа к вашему списку, и я определенно рекомендую его. В зависимости от того, как ваши данные изменяются и как вы работаете с возвращенными данными, может быть даже разумно глубоко скопировать элементы, удерживая блокировку и вернуть эти копии. Таким образом, вы можете гарантировать, что после возвращения условие фильтра не будет внезапно изменяться для возвращаемых элементов.
Wrt. ваш код Logger
: я не уверен на 100%, как вы планируете использовать это, и если вам необходимо запустить несколько потоков в одной очереди и join
их. Мне кажется, что вы никогда не используете Queue.task_done()
(при условии, что его self.log
является Queue
). Также ваш опрос в очереди потенциально расточительный. Если вам не нужен join
потока, я бы предложил по крайней мере включить сбор блокировки:
class Logger(threading.Thread):
def __init__(self, log):
super(Logger, self).__init__()
self.daemon = True
self.log = log
self.data = []
self.data_lock = threading.Lock()
def run(self):
while True:
l = self.log.get() # thread will sleep here indefinitely
with self.data_lock:
self.data.append(l)
self.log.task_done()
def get_data(self, cond):
with self.data_lock:
d = filter(cond, self.data)
# maybe deepcopy d here
return d
Внешне вы все равно можете сделать log.join()
, чтобы убедиться, что все элементы очереди log
обработаны.
Ответ 2
Если один поток записывает в список, а другой поток читает этот список, эти два должны быть синхронизированы. Для этого не имеет значения, использует ли читатель filter()
, индекс или итерацию или использует ли автор append()
или любой другой метод.
В вашем коде вы достигнете необходимой синхронизации с помощью threading.Lock
. Поскольку вы получаете доступ только к списку в контексте with self.data_lock
, доступ является взаимоисключающим.
Таким образом, ваш код формально корректен в отношении обработки списка между потоками. Но:
- Вы получаете доступ к
self.finished
без блокировки, что является проблематичным. Присвоение этому члену изменит self
, то есть сопоставление объекта с соответствующими членами, так что это должно быть синхронизировано. Эффективно это не повредит, потому что True
и False
являются глобальными константами, в худшем случае у вас будет небольшая задержка между настройкой состояния в одном потоке и наблюдением состояния в другом. Он остается плохим, потому что он формирует привычку.
- Как правило, при использовании блокировки всегда сохраняется документ, защищающий эту блокировку. Кроме того, документ, к которому осуществляется доступ к объекту, через какой поток. Тот факт, что
self.finished
является общим и требует синхронизации, был бы очевиден. Кроме того, визуальное различие между публичными функциями и данными и частными (начиная с _underscore
, см. PEP 8) помогает отслеживать это. Это также помогает другим читателям.
- Аналогичная проблема - ваш базовый класс. В общем, наследование от
threading.Thread
- плохая идея. Скорее, включите экземпляр класса потока и дайте ему функцию, подобную self._main_loop
для запуска. Причина в том, что вы говорите, что ваш Logger
- это Thread
, и что все его публичные члены базового класса также являются публичными членами вашего класса, что, вероятно, намного шире интерфейса, чем то, что вы намеревались.
- Вы не должны блокировать блокировку. В вашем коде вы блокируете в
self.log.get(block=True, timeout=0.1)
блокировку мьютекса. В то время, даже если ничего не происходит, ни один другой поток не сможет позвонить и завершить вызов get_data()
. На самом деле есть только крошечное окно между разблокировкой мьютекса и блокировкой его снова, когда вызывающему абоненту get_data()
не нужно ждать, что очень плохо для производительности. Я даже мог предположить, что ваш вопрос мотивирован очень плохим исполнением, которое это вызывает. Вместо этого вызовите log.get(..)
без блокировки, он не нужен. Затем, удерживая блокировку, добавьте данные в self.data
и проверьте self.finished
.