To determine the ages at which players hit their peak, we look at historical baseball data available on the internet. The specific source chosen here is Sean Lahman's database of baseball statistics covering the years 1870 to 2016. http://www.seanlahman.com/baseball-database.html
This database has 27 tables, but to answer our question we only need to cross-reference data from two of them. The Master.csv table lists every player who has played the game from 1870 to 2016, along with his year of birth. Its schema is listed below.
Field | Description |
---|---|
playerID | A unique code assigned to each player |
birthYear | Year player was born |
birthMonth | Month player was born |
birthDay | Day player was born |
birthCountry | Country where player was born |
birthState | State where player was born |
birthCity | City where player was born |
deathYear | Year player died |
deathMonth | Month player died |
deathDay | Day player died |
deathCountry | Country where player died |
deathState | State where player died |
deathCity | City where player died |
nameFirst | Player's first name |
nameLast | Player's last name |
nameGiven | Player's given name |
weight | Player's weight in pounds |
height | Player's height in inches |
bats | Player's batting hand (left, right, or both) |
throws | Player's throwing hand (left or right) |
debut | Date that player made first appearance |
finalGame | Date that player made last appearance |
retroID | ID used by retrosheet |
bbrefID | ID used by Baseball Reference website |
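Before bringing Spark into the picture, a quick peek at the raw file can confirm these column names. Below is a minimal sketch using pandas, assuming Master.csv has already been downloaded to the working directory (the download step is shown later).
# Peek at the first few rows of Master.csv to verify the schema above
import pandas as pd
print(pd.read_csv('Master.csv', nrows=3))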
The Batting.csv table lists the batting statistics for every player, for each year he played between 1870 and 2016. Its schema is listed below.
Field | Description |
---|---|
playerID | A unique code assigned to each player |
yearID | Year |
stint | Player's stint (order of appearances within a season) |
teamID | Team |
lgID | League |
G | Games Played |
AB | At Bats |
R | Runs Scored |
H | Hits |
2B | Doubles |
3B | Triples |
HR | Homeruns |
RBI | Runs Batted In |
SB | Stolen Bases |
CS | Caught Stealing |
BB | Base on Balls |
SO | Strike Outs |
IBB | Intentional Walks |
HBP | Hit by Pitch |
SH | Sacrifice Hits |
SF | Sacrifice Flies |
GIDP | Grounded into Double Plays |
We use Apache Spark to perform the database operations required to answer our questions. The code below walks through the process and shows how straightforward it is to analyze Big Data with Spark. The query is implemented in Python and can be run either on a local server or on a cluster of servers. The example below was run on an Amazon EC2 Free Tier Ubuntu Server instance, set up with Python (Anaconda 3-4.1.1), Java, Scala, py4j, Spark, and Hadoop. The code was written and executed in a Jupyter Notebook. Several guides are available online describing how to install and run Spark on an EC2 instance; one that covers all of these facets is https://medium.com/@josemarcialportilla/getting-spark-python-and-jupyter-notebook-running-on-amazon-ec2-dec599e1c297
Import the pyspark libraries to allow Python to interact with Spark. A description of the basic functionality of each of these libraries is provided in the code comments below. A more detailed explanation can be found in Apache's documentation on Spark: https://spark.apache.org/docs/latest/api/python/index.html
# Import SparkContext. This is the main entry point for Spark functionality
# Import SparkConf. We use SparkConf to easily change configuration settings when switching between local mode and cluster mode.
# Import SQLContext from pyspark.sql. We use it to read in data in csv format, the format of our source database.
# Import count, avg, round, and cume_dist from pyspark.sql.functions. These provide the math operations needed to answer our questions.
# Import Window from pyspark.sql.window to allow us to partition and analyze data within groups.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import count
from pyspark.sql.functions import avg
from pyspark.sql.functions import round
from pyspark.sql.functions import cume_dist
from pyspark.sql.window import Window
We configure Spark for local mode or cluster mode, set our application name, and configure logging. Several other configuration settings can be programmed as well; a detailed explanation of these can be found at https://spark.apache.org/docs/latest/configuration.html
We pass the configuration to an instance of a SparkContext object, so that we can begin using Apache Spark
# The Master will need to change when running on a cluster.
# If we need to specify multiple cores we can list something like local[2] for 2 cores, or local[*] to use all available cores.
# All the available Configuration settings can be found at https://spark.apache.org/docs/latest/configuration.html
sc_conf = SparkConf().setMaster('local[*]').setAppName('Question3').set('spark.logConf', True)
# We instantiate a SparkContext object with the SparkConfig
sc = SparkContext(conf=sc_conf)
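As an optional sanity check, we can list the settings the context actually picked up; a minimal sketch:
# Optional: print the configuration the SparkContext is actually using
print(sc.getConf().getAll())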
We use the SQLContext library to easily read the csv files 'Master.csv' and 'Batting.csv'. These files are currently stored in Amazon S3 storage (s3://cs498ccafinalproject/) and are publicly available for download. They were copied over to a local EC2 instance using the AWS command line interface command
aws s3 cp s3://cs498ccafinalproject . --recursive
# We create a sql context object, so that we can read in csv files easily, and create a data frame
sqlContext = SQLContext(sc)
df_master = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('Master.csv')
df_bat = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('Batting.csv')
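With the files loaded, it is worth confirming that inferschema assigned sensible types to each column; a quick optional check:
# Optional: verify the inferred column names and types for both data frames
df_master.printSchema()
df_bat.printSchema()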
To determine the age at which a player is most effective, we perform the following operations:
1) We extract the columns of data we need from the two tables.
2) We clean the data to remove null entries and filter out seasons in which a player did not have enough at-bats (502) to qualify for a batting title. This removes statistically insignificant entries.
3) We calculate additional batting statistics for each player: Batting Average, Slugging Percentage, On-Base Percentage, and On-Base Plus Slugging.
4) We then join the two tables on playerID.
5) We calculate each player's age from the year he was born and the year he played in the majors.
NOTE:
Batting Average = Hits / At Bats
Slugging Percentage = (1B + 2*2B + 3*3B + 4*HR) / AB, where AB is the number of at-bats for a given player, and 1B, 2B, 3B, and HR are the number of singles, doubles, triples, and home runs, respectively
On-Base Percentage = (Hits + Walks + Hit by Pitch) / (At Bats + Walks + Hit by Pitch + Sacrifice Flies)
On-Base Plus Slugging = On-Base Percentage + Slugging Percentage
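To make these formulas concrete, here is the arithmetic for a hypothetical season line (all numbers invented purely for illustration):
# Worked example of the formulas above, using an invented season line
H, AB, BB, HBP, SF = 180, 550, 70, 5, 5
DB, TR, HR = 35, 3, 30                           # doubles, triples, home runs
singles = H - DB - TR - HR                       # 112 singles
AVG = H / AB                                     # ~0.327
SLG = (singles + 2*DB + 3*TR + 4*HR) / AB        # ~0.565
OBP = (H + BB + HBP) / (AB + BB + HBP + SF)      # ~0.405
OPS = OBP + SLG                                  # ~0.970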
# Keep the playerID and the birthYear from the Master table
keep = [df_master.playerID, df_master.birthYear ]
df_master_data = df_master.select(*keep).filter(df_master.birthYear != "")
# Keep the playerID, yearID, AB, R, H, 2B, 3B, HR, RBI, SB, BB, HBP, SF from the Batting table
keep2 = ['playerID', 'yearID', 'AB', 'R', 'H', '2B', '3B', 'HR', 'RBI', 'SB', 'BB', 'HBP', 'SF']
df_bat_data = df_bat.select(*keep2)
# Replace null entries with Zero in the batting stats
df_bat_no_null = df_bat_data.na.fill(0)
# Filter out statistically insignificant entries for batting, and rename 2B/3B to DB/TR (Python attribute access cannot start with a digit)
df_bat_filt = df_bat_no_null.filter((df_bat_no_null.AB >= 502)).withColumnRenamed('2B', 'DB').withColumnRenamed('3B', 'TR')
# Calculate Advanced batting stats, average, slugging pct, on base pct, on base plus slug pct
df_bat_stats = df_bat_filt.withColumn("AVG", round(df_bat_filt.H/df_bat_filt.AB,3)).\
withColumn("SLG", round(((df_bat_filt.H -(df_bat_filt.DB + df_bat_filt.TR + df_bat_filt.HR))+ (2*df_bat_filt.DB) + \
(3*df_bat_filt.TR) + (4*df_bat_filt.HR)) /(df_bat_filt.AB),3)).\
withColumn("OBP", round((df_bat_filt.H + df_bat_filt.BB + df_bat_filt.HBP)/(df_bat_filt.AB + df_bat_filt.BB + df_bat_filt.HBP + \
df_bat_filt.SF),3))
df_bats_adv_stats = df_bat_stats.withColumn("OPS", round(df_bat_stats.OBP + df_bat_stats.SLG,3))
# Define the join condition: match rows on playerID
cond = [df_master_data.playerID == df_bats_adv_stats.playerID]
# Join the Player Table and the Batting Table
df_bats_merge = df_bats_adv_stats.join(df_master_data, cond, 'inner')
# Calculate age of every player in the merged table
df_bats_merge_age = df_bats_merge.withColumn("age", df_bats_merge.yearID - df_bats_merge.birthYear)
df_bats_merge_age.show()
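Before aggregating, it can be useful to confirm how many qualifying player-seasons survived the filters and the join; an optional check:
# Optional: count the player-seasons remaining after filtering and joining
print(df_bats_merge_age.count())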
To put our data into context, we can also compute the following:
1) Group players by their ages, then:
a) Find the average Batting Average for each age group
b) Find the average On-Base Percentage for each age group
c) Find the average Slugging Percentage for each age group
d) Find the average On-Base Plus Slugging for each age group
# Group all the players by their age, and calculate the average AVG, OBP, SLG, and OPS for each age group, along with a player count
df_avg_stats = df_bats_merge_age.groupBy(df_bats_merge_age.age).agg({"AVG": "avg","SLG": "avg","OBP": "avg" ,\
"OPS": "avg", "age": "count"}).\
orderBy(df_bats_merge_age.age)
df_avg_stats.show()
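To eyeball the peak age directly, the aggregated table can be sorted by mean OPS. Note that dict-style aggregation names its output columns like 'avg(OPS)'; a minimal sketch:
# Sort age groups by their mean OPS to surface the most productive ages
df_avg_stats.orderBy(df_avg_stats['avg(OPS)'].desc()).show(5)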
To put our data into further context, we can also compute the following:
1) Group players by their ages, then:
a) Find the median Batting Average for each age group
b) Find the median On-Base Percentage for each age group
c) Find the median Slugging Percentage for each age group
d) Find the median On-Base Plus Slugging for each age group
NOTE: It appears Spark cannot calculate quantile information without Hive, and we did not install Hive on our clusters. Instead, we group players by age and calculate a cumulative distribution for the batting average, slugging percentage, on-base percentage, and on-base plus slugging. This lets us divide players into quantiles, which we use below to look up the median data for a specific age group.
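As an aside, Spark 2.0 and later expose DataFrame.approxQuantile, which computes approximate percentiles without Hive. It operates on the whole data frame rather than per group, so the window-based approach below is still needed for per-age medians; a minimal sketch, assuming a 2.x runtime:
# On Spark 2.0+, an approximate overall median batting average
# (relative error 0.01) can be computed without Hive
# median_avg = df_bats_merge_age.approxQuantile('AVG', [0.5], 0.01)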
# Calculating median data with Hive would be easy:
#df_bats_merge_age.registerTempTable("df")
#df_quant = sqlContext.sql("select age, percentile_approx(AVG,0.5) as approxQuantile from df group by age")
# Without Hive we have to improvise
keep3 = ['yearID', 'AVG', 'SLG', 'OBP', 'OPS', 'age' ]
df_filt_bat_data = df_bats_merge_age.select(*keep3)
windowSpec = Window.partitionBy(df_filt_bat_data['age']).orderBy(df_filt_bat_data['AVG'].desc())
windowSpec2 = Window.partitionBy(df_filt_bat_data['age']).orderBy(df_filt_bat_data['SLG'].desc())
windowSpec3 = Window.partitionBy(df_filt_bat_data['age']).orderBy(df_filt_bat_data['OBP'].desc())
windowSpec4 = Window.partitionBy(df_filt_bat_data['age']).orderBy(df_filt_bat_data['OPS'].desc())
df_med_stats = df_filt_bat_data.withColumn("cumDistAvg", cume_dist().over(windowSpec)).\
withColumn("cumDistSlg", cume_dist().over(windowSpec2)).\
withColumn("cumDistObp", cume_dist().over(windowSpec3)).\
withColumn("cumDistOps", cume_dist().over(windowSpec4))
df_med_stats.show()
# Approx Median Batting Average for players of Age 27
# Answer: 0.283
df_med_stats.filter(df_med_stats.age==27).filter("cumDistAvg> 0.495 AND cumDistAvg<0.505").show()
# Approx Median OPS for players of Age 30
# Answer: 0.776
df_med_stats.filter(df_med_stats.age==30).filter("cumDistOps> 0.495 AND cumDistOps<0.505").show()
# Approx Median OBP for players of Age 25
# Answer: 0.334
df_med_stats.filter(df_med_stats.age==25).filter("cumDistObp> 0.495 AND cumDistObp<0.505").show()
# Approx Median SLG for players of Age 32
# Answer: 0.427
df_med_stats.filter(df_med_stats.age==32).filter("cumDistSlg> 0.495 AND cumDistSlg<0.505").show()
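The four lookups above follow the same pattern, so they can be folded into a small helper; a hypothetical sketch:
# Hypothetical helper: return the rows whose cumulative distribution for the
# given stat straddles the 50th percentile within the requested age group
def approx_median_rows(df, cum_dist_col, age, tol=0.005):
    return df.filter(df.age == age).filter((df[cum_dist_col] > 0.5 - tol) & (df[cum_dist_col] < 0.5 + tol))

# Equivalent to the first lookup above
approx_median_rows(df_med_stats, 'cumDistAvg', 27).show()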
We convert our Spark data frames to pandas data frames so they can easily be saved in a human-readable csv format. These files contain the answers to the questions we posed.
# Examples to show how to print the results to an output file
pandas_bats_merge_age = df_bats_merge_age.toPandas()
pandas_avg_stats = df_avg_stats.toPandas()
pandas_med_stats = df_med_stats.toPandas()
pandas_bats_merge_age.to_csv('spark_question3_bat_stats_with_age.csv')
pandas_avg_stats.to_csv('spark_question3_bat_stats_averages_by_age.csv')
pandas_med_stats.to_csv('spark_question3_bat_stats_quantile_by_age.csv')
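Alternatively, on Spark 2.x the data frames can be written to csv directly, without converting to pandas; a minimal sketch (coalesce(1) produces a single output file per directory):
# Alternative on Spark 2.x: write csv straight from Spark
# df_avg_stats.coalesce(1).write.csv('bat_stats_averages_by_age', header=True, mode='overwrite')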
sc.stop()