Company logo

Quantlane tech blog

Design your app using the pub-sub pattern with aiopubsub

In the design of our applications, we have tried to decouple all the parts as much as possible. This makes sense to us because when components do not depend on each other, we can easily substitute or change them. The development process is then much faster if we can make changes to isolated parts of the code.

Ok, decoupling sounds like a good idea, but how do we achieve that? In the messaging world (which is quite close to us) there is the publish-subscribe pattern. It separates producers and consumers of messages and inserts a simple interface between them. A producer publishes a message with a certain key and does care about what happens to it afterward. On the other side, a consumer subscribes to a given key (or a set of them), receives messages corresponding with the key, and does not need to know a thing about who produced it. If you want to get a more hands-on explanation, you can take a look at RabbitMQ (which we use as our message broker) tutorials on publish/subscribe and topics.

The publish-subscribe pattern is a simple, but a mighty pattern that helps us design our system. We liked the idea so much that we created a simple library named aiopubsub that is going to be introduced now. From the prefix aio you can guess that it is based on asyncio. This is because most of our applications run on asyncio and it pairs perfectly with the publish-subscribe pattern. Let's see it in action.

Basic example

First, we need to install it:

$ pip install aiopubsub

To use the library, we need to initialize a aiopubsub.Hub, which will provide the basic infrastructure needed for aiopubsub:

import aiopubsub

hub = aiopubsub.Hub()

Data

Now, we should agree on what data we want to send through aiopubsub. We are a trading company, so one of the most fundamental structures in our domain is a trade. A trade is a record of a transaction that took place on an exchange: one subject sold a certain quantity of shares, at a given price, at a specific time, to another subject. It can look like this:

import dataclasses
import decimal

@dataclasses.dataclass
class Trade:
    timestamp: float
    quantity: int
    price: decimal.Decimal

It is better to send structured data, and not just some dicts, because then the data have some schema and we can easily use type checking and we know exactly what the data should look like. Another thing worth mentioning is that we use decimal for a price to avoid loss of precision (see What Every Programmer Should Know About Floating-Point Arithmetic for more details). We can then instantiate the trade with some dummy values in the following way:

trade = Trade(timestamp = 123.5, quantity = 56, price = decimal.Decimal('1639.43'))

Key

Another important thing we need to know about is aiopubsub.Key. The key is just a regular tuple that describes the accompanying message with one important feature; we can use an asterisk to describe a set of keys. As an example both of:

key = aiopubsub.Key('com', 'quantlane')
key = aiopubsub.Key('com', 'google')

are a subset of:

key = aiopubsub.Key('com', '*')

Subscriber

Now, to be able to receive the data, we usually use callback style; we provide a function that should be called when we obtain a message with a certain key:

1
2
3
4
5
6
async def on_trade(key: aiopubsub.Key, trade: Trade) -> None:
    print(f'Processing trade = {trade}  with key = {key}.')

subscriber = aiopubsub.Subscriber(hub, 'trades')
subscribe_key = aiopubsub.Key('*', 'trade', '*')
subscriber.add_async_listener(subscribe_key, on_trade)

Let's analyze this piece of code:

  1. Define a function that is called when a new message arrives. Here it just pretends that it processes the trade.
  1. Create a new subscriber that will enable you to obtain the data. Its arguments are the hub and identifier 'trade' used only for logging purposes.
  2. Define a key to subscribe to. We don't use only exact keys like aiopubsub.Key('NASDAQ', 'trade', 'GOOGL') but the asterisk notation for here. aiopubsub.Key('*', 'trade', '*') says that we want trades no matter from what publisher and of what stock.
  3. Now put it all together. Add asynchronous listener (because the function is async); this means that the function on_trade will be called when there will be a message with a key that is a subset of our subscribe_key.

Publisher

We have data that we want to send, and we have a subscriber listening to the data, so let's start sending something:

1
2
3
publisher = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NASDAQ'))
publish_key = aiopubsub.Key('trade', 'GOOGL')
publisher.publish(publish_key, trade)

Ok let's explain this line by line:

  1. We instantiate a publisher first. It takes the hub we created earlier and prefix argument. prefix is the first part of a key associated with a message published used to identify the publisher. Here it identifies an exchange where the trade happened (Nasdaq).
  2. We create a key that identifies a message. Here it is a trade made on shares of Alphabet Inc. ('GOOGL').
  3. We publish the trade under this key.

If we run this with asyncio loop running we should see the following output:

Processing trade = Trade(timestamp=123.5, quantity=56, price=Decimal('1639.43'))  with key = ('NASDAQ', 'trade', 'GOOGL').

We see that the message from the publisher reached the listener in the subscriber. Yay! As you can see in the following picture, we have only one publisher that publishes one specific message from one stock exchange. The subscriber then listens to all trade messages.

Sync vs async listener

In the last example, you could see that we used add_async_listener; it also has its counterpart add_sync_listener. There is only a small difference between these two functions: add_async_listeners expects a coroutine callback which is awaited, while add_sync_listener expects a normal function, which is called immediately. Except that one is used for async functions and the other for sync functions it brings another subtle detail: an async function is executed eventually while a sync function is executed immediately and a publisher waits until all of its sync subscribers processed the message. Sync listeners thus weaken a decoupling a bit.

Expanding the example

