Запустите Apache Flink с помощью Amazon S3
Помогает ли кто-нибудь использовать Apache Flink 0.9 для обработки данных, хранящихся на AWS S3? Я обнаружил, что они используют собственную S3FileSystem вместо одной из Hadoop... и похоже, что она не работает. Я поместил следующий путь s3://bucket.s3.amazonaws.com/folder, у которого получилось следующее исключение:
java.io.IOException: Не удается установить соединение с Amazon S3: com.amazonaws.services.s3.model.AmazonS3Exception: Подписанная нами подпись запроса не соответствует предоставленной вами подписке. Проверьте свой ключ и метод подписи. (Сервис: Amazon S3; Код состояния: 403;
Ответы
Ответ 1
Обновление до 2016 года: документация Flink теперь содержит страницу о том, как использовать Flink с AWS
Вопрос также задан в списке рассылки пользователей Flink, и я ответил на него там: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Processing-S3- данных с-Apache-FLiNK-td3046.html
ТЛ; др:
Программа Flink
public class S3FileSystem {
public static void main(String[] args) throws Exception {
ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
myLines.print();
}
}
Добавьте следующее в файл core-site.xml и сделайте его доступным для Flink:
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>putKeyHere</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>putSecretHere</value>
</property>
<property>
<name>fs.s3n.impl</name>
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
Ответ 2
вы можете получить артефакты из ведра S3, который указан в разделе вывода шаблона CloudFormation. т.е. после запуска и запуска флинка программа обработки потока такси может быть отправлена во время выполнения Flink для запуска анализа в реальном времени событий поездки в потоке Amazon Kinesis.
$ aws s3 cp s3://«artifact-bucket»/artifacts/flink-taxi-stream-processor-1.0.jar .
$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint»
Обе вышеупомянутые команды используют Amazon S3 в качестве источника, вы должны указать имя артефакта соответственно.
Примечание. Вы можете перейти по ссылке ниже и создать конвейер, используя ведра EMR и S3.
https://aws.amazon.com/blogs/big-data/build-a-real-time-stream-processing-pipeline-with-apache-flink-on-aws/