Python Web App ORM Example

In previous article How To Set Up Python Web Application Development Environment, we have tell you how to create a simple hello world web application in python, this article we will focus on how to write python code to manipulate MySQL database with ORM (Object Relational Mapping).

In a web application, all data, including user information, published logs, comments, and so on, is stored in a database. In our python web app example, we chose MySQL as our database. There are many places in python web app that need to access the database. Accessing a database requires creating a database connection, cursor objects, executing SQL statements, and finally handling exceptions and cleaning up resources. If the code that accesses the database is scattered across functions, it is bound to be unmaintainable and not conducive to code reuse.

So, we’ll first wrap the usual SELECT, INSERT, UPDATE, and DELETE operations in a common function, then we can reuse them later.

Normal synchronous io operations cannot be used in our web app because all users are serviced by a single thread and the program must execute very quickly to handle a large number of user requests. Time-consuming io operations cannot be used synchronously in a program, otherwise the system cannot respond to any other user while waiting for an io operation.

Because of above synchronous io issue, our example web app use aiohttp framework which is based on asyncio, and asyncio is an asynchronous model based on collaborative process. This is a principle of asynchronous programming: once you decide to use asynchrony, every layer of the system must be asynchronous.

Fortunately, aiomysql provides asynchronous IO drivers for MySQL databases. Below we will show you how to use aiomysql to connect and operate MySQL database.

1. Create MySQL DB Connection Pool.

We need to create a global connection pool so that each http request can obtain a database connection directly from the db connection pool. The advantage of using connection pooling is that you don’t have to open and close database connections frequently, but reuse them whenever you need.

The connection pool is stored in the global variable __pool and the encoding is set to utf8, and set transaction commit to autocommit by default.

@asyncio.coroutine
def create_pool(loop, **kw):
    logging.info('create database connection pool...')
    global __pool
    __pool = yield from aiomysql.create_pool(
        host=kw.get('host', 'localhost'),
        port=kw.get('port', 3306),
        user=kw['user'],
        password=kw['password'],
        db=kw['db'],
        charset=kw.get('charset', 'utf8'),
        autocommit=kw.get('autocommit', True),
        maxsize=kw.get('maxsize', 10),
        minsize=kw.get('minsize', 1),
        loop=loop
    )

2. DB Table Select Function.

To execute the select statement, we write a select function, passing in the sql statement and sql parameters.

@asyncio.coroutine
def select(sql, args, size=None):
    log(sql, args)
    global __pool
    with (yield from __pool) as conn:
        cur = yield from conn.cursor(aiomysql.DictCursor)
        yield from cur.execute(sql.replace('?', '%s'), args or ())
        if size:
            rs = yield from cur.fetchmany(size)
        else:
            rs = yield from cur.fetchall()
        yield from cur.close()
        logging.info('rows returned: %s' % len(rs))
        return rs

SQL statement’s placeholders is ?, MySQL’s placeholders is %s, and the select() function will automatically translate the two placeholders internally. Be careful to always use SQL with parameters instead of concatenating SQL strings to prevent SQL injection attacks.

Notice that yield from calls a subroutine (that is, calls another program in one program) and returns the result of the subroutine directly. If the size parameter is passed in, the maximum number of records specified is fetched through fetchmany(); otherwise, all records are fetched through fetchall().

3. DB Table Insert, Update, Delete Function.

To execute INSERT, UPDATE, and DELETE statements, you can define a generic execute() function because all three SQL executions require the same parameters and return an integer representing the number of rows affected.

@asyncio.coroutine
def execute(sql, args):
    log(sql)
    with (yield from __pool) as conn:
        try:
            cur = yield from conn.cursor()
            yield from cur.execute(sql.replace('?', '%s'), args)
            affected = cur.rowcount
            yield from cur.close()
        except BaseException as e:
            raise
        return affected

The difference between the execute() function and the select() function is that the cursor object does not return the result set, but instead returns the number of results through rowcount.

4. ORM(Object Relational Mapping).

With the basic select() and execute() functions, we are ready to write a simple ORM. Designing an ORM requires designing from the perspective of the upper caller. Let’s first consider how to define a User object and then associate the database table users with it.

from orm import Model, StringField, IntegerField

class User(Model):
    __table__ = 'users'

    id = IntegerField(primary_key=True)
    name = StringField()

Notice that the __table__, id, and name defined in the User class are class attributes, not instance attributes. Therefore, properties defined at the class level are used to describe the mapping relationship between User objects and tables, while instance properties must be initialized by __init__() method, so the two do not interfere.

# Create User instance:
user = User(id=123, name='Michael')
# Insert user info into db:
user.insert()
# Find all user object:
users = User.findAll()

5. Define Model.

The first thing to define is the base class Model for all ORM mappings.

