Skip to content
Snippets Groups Projects
Commit 0f18b939 authored by Hao Wu's avatar Hao Wu
Browse files

init projects

parents
No related branches found
No related tags found
No related merge requests found
README 0 → 100644
pom.xml 0 → 100644
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>edu.illinois.cs.cogcomp</groupId>
    <artifactId>s3-record-reader</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>CogcompSoftware</id>
            <name>CogcompSoftware</name>
            <url>http://cogcomp.cs.illinois.edu/m2repo/</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- Curator data model (Thrift-generated Record classes) -->
        <dependency>
            <groupId>edu.illinois.cs.cogcomp</groupId>
            <artifactId>curator-interfaces</artifactId>
            <version>0.7</version>
        </dependency>
        <!-- Thrift runtime used to deserialize Curator records -->
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.8.0</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.5.8</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.5.8</version>
        </dependency>
        <!-- I/O and general utilities -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
        <!-- AWS SDK -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.7.3</version>
        </dependency>
        <!--
            HTTP components. Each artifact is declared exactly once: this POM
            previously also listed httpclient 4.1.2 and httpcore 4.1.3, which
            Maven reports as duplicate declarations. 4.3.2 is kept because it
            was the later declaration of each artifact.
        -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.2</version>
        </dependency>
    </dependencies>
</project>
\ No newline at end of file
package edu.illinois.cs.cogcomp.record_reader;
import edu.illinois.cs.cogcomp.thrift.curator.Record;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
/**
 * Static helpers for converting serialized bytes into Curator {@link Record} objects.
 *
 * Created by haowu on 11/3/14.
 */
public class RecordIOUtils {

    /**
     * Decodes a serialized record using a default-constructed {@link TDeserializer}.
     *
     * @param bytes the serialized form of a {@link Record}
     * @return a newly created record populated from {@code bytes}
     * @throws TException if the deserializer cannot decode {@code bytes}
     */
    public static Record deserializeRecord(byte[] bytes) throws TException {
        final Record record = new Record();
        // The deserializer fills the freshly created record in place.
        new TDeserializer().deserialize(record, bytes);
        return record;
    }
}
package edu.illinois.cs.cogcomp.record_reader;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import edu.illinois.cs.cogcomp.thrift.curator.Record;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.thrift.TException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import static edu.illinois.cs.cogcomp.record_reader.RecordIOUtils.deserializeRecord;
/**
 * One batch of S3 objects, each holding a serialized Curator {@link Record}.
 * Objects are downloaded and deserialized one at a time, in the order of the
 * summary list given to the constructor.
 *
 * Created by haowu on 11/3/14.
 */
public class S3RecordBatch {
    private final List<S3ObjectSummary> s3Objects;
    private final AmazonS3 s3;
    private final String bucketName;
    private final Iterator<S3ObjectSummary> it;

    /**
     * @param s3         client used to download the objects
     * @param bucketName bucket that contains every object in {@code s3Objects}
     * @param s3Objects  summaries of the objects that make up this batch
     */
    public S3RecordBatch(AmazonS3 s3, String bucketName, List<S3ObjectSummary> s3Objects) {
        this.s3Objects = s3Objects;
        this.bucketName = bucketName;
        this.s3 = s3;
        this.it = this.s3Objects.iterator();
    }

