In the previous article, How To Set Up Python Web Application Development Environment, we showed how to create a simple hello world web application in Python. This article focuses on how to write Python code that manipulates a MySQL database through an ORM (Object Relational Mapping).
In a web application, all data, including user information, published logs, comments, and so on, is stored in a database. In our Python web app example, we chose MySQL as the database. Many places in a Python web app need to access the database. Accessing a database requires creating a database connection and cursor objects, executing SQL statements, and finally handling exceptions and cleaning up resources. If the code that accesses the database is scattered across many functions, it becomes hard to maintain and hard to reuse.
So we will first wrap the usual SELECT, INSERT, UPDATE, and DELETE operations in common functions that we can reuse later.
Ordinary synchronous I/O operations cannot be used in our web app, because all users are served by a single thread and the program must run very quickly to handle a large number of user requests. If a time-consuming I/O operation is performed synchronously, the system cannot respond to any other user while it waits for that operation to finish.
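The effect described above can be seen in a small experiment. The sketch below is only an illustration: the handler names and the 0.2 second delay are made up, and it uses the `async def` / `await` syntax, which is the modern spelling of `@asyncio.coroutine` / `yield from`. Three simulated requests are served on one thread, first with a blocking sleep and then with a non-blocking one.

```python
import asyncio
import time


async def blocking_handler(delay):
    # time.sleep() blocks the whole event loop thread: no other
    # coroutine can run until it returns.
    time.sleep(delay)


async def async_handler(delay):
    # asyncio.sleep() suspends only this coroutine, so the event loop
    # is free to run the other handlers in the meantime.
    await asyncio.sleep(delay)


async def serve(handler, requests=3, delay=0.2):
    # Simulate several requests arriving at once and measure the
    # total time needed to finish all of them.
    start = time.perf_counter()
    await asyncio.gather(*(handler(delay) for _ in range(requests)))
    return time.perf_counter() - start


def compare():
    blocking = asyncio.run(serve(blocking_handler))
    non_blocking = asyncio.run(serve(async_handler))
    return blocking, non_blocking
```

With the blocking handler the three delays add up, while with the asynchronous handler they overlap, so the second measurement should be noticeably smaller than the first.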
Because of this, our example web app uses the aiohttp framework, which is based on asyncio. asyncio is an asynchronous model based on coroutines. This is a principle of asynchronous programming: once you decide to use asynchrony, every layer of the system must be asynchronous. aiomysql provides an asynchronous I/O driver for MySQL databases. Below we show how to use aiomysql to connect to and operate on a MySQL database.
1. Create MySQL DB Connection Pool.
We need to create a global connection pool so that each HTTP request can obtain a database connection directly from the pool. The advantage of connection pooling is that you do not have to open and close database connections frequently; existing connections are reused whenever they are needed.
The connection pool is stored in the global variable __pool. By default, the encoding is set to utf8 and transactions are set to autocommit.
    import asyncio
    import logging

    import aiomysql


    @asyncio.coroutine
    def create_pool(loop, **kw):
        logging.info('create database connection pool...')
        global __pool
        __pool = yield from aiomysql.create_pool(
            host=kw.get('host', 'localhost'),
            port=kw.get('port', 3306),
            user=kw['user'],
            password=kw['password'],
            db=kw['db'],
            charset=kw.get('charset', 'utf8'),
            autocommit=kw.get('autocommit', True),
            maxsize=kw.get('maxsize', 10),
            minsize=kw.get('minsize', 1),
            loop=loop
        )
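To make the idea of connection reuse concrete, here is a toy pool that needs no database. It is not how aiomysql is implemented; `FakeConnection`, `ToyPool` and `handle_request` are hypothetical names invented for this sketch, and it is written with `async def` / `await`, the modern equivalent of `@asyncio.coroutine` / `yield from`.

```python
import asyncio


class FakeConnection:
    """Stand-in for a real database connection (hypothetical)."""
    opened = 0  # counts how many connections were ever created

    def __init__(self):
        FakeConnection.opened += 1
        self.id = FakeConnection.opened


class ToyPool:
    """Hands out at most `maxsize` connections and reuses returned ones."""

    def __init__(self, maxsize):
        self._free = asyncio.Queue()
        self._maxsize = maxsize
        self._created = 0

    async def acquire(self):
        # Open a new connection only while nothing is idle and we are
        # below maxsize; otherwise take (or wait for) an idle one.
        if self._free.empty() and self._created < self._maxsize:
            self._created += 1
            return FakeConnection()
        return await self._free.get()

    def release(self, conn):
        # Put the connection back so that another request can reuse it.
        self._free.put_nowait(conn)


async def handle_request(pool):
    conn = await pool.acquire()
    try:
        await asyncio.sleep(0)  # pretend to run a query
        return conn.id
    finally:
        pool.release(conn)


async def main(requests=20, maxsize=2):
    pool = ToyPool(maxsize)
    ids = await asyncio.gather(*(handle_request(pool) for _ in range(requests)))
    return sorted(set(ids))
```

Running `asyncio.run(main())` serves twenty simulated requests while only two connections are ever opened, which is the saving that the maxsize and minsize settings of a real pool control.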
2. DB Table Select Function.
To execute a SELECT statement, we write a select() function that takes the SQL statement and the SQL parameters.
    @asyncio.coroutine
    def select(sql, args, size=None):
        log(sql, args)
        global __pool
        with (yield from __pool) as conn:
            cur = yield from conn.cursor(aiomysql.DictCursor)
            yield from cur.execute(sql.replace('?', '%s'), args or ())
            if size:
                rs = yield from cur.fetchmany(size)
            else:
                rs = yield from cur.fetchall()
            yield from cur.close()
            logging.info('rows returned: %s' % len(rs))
            return rs
The placeholder in our SQL statements is ?, while MySQL's placeholder is %s; the select() function translates between the two internally. Always pass values as SQL parameters instead of concatenating them into the SQL string, to prevent SQL injection attacks.
yield from calls another coroutine (that is, one coroutine calls another) and returns the result of that coroutine directly. In Python 3.5 and later the same thing is written with async def and await. If the size parameter is passed in, at most that many records are fetched through fetchmany(); otherwise, all records are fetched through fetchall().
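The difference between parameters and string concatenation is easy to demonstrate. The sketch below uses the standard library's sqlite3 module as a stand-in, only because it runs without a MySQL server and happens to use ? placeholders natively. The function names and sample data are made up for illustration.

```python
import sqlite3


def to_mysql_placeholders(sql):
    # The same translation that select() performs before executing.
    return sql.replace('?', '%s')


def find_by_name(conn, name):
    # The value travels separately from the SQL text, so it is always
    # treated as data and never as part of the statement.
    cur = conn.execute('select id, name from users where name = ?', (name,))
    return cur.fetchall()


def find_by_name_unsafe(conn, name):
    # Anti-pattern: the value is spliced into the SQL text itself.
    cur = conn.execute("select id, name from users where name = '%s'" % name)
    return cur.fetchall()


def demo():
    conn = sqlite3.connect(':memory:')
    conn.execute('create table users (id integer primary key, name text)')
    conn.executemany('insert into users (id, name) values (?, ?)',
                     [(1, 'Michael'), (2, 'Alice')])
    attack = "x' or '1'='1"
    safe = find_by_name(conn, attack)
    unsafe = find_by_name_unsafe(conn, attack)
    conn.close()
    return safe, unsafe
```

In `demo()`, the parameterized query finds no user with that odd name, while the concatenated query is rewritten by the input into a condition that matches every row.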
3. DB Table Insert, Update, Delete Function.
To execute INSERT, UPDATE, and DELETE statements, you can define a generic execute() function because all three SQL executions require the same parameters and return an integer representing the number of rows affected.
    @asyncio.coroutine
    def execute(sql, args):
        log(sql)
        with (yield from __pool) as conn:
            try:
                cur = yield from conn.cursor()
                yield from cur.execute(sql.replace('?', '%s'), args)
                affected = cur.rowcount
                yield from cur.close()
            except BaseException as e:
                raise
            return affected
The difference between the execute() function and the select() function is that execute() does not return a result set from the cursor; instead, it returns the number of affected rows, read from the cursor's rowcount.
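As a rough illustration of what rowcount reports, the following sketch runs the same kind of generic function against an in-memory SQLite database. sqlite3 is used only as a stand-in that needs no server, and the synchronous `execute(conn, sql, args)` signature here is an assumption of the sketch, not the coroutine shown above.

```python
import sqlite3


def execute(conn, sql, args=()):
    # Run one INSERT, UPDATE or DELETE and report how many rows it touched.
    cur = conn.cursor()
    try:
        cur.execute(sql, args)
        return cur.rowcount
    finally:
        cur.close()


def demo():
    conn = sqlite3.connect(':memory:')
    execute(conn, 'create table users (id integer primary key, name text)')
    inserted = execute(conn, 'insert into users (id, name) values (?, ?)', (1, 'Michael'))
    execute(conn, 'insert into users (id, name) values (?, ?)', (2, 'Alice'))
    updated = execute(conn, 'update users set name = ? where id >= ?', ('Bob', 1))
    deleted = execute(conn, 'delete from users where id = ?', (99,))
    conn.close()
    return inserted, updated, deleted
```

Here one row is inserted, two rows are updated, and the delete matches nothing, so a caller can tell from the returned integer whether a statement had any effect.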
4. ORM (Object Relational Mapping).
With the basic select() and execute() functions in place, we are ready to write a simple ORM. An ORM should be designed from the perspective of the code that calls it. Let's first consider how to define a User object and then associate the database table users with it.
    from orm import Model, StringField, IntegerField

    class User(Model):
        __table__ = 'users'

        id = IntegerField(primary_key=True)
        name = StringField()
Notice that __table__, id and name defined in the User class are class attributes, not instance attributes. Attributes defined at the class level describe the mapping between User objects and the table, while instance attributes must be initialized through the __init__() method, so the two do not interfere.
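The distinction between the two kinds of attributes can be shown with plain Python, without any ORM machinery. In this sketch `Field` is a minimal hypothetical stand-in, and the User class stores its values in an ordinary `__init__()`, which is a simplification rather than the way the ORM itself stores row values.

```python
class Field:
    """Minimal stand-in for the ORM's Field class (hypothetical)."""

    def __init__(self, column_type):
        self.column_type = column_type


class User:
    # Class attributes: describe the table columns, shared by all instances.
    id = Field('bigint')
    name = Field('varchar(100)')

    def __init__(self, id, name):
        # Instance attributes: the values of one particular row.
        self.id = id
        self.name = name


user = User(id=123, name='Michael')
```

`User.name` is still the Field that describes the column, while `user.name` is the string stored on that one instance; the instance attribute shadows the class attribute without changing it.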
    # Create User instance:
    user = User(id=123, name='Michael')
    # Insert user info into db:
    user.insert()
    # Find all user objects:
    users = User.findAll()
5. Define Model.
The first thing to define is the base class Model for all ORM mappings.
    class Model(dict, metaclass=ModelMetaclass):

        def __init__(self, **kw):
            super(Model, self).__init__(**kw)

        def __getattr__(self, key):
            try:
                return self[key]
            except KeyError:
                raise AttributeError(r"'Model' object has no attribute '%s'" % key)

        def __setattr__(self, key, value):
            self[key] = value

        def getValue(self, key):
            return getattr(self, key, None)

        def getValueOrDefault(self, key):
            value = getattr(self, key, None)
            if value is None:
                field = self.__mappings__[key]
                if field.default is not None:
                    value = field.default() if callable(field.default) else field.default
                    logging.debug('using default value for %s: %s' % (key, str(value)))
                    setattr(self, key, value)
            return value
Model inherits from dict, so it has all dict capabilities, and it implements the special methods __getattr__() and __setattr__(), so a key can be read and written as if it were a normal attribute.
    >>> user['id']
    123
    >>> user.id
    123
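The attribute-style access can be tried in isolation. The following is a minimal sketch of the same trick, with the metaclass and the default-value handling left out; `AttrDict` is a name chosen for this example only.

```python
class AttrDict(dict):
    """A dict whose keys can also be read and written as attributes."""

    def __getattr__(self, key):
        # Python calls __getattr__ only when normal attribute lookup
        # fails, so real dict methods such as keys() keep working.
        try:
            return self[key]
        except KeyError:
            raise AttributeError("'AttrDict' object has no attribute '%s'" % key)

    def __setattr__(self, key, value):
        # Attribute assignment is stored as a dictionary item.
        self[key] = value


user = AttrDict(id=123, name='Michael')
user.email = 'michael@example.com'
```

Raising AttributeError rather than letting KeyError escape matters: it is what allows `getattr(user, 'missing', None)` to fall back to its default, which getValue() relies on.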
Next come Field and the various Field subclasses:
    class Field(object):

        def __init__(self, name, column_type, primary_key, default):
            self.name = name
            self.column_type = column_type
            self.primary_key = primary_key
            self.default = default

        def __str__(self):
            return '<%s, %s:%s>' % (self.__class__.__name__, self.column_type, self.name)
For example, StringField, which maps to varchar:
    class StringField(Field):

        def __init__(self, name=None, primary_key=False, default=None, ddl='varchar(100)'):
            super().__init__(name, ddl, primary_key, default)
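Other column types can follow the same pattern. The article does not show IntegerField, which the User example imports, so the version below is only a guess at its shape. The `bigint` and `boolean` column types, the default values, and the BooleanField class are all assumptions made for this sketch.

```python
class Field(object):
    # Same base class as in the article, repeated so the sketch is self-contained.

    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default

    def __str__(self):
        return '<%s, %s:%s>' % (self.__class__.__name__, self.column_type, self.name)


class IntegerField(Field):
    # Assumed mapping: integers are stored in a bigint column.

    def __init__(self, name=None, primary_key=False, default=0, ddl='bigint'):
        super().__init__(name, ddl, primary_key, default)


class BooleanField(Field):
    # Assumed mapping: a boolean column is never used as a primary key here.

    def __init__(self, name=None, default=False):
        super().__init__(name, 'boolean', False, default)
```

Each subclass only fixes the column type and a sensible default, so adding a new type is a matter of a few lines.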
Notice that Model is just a base class. How do you read the mapping information for a specific subclass, such as User? The answer is through the metaclass ModelMetaclass:
    class ModelMetaclass(type):

        def __new__(cls, name, bases, attrs):
            # Exclude the Model class itself:
            if name == 'Model':
                return type.__new__(cls, name, bases, attrs)
            # Get table name:
            tableName = attrs.get('__table__', None) or name
            logging.info('found model: %s (table: %s)' % (name, tableName))
            # Get all Field and primary key:
            mappings = dict()
            fields = []
            primaryKey = None
            for k, v in attrs.items():
                if isinstance(v, Field):
                    logging.info('  found mapping: %s ==> %s' % (k, v))
                    mappings[k] = v
                    if v.primary_key:
                        # Find the primary key:
                        if primaryKey:
                            raise RuntimeError('Duplicate primary key for field: %s' % k)
                        primaryKey = k
                    else:
                        fields.append(k)
            if not primaryKey:
                raise RuntimeError('Primary key not found.')
            for k in mappings.keys():
                attrs.pop(k)
            escaped_fields = list(map(lambda f: '`%s`' % f, fields))
            attrs['__mappings__'] = mappings  # Saves the mapping of attributes and columns
            attrs['__table__'] = tableName
            attrs['__primary_key__'] = primaryKey  # Primary key property name
            attrs['__fields__'] = fields  # Property names other than primary keys
            # Construct the default SELECT, INSERT, UPDATE, and DELETE statements:
            attrs['__select__'] = 'select `%s`, %s from `%s`' % (primaryKey, ', '.join(escaped_fields), tableName)
            attrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ', '.join(escaped_fields), primaryKey, create_args_string(len(escaped_fields) + 1))
            attrs['__update__'] = 'update `%s` set %s where `%s`=?' % (tableName, ', '.join(map(lambda f: '`%s`=?' % (mappings.get(f).name or f), fields)), primaryKey)
            attrs['__delete__'] = 'delete from `%s` where `%s`=?' % (tableName, primaryKey)
            return type.__new__(cls, name, bases, attrs)
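The metaclass calls a helper named create_args_string() that is not shown in the article. A plausible version, offered here as an assumption, simply produces one ? placeholder per column. The `build_insert()` function is a hypothetical wrapper that repeats the INSERT template from the metaclass so the result can be inspected on its own.

```python
def create_args_string(num):
    # One '?' placeholder per column, e.g. 3 -> '?, ?, ?'
    return ', '.join(['?'] * num)


def build_insert(table, fields, primary_key):
    # Same template as attrs['__insert__'] in the metaclass: the
    # non-key columns come first and the primary key comes last.
    escaped = ['`%s`' % f for f in fields]
    return 'insert into `%s` (%s, `%s`) values (%s)' % (
        table, ', '.join(escaped), primary_key,
        create_args_string(len(escaped) + 1))


sql = build_insert('users', ['name', 'email'], 'id')
```

For a users table with the columns name and email and the primary key id, this yields an INSERT with three placeholders, in the order name, email, id.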
In this way, any class that inherits from Model (such as User) is automatically scanned for its mapping relationships by ModelMetaclass, and the result is stored in class attributes of its own, such as __mappings__. We then add class methods to the Model class so that all subclasses can call those class methods.
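The scanning step is easier to follow in a stripped-down form. The sketch below keeps only the part of the metaclass that collects Field attributes into __mappings__; `CollectFields`, `Base` and the simplified `Field` are hypothetical names, and SQL generation is left out.

```python
class Field:
    """Simplified column description (hypothetical)."""

    def __init__(self, column_type, primary_key=False):
        self.column_type = column_type
        self.primary_key = primary_key


class CollectFields(type):
    def __new__(cls, name, bases, attrs):
        # Gather every class attribute that is a Field ...
        mappings = {k: v for k, v in attrs.items() if isinstance(v, Field)}
        # ... and remove it from the class namespace, so that it cannot
        # shadow the value stored on an instance later on.
        for k in mappings:
            attrs.pop(k)
        attrs['__mappings__'] = mappings
        return type.__new__(cls, name, bases, attrs)


class Base(dict, metaclass=CollectFields):
    pass


class User(Base):
    id = Field('bigint', primary_key=True)
    name = Field('varchar(100)')
```

After the class statement has run, `User.__mappings__` holds both fields, and `User` itself no longer has `id` or `name` attributes. Removing them matters because `__getattr__()` is only consulted when normal lookup fails, so a leftover class attribute would hide the value stored in the dict.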
    class Model(dict, metaclass=ModelMetaclass):

        ...

        @classmethod
        @asyncio.coroutine
        def find(cls, pk):
            ' find object by primary key. '
            rs = yield from select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
            if len(rs) == 0:
                return None
            # select() returns a list of rows; build the object from the first one.
            return cls(**rs[0])
The User class can now implement primary key lookup through class methods.
user = yield from User.find('123')
Adding instance methods to the Model class lets all subclasses invoke them as well.
    class Model(dict, metaclass=ModelMetaclass):

        ...

        @asyncio.coroutine
        def save(self):
            # Argument order matches __insert__: non-key fields first, primary key last.
            args = list(map(self.getValueOrDefault, self.__fields__))
            args.append(self.getValueOrDefault(self.__primary_key__))
            rows = yield from execute(self.__insert__, args)
            if rows != 1:
                logging.warning('failed to insert record: affected rows: %s' % rows)
This way, you can store an instance of a User in the database.
    user = User(id=123, name='Michael')
    yield from user.save()
The last step is to round out the ORM. For searching, we can implement the following methods.
- findAll() – find by where condition.
- findNumber() – looks up by where condition, but returns an integer, suitable for SQL of type select count(*).
The update() and remove() methods are needed as well. All of these methods have to be decorated with @asyncio.coroutine so that they become coroutines.
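The article leaves these methods to the reader. As one possible starting point, most of the work in findAll() is assembling the SQL text and its arguments, which can be written and tested without a database. The function name `build_find_all()` and its `where`, `order_by` and `limit` parameters are assumptions of this sketch, not an API defined by the article.

```python
def build_find_all(select_sql, where=None, args=None, order_by=None, limit=None):
    # Start from the model's default SELECT and append optional clauses.
    sql = [select_sql]
    args = list(args or [])
    if where:
        sql += ['where', where]
    if order_by:
        sql += ['order by', order_by]
    if limit is not None:
        if isinstance(limit, int):
            # limit=10 -> "limit ?"
            sql.append('limit ?')
            args.append(limit)
        elif isinstance(limit, tuple) and len(limit) == 2:
            # limit=(offset, count) -> "limit ?, ?"
            sql.append('limit ?, ?')
            args.extend(limit)
        else:
            raise ValueError('Invalid limit value: %s' % str(limit))
    return ' '.join(sql), args


query, params = build_find_all(
    'select `id`, `name` from `users`',
    where='name=?', args=['Michael'], order_by='id desc', limit=10)
```

A findAll() class method could pass the resulting pair to select() and wrap each returned row with `cls(**row)`. Note that the limit values are passed as parameters as well, so they are never concatenated into the SQL string.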
You need to be very careful with a call like the following:
    user.save()
This does nothing, because the call to save() simply creates a coroutine and does not execute it. Be sure to write it as follows so that the insert operation is actually performed:
    yield from user.save()
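This behavior can be observed directly. The sketch below uses `async def` / `await`, the modern spelling of `@asyncio.coroutine` / `yield from`, and replaces the database with a plain list. The `save()` function and the `store` list are stand-ins made up for the demonstration.

```python
import asyncio


async def save(store, record):
    # Stand-in for Model.save(): "inserting" means appending to a list.
    store.append(record)


async def main():
    store = []
    pending = save(store, 'first')   # only creates a coroutine object
    before = list(store)             # nothing has been saved yet
    await pending                    # now the body actually runs
    await save(store, 'second')
    return before, store
```

`asyncio.run(main())` shows that the list is still empty right after the bare call and only contains the records once the coroutine has been awaited.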