Caching Database Queries in SQLAlchemy – Part 1/2

The database is one of the most critical components here at Rollbar, and its performance ripples across most of our SLOs. One of our goals in recent months has been to remove as much unnecessary or repetitive load from it as possible.
Caching queries is the 101 solution whenever you need to keep scaling up while maintaining, or even reducing, database costs (very relevant these days).

Here’s an example of how just one query has been drastically reduced thanks to caching.

[Figure: Caching results]

This is the first entry of a two-part blog post explaining how we cache database ORM queries inside Rollbar using version 1.3 of Python's SQLAlchemy library. The version matters, because the mechanism for intercepting SQLAlchemy queries changed between versions 1.3 and 1.4.

In this first blog post we will talk about:

  1. Caching challenges at Rollbar
  2. Caching strategy
  3. Serialization format
  4. Caching with SQLAlchemy
    1. Intercepting SQLAlchemy queries
    2. Passing options to queries with Mapper Options

Part two, in a following blog post, will describe how we use the caching strategies discussed in this first part within Rollbar's systems.

Caching challenges at Rollbar

Caching is a hard problem. One of the things you must consider is cache invalidation. Not invalidation as in:

“I’m ok with eventual consistency”

but as in:

“I have refactored my code and my new model representation is incompatible with the cached values, so I’d better not read stale data from the cache, or my application will keep failing until the TTL expires”

On top of that, caching data can easily introduce serious security and privacy issues. For example, you may end up reading another account’s information as a result of a cache key collision.
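Both risks can be reduced through the way cache keys are built. The following is a minimal sketch, not Rollbar's actual key scheme: the function name `build_cache_key`, the `SCHEMA_VERSION` constant and the key layout are all assumptions made for illustration. Putting a version in the key means a refactored model never reads values written by older code, and putting the account in the key keeps two accounts from sharing an entry.

```python
import hashlib

# Hypothetical constant: bump it whenever the cached model layout changes,
# so values written by older code are never read back.
SCHEMA_VERSION = 3


def build_cache_key(account_id, query_fingerprint, schema_version=SCHEMA_VERSION):
    """Build a key that is unique per schema version, account and query."""
    digest = hashlib.sha256(query_fingerprint.encode("utf-8")).hexdigest()
    return "v{}:account:{}:{}".format(schema_version, account_id, digest)


sql = "SELECT * FROM users WHERE id = 123"
key_a = build_cache_key(1, sql)
key_b = build_cache_key(2, sql)
key_old = build_cache_key(1, sql, schema_version=2)

# Same query, different account or schema version: three distinct keys.
assert len({key_a, key_b, key_old}) == 3
```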

Caching SQLAlchemy queries is something of a niche problem. There isn’t much information about it and, depending on your use case, you might even end up finding a bug in the library, just as we did.

Finally, the main reason we bother caching inside SQLAlchemy, rather than in a layer on top of our models, is that some of our consumer code depends directly on the model layer. On top of that, this consumer code is quite old and cumbersome to modify at the moment.

Caching Strategy

There exist several caching strategies to pick from. In our case and for simplicity, we used:

  • Cache Aside (Lazy Loading), when reading. It means:

    1. Before reading from the database, check if the data exists in the cache
    2. If the data does exist, use it, otherwise query the database
    3. Update the cache with the database result before returning it to the caller
  • Write Around, when writing. It means:

    1. Write queries always update the database
    2. But affected models are invalidated to minimize stale reads
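The two strategies can be sketched without any database at all. This is a toy illustration, assuming plain dicts stand in for both the database and the cache; the `UserRepository` class and its method names are hypothetical and do not appear in the tutorial code.

```python
class UserRepository:
    """Toy repository: `database` and `cache` are plain dicts standing in
    for the real database and cache service."""

    def __init__(self, database, cache):
        self.database = database
        self.cache = cache
        self.database_reads = 0

    def read(self, user_id):
        # Cache aside: check the cache first...
        cached = self.cache.get(user_id)
        if cached is not None:
            return cached
        # ...otherwise query the database and fill the cache.
        self.database_reads += 1
        value = self.database[user_id]
        self.cache[user_id] = value
        return value

    def write(self, user_id, value):
        # Write around: the write goes to the database only...
        self.database[user_id] = value
        # ...and the cached entry is invalidated, not updated.
        self.cache.pop(user_id, None)


repo = UserRepository(database={1: "John"}, cache={})
repo.read(1)                 # miss: reads the database, fills the cache
repo.read(1)                 # hit: served from the cache
repo.write(1, "John Wick")   # invalidates the cached entry
assert repo.read(1) == "John Wick"
assert repo.database_reads == 2
```

The stale value is never served after the write because the entry is dropped rather than left in place; the next read pays one extra database round trip to refill it.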

Serialization Format

For the serialization format we decided to use Python’s Pickle for a few reasons:

  1. SQLAlchemy's official tutorial uses it
  2. The serialized payload not only contains the model data, but also metadata such as how to lazy load relationships (more on this in the second part)
  3. In our case it was more performant than Python's JSON serialization

Still, Pickle serialization is not secure and is subject to remote code execution (RCE) attacks. Even in our case, where we trust our Memcached instances, it is better to keep that trust to a minimum, so we additionally use a message authentication code (MAC), which is beyond the scope of this tutorial.
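Although the MAC itself is out of scope, the general idea can be sketched with the standard library. This is an illustration only and not Rollbar's implementation: the helper names `dumps_signed` and `loads_signed`, the choice of HMAC-SHA256, the tag-then-payload layout and the hard-coded `SECRET_KEY` are all assumptions.

```python
import hashlib
import hmac
import pickle

# Hypothetical key for illustration; a real one would come from a secret store.
SECRET_KEY = b"replace-with-a-real-secret"
DIGEST_SIZE = hashlib.sha256().digest_size  # 32 bytes


def dumps_signed(obj, key=SECRET_KEY):
    """Pickle `obj` and prepend an HMAC-SHA256 tag of the payload."""
    payload = pickle.dumps(obj)
    tag = hmac.new(key, payload, hashlib.sha256).digest()
    return tag + payload


def loads_signed(raw, key=SECRET_KEY):
    """Verify the tag and only then unpickle the payload."""
    tag, payload = raw[:DIGEST_SIZE], raw[DIGEST_SIZE:]
    expected = hmac.new(key, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):
        raise ValueError("cache payload failed MAC verification")
    return pickle.loads(payload)


raw = dumps_signed({"id": 1, "name": "John Wick"})
assert loads_signed(raw) == {"id": 1, "name": "John Wick"}

# Flip one bit of the payload: verification fails before pickle.loads() runs.
tampered = raw[:-1] + bytes([raw[-1] ^ 0x01])
try:
    loads_signed(tampered)
except ValueError:
    pass
else:
    raise AssertionError("tampered payload was accepted")
```

The important property is the order of operations: the tag is checked with a constant-time comparison first, so bytes that were not produced by a holder of the key never reach the unpickler.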

Caching with SQLAlchemy

You can find the entire tutorial code on GitHub.

For each step we write a test case that validates the behavior we are looking for.

Finally, we work with two basic models: User and Address, with a one-to-many relationship between them.

Intercepting SQLAlchemy queries

First let’s find out how we can intercept and execute our caching logic when performing queries such as:

  • Queries by identity/id: session.query(User).get(123)
  • Queries that use filtering: session.query(User).filter(User.is_active==True).all()

In order to intercept those queries in SQLAlchemy v1.3 we need to implement a custom Query class. We begin by inheriting from the default query class:

 
from sqlalchemy import orm

class CachingQuery(orm.Query):
    pass

Then we tell SQLAlchemy to use that class instead of the default one when calling the session.query() factory. We do this by passing the query_cls parameter when creating a Session instance:

 
from sqlalchemy.orm import sessionmaker
from caching_tutorial.cache import CachingQuery

Session = sessionmaker(query_cls=CachingQuery)

With that in place we can write an initial test case that verifies that SQLAlchemy is indeed using our CachingQuery factory:

 
from caching_tutorial.cache import CachingQuery
from caching_tutorial.models import User

def test_query_instance_is_caching_query(session):
    query = session.query(User)
    assert isinstance(query, CachingQuery)

And indeed the test passes:

 ❯ pytest -v
========================================================================= test session starts ==========================================================================
collected 1 items

tests/caching_query_test.py::test_query_instance_is_caching_query PASSED                                                                                         [ 100%]

========================================================================== 1 passed in 0.03s ===========================================================================

Reading from an in-memory cache

We can now safely intercept any query and read from an in-memory cache. All we need to do is override the Query.__iter__() method (even Query.get() is routed through Query.__iter__()).

Let’s update our CachingQuery like this:

 
class CachingQuery(orm.Query):
    def __init__(self, *args, cache=None, **kwargs):
        super().__init__(*args, **kwargs)
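        # Compare against None: `cache or {}` would silently discard an
        # empty dict passed in by the caller
        self.cache = cache if cache is not None else {}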

    def __iter__(self):
        # Cache-aside caching pattern
        cache_key = self.cache_key()
        result = self.cache.get(cache_key)

        if result is None:
            result = list(super().__iter__())
            self.cache[cache_key] = result

        return iter(result)

    def cache_key(self):
        "Use the query's SQL statement and parameters as the cache key"
        stmt = self.with_labels().statement
        compiled = stmt.compile()
        params = compiled.params
        cache_key = " ".join([str(compiled)] + [str(params[k]) for k in sorted(params)])

        return cache_key

And we can validate our implementation by writing a new test:

 
def test_query_results_from_cache(session):
    query = session.query(User)
    cache_key = query.cache_key()

    cached_user = User(id=1, name="John Wick")
    query.cache[cache_key] = [cached_user]

    user = query.one()

    assert user.id == 1
    assert user.name == "John Wick"

 ❯ pytest -v
========================================================================= test session starts ==========================================================================
collected 2 items

tests/caching_query_test.py::test_query_instance_is_caching_query PASSED                                                                                         [ 50%]
tests/caching_query_test.py::test_query_results_from_cache PASSED                                                                                                [ 100%]

========================================================================== 2 passed in 0.03s ===========================================================================
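One caveat about the cache_key() method above: it returns the full SQL text joined with spaces, which is fine for a Python dict but not for Memcached, whose keys are limited to 250 bytes and may not contain whitespace or control characters. A common workaround is to hash the text. The sketch below is an assumption about how that could look, not part of the tutorial code; `hashed_cache_key` and its `prefix` parameter are hypothetical, and it takes the statement and parameters as plain values so that it runs without SQLAlchemy.

```python
import hashlib


def hashed_cache_key(statement, params, prefix="sqla"):
    """Digest the SQL text plus its parameters, sorted by name."""
    parts = [statement] + ["{}={!r}".format(k, params[k]) for k in sorted(params)]
    digest = hashlib.sha256("\n".join(parts).encode("utf-8")).hexdigest()
    return "{}:{}".format(prefix, digest)


sql = "SELECT users.id, users.name FROM users WHERE users.id = :id_1"
key_1 = hashed_cache_key(sql, {"id_1": 1})
key_2 = hashed_cache_key(sql, {"id_1": 2})

assert key_1 != key_2                                # parameters change the key
assert key_1 == hashed_cache_key(sql, {"id_1": 1})   # same input, same key
assert " " not in key_1
assert len(key_1) == len("sqla:") + 64               # fixed, short length
```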

It seems that we got it all working, but our in-memory cache shouldn’t store Python objects. It should store bytes, since that is what happens when using an external service such as Memcached or Redis. Let’s fix that by introducing pickling.

Update CachingQuery.__iter__():

 
import pickle

class CachingQuery(orm.Query):
    def __init__(self, *args, cache=None, **kwargs):
        …

    def __iter__(self):
        cache_key = self.cache_key()
        raw_data = self.cache.get(cache_key)

        if raw_data is None:
            result = list(super().__iter__())
            self.cache[cache_key] = pickle.dumps(result)
        else:
            result = pickle.loads(raw_data)

        return iter(result)

    def cache_key(self):
        …
 

Update our test:

 
def test_query_results_from_cache(session):
    query = session.query(User)
    cache_key = query.cache_key()

    cached_user = User(id=1, name="John Wick")
    query.cache[cache_key] = pickle.dumps([cached_user])

    user = query.one()

    assert user.id == 1
    assert user.name == "John Wick"
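The difference between caching objects and caching bytes can be seen without SQLAlchemy. In this small sketch a plain dict stands in for a model instance (an assumption made only to keep the example self-contained): an object cache hands back the very same instance on every read, while a bytes cache rebuilds a fresh copy each time, as an external cache service would.

```python
import pickle

user = {"id": 1, "name": "John Wick"}  # stand-in for a model instance

object_cache = {"user:1": user}
bytes_cache = {"user:1": pickle.dumps(user)}

# An object cache returns the same instance, so a mutation made by one
# caller leaks into what the next reader sees.
object_cache["user:1"]["name"] = "changed"
assert object_cache["user:1"]["name"] == "changed"

# A bytes cache rebuilds a new object on every read.
first = pickle.loads(bytes_cache["user:1"])
first["name"] = "changed"
second = pickle.loads(bytes_cache["user:1"])
assert second["name"] == "John Wick"
assert first is not second
```

Testing against bytes therefore exercises the same copy semantics the production cache will have, which is what surfaces the detached-instance problem discussed next.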
 

The issue with detached instances

There’s a corner case we haven’t covered: relationships. When reading back our model instances from the cache, those instances should be bound to the current session in order to be able to further lazy-load relationships from the database.

Whenever we read models from the database using the ORM, the instances are given back in the persistent state. But if we pickle and then unpickle those instances, their state transitions to detached.

Let’s create a new test case covering this:

 
import pickle
import sqlalchemy as sa

def test_db_instance_state_after_pickling(session):
    session.add(User(id=1, name="John Wick"))
    session.commit()

    user = session.query(User).one()

    assert sa.inspect(user).persistent

    raw_data = pickle.dumps(user)
    cached_user = pickle.loads(raw_data)

    assert sa.inspect(cached_user).detached
 
 ❯ pytest -v
========================================================================= test session starts ==========================================================================
collected 3 items

tests/caching_query_test.py::test_query_instance_is_caching_query PASSED                                                                                         [ 33%]
tests/caching_query_test.py::test_db_instance_state_after_pickling PASSED                                                                                        [ 66%]
tests/caching_query_test.py::test_query_results_from_cache PASSED                                                                                                [ 100%]

========================================================================== 3 passed in 0.03s ===========================================================================

With this information we’re ready to update our test_query_results_from_cache case to assert that the user has no associated addresses. First, though, we make sure that the instance we store in the cache as a mock replicates the real behavior, that is, it is stored in the detached state:

 
from sqlalchemy.orm import make_transient_to_detached

def test_query_results_from_cache(session):
    query = session.query(User)

    cached_user = User(id=1, name="John Wick")
    make_transient_to_detached(cached_user)
    cache_key = query.cache_key()
    query.cache[cache_key] = pickle.dumps([cached_user])

    user = query.one()

    assert user.id == 1
    assert user.name == "John Wick"
    assert len(user.addresses) == 0
 

This time our test fails! The error states that a detached instance cannot be used to query the database, since instances in the detached state are not bound to a session.

 
self._invoke_raise_load(state, passive, "raise")

        session = _state_session(state)
        if not session:
            if passive & attributes.NO_RAISE:
                return attributes.PASSIVE_NO_RESULT

>           raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "lazy load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )
E           sqlalchemy.orm.exc.DetachedInstanceError: Parent instance User at 0x1047f4a90 is not bound to a Session; lazy load operation of attribute 'addresses' cannot proceed (Background on this error at: http://sqlalche.me/e/13/bhk3)

In order to solve this problem we need to merge the instances read from the cache into the session associated with the query, using Query.merge_result(iterator, load=False). Note the load=False part: this is the bit that lets us tell SQLAlchemy to trust our data and not perform any query against the database.

Updating our CachingQuery once more solves the issue:

 
class CachingQuery(orm.Query):
    def __init__(self, *args, cache=None, **kwargs):
        …

    def __iter__(self):
        cache_key = self.cache_key()
        raw_data = self.cache.get(cache_key)

        if raw_data is None:
            result = list(super().__iter__())
            self.cache[cache_key] = pickle.dumps(result)
        else:
            result = pickle.loads(raw_data)

        return self.merge_result(result, load=False)

    def cache_key(self):
        …
 
 ❯ pytest -v
========================================================================= test session starts ==========================================================================
collected 3 items

tests/caching_query_test.py::test_query_instance_is_caching_query PASSED                                                                                         [ 33%]
tests/caching_query_test.py::test_db_instance_state_after_pickling PASSED                                                                                        [ 66%]
tests/caching_query_test.py::test_query_results_from_cache PASSED                                                                                                [ 100%]

========================================================================== 3 passed in 0.03s ===========================================================================

Writing and then reading from an in-memory cache

Let’s end this section by writing another test case for the entire cache-miss/cache-hit flow:

 
def test_query_results_into_and_from_cache(session):
    # Let's populate the database with some data
    user = User(id=1, name="John Wick")
    session.add(user)
    session.commit()

    query = session.query(User)
    cache_key = query.cache_key()

    # The cache is empty
    assert cache_key not in query.cache

    user = query.one()

    # The cache is filled with the query results
    assert cache_key in query.cache

    # Let's validate that the query doesn't hit the database
    # if the cache is filled
    query.error_on_cache_miss = True
    user = query.one()

    assert user.id == 1
    assert user.name == "John Wick"
    assert len(user.addresses) == 0
 

Notice that we also introduced an error_on_cache_miss property in CachingQuery in order to easily test if we are hitting the database when we don’t expect it:

 
class CachingQuery(orm.Query):
    def __init__(self, *args, cache=None, **kwargs):
        …
        self.error_on_cache_miss = False

    def __iter__(self):
        …
        if raw_data is None:
            if self.error_on_cache_miss:
                raise CacheMissError("Cache miss for key: {}".format(cache_key))
        …

    def cache_key(self):
        …
 

And it works as expected:

 ❯ pytest -v
========================================================================= test session starts ==========================================================================
collected 4 items

tests/caching_query_test.py::test_query_instance_is_caching_query PASSED                                                                                         [ 25%]
tests/caching_query_test.py::test_db_instance_state_after_pickling PASSED                                                                                        [ 50%]
tests/caching_query_test.py::test_query_results_from_cache PASSED                                                                                                [ 75%]
tests/caching_query_test.py::test_query_results_into_and_from_cache PASSED                                                                                       [ 100%]

========================================================================== 4 passed in 0.03s ===========================================================================
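The error_on_cache_miss flag follows a simple pattern that can be shown in isolation. The snippets above use CacheMissError without showing its definition, so the sketch below assumes it is a plain Exception subclass; the `read_through` helper is likewise hypothetical and only mirrors the control flow of CachingQuery.__iter__().

```python
class CacheMissError(Exception):
    """Assumed definition: raised when a cache miss is not allowed."""


def read_through(cache, cache_key, load_from_database, error_on_cache_miss=False):
    """Return the cached value, or load and store it on a miss."""
    value = cache.get(cache_key)
    if value is None:
        if error_on_cache_miss:
            raise CacheMissError("Cache miss for key: {}".format(cache_key))
        value = load_from_database()
        cache[cache_key] = value
    return value


cache = {}
assert read_through(cache, "user:1", lambda: "John Wick") == "John Wick"

# With the flag set, a hit is served normally and never calls the loader...
assert read_through(cache, "user:1", None, error_on_cache_miss=True) == "John Wick"

# ...while a miss raises instead of silently querying the database.
try:
    read_through(cache, "user:2", lambda: "unused", error_on_cache_miss=True)
except CacheMissError as exc:
    assert "user:2" in str(exc)
else:
    raise AssertionError("expected CacheMissError")
```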

Passing options to queries with Mapper Options

Source tree for this section.

Up to this point everything works fine, but there’s a missing piece in our puzzle: our CachingQuery is tightly coupled to the caching client/store. Also, the cache key is automatically derived from the query, but it would be convenient to be able to specify one manually as well. The way to inject configuration options into a given query is to call the Query.options() method with a custom Mapper Option instance.

Let’s first refactor our CachingQuery and move all the caching logic to a separate InMemoryCacheStrategy:

 
from sqlalchemy.orm import Query
from .cache_strategies import InMemoryCacheStrategy

class CachingQuery(Query):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.caching_strategy = InMemoryCacheStrategy()

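    def __iter__(self):
        # Pass the parent-class proxy rather than `self`, so the strategy
        # can reach the original, uncached Query.__iter__()
        return self.caching_strategy.exec(super())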

The new caching strategy looks just like the code we had before, only this time it’s parameterized with a query argument:

 
class InMemoryCacheStrategy:
    def __init__(self):
        self.cache = {}
        self.error_on_cache_miss = False

    def exec(self, query):
        cache_key = self.cache_key(query)
        raw_data = self.cache.get(cache_key)

        …

        return query.merge_result(result, load=False)

    def cache_key(self, query):
        "Use the query's SQL statement and parameters as the cache key"
        stmt = query.with_labels().statement
        …

After updating the tests to match these changes, we can confirm that they still pass.

This refactoring solves the coupling problem and introduces the ability to change the caching strategy using a Mapper Option. For demonstration, let’s create a new strategy that allows us to provide the cache store explicitly and specify a cache key.

 
class CacheStoreStrategy:
    def __init__(self, cache, cache_key):
        self.cache = cache
        self.error_on_cache_miss = False
        self._cache_key = cache_key

    def exec(self, query):
        cache_key = self.cache_key(query)
        … # same as in InMemoryCacheStrategy

    def cache_key(self, query):
        return self._cache_key

We also create a Mapper Option that takes care of swapping a query’s caching strategy for another one that we supply:

 
from sqlalchemy.orm.interfaces import MapperOption

class with_caching_strategy(MapperOption):
    def __init__(self, strategy):
        self.strategy = strategy

    def process_query(self, query):
        query.caching_strategy = self.strategy

Finally, let’s write a test case that uses our new strategy:

 
def test_query_with_caching_store_strategy(session):
    # Let's populate the database with some data
    user = User(id=1, name="John Wick")
    session.add(user)

    cache = {}
    cache_key = "user:1"
    strategy = CacheStoreStrategy(cache, cache_key)
    query = session.query(User).options(with_caching_strategy(strategy))

    # The cache is empty
    assert cache_key not in cache

    user = query.one()

    # The cache is filled with the query results
    assert cache_key in cache

    # Let's validate that the query doesn't hit the database
    # if the cache is filled
    strategy.error_on_cache_miss = True
    user = query.one()

    assert user.id == 1
    assert user.name == "John Wick"
    assert len(user.addresses) == 0

 ❯ pytest -v
========================================================================= test session starts ==========================================================================
collected 5 items

tests/caching_query_test.py::test_query_instance_is_caching_query PASSED                                                                                         [ 20%]
tests/caching_query_test.py::test_db_instance_state_after_pickling PASSED                                                                                        [ 40%]
tests/caching_query_test.py::test_query_results_from_cache PASSED                                                                                                [ 60%]
tests/caching_query_test.py::test_query_results_into_and_from_cache PASSED                                                                                       [ 80%]
tests/caching_query_test.py::test_query_with_caching_store_strategy PASSED                                                                                       [100%]

========================================================================== 5 passed in 0.04s ===========================================================================
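As a recap, the whole option-swaps-strategy mechanism can be seen in miniature without SQLAlchemy. Everything in this sketch is a stand-in written for illustration: `MiniQuery` only imitates the shape of CachingQuery, the option class drops the MapperOption base class, and the strategies receive a loader callable instead of a query.

```python
class NoCacheStrategy:
    def exec(self, load):
        return load()


class DictCacheStrategy:
    def __init__(self, cache, cache_key):
        self.cache = cache
        self.cache_key = cache_key

    def exec(self, load):
        if self.cache_key not in self.cache:
            self.cache[self.cache_key] = load()
        return self.cache[self.cache_key]


class with_caching_strategy:
    """Same shape as the Mapper Option, minus the SQLAlchemy base class."""

    def __init__(self, strategy):
        self.strategy = strategy

    def process_query(self, query):
        query.caching_strategy = self.strategy


class MiniQuery:
    """Library-free stand-in for CachingQuery."""

    def __init__(self, rows):
        self.rows = rows
        self.loads = 0
        self.caching_strategy = NoCacheStrategy()

    def options(self, option):
        option.process_query(self)  # the option replaces the strategy
        return self

    def _load(self):
        self.loads += 1
        return list(self.rows)

    def all(self):
        return self.caching_strategy.exec(self._load)


cache = {}
query = MiniQuery(["John Wick"]).options(
    with_caching_strategy(DictCacheStrategy(cache, "user:1"))
)
assert query.all() == ["John Wick"]
assert query.all() == ["John Wick"]
assert query.loads == 1  # the second call was served from the cache
```

The query object never learns which store is behind the strategy, which is the decoupling the refactoring was after.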

In the second part of this blog post we will review how we use the caching strategies presented here within Rollbar's systems.