    /**
     * Downloads and deserializes the next object of this batch.
     *
     * @return a pair of (S3 key, deserialized record); {@code null} when the
     *         batch is exhausted, or when reading or deserializing the object
     *         failed (the failure is printed to stderr and the object is skipped)
     */
    public Pair<String,Record> readNextRecords() {
        if (!it.hasNext()) {
            return null;
        }
        S3ObjectSummary summary = it.next();
        String key = summary.getKey();
        S3Object object = s3.getObject(new GetObjectRequest(bucketName, key));
        InputStream objectData = object.getObjectContent();
        try {
            Record r = deserializeRecord(IOUtils.toByteArray(objectData));
            return new ImmutablePair<String, Record>(key, r);
        } catch (TException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close on every path, including the successful return above;
            // otherwise each record read leaves its content stream open.
            try {
                objectData.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    /**
     * @return {@code true} while there are objects in this batch that have not
     *         yet been requested via {@link #readNextRecords()}
     */
    public boolean hasNext(){
        return this.it.hasNext();
    }
}
package edu.illinois.cs.cogcomp.record_reader;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.util.List;
/**
 * Lists the objects under a prefix of an S3 bucket and hands them out as
 * successive {@link S3RecordBatch}es, one listing page per batch.
 *
 * The first listing request is issued in the constructor. The bucket name and
 * prefix are copied into that request at construction time, so calling
 * {@link #setBucketName(String)} or {@link #setPrefix(String)} afterwards does
 * not change what is listed.
 *
 * Created by haowu on 11/3/14.
 */
public class S3RecordReader {
    private String bucketName;
    private String prefix;
    /** The page that the next call to {@link #nextBatch()} will return; null when exhausted. */
    private ObjectListing currListing;
    private final AmazonS3 s3;
    /** Listing request whose marker always points just past {@link #currListing}. */
    private ListObjectsRequest req;

    /**
     * Creates a reader over every object in the bucket (empty prefix).
     *
     * @param s3         an Amazon S3 client
     * @param bucketName bucket to list
     */
    public S3RecordReader(AmazonS3 s3,String bucketName) {
        this(s3,bucketName,"");
    }

    /**
     * Creates a reader and immediately fetches the first listing page.
     *
     * @param s3         an Amazon S3 client
     * @param bucketName bucket to list
     * @param prefix     only keys starting with this prefix are listed
     */
    public S3RecordReader(AmazonS3 s3,String bucketName, String prefix) {
        this.s3=s3;
        this.bucketName = bucketName;
        this.prefix = prefix;
        this.currListing = null;
        this.connect();
    }

    /**
     * Builds the listing request, fetches the first page and positions the
     * request marker for the page after it.
     */
    private void connect(){
        this.req = new ListObjectsRequest();
        req.setBucketName(bucketName);
        req.setPrefix(prefix);
        ObjectListing firstPage = this.s3.listObjects(req);
        this.currListing = firstPage;
        req.setMarker(firstPage.getNextMarker());
    }

    /**
     * Returns the next page of the listing as a batch (the original author
     * notes this is 1000 records at a time), prefetching the following page
     * if the listing is truncated.
     *
     * @return the next batch, or {@code null} once every page has been returned
     */
    public S3RecordBatch nextBatch(){
        if (this.currListing == null) {
            // No more pages to hand out.
            return null;
        }
        ObjectListing page = this.currListing;
        if (page.isTruncated()) {
            this.currListing = this.s3.listObjects(req);
            // Advance the marker past the page just fetched. Without this the
            // same page would be requested again on every subsequent call and
            // a truncated listing would never terminate.
            req.setMarker(this.currListing.getNextMarker());
        } else {
            this.currListing = null;
        }
        return new S3RecordBatch(s3, bucketName, page.getObjectSummaries());
    }

    public String getPrefix() {
        return prefix;
    }

    /**
     * Updates the stored prefix only; the listing request already built by the
     * constructor is not modified.
     */
    public void setPrefix(String prefix) {
        this.prefix = prefix;
    }

    public String getBucketName() {
        return bucketName;
    }

    /**
     * Updates the stored bucket name. The listing request already built by the
     * constructor is not modified, but batches created afterwards are given the
     * new name as the bucket to download from.
     */
    public void setBucketName(String bucketName) {
        this.bucketName = bucketName;
    }
}
package example;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import edu.illinois.cs.cogcomp.record_reader.S3RecordBatch;
import edu.illinois.cs.cogcomp.record_reader.S3RecordReader;
import edu.illinois.cs.cogcomp.thrift.base.Labeling;
import edu.illinois.cs.cogcomp.thrift.base.Span;
import edu.illinois.cs.cogcomp.thrift.curator.Record;
import org.apache.commons.lang3.tuple.Pair;
/**
 * Minimal usage example: lists Curator records stored in S3, downloads the
 * first batch and prints the label views of each record.
 *
 * Created by haowu on 11/3/14.
 */
public class Example {
    public static void main(String[] args) {
        // SECURITY: never embed AWS keys in source. The no-argument client
        // resolves credentials through the SDK's default provider chain
        // (environment variables, Java system properties, instance profile).
        // Any key pair that was previously committed here must be treated as
        // compromised and rotated.
        AmazonS3Client s3 = new AmazonS3Client();
        //
        // S3RecordReader reader = new S3RecordReader(s3, "BUCKET NAME", "result_coll/"+ corpora name);
        // This is the correct address for wikipedia data. (use result_coll/wiki-coref for coref data. )
        S3RecordReader reader = new S3RecordReader(s3, "curator-processing-wiki", "result_coll/wikipedia");
        // One batch is 1000 documents.
        S3RecordBatch b = reader.nextBatch();
        // You can get several batches and process them in parallel.
        // S3RecordBatch b2 = reader.nextBatch();
        // Listing is fast, but still takes some time.
        System.out.println(" Finished Listing objects");
        while (b.hasNext()) {
            // Reading the next record is time consuming because it downloads the object from AWS.
            Pair<String,Record> p = b.readNextRecords();
            if (p == null) {
                // readNextRecords() returns null when the download or the
                // deserialization failed; skip that document instead of
                // dereferencing null.
                continue;
            }
            // The string (left) is the full path in s3 of the document, which contains its document id.
            printRecord(p.getLeft(), p.getRight());
        }
    }

    /**
     * Prints the names of all label views of a record and, if present, every
     * span of its "wikifier" view.
     *
     * @param path S3 key the record was read from
     * @param r    the deserialized record
     */
    public static void printRecord(String path, Record r) {
        System.out.println("Data at " + path);
        for (String k : r.getLabelViews().keySet()) {
            System.out.println("has the following labeling views " + k);
        }
        if (r.getLabelViews().containsKey("wikifier")) {
            System.out.println("Wikifier titles:" );
            Labeling l = r.getLabelViews().get("wikifier");
            for (Span s : l.getLabels()) {
                System.out.println(s.toString());
            }
        }
    }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment