Summary
In this notebook, the fundamentals of big data with PySpark are presented. First, it shows how to install PySpark on a local machine for testing, debugging, and demonstration. Then, PySpark DataFrames and PySpark SQL are discussed. Finally, machine learning applications with the PySpark MLlib library are presented with synthetic and realistic examples.
Python functions and data files needed to run this notebook are available via this link.
Big data is a term used to refer to the study and applications of data sets that are too complex for traditional data-processing software - Wikipedia
Three Vs of Big Data
Volume: Size of the data. Volumes of data can reach unprecedented heights; it is now not uncommon for large companies to have terabytes - and even petabytes - of data in storage devices and on servers.
Variety: Different sources and formats. Data was once collected from one place and delivered in one format. Where it once took the shape of database files - such as Excel, CSV, and Access - it is now presented in non-traditional forms, like video, text, PDF, and graphics on social media, as well as via tech such as wearable devices.
Velocity: Speed of the data. Velocity essentially measures how fast the data is coming in. Some data arrives in real time, whereas other data comes in fits and starts, sent to us in batches.
retrieved from https://bigdataldn.com/news/big-data-the-3-vs-explained/
Concepts and Terminology of Big Data
In this notebook, we present Apache Spark.
Main Features of Apache Spark
Apache Spark Components
Deployment of Spark Modes
Local mode: runs on a single machine such as a PC or laptop
Cluster mode: runs on a set of pre-defined machines (a cluster)
The typical workflow is to start in local mode and transition to cluster mode; no code changes are required for this transition. Here we apply local mode.
Apache Spark is written in Scala. The Apache Spark community released PySpark to support Python with Spark. PySpark provides similar computation speed and power to Scala, and its APIs are similar to Pandas and Scikit-learn.
The PySpark shell is the Python-based command line tool. It allows data scientists to interface with Spark data structures and supports connecting to a cluster. The figure below shows the PySpark shell.
SparkContext is an entry point into the world of Spark; an entry point is a way of connecting to the Spark cluster. PySpark has a default SparkContext called sc.
The prerequisite to install PySpark is to install the Java JDK.
Next, install pyspark:
conda install pyspark
Download Spark from https://www.apache.org/dyn/closer.lua/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz and put the unzipped files in the folder "C:\spark3".
winutils.exe should be downloaded from the link below and placed in the bin folder of the downloaded Spark: https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/winutils.exe
Finally, run the command below to install findspark in your environment:
C:\Users\MehdiRezvandehy>conda install -c conda-forge findspark
See these videos for details on installing PySpark:
# Locate the local Spark installation with findspark
import findspark
findspark.init()
findspark.find()
# Import pyspark and check the version
import pyspark
print(pyspark.__version__)
# 3.3.0
from pyspark import SparkContext
sc = SparkContext(master='local')  # create a SparkContext in local mode
# Verify the SparkContext (called sc)
print(sc)
# Print Spark version
print(sc.version)
sc.pythonVer  # Python version used by Spark
sc.master     # URL of the cluster, or 'local' in local mode
SparkContext's parallelize() method is used to load data in PySpark:
rdd = sc.parallelize([1,2,3,4,5])
SparkContext's textFile() method is used to load a text file in PySpark:
rdd2 = sc.textFile("test.txt")
# Print the version of SparkContext
print("The version of Spark Context in the PySpark shell is", sc.version)
# Print the Python version of SparkContext
print("The Python version of Spark Context in the PySpark shell is", sc.pythonVer)
# Print the master of SparkContext
print("The master of Spark Context in the PySpark shell is", sc.master)
# Create a Python range of numbers from 0 to 19
numb = range(0, 20)
# Load the list into PySpark
spark_data = sc.parallelize(numb)
Python Anonymous Functions
Lambda functions are anonymous functions in Python. They are commonly used with map() and filter().
# general expression
lambda arguments: expression
times2 = lambda x: x * 2
print(times2(3))
# Compare with def
def square(x):
    return x ** 2
s = lambda x: x ** 2
print(s(10))
print(square(10))
A lambda function does not need a return statement and can be put anywhere.
Lambda function with map()
The map() function takes a function and a list and returns a new list which contains the items returned by that function for each item: map(function, list)
items = [0, 2, 5, 6]
list(map(lambda x: x*2 , items))
Wrap map() in list() to materialize and display the result.
Lambda function with filter()
The filter() function takes a function and a list and returns a new list containing the items for which the function evaluates to true: filter(function, list)
items = [0, 2, 5, 6]
list(filter(lambda x: (True if (x%2 == 0) else False), items))
items = [0, 2, 5, 6]
list(filter(lambda x: (x%2 == 0), items))
RDD (Resilient Distributed Datasets):
Parallelizing
parallelize() is used for creating RDDs from Python lists:
RDDnum = sc.parallelize([1,2,3,4])
RDDhello = sc.parallelize("Hello world")
PySpark can easily create RDDs from files that are stored in external storage devices such as HDFS (Hadoop Distributed File System), Amazon S3 buckets, etc. textFile() is used for creating RDDs from external datasets:
RDDfile = sc.textFile("README.md", minPartitions = 6)
type(RDDfile)
Basic RDD Transformations
The basic RDD transformations are map(), filter(), flatMap(), and union().
map() Transformation
The map() transformation applies a function to all elements in the RDD:
RDD = sc.parallelize([1,2,3,4])
RDD_map = RDD.map(lambda x: x * x)
filter() Transformation
The filter() transformation returns a new RDD with only the elements that satisfy a condition:
RDD = sc.parallelize([1,2,3,4])
RDD_filter = RDD.filter(lambda x: x > 2)
flatMap() Transformation
The flatMap() transformation returns multiple values for each element in the original RDD:
RDD = sc.parallelize(["hello world", "how are you"])
RDD_flatmap = RDD.flatMap(lambda x: x.split(" "))
union() Transformation
union() is a transformation in PySpark used to merge two or more RDDs (or DataFrames with the same schema and structure) into one:
#input_RDD = sc.textFile("logs.txt")
#error_RDD = input_RDD.filter(lambda x: "error" in x.split())
#warnings_RDD = input_RDD.filter(lambda x: "warnings" in x.split())
#combined_RDD = error_RDD.union(warnings_RDD)
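Since logs.txt may not be available, here is a minimal self-contained sketch of union() on two small RDDs (the values are illustrative):
# union() concatenates the two RDDs; duplicates are kept
RDD_a = sc.parallelize([1, 2, 3])
RDD_b = sc.parallelize([3, 4, 5])
RDD_a.union(RDD_b).collect()  # [1, 2, 3, 3, 4, 5]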
The basic RDD actions are collect(), take(N), first(), and count().
collect() and take() Actions
collect() returns all the elements of the dataset as an array, and take(N) returns an array with the first N elements of the dataset:
RDD = sc.parallelize(["hello world", "how are you doing"])
flatmap_RDD = RDD.flatMap(lambda x: x.split(" "))
flatmap_RDD.collect()
flatmap_RDD.take(3)
first() and count() Actions
first() returns the first element of the RDD, and count() returns the number of elements:
flatmap_RDD.first()
flatmap_RDD.count()
We usually have key/value pairs in real-life datasets: each row has a key that maps to one or more values. The pair RDD is a special data structure to work with this kind of dataset.
There are two common ways to create pair RDDs:
1. From a list of key-value tuples
my_tuple = [('Ali', 15), ('John', 18), ('Peter', 68)]
tuple_pairRDD = sc.parallelize(my_tuple)
tuple_pairRDD.collect()
2. From a regular RDD
my_list = ['Ali 15', 'John 18', 'Peter 68']
RDD_regular = sc.parallelize(my_list)
RDD_pairRDD = RDD_regular.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))
RDD_pairRDD.collect()
All regular transformations work on pair RDDs; we just have to pass functions that operate on key-value pairs rather than on individual elements.
Examples of pair RDD transformations:
reduceByKey(func): combines values with the same key
groupByKey(): groups values with the same key
sortByKey(): returns an RDD sorted by the key
join(): joins two pair RDDs based on their key
Each of these transformations is explained below.
reduceByKey() transformation: it combines values with the same key.
regularRDD = sc.parallelize([ ("Ronaldo", 12),("Messi", 15),("Neymar", 36),("Ronaldo", 8),("Messi", 3)])
reducebykey_pairRDD = regularRDD.reduceByKey(lambda x,y : x + y)
reducebykey_pairRDD.collect()
sortByKey() transformation: it orders the pair RDD by key.
reducebykey_pairRDD_sort = reducebykey_pairRDD.map(lambda x: (x[1], x[0]))
reducebykey_pairRDD_sort.sortByKey(ascending=False).collect()
groupByKey() transformation: it groups all the values with the same key in the pair RDD.
airprts = [("US", "JFK1"),("UK", "LHYR"),("US", "FOSQ"),("FR", "CYDG"),("US", "S_FO")]
regularRDD = sc.parallelize(airprts)
pairRDD_groupby = regularRDD.groupByKey().collect()
for conts, airs in pairRDD_groupby:
    print(conts, list(airs))
join() transformation: it joins two pair RDDs based on their key.
RDD1 = sc.parallelize([("Ronaldo", 13),("Messi", 12),("Neymar", 32)])
RDD2 = sc.parallelize([("Neymar", 39),("Ronaldo", 10),("Messi", 89)])
RDD1.join(RDD2).collect()
reduce(): it is used for aggregating the elements of a regular RDD. The function should be commutative (changing the order of the operands does not change the result) and associative.
x = [4,3,2,1]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y : x + y)
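To see why commutativity and associativity matter, a non-commutative function such as subtraction gives results that depend on partitioning; a minimal sketch:
# Subtraction is neither commutative nor associative, so the result of reduce()
# depends on how elements are grouped across the 2 partitions
sc.parallelize([4, 3, 2, 1], 2).reduce(lambda x, y: x - y)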
saveAsTextFile(): it takes the path of a file and writes the content of the RDD to that path. The path is treated as a directory, and multiple output files are produced in that directory, one per partition; this is how Spark writes output from multiple workers in parallel.
#RDD.saveAsTextFile("tempFile")
The coalesce() method can be used to save the RDD as a single text file:
#RDD.coalesce(1).saveAsTextFile("tempFile")
countByKey(): it is only available for pair RDDs of type (K, V). It counts the number of elements for each key.
rdd = sc.parallelize([("a", 2), ("c", 10), ("a", 4), ("b", 3), ("c", 6), ("a", 1), ("b", 8)])
for keys, vals in rdd.countByKey().items():
    print(keys, vals)
collectAsMap(): it returns the key-value pairs in the RDD as a dictionary.
sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
PySpark SQL is a Spark library for structured data. It provides more information about the structure of data and computation.
PySpark DataFrames support both SQL queries (SELECT * FROM table) and expression methods (df.select()).
SparkContext is the main entry point for creating RDDs, while SparkSession is used to create DataFrames, register DataFrames as tables, and execute SQL queries. A DataFrame can be created with SparkSession's createDataFrame() method.
5.1.1 Create DataFrame from RDD
iphones_RDD = sc.parallelize([
("8Plus", 2017, 6.23, 3.07, 7.12),
("XR", 2018, 5.94, 2.98, 6.84),
("XS", 2018, 5.65, 2.79, 6.24),
("X10", 2017, 5.65, 2.79, 6.13)
])
names = ['Model', 'Year', 'Height', 'Width', 'Weight']
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)
iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
iphones_df.show()
5.1.2 Create DataFrame by Reading a CSV/JSON/TXT file
df_csv = spark.read.csv("./Data/Customer_Churn.csv", header=True, inferSchema=True)
df_csv.show(5)
header=True reads the first line as column names, and inferSchema=True infers the column data types automatically.
df_json = spark.read.json("./Data/Customer_Churn.json", multiLine = "true")
df_json.show(5)
df_txt = spark.read.text("./Data/movies.txt")
df_txt.show(5)
5.1.3 DataFrame operators in PySpark
PySpark DataFrame transformations: select(), filter(), groupby(), orderBy(), dropDuplicates(), and withColumnRenamed()
PySpark DataFrame actions: printSchema(), head(), show(), count(), columns, and describe()
select()
df_Balance = df_csv.select('Balance')
df_Balance.show(5)
filter()
df_Balance_filter = df_Balance.filter(df_Balance.Balance > 200000)
df_Balance_filter.show(5)
groupby()
groupby() is often followed by count():
df_Tenure_group = df_csv.groupby('Tenure')
df_Tenure_group.count().show(5)
orderBy()
df_Tenure_group.count().orderBy('Tenure').show(5)
dropDuplicates()
test_df_no_dup = df_csv.select('Gender','Age','Tenure').dropDuplicates()
test_df_no_dup.show(5)
withColumnRenamed()
df_sex = df_csv.withColumnRenamed('Gender', 'Sex')
df_sex.show(5)
printSchema()
df_csv.printSchema()
columns action
df_csv.columns
describe()
df_csv.select('CreditScore','Age','Tenure','Balance').describe().show(5)
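head() and count() are listed among the actions above but not demonstrated; a minimal sketch on the same DataFrame:
# head(n) returns the first n rows as a list of Row objects;
# count() returns the number of rows in the DataFrame
print(df_csv.head(2))
print(df_csv.count())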
The sql() method executes a SQL query. It takes a SQL statement as an argument and returns the result as a DataFrame.
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)
# Create a temporary table from PySpark DataFrame
df_csv.createOrReplaceTempView("table1")
query = '''SELECT CustomerId,
Geography,
CreditScore
FROM table1
limit 5'''
df2 = spark.sql(query)
df2.collect()
query = '''SELECT Tenure,
max(Balance)
FROM table1
GROUP BY Tenure'''
spark.sql(query).show(5)
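For comparison, the same aggregation can be written with DataFrame expression methods instead of SQL; a minimal sketch using the groupBy()/agg() API:
from pyspark.sql import functions as F
# Same result as the SQL GROUP BY above, expressed with DataFrame methods
df_csv.groupBy('Tenure').agg(F.max('Balance')).show(5)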
Plotting graphs using PySpark DataFrames is done using three methods:
the pyspark_dist_explore library
toPandas()
the HandySpark library
6.1 Pyspark_dist_explore
There are three functions in pyspark_dist_explore: hist(), distplot(), and pandas_histogram().
df_csv = spark.read.csv("./Data/Customer_Churn.csv", header=True, inferSchema=True)
df_Tenure = df_csv.select('Tenure')
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
hist(ax,x=df_Tenure, bins=20, color="red")
plt.show()
6.2 Pandas
df_Tenure_pandas = df_csv.toPandas()
df_Tenure_pandas.hist('Tenure', bins=20)
plt.show()
6.3 Pandas DataFrame vs PySpark DataFrame
6.4 HandySpark
from handyspark import *
hdf = df_csv.toHandy()
hdf.cols["Tenure"].hist()
pyspark.mllib is Spark's machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. It only supports RDDs, so DataFrames must be converted to RDDs before use.
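For example, an existing DataFrame such as df_csv from the earlier sections can be converted to an RDD of Row objects through its .rdd attribute; a minimal sketch:
# Convert a DataFrame to an RDD of Row objects so it can be used with pyspark.mllib
rdd_from_df = df_csv.rdd
rdd_from_df.take(2)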
PySpark MLlib algorithms cover the three C's of machine learning: Collaborative filtering, Classification, and Clustering.
Collaborative filtering means finding users that share common interests; for example, companies like Amazon and Netflix provide recommendations based on users' interests. It is commonly used for recommender systems:
In the figure below, if User 1 likes Item A, Item B, and Item C, and User 2 likes Item B and Item C, there is a high probability that User 2 also likes Item A, so Item A can be recommended to User 2.
Image is retrieved from towardsdatascience.com
Another example below considers a book recommendation scenario. Users A and B have given high ratings to Book1 and Book2, so we can assume that they have similar taste. Therefore, there is a high probability that User B will like a book that he/she has not come across but that is rated highly by User A. In this scenario, Book3 is rated by User A but User B has not come across it, so the recommendation system recommends Book3 to User B.
Image is retrieved from medium.com
from pyspark.mllib.recommendation import Rating
rt = Rating(user = 1, product = 2, rating = 6.0)
(rt[0], rt[1], rt[2])
7.1.1.1 Splitting the data
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
trainingSet, testSet=data.randomSplit([0.7, 0.3])
trainingSet.collect()
testSet.collect()
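For reproducible splits, randomSplit() also accepts an optional seed; a minimal usage note:
# Fixing the seed makes the 70/30 split reproducible across runs
trainingSet, testSet = data.randomSplit([0.7, 0.3], seed=42)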
7.1.1.2 Alternating Least Squares (ALS)
The Alternating Least Squares (ALS) algorithm in spark.mllib provides collaborative filtering:
ALS.train(ratings, rank, iterations)
rt1 = Rating(1, 1, 0.5)
rt2 = Rating(1, 2, 0.75)
rt3 = Rating(2, 1, 0.7)
ratings = sc.parallelize([rt1, rt2, rt3])
ratings.collect()
from pyspark.mllib.recommendation import ALS
model = ALS.train(ratings, rank=10, iterations=10)
7.1.1.3 predictAll() Returns an RDD of Rating Objects
The predictAll() method returns a list of predicted ratings for input user and product pairs:
# unrated RDD for rate prediction
unrated_RDD = sc.parallelize([(1, 1), (1, 2)])
predictions = model.predictAll(unrated_RDD)
predictions.collect()
7.1.1.4 Model Evaluation with MSE
# Prepare the ratings data
rt1 = Rating(1, 1, 0.5)
rt2 = Rating(1, 2, 0.75)
rt3 = Rating(2, 1, 0.7)
ratings = sc.parallelize([rt1, rt2, rt3])
rates = ratings.map(lambda x: ((x[0], x[1]), x[2]))
rates.collect()
# Prepare the predictions
unrated_RDD = sc.parallelize([(1, 1), (1, 2)])
predictions = model.predictAll(unrated_RDD)
predictions.collect()
preds = predictions.map(lambda x: ((x[0], x[1]), x[2]))
preds.collect()
rates_preds = rates.join(preds)
rates_preds.collect()
MSE = rates_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
MSE
In this exercise, the goal is to develop a simple movie recommendation system with PySpark MLlib using a subset of the MovieLens 100k dataset.
We first load the MovieLens data (ratings.csv), where each line is formatted as userId,movieId,rating,timestamp. We then map each record to a Rating object (userID, productID, rating) after removing the timestamp column, and finally split the data into training and test RDDs.
# Load the data into a DataFrame
data = spark.read.csv("./Data/ratings.csv", header=True, inferSchema=True)
data.take(10)
# Map each row to a Rating object and parallelize into an RDD
ratings_final = sc.parallelize(list(map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])),data.collect())))
ratings_final.take(10)
# Split the data into training and test
data_training, data_test = ratings_final.randomSplit([0.8, 0.2])
model = ALS.train(data_training, rank=10, iterations=10,nonnegative=True,lambda_=0.02)
# Drop the ratings column from test set
data_test_without_rating = data_test.map(lambda p: (p[0], p[1]))
#data_test_without_rating =sc.parallelize(data_test_without_rating.collect())
# Predict ratings for the test pairs using the trained model
predictions = model.predictAll(data_test_without_rating)
predictions.take(10)
Let's calculate the MSE for the ALS predictions:
# Prepare ratings data
rates = ratings_final.map(lambda r: ((r[0], r[1]), r[2]))
rates.take(10)
# Prepare predictions data
preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))
# Join the ratings data with predictions data
rates_and_preds = rates.join(preds)
rates_and_preds.take(10)
# Calculate and print MSE
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.2f}".format(MSE))
PySpark MLlib contains two specific vector data types: dense and sparse vectors.
from pyspark.mllib.linalg import Vectors
denseVec = Vectors.dense([2.0, 4.0, 8.0])
print(denseVec)
sparseVec = Vectors.sparse(3, {0: 1.0, 2: 4.5})
print(sparseVec)
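The same sparse vector can also be built from index and value lists and inspected as a dense array; a minimal sketch:
# Equivalent construction with index/value lists; toArray() returns a dense NumPy array
sparseVec2 = Vectors.sparse(3, [0, 2], [1.0, 4.5])
print(sparseVec2)
print(sparseVec2.toArray())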
LabeledPoint() in PySpark MLlib
A LabeledPoint is a wrapper for the input features and the predicted value (label). For binary classification with logistic regression, a label is either 0 (negative) or 1 (positive):
from pyspark.mllib.regression import LabeledPoint
negative = LabeledPoint(0.0, [3.0, 2.0, 5.0])
positive = LabeledPoint(1.0, [2.0, 3.0, 1.0])
print(positive)
print(negative)
Logistic regression in PySpark MLlib can be done with the LogisticRegressionWithLBFGS class.
data_lr = [
LabeledPoint(0.0, [2.0, 1.0]),
LabeledPoint(1.0, [1.0, 2.0]),
]
RDD_lr = sc.parallelize(data_lr)
RDD_lr.collect()
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
lr = LogisticRegressionWithLBFGS.train(RDD_lr)
print(lr.predict([2.0, 1.0]))
print(lr.predict([1.0, 2.0]))
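The fitted model's coefficients can be inspected, and clearThreshold() makes predict() return probability scores instead of hard 0/1 labels; a minimal sketch:
# Model coefficients and intercept
print(lr.weights, lr.intercept)
# Return probability scores instead of 0/1 labels
lr.clearThreshold()
print(lr.predict([2.0, 1.0]))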
Logistic regression is applied to predict churn rate. The churn_retain.csv data set can be downloaded from Kaggle; the target feature is Exited (0 or 1), where Exited=1 means churn and Exited=0 means retain. The goal is to train a logistic regression model for predicting churn/retain.
# Load the data into a DataFrame
data = spark.read.csv("./Data/churn_retain.csv", header=True, inferSchema=True)
data.take(5)
# Split the data into training and test
data_training, data_test = data.randomSplit([0.8, 0.2])
# Filter data based on retain and churn
data_0=data_training.filter(data_training.Exited ==0)
data_1=data_training.filter(data_training.Exited ==1)
data_0.collect()[0][9]  # inspect one field of the first retained row
# Prepare the training set as LabeledPoint objects (retain=0, churn=1)
retain= list(map(lambda l: LabeledPoint(0.0,[int(l.CreditScore), float(l.Age), float(l.Tenure), float(l.Balance)
, int(l.NumOfProducts), int(l.HasCrCard), int(l.IsActiveMember),
int(l.EstimatedSalary)]),data_0.collect()))
churn= list(map(lambda l: LabeledPoint(1.0,[int(l.CreditScore), float(l.Age), float(l.Tenure), float(l.Balance)
, int(l.NumOfProducts), int(l.HasCrCard), int(l.IsActiveMember),
int(l.EstimatedSalary)]),data_1.collect()))
data_to_train = retain+churn
RDD_to_train = sc.parallelize(data_to_train)
# Train lr model
lr = LogisticRegressionWithLBFGS.train(RDD_to_train, iterations=10)
lr
# Build the test-set feature vectors (without the label)
data_test_to_predict=sc.parallelize(list(map(lambda l: [int(l.CreditScore), float(l.Age), float(l.Tenure), float(l.Balance)
, int(l.NumOfProducts), int(l.HasCrCard), int(l.IsActiveMember),
int(l.EstimatedSalary)],data_test.collect())))
# Predict test set using trained model
predictions = lr.predict(data_test_to_predict)
y_test_pred=predictions.collect()
y_test_label=list(map(lambda l: int(l.Exited),data_test.collect()))
from sklearn.metrics import accuracy_score
from sklearn.metrics import recall_score
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score, recall_score
conf_mx=confusion_matrix(y_test_label,y_test_pred)
print('Confusion Matrix (Test):\n',conf_mx)
print('------------------------------')
acr=accuracy_score(y_test_label,y_test_pred)
print('\n Accuracy (Test):\n',acr)
print('------------------------------')
prec=precision_score(y_test_label,y_test_pred) # == TP/(TP+FP)
print('\n Precision (Test):\n',prec)
print('------------------------------')
reca=recall_score(y_test_label,y_test_pred) # == TP/(TP+FN) )
print('\n Recall (Test):\n',reca)
#print('------------------------------')
#fpr, tpr, thresold = roc_curve(y_test_label,y_test_pred)
#roc_auc = auc(fpr, tpr)
#print('\n AUC (Test):\n',roc_auc)
A random forest classifier can also be trained on the same RDD with PySpark MLlib's RandomForest class:
from pyspark.mllib.tree import RandomForest
# Train a random forest classifier: 2 classes, no categorical features, 3 trees
rf = RandomForest.trainClassifier(RDD_to_train, 2, {}, 3, seed=42)
rf
# Predict test set using trained model
predictions = rf.predict(data_test_to_predict)
y_test_pred=predictions.collect()
y_test_label=list(map(lambda l: int(l.Exited),data_test.collect()))
conf_mx=confusion_matrix(y_test_label,y_test_pred)
print('Confusion Matrix (Test):\n',conf_mx)
print('------------------------------')
acr=accuracy_score(y_test_label,y_test_pred)
print('\n Accuracy (Test):\n',acr)
print('------------------------------')
prec=precision_score(y_test_label,y_test_pred) # == TP/(TP+FP)
print('\n Precision (Test):\n',prec)
print('------------------------------')
reca=recall_score(y_test_label,y_test_pred) # == TP/(TP+FN) )
print('\n Recall (Test):\n',reca)
#print('------------------------------')
#fpr, tpr, thresold = roc_curve(y_test_label,y_test_pred)
#roc_auc = auc(fpr, tpr)
#print('\n AUC (Test):\n',roc_auc)