Discover AWS State Machines using Python Lambdas for an ETL process

Step Functions, State Machines, and Lambdas oh my! AWS has really been expanding what you can do without needing to actually stand up any servers. I’m going to walk through a very basic example of how to get going with your own Python code to create an ETL (Extract Transform Load) process using Amazon’s services. And don’t worry, all this goodness is included in the free tier!

The goal of this exercise will be to have an aggregation of news headlines downloaded and transformed into CSV format and uploaded to another service. We are going to achieve this by breaking up each step of the process into its own AWS Lambda.

What are Lambdas?

AWS Lambdas are a “serverless”, stateless way to run snippets of code with no extra initialization or shutdown time.

When to use Lambdas

They are great if you have small highly reusable pieces of code that serve a single purpose. (If you have a few that go together really well, that’s where state machines come in.)  For example if you have some code that does image recognition and you need to use it across multiple projects. Or even just want it to run faster or be more accessible, as Lambdas have several ways they can be initiated, including via an API you can define.

They will NOT fit your purpose if you need something that does a multitude of tasks, will run for a long time, use a lot of memory or update frequently.

Creating a Lambda

Creating your own is a lot easier than a lot of other tutorials seem to show. If you haven’t already, sign up for an AWS account. Then open your AWS console and search for Lambda.

You’ll be presented with a welcome screen most likely, after clicking through “Get Started” or whatever they updated it to this month, you’ll have a screen where you can create new functions as well as check on existing ones.

See the big orange button that even Trump would be proud of? Click it.

As this is probably your first Lambda, you will have to create a new role. Super simple, don’t have to leave the page even. Just give it a new name, and give it a policy template. I used the Simple Microserve permissions as it seemed to fit the bill for me most.

Then you will be greeted with a page with a large amount of info and stuff going on. The part that we are going to be most concerned about is the Function Code area (and will also need Environment Variables to store API keys in).

It may seem like we need to set up triggers or resources for this information to go to, but as we plan to use these inside a state machine, that will handle all that bother for us.

ETL – Extract Transform Load

Now that we know how to make a Lambda, lets look at some code we could use with it. For the state machine we will create later, I want to have an entire process where I pull in information from an outside source (extract), modify to fit my needs (transform) and then put it into my own system (load.)

Extract

As stated above, this scenario involves pulling down data from a news source, in this case we are using News API that allows you to create a free API key to grab top news headlines and link to their stories.

That code is dead simple:

import json
from urllib import request


def retrieve_news(key, source):
    url = f"https://newsapi.org/v2/top-headlines?sources={source}&apiKey={key}"
    with request.urlopen(url) as req:
        return json.loads(req.read())

print(retrieve_news(my_key, 'associated-press')

If I wasn’t using this in a Lambda, I would be using the wonderful Requests module instead, but Python 3’s urllib is at least a lot better than 2s.

So now, we need a way for the Lambda function to call this code and pass along the results in a manor we can use later. On the page to fill in the code, you’ll see a place that says under Function Code that lists the Handler this is the entry point to your code. lambda_function.lambda_handler is the default, which means it will use the function lambda_handler inside the file lambda_function.py as the entry.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import os
import json
from urllib import request


def retrieve_news(key, source):
    url = f"https://newsapi.org/v2/top-headlines?sources={source}&apiKey={key}"
    with request.urlopen(url) as req:
        return json.load(req)


# What AWS Lambda calls
def lambda_handler(event, context):
    key = os.getenv('NEWSAPI_KEY')
    if not key:
        raise Exception('Not all environment variables set')

    if not event['source']:
        raise Exception('Source not set')

    return {'data': retrieve_news(key, event['source']),
            'source': event['source']}

There are two arguments passed into the function, the first is event which is all the information sent to the lambda function (if using a standard JSON object this will be a dictionary, as seen above). The second is context which is a class that will tell you about the current lambda function if necessary, you can learn more about it here, but it will not be used in this example.

Testing the lambda

You may also notice that we are pulling the API key not from the event, but from an environment variable, so make sure to set that as well on the page. Last and not least, I would suggest increasing the timeout for the lambda to 10 seconds, from the default 3.

Before we go on and add the other functions, lets make sure this one works properly.  At the top of the page, where there is a drop down beside test and Actions on the right, click Configure test events we are going to add a new one with the details that will be passed into the event dictionary.

{
  "source": "associated-press"
}

On the pop-up, copy in the above JSON and save it as a new test event.

Hit the test button at the top, and see the results. You should get a big green window that shows you how it ran. If you have a red error window, you will have to figure out what went wrong first.

Transform

This will be our second lambda, so we get to go through the process again of creating a new one (you can use the exiting role from the last one) and copying this code into it. No Environment variables needed this time!

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import csv
from io import StringIO


# What AWS Lambda calls
def lambda_handler(event, context):

    sio = StringIO()
    writer = csv.writer(sio)
    writer.writerow(["Source", "Title", "Author", "URL"])
    for article in event['data']['articles']:
        writer.writerow([
            article['source']['name'],
            article['title'],
            article['author'],
            article['url']
        ])

    csv_content = sio.getvalue()
    print(csv_content)

    return {'data': csv_content,
            'source': event['source']}


The tricky part here is now you need good test data for it. Luckily you can copy the output of the last Lambda (provided snippet below) to do just that.

{
  "data": {
    "status": "ok",
    "totalResults": 5,
    "articles": [
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "FRANCES D'EMILIO",
        "title": "Pope accepts resignation of McCarrick after sex abuse claims",
        "description": "VATICAN CITY (AP) — In a move described as unprecedented, Pope Francis has effectively stripped U.S. prelate Theodore McCarrick of his cardinal's title and rank following allegations of sexual abuse, including one involving an 11-year-old boy. The Vatican ann…",
        "url": "https://apnews.com/46e8e15911034e7f971c7542b60a6444",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:b5c82ad2f2b74b50ab9faccf51898309/2628.jpeg",
        "publishedAt": "2018-07-28T16:21:57Z"
      },
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "KEVIN FREKING",
        "title": "On trade policy, Trump is turning GOP orthodoxy on its head",
        "description": "WASHINGTON (AP) — President Donald Trump's trade policies are turning long-established Republican orthodoxy on its head, marked by tariff fights and now $12 billion in farm aid that represents the type of government intervention GOP voters railed against a de…",
        "url": "https://apnews.com/57cd042b57054e5790b9b444c561ac3b",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:90f04d837f514d0b984e25bd5153be8a/3000.jpeg",
        "publishedAt": "2018-07-28T16:20:11Z"
      },
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "SETH BORENSTEIN and FRANK JORDANS",
        "title": "Science Says: Record heat, fires worsened by climate change",
        "description": "Heat waves are setting all-time temperature records across the globe, again. Europe suffered its deadliest wildfire in more than a century, and one of nearly 90 large fires in the U.S. West burned dozens of homes and forced the evacuation of at least 37,000 p…",
        "url": "https://apnews.com/a4255779e2b6461b9cc8dbf24ea4b96c",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:f9b76dc0354e47caafcfad96c36443ca/3000.jpeg",
        "publishedAt": "2018-07-28T15:03:01Z"
      },
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "MICHAEL KUNZELMAN and LARRY NEUMEISTER",
        "title": "No mystery to Supreme Court nominee Kavanaugh's gun views",
        "description": "SILVER SPRING, Md. (AP) — Supreme Court nominee Brett Kavanaugh says he recognizes that gun, drug and gang violence \"has plagued all of us.\" Still, he believes the Constitution limits how far government can go to restrict gun use to prevent crime. As a federa…",
        "url": "https://apnews.com/c8fc0785b429497abf9621efcdb345e8",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:4c3619ea948b4c91b8f2fcdd50162d26/3000.jpeg",
        "publishedAt": "2018-07-28T14:11:06Z"
      },
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "HOPE YEN, JOSH BOAK and CHRISTOPHER RUGABER",
        "title": "AP FACT CHECK: Trump's hyped claims on economy, NKorea, vets",
        "description": "WASHINGTON (AP) — President Donald Trump received positive economic news this past week and twisted it out of proportion. That impulse ran through days of rhetoric as he hailed the success of a veterans program that hasn't started and saw progress with North …",
        "url": "https://apnews.com/5b405824a9d843a09a641754d84aa1ab",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:636c2c3068b94181ba3c5bcb8d2a3ae9/3000.jpeg",
        "publishedAt": "2018-07-28T12:30:33Z"
      }
    ]
  },
  "source": "associated-press"
}

