Запустите Apache Flink с помощью Amazon S3

Помогает ли кто-нибудь использовать Apache Flink 0.9 для обработки данных, хранящихся на AWS S3? Я обнаружил, что они используют собственную S3FileSystem вместо одной из Hadoop... и похоже, что она не работает. Я поместил следующий путь s3://bucket.s3.amazonaws.com/folder, у которого получилось следующее исключение:

java.io.IOException: Не удается установить соединение с Amazon S3: com.amazonaws.services.s3.model.AmazonS3Exception: Подписанная нами подпись запроса не соответствует предоставленной вами подписке. Проверьте свой ключ и метод подписи. (Сервис: Amazon S3; Код состояния: 403;

Ответы

Ответ 1

Обновление до 2016 года: документация Flink теперь содержит страницу о том, как использовать Flink с AWS


Вопрос также задан в списке рассылки пользователей Flink, и я ответил на него там: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Processing-S3- данных с-Apache-FLiNK-td3046.html

ТЛ; др:

Программа Flink

public class S3FileSystem {
   public static void main(String[] args) throws Exception {
      ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
      DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
      myLines.print();
   }
}

Добавьте следующее в файл core-site.xml и сделайте его доступным для Flink:

<property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>putKeyHere</value>
</property>

<property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>putSecretHere</value>
</property>
<property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>

Ответ 2

вы можете получить артефакты из ведра S3, который указан в разделе вывода шаблона CloudFormation. т.е. после запуска и запуска флинка программа обработки потока такси может быть отправлена во время выполнения Flink для запуска анализа в реальном времени событий поездки в потоке Amazon Kinesis.

$ aws s3 cp s3://«artifact-bucket»/artifacts/flink-taxi-stream-processor-1.0.jar .
$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint»

Обе вышеупомянутые команды используют Amazon S3 в качестве источника, вы должны указать имя артефакта соответственно.

Примечание. Вы можете перейти по ссылке ниже и создать конвейер, используя ведра EMR и S3.

https://aws.amazon.com/blogs/big-data/build-a-real-time-stream-processing-pipeline-with-apache-flink-on-aws/