Weird Hazelcat IMap # put() поведение

Моя программа на базе Hazelcast может работать в двух режимах: отправитель и рабочий.

Submitter ставит POJO на распределенную карту с помощью некоторого ключа, например: hazelcastInstance.getMap(MAP_NAME).put(key, value);

Рабочий имеет бесконечный цикл (с Thread.sleep(1000L); внутри для таймаута), который должен обрабатывать сущности из карты. Пока я просто печатаю размер карты в этом цикле.

Теперь вот проблема. Я запускаю рабочее приложение. Затем я запускаю четыре отправителя одновременно (каждый добавляет запись на карту и завершает работу). Но после того, как все приложения-приложения выполнены, рабочее приложение печатает произвольный размер: иногда он обнаруживает, что добавлена ​​только одна запись, иногда две, иногда три (на самом деле она никогда не видела всех четырех записей).

В чем проблема с этим простым потоком? Я читал в документах Hazelcast, что метод put() является синхронным, поэтому он гарантирует, что после его возврата запись будет помещена в распределенную карту и будет реплицирована. Но в моем эксперименте это не кажется.

UPD (код)

Отправитель:

public void submit(String key) {
    Object mySerializableObject = ...
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}

Рабочий:

public void process() {
    while (true) {
        IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
        System.out.println(map.size());

        // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
        // objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess));
        try {
            Thread.sleep(PAUSE);
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

Я прокомментировал "обработку" самой части, потому что теперь я просто пытаюсь получить согласованное состояние карты. Приведенный выше код печатает разные результаты каждый раз, например: "4, 3, 1, 1, 1, 1, 1..." (поэтому он может даже увидеть 4 поставленные задачи на мгновение, но затем они... исчезнут).

UPD (журнал)

Рабочий:

...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...

Отправитель 1:

Before: tasksMap.size() = 0
After: tasksMap.size() = 1

Отправитель 2:

Before: tasksMap.size() = 1
After: tasksMap.size() = 4

Отправитель 3:

Before: tasksMap.size() = 1
After: tasksMap.size() = 2

Отправитель 4:

Before: tasksMap.size() = 3
After: tasksMap.size() = 4

Ответы

Ответ 1

Ну, я думаю, я понял проблему. Насколько я понимаю, распределенный IMap, возвращаемый hazelcastInstance.getMap, не гарантирует, что данные реплицируются по всем существующим узлам кластера: некоторые части данных могут быть реплицированы на некоторые узлы, другая часть - на другую узлы. Поэтому в моем примере некоторые из представленных задач были реплицированы не на рабочий node (который работает постоянно), а на некоторые другие отправители, которые прекращают их выполнение после отправки. Таким образом, такие записи были потеряны при выходе отправителей.

Я решил эту проблему, заменив hazelcastInstance.getMap на hazelcastInstance.getReplicatedMap. Этот метод возвращает ReplicatedMap, который, AFAIK, гарантирует, что введенные в него записи будут реплицированы на все узлы кластера. Итак, теперь все работает отлично в моей системе.