Commit ef7b0301 authored by kamenon2

S3 bucket downloader. Kafka integration pending.

parent 817222d8
Showing 400 additions and 14 deletions
@@ -30,6 +30,7 @@
 </build>
 <dependencies>
+<!-- spring jars -->
 <dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-core</artifactId>
@@ -50,13 +51,44 @@
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-aws-messaging</artifactId>
 </dependency>
+<dependency>
+<groupId>org.springframework.cloud</groupId>
+<artifactId>spring-cloud-aws-autoconfigure</artifactId>
+</dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
+<!-- common jars -->
+<dependency>
+<groupId>commons-io</groupId>
+<artifactId>commons-io</artifactId>
+<version>2.4</version>
+</dependency>
+<!-- hadoop jars -->
+<dependency>
+<groupId>org.apache.hadoop</groupId>
+<artifactId>hadoop-common</artifactId>
+<version>2.9.0</version>
+</dependency>
+<dependency>
+<groupId>org.apache.hadoop</groupId>
+<artifactId>hadoop-hdfs</artifactId>
+<version>2.9.0</version>
+</dependency>
+<dependency>
+<groupId>jdk.tools</groupId>
+<artifactId>jdk.tools</artifactId>
+<version>jdk1.8.0_181</version>
+<scope>system</scope>
+<systemPath>C:/Program Files/Java/jdk1.8.0_181/lib/tools.jar</systemPath>
+</dependency>
+<!-- postgresql jars -->
 <dependency>
 <groupId>org.postgresql</groupId>
 <artifactId>postgresql</artifactId>
...
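The jdk.tools system-scope entry is a common workaround for Hadoop 2.x builds on JDK 8, but the hard-coded systemPath ties the build to one machine. A more portable variant (an assumption, not part of this commit) resolves tools.jar relative to the running JDK:

<dependency>
  <groupId>jdk.tools</groupId>
  <artifactId>jdk.tools</artifactId>
  <version>1.8</version>
  <scope>system</scope>
  <!-- ${java.home} points at the JRE inside the JDK, hence the ../lib hop -->
  <systemPath>${java.home}/../lib/tools.jar</systemPath>
</dependency>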
package com.mcsds.cc.openaq.aws.provider;

import com.amazonaws.auth.AWSCredentials;

public interface AuthProvider {

    // Interface fields are implicitly public static final.
    String DEFAULT_PROVIDER = "default";

    AWSCredentials getAWSCredentials(String provider);
}
package com.mcsds.cc.openaq.aws.provider;

import org.springframework.core.io.Resource;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.sqs.AmazonSQS;

public interface DataProvider {

    AmazonS3 s3Initializer();

    AmazonSQS sqsInitializer();

    Resource s3ResourceLoader();
}
package com.mcsds.cc.openaq.aws.provider;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;

@Configuration
public class OpenAQAuthenticationProvider implements AuthProvider {

    private Logger logger = Logger.getLogger(this.getClass());

    static final String DEFAULT_VAL = "default";

    // @Value goes on the parameter, not the @Bean method itself; otherwise the
    // String argument cannot be resolved and context startup fails.
    @Bean
    public AWSCredentials getAWSCredentials(@Value(DEFAULT_VAL) String provider) {
        AWSCredentials credentials = null;
        logger.info("Initializing AWS Credentials ...");
        try {
            credentials = new ProfileCredentialsProvider(provider != null ? provider : DEFAULT_PROVIDER)
                    .getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
                    + "Please make sure that your credentials file is at the correct "
                    + "location (C:\\Users\\kajay\\.aws\\credentials), and is in valid format.", e);
        }
        logger.info("AWS Credentials Initialized Successfully ...");
        return credentials;
    }
}
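For reference, ProfileCredentialsProvider resolves the named profile from the shared AWS credentials file (~/.aws/credentials, or C:\Users\<user>\.aws\credentials on Windows). A minimal example of the expected format, with the same placeholders used in application.properties:

[default]
aws_access_key_id = <<YOUR_KEY>>
aws_secret_access_key = <<YOUR_SECRET>>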
package com.mcsds.cc.openaq.aws.provider;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.stereotype.Component;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;

/*
 * Manual S3 bucket download options:
 * Option 1. aws cli pull command : aws s3 cp s3://openaq-fetches . --recursive
 * Option 2. aws s3 sync s3://openaq-fetches .
 * Option 3. aws s3 sync s3://openaq-fetches/realtime realtime/.
 *
 * Note: Option 3 is preferable since we are concerned only with realtime data.
 */
@Component
public class OpenAQDataProvider implements DataProvider {

    private Logger logger = Logger.getLogger(this.getClass());

    // Literal values injected directly; see the cloud.aws.s3.* keys in
    // application.properties for the externalized equivalents.
    @Value("us-east-1")
    public String region;

    @Value("openaq-fetches")
    public String bucket;

    @Value("realtime")
    public String filter;

    @Value("realtime")
    public String key;

    @Autowired
    public ResourceLoader resourceLoader;

    @Autowired
    public AWSCredentials credentials;

    static final String DEFAULT_OPENAQ_BUCKET_ARN = "arn:aws:s3:::openaq-fetches";
    static final String DEFAULT_OPENAQ_NEW_MEAS_ARN = "arn:aws:sns:us-east-1:470049585876:OPENAQ_NEW_MEASUREMENT";
    static final String DEFAULT_OPENAQ_NEW_FETCH_ARN = "arn:aws:sns:us-east-1:470049585876:NewFetchObject";
    static final String DEFAULT_OPENAQ_BUCKET_NAME = "openaq-fetches";
    static final String DEFAULT_OPENAQ_NEW_MEAS_NAME = "OPENAQ_NEW_MEASUREMENT";
    static final String DEFAULT_OPENAQ_NEW_FETCH_NAME = "NewFetchObject";

    // S3 ...
    @Bean
    public AmazonS3 s3Initializer() {
        return AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withRegion(region)
                .build();
    }

    @Bean
    public Resource s3ResourceLoader() {
        String resourceVal = "s3://" + bucket + "/" + filter;
        return this.resourceLoader.getResource(resourceVal);
    }

    public AmazonSQS sqsInitializer() {
        // TODO: not implemented yet; SQS/Kafka integration pending.
        return null;
    }
}
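A sketch of how sqsInitializer could eventually be completed with the v1 SDK's AmazonSQSClientBuilder, mirroring s3Initializer above (an assumption on my part; the commit intentionally leaves this pending):

import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

@Bean
public AmazonSQS sqsInitializer() {
    // Same static-credentials-plus-region pattern as the S3 client.
    return AmazonSQSClientBuilder.standard()
            .withCredentials(new AWSStaticCredentialsProvider(credentials))
            .withRegion(region)
            .build();
}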
package com.mcsds.cc.openaq.config;

// Empty placeholder for the message payload (Kafka integration pending).
public class Message {
}
package com.mcsds.cc.openaq.constants;

public class DataConstants {

    // Top-level key prefixes available in the openaq-fetches bucket; these map
    // to the "type" request parameter used by the download endpoint.
    private enum DATA_TYPE {
        DAILY("daily"),
        REALTIME("realtime"),
        REALTIME_GZ("realtime-gzipped"),
        TEST_REALTIME("test-realtime"),
        TEST_REAL_EVENTS("test-realtime-events"),
        TEST_REAL_GZ("test-realtime-gzip");

        private final String dataType;

        DATA_TYPE(String dataType) {
            this.dataType = dataType;
        }

        public String getDataType() {
            return dataType;
        }
    }
}
@@ -2,13 +2,15 @@ package com.mcsds.cc.openaq.main;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 import org.springframework.cloud.aws.autoconfigure.context.ContextRegionProviderAutoConfiguration;
+import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.ImportResource;
-//@SpringBootApplication
-@SpringBootApplication(exclude = { ContextRegionProviderAutoConfiguration.class })
+@SpringBootApplication(exclude = { ContextRegionProviderAutoConfiguration.class, DataSourceAutoConfiguration.class }) // "convention over configuration"
 @ImportResource("classpath:aws-config.xml")
+@ComponentScan("com.mcsds.cc.openaq")
 public class OpenAQMainApplication {
 public static void main(String[] args) {
 SpringApplication.run(OpenAQMainApplication.class, args);
...
package com.mcsds.cc.openaq.s3;

import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.util.IOUtils;

@RestController
public class S3OpenAQBucketReader {

    @Value("us-east-1")
    private String region;

    @Value("openaq-fetches")
    private String bucket;

    @Value("realtime")
    private String filter;

    @Autowired
    private AmazonS3 s3Initializer;

    @RequestMapping("/download")
    public ResponseEntity<String> download(@RequestParam("type") String type, @RequestParam("date") String date)
            throws IOException {
        // Keys are laid out as <type>/<date>/, e.g. realtime/2013-11-26/
        String key = type + "/" + date + "/";
        ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucket).withPrefix(key).withMaxKeys(2);
        ListObjectsV2Result result;
        HttpHeaders httpHeaders = new HttpHeaders();
        List<String> fileNames = new ArrayList<String>();
        do {
            result = s3Initializer.listObjectsV2(req);
            for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
                System.out.printf(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize());
                GetObjectRequest getObjectRequest = new GetObjectRequest(bucket, objectSummary.getKey());
                S3Object s3Object = s3Initializer.getObject(getObjectRequest);
                S3ObjectInputStream objectInputStream = s3Object.getObjectContent();
                byte[] bytes = IOUtils.toByteArray(objectInputStream);
                s3Object.close(); // release the underlying HTTP connection
                // Encode the object's own key (not the shared prefix) as the file name.
                String fileName = URLEncoder.encode(objectSummary.getKey(), "UTF-8").replaceAll("\\+", "%20");
                fileNames.add(fileName);
                httpHeaders.setContentType(MediaType.APPLICATION_OCTET_STREAM);
                httpHeaders.setContentLength(bytes.length);
                httpHeaders.setContentDispositionFormData("attachment", fileName);
                // TODO: Integrate with Kafka...
            }
            // Page through the listing until all matching keys are consumed.
            String token = result.getNextContinuationToken();
            System.out.println("Next Continuation Token: " + token);
            req.setContinuationToken(token);
        } while (result.isTruncated());
        return new ResponseEntity<String>("Data pushed to Kafka successfully", HttpStatus.OK);
    }
}
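The TODO above is where the pending Kafka integration would land. A minimal sketch using the kafka-clients producer API; the broker address and the topic name openaq-realtime are placeholders, not part of this commit:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

private void publishToKafka(String objectKey, byte[] payload) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());
    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
        // Key by S3 object key so records for the same file land in one partition.
        producer.send(new ProducerRecord<>("openaq-realtime", objectKey, payload));
    }
}

In a real controller the producer would be created once (e.g. as a Spring bean) and reused, rather than opened and closed per call.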
@@ -17,7 +17,7 @@ public class OpenAQNewFetchSQS {
 private static final Logger logger = LoggerFactory.getLogger(OpenAQNewFetchSQS.class);
-static final String QUEUE_NAME = "jvmcoders-openaq-newfetchobject";
+static final String QUEUE_NAME = "NewFetchObject";
 /*
 * CountDownLatch is added to wait for messages
...
@@ -17,7 +17,7 @@ public class OpenAQNewMeasurementsSQS {
 private static final Logger logger = LoggerFactory.getLogger(OpenAQNewMeasurementsSQS.class);
-static final String QUEUE_NAME = "jvmcoders-openaq-new-measurements";
+static final String QUEUE_NAME = "OPENAQ_NEW_MEASUREMENT";
 /*
 * CountDownLatch is added to wait for messages
...
-cloud.aws.credentials.accessKey=AKIAIOU5JEGT5CDHXKVQ
-cloud.aws.credentials.secretKey=wnzw+wEe0phOAA/cMvQRfuNWNCvnQrnaE2B2KQgx
+#cloud.aws.credentials.accessKey=<<YOUR_KEY>>
+#cloud.aws.credentials.secretKey=<<YOUR_SECRET>>
 cloud.aws.region.static=us-east-1
 # Disable auto CloudFormation
-cloud.aws.stack.auto=false
\ No newline at end of file
+cloud.aws.stack.auto=false
+cloud.aws.s3.bucket=openaq-fetches
+cloud.aws.s3.bucket.filter=realtime
\ No newline at end of file
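With these keys now in application.properties, the literal @Value injections in OpenAQDataProvider and S3OpenAQBucketReader could instead bind to placeholders; a minimal sketch:

@Value("${cloud.aws.region.static}")
private String region;

@Value("${cloud.aws.s3.bucket}")
private String bucket;

@Value("${cloud.aws.s3.bucket.filter}")
private String filter;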
package com.mcsds.cc.openaq.test;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Paths;
import java.util.logging.Logger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HadoopFSUtil {

    public static final String local_hdfsuri = "hdfs://localhost:9001/user/root/openaq";
    public static final String remote_hdfsuri = "hdfs://192.168.1.191:9003/user/root/openaq";
    public static final String dest_path = "user/root/openaq/realtime";
    public static final String src_path = "H:/backup/openaq-data/realtime/2013-11-26/2013-11-26.ndjson";

    private static final Logger logger = Logger.getLogger("com.mcsds.cc.openaq.test");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", remote_hdfsuri);
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
        System.setProperty("HADOOP_USER_NAME", "root");

        // On Windows the Hadoop client needs winutils.exe; point hadoop.home.dir at it.
        String os = System.getProperty("os.name").toLowerCase();
        if (os.contains("win")) {
            System.setProperty("hadoop.home.dir", Paths.get("winutils").toAbsolutePath().toString());
        } else {
            System.setProperty("hadoop.home.dir", "/");
        }

        try {
            InputStream is = new BufferedInputStream(new FileInputStream(src_path));
            FileSystem fs = FileSystem.get(URI.create(remote_hdfsuri), conf);
            FSDataOutputStream out = fs.create(new Path(dest_path), Boolean.TRUE);
            logger.info("Begin write of file into HDFS");
            // copyBytes closes both streams when this variant is used.
            IOUtils.copyBytes(is, out, conf);
            logger.info("File write complete ..");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
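To verify the upload, a quick read-back with the same Configuration could look like this sketch (a hypothetical addition, not in this commit; it needs import org.apache.hadoop.fs.FSDataInputStream):

// Stream the file just written back to stdout for a spot check.
FileSystem fs = FileSystem.get(URI.create(remote_hdfsuri), conf);
try (FSDataInputStream in = fs.open(new Path(dest_path))) {
    IOUtils.copyBytes(in, System.out, conf, false); // false: leave System.out open
}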
File deleted
 #Generated by Maven Integration for Eclipse
-#Sat Mar 30 11:31:53 CDT 2019
+#Fri Apr 12 22:50:03 CDT 2019
 version=0.0.1-SNAPSHOT
 groupId=com.mcsds.cc.openaq
 m2e.projectName=OpenAQSQSListener
...
File added
File added