MySQL/Aurora to Google BigQuery

Google BigQuery (BQ) is Google’s columnar storage; it’s super fast when you need to query many gigabytes of data.

BQ defaults to UTF-8, so it makes sense for the tables in MySQL/Aurora to be UTF-8 encoded as well.

I use Python and SQLAlchemy (SA) to load data into BQ. SA’s ORM features go to waste here, but its raw connection is nearly as fast as a naked MySQL connection, and I’m already familiar with SA.

First, make sure the MySQL URL specifies the correct encoding, e.g. “db?charset=utf8”. It took me hours to figure out why, without the charset set, some Russian characters became ??? while some Spanish characters were fine.
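A plausible explanation, assuming the connection fell back to latin1 (the historical MySQL default; the actual charset in effect is not known here): Spanish accented letters exist in latin1, Cyrillic letters do not, so only the latter get replaced. The sketch below imitates that lossy conversion with Python's own codecs rather than a live MySQL connection.

```python
# Illustration only: mimic a lossy latin1 conversion with Python's codecs.
spanish = 'señor'
russian = 'Привет'

# errors='replace' substitutes '?' for anything latin1 cannot represent.
spanish_bytes = spanish.encode('latin-1', errors='replace')
russian_bytes = russian.encode('latin-1', errors='replace')

print(spanish_bytes.decode('latin-1'))  # the accented letter survives
print(russian_bytes.decode('latin-1'))  # every Cyrillic letter becomes '?'
```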

There are a number of ways to load data into BQ. I have tried the REST API and Google Storage. Going through Google Storage is about 10x faster than REST, which is no surprise; it’s just like the Amazon S3 + Redshift combo. As for the source format, JSON is modern and CSV is history: according to BQ’s documentation, a CSV file that contains quoted newlines is limited to 4GB.
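The newline problem is easy to see in a few lines. This is a small sketch using only the standard library (csv and json stand in for whatever writer you use, and the sample row is made up): a quoted newline makes one CSV record span two physical lines, while JSON escapes it so each record stays on a single line.

```python
import csv
import io
import json

row = {'id': 1, 'comment': 'first line\nsecond line'}  # made-up sample row

# CSV keeps the newline verbatim inside quotes.
csv_buffer = io.StringIO()
csv.writer(csv_buffer).writerow([row['id'], row['comment']])
csv_text = csv_buffer.getvalue()

# JSON escapes the newline as the two characters backslash and n.
json_text = json.dumps(row, ensure_ascii=False)

print(len(csv_text.splitlines()))   # number of physical lines in the CSV record
print(len(json_text.splitlines()))  # number of physical lines in the JSON record
```

That is why newline-delimited JSON can be split safely on line boundaries, while CSV with quoted newlines cannot.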

BQ supports gzipped JSON, which is sweet considering you only pay for about 1/10 of the data traffic. This is how I stream MySQL output to gzipped JSON using the ujson and gzip libraries.

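import gzip
import ujson

# Python 3: open in text mode with explicit UTF-8, since ujson.dump writes str, not bytes.
with gzip.open(temp_dir + json_file, 'wt', encoding='utf-8') as gzfile:
  for row in a_list_of_dictionaries:
    ujson.dump(row, gzfile, ensure_ascii=False)
    gzfile.write('\n')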
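To check the output, the file can be read back line by line. This is a minimal, self-contained sketch, assuming Python 3 and using the standard json module in place of ujson so it runs without extra packages; the rows and the file name are made up.

```python
import gzip
import json
import os
import tempfile

rows = [{'id': 1, 'name': 'Привет'}, {'id': 2, 'name': 'señor'}]  # made-up rows
path = os.path.join(tempfile.mkdtemp(), 'dump.json.gz')           # hypothetical file name

# Write one JSON object per line, UTF-8 encoded and gzip compressed.
with gzip.open(path, 'wt', encoding='utf-8') as gzfile:
    for row in rows:
        gzfile.write(json.dumps(row, ensure_ascii=False) + '\n')

# Read it back to confirm nothing was lost on the way.
with gzip.open(path, 'rt', encoding='utf-8') as gzfile:
    restored = [json.loads(line) for line in gzfile]

print(restored == rows)
```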

Below is a snippet I used to map MySQL’s data types to BQ:

import re
import ujson
from sqlalchemy import text

# Build one small JSON object per column straight from information_schema.
schema_sql = """select CONCAT('{"name": "', COLUMN_NAME, '","type":"', DATA_TYPE, '"}') as json from information_schema.columns where TABLE_SCHEMA = :database AND TABLE_NAME = :table;"""
fields = []
results = connection.execute(text(schema_sql), {'database': database, 'table': table})
for row in results:
  field = ujson.loads(row.json)
  # BQ field names cannot start with a digit, so prefix those.
  if re.match('^[0-9]', field['name']):
    field['name'] = bq_prefix + field['name']
  # Map the MySQL type to the closest BQ type; anything unknown becomes STRING.
  if re.match('bool', field['type']):
    field['type'] = 'BOOLEAN'
  elif re.match('.*int.*', field['type']):
    field['type'] = 'INTEGER'
  elif re.match('.*(float|decimal|double).*', field['type']):
    field['type'] = 'FLOAT'
  elif re.match('.*(datetime|timestamp).*', field['type']):
    field['type'] = 'TIMESTAMP'
  elif re.match('.*binary.*', field['type']):
    field['type'] = 'BYTES'
  else:
    field['type'] = 'STRING'
  fields.append(field)
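One caveat with substring patterns: '.*int.*' also matches spatial types such as point. A possible alternative, which is not what the snippet above does, is an explicit lookup table. In the sketch below the name mysql_to_bq_type and the selection of types are illustrative assumptions, and anything not listed falls back to STRING.

```python
import re

# The substring pattern also catches MySQL's spatial "point" type.
print(bool(re.match('.*int.*', 'point')))

# Hypothetical lookup table: MySQL DATA_TYPE -> BigQuery type.
MYSQL_TO_BQ = {
    'tinyint': 'INTEGER', 'smallint': 'INTEGER', 'mediumint': 'INTEGER',
    'int': 'INTEGER', 'bigint': 'INTEGER',
    'float': 'FLOAT', 'double': 'FLOAT', 'decimal': 'FLOAT',
    'datetime': 'TIMESTAMP', 'timestamp': 'TIMESTAMP',
    'binary': 'BYTES', 'varbinary': 'BYTES',
}

def mysql_to_bq_type(data_type):
    """Return the BigQuery type for a MySQL DATA_TYPE, defaulting to STRING."""
    return MYSQL_TO_BQ.get(data_type.lower(), 'STRING')

for data_type in ('bigint', 'point', 'varchar', 'datetime'):
    print(data_type, '->', mysql_to_bq_type(data_type))
```

The table is more verbose than the regexes, but every mapping is visible at a glance and nothing matches by accident.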

Using gsutil is very simple, similar to s3cmd.

from subprocess import call
call(['gsutil', 'cp', temp_dir + json_file, bucket])  # same path the dump was written to

And later when I load the dumped JSON I can use these fields to rebuild the schema in BQ:

gbq_schema = ','.join([ "{0}:{1}".format(col['name'], col['type']) for col in fields ])
call(['bq', 'load', '--source_format', 'NEWLINE_DELIMITED_JSON', '--replace', gbq_table, gs_source, gbq_schema])
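For reference, this is what the schema string and the command look like for a small, made-up field list. The table and bucket names are placeholders, and the command is only printed here, not executed.

```python
# Made-up field list in the same shape the type-mapping step produces.
fields = [
    {'name': 'id', 'type': 'INTEGER'},
    {'name': 'email', 'type': 'STRING'},
    {'name': 'created_at', 'type': 'TIMESTAMP'},
]
gbq_table = 'my_dataset.users'              # placeholder table name
gs_source = 'gs://my-bucket/users.json.gz'  # placeholder bucket path

# bq accepts an inline schema as comma-separated name:type pairs.
gbq_schema = ','.join('{0}:{1}'.format(col['name'], col['type']) for col in fields)
command = ['bq', 'load', '--source_format', 'NEWLINE_DELIMITED_JSON',
           '--replace', gbq_table, gs_source, gbq_schema]

print(gbq_schema)
print(' '.join(command))
```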

That’s about it. 🙂

 
