In Spark, reading a JSON file is straightforward, but constructing a schema for complex JSON data can be challenging, especially for newcomers to Spark. In this blog I will share some tips to make it simpler.
How to create a schema:
In Spark, a DataFrame schema is constructed using a StructType object. A StructType contains a collection of fields called StructFields. In layman's terms, a StructType is a bag that holds a collection of things.
Tips for creating a DataFrame schema:
Tip 1: Understand the JSON data, then construct the schema from it. I will use the JSON data below as the running example.
[{
  "data": {
    "emp_id": "12345",
    "emp_name": "Mohan",
    "awards": [
      {
        "award_type": "Internal",
        "award_name": "Best_emp_of_the_year",
        "year": "2000"
      },
      {
        "award_type": "External",
        "award_name": "Best_presenter",
        "year": "2001"
      }
    ]
  }
}]
- A schema always starts with a StructType object.
schema = StructType()
- Whenever you see a curly brace in the JSON data, the field type is a StructType. In the example above, the value of the data field is a struct object, so we add a nested StructType for it:
schema = StructType() \
    .add("data", StructType()
         .add("emp_id", StringType(), True)
         .add("emp_name", StringType(), True), True)
- Whenever you see a square bracket in the JSON data, the field type is an ArrayType. In the example above, the awards field holds an array of structs, so we wrap the element StructType in an ArrayType:
schema = StructType() \
    .add("data", StructType()
         .add("emp_id", StringType(), True)
         .add("emp_name", StringType(), True)
         .add("awards", ArrayType(StructType()
              .add("award_type", StringType(), True)
              .add("award_name", StringType(), True)
              .add("year", StringType(), True))), True)
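The brace and bracket rules above mirror how plain Python parses JSON, which can be a handy sanity check before writing the Spark schema. This snippet uses only the standard library:

```python
import json

sample = '{"data": {"awards": [{"award_name": "Best_presenter"}]}}'
parsed = json.loads(sample)

# Curly braces parse to dicts -> StructType in Spark
print(type(parsed["data"]).__name__)            # dict
# Square brackets parse to lists -> ArrayType in Spark
print(type(parsed["data"]["awards"]).__name__)  # list
```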
Tip 2: Read the JSON data without a schema and print the resulting DataFrame's schema using the printSchema() method. This shows how Spark infers the schema internally, and you can use that output as a guide when writing your own custom schema.
df = spark.read.json(path="test_emp.json", multiLine=True)
df.printSchema()
- Here data is the root field. It is a struct type and contains emp_id, emp_name and awards as subfields.
- The awards field is in turn an array of structs, and each element struct contains the award_type, award_name and year fields, all of type string.
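For the sample file, df.printSchema() produces a tree along these lines (note that Spark sorts inferred JSON fields alphabetically, so the order differs from the file; exact output may vary by version):

root
 |-- data: struct (nullable = true)
 |    |-- awards: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- award_name: string (nullable = true)
 |    |    |    |-- award_type: string (nullable = true)
 |    |    |    |-- year: string (nullable = true)
 |    |-- emp_id: string (nullable = true)
 |    |-- emp_name: string (nullable = true)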
Complete source code:
#test_emp.json
[{
  "data": {
    "emp_id": "12345",
    "emp_name": "Mohan",
    "awards": [
      {
        "award_type": "Internal",
        "award_name": "Best_emp_of_the_year",
        "year": "2000"
      },
      {
        "award_type": "External",
        "award_name": "Best_presenter",
        "year": "2001"
      }
    ]
  }
}]

#main.py
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, StructType
schema = StructType() \
    .add("data", StructType()
         .add("emp_id", StringType(), True)
         .add("emp_name", StringType(), True)
         .add("awards", ArrayType(StructType()
              .add("award_type", StringType(), True)
              .add("award_name", StringType(), True)
              .add("year", StringType(), True))), True)
spark = SparkSession \
.builder \
.master("local[*]") \
.appName("Create Dataframe") \
.getOrCreate()
df = spark.read.json(path="test_emp.json", schema=schema, multiLine=True)
df.show(truncate=False)
df.select("data.emp_name").show()
df.select("data.awards.award_name").show()
You can also use this schema to create a DataFrame with the createDataFrame method, as shown in the example below.
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, StructType
spark = SparkSession \
.builder \
.master("local[*]") \
.appName("Create Dataframe") \
.getOrCreate()
data = [{
    "data": {
        "emp_id": "12345",
        "emp_name": "Mohan",
        "awards": [
            {
                "award_type": "Internal",
                "award_name": "Best_emp_of_the_year",
                "year": "2000"
            },
            {
                "award_type": "External",
                "award_name": "Best_presenter",
                "year": "2001"
            }
        ]
    }
}]
schema = StructType() \
    .add("data", StructType()
         .add("emp_id", StringType(), True)
         .add("emp_name", StringType(), True)
         .add("awards", ArrayType(StructType()
              .add("award_type", StringType(), True)
              .add("award_name", StringType(), True)
              .add("year", StringType(), True))), True)
df1 = spark.createDataFrame(data=data, schema=schema)
df1.show(truncate=False)
df1.printSchema()
Thanks for reading!!