Summary

In this notebook, the fundamentals of big data with PySpark are presented. First, it shows how to install PySpark on a local machine for testing, debugging, and demonstration. Then, PySpark DataFrames and PySpark SQL are discussed. Finally, applications of machine learning with the PySpark MLlib library are presented with synthetic and realistic examples.

Python functions and data files needed to run this notebook are available via this link.

What is Big Data

Big data is a term used to refer to the study and applications of data sets that are too complex for traditional data-processing software - Wikipedia

  • The Three Vs of Big Data

    1. Volume: Size of the data. Volumes of data can reach unprecedented heights; it is now not uncommon for large companies to have terabytes – and even petabytes – of data in storage devices and on servers.

    2. Variety: Different sources and formats. Data was once collected from one place and delivered in one format. Where it once took the shape of database files - such as Excel, CSV, and Access - it is now presented in non-traditional forms, like video, text, PDF, and graphics on social media, as well as via tech such as wearable devices.

    3. Velocity: Speed of the data. Velocity essentially measures how fast the data is coming in. Some data arrives in real time, whereas other data comes in fits and starts, sent to us in batches.

      image.png

retrieved from https://bigdataldn.com/news/big-data-the-3-vs-explained/

  • Concepts and Terminology of Big Data

    1. Clustered computing: Collection of resources of multiple machines
    2. Parallel computing: Simultaneous computation
    3. Distributed computing: Collection of nodes (networked computers) that run in parallel
    4. Batch processing: Breaking the job into small pieces and running them on individual machines
    5. Real-time processing: Immediate processing of data
  • Big Data Processing Systems
  1. Hadoop/MapReduce: Scalable and fault-tolerant framework written in Java, used to efficiently store and process large datasets ranging in size from gigabytes to petabytes. Instead of using one large computer to store and process the data, Hadoop allows clustering multiple computers to analyze massive datasets in parallel more quickly:
    • Open source
    • Batch processing
  2. Apache Spark: an open-source, distributed processing system used for big data workloads. It utilizes in-memory caching and optimized query execution for fast analytic queries against data of any size:
    • Open source
    • Both batch and real-time data processing

In this notebook, we present Apache Spark.

  • Main Features of Apache Spark

    • Distributed cluster-computing framework
    • Efficient in-memory computations for large data sets (often cited as up to 100 times faster than disk-based MapReduce when working in memory)
    • Lightning-fast data processing framework
    • Mainly written in Scala, but provides support for Java, Scala, Python, R, and SQL
  • Apache Spark Components

    1. Spark SQL
    2. MLlib Machine Learning
    3. GraphX (manipulating graphs)
    4. Spark streaming
  • Deployment of Spark Modes

    1. Local mode: runs on a single machine such as a PC or laptop

      • Local mode is convenient for testing, debugging and demonstration
    2. Cluster mode: runs on a set of pre-defined machines

      • Suitable for production

The typical workflow is to start in local mode and later transition to cluster mode; no code changes are required for this transition (see the sketch below). In this notebook we use local mode.
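As a rough sketch (the cluster URL below is a hypothetical placeholder), the only thing that changes between the two modes is the master setting handed to Spark; the rest of the application code stays the same:

from pyspark import SparkConf, SparkContext

# Local mode: run on this machine, using all available cores
conf = SparkConf().setMaster("local[*]").setAppName("demo")

# Cluster mode: point the same application at a cluster instead, e.g. a
# standalone cluster URL (hypothetical host); no other code changes are needed
# conf = SparkConf().setMaster("spark://master-host:7077").setAppName("demo")

# Only one SparkContext can be active per process, so this line is illustrative
# sc = SparkContext(conf=conf)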

Spark with Python (PySpark)

Apache Spark is written in Scala. The Apache Spark community released PySpark to support Python with Spark. PySpark offers computation speed and power similar to Scala, and its APIs are similar to Pandas and scikit-learn.

  • Three different Spark shells:
    • Spark-shell for Scala
    • PySpark-shell for Python
    • SparkR for R

The PySpark shell is the Python-based command line tool. It allows data scientists to interface with Spark data structures and supports connecting to a cluster. The figure below shows the PySpark shell.

image.png

SparkContext

  • SparkContext is an entry point into the world of Spark
  • An entry point is a way of connecting to Spark cluster
  • An entry point is like a key to the house
  • PySpark has a default SparkContext called sc
  • Version: To retrieve SparkContext version

Install PySpark

The prerequisite for installing PySpark is to install the Java JDK.

image.png

Next, install pyspark:

conda install pyspark

  • Create Environment Path

image-2.png

  1. "JAVA_HOME" as "C:\Program Files\Java\jdk1.8.0_341"
  1. "PATH" = "C:\Program Files\Java\jdk1.8.0_341\bin"
  1. "SPARK_HOME" = "C:\spark3\spark-3.3.1-bin-hadoop3\bin"
  1. "HADOOP_HOME" = "C:\spark3\spark-3.3.1-bin-hadoop3\bin"

Finally, run the command below to install findspark in your environment:

C:\Users\MehdiRezvandehy>conda install -c conda-forge findspark

In [1]:
import findspark
In [2]:
import findspark
findspark.init()
findspark.find()
Out[2]:
'C:\\spark\\spark-3.3.1-bin-hadoop3'
In [3]:
## import pyspark
import pyspark
print(pyspark.__version__)
# 3.3.0
3.3.1
In [4]:
from pyspark import SparkContext
sc=SparkContext(master='local')
In [5]:
# Verify SparkContext (SparkContext called as sc)
print(sc)

# Print Spark version
print(sc.version)
<SparkContext master=local appName=pyspark-shell>
3.3.1
  • Python Version: To retrieve Python version of SparkContext
In [6]:
sc.pythonVer
Out[6]:
'3.9'
  • Master: URL of the cluster or “local” string to run in local mode of SparkContext
In [7]:
sc.master
Out[7]:
'local'
  • How to Load data in PySpark

SparkContext's parallelize() method is used to load data in PySpark:

In [8]:
rdd = sc.parallelize([1,2,3,4,5])

SparkContext's textFile() method is used to load a text file in PySpark:

In [9]:
rdd2 = sc.textFile("test.txt")
In [10]:
# Print the version of SparkContext
print("The version of Spark Context in the PySpark shell is", sc.version)

# Print the Python version of SparkContext
print("The Python version of Spark Context in the PySpark shell is", sc.pythonVer)

# Print the master of SparkContext
print("The master of Spark Context in the PySpark shell is", sc.master)
The version of Spark Context in the PySpark shell is 3.3.1
The Python version of Spark Context in the PySpark shell is 3.9
The master of Spark Context in the PySpark shell is local
In [11]:
# Create a Python range of numbers from 0 to 19
numb = range(0, 20)

# Load the list into PySpark  
spark_data = sc.parallelize(numb)

Functional Programming in Python

  • Python Anonymous Functions

    • Lambda functions are anonymous functions in Python
    • Very powerful and widely used in Python; quite efficient with map() and filter()
    • Lambda functions create functions to be called later, similar to def
    • They return a function without a name (i.e., anonymous)
  • Lambda function
In [12]:
# general expression
lambda arguments: expression
Out[12]:
<function __main__.<lambda>(arguments)>
In [13]:
times2 = lambda x: x * 2
print(times2(3))
6
In [14]:
# Compare with def 
def square(x):
    return x ** 2

s = lambda x: x ** 2
print(s(10))
print(square(10))
100
100
  • A lambda does not need a return statement
  • A lambda function can be placed anywhere a function is expected
  • Lambda function with map()
  • The map() function takes a function and an iterable and returns an iterator over the results of applying that function to each item
    • map(function, list)
In [15]:
items = [0, 2, 5, 6]
list(map(lambda x: x*2 , items))
Out[15]:
[0, 4, 10, 12]

Because map() returns an iterator in Python 3, we wrap it in list() to display the result.

  • Lambda function with filter()
  • The filter() function takes a function and an iterable and returns an iterator over the elements for which the function evaluates to True.

filter(function, list)

In [16]:
items = [0, 2, 5, 6]
list(filter(lambda x: (True if (x%2 == 0) else False), items))
Out[16]:
[0, 2, 6]
In [17]:
items = [0, 2, 5, 6]
list(filter(lambda x: (x%2 == 0), items))
Out[17]:
[0, 2, 6]

PySpark RDD

  • RDD = Resilient Distributed Datasets

image.png

RDD (Resilient Distributed Datasets):

  • Resilient: Ability to withstand failures
  • Distributed: Spanning across multiple machines
  • Datasets: Collection of partitioned data, e.g., arrays, tables, tuples, etc.
  • Parallelizing

    • parallelize() is used for creating RDDs from python lists.
In [18]:
RDDnum = sc.parallelize([1,2,3,4])
In [19]:
RDDhello = sc.parallelize("Hello world")
  • From external datasets

PySpark can easily create RDDs from files that are stored in external storage devices such as HDFS (Hadoop Distributed File System), Amazon S3 buckets, etc:

  • textFile() for creating RDDs from external datasets
In [20]:
RDDfile = sc.textFile("README.md", minPartitions = 6)
In [21]:
type(RDDfile)
Out[21]:
pyspark.rdd.RDD
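Because minPartitions was requested when reading the file, a quick way to check how the data was actually split (the exact count depends on the file size):

# Number of partitions the text file was split into
print(RDDfile.getNumPartitions())

# parallelize() also accepts an explicit number of slices
rdd_8 = sc.parallelize(range(100), 8)
print(rdd_8.getNumPartitions())   # 8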

PySpark Operation Overview

image.png

  • Transformations create new RDDs
  • Actions perform computation on the RDDs

RDD Transformations

  • Transformations follow lazy evaluation: they are not executed until an action is called (see the sketch below)

image.png
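A small sketch of lazy evaluation: defining a transformation returns immediately and does no work, and the computation only runs when an action such as collect() is called.

nums = sc.parallelize(range(5))

# Returns instantly -- nothing has been computed yet, only a plan is recorded
squared = nums.map(lambda x: x * x)

# The action triggers the actual computation
print(squared.collect())   # [0, 1, 4, 9, 16]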

  • Basic RDD Transformations

    • map() , filter() , flatMap() , and union()
  • map() Transformation

    • map() transformation applies a function to all elements in the RDD

image-2.png

In [22]:
RDD = sc.parallelize([1,2,3,4])
RDD_map = RDD.map(lambda x: x * x)
  • filter() Transformation

    • Filter transformation returns a new RDD with only the elements that pass the condition
In [23]:
RDD = sc.parallelize([1,2,3,4])
RDD_filter = RDD.filter(lambda x: x > 2)
  • flatMap() Transformation

    • flatMap() transformation returns multiple values for each element in the original RDD

image-2.png

In [24]:
RDD = sc.parallelize(["hello world", "how are you"])
RDD_flatmap = RDD.flatMap(lambda x: x.split(" "))
  • union() Transformation
    • union() merges two RDDs into a single RDD containing the elements of both. (The same operation exists for DataFrames, where the inputs must share the same schema and structure.)

image.png

In [25]:
#input_RDD = sc.textFile("logs.txt")
#error_RDD = input_RDD.filter(lambda x: "error" in x.split())
#warnings_RDD = input_RDD.filter(lambda x: "warnings" in x.split())
#combined_RDD = error_RDD.union(warnings_RDD)
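Since logs.txt is not included here, a minimal runnable sketch of union() with small in-memory RDDs (the log lines are made up for illustration):

log_lines = sc.parallelize([
    "error disk full",
    "warnings low memory",
    "info job started",
    "error network timeout",
])

error_RDD = log_lines.filter(lambda x: "error" in x.split())
warnings_RDD = log_lines.filter(lambda x: "warnings" in x.split())

combined_RDD = error_RDD.union(warnings_RDD)
print(combined_RDD.collect())
# ['error disk full', 'error network timeout', 'warnings low memory']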

RDD Actions

  • RDD actions are operations that return non-RDD values. Since RDDs are lazy, transformations are not executed until an action is called; an action triggers the pending transformations and returns its result to the driver program. Basic RDD Actions:
    • collect()
    • take(N)
    • first()
    • count()
  • collect() and take() Actions

    • collect() returns all the elements of the dataset as an array
    • take(N) returns an array with the first N elements of the dataset
In [26]:
RDD = sc.parallelize(["hello world", "how are you doing"])
flatmap_RDD = RDD.flatMap(lambda x: x.split(" "))
In [27]:
flatmap_RDD.collect()
Out[27]:
['hello', 'world', 'how', 'are', 'you', 'doing']
In [28]:
flatmap_RDD.take(3)
Out[28]:
['hello', 'world', 'how']
  • first() and count() Actions

    • first() returns the first element of the RDD
    • count() returns the number of elements in the RDD
In [29]:
flatmap_RDD.first()
Out[29]:
'hello'
In [30]:
flatmap_RDD.count()
Out[30]:
6

Pair RDD

We usually have key/value pairs in real-life datasets, where each key maps to one or more values. A pair RDD is a special data structure for working with this kind of dataset.

There are two common ways to create pair RDDs:

1. From a list of key-value tuples

In [31]:
my_tuple = [('Ali', 15), ('John', 18), ('Peter', 68)]
tuple_pairRDD = sc.parallelize(my_tuple)
In [32]:
tuple_pairRDD.collect()
Out[32]:
[('Ali', 15), ('John', 18), ('Peter', 68)]

2. From a regular RDD

In [33]:
my_list = ['Ali 15', 'John 18', 'Peter 68']
RDD_regular = sc.parallelize(my_list)
RDD_pairRDD = RDD_regular.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))
In [34]:
RDD_pairRDD.collect()
Out[34]:
[('Ali', '15'), ('John', '18'), ('Peter', '68')]
  • Transformations on pair RDDs

All regular transformations work on pair RDDs, but you have to pass functions that operate on key-value pairs rather than on individual elements.

  • Examples of paired RDD Transformations:

    • reduceByKey(func): Combine values with the same key
    • groupByKey(): Group values with the same key
    • sortByKey(): Return an RDD sorted by the key
    • join(): Join two pair RDDs based on their key

Each pair RDD transformation is explained below.

  • reduceByKey() transformation: it combines values with the same key.
In [35]:
regularRDD = sc.parallelize([ ("Ronaldo", 12),("Messi", 15),("Neymar", 36),("Ronaldo", 8),("Messi", 3)])
reducebykey_pairRDD = regularRDD.reduceByKey(lambda x,y : x + y)
reducebykey_pairRDD.collect()
Out[35]:
[('Ronaldo', 20), ('Messi', 18), ('Neymar', 36)]
  • sortByKey() transformation: it orders a pair RDD by key
In [36]:
reducebykey_pairRDD_sort = reducebykey_pairRDD.map(lambda x: (x[1], x[0]))
reducebykey_pairRDD_sort.sortByKey(ascending=False).collect()
Out[36]:
[(36, 'Neymar'), (20, 'Ronaldo'), (18, 'Messi')]
  • groupByKey() transformation: it groups all the values with the same key in the pair RDD.
In [37]:
airprts = [("US", "JFK1"),("UK", "LHYR"),("US", "FOSQ"),("FR", "CYDG"),("US", "S_FO")]
regularRDD = sc.parallelize(airprts)
pairRDD_groupby = regularRDD.groupByKey().collect()
for conts, airs in pairRDD_groupby:
    print(conts, list(airs))
US ['JFK1', 'FOSQ', 'S_FO']
UK ['LHYR']
FR ['CYDG']
  • join() transformation: it joins two pair RDDs based on their key.
In [38]:
RDD1 = sc.parallelize([("Ronaldo", 13),("Messi", 12),("Neymar", 32)])
RDD2 = sc.parallelize([("Neymar", 39),("Ronaldo", 10),("Messi", 89)])
In [39]:
RDD1.join(RDD2).collect()
Out[39]:
[('Ronaldo', (13, 10)), ('Messi', (12, 89)), ('Neymar', (32, 39))]

Advanced RDD Actions

  • reduce(): it is used for aggregating the elements of a regular RDD. The function should be commutative and associative (changing the order or grouping of the operands does not change the result).
In [40]:
x = [4,3,2,1]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y : x + y)
Out[40]:
10
  • saveAsTextFile(): it takes a path and writes the content of the RDD to that path. The path is treated as a directory, and one output file per partition is produced in that directory; this is how Spark writes output from multiple workers in parallel.
In [41]:
#RDD.saveAsTextFile("tempFile")
  • Chaining coalesce(1) before saveAsTextFile() collapses the RDD into a single partition, so the RDD is saved as a single text file.
In [42]:
#RDD.coalesce(1).saveAsTextFile("tempFile")
  • countByKey(): it is only available for RDDs of type (K, V). It counts the number of elements for each key.
In [43]:
rdd = sc.parallelize([("a", 2), ("c", 10), ("a", 4), ("b", 3), ("c", 6), ("a", 1), ("b", 8)])
for keys, vals in rdd.countByKey().items():
    print(keys, vals)
a 3
c 2
b 2
  • collectAsMap(): it returns the key-value pairs in the RDD as a dictionary
In [44]:
sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
Out[44]:
{1: 2, 3: 4}

PySpark DataFrames & PySpark SQL

  • PySpark SQL is a Spark library for structured data. It provides more information about the structure of data and computation.

  • PySpark DataFrames support both SQL queries (SELECT * FROM table) and expression methods (df.select()).

  • SparkContext is the main entry point for creating RDDs
  • SparkSession is used to create DataFrame, register DataFrames, execute SQL queries
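In this notebook the SparkSession is later created by wrapping the existing sc; a more common pattern is the builder API, which creates a session (or reuses a running one) and exposes the underlying SparkContext. A minimal sketch:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("pyspark-demo")
         .master("local[*]")
         .getOrCreate())

# The underlying SparkContext is available as an attribute of the session
print(spark.sparkContext.master)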

PySpark DataFrames

  • Two different methods of creating DataFrames in PySpark
    • From existing RDDs using SparkSession's createDataFrame() method
    • From various data sources (CSV, JSON, TXT) using SparkSession's read method

5.1.1 Create DataFrame from RDD

In [45]:
iphones_RDD = sc.parallelize([
("8Plus", 2017, 6.23, 3.07, 7.12),
("XR", 2018, 5.94, 2.98, 6.84),    
("XS", 2018, 5.65, 2.79, 6.24),
("X10", 2017, 5.65, 2.79, 6.13)
])
names = ['Model', 'Year', 'Height', 'Width', 'Weight']
In [46]:
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)

iphones_df = spark.createDataFrame(iphones_RDD, schema=names)
iphones_df.show()
+-----+----+------+-----+------+
|Model|Year|Height|Width|Weight|
+-----+----+------+-----+------+
|8Plus|2017|  6.23| 3.07|  7.12|
|   XR|2018|  5.94| 2.98|  6.84|
|   XS|2018|  5.65| 2.79|  6.24|
|  X10|2017|  5.65| 2.79|  6.13|
+-----+----+------+-----+------+

5.1.2 Create DataFrame from reading a CSV/JSON/TXT

In [47]:
df_csv = spark.read.csv("./Data/Customer_Churn.csv", header=True, inferSchema=True)
df_csv.show(5)
+----------+-----------+---------+------+---+------+---------+---------------+------+
|CustomerId|CreditScore|Geography|Gender|Age|Tenure|  Balance|EstimatedSalary|Exited|
+----------+-----------+---------+------+---+------+---------+---------------+------+
|  15634602|        619|   France|Female| 42|     2|      0.0|      101348.88|     1|
|  15647311|        608|    Spain|Female| 41|     1| 83807.86|      112542.58|     0|
|  15619304|        502|   France|Female| 42|     8| 159660.8|      113931.57|     1|
|  15701354|        699|   France|Female| 39|     1|      0.0|       93826.63|     0|
|  15737888|        850|    Spain|Female| 43|     2|125510.82|        79084.1|     0|
+----------+-----------+---------+------+---+------+---------+---------------+------+
only showing top 5 rows

  • read.csv() takes the path to the file and two optional parameters:
    • header=True , inferSchema=True
In [48]:
df_json = spark.read.json("./Data/Customer_Churn.json", multiLine = "true")
df_json.show(5)
+---+---------+-----------+----------+---------------+------+------+---------+------+
|Age|  Balance|CreditScore|CustomerId|EstimatedSalary|Exited|Gender|Geography|Tenure|
+---+---------+-----------+----------+---------------+------+------+---------+------+
| 42|      0.0|        619|  15634602|      101348.88|     1|Female|   France|     2|
| 41| 83807.86|        608|  15647311|      112542.58|     0|Female|    Spain|     1|
| 42| 159660.8|        502|  15619304|      113931.57|     1|Female|   France|     8|
| 39|      0.0|        699|  15701354|       93826.63|     0|Female|   France|     1|
| 43|125510.82|        850|  15737888|        79084.1|     0|Female|    Spain|     2|
+---+---------+-----------+----------+---------------+------+------+---------+------+
only showing top 5 rows

In [49]:
df_txt = spark.read.text("./Data/movies.txt")
df_txt.show(5)
+--------------------+
|               value|
+--------------------+
|title\tdescriptio...|
|The Girl from Mon...|
|Every Jack has a ...|
|Dorian Gray\t"A v...|
|How to Lose Frien...|
+--------------------+
only showing top 5 rows

5.1.3 DataFrame operators in PySpark

  • PySpark DataFrame Transformations:

    • select()

    • filter()

    • groupby()

    • orderby()

    • dropDuplicates()

    • withColumnRenamed()

  • PySpark DataFrame Actions :

    • printSchema()
    • head()
    • show()
    • count()
    • columns and describe()
  • select()
    • This transformation subsets the columns in the DataFrame.
In [50]:
df_Balance = df_csv.select('Balance')
In [51]:
df_Balance.show(5)
+---------+
|  Balance|
+---------+
|      0.0|
| 83807.86|
| 159660.8|
|      0.0|
|125510.82|
+---------+
only showing top 5 rows

  • filter()
    • This transformation filters the rows based on a condition, keeping only the rows that satisfy it.
In [52]:
df_Balance_filter = df_Balance.filter(df_Balance.Balance > 200000)
df_Balance_filter.show(5)
+---------+
|  Balance|
+---------+
| 213146.2|
|211774.31|
|209767.31|
|214346.96|
|203715.15|
+---------+
only showing top 5 rows

  • groupby()
    • It groups rows by a column and is usually followed by an aggregation operation such as count().
In [53]:
df_Tenure_group = df_csv.groupby('Tenure')
df_Tenure_group.count().show(5)
+------+-----+
|Tenure|count|
+------+-----+
|     1| 1035|
|     6|  967|
|     3| 1009|
|     5| 1012|
|     9|  984|
+------+-----+
only showing top 5 rows

  • orderby()
    • It sorts the DataFrame based on one or more columns.
In [54]:
df_Tenure_group.count().orderBy('Tenure').show(5)
+------+-----+
|Tenure|count|
+------+-----+
|     0|  413|
|     1| 1035|
|     2| 1048|
|     3| 1009|
|     4|  989|
+------+-----+
only showing top 5 rows

  • dropDuplicates()
    • It removes the duplicate rows of a DataFrame.
In [55]:
test_df_no_dup = df_csv.select('Gender','Age','Tenure').dropDuplicates()
test_df_no_dup.show(5)
+------+---+------+
|Gender|Age|Tenure|
+------+---+------+
|Female| 37|     1|
|  Male| 35|     4|
|Female| 77|     6|
|  Male| 40|     5|
|  Male| 42|     7|
+------+---+------+
only showing top 5 rows

  • withColumnRenamed()
    • It renames a column in the DataFrame.
In [56]:
df_sex = df_csv.withColumnRenamed('Gender', 'Sex')
df_sex.show(5)
+----------+-----------+---------+------+---+------+---------+---------------+------+
|CustomerId|CreditScore|Geography|   Sex|Age|Tenure|  Balance|EstimatedSalary|Exited|
+----------+-----------+---------+------+---+------+---------+---------------+------+
|  15634602|        619|   France|Female| 42|     2|      0.0|      101348.88|     1|
|  15647311|        608|    Spain|Female| 41|     1| 83807.86|      112542.58|     0|
|  15619304|        502|   France|Female| 42|     8| 159660.8|      113931.57|     1|
|  15701354|        699|   France|Female| 39|     1|      0.0|       93826.63|     0|
|  15737888|        850|    Spain|Female| 43|     2|125510.82|        79084.1|     0|
+----------+-----------+---------+------+---+------+---------+---------------+------+
only showing top 5 rows

  • printSchema()
    • It prints the types of columns in the DataFrame.
In [57]:
df_csv.printSchema()
root
 |-- CustomerId: integer (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)

  • columns action
    • It returns the list of column names of the DataFrame.
In [58]:
df_csv.columns
Out[58]:
['CustomerId',
 'CreditScore',
 'Geography',
 'Gender',
 'Age',
 'Tenure',
 'Balance',
 'EstimatedSalary',
 'Exited']
  • describe()
    • It computes summary statistics of numerical columns in the DataFrame.
In [59]:
df_csv.select('CreditScore','Age','Tenure','Balance').describe().show(5)
+-------+-----------------+------------------+------------------+-----------------+
|summary|      CreditScore|               Age|            Tenure|          Balance|
+-------+-----------------+------------------+------------------+-----------------+
|  count|            10000|             10000|             10000|            10000|
|   mean|         650.5288|           38.9218|            5.0128|76485.88928799961|
| stddev|96.65329873613035|10.487806451704587|2.8921743770496837|62397.40520238599|
|    min|              350|                18|                 0|              0.0|
|    max|              850|                92|                10|        250898.09|
+-------+-----------------+------------------+------------------+-----------------+
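head() and count() from the actions list above are not demonstrated elsewhere, so here is a quick sketch on the same DataFrame:

# Number of rows in the DataFrame (10,000 for this dataset, per describe() above)
print(df_csv.count())

# First two rows returned as a list of Row objects
print(df_csv.head(2))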

PySpark SQL

  • Executing SQL Queries
    • The SparkSession sql() method executes SQL queries
    • The sql() method takes a SQL statement as an argument and returns the result as a DataFrame
In [60]:
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)
In [61]:
# Create a temporary table from PySpark DataFrame
df_csv.createOrReplaceTempView("table1")
In [62]:
query = '''SELECT CustomerId,
                   Geography,
                   CreditScore
            FROM table1
            limit 5'''
df2 = spark.sql(query)
df2.collect()
Out[62]:
[Row(CustomerId=15634602, Geography='France', CreditScore=619),
 Row(CustomerId=15647311, Geography='Spain', CreditScore=608),
 Row(CustomerId=15619304, Geography='France', CreditScore=502),
 Row(CustomerId=15701354, Geography='France', CreditScore=699),
 Row(CustomerId=15737888, Geography='Spain', CreditScore=850)]
  • Summarizing and grouping data using SQL queries
In [63]:
query = '''SELECT Tenure, 
                  max(Balance) 
           FROM table1 
           GROUP BY Tenure'''
spark.sql(query).show(5)
+------+------------+
|Tenure|max(Balance)|
+------+------------+
|     1|   211774.31|
|     6|   206014.94|
|     3|   250898.09|
|     5|   216109.88|
|     9|   222267.63|
+------+------------+
only showing top 5 rows

Data Visualization in PySpark

Plotting graphs from PySpark DataFrames can be done using three methods:

  • pyspark_dist_explore library
  • toPandas()
  • HandySpark library

6.1 Pyspark_dist_explore

pyspark_dist_explore provides three functions:

  • hist()
  • distplot()
  • pandas_histogram()
In [64]:
df_csv = spark.read.csv("./Data/Customer_Churn.csv", header=True, inferSchema=True)
In [65]:
df_Tenure = df_csv.select('Tenure')
In [66]:
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
hist(ax,x=df_Tenure, bins=20, color="red")
plt.show()

6.2 Pandas

In [67]:
df_Tenure_pandas = df_csv.toPandas()
In [68]:
df_Tenure_pandas.hist('Tenure', bins=20)
plt.show()

6.3 Pandas DataFrame vs PySpark DataFrame

  • Pandas DataFrames are in-memory, single-server structures, while operations on PySpark DataFrames run in parallel across the cluster
  • In Pandas, the result is generated as soon as an operation is applied, whereas PySpark DataFrame operations are lazily evaluated
  • Pandas DataFrames are mutable while PySpark DataFrames are immutable (see the sketch below)
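A small sketch of the last two points using the churn DataFrame loaded earlier (BalancePerTenure is a made-up derived column for illustration): a transformation returns a new, immutable DataFrame, and nothing is computed until an action such as show() runs.

from pyspark.sql import functions as F

# Returns a *new* DataFrame; df_csv itself is unchanged (immutability)
df_derived = df_csv.withColumn("BalancePerTenure",
                               F.col("Balance") / (F.col("Tenure") + 1))

# Lazy evaluation: the computation only happens when the action below is called
df_derived.select("CustomerId", "BalancePerTenure").show(3)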

6.4 HandySpark

  • It is a package designed to improve PySpark user experience
In [69]:
from handyspark import *
hdf = df_csv.toHandy()
hdf.cols["Tenure"].hist()
C:\spark\spark-3.3.1-bin-hadoop3\python\pyspark\sql\dataframe.py:148: UserWarning: DataFrame.sql_ctx is an internal property, and will be removed in future releases. Use DataFrame.sparkSession instead.
  warnings.warn(
C:\spark\spark-3.3.1-bin-hadoop3\python\pyspark\sql\dataframe.py:127: UserWarning: DataFrame constructor is internal. Do not directly use it.
  warnings.warn("DataFrame constructor is internal. Do not directly use it.")
Out[69]:
<AxesSubplot: title={'center': 'Tenure'}, xlabel='Tenure'>

PySpark MLlib

pyspark.mllib is Spark's RDD-based machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. It only supports RDDs, so DataFrames must be converted to RDDs before use (see the sketch below).
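For example, a DataFrame loaded with spark.read can be handed to the RDD-based API through its .rdd attribute; a sketch using the churn dataset columns from earlier (not the exact code used later in this notebook):

# Re-read the churn data as a DataFrame, then expose it as an RDD of Row objects
df = spark.read.csv("./Data/Customer_Churn.csv", header=True, inferSchema=True)

# Map each Row into the (label, features) shape that MLlib-style code expects
feature_rdd = df.rdd.map(lambda row: (row["Exited"],
                                      [row["CreditScore"], row["Age"],
                                       row["Tenure"], row["Balance"]]))
print(feature_rdd.take(2))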

PySpark MLlib Algorithms are:

  1. Classification (Binary and Multiclass) and Regression:
    • Linear SVMs
    • logistic regression
    • decision trees
    • random forests
    • gradient-boosted trees
    • naive Bayes
    • linear least squares
    • Lasso, ridge regression
    • isotonic regression
  2. Collaborative filtering:
    • Alternating least squares (ALS)
  3. Clustering:
    • K-means
    • Gaussian mixture
    • Bisecting K-means
    • Streaming K-Means

Three C's of machine learning in PySpark MLlib

  • Collaborative filtering (recommender engines): Produce recommendations
  • Classification: Identifying to which of a set of categories a new observation belongs
  • Clustering: Groups data based on similar characteristics

Collaborative filtering

Collaborative filtering means finding users that share common interests; for example, companies like Amazon and Netflix provide recommendations based on users' interests. It is commonly used for recommender systems:

  • User-User Collaborative: Finds users that are similar to the target user
  • Item-Item Collaborative: Finds and recommends items that are similar to items the target user has interacted with

In the figure below, if User 1 likes Item A, Item B, and Item C, and User 2 likes Item B and Item C, there is a high probability that User 2 also likes Item A, so the system can recommend Item A to User 2.

image.png

Image is retrieved from towardsdatascience.com

Another example below considers a book recommendation scenario. Users A and B have given high ratings to Book 1 and Book 2, so we can assume they have similar taste. Therefore, there is a high probability that User B will like a book that they have not come across but that is rated highly by User A. In this scenario Book 3 is rated by User A, but User B has not come across it, so the recommendation system recommends Book 3 to User B.

image-2.png

Image is retrieved from medium.com

Rating Class

  • The Rating class is a wrapper around the tuple (user, product, rating)
  • It is useful for parsing an RDD and creating tuples of user, product and rating
In [70]:
from pyspark.mllib.recommendation import Rating
rt = Rating(user = 1, product = 2, rating = 6.0)
(rt[0], rt[1], rt[2])
Out[70]:
(1, 2, 6.0)

7.1.1.1 Splitting the data

  • Splitting data into training and testing sets
In [71]:
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
trainingSet, testSet=data.randomSplit([0.7, 0.3])
In [72]:
trainingSet.collect()
Out[72]:
[1, 4, 6, 7, 9, 10]
In [73]:
testSet.collect()
Out[73]:
[2, 3, 5, 8]

7.1.1.2 Alternating Least Squares (ALS)

  • Alternating Least Squares (ALS) algorithm in spark.mllib provides collaborative filtering

  • ALS.train(ratings, rank, iterations)

In [74]:
rt1 = Rating(1, 1, 0.5)
rt2 = Rating(1, 2, 0.75)
rt3 = Rating(2, 1, 0.7)
ratings = sc.parallelize([rt1, rt2, rt3])
ratings.collect()
Out[74]:
[Rating(user=1, product=1, rating=0.5),
 Rating(user=1, product=2, rating=0.75),
 Rating(user=2, product=1, rating=0.7)]
In [75]:
from pyspark.mllib.recommendation import ALS
model = ALS.train(ratings, rank=10, iterations=10)

7.1.1.3 predictAll() Returns an RDD of Rating Objects

  • The predictAll() method returns a list of predicted ratings for the input user and product pairs
  • The method takes in an RDD of (user, product) pairs without ratings and generates predicted ratings for them
In [76]:
# unrated RDD for rate prediction
unrated_RDD = sc.parallelize([(1, 1), (1, 2)])
In [77]:
predictions = model.predictAll(unrated_RDD)
predictions.collect()
Out[77]:
[Rating(user=1, product=1, rating=0.4996941937079406),
 Rating(user=1, product=2, rating=0.7309803017090071)]

7.1.1.4 Model Evaluation with MSE

In [78]:
# sort out training set
rt1 = Rating(1, 1, 0.5)
rt2 = Rating(1, 2, 0.75)
rt3 = Rating(2, 1, 0.7)
ratings = sc.parallelize([rt1, rt2, rt3])

rates = ratings.map(lambda x: ((x[0], x[1]), x[2]))
rates.collect()
Out[78]:
[((1, 1), 0.5), ((1, 2), 0.75), ((2, 1), 0.7)]
In [79]:
# sort out prediction
unrated_RDD = sc.parallelize([(1, 1), (1, 2)])
predictions = model.predictAll(unrated_RDD)
predictions.collect()
Out[79]:
[Rating(user=1, product=1, rating=0.4996941937079406),
 Rating(user=1, product=2, rating=0.7309803017090071)]
In [80]:
preds = predictions.map(lambda x: ((x[0], x[1]), x[2]))
preds.collect()
Out[80]:
[((1, 1), 0.4996941937079406), ((1, 2), 0.7309803017090071)]
In [81]:
rates_preds = rates.join(preds)
rates_preds.collect()
Out[81]:
[((1, 1), (0.5, 0.4996941937079406)), ((1, 2), (0.75, 0.7309803017090071))]
In [82]:
MSE = rates_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
MSE
Out[82]:
0.00018092122028433118

Realistic Example

In this exercise, the goal is to develop a simple movie recommendation system with PySpark MLlib, using a subset of the MovieLens 100k dataset.

We first load the MovieLens data (ratings.csv) into an RDD. Each line of the RDD is formatted as userId,movieId,rating,timestamp, so we map it to a Rating object (userID, productID, rating), dropping the timestamp column, and finally split the RDD into training and test RDDs.

In [83]:
# Load the data into RDD
data = spark.read.csv("./Data/ratings.csv", header=True, inferSchema=True)
data.take(10)
Out[83]:
[Row(userId=1, movieId=296, rating=5.0, timestamp=1147880044),
 Row(userId=1, movieId=306, rating=3.5, timestamp=1147868817),
 Row(userId=1, movieId=307, rating=5.0, timestamp=1147868828),
 Row(userId=1, movieId=665, rating=5.0, timestamp=1147878820),
 Row(userId=1, movieId=899, rating=3.5, timestamp=1147868510),
 Row(userId=1, movieId=1088, rating=4.0, timestamp=1147868495),
 Row(userId=1, movieId=1175, rating=3.5, timestamp=1147868826),
 Row(userId=1, movieId=1217, rating=3.5, timestamp=1147878326),
 Row(userId=1, movieId=1237, rating=5.0, timestamp=1147868839),
 Row(userId=1, movieId=1250, rating=4.0, timestamp=1147868414)]
In [84]:
# Transform the ratings RDD
ratings_final = sc.parallelize(list(map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])),data.collect())))
ratings_final.take(10)
Out[84]:
[Rating(user=1, product=296, rating=5.0),
 Rating(user=1, product=306, rating=3.5),
 Rating(user=1, product=307, rating=5.0),
 Rating(user=1, product=665, rating=5.0),
 Rating(user=1, product=899, rating=3.5),
 Rating(user=1, product=1088, rating=4.0),
 Rating(user=1, product=1175, rating=3.5),
 Rating(user=1, product=1217, rating=3.5),
 Rating(user=1, product=1237, rating=5.0),
 Rating(user=1, product=1250, rating=4.0)]
In [85]:
# Split the data into training and test
data_training, data_test = ratings_final.randomSplit([0.8, 0.2])
In [86]:
# Train ALS: rank = number of latent factors, iterations = number of ALS sweeps,
# nonnegative factor constraint, lambda_ = regularization parameter
model = ALS.train(data_training, rank=10, iterations=10, nonnegative=True, lambda_=0.02)
In [87]:
# Drop the ratings column from test set
data_test_without_rating = data_test.map(lambda p: (p[0], p[1]))
In [88]:
#data_test_without_rating =sc.parallelize(data_test_without_rating.collect())
In [89]:
# Predict the model  
predictions = model.predictAll(data_test_without_rating)
predictions.take(10)
Out[89]:
[Rating(user=3, product=6333, rating=3.1514603420775305),
 Rating(user=2, product=6287, rating=1.4676445654578654),
 Rating(user=3, product=589, rating=2.5211682372938933),
 Rating(user=2, product=4306, rating=1.9568594087501565),
 Rating(user=3, product=4874, rating=2.8363142389187805),
 Rating(user=3, product=4993, rating=3.1514603420775305),
 Rating(user=2, product=7090, rating=1.9568594087501565),
 Rating(user=2, product=1682, rating=1.9568594087501565),
 Rating(user=2, product=5349, rating=1.9568594087501565),
 Rating(user=3, product=4023, rating=1.8908761472590843)]

Let's calculate the MSE for the ALS predictions:

In [90]:
# Prepare ratings data
rates = ratings_final.map(lambda r: ((r[0], r[1]), r[2]))
rates.take(10)
Out[90]:
[((1, 296), 5.0),
 ((1, 306), 3.5),
 ((1, 307), 5.0),
 ((1, 665), 5.0),
 ((1, 899), 3.5),
 ((1, 1088), 4.0),
 ((1, 1175), 3.5),
 ((1, 1217), 3.5),
 ((1, 1237), 5.0),
 ((1, 1250), 4.0)]
In [91]:
# Prepare predictions data
preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))
In [92]:
# Join the ratings data with predictions data
rates_and_preds = rates.join(preds)
rates_and_preds.take(10)
Out[92]:
[((2, 480), (2.0, 0.9784297043750783)),
 ((2, 1682), (4.5, 1.9568594087501565)),
 ((2, 4306), (4.5, 1.9568594087501565)),
 ((2, 5816), (5.0, 1.7122520209427368)),
 ((2, 7090), (2.0, 1.9568594087501565)),
 ((2, 8360), (4.0, 4.098430905771567)),
 ((3, 589), (4.0, 2.5211682372938933)),
 ((3, 4023), (4.0, 1.8908761472590843)),
 ((3, 4993), (4.0, 3.1514603420775305)),
 ((3, 6333), (4.0, 3.1514603420775305))]
In [93]:
# Calculate and print MSE
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.2f}".format(MSE))
Mean Squared Error of the model for the test data = 3.54
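Since squared errors are hard to read on the 0.5-5 rating scale, the RMSE (the square root of the MSE) can be easier to interpret; a one-line follow-up:

import math

# RMSE is the square root of the MSE computed above
RMSE = math.sqrt(MSE)
print("Root Mean Squared Error of the model for the test data = {:.2f}".format(RMSE))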

Classification

PySpark MLlib contains two specific vector data types:

  1. Dense Vector: stores all entries in an array of floating point numbers
In [94]:
from pyspark.mllib.linalg import Vectors
denseVec = Vectors.dense([2.0, 4.0, 8.0])
print(denseVec)
[2.0,4.0,8.0]
  2. Sparse Vector: stores only the nonzero values and their indices
In [95]:
sparseVec = Vectors.sparse(3, {6: 1.0, 2: 4.5})
print(sparseVec)
(3,[2,6],[4.5,1.0])
  • LabeledPoint() in PySpark MLlib

A LabeledPoint is a wrapper around input features and a label (the value to predict). For binary classification with logistic regression, the label is either 0 (negative) or 1 (positive).

In [96]:
from pyspark.mllib.regression import LabeledPoint
negative = LabeledPoint(0.0, [3.0, 2.0, 5.0])
positive = LabeledPoint(1.0, [2.0, 3.0, 1.0])
print(positive)
print(negative)
(1.0,[2.0,3.0,1.0])
(0.0,[3.0,2.0,5.0])
  • Logistic Regression

Logistic regression in PySpark MLlib can be done with the LogisticRegressionWithLBFGS class.

In [97]:
data_lr = [
LabeledPoint(0.0, [2.0, 1.0]),
LabeledPoint(1.0, [1.0, 2.0]),
]
RDD_lr = sc.parallelize(data_lr)
RDD_lr.collect()
Out[97]:
[LabeledPoint(0.0, [2.0,1.0]), LabeledPoint(1.0, [1.0,2.0])]
In [98]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
lr = LogisticRegressionWithLBFGS.train(RDD_lr)
In [99]:
print(lr.predict([2.0, 1.0]))
0
In [100]:
print(lr.predict([1.0, 2.0]))
1

Realistic Example

Logistic Regression

Logistic regression is applied to predict churn. The churn_retain.csv data set can be downloaded from Kaggle; the target feature is Exited (0 or 1), where Exited=1 means churn and Exited=0 means retain. The goal is to train a logistic regression model for predicting churn/retain.

In [101]:
# Load the data into RDD
data = spark.read.csv("./Data/churn_retain.csv", header=True, inferSchema=True)
data.take(5)
Out[101]:
[Row(RowNumber=1, CustomerId=15634602, Surname='Hargrave', CreditScore=619, Geography='France', Gender='Female', Age=42, Tenure=2, Balance=0.0, NumOfProducts=1, HasCrCard=1, IsActiveMember=1, EstimatedSalary=101348.88, Exited=1),
 Row(RowNumber=2, CustomerId=15647311, Surname='Hill', CreditScore=608, Geography='Spain', Gender='Female', Age=41, Tenure=1, Balance=83807.86, NumOfProducts=1, HasCrCard=0, IsActiveMember=1, EstimatedSalary=112542.58, Exited=0),
 Row(RowNumber=3, CustomerId=15619304, Surname='Onio', CreditScore=502, Geography='France', Gender='Female', Age=42, Tenure=8, Balance=159660.8, NumOfProducts=3, HasCrCard=1, IsActiveMember=0, EstimatedSalary=113931.57, Exited=1),
 Row(RowNumber=4, CustomerId=15701354, Surname='Boni', CreditScore=699, Geography='France', Gender='Female', Age=39, Tenure=1, Balance=0.0, NumOfProducts=2, HasCrCard=0, IsActiveMember=0, EstimatedSalary=93826.63, Exited=0),
 Row(RowNumber=5, CustomerId=15737888, Surname='Mitchell', CreditScore=850, Geography='Spain', Gender='Female', Age=43, Tenure=2, Balance=125510.82, NumOfProducts=1, HasCrCard=1, IsActiveMember=1, EstimatedSalary=79084.1, Exited=0)]
In [102]:
# Split the data into training and test
data_training, data_test = data.randomSplit([0.8, 0.2])
In [103]:
# Filter data based on retain and churn
data_0=data_training.filter(data_training.Exited ==0)
data_1=data_training.filter(data_training.Exited ==1)
In [104]:
data_0.collect()[0][9]
Out[104]:
1
In [105]:
# prepare training set
retain= list(map(lambda l: LabeledPoint(0.0,[int(l.CreditScore), float(l.Age), float(l.Tenure), float(l.Balance)
                                            , int(l.NumOfProducts), int(l.HasCrCard), int(l.IsActiveMember),
                                            int(l.EstimatedSalary)]),data_0.collect()))
churn= list(map(lambda l: LabeledPoint(1.0,[int(l.CreditScore), float(l.Age), float(l.Tenure), float(l.Balance)
                                            , int(l.NumOfProducts), int(l.HasCrCard), int(l.IsActiveMember),
                                            int(l.EstimatedSalary)]),data_1.collect()))

data_to_train = retain+churn
RDD_to_train = sc.parallelize(data_to_train)
In [106]:
# Train lr model
lr = LogisticRegressionWithLBFGS.train(RDD_to_train, iterations=10)
lr
Out[106]:
pyspark.mllib.LogisticRegressionModel: intercept = 0.0, numFeatures = 8, numClasses = 2, threshold = 0.5
In [107]:
data_test_to_predict=sc.parallelize(list(map(lambda l: [int(l.CreditScore), float(l.Age), float(l.Tenure), float(l.Balance)
                                            , int(l.NumOfProducts), int(l.HasCrCard), int(l.IsActiveMember),
                                            int(l.EstimatedSalary)],data_test.collect())))
In [108]:
# Predict test set using trained model  
predictions = lr.predict(data_test_to_predict)
y_test_pred=predictions.collect()
In [109]:
y_test_label=list(map(lambda l: int(l.Exited),data_test.collect()))
In [110]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import recall_score
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score, recall_score
In [111]:
conf_mx=confusion_matrix(y_test_label,y_test_pred)
print('Confusion Matrix (Test):\n',conf_mx)
print('------------------------------')
acr=accuracy_score(y_test_label,y_test_pred)
print('\n Accuracy (Test):\n',acr)
print('------------------------------')
prec=precision_score(y_test_label,y_test_pred) # == TP/(TP+FP) 
print('\n Precision (Test):\n',prec)
print('------------------------------')
reca=recall_score(y_test_label,y_test_pred) # == TP/(TP+FN) ) 
print('\n Recall (Test):\n',reca)
#print('------------------------------')
#fpr, tpr, thresold = roc_curve(y_test_label,y_test_pred)
#roc_auc = auc(fpr, tpr)
#print('\n AUC (Test):\n',roc_auc)
Confusion Matrix (Test):
 [[1530   58]
 [ 348   73]]
------------------------------

 Accuracy (Test):
 0.7979094076655052
------------------------------

 Precision (Test):
 0.5572519083969466
------------------------------

 Recall (Test):
 0.17339667458432304

Random Forest

In [112]:
from pyspark.mllib.tree import RandomForest

# Train a random forest classifier: numClasses=2, no categorical features ({}), numTrees=3
rf = RandomForest.trainClassifier(RDD_to_train, 2, {}, 3, seed=42)
rf
Out[112]:
TreeEnsembleModel classifier with 3 trees
In [113]:
# Predict test set using trained model  
predictions = rf.predict(data_test_to_predict)
y_test_pred=predictions.collect()
In [114]:
y_test_label=list(map(lambda l: int(l.Exited),data_test.collect()))
In [115]:
conf_mx=confusion_matrix(y_test_label,y_test_pred)
print('Confusion Matrix (Test):\n',conf_mx)
print('------------------------------')
acr=accuracy_score(y_test_label,y_test_pred)
print('\n Accuracy (Test):\n',acr)
print('------------------------------')
prec=precision_score(y_test_label,y_test_pred) # == TP/(TP+FP) 
print('\n Precision (Test):\n',prec)
print('------------------------------')
reca=recall_score(y_test_label,y_test_pred) # == TP/(TP+FN) ) 
print('\n Recall (Test):\n',reca)
#print('------------------------------')
#fpr, tpr, thresold = roc_curve(y_test_label,y_test_pred)
#roc_auc = auc(fpr, tpr)
#print('\n AUC (Test):\n',roc_auc)
Confusion Matrix (Test):
 [[1581    7]
 [ 365   56]]
------------------------------

 Accuracy (Test):
 0.8148332503733201
------------------------------

 Precision (Test):
 0.8888888888888888
------------------------------

 Recall (Test):
 0.1330166270783848