Bypassing TROCCO: Direct Data Transfer from HubSpot to BigQuery Using Cloud Functions


Introduction

This article introduces a method to transfer data managed in HubSpot—specifically ‘contacts,’ ‘companies,’ and ‘deals’—to BigQuery, which serves as a data source for BI tools like Looker Studio. Previously, we utilized a SaaS called TROCCO to transfer data from HubSpot to a spreadsheet, from which Looker Studio would load data. However, changes to TROCCO’s pricing plan made it impossible to update data as frequently as every two hours within the free tier. Additionally, the slow data loading from the spreadsheet presented challenges, prompting the adoption of Google Cloud Functions (GCF) and BigQuery as alternatives.

GCP Configuration

Project Name: hubspot-to-bigquery

Google Cloud Functions (GCF): sync-hubspot-to-bigquery

Implemented in Python

Cloud Scheduler: sync-hubspot-to-bigquery-job

Executes the job every hour at the 0th minute

BigQuery:

Dataset: hubspot_data

Tables: contacts, companies, deals

Data Flow

Cloud Scheduler triggers the GCF once every hour.
GCF extracts data from HubSpot and transfers it to BigQuery.

Note: To update data, all records are deleted before new records are added.

Looker Studio generates reports by referencing the tables in BigQuery.
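The delete-then-reload step can be sketched independently of HubSpot and BigQuery. The example below is a minimal illustration, not the article's script: `run_sync` and its three callables are hypothetical names, and it assumes a slightly different ordering in which data is fetched before anything is deleted, so that a failed HubSpot call leaves the existing rows in place.

```python
from typing import Callable, Dict, List


def run_sync(
    fetch: Callable[[], List[Dict]],
    delete_all: Callable[[], None],
    insert: Callable[[List[Dict]], None],
) -> int:
    """Run one sync cycle and return the number of rows loaded."""
    # Fetch first: if this call raises, the table is left untouched.
    rows = fetch()
    # Only after a successful fetch is the old data removed...
    delete_all()
    # ...and replaced with the freshly fetched rows.
    insert(rows)
    return len(rows)


if __name__ == "__main__":
    # An in-memory list stands in for a BigQuery table (assumption for the demo).
    table: List[Dict] = [{"id": 1}]
    loaded = run_sync(
        fetch=lambda: [{"id": 2}, {"id": 3}],
        delete_all=table.clear,
        insert=table.extend,
    )
    print(loaded, table)
```

In a real function, the callables would wrap the HubSpot client and the BigQuery delete and insert helpers shown later in this article.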

Implementation

Issuing a HubSpot Access Token

Create a private app from the settings menu in HubSpot and grant it the required read scopes (crm.objects.deals.read, crm.objects.companies.read, crm.objects.contacts.read). After creating the private app, copy its access token; it is later passed to the function through the ACCESS_TOKEN environment variable.

Creating BigQuery Tables

Create the hubspot_data dataset in BigQuery, then create one table each for deals, companies, and contacts. The following SQL query creates the companies table; the other two tables are created the same way:

CREATE TABLE `hubspot-to-bigquery.hubspot_data.companies` (
  id INT64,
  created_at TIMESTAMP,
  updated_at TIMESTAMP,
  about_us STRING,

)
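Because three tables have to be created, the DDL can also be generated instead of typed by hand. The following is a minimal sketch, assuming a hypothetical helper `build_create_table_sql` that is not part of the original script. It uses only the columns listed above and makes no attempt to fill in the omitted ones.

```python
from typing import List, Tuple


def build_create_table_sql(table_id: str, columns: List[Tuple[str, str]]) -> str:
    """Return a CREATE TABLE statement for the given (name, type) pairs."""
    column_lines = ",\n".join(f"  {name} {col_type}" for name, col_type in columns)
    return f"CREATE TABLE `{table_id}` (\n{column_lines}\n)"


if __name__ == "__main__":
    # Columns taken from the companies example above; the real table has more.
    companies_columns = [
        ("id", "INT64"),
        ("created_at", "TIMESTAMP"),
        ("updated_at", "TIMESTAMP"),
        ("about_us", "STRING"),
    ]
    print(build_create_table_sql(
        "hubspot-to-bigquery.hubspot_data.companies", companies_columns
    ))
```

The resulting string can be pasted into the BigQuery console or passed to a query call.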

Setting Up Google Cloud Function (GCF)

Below is the complete script for the function:

import os
import logging
from datetime import datetime, timezone, timedelta
from hubspot import HubSpot
from hubspot.crm.contacts import ApiException
from google.cloud import bigquery
from google.api_core.retry import Retry

logging.basicConfig(level=logging.INFO)

# The client and table metadata are created once, when the function instance starts.
client = bigquery.Client()
contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table(contacts_table_id)
contacts_table_schema_keys = {field.name for field in contacts_table.schema}

companies_table_id = "hubspot-to-bigquery.hubspot_data.companies"
companies_table = client.get_table(companies_table_id)
companies_table_schema_keys = {field.name for field in companies_table.schema}

deals_table_id = "hubspot-to-bigquery.hubspot_data.deals"
deals_table = client.get_table(deals_table_id)
deals_table_schema_keys = {field.name for field in deals_table.schema}

# Properties to fetch from HubSpot (the lists are abbreviated here)
contacts_properties = ["id", "created_at", "updated_at", "company_size", "date_of_birth", …]
companies_properties = ["id", "created_at", "updated_at", "about_us", …]
deals_properties = ["id", "created_at", "updated_at", "amount_in_home_currency", …]

def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500

    api_client = HubSpot(access_token=access_token)
    try:
        # Delete all records for fresh update
        delete_table_records(contacts_table_id)
        delete_table_records(companies_table_id)
        delete_table_records(deals_table_id)

        # Fetch data from HubSpot
        contacts_fetched = api_client.crm.contacts.get_all(properties=contacts_properties)
        companies_fetched = api_client.crm.companies.get_all(properties=companies_properties)
        deals_fetched = api_client.crm.deals.get_all(properties=deals_properties)

        # Create rows to insert into BigQuery
        contacts_rows = create_rows_to_insert(contacts_fetched, contacts_table_schema_keys)
        companies_rows = create_rows_to_insert(companies_fetched, companies_table_schema_keys)
        deals_rows = create_rows_to_insert(deals_fetched, deals_table_schema_keys)

        # Insert data into BigQuery tables
        insert_rows_bigquery(contacts_table_id, contacts_rows)
        insert_rows_bigquery(companies_table_id, companies_rows)
        insert_rows_bigquery(deals_table_id, deals_rows)

        success_message = f"Data synchronized successfully: {len(contacts_rows)} contacts, {len(companies_rows)} companies, and {len(deals_rows)} deals updated."
        logging.info(success_message)
        return success_message, 200

    except ApiException as e:
        error_message = f"Exception when requesting: {e}"
        logging.error(error_message)
        return error_message, 500

def delete_table_records(table_id):
    # BigQuery requires a WHERE clause on DELETE; WHERE TRUE matches every row.
    delete_query = f"DELETE FROM `{table_id}` WHERE TRUE"
    try:
        query_job = client.query(delete_query)
        query_job.result()  # Wait for the DELETE job to finish
        logging.info(f"All records have been deleted from {table_id}.")
    except Exception as e:
        logging.error(f"Failed to delete records from {table_id}: {e}")

def convert_utc_to_jst(timestamp):
    # JST is a fixed offset of UTC+9
    jst_zone = timezone(timedelta(hours=9))
    jst_time = timestamp.astimezone(jst_zone)
    logging.debug(f"Converted {timestamp} to {jst_time}")
    return jst_time.isoformat()

def create_rows_to_insert(fetched_data, table_schema_keys):
    rows_to_insert = []
    for data in fetched_data:
        data_properties = data.properties
        row = {
            "id": data.id,
            "created_at": convert_utc_to_jst(data.created_at) if data.created_at else None,
            "updated_at": convert_utc_to_jst(data.updated_at) if data.updated_at else None
        }
        for key, prop in data_properties.items():
            # Skip properties that have no matching column in the table
            if key in table_schema_keys:
                # Store empty strings as NULL
                value = prop if prop != "" and prop is not None else None
                if isinstance(value, datetime):
                    value = convert_utc_to_jst(value)
                row[key] = value
        rows_to_insert.append(row)
    return rows_to_insert

def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    # Exponential backoff: start at 1 s, cap at 10 s, give up after 20 minutes
    custom_retry = Retry(initial=1.0, maximum=10.0, multiplier=2.0, deadline=1200.0)
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]
        try:
            # insert_rows_json returns a list of per-row errors; empty means success
            errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
            if errors:
                logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")
            else:
                logging.info(f"Batch {i // batch_size + 1} inserted successfully into {table_id}.")
        except Exception as e:
            logging.error(f"Error inserting data into {table_id}: {e}")

The requirements for this function are as follows:

functions-framework==3.*
hubspot-api-client
google-cloud-bigquery

Log Configuration and BigQuery Client Initialization

Logging is set to the INFO level, and the BigQuery client is initialized with default project settings. We also retrieve the IDs and schema keys of the BigQuery tables that will be used later for data insertion.

logging.basicConfig(level=logging.INFO)
client = bigquery.Client()

contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table(contacts_table_id)
contacts_table_schema_keys = {field.name for field in contacts_table.schema}
# Similarly, settings for the company and deals tables are also configured.
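The schema keys are what later decide which HubSpot properties end up in a row. The following is a minimal sketch of that filtering step on plain dictionaries. `filter_properties` is a hypothetical name, and the sample property `not_in_table` is made up for illustration.

```python
from typing import Dict, Optional, Set


def filter_properties(
    properties: Dict[str, Optional[str]], schema_keys: Set[str]
) -> Dict[str, Optional[str]]:
    """Keep only properties that have a matching column; map "" to None."""
    return {
        key: (value if value not in ("", None) else None)
        for key, value in properties.items()
        if key in schema_keys
    }


if __name__ == "__main__":
    schema_keys = {"id", "created_at", "updated_at", "about_us"}
    sample = {"about_us": "", "not_in_table": "ignored"}
    # about_us is kept (as None); not_in_table is dropped.
    print(filter_properties(sample, schema_keys))
```

Filtering against the live schema means a property fetched from HubSpot without a corresponding column is silently skipped rather than causing an insert error.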

Definition of the Data Synchronization Function

The sync_hubspot_to_bigquery function retrieves the HubSpot access token from the ACCESS_TOKEN environment variable and initializes the API client. It then deletes the existing rows, fetches the specified properties from HubSpot, and inserts the results into BigQuery.

def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500

    api_client = HubSpot(access_token=access_token)
    # Detailed steps for data extraction and insertion are described later.

Data Insertion and Retry Policy

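The extracted data is inserted into BigQuery in batches of 100 rows. A retry policy with exponential backoff automatically retries insert requests that fail for transient reasons. Row-level errors returned by insert_rows_json are not retried; they are only logged.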

def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    custom_retry = Retry(initial=1.0, maximum=10.0, multiplier=2.0, deadline=1200.0)
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]
        errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
        if errors:
            logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")
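The batching logic can be tried without a BigQuery connection. The sketch below is an illustration under assumptions: `chunked` and `insert_in_batches` are hypothetical helpers, and `insert_batch` stands in for a call such as client.insert_rows_json, which returns an empty list when a batch succeeds. Unlike the function above, it also reports which batches had errors.

```python
from typing import Callable, Dict, Iterator, List, Sequence


def chunked(rows: Sequence[Dict], batch_size: int) -> Iterator[List[Dict]]:
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


def insert_in_batches(
    rows: Sequence[Dict],
    insert_batch: Callable[[List[Dict]], list],
    batch_size: int = 100,
) -> List[int]:
    """Send rows batch by batch; return the 1-based numbers of failed batches."""
    failed = []
    for number, batch in enumerate(chunked(rows, batch_size), start=1):
        errors = insert_batch(batch)  # an empty list means the batch succeeded
        if errors:
            failed.append(number)
    return failed


if __name__ == "__main__":
    rows = [{"id": i} for i in range(250)]
    # A fake inserter that always succeeds (assumption for the demo).
    print(insert_in_batches(rows, lambda batch: []))
```

Returning the failed batch numbers lets the caller decide whether to report the run as a failure instead of only logging the errors.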

Notes on BigQuery Data Updates and Time Conversion

Streaming Buffer Limitation: Rows written with insert_rows_json pass through BigQuery's streaming buffer, and rows that are still in the buffer cannot be modified by DML statements such as DELETE. Because each run deletes all rows before inserting new ones, setting an update interval shorter than one hour could result in errors. We previously updated data every two hours with TROCCO, so the hourly schedule is already more frequent, but care must be taken if even shorter intervals are required.

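Necessity for Timestamp Conversion: TIMESTAMP data in BigQuery is stored in UTC, while our reports are read in Japan Standard Time (JST, UTC+9). The script therefore converts each timestamp to JST and writes it as an ISO 8601 string with an explicit +09:00 offset. Handling the time zone consistently lets Looker Studio generate accurate reports without the effects of the nine-hour difference. Note that a TIMESTAMP column records the instant rather than the offset, so the display time zone may still need to be set on the reporting side.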
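The conversion itself needs only the standard library. The following is a minimal sketch of the same idea as convert_utc_to_jst, using the hypothetical name `to_jst_iso`. It assumes the input datetime is timezone-aware; a naive datetime would be interpreted as local time by astimezone.

```python
from datetime import datetime, timedelta, timezone

# JST has no daylight saving time, so a fixed UTC+9 offset is sufficient.
JST = timezone(timedelta(hours=9))


def to_jst_iso(timestamp: datetime) -> str:
    """Convert an aware datetime to JST and format it as ISO 8601."""
    return timestamp.astimezone(JST).isoformat()


if __name__ == "__main__":
    utc_midnight = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
    jst_string = to_jst_iso(utc_midnight)
    print(jst_string)
    # Parsing the string back yields the same instant as the UTC input.
    print(datetime.fromisoformat(jst_string) == utc_midnight)
```

The wall-clock time shifts by nine hours, but the instant it denotes is unchanged, which is why the explicit offset must be kept in the string.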

Cloud Scheduler Setup

Set up Cloud Scheduler to automatically execute the GCF every hour at minute zero. This schedule ensures that HubSpot data is regularly updated, keeping the latest information stored in BigQuery. The scheduler’s Cron configuration is 0 * * * *, which triggers the job every hour on the hour.
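To make the schedule concrete, the following sketch computes when a 0 * * * * job would next fire for a given time. `next_hourly_run` is a hypothetical helper for illustration only; Cloud Scheduler evaluates the cron expression itself, in the time zone configured for the job.

```python
from datetime import datetime, timedelta


def next_hourly_run(now: datetime) -> datetime:
    """Return the next time a `0 * * * *` schedule fires strictly after `now`."""
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    return top_of_hour + timedelta(hours=1)


if __name__ == "__main__":
    # A run requested at 10:15 waits until 11:00.
    print(next_hourly_run(datetime(2024, 1, 1, 10, 15)))
    # The schedule rolls over midnight like any other hour.
    print(next_hourly_run(datetime(2024, 1, 1, 23, 30)))
```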

