Python is a popular programming language among Apache Spark users, and Google BigQuery is one of the most powerful serverless analytical database services. In this blog, I share a step-by-step approach to setting up PySpark to connect with Google BigQuery.
System Requirements
- Java 11.0.8 or Java 1.8
- Python 3.7.7
- PySpark 3.1.1
- Required jar files:
google-api-client-1.30.10.jar
spark-bigquery-with-dependencies_2.12-0.20.0.jar
gcs-connector-hadoop3-latest.jar
- GCP service account
Prerequisites
- Basic knowledge of PySpark
- Basic knowledge of Google Cloud Platform
Procedure
- Open a terminal and create the Python project directory
mkdir pyspark_bigquery_example
cd pyspark_bigquery_example
- Create a virtual environment for the project
pip install pipenv
pipenv --python 3.7
- Install PySpark
pipenv install pyspark==3.1.1
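To confirm that PySpark landed inside the virtual environment (a quick optional check, not part of the original steps), print the version from the pipenv environment:
pipenv run python -c "import pyspark; print(pyspark.__version__)" # Should print 3.1.1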
- If Spark is not installed on your system, copy the google-api-client-1.30.10.jar, gcs-connector-hadoop3-latest.jar, and spark-bigquery-with-dependencies_2.12-0.20.0.jar files into the PySpark jars folder under the virtual environment directory.
pipenv --venv # This command prints the virtual environment path
cp <jar location> <venv_path>/lib/python3.7/site-packages/pyspark/jars
Note: You can verify whether Spark is already installed on your system by running the following command in the terminal.
pyspark --version # If the command is not found, then PySpark is not installed on your system
If Spark is already installed on your system, copy the above-mentioned jars into the jars folder under the system's Spark installation directory.
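Alternatively, if you prefer not to copy jars into the installation folder, Spark can be pointed at the downloaded jar files through the standard spark.jars configuration when the session is built. A minimal sketch, assuming the three jars sit in a local jars/ directory (that path is my assumption, not part of the original setup):

from pyspark.sql import SparkSession

# Assumed layout: the three connector jars were downloaded into ./jars
jar_files = ",".join([
    "jars/google-api-client-1.30.10.jar",
    "jars/spark-bigquery-with-dependencies_2.12-0.20.0.jar",
    "jars/gcs-connector-hadoop3-latest.jar",
])

# spark.jars adds the listed jars to the driver and executor classpaths
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Pyspark bigquery Example") \
    .config("spark.jars", jar_files) \
    .getOrCreate()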
- Create a Python file named main.py using the code below; an optional query-based read sketch follows it.
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Pyspark bigquery Example") \
    .getOrCreate()

GCP_PROJECT_ID = "<<<Enter your GCP project id>>>"
BIGQUERY_DATASET_NAME = "<<<Enter your Bigquery dataset name>>>"
BIGQUERY_TABLE_NAME = "<<<Enter your Bigquery table name>>>"
TEMPORARY_BUCKET_NAME = "<<<Enter your bucket name>>>"


def store_df_as_table(dataframe: DataFrame, dataset_name, table_name):
    # Write the DataFrame to BigQuery, staging the data through the temporary GCS bucket
    dataframe.write.format('bigquery') \
        .option("table", f"{GCP_PROJECT_ID}.{dataset_name}.{table_name}") \
        .option('parentProject', GCP_PROJECT_ID) \
        .option("temporaryGcsBucket", TEMPORARY_BUCKET_NAME) \
        .mode("overwrite") \
        .save()


def read_table_as_df(dataset_name, table_name, schema_df):
    # Read the BigQuery table back into a DataFrame with the given schema
    return spark.read.format("bigquery") \
        .option("table", f"{GCP_PROJECT_ID}.{dataset_name}.{table_name}") \
        .schema(schema_df) \
        .load()


if __name__ == '__main__':
    sample_data = [
        (12345, "CITY 1", "STATE 1", 704, "STANDARD"),
        (99999, "CITY 2", "STATE 2", 907, "STANDARD")
    ]
    sample_schema = StructType(
        [
            StructField("RecordNumber", IntegerType(), True),
            StructField("City", StringType(), True),
            StructField("State", StringType(), True),
            StructField("Zipcode", IntegerType(), True),
            StructField("ZipCodeType", StringType(), True)
        ]
    )

    sample_df = spark.createDataFrame(data=sample_data, schema=sample_schema)

    store_df_as_table(sample_df, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME)
    print("Dataframe writing is completed")

    df = read_table_as_df(BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME, sample_schema)
    print("DataFrame read is completed")
    df.show()
- Export the GOOGLE_APPLICATION_CREDENTIALS environment variable with the path to your GCP service account key.
export GOOGLE_APPLICATION_CREDENTIALS=service_account.json
Note: Make sure that the service account has the necessary permissions to access BigQuery and Cloud Storage.
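If you would rather not depend on the environment variable alone, the key file can also be passed through configuration. A hedged sketch, reusing the constants from main.py and assuming the BigQuery connector's credentialsFile option and the GCS connector's Hadoop key-file settings (the key path is a placeholder):

from pyspark.sql import SparkSession

key_path = "service_account.json"  # placeholder path to the service account key

# GCS connector auth for the temporary bucket, injected as Hadoop configuration
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Pyspark bigquery Example") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", key_path) \
    .getOrCreate()

# BigQuery connector auth: point reads/writes at the same key file
df = spark.read.format("bigquery") \
    .option("credentialsFile", key_path) \
    .option("table", f"{GCP_PROJECT_ID}.{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE_NAME}") \
    .load()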
- Run the program using the command below.
pipenv run python main.py # Add sudo in front of the command if you encounter any permission issues
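If everything is wired up correctly, the final df.show() should print the two sample rows, roughly like the output below (row order coming back from BigQuery is not guaranteed):

+------------+------+-------+-------+-----------+
|RecordNumber|  City|  State|Zipcode|ZipCodeType|
+------------+------+-------+-------+-----------+
|       12345|CITY 1|STATE 1|    704|   STANDARD|
|       99999|CITY 2|STATE 2|    907|   STANDARD|
+------------+------+-------+-------+-----------+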
Thanks for reading !!