A Simple Python Script to Check Difference between Directories

I needed to track differences between software release packages so that if anything changes dramatically, e.g. a file goes missing or is much smaller than expected, I get a notification to review the new, potentially flawed package.

I found that the filecmp.dircmp class in Python is spot on for this job. Here’s my snippet to compare two directories recursively:

#!/usr/bin/env python3
import argparse
from filecmp import dircmp
from os.path import getsize

changed_files = {}
deleted_files = {}
added_files = {}

def diff_file_size(file1, file2):
    # positive when file2 is larger than file1
    return getsize(file2) - getsize(file1)

def diff_report():
    for k, v in deleted_files.items():
        print(k, v)
    for k, v in added_files.items():
        print(k, v)
    for k, v in changed_files.items():
        print(k, v)

def compare_dir(dir):
    for changed_file in dir.diff_files:
        file1 = "{0}/{1}".format(dir.left, changed_file)
        file2 = "{0}/{1}".format(dir.right, changed_file)
        changed_files[file2] = diff_file_size(file1, file2)
    for deleted_file in dir.left_only:
        # left_only entries are missing from the right tree;
        # the reported path is where they would have been
        file1 = "{0}/{1}".format(dir.right, deleted_file)
        deleted_files[file1] = "DELETED!"
    for added_file in dir.right_only:
        file1 = "{0}/{1}".format(dir.right, added_file)
        added_files[file1] = "ADDED!"
    for sub_dir in dir.subdirs.values():
        compare_dir(sub_dir)

def main():
    parser = argparse.ArgumentParser(description="Usage for diff_dir.py")
    parser.add_argument('--dir1', type=str, required=True)
    parser.add_argument('--dir2', type=str, required=True)
    args = parser.parse_args()
    dir = dircmp(args.dir1, args.dir2)
    compare_dir(dir)
    diff_report()

if __name__ == '__main__':
    main()
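
To see what dircmp actually reports, here is a small self-contained sketch that builds two throwaway directories and collects the results. The helper names (write, summarize) and the file names are made up for the demo and are not part of the script above.

```python
import os
import tempfile
from filecmp import dircmp

def write(path, text):
    # create parent directories as needed, then write the file
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        f.write(text)

def summarize(left, right):
    """Return (changed, removed, added) relative paths, recursing into common subdirs."""
    changed, removed, added = [], [], []

    def walk(dcmp, prefix):
        changed.extend(os.path.join(prefix, n) for n in dcmp.diff_files)
        removed.extend(os.path.join(prefix, n) for n in dcmp.left_only)
        added.extend(os.path.join(prefix, n) for n in dcmp.right_only)
        for name, sub in dcmp.subdirs.items():
            walk(sub, os.path.join(prefix, name))

    walk(dircmp(left, right), "")
    return sorted(changed), sorted(removed), sorted(added)

with tempfile.TemporaryDirectory() as root:
    old, new = os.path.join(root, "old"), os.path.join(root, "new")
    write(os.path.join(old, "bin", "app"), "x" * 100)
    write(os.path.join(new, "bin", "app"), "x" * 10)    # same name, much smaller
    write(os.path.join(old, "README"), "same")
    write(os.path.join(new, "README"), "same")          # identical, not reported
    write(os.path.join(old, "LICENSE"), "gone in new")  # only in old
    write(os.path.join(new, "NOTES"), "only in new")    # only in new
    result = summarize(old, new)
    print(result)
```

Note that left_only and right_only can contain directory names as well as file names, so a whole missing subdirectory shows up as a single entry.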

🙂

Manage AWS EBS Snapshot Life Cycle with Lambda

The timing is not so great. The AWS Data Lifecycle Manager has been announced but I can’t wait for its release. So I decided to use AWS Lambda to do some snapshot lifecycle management.

First, create an IAM role for Lambda with full access to snapshots. This can be done via the console.

To create snapshots with a Python 3.6 Lambda function:

from datetime import datetime, timedelta

import boto3

def get_tag(tags, tag_name):
    for t in tags:
        if t['Key'] == tag_name:
            return t['Value']
    return 'None'
    
def get_delete_date():
    today = datetime.today()
    if today.weekday() == 0: 
        #Monday
        retention = 28
    else:
        retention = 7
    return (today + timedelta(days=retention)).strftime('%Y-%m-%d')
    
def snapshot_tags(instance, volume):
    tags = [{'Key': k, 'Value': str(v)} for k,v in volume.attachments[0].items()]
    tags.append({'Key': 'InstanceName', 'Value': get_tag(instance.tags, 'Name')})
    tags.append({'Key': 'DeleteOn', 'Value': get_delete_date()})
    return tags

def lambda_handler(event, context):
    ec2 = boto3.resource('ec2')
    for instance in ec2.instances.filter(Filters=[{'Name': "tag:Name", 'Values': [ 'AFLCDWH*' ] }]):
        for volume in instance.volumes.all():
            snapshot = ec2.create_snapshot(VolumeId=volume.id, Description="Snapshot for volume {0} on instance {1}".format(volume.id, get_tag(instance.tags, 'Name')))
            snapshot.create_tags(Resources=[snapshot.id], Tags=snapshot_tags(instance, volume))
    return 'done'
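
The retention rule is easier to check when the date is passed in instead of read from the clock. A minimal sketch of the same logic; the function name delete_date and its parameters are my own for illustration, not part of the Lambda above:

```python
from datetime import date, timedelta

def delete_date(taken_on, weekly_days=28, daily_days=7):
    """Return the DeleteOn value for a snapshot taken on `taken_on`.

    Monday snapshots (weekday() == 0) are kept for weekly_days,
    all others for daily_days.
    """
    retention = weekly_days if taken_on.weekday() == 0 else daily_days
    return (taken_on + timedelta(days=retention)).strftime('%Y-%m-%d')

print(delete_date(date(2018, 7, 2)))  # a Monday   -> 2018-07-30
print(delete_date(date(2018, 7, 3)))  # a Tuesday  -> 2018-07-10
```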

To delete the snapshots that are due for deletion today:

from datetime import datetime

import boto3

def lambda_handler(event, context):
    today = datetime.today().strftime('%Y-%m-%d')
    ec2 = boto3.resource('ec2')
    for snapshot in ec2.snapshots.filter(Filters=[{'Name': "tag:DeleteOn", 'Values': [ today ] }]):
        print(snapshot.id)
        snapshot.delete()
    return 'done'
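
One thing to keep in mind: the filter above matches today's date exactly, so if the function misses a day, that day's snapshots stay around. A possible variation, sketched under my own assumptions (the helper name is_expired is hypothetical), is to compare the DeleteOn tag against today and delete anything on or before it:

```python
from datetime import date, datetime

def is_expired(tags, today):
    """True when the DeleteOn tag holds a date on or before `today`."""
    for t in tags or []:          # tags can be None when a resource has no tags
        if t['Key'] == 'DeleteOn':
            try:
                due = datetime.strptime(t['Value'], '%Y-%m-%d').date()
            except ValueError:
                return False      # leave snapshots with malformed dates alone
            return due <= today
    return False

# In the handler this could be used roughly like so (untested against AWS):
#   for snapshot in ec2.snapshots.filter(
#           OwnerIds=['self'],
#           Filters=[{'Name': 'tag-key', 'Values': ['DeleteOn']}]):
#       if is_expired(snapshot.tags, date.today()):
#           snapshot.delete()

print(is_expired([{'Key': 'DeleteOn', 'Value': '2018-07-10'}], date(2018, 7, 11)))
```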

Finally, these functions don’t finish within 3 seconds, so the default 3-second timeout kills them. I raised the timeout to 1 minute.
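
The timeout can also be changed from code instead of the console. A minimal sketch, assuming boto3's Lambda client; the helper name set_timeout and the function name in the comment are placeholders of mine:

```python
def set_timeout(function_name, seconds=60, client=None):
    """Set the timeout (in seconds) of an existing Lambda function."""
    if client is None:
        # imported lazily so the helper can be tried out with a stub client
        import boto3
        client = boto3.client('lambda')
    return client.update_function_configuration(
        FunctionName=function_name,
        Timeout=seconds,
    )

# Example call (needs AWS credentials; 'snapshot-create' is a placeholder name):
#   set_timeout('snapshot-create', 60)
```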

Saving Images in Chrome with Selenium

Here’s how to save images in the Chrome browser using Selenium. The API has an element.screenshot_as_png property, but apparently it’s not implemented at the moment.

With some minor changes to this answer I can save an image via the browser: http://stackoverflow.com/questions/13832322/how-to-capture-the-screenshot-of-a-specific-element-rather-than-entire-page-usin

from selenium import webdriver
from PIL import Image

#chrome_options = ...
#chrome = webdriver.Chrome(chrome_options=chrome_options)
#element = chrome.find_element_by_id('some_id')

def save_image(chrome, element, save_path):
  # in case the image isn't in the view yet
  location = element.location_once_scrolled_into_view
  size = element.size

  # saves screenshot of entire page
  chrome.save_screenshot(save_path)
  # uses PIL library to open image in memory

  image = Image.open(save_path)
  left = location['x']
  top = location['y']
  right = location['x'] + size['width']
  bottom = location['y'] + size['height']
  image = image.crop((left, top, right, bottom))  # defines crop points
  image.save(save_path, 'png')  # saves new cropped image
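
One caveat I would watch for: location and size are reported in CSS pixels, while on a high-DPI display the screenshot may contain more physical pixels, which would put the crop in the wrong place. A sketch of the crop arithmetic with an optional scale factor; the crop_box helper is hypothetical, and reading the factor via window.devicePixelRatio is an assumption I have not verified for every setup:

```python
def crop_box(location, size, scale=1.0):
    """Return (left, top, right, bottom) in screenshot pixels.

    location and size are the dicts Selenium returns for an element;
    scale converts CSS pixels to screenshot pixels (1.0 means no scaling).
    """
    left = location['x'] * scale
    top = location['y'] * scale
    right = (location['x'] + size['width']) * scale
    bottom = (location['y'] + size['height']) * scale
    return tuple(int(round(v)) for v in (left, top, right, bottom))

# Possible use inside save_image (assumption, untested here):
#   scale = chrome.execute_script('return window.devicePixelRatio')
#   image = image.crop(crop_box(location, size, scale))

print(crop_box({'x': 10, 'y': 20}, {'width': 100, 'height': 50}))
```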

🙂

A Simple Python IO Trick

I thought using Python’s gzip module was quite straightforward. However, the IO performance almost doubled with an extra BufferedWriter:

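import io
import gzip

import ujson

# temp_dir and stream_table_data() are defined elsewhere in the script
def export_to_json_gz(schema):
  json_file = "test.json.gz"
  with io.BufferedWriter( gzip.open(temp_dir + json_file, 'wb') ) as gzfile:
    for row in stream_table_data(schema):
      # the file is opened in binary mode, so write encoded bytes
      gzfile.write(ujson.dumps(row, ensure_ascii=False).encode('utf-8'))
      gzfile.write(b'\n')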

Now the bottleneck is ujson. How can I make that part faster?
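
To check the buffering effect on your own machine, here is a rough timing sketch. It uses the standard json module and made-up rows so it runs without ujson or a database; write_rows and the sample data are my own. The numbers will depend on the Python version and the data, so treat them as indicative only.

```python
import gzip
import io
import json
import os
import tempfile
import time

def write_rows(path, rows, buffered):
    """Write rows as gzip-compressed JSON lines, optionally via a BufferedWriter."""
    raw = gzip.open(path, 'wb')
    out = io.BufferedWriter(raw) if buffered else raw
    with out:  # closing the BufferedWriter also flushes and closes the gzip file
        for row in rows:
            out.write(json.dumps(row, ensure_ascii=False).encode('utf-8'))
            out.write(b'\n')

rows = [{'id': i, 'name': 'row-%d' % i} for i in range(20000)]

with tempfile.TemporaryDirectory() as tmp:
    timings = {}
    contents = {}
    for buffered in (False, True):
        name = 'buffered.json.gz' if buffered else 'plain.json.gz'
        path = os.path.join(tmp, name)
        start = time.perf_counter()
        write_rows(path, rows, buffered)
        timings[buffered] = time.perf_counter() - start
        # read back the decompressed data to confirm both variants match
        with gzip.open(path, 'rb') as f:
            contents[buffered] = f.read()

print('unbuffered: %.3fs, buffered: %.3fs' % (timings[False], timings[True]))
```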

🙂