Flink Streaming: Как вывести один поток данных на разные выходы в зависимости от данных?
В Apache Flink у меня есть поток кортежей. Возьмем действительно простой Tuple1<String>
. Кортеж может иметь произвольное значение в поле значений (например, "P1", "P2" и т.д.). Набор возможных значений конечен, но я не знаю полного набора заранее (так что может быть "P362" ). Я хочу написать этот кортеж в определенное место вывода в зависимости от значения внутри кортежа. Так, например, Я хотел бы иметь следующую структуру файлов:
В документации я только нашел возможность писать в местах, которые я знаю заранее (например, stream.writeCsv("/output/somewhere")
), но никоим образом не позволять содержимому данных решать, где данные фактически заканчиваются.
Я прочитал о расщеплении вывода в документации, но это, похоже, не дает возможности перенаправить вывод в разные адресаты так, как я хотел бы иметь его (или просто не понимаю, как это будет работать).
Можно ли это сделать с помощью Flink API, если да, то каким образом? Если нет, может быть, может быть, сторонняя библиотека, которая может это сделать, или мне пришлось бы строить такую вещь самостоятельно?
Обновление
Следуя предложению Маттиаса, я придумал функцию sifting sink, которая определяет выходной путь, а затем записывает кортеж в соответствующий файл после его сериализации. Я поставил его здесь для справки, возможно, это полезно для кого-то еще:
public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {
private final OutputSelector<IT> outputSelector;
private final MapFunction<IT, String> serializationFunction;
private final String basePath;
Map<String, TextOutputFormat<String>> formats = new HashMap<>();
/**
* @param outputSelector the selector which determines into which output(s) a record is written.
* @param serializationFunction a function which serializes the record to a string.
* @param basePath the base path for writing the records. It will be appended with the output selector.
*/
public SiftingSinkFunction(OutputSelector<IT> outputSelector, MapFunction<IT, String> serializationFunction, String basePath) {
this.outputSelector = outputSelector;
this.serializationFunction = serializationFunction;
this.basePath = basePath;
}
@Override
public void invoke(IT value) throws Exception {
// find out where to write.
Iterable<String> selection = outputSelector.select(value);
for (String s : selection) {
// ensure we have a format for this.
TextOutputFormat<String> destination = ensureDestinationExists(s);
// then serialize and write.
destination.writeRecord(serializationFunction.map(value));
}
}
private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException {
// if we know the destination, we just return the format.
if (formats.containsKey(selection)) {
return formats.get(selection);
}
// create a new output format and initialize it from the context.
TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection));
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
format.configure(context.getTaskStubParameters());
format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
// put it into our map.
formats.put(selection, format);
return format;
}
@Override
public void close() throws IOException {
Exception lastException = null;
try {
for (TextOutputFormat<String> format : formats.values()) {
try {
format.close();
} catch (Exception e) {
lastException = e;
format.tryCleanupOnError();
}
}
} finally {
formats.clear();
}
if (lastException != null) {
throw new IOException("Close failed.", lastException);
}
}
}
Ответы
Ответ 1
Вы можете реализовать пользовательскую раковину. Наследовать от одного из них:
-
org.apache.flink.streaming.api.functions.sink.SinkFunction
-
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
В вашей программе используйте:
stream.addSink(SinkFunction<T> sinkFunction);
вместо stream.writeCsv("/output/somewhere")
.