The previous example may seem quite lengthy in what it does; it only calls on_trade when there is a new trade. We don't need such instrumentation to do that, but the fun starts when we add more publishers and subscribers.

We can add a subscription to the existing subscriber that will read only trades from NYSE:

1
2
3
4
5
async def on_nyse_trade(key: aiopubsub.Key, trade: Trade) -> None:
    print(f'Processing trade = {trade}  with key = {key} that happened in NYSE')

subscribe_key_nyse = aiopubsub.Key('NYSE', 'trade', '*')
subscriber.add_async_listener(subscribe_key, on_nyse_trade)

Now add a publisher that will produce Google Inc. shares from another exchange: New York Stock Exchange (NYSE):

1
2
3
4
trade_nyse = Trade(timestamp = 127.45, quantity = 67, price = decimal.Decimal('1639.44'))

publisher_nyse = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NYSE'))
publisher_nyse.publish(aiopubsub.Key('trade', 'GOOGL'), trade_nyse)

This can be a completely separate code, but we have automatically ensured that all messages will end in our on_trade listener. We don't need to have the function available here; we don't need to know who exactly listens. It's decoupled.

We have a few lines of code added, but now we have an architecture where there are two publishers from two stock exchanges. The function on_trade is called on every trade while on_nyse_trade only on a trade from NYSE. And we can make it more and more complicated as our use-case requires.

We can easily add new stock exchanges, other company shares, or new types of data. Everything is in one system and we know that the message will reach the desired destination.

The following image captures the situation after several additions of new publishers and subscribers. A Subscriber still listens to all trades (from NASDAQ Publisher and NYSE Publisher). NYSE Subscriber listens only to trades from NYSE Publisher.

Summary

We have presented the architecture pattern, publish-subscribe, that is used to decouple producers and consumers in messaging brokers. We utilize it inside our applications by using the package aiopusub because it gives us great flexibility in managing the vast amounts of different data we process. We have presented a simple example of usage that can be easily expanded according to the data we want to process.

We use aiopubsub in our applications that process real-time data, and it allows us to keep concerns of each code apart, separated from others, and helps us in code reuse.

You can see the full code of our example here:

 1 import asyncio
 2 import dataclasses
 3 import decimal
 4 
 5 import aiopubsub
 6 
 7 
 8 @dataclasses.dataclass
 9 class Trade:
10     timestamp: float
11     quantity: int
12     price: decimal.Decimal
13 
14 
15 async def on_trade(key: aiopubsub.Key, trade: Trade) -> None:
16     print(f'Processing trade = {trade}  with key = {key}.')
17 
18 
19 async def on_nyse_trade(key: aiopubsub.Key, trade: Trade) -> None:
20     print(f'Processing trade = {trade}  with key = {key} that happened in NYSE')
21 
22 
23 async def main():
24     # create an aiopubsub hub
25     hub = aiopubsub.Hub()
26 
27     # create a sample of data to send
28     trade = Trade(timestamp = 123.5, quantity = 56, price = decimal.Decimal('1639.43'))
29 
30     # subscriber listens on every trade and calls the `on_trade` function
31     subscriber = aiopubsub.Subscriber(hub, 'trades')
32     subscribe_key = aiopubsub.Key('*', 'trade', '*')
33     subscriber.add_async_listener(subscribe_key, on_trade)
34 
35     # publisher has a NASDAQ prefix and sends the trade that happened on Google stock
36     publisher = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NASDAQ'))
37     publish_key = aiopubsub.Key('trade', 'GOOGL')
38     publisher.publish(publish_key, trade)
39 
40     # sleep so the event loop can process the action
41     await asyncio.sleep(0.001)
42 
43     # expected output:
44     # Processing trade = Trade(timestamp=123.5, quantity=56, price=Decimal('1639.43'))  with key = ('NASDAQ', 'trade', 'GOOGL').
45 
46     # sample from another stock exchange
47     trade_nyse = Trade(timestamp = 127.45, quantity = 67, price = decimal.Decimal('1639.44'))
48 
49     # subscribe only for the NYSE exchange
50     subscribe_key_nyse = aiopubsub.Key('NYSE', 'trade', '*')
51     subscriber.add_async_listener(subscribe_key_nyse, on_nyse_trade)
52 
53     # publish NYSE trade
54     publisher_nyse = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NYSE'))
55     publisher_nyse.publish(aiopubsub.Key('trade', 'GOOGL'), trade_nyse)
56 
57     # sleep so the event loop can process the action
58     await asyncio.sleep(0.001)
59 
60     # expected output:
61     # Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44'))  with key = ('NYSE', 'trade', 'GOOGL').
62     # Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44'))  with key = ('NYSE', 'trade', 'GOOGL') that happened in NYSE
63 
64     # clean the subscriber before the end of the program
65     await subscriber.remove_all_listeners()
66 
67 
68 
69 if __name__ == '__main__':
70     asyncio.run(main())
Quantlane Written by Petr Šebek on October 29, 2020. We have more articles like this on our blog. If you want to be notified when we publish something about Python or finance, sign up for our newsletter:
Petr Šebek

Hey, my name is Petr. I started working at Quantlane as a programmer and later led the development team. I currently focus on big architectural issues in a small specialised team.

My main focus is on messaging systems, performant data ingestion, and everything that needs to be optimized on the backend. I'm interested in complex systems, physics, and arts.