PySpark project setup for connecting to Google BigQuery

Amarnath
2 min read · May 7, 2021


Python is a popular programming language among Apache Spark users, and Google BigQuery is one of the most powerful serverless analytical database services. In this blog, I share a step-by-step approach to setting up PySpark to connect with Google BigQuery.

System Requirements

  • Python 3.7
  • PySpark 3.1.1
  • The spark-bigquery connector jar (copied into PySpark's jars folder in the steps below)
  • A GCP project with a BigQuery dataset and a Cloud Storage bucket

Prerequisites

  • Basic knowledge of PySpark
  • Basic knowledge of Google Cloud Platform

Procedure

  • Open a terminal and create the Python project directory
mkdir pyspark_bigquery_example
cd pyspark_bigquery_example
  • Create a virtual environment for the project
pip install pipenv
pipenv --python 3.7
  • Install PySpark and copy the BigQuery connector jar into PySpark's jars folder (an alternative that avoids copying the jar is sketched after the note below)
pipenv install pyspark==3.1.1
pipenv --venv    # This command prints the virtual environment path
cp <jar location> <venv_path>/lib/python3.7/site-packages/pyspark/jars

Note: By running the following command in the terminal, you can verify whether Spark is already installed on your system.

pyspark --version  # If the command is not found, PySpark is not installed on your system

If Spark is already installed system-wide, copy the above-mentioned jar into the jars folder of that system PySpark installation instead.
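If you would rather not copy the jar by hand, the connector can also be pulled in when the SparkSession is built, using Spark's spark.jars.packages configuration. The sketch below assumes the jar referred to above is Google's spark-bigquery connector (spark-bigquery-with-dependencies); the Maven coordinates and version shown are illustrative, so pick the release that matches your Spark and Scala versions.

from pyspark.sql import SparkSession

# Assumption: the required jar is the spark-bigquery connector published on Maven Central.
# The version below is only an example; use the release matching your Spark/Scala build.
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Pyspark bigquery Example") \
    .config("spark.jars.packages",
            "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2") \
    .getOrCreate()

With this approach the dependency is downloaded automatically on the first run, and the cp step above can be skipped.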

  • Create a Python file (for example, main.py) with the code below.
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Pyspark bigquery Example") \
    .getOrCreate()

GCP_PROJECT_ID = "<<<Enter your GCP project id>>>"
BIGQUERY_DATASET_NAME = "<<<Enter your Bigquery dataset name>>>"
BIGQUERY_TABLE_NAME = "<<<Enter your Bigquery table name>>>"
TEMPORARY_BUCKET_NAME = "<<<Enter your bucket name>>>"


def store_df_as_table(dataframe: DataFrame, dataset_name, table_name):
    # Write the DataFrame to BigQuery, staging the rows in the temporary GCS bucket
    dataframe.write.format('bigquery') \
        .option("table", f"{GCP_PROJECT_ID}.{dataset_name}.{table_name}") \
        .option('parentProject', GCP_PROJECT_ID) \
        .option("temporaryGcsBucket", TEMPORARY_BUCKET_NAME) \
        .mode("overwrite") \
        .save()


def read_table_as_df(dataset_name, table_name, schema_df):
    # Read the BigQuery table back into a Spark DataFrame using the supplied schema
    return spark.read.format("bigquery") \
        .option("table", f"{GCP_PROJECT_ID}.{dataset_name}.{table_name}") \
        .schema(schema_df) \
        .load()


if __name__ == '__main__':
    sample_data = [
        (12345, "CITY 1", "STATE 1", 704, "STANDARD"),
        (99999, "CITY 2", "STATE 2", 907, "STANDARD")
    ]

    sample_schema = StructType(
        [
            StructField("RecordNumber", IntegerType(), True),
            StructField("City", StringType(), True),
            StructField("State", StringType(), True),
            StructField("Zipcode", IntegerType(), True),
            StructField("ZipCodeType", StringType(), True)
        ]
    )

    sample_df = spark.createDataFrame(data=sample_data, schema=sample_schema)

    store_df_as_table(sample_df, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME)
    print("Dataframe writing is completed")

    df = read_table_as_df(BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME, sample_schema)
    print("DataFrame read is completed")
    df.show()
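Beyond reading a whole table, the spark-bigquery connector can also read the result of a SQL query. The snippet below continues the script above (it reuses spark and the constants defined there); the viewsEnabled, materializationDataset and query options follow the connector's documentation, but treat the exact option names as something to verify against the connector version you installed.

# Sketch of a query-based read, reusing the session and constants from main.py above.
# The option names come from the spark-bigquery connector docs; verify them for your version.
query_df = spark.read.format("bigquery") \
    .option("parentProject", GCP_PROJECT_ID) \
    .option("viewsEnabled", "true") \
    .option("materializationDataset", BIGQUERY_DATASET_NAME) \
    .option("query", f"SELECT City, State FROM `{GCP_PROJECT_ID}.{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE_NAME}`") \
    .load()
query_df.show()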
  • Export the GOOGLE_APPLICATION_CREDENTIALS environment variable with the path to your GCP service account key file (a Python alternative is sketched after the note below).
export GOOGLE_APPLICATION_CREDENTIALS=service_account.json

Note: Make sure that the service account has the necessary permissions to access BigQuery and Cloud Storage.
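If exporting the variable is inconvenient (for example when launching the script from an IDE), the same credential path can also be set from Python before the SparkSession is created. The file name below simply mirrors the export command above; the connector also documents a credentialsFile option on the reader and writer, which you can check for your version.

import os

# Point the Google client libraries at the service account key before Spark starts.
# Uses the same file name as the export command above.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "service_account.json"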

  • Run the program using the command below.
pipenv run python main.py  # Add sudo in front of the command if you encounter any permission issues

Thanks for reading!

Written by Amarnath

Passionate Data Engineer, Tech geek | Works at Thoughtworks