Save BigQuery query results in a Google Cloud Storage (GCS) bucket and send them to an external FTP server using Python
Use Case Scenario:
Generate a CSV file from the results of a query against a BigQuery table, save it in a Google Cloud Storage (GCS) bucket, and then upload it to an external FTP server.
Import the needed client libraries (google-cloud-bigquery, google-cloud-storage, google-auth, ftplib, io).
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
from ftplib import FTP
import io
Create the BigQuery and Cloud Storage clients from your service account key file
client = bigquery.Client.from_service_account_json('xxxx-xxxxx-xxxx.json')
storage_client = storage.Client.from_service_account_json('xxxx-xxxxx-xxxx.json')
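Because google.oauth2's service_account module is imported above, the credentials can also be built once and shared between both clients. A minimal sketch of that variant, using the same placeholder key file:
credentials = service_account.Credentials.from_service_account_file('xxxx-xxxxx-xxxx.json')
client = bigquery.Client(credentials=credentials, project=credentials.project_id)
storage_client = storage.Client(credentials=credentials, project=credentials.project_id)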
Create a query to fetch all data from the BigQuery table
query = f"""
SELECT * FROM sandbox.test_invoices
"""
Define the destination URI where the query results will be stored in GCS
bucket_name = 'bucket-4-test'
blob_name = 'test_data.csv'
destination_uri = f'gs://{bucket_name}/{blob_name}'
Run the query and export the results to GCS
query_job = client.query(query)
destination_blob = storage_client.bucket(bucket_name).blob(blob_name)
destination_blob.content_type = 'text/csv'
# Use a with block so the GCS writer is closed and the upload is finalized
with destination_blob.open('w', encoding='utf-8') as f:
    query_job.result().to_dataframe().to_csv(f, index=False)
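For larger result sets, an alternative is to let BigQuery write the CSV itself with an extract job instead of pulling every row through pandas; this is also where the destination_uri defined above comes into play. A sketch, assuming the result fits in a single CSV shard (very large exports need a wildcard URI such as gs://bucket-4-test/test_data-*.csv):
query_job = client.query(query)
query_job.result()  # wait for the query; results land in a temporary table
extract_job = client.extract_table(query_job.destination, destination_uri)
extract_job.result()  # the default extract format is CSV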
Verify that the query results are exported to GCS
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.get_blob(blob_name)
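get_blob returns None when the object does not exist, so a quick check here avoids a confusing failure later. A minimal sketch:
if blob is None:
    raise RuntimeError(f'{blob_name} was not found in bucket {bucket_name}')
print(f'Exported {blob_name} ({blob.size} bytes) to {destination_uri}')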
Define FTP Server and User credentials
ftp_host = '194.195.xxx.xxx'
ftp_user = 'xxxx.wehelpcode.com'
ftp_password = '*******'
ftp_remote_path = f'/public_html/{blob_name}'
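Hard-coding the password in source code is risky. One common alternative is to read it from an environment variable; a minimal sketch, where FTP_PASSWORD is an assumed variable name:
import os
ftp_password = os.environ['FTP_PASSWORD']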
Download the CSV file from GCS into an in-memory stream
file_stream = io.BytesIO()
blob.download_to_file(file_stream)
file_stream.seek(0)  # Reset the stream position to the beginning
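If the file comfortably fits in memory, the same download can be written as a one-liner, since download_as_bytes returns the whole object as bytes:
file_stream = io.BytesIO(blob.download_as_bytes())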
Send CSV file to external FTP Server
ftp = FTP(ftp_host)
ftp.login(user=ftp_user, passwd=ftp_password)
ftp.storbinary(f'STOR {ftp_remote_path}', file_stream)
ftp.quit()
print(f"File {blob_name} uploaded to FTP server at {ftp_remote_path}")
Complete Code:
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
from ftplib import FTP
import io
client = bigquery.Client.from_service_account_json('xxxx-xxxxx-xxxx.json')
storage_client = storage.Client.from_service_account_json('xxxx-xxxxx-xxxx.json')
query = f"""
SELECT * FROM sandbox.test_invoices
"""
bucket_name = 'bucket-4-test'
blob_name = 'test_data.csv'
destination_uri = f'gs://{bucket_name}/{blob_name}'
query_job = client.query(query)
destination_blob = storage_client.bucket(bucket_name).blob(blob_name)
destination_blob.content_type = 'text/csv'
# Use a with block so the GCS writer is closed and the upload is finalized
with destination_blob.open('w', encoding='utf-8') as f:
    query_job.result().to_dataframe().to_csv(f, index=False)
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.get_blob(blob_name)
ftp_host = '194.195.xxx.xxx'
ftp_user = 'xxxx.wehelpcode.com'
ftp_password = '*******'
ftp_remote_path = f'/public_html/{blob_name}'
file_stream = io.BytesIO()
blob.download_to_file(file_stream)
file_stream.seek(0) # Reset the stream position to the beginning
ftp = FTP(ftp_host)
ftp.login(user=ftp_user, passwd=ftp_password)
ftp.storbinary(f'STOR {ftp_remote_path}', file_stream)
ftp.quit()
print(f"File {blob_name} uploaded to FTP server at {ftp_remote_path}")