Как последовательная, так и параллельная обработка

У меня есть один производитель и многие потребители.

  • производитель работает быстро и генерирует много результатов.
  • токены с одинаковым значением необходимо обрабатывать последовательно
  • токены с разными значениями должны обрабатываться параллельно
  • создание новых Runnables было бы очень дорогостоящим, а также производственный код мог бы работать с 100k токенов (для создания Runnable мне нужно передать конструктору некоторый сложный объект для сборки)

Могу ли я достичь тех же результатов с помощью более простого алгоритма? Вложение блока синхронизации с реентерабельной блокировкой кажется немного неестественным. Есть ли какие-либо условия гонки, которые вы могли бы заметить?

Обновление: второе решение, которое я нашел, работало с 3 коллекциями. Один для кэширования результатов производителя, второй очереди блокировки и третий, используя список для отслеживания в выполняемых задачах. Снова немного сложно.

Моя версия кода

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class Main1 {
    static class Token {
        private int order;
        private String value;
        Token() {

        }
        Token(int o, String v) {
            order = o;
            value = v;
        }

        int getOrder() {
            return order;
        }

        String getValue() {
            return value;
        }
    }

    private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
    private final static ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
    private final static ReentrantLock reentrantLock = new ReentrantLock();
    private final static Token STOP_TOKEN = new Token();
    private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());

    public static void main(String[] args) {
        ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
        producerExecutor.submit(new Runnable() {
            public void run() {
                Random random = new Random();
                    try {
                        for (int i = 1; i <= 100; i++) {
                            Token token = new Token(i, String.valueOf(random.nextInt(1)));

                            queue.put(token);
                        }

                        queue.put(STOP_TOKEN);
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }
        });

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
        for(int i=1; i<=10;i++) {

            // creating to many runnable would be inefficient because of this complex not thread safe object
            final Object dependecy = new Object(); //new ComplexDependecy()
            consumerExecutor.submit(new Runnable() {
                public void run() {
                    while(true) {
                        try {
                            //not in order


                            Token token = queue.take();
                            if (token == STOP_TOKEN) {
                                queue.add(STOP_TOKEN);
                                return;
                            }


                            System.out.println("Task start" + Thread.currentThread().getId() + " order "  + token.getOrder());

                            Random random = new Random();
                            Thread.sleep(random.nextInt(200)); //doLongRunningTask(dependecy)
                            lockList.remove(token.getValue());

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            }});

    }
}}

Ответы

Ответ 1

Вы можете предварительно создать набор Runnables, который будет выбирать входящие задачи (токены) и поместить их в очереди в соответствии с их порядковым значением.

Как отмечалось в комментариях, это не гарантирует, что токены с разными значениями будут всегда выполняться параллельно (в общем, вы ограничены, по крайней мере, nr физических ядер в своей коробке). Тем не менее, гарантируется, что жетоны с одинаковым порядком будут выполняться в порядке поступления.

Пример кода:

/**
 * Executor which ensures incoming tasks are executed in queues according to provided key (see {@link Task#getOrder()}).
 */
public class TasksOrderingExecutor {

    public interface Task extends Runnable {
        /**
         * @return ordering value which will be used to sequence tasks with the same value.<br>
         * Tasks with different ordering values <i>may</i> be executed in parallel, but not guaranteed to.
         */
        String getOrder();
    }

    private static class Worker implements Runnable {

        private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Task task) {
            tasks.add(task);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Task task = tasks.take();
                    task.run();
                } catch (InterruptedException ie) {
                    // perhaps, handle somehow
                }
            }
        }
    }

    private final Worker[] workers;
    private final ExecutorService executorService;

    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
        executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        workers = new Worker[queuesNr];
        for (int i = 0; i < queuesNr; i++) {
            Worker worker = new Worker();
            executorService.submit(worker);
            workers[i] = worker;
        }
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.schedule(task);
    }

    public void stop() {
        for (Worker w : workers) w.stop();
        executorService.shutdown();
    }

    private Worker getWorker(Task task) {
        return workers[task.getOrder().hashCode() % workers.length];
    }
}

Ответ 2

По характеру вашего кода, единственный способ гарантировать, что токены с одно и то же значение обрабатывается последовательным образом, так это ждать появления STOP_TOKEN.

Вам понадобится единая установка для одного производителя, с потребительским сбором и сортировкой токены по их значению (в Multimap, скажем).

Только тогда вы знаете, какие токены могут обрабатываться последовательно и которые могут обрабатываться параллельно.

В любом случае, я советую вам посмотреть LMAX Disruptor, что является очень эффективным способом обмена данными между потоками.

Он не страдает от накладных расходов синхронизации как исполнителей, поскольку он свободен от блокировки (что может дать вам хорошее преимущество в производительности, в зависимости от характера вашей обработки данных).

Решение с использованием двух разрывов

// single thread for processing as there will be only on consumer
Disruptor<InEvent> inboundDisruptor = new Disruptor<>(InEvent::new, 32, Executors.newSingleThreadExecutor());

// outbound disruptor that uses 3 threads for event processing
Disruptor<OutEvent> outboundDisruptor = new Disruptor<>(OutEvent::new, 32, Executors.newFixedThreadPool(3));

inboundDisruptor.handleEventsWith(new InEventHandler(outboundDisruptor));

// setup 3 event handlers, doing round robin consuming, effectively processing OutEvents in 3 threads
outboundDisruptor.handleEventsWith(new OutEventHandler(0, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(1, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(2, 3, new Object()));

inboundDisruptor.start();
outboundDisruptor.start();

// publisher code
for (int i = 0; i < 10; i++) {
    inboundDisruptor.publishEvent(InEventTranslator.INSTANCE, new Token());
}

Обработчик событий на входящем сбое просто собирает входящие токены. Когда маркер STOP принимается, он публикует серию токенов для исходящего разрушителя для дальнейшей обработки:

public class InEventHandler implements EventHandler<InEvent> {

    private ListMultimap<String, Token> tokensByValue = ArrayListMultimap.create();
    private Disruptor<OutEvent> outboundDisruptor;

    public InEventHandler(Disruptor<OutEvent> outboundDisruptor) {
        this.outboundDisruptor = outboundDisruptor;
    }

    @Override
    public void onEvent(InEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (event.token == STOP_TOKEN) {
            // publish indexed tokens to outbound disruptor for parallel processing
            tokensByValue.asMap().entrySet().stream().forEach(entry -> outboundDisruptor.publishEvent(OutEventTranslator.INSTANCE, entry.getValue()));
        } else {
            tokensByValue.put(event.token.value, event.token);
        }
    }
}

Обработчик исходящих событий обрабатывает токены одного и того же значения последовательно:

public class OutEventHandler implements EventHandler<OutEvent> {

    private final long order;
    private final long allHandlersCount;
    private Object yourComplexDependency;

    public OutEventHandler(long order, long allHandlersCount, Object yourComplexDependency) {
        this.order = order;
        this.allHandlersCount = allHandlersCount;
        this.yourComplexDependency = yourComplexDependency;
    }

    @Override
    public void onEvent(OutEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (sequence % allHandlersCount != order ) {
            // round robin, do not consume every event to allow parallel processing
            return;
        }

        for (Token token : event.tokensToProcessSerially) {
            // do procesing of the token using your complex class
        }

    }
}

Остальная часть необходимой инфраструктуры (цель описана в документах Disruptor):

public class InEventTranslator implements EventTranslatorOneArg<InEvent, Token> {

    public static final InEventTranslator INSTANCE = new InEventTranslator();

    @Override
    public void translateTo(InEvent event, long sequence, Token arg0) {
        event.token = arg0;
    }

}

public class OutEventTranslator implements EventTranslatorOneArg<OutEvent, Collection<Token>> {

    public static final OutEventTranslator INSTANCE = new OutEventTranslator();

    @Override
    public void translateTo(OutEvent event, long sequence, Collection<Token> tokens) {
        event.tokensToProcessSerially = tokens;
    }
}


public class InEvent {

    // Note that no synchronization is used here,
    // even though the field is used among multiple threads.
    // Memory barrier used by Disruptor guarantee changes are visible.
    public Token token;
}

public class OutEvent {
    // ... again, no locks.
    public Collection<Token> tokensToProcessSerially;

}

public class Token {
    String value;

}

Ответ 3

Если у вас много разных токенов, самым простым решением является создание некоторого количества однопоточных исполнителей (около 2x вашего количества ядер), а затем распределение каждой задачи исполнителю, определяемому хешем его токена.

Таким образом, все задачи с одним и тем же токеном перейдут к одному и тому же исполнителю и будут выполняться последовательно, поскольку каждый исполнитель имеет только один поток.

Если у вас есть некоторые неустановленные требования о планировании справедливости, то достаточно легко избежать каких-либо значительных дисбалансов, если поток производителей поставит свои запросы (или блокирует) до их распространения до тех пор, пока не будет, скажем, менее 10 запросов на каждого исполнителя.

Ответ 4

В следующем решении будет использоваться только одна Карта, которая используется производителем и потребителями для обработки заказов в последовательном порядке для каждого номера заказа при одновременном обработке разных порядковых номеров. Вот код:

public class Main {

    private static final int NUMBER_OF_CONSUMER_THREADS = 10;
    private static volatile int sync = 0;

    public static void main(String[] args) {
        final ConcurrentHashMap<String,Controller> queues = new ConcurrentHashMap<String, Controller>();
        final CountDownLatch latch = new CountDownLatch(NUMBER_OF_CONSUMER_THREADS);
        final AtomicBoolean done = new AtomicBoolean(false);

        // Create a Producer
        new Thread() {
            {
                this.setDaemon(true);
                this.setName("Producer");
                this.start();
            }

            public void run() {
                Random rand = new Random();

                for(int i =0 ; i < 1000 ; i++) {
                    int order = rand.nextInt(20);
                    String key = String.valueOf(order);
                    String value = String.valueOf(rand.nextInt());
                    Controller controller = queues.get(key);
                    if (controller == null) {
                        controller = new Controller();
                        queues.put(key, controller);
                    }
                    controller.add(new Token(order, value));
                    Main.sync++;
                }

                done.set(true);
            }
        };

        while (queues.size() < 10) {
            try {
                // Allow the producer to generate several entries that need to
                // be processed.
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }

        // System.out.println(queues);

        // Create the Consumers
        ExecutorService consumers = Executors.newFixedThreadPool(NUMBER_OF_CONSUMER_THREADS);
        for(int i = 0 ; i < NUMBER_OF_CONSUMER_THREADS ; i++) {
            consumers.submit(new Runnable() {
                private Random rand = new Random();

                public void run() {
                    String name = Thread.currentThread().getName();
                    try {
                        boolean one_last_time = false;
                        while (true) {
                            for (Map.Entry<String, Controller> entry : queues.entrySet()) {
                                Controller controller = entry.getValue();
                                if (controller.lock(this)) {
                                    ConcurrentLinkedQueue<Token> list = controller.getList();
                                    Token token;
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(rand.nextInt(200));
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    int last = Main.sync;
                                    queues.remove(entry.getKey());
                                    while(done.get() == false && last == Main.sync) {
                                        // yield until the producer has added at least another entry
                                        Thread.yield();
                                    }
                                    // Purge any new entries added
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(200);
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    controller.unlock(this);
                                }
                            }
                            if (one_last_time) {
                                return;
                            }
                            if (done.get()) {
                                one_last_time = true;
                            }
                        }
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumers.shutdown();
        System.out.println("Exiting.. remaining number of entries: " + queues.size());
    }

}

Обратите внимание: класс Main содержит экземпляр очередей, являющийся Map. Ключ карты - это идентификатор заказа, который вы хотите последовательно обрабатывать потребителями. Это класс Controller, который будет содержать все заказы, связанные с этим идентификатором заказа.

Производитель будет генерировать заказы и добавить заказ (токен) к соответствующему контроллеру. Потребители будут итератором над значениями карт очередей и вызовом метода блокировки контроллера, чтобы определить, может ли он обрабатывать заказы для этого конкретного идентификатора заказа. Если блокировка вернет false, она проверит следующий экземпляр контроллера. Если блокировка вернет true, она обработает все заказы, а затем проверит следующий контроллер.

обновлено Добавлено целое число синхронизации, которое используется, чтобы гарантировать, что, когда экземпляр контроллера будет удален из карты очередей. Все его записи будут использованы. В потребительском коде была логическая ошибка, в которой вскоре был вызван метод разблокировки.

Класс Token похож на тот, который вы разместили здесь.

class Token {
    private int order;
    private String value;

    Token(int order, String value) {
        this.order = order;
        this.value = value;
    }

    int getOrder() {
        return order;
    }

    String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Token [order=" + order + ", value=" + value + "]\n";
    }
}

Следующий класс Controller используется для обеспечения того, чтобы обрабатывать заказы только один поток в пуле потоков. Методы блокировки/разблокировки используются для определения того, какой из потоков будет разрешен для обработки заказов.

class Controller {

    private ConcurrentLinkedQueue<Token> tokens = new ConcurrentLinkedQueue<Token>();
    private ReentrantLock lock = new ReentrantLock();
    private Runnable current = null;

    void add(Token token) {
        tokens.add(token);
    }

    public ConcurrentLinkedQueue<Token> getList() {
        return tokens;
    }

    public void unlock(Runnable runnable) {
        lock.lock();
        try {
            if (current == runnable) {
                current = null;
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean lock(Runnable runnable) {
        lock.lock();
        try {
            if (current == null) {
                current = runnable;
            }
        } finally {
            lock.unlock();
        }
        return current == runnable;
    }

    @Override
    public String toString() {
        return "Controller [tokens=" + tokens + "]";
    }

}

Дополнительная информация об осуществлении. Он использует CountDownLatch, чтобы гарантировать, что все произведенные заказы будут обработаны до выхода процесса. Заданная переменная точно так же, как и ваша переменная STOP_TOKEN.

В реализации есть проблема, которую вам нужно будет решить. Существует проблема, что он не очищает контроллер для идентификатора заказа, когда все заказы обрабатываются. Это приведет к тому, что поток в пуле потоков будет назначен контроллеру, который не содержит заказов. Который будет тратить циклы процессора, которые могут быть использованы для выполнения других задач.

Ответ 5

Все, что вам нужно, это убедиться, что токены с одинаковым значением не обрабатываются одновременно? Ваш код слишком запутан, чтобы понять, что вы имеете в виду (он не компилируется и содержит много неиспользуемых переменных, блокировок и карт, которые создаются, но никогда не используются). Похоже, вы сильно задумались над этим. Все, что вам нужно, это одна очередь и одна карта. Что-то вроде этого я воображаю:

   class Consumer implements Runnable {
     ConcurrentHashMap<String, Token> inProcess;
     BlockingQueue<Token> queue;

     public void run() {
        Token token = null;
        while ((token = queue.take()) != null) {
           if(inProcess.putIfAbsent(token.getValue(), token) != null) {
              queue.put(token);
              continue;
           }
           processToken(token);
           inProcess.remove(token.getValue());
        }
     }
   }

Ответ 6

токены с одинаковым значением необходимо обрабатывать последовательно

Способ гарантировать, что любые две вещи происходят последовательно, - это делать их в одном и том же потоке.

У меня будет коллекция из нескольких рабочих потоков, и у меня будет карта. Каждый раз, когда я получаю токен, который я раньше не видел, я выбираю случайный случай и вводим токен и поток в карту. С этого момента я буду использовать тот же поток для выполнения задач, связанных с этим токеном.

создание новых Runnables было бы очень дорого

Runnable - это интерфейс. Создание новых объектов, реализующих Runnable, не будет значительно дороже, чем создание любого другого объекта.

Ответ 7

Может, я что-то не понимаю. Но кажется, что было бы легче отфильтровать токены с тем же значением из тех, у кого разные значения, в две разные очереди.

И затем используйте Stream с картой или foreach для последовательного. И просто используйте версию параллельного потока для остальных.

Если ваши токены в рабочей среде лениво сгенерированы, и вы получаете только один за раз, вы просто делаете какой-то фильтр, который распределяет их в две разные очереди.

Если вы можете реализовать его с помощью Streams, я suqqest делает это, поскольку они просты, просты в использовании и FAST!

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html

Я сделал краткий пример того, что я имею в виду. В этом случае числа Tokens являются своего рода искусственно построенными, но это не относится к делу. Также потоки инициируются в основном потоке, что, вероятно, также не будет идеальным.

public static void main(String args[]) {
    ArrayList<Token> sameValues = new ArrayList<Token>();
    ArrayList<Token> distinctValues = new ArrayList<Token>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        int next = random.nextInt(100);
        Token n = new Token(i, String.valueOf(next));
        if (next == i) {
            sameValues.add(n);
        } else {
            distinctValues.add(n);
        }
    }       
    distinctValues.stream().parallel().forEach(token -> System.out.println("Distinct: " + token.value));
    sameValues.stream().forEach(token -> System.out.println("Same: " + token.value));       
}

Ответ 8

Я не совсем уверен, что понял вопрос, но я возьму на себя алгоритм.

Актеры:

  • A queue задач
  • A pool свободного executors
  • A set из токенов in-process, обрабатываемых в настоящее время
  • A controller

Затем

  • Изначально все executors доступны, а set пуст

  • controller выбирает доступный executor и проходит через queue ищет task с токеном, который не находится в in-process set, и когда он находит его

    • добавляет токен в набор in-process
    • назначает executor для обработки task и
    • возвращается в начало очереди
  • executor удаляет токен из set, когда он выполняется, и добавляет себя обратно в пул

Ответ 9

Один из способов сделать это - иметь одного исполнителя для обработки последовательности и один для параллельной обработки. Нам также нужна служба с одним потоковым менеджером, которая решит, какой токен сервисов должен быть отправлен для обработки.                                                                                               // Очередь, которая будет использоваться обоими потоками. Содержит маркеры производства производителя.
                       BlockingQueue tokenList = new ArrayBlockingQueue (10);

    private void startProcess() {
    ExecutorService producer = Executors.newSingleThreadExecutor();
    final ExecutorService consumerForSequence = Executors
            .newSingleThreadExecutor();
    final ExecutorService consumerForParallel = Executors.newFixedThreadPool(10);
    ExecutorService manager = Executors.newSingleThreadExecutor();

    producer.submit(new Producer(tokenList));

    manager.submit(new Runnable() {

        public void run() {
            try {
                while (true) {
                    Token t = tokenList.take();
                    System.out.println("consumed- " + t.orderid
                            + " element");

                    if (t.orderid % 7 == 0) { // any condition to check for sequence processing

                        consumerForSequence.submit(new ConsumerForSequenceProcess(t));

                    } else {

                        ConsumerForParallel.submit(new ConsumerForParallelProcess(t));

                    }
                }
            }

            catch (InterruptedException e) { // TODO Auto-generated catch
                // block
                e.printStackTrace();
            }

        }
    });
}

Ответ 10

Я думаю, что за этой задачей скрывается более фундаментальная проблема дизайна, но все в порядке. Я не могу определить из вашего описания проблемы, если вы хотите выполнить заказ в порядке или просто хотите, чтобы операции над задачами, описанными одиночными токенами, были атомарными/транзакционными. То, что я предлагаю ниже, кажется скорее "быстрым решением" этой проблемы, чем реальным решением.

Для реального случая "упорядоченного исполнения" я предлагаю решение, основанное на прокси-серверах очереди, которые упорядочивают вывод:

  • Определите реализацию Queue, которая предоставляет метод factory, генерирующий очереди прокси, которые представляются стороне производителя одним объектом очереди; метод factory должен также регистрировать эти объекты очереди прокси. добавление элемента во входную очередь должно добавить его непосредственно в одну из выходных очередей, если она соответствует одному из элементов в одной из выходных очередей. В противном случае добавьте его в любую (самую короткую) очередь вывода. (выполнить проверку для этого эффективно). Альтернативно (немного лучше): не делайте этого, когда элемент добавляется, но когда какая-либо из выходных очередей пуста.

  • Предоставьте каждому из ваших загружаемых потребителей поле, в котором хранится отдельный интерфейс Queue (вместо доступа к одному объекту). Инициализируйте это поле с помощью метода factory, определенного выше.

В случае транзакции я думаю, что проще охватить больше потоков, чем у вас есть ядра (используйте статистику, чтобы вычислить это) и реализовать механизм блокировки на более низком (объектном) уровне.