Многопоточность производителя/потребителя
Фон
Не имея денег на учебу в школе, я работаю в ночных сменах на платной дороге и использую Интернет, чтобы научить себя некоторым навыкам кодирования, надеясь на лучшую работу завтра или онлайн-продажу какого-либо приложения, которое я делаю. Длинные ночи, немногие клиенты.
Я рассматриваю многопоточность как тему, поскольку я сталкиваюсь с большим количеством кода в литературе (например, Android SDK), который использует его, но я все еще нахожу его неясным.
Дух
Мой подход на этом этапе: попытайтесь закодировать самый простой пример многопоточности, о котором я могу думать, немного ударить головой о стену и посмотреть, могу ли я растянуть мой мозг, чтобы разместить новый образ мышления. Я подвергаю себя своим пределам, надеюсь, превзойти их. Не стесняйтесь критиковать дико, вплоть до nitpicking, и указывать лучшие способы делать то, что я пытаюсь сделать.
Цель
-
Get some advice on how to do the above, based on my efforts so far (code provided)
Упражнение
Здесь определяется область действия:
Определение
Создайте два класса, которые работают в тандеме по созданию объектов данных и их потреблению. Один Thread создает объекты и доставляет их в разделяемое пространство для другого, чтобы забрать и потреблять. Позвольте вызвать производящий поток Producer
, потребляющий поток Consumer
и общее пространство SharedSpace
. Акт создания объектов для потребления другими может быть ассимилирован с помощью аналогии с этим сценарием:
`Producer` (a busy mum making chocolate-covered cakes for his child, up to a limit)
`Consumer` (a hungry child waiting to eat all cakes the mum makes, until told to stop)
`SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
`dataValue` (a chocolate-dripping cake which MUST be eaten immediately or else...)
Чтобы упростить упражнение, я решил не разрешать маме готовить, когда ребенок ест свой торт. Она просто ждет, пока ребенок закончит свой торт и мгновенно сделает еще один, до определенного предела, для хорошего воспитания. Суть упражнения состоит в том, чтобы практиковать сигнализацию Thread по достижению любого concurrency вообще. Напротив, я сосредоточен на идеальной сериализации, без опроса или "я могу пойти еще?". чеки. Полагаю, мне придется процитировать последующее упражнение, в котором мать и ребенок "работают" параллельно.
Подход
-
Попросите мои классы реализовать интерфейс Runnable, чтобы у них была точка ввода кода
-
Используйте мои классы как аргументы конструктора для объектов Thread, которые создаются и запускаются из программы main
entry точка
-
Убедитесь, что программа main
не заканчивается перед Thread с помощью Thread.join()
-
Задайте ограничение на количество раз, когда Producer
создаст данные для Consumer
-
Согласитесь на значение дозорного значения, которое Produce
будет использовать для завершения передачи данных.
-
Логарифмическое получение блокировок на общих ресурсах и событиях производства/потребления данных, включая окончательное отключение рабочих потоков
-
Создайте один SharedSpace
объект из программы main
и передайте его каждому работнику перед запуском
-
Хранить private
ссылку на объект SharedSpace
внутри каждого рабочего
-
Предоставить защиту и сообщения, чтобы описать условие готовности Consumer
к употреблению до того, как будут созданы какие-либо данные.
-
Остановите Producer
после заданного количества итераций
-
Остановите Consumer
после того, как он прочитает значение часового
код
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Consumer extends Threaded {
public Consumer(SharedSpace sharedSpace) {
super(sharedSpace);
}
@Override
public void run() {
super.run();
int consumedData = 0;
while (consumedData != -1) {
synchronized (sharedSpace) {
logger.info("Acquired lock on sharedSpace.");
consumedData = sharedSpace.dataValue;
if (consumedData == 0) {
try {
logger.info("Data production has not started yet. "
+ "Releasing lock on sharedSpace, "
+ "until notification that it has begun.");
sharedSpace.wait();
} catch (InterruptedException interruptedException) {
logger.error(interruptedException.getStackTrace().toString());
}
} else if (consumedData == -1) {
logger.info("Consumed: END (end of data production token).");
} else {
logger.info("Consumed: {}.", consumedData);
logger.info("Waking up producer to continue data production.");
sharedSpace.notify();
try {
logger.info("Releasing lock on sharedSpace "
+ "until notified of new data availability.");
sharedSpace.wait();
} catch (InterruptedException interruptedException) {
logger.error(interruptedException.getStackTrace().toString());
}
}
}
}
logger.info("Signing off.");
}
}
class Producer extends Threaded {
private static final int N_ITERATIONS = 10;
public Producer(SharedSpace sharedSpace) {
super(sharedSpace);
}
@Override
public void run() {
super.run();
int nIterations = 0;
while (nIterations <= N_ITERATIONS) {
synchronized (sharedSpace) {
logger.info("Acquired lock on sharedSpace.");
nIterations++;
if (nIterations <= N_ITERATIONS) {
sharedSpace.dataValue = nIterations;
logger.info("Produced: {}", nIterations);
} else {
sharedSpace.dataValue = -1;
logger.info("Produced: END (end of data production token).");
}
logger.info("Waking up consumer for data consumption.");
sharedSpace.notify();
if (nIterations <= N_ITERATIONS) {
try {
logger.info("Releasing lock on sharedSpace until notified.");
sharedSpace.wait();
} catch (InterruptedException interruptedException) {
logger.error(interruptedException.getStackTrace().toString());
}
}
}
}
logger.info("Signing off.");
}
}
class SharedSpace {
volatile int dataValue = 0;
}
abstract class Threaded implements Runnable {
protected Logger logger;
protected SharedSpace sharedSpace;
public Threaded(SharedSpace sharedSpace) {
this.sharedSpace = sharedSpace;
logger = LoggerFactory.getLogger(this.getClass());
}
@Override
public void run() {
logger.info("Started.");
String workerName = getClass().getName();
Thread.currentThread().setName(workerName);
}
}
public class ProducerConsumer {
public static void main(String[] args) {
SharedSpace sharedSpace = new SharedSpace();
Thread producer = new Thread(new Producer(sharedSpace), "Producer");
Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
Журнал выполнения
Consumer - Started.
Consumer - Acquired lock on sharedSpace.
Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
Producer - Started.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 1
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 1.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 2
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 2.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 3
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 3.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 4
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 4.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 5
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 5.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 6
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 6.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 7
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 7.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 8
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 8.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 9
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 9.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 10
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 10.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: END (end of data production token).
Producer - Waking up consumer for data consumption.
Producer - Signing off.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: END (end of data production token).
Consumer - Signing off.
Вопрос
- Правильно ли это? (например, использует ли он правильные языковые инструменты, правильный подход, содержит ли он какой-то глупый код,...)
Но это выглядит "правильно"?
Я спрашиваю о правильности, даже если результат "выглядит хорошо", потому что вы не можете себе представить, сколько раз что-то пошло не так в моем тестировании "один раз", а не "другое" (например, когда потребитель начал сначала, когда Продюсер никогда не уходит после производства часового и т.д.). Я научился не требовать правильности от "успешного запуска". Напротив, я стал очень подозрительным в отношении псевдопараллельного кода! (по определению это не является даже параллельным! 0
Расширенные ответы
Хороший вопрос фокусируется только на one requested piece of advice
(выше), но не стесняйтесь упоминать какие-либо другие темы в вашем ответе, если хотите:
-
Как я могу протестировать параллельный код, когда буду вводить следующие мои попытки?
-
Какие инструменты могут помочь мне в разработке и отладке? Рассмотрим, что я использую Eclipse
-
Будет ли изменен подход, если я разрешаю Producer
продолжать производить, причем каждое производство занимает некоторое переменное время, в то время как Consumer
потребляет все, что становится доступным? Следует ли запереть замок в другом месте? Должна ли сигнализация измениться с этой парадигмы ожидания/уведомления?
-
Этот метод делает вещи устаревшими и должен ли я изучать что-то еще? Из этой таблички я понятия не имею, что происходит "в реальном мире Java"
Следующие шаги
- Куда мне идти дальше? Я видел понятие "фьючерсы", упомянутое где-то, но я мог бы использовать нумерованный список тем для работы последовательно, с педалогически упорядоченным, со ссылками на связанные учебные ресурсы.
Тино Сино
Ответы
Ответ 1
Правильно ли это указано?
Единственная проблема, которую я вижу, это то, что было упомянуто @Tudor и @Bhaskar. Всякий раз, когда вы проверяете условие, когда вы его ожидаете, вы должны использовать цикл while
. Однако речь идет скорее о гоночных условиях с несколькими производителями и потребителями. Возможно появление ложных пробуждений, но условия гонки намного более вероятны. Смотрите мою страницу по теме.
Да, у вас только 1 производитель и 1 потребитель, но вы можете попытаться расширить свой код для нескольких потребителей или скопировать код в другой сценарий.
Я научился не требовать правильности от "успешного запуска". Напротив, я стал очень подозрительно относиться к псевдопараллельному коду!
Хороший инстинкт.
Как я могу протестировать параллельный код, когда буду вводить следующие мои попытки?
Это очень сложно. Масштабирование - это один из способов. Добавьте нескольких производителей и потребителей и посмотрите, есть ли проблемы. Работает на нескольких архитектурах с разными числами/типами процессоров. Ваша лучшая защита будет правильной. Тесная синхронизация, хорошее использование классов BlockingQueue
, ExecutorService
и т.д., Чтобы сделать ваш сайт более простым и чистым.
Нет простого ответа. Тестирование многопоточного кода чрезвычайно сложно.
Какие инструменты могут помочь мне в разработке и отладке?
Что касается общих вещей, я бы посмотрел на инструмент покрытия, например Emma, чтобы вы могли убедиться в том, что ваши модульные тесты покрывают весь ваш код.
В терминах тестирования многопоточного кода узнайте, как читать kill -QUIT
потоки-дампы и посмотреть на выполнение потоков внутри Jconsole. Провайдеры Java, такие как YourKit, также могут помочь.
Будет ли этот подход изменяться, если я позволю продюсеру продолжать производство, причем каждое производство занимает некоторое переменное время...
Я так не думаю. Потребитель будет ждать продюсера навсегда. Может быть, я не понимаю этот вопрос?
Этот метод делает вещи устаревшими и должен ли я изучать что-то еще? Из этой таблички я понятия не имею, что происходит "в реальном мире Java"
Теперь узнаем о ExecutorService
classes. Они обрабатывают большой процент кода стиля new Thread()
, особенно когда вы имеете дело с несколькими асинхронными задачами, выполняемыми с потоками. Здесь tutorial.
Куда мне идти дальше?
Опять же, ExecutorService
. Я предполагаю, что вы прочитали этот начальный docs. Как отметил @Bhaskar, Java Concurrency in Practice является хорошей библией.
Вот некоторые общие комментарии о вашем коде:
-
Классы SharedSpace
и Threaded
кажутся ухищренным способом сделать это. Если вы играете с базовыми классами и т.п., То отлично. Но в общем, я никогда не использую такой шаблон. Производители и потребители обычно работают с BlockingQueue
как LinkedBlockingQueue
, и в этом случае для обработки данных синхронизации и volatile
вы. Кроме того, я склонен вводить общую информацию в конструктор объекта, а не получать его из базового класса.
-
Обычно, если я использую synchronized
, он находится в поле private final
. Часто я создаю private final Object lockObject = new Object();
для блокировки, если я не работаю с объектом уже.
-
Будьте осторожны с огромными блоками synchronized
и помещайте сообщения журнала внутри разделов synchronized
. Журналы обычно делают synchronized
IO в файловую систему, которая может быть очень дорогой. У вас должны быть маленькие, очень плотные, synchronized
блоки, если это возможно.
-
Вы определяете consumedData
вне цикла. Я бы определил его в точке назначения, а затем использовал break
для запирания из цикла, если он == -1
. Обязательно ограничьте область локальных переменных, если это вообще возможно.
-
Ваши сообщения о регистрации будут доминировать над производительностью вашего кода. Это означает, что когда вы их удаляете, ваш код будет работать совершенно по-другому. Это очень важно, когда вы начинаете отлаживать проблемы с ним. Производительность также (скорее всего) изменится, когда вы перейдете к другой архитектуре с разными процессорами/ядрами.
-
Вы, вероятно, знаете это, но когда вы вызываете sharedSpace.notify();
, это означает, что другой поток уведомляется, если он в настоящее время находится в sharedSpace.wait();
. Если это не что-то другое, оно пропустит уведомление. Просто FYI.
-
Немного странно делать if (nIterations <= N_ITERATIONS)
, а затем 3 строки ниже else
делать это снова. Дублирование notify()
было бы лучше упростить ветвление.
-
У вас есть int nIterations = 0;
, затем a while
, затем внутри a ++. Это рецепт цикла for:
for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
Здесь гораздо более плотная версия вашего кода. Это всего лишь пример того, как я буду писать. Опять же, помимо отсутствующего while
, в вашей версии нет ничего плохого.
public class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
int consumedData = queue.take();
if (consumedData == Producer.FINAL_VALUE) {
logger.info("Consumed: END (end of data production token).");
break;
}
logger.info("Consumed: {}.", consumedData);
}
logger.info("Signing off.");
}
}
public class Producer implements Runnable {
public static final int FINAL_VALUE = -1;
private final BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
logger.info("Produced: {}", nIterations);
queue.put(nIterations);
}
queue.put(FINAL_VALUE);
logger.info("Produced: END (end of data production token).");
logger.info("Signing off.");
}
}
public class ProducerConsumer {
public static void main(String[] args) {
// you can add an int argument to the LinkedBlockingQueue constructor
// to only allow a certain number of items in the queue at one time
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
Thread producer = new Thread(new Producer(queue), "Producer");
Thread consumer = new Thread(new Consumer(queue), "Consumer");
// start and join go here
}
}
Ответ 2
Вы, кажется, неплохо справились здесь. На самом деле это не так много. Думаю, что я хотел бы рекомендовать, вам следует избегать синхронизации на самом буфере. В этом случае это нормально, но при условии, что вместо этого вы переключитесь на буфер структуры данных, в зависимости от класса, который он может быть синхронизирован внутренне (например, Vector
, хотя он устарел до сих пор), поэтому приобретение блокировки извне может испортить его.
Изменить: Bhaskar делает хороший аргумент в пользу использования while
для переноса вызовов на wait
. Это из-за печально известных побочных пробуждений, которые могут произойти, прежде чем выпустить поток из wait
преждевременно, поэтому вам нужно убедиться, что он вернулся.
Что вы можете сделать дальше - это реализовать потребителя конечного буфера-производителя: иметь некоторую общую структуру данных, например. связанный список и установить максимальный размер (например, 10 элементов). Затем пусть продюсер продолжает производить и только приостанавливает его, когда в очереди 10 предметов. Потребитель будет заблокирован всякий раз, когда буфер пуст.
Следующие шаги, которые вы можете предпринять, - это научиться автоматизировать процесс, который вы выполнили вручную. Взгляните на BlockingQueue
, который обеспечивает буфер с возможностью блокировки (т.е. Потребитель будет автоматически блокировать, если буфер пуст, и производитель будет блокировать, если он заполнен).
Кроме того, в зависимости от ситуации, исполнители (посмотрите ExecutorService
) могут стать достойной заменой, поскольку они инкапсулируют очередь задач и одного или нескольких рабочих (потребителей), поэтому все, что вам нужно, это производитель.
Ответ 3
Producer
и Consumer
могут быть простыми классами, реализующими Runnable
(no extends Threaded
). Таким образом, они менее хрупкие. Клиенты могут создавать темы Thread
и присоединять экземпляры, чтобы не требовалось накладных расходов иерархии классов.
Ваше состояние перед вами wait()
должно быть while()
, а не if
.
изменить: с JCIP-страницы 301:
void stateDependentMethod() throws InterruptedException {
// condition predicate must be guarded by lock
synchronized(lock) {
while (!conditionPredicate())
lock.wait();
// object is now in desired state
}
}
У вас есть условие, чтобы остановить статически. Обычно производители и потребители должны быть более гибкими - они должны иметь возможность реагировать на внешний сигнал для остановки.
Для начала, для реализации внешнего сигнала останова, у вас есть флаг:
class Producer implements Runnable {
private volatile boolean stopRequested ;
public void run() {
while(true){
if(stopRequested )
// get out of the loop
}
}
public void stop(){
stopRequested = true;
// arrange to interrupt the Producer thread here.
}
}
Когда вы попытаетесь реализовать вышеизложенное, вы, вероятно, увидите, что возникают другие осложнения - например, ваш производитель сначала публикует, а затем wait()
ing, но это может привести к проблемам.
Если вас интересует дальнейшее чтение, я предлагаю прочитать книгу - Java Concurrency In Practice. Это будет иметь множество рекомендаций, чем я могу добавить здесь.