Пул потоков для обработки сообщений параллельно, но сохраняйте порядок в цепочках
Мне нужно обрабатывать сообщения параллельно, но сохраняйте порядок обработки сообщений с тем же идентификатором беседы.
Пример:
Пусть определите такое сообщение:
class Message {
Message(long id, long conversationId, String someData) {...}
}
Предположим, что сообщения поступают в следующем порядке:
Сообщение (1, 1, "a1" ), Сообщение (2, 2, "a2" ), Сообщение (3, 1, "b1" ), Сообщение (4, 2, "b2" ).
Мне нужно, чтобы сообщение 3 обрабатывалось после сообщения 1, поскольку сообщения 1 и 3 имеют одинаковый идентификатор беседы (аналогично, сообщение 4 должно обрабатываться после 2 по той же причине).
Меня не волнует относительный порядок между, например, 1 и 2, так как они имеют разные идентификаторы разговора.
Я хотел бы как можно больше использовать функциональность java ThreadPoolExecutor, чтобы избежать необходимости замены мертвых потоков вручную в моем коде и т.д.
Обновление: Количество возможных "идентификаторов беседы" не ограничено, и нет времени для разговора. (Я лично не вижу в этом проблемы, так как у меня может быть простое сопоставление с идентификатором conversId с номером рабочего, например, talkId% totalWorkers).
Обновление 2:. Существует одна проблема с решением с несколькими очередями, где номер очереди определяется, например. 'index = Objects.hash(talkId)% total': если для обработки некоторого сообщения требуется много времени, все сообщения с одним и тем же "индексом", но разные "talkId" будут ждать, даже если другие потоки доступны для его обработки. То есть, я считаю, что решения с одной интеллектуальной блокирующей очередью будут лучше, но это всего лишь мнение, я открыт для любого хорошего решения.
Вы видите элегантное решение этой проблемы?
Ответы
Ответ 1
Мне нужно было сделать что-то очень похожее некоторое время назад, так что вот адаптация.
(Посмотрите на действие онлайн)
На самом деле это то же самое, что и в базовой базе, но в моем случае ключ был строкой, и, что более важно, набор ключей не увеличивался бесконечно, поэтому мне пришлось добавить "планировщик очистки". Кроме того, это в основном тот же код, поэтому я надеюсь, что я не потерял ничего серьезного в процессе адаптации. Я тестировал его, похоже, что он работает. Это больше, чем другие решения, хотя, возможно, более сложные...
Основная идея:
-
MessageTask
обертывает сообщение в Runnable
и уведомляет очередь, когда она завершена
-
ConvoQueue
: блокировка очереди сообщений для разговора. Действует как предвестник, который гарантирует желаемый порядок. Смотрите это трио в частности: ConvoQueue.runNextIfPossible()
→ MessageTask.run()
→ ConvoQueue.complete()
→...
-
MessageProcessor
имеет Map<Long, ConvoQueue>
, a ExecutorService
- сообщения обрабатываются любым потоком в исполнителе,
ConvoQueue
подают ExecutorService
и гарантируют порядок сообщений для каждого конвота, но не глобально (поэтому "трудное" сообщение не блокирует обработку других разговоров, в отличие от некоторых другие решения, и это свойство было критически важным в нашем случае - если это не так важно для вас, возможно, более простое решение лучше).
- очистка с помощью
ScheduledExecutorService
(занимает 1 поток)
Визуально:
ConvoQueues ExecutorService internal queue
(shared, but has at most 1 MessageTask per convo)
Convo 1 ########
Convo 2 #####
Convo 3 ####### Thread 1
Convo 4 } → #### → {
Convo 5 ### Thread 2
Convo 6 #########
Convo 7 #####
(Convo 4 is about to be deleted)
Ниже всех классов (MessageProcessorTest
можно выполнить непосредственно):
// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static java.util.concurrent.TimeUnit.SECONDS;
public class MessageProcessor {
private static final long CLEANUP_PERIOD_S = 10;
private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
private final ExecutorService executorService;
public MessageProcessor(int nbThreads) {
executorService = Executors.newFixedThreadPool(nbThreads);
ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
}
public void addMessageToProcess(Message message) {
ConvoQueue queue = getQueue(message.getConversationId());
queue.addMessage(message);
}
private ConvoQueue getQueue(Long convoId) {
synchronized (queuesByConvo) {
return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
}
}
private void removeEmptyQueues() {
synchronized (queuesByConvo) {
queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
}
}
// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
class ConvoQueue {
private Queue<MessageTask> queue;
private MessageTask activeTask;
private ExecutorService executorService;
ConvoQueue(ExecutorService executorService) {
this.executorService = executorService;
this.queue = new LinkedBlockingQueue<>();
}
private void runNextIfPossible() {
synchronized(this) {
if (activeTask == null) {
activeTask = queue.poll();
if (activeTask != null) {
executorService.submit(activeTask);
}
}
}
}
void complete(MessageTask task) {
synchronized(this) {
if (task == activeTask) {
activeTask = null;
runNextIfPossible();
}
else {
throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
}
}
}
boolean isEmpty() {
return queue.isEmpty();
}
void addMessage(Message message) {
add(new MessageTask(this, message));
}
private void add(MessageTask task) {
synchronized(this) {
queue.add(task);
runNextIfPossible();
}
}
}
// MessageTask.java
public class MessageTask implements Runnable {
private ConvoQueue convoQueue;
private Message message;
MessageTask(ConvoQueue convoQueue, Message message) {
this.convoQueue = convoQueue;
this.message = message;
}
@Override
public void run() {
try {
processMessage();
}
finally {
convoQueue.complete(this);
}
}
private void processMessage() {
// Dummy processing with random delay to observe reordered messages & preserved convo order
try {
Thread.sleep((long) (50*Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(message);
}
}
// Message.java
class Message {
private long id;
private long conversationId;
private String data;
Message(long id, long conversationId, String someData) {
this.id = id;
this.conversationId = conversationId;
this.data = someData;
}
long getConversationId() {
return conversationId;
}
String getData() {
return data;
}
public String toString() {
return "Message{" + id + "," + conversationId + "," + data + "}";
}
}
// MessageProcessorTest.java
public class MessageProcessorTest {
public static void main(String[] args) {
MessageProcessor test = new MessageProcessor(2);
for (int i=1; i<100; i++) {
test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i));
}
}
}
Вывод (для каждого символа заказа кода (2-го поля) сохраняется):
Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}
Тест выше дал мне уверенность в том, чтобы поделиться им, но я немного обеспокоен тем, что мог забыть подробности о патологических случаях. Он работает в производстве в течение многих лет без задержек (хотя с большим количеством кода, который позволяет проверять его вживую, когда нам нужно посмотреть, что происходит, почему определенная очередь требует времени и т.д. - никогда не проблема с самой системой сама по себе, но иногда с обработкой конкретной задачи)
Изменить: нажмите здесь, чтобы проверить онлайн. Альтернативный вариант: скопируйте этот gist в там и нажмите "Скомпилировать и выполнить".
Ответ 2
Не знаете, как вы хотите обрабатывать сообщения. Для удобства каждое сообщение имеет тип Runnable, который является местом выполнения.
Решение для всего этого состоит в том, чтобы иметь число Executor
, которые подаются в параллель ExecutorService
. Используйте операцию modulo для вычисления, к которому Executor
необходимо отправить входящее сообщение. Очевидно, что для той же самой беседы id ее те же Executor
, следовательно, у вас есть параллельная обработка, но последовательная для одного и того же идентификатора беседы. Это не гарантирует, что сообщения с разным идентификатором разговора всегда будут выполняться параллельно (в общем, вы ограничены, по крайней мере, количеством физических ядер в вашей системе).
public class MessageExecutor {
public interface Message extends Runnable {
long getId();
long getConversationId();
String getMessage();
}
private static class Executor implements Runnable {
private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
private volatile boolean stopped;
void schedule(Message message) {
messages.add(message);
}
void stop() {
stopped = true;
}
@Override
public void run() {
while (!stopped) {
try {
Message message = messages.take();
message.run();
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
}
}
private final Executor[] executors;
private final ExecutorService executorService;
public MessageExecutor(int poolCount) {
executorService = Executors.newFixedThreadPool(poolCount);
executors = new Executor[poolCount];
IntStream.range(0, poolCount).forEach(i -> {
Executor executor = new Executor();
executorService.submit(executor);
executors[i] = executor;
});
}
public void submit(Message message) {
final int executorNr = Objects.hash(message.getConversationId()) % executors.length;
executors[executorNr].schedule(message);
}
public void stop() {
Arrays.stream(executors).forEach(Executor::stop);
executorService.shutdown();
}
}
Затем вы можете запустить исполнитель сообщений с пулом и отправить ему сообщения.
public static void main(String[] args) {
MessageExecutor messageExecutor = new MessageExecutor(Runtime.getRuntime().availableProcessors());
messageExecutor.submit(new Message() {
@Override
public long getId() {
return 1;
}
@Override
public long getConversationId() {
return 1;
}
@Override
public String getMessage() {
return "abc1";
}
@Override
public void run() {
System.out.println(this.getMessage());
}
});
messageExecutor.submit(new Message() {
@Override
public long getId() {
return 1;
}
@Override
public long getConversationId() {
return 2;
}
@Override
public String getMessage() {
return "abc2";
}
@Override
public void run() {
System.out.println(this.getMessage());
}
});
messageExecutor.stop();
}
Когда я запускаю счет пула 2 и отправляю количество сообщений:
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
Когда столько же сообщений запускается со счетом пула 3:
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
Сообщения распределяются между пулом Executor
:).
РЕДАКТИРОВАТЬ: Executor
run()
захватывает все Исключения, чтобы гарантировать, что он не сломается, когда одно сообщение не работает.
Ответ 3
Вы действительно хотите, чтобы работа выполнялась последовательно в рамках разговора. Одним из решений было бы синхронизировать мьютекс, который является уникальным для этого разговора. Недостатком этого решения является то, что если разговоры недолговечны и частые начинаются новые разговоры, карта "мьютексов" будет быстро расти.
Для краткости я пропустил выключение исполнителя, фактическую обработку сообщений, обработку исключений и т.д.
public class MessageProcessor {
private final ExecutorService executor;
private final ConcurrentMap<Long, Object> mutexes = new ConcurrentHashMap<> ();
public MessageProcessor(int threadCount) {
executor = Executors.newFixedThreadPool(threadCount);
}
public static void main(String[] args) throws InterruptedException {
MessageProcessor p = new MessageProcessor(10);
BlockingQueue<Message> queue = new ArrayBlockingQueue<> (1000);
//some other thread populates the queue
while (true) {
Message m = queue.take();
p.process(m);
}
}
public void process(Message m) {
Object mutex = mutexes.computeIfAbsent(m.getConversationId(), id -> new Object());
executor.submit(() -> {
synchronized(mutex) {
//That where you actually process the message
}
});
}
}
Ответ 4
У меня была аналогичная проблема в моем приложении. Мое первое решение сортировало их с помощью java.util.ConcurrentHashMap. Таким образом, в вашем случае это будет ConcurrentHashMap с ключом talkId как ключом и списком сообщений в качестве значения. Проблема заключалась в том, что HashMap слишком большой, занимая слишком много места.
Мое текущее решение:
Один поток получает сообщения и сохраняет их в java.util.ArrayList. После приема N сообщений он переводит список во второй поток. Этот поток сортирует сообщения, используя метод ArrayList.sort, с помощью talkId и id. Затем поток проходит через отсортированный список и ищет блоки, которые могут быть обработаны. Каждый блок, который может быть обработан, вынимается из списка. Чтобы обработать блок, вы можете создать runnable с этим блоком и нажать его в службу-исполнитель. Сообщения, которые не могут быть обработаны, остаются в списке и будут проверяться в следующем раунде.
Ответ 5
Для чего это стоит, API Kafka Streams предоставляет большую часть этой функции. Перегородки сохраняют порядок. Это более крупный бай-ин, чем ExecutorService, но может быть интересным, особенно если вы уже используете Kafka.
Ответ 6
Я бы использовал три исполнительных сервиса (один для приема сообщений, один для сортировки сообщений, один для обработки сообщений).
Я бы также использовал одну очередь, чтобы поместить все полученные сообщения и другую очередь с сообщениями, отсортированными и сгруппированными (отсортированные по идентификатору беседы, затем создайте группы сообщений, которые используют один и тот же идентификатор беседы).
Наконец: один поток для приема сообщений, один поток для сортировки сообщений и все остальные потоки, используемые для обработки сообщений.
см. ниже:
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
public class MultipleMessagesExample {
private static int MAX_ELEMENTS_MESSAGE_QUEUE = 1000;
private BlockingQueue<Message> receivingBlockingQueue = new LinkedBlockingDeque<>(MAX_ELEMENTS_MESSAGE_QUEUE);
private BlockingQueue<List<Message>> prioritySortedBlockingQueue = new LinkedBlockingDeque<>(MAX_ELEMENTS_MESSAGE_QUEUE);
public static void main(String[] args) {
MultipleMessagesExample multipleMessagesExample = new MultipleMessagesExample();
multipleMessagesExample.doTheWork();
}
private void doTheWork() {
int totalCores = Runtime.getRuntime().availableProcessors();
int totalSortingProcesses = 1;
int totalMessagesReceiverProcess = 1;
int totalMessagesProcessors = totalCores - totalSortingProcesses - totalMessagesReceiverProcess;
ExecutorService messagesReceiverExecutorService = Executors.newFixedThreadPool(totalMessagesReceiverProcess);
ExecutorService sortingExecutorService = Executors.newFixedThreadPool(totalSortingProcesses);
ExecutorService messageProcessorExecutorService = Executors.newFixedThreadPool(totalMessagesProcessors);
MessageReceiver messageReceiver = new MessageReceiver(receivingBlockingQueue);
messagesReceiverExecutorService.submit(messageReceiver);
MessageSorter messageSorter = new MessageSorter(receivingBlockingQueue, prioritySortedBlockingQueue);
sortingExecutorService.submit(messageSorter);
for (int i = 0; i < totalMessagesProcessors; i++) {
MessageProcessor messageProcessor = new MessageProcessor(prioritySortedBlockingQueue);
messageProcessorExecutorService.submit(messageProcessor);
}
}
}
class Message {
private Long id;
private Long conversationId;
private String someData;
public Message(Long id, Long conversationId, String someData) {
this.id = id;
this.conversationId = conversationId;
this.someData = someData;
}
public Long getId() {
return id;
}
public Long getConversationId() {
return conversationId;
}
public String getSomeData() {
return someData;
}
}
class MessageReceiver implements Callable<Void> {
private BlockingQueue<Message> bloquingQueue;
public MessageReceiver(BlockingQueue<Message> bloquingQueue) {
this.bloquingQueue = bloquingQueue;
}
@Override
public Void call() throws Exception {
System.out.println("receiving messages...");
bloquingQueue.add(new Message(1L, 1000L, "conversation1 data fragment 1"));
bloquingQueue.add(new Message(2L, 2000L, "conversation2 data fragment 1"));
bloquingQueue.add(new Message(3L, 1000L, "conversation1 data fragment 2"));
bloquingQueue.add(new Message(4L, 2000L, "conversation2 data fragment 2"));
return null;
}
}
/**
* sorts messages. group together same conversation IDs
*/
class MessageSorter implements Callable<Void> {
private BlockingQueue<Message> receivingBlockingQueue;
private BlockingQueue<List<Message>> prioritySortedBlockingQueue;
private List<Message> intermediateList = new ArrayList<>();
private MessageComparator messageComparator = new MessageComparator();
private static int BATCH_SIZE = 10;
public MessageSorter(BlockingQueue<Message> receivingBlockingQueue, BlockingQueue<List<Message>> prioritySortedBlockingQueue) {
this.receivingBlockingQueue = receivingBlockingQueue;
this.prioritySortedBlockingQueue = prioritySortedBlockingQueue;
}
@Override
public Void call() throws Exception {
while (true) {
boolean messagesReceivedQueueIsEmpty = false;
intermediateList = new ArrayList<>();
for (int i = 0; i < BATCH_SIZE; i++) {
try {
Message message = receivingBlockingQueue.remove();
intermediateList.add(message);
} catch (NoSuchElementException e) {
// this is expected when queue is empty
messagesReceivedQueueIsEmpty = true;
break;
}
}
Collections.sort(intermediateList, messageComparator);
if (intermediateList.size() > 0) {
Map<Long, List<Message>> map = intermediateList.stream().collect(Collectors.groupingBy(message -> message.getConversationId()));
map.forEach((k, v) -> prioritySortedBlockingQueue.add(new ArrayList<>(v)));
System.out.println("new batch of messages was sorted and is ready to be processed");
}
if (messagesReceivedQueueIsEmpty) {
System.out.println("message processor is waiting for messages...");
Thread.sleep(1000); // no need to use CPU if there are no messages to process
}
}
}
}
/**
* process groups of messages with same conversationID
*/
class MessageProcessor implements Callable<Void> {
private BlockingQueue<List<Message>> prioritySortedBlockingQueue;
public MessageProcessor(BlockingQueue<List<Message>> prioritySortedBlockingQueue) {
this.prioritySortedBlockingQueue = prioritySortedBlockingQueue;
}
@Override
public Void call() throws Exception {
while (true) {
List<Message> messages = prioritySortedBlockingQueue.take(); // blocks if no message is available
messages.stream().forEach(m -> processMessage(m));
}
}
private void processMessage(Message message) {
System.out.println(message.getId() + " - " + message.getConversationId() + " - " + message.getSomeData());
}
}
class MessageComparator implements Comparator<Message> {
@Override
public int compare(Message o1, Message o2) {
return (int) (o1.getConversationId() - o2.getConversationId());
}
}
Ответ 7
создать класс исполнителя, расширяющий Executor.On submit, вы можете ввести код, как показано ниже.
public void execute(Runnable command) {
final int key= command.getKey();
//Some code to check if it is runing
final int index = key != Integer.MIN_VALUE ? Math.abs(key) % size : 0;
workers[index].execute(command);
}
Создайте рабочего с очередью, чтобы, если вы хотите, чтобы какая-то задача выполнялась последовательно, запустите.
private final AtomicBoolean scheduled = new AtomicBoolean(false);
private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maximumQueueSize);
public void execute(Runnable command) {
long timeout = 0;
TimeUnit timeUnit = TimeUnit.SECONDS;
if (command instanceof TimeoutRunnable) {
TimeoutRunnable timeoutRunnable = ((TimeoutRunnable) command);
timeout = timeoutRunnable.getTimeout();
timeUnit = timeoutRunnable.getTimeUnit();
}
boolean offered;
try {
if (timeout == 0) {
offered = workQueue.offer(command);
} else {
offered = workQueue.offer(command, timeout, timeUnit);
}
} catch (InterruptedException e) {
throw new RejectedExecutionException("Thread is interrupted while offering work");
}
if (!offered) {
throw new RejectedExecutionException("Worker queue is full!");
}
schedule();
}
private void schedule() {
//if it is already scheduled, we don't need to schedule it again.
if (scheduled.get()) {
return;
}
if (!workQueue.isEmpty() && scheduled.compareAndSet(false, true)) {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
scheduled.set(false);
throw e;
}
}
}
public void run() {
try {
Runnable r;
do {
r = workQueue.poll();
if (r != null) {
r.run();
}
}
while (r != null);
} finally {
scheduled.set(false);
schedule();
}
}