Обновите синглтон HashMap с помощью Google pub/sub
У меня есть случай использования, когда я инициализирую HashMap, который содержит набор данных поиска (информация о физическом местоположении и т.д. Устройств IoT). Эти данные поиска служат в качестве справочных данных для второго набора данных, который представляет собой PCollection. Эта коллекция ПК представляет собой поток данных, который предоставляет данные, которые записывают устройства IoT. Поток данных с устройств IoT использует конвейер Apache Beam, который работает как поток данных Google с использованием паба/подпрограммы Google Cloud.
Когда я обрабатываю PCollection (данные устройства), я связываю данные публикации/подписки Google Cloud с соответствующей записью поиска в HashMap.
Мне нужно обновить HashMap, основываясь на 2-м пабе/сабе, который вносит изменения в свои данные. Вот как я получаю PCollection и делаю поиск, используя HashMap:
HashMap → содержит предварительно загруженные данные поиска (информация об устройствах IoT)
PCollection → содержит данные из конвейерного потока данных (данные, записанные устройствами IoT)
Я создаю HashMap для данных поиска устройств IoT как одиночный:
public class MyData {
private static final MyData instance = new MyData ();
private MyData () {
HashMap myDataMap = new HashMap<String, String>();
... logic to populate the map
this.referenceData = myDataMap;
}
public HashMap<Integer, DeviceReference> referenceData;
public static DeviceData getInstance(){
return instance;
}
}
Затем я использую HashMap в другом классе, где я подписываюсь на обновления данных (это сообщения, которые, например, дают мне новые данные, которые относятся к сущностям, уже сохраненным в HashMap). Я подписываюсь на изменения, используя паб/саб Google с Apache beam:
HashMap<String, String> referenceData = MyData.getInstance().referenceData;
Pipeline pipeLine = Pipeline.create(options);
// subscribe to changes in data
org.apache.beam.sdk.values.PCollection myDataUpdates;
myDataUpdates = pipeLine.begin()
.apply(String.format("Subscribe to data updates"),
PubsubIO.readStrings().fromTopic(
String.format("myPubSubPath")));
Я хочу эффективно применить обновления данных к одноэлементной HashMap (т.е. манипулировать HashMap на основе моей подписки на данные). Как я могу это сделать?
У меня ограниченное понимание Apache Beam, и я знаю только, как выполнять преобразования в данных конвейера, чтобы создать другую отдельную PCollection
. Я думаю, что в этом смысл Beam, что он предназначен для преобразования больших массивов данных в другую форму. Есть ли способ достижения того, что мне нужно (обновление набора данных на основе подписки pub/sub) с помощью Apache Beam, или есть ли другой способ обновить HashMap с помощью pub/sub? (Я не могу опрашивать данные, так как это создает слишком большую задержку и стоимость, мне нужно обновить HashMap с помощью подписки).
Облачные документы Google показывают способ прямой подписки на паб/подраздел Google Cloud, который не связан с конвейером Apache Beam. Это многообещающее как потенциальное решение и основывается на следующей зависимости Maven:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.53.0</version>
</dependency>
Я получаю конфликт, который конфликтует со следующими зависимостями Maven для Apache Beam:
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.5.0</version>
</dependency>
Эта проблема описана в отдельном вопросе здесь - конфликт Maven в Java-приложении с зависимостью google-cloud-core-grpc. Из того, что я вижу, кажется, что не имеет значения, какую версию артефакта Maven google-cloud-pubsub
я использую, поскольку из того, что я выяснил, это выглядит как зависимость луча v.2.5.0 и ниже будет всегда конфликтовать с любой текущей версией зависимости от Google.
(Я поднял это как проблему в Beam Jira - https://issues.apache.org/jira/browse/BEAM-6118)
В настоящее время я изучаю сторонние входные данные и combine
, чтобы добиться обновления HashMap:
https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine
В примере 10 показан способ применения .getSideInputsMap()
к payload
. Мне интересно, могу ли я применить это как-то к моей подписке на изменения данных поиска. Если я получу PCollection
таким образом, я не смогу напрямую .getSideInputsMap()
с PCollection
deviceReferenceDataUpdates = pipeLine.begin()
.apply("Get changes to the IoT device lookup data"),
PubsubIO.readMessages().fromTopic("IoT device lookup data")).
Я задал отдельный вопрос, конкретно о том, как я могу использовать .getSideInputsMap()
- Apache Beam - как я могу применить .getSideInputsMap к подписке на паб/подписчик Google?
Ответы
Ответ 1
Я нашел способ сделать это в рамках Apache Beam следующим образом (не полностью протестирован).
Обратите внимание - примите во внимание комментарий к OP от @Serg M Ten, что лучшим подходом может быть консолидация данных позже, вместо того, чтобы пытаться объединить данные поиска как часть обработки преобразования.
Singleton HashMap
Смотрите мой ответ здесь - Доступ к HashMap из другого класса
Трубопровод (на одной нити, реализован в main
)
// initialise singleton HashMap containing lookup data on bootstrap:
LookupData lookupData = LookupData.getInstance();
org.apache.beam.sdk.values.PCollection lookupDataUpdateMessage;
lookupDataUpdateMessage = pipeLine.begin()
.apply("Extract lookup update data", PubsubIO.readStrings().fromTopic("myLookupUpdatePubSubTopic"))
.apply("Transform lookup update data",
ParDo.of(new TransformLookupData.TransformFn()));
org.apache.beam.sdk.values.PCollection lookupDataMessage;
преобразование
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.json.JSONObject;
import myLookupSingletonClass;
import myLookupUpObjectClass;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Strings;
public class TransformDeviceMeta
public static class TransformFn extends DoFn<String, MyLookupData> {
@ProcessElement
public void processElement(ProcessContext c)
{
LookupData lookupData = LookupData.getInstance();
MyLookupData myLookupDataUpdate = new MyLookupData();
try
{
byte[] payload = c.element().getBytes();
String myLookUpDataJson = new JSONObject(new String(payload)).toString();
ObjectMapper mapper = new ObjectMapper();
myLookUpDataUpdate = mapper.readValue(myLookUpDataJson , MyLookupData.class);
String updatedLookupDataId = updatedLookupDataId.id;
// logic for HashMap updating e.g:
lookupData.myHashMap.remove(updatedDeviceId);
}
else {
lookupData.myHashMap.put(updatedDeviceId, deviceMetaUpdate);
}
}
catch (Exception ex) {
Log.error(ex.getMessage());
System.out.println("Error " + ex.getMessage());
}
}
}
}
MyLookupData
= Класс, который формирует модель для данных поиска