Every modern data-driven company must process its data somehow, whether it uses an ELT (extract-load-transform) approach or an ETL (extract-transform-load) approach.
ELT pipelines first extract the data from the source, load it into the data storage, and only then use the storage's own tools to transform it. For example, if we use a database as the data storage, we could use standard SQL to do the transformation. ETL pipelines, on the other hand, extract the data from the source, transform it, and only then load it into the data storage.
At Quantlane we usually use the ETL approach, as we need to join data from multiple systems or apply complex conditions during the transformation.
In this article, I would like to present our approach to writing ETL pipelines using asynchronous generators in Python.
If you are not acquainted with asynchronous generators or generators at all, you can read about them in a previous article.
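As a one-minute refresher: an asynchronous generator is an async def function that yields values and may await between yields. Here is a minimal made-up example (the count_up_to name and the sleep are purely illustrative):

import asyncio
from typing import AsyncIterator

async def count_up_to(limit: int) -> AsyncIterator[int]:
    # An asynchronous generator: it can await (e.g. wait for I/O) between yields
    for number in range(limit):
        await asyncio.sleep(0.1)
        yield number

async def main() -> None:
    async for number in count_up_to(3):
        print(number)  # prints 0, 1, 2

asyncio.run(main())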
Writing asynchronous ETL pipelines
Put simply, each of our ETL pipelines is a composition of asynchronous generators that transform the data in simple steps.
Each of these steps usually has only one purpose. For example: do you need to change the structure of the data, e.g. extract only the necessary fields?
> Put it in a separate function (step).
Do you need to map a field to some other data (e.g. translating words, names, ...)?
> Put it in a separate function (step).
Do you need to group the data by some field?
You guessed it... a separate function (step).
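For illustration, a hypothetical single-purpose step could look like this; the extract_fields name and the chosen fields are made up:

from typing import Any, AsyncIterator

async def extract_fields(
    stream: AsyncIterator[dict[str, Any]],
) -> AsyncIterator[dict[str, Any]]:
    # One purpose only: keep just the fields the rest of the pipeline needs
    async for record in stream:
        yield {'ticker': record['ticker'], 'price': record['price']}

Each step takes an asynchronous iterator as input and produces another one as output, which is exactly what makes the steps composable.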
Now that you know the basics, let's build an ETL pipeline. Let's say we have trades from the NASDAQ stock exchange stored in a database. Each trade has a ticker (the identifier of a security, e.g. TSLA (Tesla), AAPL (Apple), MSFT (Microsoft)), a time (when the trade happened), a quantity (the number of shares exchanged between the two parties), and a price (the negotiated price at which the shares were exchanged).
From this data, we would like to create daily trade candle (also known as OHLC, open-high-low-close) aggregates. In short, a trade candle is a simple aggregate containing the price of the first trade (open), the price of the last trade (close), and the highest and lowest trade prices (high and low) within a period.
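To make that concrete, here is a tiny made-up example (all the numbers are invented for illustration):

# Four trades of one ticker on one day, in time order
trades = [
    {'ticker': 'TSLA', 'quantity': 10, 'price': 1010.0},  # first trade -> open
    {'ticker': 'TSLA', 'quantity': 5, 'price': 1025.0},   # highest price -> high
    {'ticker': 'TSLA', 'quantity': 20, 'price': 990.0},   # lowest price -> low
    {'ticker': 'TSLA', 'quantity': 8, 'price': 1000.0},   # last trade -> close
]
# The daily candle would be:
# open = 1010.0, high = 1025.0, low = 990.0, close = 1000.0,
# plus the total traded volume: 10 + 5 + 20 + 8 = 43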
So we first need to acquire the data from the database:
import datetime
from typing import Any, AsyncIterator

import asyncpg

async def acquire_data(credentials: dict[str, Any]) -> AsyncIterator[dict[str, Any]]:
    connection = await asyncpg.connect(**credentials)
    async with connection.transaction():
        async for row in connection.cursor(
            '''
            SELECT ticker, time, quantity, price FROM "trades"
            WHERE time BETWEEN $1 AND $2
            ORDER BY time ASC
            ''',
            datetime.datetime(2021, 12, 1),
            datetime.datetime(2021, 12, 31),
        ):
            yield row
Here we have an asynchronous generator that connects to the database, queries it, and yields the rows one by one; thanks to the cursor, we "iterate" over the data on the database side. If we wanted to, we could implement similar behavior using LIMIT and OFFSET in databases that support it (a sketch of that variant follows below).
In this example, I have used asyncpg, which is an asynchronous database client for PostgreSQL.
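For completeness, the LIMIT/OFFSET alternative mentioned above could be sketched like this; the function name, batch size, and pagination details are my illustration (reusing the imports from the previous snippet), not code we actually run:

async def acquire_data_paginated(
    credentials: dict[str, Any],
    batch_size: int = 1000,
) -> AsyncIterator[dict[str, Any]]:
    connection = await asyncpg.connect(**credentials)
    offset = 0
    while True:
        # Fetch one batch at a time instead of keeping a server-side cursor open
        rows = await connection.fetch(
            '''
            SELECT ticker, time, quantity, price FROM "trades"
            WHERE time BETWEEN $1 AND $2
            ORDER BY time ASC
            LIMIT $3 OFFSET $4
            ''',
            datetime.datetime(2021, 12, 1),
            datetime.datetime(2021, 12, 31),
            batch_size,
            offset,
        )
        if not rows:
            break
        for row in rows:
            yield row
        offset += batch_size
    await connection.close()

Keep in mind that large OFFSET values get progressively more expensive, which is one reason to prefer the cursor where it is available.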
Now that we have the data, we need to group it by date and ticker:
import collections
from typing import Optional, TypeAlias

Trade: TypeAlias = dict[str, Any]

async def group_by_date_and_ticker(
    stream: AsyncIterator[Trade],
) -> AsyncIterator[list[Trade]]:
    last_observed_date: Optional[datetime.date] = None
    # Data belonging to the same date.
    # The key is the ticker and the value is a list of trades for the same date and ticker.
    # We use defaultdict so that a missing key is added with an empty list value.
    grouped_data: dict[str, list[Trade]] = collections.defaultdict(list)
    async for trade in stream:
        # Extract the date on which this trade happened
        date = trade['time'].date()
        # When we detect a change of dates,
        # we yield the grouped data we have stored for the previous date
        if date != last_observed_date:
            for ticker_data in grouped_data.values():
                yield ticker_data
            grouped_data.clear()
        grouped_data[trade['ticker']].append(trade)
        last_observed_date = date
    # After we have iterated over all of the input data (stream)
    # we need to yield the rest of the grouped data.
    # At this point grouped_data contains the data from the last available date.
    for ticker_data in grouped_data.values():
        yield ticker_data
In this function we iterate over the rows (trades) and group them by date and ticker; we can do this only because we are processing the trades ordered in time (see the ORDER BY clause in the acquire_data query).
If the data were not ordered, we would have to iterate over all of it and only yield the grouped data at the end; no yielding could happen inside the async for block, because we would never know whether we had already received the last trade of a given day or whether more trades were still to come.
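A sketch of such an unordered variant (hypothetical, reusing Trade and the imports from above); note that it holds all trades in memory until the input stream is exhausted:

async def group_unordered(
    stream: AsyncIterator[Trade],
) -> AsyncIterator[list[Trade]]:
    # The key is a (date, ticker) pair; we cannot yield early because a trade
    # for any date may still arrive later in the stream
    grouped_data: dict[tuple[datetime.date, str], list[Trade]] = collections.defaultdict(list)
    async for trade in stream:
        grouped_data[(trade['time'].date(), trade['ticker'])].append(trade)
    # Only once the stream is exhausted do we know that every group is complete.
    # Each group also has to be sorted, since open/close depend on trade order.
    for ticker_data in grouped_data.values():
        yield sorted(ticker_data, key = lambda trade: trade['time'])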
Next, we need to aggregate the list of trades into trade candles:
from typing import TypedDict

# This is what the output of the aggregation should look like
class TradeCandle(TypedDict):
    date: datetime.date
    ticker: str
    open: float
    high: float
    low: float
    close: float
    volume: int

async def aggregate_into_trade_candles(
    # Mind the use of the "Trade" TypeAlias from the previous code block
    stream: AsyncIterator[list[Trade]],
) -> AsyncIterator[TradeCandle]:
    async for trades in stream:
        # If we receive an empty list, skip it
        if not trades:
            continue
        # We compute volume, high and low in a single loop so that we iterate
        # over the data only once. We could use something like:
        #   high_price = max(trades, key = operator.itemgetter('price'))['price']
        #   low_price = min(trades, key = operator.itemgetter('price'))['price']
        #   volume = sum(trade['quantity'] for trade in trades)
        # but that would iterate over the data three times.
        # Note that `operator.itemgetter(key)` is the same as `lambda trade: trade[key]`.
        high_price: Optional[float] = None
        low_price: Optional[float] = None
        volume: int = 0
        for trade in trades:
            if low_price is None or low_price > trade['price']:
                low_price = trade['price']
            if high_price is None or high_price < trade['price']:
                high_price = trade['price']
            volume += trade['quantity']
        yield TradeCandle(
            # Trades still carry the exact time during the day; we extract just the date part
            date = trades[0]['time'].date(),
            ticker = trades[0]['ticker'],
            # Price of the first trade
            open = trades[0]['price'],
            high = high_price,
            low = low_price,
            # Price of the last trade
            close = trades[-1]['price'],
            volume = volume,
        )
Now that we have trade candle data, we could print it out, store it back into the database, send it to a message broker, or do something else with it. For the sake of simplicity, let's just print it out:
import pprint

async def print_output(stream: AsyncIterator[TradeCandle]) -> None:
    async for trade_candle in stream:
        pprint.pprint(trade_candle)
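As an aside, if we instead wanted to store the candles back into the database, a sink step could look something like this sketch; the trade_candles table and its columns are assumptions made for illustration:

async def store_output(
    stream: AsyncIterator[TradeCandle],
    credentials: dict[str, Any],
) -> None:
    connection = await asyncpg.connect(**credentials)
    async for candle in stream:
        # Inserting candles one by one; batching them would reduce round trips
        await connection.execute(
            '''
            INSERT INTO "trade_candles" (date, ticker, open, high, low, close, volume)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ''',
            candle['date'], candle['ticker'], candle['open'],
            candle['high'], candle['low'], candle['close'], candle['volume'],
        )
    await connection.close()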
That is the entire pipeline; now we only have to run it, and nothing could be simpler:
import asyncio

if __name__ == '__main__':
    asyncio.run(
        print_output(
            aggregate_into_trade_candles(
                group_by_date_and_ticker(
                    acquire_data(
                        credentials = {
                            'host': 'your-database-host',
                            'port': 5432,
                        }
                    )
                )
            )
        )
    )
You can find the entire example code here.
And that is it: that is how we write ETL pipelines using asynchronous generators. We do have a bit of tooling around this approach that helps us with logging errors and measuring the throughput of each step, plus generic steps that simplify publishing data to a database, a message broker, and other outputs. This tooling also allows us to write the pipeline in a more "natural" way, like this:
pipeline = (
    acquire_data(...)
    >> group_by_date_and_ticker
    >> aggregate_into_trade_candles
    >> print_output
)
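Our tooling itself is beyond the scope of this article, but to give you an idea of how the >> syntax can work: one possible (simplified, hypothetical) implementation wraps the stream in a small class that overloads __rshift__:

from typing import Any, Callable

class Pipeline:
    def __init__(self, stream: Any) -> None:
        self.stream = stream

    def __rshift__(self, step: Callable[[Any], Any]) -> 'Pipeline':
        # Feed the current stream into the next step and wrap the result,
        # so that further >> applications keep chaining
        return Pipeline(step(self.stream))

# The last step (print_output) returns a coroutine rather than an iterator,
# so the whole pipeline can then be run with:
#   asyncio.run((Pipeline(acquire_data(credentials)) >> group_by_date_and_ticker
#                >> aggregate_into_trade_candles >> print_output).stream)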