Originally published on Efficiently (Substack)

We’ve spoken a lot about on-disk and distributed storage, as well as blocks. All of this theory is great, but let’s see how it works in practice.

In this post, I’m going to:

  1. Read a CSV dataset into Spark
  2. Write the dataset into 5 Parquet files (treating each file as a block)
  3. Inspect the metadata stored in the files
  4. Run queries that demonstrate the power of predicate pushdown

Predicate Pushdown

Hands-On: Setup

The tutorial uses an airports dataset. Download it via:

wget https://raw.githubusercontent.com/curran/data/gh-pages/vegaExamples/airports.csv -O dataset.csv

The CSV contains columns: iata, name, city, state, country, latitude, longitude.

Loading Data into Spark

val dataset = spark.read.option("header","true").option("inferSchema","true").csv("dataset.csv")

The inferred schema shows latitude and longitude as double types.
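One way to confirm the inferred types is to print the schema. This is a minimal sketch, assuming it runs in the same spark-shell session where `dataset` was loaded above; `latitudeType` is a name introduced here for illustration.

```scala
// Assumes a spark-shell session in which `dataset` was loaded as shown above.
// printSchema() prints each column name with its inferred type and nullability.
dataset.printSchema()

// The type of a single column can also be looked up on the schema object.
val latitudeType = dataset.schema("latitude").dataType
println(s"latitude is inferred as: $latitudeType")
```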

Writing Parquet Files

dataset.repartition(5).write.parquet("/root/parquet_dataset")

This creates 5 Parquet files plus a _SUCCESS flag file.

Inspecting Parquet Metadata

Install the inspection tools:

pip3 install parquet-tools
pip3 install parquet-metadata

View file contents:

parquet-tools show part-00000-53b27d15-b049-41db-a8aa-fa3033763836-c000.snappy.parquet

Hands-On: Query Plans

Simple Filter Query

val simpleFilter = dataset.filter($"latitude" > 30)
simpleFilter.show()

The result shows rows where latitude exceeds 30 (show() prints only the first 20 rows by default).

The query plan passes through three logical stages: the parsed logical plan, the analyzed logical plan, and the optimized logical plan. The optimized logical plan adds a null check (isnotnull) on latitude; this is consistent with our predicate, since a null latitude can never satisfy latitude > 30.
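The plans for each stage can be printed from the shell. This is a minimal sketch, assuming the spark-shell session and the `simpleFilter` DataFrame defined above; `optimized` is a name introduced here for illustration.

```scala
// Assumes the spark-shell session and the `simpleFilter` DataFrame defined above.
// explain(true) prints the parsed, analyzed, and optimized logical plans,
// followed by the physical plan that Spark will run.
simpleFilter.explain(true)

// The optimized logical plan is also available as an object for inspection.
val optimized = simpleFilter.queryExecution.optimizedPlan
println(optimized)
```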

Complex Filter Query

val complexFilter = dataset.filter($"latitude" > 30).filter($"latitude" < 40)

As the optimized plan shows, the optimizer has combined both of our predicates into a single Filter step (meaning that what would otherwise be two separate filter steps over the data is now one).

The optimized plan consolidates the filters: Filter ((isnotnull(latitude#21) AND (latitude#21 > 30.0)) AND (latitude#21 < 40.0))
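To see the consolidation side by side, the same condition can be written as a single filter call and both optimized plans printed for comparison. This is a sketch rather than a guarantee that the two plans print identically; it assumes the spark-shell session, `dataset`, and `complexFilter` from above, and `singleFilter` is a name introduced here for illustration.

```scala
// Assumes the spark-shell session, `dataset`, and `complexFilter` from above.
// `singleFilter` expresses both conditions in one filter call.
val singleFilter = dataset.filter($"latitude" > 30 && $"latitude" < 40)

// Print both optimized logical plans to compare them by eye.
println(complexFilter.queryExecution.optimizedPlan)
println(singleFilter.queryExecution.optimizedPlan)
```

Both DataFrames select the same rows; only the way the condition is written differs.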

Hands-On: Querying with Parquet

Row Group Metadata Analysis

parquet-metadata /root/parquet_dataset/part-00000-...

Critical metadata fields include:

  • stats:min — smallest value in the column
  • stats:max — largest value in the column

Example statistics from one file:

row_group 0 latitude stats:min 14.1743075
row_group 0 latitude stats:max 70.46727611
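As a cross-check, the minimum and maximum latitude can also be computed per file from Spark itself. This is a sketch, assuming a spark-shell session and the output path used above. It aggregates per file rather than per row group, so it only lines up with the row group statistics when each file holds a single row group. The names `perFile`, `file`, `min_latitude`, and `max_latitude` are introduced here for illustration.

```scala
import org.apache.spark.sql.functions.{input_file_name, max, min}

// Assumes a spark-shell session and the Parquet output path used above.
// Tag every row with the file it was read from, then aggregate per file.
val perFile = spark.read.parquet("/root/parquet_dataset")
  .withColumn("file", input_file_name())
  .groupBy("file")
  .agg(min("latitude").as("min_latitude"), max("latitude").as("max_latitude"))

// Pass false so the long file paths are not truncated in the output.
perFile.show(false)
```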

Predicate Pushdown Mechanism

For a second file with stats:min=44.4430157 and stats:max=74.46727611, a query filtering for latitude between 30 and 40 would exclude this entire file — because we know from the metadata that no values in this file fall within our filter range.
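The skipping decision can be sketched in a few lines of plain Scala. This is a simplified model, not the Parquet reader's implementation: `RowGroupStats` and `mightContain` are hypothetical names, and the labels `file-1` and `file-2` stand in for the two files whose statistics are quoted above.

```scala
// Hypothetical model of per-row-group statistics; this is not the Parquet API.
final case class RowGroupStats(file: String, min: Double, max: Double)

// A row group can hold a value with lower < value < upper only if its
// [min, max] range overlaps the open interval (lower, upper).
def mightContain(stats: RowGroupStats, lower: Double, upper: Double): Boolean =
  stats.max > lower && stats.min < upper

val rowGroups = Seq(
  RowGroupStats("file-1", 14.1743075, 70.46727611),
  RowGroupStats("file-2", 44.4430157, 74.46727611)
)

// Keep only the row groups that could contain a latitude between 30 and 40.
val toRead = rowGroups.filter(mightContain(_, 30.0, 40.0))
toRead.foreach(rg => println(s"read ${rg.file}"))
```

Only `file-1` survives the check: the minimum of `file-2` (44.4430157) is already above the upper bound of 40, so it can be skipped without reading any of its rows.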

This is called predicate pushdown. The predicate (the query's filter condition) is pushed down to the data source, allowing the reader to look at the metadata on the row groups themselves to decide which row groups to read and which can be skipped.
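To see the pushdown from Spark's side, the same filter can be run against the Parquet files instead of the CSV-backed DataFrame. This is a minimal sketch, assuming a spark-shell session and the output path used above; `parquetDataset` and `pushed` are names introduced here for illustration. In the physical plan, the Parquet scan node should list the conditions handed to the reader under a `PushedFilters` entry. Parquet filter pushdown is controlled by the `spark.sql.parquet.filterPushdown` setting, which is enabled by default.

```scala
// Assumes a spark-shell session, where `spark` and the $"..." column syntax are available.
val parquetDataset = spark.read.parquet("/root/parquet_dataset")

// The same two predicates as before, now applied to the Parquet-backed DataFrame.
val pushed = parquetDataset.filter($"latitude" > 30).filter($"latitude" < 40)

// Print the physical plan; look for PushedFilters on the Parquet scan node.
pushed.explain()
```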

Conclusion

There is a lot of magic that goes into our ability to query data quickly and Efficiently. Query optimizers do a lot for us — and understanding how they work under the hood helps us write better queries and design better data layouts.