Use MySQL Trigger To Do Incremental ETL

There’s a huge MySQL table that I need to ETL to Google BigQuery daily, about 1 billion rows. The rows are updated in a random fashion all the time so I can’t do incremental ETL by the recording the max primary key.

Then my colleague brought up the trigger idea, which I believe is the perfect tool for this job: record what has changed then only load the changed rows to BQ.

Below is the trigger I will use, rxt is the source table, rxts is the ‘diff’ table:

create TRIGGER rxt_update after UPDATE on rxt
    if new.value is not null then
      insert INTO rxts SET =
      on DUPLICATE KEY UPDATE rxts.ts = current_timestamp();
    end if;

The difference will be generated by the SQL

SELECT rxt.* FROM rxt join rxts on =

BQ doesn’t support update at the moment, so the table can be rebuilt with the original ‘rxt’ and the ‘rxt_diff’, eg. build rxt_0 with rows in rxt but not in rxt_diff, then append rxt_diff into rxt_0 to form the new rxt table.


MySQL/ Aurora to Google BigQuery

Google BigQuery(BQ) is Google’s column storage, it’s super fast when you need to query over many gigabytes of data.

BQ defaults to utf8, so it makes sense the tables in MySQL/ Aurora are also utf8 encoded.

I use Python and SQL Alchemy to load data to BQ. It’s a waste for SA’s ORM features but the raw connection in SA is nearly as fast as a naked MySQL connection, also I’m already familiar with SA.

First, make sure the MySQL URL has the correct encoding, eg. “db?charset= utf8”. It took my hours to figure out why some Russian characters became ??? without the charset set while some Spanish characters were fine.

There’s a number of ways to load data to BG, I have tried using REST API, or Google Storage. Using Google Storage is like 10x faster than REST, no surprise. Just like the Amazon S3 + Redshift combo. As for the data source format, JSON is modern and CSV is history because if there’s new line quoted in CSV, it will be limited to 4GB according to BQ’s documents.

BQ supports gzipped JSON which is sweet consider you only need to pay for 1/10 of the data traffic. This is how I stream MySQL output to gzipped JSON using ujson and gzip libraries.

with + json_file, 'wb') as gzfile:
  for row in a_list_of_dictionaries:
    ujson.dump(row, gzfile, ensure_ascii=False)

Below is a snippet I used to map MySQL’s data types to BQ:

schema_sql = """select CONCAT('{"name": "', COLUMN_NAME, '","type":"', DATA_TYPE, '"}') as json from information_schema.columns where TABLE_SCHEMA = :database AND TABLE_NAME = :table;"""
fields = [] 
results = connection.execute(text(schema_sql), {'database':database, 'table': table} )
for row in results:
  field = ujson.loads(getattr(row, 'json'))
  if re.match('^[0-9]', field['name']):
    field['name'] = bq_prefix + field['name']
  if re.match('(bool|boolean)', field['type']):
    field['type'] = 'BOOLEAN'
  elif re.match('.*int.*', field['type']):
    field['type'] = 'INTEGER'
  elif re.match('.*(float|decimal|double).*', field['type']):
    field['type'] = 'FLOAT'
  elif re.match('.*(datetime|timestamp).*', field['type']):
    field['type'] = 'TIMESTAMP'
  elif re.match('.*binary.*', field['type']):
    field['type'] = 'BYTES'
    field['type'] = 'STRING'

Using gsutil is very simple, similar to s3cmd.

call(['gsutil', 'cp', json_file, bucket])

And later when I load the dumped JSON I can use these fields to rebuild the schema in BQ:

gbq_schema = ','.join([ "{0}:{1}".format(col['name'], col['type']) for col in fields ])
call(['bq', 'load', '--source_format', 'NEWLINE_DELIMITED_JSON', '--replace', gbq_table, gs_source, gbq_schema])

That’s about it. 🙂


MySQL VS. MariaDB 之日期格式差异

MariaDB Foundation 号称 MariaDB 是 MySQL 的替代品, 但我发现它们之间还是有些不兼容的地方(而且不在已列出的清单里). MariaDB 似乎对日期格式要求更为严格. 以下 SQL 的执行结果就很有不同, 在 MySQL 中 changed_at 是正常的 datetime, 但是在 MariaDB 中 changed_at 完全是 ‘0000-00-00 00:00:00’ 的样子.

  GREATEST( COALESCE(`inspection`.`ts`, 0), COALESCE(`inspection_details`.`ts`, 0), COALESCE(`open_time_reason`.`ts`, 0)
 ) AS `changed_at`
 `changed_at` = VALUES(`changed_at`)

解决方法是添加一个明确的格式转换: CONVERT(… , DATETIME)

 GREATEST( COALESCE(`inspection`.`ts`, 0), COALESCE(`inspection_details`.`ts`, 0), COALESCE(`open_time_reason`.`ts`, 0)
 ), datetime
 ) AS `changed_at`

我已经将此 bug 提交: