Skip to content
Snippets Groups Projects
Commit f729869f authored by ashagp2's avatar ashagp2
Browse files

Upload New File

parent 8cc097c8
No related branches found
No related tags found
No related merge requests found
import net.minidev.json.JSONObject;
import net.minidev.json.JSONValue;
import net.minidev.json.parser.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.RoundingMode;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.util.*;
public class CalculateAQIMain {
static String O3 = "O3";
static String NO2 = "NO2";
static String SO2 = "SO2";
static String CO = "CO";
static String PM25 = "PM25";
static String PM10 = "PM10";
static Map<String,Map<String,String>> breakPointsTable = new HashMap<>();
private static void calcAQI(String fileName, String outputPath) throws IOException {
String resultsDir = outputPath + File.separator + "AQIData";
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("AQI Calculator");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
JavaRDD<String> linesFromFile = inputFile.flatMap(content -> Arrays.asList(content.split("\n")));
// linesFromFile.filter(new Function<String, Boolean>() {
// @Override
// public Boolean call(String v1) throws Exception {
// try {
// JSONObject json = (JSONObject) JSONValue.parseStrict(v1);
// String key = json.getAsString("country");
// if(key.equals("US")) {
// return true;
// }
// } catch (ParseException e) {
// e.printStackTrace();
// }
// return false;
// }
// }).saveAsTextFile("usLoc.txt");
JavaPairRDD<String, JSONObject> aggData = linesFromFile.mapToPair(new PairFunction<String, String, JSONObject>() {
public Tuple2<String, JSONObject> call(String s) {
//create a json object from the json string
try {
JSONObject json = (JSONObject) JSONValue.parseStrict(s);
JSONObject dateObj = (JSONObject) JSONValue.parseStrict(json.getAsString("date"));
String key = dateObj.getAsString("utc")+":"+json.getAsString("country")+":"+json.getAsString("city");
JSONObject jsonRet = new JSONObject();
//append the "averagingPeriod":{"unit":"hours","value":1} to the parameter - since some parameters like 03 are reported with 8 hour avg or 1 hour avg
//and we need to differentiate between the two
JSONObject avgPeriod = (JSONObject) JSONValue.parseStrict(json.getAsString("averagingPeriod"));
String paramKey = json.getAsString("parameter")+"-"+avgPeriod.get("value");
paramKey = paramKey.toUpperCase();
jsonRet.put(paramKey,json.getAsString("value"));
return new Tuple2<String, JSONObject>(key,jsonRet);
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
}).reduceByKey(new Function2<JSONObject, JSONObject, JSONObject>() {
@Override
public JSONObject call(JSONObject v1, JSONObject v2) throws Exception {
JSONObject jsonRet = new JSONObject(v1);
for(Map.Entry<String,Object> entry : v2.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if(jsonRet.containsKey(key)) {
//if the two keys are same, take the max of the values
Float val1 = Float.parseFloat(jsonRet.getAsString(key));
Float val2 = Float.parseFloat(value.toString());
Float maxVal = val1 > val2 ? val1 : val2;
value = maxVal;
}
jsonRet.put(key,value.toString());
}
return jsonRet;
}
});
// JavaPairRDD countData = linesFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);
//Now run the actual calculation of AQI on the aggregated data
JavaPairRDD<String, JSONObject> aqiData = aggData.mapValues(new Function<JSONObject, JSONObject>() {
@Override
public JSONObject call(JSONObject v1) throws Exception {
JSONObject jsonRet = new JSONObject(v1);
calculateAQI(jsonRet);
return jsonRet;
}
});
aqiData.saveAsTextFile(resultsDir);
}
private static void calculateAQI(JSONObject jsonRet) {
//calculate the aqi
/**
* Ozone (ppm) – truncate to 3 decimal places
* PM2.5 (µg/m3) – truncate to 1 decimal place
* PM10 (µg/m3) – truncate to integer
* CO (ppm) – truncate to 1 decimal place
* SO2 (ppb) – truncate to integer
* NO2 (ppb) – truncate to integer
*/
DecimalFormat df = new DecimalFormat("####.#");
DecimalFormat df3 = new DecimalFormat("####.###");
df.setRoundingMode(RoundingMode.DOWN);
int aqiCalcVal = 0;
for(Map.Entry<String,Object> jentry : jsonRet.entrySet()) {
String key = jentry.getKey();
if(key.startsWith(O3) || key.startsWith(CO) || key.startsWith(NO2) || key.startsWith(PM10) || key.startsWith(PM25) || key.startsWith(SO2))
{
Float fVal = Float.parseFloat(df.format(Float.parseFloat(jentry.getValue().toString())));
if(key.startsWith(O3)) {
fVal = Float.parseFloat(df3.format(Float.parseFloat(jentry.getValue().toString())));
}
if(key.startsWith(PM10) || key.startsWith(SO2) || key.startsWith(NO2)) {
fVal = Float.parseFloat(jentry.getValue().toString());
fVal = (float)fVal.intValue();
}
Map<String, String> secMap = breakPointsTable.get(key);
String aqiVal = null;
Float BpHi = null, BpLo = null;
for (Map.Entry<String, String> entry : secMap.entrySet()) {
if (entry.getKey().equalsIgnoreCase("NA")) {
continue;
}
String[] parts = entry.getKey().split("-");
BpLo = Float.parseFloat(parts[0]);
BpHi = Float.parseFloat(parts[1]);
if (fVal >= BpLo && fVal <= BpHi) {
//Found the aqi value for this parameter
aqiVal = entry.getValue();
break;
}
}
if (aqiVal != null) {
//use the formula:
/**
* Equation 1:
*
* Ip = (IHi - ILo)/(BpHi-BpLo) (Cp-BpLo) +ILo
*
* Where Ip = the index for pollutant p
* Cp = the truncated concentration of pollutant p
* BPHi = the concentration breakpoint that is greater than or equal to Cp
* BPLo = the concentration breakpoint that is less than or equal to Cp
* IHi = the AQI value corresponding to BPHi
* ILo = the AQI value corresponding to BPLo
*/
String[] parts = aqiVal.split("-");
Float IpLo = Float.parseFloat(parts[0]);
Float IpHi = Float.parseFloat(parts[1]);
Float ip = (IpHi - IpLo) / (BpHi - BpLo) * (fVal - BpLo) + IpLo;
if (ip > aqiCalcVal) {
//use the max index calculated so far
aqiCalcVal = Math.round(ip);
}
}
}
}
jsonRet.put("aqi", aqiCalcVal);
}
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("No files provided.");
System.exit(0);
}
try {
String resultsDir = "." + File.separator + "AQIData";
if (System.getProperty("WINDOWS") != null) {
System.setProperty("hadoop.home.dir", "C:\\winutils");
Path resultsPath = Paths.get(resultsDir);
if (Files.exists(resultsPath)) {
Files.list(resultsPath).forEach(file -> {
try {
Files.delete(file);
} catch (IOException e) {
e.printStackTrace();
}
});
Files.delete(resultsPath);
}
}
} catch (IOException e) {
e.printStackTrace();
}
try{
//Read in the breakpoints.csv used to calculate the AQI
List<String> allLines = null;
if (System.getProperty("WINDOWS") != null) {
allLines = Files.readAllLines(Paths.get("breakPoints.csv"));
} else {
//TODO
//allLines = readHDFSFile(args[0]+File.separator+"breakPoints.csv", conf);
}
int first = 0;
String[] headings = null;
for(String line : allLines) {
if(first == 0) {
//first line is the headings
//O3-8,O3-1,PM25-1,PM10-1,CO-1,SO2-1,NO2-1,AQI
headings = line.split(",");
first++;
continue;
}
String parts[] = line.split(",");
int aqiValHeading= 0;
Map<String,String> colMap = new HashMap<>();
for(int i = 2 ; i < parts.length; i++) {
String heading = headings[i].toUpperCase();
colMap = breakPointsTable.get(heading) ;
if(colMap == null) {
colMap = new HashMap<>();
breakPointsTable.put(heading,colMap);
String[] paramParts = heading.split("-");
//for pm10 and pm25, add 24 hour sampling also
if(heading.startsWith("PM10") || heading.startsWith("PM25") || heading.startsWith("SO2")) {
breakPointsTable.put(paramParts[0]+"-24",colMap);
}
if(heading.startsWith("CO")) {
breakPointsTable.put(paramParts[0]+"-8",colMap);
}
}
colMap.put(parts[i],parts[0]);
}
}
calcAQI(args[0], ".");
} catch (IOException e) {
e.printStackTrace();
}
}
private static List<String> readHDFSFile(String path, Configuration conf) throws IOException {
org.apache.hadoop.fs.Path pt = new org.apache.hadoop.fs.Path(path);
FileSystem fs = FileSystem.get(pt.toUri(), conf);
FSDataInputStream file = fs.open(pt);
BufferedReader buffIn = new BufferedReader(new InputStreamReader(file));
List<String> allRows = new ArrayList<>();
String line;
while ((line = buffIn.readLine()) != null) {
allRows.add(line);
}
return allRows;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment