Quantlane tech blog

Generators in Python

Basics of synchronous generators

What is a generator, you ask? Imagine you are building something, for example a treehouse, and you have a friend to assist you. You are nailing the boards, with a hammer in one hand and a nail in the other. You could take all the nails at once, but then you would be carrying a lot of nails, which is a bit inconvenient, and you might carry more than you need. So how about asking your friend for another nail each time you hammer one in? This limits the number of nails you hold at any time to one, and you don't have to care about the rest. In this example, your friend is the "generator": they give you ("yield") a nail every time you ask for the next one.

We could write this in Python like:

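from typing import Iterator

def very_helpful_friend() -> Iterator[Nail]:
    while True:
        yield Nail()

# `Nail`, `board`, `board_is_still_loose` and `hammer` are placeholders for the analogy.
nail_generator = very_helpful_friend()
while board_is_still_loose:
    nail = next(nail_generator)
    hammer(nail, board)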
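To see what "asking for the next one" means in practice, here is a minimal sketch with a finite generator. The `count_to` helper is a made-up name for illustration only. It shows that the body of a generator does not run until the first `next()` call, and that `next()` raises `StopIteration` once the generator has nothing left to yield.

```python
from typing import Iterator


def count_to(limit: int) -> Iterator[int]:
    """Hypothetical helper: yield 1, 2, ..., limit, then finish."""
    print("generator body started")
    for number in range(1, limit + 1):
        yield number


counter = count_to(2)   # nothing is printed yet: the body has not run
first = next(counter)   # the body starts now and pauses at the first yield
second = next(counter)  # execution resumes right after the yield

try:
    next(counter)       # the loop is over, so the generator is exhausted
except StopIteration:
    exhausted = True
```

A `for` loop handles `StopIteration` for you, which is why you rarely see it when iterating over a generator directly.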

The treehouse example is quite simple, so you might ask why you would want to use a generator in Python. The simplest practical example is a number range generator. If you ever wondered how the range function could be implemented in Python, a simplified version (handling only a positive step) looks like this:

from typing import Iterator

def range(start: int, stop: int, step: int = 1) -> Iterator[int]:
    value = start
    while value < stop:
        yield value
        value = value + step
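A short usage sketch of the function above. It is renamed to `simple_range` here (a name chosen for this example) so that it does not shadow the built-in `range`.

```python
from typing import Iterator


def simple_range(start: int, stop: int, step: int = 1) -> Iterator[int]:
    # Same body as the simplified range above, under a different name.
    value = start
    while value < stop:
        yield value
        value = value + step


# list() keeps asking for the next value until the generator is exhausted.
numbers = list(simple_range(0, 10, 3))
print(numbers)  # [0, 3, 6, 9]

# A for loop consumes the values one at a time, without building a list.
total = 0
for value in simple_range(1, 5):
    total += value
print(total)  # 10
```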

A range is quite trivial, though, so what else can we do with generators? How about an exponential delay generator?

An exponential delay generator yields a delay time that grows exponentially with each value you request. This kind of generator is typically used in distributed systems for delaying retry requests.

We can implement such a generator like this:

def next_delay(initial_value: float, max_value: float, exponent: float = 1.1) -> Iterator[float]:
    value = initial_value
    while True:
        yield value
        value = min(max_value, value * exponent)

This function starts with a delay of initial_value and, in each iteration, multiplies the last delay by exponent (with 1.1, each delay is 10% longer than the previous one), up to max_value, which is the maximum delay this function can yield.

Sample usage:

generator = next_delay(1, 2)

next(generator)
# 1

next(generator)
# 1.1

next(generator)
# 1.2100000000000002

next(generator)
# 1.3310000000000004

next(generator)
# 1.4641000000000006

next(generator)
# 1.6105100000000008

next(generator)
# 1.771561000000001

next(generator)
# 1.9487171000000014

next(generator)
# 2

next(generator)
# 2

As you can see, the first call to next(generator) returns initial_value, and every subsequent call returns a value 1.1 times the previous one, until it reaches 2, which is the max_value. The trailing digits in values such as 1.2100000000000002 are ordinary floating-point rounding.
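Calling next() by hand gets tedious. As a sketch, `itertools.islice` from the standard library can take a fixed number of values from an infinite generator such as next_delay; the variable name `first_delays` is just for this example.

```python
from itertools import islice
from typing import Iterator


def next_delay(initial_value: float, max_value: float, exponent: float = 1.1) -> Iterator[float]:
    value = initial_value
    while True:
        yield value
        value = min(max_value, value * exponent)


# islice stops after 10 items, so the infinite loop inside next_delay
# is simply never resumed again.
first_delays = list(islice(next_delay(1, 2), 10))
print(first_delays[0], first_delays[-1])  # 1 2
```

Do not call list() directly on next_delay(...): the generator never ends, so the call would never return.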

Note that generators can also call other functions. A prime example is the map function in Python, which could be implemented as:

from typing import Callable, Iterator, TypeVar

InputType = TypeVar('InputType')
OutputType = TypeVar('OutputType')

def map(
    function: Callable[[InputType], OutputType],
    data: Iterator[InputType]
) -> Iterator[OutputType]:
    for item in data:
        yield function(item)

This function calls the function on each item in data and yields the output of the call.
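The important detail is when the calls happen. The sketch below uses a copy of the function renamed to `lazy_map` (so the built-in `map` stays available) and a hypothetical `square` function that records each call. It suggests that nothing is computed until a value is actually requested.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

InputType = TypeVar('InputType')
OutputType = TypeVar('OutputType')


def lazy_map(
    function: Callable[[InputType], OutputType],
    data: Iterable[InputType],
) -> Iterator[OutputType]:
    for item in data:
        yield function(item)


calls: List[int] = []


def square(number: int) -> int:
    calls.append(number)  # record every call so we can see when it happens
    return number * number


squares = lazy_map(square, [1, 2, 3])
calls_before = list(calls)    # still empty: creating the generator calls nothing
first_square = next(squares)  # square(1) runs only now
calls_after = list(calls)     # only the first item has been processed so far
```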

Hopefully, at this point, you know how to write a simple generator, but you may ask why you would need them or when to use them.

Generators are best suited to cases where you work with a long sequence or do not know in advance how many values you will need. Taking the examples above: range with a large bound like 1,000,000,000; map with either a long data sequence or an expensive function call (e.g. a function call that takes a lot of memory); or next_delay when you do not know how many times you will need to call next(generator) (e.g. you are trying to get some data from a website, but it keeps returning an error code caused by request rate-limiting):

import time

import requests

for delay in next_delay(1, 5):
    response = requests.get('https://example.com')

    try:
        response.raise_for_status()
    except requests.HTTPError:
        # In case the request fails (returns a 4xx or 5xx status code)
        # we wait before retrying the request
        time.sleep(delay)
    else:
        break
# Response processing
...
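The loop above retries forever if the site never recovers. One possible variation, sketched below, caps the number of attempts with `itertools.islice`. Everything except next_delay is an assumption made for this example: `retry`, `make_flaky_action` and the injected `sleep` callable are hypothetical helpers, and the flaky action simulates a rate-limited request so the sketch runs without a network.

```python
from itertools import islice
from typing import Callable, Iterator, List, Optional


def next_delay(initial_value: float, max_value: float, exponent: float = 1.1) -> Iterator[float]:
    value = initial_value
    while True:
        yield value
        value = min(max_value, value * exponent)


def retry(action: Callable[[], str], max_attempts: int, sleep: Callable[[float], None]) -> str:
    """Call `action` until it succeeds, waiting between failed attempts."""
    last_error: Optional[Exception] = None
    # islice limits the otherwise infinite stream of delays to max_attempts items.
    for delay in islice(next_delay(1, 5), max_attempts):
        try:
            return action()
        except ConnectionError as error:
            last_error = error
            sleep(delay)  # note: this also waits after the final failed attempt
    raise RuntimeError(f'gave up after {max_attempts} attempts') from last_error


def make_flaky_action(failures: int) -> Callable[[], str]:
    """Hypothetical stand-in for a request that fails `failures` times, then succeeds."""
    remaining = [failures]

    def action() -> str:
        if remaining[0] > 0:
            remaining[0] -= 1
            raise ConnectionError('rate limited')
        return 'ok'

    return action


slept: List[float] = []  # collect the delays instead of really sleeping
result = retry(make_flaky_action(2), max_attempts=5, sleep=slept.append)
```

Passing `time.sleep` as the `sleep` argument would make the function actually wait; collecting the delays in a list keeps the example instant.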

Next, I would like to introduce... asynchronous generators!

Asynchronous generators

At this point, you might ask "what the heck are asynchronous generators?". Simply put, these are generators written as asynchronous functions: instead of def function(..), they use async def function(..).

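So to convert the next_delay function from the previous example, we just add the async keyword before def and change the return annotation from Iterator to AsyncIterator (both come from typing). Like this:

async def next_delay(initial_value: float, max_value: float, exponent: float = 1.1) -> AsyncIterator[float]: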
    value = initial_value
    while True:
        yield value
        value = min(max_value, value * exponent)

And thus we've created our first asynchronous generator.
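An asynchronous generator cannot be consumed with a plain for loop or next(). As a minimal sketch, the values are read with `async for` inside a coroutine, which is then started with `asyncio.run`; the `first_delays` coroutine is a hypothetical helper written for this example.

```python
import asyncio
from typing import AsyncIterator, List


async def next_delay(initial_value: float, max_value: float, exponent: float = 1.1) -> AsyncIterator[float]:
    value = initial_value
    while True:
        yield value
        value = min(max_value, value * exponent)


async def first_delays(count: int) -> List[float]:
    delays: List[float] = []
    # `async for` replaces the plain `for` used with synchronous generators.
    async for delay in next_delay(1, 2):
        delays.append(delay)
        if len(delays) == count:
            break  # the generator is infinite, so we stop explicitly
    return delays


# asyncio.run starts an event loop and runs the coroutine to completion.
delays = asyncio.run(first_delays(3))
```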

The main advantage of asynchronous generators is their asynchronous part: inside them you can await coroutines and iterate over other asynchronous generators.

For example, let's say we want to download data from some webpage. The data is available on a per-year basis, and we would like to download the last few years of it (2016 through 2022 in the code below). The order of the data does not matter.

In synchronous Python you would do something like this:

import requests

def sync_get_data():
    for year in range(2016, 2023):
        # Mind the use of an f-string to add the year into the URL.
        response = requests.get(f'https://my.data.source/?year={year}')
        yield response.json()

This would download data in a year-by-year manner, whilst always waiting for the current request to finish before starting another one.

In asynchronous Python we could send multiple requests at once and process them in the order they are retrieved:

from typing import AsyncIterator, Any
import asyncio

import aiohttp


async def fetch_site(session: aiohttp.ClientSession, year: int) -> dict[str, Any]:
    async with session.get(f'https://my.data.source/?year={year}') as response:
        return await response.json()


async def async_get_data() -> AsyncIterator[dict[str, Any]]:
    # Initialize aiohttp ClientSession
    async with aiohttp.ClientSession() as session:
        # Create downloading tasks
        pending_requests = [
            asyncio.create_task(
                fetch_site(session, year)
            )
            for year in range(2016, 2023)
        ]
        # Process downloading tasks
        while pending_requests:
            # Wait for the next completed download task.
            # `asyncio.wait` is a coroutine, so it has to be awaited. Its return value is
            # a tuple of (finished tasks, pending tasks), so this "moves" tasks from
            # `pending_requests` to `done` once any task finishes.
            done, pending_requests = await asyncio.wait(pending_requests, return_when=asyncio.FIRST_COMPLETED)
            # Yield the retrieved data
            for task in done:
                yield task.result()

So this is a downloader that fetches the data for several years concurrently.
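To try the pattern without a real data source, here is a sketch that swaps the HTTP request for `asyncio.sleep`. The names `fake_fetch`, `fake_async_get` and `collect_years`, as well as the simulated response times and values, are assumptions made for this example. Later years are made to "respond" faster, so results usually arrive in a different order than the requests were created.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def fake_fetch(year: int) -> Dict[str, Any]:
    # Stand-in for an HTTP request: later years finish sooner.
    await asyncio.sleep((2023 - year) * 0.01)
    return {'year': year, 'value': year % 100, 'other_value': 2}


async def fake_async_get() -> AsyncIterator[Dict[str, Any]]:
    # Start all "downloads" at once.
    pending = {asyncio.create_task(fake_fetch(year)) for year in range(2016, 2023)}
    while pending:
        # Wake up as soon as at least one task has finished.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task.result()


async def collect_years() -> List[int]:
    # An async comprehension drains the asynchronous generator.
    return [data['year'] async for data in fake_async_get()]


years = asyncio.run(collect_years())
print(years)  # all seven years, in completion order rather than request order
```

The whole run takes roughly as long as the slowest simulated request, not the sum of all of them.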

Now we would like to process the data. Again, in the synchronous version we would do it like this:

from typing import Any, Iterable, Iterator

def sync_process(input_data: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
    for data in input_data:
        yield {
            'year': data['year'],
            'value': data['value'] * data['other_value'],
        }

The sync_process function takes one argument, something that can be iterated over. For each element of the input data, it extracts the fields we are interested in and combines them into a new dictionary.

An asynchronous version of this could look like this:

from typing import Any, AsyncIterable, AsyncIterator

async def async_process(input_data: AsyncIterable[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]:
    async for data in input_data:
        yield {
            'year': data['year'],
            'value': data['value'] * data['other_value'],
        }

The only difference between sync_process and async_process is the fact that async_process iterates over the data asynchronously, and so it could use async_get_data as input: async_process(async_get_data()).
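As a runnable sketch of that idea, the example below feeds async_process from a small in-memory source instead of the downloader. The `in_memory_source` generator, its two records and the `main` coroutine are hypothetical and exist only so the example runs offline.

```python
import asyncio
from typing import Any, AsyncIterable, AsyncIterator, Dict, List


async def in_memory_source() -> AsyncIterator[Dict[str, Any]]:
    # Hypothetical stand-in for the downloader: two hard-coded records.
    records = (
        {'year': 2021, 'value': 2, 'other_value': 5},
        {'year': 2022, 'value': 3, 'other_value': 7},
    )
    for record in records:
        await asyncio.sleep(0)  # yield control to the event loop, as real I/O would
        yield record


async def async_process(input_data: AsyncIterable[Dict[str, Any]]) -> AsyncIterator[Dict[str, Any]]:
    async for data in input_data:
        yield {
            'year': data['year'],
            'value': data['value'] * data['other_value'],
        }


async def main() -> List[Dict[str, Any]]:
    # One asynchronous generator is passed straight into the other.
    return [item async for item in async_process(in_memory_source())]


processed = asyncio.run(main())
print(processed)
# [{'year': 2021, 'value': 10}, {'year': 2022, 'value': 21}]
```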

Now let's look at how we can compose generators in a chain.

Composing generators

The nice thing about generators, both synchronous and asynchronous, is that you can compose them.

For the previous examples, we could build a data processing pipeline like this:

for processed_data in sync_process(
    sync_get_data()
):
    print(processed_data)

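Similarly, the asynchronous version (async for is only allowed inside a coroutine, so the loop is wrapped in one):

async def main() -> None:
    async for processed_data in async_process(
        async_get_data()
    ):
        print(processed_data)

asyncio.run(main())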

Now let's imagine that instead of a fixed range of years in *_get_data, each of these functions takes an argument that it iterates over to get the years we are interested in.

If we just move the range generator out and pass it in as the argument, the previous synchronous example would look like this:

for processed_data in sync_process(
    sync_get_data(
        range(2016, 2023)
    )
):
    print(processed_data)
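The parameterized sync_get_data is not shown above, so here is one way it might look. This is a sketch under assumptions: `fetch_year` is a hypothetical stand-in for the HTTP request, and its return values are invented so the example runs offline.

```python
from typing import Any, Dict, Iterable, Iterator


def fetch_year(year: int) -> Dict[str, Any]:
    # Hypothetical stand-in for the HTTP request.
    return {'year': year, 'value': year - 2000, 'other_value': 2}


def sync_get_data(years: Iterable[int]) -> Iterator[Dict[str, Any]]:
    # The years now come from the caller instead of a hard-coded range.
    for year in years:
        yield fetch_year(year)


def sync_process(input_data: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    for data in input_data:
        yield {
            'year': data['year'],
            'value': data['value'] * data['other_value'],
        }


results = list(sync_process(sync_get_data(range(2016, 2019))))
print(results)
# [{'year': 2016, 'value': 32}, {'year': 2017, 'value': 34}, {'year': 2018, 'value': 36}]
```

Because the argument is any iterable, the caller could just as well pass a list such as [2016, 2020] or another generator.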

My point here is that you can compose more than two functions and generate complex transformations by using simple functions that change the data in steps.
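As a small illustration with more than two stages, the sketch below chains three simple generators. The stage names `numbers`, `only_even` and `squared` are invented for this example. The source is infinite, yet the pipeline only does as much work as the consumer asks for.

```python
from itertools import islice
from typing import Iterable, Iterator


def numbers() -> Iterator[int]:
    # Stage 1: an endless source of integers 0, 1, 2, ...
    value = 0
    while True:
        yield value
        value += 1


def only_even(values: Iterable[int]) -> Iterator[int]:
    # Stage 2: drop the odd values.
    for value in values:
        if value % 2 == 0:
            yield value


def squared(values: Iterable[int]) -> Iterator[int]:
    # Stage 3: transform what is left.
    for value in values:
        yield value * value


pipeline = squared(only_even(numbers()))
first_three = list(islice(pipeline, 3))
print(first_three)  # [0, 4, 16]
```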

Written by Peter Bábics on March 3, 2022.
Peter Bábics

Hello, my name is Peter. I joined Quantlane as a software developer and worked on its infrastructure and simple strategies. Later I moved to the Quantitative Research team, where I continued work on strategies, although this time a bit more complex. And lately, I moved again, this time to the Data Team, where my colleagues and I are building a data empire to support all of Quantlane's needs.

In my free time, I like archery and attending concerts.

You can email me at peter.babics@quantlane.com.