PySpark DataFrames: a Comprehensive Guide

Practical recipes for data analysis using Apache Spark and Python

Reza Bagheri
AI Advances


PySpark is an interface for Apache Spark in Python. It combines the power of Spark and Python to build big data applications. Apache Spark is an open-source distributed data processing engine for big data analysis. Spark comes with built-in libraries for data processing, streaming, and machine learning that make it possible to analyze big data. PySpark supports most of Spark’s features, such as Spark SQL, DataFrames, and MLlib (Machine Learning). In this article, you will learn how to use PySpark DataFrames to solve common data analysis problems.

Table of contents

· Code snippets
· Imported libraries
· 1. Introduction
· 2. DataFrames
2.1. Row objects
2.2. Schema
2.3. Creating DataFrames
2.4. Empty DataFrames
2.5. Null values
2.6. Displaying DataFrames
· 3. Importing a DataFrame
· 4. Exporting a DataFrame
4.1. write()
· 5. Showing the schema
· 6. Shape of a DataFrame
· 7. Displaying the rows of a DataFrame
· 8. Calculating the statistics
· 9. Columns
9.1 Selecting columns
9.2 Column expressions
9.3. between()
9.4. concat()
9.5. Column aliases: alias() and name()
9.6. expr()
9.7. String manipulation: upper() and lower()
9.8. Math functions: log() and round()
9.9. Creating new columns
9.10. Renaming columns
9.11. Changing the data type of columns
9.12. Dropping columns
9.13. Dropping duplicates
9.14. Splitting a column
· 10. Rows
10.1. Filtering rows
10.2. when() and otherwise()
10.3. Methods for missing values
· 11. User Defined Functions (UDF)
11.1. UDFs with non-column parameters
11.2. UDFs and broadcasting
11.3. Null values in UDFs
· 12. Timestamp
12.1. date_trunc()
12.2. Extracting timestamp parts
12.3. Changing the time zone
12.4. Creating a date range
· 13. Partitions
13.1. getNumPartitions()
13.2. repartition()
· 14. Adding an index
14.1. monotonically_increasing_id()
14.2. coalesce()
14.3. zipWithIndex()
· 15. Sorting
15.1. sort()
15.2 desc()
· 16. Aggregation
16.1 groupBy()
16.2 agg()
· 17. Pivoting
17.1. pivot()
17.2. Unpivoting
· 18. Window functions
18.1. orderBy()
18.2. partitionBy()
18.3. rowsBetween()
18.4. rangeBetween()
18.5. Shifting a column
18.6. Filling a column
· 19. Joins
19.1. Inner join
19.2. Left and right joins
19.3 Full join
19.4. Left semi join
19.5. Left anti join
19.6. Cross join
· 20. Concatenating DataFrames
· 21. Resampling
21.1. Downsampling
21.2. Upsampling
· Source code

Code snippets

The code snippets in this article have been tested on a Databricks cluster (Runtime 11.3 LTS) with Spark 3.3.0 and Scala 2.12. You can find a link to the source code of all the code snippets at the end of this article.

Imported libraries

We will use some of the PySpark libraries in this article. Here we import all the libraries that we need:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.types import IntegerType, FloatType, DoubleType, BooleanType
import pyspark.sql.types as T
from pyspark.sql import functions as F
from pyspark.sql.functions import col, column
from pyspark.sql.window import Window

1. Introduction

This article focuses on PySpark DataFrames. A DataFrame generally refers to a tabular data structure. PySpark DataFrames are distributed in-memory tables with named columns and schemas, where each column has a specific data type. A PySpark DataFrame is similar to a Pandas DataFrame; however, there are some important differences between them:

  • PySpark DataFrames are immutable: After creating a PySpark DataFrame, you can’t change it. PySpark operations do not modify the original DataFrame; instead, they return the transformed result as a new DataFrame. In fact, PySpark methods don’t change their parameters or the object on which they are invoked.
  • PySpark DataFrames are distributed: Pandas runs on a single machine, but PySpark code can be executed in a distributed way across a cluster, so PySpark DataFrames are distributed in nature.
  • PySpark is lazy: Lazy evaluation is an evaluation strategy in which the evaluation of an expression is delayed until its value is needed. PySpark has two types of operations: transformations and actions. Transformations are operations that transform a PySpark DataFrame into a new DataFrame (they don’t change the original DataFrame, since DataFrames are immutable; they return a new, transformed DataFrame). Actions are all other operations, which do not return a DataFrame. For example, they can display a DataFrame on the screen, write it to storage, or trigger a computation and return the result (like counting the number of rows in a DataFrame). In PySpark, transformations are computed lazily, so their execution is delayed until an action is invoked.

2. DataFrames

2.1. Row objects

In PySpark, a DataFrame is a distributed collection of rows under named columns, so we can think of a DataFrame as a sequence of row objects. Each row of a DataFrame is a generic Row object. The class Row can be used to create a row object by using named arguments. Once we create a row object, we can access its elements using their name or index.

from pyspark.sql import Row
row = Row(name="John", role="Data scientist", salary=4500)
print("Name: ", row.name)
print("Role: ", row['role'])
print("Salary: ", row[2])
Name:  John
Role:  Data scientist
Salary:  4500

We cannot omit a named argument to indicate that a value is None or missing. Instead, the field should be explicitly set to None. Here is an example:

row = Row(name="John", role="Data scientist", salary=None)

We can also create a row object with unnamed fields:

row = Row("John", "Data scientist", 4500)
row[2]
4500

In this case, we can only use the index to access the fields. We can also create a row object using a custom class. Here, we first create a custom class from the Row class by passing the field names, and then initialize it with the values of the fields.

Employee = Row("name", "role", "salary")
employee1 = Employee("John", "Data scientist", 4500)
print("Name: {}, Salary: {}".format(employee1.name, employee1.salary))
Name: John, Salary: 4500

2.2. Schema

A DataFrame also has a schema. The schema defines the name of the columns and their data types. Each DataFrame should have a schema, but when creating a DataFrame, you don’t need to define its schema. PySpark can infer the schema based on the available data in each column and assign default names to each column. However, it is a good practice to define the schema up front since inferring the schema may be computationally expensive for large data files, and by having a schema, PySpark can detect any data that doesn’t match it.

The schema for a DataFrame can be defined in two ways: using a StructType or using a DDL string. StructType is a built-in data type which is a collection of StructFields. Each StructField is used to define the schema for one column of the DataFrame. These types are imported from the package pyspark.sql.types. For example, we can define a schema for a DataFrame with 3 columns named name, role, and salary, where the data types of these columns are String, String, and Integer respectively:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
someSchema = StructType([
StructField("name", StringType(), True),
StructField("role", StringType(), True),
StructField("salary", IntegerType(), True)]
)

Each StructField() takes the name of a column and its data type. The third parameter of StructField() is a Boolean that indicates whether the column is nullable. If it is set to True, the column can have null values. We can also use a Data Definition Language (DDL) string to create the same schema in a much easier way:

import pyspark.sql.types as T
ddl_schema = "`name` STRING, `role` STRING, `salary` INTEGER"
schema = T._parse_datatype_string(ddl_schema)

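Here we use the function _parse_datatype_string() to create the schema from the DDL string. Note that the leading underscore marks it as a private helper of pyspark.sql.types, so it may change between Spark versions.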

2.3. Creating DataFrames

PySpark has some special methods to create a DataFrame. First, we use the method createDataFrame(). We pass the data (for example, a list) and, optionally, a schema to this method. The simplest way to create a DataFrame using this method is to pass a list of tuples to it as data:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data)
df.show()
+-----+--------------+----+
| _1| _2| _3|
+-----+--------------+----+
| John|Data scientist|4500|
|James| Data engineer|3200|
|Laura|Data scientist|4100|
| Ali| Data engineer|3200|
|Steve| Developer|3600|
+-----+--------------+----+

The method show() is used to display the DataFrame and will be explained in more detail later. Here we didn’t provide the column names, so PySpark automatically gives a default name (_1, _2, _3) to each column. We could also provide the column names of a DataFrame using the toDF() method. It returns a new DataFrame with the specified column names:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data).toDF("name","role", "salary")
df.show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
|James| Data engineer| 3200|
|Laura|Data scientist| 4100|
| Ali| Data engineer| 3200|
|Steve| Developer| 3600|
+-----+--------------+------+

Please note that toDF() cannot accept a list of columns. So, if we have a list of column names, we have to unpack them when passing them to toDF().

cols = ["name","role", "salary"]
df = spark.createDataFrame(data).toDF(*cols)

We can also pass the schema to createDataFrame(). Here, we use a StructType() to indicate the name and type of each column.

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]

schema = StructType([
StructField("name", StringType(), True),
StructField("role", StringType(), True),
StructField("salary", IntegerType(), True)]
)

df = spark.createDataFrame(data=data,schema=schema)
df.show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
|James| Data engineer| 3200|
|Laura|Data scientist| 4100|
| Ali| Data engineer| 3200|
|Steve| Developer| 3600|
+-----+--------------+------+

We could also use a DDL string to define the schema of the previous DataFrame:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]

ddl_schema = "`name` STRING, `role` STRING, `salary` INTEGER"

df = spark.createDataFrame(
data,
T._parse_datatype_string(ddl_schema)
)

This DataFrame could also be created using the Row objects:

data =[ 
Row(name="John", role="Data scientist", salary=4500),
Row(name="James", role="Data engineer", salary=3200),
Row(name="Laura", role="Data scientist", salary=4100),
Row(name="Ali", role="Data engineer", salary=3200),
Row(name="Steve", role="Developer", salary=3600)
]

df = spark.createDataFrame(data)

We can use Row objects with unnamed fields, but the column names should be provided using toDF().

data =[ 
Row("John", "Data scientist", 4500),
Row("James", "Data engineer", 3200),
Row("Laura", "Data scientist", 4100),
Row("Ali", "Data engineer", 3200),
Row("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data).toDF("name", "role", "salary")

Or we can first create a custom class from Row and indicate the column names.

Employee = Row("name", "role", "salary")

data = [
Employee("John", "Data scientist", 4500),
Employee("James", "Data engineer", 3200),
Employee("Laura", "Data scientist", 4100),
Employee("Ali", "Data engineer", 3200),
Employee("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data)

Finally, we can use an RDD (Resilient Distributed Dataset) to create a DataFrame. An RDD is a basic abstraction in Spark that represents an immutable, partitioned collection of elements that can be operated on in parallel. We use the method parallelize() in sparkContext to create an RDD from a list of tuples and then use toDF() to convert it to a DataFrame:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(["name","role", "salary"])

Please note that unlike the toDF() of DataFrames, the toDF() method of an RDD can accept a list of column names.

The method createDataFrame() requires a list of Rows, tuples, lists, or dictionaries, so we cannot directly create a single-column DataFrame from a list of scalars without specifying a schema. Hence, the following code throws an error.

# This doesn't run
data = [1,2,3,4,5]
df1 = spark.createDataFrame(data).toDF("value")
TypeError: Can not infer schema for type: <class 'int'>

However, we can first convert each scalar into a tuple:

data = [1,2,3,4,5]
df1 = spark.createDataFrame([(i,) for i in data]).toDF("value")
df1.show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----+

The other option is to provide the data type of the scalars in the schema:

data = [1,2,3,4,5]
df1 = spark.createDataFrame(data, T.IntegerType())

This option doesn’t work if the list has mixed data types. So, the following code snippet throws an error:

# This doesn't run
data = [1, 2, 3, 4.0, 5]
df1 = spark.createDataFrame(data, T.IntegerType())
TypeError: field value: IntegerType() can not accept object 4.0 in type <class 'float'>

Here, we need to first convert all the integers in data into floats and then pass it to createDataFrame():

data = [1, 2, 3, 4.0, 5]
df1 = spark.createDataFrame(map(float, data), T.FloatType())
df1.show()
+-----+
|value|
+-----+
| 1.0|
| 2.0|
| 3.0|
| 4.0|
| 5.0|
+-----+

2.4. Empty DataFrames

We can create an empty DataFrame using the following code snippet:

df1 = spark.createDataFrame([], T.StructType([]))
df1.show()
++
||
++
++

We can also create an empty DataFrame using a known schema:

ddl_schema = "`col1` INTEGER, `col2` INTEGER"
schema = T._parse_datatype_string(ddl_schema)
df1 = spark.createDataFrame([], schema)
df1.show()
+----+----+
|col1|col2|
+----+----+
+----+----+

2.5. Null values

It is common to have missing or null values in a DataFrame. PySpark can handle the null values, so we can use null values when creating a DataFrame. In PySpark, null values are usually represented by None when we create a DataFrame. However, in a PySpark DataFrame None values are shown as null.

data = [
("John", None, None),
("James", "Data engineer", 3200),
("Laura", None, 4100),
(None, "Data engineer", 3200),
("Steve", "Developer", 3600)
]

df1 = spark.createDataFrame(data).toDF("name", "role", "salary")
df1.show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
| John| null| null|
|James|Data engineer| 3200|
|Laura| null| 4100|
| null|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+

Please note that if all the entries in a column are None, then we need to define the schema of the DataFrame. If we don’t define the schema, PySpark cannot infer the column type and throws an error. Here is an example in which the schema is defined explicitly:

data =[ 
("John", "Data scientist", None),
("James", "Data engineer", None),
("Laura", "Data scientist", None),
("Ali", "Data engineer", None),
("Steve", "Developer", None)
]

ddl_schema = "`name` STRING, `role` STRING, `salary` INTEGER"
df = spark.createDataFrame(
data,
T._parse_datatype_string(ddl_schema)
)
df.show()

2.6. Displaying DataFrames

It is important to note that in PySpark, unlike the Pandas library in Python, you cannot display the contents of a DataFrame by simply typing its name. To display a DataFrame we can use the method show(). This method was already used in the previous code snippets. The syntax of show() is as follows:

DataFrame.show(n:int=20, truncate:Union[bool, int]=True, vertical:bool=False) → None

The parameter n controls how many rows should be displayed, and by default, it displays the top 20 rows of a DataFrame in a tabular form. For example, to only show the first 3 rows of df we can write:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data).toDF("name","role", "salary")
df.show(3)
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
|James| Data engineer| 3200|
|Laura|Data scientist| 4100|
+-----+--------------+------+
only showing top 3 rows

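By default, string values longer than 20 characters are truncated. To prevent the truncation, we can set truncate to False, and if truncate is set to an integer greater than one, strings are truncated to that length instead. If vertical is set to True, the output will be displayed vertically (one line per column value). The default value of vertical is False.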

df.show(n=3, vertical=True)
-RECORD 0----------------
name | John
role | Data scientist
salary | 4500
-RECORD 1----------------
name | James
role | Data engineer
salary | 3200
-RECORD 2----------------
name | Laura
role | Data scientist
salary | 4100
only showing top 3 rows

In Databricks, we can also use the function display() to visualize a DataFrame, so to show a DataFrame in a more user-friendly format, we can write:

display(df)

3. Importing a DataFrame

The property read in SparkSession can be used to load a DataFrame from a data source. It returns a DataFrameReader object that can be used to read data in as a DataFrame. The class DataFrameReader has some public methods for loading the data:

  • format(): This method controls the format of the external resource and is declared as:
DataFrameReader.format(source: str) → DataFrameReader

The parameter source can be "parquet", "csv", "text", "json", etc. If you don’t call this method, the default source is "parquet" or whatever is set in spark.sql.sources.default.

  • option(): Adds an input option for the underlying data source, and is declared as:
DataFrameReader.option(key: str, value: OptionalPrimitiveType) → DataFrameReader

It takes a key and a value. key is the name of the option, and value can be a Boolean, number, or string. The keys that can be used in option() are listed in the Spark documentation for each data source.

  • load(): It takes the path to the data source, loads the data, and returns it as a DataFrame. The syntax is as follows:
DataFrameReader.load(path: str or list, ...) → DataFrame

The most important parameter of load() is path that provides the path to the data source. For example to read a CSV file we can write:

path = "a path"
df = spark.read.format("csv").option("inferSchema", "true") \
.option("header", "true").load(path)

Here we placed the file path in path. The read property is used to get a DataFrameReader instance. Then we specify the type of the data source (a CSV file) in format(). When inferSchema is set to true, Spark infers the input schema automatically from the data. When header is set to true, the first line of the file is used to name the columns and is not included in the data. Finally, we load the data using the file path. Since option() and format() both return a DataFrameReader, their order is not important; however, load() should always come last since it returns a DataFrame. The keys that option() takes can be specific to a file type (like CSV).

We can also load a parquet file as another example:

path = "a path"
df = spark.read.load(path)

Here we don’t need to write format() since its default source is parquet. The class DataFrameReader has some other methods for loading a specific type of file that can be used instead of load(). For example, to load the previous parquet file we can just use the parquet() method instead of load():

path = "a path"
df = spark.read.parquet(path)

The method parquet() takes the file path and returns a DataFrame. We can use similar methods, like csv() and json(), to load other types of files.

4. Exporting a DataFrame

4.1. write()

The property write in class DataFrame acts as an interface for saving the content of the non-streaming DataFrame out into external storage. It returns a DataFrameWriter object. The class DataFrameWriter has some methods that are used to write a DataFrame to external storage systems:

mode(): This method specifies the behavior of a DataFrameWriter object when data already exists at the target location. Here is the syntax of this method:

mode(saveMode: Optional[str]) → DataFrameWriter

Here are the possible values that saveMode can take and their effect:

  • overwrite: Overwrite the existing data.
  • append: Append the contents of this DataFrame to the existing data.
  • ignore: Silently ignore this operation if data already exists.
  • error, errorIfExists: Throw an exception at runtime if data already exists (this is the default option).

format(): It is another method of DataFrameWriter that specifies the format in which the DataFrame will be saved. The built-in options are parquet, JSON, CSV, etc. The syntax is as follows:

format(source: str) → DataFrameWriter

Here source determines the format in which the DataFrame will be saved. If format is not specified, the default data source configured by spark.sql.sources.default will be used.

option(): This method adds an output option for the underlying data source and its syntax is declared as:

option(key: str, value: OptionalPrimitiveType) → DataFrameWriter

It takes a key and a value. The keys that can be used in option() are listed in the Spark documentation for each data source. For example, for a CSV file, option("header", "true") writes the names of columns as the first line.

save(): This method saves the contents of the DataFrame at the specified path.

For example, to save df in CSV format, we can write:

path = "a path"
df.write.mode("overwrite").option("header", "true").format("csv").save(path)

We could also use the method csv(), which saves the DataFrame in CSV format at the specified path:

df.write.mode("overwrite").option("header", "true").csv(path)

There are other methods, like text(), parquet(), etc., that can save a DataFrame in other formats.

5. Showing the schema

The method printSchema() prints the schema to the console in a nice tree format:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data).toDF("name","role", "salary")
df.printSchema()
root
|-- name: string (nullable = true)
|-- role: string (nullable = true)
|-- salary: long (nullable = true)

We can also get the schema of a DataFrame as a StructType using the schema property:

df.schema
StructType([StructField('name', StringType(), True), StructField('role', StringType(), True), StructField('salary', LongType(), True)])

6. Shape of a DataFrame

The method count() returns the number of rows in a DataFrame.

df.count()
5

The property columns returns all column names as a list.

df.columns
['name', 'role', 'salary']

So to get the number of columns in a DataFrame we can write:

len(df.columns)
3

We can easily detect an empty DataFrame using the count() method.

df1 = spark.createDataFrame([], T.StructType([]))
df1.count() == 0
True

We can also use the method isEmpty() to detect an empty DataFrame:

df1.isEmpty()
True

7. Displaying the rows of a DataFrame

We can use the method head() to get the first rows of a DataFrame. The syntax of the method head() is as follows:

DataFrame.head(n: Optional[int]=None) →  Union[Row, None, List[Row]]

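If n is given, it returns the first n rows of the DataFrame as a list of Row objects. If n is omitted, it returns only the first row as a single Row object (or None if the DataFrame is empty). For example, to show the first 3 rows of df we can write: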

df.head(3)
[Row(name='John', role='Data scientist', salary=4500),
Row(name='James', role='Data engineer', salary=3200),
Row(name='Laura', role='Data scientist', salary=4100)]

We can also use the method take() to do the same task:

df.take(3)

To get only the first row, we can also use the method first():

df.first()
Row(name='John', role='Data scientist', salary=4500)

The method tail() returns the last n rows of a DataFrame as a list. The syntax is as follows:

DataFrame.tail(n: int) → List[Row]

All these methods (head(), first(), take(), tail()) return a row or a list of rows. If you need to return a DataFrame, you can use limit(). The syntax for this method is:

DataFrame.limit(n: int) → DataFrame

It returns a new DataFrame by taking the first n rows.

df.limit(3).show() 
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
|James| Data engineer| 3200|
|Laura|Data scientist| 4100|
+-----+--------------+------+

8. Calculating the statistics

The method describe() computes basic statistics for numeric and string columns, including count, mean, standard deviation, min, and max. It can be used for exploratory data analysis. This method takes the names of the columns for which we want to compute the statistics; if we don’t pass any arguments, it computes the statistics for all the numeric and string columns of the DataFrame.

df.describe().show()
+-------+-----+-------------+-----------------+
|summary| name| role| salary|
+-------+-----+-------------+-----------------+
| count| 5| 5| 5|
| mean| null| null| 3720.0|
| stddev| null| null|571.8391382198319|
| min| Ali|Data engineer| 3200|
| max|Steve| Developer| 4500|
+-------+-----+-------------+-----------------+

9. Columns

In PySpark a Column object represents a column of a DataFrame. The module pyspark.sql.functions provides several built-in standard functions to work with the columns. All these functions return a Column object. You can use the functions col() and column() to create a free column which is not associated with a DataFrame yet:

from pyspark.sql.functions import col, column
col1 = col("c1")
col2 = column("c2")
col1
Column<'c1'>

As mentioned before, the property columns returns all the column names as a list:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data).toDF("name","role", "salary")
df.columns
['name', 'role', 'salary']

In a DataFrame, you can use the [] or . operators to get the Column object of a specific column. For example, to get the Column object for the column name in df we can write:

df["name"]
Column<'name'>

We could also use df.name to get the same result.

9.1 Selecting columns

The method select() can be used to select a set of columns in a DataFrame. The syntax is:

DataFrame.select(*cols: ColumnOrName) → DataFrame

This method can take column names or Column objects, or a list of column names and objects. It returns a DataFrame that contains those columns or column objects. For example, the following code selects the column name in df:

df.select("name").show()
+-----+
| name|
+-----+
| John|
|James|
|Laura|
| Ali|
|Steve|
+-----+

We could also use a Column object to get the same result:

df.select(col("name")).show()
df.select(df["name"]).show()

The following code snippet selects the columns name and role in df:

df.select("name", "role").show()
+-----+--------------+
| name| role|
+-----+--------------+
| John|Data scientist|
|James| Data engineer|
|Laura|Data scientist|
| Ali| Data engineer|
|Steve| Developer|
+-----+--------------+

Here, we could also use a list to get the same result:

df.select(["name", "role"]).show()

We can select all the columns of a DataFrame by passing * to select():

df.select(col("*")).show()

We can also select a slice of the columns of a DataFrame. For example, we can use the following code to select the first two columns of df:

df.select(df.columns[:2]).show()
+-----+--------------+
| name| role|
+-----+--------------+
| John|Data scientist|
|James| Data engineer|
|Laura|Data scientist|
| Ali| Data engineer|
|Steve| Developer|
+-----+--------------+

9.2 Column expressions

You can use the overloaded operators of the Column class to create expressions with columns:

df.select(col("salary") * 1.2 + 100).show()
+----------------------+
|((salary * 1.2) + 100)|
+----------------------+
| 5500.0|
| 3940.0|
| 5020.0|
| 3940.0|
| 4420.0|
+----------------------+

Here the expression col("salary") * 1.2 + 100 creates a new Column object called ((salary * 1.2) + 100). We could also create it in this way:

df.select(df["salary"] * 1.2 + 100).show()

In the next example, we select a column and a column expression together:

df.select(["name", df["salary"] * 1.2 + 100]).show()
+-----+----------------------+
| name|((salary * 1.2) + 100)|
+-----+----------------------+
| John| 5500.0|
|James| 3940.0|
|Laura| 5020.0|
| Ali| 3940.0|
|Steve| 4420.0|
+-----+----------------------+

We can create Boolean columns using a column expression:

df.select(df["salary"] > 4000).show()
+---------------+
|(salary > 4000)|
+---------------+
| true|
| false|
| true|
| false|
| false|
+---------------+

The column expressions also show the importance of the function col(). You can use col() to define a column expression even before creating the corresponding DataFrame. For example, we can use the following code snippet:

col_expr = col("salary") * 1.2
data =[
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]

df1 = spark.createDataFrame(data).toDF("name","role", "salary")
df1.select(col_expr).show()

However, running the following code throws an error since we access df1 before defining it.

# This won't run!

col_expr = df1["salary"] * 1.2
data =[
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]

df1 = spark.createDataFrame(data).toDF("name","role", "salary")
df1.select(col_expr).show()
NameError: name 'df1' is not defined

9.3. between()

The syntax of this method is:

Column.between(lowerBound, upperBound) → Column

It returns a Boolean column. If an element in the current column (on which this method was invoked) is between the lower bound and upper bound (inclusive), its corresponding element in the returned column is True. Otherwise, it is False.

df.select(col("salary").between(3200,3600)).show()
+---------------------------------------+
|((salary >= 3200) AND (salary <= 3600))|
+---------------------------------------+
| false|
| true|
| false|
| true|
| true|
+---------------------------------------+

9.4. concat()

We can use the function concat() in functions to combine String columns:

import pyspark.sql.functions as F
df.select(F.concat(df["name"], F.lit("-"), df["role"])).show()
+---------------------+
|concat(name, -, role)|
+---------------------+
| John-Data scientist|
| James-Data engineer|
| Laura-Data scientist|
| Ali-Data engineer|
| Steve-Developer|
+---------------------+

Here the function lit() in functions was used to create a Column of literal values.

9.5. Column aliases: alias() and name()

As mentioned before, the expression used to create a new column determines its name. However, you are free to set a better name for it. The method alias() in class Column gives a name (alias) to a column. In fact, when you invoke it on a Column object, it returns the same column with a new name (alias). The syntax is:

Column.alias(*alias: str, **kwargs: Any) → Column

Here the parameter alias will be the alias of the returned Column object. For example, we can give an alias to col_expr:

col_expr = (col("salary") * 1.2 + 100).alias("bonus")
df.select(col_expr).show()
+------+
| bonus|
+------+
|5500.0|
|3940.0|
|5020.0|
|3940.0|
|4420.0|
+------+

The class Column has another method called name(), which is an alias for alias() and does exactly the same thing:

df.select((col("salary") * 1.2 + 100).name("bonus")).show()
+------+
| bonus|
+------+
|5500.0|
|3940.0|
|5020.0|
|3940.0|
|4420.0|
+------+

9.6. expr()

The function expr() in functions parses an SQL-like expression string into the Column that it represents. The syntax is as follows:

expr(str: str) → Column

The parameter str is an SQL-like expression. This function parses it and returns the result as a Column object, which is evaluated when it is used in a DataFrame operation. So we can rewrite all the previous column expressions using it. For example:

df.select(F.expr("salary > 4000")).show()

Please note that we can directly use the name of the columns inside an expression.

9.7. String manipulation: upper() and lower()

The function lower() in functions converts the strings in a column to lower case and returns the result as a new column. Here is an example:

df.select(F.lower(df['name']).alias('name')).show()
+-----+
| name|
+-----+
| john|
|james|
|laura|
|  ali|
|steve|
+-----+

Similarly, the function upper() converts the strings in a column to upper case and returns the result as a new column.

9.8. Math functions: log() and round()

The module pyspark.sql.functions has lots of built-in mathematical functions. Here we only discuss a few examples. There are some other functions like exp(), sin(), cos(), …, and you can see the full list of the math functions here.

The function log() calculates the natural logarithm of the given column and returns the result as a new Column object. The syntax of the function round() is as follows:

round(col: ColumnOrName, scale: int = 0) → Column

It rounds the values in the column col to scale decimal places using the HALF_UP rounding mode if scale is greater than or equal to 0. In HALF_UP mode, a number is rounded to its nearest neighbor; if both neighbors are equidistant, it is rounded away from zero (so 2.5 becomes 3). If scale is negative, col is rounded at the corresponding position of the integral part: with scale equal to -n, this is like dividing col by 10^n, rounding the result to zero decimal places with HALF_UP, and multiplying it by 10^n again. Here is an example of using log() and round() together:

from pyspark.sql.types import FloatType
data = [1.0,2.0,3.0,4.0,5.0]
df1 = spark.createDataFrame(data, FloatType()).toDF("number")
df1.select(F.round(F.log("number"), 3)).show()
+--------------------+
|round(ln(number), 3)|
+--------------------+
| 0.0|
| 0.693|
| 1.099|
| 1.386|
| 1.609|
+--------------------+

And here is an example of using round() with a negative scale:

data = [0.6892, 206.892, 1268.0]
df1 = spark.createDataFrame([(i,) for i in data]).toDF("number")
df1.select(F.round(col("number"), -2)).show()

+-----------------+
|round(number, -2)|
+-----------------+
| 0.0|
| 200.0|
| 1300.0|
+-----------------+
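
The HALF_UP rule and the effect of a negative scale can also be reproduced outside Spark. The following is only a sketch that uses Python's decimal module; round_half_up() is a hypothetical helper written for this illustration, not a PySpark function.

```python
from decimal import Decimal, ROUND_HALF_UP

def round_half_up(x, scale=0):
    """Round x to `scale` decimal places with HALF_UP (plain-Python model)."""
    # scale=3 -> quantum 0.001, scale=0 -> quantum 1, scale=-2 -> quantum 1E+2
    quantum = Decimal(1).scaleb(-scale)
    return float(Decimal(str(x)).quantize(quantum, rounding=ROUND_HALF_UP))

# Same values and scale as the Spark example above
rounded = [round_half_up(v, -2) for v in [0.6892, 206.892, 1268.0]]
print(rounded)  # [0.0, 200.0, 1300.0]

# A tie is rounded away from zero, unlike Python's built-in round()
print(round_half_up(2.5), round(2.5))  # 3.0 2
```

Note that Python's built-in round() uses a different tie-breaking rule (round half to even), so it cannot be used to predict the output of F.round() for values that end in exactly .5.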

9.9. Creating new columns

The method withColumn() can be used to create a new column in a DataFrame. The syntax is as follows:

withColumn(colName: str, col: Column) → DataFrame

It returns a new DataFrame by adding a column or replacing the existing one with the same name. It adds (or replaces) the Column object col by the name colName to the columns of the DataFrame on which it is invoked and returns the result as a new DataFrame. You can also use a Column expression for col. As an example, we use it to add a new constant column vacation to df and create a new DataFrame called df1:

df1 = df.withColumn("vacation", F.lit(15))
df1.show()
+-----+--------------+------+--------+
| name| role|salary|vacation|
+-----+--------------+------+--------+
| John|Data scientist| 4500| 15|
|James| Data engineer| 3200| 15|
|Laura|Data scientist| 4100| 15|
| Ali| Data engineer| 3200| 15|
|Steve| Developer| 3600| 15|
+-----+--------------+------+--------+

Or we can use it to change the existing column salary in df using a Column expression:

df1 = df.withColumn("salary", col("salary") + 300)
df1.show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4800|
|James| Data engineer| 3500|
|Laura|Data scientist| 4400|
| Ali| Data engineer| 3500|
|Steve| Developer| 3900|
+-----+--------------+------+

We can also use select() to add a new column while keeping the original columns of the DataFrame. For example, the following code adds the column bonus using select():

df1 = df.select(col("*"), (col("salary") * 1.2 + 100).name("bonus"))
df1.show()
+-----+--------------+------+------+
| name| role|salary| bonus|
+-----+--------------+------+------+
| John|Data scientist| 4500|5500.0|
|James| Data engineer| 3200|3940.0|
|Laura|Data scientist| 4100|5020.0|
| Ali| Data engineer| 3200|3940.0|
|Steve| Developer| 3600|4420.0|
+-----+--------------+------+------+

We can add more than one column by using a chain of withColumn() calls. However, calling it multiple times in order to add multiple columns can cause performance issues. To avoid this problem, you can either use select() with multiple columns at once or use the method withColumns() (available since Spark 3.3.0). This method takes a dictionary with column names as keys and Column objects as values and returns a new DataFrame with those additional columns. Here is an example:

df1 = df.withColumns({"bonus1": col("salary") + 100,
"bonus2": col("salary") + 300})
df1.show()
+-----+--------------+------+------+------+
| name| role|salary|bonus1|bonus2|
+-----+--------------+------+------+------+
| John|Data scientist| 4500| 4600| 4800|
|James| Data engineer| 3200| 3300| 3500|
|Laura|Data scientist| 4100| 4200| 4400|
| Ali| Data engineer| 3200| 3300| 3500|
|Steve| Developer| 3600| 3700| 3900|
+-----+--------------+------+------+------+

9.10. Renaming columns

The method withColumnRenamed() renames an existing column. The syntax is:

DataFrame.withColumnRenamed(existing: str, new: str) → DataFrame

The parameter existing is the name of the existing column to rename, and the parameter new is the new name of the column. This method returns a new DataFrame with the renamed column. In each call of this method, we can only rename one column, so to rename more than one column you need to chain the calls. For example, to change the names of the columns name and role to first name and job respectively, we can write:

df1 = df.withColumnRenamed("name", "first name") \
        .withColumnRenamed("role", "job")
df1.show()
+----------+--------------+------+
|first name| job|salary|
+----------+--------------+------+
| John|Data scientist| 4500|
| James| Data engineer| 3200|
| Laura|Data scientist| 4100|
| Ali| Data engineer| 3200|
| Steve| Developer| 3600|
+----------+--------------+------+

In Spark version 3.4.0 or later, we can use the method withColumnsRenamed() to rename multiple columns. It takes a dictionary that maps the existing column names to the desired column names. This method returns a new DataFrame with the renamed columns. The following code snippet uses this method to change the names of the columns name and role to first name and job respectively:

# Only in Spark version 3.4.0 or higher
df1 = df.withColumnsRenamed({"name": "first name", "role": "job"})
df1.show()

9.11. Changing the data type of columns

We can change the data type of a column using the cast() method:

cast(dataType: Union[pyspark.sql.types.DataType, str]) → Column

It changes the data type of the column on which it is invoked to dataType and returns it as a new column. For example, we can use this method to change the data type of a column from string to double:

data = [
("2020-01-01 07:30:10.150007", "17.0"),
("2020-01-02 07:30:10.150007", "25.5"),
("2020-01-03 07:30:10.150007", "19.5"),
("2020-01-04 07:30:10.150007", "21.2"),
("2020-01-05 07:30:10.150007", "18.0"),
("2020-01-06 07:30:10.150007", "20.5")
]

df1 = spark.createDataFrame(data).toDF("time", "temperature")
df1 = df1.withColumn("temperature",
col("temperature").cast(T.DoubleType()))
df1.show(truncate=False)
+--------------------------+-----------+
|time |temperature|
+--------------------------+-----------+
|2020-01-01 07:30:10.150007|17.0 |
|2020-01-02 07:30:10.150007|25.5 |
|2020-01-03 07:30:10.150007|19.5 |
|2020-01-04 07:30:10.150007|21.2 |
|2020-01-05 07:30:10.150007|18.0 |
|2020-01-06 07:30:10.150007|20.5 |
+--------------------------+-----------+

The method cast() can also take the canonical string representation of the data type. For example, the canonical representation of DoubleType() is double. So, the previous example can be also written as:

data = [
("2020-01-01 07:30:10.150007", "17.0"),
("2020-01-02 07:30:10.150007", "25.5"),
("2020-01-03 07:30:10.150007", "19.5"),
("2020-01-04 07:30:10.150007", "21.2"),
("2020-01-05 07:30:10.150007", "18.0"),
("2020-01-06 07:30:10.150007", "20.5")
]

df1 = spark.createDataFrame(data).toDF("time", "temperature")
df1 = df1.withColumn("temperature", col("temperature").cast("double"))

Hence, you won’t need to import DoubleType. Table 1 shows the different types in Spark and their corresponding canonical representation.

Table 1

+-------------+------------------------+
|Type         |Canonical representation|
+-------------+------------------------+
|ByteType     |byte                    |
|ShortType    |short                   |
|IntegerType  |int                     |
|LongType     |long                    |
|StringType   |string                  |
|BooleanType  |boolean                 |
|FloatType    |float                   |
|DoubleType   |double                  |
|DecimalType  |decimal                 |
|DateType     |date                    |
|TimestampType|timestamp               |
+-------------+------------------------+

Here is another example. We first define a DataFrame with two columns:

data =[ 
("1.0", 2),
("2.0", 4),
("3.0", 6),
("4.0", 8)
]
df1 = spark.createDataFrame(data).toDF("col1","col2")
df1.show()
+----+----+
|col1|col2|
+----+----+
| 1.0| 2|
| 2.0| 4|
| 3.0| 6|
| 4.0| 8|
+----+----+

Then we select a column expression that results in a Boolean column and cast it into an integer:

df1.select((df1["col1"] >= 2).cast('int')).show()
+------------------------+
|CAST((col1 >= 2) AS INT)|
+------------------------+
| 0|
| 1|
| 1|
| 1|
+------------------------+

Please note that PySpark can also do automatic type conversions. For example, in the previous DataFrame, we can add the string column col1 to the numeric column col2 without getting an error:

# Automatic conversion from string to double
df1.select(df1["col1"] + df1["col2"]).show()
+-------------+
|(col1 + col2)|
+-------------+
| 3.0|
| 6.0|
| 9.0|
| 12.0|
+-------------+

Here, PySpark automatically converts both columns to double (a floating-point type) before adding them.

9.12. Dropping columns

The method drop() is used to drop one or more columns from a DataFrame. It takes the names of the columns and returns a new DataFrame with those columns dropped. For example, to drop the columns role and salary from df, we can write:

df.drop("role", "salary").show()
+-----+
| name|
+-----+
| John|
|James|
|Laura|
| Ali|
|Steve|
+-----+

We can also drop the columns from a list using * operator to unpack the list into arguments of drop():

cols = ["role", "salary"]
df1 = df.drop(*cols)

9.13. Dropping duplicates

The method dropDuplicates() removes the duplicate rows of the DataFrame on which it is invoked and returns the result as a new DataFrame. The syntax is as follows:

dropDuplicates(subset: Optional[List[str]] = None) → DataFrame

Two or more rows are considered duplicates if they have the same values in the columns whose names are given in the list subset. In that case, PySpark keeps only one row from each group of duplicates and removes the rest. Because the data is distributed, which row is kept (and the order of the result) is not guaranteed. Let’s see an example.

data = [
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600),
("John", "Data scientist", 4500)
]

df1 = spark.createDataFrame(data).toDF("name", "role", "salary")
df1.dropDuplicates(["role"]).show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
|James| Data engineer| 3200|
| John|Data scientist| 4500|
|Steve| Developer| 3600|
+-----+--------------+------+

Here all the duplicate rows that have the same value in the column role are removed. If you pass no column names to dropDuplicates() then it returns a new DataFrame that contains only the unique rows from the original DataFrame on which it is invoked. Here two rows are considered identical if all their columns have the same value.

df1.dropDuplicates().show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
| Ali| Data engineer| 3200|
|Laura|Data scientist| 4100|
|James| Data engineer| 3200|
|Steve| Developer| 3600|
+-----+--------------+------+

The method distinct() gives the same result as dropDuplicates() called without arguments. For example, to get the distinct values of the column role we could write:

df1.select("role").distinct().show()
+--------------+
| role|
+--------------+
|Data scientist|
| Data engineer|
| Developer|
+--------------+

9.14. Splitting a column

The method split() in the module functions splits a string around matches of the given pattern. The syntax is as follows:

split(str: ColumnOrName, pattern: str, limit: int = - 1) → Column

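The parameter str is a Column object or the name of a column whose strings will be split around the matches of pattern, which is a regular expression. The optional parameter limit controls how many times the pattern is applied; with the default value of -1 it is applied as many times as possible. This method returns a Column object. Each row of this column is an array that contains all the results of the splitting. Here is an example: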

data = [
("John,Data scientist", 4500),
("James,Data engineer", 3200),
("Laura,Data scientist", 4100)
]
df1 = spark.createDataFrame(data).toDF("name-role", "salary")
df1.select(F.split(col("name-role"), ",") \
.alias("splitted_column")).show(truncate=False)
+-----------------------+
|splitted_column |
+-----------------------+
|[John, Data scientist] |
|[James, Data engineer] |
|[Laura, Data scientist]|
+-----------------------+

The following code snippet splits a column and extracts the split items as separate columns:

df2 = df1.select(F.split(col("name-role"), ",").alias("splitted_column"),
col("salary"))
df2.select(
col("splitted_column")[0].alias("name"),
col("splitted_column")[1].alias("role"),
col("salary")).show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
|James| Data engineer| 3200|
|Laura|Data scientist| 4100|
+-----+--------------+------+

We can also create the name of the new column programmatically. This is useful when we get lots of new columns. The following code splits a column into new columns col1, col2,… coln:

df2.select([col("splitted_column")[i] \
.alias("col{}".format(i+1)) for i in range(2)]).show()
+-----+--------------+
| col1| col2|
+-----+--------------+
| John|Data scientist|
|James| Data engineer|
|Laura|Data scientist|
+-----+--------------+
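
Since pattern is a regular expression, characters such as "." have a special meaning and must be escaped when they should be matched literally. The sketch below illustrates this with Python's re module. It is only an analogy: Spark uses Java regular expressions, and Python's maxsplit is not the same parameter as limit, although maxsplit=1 and limit=2 both produce at most two pieces.

```python
import re

# "." is a regex wildcard, so every character is treated as a separator
unescaped = re.split(".", "a.b")
print(unescaped)  # ['', '', '', '']

# Escaping the dot splits on the literal character
escaped = re.split(r"\.", "a.b")
print(escaped)    # ['a', 'b']

# Stop after the first match: the remainder stays in the last element
limited = re.split(",", "John,Data scientist,4500", maxsplit=1)
print(limited)    # ['John', 'Data scientist,4500']
```

In PySpark the escaped pattern would be passed as F.split(col, r"\."), using a raw string so that the backslash reaches the regular expression engine unchanged.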

10. Rows

So far, we have discussed the operations related to the columns of a DataFrame. Now we can focus on its rows. As mentioned before, each row of a DataFrame is a Row object.

10.1. Filtering rows

The method filter() can filter the rows of a DataFrame using a given condition. Here is the syntax of this method:

DataFrame.filter(condition: ColumnOrName) → DataFrame

The parameter condition is a column expression that can be either a Boolean Column object or a string that represents a Boolean SQL expression. The condition must evaluate to a Boolean column, so you cannot simply pass the name of a column unless the type of that column is Boolean. The method keeps the rows for which the condition is true and returns them as a new DataFrame. In the following example, it is used to keep the rows of df in which salary is equal to 3200.

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")


df.filter(col("salary") == 3200).show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
|James|Data engineer| 3200|
| Ali|Data engineer| 3200|
+-----+-------------+------+

The method where() is an alias for filter(). So we could also write:

df.where(col("salary") == 3200).show()

Table 2 gives a list of the Boolean and logical operators that you can use for a column expression.

Table 2

+----------+--------------------------+
| Operator | Description              |
+----------+--------------------------+
| ==       | Equal to                 |
| !=       | Not equal to             |
| >        | Greater than             |
| <        | Less than                |
| >=       | Greater than or equal to |
| <=       | Less than or equal to    |
| &        | And                      |
| |        | Or                       |
| ~        | Not                      |
+----------+--------------------------+

Here is another example:

df.filter((col("salary") > 3200) & (col("name") != "Ali")).show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
|Laura|Data scientist| 4100|
|Steve| Developer| 3600|
+-----+--------------+------+

Please note that you need to add parentheses around the comparisons, because in Python the operator & has a higher precedence than the comparison operators. If you remove the parentheses, the expression throws an error.
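
The precedence rule comes from Python itself and can be observed with ordinary integers. The snippet below is a plain-Python illustration (no Spark involved) in which the variable x plays the role of a column value.

```python
x = 5

# Parenthesized: two comparisons combined with &
with_parens = (x > 3) & (x != 5)      # True & False

# Without parentheses Python parses this as  x > (3 & x) != 5,
# i.e. the chained comparison  (x > (3 & x)) and ((3 & x) != 5)
without_parens = x > 3 & x != 5       # 5 > 1 and 1 != 5

print(with_parens, without_parens)    # False True
```

With integers, the unparenthesized form silently gives a different answer. With Column objects, the chained comparison requires converting a Column to a Python bool, which PySpark refuses to do, and that is why the expression fails instead.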

Alternatively, the condition of filter() can be a string that is a Boolean SQL expression. We can use the same comparison operators of Table 2 in this string. However, we must use the operators and, or, and not instead of &, |, and ~ respectively. The following example gives the same result as the previous one:

df.filter("salary > 3200 and name != 'Ali'").show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
|Laura|Data scientist| 4100|
|Steve| Developer| 3600|
+-----+--------------+------+

We can also use the method between() to filter the rows. This method filters the rows in which the target column is between the lower bound and upper bound, inclusive.

df.filter(col("salary").between(3200,3600)).show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
|James|Data engineer| 3200|
| Ali|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+

10.2. when() and otherwise()

The methods when() and otherwise() are usually used together. The syntax of when() is as follows:

when(condition: Column, value: Any) → Column

It evaluates condition, which is a Boolean Column expression, and returns a new Column object based on that. Each row of the resulting Column is set to value if the corresponding row of condition is true, and to null otherwise. Here is an example:

df.select(F.when(df["role"] == "Data scientist", 3) \
.alias("role code")).show()
+---------+
|role code|
+---------+
| 3|
| null|
| 3|
| null|
| null|
+---------+

Here, when a row of role is equal to 'Data scientist', the corresponding row of the result is 3. Otherwise, it is null. You can chain multiple when() calls, so when() can be applied to a Column previously generated by another when(). The method otherwise() can only be used at the end of a when() chain. It is declared as:

otherwise(value: Any) → Column

It sets the rows that matched none of the conditions in the when() chain to value, so those rows no longer end up as null. In the next example, we use a chain of when() and otherwise() to assign a number to each role in df:

df.select(F.when(df["role"] == "Data scientist", 3) \
.when(df["role"] == "Data engineer", 2) \
.otherwise(1).alias("role code")).show()
+---------+
|role code|
+---------+
| 3|
| 2|
| 3|
| 2|
| 1|
+---------+

The parameter value can also be a column expression. Here is another example that adds 50 to salary when role is Data scientist:

df.withColumn("salary",
F.when(df["role"] == "Data scientist", df["salary"] + 50) \
.otherwise(df["salary"])).show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4550|
|James| Data engineer| 3200|
|Laura|Data scientist| 4150|
| Ali| Data engineer| 3200|
|Steve| Developer| 3600|
+-----+--------------+------+
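
A when()/otherwise() chain behaves like an if/elif/else statement applied to every row: the first condition that is true decides the value. The following plain-Python sketch models the role code example; role_code() is a hypothetical helper and the list roles repeats the role column of df.

```python
roles = ["Data scientist", "Data engineer", "Data scientist",
         "Data engineer", "Developer"]

def role_code(role, default=None):
    """Model of when(...).when(...).otherwise(default) for one row."""
    if role == "Data scientist":      # first when()
        return 3
    elif role == "Data engineer":     # second when()
        return 2
    return default                    # otherwise(); None models null

with_otherwise = [role_code(r, default=1) for r in roles]
without_otherwise = [role_code(r) for r in roles]
print(with_otherwise)     # [3, 2, 3, 2, 1]
print(without_otherwise)  # [3, 2, 3, 2, None]
```

Leaving out otherwise() corresponds to default=None here: the rows that match no condition stay null.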

10.3. Methods for missing values

10.3.1. isNull() and isNotNull()

We can use the method isNull() to filter the null values. Let’s see an example. We first define a DataFrame with some null values:

data = [
("John", None, None),
("James", "Data engineer", 3200),
("Laura", None, 4100),
(None, "Data engineer", 3200),
("Steve", "Developer", 3600)
]

df1 = spark.createDataFrame(data).toDF("name", "role", "salary")
df1.show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
| John| null| null|
|James|Data engineer| 3200|
|Laura| null| 4100|
| null|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+

To filter the rows with null values we can write:

df1.filter(df1["role"].isNull()).show()
+-----+----+------+
| name|role|salary|
+-----+----+------+
| John|null| null|
|Laura|null| 4100|
+-----+----+------+

We could also use an SQL expression to get the same result:

df1.filter(F.expr("role IS NULL")).show()

To filter the rows which are not null, we use the method isNotNull():

df1.filter(df1["role"].isNotNull()).show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
|James|Data engineer| 3200|
| null|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+

10.3.2. na

The class DataFrame has a property called na that returns an instance of class DataFrameNaFunctions. This class has some useful methods for working with the missing data in a DataFrame. na takes the DataFrame object on which it is invoked and returns a DataFrameNaFunctions object instantiated with that.

df1.na
<pyspark.sql.dataframe.DataFrameNaFunctions at 0x7f2118b2ef10>

Once we get the DataFrameNaFunctions object, we can use its methods to handle missing data.

10.3.2.1. fill()

To fill the null values, we can use the method fill(), which is available in the class DataFrameNaFunctions. The syntax is:

fill(value, subset = None) → DataFrame

It returns a new DataFrame that replaces the null entries with value. We can also specify the names of the columns in which the null values should be replaced. The optional parameter subset provides the list of column names to consider. If value is a dict, then subset is ignored. In that case, value must be a dictionary of column names and their corresponding replacement values. Please note that if the data type of a column does not match the data type of value, that column will be ignored. For example, the following code replaces the null values in all the string columns with N/A:

df1.na.fill("N/A").show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
| John| N/A| null|
|James|Data engineer| 3200|
|Laura| N/A| 4100|
| N/A|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+

As you see the null value in salary wasn’t filled. That is because it is a numeric column and the value is a string (here it is "N/A"). In the next example, we provide a separate numerical value for salary:

df1.na.fill({"name": "N/A", "role": "N/A", "salary": 0}).show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
| John| N/A| 0|
|James|Data engineer| 3200|
|Laura| N/A| 4100|
| N/A|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+

10.3.2.2. drop()

We can use the method drop() in class DataFrameNaFunctions to drop the rows that contain null values. The syntax of this method is:

drop(how: str = 'any', thresh = None, subset = None) → DataFrame

When you invoke it, it drops the rows containing null values in the specified columns and returns the result as a new DataFrame. If how is set to any, it drops the rows containing at least one null value in the specified columns. If how is set to all, then it only drops the rows in which all the specified columns are null. If thresh is given, it overrides how: a row is dropped when it has fewer than thresh non-null values in the specified columns.

If you don’t specify the columns, then it will be applied to all the columns of the DataFrame, and if you don’t specify how, then its default value is any. Here are a few examples:

df1.na.drop().show() 
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
|James|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+

and

df1.na.drop(how="all", subset=["role", "salary"]).show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
|James|Data engineer| 3200|
|Laura| null| 4100|
| null|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+

We can also use the method dropna() in the DataFrame class to remove the null values. The syntax is similar to drop(), but you directly invoke it on a DataFrame.

df1.dropna(how="all", subset=["role", "salary"]).show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
|James|Data engineer| 3200|
|Laura| null| 4100|
| null|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+
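
The interaction between how, thresh, and subset can be summarized with a small plain-Python model. It is only a sketch: na_drop() is a hypothetical helper that mimics the documented behavior on a list of tuples, with None standing for null.

```python
rows = [
    ("John", None, None),
    ("James", "Data engineer", 3200),
    ("Laura", None, 4100),
    (None, "Data engineer", 3200),
    ("Steve", "Developer", 3600),
]
columns = ["name", "role", "salary"]

def na_drop(rows, columns, how="any", thresh=None, subset=None):
    """Model of df.na.drop(): decide row by row whether to keep it."""
    idx = [columns.index(c) for c in (subset or columns)]
    kept = []
    for row in rows:
        non_null = sum(row[i] is not None for i in idx)
        if thresh is not None:        # thresh overrides how
            keep = non_null >= thresh
        elif how == "any":            # drop if any value is null
            keep = non_null == len(idx)
        else:                         # how == "all": drop if all are null
            keep = non_null > 0
        if keep:
            kept.append(row)
    return kept

print([r[0] for r in na_drop(rows, columns)])
# ['James', 'Steve']
print([r[0] for r in na_drop(rows, columns, how="all", subset=["role", "salary"])])
# ['James', 'Laura', None, 'Steve']
print([r[0] for r in na_drop(rows, columns, thresh=2)])
# ['James', 'Laura', None, 'Steve']
```

The first two calls reproduce the results shown above; the third keeps every row that has at least two non-null values, which removes only the John row.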

11. User Defined Functions (UDF)

In PySpark, you can define User Defined Functions (UDFs) to extend the built-in functions. UDFs are column-based functions that take one or more columns as input parameters and return a new column as their output. You can define a UDF by defining a Python function and wrapping it with udf() function. The function udf() in module functions creates a UDF. It takes a Python function and the return type of the UDF:

udf(f, returnType = StringType(), *, useArrow = None) → UserDefinedFunctionLike

The default return type is StringType. User-defined functions do not take keyword arguments on the calling side. UDFs are computationally expensive because each row has to be passed between the JVM and a Python worker, so you should prefer the built-in functions whenever possible.

For example, the following code defines a UDF to determine if the numbers in a column are even or not. We first create a DataFrame with one column.

df1 = spark.range(1, 11).select(F.col("id").cast("int").alias("number"))
df1.show()
+------+
|number|
+------+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
+------+

Then we define the UDF and use withColumn() to apply it to a column. Please note that udf() accepts any Python callable; here we wrap is_even() in a lambda function.

def is_even(x):
    return x % 2 == 0
is_evenUDF = F.udf(lambda x: is_even(x), T.BooleanType())
df2 = df1.withColumn("even", is_evenUDF(df1["number"]))
df2.show()
+------+-----+
|number| even|
+------+-----+
| 1|false|
| 2| true|
| 3|false|
| 4| true|
| 5|false|
| 6| true|
| 7|false|
| 8| true|
| 9|false|
| 10| true|
+------+-----+

We could also use select() to apply it:

df2 = df1.select(col("*"), is_evenUDF(col("number")).alias("even"))
df2.show()
+------+-----+
|number| even|
+------+-----+
| 1|false|
| 2| true|
| 3|false|
| 4| true|
| 5|false|
| 6| true|
| 7|false|
| 8| true|
| 9|false|
| 10| true|
+------+-----+

We can also define a UDF using decorators:

@F.udf(returnType=T.BooleanType())
def is_even(x):
    return x % 2 == 0

df2 = df1.withColumn("even", is_even(df1["number"]))

In the next example, we wrap a function with two parameters with udf():

@F.udf()
def func(name, role):
    return name + " " + role

df2 = df.withColumn("name_role", func(col("name"), col("role")))
df2.show()
+-----+--------------+------+--------------------+
| name| role|salary| name_role|
+-----+--------------+------+--------------------+
| John|Data scientist| 4500| John Data scientist|
|James| Data engineer| 3200| James Data engineer|
|Laura|Data scientist| 4100|Laura Data scientist|
| Ali| Data engineer| 3200| Ali Data engineer|
|Steve| Developer| 3600| Steve Developer|
+-----+--------------+------+--------------------+

11.1. UDFs with non-column parameters

As mentioned before, a UDF returned by udf() can only take Column objects as parameters. So if the Python function takes a parameter which is not a Column object, we need to convert it to a Column object. For example, suppose that we need a function which takes an integer as its second parameter and adds it to a column. We define the UDF as usual. But when we want to use the UDF inside withColumn(), we need to wrap the second parameter, which is an integer, with lit():

def add_const(column, number):
    return column + number

add_const_udf = F.udf(lambda column, number: add_const(column, number),
                      T.IntegerType())
df2 = df.withColumn("salary", add_const_udf(col("salary"), F.lit(200)))
df2.show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4700|
|James| Data engineer| 3400|
|Laura|Data scientist| 4300|
| Ali| Data engineer| 3400|
|Steve| Developer| 3800|
+-----+--------------+------+

The syntax of the method lit() is as follows:

lit(col: Any) → Column

It creates a Column object with the literal value. So lit(200) creates a Column in which all the elements are equal to 200. The number of rows in this column will be equal to the number of rows in the original DataFrame.

We can also use a curried function for this purpose. In that case, we don’t need lit() anymore:

def add_const(column, number):
    return column + number

def add_const_udf(number):
    return F.udf(lambda column: add_const(column, number), T.IntegerType())

df2 = df.withColumn("salary", add_const_udf(200)(col("salary")))
df2.show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4700|
|James| Data engineer| 3400|
|Laura|Data scientist| 4300|
| Ali| Data engineer| 3400|
|Steve| Developer| 3800|
+-----+--------------+------+
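
The currying trick relies on an ordinary Python closure: the outer function receives the constant and returns a one-argument function that remembers it. The following plain-Python sketch shows the same idea without Spark; make_adder() is a hypothetical name that plays the role of add_const_udf() minus the udf() wrapper.

```python
def add_const(value, number):
    return value + number

def make_adder(number):
    # number is captured by the closure; the returned function
    # only needs the per-row value
    return lambda value: add_const(value, number)

add_200 = make_adder(200)
new_salaries = [add_200(s) for s in [4500, 3200, 4100, 3200, 3600]]
print(new_salaries)  # [4700, 3400, 4300, 3400, 3800]
```

Because the constant travels inside the closure, the resulting UDF takes a single Column argument and lit() is no longer required.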

11.2. UDFs and broadcasting

In the previous section, we discussed UDFs in which some of the parameters are not Column objects. When such a parameter is a large object (for example, a lookup table), it is better to broadcast it. Broadcast variables allow us to share a read-only copy of a variable with every node in an efficient manner, so there is no need to ship a copy of it with every task. Spark can distribute broadcast variables using efficient broadcast algorithms to reduce the communication cost. We can use the method broadcast() in the SparkContext class to create a broadcast variable. Here is an example:

v = [1, 2, 3]
broadcast_v = spark.sparkContext.broadcast(v)
broadcast_v.value
[1, 2, 3]

As you see, the original variable is wrapped with a Broadcast object. After creating a broadcast variable, it should be used instead of the original variable in any function that runs on the cluster so that the original variable is not shipped to the nodes more than once. In addition, the original variable should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable. When a function needs to process a broadcast variable, it cannot use the Broadcast object directly. Instead, its value should be accessed through the value attribute, which returns the actual variable that was wrapped by broadcast().

In the next example, we use broadcasting to create a UDF that takes a dict as a parameter. We first broadcast the dict, and pass it into the curried UDF function get_job_code_udf().

job_codes = {"Data scientist": 0, "Data engineer": 1, "Developer": 2}
broadcast_job_codes = spark.sparkContext.broadcast(job_codes)

def get_job_code_udf(codes):
    # codes is the Broadcast object; its value is read inside the UDF
    return F.udf(lambda role: codes.value[role])

df1 = df.select(F.col("role"),
                get_job_code_udf(broadcast_job_codes)(col("role")) \
                .alias("job_code"))
df1.show()
+--------------+--------+
| role|job_code|
+--------------+--------+
|Data scientist| 0|
| Data engineer| 1|
|Data scientist| 0|
| Data engineer| 1|
| Developer| 2|
+--------------+--------+

11.3. Null values in UDFs

UDFs do not handle null values automatically: a null is passed to the Python function as None. For example, the following code snippet will throw an error in PySpark:

# This doesn't run 
data = [1, 2, 3, 4, None]
df1 = spark.createDataFrame(data, T.IntegerType()).toDF("number")

is_evenUDF = F.udf(lambda x: x % 2 == 0, T.BooleanType())
df2 = df1.withColumn("even", is_evenUDF(F.col("number")))
df2.show()

Here data has a None value and the UDF doesn’t know how to handle it. However, we can modify the UDF function to expect null values:

data = [1, 2, 3, 4, None]
df1 = spark.createDataFrame(data, T.IntegerType()).toDF("number")
def is_even(x):
    # Return null for null input; test against None so that 0 is still handled
    return x % 2 == 0 if x is not None else None

is_evenUDF = F.udf(lambda x: is_even(x), T.BooleanType())
df2 = df1.withColumn("even", is_evenUDF(F.col("number")))
df2.show()
+------+-----+
|number| even|
+------+-----+
| 1|false|
| 2| true|
| 3|false|
| 4| true|
| null| null|
+------+-----+
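
The way the null check is written matters. The following plain-Python sketch (no Spark session is involved, and the function names are only illustrative) contrasts a truthiness test with an explicit None test. Since 0 is falsy in Python, a truthiness test would report it as missing:

```python
def is_even_truthy(x):
    # Truthiness test: treats 0 the same way as None
    return x % 2 == 0 if x else None

def is_even_null_safe(x):
    # Explicit None test: only a missing value yields None
    return x % 2 == 0 if x is not None else None

values = [0, 1, 2, None]
truthy_results = [is_even_truthy(v) for v in values]
null_safe_results = [is_even_null_safe(v) for v in values]

print(truthy_results)     # [None, False, True, None]
print(null_safe_results)  # [True, False, True, None]
```

The two versions agree on the data used in this section and differ only when the column contains 0.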

12. Timestamp

We can create a timestamp column using cast():

data = [
("2020-01-01 07:30:10.150007", "17.0"),
("2020-01-02 07:30:10.150007", "25.5"),
("2020-01-03 07:30:10.150007", "19.5"),
("2020-01-04 07:30:10.150007", "21.2"),
("2020-01-05 07:30:10.150007", "18.0"),
("2020-01-06 07:30:10.150007", "20.5")
]
df1 = spark.createDataFrame(data).toDF("time", "temperature")

df1 = df1.withColumn("time", col("time").cast("timestamp"))
df1.show(truncate=False)
+--------------------------+-----------+
|time |temperature|
+--------------------------+-----------+
|2020-01-01 07:30:10.150007|17.0 |
|2020-01-02 07:30:10.150007|25.5 |
|2020-01-03 07:30:10.150007|19.5 |
|2020-01-04 07:30:10.150007|21.2 |
|2020-01-05 07:30:10.150007|18.0 |
|2020-01-06 07:30:10.150007|20.5 |
+--------------------------+-----------+

12.1. date_trunc()

The function date_trunc() returns a timestamp truncated to the unit specified by its format parameter. The syntax is as follows:

date_trunc(format: str, timestamp: ColumnOrName) → Column

The parameter format gives the unit to truncate to. It can be 'year', 'yyyy', or 'yy' to truncate by year; 'month', 'mon', or 'mm' to truncate by month; and 'day' or 'dd' to truncate by day. Other options are 'microsecond', 'millisecond', 'second', 'minute', 'hour', 'week', and 'quarter'. The parameter timestamp provides the name of the timestamp column (or the Column object) that we want to truncate. Here is an example:

df2 = df1.withColumn("hour", F.date_trunc('hour', col('time'))) \
.withColumn("minute", F.date_trunc('minute', col('time')))
df2.show(truncate=False)
+--------------------------+-----------+-------------------+-------------------+
|time |temperature|hour |minute |
+--------------------------+-----------+-------------------+-------------------+
|2020-01-01 07:30:10.150007|17.0 |2020-01-01 07:00:00|2020-01-01 07:30:00|
|2020-01-02 07:30:10.150007|25.5 |2020-01-02 07:00:00|2020-01-02 07:30:00|
|2020-01-03 07:30:10.150007|19.5 |2020-01-03 07:00:00|2020-01-03 07:30:00|
|2020-01-04 07:30:10.150007|21.2 |2020-01-04 07:00:00|2020-01-04 07:30:00|
|2020-01-05 07:30:10.150007|18.0 |2020-01-05 07:00:00|2020-01-05 07:30:00|
|2020-01-06 07:30:10.150007|20.5 |2020-01-06 07:00:00|2020-01-06 07:30:00|
+--------------------------+-----------+-------------------+-------------------+

In the next example, we round a timestamp to the nearest hour by first adding 30 minutes to it and then using date_trunc():

# Round the time column to the nearest hour
df2 = df1.withColumn('time1',
F.date_trunc('hour', col('time') + F.expr('INTERVAL 30 MINUTES')))
df2.show(truncate=False)
+--------------------------+-----------+-------------------+
|time |temperature|time1 |
+--------------------------+-----------+-------------------+
|2020-01-01 07:30:10.150007|17.0 |2020-01-01 08:00:00|
|2020-01-02 07:30:10.150007|25.5 |2020-01-02 08:00:00|
|2020-01-03 07:30:10.150007|19.5 |2020-01-03 08:00:00|
|2020-01-04 07:30:10.150007|21.2 |2020-01-04 08:00:00|
|2020-01-05 07:30:10.150007|18.0 |2020-01-05 08:00:00|
|2020-01-06 07:30:10.150007|20.5 |2020-01-06 08:00:00|
+--------------------------+-----------+-------------------+
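
The same shift-then-truncate idea can be checked without Spark. This is a minimal sketch using only the standard library; the helper name round_to_nearest_hour is hypothetical and is not a PySpark function:

```python
from datetime import datetime, timedelta

def round_to_nearest_hour(ts):
    # Shift by half an hour, then drop everything below the hour
    shifted = ts + timedelta(minutes=30)
    return shifted.replace(minute=0, second=0, microsecond=0)

rounded_up = round_to_nearest_hour(datetime(2020, 1, 1, 7, 30, 10, 150007))
rounded_down = round_to_nearest_hour(datetime(2020, 1, 1, 7, 29, 59))

print(rounded_up)    # 2020-01-01 08:00:00
print(rounded_down)  # 2020-01-01 07:00:00
```

A time at or after the half hour moves to the next hour, and anything earlier falls back to the current one.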

12.2. Extracting timestamp parts

The module functions has some built-in functions to extract the different parts of a timestamp. For example, we can use dayofmonth() to extract the day of the month of a given timestamp as an integer. Similarly, we can use functions such as year(), month(), and hour() to extract the other parts. The next example shows how to use these functions:

df2 = df1.withColumn("year", F.year(df1["time"])) \
.withColumn("month", F.month(df1["time"])) \
.withColumn("day", F.dayofmonth(df1["time"])) \
.withColumn("week", F.weekofyear(df1["time"])) \
.withColumn("day of week", F.dayofweek(df1["time"])) \
.withColumn("hour", F.hour(df1["time"])) \
.withColumn("minute", F.minute(df1["time"])) \
.withColumn("second", F.second(df1["time"]))

df2.show(truncate=False)
+--------------------------+-----------+----+-----+---+----+-----------+----+------+------+
|time |temperature|year|month|day|week|day of week|hour|minute|second|
+--------------------------+-----------+----+-----+---+----+-----------+----+------+------+
|2020-01-01 07:30:10.150007|17.0 |2020|1 |1 |1 |4 |7 |30 |10 |
|2020-01-02 07:30:10.150007|25.5 |2020|1 |2 |1 |5 |7 |30 |10 |
|2020-01-03 07:30:10.150007|19.5 |2020|1 |3 |1 |6 |7 |30 |10 |
|2020-01-04 07:30:10.150007|21.2 |2020|1 |4 |1 |7 |7 |30 |10 |
|2020-01-05 07:30:10.150007|18.0 |2020|1 |5 |1 |1 |7 |30 |10 |
|2020-01-06 07:30:10.150007|20.5 |2020|1 |6 |2 |2 |7 |30 |10 |
+--------------------------+-----------+----+-----+---+----+-----------+----+------+------+

The function unix_timestamp() converts a time string with a given pattern (yyyy-MM-dd HH:mm:ss, by default) to a Unix timestamp in seconds, using the default timezone and the default locale. A Unix timestamp is the number of seconds between a particular point in time and 1970-01-01 00:00:00 UTC, returned as a long integer.

df1.select(F.unix_timestamp(col('time'))).show()
+-----------------------------------------+
|unix_timestamp(time, yyyy-MM-dd HH:mm:ss)|
+-----------------------------------------+
| 1577863810|
| 1577950210|
| 1578036610|
| 1578123010|
| 1578209410|
| 1578295810|
+-----------------------------------------+

We can also cast the timestamp to a long to get the same result:

df1.select(col('time').cast('long')).show()
+----------+
| time|
+----------+
|1577863810|
|1577950210|
|1578036610|
|1578123010|
|1578209410|
|1578295810|
+----------+

However, both unix_timestamp() and cast('long') drop the fractional part of the seconds in a timestamp. To extract that fraction (which carries the microseconds), you can use the following code snippet:

microsec_str = F.when(F.split(df1['time'], '[.]').getItem(1).isNull() , 0) \
.otherwise(F.split(df1['time'], '[.]').getItem(1))
df1.select((microsec_str/ 10**F.length(microsec_str)) \
.alias("microseconds")).show()
+------------+
|microseconds|
+------------+
| 0.150007|
| 0.150007|
| 0.150007|
| 0.150007|
| 0.150007|
| 0.150007|
+------------+
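
The numbers above can be reproduced with the standard library alone. The sketch below assumes that the timestamp is in UTC, which is what the values shown above imply for the session that produced them; it is a cross-check of the arithmetic and not Spark code:

```python
from datetime import datetime, timezone

# Assumption: the timestamp is interpreted as UTC
ts = datetime(2020, 1, 1, 7, 30, 10, 150007, tzinfo=timezone.utc)
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)

delta = ts - epoch
whole_seconds = delta.days * 86400 + delta.seconds  # integer arithmetic, no rounding
fraction = ts.microsecond / 10**6                   # fractional part of the second

print(whole_seconds)  # 1577863810
print(fraction)       # 0.150007
```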

12.3. Changing the time zone

The function from_utc_timestamp() shifts the timestamp value from the UTC timezone to the given timezone. The next example changes the time zone of the column time in df1 to America/New_York assuming that it is in UTC.

df2 = df1.withColumn("time", F.from_utc_timestamp(df1["time"],
"America/New_York"))
df2.show(truncate=False)
+--------------------------+-----------+
|time |temperature|
+--------------------------+-----------+
|2020-01-01 02:30:10.150007|17.0 |
|2020-01-02 02:30:10.150007|25.5 |
|2020-01-03 02:30:10.150007|19.5 |
|2020-01-04 02:30:10.150007|21.2 |
|2020-01-05 02:30:10.150007|18.0 |
|2020-01-06 02:30:10.150007|20.5 |
+--------------------------+-----------+

The function to_utc_timestamp() shifts the timestamp value from the given timezone to the UTC timezone. In the next example, we shift the column time from America/New_York back to UTC.

df2 = df2.withColumn("time", F.to_utc_timestamp(df2["time"],
"America/New_York"))
df2.show(truncate=False)
+--------------------------+-----------+
|time |temperature|
+--------------------------+-----------+
|2020-01-01 07:30:10.150007|17.0 |
|2020-01-02 07:30:10.150007|25.5 |
|2020-01-03 07:30:10.150007|19.5 |
|2020-01-04 07:30:10.150007|21.2 |
|2020-01-05 07:30:10.150007|18.0 |
|2020-01-06 07:30:10.150007|20.5 |
+--------------------------+-----------+
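
To make the direction of the two shifts concrete, here is a small standard-library sketch. It assumes a fixed offset of UTC-5, which is what America/New_York observes on these January dates; a real conversion needs a time zone database to handle daylight saving time. The helper names from_utc and to_utc are hypothetical:

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed UTC-5 offset (Eastern Standard Time), valid for these January dates only
EST = timezone(timedelta(hours=-5))

def from_utc(naive_ts, tz):
    # Interpret the naive timestamp as UTC, convert it, then drop the zone again
    return naive_ts.replace(tzinfo=timezone.utc).astimezone(tz).replace(tzinfo=None)

def to_utc(naive_ts, tz):
    # Interpret the naive timestamp as local time in tz and convert it to UTC
    return naive_ts.replace(tzinfo=tz).astimezone(timezone.utc).replace(tzinfo=None)

utc_time = datetime(2020, 1, 1, 7, 30, 10, 150007)
local_time = from_utc(utc_time, EST)

print(local_time)               # 2020-01-01 02:30:10.150007
print(to_utc(local_time, EST))  # 2020-01-01 07:30:10.150007
```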

12.4. Creating a date range

A date range is a sequence of equally spaced time points between a start date and an end date, where the difference between any two adjacent points is specified by a given frequency. For example, we can create a date range between 2018-01-01 and 2018-05-01 with a frequency of 1 month. In that case, the date range is:

2018-01-01, 2018-02-01, 2018-03-01, 2018-04-01, 2018-05-01

We can use a SQL query to create a date range in PySpark. Here is an example:

start_dt='2018-01-01'
end_dt='2018-05-01'
query_str = "SELECT explode(sequence(to_date('{}'), to_date('{}'), interval 1 month)) as date".format(start_dt, end_dt)
df1 = spark.sql(query_str)
df1.show()
+----------+
| date|
+----------+
|2018-01-01|
|2018-02-01|
|2018-03-01|
|2018-04-01|
|2018-05-01|
+----------+

In this query, we used the functions explode() and sequence() to create the date range between start_dt and end_dt.
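
For comparison, the same monthly range can be built in plain Python. This is only a sketch of what the query produces, not of how Spark evaluates sequence(); the helper month_range is hypothetical and assumes that the start day exists in every month (true for day 1):

```python
from datetime import date

def month_range(start, end):
    # Step one calendar month at a time, keeping the day of the month fixed.
    # Assumes the day exists in every month (true for day 1).
    dates = []
    year, month = start.year, start.month
    while date(year, month, start.day) <= end:
        dates.append(date(year, month, start.day))
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return dates

date_range = month_range(date(2018, 1, 1), date(2018, 5, 1))
print([d.isoformat() for d in date_range])
# ['2018-01-01', '2018-02-01', '2018-03-01', '2018-04-01', '2018-05-01']
```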

13. Partitions

As mentioned before, PySpark DataFrames are distributed data structures, so PySpark splits them and distributes them among the worker nodes of a cluster. Each split of such a distributed data set is called a partition. In PySpark, a partition is a logical chunk of data stored on a node in the cluster. Partitions are the basic units of parallelism in PySpark, and a DataFrame is a collection of partitions. Every node in a Spark cluster contains one or more partitions. However, a partition cannot span multiple nodes. The number of partitions in PySpark is configurable, and having too few or too many partitions lowers the performance.

PySpark can run one concurrent task for every partition of an RDD. However, the total number of concurrent tasks is limited by the total number of cores in the cluster (each node can have more than one core). For example, if a cluster has 10 cores, then Spark can run at most 10 concurrent tasks. So a DataFrame on such a cluster should have at least 10 partitions (in practice, 2 to 4 times more). If you have fewer than 10 partitions, some of the cores remain idle, which results in less concurrency.

On the other hand, if you have too many partitions, then scheduling many small tasks in Spark may take more time than actually executing them. In addition, data communication between nodes is very expensive, so Spark gets slow. Thus, there is always a trade-off when deciding on the number of partitions. As a rule of thumb, the number of partitions should be 1 to 4 times the number of cores in the cluster.

13.1. getNumPartitions()

To get the number of partitions of a DataFrame you should first convert it to an RDD using the property rdd that returns the content of the DataFrame as an RDD. Then the method getNumPartitions() can be used to get the number of partitions in that RDD. Here is an example:

df1 = spark.range(1, 11).select(col("id").cast("int").alias("number"))
df1.rdd.getNumPartitions()
8

So this DataFrame has 8 partitions.

13.2. repartition()

You can also change the number of partitions of an RDD or DataFrame using the method repartition(). For an RDD, it has the following syntax:

repartition(numPartitions: int) → RDD[T]

It converts the RDD on which it is invoked into an RDD that has exactly numPartitions partitions and returns it as a new RDD. A DataFrame has a method with the same name that returns a new DataFrame, as the next example shows:

df2 = df1.repartition(3)
df2.rdd.getNumPartitions()
3

14. Adding an index

PySpark DataFrames are inherently unordered. They do not support random access, so they don’t have a built-in index like the Pandas library in Python. A DataFrame is a distributed data structure in PySpark, and each node of a PySpark cluster can store and process part of that without being worried about the order of rows. In fact, each row of a DataFrame is considered an independent collection of structured data to be able to support the distributed parallel processing. However, it is possible to add a new column to a DataFrame that plays the role of an index.

14.1. monotonically_increasing_id()

The method monotonically_increasing_id() in the module functions returns a Column object with monotonically increasing 64-bit integers. The rows of this column are guaranteed to be monotonically increasing and unique, but not consecutive.

As an example, consider the DataFrame df1 which was created before. We repartition it to have 3 partitions and then add an ID column to it.

df1 = spark.range(1, 11).select(col("id").cast("int").alias("number"))
df2 = df1.repartition(3)
df2 = df2.withColumn("id", F.monotonically_increasing_id())
df2.show()
+------+-----------+
|number| id|
+------+-----------+
| 5| 0|
| 6| 1|
| 7| 2|
| 9| 3|
| 1| 8589934592|
| 2| 8589934593|
| 4| 8589934594|
| 3|17179869184|
| 8|17179869185|
| 10|17179869186|
+------+-----------+

Now let’s see how the ID values were generated. The DataFrame had 3 partitions when the IDs were assigned. The partition index is shifted left by 33 bits for each partition to create a partition ID. The records in each partition are numbered by consecutive integers starting from zero. These numbers are added to the partition ID to create the ID of that record. So the IDs are generally not consecutive across the DataFrame. Within each partition, however, the IDs are consecutive.

Figure 1 shows how these IDs in the previous code snippet were generated.

Figure 1

For example, the last row of the column id belongs to partition 2. So, the partition ID is:

2 << 33
17179869184

Since there are 3 rows in this partition, their IDs will be: 17179869184+0, 17179869184+1, and 17179869184+2. In PySpark an expression is deterministic if it evaluates to the same result for the same inputs. monotonically_increasing_id() is not deterministic since its result depends on partition IDs.
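
The arithmetic described above can be written as a few lines of plain Python. This sketch only reproduces the bit layout explained in this section (partition index in the upper bits, row offset in the lower 33 bits); the helper partition_ids is hypothetical and is not part of PySpark:

```python
def partition_ids(partition_index, num_rows):
    # Upper bits hold the partition index, lower 33 bits hold the row offset
    base = partition_index << 33
    return [base + offset for offset in range(num_rows)]

ids_p0 = partition_ids(0, 4)
ids_p1 = partition_ids(1, 3)
ids_p2 = partition_ids(2, 3)

print(ids_p0)  # [0, 1, 2, 3]
print(ids_p1)  # [8589934592, 8589934593, 8589934594]
print(ids_p2)  # [17179869184, 17179869185, 17179869186]
```

These are the same ten values that appear in the id column of the example.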

14.2. coalesce()

We can also use the method coalesce() to change the number of partitions of an RDD or DataFrame. Here is an example:

df2 = df1.coalesce(3)
df2.rdd.getNumPartitions()
3

Though coalesce() and repartition() look similar, they have some important differences. The method repartition() can be used to increase or decrease the number of partitions, whereas coalesce() can only decrease it. In addition, repartition() shuffles the records across the nodes of the cluster to create the new partitions. On the other hand, coalesce() does not trigger a shuffle and uses the existing partitions to create the new ones. The next example shows this difference. Here, we first create a DataFrame with 4 partitions.

df1 = spark.range(1, 13).select(col("id").cast("int") \
.alias("number")).coalesce(4)
df2 = df1.withColumn("id", F.monotonically_increasing_id())
df2.show()
+------+-----------+
|number| id|
+------+-----------+
| 1| 0|
| 2| 1|
| 3| 2|
| 4| 8589934592|
| 5| 8589934593|
| 6| 8589934594|
| 7|17179869184|
| 8|17179869185|
| 9|17179869186|
| 10|25769803776|
| 11|25769803777|
| 12|25769803778|
+------+-----------+

Please note that the records on one partition have consecutive IDs. Hence, we have four partitions that contain [1,2,3], [4,5,6], [7,8,9], and [10,11,12]. Next, we decrease the number of partitions using repartition():

df2 = df1.repartition(2)
df2 = df2.withColumn("id", F.monotonically_increasing_id())
df2.show()
+------+----------+
|number| id|
+------+----------+
| 1| 0|
| 3| 1|
| 4| 2|
| 5| 3|
| 7| 4|
| 8| 5|
| 10| 6|
| 11| 7|
| 2|8589934592|
| 6|8589934593|
| 9|8589934594|
| 12|8589934595|
+------+----------+

As you see, the records in the original partitions are shuffled to form the resulting 2 partitions. This can be an expensive operation when you have lots of data. Now, we do the same thing using coalesce():

df2 = df1.coalesce(2)
df2 = df2.withColumn("id", F.monotonically_increasing_id())
df2.show()
+------+----------+
|number| id|
+------+----------+
| 1| 0|
| 2| 1|
| 3| 2|
| 4| 3|
| 5| 4|
| 6| 5|
| 7|8589934592|
| 8|8589934593|
| 9|8589934594|
| 10|8589934595|
| 11|8589934596|
| 12|8589934597|
+------+----------+

Here, the records on the second node ([4,5,6]) were moved to the node that contains [1,2,3] to form the first partition. Similarly, the records on the fourth node ([10,11,12]) were moved to the node that contains [7,8,9]. The records were not shuffled, so the data movement between the nodes of the cluster is minimized. The method coalesce() avoids unnecessary data movement when you are decreasing the number of partitions.
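
The effect seen in this example can be imitated with a toy model in plain Python. It merges neighboring partitions without splitting any of them. This is only an illustration of the outcome shown above, under the assumption that adjacent partitions are combined; it is not the algorithm that Spark uses to assign partitions, and coalesce_partitions is a hypothetical helper:

```python
def coalesce_partitions(partitions, num_partitions):
    # Toy model: merge runs of neighboring partitions without splitting any of them
    if num_partitions >= len(partitions):
        return [list(p) for p in partitions]  # the count cannot be increased this way
    merged = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        merged[i * num_partitions // len(partitions)].extend(part)
    return merged

original = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
coalesced = coalesce_partitions(original, 2)
print(coalesced)
# [[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12]]
```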

14.3. zipWithIndex()

We can also use the method zipWithIndex() to generate an ID column. It zips the RDD on which it is invoked with its element indices. The original RDD has objects of type Row, so the result is an RDD that has objects of type (Row, long). Hence, each record of the resulting RDD is a tuple that combines the records of the original RDD with an index of type long. The indices start from zero and increase by 1.

The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. Here is an example:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")
df.show()

zipped_rdd = df.rdd.zipWithIndex()
zipped_rdd.collect()
[(Row(name='John', role='Data scientist', salary=4500), 0),
(Row(name='James', role='Data engineer', salary=3200), 1),
(Row(name='Laura', role='Data scientist', salary=4100), 2),
(Row(name='Ali', role='Data engineer', salary=3200), 3),
(Row(name='Steve', role='Developer', salary=3600), 4)]

So the result is an RDD in which each record is a tuple. The first element of this tuple is the original record of df and the second element is its generated index. To create a DataFrame from this RDD we can use the following code:

rdd = df.rdd.zipWithIndex().map(lambda row: (row[1],) + tuple(row[0]))
df2 = spark.createDataFrame(rdd, schema= ['id']+ list(df.columns))
df2.show()
+---+-----+--------------+------+
| id| name| role|salary|
+---+-----+--------------+------+
| 0| John|Data scientist| 4500|
| 1|James| Data engineer| 3200|
| 2|Laura|Data scientist| 4100|
| 3| Ali| Data engineer| 3200|
| 4|Steve| Developer| 3600|
+---+-----+--------------+------+
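
The ordering rule (partition index first, then position within the partition) can be sketched in plain Python. The split of the five names into three partitions below is a made-up layout for illustration, and zip_with_index is a hypothetical stand-in for the RDD method:

```python
def zip_with_index(partitions):
    # Number the records partition by partition, keeping the order inside each one
    indexed = []
    index = 0
    for partition in partitions:
        for record in partition:
            indexed.append((record, index))
            index += 1
    return indexed

# Hypothetical partition layout, chosen only for illustration
partitions = [["John", "James"], ["Laura"], ["Ali", "Steve"]]
indexed = zip_with_index(partitions)
print(indexed)
# [('John', 0), ('James', 1), ('Laura', 2), ('Ali', 3), ('Steve', 4)]
```

Unlike the IDs from monotonically_increasing_id(), these indices are consecutive across partitions.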

15. Sorting

15.1. sort()

This method sorts a DataFrame on which it is invoked by the specified columns (all in ascending order) and returns the result as a new DataFrame. In the next example, we sort the column name in df in ascending order:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")

df.sort("name").show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| Ali| Data engineer| 3200|
|James| Data engineer| 3200|
| John|Data scientist| 4500|
|Laura|Data scientist| 4100|
|Steve| Developer| 3600|
+-----+--------------+------+

We could also pass a Column object to sort():

df.sort(df["name"]).show()

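This method can also take several columns to sort by. In the next example, the rows are sorted by salary first, and rows with the same salary are then sorted by name: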

df.sort("salary", "name").show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| Ali| Data engineer| 3200|
|James| Data engineer| 3200|
|Steve| Developer| 3600|
|Laura|Data scientist| 4100|
| John|Data scientist| 4500|
+-----+--------------+------+

15.2 desc()

If you need to sort a column in descending order, then you need to use the desc() method in class Column. It returns a sort expression based on the descending order of the column on which it is invoked. For example, we can sort the column name in the previous example in descending order using the following code snippet:

df.sort(df["name"].desc()).show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
|Steve| Developer| 3600|
|Laura|Data scientist| 4100|
| John|Data scientist| 4500|
|James| Data engineer| 3200|
| Ali| Data engineer| 3200|
+-----+--------------+------+

The module functions has a function called desc() that can be used for the same purpose. So the previous example can also be written as:

df.sort(F.desc("name")).show()
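
As a cross-check of the orderings shown in this section, the same sorts can be expressed with the built-in sorted() function of Python on a plain list of tuples. This is only an analogy for readers who know Python sorting, not a description of how Spark sorts distributed data:

```python
rows = [
    ("John", "Data scientist", 4500),
    ("James", "Data engineer", 3200),
    ("Laura", "Data scientist", 4100),
    ("Ali", "Data engineer", 3200),
    ("Steve", "Developer", 3600),
]

# Ascending by salary, ties broken by name (compare with sort("salary", "name"))
by_salary_then_name = sorted(rows, key=lambda r: (r[2], r[0]))
# Descending by name (compare with sort(df["name"].desc()))
by_name_desc = sorted(rows, key=lambda r: r[0], reverse=True)

print([r[0] for r in by_salary_then_name])  # ['Ali', 'James', 'Steve', 'Laura', 'John']
print([r[0] for r in by_name_desc])         # ['Steve', 'Laura', 'John', 'James', 'Ali']
```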

16. Aggregation

16.1 groupBy()

The method groupBy() collects the rows that have identical values in specific columns of a DataFrame into groups, so that we can run aggregations on the grouped data. The syntax is as follows:

groupBy(*cols: ColumnOrName) → GroupedData

It takes Column objects as parameters, but you can also pass the names of the columns. It returns an object of the class GroupedData. This class has a set of methods for aggregations on a DataFrame grouped by groupBy(). Table 3 gives a list of some of the aggregation methods in GroupedData that we can use.

Table 3

+-----------------+----------------------------------------------+
| Method          | Description                                  |
+-----------------+----------------------------------------------+
| mean(), avg()   | Returns the mean of values in each group     |
| max()           | Returns the maximum of values in each group  |
| min()           | Returns the minimum of values in each group  |
| sum()           | Returns the sum of values in each group      |
| count()         | Counts the number of rows in each group      |
+-----------------+----------------------------------------------+

Here is an example. In the DataFrame df, we want to see how many people are employed in each role. To do that, we use groupBy() with the aggregation method count() (this is like using the value_counts() method in pandas).

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")

df.groupBy("role").count().show()
+--------------+-----+
| role|count|
+--------------+-----+
|Data scientist| 2|
| Data engineer| 2|
| Developer| 1|
+--------------+-----+

The method count() is used to count the number of rows in each group. The resulting DataFrame will also contain the grouping columns and their corresponding counts. We can have multiple columns in groupBy(). For example, we can group by both role and salary:

df.groupBy("role", "salary").count().show()
+--------------+------+-----+
| role|salary|count|
+--------------+------+-----+
|Data scientist| 4500| 1|
| Data engineer| 3200| 2|
|Data scientist| 4100| 1|
| Developer| 3600| 1|
+--------------+------+-----+

16.2 agg()

We can also use the method agg() to compute aggregates by specifying a series of aggregate columns. Each parameter is a column expression. This function is useful when we want to run more than one aggregate function at a time. Remember that the module functions provides the built-in standard functions to work with the columns. We can use them as our aggregate functions after groupBy().

For example suppose that in the DataFrame df we want to see how many people are employed in each role, and what is the average salary in each role. We can use the following code:

df.groupBy("role").agg(F.count("salary"), F.avg("salary")).show()
+--------------+-------------+-----------+
| role|count(salary)|avg(salary)|
+--------------+-------------+-----------+
|Data scientist| 2| 4300.0|
| Data engineer| 2| 3200.0|
| Developer| 1| 3600.0|
+--------------+-------------+-----------+
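
To see what these two aggregates compute for each group, here is a plain-Python sketch that groups the same rows with a dictionary. It mirrors the result above and is not Spark code; the variable names are illustrative:

```python
from collections import defaultdict

rows = [
    ("John", "Data scientist", 4500),
    ("James", "Data engineer", 3200),
    ("Laura", "Data scientist", 4100),
    ("Ali", "Data engineer", 3200),
    ("Steve", "Developer", 3600),
]

# Collect the salaries of each role (the "group by" step)
salaries_by_role = defaultdict(list)
for name, role, salary in rows:
    salaries_by_role[role].append(salary)

# Apply two aggregates to every group: count and average
summary = {role: (len(s), sum(s) / len(s)) for role, s in salaries_by_role.items()}
print(summary)
# {'Data scientist': (2, 4300.0), 'Data engineer': (2, 3200.0), 'Developer': (1, 3600.0)}
```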

It is important to note that these functions can also be used on a DataFrame that is not partitioned into groups. In that case, the function is applied to the whole DataFrame (as if it were a single group), and a DataFrame with only one row is returned. For example, to calculate the sum of all salaries in df, we can write:

df.agg(F.sum("salary")).show()
+-----------+
|sum(salary)|
+-----------+
| 18600|
+-----------+

In addition, you don’t need to use agg() here. The resulting column can be selected using the select() method. So the previous code can be also written as:

df.select(F.sum("salary")).show()

17. Pivoting

17.1. pivot()

The method pivot() is a member of the class GroupedData. So to use it, you first need to invoke a method like groupBy() on a DataFrame to return a GroupedData object, and then invoke pivot() on that object. The syntax is as follows:

pivot(pivot_col, values = None) → GroupedData

It pivots a column of the grouped DataFrame in the GroupedData object which means that it turns the unique row values of pivot_col into column headings of the pivoted DataFrame. This pivoted DataFrame is returned as a new GroupedData object. Then you can apply the aggregation methods to this result. The parameter pivot_col gives the name of the column to pivot as a String. However, you can also pass a column object.

We can also specify the list of distinct values in pivot_col to pivot on in the parameter values. This version is more efficient since if you don’t specify values, PySpark needs to first compute the list of distinct values in pivot_col internally. Let’s see an example of this method. We first define a DataFrame called product_df.

data = [("P1", 100, "Vancouver"), ("P2", 150, "Vancouver"),
("P3", 130, "Vancouver"), ("P4", 190, "Vancouver"),
("P1", 50, "Toronto"), ("P2", 60, "Toronto"),
("P3", 70, "Toronto"), ("P4", 60, "Toronto"),
("P1", 30, "Calgary"), ("P2", 140 ,"Calgary")]

product_df = spark.createDataFrame(data) \
.toDF("product_id", "quantity", "city")
product_df.show()
+----------+--------+---------+
|product_id|quantity| city|
+----------+--------+---------+
| P1| 100|Vancouver|
| P2| 150|Vancouver|
| P3| 130|Vancouver|
| P4| 190|Vancouver|
| P1| 50| Toronto|
| P2| 60| Toronto|
| P3| 70| Toronto|
| P4| 60| Toronto|
| P1| 30| Calgary|
| P2| 140| Calgary|
+----------+--------+---------+

This DataFrame gives the quantity of an ordered product for each city. Now we use groupBy() to group this DataFrame by the column product_id, then pivot the column city, and finally get the total quantity of each ordered product in each city.

pivot_df = product_df.groupBy("product_id").pivot("city").sum("quantity")
pivot_df.show()
+----------+-------+-------+---------+
|product_id|Calgary|Toronto|Vancouver|
+----------+-------+-------+---------+
| P4| null| 60| 190|
| P3| null| 70| 130|
| P1| 30| 50| 100|
| P2| 140| 60| 150|
+----------+-------+-------+---------+

We could also specify the list of distinct values in city to pivot on, and get the same result:

cities = ["Calgary", "Toronto", "Vancouver"]
pivot_df = product_df.groupBy("product_id") \
.pivot("city", cities).sum("quantity")
pivot_df.show()
+----------+-------+-------+---------+
|product_id|Calgary|Toronto|Vancouver|
+----------+-------+-------+---------+
| P4| null| 60| 190|
| P3| null| 70| 130|
| P1| 30| 50| 100|
| P2| 140| 60| 150|
+----------+-------+-------+---------+
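
The reshaping that pivot() followed by sum() performs can be imitated on a plain list of tuples. The sketch below is an illustration of the result only, with cells left as None where a product was never ordered in a city; it says nothing about how Spark executes the pivot:

```python
data = [("P1", 100, "Vancouver"), ("P2", 150, "Vancouver"),
        ("P3", 130, "Vancouver"), ("P4", 190, "Vancouver"),
        ("P1", 50, "Toronto"), ("P2", 60, "Toronto"),
        ("P3", 70, "Toronto"), ("P4", 60, "Toronto"),
        ("P1", 30, "Calgary"), ("P2", 140, "Calgary")]
cities = ["Calgary", "Toronto", "Vancouver"]

pivot = {}
for product_id, quantity, city in data:
    # One output row per product, one cell per city (None until a value arrives)
    row = pivot.setdefault(product_id, {c: None for c in cities})
    row[city] = (row[city] or 0) + quantity

print(pivot["P1"])  # {'Calgary': 30, 'Toronto': 50, 'Vancouver': 100}
print(pivot["P4"])  # {'Calgary': None, 'Toronto': 60, 'Vancouver': 190}
```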

17.2. Unpivoting

Unpivoting a table means that we turn multiple column headers into a single column (each column header will be a row of this new column). We also create a second column to store the values of the original columns. To unpivot a DataFrame, we can use the built-in SQL function stack(). The following code unpivots pivot_df:

expr = "stack(3, 'Vancouver', Vancouver, 'Toronto', Toronto, 'Calgary', Calgary) as (city, quantity)"
unpivot_df = pivot_df.select(F.col("product_id"), F.expr(expr)) \
.filter(F.col("quantity").isNotNull())
unpivot_df.show()
+----------+---------+--------+
|product_id| city|quantity|
+----------+---------+--------+
| P4|Vancouver| 190|
| P4| Toronto| 60|
| P3|Vancouver| 130|
| P3| Toronto| 70|
| P1|Vancouver| 100|
| P1| Toronto| 50|
| P1| Calgary| 30|
| P2|Vancouver| 150|
| P2| Toronto| 60|
| P2| Calgary| 140|
+----------+---------+--------+

Let’s see how it works. The SQL statement:

stack(3, 'Vancouver', Vancouver, 'Toronto', Toronto, 'Calgary', Calgary) as (city, quantity)

takes each row of pivot_df and turns it into 3 rows. Since we have 6 expressions in stack(), we get 2 columns for these 3 rows. These columns will be named city and quantity. In each row, the first column will take the city name as a literal, and the second column takes the value of the column that represents that city. For example, the first row of pivot_df which is

+----------+-------+-------+---------+
|product_id|Calgary|Toronto|Vancouver|
+----------+-------+-------+---------+
| P4| null| 60| 190|
+----------+-------+-------+---------+

turns into

+----------+---------+--------+
|product_id| city|quantity|
+----------+---------+--------+
| P4|Vancouver| 190|
| P4| Toronto| 60|
| P4| Calgary| null|
+----------+---------+--------+

We also select product_id to keep it as a separate column. Finally filter() removes the rows in which quantity is null.
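
The two steps (stacking each row into three rows, then dropping the nulls) can be traced in plain Python for the P4 row. This is a sketch of the logic described above, with a dictionary standing in for one row of pivot_df:

```python
# One row of the pivoted table, represented as a dictionary
pivot_row = {"product_id": "P4", "Calgary": None, "Toronto": 60, "Vancouver": 190}
cities = ["Vancouver", "Toronto", "Calgary"]

# Stack step: one (product_id, city, quantity) row per city column
stacked = [(pivot_row["product_id"], city, pivot_row[city]) for city in cities]
# Filter step: drop the rows whose quantity is null
unpivoted = [row for row in stacked if row[2] is not None]

print(stacked)
# [('P4', 'Vancouver', 190), ('P4', 'Toronto', 60), ('P4', 'Calgary', None)]
print(unpivoted)
# [('P4', 'Vancouver', 190), ('P4', 'Toronto', 60)]
```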

18. Window functions

Window functions are useful for tasks like calculating a moving average or a cumulative sum. A window is a collection of rows that are defined relative to the current row of a DataFrame. Each row of a window is a specified number of rows away from the current row. For example, for each row of a DataFrame we can define a window that contains that row and the row before it. So for each row, the corresponding window follows the same pattern, but its elements are different. Once a window is defined, we can apply a window function to it. Then for each row of the DataFrame, the window function will use its corresponding window.

The class Window in package pyspark.sql.window provides the utility methods for defining a window in a DataFrame. All these methods return a WindowSpec object that stores the partitioning, ordering, and frame boundaries of a window for a DataFrame. So when you define a window, you need to use the methods in the class Window to provide this information.

18.1. orderBy()

A window is defined based on the rows of one or more ordered columns in a DataFrame. The method orderBy() in class Window takes these columns and returns a WindowSpec object with the defined ordering. In fact it sorts the DataFrame in ascending order based on the given columns and assigns a window to each row. The syntax is as follows:

orderBy(*cols: Union[ColumnOrName, List[ColumnOrName_]]) → WindowSpec

It can take either the name of a column as a string or a Column object.

18.2. partitionBy()

We can also first partition a DataFrame into groups and then create windows for all the rows in each partition. We use the method partitionBy() in class Window to define the partitions. The syntax is as follows:

partitionBy(*cols: Union[ColumnOrName, List[ColumnOrName_]]) → WindowSpec

18.3. rowsBetween()

To define the frame boundaries of the window, we can use two other methods of Window: rowsBetween() and rangeBetween(). The syntax of rowsBetween() is defined as:

rowsBetween(start: int, end: int) → WindowSpec

It returns a WindowSpec object with the frame boundaries defined from start (inclusive) to end (inclusive). Both of them are defined relative to the current row. For example, start=0 means that the frame boundary of the window starts from the current row, start=-1 means that it starts from the row before the current row, and start=2 means that it starts 2 rows after the current row. It is recommended to set start or end to the constant Window.currentRow rather than using the value 0 to describe the current row.

A frame is unbounded if start is set to Window.unboundedPreceding or end is set to Window.unboundedFollowing. The constant Window.unboundedPreceding is equal to the minimum value of a long type and represents the first row in a partition. Similarly Window.unboundedFollowing is equal to the maximum value of a long type and represents the last row in a partition. Again it is recommended to use them instead of a value to describe the first or last row of a partition.

Let’s see some examples of using windows. We use a DataFrame similar to df1, which was defined before, but this time we cast the column temperature to a double:

data = [
("2020-01-01 07:30:00", "17.0"),
("2020-01-02 07:30:00", "25.5"),
("2020-01-03 07:30:00", "19.5"),
("2020-01-04 07:30:00", "21.2"),
("2020-01-05 07:30:00", "18.0"),
("2020-01-06 07:30:00", "20.5")
]

df1 = spark.createDataFrame(data).toDF("time", "temperature")
df1 = df1.withColumn("temperature",
col("temperature").cast(DoubleType()))
df1.show()
+-------------------+-----------+
| time|temperature|
+-------------------+-----------+
|2020-01-01 07:30:00| 17.0|
|2020-01-02 07:30:00| 25.5|
|2020-01-03 07:30:00| 19.5|
|2020-01-04 07:30:00| 21.2|
|2020-01-05 07:30:00| 18.0|
|2020-01-06 07:30:00| 20.5|
+-------------------+-----------+

Now suppose that we want to calculate the moving average of the last 3 rows of the column temperature. We can use the following code:

from pyspark.sql.window import Window
w = Window.orderBy("time").rowsBetween(-2, Window.currentRow)
df1.withColumn("rolling_average",
F.round(F.avg("temperature").over(w), 2)).show()
+-------------------+-----------+---------------+
| time|temperature|rolling_average|
+-------------------+-----------+---------------+
|2020-01-01 07:30:00| 17.0| 17.0|
|2020-01-02 07:30:00| 25.5| 21.25|
|2020-01-03 07:30:00| 19.5| 20.67|
|2020-01-04 07:30:00| 21.2| 22.07|
|2020-01-05 07:30:00| 18.0| 19.57|
|2020-01-06 07:30:00| 20.5| 19.9|
+-------------------+-----------+---------------+

Here we first define the window by ordering the rows by the column time and setting the window frame boundaries to include the current row and the two rows before it. We apply the aggregate function avg() to the column temperature and then apply the method over() to it to calculate the average of this window for each row of df1.

Remember that in orderBy() the order is ascending by default. If you need to sort the columns in descending order, you can use the desc() method:

w = Window.orderBy(df1["time"].desc()).rowsBetween(-2, Window.currentRow)  
df1.withColumn("rolling_average",
F.round(F.avg("temperature").over(w), 2)).show()
+-------------------+-----------+---------------+
| time|temperature|rolling_average|
+-------------------+-----------+---------------+
|2020-01-06 07:30:00| 20.5| 20.5|
|2020-01-05 07:30:00| 18.0| 19.25|
|2020-01-04 07:30:00| 21.2| 19.9|
|2020-01-03 07:30:00| 19.5| 19.57|
|2020-01-02 07:30:00| 25.5| 22.07|
|2020-01-01 07:30:00| 17.0| 20.67|
+-------------------+-----------+---------------+

In the previous example, we also used the function round() in functions to round the result. We can also use the other aggregate functions of this module like count(), min(), max(), and sum() over the windows.
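
As a cross-check of the numbers above, the following plain-Python sketch (not Spark code) applies different aggregates to the same 3-row frame. The helper rolling() is a hypothetical name, and the list temps is assumed to be already sorted by time.

```python
temps = [17.0, 25.5, 19.5, 21.2, 18.0, 20.5]  # already ordered by time

def rolling(values, size, agg):
    """Apply agg to the frame [i - size + 1, i] of every row i."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - size + 1): i + 1]
        out.append(agg(frame))
    return out

rolling_avg = [round(x, 2) for x in rolling(temps, 3, lambda f: sum(f) / len(f))]
rolling_max = rolling(temps, 3, max)
rolling_count = rolling(temps, 3, len)

print(rolling_avg)    # [17.0, 21.25, 20.67, 22.07, 19.57, 19.9]
print(rolling_max)    # [17.0, 25.5, 25.5, 25.5, 21.2, 21.2]
print(rolling_count)  # [1, 2, 3, 3, 3, 3]
```

The count column makes it visible that the first two rows are averaged over fewer than 3 values.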

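If orderBy() is used to create a window, but the window frame specification is not given, then the frame boundaries are defined from start=Window.unboundedPreceding (inclusive) to end=Window.currentRow (inclusive). This default frame is range-based (like the frames created by rangeBetween(), which is explained later), so rows with the same value of the ordering column share the same frame. We can use this to calculate the cumulative sum of the column temperature in df1: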

w = Window.orderBy("time")
df1.withColumn("cum_sum", F.sum("temperature").over(w)).show()
+-------------------+-----------+-------+
| time|temperature|cum_sum|
+-------------------+-----------+-------+
|2020-01-01 07:30:00| 17.0| 17.0|
|2020-01-02 07:30:00| 25.5| 42.5|
|2020-01-03 07:30:00| 19.5| 62.0|
|2020-01-04 07:30:00| 21.2| 83.2|
|2020-01-05 07:30:00| 18.0| 101.2|
|2020-01-06 07:30:00| 20.5| 121.7|
+-------------------+-----------+-------+
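
In df1 all the timestamps are unique, so it makes no difference whether the default frame is counted in rows or in ordering values. With ties in the ordering column the two differ. The following plain-Python sketch (not Spark code, with made-up data and hypothetical helper names) models both behaviours:

```python
from itertools import accumulate

def cumsum_rows(values):
    """Running total by position, like rowsBetween(unboundedPreceding, currentRow)."""
    return list(accumulate(values))

def cumsum_range(keys, values):
    """Running total by key: every row whose key <= the current key is included."""
    return [sum(v for k, v in zip(keys, values) if k <= key) for key in keys]

keys = [1, 2, 2, 3]          # ordering column with a tie on 2
values = [10, 20, 30, 40]

print(cumsum_rows(values))         # [10, 30, 60, 100]
print(cumsum_range(keys, values))  # [10, 60, 60, 100]
```

If you need a strictly row-by-row running total when the ordering column has duplicates, it is safer to add rowsBetween(Window.unboundedPreceding, Window.currentRow) explicitly.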

The next example shows the use of partitionBy(). In the DataFrame df, we want to give an id to each employee with a specific role starting with the lowest salary:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")

window_spec = Window.partitionBy("role").orderBy("salary")
df.withColumn("row_number", F.row_number().over(window_spec)).show()
+-----+--------------+------+----------+
| name| role|salary|row_number|
+-----+--------------+------+----------+
|James| Data engineer| 3200| 1|
| Ali| Data engineer| 3200| 2|
|Laura|Data scientist| 4100| 1|
| John|Data scientist| 4500| 2|
|Steve| Developer| 3600| 1|
+-----+--------------+------+----------+

Here we first partition the window with role and then sort it with salary. The method row_number() in functions returns a sequential number starting at 1 within a window partition.
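
Note that James and Ali have the same salary, so which of them gets row number 1 is not determined by the ordering. The module functions also provides rank() and dense_rank(), which give tied rows the same number. The plain-Python sketch below (not Spark code; number_rows() is a hypothetical helper) models the three numbering schemes on the same data:

```python
from itertools import groupby

rows = [
    ("John", "Data scientist", 4500),
    ("James", "Data engineer", 3200),
    ("Laura", "Data scientist", 4100),
    ("Ali", "Data engineer", 3200),
    ("Steve", "Developer", 3600),
]

def number_rows(rows):
    """Return (name, role, salary, row_number, rank, dense_rank) per row."""
    out = []
    by_role = sorted(rows, key=lambda r: (r[1], r[2]))  # partition key, then order key
    for _, group in groupby(by_role, key=lambda r: r[1]):
        rank = dense = 0
        prev = None
        for position, (name, role, salary) in enumerate(group, start=1):
            if salary != prev:
                rank = position   # rank jumps to the row position after a tie
                dense += 1        # dense_rank increases by exactly one
                prev = salary
            out.append((name, role, salary, position, rank, dense))
    return out

for row in number_rows(rows):
    print(row)
# ('James', 'Data engineer', 3200, 1, 1, 1)
# ('Ali', 'Data engineer', 3200, 2, 1, 1)
# ('Laura', 'Data scientist', 4100, 1, 1, 1)
# ('John', 'Data scientist', 4500, 2, 2, 2)
# ('Steve', 'Developer', 3600, 1, 1, 1)
```

In this sketch the tie is broken by the original order of the rows; Spark gives no such guarantee, so add a second ordering column if the row numbers must be reproducible.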

18.4. rangeBetween()

Class Window has another method to specify the frame boundaries which is called rangeBetween(). The syntax of rangeBetween() is defined as:

rangeBetween(start: int, end: int) → WindowSpec

Similar to rowsBetween(), this method creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive). However, there are some differences. rangeBetween() works based on the actual value of the column object returned by orderBy() not the position of a row within a partition. Let me explain it using an example:

data = [(1, 1),(2, 2), (2, 3), (2, 4), (4, 5), (6, 6), (7, 7)]
df1 = spark.createDataFrame(data).toDF("id", "num")

w = Window.orderBy("id").rowsBetween(Window.currentRow, 1)
df1.withColumn("rolling_sum", F.sum("num").over(w)).show()
+---+---+-----------+
| id|num|rolling_sum|
+---+---+-----------+
| 1| 1| 3|
| 2| 2| 5|
| 2| 3| 7|
| 2| 4| 9|
| 4| 5| 11|
| 6| 6| 13|
| 7| 7| 7|
+---+---+-----------+

Here we created a DataFrame and defined a window that contains the current row and the row after that. Then we calculated the rolling sum of the column num using that window. Now we replace rowsBetween() with rangeBetween() to see how it changes the result:

w1 = Window.orderBy("id").rangeBetween(Window.currentRow, 1)  
df1.withColumn("rolling_sum", F.sum("num").over(w1)).show()
+---+---+-----------+
| id|num|rolling_sum|
+---+---+-----------+
| 1| 1| 10|
| 2| 2| 9|
| 2| 3| 9|
| 2| 4| 9|
| 4| 5| 5|
| 6| 6| 13|
| 7| 7| 7|
+---+---+-----------+

As you can see, these two methods give different results. In rowsBetween(), the position of the current row is used to calculate its window frame boundaries. For example, using rowsBetween(), the window for the first row in df1 (id=1, num=1) includes this row and the row in the next position (id=2, num=2). So rolling_sum for this row is 1+2=3.

On the other hand, rangeBetween() uses the actual values of the ordering expression, orderBy("id"), to calculate the frame boundaries of each row, not the row's position. Here the ordering expression is simply the column id, so the value of id in the current row determines the frame boundaries.

Suppose that the first row (id=1, num=1) is the current row. The values of the start and end parameters in rangeBetween() are added to id to calculate the frame boundaries, so the frame contains every row whose id lies between 1+0=1 and 1+1=2. We have 3 rows with id=2, so all of them are included, and the window contains the rows with num=1, 2, 3, and 4. So rolling_sum for the first row is 1+2+3+4=10. For the second row we have id=2 and num=2, so the frame contains the rows whose id lies between 2 and 3. A row with id=3 doesn’t exist, so the window only contains the rows with id=2, and rolling_sum is 2+3+4=9. The rows with num=3 and num=4 have the same id, so they get the same value of rolling_sum. Now let’s try the following:

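w2 = Window.orderBy(F.col("id") * 2).rangeBetween(Window.currentRow, 1)
df1.withColumn("rolling_sum", F.sum("num").over(w2)) \
.withColumn("id*2", df1["id"]*2).show()
+---+---+-----------+----+
| id|num|rolling_sum|id*2|
+---+---+-----------+----+
| 1| 1| 1| 2|
| 2| 2| 9| 4|
| 2| 3| 9| 4|
| 2| 4| 9| 4|
| 4| 5| 5| 8|
| 6| 6| 6| 12|
| 7| 7| 7| 14|
+---+---+-----------+----+

Here we use orderBy(F.col("id") * 2) to create the window, so the frame of each row contains the rows whose value of id*2 lies between the current value and the current value plus 1. The values of id*2 are all even, so no other value falls in that range, and the frame only contains the current row and the rows tied with it. Hence the first row now gets 1 instead of 10, the row with id=6 gets 6 instead of 13, and the three rows with id=2 still get 2+3+4=9.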

The method rangeBetween() adds some restrictions to the orderBy() method. When rangeBetween() is used, orderBy() can only take one parameter (a column expression). Since the value of that column expression is added to start and end parameters in rangeBetween() to calculate the frame boundaries, it should have a numerical data type. However, you can use a non-numeric column expression in orderBy(), if the values of start and end in rangeBetween() are limited to Window.unboundedPreceding, Window.currentRow, and Window.unboundedFollowing. In that case, PySpark doesn’t need the value of the column expression in orderBy() to calculate the frame boundaries.
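
The rule that start and end are added to the ordering value can be written down in a few lines. The following is a plain-Python model of a range-based frame (not Spark code; range_frame_sum() is a hypothetical helper), applied to the columns id and num of the example above:

```python
def range_frame_sum(keys, values, start, end):
    """For each row, sum the values whose key lies in [key + start, key + end]."""
    return [
        sum(v for k, v in zip(keys, values) if key + start <= k <= key + end)
        for key in keys
    ]

ids = [1, 2, 2, 2, 4, 6, 7]
nums = [1, 2, 3, 4, 5, 6, 7]

# Model of rangeBetween(Window.currentRow, 1) with the rows ordered by id
print(range_frame_sum(ids, nums, 0, 1))  # [10, 9, 9, 9, 5, 13, 7]
```

The arithmetic key + start only makes sense for numeric keys, which is why a numeric ordering expression is required unless both boundaries are one of the three Window constants.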

18.5. Shifting a column

We can shift a column using the lag() method in functions. The syntax of lag() is:

lag(col, offset = 1, default = None) → Column

Here col is the name of a column (string) or a Column object. It shifts col by offset and returns the result as a new Column object. For each row, the new column holds the value of col that is offset rows before the current row, or default if there are fewer than offset rows before the current row. Let’s see an example:

data = [
("2020-01-01 07:30:00", "17.0"),
("2020-01-02 07:30:00", "25.5"),
("2020-01-03 07:30:00", "19.5"),
("2020-01-04 07:30:00", "21.2"),
("2020-01-05 07:30:00", "18.0"),
("2020-01-06 07:30:00", "20.5")
]

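df1 = spark.createDataFrame(data).toDF("time", "temperature")
# Cast temperature to double so that numeric defaults and arithmetic behave as shown
df1 = df1.withColumn("temperature",
col("temperature").cast(DoubleType()))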

w = Window.orderBy("time")
df1.withColumn("lag_col", F.lag("temperature", 1).over(w)).show()
+-------------------+-----------+-------+
| time|temperature|lag_col|
+-------------------+-----------+-------+
|2020-01-01 07:30:00| 17.0| null|
|2020-01-02 07:30:00| 25.5| 17.0|
|2020-01-03 07:30:00| 19.5| 25.5|
|2020-01-04 07:30:00| 21.2| 19.5|
|2020-01-05 07:30:00| 18.0| 21.2|
|2020-01-06 07:30:00| 20.5| 18.0|
+-------------------+-----------+-------+

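Here we shift the column temperature by 1 and store it in the column lag_col. In the next example, we shift temperature by -1, so each row receives the value of the next row (this is what the function lead() in functions does with an offset of 1), and fill the resulting null value at the end with 0.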

w = Window.orderBy("time")  
df1.withColumn("lead_col", F.lag("temperature", -1, 0).over(w)).show()
+-------------------+-----------+--------+
| time|temperature|lead_col|
+-------------------+-----------+--------+
|2020-01-01 07:30:00| 17.0| 25.5|
|2020-01-02 07:30:00| 25.5| 19.5|
|2020-01-03 07:30:00| 19.5| 21.2|
|2020-01-04 07:30:00| 21.2| 18.0|
|2020-01-05 07:30:00| 18.0| 20.5|
|2020-01-06 07:30:00| 20.5| 0.0|
+-------------------+-----------+--------+

In the next example, we calculate the difference of a DataFrame element compared with the previous row.

w = Window.orderBy("time")
df1.withColumn("diff",
F.col("temperature") - F.lag("temperature", 1).over(w)).show()
+-------------------+-----------+-------------------+
| time|temperature| diff|
+-------------------+-----------+-------------------+
|2020-01-01 07:30:00| 17.0| null|
|2020-01-02 07:30:00| 25.5| 8.5|
|2020-01-03 07:30:00| 19.5| -6.0|
|2020-01-04 07:30:00| 21.2| 1.6999999999999993|
|2020-01-05 07:30:00| 18.0|-3.1999999999999993|
|2020-01-06 07:30:00| 20.5| 2.5|
+-------------------+-----------+-------------------+
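
The behaviour of lag() with positive and negative offsets can be modelled with a short plain-Python function (not Spark code; this lag() is a hypothetical helper that works on a list already sorted by time):

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before each row; negative offsets look ahead."""
    n = len(values)
    return [values[i - offset] if 0 <= i - offset < n else default
            for i in range(n)]

temps = [17.0, 25.5, 19.5, 21.2, 18.0, 20.5]

print(lag(temps, 1))        # [None, 17.0, 25.5, 19.5, 21.2, 18.0]
print(lag(temps, -1, 0.0))  # [25.5, 19.5, 21.2, 18.0, 20.5, 0.0]

# Difference with the previous row; None propagates like null in Spark
diff = [None if p is None else round(t - p, 2)
        for t, p in zip(temps, lag(temps, 1))]
print(diff)                 # [None, 8.5, -6.0, 1.7, -3.2, 2.5]
```

The sketch rounds the differences to 2 decimal places. Wrapping the PySpark expression in F.round() in the same way would turn values such as 1.6999999999999993 into 1.7.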

18.6. Filling a column

Suppose that we have a DataFrame defined as below:

data = [
("2020-01-01 07:30:00", None),
("2020-01-02 07:30:00", 25.5),
("2020-01-03 07:30:00", None),
("2020-01-04 07:30:00", 21.2),
("2020-01-05 07:30:00", 18.0),
("2020-01-06 07:30:00", None)
]

df1 = spark.createDataFrame(data).toDF("time", "temperature")
df1 = df1.withColumn("time", col("time").cast('timestamp'))
df1.show()
+-------------------+-----------+
| time|temperature|
+-------------------+-----------+
|2020-01-01 07:30:00| null|
|2020-01-02 07:30:00| 25.5|
|2020-01-03 07:30:00| null|
|2020-01-04 07:30:00| 21.2|
|2020-01-05 07:30:00| 18.0|
|2020-01-06 07:30:00| null|
+-------------------+-----------+

Now we can use the following code to forward fill the column temperature. It means that each null value is replaced with the last available observation before it.

w = Window.orderBy('time')
df2 = df1.select([F.last(c, ignorenulls=True).over(w).alias(c) \
for c in df1.columns])
df2.show()
+-------------------+-----------+
| time|temperature|
+-------------------+-----------+
|2020-01-01 07:30:00| null|
|2020-01-02 07:30:00| 25.5|
|2020-01-03 07:30:00| 25.5|
|2020-01-04 07:30:00| 21.2|
|2020-01-05 07:30:00| 18.0|
|2020-01-06 07:30:00| 18.0|
+-------------------+-----------+

The next code snippet shows how to backward fill the same column. It means that each null value is replaced with the next available observation after it.

w = Window.orderBy('time').rowsBetween(Window.currentRow,
Window.unboundedFollowing)
df2 = df1.select([F.first(c,ignorenulls=True).over(w).alias(c) \
for c in df1.columns])
df2.show()
+-------------------+-----------+
| time|temperature|
+-------------------+-----------+
|2020-01-01 07:30:00| 25.5|
|2020-01-02 07:30:00| 25.5|
|2020-01-03 07:30:00| 21.2|
|2020-01-04 07:30:00| 21.2|
|2020-01-05 07:30:00| 18.0|
|2020-01-06 07:30:00| null|
+-------------------+-----------+
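
The logic of both fills is easier to see on a plain list. The following is a minimal plain-Python sketch (not Spark code; forward_fill() and backward_fill() are hypothetical helpers) that reproduces the two results above:

```python
def forward_fill(values):
    """Replace each None with the most recent non-None value, if any."""
    out, last = [], None
    for v in values:
        if v is not None:
            last = v
        out.append(last)
    return out

def backward_fill(values):
    """Replace each None with the next non-None value, if any."""
    return forward_fill(values[::-1])[::-1]

temps = [None, 25.5, None, 21.2, 18.0, None]
print(forward_fill(temps))   # [None, 25.5, 25.5, 21.2, 18.0, 18.0]
print(backward_fill(temps))  # [25.5, 25.5, 21.2, 21.2, 18.0, None]
```

A leading null survives a forward fill and a trailing null survives a backward fill, because there is no observation on that side to copy from.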

19. Joins

The method join() in class DataFrame can be used to join DataFrames using a join expression. The syntax of this method is defined as:

join(other: DataFrame, on: Union[str,List[str],Column,List[Column],None]=None,
how: Optional[str]=None) → DataFrame

It joins the DataFrame on which it is invoked with another DataFrame (right) based on the join expression given in on and returns the result as a new DataFrame. The join expression can be a column name, a list of column names, a Column expression, or a list of Column expressions. The parameter how determines the join type and can take the following values: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.

However, we only have 7 different types of join: inner, full outer, left outer, right outer, left semi, left anti, and cross join. Now we are going to explain each of them. It is interesting to note that the names of the join types are case-insensitive. In addition, you can use the underscore (_) anywhere since it is removed before the name is checked. Hence full_outer and FULLOUT_er are considered equivalent by join()!
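
The name handling described above can be sketched in plain Python. This is only a model of the described behaviour, not Spark's own code; the table CANONICAL and the function normalize_join_type() are hypothetical names.

```python
# Aliases from the list above, after lower-casing and removing underscores
CANONICAL = {
    "inner": "inner",
    "cross": "cross",
    "outer": "full outer", "full": "full outer", "fullouter": "full outer",
    "left": "left outer", "leftouter": "left outer",
    "right": "right outer", "rightouter": "right outer",
    "semi": "left semi", "leftsemi": "left semi",
    "anti": "left anti", "leftanti": "left anti",
}

def normalize_join_type(how):
    """Lower-case the name, drop underscores, and map it to one of 7 join types."""
    key = how.lower().replace("_", "")
    if key not in CANONICAL:
        raise ValueError(f"Unsupported join type: {how!r}")
    return CANONICAL[key]

print(normalize_join_type("full_outer"))  # full outer
print(normalize_join_type("FULLOUT_er"))  # full outer
print(normalize_join_type("Left_Semi"))   # left semi
print(len(set(CANONICAL.values())))       # 7
```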

19.1. Inner join

The inner join gives the intersection of two DataFrames. For each row of the first DataFrame (on which join() is invoked), if a row in the second DataFrame (right) is found for which the join expression in on is true, its columns are added to the columns of the row in the first DataFrame to create a new row. Here is an example. We first define two DataFrames df and age_df:

data =[ 
("John", "Data scientist", 4500),
("James", "Data engineer", 3200),
("Laura", "Data scientist", 4100),
("Ali", "Data engineer", 3200),
("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")
df.show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
|James| Data engineer| 3200|
|Laura|Data scientist| 4100|
| Ali| Data engineer| 3200|
|Steve| Developer| 3600|
+-----+--------------+------+

data = [
("John", 45),
("James", 25),
("Laura", 30),
("Will", 28)
]
age_df = spark.createDataFrame(data).toDF("name", "age")
age_df.show()
+-----+---+
| name|age|
+-----+---+
| John| 45|
|James| 25|
|Laura| 30|
| Will| 28|
+-----+---+

The following code gives the inner join of these DataFrames:

joined_df = df.join(age_df, df["name"] == age_df["name"], "inner")
joined_df.show()
+-----+--------------+------+-----+---+
| name| role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist| 4500| John| 45|
|James| Data engineer| 3200|James| 25|
|Laura|Data scientist| 4100|Laura| 30|
+-----+--------------+------+-----+---+

For inner join, how is set to "inner". Here the rows in both DataFrames that have the same name are selected. Please note that we have a duplicated name column in the resulting DataFrame. To avoid the column duplication in an inner join, pass only the name of the column that should be matched in both DataFrames:

joined_df = df.join(age_df, ["name"], "inner")
joined_df.show()
+-----+--------------+------+---+
| name| role|salary|age|
+-----+--------------+------+---+
| John|Data scientist| 4500| 45|
|James| Data engineer| 3200| 25|
|Laura|Data scientist| 4100| 30|
+-----+--------------+------+---+

Here passing a column name like name means that the join expression is an equality (df['name'] == age_df['name']). You cannot use this method to check a different condition like df['name'] != age_df['name']. You must write the join expression explicitly for such a case.

If you don’t pass a join type, the default join is an inner join. So the previous example could be concisely written as:

df.join(age_df, "name").show()

Since on is a column expression, we can use all the operators in Table 2 in it. We can also use the column names. However, when a column name exists in both DataFrames, we need to qualify the column name with the name of the DataFrame to avoid ambiguity. For example, the previous join could also be written as either:

joined_df = df.alias("df1").join(age_df.alias("df2"),
col("df1.name") == col("df2.name"), "inner")
joined_df.show()

or

joined_df = df.alias("df1").join(age_df.alias("df2"), 
F.expr("df1.name == df2.name"), "inner")

Please note that we cannot directly use the original name of the DataFrame in a Column expression, so the following code would throw an error:

joined_df = df.join(age_df, F.expr("df.name == age_df.name"), "inner")

To avoid this error, we first create an alias for each DataFrame and then add the column name to that alias using the dot operator.

This is also useful when we want to join a DataFrame with itself (self-join). For example, to join df with itself, we can write:

joined_df = df.alias("df1").join(df.alias("df2"),
col("df1.salary") < col("df2.salary"), "inner")
joined_df.show()
+-----+--------------+------+-----+--------------+------+
| name| role|salary| name| role|salary|
+-----+--------------+------+-----+--------------+------+
|James| Data engineer| 3200| John|Data scientist| 4500|
|James| Data engineer| 3200|Laura|Data scientist| 4100|
|James| Data engineer| 3200|Steve| Developer| 3600|
|Laura|Data scientist| 4100| John|Data scientist| 4500|
| Ali| Data engineer| 3200| John|Data scientist| 4500|
| Ali| Data engineer| 3200|Laura|Data scientist| 4100|
| Ali| Data engineer| 3200|Steve| Developer| 3600|
|Steve| Developer| 3600| John|Data scientist| 4500|
|Steve| Developer| 3600|Laura|Data scientist| 4100|
+-----+--------------+------+-----+--------------+------+

When the column names in the DataFrames are not the same, we don’t need to mention the name of the DataFrame. Here is an example:

age_df1 = age_df.withColumnRenamed("name", "employee_name") 
joined_df = df.join(age_df1, F.expr("name == employee_name"), "inner")
joined_df.show()
+-----+--------------+------+-------------+---+
| name| role|salary|employee_name|age|
+-----+--------------+------+-------------+---+
| John|Data scientist| 4500| John| 45|
|James| Data engineer| 3200| James| 25|
|Laura|Data scientist| 4100| Laura| 30|
+-----+--------------+------+-------------+---+

For an inner join, we can also specify the join expression separately with a where() or filter() method. The following code gives an example:

joined_df = df.join(age_df).where(df["name"] == age_df["name"])
joined_df.show()
+-----+--------------+------+-----+---+
| name| role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist| 4500| John| 45|
|James| Data engineer| 3200|James| 25|
|Laura|Data scientist| 4100|Laura| 30|
+-----+--------------+------+-----+---+

19.2. Left and right joins

In a left join, all the rows of the first DataFrame (on which join() is invoked) are returned in the result. For each row of the first DataFrame, if a row in the second DataFrame (right) is found that satisfies the join expression (on), its columns are added to the columns of the row in the first DataFrame. If none of the rows of the second DataFrame satisfy the join expression, then the columns of the second DataFrame with null values are added to the columns of the row in the first DataFrame.

As a result, all the original rows of the first DataFrame are returned with either the matching columns of the second one or just null values for them. To do a left join we use join() with a join type of "left", "leftouter", or "left_outer". Let’s see an example:

joined_df = df.join(age_df, df["name"] == age_df["name"], "left")
joined_df.show()
+-----+--------------+------+-----+----+
| name| role|salary| name| age|
+-----+--------------+------+-----+----+
| John|Data scientist| 4500| John| 45|
|James| Data engineer| 3200|James| 25|
|Laura|Data scientist| 4100|Laura| 30|
| Ali| Data engineer| 3200| null|null|
|Steve| Developer| 3600| null|null|
+-----+--------------+------+-----+----+

Again we can avoid duplicated columns by using the column names in join():

joined_df = df.join(age_df, "name", "left")
joined_df.show()
+-----+--------------+------+----+
| name| role|salary| age|
+-----+--------------+------+----+
| John|Data scientist| 4500| 45|
|James| Data engineer| 3200| 25|
|Laura|Data scientist| 4100| 30|
| Ali| Data engineer| 3200|null|
|Steve| Developer| 3600|null|
+-----+--------------+------+----+

In a right join, all the original rows of the second DataFrame are returned with either the matching columns of the first one or just null values for them. We should use join() with how set to "right", "rightouter", or "right_outer" to do a right join.

joined_df = df.join(age_df, ["name"], "right")
joined_df.show()
+-----+--------------+------+---+
| name| role|salary|age|
+-----+--------------+------+---+
| John|Data scientist| 4500| 45|
|James| Data engineer| 3200| 25|
|Laura|Data scientist| 4100| 30|
| Will| null| null| 28|
+-----+--------------+------+---+

19.3 Full join

The full join combines the results of both left and right joins. So all the rows resulting from a left join are returned. In addition, all the rows from a right join are added except the rows that already exist in the left join. So duplicated rows are not returned. To have a full (or full outer) join we set how to "full", "outer", "fullouter", or "full_outer". Let’s see an example:

df.join(age_df, "name", "full").show()
+-----+--------------+------+----+
| name| role|salary| age|
+-----+--------------+------+----+
| Ali| Data engineer| 3200|null|
|James| Data engineer| 3200| 25|
| John|Data scientist| 4500| 45|
|Laura|Data scientist| 4100| 30|
|Steve| Developer| 3600|null|
| Will| null| null| 28|
+-----+--------------+------+----+

If you compare this output with the output of left and right joins, you will see that it has all the rows of both of them.
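
The difference between the inner, left, right, and full joins comes down to which names are kept. Here is a plain-Python sketch (not Spark code) on the same names, salaries, and ages. The helper join_on_name() is hypothetical, and the sketch assumes that each name appears only once in each table; None plays the role of null.

```python
salaries = {"John": 4500, "James": 3200, "Laura": 4100, "Ali": 3200, "Steve": 3600}
ages = {"John": 45, "James": 25, "Laura": 30, "Will": 28}

def join_on_name(left, right, how):
    """Join two name-keyed dicts and return (name, salary, age) tuples."""
    if how == "inner":
        names = [n for n in left if n in right]
    elif how == "left":
        names = list(left)
    elif how == "right":
        names = list(right)
    elif how == "full":
        names = list(left) + [n for n in right if n not in left]
    else:
        raise ValueError(how)
    return [(n, left.get(n), right.get(n)) for n in names]

for how in ("inner", "left", "right", "full"):
    result = join_on_name(salaries, ages, how)
    print(how, len(result), result)
```

The four results have 3, 5, 4, and 6 rows respectively, which matches the row counts of the PySpark outputs shown above.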

19.4. Left semi join

The left semi join is similar to the left join, but there are two differences. First, for each row of the first DataFrame, if a row in the second DataFrame is found for which the join expression (on) is true, only the columns of the row of the first DataFrame are returned, and the columns of the row of the second DataFrame are not added to them. Second, for each row of the first DataFrame, if none of the rows of the second DataFrame satisfy the join expression, then that row is not returned.

So it is like a left join in which the columns of the second DataFrame are removed, and the rows for which these columns are null are removed too. To do a left semi join we can use join() with the join type set to "leftsemi", "left_semi", or "semi". Here is an example of using it:

joined_df = df.join(age_df, df["name"] == age_df["name"], "leftsemi")
joined_df.show()
+-----+--------------+------+
| name| role|salary|
+-----+--------------+------+
| John|Data scientist| 4500|
|James| Data engineer| 3200|
|Laura|Data scientist| 4100|
+-----+--------------+------+

19.5. Left anti join

As the name suggests, it does the exact opposite of left semi join. For each row of the first DataFrame, if there are no rows in the second DataFrame for which the join expression is true, that row is returned. To do a left anti join we use join() with the join type set to "leftanti", "left_anti", or "anti".

joined_df = df.join(age_df, df["name"] == age_df["name"], "leftanti")
joined_df.show()
+-----+-------------+------+
| name| role|salary|
+-----+-------------+------+
| Ali|Data engineer| 3200|
|Steve| Developer| 3600|
+-----+-------------+------+
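
Semi and anti joins act as filters on the first DataFrame. The plain-Python sketch below (not Spark code) uses a small made-up table in which John appears twice on the right side, to show that a semi join, unlike an inner join, returns each row of the left side at most once:

```python
left = [("John", 4500), ("James", 3200), ("Ali", 3200)]
right = [("John", 45), ("John", 46), ("James", 25)]  # John is duplicated on purpose

right_names = {name for name, _ in right}

inner = [(n, s, a) for n, s in left for rn, a in right if n == rn]
semi = [(n, s) for n, s in left if n in right_names]      # keep rows with a match
anti = [(n, s) for n, s in left if n not in right_names]  # keep rows without a match

print(inner)  # [('John', 4500, 45), ('John', 4500, 46), ('James', 3200, 25)]
print(semi)   # [('John', 4500), ('James', 3200)]
print(anti)   # [('Ali', 3200)]
```

The semi and anti results together always contain every row of the left side exactly once.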

19.6. Cross join

To have a cross join we can use join() with the join type set to "cross". Cross join creates a cartesian product of the first DataFrame and the second DataFrame. A cartesian product of two DataFrames is a set of ordered pairs in which the first element is one row of the first DataFrame and the second element is one row of the second DataFrame. So it gives every possible combination of the rows of the first DataFrame with the rows of the second DataFrame. For each pair, if the join expression is true, the elements of that pair are combined to create a new row. So the columns of the row of the second DataFrame are added to the columns of the row of the first DataFrame. As a result, when a join expression is given, the cross join and the inner join give the same result. Here is an example:

joined_df = df.join(age_df, df["name"] == age_df["name"], "cross")
joined_df.show()
+-----+--------------+------+-----+---+
| name| role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist| 4500| John| 45|
|James| Data engineer| 3200|James| 25|
|Laura|Data scientist| 4100|Laura| 30|
+-----+--------------+------+-----+---+

19.6.1 crossJoin()

We can also use the method crossJoin() in class DataFrame to create the cartesian product of two DataFrames without any additional conditions:

joined_df = df.crossJoin(age_df)
joined_df.show()
+-----+--------------+------+-----+---+
| name| role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist| 4500| John| 45|
| John|Data scientist| 4500|James| 25|
| John|Data scientist| 4500|Laura| 30|
| John|Data scientist| 4500| Will| 28|
|James| Data engineer| 3200| John| 45|
|James| Data engineer| 3200|James| 25|
|James| Data engineer| 3200|Laura| 30|
|James| Data engineer| 3200| Will| 28|
|Laura|Data scientist| 4100| John| 45|
|Laura|Data scientist| 4100|James| 25|
|Laura|Data scientist| 4100|Laura| 30|
|Laura|Data scientist| 4100| Will| 28|
| Ali| Data engineer| 3200| John| 45|
| Ali| Data engineer| 3200|James| 25|
| Ali| Data engineer| 3200|Laura| 30|
| Ali| Data engineer| 3200| Will| 28|
|Steve| Developer| 3600| John| 45|
|Steve| Developer| 3600|James| 25|
|Steve| Developer| 3600|Laura| 30|
|Steve| Developer| 3600| Will| 28|
+-----+--------------+------+-----+---+

This is a very expensive operation for large DataFrames. The cross join of a DataFrame with m rows with a DataFrame with n rows results in a DataFrame with m×n rows. So for large DataFrames, it can easily result in an out-of-memory exception.
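
The growth in size is easy to check with itertools.product, which builds the same cartesian product in plain Python (a sketch on the names and ages used above, not Spark code):

```python
from itertools import product

names = ["John", "James", "Laura", "Ali", "Steve"]  # m = 5 rows
ages = [45, 25, 30, 28]                             # n = 4 rows

pairs = list(product(names, ages))
print(len(pairs))  # 20
print(pairs[:4])   # [('John', 45), ('John', 25), ('John', 30), ('John', 28)]

# The row count grows multiplicatively with the sizes of the inputs
m, n = 1_000_000, 1_000_000
print(f"{m * n:,} rows")  # 1,000,000,000,000 rows
```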

20. Concatenating DataFrames

The method union() in class DataFrame is used to append the rows of one DataFrame to another.

union(other: DataFrame) → DataFrame

It appends the rows of other to the end of the DataFrame on which it is invoked and returns a new DataFrame. Here is an example:

df1 = spark.createDataFrame([(1, 2, "a")]).toDF("col0", "col1", "col2")
df2 = spark.createDataFrame([(3, 4, "b")]).toDF("col0", "col1", "col2")
df1.union(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
| 1| 2| a|
| 3| 4| b|
+----+----+----+

The DataFrames that are being appended by union() should have the same number of columns. Please note that union() doesn’t match the column names when appending the rows of the second DataFrame, and the rows are being appended by their original column order.

df1 = spark.createDataFrame([(1, 2, "a")]).toDF("col0", "col1", "col2")
df2 = spark.createDataFrame([("b", 3, 4)]).toDF("col2", "col0", "col1")
df1.union(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
| 1| 2| a|
| b| 3| 4|
+----+----+----+

We can use the method unionByName() to match the columns by name instead of by position.

df1 = spark.createDataFrame([(1, 2, "a")]).toDF("col0", "col1", "col2")
df2 = spark.createDataFrame([("b", 3, 4)]).toDF("col2", "col0", "col1")
df1.unionByName(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
| 1| 2| a|
| 3| 4| b|
+----+----+----+
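
The difference between the two methods can be modelled on plain lists of tuples. In this sketch (not Spark code), union() and union_by_name() are hypothetical helpers that take the column names and the rows of each table:

```python
def union(cols1, rows1, cols2, rows2):
    """Append by position: the column names of the second table are ignored."""
    return cols1, rows1 + rows2

def union_by_name(cols1, rows1, cols2, rows2):
    """Append by name: reorder each row of the second table to match cols1."""
    if set(cols1) != set(cols2):
        raise ValueError("column names differ")
    order = [cols2.index(c) for c in cols1]
    return cols1, rows1 + [tuple(row[i] for i in order) for row in rows2]

cols1, rows1 = ["col0", "col1", "col2"], [(1, 2, "a")]
cols2, rows2 = ["col2", "col0", "col1"], [("b", 3, 4)]

print(union(cols1, rows1, cols2, rows2)[1])          # [(1, 2, 'a'), ('b', 3, 4)]
print(union_by_name(cols1, rows1, cols2, rows2)[1])  # [(1, 2, 'a'), (3, 4, 'b')]
```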

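We can also append the columns of a DataFrame to another DataFrame using an inner join. Here we first create a dummy column in both DataFrames using monotonically_increasing_id() and then inner join these DataFrames based on that column. Finally, we drop the dummy column to form the resulting DataFrame. Please note that the generated IDs depend on how the rows are distributed across the partitions, so this approach only lines up the rows reliably when both DataFrames have the same number of partitions and the same number of rows in each partition.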

df1 = spark.createDataFrame([(1, 2), (3, 4)]).toDF("col0", "col1")
df2 = spark.createDataFrame([("a", "b"), ("c", "d")]).toDF("col3", "col4")

df1.withColumn("row_id", F.monotonically_increasing_id()) \
.join(df2.withColumn("row_id", F.monotonically_increasing_id()), "row_id") \
.drop("row_id").show()
+----+----+----+----+
|col0|col1|col3|col4|
+----+----+----+----+
| 1| 2| a| b|
| 3| 4| c| d|
+----+----+----+----+
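
This trick is sensitive to partitioning. According to the Spark documentation, the generated ID holds the partition index in its upper bits and the position of the row within its partition in the lower 33 bits. The following plain-Python sketch models that layout (monotonic_ids() is a hypothetical helper, not a Spark function) and shows what happens when the same 4 rows are split differently:

```python
def monotonic_ids(partition_sizes):
    """Model of the ID layout: partition index shifted left by 33 bits,
    plus the position of the row within its partition."""
    return [(p << 33) + i
            for p, size in enumerate(partition_sizes)
            for i in range(size)]

ids_a = monotonic_ids([4])     # 4 rows in a single partition
ids_b = monotonic_ids([2, 2])  # the same 4 rows spread over two partitions

print(ids_a)  # [0, 1, 2, 3]
print(ids_b)  # [0, 1, 8589934592, 8589934593]
print(sorted(set(ids_a) & set(ids_b)))  # [0, 1]
```

Only two of the four IDs match, so an inner join on this column would silently drop half of the rows. An index built from consecutive numbers, for example with zipWithIndex(), does not have this problem.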

21. Resampling

21.1. Downsampling

Resampling converts a time series from one frequency to another. You can use it to convert a DataFrame recorded at some time interval to a different one. For example, daily data can be changed to weekly or monthly data or vice versa. In this section, we will see how we can do different types of resampling using PySpark. We start with a DataFrame which has a record every 20 minutes:

data = [
("2020-01-02 01:00:00", 1),
("2020-01-02 01:20:00", 2),
("2020-01-02 01:40:00", 4),
("2020-01-02 02:00:00", 5),
("2020-01-02 02:20:00", 6),
("2020-01-02 02:40:00", 7),
("2020-01-02 03:00:00", 8),
("2020-01-02 03:20:00", 9),
("2020-01-02 03:40:00", 10),
("2020-01-02 04:00:00", 11),
("2020-01-02 04:20:00", 12),
("2020-01-02 04:40:00", 13),
("2020-01-02 05:00:00", 14),
]

df1 = spark.createDataFrame(data).toDF("time", "value")
df1 = df1.withColumn("time", F.col("time").cast("timestamp"))
df1.show()
+-------------------+-----+
| time|value|
+-------------------+-----+
|2020-01-02 01:00:00| 1|
|2020-01-02 01:20:00| 2|
|2020-01-02 01:40:00| 4|
|2020-01-02 02:00:00| 5|
|2020-01-02 02:20:00| 6|
|2020-01-02 02:40:00| 7|
|2020-01-02 03:00:00| 8|
|2020-01-02 03:20:00| 9|
|2020-01-02 03:40:00| 10|
|2020-01-02 04:00:00| 11|
|2020-01-02 04:20:00| 12|
|2020-01-02 04:40:00| 13|
|2020-01-02 05:00:00| 14|
+-------------------+-----+

Now we try to downsample this DataFrame into 1-hour bins and sum the values of the timestamps falling into each bin:

df1.groupBy(F.date_trunc('hour', df1['time']).alias('time')) \
.sum().orderBy('time').show()
+-------------------+----------+
| time|sum(value)|
+-------------------+----------+
|2020-01-02 01:00:00| 7|
|2020-01-02 02:00:00| 18|
|2020-01-02 03:00:00| 27|
|2020-01-02 04:00:00| 36|
|2020-01-02 05:00:00| 14|
+-------------------+----------+

This is equivalent to running the following command in Pandas:

df1.resample('1H', closed='left', label = 'left').sum()

Here we downsampled the DataFrame into 1-hour bins. Each one-hour bin has a start datetime and an end datetime, which are called the left and right edges of the bin respectively. For example, the left and right edges of the first bin in df1 are 2020-01-02 01:00:00 and 2020-01-02 02:00:00 respectively. In addition, each bin has a label (datetime) in the resampled DataFrame. This label is equal to either the left or right edge of the bin. When we resample df1, we calculate the sum (it can be another operation like mean, count, …) of all the values in each bin. The bin labels and their corresponding sums form the resulting resampled DataFrame. However, in each bin, when calculating the sum, only the value of one edge (left or right) is included, which means that the bin is closed on that edge.

In the previous example, the label of each bin is its left edge (label='left') and the bin is closed on the left edge, which means that the value at the right edge is not included in the calculation (closed='left'). For example, in the first bin the label is 2020-01-02 01:00:00 and the value at 2020-01-02 02:00:00 is not included in the sum, so the sum is 1+2+4=7.

We can choose which edge is closed and which edge is used as the label when we do a resampling. Here, we see how we can implement the other combinations of label and closed. The following code implements label='right' and closed='left' in PySpark:

# Pandas: df1.resample('1H', closed='left', label = 'right').sum()

df1.groupBy((F.date_trunc('hour', df1['time'])+F.expr('INTERVAL 60 MINUTES')) \
.alias('time')).sum().orderBy('time').show()
+-------------------+----------+
|               time|sum(value)|
+-------------------+----------+
|2020-01-02 02:00:00|         7|
|2020-01-02 03:00:00|        18|
|2020-01-02 04:00:00|        27|
|2020-01-02 05:00:00|        36|
|2020-01-02 06:00:00|        14|
+-------------------+----------+

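Next, the following code implements label='right' and closed='right'. A timestamp that falls exactly on the hour is the right edge of its bin, so it is used as the label directly; any other timestamp is labeled with the next full hour: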

# Pandas: df1.resample('1H', closed='right', label = 'right').sum()

diff = col('time').cast('long') - F.date_trunc('hour', col('time')).cast('long')
ind = F.when(diff == 0, col('time')) \
.otherwise(F.date_trunc('hour', col('time')+F.expr('INTERVAL 60 MINUTES')))
df1.groupBy(ind.alias('time')) \
.sum().orderBy('time').show()
+-------------------+----------+
|               time|sum(value)|
+-------------------+----------+
|2020-01-02 01:00:00|         1|
|2020-01-02 02:00:00|        11|
|2020-01-02 03:00:00|        21|
|2020-01-02 04:00:00|        30|
|2020-01-02 05:00:00|        39|
+-------------------+----------+

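And finally, we have label='left' and closed='right'. The bins are the same as in the previous example, but each label is shifted back by one hour to the left edge: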

# Pandas: df1.resample('1H', closed='right', label = 'left').sum()

diff = col('time').cast('long') - F.date_trunc('hour', col('time')).cast('long')
ind = F.when(diff == 0, col('time')) \
.otherwise(F.date_trunc('hour', col('time')+F.expr('INTERVAL 60 MINUTES')))
df1.groupBy((ind-F.expr('INTERVAL 60 MINUTES')).alias('time')) \
.sum().orderBy('time').show()
+-------------------+----------+
|               time|sum(value)|
+-------------------+----------+
|2020-01-02 00:00:00|         1|
|2020-01-02 01:00:00|        11|
|2020-01-02 02:00:00|        21|
|2020-01-02 03:00:00|        30|
|2020-01-02 04:00:00|        39|
+-------------------+----------+
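
The four combinations of closed and label shown in this section follow one rule, which can be sketched in plain Python without Spark. This is only an illustration: bin_label is a hypothetical helper that is not part of PySpark or Pandas, and the two timestamps are made up rather than taken from df1.

```python
from datetime import datetime, timedelta

HOUR = timedelta(hours=1)

def bin_label(ts, closed="left", label="left"):
    """Return the label of the one-hour bin that contains ts (hypothetical helper)."""
    floor = ts.replace(minute=0, second=0, microsecond=0)
    if closed == "left":
        # Bin is [floor, floor + 1h): the left edge is included.
        left_edge = floor
    else:
        # Bin is (left_edge, left_edge + 1h]: a timestamp exactly on the hour
        # is the right edge of the bin that starts one hour earlier.
        left_edge = floor - HOUR if ts == floor else floor
    return left_edge if label == "left" else left_edge + HOUR

# Made-up timestamps, not the rows of df1.
on_the_hour = datetime(2020, 1, 2, 2, 0, 0)
inside_bin = datetime(2020, 1, 2, 1, 30, 0)

for closed in ("left", "right"):
    for label in ("left", "right"):
        print(closed, label,
              bin_label(on_the_hour, closed, label),
              bin_label(inside_bin, closed, label))
```

With closed='left', the timestamp 02:00:00 opens a new bin, while with closed='right' it closes the bin that started at 01:00:00. A timestamp inside a bin, such as 01:30:00, always belongs to the bin between 01:00:00 and 02:00:00, and only its label changes.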

21.2. Upsampling

In the next example, we upsample a DataFrame. We start by defining the following DataFrame:

data = [
("2020-01-01 01:00:00", 1),
("2020-01-01 05:00:00", 2),
("2020-01-01 10:00:00", 3),
]

df1 = spark.createDataFrame(data).toDF("time", "value")
df1 = df1.withColumn("time", F.col("time").cast("timestamp"))
df1.show()
+-------------------+-----+
|               time|value|
+-------------------+-----+
|2020-01-01 01:00:00|    1|
|2020-01-01 05:00:00|    2|
|2020-01-01 10:00:00|    3|
+-------------------+-----+

Next, we upsample it into 1-hour bins:

hour = 60 * 60 
epoch = (col("time").cast("long") / hour).cast("long") * hour
with_epoch = df1.withColumn("epoch", epoch)
min_epoch, max_epoch = with_epoch.select(F.min("epoch"), F.max("epoch")).first()

time_range = spark.range(min_epoch, max_epoch + 1, hour).toDF("epoch")

time_range.join(with_epoch.select('epoch', 'value'), "epoch", "left") \
.orderBy("epoch") \
.withColumn("time", col("epoch").cast("timestamp")) \
.select('time', 'value').show()
+-------------------+-----+
|               time|value|
+-------------------+-----+
|2020-01-01 01:00:00|    1|
|2020-01-01 02:00:00| null|
|2020-01-01 03:00:00| null|
|2020-01-01 04:00:00| null|
|2020-01-01 05:00:00|    2|
|2020-01-01 06:00:00| null|
|2020-01-01 07:00:00| null|
|2020-01-01 08:00:00| null|
|2020-01-01 09:00:00| null|
|2020-01-01 10:00:00|    3|
+-------------------+-----+

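Here, we first round each timestamp down to the hour (as epoch seconds), then use spark.range() to build a complete hourly index between the earliest and latest rows, and finally left-join the original values onto that index. Hours that have no matching row in the original DataFrame get a null value.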
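
The same three steps (floor to the hour, build the hourly range, left join) can be traced in plain Python. This is a minimal sketch, not the Spark implementation: it assumes the timestamps are in UTC and that there is at most one row per hour, and the names rows, to_epoch, values, grid, and upsampled are made up for the illustration. The sample rows mirror the DataFrame displayed above.

```python
from datetime import datetime, timezone

HOUR = 60 * 60  # seconds in one hour

# Sample rows mirroring the displayed DataFrame (assumed to be UTC).
rows = [
    ("2020-01-01 01:00:00", 1),
    ("2020-01-01 05:00:00", 2),
    ("2020-01-01 10:00:00", 3),
]

def to_epoch(text):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string as UTC and return epoch seconds."""
    dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp())

# Step 1: floor every timestamp to the start of its hour.
values = {to_epoch(t) // HOUR * HOUR: v for t, v in rows}

# Step 2: build the complete hourly index, like spark.range(min, max + 1, hour).
grid = range(min(values), max(values) + 1, HOUR)

# Step 3: "left join" the values onto the index; missing hours become None.
upsampled = [
    (datetime.fromtimestamp(e, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
     values.get(e))
    for e in grid
]

for time, value in upsampled:
    print(time, value)
```

The dictionary lookup plays the role of the left join: every hour in the index is kept, and hours without a matching row map to None, just as they map to null in the PySpark output.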

Source code

I hope that you enjoyed reading this post. All the Code Listings in this article are available for download as a Jupyter Notebook from GitHub at:
