Create an image in Task Manager from CPU Usage

A lot of people have passed around an internet video of someone using a crazy-high-core-count computer to display a video via Task Manager’s CPU usage graphs. The one problem with it: it’s probably fake. Task Manager displays the last 60 seconds of activity, not an instant view like the one shown in the video. But what if we kept the usage the same for sixty seconds? Could we at least make an image?

So the first problem is: how do we generate enough load for CPU usage to noticeably increase? Luckily there is an old go-to in the benchmark world that is really simple to implement: square roots. Throw a few million square roots at a CPU and watch it light up.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from itertools import count
import math
def run_benchmark():
# Warning, no way to stop other than Ctrl+C or killing the process
for num in count():
math.sqrt(num)
run_benchmark()
from itertools import count import math def run_benchmark(): # Warning, no way to stop other than Ctrl+C or killing the process for num in count(): math.sqrt(num) run_benchmark()
from itertools import count
import math

def run_benchmark():
    """Load one CPU core by taking the square root of 0, 1, 2, ... forever.

    Warning: this never returns. The only ways to stop it are Ctrl+C or
    killing the process.
    """
    num = 0
    while True:
        math.sqrt(num)
        num += 1

run_benchmark()

The real problem is making sure we have a way to stop it. The easiest way is with a timeout, but even the slight work of reading the system time may throw off how much work we want to do in the future. So let’s only do that, say, every 100,000 operations.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from itertools import count
import math
import time
def run_benchmark(timeout=60):
start_time = time.perf_counter()
for num in count():
if num % 100_000 == 0.0:
if time.perf_counter() > start_time + timeout:
return
math.sqrt(num)
run_benchmark()
from itertools import count import math import time def run_benchmark(timeout=60): start_time = time.perf_counter() for num in count(): if num % 100_000 == 0.0: if time.perf_counter() > start_time + timeout: return math.sqrt(num) run_benchmark()
from itertools import count
import math
import time 

def run_benchmark(timeout=60):
    """Keep one CPU core busy with square roots for about ``timeout`` seconds.

    The clock is only read once every 100,000 iterations so that the time
    check itself adds as little as possible to the workload.
    """
    deadline = time.perf_counter() + timeout
    for num in count():
        at_checkpoint = num % 100_000 == 0
        if at_checkpoint and time.perf_counter() > deadline:
            return
        math.sqrt(num)

run_benchmark()

Excellent, now we can pump up a core to max workload. However, we currently have no control over which CPU it will run on! We have to set the “affinity” of the process. There is no easy way to do that with Python directly, so we have to go straight to the Win32 API. We will use the pywin32 library to access the win32process module.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
pip install pywin32
pip install pywin32
pip install pywin32

We will then use the SetProcessAffinityMask function to lock our program to a specific CPU, which also requires grabbing a handle to our own process from the GetCurrentProcess Win32 API function. One thing I could not find clearly documented anywhere is that you set the CPU core affinity not by the core’s actual number, but by a bit mask, which we can create by taking 2 ** cpu_core.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from itertools import count
import math
import time
import win32process # pywin32
def run_benchmark(cpu_core=0, timeout=60):
start_time = time.perf_counter()
process_id = win32process.GetCurrentProcess()
win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)
for num in count():
if num % 100_000 == 0.0:
if time.perf_counter() > start_time + timeout:
return
math.sqrt(num)
run_benchmark()
from itertools import count import math import time import win32process # pywin32 def run_benchmark(cpu_core=0, timeout=60): start_time = time.perf_counter() process_id = win32process.GetCurrentProcess() win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core) for num in count(): if num % 100_000 == 0.0: if time.perf_counter() > start_time + timeout: return math.sqrt(num) run_benchmark()
from itertools import count
import math
import time 

import win32process  # pywin32

def run_benchmark(cpu_core=0, timeout=60):
    """Pin this process to one logical core and load it for ``timeout`` seconds.

    ``cpu_core`` is the zero-based core index. SetProcessAffinityMask takes
    an affinity mask rather than an index, hence ``2 ** cpu_core``.
    """
    began = time.perf_counter()
    current_process = win32process.GetCurrentProcess()
    affinity_mask = 2 ** cpu_core
    win32process.SetProcessAffinityMask(current_process, affinity_mask)
    deadline = began + timeout

    for num in count():
        # Only look at the clock every 100,000 iterations to keep the
        # overhead of the time check out of the workload.
        if num % 100_000 == 0 and time.perf_counter() > deadline:
            return
        math.sqrt(num)

run_benchmark()

