Как закрыть соединение с базой данных, открытое реализацией IBackingMap в топологии Storm Trident?
Я использую IBackingMap для своей топологии Trident для хранения кортежей для ElasticSearch (я знаю, что существует несколько реализаций для интеграции Trident/ElasticSearch, уже существующей в GitHub, однако я решил реализовать пользовательский, который лучше подходит для моей задачи).
Итак, моя реализация классическая с factory:
public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {
// omitting here some other cool stuff...
private final Client client;
public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {
return new StateFactory() {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
MapState ms = OpaqueMap.build(cm);
return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
}
};
}
public ElasticSearchBackingMap(String host, int port, String clusterName) {
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName).build();
// TODO add a possibility to close the client
client = new TransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(host, port));
}
// the actual implementation is left out
}
Вы видите, что он получает имя хоста/порта/кластера в качестве входных параметров и создает клиент ElasticSearch в качестве члена класса, НО НИКОГДА НЕ ЗАКРЫВАЕТ КЛИЕНТА.
Затем он используется из топологии довольно знакомым способом:
tridentTopology.newStream("spout", spout)
// ...some processing steps here...
.groupBy(aggregationFields)
.persistentAggregate(
ElasticSearchBackingMap.getFactoryFor(
ElasticSearchConfig.ES_HOST,
ElasticSearchConfig.ES_PORT,
ElasticSearchConfig.ES_CLUSTER_NAME
),
new Fields(FieldNames.OUTCOME),
new BatchAggregator(),
new Fields(FieldNames.AGGREGATED));
Эта топология завернута в какую-то общедоступную статическую void main, упакована в банку и отправлена в Storm для выполнения.
Вопрос в том, должен ли я беспокоиться о закрытии соединения ElasticSearch или это собственный бизнес Storm? Если это не сделано Storm, как и когда в жизненном цикле топологии я должен это сделать?
Спасибо заранее!
Ответы
Ответ 1
Хорошо, отвечая на мой собственный вопрос.
Прежде всего, еще раз спасибо @dedek за предложения и возрождение билета в Storm Jira.
Наконец, поскольку нет официального способа сделать это, я решил пойти на метод cleanup() Trident Filter. До сих пор я проверил следующее (для Storm v. 0.9.4):
С LocalCluster
- cleanup() вызывается при выключениях кластера
- cleanup() НЕ вызывается при убийстве топологии, это не должно быть трагедией, очень вероятно, что он не будет использовать LocalCluster для реальных развертываний в любом случае
С реальным кластером
- он вызывается, когда топология убивается, а также когда рабочий остановлен с использованием pkill -TERM -u storm -f 'backtype.storm.daemon.worker'
- он не вызывается, если рабочий убит с kill -9 или когда он сбой или - грустно - когда рабочий умирает из-за исключения
В целом, что дает более или менее приличную гарантию очистки() для вызова, при условии, что вы будете осторожны с обработкой исключений (я все равно добавляю "thundercatches" к каждому из моих примитивов Trident).
Мой код:
public class CloseFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class);
private final Closeable[] closeables;
public CloseFilter(Closeable... closeables) {
this.closeables = closeables;
}
@Override
public boolean isKeep(TridentTuple tuple) {
return true;
}
@Override
public void prepare(Map conf, TridentOperationContext context) {
}
@Override
public void cleanup() {
for (Closeable c : closeables) {
try {
c.close();
} catch (Exception e) {
LOG.warn("Failed to close an instance of {}", c.getClass(), e);
}
}
}
}
Однако было бы неплохо, если бы некоторые крючки для закрытия соединений стали частью API.