Subqueries and CTEs in Spark: Enhancing Data Analysis and Manipulation

Arun Jijo
Published in Javarevisited
10 min read · Apr 24, 2024

Writing efficient, expressive queries is central to data analytics. Two SQL features stand out for streamlining complex data manipulation: Common Table Expressions (CTEs) and subqueries. Both give data professionals several ways to approach an analysis, from breaking an intricate query into simpler parts to improving readability and maintainability. Apache Spark, through its Spark SQL module, pairs this SQL expressiveness with Spark’s distributed execution engine for large-scale data processing.

This article walks through practical applications of CTEs and subqueries in Spark SQL using real-world scenarios such as financial analytics, hierarchical data exploration, and complex data transformations. It also covers the limitations of these features within the Spark ecosystem and workarounds for the cases Spark SQL does not handle directly. The complete code implementations are available in this notebook, so you can run and experiment with the code interactively.

Subqueries

Subqueries are powerful SQL constructs that allow you to nest one query within another. They can be used in various clauses of an SQL statement, including SELECT, FROM, WHERE, and HAVING. Here are real-world use cases for different types of subqueries, illustrating their versatility and utility in data analysis and manipulation:

Scalar Subqueries

A scalar subquery is a subquery that returns a single value. This single value can be used in various parts of an SQL statement, such as in the SELECT list, WHERE clause, or even in a SET statement in an UPDATE. It’s often used for comparisons or assignments.

Use Case: Pricing Strategy Analysis

A retail company wants to analyze its pricing strategy by comparing the price of each product against the average price of all products in the same category. A scalar subquery, which returns a single value, can be used to find the average price per category and compare each product’s price to this average.

SELECT
    ProductID,
    Price,
    CategoryID,
    (SELECT AVG(Price) FROM Products p2 WHERE p2.CategoryID = p1.CategoryID) AS AvgPricePerCategory
FROM
    Products p1

In this scenario, the scalar subquery calculates the average price for each category, allowing the company to easily identify products that are priced significantly above or below the category average.
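To see what this query returns, here is a minimal sketch that runs it against a tiny in-memory table. It uses Python's built-in sqlite3 module purely as a stand-in engine so that it runs without a cluster; the query text is the one from the use case, while the sample rows and the helper name avg_price_per_category are made up for illustration.

```python
import sqlite3

QUERY = """
    SELECT ProductID, Price, CategoryID,
           (SELECT AVG(Price) FROM Products p2
            WHERE p2.CategoryID = p1.CategoryID) AS AvgPricePerCategory
    FROM Products p1
    ORDER BY ProductID
"""

def avg_price_per_category(products):
    """products: iterable of (ProductID, Price, CategoryID) tuples (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE Products (ProductID INTEGER, Price REAL, CategoryID INTEGER)")
    conn.executemany("INSERT INTO Products VALUES (?, ?, ?)", products)
    rows = conn.execute(QUERY).fetchall()
    conn.close()
    return rows

# Two products in category 100, one in category 200
sample_products = [(1, 10.0, 100), (2, 20.0, 100), (3, 50.0, 200)]
for row in avg_price_per_category(sample_products):
    print(row)
```

Each output row carries the product's own price next to the average of its category, so the two can be compared directly.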

Correlated Subqueries

A correlated subquery is a type of subquery that is linked to the outer query by referencing one or more columns from the outer query within its own WHERE clause. This connection between the subquery and the outer query means that the subquery cannot be executed independently; instead, it relies on information from the outer query to execute, essentially creating a dynamic condition for each row processed by the outer query.

The power of correlated subqueries lies in their ability to express row-by-row evaluations, making them particularly useful for operations that compare each row against a condition that changes depending on the row in question. This is often seen in use cases where one needs to find maximum, minimum, or related records within groups defined by the outer query. Conceptually the subquery runs once per outer row, although a query optimizer such as Spark’s Catalyst typically rewrites it into a join rather than executing it literally row by row.

Use Case: Employee Performance Review

A company wishes to review employee performance by identifying employees who are earning more than the average salary in their respective departments. A correlated subquery, which references columns from the outer query, can be effectively used for this purpose.

SELECT
    EmployeeID,
    Name,
    Salary,
    DepartmentID
FROM
    Employees e1
WHERE
    Salary > (SELECT AVG(Salary) FROM Employees e2 WHERE e1.DepartmentID = e2.DepartmentID)

This query uses a correlated subquery to find the average salary for each department and then identifies employees who earn more than this average, helping to highlight high performers or potential salary disparities.
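A correlated subquery like this one can also be written as a join against a per-department aggregate. The sketch below checks that both forms return the same employees on a small made-up dataset. It uses Python's built-in sqlite3 module as a stand-in engine, and the sample rows and the helper name run_query are assumptions for illustration only.

```python
import sqlite3

# The correlated form from the use case
CORRELATED = """
    SELECT EmployeeID, Name
    FROM Employees e1
    WHERE Salary > (SELECT AVG(Salary) FROM Employees e2
                    WHERE e1.DepartmentID = e2.DepartmentID)
    ORDER BY EmployeeID
"""

# Equivalent form: join each employee to the average of their department
JOINED = """
    SELECT e.EmployeeID, e.Name
    FROM Employees e
    JOIN (SELECT DepartmentID, AVG(Salary) AS AvgSalary
          FROM Employees
          GROUP BY DepartmentID) d
      ON e.DepartmentID = d.DepartmentID
    WHERE e.Salary > d.AvgSalary
    ORDER BY e.EmployeeID
"""

def run_query(query, employees):
    """employees: (EmployeeID, Name, Salary, DepartmentID) tuples (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE Employees (EmployeeID INTEGER, Name TEXT, Salary REAL, DepartmentID INTEGER)"
    )
    conn.executemany("INSERT INTO Employees VALUES (?, ?, ?, ?)", employees)
    rows = conn.execute(query).fetchall()
    conn.close()
    return rows

sample_employees = [
    (1, "Ann", 100.0, 10),
    (2, "Ben", 200.0, 10),
    (3, "Cy", 300.0, 20),
    (4, "Di", 500.0, 20),
    (5, "Ed", 150.0, 10),
]
print(run_query(CORRELATED, sample_employees))
print(run_query(JOINED, sample_employees))
```

The joined form computes each department average once, which is often easier to read when the same average is needed in several places.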

Subqueries in the FROM Clause

Use Case: Sales Trend Analysis

A business analyst needs to analyze monthly sales trends by comparing sales figures against the previous month. A subquery in the FROM clause can create a derived table of monthly sales totals, which can then be joined to a second copy of itself, offset by one month, to compute the month-over-month change.

-- Assumes Sales covers a single calendar year: MONTH() alone would merge the
-- same month of different years, and January would find no previous month.
SELECT
    a.Month,
    a.TotalSales,
    a.TotalSales - b.TotalSales AS MonthOverMonthChange
FROM
    (SELECT MONTH(SaleDate) AS Month, SUM(Amount) AS TotalSales FROM Sales GROUP BY MONTH(SaleDate)) a
LEFT JOIN
    (SELECT MONTH(SaleDate) AS Month, SUM(Amount) AS TotalSales FROM Sales GROUP BY MONTH(SaleDate)) b
ON
    a.Month = b.Month + 1

This approach allows analysts to easily visualize sales trends and identify any significant changes in monthly revenue, which is crucial for strategic planning.
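Joining on `a.Month = b.Month + 1` works within one calendar year, but it finds no predecessor for January and merges the same month across years. One possible way around this, sketched below, is to join on a running month index (year times 12 plus month). The sketch runs on Python's built-in sqlite3 module as a stand-in engine, so the date handling uses SQLite's `strftime`; in Spark SQL the same index could be written as `YEAR(SaleDate) * 12 + MONTH(SaleDate)`. The sample rows and the names MonthIndex and month_over_month are assumptions for illustration.

```python
import sqlite3

# Derived table: one row per calendar month, with a running month index
MONTHLY = """
    SELECT CAST(strftime('%Y', SaleDate) AS INTEGER) * 12
               + CAST(strftime('%m', SaleDate) AS INTEGER) AS MonthIndex,
           strftime('%Y-%m', SaleDate) AS Month,
           SUM(Amount) AS TotalSales
    FROM Sales
    GROUP BY MonthIndex, Month
"""

# Same shape as the query above: the derived table joined to itself, one month apart
QUERY = f"""
    SELECT a.Month, a.TotalSales,
           a.TotalSales - b.TotalSales AS MonthOverMonthChange
    FROM ({MONTHLY}) a
    LEFT JOIN ({MONTHLY}) b
      ON a.MonthIndex = b.MonthIndex + 1
    ORDER BY a.MonthIndex
"""

def month_over_month(sales):
    """sales: (SaleDate 'YYYY-MM-DD', Amount) tuples (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE Sales (SaleDate TEXT, Amount REAL)")
    conn.executemany("INSERT INTO Sales VALUES (?, ?)", sales)
    rows = conn.execute(QUERY).fetchall()
    conn.close()
    return rows

sample_sales = [
    ("2023-11-15", 100.0),
    ("2023-12-01", 150.0),
    ("2023-12-20", 50.0),
    ("2024-01-10", 120.0),
]
for row in month_over_month(sample_sales):
    print(row)
```

With this key, January 2024 is compared against December 2023 instead of being left without a previous month.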

Subqueries in the WHERE Clause

Use Case: Targeted Marketing Campaigns

A marketing team wants to target customers who have made purchases above a certain amount but have not engaged with recent marketing campaigns. Subqueries in the WHERE clause can identify these customers by testing membership with IN and NOT IN.

SELECT
    CustomerID,
    Email
FROM
    Customers
WHERE
    CustomerID IN (SELECT CustomerID FROM Orders WHERE TotalAmount > 500)
AND
    CustomerID NOT IN (SELECT CustomerID FROM CampaignResponses WHERE CampaignID = 'X')

This query identifies customers who have spent more than a certain amount and excludes those who have already responded to a recent campaign, allowing for more targeted and efficient marketing efforts.
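One caveat with NOT IN is worth checking on real data: under SQL's three-valued logic, if the subquery returns even a single NULL, `NOT IN` evaluates to unknown for every non-matching row and the query returns nothing. A `NOT EXISTS` subquery does not have this problem. The sketch below demonstrates the difference on made-up data, using Python's built-in sqlite3 module as a stand-in engine; the table contents and the helper name find_targets are assumptions for illustration.

```python
import sqlite3

NOT_IN = """
    SELECT CustomerID, Email
    FROM Customers
    WHERE CustomerID IN (SELECT CustomerID FROM Orders WHERE TotalAmount > 500)
      AND CustomerID NOT IN (SELECT CustomerID FROM CampaignResponses WHERE CampaignID = 'X')
    ORDER BY CustomerID
"""

NOT_EXISTS = """
    SELECT c.CustomerID, c.Email
    FROM Customers c
    WHERE c.CustomerID IN (SELECT CustomerID FROM Orders WHERE TotalAmount > 500)
      AND NOT EXISTS (SELECT 1 FROM CampaignResponses r
                      WHERE r.CustomerID = c.CustomerID AND r.CampaignID = 'X')
    ORDER BY c.CustomerID
"""

def find_targets(query):
    """Run a targeting query against a small hypothetical dataset."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE Customers (CustomerID INTEGER, Email TEXT)")
    conn.execute("CREATE TABLE Orders (CustomerID INTEGER, TotalAmount REAL)")
    conn.execute("CREATE TABLE CampaignResponses (CustomerID INTEGER, CampaignID TEXT)")
    conn.executemany(
        "INSERT INTO Customers VALUES (?, ?)",
        [(1, "a@example.com"), (2, "b@example.com"), (3, "c@example.com")],
    )
    conn.executemany("INSERT INTO Orders VALUES (?, ?)", [(1, 600.0), (2, 700.0), (3, 100.0)])
    # One response row has no CustomerID, e.g. an anonymous response
    conn.executemany("INSERT INTO CampaignResponses VALUES (?, ?)", [(2, "X"), (None, "X")])
    rows = conn.execute(query).fetchall()
    conn.close()
    return rows

print("NOT IN:    ", find_targets(NOT_IN))
print("NOT EXISTS:", find_targets(NOT_EXISTS))
```

Customer 1 spent more than 500 and never responded, yet only the NOT EXISTS form returns them. If the subquery column can contain NULLs, filtering them out or using NOT EXISTS is the safer choice.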

Subqueries in the SELECT Clause

Use Case: Inventory Management

A retail manager needs to manage inventory levels by identifying products that are low in stock and need reordering. A subquery in the SELECT clause can be used to compare current stock levels against minimum required levels for each product.

SELECT
    ProductID,
    Name,
    (SELECT MIN(RequiredLevel) FROM InventoryLevels WHERE ProductID = p.ProductID) - StockLevel AS UnitsShort
FROM
    Products p
WHERE
    StockLevel < (SELECT MIN(RequiredLevel) FROM InventoryLevels WHERE ProductID = p.ProductID)

This query helps in inventory management by pinpointing products that are below the required inventory levels, facilitating timely reordering and avoiding stockouts.

Common Table Expressions (CTEs)

Common Table Expressions (CTEs) are a powerful SQL feature for defining temporary, named result sets that can be referenced within a SELECT, INSERT, UPDATE, or DELETE statement. CTEs enhance the readability and maintainability of complex SQL queries by breaking them down into simpler, modular components. Apache Spark SQL supports CTEs, but with limitations: at the time of writing, it did not support recursive CTEs. Let’s explore the types of CTEs through real-world scenarios and examples, including those not directly supported by Spark.

Non-Recursive CTEs

Non-recursive CTEs are the most commonly used form of CTEs, ideal for simplifying complex queries by dividing them into more manageable subqueries.

Real-World Scenario: Financial Reporting

In a financial reporting scenario, an analyst needs to generate a report showing monthly revenue, the cumulative revenue for the year, and comparisons with the previous year’s monthly revenue.

WITH MonthlyRevenue AS (
    SELECT
        DATE_FORMAT(TransactionDate, 'yyyy-MM') AS Month,
        SUM(Amount) AS Revenue
    FROM Transactions
    GROUP BY DATE_FORMAT(TransactionDate, 'yyyy-MM')
),
CumulativeRevenue AS (
    SELECT
        Month,
        -- Restart the running total each year (the first four characters of Month)
        SUM(Revenue) OVER (PARTITION BY SUBSTRING(Month, 1, 4) ORDER BY Month) AS Cumulative
    FROM MonthlyRevenue
),
PreviousYearComparison AS (
    SELECT
        m.Month,
        m.Revenue,
        m.Revenue - p.Revenue AS YearOverYearChange
    FROM MonthlyRevenue m
    -- Month is a 'yyyy-MM' string: shift it by 12 months as a date, then format it back
    LEFT JOIN MonthlyRevenue p
        ON m.Month = DATE_FORMAT(ADD_MONTHS(TO_DATE(p.Month, 'yyyy-MM'), 12), 'yyyy-MM')
)
SELECT *
FROM CumulativeRevenue
JOIN PreviousYearComparison USING (Month)

In this example, the first CTE calculates the monthly revenue, the second calculates the cumulative revenue, and the third provides a comparison with the previous year, making the complex analysis more digestible.
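The chaining of CTEs can be tried out on a small dataset. The sketch below is an adaptation rather than the Spark query itself: it runs on Python's built-in sqlite3 module as a stand-in engine, which has no DATE_FORMAT or ADD_MONTHS, so the month is split into separate Year and MonthNum columns (names chosen here for illustration) and the previous year is matched with `m.Year = p.Year + 1`. The running total uses a window function, which needs SQLite 3.25 or newer. The sample transactions are made up.

```python
import sqlite3

QUERY = """
WITH MonthlyRevenue AS (
    SELECT CAST(strftime('%Y', TransactionDate) AS INTEGER) AS Year,
           CAST(strftime('%m', TransactionDate) AS INTEGER) AS MonthNum,
           SUM(Amount) AS Revenue
    FROM Transactions
    GROUP BY Year, MonthNum
),
CumulativeRevenue AS (
    -- Running total that restarts every year
    SELECT Year, MonthNum,
           SUM(Revenue) OVER (PARTITION BY Year ORDER BY MonthNum) AS Cumulative
    FROM MonthlyRevenue
),
PreviousYearComparison AS (
    -- Same month, one year earlier; NULL when there is no earlier year
    SELECT m.Year, m.MonthNum, m.Revenue,
           m.Revenue - p.Revenue AS YearOverYearChange
    FROM MonthlyRevenue m
    LEFT JOIN MonthlyRevenue p
      ON m.MonthNum = p.MonthNum AND m.Year = p.Year + 1
)
SELECT c.Year, c.MonthNum, y.Revenue, c.Cumulative, y.YearOverYearChange
FROM CumulativeRevenue c
JOIN PreviousYearComparison y
  ON c.Year = y.Year AND c.MonthNum = y.MonthNum
ORDER BY c.Year, c.MonthNum
"""

def revenue_report(transactions):
    """transactions: (TransactionDate 'YYYY-MM-DD', Amount) tuples (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE Transactions (TransactionDate TEXT, Amount REAL)")
    conn.executemany("INSERT INTO Transactions VALUES (?, ?)", transactions)
    rows = conn.execute(QUERY).fetchall()
    conn.close()
    return rows

sample_transactions = [
    ("2023-01-10", 100.0),
    ("2023-02-10", 50.0),
    ("2024-01-05", 130.0),
    ("2024-01-20", 20.0),
    ("2024-02-01", 40.0),
]
for row in revenue_report(sample_transactions):
    print(row)
```

MonthlyRevenue is defined once and then reused by both later CTEs, which is the main readability gain over repeating the same aggregate as a nested subquery.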

Recursive CTEs

A Recursive CTE, also known as a Hierarchical CTE, is a special type of CTE that references itself. This allows you to traverse hierarchical or recursive data structures, such as organizational charts, bill-of-materials, or hierarchical category trees, in a straightforward manner.

A Recursive CTE consists of two parts:

  1. Base Query: The non-recursive part that serves as the anchor for the recursion. It selects the initial set of rows to start the recursive process.
  2. Recursive Query: The part that references the CTE itself. It joins the CTE with the original table to fetch additional rows that meet the recursive condition, and the process repeats until an iteration returns no more rows. The join condition, optionally combined with an explicit filter such as a depth limit, determines when the recursion stops.
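The two parts can be pictured as a loop: run the base query once, then keep applying the recursive query to the rows found in the previous round until a round finds nothing. The plain-Python sketch below mimics that evaluation for an employee hierarchy. It is only an illustration of the idea, not how any particular database implements it, and the function name walk_hierarchy and the sample tuples are made up.

```python
def walk_hierarchy(employees, anchor_name):
    """Emulate a recursive CTE over (EmployeeID, Name, ManagerID) tuples.

    Assumes the hierarchy has no cycles; with a cycle this loop would not end.
    """
    # Base query: the anchor rows that start the recursion
    result = [e for e in employees if e[1] == anchor_name]
    frontier = list(result)

    while frontier:
        # Recursive query: employees whose manager was found in the previous round
        frontier_ids = {e[0] for e in frontier}
        frontier = [e for e in employees if e[2] in frontier_ids]
        # UNION ALL: append this round's rows to the accumulated result
        result.extend(frontier)

    return result

sample_employees = [
    (1, "John Doe", None),
    (2, "Jane Smith", 1),
    (3, "Bob Johnson", 1),
    (4, "Alice Williams", 2),
    (5, "Charlie Brown", 3),
    (6, "Diana Ross", 3),
]
for employee in walk_hierarchy(sample_employees, "John Doe"):
    print(employee)
```

The loop ends on its own once a round produces an empty frontier, which mirrors the "until no more rows are returned" rule.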

Real-World Scenario: Organizational Hierarchy Analysis

In a corporate setting, understanding the organizational hierarchy is crucial, for example when finding all subordinates under a specific manager. In databases that support them, this can be done with a recursive CTE; in Spark SQL, which did not support recursive CTEs at the time of writing, you would use iterative DataFrame operations or GraphFrames instead.

WITH RECURSIVE OrgChart AS (
    -- Base Query
    SELECT EmployeeID, Name, ManagerID
    FROM Employees
    WHERE Name = 'John Doe' -- Starting point

    UNION ALL

    -- Recursive Query
    SELECT e.EmployeeID, e.Name, e.ManagerID
    FROM Employees e
    INNER JOIN OrgChart oc ON e.ManagerID = oc.EmployeeID
)
SELECT * FROM OrgChart

Base Query:

SELECT EmployeeID, Name, ManagerID
FROM Employees
WHERE Name = 'John Doe'
  • The Base Query selects the employee with the name ‘John Doe’ from the Employees table.
  • This serves as the starting point or anchor for the recursive process. It defines the initial set of rows to begin the recursion.

Recursive Query:

SELECT e.EmployeeID, e.Name, e.ManagerID
FROM Employees e
INNER JOIN OrgChart oc ON e.ManagerID = oc.EmployeeID
  • The Recursive Query joins the Employees table (e) with the OrgChart CTE (oc) on the ManagerID field.
  • It fetches the employees who report to the managers identified in the previous iteration (OrgChart).
  • The INNER JOIN ensures that we only consider employees who have a matching ManagerID in the OrgChart CTE, creating a hierarchical chain.

Termination Condition:

In this recursive CTE, the termination condition is handled implicitly by the join in the recursive query. The recursion stops when an iteration finds no employee whose ManagerID matches an EmployeeID in the previous iteration’s result set, which means every branch of the hierarchy has been followed to its end. This relies on the data being free of cycles: if a reporting chain loops back on itself, the recursion never runs out of new rows.

Need for Starting Point:

The starting point (Name = 'John Doe') in the Base Query is essential because it defines where the recursion begins. Without a starting point, the recursion would have no anchor, and the CTE would not know where to begin fetching data.

In this example:

  • We start with an employee named ‘John Doe’.
  • The recursive query then fetches all employees who report to ‘John Doe’ directly or indirectly through other employees.
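To watch the OrgChart query run end to end, it can be executed on an engine that does support `WITH RECURSIVE`. The sketch below uses SQLite through Python's built-in sqlite3 module for that purpose. The six sample employees and the helper name subordinates_of are assumptions for illustration, and the only changes to the query are a parameter for the starting name and an ORDER BY to make the output order predictable.

```python
import sqlite3

ORG_CHART = """
    WITH RECURSIVE OrgChart AS (
        -- Base Query
        SELECT EmployeeID, Name, ManagerID
        FROM Employees
        WHERE Name = ?

        UNION ALL

        -- Recursive Query
        SELECT e.EmployeeID, e.Name, e.ManagerID
        FROM Employees e
        INNER JOIN OrgChart oc ON e.ManagerID = oc.EmployeeID
    )
    SELECT EmployeeID, Name, ManagerID FROM OrgChart
    ORDER BY EmployeeID
"""

SAMPLE_EMPLOYEES = [
    (1, "John Doe", None),
    (2, "Jane Smith", 1),
    (3, "Bob Johnson", 1),
    (4, "Alice Williams", 2),
    (5, "Charlie Brown", 3),
    (6, "Diana Ross", 3),
]

def subordinates_of(name):
    """Return the named employee plus everyone below them (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE Employees (EmployeeID INTEGER, Name TEXT, ManagerID INTEGER)")
    conn.executemany("INSERT INTO Employees VALUES (?, ?, ?)", SAMPLE_EMPLOYEES)
    rows = conn.execute(ORG_CHART, (name,)).fetchall()
    conn.close()
    return rows

for row in subordinates_of("John Doe"):
    print(row)
```

Starting from 'John Doe' returns the whole sample organization, while starting from 'Bob Johnson' returns only Bob and his two direct reports.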

CTEs in Spark SQL: Limitations and Workarounds

Spark SQL supports non-recursive CTEs but, at the time of writing, not recursive ones, so hierarchical or recursive queries need an alternative approach. Spark DataFrames can achieve similar results for tasks such as traversing hierarchical or tree-structured data. Common real-world scenarios that require recursion include navigating an organizational hierarchy or a parts explosion in a manufacturing process.

Example Scenario: Organizational Hierarchy Analysis

Imagine an organization where each employee record in the employees DataFrame has an EmployeeID, a Name, and a ManagerID indicating who the employee reports to. We want to find all direct and indirect subordinates of a specific manager.

Spark DataFrame Approach to Emulate Recursive CTE

We’ll use a while loop to iteratively join the employees DataFrame with a temporary DataFrame, effectively walking down the hierarchy level by level. Each pass of the loop adds one more level of subordinates under the specified manager.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("OrganizationalHierarchyAnalysis") \
    .getOrCreate()

# Sample Employees DataFrame
data = [(1, 'John Doe', None),
        (2, 'Jane Smith', 1),
        (3, 'Bob Johnson', 1),
        (4, 'Alice Williams', 2),
        (5, 'Charlie Brown', 3),
        (6, 'Diana Ross', 3)]

columns = ["EmployeeID", "Name", "ManagerID"]
employees_df = spark.createDataFrame(data, columns).alias("e")

# Anchor: the employee the hierarchy starts from
org_chart_df = employees_df.filter(employees_df["Name"] == "John Doe").alias("oc")
org_chart_df.show()

# Iterative DataFrame joins to build the organizational chart
while True:
    # Join employees_df with org_chart_df based on ManagerID
    temp_df = employees_df.join(org_chart_df, col("e.ManagerID") == col("oc.EmployeeID"), "inner") \
        .select("e.EmployeeID", "e.Name", "e.ManagerID")
    temp_df.show()

    # Keep only the rows that are not already in org_chart_df
    new_rows_df = temp_df.join(org_chart_df, on="EmployeeID", how="left_anti")

    # If no new rows were found, the hierarchy is complete
    if new_rows_df.count() == 0:
        break

    # Add new rows to org_chart_df; re-apply the alias so "oc.EmployeeID" keeps resolving
    org_chart_df = org_chart_df.union(new_rows_df.select(org_chart_df.columns)).alias("oc")

# Show the final organizational chart
org_chart_df.show()

The Spark code employs a while loop to iteratively build an organizational hierarchy, starting from an initial employee, ‘John Doe’. The process begins by initializing an Employees DataFrame (employees_df) and creating an initial organizational chart (org_chart_df) by filtering employees with the name 'John Doe'. This initial chart serves as the starting point for the hierarchical construction.

Inside the while loop, the first operation is an inner join between the employees_df and org_chart_df DataFrames. The join condition compares the ManagerID from employees_df with the EmployeeID from org_chart_df. This join operation identifies all employees who report to the managers already present in the org_chart_df. The resultant DataFrame (temp_df) contains the additional hierarchical levels or subordinates discovered in the current iteration.

Following the join operation, a left-anti join (new_rows_df) is performed to identify any new rows in temp_df that are not present in org_chart_df. These new rows represent additional hierarchical levels or subordinates discovered in the current iteration, expanding the organizational chart.

If new_rows_df contains new additions, they are appended to the org_chart_df DataFrame. This step expands the organizational chart with the newly discovered hierarchical levels. The loop continues to iterate as long as new rows are added to the org_chart_df, indicating the discovery of additional hierarchical levels. Once no new rows are added, the loop terminates, completing the construction of the organizational hierarchy.

By following these iterative steps, the Spark code dynamically builds an organizational hierarchy starting from the specified initial employee, ‘John Doe’, and continues to expand the hierarchy until no new hierarchical levels are discovered.

Iterative Self-Join Using Spark SQL

Conclusion

In conclusion, this exploration into the realms of Common Table Expressions (CTEs) and subqueries within the context of Apache Spark SQL has unveiled the robust flexibility and power these features bring to data analysis and manipulation. Through a series of real-world scenarios and examples, we’ve delved into the applications of non-recursive CTEs, illuminating their role in enhancing query readability and modularizing complex SQL operations. Equally, we’ve traversed the landscape of subqueries, showcasing their versatility in filtering, aggregation, and data organization tasks across a variety of use cases.

While embracing the strengths of Spark SQL in handling CTEs and subqueries, we’ve also navigated its limitations, particularly the absence of native support for recursive CTEs. However, the innovative workarounds employing iterative DataFrame operations and self-joins within Spark SQL have demonstrated that, despite these limitations, Spark remains an incredibly powerful tool for hierarchical data processing and complex query construction.

Arun Jijo is a data engineer at DataKare Solutions with expertise in Apache NiFi, Kafka, and Spark, and a passion for Java.