So now every time we run the program, it should only max out the first core. If we want to do this across multiple cores, we will have to create multiple processes at the same time. My favorite way to do that is with multiprocessing maps. So instead of lighting up a single processor, let’s pump them all to the roof.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from itertools import count
import math
import time
from multiprocessing.pool import Pool
from multiprocessing import cpu_count
import win32process # pywin32
def run_benchmark(cpu_core=0, timeout=60):
start_time = time.perf_counter()
process_id = win32process.GetCurrentProcess()
win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)
for num in count():
if num % 100_000 == 0.0:
if time.perf_counter() > start_time + timeout:
return
math.sqrt(num)
if __name__ == '__main__':
arguments = [(core, 60) for core in range(cpu_count())]
# [(0, 60), (1, 60), (2, 60), (3, 60)]
# each tuple will be as arguments to run_benchmark as its arguments
with Pool(processes=cpu_count()) as p:
p.starmap(run_benchmark, arguments)
run_benchmark()
from itertools import count import math import time from multiprocessing.pool import Pool from multiprocessing import cpu_count import win32process # pywin32 def run_benchmark(cpu_core=0, timeout=60): start_time = time.perf_counter() process_id = win32process.GetCurrentProcess() win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core) for num in count(): if num % 100_000 == 0.0: if time.perf_counter() > start_time + timeout: return math.sqrt(num) if __name__ == '__main__': arguments = [(core, 60) for core in range(cpu_count())] # [(0, 60), (1, 60), (2, 60), (3, 60)] # each tuple will be as arguments to run_benchmark as its arguments with Pool(processes=cpu_count()) as p: p.starmap(run_benchmark, arguments) run_benchmark()
from itertools import count
import math
import time
from multiprocessing.pool import Pool
from multiprocessing import cpu_count

import win32process  # pywin32


def run_benchmark(cpu_core=0, timeout=60):
    """Max out a single logical core for roughly ``timeout`` seconds.

    The calling process is first restricted to the core with zero-based
    index ``cpu_core`` (passed to Windows as the mask ``2 ** cpu_core``),
    then it computes square roots until the timeout is noticed.
    """
    began = time.perf_counter()
    current_process = win32process.GetCurrentProcess()
    win32process.SetProcessAffinityMask(current_process, 2 ** cpu_core)
    deadline = began + timeout

    for num in count():
        # The clock is checked only at every 100,000th iteration.
        checkpoint = num % 100_000 == 0
        if checkpoint and time.perf_counter() > deadline:
            return
        math.sqrt(num)


if __name__ == '__main__':
    # One (cpu_core, timeout) tuple per logical core, e.g. on a four-core
    # machine: [(0, 60), (1, 60), (2, 60), (3, 60)].
    # starmap unpacks each tuple into the positional arguments of
    # run_benchmark, so every worker process pins itself to its own core.
    core_total = cpu_count()
    arguments = [(core, 60) for core in range(core_total)]

    with Pool(processes=core_total) as p:
        p.starmap(run_benchmark, arguments)

We have to put the multiprocessing code inside the `if __name__ == '__main__':` block due to how CPython starts up on Windows. (It’s also just a good idea for any script.)

At this point you could also change which cores it runs on to see how they correspond to the graphs in Task Manager. For example, you could change the range step to launch on only every other core: range(0, cpu_count(), 2)

On my 8 core machine (so 16 logical cores) I can make a quick X shape by selecting certain cores

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
arguments = [(core, 100) for core in [0, 3, 5, 6, 9, 10, 12, 15]]
arguments = [(core, 100) for core in [0, 3, 5, 6, 9, 10, 12, 15]]
# (cpu_core, timeout) pairs for the logical cores that, per the article,
# draw a rough X in Task Manager on a machine with 16 logical cores.
arguments = [
    (core, 100)
    for core in (0, 3, 5, 6, 9, 10, 12, 15)
]

Now remember there are two types of CPU cores according to the operating system: physical and logical. Physical is the number of actual cores on the CPU, but if the CPU has SMT (simultaneous multithreading), the operating system will see double that number as logical cores. So that means every odd-numbered core is actually a fake one (remember cores start at 0), which means it has to use some of the previous core’s resources. Hence why cores 2, 4, 8 and 14 are showing higher usage.