class Model(dict, metaclass=ModelMetaclass):

    def __init__(self, **kw):
        super(Model, self).__init__(**kw)

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Model' object has no attribute '%s'" % key)

    def __setattr__(self, key, value):
        self[key] = value

    def getValue(self, key):
        return getattr(self, key, None)

    def getValueOrDefault(self, key):
        value = getattr(self, key, None)
        if value is None:
            field = self.__mappings__[key]
            if field.default is not None:
                value = field.default() if callable(field.default) else field.default
                logging.debug('using default value for %s: %s' % (key, str(value)))
                setattr(self, key, value)
        return value

The Model inherits from dict, so it has all dict capabilities and implements the special methods __getattr__() and __setattr__(), so it can be used as if it were referring to a normal field.

>>> user['id']
123
>>> user.id
123

And Field and various Field subclasses:

class Field(object):

    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default

    def __str__(self):
        return '<%s, %s:%s>' % (self.__class__.__name__, self.column_type, self.name)

StringField which map to varchar:

class StringField(Field):

    def __init__(self, name=None, primary_key=False, default=None, ddl='varchar(100)'):
        super().__init__(name, ddl, primary_key, default)

Notice that the Model is just a base class. How do you read the mapping information for a specific subclass, such as User? The answer is through metaclass: ModelMetaclass.

class ModelMetaclass(type):

    def __new__(cls, name, bases, attrs):
        # Exclude the Model class itself:
        if name=='Model':
            return type.__new__(cls, name, bases, attrs)
        # Get table name:
        tableName = attrs.get('__table__', None) or name
        logging.info('found model: %s (table: %s)' % (name, tableName))
        # Get all Field and primary key:
        mappings = dict()
        fields = []
        primaryKey = None
        for k, v in attrs.items():
            if isinstance(v, Field):
                logging.info('  found mapping: %s ==> %s' % (k, v))
                mappings[k] = v
                if v.primary_key:
                    # Find the primary key:
                    if primaryKey:
                        raise RuntimeError('Duplicate primary key for field: %s' % k)
                    primaryKey = k
                else:
                    fields.append(k)
        if not primaryKey:
            raise RuntimeError('Primary key not found.')
        for k in mappings.keys():
            attrs.pop(k)
        escaped_fields = list(map(lambda f: '`%s`' % f, fields))
        attrs['__mappings__'] = mappings # Saves the mapping of attributes and columns
        attrs['__table__'] = tableName
        attrs['__primary_key__'] = primaryKey # Primary key property name
        attrs['__fields__'] = fields # Property names other than primary keys
        # Construct the default SELECT, INSERT, UPDATE, and DELETE statements:
        attrs['__select__'] = 'select `%s`, %s from `%s`' % (primaryKey, ', '.join(escaped_fields), tableName)
        attrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ', '.join(escaped_fields), primaryKey, create_args_string(len(escaped_fields) + 1))
        attrs['__update__'] = 'update `%s` set %s where `%s`=?' % (tableName, ', '.join(map(lambda f: '`%s`=?' % (mappings.get(f).name or f), fields)), primaryKey)
        attrs['__delete__'] = 'delete from `%s` where `%s`=?' % (tableName, primaryKey)
        return type.__new__(cls, name, bases, attrs)

In this way, any class inherited from the Model (such as User) will automatically scan the mapping relationship through the Model metaclass and store it in its own class properties such as __table__ and __mappings__. We then add class methods to the Model class so that all subclasses can call those class methods.

class Model(dict):

    ...

    @classmethod
    @asyncio.coroutine
    def find(cls, pk):
        ' find object by primary key. '
        rs = yield from select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
        if len(rs) == 0:
            return None
        return cls(**rs[0])

The User class can now implement primary key lookup through class methods.

user = yield from User.find('123')

Adding instance methods to the Model class to lets all subclasses invoke instance methods.

class Model(dict):

    ...

    @asyncio.coroutine
    def save(self):
        args = list(map(self.getValueOrDefault, self.__fields__))
        args.append(self.getValueOrDefault(self.__primary_key__))
        rows = yield from execute(self.__insert__, args)
        if rows != 1:
            logging.warn('failed to insert record: affected rows: %s' % rows)

This way, you can store an instance of a User in the database.

user = User(id=123, name='Michael')
yield from user.save()

The last step is to improve ORM. For search, we can implement the following methods.

  1. findAll() – find by where condition.
  2. findNumber() – looks up by where condition, but returns an integer, suitable for SQL of type select count(*).

And update() and remove() methods. All of these methods have to be decorated with @asyncio.coroutine and become a coroutine.

You need to be very careful when you call below code.

user.save()

None of this works because the call to save() simply creates a coroutine and does not execute it. Be sure to use below code to make insert operation is actually performed.

yield from user.save()

Leave a Comment

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.