Вопрос в том, будет ли какая-нибудь проблема, если я начну повторять "a" в одном потоке и, в то же время, итерируя "b" в другом потоке? Ясно, что a и b разделяют некоторые данные (исходный итеративный, + некоторый дополнительный материал, внутренние буферы или что-то еще). Итак, будут ли a.next() и b.next() выполнять соответствующую блокировку, когда они будут обращаться к этим общим данным?
Ответ 3
TL;DR
В CPython itertools.tee
является потокобезопасным тогда и только тогда, когда исходный итератор реализован в C/С++, т.е. не использует любой питон.
Если исходный итератор it
был написан на python, например, экземпляр класса или генератор, то itertools.tee(it)
не поточно-безопасный. В лучшем случае вы получите только исключение (которое вы сделаете), а в худшем питоне произойдет сбой.
Вместо использования tee
, вот класс и функция-оболочка, которые являются потокобезопасными:
class safeteeobject(object):
"""tee object wrapped to make it thread-safe"""
def __init__(self, teeobj, lock):
self.teeobj = teeobj
self.lock = lock
def __iter__(self):
return self
def __next__(self):
with self.lock:
return next(self.teeobj)
def __copy__(self):
return safeteeobject(self.teeobj.__copy__(), self.lock)
def safetee(iterable, n=2):
"""tuple of n independent thread-safe iterators"""
lock = Lock()
return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))
Теперь я буду расширяться (много), когда tee
является и не является потокобезопасным, и почему.
Пример, где ok
Позвольте запустить некоторый код (это код python 3, для python 2 используйте itertools.izip
вместо zip
, чтобы иметь такое же поведение):
>>> from itertools import tee, count
>>> from threading import Thread
>>> def limited_sum(it):
... s = 0
... for elem, _ in zip(it, range(1000000)):
... s += elem
... print(elem)
>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is in fact the same 1+...+999999
itertools.count полностью написан на С++ в файле Modules/itertoolsmodule.c
проекта CPython, поэтому он отлично работает.
То же самое верно для: списки, кортежи, наборы, диапазон, словари (ключи, значения и элементы), collections.defaultdict
(ключи, значения и элементы) и несколько других.
Пример, где он не работает - Генераторы
Очень короткий пример - использование генератора:
>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Exception in thread Thread-10:
Traceback (most recent call last):
File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
self.run()
File "/usr/lib/python3.4/threading.py", line 868, in run
self._target(*self._args, **self._kwargs)
ValueError: generator already executing
Да, tee
написан на С++, и это правда, что GIL выполняет один байт кода за раз. Но приведенный выше пример показывает, что этого недостаточно для обеспечения безопасности потоков. Где-то вдоль линии это то, что произошло:
- Два потока вызвали
next
в своих экземплярах tee_object столько же раз,
- Тема 1 вызывает
next(a)
,
- Ему нужно получить новый элемент, поэтому поток 1 теперь вызывает
next(gen)
,
-
gen
написан на питоне. Например, первый байтовый код gen.__next__
CPython решает переключить потоки,
- Тема 2 возобновляет и вызывает
next(b)
,
- Ему нужно получить новый элемент, поэтому он вызывает
next(gen)
- Так как
gen.__next__
уже запущен в потоке 1, мы получаем исключение.
Пример, где он не работает - Объект Iterator
Хорошо, возможно, просто не поточно-безопасно использовать генераторы внутри tee
. Затем мы запускаем вариант вышеуказанного кода, который использует объект итератора:
>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... def __iter__(self):
... return self
... def __next__(self):
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
Вышеупомянутый код сбой в python 2.7.13 и 3.6 (и, вероятно, все версии cpython), на Ubuntu, Windows 7 и OSX. Я пока не хочу раскрывать причину, еще один шаг раньше.
Что делать, если я использую блокировки внутри моего итератора?
Возможно, приведенный выше код сработает, потому что наш итератор сам по себе не был потокобезопасным. Давайте добавим блокировку и посмотрим, что произойдет:
>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... self.lock = Lock()
... def __iter__(self):
... return self
... def __next__(self):
... with self.lock:
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
Добавление блокировки внутри нашего итератора недостаточно, чтобы сделать tee
потокобезопасным.
Почему tee не является потокобезопасным
Суть дела - это getitem
метод teedataobject
в файле Modules/itertoolsmodule.c
CPython. Реализация tee
действительно классная, с оптимизацией, которая сохраняет вызовы RAM: tee
возвращает "объекты tee", каждый из которых сохраняет ссылку на голову teedataobject
. Они, в свою очередь, похожи на ссылки в связанном списке, но вместо того, чтобы держать один элемент - они держат 57. Это не очень важно для наших целей, но это то, что есть. Вот функция getitem
teedataobject
:
static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
PyObject *value;
assert(i < LINKCELLS);
if (i < tdo->numread)
value = tdo->values[i];
else {
/* this is the lead iterator, so fetch more data */
assert(i == tdo->numread);
value = PyIter_Next(tdo->it);
if (value == NULL)
return NULL;
tdo->numread++;
tdo->values[i] = value;
}
Py_INCREF(value);
return value;
}
При запросе элемента teedataobject
проверяет, подготовлен ли он. Если это произойдет, оно вернет его. Если это не так, он вызывает next
на исходном итераторе. Вот где, если итератор написан на python, код может зависать. Итак, вот проблема:
- Два потока вызвали
next
столько же раз,
- Thread 1 вызывает
next(a)
, а код C переходит к вызову PyIter_Next
выше. Например, первый байтовый код next(gen)
, CPython решает переключить потоки.
- Thread 2 вызывает
next(b)
, и поскольку ему все еще нужен новый элемент, код C получает вызов PyIter_Next
,
В этот момент оба потока находятся в одном и том же месте, с теми же значениями для i
и tdo->numread
. Обратите внимание, что tdo->numread
- это просто переменная, чтобы отслеживать, где в 57-ячечной ссылке teedataobject
следует написать следующую.
- Thread 2 завершает вызов
PyIter_Next
и возвращает элемент. В какой-то момент CPython решает снова включить потоки,
-
Тема 1 возобновляет, завершает свой вызов до PyIter_Next
, а затем запускает две строки:
tdo->numread++;
tdo->values[i] = value;
-
Но поток 2 уже установил tdo->values[i]
!
Этого достаточно, чтобы показать, что tee
не является потокобезопасным, так как мы теряем значение, которое поток 2 помещает в tdo->values[i]
. Но это не объясняет сбой.
Скажите i
было 56. Поскольку оба потока вызывают tdo->numread++
, теперь он получает 58 - над 57, выделенный размер tdo->values
. После того, как поток 1 перемещается, объект tdo
не имеет больше ссылок и готов к удалению. Это явная функция для teedataobject
:
static int
teedataobject_clear(teedataobject *tdo)
{
int i;
PyObject *tmp;
Py_CLEAR(tdo->it);
for (i=0 ; i<tdo->numread ; i++)
Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
tmp = tdo->nextlink;
tdo->nextlink = NULL;
teedataobject_safe_decref(tmp);
return 0;
}
В строке с надписью "ПРОБЛЕМА" CPython попытается очистить tdo->values[57]
. Здесь происходит авария. Ну, иногда. Там более одного места для сбоев, я просто хотел показать его.
Теперь вы знаете - itertools.tee
не является потокобезопасным.
Одно решение - Внешняя блокировка
Вместо блокировки внутри нашего итератора __next__
мы можем поместить блокировку вокруг tee.__next__
. Это означает, что весь метод teedataobject.__getitem__
будет вызываться одним потоком каждый раз. В начале ответа я дал краткое описание. Это замена для tee
, которая является потокобезопасной. Единственное, что он не реализует, что делает tee
- это травление. Поскольку блокировки не разборчивы, это не тривиально, чтобы добавить это. Но, конечно, это можно сделать.