But what if we want even cooler graphics and don’t want to be limited to just 100% CPU usage? Well, then we need to tell the computer to not work for very small amounts of time, a.k.a. sleep. So let’s try adding a sleep every 100K ops for, oh say, 60 milliseconds.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def run_benchmark(cpu_core=0, timeout=60):
start_time = time.perf_counter()
process_id = win32process.GetCurrentProcess()
win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)
for num in count():
if num % 100_000 == 0.0:
if time.perf_counter() > start_time + timeout:
return
time.sleep(0.06)
math.sqrt(num)
def run_benchmark(cpu_core=0, timeout=60): start_time = time.perf_counter() process_id = win32process.GetCurrentProcess() win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core) for num in count(): if num % 100_000 == 0.0: if time.perf_counter() > start_time + timeout: return time.sleep(0.06) math.sqrt(num)
def run_benchmark(cpu_core=0, timeout=60):
    """Load one logical core at reduced usage for about ``timeout`` seconds.

    The process is pinned to the core with zero-based index ``cpu_core``.
    After every 100,000 square roots it checks the timeout and then sleeps
    for 60 milliseconds, which lowers the usage shown for that core.
    """
    began = time.perf_counter()
    current_process = win32process.GetCurrentProcess()
    win32process.SetProcessAffinityMask(current_process, 2 ** cpu_core)
    deadline = began + timeout

    for num in count():
        checkpoint = num % 100_000 == 0
        if checkpoint:
            if time.perf_counter() > deadline:
                return
            # Idle briefly so the core is not busy 100% of the time.
            time.sleep(0.06)
        math.sqrt(num)

This time I am also going to run it just on physical cores.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
arguments = [(core, 100, 80) for core in range(0, cpu_count(), 2)]
arguments = [(core, 100, 80) for core in range(0, cpu_count(), 2)]
# Every second logical core (0, 2, 4, ...), each loaded for 100 seconds.
# run_benchmark takes only (cpu_core, timeout) at this stage, so each tuple
# carries exactly two values.
arguments = [(core, 100) for core in range(0, cpu_count(), 2)]

How about that, now it’s using just about 50% usage on each core on my computer. If you’re trying this on your own, this is where the fun begins. I suggest trying out different time offsets to see if you can get a list of times for 10~90% usage. For example, mine is close to:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
usage_to_sleep_time = {
100: 0,
90: 0.014,
80: 0.02,
70: 0.03,
60: 0.04,
50: 0.06,
40: 0.08,
30: 0.1,
20: 0.2,
10: 0.5
}
usage_to_sleep_time = { 100: 0, 90: 0.014, 80: 0.02, 70: 0.03, 60: 0.04, 50: 0.06, 40: 0.08, 30: 0.1, 20: 0.2, 10: 0.5 }
# Maps a target CPU usage percentage to the sleep time in seconds that is
# inserted after every 100,000 square roots. These are the values the author
# found to be close on their own machine; other hardware will likely need
# different numbers.
usage_to_sleep_time = {
    100: 0,
    90: 0.014,
    80: 0.02,
    70: 0.03,
    60: 0.04,
    50: 0.06,
    40: 0.08,
    30: 0.1,
    20: 0.2,
    10: 0.5
}

Then let’s throw that into the run_benchmark function to be able to set a precise amount of usage per core.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def run_benchmark(cpu_core=0, timeout=60, usage=100):
start_time = time.perf_counter()
process_id = win32process.GetCurrentProcess()
win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)
for num in count():
if num % 100_000 == 0.0:
if time.perf_counter() > start_time + timeout:
return
if usage != 100:
time.sleep(usage_to_sleep_time[usage])
math.sqrt(num)
def run_benchmark(cpu_core=0, timeout=60, usage=100): start_time = time.perf_counter() process_id = win32process.GetCurrentProcess() win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core) for num in count(): if num % 100_000 == 0.0: if time.perf_counter() > start_time + timeout: return if usage != 100: time.sleep(usage_to_sleep_time[usage]) math.sqrt(num)
def run_benchmark(cpu_core=0, timeout=60, usage=100):
    """Hold one logical core near ``usage`` percent for about ``timeout`` seconds.

    The process is pinned to the core with zero-based index ``cpu_core``.
    After every 100,000 square roots the timeout is checked and, unless
    ``usage`` is 100, the process sleeps for ``usage_to_sleep_time[usage]``
    seconds.
    """
    began = time.perf_counter()
    current_process = win32process.GetCurrentProcess()
    win32process.SetProcessAffinityMask(current_process, 2 ** cpu_core)
    deadline = began + timeout
    throttled = usage != 100

    for num in count():
        if num % 100_000 == 0:
            if time.perf_counter() > deadline:
                return
            if throttled:
                time.sleep(usage_to_sleep_time[usage])
        math.sqrt(num)

Then if you have enough cores, you can see them all in action at the same time.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
arguments = [(core, 100, usage) for core, usage in enumerate(usage_to_sleep_time)]
arguments = [(core, 100, usage) for core, usage in enumerate(usage_to_sleep_time)]
# Core N gets the N-th usage level from usage_to_sleep_time (in the order the
# keys were written), each for 100 seconds.
arguments = [
    (core_index, 100, percent)
    for core_index, percent in enumerate(usage_to_sleep_time)
]

(I was impatient and didn’t do this one while the computer was idle, hence the messy lower ones.)

