Optimize LineRecordReader (#419)

* Move locations initialization for file input splits to initialize method
* Little optimization for schema detection regular expression

Signed-off-by: Paul Dubs <paul.dubs@gmail.com>
master
Paul Dubs 2020-04-28 02:38:31 +02:00 committed by GitHub
parent c9d1454743
commit b9d5f1645b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 9 deletions

View File

@ -60,6 +60,13 @@ public class LineRecordReader extends BaseRecordReader {
@Override @Override
public void initialize(InputSplit split) throws IOException, InterruptedException { public void initialize(InputSplit split) throws IOException, InterruptedException {
super.initialize(split); super.initialize(split);
if(!(inputSplit instanceof StringSplit || inputSplit instanceof InputStreamInputSplit)){
final ArrayList<URI> uris = new ArrayList<>();
final Iterator<URI> uriIterator = inputSplit.locationsIterator();
while(uriIterator.hasNext()) uris.add(uriIterator.next());
this.locations = uris.toArray(new URI[0]);
}
this.iter = getIterator(0); this.iter = getIterator(0);
this.initialized = true; this.initialized = true;
} }
@ -68,7 +75,6 @@ public class LineRecordReader extends BaseRecordReader {
public void initialize(Configuration conf, InputSplit split) throws IOException, InterruptedException { public void initialize(Configuration conf, InputSplit split) throws IOException, InterruptedException {
this.conf = conf; this.conf = conf;
initialize(split); initialize(split);
this.initialized = true;
} }
@Override @Override
@ -207,11 +213,6 @@ public class LineRecordReader extends BaseRecordReader {
} }
} }
} else { } else {
final ArrayList<URI> uris = new ArrayList<>();
final Iterator<URI> uriIterator = inputSplit.locationsIterator();
while(uriIterator.hasNext()) uris.add(uriIterator.next());
this.locations = uris.toArray(new URI[uris.size()]);
if (locations.length > 0) { if (locations.length > 0) {
InputStream inputStream = streamCreatorFn.apply(locations[location]); InputStream inputStream = streamCreatorFn.apply(locations[location]);
try { try {

View File

@ -23,6 +23,7 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.regex.Pattern;
/** /**
* A simple utility method to convert a {@code Iterator<String>} to an {@code Iterator<URI>}, where each * A simple utility method to convert a {@code Iterator<String>} to an {@code Iterator<URI>}, where each
@ -32,6 +33,7 @@ import java.util.NoSuchElementException;
*/ */
@AllArgsConstructor @AllArgsConstructor
public class UriFromPathIterator implements Iterator<URI> { public class UriFromPathIterator implements Iterator<URI> {
final Pattern schemaPattern = Pattern.compile("^.*?:/.*");
private final Iterator<String> paths; private final Iterator<String> paths;
@ -42,16 +44,17 @@ public class UriFromPathIterator implements Iterator<URI> {
@Override @Override
public URI next() { public URI next() {
if (!hasNext()) { if (!hasNext()) {
throw new NoSuchElementException("No next element"); throw new NoSuchElementException("No next element");
} }
try { try {
String s = paths.next(); String s = paths.next();
if(!s.matches(".*:/.*")){ if(schemaPattern.matcher(s).matches()){
return new URI(s);
} else {
//No scheme - assume file for backward compatibility //No scheme - assume file for backward compatibility
return new File(s).toURI(); return new File(s).toURI();
} else {
return new URI(s);
} }
} catch (URISyntaxException e) { } catch (URISyntaxException e) {