Как определить потокобезопасный массив?
Как я могу определить глобальный массив с минимальным потоком с минимальными изменениями?
Я хочу, чтобы каждый доступ к нему выполнялся с помощью мьютекса и синхронизированного блока.
Что-то вроде этого как "T" будет некотором типом (обратите внимание, что ключевое слово "sync" в настоящее время не определено AFAIK):
sync Array!(T) syncvar;
И каждый доступ к нему будет близок к этому:
Mutex __syncvar_mutex;
//some func scope....
synchronized(__syncvar_mutex) { /* edits 'syncvar' safely */ }
Ответы
Ответ 1
Моя наивная попытка состояла в том, чтобы сделать что-то вроде этого:
import std.typecons : Proxy:
synchronized class Array(T)
{
static import std.array;
private std.array.Array!T data;
mixin Proxy!data;
}
К сожалению, это не работает из-за https://issues.dlang.org/show_bug.cgi?id=14509
Не могу сказать, что я очень удивлен, хотя автоматическая обработка многопоточности через скрытые мьютексы очень унииоматична в современном D, и сама концепция синхронизированных классов в основном является реликтовым из D1 раз.
Вы можете реализовать такое же решение вручную, конечно, путем определения собственного класса SharedArray
со всеми необходимыми методами и добавления блокировок внутри методов перед вызовом внутренних частных простых методов Array
. Но я предполагаю, что вы хотите что-то, что больше работает из коробки.
Невозможно придумать что-нибудь лучше прямо здесь и сейчас (подумаем об этом больше), но стоит отметить, что в целом рекомендуется в D создавать структуры данных, предназначенные для обработки совместного доступа явно, вместо того, чтобы просто защищать нормальные структуры данных с мьютексами. И, конечно же, наиболее востребованный подход заключается в том, чтобы не использовать общие данные, используя вместо этого передачу сообщений.
Я обновлю ответ, если что-нибудь лучше приходит мне на ум.
Ответ 2
Вы можете обернуть массив внутри struct, который блокирует доступ к массиву, когда поток получает токен и пока он не освободит его.
Обертка/шкафчик:
-
acquire()
: вызывается в цикле потоком. Когда он возвращает указатель, поток знает, что он имеет токен, когда метод возвращает ненулевое значение.
-
release()
: вызывается потоком после обработки данных, чей доступ ранее был получен.
.
shared struct Locker(T)
{
private:
T t;
size_t token;
public:
shared(T) * acquire()
{
if (token) return null;
else
{
import core.atomic;
atomicOp!"+="(token, 1);
return &t;
}
}
void release()
{
import core.atomic;
atomicOp!"-="(token, 1);
}
}
и быстрый тест:
alias LockedIntArray = Locker!(size_t[]);
shared LockedIntArray intArr;
void arrayTask(size_t cnt)
{
import core.thread, std.random;
// ensure the desynchronization of this job.
Thread.sleep( dur!"msecs"(uniform(4, 20)));
shared(size_t[])* arr = null;
// wait for the token
while(arr == null) {arr = intArr.acquire;}
*arr ~= cnt;
import std.stdio;
writeln(*arr);
// release the token for the waiting threads
intArr.release;
}
void main(string[] args)
{
import std.parallelism;
foreach(immutable i; 0..16)
{
auto job = task(&arrayTask, i);
job.executeInNewThread();
}
}
С недостатком, что каждый блок работы над массивом должен быть окружен парой "приобретать/выпускать".
Ответ 3
Довольно легко создать оболочку вокруг массива, которая сделает ее потокобезопасной. Тем не менее, чрезвычайно сложно создать поточно-безопасный массив, который не является узким местом concurrency.
Самое близкое, что приходит в голову, это класс Java CopyOnWriteArrayList, но даже это не идеально...
Ответ 4
У вас есть правильная идея. Как массив, вы должны иметь возможность редактировать и получать информацию. Я предлагаю вам взглянуть на мьютекс чтения-записи и atomic, предоставляемые Phobos. Операция чтения довольно проста:
-
synchronize
on mutex.readLock
- load (с
atomicLoad
)
- скопируйте элемент из блока
synchronize
- вернуть скопированный элемент
Письмо должно быть почти таким же. Просто syncronize
на mutex.writeLock
и выполните операцию cas
или atomicOp
.
Обратите внимание, что это будет работать, только если вы скопируете элементы в массиве во время чтения. Если вы хотите получить ссылку, вам нужно выполнить дополнительную синхронизацию элемента при каждом доступе или изменении.