Chris Griffith

Truffle: going from ganache to testnet (ropsten)

Truffle is an amazing suite of tools created by ConsenSys to develop smart contracts for the Ethereum blockchain network. However, it can be a bit jarring to make the leap from local development to the real test network, Ropsten.

Required Setup

For this walkthrough, I have installed:

I will be using the default example Truffle project, MetaCoin, which you can learn how to unbox here, or you can follow along using your own project.

First things first: if you do NOT have a package.json file yet, make sure to run npm init. This will turn the directory into a node package that we can easily manage with the npm package manager, so we don’t have to download dependencies into the global package scope.

Now we can download all the things we are going to need:

npm install bip39 dotenv truffle-hdwallet-provider --save
  • bip39 – used to generate the wallet mnemonic
  • dotenv – simple way to read environment variable files
  • truffle-hdwallet-provider – the wallet provider Truffle will use to sign transactions with the mnemonic

That covers everything we need development-wise.

Storing Secrets outside the code

We will have to create a private key or mnemonic, and that means we need somewhere relatively secure to store it. For testnet work, this can be as simple as making sure it’s not put into version control alongside the code. To that end, we are going to use environment variables and store them in a file called .env (that’s it, basically just an extension; make sure to add it to your .gitignore if you’re using git). To learn more, check out the GitHub page for dotenv. For our purposes, all you need to know is that this file has the format:

ENV_VARIABLE_NAME=something
ANOTHER_ENV=something else

Accessing testnet

The easiest way to reach testnet is by using a provider. I personally like using infura.io (free, just requires registration). After you register and have your API key emailed to you, make sure you select the URL for the test network and add it to the .env file in a variable named ROPSTEN_URL.

ROPSTEN_URL=https://ropsten.infura.io/<your-api-key>

It’s also possible to use your own geth node set to testnet, but that is not required.

Next we are going to create our own wallet. If you already have one set up, for example with MetaMask, you can skip this next part.

Creating your testnet wallet

So now that you have a place to put your secrets, let’s create some. This is where bip39 comes in: it will create random mnemonics which can be used as the basis for a wallet’s private key. The mnemonic will be a series of 12 random words.

We could put this generation in a file, but it’s easy enough to just do it straight from the command line:

node -e "console.log(require('bip39').generateMnemonic())"

This will output 12 words. DO NOT SHARE THESE ANYWHERE. The ones I am using below are examples, and also should NOT be used. Put them in the .env file as the variable MNEMONIC, so your .env file should now contain:

MNEMONIC=candy maple cake sugar pudding cream honey rich smooth crumble sweet treat
ROPSTEN_URL=https://ropsten.infura.io/<your-api-key>

We have our seed, so it’s time to hook it into our code. In your truffle.js or truffle-config.js file, you will now need to import the environment variables and a wallet provider at the top of the file.

require('dotenv').config()
const HDWalletProvider = require('truffle-hdwallet-provider')

After that is added, move down to the exports section, where we are going to add a new network named ropsten. Then we are going to use the HDWalletProvider and supply it with the mnemonic and Infura URL provided via the environment variables.

module.exports = {
  networks: {
    ropsten: {
      provider: () => new HDWalletProvider(
        process.env.MNEMONIC,
        process.env.ROPSTEN_URL),
      network_id: 3
    },
  },
}

Test and make sure everything’s working by opening a truffle console, specifying our new network.

truffle console --network ropsten

We can then get our public account address via the console.

truffle(ropsten)> web3.eth.getAccounts((err, accounts) => console.log(accounts))
[ '0x627306090abab3a6e1400e9345bc60c78a8bef57' ]

If you are seeing this same wallet address, you did it wrong. Go back and make your own mnemonic, don’t copy the candy one from above.

Funding the wallet

In your development environment, the wallet already has ETH in it to pay for gas and deploying the contract. On the mainnet, you will have to buy some real ETH. On testnet, you can get some for free by using a Faucet, such as https://faucet.ropsten.be/ or if you’re using MetaMask just use https://faucet.metamask.io/.

Make sure to use the address you gathered from the console for the faucet, and soon you should have test funds to play around with and actually deploy your contract.

Deploying the Contract

Now is where the rubber meets the road: getting your contract out into the real (test) world.

truffle deploy --network ropsten

If everything is successful, you’ll get messages like these:

Using network 'ropsten'.

Running migration: 1_initial_migration.js
  Deploying Migrations...
  ... 0xefe70115c578c92bfa97154f70f9c3fbaa2b8400b1da1ee7cdxxxxxxxxxxxxxx
  Migrations: 0x6eeedefb64bd6ee6618ac54623xxxxxxxxxxxxxx
Saving successful migration to network...
  ... 0xd4294e35c166e2dca771ba9bf5eb3801bc1793e30db6a53d4dxxxxxxxxxxxxxx
Saving artifacts...
Running migration: 2_deploy_contracts.js
  Deploying Capture...
  ... 0x446d5e92d6976bb05c85bb95b243d6f7405af6bb12b3b6fe08xxxxxxxxxxxxxx
  Capture: 0x1d2f60c6ef979ca86f53af1942xxxxxxxxxxxxxx
Saving successful migration to network...
  ... 0x0b6f918ccc8e3b82cdf43038a2c32fe1fef66d0fa9aeb2260bxxxxxxxxxxxxxx
Saving artifacts...

Tada! You now have your custom contracts deployed to testnet!

Or you got an out of gas error; it is not uncommon to have to adjust the gas price to get onto the network, as Truffle does not automatically figure that out for you. A follow-up post will show how to calculate and adjust the gas price as needed.

 

 

 

Discover AWS State Machines using Python Lambdas for an ETL process

Step Functions, State Machines, and Lambdas oh my! AWS has really been expanding what you can do without needing to actually stand up any servers. I’m going to walk through a very basic example of how to get going with your own Python code to create an ETL (Extract Transform Load) process using Amazon’s services. And don’t worry, all this goodness is included in the free tier!

The goal of this exercise will be to have an aggregation of news headlines downloaded and transformed into CSV format and uploaded to another service. We are going to achieve this by breaking up each step of the process into its own AWS Lambda.

What are Lambdas?

AWS Lambdas are a “serverless”, stateless way to run snippets of code with no extra initialization or shutdown time.

When to use Lambdas

They are great if you have small, highly reusable pieces of code that serve a single purpose. (If you have a few that go together really well, that’s where state machines come in.) For example, maybe you have some code that does image recognition and you need to use it across multiple projects, or you just want it to run faster or be more accessible, as Lambdas have several ways they can be initiated, including via an API you can define.

They will NOT fit your purpose if you need something that performs a multitude of tasks, runs for a long time, uses a lot of memory, or needs to be updated frequently.

Creating a Lambda

Creating your own is a lot easier than many other tutorials make it seem. If you haven’t already, sign up for an AWS account. Then open your AWS console and search for Lambda.

You’ll most likely be presented with a welcome screen; after clicking through “Get Started” (or whatever they updated it to this month), you’ll have a screen where you can create new functions as well as check on existing ones.

See the big orange button that even Trump would be proud of? Click it.

As this is probably your first Lambda, you will have to create a new role. Super simple, you don’t even have to leave the page. Just give it a name and a policy template. I used the Simple Microservice permissions, as it seemed to fit the bill best.

Then you will be greeted with a page with a large amount of info and stuff going on. The part that we are going to be most concerned with is the Function Code area (we will also need Environment Variables to store API keys in).

It may seem like we need to set up triggers or resources for this information to go to, but as we plan to use these inside a state machine, that will handle all that bother for us.

ETL – Extract Transform Load

Now that we know how to make a Lambda, let’s look at some code we could use with it. For the state machine we will create later, I want an entire process where I pull in information from an outside source (extract), modify it to fit my needs (transform), and then put it into my own system (load).

Extract

As stated above, this scenario involves pulling down data from a news source; in this case we are using News API, which lets you create a free API key to grab top news headlines and links to their stories.

That code is dead simple:

import json
from urllib import request


def retrieve_news(key, source):
    url = f"https://newsapi.org/v2/top-headlines?sources={source}&apiKey={key}"
    with request.urlopen(url) as req:
        return json.loads(req.read())

my_key = '<your-newsapi-key>'  # fill in the API key you registered with News API
print(retrieve_news(my_key, 'associated-press'))

If I wasn’t using this in a Lambda, I would be using the wonderful Requests module instead, but Python 3’s urllib is at least a lot better than 2’s.

So now we need a way for the Lambda function to call this code and pass along the results in a manner we can use later. On the page where you fill in the code, under Function Code you’ll see the Handler field; this is the entry point to your code. lambda_function.lambda_handler is the default, which means it will use the function lambda_handler inside the file lambda_function.py as the entry point.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import os
import json
from urllib import request


def retrieve_news(key, source):
    url = f"https://newsapi.org/v2/top-headlines?sources={source}&apiKey={key}"
    with request.urlopen(url) as req:
        return json.load(req)


# What AWS Lambda calls
def lambda_handler(event, context):
    key = os.getenv('NEWSAPI_KEY')
    if not key:
        raise Exception('Not all environment variables set')

    if not event.get('source'):
        raise Exception('Source not set')

    return {'data': retrieve_news(key, event['source']),
            'source': event['source']}

There are two arguments passed into the function. The first is event, which is all the information sent to the Lambda function (if using a standard JSON object this will be a dictionary, as seen above). The second is context, which is a class that will tell you about the current Lambda invocation if necessary; you can learn more about it here, but it will not be used in this example.

Testing the lambda

You may also notice that we are pulling the API key not from the event, but from an environment variable, so make sure to set that as well on the page. Last but not least, I would suggest increasing the timeout for the Lambda to 10 seconds, from the default 3.

Before we go on and add the other functions, let’s make sure this one works properly. At the top of the page, in the drop-down beside Test and Actions on the right, click Configure test events; we are going to add a new one with the details that will be passed into the event dictionary.

{
  "source": "associated-press"
}

On the pop-up, copy in the above JSON and save it as a new test event.

Hit the test button at the top, and see the results. You should get a big green window that shows you how it ran. If you have a red error window, you will have to figure out what went wrong first.
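If you’d rather iterate on the handler locally before re-uploading it to the console each time, you can also call it directly from a small script. This is just a sanity-check sketch, assuming the code above is saved as lambda_function.py next to it and NEWSAPI_KEY is exported in your shell:

import json
import os

from lambda_function import lambda_handler  # the handler file shown above

# The handler expects NEWSAPI_KEY in the environment, same as on AWS
assert os.getenv('NEWSAPI_KEY'), 'export NEWSAPI_KEY before running this'

# Same test event as in the console; context is unused here, so None will do
result = lambda_handler({'source': 'associated-press'}, None)
print(json.dumps(result, indent=2))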

Transform

This will be our second Lambda, so we get to go through the process again of creating a new one (you can use the existing role from the last one) and copying this code into it. No environment variables needed this time!

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import csv
from io import StringIO


# What AWS Lambda calls
def lambda_handler(event, context):

    sio = StringIO()
    writer = csv.writer(sio)
    writer.writerow(["Source", "Title", "Author", "URL"])
    for article in event['data']['articles']:
        writer.writerow([
            article['source']['name'],
            article['title'],
            article['author'],
            article['url']
        ])

    csv_content = sio.getvalue()
    print(csv_content)

    return {'data': csv_content,
            'source': event['source']}


The tricky part here is that now you need good test data for it. Luckily, you can copy the output of the last Lambda (snippet provided below) to do just that.

{
  "data": {
    "status": "ok",
    "totalResults": 5,
    "articles": [
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "FRANCES D'EMILIO",
        "title": "Pope accepts resignation of McCarrick after sex abuse claims",
        "description": "VATICAN CITY (AP) — In a move described as unprecedented, Pope Francis has effectively stripped U.S. prelate Theodore McCarrick of his cardinal's title and rank following allegations of sexual abuse, including one involving an 11-year-old boy. The Vatican ann…",
        "url": "https://apnews.com/46e8e15911034e7f971c7542b60a6444",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:b5c82ad2f2b74b50ab9faccf51898309/2628.jpeg",
        "publishedAt": "2018-07-28T16:21:57Z"
      },
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "KEVIN FREKING",
        "title": "On trade policy, Trump is turning GOP orthodoxy on its head",
        "description": "WASHINGTON (AP) — President Donald Trump's trade policies are turning long-established Republican orthodoxy on its head, marked by tariff fights and now $12 billion in farm aid that represents the type of government intervention GOP voters railed against a de…",
        "url": "https://apnews.com/57cd042b57054e5790b9b444c561ac3b",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:90f04d837f514d0b984e25bd5153be8a/3000.jpeg",
        "publishedAt": "2018-07-28T16:20:11Z"
      },
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "SETH BORENSTEIN and FRANK JORDANS",
        "title": "Science Says: Record heat, fires worsened by climate change",
        "description": "Heat waves are setting all-time temperature records across the globe, again. Europe suffered its deadliest wildfire in more than a century, and one of nearly 90 large fires in the U.S. West burned dozens of homes and forced the evacuation of at least 37,000 p…",
        "url": "https://apnews.com/a4255779e2b6461b9cc8dbf24ea4b96c",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:f9b76dc0354e47caafcfad96c36443ca/3000.jpeg",
        "publishedAt": "2018-07-28T15:03:01Z"
      },
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "MICHAEL KUNZELMAN and LARRY NEUMEISTER",
        "title": "No mystery to Supreme Court nominee Kavanaugh's gun views",
        "description": "SILVER SPRING, Md. (AP) — Supreme Court nominee Brett Kavanaugh says he recognizes that gun, drug and gang violence \"has plagued all of us.\" Still, he believes the Constitution limits how far government can go to restrict gun use to prevent crime. As a federa…",
        "url": "https://apnews.com/c8fc0785b429497abf9621efcdb345e8",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:4c3619ea948b4c91b8f2fcdd50162d26/3000.jpeg",
        "publishedAt": "2018-07-28T14:11:06Z"
      },
      {
        "source": {
          "id": "associated-press",
          "name": "Associated Press"
        },
        "author": "HOPE YEN, JOSH BOAK and CHRISTOPHER RUGABER",
        "title": "AP FACT CHECK: Trump's hyped claims on economy, NKorea, vets",
        "description": "WASHINGTON (AP) — President Donald Trump received positive economic news this past week and twisted it out of proportion. That impulse ran through days of rhetoric as he hailed the success of a veterans program that hasn't started and saw progress with North …",
        "url": "https://apnews.com/5b405824a9d843a09a641754d84aa1ab",
        "urlToImage": "https://storage.googleapis.com/afs-prod/media/media:636c2c3068b94181ba3c5bcb8d2a3ae9/3000.jpeg",
        "publishedAt": "2018-07-28T12:30:33Z"
      }
    ]
  },
  "source": "associated-press"
}

Configure and run the test like before using the above data.

In this case I also printed the output so you could see that any standard output is captured by the logs.

Load

Now to actually submit this data to a server. You could set up your own, or use file.io, which is a free file-dropper website, as the code below does. No API key needed!

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

from urllib import request, parse
import json


# What AWS Lambda calls
def lambda_handler(event, context):
    url = 'https://file.io'

    encoded_args = parse.urlencode({'text': event['data']}).encode('utf-8')

    with request.urlopen(url, encoded_args) as req:
        info = json.load(req)

    return {'data': info, 'source': event['source']}

Again, as this is reaching out to an external API, I would increase the Lambda’s default timeout limit from 3 to 10 seconds.

Woo! We now have three Lambdas that can take each other’s outputs in a row and do a full ETL process. Now let’s put them together.

State Machines

AWS Step Functions allow for creating a set of various actions to run with each other, presented in a pretty auto-generated graph. Back at the console, find Step Functions.

Then create a new state machine.

This is probably the hardest part: the actual state machine definition. The state language can be confusing, but thankfully for our needs we don’t have to do anything complicated.

You can use this definition; you will just have to update the actual Resource links under Extract, Transform and Load. (You can even click on them and should be presented with a drop-down of your previously created resources, so you don’t have to copy the ARNs manually.)

{
  "StartAt": "Set Source",
  "States": {
    "Set Source": {
      "Type": "Pass",
      "Result": {"source": "associated-press"},
      "ResultPath": "$",
      "Next": "Extract"
    },
    "Extract": {
      "Type": "Task",
      "Resource": "<ARN>:function:google-news-extract",
      "ResultPath": "$",
      "Next": "Transform"
    },
    "Transform": {
      "Type": "Task",
      "Resource": "<ARN>:function:google-news-transform",
      "ResultPath": "$",
      "Next": "Load"
    },
     "Load": {
      "Type": "Task",
      "Resource": "<ARN>:function:google-news-load",
      "ResultPath": "$",
      "End": true
    }
  }
}

Notice the first step is not a task, but rather a pass-through state that sets the source. We could do this during initialization, but I wanted to highlight the ability to add information where needed.

After creation, we will need to start a new execution. It doesn’t need any input, but it doesn’t hurt to include a comment if you want.

Then run it!
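If you ever want to kick off an execution from code instead of the console, the boto3 Step Functions client can do it. A rough sketch, where the state machine ARN is a placeholder you would swap for your own:

import json

import boto3

sfn = boto3.client('stepfunctions')

# Placeholder ARN - copy the real one from your state machine's detail page
state_machine_arn = 'arn:aws:states:us-east-1:123456789012:stateMachine:news-etl'

response = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps({})  # no input needed, the "Set Source" pass state adds it
)
print(response['executionArn'])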

 

During the middle of an execution, it will show what has run successfully and what is currently in progress or has erred. At any time, you can click on a specific block to see what its input and outputs were.

This state machine can then be run whenever you want the full ETL process to happen!

Scheduling

For a process like this, you will want to run it on a schedule. That means creating a new CloudWatch rule. Search for CloudWatch in the console, then click on Rules on the left-hand side.

Then, click the big blue button.

It’s pretty simple to create a fixed-rate schedule; just make sure to select the right state machine on the right side!
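If you prefer to script it, the same schedule can be set up with boto3 against the CloudWatch Events API. A sketch only, assuming you already have an IAM role that allows CloudWatch Events to start executions of your state machine (both ARNs below are placeholders):

import boto3

events = boto3.client('events')

state_machine_arn = 'arn:aws:states:us-east-1:123456789012:stateMachine:news-etl'
events_role_arn = 'arn:aws:iam::123456789012:role/events-start-state-machine'

# Run the ETL once a day
events.put_rule(Name='daily-news-etl', ScheduleExpression='rate(1 day)')

# Point the rule at the state machine
events.put_targets(
    Rule='daily-news-etl',
    Targets=[{
        'Id': 'news-etl-target',
        'Arn': state_machine_arn,
        'RoleArn': events_role_arn,
    }]
)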

 

Uploading large files by chunking – featuring Python Flask and Dropzone.js

It can be a real pain to upload huge files. Many services limit their upload sizes to a few megabytes, and you don’t want a single connection open forever either. The super simple way to get around that is simply send the file in lots of small parts, aka chunking.

Chunking Food – Artwork by Clara Griffith

Finished code example can be downloaded here.

So there are going to be two parts to making this work: the front-end (website) and the back-end (server). Let’s start with what the user will see.

Webpage with Dropzone.js

Beautiful, ain’t it? The best part is, the code powering it is just as succinct.

<!doctype html>
<html lang="en">
<head>

    <meta charset="UTF-8">

    <link rel="stylesheet" 
     href="https://cdnjs.cloudflare.com/ajax/libs/dropzone/5.4.0/min/dropzone.min.css"/>

    <link rel="stylesheet" 
     href="https://cdnjs.cloudflare.com/ajax/libs/dropzone/5.4.0/min/basic.min.css"/>

    <script type="application/javascript" 
     src="https://cdnjs.cloudflare.com/ajax/libs/dropzone/5.4.0/min/dropzone.min.js">
    </script>

    <title>File Dropper</title>
</head>
<body>

<form method="POST" action='/upload' class="dropzone dz-clickable" 
      id="dropper" enctype="multipart/form-data">
</form>


</body>
</html>

This is using the dropzone.js library, which has no additional dependencies and decent CSS included. All you have to do is add the class “dropzone” to a form and it automatically turns it into one of their special drag and drop fields (you can also click and select).

However, by default, dropzone does not chunk files. Luckily, it is really easy to enable. We are going to add some custom JavaScript and insert it between the form and the end of the body.

</form>

<script type="application/javascript">
    Dropzone.options.dropper = {
        paramName: 'file',
        chunking: true,
        forceChunking: true,
        url: '/upload',
        maxFilesize: 1025, // megabytes
        chunkSize: 1000000 // bytes
    }
</script>

</body>

When enabling chunking, it will break up any files larger than the chunkSize and send them to the server over multiple requests. It accomplishes this by adding form data that has information about the chunk (uuid, current chunk, total chunks, chunk size, total size). By default, anything under that size will not have that information sent as part of the form data, and the server would need an additional logic path. Thankfully, there is the forceChunking option, which will always send that information, even for smaller files. Everything else is pretty self-explanatory, but if you want more details about the possible options, just check out their list of configuration options.

Python Flask Server

Onto the back-end. I am going to be using Flask, which is currently the most popular Python web framework (by GitHub stars); other good options include Bottle and CherryPy. If you hate yourself or your colleagues, you could also use Django or Pyramid. There are a ton of good example Flask projects and boilerplates to start from; I am going to use one that I have created for my own use that fits my needs, but don’t feel obligated to use it.

This type of upload will work with any real website back-end. You simply need two routes: one that displays the front-end, and another that accepts the file as an upload. First, let’s just view what dropzone is sending us. In this example my project is called ‘pydrop’, and if you’re using my FlaskBootstrap code, this is the views/templated.py file.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import logging
import os

from flask import render_template, Blueprint, request, make_response
from werkzeug.utils import secure_filename

from pydrop.config import config

blueprint = Blueprint('templated', __name__, template_folder='templates')

log = logging.getLogger('pydrop')


@blueprint.route('/')
@blueprint.route('/index')
def index():
    # Route to serve the upload form
    return render_template('index.html',
                           page_name='Main',
                           project_name="pydrop")


@blueprint.route('/upload', methods=['POST'])
def upload():
    # Route to deal with the uploaded chunks
    log.info(request.form)
    log.info(request.files)
    return make_response(('ok', 200))

Run the flask server and upload a small file (under the size of the chunk limit). It should log a single instance of a POST to /upload:

[INFO] werkzeug: 127.0.0.1 "POST /upload HTTP/1.1" 200 -

[INFO] pydrop: ImmutableMultiDict([
     ('dzuuid', '807f99b7-7f58-4d9b-ac05-2a20f5e53782'), 
     ('dzchunkindex', '0'), 
     ('dztotalfilesize', '1742'), 
     ('dzchunksize', '1000000'), 
     ('dztotalchunkcount', '1'), 
     ('dzchunkbyteoffset', '0')])

[INFO] pydrop: ImmutableMultiDict([
     ('file', <FileStorage: 'README.md' ('application/octet-stream')>)])

Let’s break down what information we are getting:

dzuuid – Unique identifier of the file being uploaded

dzchunkindex – Which block number we are currently on

dztotalfilesize – The entire file’s size

dzchunksize – The max chunk size set on the frontend (note this may be larger than the actual chunk’s size)

dztotalchunkcount – The number of chunks to expect

dzchunkbyteoffset – The file offset we need to use when appending to the file being uploaded

Next, let’s upload something just a bit larger that will require it to be chunked into multiple parts:

[INFO] werkzeug: 127.0.0.1 "POST /upload HTTP/1.1" 200 -

[INFO] pydrop: ImmutableMultiDict([
    ('dzuuid', 'b4b2409a-99f0-4300-8602-8becbef24c91'), 
    ('dzchunkindex', '0'), 
    ('dztotalfilesize', '1191708'), 
    ('dzchunksize', '1000000'), 
    ('dztotalchunkcount', '2'), 
    ('dzchunkbyteoffset', '0')])

[INFO] pydrop: ImmutableMultiDict([
    ('file', <FileStorage: '04vfpknzx8z01.png' ('application/octet-stream')>)])



[INFO] werkzeug: 127.0.0.1 "POST /upload HTTP/1.1" 200 -

[INFO] pydrop: ImmutableMultiDict([
    ('dzuuid', 'b4b2409a-99f0-4300-8602-8becbef24c91'), 
    ('dzchunkindex', '1'),
    ('dztotalfilesize', '1191708'),  
    ('dzchunksize', '1000000'), 
    ('dztotalchunkcount', '2'), 
    ('dzchunkbyteoffset', '1000000')])

[INFO] pydrop: ImmutableMultiDict([
    ('file', <FileStorage: '04vfpknzx8z01.png' ('application/octet-stream')>)])

Notice how /upload has been called twice, and that dzchunkindex and dzchunkbyteoffset have been updated accordingly. That means our upload function has to be smart enough to handle both brand new uploads and the later chunks of an in-progress one. For a continuing upload we should open the existing file and only write data after what is already in it, whereas for a new upload we create the file and start at the beginning. Luckily, both can be accomplished with the same code: open the file in append mode, then ‘seek’ to the end of the current data (in this case we are relying on the seek offset being provided by dropzone).

@blueprint.route('/upload', methods=['POST'])
def upload():
    # Remember the paramName was set to 'file', we can use that here to grab it
    file = request.files['file']

    # secure_filename makes sure the filename isn't unsafe to save
    save_path = os.path.join(config.data_dir, secure_filename(file.filename))

    # We need to append to the file, and write as bytes
    with open(save_path, 'ab') as f:
        # Goto the offset, aka after the chunks we already wrote 
        f.seek(int(request.form['dzchunkbyteoffset']))
        f.write(file.stream.read())
       
    # Giving it a 200 means it knows everything is ok
    return make_response(('Uploaded Chunk', 200))

At this point you should have a working upload script, tada!

But let’s beef this up a little bit. The following improvements make sure we don’t overwrite existing files that have already been uploaded, check that the file size matches what we expect when we’re done, and give a little more output along the way.

@blueprint.route('/upload', methods=['POST'])
def upload():
    file = request.files['file']

    save_path = os.path.join(config.data_dir, secure_filename(file.filename))
    current_chunk = int(request.form['dzchunkindex'])

    # If the file already exists it's ok if we are appending to it,
    # but not if it's new file that would overwrite the existing one
    if os.path.exists(save_path) and current_chunk == 0:
        # 400 and 500s will tell dropzone that an error occurred and show an error
        return make_response(('File already exists', 400))

    try:
        with open(save_path, 'ab') as f:
            f.seek(int(request.form['dzchunkbyteoffset']))
            f.write(file.stream.read())
    except OSError:
        # log.exception will include the traceback so we can see what's wrong 
        log.exception('Could not write to file')
        return make_response(("Not sure why,"
                              " but we couldn't write the file to disk", 500))

    total_chunks = int(request.form['dztotalchunkcount'])

    if current_chunk + 1 == total_chunks:
        # This was the last chunk, the file should be complete and the size we expect
        if os.path.getsize(save_path) != int(request.form['dztotalfilesize']):
            log.error(f"File {file.filename} was completed, "
                      f"but has a size mismatch."
                      f"Was {os.path.getsize(save_path)} but we"
                      f" expected {request.form['dztotalfilesize']} ")
            return make_response(('Size mismatch', 500))
        else:
            log.info(f'File {file.filename} has been uploaded successfully')
    else:
        log.debug(f'Chunk {current_chunk + 1} of {total_chunks} '
                  f'for file {file.filename} complete')

    return make_response(("Chunk upload successful", 200))

Now let’s give this a try:

[DEBUG] pydrop: Chunk 1 of 6 for file DSC_0051-1.jpg complete
[DEBUG] pydrop: Chunk 2 of 6 for file DSC_0051-1.jpg complete
[DEBUG] pydrop: Chunk 3 of 6 for file DSC_0051-1.jpg complete
[DEBUG] pydrop: Chunk 4 of 6 for file DSC_0051-1.jpg complete
[DEBUG] pydrop: Chunk 5 of 6 for file DSC_0051-1.jpg complete
[INFO] pydrop: File DSC_0051-1.jpg has been uploaded successfully

Sweet! But wait, what if we remove the directories where the files are stored? Or try to upload the same file again?

(Dropzone’s text out of the box is a little hard to read, but it says “File already exists” on the left and “Not sure why, but we couldn’t write the file to disk” on the right. Exactly what we’d expect.)

2018-05-28 14:29:19,311 [ERROR] pydrop: Could not write to file
Traceback (most recent call last):
    ....
FileNotFoundError: [Errno 2] No such file or directory:

We get an error message on the webpage and in the logs, perfect.

I hope you found this information useful and if you have any suggestions on how to improve it, please let me know!

Thinking further down the road

In the long term I would add a database or some other permanent storage option to keep track of file uploads. That way you could see if one fails or stops halfway and be able to remove incomplete ones. I would also save files first into a temp directory based off their UUID and then, when complete, move them to a location based off their file hash. It would also be nice to have a page to see everything uploaded and manage directories or other options, or even password-protected uploads.
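As a rough sketch of that temp-directory-then-hash idea (the function name and paths here are made up for illustration), the finishing step could look something like this:

import hashlib
import os
import shutil


def finalize_upload(temp_path, storage_root):
    """Move a completed upload from its UUID-named temp file to a
    location based on the file's SHA-256 hash."""
    sha256 = hashlib.sha256()
    with open(temp_path, 'rb') as f:
        # Hash in one megabyte blocks so huge files don't eat all the memory
        for block in iter(lambda: f.read(1024 * 1024), b''):
            sha256.update(block)
    digest = sha256.hexdigest()

    # e.g. storage_root/ab/abcdef1234... to avoid one giant directory
    final_dir = os.path.join(storage_root, digest[:2])
    os.makedirs(final_dir, exist_ok=True)
    final_path = os.path.join(final_dir, digest)
    shutil.move(temp_path, final_path)
    return final_path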

 

Interprocess Communications

Inter-Office-Process Communications, Artwork by Clara Griffith

Dealing with communications across programs, processes or even threads can be a real pain in the patella. Each situation usually calls for something slightly different and has to work with a limited set of options. On top of that, a lot of tutorials I see are people simply starting one program from inside the other (for example with subprocess) but that just won’t do for my taste.

Just show me the table of which IPC method I need 

I’m going to go through options that can handle two independent programs, started on their own, talking with each other. And when I say talk, I mean able to at least execute tasks in the other’s process and, in most cases, transfer data as well.

Pipes

Probably one of the most old school ways of transferring data is to just pipe it back and forth as needed. This is how terminals and shells interact with programs, by using the standard pipes of stdin, stdout, and stderr (standard in, standard out, standard error).

Every time you print you are writing to stdout, and it is very common to use this in Python when running another program from within it (i.e. if we did subprocess.run('bob.py') we could easily communicate with it via the pipes). These are anonymous pipes that exist only while the program is running. To communicate between different programs you should use a Named Pipe, which creates a file descriptor that you connect to as an entry point.

Here are some short examples showing their use on Linux. It is also possible on Windows with the pywin32 module or do it yourself with ctypes. But I find it easier to just use other methods on Windows.

Our example will be two programs, the first of which, Alice.py, is simply a text converter. The message sent will be in three parts: a starting identifier, X, four digits to denote the message length, and the message itself. (This is by no means a common standard or a practical one, just something I made up for a quick example.)

So to send ‘Howdy’, it would look like X0005HOWDY, X being the identifier of a new message, 0005 denoting the length of the message to come, and HOWDY being the message body.

Alice.py

import time 
import os 

# This is a full path on the file system, 
# in this scenario both are run from the same directory
pipename = 'fifo'
os.mkfifo(pipename)

# For non-blocking pipes, you have to be reading from it 
# before you can have something else writing to it
pipein = os.open(pipename, os.O_NONBLOCK|os.O_RDONLY) 
p = os.fdopen(pipein, 'rb', buffering=0)

# This program will simply make the output of the other program more readable
def converter(message):
    return message.decode('utf-8').replace("_", " ").replace("-", " ").lower()

while True:
    # Wait until we have a message identifier
    new_data = p.read(1)
    if new_data == b'X':
        # Figure out the length of the message 
        raw_length = p.read(4)
        message = p.read(int(raw_length))
        # Read and convert the message 
        print(converter(message))
    elif new_data:
        # If we read a single byte that isn't an identifier, something went wrong
        raise Exception('Out of sync!') 
    else:
        time.sleep(1)

That’s all our conversion server is. It creates a pipe that something else can connect to, and will convert the incoming messages to lower case and replace dashes and underscores with spaces.

So let’s have Bob.py talk to Alice.py. But as Bob is a computer program, he sometimes spits out gobbledygook messages that need some help.

Bob.py

import os 

# Connect to the pipe created by Alice.py
pipename = 'fifo'
pipeout = os.open(pipename, os.O_NONBLOCK|os.O_WRONLY) 
p = os.fdopen(pipeout, 'wb', buffering=0, closefd=False)

def write_message(message):
    """Covert a string into our message format and send it"""
    length = '{0:04d}'.format(len(message))
    p.write(b'X')
    p.write(length.encode('utf-8'))
    p.write(message.encode('utf-8'))
     
write_message('TERriBLe_looking-machine-oUTput')

Start up Alice.py first, then Bob.py.

Alice will print out a pretty message of:  terrible looking machine output

While pipes are super handy for terminal usage and for running programs inside each other, it has become uncommon to use named pipes as actual comms between two independent programs, mainly because of the required setup procedure and non-uniformity across operating systems. This has led to more modern, cross-compatible methods being preferred.

Pros: 

  • Fast and Efficient
  • No external services

Cons:

  • Not cross-platform compatible code
  • Difficult to code well

Files

Another ye olde (yet perfectly valid) way to communicate between programs is to simply create files that each program can interpret. Sometimes it’s as simple as having a lockfile: if the lockfile exists, it can serve as a message to other programs to let the owner finish before they do something, or even to stop new instances of the same program from running. For example, running system updates in most Linux environments will create a lock file to make sure that two different update processes aren’t started at the same time.
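As a tiny illustration of the lockfile idea on its own (separate from the shared-file example below), a program can refuse to run a second copy of itself with nothing more than an exclusive file create. A minimal sketch:

import os
import sys

LOCKFILE = 'myprogram.lock'

try:
    # O_EXCL makes the create fail if the lockfile already exists
    fd = os.open(LOCKFILE, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except FileExistsError:
    sys.exit('Another copy is already running (or crashed without cleaning up)')

try:
    os.write(fd, str(os.getpid()).encode('utf-8'))
    # ... do the real work here ...
finally:
    os.close(fd)
    os.unlink(LOCKFILE)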

It’s possible to take that idea further and share a file or two to actually transfer information. In these examples, the two programs will both work out of the same file, with a lock file for safety. (You can write your own code for file lock control, but I will be using the py-filelock package for brevity.)

There are a lot of possible ways to format the shared file; this example will keep it very basic, giving each command a unique id (used to know if the command has been run before or not), then the command, and its arguments. The same dictionary will also leave room for a result or an error message.

The shared JSON file will have the following format:

{
  "commands": { 
    "<random uuid>": {
      "command": "add",
      "args": [2,5],
      "result": 7  # "result" field only exists after server has responded
      # if "error" key exists instead of "result", something bad happened
    }
  }
}

The server, Alice.py, will then keep looping, waiting for the shared file to change. Once it does, Alice will obtain the lock for safety (so there isn’t any corrupt JSON from a write being interrupted), read the file, run the desired command, and save the result. In a real-world scenario the lock would only be obtained during the individual reading and writing phases, so that a single program holds it for as short a time as possible. But that complicates the code (you would have to do a second read before writing and only update the sections you ran, in case other ones were added) and makes it a bit much for an off-the-shelf example.

Alice.py

import json
import time
import os

from filelock import FileLock

# Keep track of commands that have been run 
completed = []
# Track file size to know when new commands have been added
last_size = 0

lock_file = "shared.json.lock"
lock = FileLock(lock_file)
shared_file = "shared.json"


# Not totally necessary, but if you ever need to raise exceptions
# it's better to create your own than use the built-ins
class AliceBroke(Exception):
    """Custom exception for catching our own errors"""


# Functions that can be executed by Bob.py
def adding(a, b):
    if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
        raise AliceBroke('Yeah, we are only going to add numbers')
    return a + b


def multiply(a, b):
    if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
        raise AliceBroke('Yeah, we are only going to multiply numbers')
    return a * b


# Right now just have a way to map the incoming strings from Bob to functions
# This could be expanded to include what arguments it expects for pre-validation
translate = {
    "add": adding,
    "multiply": multiply
}


def main():
    global last_size, completed
    while True:
        # Poor man's file watcher
        file_size = os.path.getsize(shared_file)
        if file_size == last_size:
            # File hasn't changed
            time.sleep(.1)
            continue
        else:
            print(f'File has changed from {last_size} to {file_size} bytes,'
                  f' lets see what they want to do!')
            last_size = file_size

        run_commands()
        last_size = os.path.getsize(shared_file)


def run_commands():
    # Grab the lock, once it is acquired open the file
    with lock, open(shared_file, 'r+') as f:
        data = json.load(f)
        # Iterate over the command keys, if we haven't run it yet, do so now
        for name in data['commands']:
            if name not in completed:

                command = data['commands'][name]['command']
                args = data['commands'][name]['args']
                print(f'running command {command} with args {args}')

                try:
                    data['commands'][name]['result'] = translate[command](*args)
                except AliceBroke as err:
                    # Arguments weren't the type we were expecting
                    data['commands'][name]['error'] = str(err)
                except TypeError:
                    data['commands'][name]['error'] = "Incorrect number of arguments"
                finally:
                    completed.append(name)
        # As we are writing the data back to the same file that is still
        # open, we need to go back to the beginning of it before writing
        f.seek(0)
        json.dump(data, f, indent=2)


if __name__ == '__main__':
    # Create / blank out the shared file
    with open(shared_file, 'w') as f:
        json.dump({"commands": {}}, f)
    last_size = os.path.getsize(shared_file)

    try:
        main()
    finally:
        # Be nice and clean up after ourselves
        os.unlink(shared_file)
        os.unlink(lock_file)

Our client, Bob.py, will ask Alice to run some of the simple commands she supports and wait until it gets the answer back.

Bob.py

import json
import time
import uuid

from filelock import FileLock

completed = []
last_size = 0

lock = FileLock("shared.json.lock")
shared_file = "shared.json"


def ask_alice(command, *args, wait_for_answer=True):
    # Create a new random ID for the command
    # could be as simple as incremented numbers 
    cmd_uuid = str(uuid.uuid4())
    with lock, open(shared_file, "r+") as f:
        data = json.load(f)
        data['commands'][cmd_uuid] = {'command': command, 'args': args}
        f.seek(0)
        json.dump(data, f)
    if wait_for_answer:
        return get_answer(cmd_uuid)
    return cmd_uuid


def get_answer(cmd_uuid):
    # Wait until we get an answer back for a command
    # Ideally this would be more of an asynchronous callback, but there 
    # are plenty of cases where serialized processes like this must happen
    while True:
        with lock, open(shared_file) as f:
            data = json.load(f)
            command = data['commands'][cmd_uuid]
            if 'result' in command:
                return command['result']
            elif 'error' in command:
                raise Exception(command['error'])
        time.sleep(.2)



print(f"Lets add 2 and 5 {ask_alice('add', 2, 5, wait_for_answer=True)}")

print(f"Lets multiply 8 and 5 {ask_alice('multiply', 2, 5, wait_for_answer=True)}")

print("Lets break it and cause an exception!")
ask_alice('add', 'bad', 'data', wait_for_answer=True)

Start up Alice.py first, then Bob.py.

Alice will return:

File has changed from 16 to 90 bytes, lets see what they want to do!
running command add with args [2, 5]
File has changed from 103 to 184 bytes, lets see what they want to do!
running command multiply with args [8, 5]
File has changed from 198 to 283 bytes, lets see what they want to do!
running command add with args ['bad', 'data']

Bob

Lets add 2 and 5: 7
Lets multiply 8 and 5: 40
Lets break it and cause an exception!
Traceback (most recent call last):
     ...
Exception: Yeah, we are only going to add numbers

 

Pros:

  • Cross-platform compatible
  • Simple to implement and understand

Cons: 

  • Have to worry about File System security if anything sensitive is being shared
  • Programs now responsible to clean up after themselves

 

Message Queue

Welcome to the 21st century, where message queues serve as quick and efficient ways to transfer commands and information. I like to think of them as an always-running middleman that can survive outside your process.

Here you are spoiled for choice with options: ActiveMQ, ZeroMQ, Redis, RabbitMQ, Sparrow, Starling, Kestrel, Amazon SQS, Beanstalk, Kafka, IronMQ, and POSIX IPC message queues are the ones I know. You can even use NoSQL databases like MongoDB or CouchDB in a similar manner, though for simple IPC I suggest against using those.

I suggest looking into RabbitMQ’s tutorials to get a good in-depth look at how you can write code for it (and across multiple different languages). RabbitMQ is cross-platform and even provides Windows binaries directly, unlike some others.

For my own examples we will use Redis, as they have the best summary I can steal, er, quote from their website https://redis.io/, yet there is a surprising lack of Python tutorials for it.

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, [etc…]

That’s it: it stores data somewhere that is easily accessed from multiple programs, just what you need for IPC. You can use it as a key-value store (for data transfer and storage) or, like I am about to show, in a publisher / subscriber model, which is better for command and control.
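The publisher / subscriber example follows, but for completeness, the keystore style is about as simple as it gets: one program sets a key, another reads it later. A quick sketch (the key name and payload are just examples):

import redis

r = redis.StrictRedis()

# Program A drops some data off, with a 60 second expiry so stale data disappears
r.set('latest_report', b'some serialized payload', ex=60)

# Program B, possibly a totally separate process, picks it up later
payload = r.get('latest_report')  # returns None if it expired or was never set
if payload is not None:
    print(payload.decode('utf-8'))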

Alice.py is our service and simply subscribes to a channel and waits for any updates and will print them to the screen.

import time

import redis

r = redis.StrictRedis()
p = r.pubsub()


def handler(message):
    print(message['data'].decode('utf-8'))


p.subscribe(my_channel=handler)
thread = p.run_in_thread(sleep_time=0.001)

try:
    # Runs in the background while your program otherwise operates
    time.sleep(1000)
finally:
    # Shut down cleanly when all done
    thread.stop()

Bob.py is another program that simply pushes a message to that channel.

import redis

r = redis.StrictRedis()

r.publish('my_channel', 'example data'.encode('utf-8'))

Start up Alice.py first, then Bob.py.

Alice.py will print example data, and can be stopped by pressing CTRL+C

Pros:

  • Built-in publisher / subscriber methodology, don’t have to do manual checks
  • Built-in background thread running
  • (Can be) Cross-platform compatible
  • State can be saved if a program exits unexpectedly (depending on setup)

Cons: 

  • Requirement for external service
  • More overhead
  • Have to worry about the external service’s security and how you connect to it

Shared Memory

If written correctly, this can be one of the fastest ways to transfer data or execute tasks between programs, but it is also the most low-level, meaning a lot more coding and error handling on your end.

Think of using memory in the same way as the single file with a lock. While one program writes to memory, the other has to wait until it finishes, then it can read it and write its own thing back. (It’s possible to use multiple sections of memory too, it just becomes a lot of example code really fast, so I am holding off.)

So now you have to create the Semaphore (basically a memory lockfile) and mapped memory that each program can access.

On Linux (and Windows with Cygwin) you can use posix_ipc, and check out their example here.

A lot simpler is to just use the built-in mmap directly and share a file in memory. This example won’t even go as far as creating the lockfile, just showing off the basics.

Alice.py is going to create and write stuff to the memory mapped file

Alice.py

import time
import mmap
import os

# Create the file and fill it with line ends
fd = os.open('mmaptest', os.O_CREAT | os.O_TRUNC | os.O_RDWR)
os.write(fd, b'\n' * mmap.PAGESIZE)

# Map it to memory and write some data to it, then change it 10 seconds later
buf = mmap.mmap(fd, mmap.PAGESIZE, access=mmap.ACCESS_WRITE)
buf.write(b'now we are in memory\n')
time.sleep(10)
buf.seek(0)
buf.write(b'again\n')

Bob.py will simply read the content of the memory-mapped file, showing that it does know when the contents change.

Bob.py

import mmap
import os
import time


# Open the file for reading only
fd = os.open('mmaptest', os.O_RDONLY)
buf = mmap.mmap(fd, mmap.PAGESIZE, access=mmap.ACCESS_READ)

# Print when the content changes
last = b''
while True:
    buf.seek(0)
    msg = buf.readline()
    if msg != last:
        print(msg)
        last = msg
    else:
        time.sleep(1)

This example isn’t the most friendly to run, as you have to start up Alice.py and then Bob.py within ten seconds after that, but it shows the basics of how memory mapping is very similar to just using a file.

Pros:

  • Fast
  • Cross-platform compatible code

Cons: 

  • Can be difficult to write
  • Limited size to accessible memory
  • Using mmap without posix_ipc will also create a physical file with the same content

 

Signals

On Linux? Don’t want to send information, just need to toggle state on something? Send a signal!

Imagine you have a service, Alice, that anything local can connect to, but you want to be able to tell them if the service goes down or comes back up.

In your client’s code, it should register its process identification number (PID) with Alice when it first connects to her, and have handlers for custom signals so it knows Alice‘s current state.

Bob.py

import signal, os

service_running = True

my_pid = os.getpid()

# 'Touch' a file as the name of the program's PID 
# in Alice service's special directory
# Make sure to delete this file when your program exits!
open(f"/etc/my_service/pids/{my_pid}", "w").close()


    
def service_off(signum, frame):
    global service_running 
    service_running = False

def service_on(signum, frame):
    global service_running 
    service_running = True

signal.signal(signal.SIGUSR1, service_off)
signal.signal(signal.SIGUSR2, service_on)

SIGUSR1 and SIGUSR2 are signals reserved for custom use, so they are safe to use in this manner without fear of secondary actions happening. (For example, if you send something like SIGINT, aka interrupt, it will just kill your program if it is not caught properly.)

Alice.py will then simply go through the directory of PID files when it starts up or shuts down, and send each one the appropriate signal to let them know her current state.

import os
import signal 

def let_them_know(startup=True):
    # SIGUSR2 maps to service_on in Bob.py, SIGUSR1 to service_off
    signal_to_send = signal.SIGUSR2 if startup else signal.SIGUSR1
    for pid_file in os.listdir(f"/etc/my_service/pids/"):
        # Put in try catch block for production
        os.kill(int(pid_file), signal_to_send) 

Pros:

  • Super simple

Cons: 

  • Not cross-platform compatible
  • Cannot transfer data

Sockets

The traditional IPC. Every time I look for different IPC methods, sockets always come up. It’s easy to understand why, they are cross platform and natively supported by most languages.

However, dealing directly with raw sockets is very low-level and requires a lot more coding and configuration. The Python standard library has a great example of an echo server to get you started.

But this is batteries-included, everyone-else-has-already-done-the-work-for-you Python. Sure, you could set up everything yourself, or you can use the higher-level Listeners and Clients from the multiprocessing library.

Alice.py is hosting a server party, and executing incoming requests.

from multiprocessing.connection import Listener

def function_to_execute(*args):
    """" Our handler function that will run 
         when an incoming request is a list of arguments
    """
    return args[0] * args[1]

with Listener(('localhost', 6000), authkey=b'Do not let eve find us!') as listener:
    # Looping here so that the clients / party goers can 
    # always come back for more than a single request
    while True:
        print('waiting for someone to ask for something')
        with listener.accept() as conn:
            args = conn.recv()
            
            if args == b'stop server':
                print('Goodnight')
                break
            elif isinstance(args, list):  
                # Very basic check, must be more secure in production
                print('Someone wants me to do something')
                result = function_to_execute(*args)
                conn.send(result)
            else:
                conn.send(b'I have no idea what you want me to do')

Bob.py is going to go for just a quick function and then call it a night.

from multiprocessing.connection import Client

with Client(('localhost', 6000), authkey=b'Do not let eve find us!') as conn:
    conn.send([8, 8])
    print(f"What is 8 * 8? Why Alice told me it's {conn.recv()}")

# We have to connect again because Alice can only handle one request at a time
with Client(('localhost', 6000), authkey=b'Do not let eve find us!') as conn:
    # Bob's a party pooper and going to end it for everyone
    conn.send(b'stop server')

Start up Alice.py first, then run Bob.py. Bob will quickly exit with the message What is 8 * 8? Why Alice told me it's 64

Alice will have four total messages:

waiting for someone to ask for something
Someone wants me to do something
waiting for someone to ask for something
Goodnight

Pros:

  • Cross-platform compatible
  • Can be extended to RPC

Cons: 

  • Slower
  • More overhead

 

RPC

Believe it or not, you can use a lot of the methods from remote procedure calls locally. Even sockets and message queues can already be set up to work for either IPC or RPC.

Now, barring using IR receivers and transmitters or laser communications, you are probably going to connect remote programs via the internet. That means most RPC standards are going to be networking based, aka built on sockets. So it’s less about deciding which protocol to use, and more about choosing which data transmission standard to use. Two that I have used are JSONRPC, for normal humans, and XMLRPC, for if you are an XML fan who likes sniffing glue and running with scissors. There is also SOAP, for XML fans who need their acronyms to also spell something, and Apache Thrift, which I found while doing research for this article and have not touched. Those standards transfer data as text, which makes it easier to read, but inefficient. Newer options, like gRPC, use protocol buffers to serialize the data and reduce overhead.
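As a taste of how little code local RPC can take, Python ships XML-RPC support in the standard library. A minimal sketch of both halves (start the server first; the host, port, and function names are just examples):

# alice_rpc.py - exposes a single function over XML-RPC
from xmlrpc.server import SimpleXMLRPCServer


def multiply(a, b):
    return a * b


server = SimpleXMLRPCServer(('localhost', 8000), allow_none=True)
server.register_function(multiply)
server.serve_forever()


# bob_rpc.py - calls it as if it were a local function
from xmlrpc.client import ServerProxy

proxy = ServerProxy('http://localhost:8000', allow_none=True)
print(proxy.multiply(8, 8))  # 64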

It’s also very common and easy to just write up a simple HTTP REST interface for your programs, using a lightweight framework like bottle or flask, and communicate that way.

In the future (if they don’t already exist) I expect to see even more choices with WebSocket or WebRTC based communications.

 

Summary

Let’s wrap this all up with a comparison table:

Method         | Cross-Platform Compatible | Requires File or File Descriptor | Requires External Service | Easy to write / read code
Pipes          | No                        | Yes                              | No                        | No
Files          | Yes                       | Yes                              | No                        | Yes
Message Queues | Yes                       | No                               | Yes                       | Yes
Shared Memory  | Yes                       | Yes                              | No                        | No
Sockets        | Yes                       | No                               | No                        | Yes
Signals        | No                        | No                               | No                        | Yes

Hopefully that at least clears up why sockets are the go-to IPC method, as they have the most favorable traits. For me though, I usually want something either a little more robust, like message queues, or a REST API so that it can be used locally or remotely. Which, to be fair, are built on top of sockets.

 

 

ThreadPools explained – In the deep end

Thread and Multiprocessing Pools are an underused feature of Python. In my opinion, they are the easiest way to dip your feet into concurrency, and yet still the method I use most often.

Threads in a Pool, Artwork by Clara Griffith

They allow you to easily offload CPU or I/O bound tasks to a pre-instantiated group (pool) of threads or processes. One of the great things about them, is that both the ThreadPool and Pool (Multiprocessing) classes have the same methods, so all the following examples are interchangeable between them. (This article will not get into the differences between Threading and Multiprocessing in Python, as that is worth a post on its own.)

Map

Let’s jump in with a super simple example. We will use a pool of 5 workers to square numbers. We will use the map method, which takes a function as its first argument (make sure it’s not called, no parentheses!) and then a list (or other iterable) of arguments, which will be passed one at a time to that function in each Thread or Process. That way, multiple instances of that function are working at the same time!

from multiprocessing.pool import ThreadPool, Pool

def square_it(x):
    return x*x

# On Windows, make sure that multiprocessing doesn't start
# until after "if __name__ == '__main__'" 

with Pool(processes=5) as pool:
   results = pool.map(square_it, [5, 4, 3, 2 ,1])

print(results) 
# [25, 16, 9, 4, 1]

Think of map as running a for loop over the list and sending each item in it to a worker as soon as one is free (it’s a little more complicated internally, but we’ll come back to that later). Each worker has been told to run the function square_it against that item.

So in this case, all the processes will be running the same function against a set of data, and waiting until all the data has been processed to return. The list of returned data will be in order based on the iterable that was put in. This is super handy if you want to do something like make a lot of requests to different websites and wait for the results, or need to run a lot of calculations. I actually did just that in the Birthday Paradox blog post using the Multiprocessing pool to speed up probability calculations.

Thankfully, Pools are versatile: they have several other handy methods, and you can let most of them run in the background, instead of waiting on them to finish immediately, by going asynchronous.

Async

So let’s use the map style functionality again, but this time we don’t want to bother waiting around for the results, which means we need to use map_async. Let’s say you want to capture a lot of images off of a website. You populate the list of links to the images, then you just need to add those to the pool and download them.

from multiprocessing.pool import ThreadPool
import time
import reusables

# When downloading from a website, be kind with how often 
# and how many requests you are making
tp = ThreadPool(processes=2)

urls = ["https://codecalamity.com/wp-content/uploads/2017/10/birthday.png",
        "https://codecalamity.com/wp-content/uploads/2017/06/Capture.png"]


# Same as the previous map, taking a function and iterable,
# just with an additional callback function 
tp.map_async(reusables.download, urls, callback=print) 
# Also, this is why having print as a function in python 3 is so dang handy


# Do something else 
time.sleep(10) 

# Results are printed when done 
# ['/home/me/birthday.png', '/home/me/Capture.png']

# Not using a context manager means we have to clean house ourselves.
tp.close()
tp.join()

This is really advantageous in scenarios where you need an instant reply, such as an API call or working with a GUI, and a callback can later update the GUI or database. There is also an error_callback argument for when the function raises an exception. The error_callback function will receive the Exception caught when the worker errored, so you can decide whether to ignore it or raise it in the main Thread.
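Here is a quick sketch of how those two callbacks fit together; the fail_on_two function is made up just so error_callback has something to catch.

from multiprocessing.pool import ThreadPool
import time

def fail_on_two(x):
    # Deliberately blow up for one input so error_callback gets used
    if x == 2:
        raise ValueError("2 is right out")
    return x * 10

with ThreadPool(processes=2) as tp:
    # callback receives the list of results if everything succeeds,
    # error_callback receives the Exception instance if any worker raises
    tp.map_async(fail_on_two, [1, 2, 3],
                 callback=print,
                 error_callback=lambda exc: print(f"worker failed: {exc}"))
    # Give the workers a moment to finish before the pool is torn down
    time.sleep(1)

# worker failed: 2 is right out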

You can also ignore using the callbacks, and deal with the AsyncResult directly. It has the methods:

  • ready – See if the results are available
  • successful – Boolean, True if it didn’t raise an exception
  • wait – takes a timeout, will wait for the results to be ready
  • get – Grab the results, also takes a timeout, will automatically raise the exception if one occurred.

from multiprocessing.pool import ThreadPool
import time

timeout = 25

with ThreadPool(processes=4) as tp:
    async_result = tp.map_async(time.sleep, [5, 4])

    for i in range(timeout):
        time.sleep(1)

        if async_result.ready():
            if async_result.successful():
                print(async_result.get())
                break
    else:
        print("Task did not complete on time, or with errors")

Three of the methods available have corresponding async methods:

  • map – map_async
  • starmap – starmap_async
  • apply – apply_async

Passing additional arguments with “partial” or “starmap”

Now, notice that map will only provide a single argument to a function. So if you have a function that takes more than one argument, you will either need to use partial to redefine the function with default parameters, or use starmap which takes an iterable of tuples.

So let’s use partial from functools first.

from multiprocessing.pool import ThreadPool
from functools import partial
import time
import reusables

urls = ["https://codecalamity.com/wp-content/uploads/2017/10/birthday.png",
        "https://codecalamity.com/wp-content/uploads/2017/06/Capture.png"]


def download_file(url, wait_time):
    time.sleep(wait_time)
    return reusables.download(url)

# Replace the required `wait_time` with a default of 5
down_the_file = partial(download_file, wait_time=5)


with ThreadPool(2) as tp:
    # Notice we are now using down_the_file, the new function we created with partial
    print(tp.map(down_the_file, urls))

Not too difficult; you’re just stuck using the same setting for everything. If you want to customize it per item, you can send multiple arguments using starmap.

Starmap

from multiprocessing.pool import ThreadPool
import reusables
import time

# Notice the list now is a list of tuples, that have a second argument, 
# that will be passed in as the second parameter. In this case, as wait_time
urls = [("https://codecalamity.com/wp-content/uploads/2017/10/birthday.png", 4),
        ("https://codecalamity.com/wp-content/uploads/2017/06/Capture.png", 10)]


def download_file(url, wait_time):
    time.sleep(wait_time)
    return reusables.download(url)


with ThreadPool(2) as tp:
    # Using `starmap` instead of just `map`
    print(tp.starmap(download_file, urls))

Apply

Both map and starmap take a single function to run a lot of things against. But there are many times where you just want background workers to take on a variety of different tasks. That’s where apply comes in.

from multiprocessing.pool import Pool
import reusables

pool = Pool(processes=5)

# apply takes `args`, aka arguments, in a tuple format 
# and `kwds`, aka keyword arguments, as a dictionary

print(pool.apply(sum, args=([1, 2, 3, 4, 5], )))
# 15
print(pool.apply(abs, (-5.67, )))
# 5.67
print(pool.apply(reusables.download, 
                 args=("http://example.com", ), 
                 kwds=dict(save_to_file=False)))
# b'<!doctype html>\n<html>\n<head>\n   ...


pool.close()
pool.join()

Additional Content

You can of course also mix and match any of the methods, as long as the pool has not been terminated.

from multiprocessing.pool import Pool
import time

pool = Pool(processes=5)

print(pool.apply(sum, args=([1, 2, 3, 4, 5], )))
# 15
pool.apply_async(abs, (-5.67, ), callback=print)
# 5.67
pool.map_async(any, [(True, False), (False, False)], callback=print)
# [True, False]

time.sleep(1)
pool.close()
pool.join()

imap

Now remember how I said map basically iterates over the list and sends each item to a worker? Well, that’s not entirely true. imap does that; map can be a lot faster because it breaks the list into chunks first and sends each chunk to a worker’s queue, making sure the workers always have something in the pipeline.

Ok, cool, so map is faster, what’s the point of imap then? With speed comes the price of a larger memory footprint. When map takes in an iterable, it converts it to a list so it can be chunked out, whereas imap will only pull items out of the iterable as needed (its chunksize defaults to 1, aka how many it will pull out at a time, but it can be increased). It also has the advantage of giving you the results as soon as possible (as an iterable, hence the name imap), while still preserving order. There is also imap_unordered, which will simply give you the results as fast as they come, in the order they finish.

from multiprocessing.pool import ThreadPool
import time

def wait(x):
    time.sleep(x)
    return x

iterable = [0, 4, 5, 2]

with ThreadPool(processes=4) as tp:

    print("map")
    map_start = time.time()
    for map_result in tp.map(wait, iterable):
        print(f"{map_result} took {time.time() - map_start:.0f} seconds")

    print("\nimap")
    imap_start = time.time()
    for imap_result in tp.imap(wait, iterable):
        print(f"{imap_result} took {time.time() - imap_start:.0f} seconds")

    print("\nimap_unordered")
    imap_unordered_start = time.time()
    for imap_un_result in tp.imap_unordered(wait, iterable):
        print(f"{imap_un_result} took" 
              f"{time.time() - imap_unordered_start:.0f} seconds")

Using map it will wait all 5 seconds (the largest wait time in the argument list) to return all the results at once.

map
0 took 5 seconds
4 took 5 seconds
5 took 5 seconds
2 took 5 seconds

imap will immediately return the 0 result, then four seconds later will return the 4, one second later it will return 5 and 2 at the same time.

imap
0 took 0 seconds
4 took 4 seconds
5 took 5 seconds
2 took 5 seconds

imap_unordered will return them as soon as each one finishes. (Notice this won’t always be the shortest one first, as the argument list may be longer than the number of worker processes).

imap_unordered
0 took 0 seconds
2 took 2 seconds
4 took 4 seconds
5 took 5 seconds

The Methods

Here are the methods, their parameters and docstrings, and an overview of what each one does.

map

map(func, iterable, chunksize=None):
    ''' Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned. '''

map takes an iterable and turns it into a list, then breaks it up to send to worker processes. Each worker process will run the function given as the first argument with a single argument (given to it from the iterable). The results will be collected into a list, in order, when all of them have finished. Returns a list.

starmap

starmap(func, iterable, chunksize=None):
    ''' Like `map()` method but the elements of the `iterable` are expected to
        be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b). '''

starmap allows multiple arguments to be given to the function by passing in an iterable of iterables (i.e. a list of tuples, a list of lists, a generator of generators, etc.). The inner iterables do NOT have to be the same length either, so a default argument could be overridden for one item of the list but not for the others (as shown in the sketch below). Returns a list.
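For instance, with a toy function that has a default second argument (made up just to show the behavior), one tuple can supply that argument while another leaves it out:

from multiprocessing.pool import ThreadPool

# Hypothetical function with a default second argument
def greet(name, greeting="Hello"):
    return f"{greeting}, {name}!"

with ThreadPool(processes=2) as tp:
    # The inner tuples don't have to be the same length;
    # the second item falls back to the default greeting
    print(tp.starmap(greet, [("Clara", "Howdy"), ("Chris",)]))
    # ['Howdy, Clara!', 'Hello, Chris!']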

apply

apply(func, args=(), kwds={}):
    ''' Equivalent of `func(*args, **kwds)`. '''

apply runs a single function with one of the pool’s workers. Returns the result of the function.

imap

imap(func, iterable, chunksize=1):
    ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. '''

imap takes the same arguments as map, however its result is an iterable that will start returning results, in order, as soon as they have finished. Returns an iterable.

imap_unordered

imap_unordered(func, iterable, chunksize=1):
    ''' Like `imap()` method but ordering of results is arbitrary. '''

imap_unordered is the same as imap except it will return each result as soon as it finishes, not in order. Returns an iterable.

The Asynchronous Methods

The asynchronous versions of map, starmap, and apply all take the same parameters as their original functions, as well as callback and error_callback parameters.

  • callback – function that takes a single argument, that will be the result(s) of the function(s) run.
  • error_callback – function that takes a single argument, which will be the Exception raised (if one occurs).

The async methods immediately return an AsyncResult (also called ApplyResult, or sub-classed as MapResult) that can be directly used to view the results and check on its status. (View the Async section above for an example.)

  • ready – See if the results are available
  • successful – Boolean, True if it didn’t raise an exception
  • wait – takes a timeout, will wait for the results to be ready
  • get – Grab the results, also takes a timeout, will automatically raise the exception if one occurred.

map_async

map_async(func, iterable, chunksize=None, callback=None, error_callback=None):
    ''' Asynchronous version of `map()` method. '''

starmap_async

starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None):
    ''' Asynchronous version of `starmap()` method. '''

apply_async

apply_async(func, args=(), kwds={}, callback=None, error_callback=None):
    ''' Asynchronous version of `apply()` method. '''