Getting your application’s data into a common data warehouse for reporting or business intelligence work is both common and useful. I often reach for Google’s BigQuery for this since its web UI and simple reporting connections are easy for others to use out of the box. BigQuery is also widely supported in other tools such as Metabase, which makes self-service analytics even easier for your team.
For some of your bigger applications you’ll probably pay for an ETL product to help push that data over, or set up a replica of your database that gets copied across, maybe using something like Airbyte.
Sometimes though you just have a simple use case of streaming new records for a table over to BigQuery. Or maybe you want to send a record to BigQuery after some internal processing within your application has completed.
I’ve been using a simple Rails concern to do just that in various applications for the last few years now and figured I’d share it. The concern lets you specify the dataset, table, and attributes you’d like to send to BigQuery; it automatically creates the dataset, table, and schema based on the configured attributes, and then streams the record to BigQuery via Sidekiq.
To get started with this, add the google-cloud-bigquery gem to your Gemfile:
bundle add google-cloud-bigquery
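If you’d rather edit the Gemfile by hand, the entry is just the gem declaration (add a version constraint if you like):

# Gemfile
gem 'google-cloud-bigquery'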
Once that’s been added, go ahead and start working on a new concern. We’ll call it BigQuery::Streamable and place it in app/models/concerns/big_query/streamable.rb. In a production application you might end up with variations of how you stream objects, and sometimes you might just want a full table copy of something from your application, but we’ll focus on streaming complete records for this post. I’ve placed inline comments throughout the concern to talk through everything that’s in there.
# frozen_string_literal: true

# Require the gem
require 'google/cloud/bigquery'

module BigQuery
  module Streamable
    extend ActiveSupport::Concern

    class_methods do
      # This class method is how Sidekiq actually exports the record to BigQuery.
      # This allows us to queue it with just the record ID and the job will then
      # find the record and call the instance method `export_to_bigquery`.
      def export_to_bigquery(id)
        find(id).export_to_bigquery
      end
    end

    # This loads your BigQuery configuration, which can be referenced in a variety
    # of ways; take a look at the gem docs for more details.
    # https://cloud.google.com/ruby/docs/reference/google-cloud-bigquery/latest/AUTHENTICATION
    def bigquery_client
      @bigquery_client ||= Google::Cloud::Bigquery.new
    end

    # This method will find or create the dataset specified by `bigquery_dataset_id`
    # in your model. The find or create flow was added so that you can switch
    # projects (for instance a staging project and a production project) and it'll
    # provision as needed. I usually add a cache key in here so that once we look it
    # up once within the application we'll easily know the dataset has been created
    # for subsequent requests.
    def bigquery_dataset
      # Memoize the dataset lookup so we don't have to make the subsequent API
      # calls again for this record.
      return @bigquery_dataset if defined?(@bigquery_dataset)

      # Calling dataset on the client attempts to find the dataset within BigQuery.
      # We do this to figure out if we need to create the dataset or if we can just
      # reference the existing dataset details.
      @bigquery_dataset = bigquery_client.dataset(bigquery_dataset_id)

      # Return the dataset details that we just looked up if it's available; if not
      # we'll continue and create the dataset.
      return @bigquery_dataset unless @bigquery_dataset.nil?

      # Create the dataset and set it to the memoized value so we don't have to
      # look it up again for this record.
      @bigquery_dataset = bigquery_client.create_dataset(bigquery_dataset_id)
    end

    def bigquery_table
      return @bigquery_table if defined?(@bigquery_table)

      # Calling table to see if the table exists in BigQuery yet so we can load the
      # table details.
      @bigquery_table = bigquery_dataset.table(bigquery_table_id)
      return @bigquery_table unless @bigquery_table.nil?

      # Creating the table based on the configured attributes for the class.
      @bigquery_table = bigquery_dataset.create_table(bigquery_table_id) do |t|
        # This block allows you to specify the schema within it, so you can
        # configure a dynamic schema with the create call in a pretty similar way
        # to a Rails migration but within BigQuery.
        t.schema do |s|
          # bigquery_schema is a helper method that utilizes bigquery_attribute_names
          # on the model to figure out which attributes should be created with the schema.
          bigquery_schema.each do |column|
            # The gem does a pretty decent job of mapping the Rails column type to
            # a BigQuery column type, but for some types it needs a little bit of
            # help. Also, I don't recommend utilizing json column types with
            # BigQuery; you'd want to flatten that data out.
            type =
              case column.type
              when :datetime
                :timestamp
              when :decimal
                :numeric
              else
                column.type
              end

            # This is a send call to the BigQuery schema so we can specify a column
            # with that type and name. This would be the equivalent of
            # `s.string 'column_name'` for the BigQuery schema.
            s.send(type, column.name)
          end
        end
      end
    end

    # Selecting the columns based on the bigquery_attribute_names that have been
    # provided for the model.
    def bigquery_schema
      self.class.columns.select do |c|
        bigquery_attribute_names.include?(c.name)
      end
    end

    # Slicing out just the attributes that we need for BigQuery. This allows you to
    # be selective about what's sent over, which is useful for ignoring sensitive
    # data or data that isn't useful in BigQuery.
    def bigquery_attributes
      attributes.slice(*bigquery_attribute_names)
    end

    # This method can be called wherever in your model. In some situations this is
    # called in an after_create callback like so:
    #
    #   after_create :queue_export_to_bigquery
    #
    # Or you could process the record and then call this at the end of it, something like:
    #
    #   def process_stats
    #     calculate_things
    #     queue_export_to_bigquery
    #   end
    def queue_export_to_bigquery
      self.class.delay(queue: :bigquery).export_to_bigquery(id)
    end

    # This is the generic export_to_bigquery method that checks if a record has
    # already been exported and returns early if so. If there was an insert failure
    # then it raises so you can triage within your error reports. You'll notice
    # that you'll need a new `exported_to_bigquery_at` column on your models to
    # record the export timestamp.
    def export_to_bigquery
      return if exported_to_bigquery_at.present?

      # This insert method is a streaming insert in the BigQuery API, so it's going
      # to stream the record in almost instantly. The attributes value is just a
      # Ruby hash that the gem will map back to the schema as long as the names match.
      # Streaming insert docs: https://cloud.google.com/bigquery/docs/samples/bigquery-table-insert-rows#bigquery_table_insert_rows-ruby
      insert = bigquery_table.insert bigquery_attributes
      raise "Failed to insert row into BigQuery #{insert.inspect}" unless insert.success?

      update_column(:exported_to_bigquery_at, Time.now)
    end
  end
end
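The bigquery_client method above leans on the gem to pick up your project and credentials from the environment. As a rough sketch (the initializer path and environment variable names are just placeholders, not something the concern requires), you could configure them explicitly in an initializer:

# config/initializers/bigquery.rb
require 'google/cloud/bigquery'

Google::Cloud::Bigquery.configure do |config|
  # Placeholder environment variable names; point these at your own project ID
  # and service account keyfile. See the AUTHENTICATION docs linked in the
  # concern for the other supported options.
  config.project_id  = ENV['BIGQUERY_PROJECT_ID']
  config.credentials = ENV['BIGQUERY_KEYFILE_PATH']
end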
With this concern in place, you just need to configure a few details within your model. Here’s a simple Deploy model that was created with the following schema:
create_table "deploys", force: :cascade do |t|
t.datetime "recorded_at"
t.string "status"
t.string "performer"
t.string "version"
t.string "service_version"
t.text "hosts"
t.string "command"
t.string "subcommand"
t.string "destination"
t.string "role"
t.integer "runtime"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.datetime "exported_to_bigquery_at"
t.string "service"
end
class Deploy < ApplicationRecord
  include BigQuery::Streamable

  after_create :queue_export_to_bigquery

  def bigquery_dataset_id
    'shipyrd'
  end

  def bigquery_table_id
    'deploys'
  end

  def bigquery_attribute_names
    %w[
      recorded_at
      status
      performer
      version
    ]
  end
end
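One assumption worth flagging: the delay call in queue_export_to_bigquery comes from Sidekiq’s delayed extensions, which are opt-in in recent versions and removed in Sidekiq 7. If you’re not using them, a small explicit job works just as well. Here’s a hypothetical worker sketch; queue_export_to_bigquery would then call perform_async instead of delay:

# app/sidekiq/big_query_export_job.rb (hypothetical, not part of the concern above)
class BigQueryExportJob
  include Sidekiq::Job

  sidekiq_options queue: :bigquery

  def perform(class_name, id)
    # Mirrors the concern's class method: look the record up and stream it.
    class_name.constantize.export_to_bigquery(id)
  end
end

# In the concern, queue_export_to_bigquery would then become:
#   BigQueryExportJob.perform_async(self.class.name, id)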
Now whenever a new Deploy record is created, it’ll show up in BigQuery as soon as Sidekiq processes the job. You don’t have to use Sidekiq, of course; any background queueing library you’d like will work. With the concern in place, you can start streaming any of your models by setting the dataset, table, and attribute names within each one.
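For an existing model, the only schema change the concern needs is the exported_to_bigquery_at timestamp it checks before inserting. A migration along these lines does it (the orders table and the migration version are just examples; use your own model and Rails version):

class AddExportedToBigqueryAtToOrders < ActiveRecord::Migration[7.1]
  def change
    # Recorded by the concern after a successful streaming insert.
    add_column :orders, :exported_to_bigquery_at, :datetime
  end
end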