Summary
In this notebook, the fundamentals of big data with PySpark are presented. First, it shows how to install PySpark on a local machine for testing, debugging and demonstration. Then, PySpark DataFrames and PySpark SQL are discussed. Finally, machine learning with the PySpark MLlib library is presented with synthetic and realistic examples.
The Python functions and data files needed to run this notebook are available on my GitHub page.
What is Big Data¶
Big data is a term used to refer to the study and applications of data sets that are too complex for traditional data-processing software - Wikipedia
The Three V's of Big Data
Volume: Size of the data. Data volumes can reach unprecedented heights; it is now not uncommon for large companies to have Terabytes – and even Petabytes – of data in storage devices and on servers.
Variety: Different sources and formats. Data was once collected from one place and delivered in one format. Where it once took the shape of database files such as Excel, CSV and Access, it is now presented in non-traditional forms like video, text, PDF, and graphics on social media, as well as via tech such as wearable devices.
Velocity: Speed of the data. Velocity essentially measures how fast the data is coming in. Some data will come in in real time, whereas other data will come in fits and starts, sent to us in batches.
retrieved from https://bigdataldn.com/news/big-data-the-3-vs-explained/
Concepts and Terminology of Big Data
- Clustered computing: Collection of resources of multiple machines
- Parallel computing: Simultaneous computation
- Distributed computing: Collection of nodes (networked computers) that run in parallel
- Batch processing: Breaking the job into small pieces and running them on individual machines
- Real-time processing: Immediate processing of data
Big Data Processing Systems
- Hadoop/MapReduce: Scalable and fault-tolerant framework written in Java, used to efficiently store and process large datasets ranging in size from gigabytes to petabytes. Instead of using one large computer to store and process the data, Hadoop allows clustering multiple computers to analyze massive datasets in parallel more quickly:
- Open Source
- Batch processing
- Apache Spark: an open-source, distributed processing system used for big data workloads. It utilizes in-memory caching and optimized query execution for fast analytic queries against data of any size:
- Open source
- Both batch and real-time data processing
In this notebook, we present Apache Spark.
Main Features of Apache Spark
- Distributed cluster computing framework
- Efficient in-memory computations for large data sets (reported to be up to 100 times faster than Hadoop MapReduce for in-memory workloads)
- Lightning-fast data processing framework
- Mainly written in Scala but provides support for Java, Scala, Python, R and SQL
Apache Spark Components
- Spark SQL
- MLlib Machine Learning
- GraphX (manipulating graphs)
- Spark streaming
Spark Deployment Modes
Local mode: runs on a single machine such as a PC or laptop
- Local mode is convenient for testing, debugging and demonstration
Cluster mode: runs on a set of pre-defined machines
- Suitable for production
The typical workflow is to start in local mode and transition to cluster mode; no code changes are required for this transition. Here we use local mode. A minimal sketch of the difference is shown below.
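As a minimal sketch of how little changes between the two modes (the cluster URL below is just a placeholder, not a real cluster), only the master setting differs:

from pyspark import SparkContext
# Local mode: run on this machine, using all available cores
sc = SparkContext(master="local[*]", appName="demo")
# Cluster mode (illustration only): point the same application at a
# cluster manager URL instead; the rest of the code stays the same
# sc = SparkContext(master="spark://some-cluster-host:7077", appName="demo")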
Spark with Python (PySpark)¶
Apache Spark is written in Scala. The Apache Spark community released PySpark to support Python with Spark. PySpark provides computation speed and power similar to Scala, and its APIs are similar to Pandas and scikit-learn.
- Three different Spark shells:
- Spark-shell for Scala
- PySpark-shell for Python
- SparkR for R
The PySpark shell is the Python-based command line tool. It allows data scientists to interface with Spark data structures and supports connecting to a cluster. The figure below shows the PySpark shell.
SparkContext¶
SparkContext is the entry point into the world of Spark.
- An entry point is a way of connecting to the Spark cluster
- An entry point is like a key to the house
- PySpark has a default SparkContext called sc
- Version: to retrieve the SparkContext version
Install PySpark¶
The prerequisite for installing PySpark is installing the Java JDK.
Next, install pyspark:
conda install pyspark
- Create the environment variables:
- "JAVA_HOME" = "C:\Program Files\Java\jdk1.8.0_341"
- "PATH" = "C:\Program Files\Java\jdk1.8.0_341\bin"
Download Spark from https://www.apache.org/dyn/closer.lua/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz and extract the archive into the folder "C:\spark3".
winutils.exe should be downloaded from the link below and placed in the bin folder of the extracted Spark distribution: https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/winutils.exe
- "SPARK_HOME" = "C:\spark3\spark-3.3.1-bin-hadoop3\bin"
- "HADOOP_HOME" = "C:\spark3\spark-3.3.1-bin-hadoop3\bin"
Finally, run the command below to install findspark in your environment:
C:\Users\MehdiRezvandehy>conda install -c conda-forge findspark
import findspark
findspark.init()
findspark.find()
'C:\\spark\\spark-3.3.1-bin-hadoop3'
# import pyspark and check its version
import pyspark
print(pyspark.__version__)
3.3.1
from pyspark import SparkContext
sc=SparkContext(master='local')
# Verify SparkContext (SparkContext called as sc)
print(sc)
# Print Spark version
print(sc.version)
<SparkContext master=local appName=pyspark-shell> 3.3.1
- Python Version: To retrieve Python version of SparkContext
sc.pythonVer
'3.9'
- Master: the URL of the cluster, or the string "local", to run SparkContext in local mode
sc.master
'local'
- How to load data in PySpark
SparkContext's parallelize() method is used to load data in PySpark:
rdd = sc.parallelize([1,2,3,4,5])
SparkContext's textFile() method is used to load a text file in PySpark:
rdd2 = sc.textFile("test.txt")
# Print the version of SparkContext
print("The version of Spark Context in the PySpark shell is", sc.version)
# Print the Python version of SparkContext
print("The Python version of Spark Context in the PySpark shell is", sc.pythonVer)
# Print the master of SparkContext
print("The master of Spark Context in the PySpark shell is", sc.master)
The version of Spark Context in the PySpark shell is 3.3.1 The Python version of Spark Context in the PySpark shell is 3.9 The master of Spark Context in the PySpark shell is local
# Create a Python list of numbers from 0 to 19
numb = range(0, 20)
# Load the list into PySpark
spark_data = sc.parallelize(numb)
Functional Programming in Python¶
Python Anonymous Functions
- Lambda functions are anonymous functions in Python
- Very powerful and widely used in Python; quite efficient with map() and filter()
- Lambda functions create functions to be called later, similar to def
- They return the function without any name (i.e. anonymous)
The general lambda expression:
# general expression
lambda arguments: expression
<function __main__.<lambda>(arguments)>
times2 = lambda x: x * 2
print(times2(3))
6
# Compare with def
def square(x):
return x ** 2
s = lambda x: x ** 2
print(s(10))
print(square(10))
100 100
- lambda does not need a return statement
- a lambda function can be put anywhere
Lambda function with map()
map() takes a function and a list and returns a new list containing the items returned by that function for each element: map(function, list)
items = [0, 2, 5, 6]
list(map(lambda x: x*2 , items))
[0, 4, 10, 12]
Wrap map() in list() to materialize and display the result.
Lambda function with filter()
filter() takes a function and a list and returns a new list containing only the elements for which the function evaluates to true: filter(function, list)
items = [0, 2, 5, 6]
list(filter(lambda x: (True if (x%2 == 0) else False), items))
[0, 2, 6]
items = [0, 2, 5, 6]
list(filter(lambda x: (x%2 == 0), items))
[0, 2, 6]
PySpark RDD¶
RDD stands for Resilient Distributed Datasets:
- Resilient: Ability to withstand failures
- Distributed: Spanning across multiple machines
- Datasets: Collection of partitioned data, e.g. arrays, tables, tuples, etc. (see the partition sketch below)
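A minimal sketch of the "partitioned" part, reusing sc from above: the number of partitions can be chosen when an RDD is created and inspected afterwards (the numbers here are arbitrary):

# Create an RDD with an explicit number of partitions and inspect it
rdd_part = sc.parallelize(range(100), numSlices=4)
print(rdd_part.getNumPartitions())       # 4
print(rdd_part.glom().collect()[0][:5])  # first few elements of the first partition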
Parallelizing
parallelize() is used for creating RDDs from Python lists:
RDDnum = sc.parallelize([1,2,3,4])
RDDhello = sc.parallelize("Hello world")
- From external datasets
PySpark can easily create RDDs from files stored in external storage such as HDFS (Hadoop Distributed File System), Amazon S3 buckets, etc. textFile() is used for creating RDDs from external datasets:
RDDfile = sc.textFile("README.md", minPartitions = 6)
type(RDDfile)
pyspark.rdd.RDD
PySpark Operation Overview¶
- Transformations create new RDDs
- Actions perform computations on the RDDs
RDD Transformations¶
- Transformations follow lazy evaluation: they only describe the computation, and nothing runs until an action is called (see the sketch below)
Basic RDD transformations: map(), filter(), flatMap(), and union()
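A minimal sketch of lazy evaluation, reusing sc from above: defining a transformation returns immediately without touching the data, and the work only happens when an action such as collect() is called:

# Defining a transformation is cheap: nothing is computed yet
lazy_rdd = sc.parallelize(range(10)).map(lambda x: x * x)
print(lazy_rdd)            # only an RDD description, no results
# Calling an action triggers the actual computation
print(lazy_rdd.collect())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]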
map() Transformation
- map() applies a function to all elements in the RDD
RDD = sc.parallelize([1,2,3,4])
RDD_map = RDD.map(lambda x: x * x)
filter() Transformation
- filter() returns a new RDD with only the elements that pass the condition
RDD = sc.parallelize([1,2,3,4])
RDD_filter = RDD.filter(lambda x: x > 2)
flatMap() Transformation
- flatMap() returns multiple values for each element in the original RDD
RDD = sc.parallelize(["hello world", "how are you"])
RDD_flatmap = RDD.flatMap(lambda x: x.split(" "))
union() Transformation
- union() merges two or more RDDs into one. (The same operation exists for DataFrames, where the DataFrames must have the same schema and structure.) A runnable sketch follows the commented example below.
#input_RDD = sc.textFile("logs.txt")
#error_RDD = input_RDD.filter(lambda x: "error" in x.split())
#warnings_RDD = input_RDD.filter(lambda x: "warnings" in x.split())
#combined_RDD = error_RDD.union(warnings_RDD)
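Since logs.txt is not included with this notebook, here is a minimal runnable sketch of union() on two small RDDs created with parallelize():

# Merge two small RDDs with union(); duplicates are kept
rdd_a = sc.parallelize([1, 2, 3])
rdd_b = sc.parallelize([3, 4, 5])
print(rdd_a.union(rdd_b).collect())  # [1, 2, 3, 3, 4, 5]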
RDD Actions¶
- RDD actions are operations that return non-RDD values. Since RDDs are lazy, transformations are not executed until an action is called; the action triggers the transformations and returns the resulting value to the driver program.
Basic RDD actions:
- collect()
- take(N)
- first()
- count()
collect() and take() Actions
- collect() returns all the elements of the dataset as an array
- take(N) returns an array with the first N elements of the dataset
RDD = sc.parallelize(["hello world", "how are you doing"])
flatmap_RDD = RDD.flatMap(lambda x: x.split(" "))
flatmap_RDD.collect()
['hello', 'world', 'how', 'are', 'you', 'doing']
flatmap_RDD.take(3)
['hello', 'world', 'how']
first() and count() Actions
- first() returns the first element of the RDD
- count() returns the number of elements in the RDD
flatmap_RDD.first()
'hello'
flatmap_RDD.count()
6
Pair RDD¶
We usually have key/value pairs in real-life datasets: each row has a key that maps to one or more values. The pair RDD is a special data structure for working with this kind of dataset.
There are two common ways to create pair RDDs:
1. From a list of key-value tuples
my_tuple = [('Ali', 15), ('John', 18), ('Peter', 68)]
tuple_pairRDD = sc.parallelize(my_tuple)
tuple_pairRDD.collect()
[('Ali', 15), ('John', 18), ('Peter', 68)]
2. From a regular RDD
my_list = ['Ali 15', 'John 18', 'Peter 68']
RDD_regular = sc.parallelize(my_list)
RDD_pairRDD = RDD_regular.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))
RDD_pairRDD.collect()
[('Ali', '15'), ('John', '18'), ('Peter', '68')]
- Transformations on pair RDDs
All regular transformations work on pair RDDs, but you have to pass functions that operate on key/value pairs rather than on individual elements.
Examples of pair RDD transformations:
- reduceByKey(func): Combine values with the same key
- groupByKey(): Group values with the same key
- sortByKey(): Return an RDD sorted by the key
- join(): Join two pair RDDs based on their key
Each of these is explained below.
reduceByKey() transformation: it combines values with the same key.
regularRDD = sc.parallelize([ ("Ronaldo", 12),("Messi", 15),("Neymar", 36),("Ronaldo", 8),("Messi", 3)])
reducebykey_pairRDD = regularRDD.reduceByKey(lambda x,y : x + y)
reducebykey_pairRDD.collect()
[('Ronaldo', 20), ('Messi', 18), ('Neymar', 36)]
sortByKey() transformation: it orders the pair RDD by key.
reducebykey_pairRDD_sort = reducebykey_pairRDD.map(lambda x: (x[1], x[0]))
reducebykey_pairRDD_sort.sortByKey(ascending=False).collect()
[(36, 'Neymar'), (20, 'Ronaldo'), (18, 'Messi')]
groupByKey() transformation: it groups all the values with the same key in the pair RDD.
airprts = [("US", "JFK1"),("UK", "LHYR"),("US", "FOSQ"),("FR", "CYDG"),("US", "S_FO")]
regularRDD = sc.parallelize(airprts)
pairRDD_groupby = regularRDD.groupByKey().collect()
for conts, airs in pairRDD_groupby:
print(conts, list(airs))
US ['JFK1', 'FOSQ', 'S_FO'] UK ['LHYR'] FR ['CYDG']
join() transformation: it joins two pair RDDs based on their key.
RDD1 = sc.parallelize([("Ronaldo", 13),("Messi", 12),("Neymar", 32)])
RDD2 = sc.parallelize([("Neymar", 39),("Ronaldo", 10),("Messi", 89)])
RDD1.join(RDD2).collect()
[('Ronaldo', (13, 10)), ('Messi', (12, 89)), ('Neymar', (32, 39))]
Advanced RDD Actions¶
reduce(): it is used for aggregating the elements of a regular RDD. The function should be commutative and associative (changing the order or grouping of the operands does not change the result).
x = [4,3,2,1]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y : x + y)
10
saveAsTextFile(): it takes a path and writes the content of the RDD to that path. The path is treated as a directory, and one output part file is produced per partition; this is how Spark writes output from multiple workers in parallel.
#RDD.saveAsTextFile("tempFile")
The coalesce() method can be used to reduce the RDD to a single partition and save it as a single text file.
#RDD.coalesce(1).saveAsTextFile("tempFile")
countByKey(): it is only available for pair RDDs of type (K, V). It counts the number of elements for each key.
rdd = sc.parallelize([("a", 2), ("c", 10), ("a", 4), ("b", 3), ("c", 6), ("a", 1), ("b", 8)])
for keys, vals in rdd.countByKey().items():
print(keys, vals)
a 3 c 2 b 2
collectAsMap(): it returns the key-value pairs in the RDD as a dictionary.
sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
{1: 2, 3: 4}
PySpark DataFrames & PySpark SQL¶
PySpark SQL is a Spark library for structured data. It provides more information about the structure of data and computation.
- PySpark DataFrames support both SQL queries (SELECT * FROM table) and expression methods (df.select())
- SparkContext is the main entry point for creating RDDs
- SparkSession is used to create DataFrames, register DataFrames as tables, and execute SQL queries
PySpark DataFrames¶
- Two different methods of creating DataFrames in PySpark:
- From existing RDDs using SparkSession's createDataFrame() method
- From various data sources (CSV, JSON, TXT) using SparkSession's read method
5.1.1 Create DataFrame from RDD
iphones_RDD = sc.parallelize([
("8Plus", 2017, 6.23, 3.07, 7.12),
("XR", 2018, 5.94, 2.98, 6.84),
("XS", 2018, 5.65, 2.79, 6.24),
("X10", 2017, 5.65, 2.79, 6.13)
])
names = ['Model', 'Year', 'Height', 'Width', 'Weight']
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)
iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
iphones_df.show()
+-----+----+------+-----+------+ |Model|Year|Height|Width|Weight| +-----+----+------+-----+------+ |8Plus|2017| 6.23| 3.07| 7.12| | XR|2018| 5.94| 2.98| 6.84| | XS|2018| 5.65| 2.79| 6.24| | X10|2017| 5.65| 2.79| 6.13| +-----+----+------+-----+------+
5.1.2 Create DataFrame from reading a CSV/JSON/TXT
df_csv = spark.read.csv("./Data/Customer_Churn.csv", header=True, inferSchema=True)
df_csv.show(5)
+----------+-----------+---------+------+---+------+---------+---------------+------+ |CustomerId|CreditScore|Geography|Gender|Age|Tenure| Balance|EstimatedSalary|Exited| +----------+-----------+---------+------+---+------+---------+---------------+------+ | 15634602| 619| France|Female| 42| 2| 0.0| 101348.88| 1| | 15647311| 608| Spain|Female| 41| 1| 83807.86| 112542.58| 0| | 15619304| 502| France|Female| 42| 8| 159660.8| 113931.57| 1| | 15701354| 699| France|Female| 39| 1| 0.0| 93826.63| 0| | 15737888| 850| Spain|Female| 43| 2|125510.82| 79084.1| 0| +----------+-----------+---------+------+---+------+---------+---------------+------+ only showing top 5 rows
- The path to the file and two optional parameters: header=True and inferSchema=True
df_json = spark.read.json("./Data/Customer_Churn.json", multiLine = "true")
df_json.show(5)
+---+---------+-----------+----------+---------------+------+------+---------+------+ |Age| Balance|CreditScore|CustomerId|EstimatedSalary|Exited|Gender|Geography|Tenure| +---+---------+-----------+----------+---------------+------+------+---------+------+ | 42| 0.0| 619| 15634602| 101348.88| 1|Female| France| 2| | 41| 83807.86| 608| 15647311| 112542.58| 0|Female| Spain| 1| | 42| 159660.8| 502| 15619304| 113931.57| 1|Female| France| 8| | 39| 0.0| 699| 15701354| 93826.63| 0|Female| France| 1| | 43|125510.82| 850| 15737888| 79084.1| 0|Female| Spain| 2| +---+---------+-----------+----------+---------------+------+------+---------+------+ only showing top 5 rows
df_txt = spark.read.text("./Data/movies.txt")
df_txt.show(5)
+--------------------+ | value| +--------------------+ |title\tdescriptio...| |The Girl from Mon...| |Every Jack has a ...| |Dorian Gray\t"A v...| |How to Lose Frien...| +--------------------+ only showing top 5 rows
5.1.3 DataFrame operators in PySpark
PySpark DataFrame transformations:
- select()
- filter()
- groupby()
- orderBy()
- dropDuplicates()
- withColumnRenamed()
PySpark DataFrame actions:
- printSchema()
- head()
- show()
- count()
- columns and describe()
select()
- This transformation subsets the columns in the DataFrame.
df_Balance = df_csv.select('Balance')
df_Balance.show(5)
+---------+ | Balance| +---------+ | 0.0| | 83807.86| | 159660.8| | 0.0| |125510.82| +---------+ only showing top 5 rows
filter()
- This transformation filters out rows based on a condition.
df_Balance_filter = df_Balance.filter(df_Balance.Balance > 200000)
df_Balance_filter.show(5)
+---------+ | Balance| +---------+ | 213146.2| |211774.31| |209767.31| |214346.96| |203715.15| +---------+ only showing top 5 rows
groupby()
- It groups rows by a variable and is usually followed by an aggregation operation such as count().
df_Tenure_group = df_csv.groupby('Tenure')
df_Tenure_group.count().show(5)
+------+-----+ |Tenure|count| +------+-----+ | 1| 1035| | 6| 967| | 3| 1009| | 5| 1012| | 9| 984| +------+-----+ only showing top 5 rows
orderBy()
- It sorts the DataFrame based on one or more columns.
df_Tenure_group.count().orderBy('Tenure').show(5)
+------+-----+ |Tenure|count| +------+-----+ | 0| 413| | 1| 1035| | 2| 1048| | 3| 1009| | 4| 989| +------+-----+ only showing top 5 rows
dropDuplicates()
- It removes the duplicate rows of a DataFrame.
test_df_no_dup = df_csv.select('Gender','Age','Tenure').dropDuplicates()
test_df_no_dup.show(5)
+------+---+------+ |Gender|Age|Tenure| +------+---+------+ |Female| 37| 1| | Male| 35| 4| |Female| 77| 6| | Male| 40| 5| | Male| 42| 7| +------+---+------+ only showing top 5 rows
withColumnRenamed()
- It renames a column in the DataFrame.
df_sex = df_csv.withColumnRenamed('Gender', 'Sex')
df_sex.show(5)
+----------+-----------+---------+------+---+------+---------+---------------+------+ |CustomerId|CreditScore|Geography| Sex|Age|Tenure| Balance|EstimatedSalary|Exited| +----------+-----------+---------+------+---+------+---------+---------------+------+ | 15634602| 619| France|Female| 42| 2| 0.0| 101348.88| 1| | 15647311| 608| Spain|Female| 41| 1| 83807.86| 112542.58| 0| | 15619304| 502| France|Female| 42| 8| 159660.8| 113931.57| 1| | 15701354| 699| France|Female| 39| 1| 0.0| 93826.63| 0| | 15737888| 850| Spain|Female| 43| 2|125510.82| 79084.1| 0| +----------+-----------+---------+------+---+------+---------+---------------+------+ only showing top 5 rows
printSchema()
- It prints the types of columns in the DataFrame.
df_csv.printSchema()
root |-- CustomerId: integer (nullable = true) |-- CreditScore: integer (nullable = true) |-- Geography: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Tenure: integer (nullable = true) |-- Balance: double (nullable = true) |-- EstimatedSalary: double (nullable = true) |-- Exited: integer (nullable = true)
columns
- This attribute returns the column names of the DataFrame.
df_csv.columns
['CustomerId', 'CreditScore', 'Geography', 'Gender', 'Age', 'Tenure', 'Balance', 'EstimatedSalary', 'Exited']
describe()
- It computes summary statistics of numerical columns in the DataFrame.
df_csv.select('CreditScore','Age','Tenure','Balance').describe().show(5)
+-------+-----------------+------------------+------------------+-----------------+ |summary| CreditScore| Age| Tenure| Balance| +-------+-----------------+------------------+------------------+-----------------+ | count| 10000| 10000| 10000| 10000| | mean| 650.5288| 38.9218| 5.0128|76485.88928799961| | stddev|96.65329873613035|10.487806451704587|2.8921743770496837|62397.40520238599| | min| 350| 18| 0| 0.0| | max| 850| 92| 10| 250898.09| +-------+-----------------+------------------+------------------+-----------------+
PySpark SQL¶
- Executing SQL queries
- The SparkSession sql() method executes SQL queries
- sql() takes a SQL statement as an argument and returns the result as a DataFrame
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)
# Create a temporary table from PySpark DataFrame
df_csv.createOrReplaceTempView("table1")
query = '''SELECT CustomerId,
Geography,
CreditScore
FROM table1
limit 5'''
df2 = spark.sql(query)
df2.collect()
[Row(CustomerId=15634602, Geography='France', CreditScore=619), Row(CustomerId=15647311, Geography='Spain', CreditScore=608), Row(CustomerId=15619304, Geography='France', CreditScore=502), Row(CustomerId=15701354, Geography='France', CreditScore=699), Row(CustomerId=15737888, Geography='Spain', CreditScore=850)]
- Summarizing and grouping data using SQL queries
query = '''SELECT Tenure,
max(Balance)
FROM table1
GROUP BY Tenure'''
spark.sql(query).show(5)
+------+------------+ |Tenure|max(Balance)| +------+------------+ | 1| 211774.31| | 6| 206014.94| | 3| 250898.09| | 5| 216109.88| | 9| 222267.63| +------+------------+ only showing top 5 rows
Data Visualization in PySpark¶
Plotting graphs from PySpark DataFrames can be done with three methods:
- the pyspark_dist_explore library
- toPandas()
- the HandySpark library
6.1 Pyspark_dist_explore
There are three functions in pyspark_dist_explore:
- hist()
- distplot()
- pandas_histogram()
df_csv = spark.read.csv("./Data/Customer_Churn.csv", header=True, inferSchema=True)
df_Tenure = df_csv.select('Tenure')
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
hist(ax,x=df_Tenure, bins=20, color="red")
plt.show()
6.2 Pandas
df_Tenure_pandas = df_csv.toPandas()
df_Tenure_pandas.hist('Tenure', bins=20)
plt.show()
6.3 Pandas DataFrame vs PySpark DataFrame
- Pandas DataFrames are in-memory, single-server structures, while operations on PySpark DataFrames run in parallel across the cluster
- Pandas generates the result as soon as an operation is applied, whereas PySpark DataFrame operations are lazily evaluated
- Pandas DataFrames are mutable, while PySpark DataFrames are immutable (see the sketch below)
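A minimal sketch of the immutability point, reusing df_csv from above (the HighBalance column name is just an illustration): withColumn() does not modify the original DataFrame, it returns a new one:

from pyspark.sql.functions import col
# withColumn() returns a NEW DataFrame; df_csv itself is unchanged
df_with_flag = df_csv.withColumn("HighBalance", col("Balance") > 100000)
print("HighBalance" in df_csv.columns)        # False: original untouched
print("HighBalance" in df_with_flag.columns)  # True: new DataFrame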
6.4 HandySpark
- It is a package designed to improve PySpark user experience
from handyspark import *
hdf = df_csv.toHandy()
hdf.cols["Tenure"].hist()
C:\spark\spark-3.3.1-bin-hadoop3\python\pyspark\sql\dataframe.py:148: UserWarning: DataFrame.sql_ctx is an internal property, and will be removed in future releases. Use DataFrame.sparkSession instead. warnings.warn( C:\spark\spark-3.3.1-bin-hadoop3\python\pyspark\sql\dataframe.py:127: UserWarning: DataFrame constructor is internal. Do not directly use it. warnings.warn("DataFrame constructor is internal. Do not directly use it.")
<AxesSubplot: title={'center': 'Tenure'}, xlabel='Tenure'>
PySpark MLlib¶
pyspark.mllib is Spark's machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. It only supports RDDs, so DataFrames must first be converted to RDDs (a minimal sketch follows).
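As a minimal sketch (the column names are those of the df_csv churn data loaded earlier), a DataFrame can be turned into an RDD through its .rdd attribute and then mapped into whatever structure MLlib expects:

# Convert the DataFrame into an RDD of (label, feature list) tuples
rdd_from_df = df_csv.rdd.map(lambda row: (row.Exited, [row.CreditScore, row.Age, row.Balance]))
print(rdd_from_df.take(2))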
PySpark MLlib Algorithms are:
- Classification (Binary and Multiclass) and Regression:
- Linear SVMs
- logistic regression
- decision trees
- random forests
- gradient-boosted trees
- naive Bayes
- linear least squares
- Lasso, ridge regression
- isotonic regression
- Collaborative filtering:
- Alternating least squares (ALS)
- Clustering:
- K-means
- Gaussian mixture
- Bisecting K-means
- Streaming K-Means
Three C's of machine learning in PySpark MLlib
- Collaborative filtering (recommender engines): Produce recommendations
- Classification: Identifying to which of a set of categories a new observation belongs
- Clustering: Groups data based on similar characteristics
Collaborative filtering¶
Collaborative filtering is about finding users that share common interests; for example, companies like Amazon and Netflix provide recommendations based on users' interests. It is commonly used in recommender systems:
- User-User collaborative filtering: Finds users that are similar to the target user
- Item-Item collaborative filtering: Finds and recommends items similar to the items the target user has already interacted with
In the figure below, if User 1 likes Item A, Item B, and Item C and User 2 likes Item B and Item C, there is a high probability that User 2 also likes Item A, so Item A can be recommended to User 2.
Image is retrieved from towardsdatascience.com
Another example below considers a book recommendation scenario. User A and User B have given high ratings to Book1 and Book2, so we can assume they have similar taste. Therefore, there is a high probability that User B will like a book they have not yet come across but that is rated highly by User A. In this scenario, Book3 is rated by User A, but User B has not come across it, so the recommendation system recommends Book3 to User B.
Image is retrieved from medium.com
Rating Class¶
- The Rating class is a wrapper around tuple (user, product and rating)
- Useful for parsing the RDD and creating a tuple of user, product and rating
from pyspark.mllib.recommendation import Rating
rt = Rating(user = 1, product = 2, rating = 6.0)
(rt[0], rt[1], rt[2])
(1, 2, 6.0)
7.1.1.1 Splitting the data
- Splitting data into training and testing sets
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
trainingSet, testSet=data.randomSplit([0.7, 0.3])
trainingSet.collect()
[1, 4, 6, 7, 9, 10]
testSet.collect()
[2, 3, 5, 8]
7.1.1.2 Alternating Least Squares (ALS)
The Alternating Least Squares (ALS) algorithm in spark.mllib provides collaborative filtering:
ALS.train(ratings, rank, iterations)
rt1 = Rating(1, 1, 0.5)
rt2 = Rating(1, 2, 0.75)
rt3 = Rating(2, 1, 0.7)
ratings = sc.parallelize([rt1, rt2, rt3])
ratings.collect()
[Rating(user=1, product=1, rating=0.5), Rating(user=1, product=2, rating=0.75), Rating(user=2, product=1, rating=0.7)]
from pyspark.mllib.recommendation import ALS
model = ALS.train(ratings, rank=10, iterations=10)
7.1.1.3 predictAll() Returns an RDD of Rating Objects
- The predictAll() method returns a list of predicted ratings for the input user/product pairs
- The method takes in an RDD of (user, product) pairs without ratings and generates the predicted ratings
# unrated RDD for rate prediction
unrated_RDD = sc.parallelize([(1, 1), (1, 2)])
predictions = model.predictAll(unrated_RDD)
predictions.collect()
[Rating(user=1, product=1, rating=0.4996941937079406), Rating(user=1, product=2, rating=0.7309803017090071)]
7.1.1.4 Model Evaluation with MSE
# sort out training set
rt1 = Rating(1, 1, 0.5)
rt2 = Rating(1, 2, 0.75)
rt3 = Rating(2, 1, 0.7)
ratings = sc.parallelize([rt1, rt2, rt3])
rates = ratings.map(lambda x: ((x[0], x[1]), x[2]))
rates.collect()
[((1, 1), 0.5), ((1, 2), 0.75), ((2, 1), 0.7)]
# sort out prediction
unrated_RDD = sc.parallelize([(1, 1), (1, 2)])
predictions = model.predictAll(unrated_RDD)
predictions.collect()
[Rating(user=1, product=1, rating=0.4996941937079406), Rating(user=1, product=2, rating=0.7309803017090071)]
preds = predictions.map(lambda x: ((x[0], x[1]), x[2]))
preds.collect()
[((1, 1), 0.4996941937079406), ((1, 2), 0.7309803017090071)]
rates_preds = rates.join(preds)
rates_preds.collect()
[((1, 1), (0.5, 0.4996941937079406)), ((1, 2), (0.75, 0.7309803017090071))]
MSE = rates_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
MSE
0.00018092122028433118
Realistic Example¶
In this exercise, the goal is to develop a simple movie recommendation system with PySpark MLlib using a subset of the MovieLens 100k dataset.
We first load the MovieLens data (ratings.csv) into an RDD. Each line of the RDD is formatted as userId,movieId,rating,timestamp; we map it to a Rating object (userID, productID, rating) after dropping the timestamp column, and finally split the RDD into training and test RDDs.
# Load the data into RDD
data = spark.read.csv("./Data/ratings.csv", header=True, inferSchema=True)
data.take(10)
[Row(userId=1, movieId=296, rating=5.0, timestamp=1147880044), Row(userId=1, movieId=306, rating=3.5, timestamp=1147868817), Row(userId=1, movieId=307, rating=5.0, timestamp=1147868828), Row(userId=1, movieId=665, rating=5.0, timestamp=1147878820), Row(userId=1, movieId=899, rating=3.5, timestamp=1147868510), Row(userId=1, movieId=1088, rating=4.0, timestamp=1147868495), Row(userId=1, movieId=1175, rating=3.5, timestamp=1147868826), Row(userId=1, movieId=1217, rating=3.5, timestamp=1147878326), Row(userId=1, movieId=1237, rating=5.0, timestamp=1147868839), Row(userId=1, movieId=1250, rating=4.0, timestamp=1147868414)]
# Transform the ratings into an RDD of Rating objects
# (collect() then parallelize() is fine for this small subset; for large data prefer data.rdd.map(...))
ratings_final = sc.parallelize(list(map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])),data.collect())))
ratings_final.take(10)
[Rating(user=1, product=296, rating=5.0), Rating(user=1, product=306, rating=3.5), Rating(user=1, product=307, rating=5.0), Rating(user=1, product=665, rating=5.0), Rating(user=1, product=899, rating=3.5), Rating(user=1, product=1088, rating=4.0), Rating(user=1, product=1175, rating=3.5), Rating(user=1, product=1217, rating=3.5), Rating(user=1, product=1237, rating=5.0), Rating(user=1, product=1250, rating=4.0)]
# Split the data into training and test
data_training, data_test = ratings_final.randomSplit([0.8, 0.2])
model = ALS.train(data_training, rank=10, iterations=10,nonnegative=True,lambda_=0.02)
# Drop the ratings column from test set
data_test_without_rating = data_test.map(lambda p: (p[0], p[1]))
#data_test_without_rating =sc.parallelize(data_test_without_rating.collect())
# Predict ratings for the test set using the trained model
predictions = model.predictAll(data_test_without_rating)
predictions.take(10)
[Rating(user=3, product=6333, rating=3.1514603420775305), Rating(user=2, product=6287, rating=1.4676445654578654), Rating(user=3, product=589, rating=2.5211682372938933), Rating(user=2, product=4306, rating=1.9568594087501565), Rating(user=3, product=4874, rating=2.8363142389187805), Rating(user=3, product=4993, rating=3.1514603420775305), Rating(user=2, product=7090, rating=1.9568594087501565), Rating(user=2, product=1682, rating=1.9568594087501565), Rating(user=2, product=5349, rating=1.9568594087501565), Rating(user=3, product=4023, rating=1.8908761472590843)]
Let's calculate the MSE for the ALS predictions:
# Prepare ratings data
rates = ratings_final.map(lambda r: ((r[0], r[1]), r[2]))
rates.take(10)
[((1, 296), 5.0), ((1, 306), 3.5), ((1, 307), 5.0), ((1, 665), 5.0), ((1, 899), 3.5), ((1, 1088), 4.0), ((1, 1175), 3.5), ((1, 1217), 3.5), ((1, 1237), 5.0), ((1, 1250), 4.0)]
# Prepare predictions data
preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))
# Join the ratings data with predictions data
rates_and_preds = rates.join(preds)
rates_and_preds.take(10)
[((2, 480), (2.0, 0.9784297043750783)), ((2, 1682), (4.5, 1.9568594087501565)), ((2, 4306), (4.5, 1.9568594087501565)), ((2, 5816), (5.0, 1.7122520209427368)), ((2, 7090), (2.0, 1.9568594087501565)), ((2, 8360), (4.0, 4.098430905771567)), ((3, 589), (4.0, 2.5211682372938933)), ((3, 4023), (4.0, 1.8908761472590843)), ((3, 4993), (4.0, 3.1514603420775305)), ((3, 6333), (4.0, 3.1514603420775305))]
# Calculate and print MSE
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.2f}".format(MSE))
Mean Squared Error of the model for the test data = 3.54
Classification¶
PySpark MLlib contains two specific vector data types:
- Dense vector: stores all entries in an array of floating point numbers
from pyspark.mllib.linalg import Vectors
denseVec = Vectors.dense([2.0, 4.0, 8.0])
print(denseVec)
[2.0,4.0,8.0]
- Sparse vector: stores only the nonzero values and their indices
sparseVec = Vectors.sparse(7, {6: 1.0, 2: 4.5})
print(sparseVec)
(7,[2,6],[4.5,1.0])
LabeledPoint() in PySpark MLlib
A LabeledPoint is a wrapper around a label (target value) and a vector of input features. For binary classification with logistic regression, the label is either 0 (negative) or 1 (positive).
from pyspark.mllib.regression import LabeledPoint
negative = LabeledPoint(0.0, [3.0, 2.0, 5.0])
positive = LabeledPoint(1.0, [2.0, 3.0, 1.0])
print(positive)
print(negative)
(1.0,[2.0,3.0,1.0]) (0.0,[3.0,2.0,5.0])
- Logistic Regression
Logistic regression in PySpark MLlib can be done with the LogisticRegressionWithLBFGS class.
data_lr = [
LabeledPoint(0.0, [2.0, 1.0]),
LabeledPoint(1.0, [1.0, 2.0]),
]
RDD_lr = sc.parallelize(data_lr)
RDD_lr.collect()
[LabeledPoint(0.0, [2.0,1.0]), LabeledPoint(1.0, [1.0,2.0])]
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
lr = LogisticRegressionWithLBFGS.train(RDD_lr)
print(lr.predict([2.0, 1.0]))
0
print(lr.predict([1.0, 2.0]))
1
Realistic Example¶
Logistic Regression¶
Logistic regression is applied here to predict the churn rate. The churn_retain.csv data set can be downloaded from Kaggle; the target feature is Exited (0 or 1), where Exited=1 means churn and Exited=0 means retain. The goal is to train a logistic regression model to predict churn/retain.
# Load the data into RDD
data = spark.read.csv("./Data/churn_retain.csv", header=True, inferSchema=True)
data.take(5)
[Row(RowNumber=1, CustomerId=15634602, Surname='Hargrave', CreditScore=619, Geography='France', Gender='Female', Age=42, Tenure=2, Balance=0.0, NumOfProducts=1, HasCrCard=1, IsActiveMember=1, EstimatedSalary=101348.88, Exited=1), Row(RowNumber=2, CustomerId=15647311, Surname='Hill', CreditScore=608, Geography='Spain', Gender='Female', Age=41, Tenure=1, Balance=83807.86, NumOfProducts=1, HasCrCard=0, IsActiveMember=1, EstimatedSalary=112542.58, Exited=0), Row(RowNumber=3, CustomerId=15619304, Surname='Onio', CreditScore=502, Geography='France', Gender='Female', Age=42, Tenure=8, Balance=159660.8, NumOfProducts=3, HasCrCard=1, IsActiveMember=0, EstimatedSalary=113931.57, Exited=1), Row(RowNumber=4, CustomerId=15701354, Surname='Boni', CreditScore=699, Geography='France', Gender='Female', Age=39, Tenure=1, Balance=0.0, NumOfProducts=2, HasCrCard=0, IsActiveMember=0, EstimatedSalary=93826.63, Exited=0), Row(RowNumber=5, CustomerId=15737888, Surname='Mitchell', CreditScore=850, Geography='Spain', Gender='Female', Age=43, Tenure=2, Balance=125510.82, NumOfProducts=1, HasCrCard=1, IsActiveMember=1, EstimatedSalary=79084.1, Exited=0)]
# Split the data into training and test
data_training, data_test = data.randomSplit([0.8, 0.2])
# Filter data based on retain (Exited=0) and churn (Exited=1)
data_0=data_training.filter(data_training.Exited ==0)
data_1=data_training.filter(data_training.Exited ==1)
# Quick sanity check: inspect one field of the first retained row
data_0.collect()[0][9]
1
# prepare training set
retain= list(map(lambda l: LabeledPoint(0.0,[int(l.CreditScore), float(l.Age), float(l.Tenure), float(l.Balance)
, int(l.NumOfProducts), int(l.HasCrCard), int(l.IsActiveMember),
int(l.EstimatedSalary)]),data_0.collect()))
churn= list(map(lambda l: LabeledPoint(1.0,[int(l.CreditScore), float(l.Age), float(l.Tenure), float(l.Balance)
, int(l.NumOfProducts), int(l.HasCrCard), int(l.IsActiveMember),
int(l.EstimatedSalary)]),data_1.collect()))
data_to_train = retain+churn
RDD_to_train = sc.parallelize(data_to_train)
# Train lr model
lr = LogisticRegressionWithLBFGS.train(RDD_to_train, iterations=10)
lr
pyspark.mllib.LogisticRegressionModel: intercept = 0.0, numFeatures = 8, numClasses = 2, threshold = 0.5
data_test_to_predict=sc.parallelize(list(map(lambda l: [int(l.CreditScore), float(l.Age), float(l.Tenure), float(l.Balance)
, int(l.NumOfProducts), int(l.HasCrCard), int(l.IsActiveMember),
int(l.EstimatedSalary)],data_test.collect())))
# Predict test set using trained model
predictions = lr.predict(data_test_to_predict)
y_test_pred=predictions.collect()
y_test_label=list(map(lambda l: int(l.Exited),data_test.collect()))
from sklearn.metrics import accuracy_score
from sklearn.metrics import recall_score
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score, recall_score
conf_mx=confusion_matrix(y_test_label,y_test_pred)
print('Confusion Matrix (Test):\n',conf_mx)
print('------------------------------')
acr=accuracy_score(y_test_label,y_test_pred)
print('\n Accuracy (Test):\n',acr)
print('------------------------------')
prec=precision_score(y_test_label,y_test_pred) # == TP/(TP+FP)
print('\n Precision (Test):\n',prec)
print('------------------------------')
reca=recall_score(y_test_label,y_test_pred) # == TP/(TP+FN) )
print('\n Recall (Test):\n',reca)
#print('------------------------------')
#fpr, tpr, thresold = roc_curve(y_test_label,y_test_pred)
#roc_auc = auc(fpr, tpr)
#print('\n AUC (Test):\n',roc_auc)
Confusion Matrix (Test): [[1530 58] [ 348 73]] ------------------------------ Accuracy (Test): 0.7979094076655052 ------------------------------ Precision (Test): 0.5572519083969466 ------------------------------ Recall (Test): 0.17339667458432304
Random Forest¶
from pyspark.mllib.tree import RandomForest
# Train a random forest classifier: numClasses=2, no categorical features, 3 trees
rf = RandomForest.trainClassifier(RDD_to_train, 2, {}, 3, seed=42)
rf
TreeEnsembleModel classifier with 3 trees
# Predict test set using trained model
predictions = rf.predict(data_test_to_predict)
y_test_pred=predictions.collect()
y_test_label=list(map(lambda l: int(l.Exited),data_test.collect()))
conf_mx=confusion_matrix(y_test_label,y_test_pred)
print('Confusion Matrix (Test):\n',conf_mx)
print('------------------------------')
acr=accuracy_score(y_test_label,y_test_pred)
print('\n Accuracy (Test):\n',acr)
print('------------------------------')
prec=precision_score(y_test_label,y_test_pred) # == TP/(TP+FP)
print('\n Precision (Test):\n',prec)
print('------------------------------')
reca=recall_score(y_test_label,y_test_pred) # == TP/(TP+FN) )
print('\n Recall (Test):\n',reca)
#print('------------------------------')
#fpr, tpr, thresold = roc_curve(y_test_label,y_test_pred)
#roc_auc = auc(fpr, tpr)
#print('\n AUC (Test):\n',roc_auc)
Confusion Matrix (Test): [[1581 7] [ 365 56]] ------------------------------ Accuracy (Test): 0.8148332503733201 ------------------------------ Precision (Test): 0.8888888888888888 ------------------------------ Recall (Test): 0.1330166270783848