Configure and run the test like before using the above data.

In this case I also printed the output so you could see that any standard output is captured by the logs.

Load

Now to actually submit this data to a server, you could set up your own, or use file.io which is a free filedropper website, as the code uses below. No API needed!

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

from urllib import request, parse
import json


# What AWS Lambda calls
def lambda_handler(event, context):
    url = 'https://file.io'

    encoded_args = parse.urlencode({'text': event['data']}).encode('utf-8')

    with request.urlopen(url, encoded_args) as req:
        info = json.load(req)

    return {'data': info, 'source': event['source']}

Again, as this is reaching out to an external API, I would increase the default 3 second timeout limit of the Lambda from 3 to 10 seconds.

Woo! We now have three lambda’s that can take each other’s outputs in a row and do a full ETL process. Now lets put them together.

State Machines

AWS Step functions allow for creating a set of various actions to run with each other, and then presented in a pretty auto-generated graph. Back at the console, find the Step functions.

Then create a new state machine.

This is probably the hardest part, is the actual state machine definition. The state language can be confusing, thankfully for our needs we don’t need to do anything complicated.

You can use this code, and will just have to update the actual Resource links under Extract, Transform and Load. (You can even click on them and should be presented with a drop down of your previously created resources so you don’t have to copy the ARNs manually.)

{
  "StartAt": "Set Source",
  "States": {
    "Set Source": {
      "Type": "Pass",
      "Result": {"source": "associated-press"},
      "ResultPath": "$",
      "Next": "Extract"
    },
    "Extract": {
      "Type": "Task",
      "Resource": "<ARN>:function:google-news-extract",
      "ResultPath": "$",
      "Next": "Transform"
    },
    "Transform": {
      "Type": "Task",
      "Resource": "<ARN>:function:google-news-transform",
      "ResultPath": "$",
      "Next": "Load"
    },
     "Load": {
      "Type": "Task",
      "Resource": "<ARN>:function:google-news-load",
      "ResultPath": "$",
      "End": true
    }
  }
}

Notice the first step is not a task, but rather a pass through state that sets the source. We could do this during initialization, but wanted to highlight the ability to add information where needed.

After creation, we will need to start a new execution. It doesn’t need any input, but doesn’t hurt to include a comment if you want.

Then run it!

 

During the middle of an execution, it will show what has been run successfully and what is currently in progress, or erred. At any time, you can click on a specific block to see what it’s input and outputs were.

This function then can be run whenever to run the full ETL process!

Scheduling

For a process like this, you want to run it on a schedule. That means creating a new CloudWatch rule. Search for CloudWatch in the console, then click on Rules on the left hand side.

Then, click the big blue button.

It’s pretty simple to create a fixed rate schedule, and then just make sure to select the right state machine on the right side!