Weird Hazelcat IMap # put() поведение
Моя программа на базе Hazelcast может работать в двух режимах: отправитель и рабочий.
Submitter ставит POJO на распределенную карту с помощью некоторого ключа, например: hazelcastInstance.getMap(MAP_NAME).put(key, value);
Рабочий имеет бесконечный цикл (с Thread.sleep(1000L);
внутри для таймаута), который должен обрабатывать сущности из карты. Пока я просто печатаю размер карты в этом цикле.
Теперь вот проблема. Я запускаю рабочее приложение. Затем я запускаю четыре отправителя одновременно (каждый добавляет запись на карту и завершает работу). Но после того, как все приложения-приложения выполнены, рабочее приложение печатает произвольный размер: иногда он обнаруживает, что добавлена только одна запись, иногда две, иногда три (на самом деле она никогда не видела всех четырех записей).
В чем проблема с этим простым потоком? Я читал в документах Hazelcast, что метод put()
является синхронным, поэтому он гарантирует, что после его возврата запись будет помещена в распределенную карту и будет реплицирована. Но в моем эксперименте это не кажется.
UPD (код)
Отправитель:
public void submit(String key) {
Object mySerializableObject = ...
IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}
Рабочий:
public void process() {
while (true) {
IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
System.out.println(map.size());
// Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
// objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess));
try {
Thread.sleep(PAUSE);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
}
Я прокомментировал "обработку" самой части, потому что теперь я просто пытаюсь получить согласованное состояние карты. Приведенный выше код печатает разные результаты каждый раз, например: "4, 3, 1, 1, 1, 1, 1..." (поэтому он может даже увидеть 4 поставленные задачи на мгновение, но затем они... исчезнут).
UPD (журнал)
Рабочий:
...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...
Отправитель 1:
Before: tasksMap.size() = 0
After: tasksMap.size() = 1
Отправитель 2:
Before: tasksMap.size() = 1
After: tasksMap.size() = 4
Отправитель 3:
Before: tasksMap.size() = 1
After: tasksMap.size() = 2
Отправитель 4:
Before: tasksMap.size() = 3
After: tasksMap.size() = 4
Ответы
Ответ 1
Ну, я думаю, я понял проблему. Насколько я понимаю, распределенный IMap
, возвращаемый hazelcastInstance.getMap
, не гарантирует, что данные реплицируются по всем существующим узлам кластера: некоторые части данных могут быть реплицированы на некоторые узлы, другая часть - на другую узлы. Поэтому в моем примере некоторые из представленных задач были реплицированы не на рабочий node (который работает постоянно), а на некоторые другие отправители, которые прекращают их выполнение после отправки. Таким образом, такие записи были потеряны при выходе отправителей.
Я решил эту проблему, заменив hazelcastInstance.getMap
на hazelcastInstance.getReplicatedMap
. Этот метод возвращает ReplicatedMap
, который, AFAIK, гарантирует, что введенные в него записи будут реплицированы на все узлы кластера. Итак, теперь все работает отлично в моей системе.