Company logo

Quantlane tech blog

Writing ETL pipelines using asynchronous generators

Every modern data-driven company must somehow process its data, whether it is using an ELT (extract-load-transform) approach or an ETL (extract-transform-load) approach.

ELT pipelines first extract the data from the source, load them to the data storage and then use storage tools to transform it. For example, if we use a database as data storage, we could use standard SQL to do the transformation. On the other hand ETL pipelines, extract the data from the source, transform it and only then load them to the data storage.

In Quantlane we usually use the ETL approach, as we need to join data from multiple systems or apply some complex conditions during the transformation.

In this article, I would like to present our approach to writing ETL pipelines using asynchronous generators in python.

If you are not acquainted with asynchronous generators or generators at all, you can read about them in a previous article.

Writing asynchronous ETL pipelines

Put simply, our ETL pipelines are a composition of asynchronous generators that transform the data in simple steps.

Each of these steps usually has only one purpose...

E.g. Do you need to change the structure of the data, e.g. extract only necessary fields?

> Put it in a separate function (step).

Do you need to map the field to some other data (e.g. translating words, names, ...)?

> Put it in a separate function (step).

Do you need to group the data by some field?

> You guessed it, ... separate function (step).

Now that you know the basics, let's make an ETL pipeline. Let's say we have stored trades from the NASDAQ stock exchange in a database, each trade has a ticker (identifier of security, i.e TSLA (Tesla), AAPL (Apple), MSFT (Microsoft)), a time (when the trade happened), a quantity (number of shares exchanged between two parties), and a price (negotiated price at which the shares were exchanged).

From this data, we would like to create daily trade candles (also known as OHLC - open-high-low-close) aggregates. In short trade candles are simple aggregates containing the price of the first trade, last trade, and the trade with the highest and trade with the lowest price.

So we first need to acquire the data from the database:

async def acquire_data(credentials: dict[str, Any]) -> AsyncIterator[dict[str, Any]]:
    connection = await asyncpg.connect(**credentials)
    async with connection.transaction():
        async for row in connection.cursor(
            '''
            SELECT
                ticker,
                time,
                quantity,
                price
            FROM "trades"
            WHERE
                time BETWEEN $1 AND $2
            ORDER BY time ASC
            ''',
            datetime.datetime(2021, 12, 1),
            datetime.datetime(2021, 12, 31),
        ):
            yield row

So here we have an asynchronous generator that connects to the database, queries it, and yields the data, by using the cursor we "iterate" over the data on the database side. If we wanted, we could implement similar behavior using LIMIT and OFFSET in databases that support it.

In this example, I have used asyncpg database clients which is an asynchronous database client for PostgreSQL.

Now that we have data we need to group it by date and ticker:

Trade: TypeAlias = dict[str, Any]

async def group_by_date_and_ticker(
    stream: AsyncIterator[Trade],
) -> AsyncIterator[list[Trade]]:
    last_observed_date: Optional[datetime.date] = None
    # Data belonging to same date
    # Key here is ticker and value is a list of trades for same date and ticker
    # We use defaultdict so that if key is missing it is added with an empty list value
    grouped_data: dict[str, list[Trade]]= collections.defaultdict(list)

    async for trade in stream:
        # Extract date on which this trade happened
        date = trade['time'].date()
        # In case we have detected a change of dates
        # We yield grouped data that we've stored for the previous date
        if date != last_observed_date:
            for ticker_data in grouped_data.values():
                yield ticker_data
            grouped_data.clear()

        grouped_data[trade['ticker']].append(trade)
        last_observed_date = date

    # After we've iterated over all of the data from input (stream)
    # we need to yield the rest of the grouped data.
    # At this point grouped data should contain data from the last available date
    for ticker_data in grouped_data.values():
        yield ticker_data

In this function we iterate over the rows (trades) and group them by date and ticker; we can do this because we are processing data (trades) ordered in time (see ORDER BY clause in acquire_data query).

If the data would not be ordered we would have to iterate over all the data and yield grouped data at the end. No yielding would be done in async for block. This is because we would not know whether we have received the last trade of the day or if there are some more trades.

Next, we need to aggregate the list of trades into trade candles:

# Thats how the output of the aggregation should look like
class TradeCandle(TypedDict):
    date: datetime.date
    ticker: str
    open: float
    high: float
    low: float
    close: float
    volume: int

async def aggregate_into_trade_candles(
    # Mind the use of "Trade" TypeAlias from the previous code block
    stream: AsyncIterator[list[Trade]]
) -> AsyncIterator[TradeCandle]:
    async for trades in stream:
        # If we receive empty list, skip it
        if not trades:
            continue
        # We compute volume, high and low in a loop so that we iterate
        # over the data only once.
        # We could use something like:
        #   high_price = max(trades, key = operator.itemgetter('price'))
        #   low_price = min(trades, key = operator.itemgetter('price'))
        #   volume = sum(trades, key = operator.itemgetter('quantity'))
        # But that would iterate over the data three times
        # Note that `operator.itemgetter(key)` is same as `lambda trade: trade[key]`
        high_price: Optional[float] = None
        low_price: Optional[float] = None
        volume: int = 0
        for trade in trades:
            if low_price is None or low_price > trade['price']:
                low_price = trade['price']

            if high_price is None or high_price < trade['price']:
                high_price = trade['price']
            volume += trade['quantity']

        yield TradeCandle(
            # Trades still have exact time during the day, we need to extract the date part
            date = trades[0]['time'].date(),
            ticker = trades[0]['ticker'],
            # Price of the first trade
            open = trades[0]['price'],
            high = high_price,
            low = low_price,
            # Price of the last trade
            close = trades[-1]['price']
        )

And now once we have trade candle data, we could either print it out, store it back into the database, send it to a messaging broker, or something else. But for the sake of simplicity, let's just print it out using:

import pprint

async def print_output(stream: AsyncIterator[TradeCandle]) -> None:
    async for trade_candle in stream:
        pprint.pprint(trade_candle)

That is the entire pipeline, now we only have to run it, and what could be simpler:

import asyncio

if __name__ == '__main__':
    asyncio.run(
        print_output(
            aggregate_into_trade_candles(
                group_by_date_and_ticker(
                    acquire_data(
                        credentials = {
                            'host': 'your-database-host',
                            'port': 5432,
                        }
                    )
                )
            )
        )
    )

You can find the entire example code here.

And that is it. That is how we write ETL pipelines using asynchronous generators. Although we have a bit of tooling around, that helps us with logging errors, measuring the throughput of the steps or generic steps that simplifies data publishing to a database, messaging and other outputs. This tooling also allows us to write the pipeline in a more "natural way", like this:

pipeline = (
    acquire_data(...)
    >> group_by_date_and_ticker
    >> aggregate_into_trade_candles
    >> print_output
)
Quantlane Written by Peter Babics on April 21, 2022. We have more articles like this on our blog. If you want to be notified when we publish something about Python or finance, sign up for our newsletter:
Peter Babics

Hello, my name is Peter. I joined Quantlane as a software developer and worked on its infrastructure and simple strategies. Later I moved to the Quantitative Research team, where I continued work on strategies, although this time a bit more complex. And lately, I moved again, this time to the Data Team, where my colleagues and I are building a data empire to support all of Quantlane's needs.

In my free time, I like archery and attending concerts.

You can email me at peter.babics@quantlane.com.