Commit 27f6a3af authored by kamenon2

s3 bucket/kafka integration completed

parent 817956cd
Showing with 7381 additions and 36 deletions
@@ -52,9 +52,13 @@
    <artifactId>spring-cloud-aws-messaging</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-aws-autoconfigure</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
......
This diff is collapsed.
package com.mcsds.cc.openaq.aws.provider;

import org.apache.kafka.clients.producer.Producer;

/** Abstraction over Kafka producer creation, so alternative producer configurations can be swapped in. */
public interface KafkaProvider {
    Producer<String, byte[]> createProducer();
}
@@ -46,14 +46,6 @@ public class OpenAQDataProvider implements DataProvider {
    @Autowired
    public AWSCredentials credentials;

    static final String DEFAULT_OPENAQ_BUCKET_ARN = "arn:aws:s3:::openaq-fetches";
    static final String DEFAULT_OPENAQ_NEW_MEAS_ARN = "arn:aws:sns:us-east-1:470049585876:OPENAQ_NEW_MEASUREMENT";
    static final String DEFAULT_OPENAQ_NEW_FETCH_ARN = "arn:aws:sns:us-east-1:470049585876:NewFetchObject";
    static final String DEFAULT_OPENAQ_BUCKET_NAME = "openaq-fetches";
    static final String DEFAULT_OPENAQ_NEW_MEAS_NAME = "OPENAQ_NEW_MEASUREMENT";
    static final String DEFAULT_OPENAQ_NEW_FETCH_NAME = "NewFetchObject";

    // S3 ...
    @Bean
    public AmazonS3 s3Initializer() {
......
package com.mcsds.cc.openaq.aws.provider;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import com.mcsds.cc.openaq.constants.DataConstants;

@Component
public class OpenAQKafkaProvider implements KafkaProvider {

    @Bean(name = "kafkaProducer")
    public Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DataConstants.BOOTSTRAP_SERVERS);
        // Reuses the bucket ARN as a unique client id.
        props.put(ProducerConfig.CLIENT_ID_CONFIG, DataConstants.DEFAULT_OPENAQ_BUCKET_ARN);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Payloads are whole .json fetch files, so allow requests up to ~60 MB.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "60000000");
        return new KafkaProducer<>(props);
    }
}
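For reference, a minimal consumer counterpart might look like the sketch below. It is not part of this commit: the class name and group id are hypothetical, it assumes kafka-clients 2.0+ (for poll(Duration)), and it raises max.partition.fetch.bytes to match the producer's 60 MB max.request.size. The broker's message.max.bytes must also be raised before payloads that large will get through.

package com.mcsds.cc.openaq.aws.provider;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.mcsds.cc.openaq.constants.DataConstants;

// Hypothetical test consumer; not part of this commit.
public class OpenAQTopicConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DataConstants.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "openaq-consumer-group"); // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Mirror the producer's 60 MB limit so large .json payloads can be fetched.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "60000000");

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(DataConstants.TOPIC));
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.printf("offset=%d, payload=%d bytes%n", record.offset(), record.value().length);
                }
            }
        }
    }
}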
@@ -2,6 +2,21 @@ package com.mcsds.cc.openaq.constants;
public class DataConstants {

    public static final String DEFAULT_IP = "127.0.0.1";
    public static final String REMOTE_MAC_IP = "192.168.1.191"; //24 running kafka, zookeeper, hbase, hdfs etc
    public static final String REMOTE_WIN_IP_1 = "192.168.1.192"; //24 running kafka, zookeeper, hbase, hdfs etc
    public static final String REMOTE_WIN_IP_2 = "192.168.1.77"; //24 running kafka, zookeeper alone

    public static final String TOPIC = "openaq-stream-in-data-topic"; // topic to which the fetched .json files are pushed
    public static final String BOOTSTRAP_SERVERS = "192.168.1.191:9092"; // change to your Kafka server:port

    public static final String DEFAULT_OPENAQ_BUCKET_ARN = "arn:aws:s3:::openaq-fetches";
    public static final String DEFAULT_OPENAQ_NEW_MEAS_ARN = "arn:aws:sns:us-east-1:470049585876:OPENAQ_NEW_MEASUREMENT";
    public static final String DEFAULT_OPENAQ_NEW_FETCH_ARN = "arn:aws:sns:us-east-1:470049585876:NewFetchObject";
    public static final String DEFAULT_OPENAQ_BUCKET_NAME = "openaq-fetches";
    public static final String DEFAULT_OPENAQ_NEW_MEAS_NAME = "OPENAQ_NEW_MEASUREMENT";
    public static final String DEFAULT_OPENAQ_NEW_FETCH_NAME = "NewFetchObject";

    private enum DATA_TYPE {
        DAILY("daily"),
        REALTIME("realtime"),
@@ -18,6 +33,5 @@ public class DataConstants {
        public String getDataType() {
            return dataType;
        }
    }
}
package com.mcsds.cc.openaq.s3;

import java.io.File;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -24,10 +25,13 @@ import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.util.IOUtils;
import com.mcsds.cc.openaq.constants.DataConstants;

@RestController
public class S3OpenAQBucketReader {

    private Logger logger = Logger.getLogger(this.getClass().getName());

    @Value("us-east-1")
    private String region;
@@ -37,12 +41,18 @@ public class S3OpenAQBucketReader {
    @Value("realtime")
    private String filter;

    @Value("true")
    private String isKafkaEnabled;

    @Autowired(required = true)
    private AmazonS3 s3Initializer;

    @Autowired(required = true)
    private Producer<String, byte[]> kafkaProducer;
@RequestMapping("/download")
public ResponseEntity<String> download(@RequestParam("type") String type, @RequestParam("date") String date)
throws IOException {
throws IOException, InterruptedException, ExecutionException {
StringBuilder keyBuilder = new StringBuilder().append(type).append("/").append(date).append("/");
String key = keyBuilder.toString();
@@ -50,32 +60,48 @@
        ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucket).withPrefix(key).withMaxKeys(2);
        ListObjectsV2Result result;
        HttpHeaders httpHeaders = new HttpHeaders();
        List<String> fileNames = new ArrayList<String>();
        do {
            result = s3Initializer.listObjectsV2(req);
            for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
                logger.info("Object summary key : " + objectSummary.getKey() + " , Size: " + objectSummary.getSize());
                GetObjectRequest getObjectRequest = new GetObjectRequest(bucket, objectSummary.getKey());
                S3Object s3Object = s3Initializer.getObject(getObjectRequest);
                S3ObjectInputStream objectInputStream = s3Object.getObjectContent();
                // Read the object content once; an S3ObjectInputStream cannot be consumed twice.
                byte[] bytes = IOUtils.toByteArray(objectInputStream);
                // Name the attachment after the object's own key rather than the shared prefix.
                String fileName = URLEncoder.encode(objectSummary.getKey(), "UTF-8").replaceAll("\\+", "%20");
                fileNames.add(fileName);
                httpHeaders.setContentType(MediaType.APPLICATION_OCTET_STREAM);
                httpHeaders.setContentLength(bytes.length);
                httpHeaders.setContentDispositionFormData("attachment", fileName);
                /*
                // Format 1: Keep the content in a file. There is no Kafka serializer for File,
                // so this variant could only save the object locally or push it to HDFS for record keeping.
                File file = new File(objectSummary.getKey());
                FileUtils.copyInputStreamToFile(objectInputStream, file);
                */
                // Format 2: publish the byte array to Kafka via the ByteArraySerializer.
                if ("true".equalsIgnoreCase(isKafkaEnabled)) {
                    final ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(
                            DataConstants.TOPIC, bytes);
                    try {
                        RecordMetadata metadata = kafkaProducer.send(record).get();
                        logger.info("sent record : " + record.key() + ", partition : " + metadata.partition()
                                + ", offset : " + metadata.offset());
                    } catch (InterruptedException e) {
                        logger.error("InterruptedException : ", e);
                        throw e;
                    } catch (ExecutionException e) {
                        logger.error("ExecutionException : ", e);
                        throw e;
                    }
                }
            }
            String token = result.getNextContinuationToken();
            logger.info("Next Continuation Token: " + token);
            req.setContinuationToken(token);
        } while (result.isTruncated());
......
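With isKafkaEnabled set to true, a request such as GET /download?type=realtime&date=2019-04-13 (the date value here is only illustrative) lists the objects under the realtime/2019-04-13/ prefix of the openaq-fetches bucket and publishes each object's bytes as a single record on openaq-stream-in-data-topic.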
#Generated by Maven Integration for Eclipse
#Sat Apr 13 13:40:08 CDT 2019
version=0.0.1-SNAPSHOT
groupId=com.mcsds.cc.openaq
m2e.projectName=OpenAQSQSListener
......
@@ -52,9 +52,13 @@
    <artifactId>spring-cloud-aws-messaging</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-aws-autoconfigure</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
......
Binary files added (no preview for these file types).