How can I use the Spark Java API to read a binary file stream from HDFS?
I am writing a component that has to pick up new binary files from a specific HDFS path so that I can do online learning on that data. So I want to read the binary files created by Flume from HDFS as a stream. I found several functions provided by the Spark API, for example
public JavaDStream<byte[]> binaryRecordsStream(String directory,int recordLength)
and
public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>>
JavaPairInputDStream<K,V> fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass)
But I really don't know how to use these functions. I tried binaryRecordsStream, but it expects a fixed record length, so it does not work for my variable-length files.
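For reference, binaryRecordsStream would be used roughly like this when every record has the same, known size (the path and the 1024-byte record length below are made-up values), which is exactly the constraint that makes it unsuitable here:
SparkConf conf = new SparkConf().setAppName("BinaryRecordsExample").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));
// Each element of the stream is a byte[] of exactly 1024 bytes,
// read from new files appearing under the monitored directory.
JavaDStream<byte[]> records = jssc.binaryRecordsStream("hdfs:///flume/events", 1024);
records.print();
jssc.start();
jssc.awaitTermination();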
For the fileStream function, I used:
SparkConf sparkConf = new SparkConf().setAppName("SparkFileStreamTest").setMaster("local[2]");
// Create the context with the specified batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(durationInMillis));
JavaPairInputDStream<LongWritable, BytesWritable> inputDStream = jssc.fileStream(hdfsPath, LongWritable.class, BytesWritable.class, CustomInputFormat.class);
//**********************************************************************
JavaDStream<byte[]> content = inputDStream.map(new Function<Tuple2<LongWritable, BytesWritable>, byte[]>() {
    @Override
    public byte[] call(Tuple2<LongWritable, BytesWritable> tuple2) {
        System.out.println("----------------[testReadFileStreamFromHDFS] ENTER ......");
        if (tuple2 == null) {
            System.out.println("----------------[testReadFileStreamFromHDFS] TUPLE = NULL");
            System.out.println("----------------[testReadFileStreamFromHDFS] END.");
            return null;
        }
        else {
            System.out.println("----------------[testReadFileStreamFromHDFS] KEY = [" + tuple2._1().toString() + "]");
            System.out.println("----------------[testReadFileStreamFromHDFS] VAL-LENGTH = [" + tuple2._2().getBytes().length + "]");
            System.out.println("----------------[testReadFileStreamFromHDFS] END.");
            return tuple2._2().getBytes();
        }
    }
});
/***********************************************************************/
if (content == null) {
    System.out.println("----------------[testReadFileStreamFromHDFS] CONTENT = NULL");
}
else {
    System.out.println("----------------[testReadFileStreamFromHDFS] CONTENT-length = [" + content.count() + "]");
    content.print();
}
System.out.println("----------------[testReadFileStreamFromHDFS] END-111.");
jssc.start();
jssc.awaitTermination();
System.out.println("----------------[testReadFileStreamFromHDFS] END-222.");
In CustomInputFormat I created:
public class CustomInputFormat extends FileInputFormat<LongWritable, BytesWritable> {

    private CustomInputSplit mInputSplit;

    public CustomInputFormat() {
        mInputSplit = new CustomInputSplit();
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        System.out.println("----------------[CustomInputFormat] 1111 ......");
        final ArrayList<InputSplit> result = new ArrayList<InputSplit>();
        result.add(mInputSplit);
        System.out.println("----------------[CustomInputFormat] 2222 ......");
        return result;
    }

    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(
            InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        System.out.println("----------------[CustomInputFormat] 3333 ......");
        System.out.println("----------------[CustomInputFormat] ENTER createRecordReader, inputSplit-length = ["
                + inputSplit.getLength() + "]");
        mInputSplit.init(inputSplit);
        System.out.println("----------------[CustomInputFormat] 4444 ......");
        return new CustomRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        System.out.println("----------------[CustomInputFormat] 5555 ......");
        return false;
    }
}
public class CustomRecordReader extends RecordReader<LongWritable, BytesWritable> {

    private BytesWritable mValues;
    private int mCursor;

    public CustomRecordReader() {
        System.out.println("----------------[CustomRecordReader] 1111 ......");
        mValues = null;
        mCursor = 0;
        System.out.println("----------------[CustomRecordReader] 2222 ......");
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        System.out.println("----------------[CustomRecordReader] 3333 ......");
        CustomInputSplit customInputSplit = (CustomInputSplit) inputSplit;
        mValues = customInputSplit.getValues();
        System.out.println("----------------[CustomRecordReader] 4444 ......");
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        System.out.println("----------------[CustomRecordReader] 5555 ......");
        boolean existNext = (mCursor == 0);
        mCursor++;
        System.out.println("----------------[CustomRecordReader] 6666 ......");
        return existNext;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        System.out.println("----------------[CustomRecordReader] 7777 ......");
        return new LongWritable(0);
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        System.out.println("----------------[CustomRecordReader] 8888 ......");
        return mValues;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        System.out.println("----------------[CustomRecordReader] 9999 ......");
        return 0;
    }

    @Override
    public void close() throws IOException {
        System.out.println("----------------[CustomRecordReader] AAAA ......");
        mValues = null;
    }
}
public class CustomInputSplit extends InputSplit implements Writable {

    private long mLength;
    private String[] mLocations;
    private final BytesWritable mContent;

    public CustomInputSplit() {
        System.out.println("----------------[CustomInputSplit] 1111 ......");
        mLength = 0;
        mLocations = null;
        mContent = new BytesWritable();
        System.out.println("----------------[CustomInputSplit] 2222 ......");
    }

    public void init(InputSplit inputSplit) throws IOException, InterruptedException {
        System.out.println("----------------[CustomInputSplit] 3333 ......");
        mLength = inputSplit.getLength();
        String[] locations = inputSplit.getLocations();
        if (locations != null) {
            int numLocations = locations.length;
            mLocations = new String[numLocations];
            for (int i = 0; i < numLocations; i++) {
                mLocations[i] = locations[i];
            }
        }
        System.out.println("----------------[CustomInputSplit] 4444 ......");
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        System.out.println("----------------[CustomInputSplit] 5555 ......");
        return mLength;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        if (mLocations == null) {
            System.out.println("----------------[CustomInputSplit] 6666-0001 ...... mLocations = [NULL]");
            mLocations = new String[] {"localhost"};
        }
        System.out.println("----------------[CustomInputSplit] 6666-0002 ...... mLocations-length = [" + mLocations.length + "]");
        return mLocations;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        System.out.println("----------------[CustomInputSplit] 7777 ......");
        mContent.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        System.out.println("----------------[CustomInputSplit] 8888 ......");
        mContent.readFields(dataInput);
    }

    public BytesWritable getValues() {
        System.out.println("----------------[CustomInputSplit] 9999 ......");
        return mContent;
    }
}
But when I print:
System.out.println("----------------[testReadFileStreamFromHDFS] VAL-LENGTH = [" + tuple2._2().getBytes().length + "]");
I always get a length of 0:
----------------[testReadFileStreamFromHDFS] VAL-LENGTH = [0]
Is there a problem with CustomInputFormat.class? Does anyone know how to use the Spark Streaming Java API to read binary files from HDFS?
Answers
Answer 1
Try this:
JavaStreamingContext context = ...;   // your existing streaming context
JavaSparkContext jContext = context.sparkContext();

// Read every binary file currently in the directory as (path, content) pairs.
JavaPairRDD<String, PortableDataStream> rdd = jContext.binaryFiles(fsURI + directoryPath);

JavaRDD<Object> rdd1 = rdd.map(new Function<Tuple2<String, PortableDataStream>, Object>() {
    private static final long serialVersionUID = -7894402430221488712L;

    @Override
    public Object call(Tuple2<String, PortableDataStream> arg0) throws Exception {
        byte[] imageInByte = arg0._2().toArray();
        String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte);
        return (arg0._1 + Constants.COMMA_DELIMITER + base64Encoded).getBytes();
    }
});

// Wrap the RDD in a queue-based DStream so it can be processed by Spark Streaming.
java.util.Queue<JavaRDD<Object>> queue = new LinkedList<>();
queue.add(rdd1);
JavaDStream<Object> dStream = context.queueStream(queue);
The only limitation of this approach is that it will not be able to read new files created in HDFS after this pipeline has started.
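For completeness, here is a rough sketch of how the resulting dStream could be consumed, assuming Constants.COMMA_DELIMITER is a plain comma and that javax.xml.bind.DatatypeConverter and java.nio.charset.StandardCharsets are imported:
// Sketch only: decode each element back into the original file bytes.
// Assumes every element is "filePath,base64Payload" exactly as produced by the map() above.
JavaDStream<Integer> payloadSizes = dStream.map(new Function<Object, Integer>() {
    @Override
    public Integer call(Object element) throws Exception {
        String record = new String((byte[]) element, StandardCharsets.UTF_8);
        String base64 = record.substring(record.indexOf(',') + 1);
        byte[] payload = DatatypeConverter.parseBase64Binary(base64);
        return payload.length;   // feed payload into the online-learning step instead
    }
});
payloadSizes.print();
context.start();
context.awaitTermination();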
Answer 2
Use this approach:
Write a custom receiver:
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.xml.bind.DatatypeConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.input.PortableDataStream;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
class DFSReceiver extends Receiver<byte[]> {

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = -1051061769769056605L;

    Long windowSize = 20000L;

    /** Instantiates a new RMQ receiver. */
    DFSReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_SER_2());
    }

    @Override
    public void onStart() {
        System.out.println("Inside onStart method");
        new Thread() {
            @Override
            public void run() {
                try {
                    receive();
                } catch (Exception e) {
                    e.printStackTrace();
                    LOGGER.error("Exception raised at DFSReceiverHelper , exception : " + e);
                }
            }
        }.start();
    }

    /** Receive.
     *
     * @throws Exception
     *             the exception */
    protected void receive() throws Exception {
        try {
            ConnectionMetadata connectionMetadata = ConnectionMetadataFactory.getConnectionMetadataObj(ConnectionConstants.HDFS_DATA_STORE);
            String connectionId = connectionMetadata.getConnectionId(ConnectionConstants.HDFS_DATA_STORE, connectionName);
            ConnectionMetaDataDTO c = connectionMetadata.getConnectionMetaDataById(connectionId);
            Map<String, Object> map = connectionMetadata.getConnectionConfigParameters(c);
            FileSystem fs = HDFSUtils.getFileSystemInstance(map);

            // Initial load: read everything already present in the directory and store it.
            JavaPairRDD<String, PortableDataStream> rdd = sparkContext.binaryFiles(fsURI + directoryPath);
            List<Tuple2<String, PortableDataStream>> rddList = rdd.collect();
            for (Tuple2<String, PortableDataStream> arg0 : rddList) {
                byte[] imageInByte = arg0._2().toArray();
                String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte);
                store((arg0._1 + Constants.COMMA_DELIMITER + base64Encoded).getBytes());
            }

            Long time = System.currentTimeMillis();
            System.out.println();
            Thread.sleep(windowSize);

            // Poll the directory every windowSize milliseconds for newly created files.
            while (true) {
                List<Path> newFiles = checkIfNewFileCreated(fs, new Path(fsURI + directoryPath), time);
                for (Path p : newFiles) {
                    JavaPairRDD<String, PortableDataStream> rdd11 = sparkContext.binaryFiles(p.toString());
                    Tuple2<String, PortableDataStream> arg0 = rdd11.first();
                    byte[] imageInByte = arg0._2().toArray();
                    String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte);
                    store((arg0._1 + Constants.COMMA_DELIMITER + base64Encoded).getBytes());
                }
                Thread.sleep(windowSize);
                time += windowSize;
            }
        } catch (ShutdownSignalException s) {
            LOGGER.error("ShutdownSignalException raised in receive method of DFSReceiver", s);
        }
    }

    private List<Path> checkIfNewFileCreated(FileSystem fs, Path p, Long timeStamp) throws IOException {
        List<Path> fileList = new ArrayList<>();
        if (fs.isDirectory(p)) {
            FileStatus[] fStatus = fs.listStatus(p);
            for (FileStatus status : fStatus) {
                if (status.isFile() && timeStamp < status.getModificationTime()
                        && timeStamp + windowSize >= status.getModificationTime()) {
                    fileList.add(status.getPath());
                }
            }
        }
        return fileList;
    }

    @Override
    public void onStop() {
    }
}
With this receiver you will also be able to read newly created files, picked up roughly every 20 seconds.
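For example, the receiver could be wired into a streaming job like this (the application name, master and batch interval below are placeholder values; SparkConf and Duration imports are assumed in addition to those already listed):
// Sketch only: attach the custom receiver to a streaming context.
SparkConf conf = new SparkConf().setAppName("DFSReceiverExample").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(20000));
JavaReceiverInputDStream<byte[]> stream = jssc.receiverStream(new DFSReceiver());
stream.print();   // each element is "filePath,base64Payload" as a byte[]
jssc.start();
jssc.awaitTermination();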