From here I am sure some of you could go crazy making individual cores ramp up and down, and create something truly spectacular, but my whistle was wetted. It is totally possible to have control over exactly how much CPU core usage is being shown in Task Manager with Python.

Here is the full final code, hope you enjoyed!

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from itertools import count
import math
import time
from multiprocessing.pool import Pool
from multiprocessing import cpu_count
import win32process # pywin32
usage_to_sleep_time = {
100: 0,
90: 0.014,
80: 0.02,
70: 0.03,
60: 0.04,
50: 0.06,
40: 0.08,
30: 0.1,
20: 0.2,
10: 0.5
}
def run_benchmark(cpu_core=0, timeout=60, usage=100):
start_time = time.perf_counter()
process_id = win32process.GetCurrentProcess()
win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core)
for num in count():
if num % 100_000 == 0.0:
if time.perf_counter() > start_time + timeout:
return num
if usage != 100:
time.sleep(usage_to_sleep_time[usage])
math.sqrt(num)
if __name__ == '__main__':
arguments = [(core, 100, usage) for core, usage in enumerate(usage_to_sleep_time)]
with Pool(processes=len(arguments)) as p:
print(p.starmap(run_benchmark, arguments))
from itertools import count import math import time from multiprocessing.pool import Pool from multiprocessing import cpu_count import win32process # pywin32 usage_to_sleep_time = { 100: 0, 90: 0.014, 80: 0.02, 70: 0.03, 60: 0.04, 50: 0.06, 40: 0.08, 30: 0.1, 20: 0.2, 10: 0.5 } def run_benchmark(cpu_core=0, timeout=60, usage=100): start_time = time.perf_counter() process_id = win32process.GetCurrentProcess() win32process.SetProcessAffinityMask(process_id, 2 ** cpu_core) for num in count(): if num % 100_000 == 0.0: if time.perf_counter() > start_time + timeout: return num if usage != 100: time.sleep(usage_to_sleep_time[usage]) math.sqrt(num) if __name__ == '__main__': arguments = [(core, 100, usage) for core, usage in enumerate(usage_to_sleep_time)] with Pool(processes=len(arguments)) as p: print(p.starmap(run_benchmark, arguments))
from itertools import count
import math
import time
from multiprocessing.pool import Pool
from multiprocessing import cpu_count

import win32process  # pywin32

# Maps a target CPU usage percentage to the sleep time in seconds that
# run_benchmark inserts after every 100,000 square roots. The values were
# tuned by the author on their own machine. Key order matters: the main block
# enumerates this dict, so the N-th key is the usage assigned to core N.
usage_to_sleep_time = {
    100: 0,
    90: 0.014,
    80: 0.02,
    70: 0.03,
    60: 0.04,
    50: 0.06,
    40: 0.08,
    30: 0.1,
    20: 0.2,
    10: 0.5
}

def run_benchmark(cpu_core=0, timeout=60, usage=100):
    """Pin this process to one logical core and hold it near ``usage`` percent.

    Args:
        cpu_core: Zero-based index of the logical core to run on.
        timeout: Approximate run time in seconds.
        usage: Target load in percent. Any value other than 100 must be a
            key of ``usage_to_sleep_time``; 100 means no throttling.

    Returns:
        The loop counter at the moment the timeout was noticed, which is
        the number of square roots that were computed.

    Raises:
        KeyError: If ``usage`` is not 100 and is not a key of
            ``usage_to_sleep_time``.
    """
    start_time = time.perf_counter()

    # Resolve the throttle once, up front: it never changes during the run,
    # and an unsupported usage fails before any CPU time is burned.
    throttled = usage != 100
    sleep_time = usage_to_sleep_time[usage] if throttled else 0

    process_handle = win32process.GetCurrentProcess()
    # SetProcessAffinityMask takes a bit mask, not a core index:
    # bit N of the mask selects logical core N.
    win32process.SetProcessAffinityMask(process_handle, 1 << cpu_core)

    deadline = start_time + timeout

    for num in count():
        # Reading the clock costs CPU time too, so only do it (and the
        # optional sleep) once every 100,000 iterations.
        if num % 100_000 == 0:
            if time.perf_counter() > deadline:
                return num
            if throttled:
                time.sleep(sleep_time)
        math.sqrt(num)



if __name__ == '__main__':
    # One (cpu_core, timeout, usage) job per entry of usage_to_sleep_time:
    # core 0 gets the first usage level, core 1 the second, and so on.
    jobs = [
        (core_index, 100, percent)
        for core_index, percent in enumerate(usage_to_sleep_time)
    ]

    # One worker process per job so that all cores are loaded simultaneously.
    with Pool(processes=len(jobs)) as pool:
        iteration_counts = pool.starmap(run_benchmark, jobs)
        print(iteration_counts)