Compare commits

...

2 Commits

Author SHA1 Message Date
qwer_poiuy df7006915c Merge branch 'master' of http://git.zjvtit.net/21653B118/demo 2021-12-30 17:31:24 +08:00
qwer_poiuy cf2ec6b2b1 Day07_test_ending 2021-12-30 17:28:24 +08:00
30 changed files with 856 additions and 11 deletions

2
.gitattributes vendored Normal file
View File

@ -0,0 +1,2 @@
# Auto detect text files and perform LF normalization
# text=auto

View File

2
readme Normal file
View File

@ -0,0 +1,2 @@
this is a python project
learning python

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

41
www/apis.py Normal file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = 'Michael Liao'
'''
JSON API definition.
'''
import json, logging, inspect, functools
class APIError(Exception):
'''
the base APIError which contains error(required), data(optional) and message(optional).
'''
def __init__(self, error, data='', message=''):
super(APIError, self).__init__(message)
self.error = error
self.data = data
self.message = message
class APIValueError(APIError):
'''
Indicate the input value has error or invalid. The data specifies the error field of input form.
'''
def __init__(self, field, message=''):
super(APIValueError, self).__init__('value:invalid', field, message)
class APIResourceNotFoundError(APIError):
'''
Indicate the resource was not found. The data specifies the resource name.
'''
def __init__(self, field, message=''):
super(APIResourceNotFoundError, self).__init__('value:notfound', field, message)
class APIPermissionError(APIError):
'''
Indicate the api has no permission.
'''
def __init__(self, message=''):
super(APIPermissionError, self).__init__('permission:forbidden', 'permission', message)

View File

@ -1,3 +1,6 @@
from config import configs
import logging; logging.basicConfig(level=logging.INFO) import logging; logging.basicConfig(level=logging.INFO)
# logging是Python 的日志记录工具level表示设置根记录器级别去指定 level. # logging是Python 的日志记录工具level表示设置根记录器级别去指定 level.
# 日志级别: # 日志级别:
@ -7,25 +10,153 @@ import logging; logging.basicConfig(level=logging.INFO)
# INFO 20 # INFO 20
# DEBUG 10 # DEBUG 10
# NOTSET 0 # NOTSET 0
# __auther__ = 'lzj'
# 导入 logging 模块并使用';'对其全局配置 # 导入 logging 模块并使用';'对其全局配置
# basicConfig 配置了 level 信息level 配置为 INFO 信息,即只输出 INFO 级别的信息 # basicConfig 配置了 level 信息level 配置为 INFO 信息,即只输出 INFO 级别的信息
import asyncio, os, json, time
from datetime import datetime
import asyncio
from aiohttp import web from aiohttp import web
from jinja2 import Environment, FileSystemLoader
import orm
from coroweb import add_routes, add_static
async def index(request): def init_jinja2(app, **kw):
return web.Response(body=b'<h1>fuck</h1>', headers={'content-type': 'text/html'}) logging.info('init jinja2...')
options = dict(
autoescape = kw.get('autoescape', True),
block_start_string = kw.get('block_start_string', '{%'),
block_end_string = kw.get('block_end_string', '%}'),
variable_start_string = kw.get('variable_start_string', '{{'),
variable_end_string = kw.get('variable_end_string', '}}'),
auto_reload = kw.get('auto_reload', True)
)
path = kw.get('path', None)
if path is None:
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
logging.info('set jinja2 template path: %s' % path)
env = Environment(loader=FileSystemLoader(path), **options)
filters = kw.get('filters', None)
if filters is not None:
for name, f in filters.items():
env.filters[name] = f
app['__templating__'] = env
async def logger_factory(app, handler):
async def logger(request):
logging.info('Request: %s %s' % (request.method, request.path))
# await asyncio.sleep(0.3)
return (await handler(request))
return logger
def init(): async def data_factory(app, handler):
# 创建 web.Application ,web app的骨架 async def parse_data(request):
app = web.Application() if request.method == 'POST':
app.router.add_get('/', index) if request.content_type.startswith('application/json'):
web.run_app(app, host='127.0.0.1', port=9000) request.__data__ = await request.json()
logging.info('request json: %s' % str(request.__data__))
elif request.content_type.startswith('application/x-www-form-urlencoded'):
request.__data__ = await request.post()
logging.info('request form: %s' % str(request.__data__))
return (await handler(request))
return parse_data
async def response_factory(app, handler):
async def response(request):
logging.info('Response handler...')
r = await handler(request)
if isinstance(r, web.StreamResponse):
return r
if isinstance(r, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'
return resp
if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, dict):
template = r.get('__template__')
if template is None:
resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, int) and r >= 100 and r < 600:
return web.Response(text=r)
if isinstance(r, tuple) and len(r) == 2:
t, m = r
if isinstance(t, int) and t >= 100 and t < 600:
return web.Response(text=t,body=str(m))
# default:
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response
if __name__ == '__main__': def datetime_filter(t):
init() delta = int(time.time() - t)
if delta < 60:
return u'1分钟前'
if delta < 3600:
return u'%s分钟前' % (delta // 60)
if delta < 86400:
return u'%s小时前' % (delta // 3600)
if delta < 604800:
return u'%s天前' % (delta // 86400)
dt = datetime.fromtimestamp(t)
return u'%s%s%s' % (dt.year, dt.month, dt.day)
# async def index(request):
# return web.Response(body=b'<h1>....vv</h1>', headers={'content-type': 'text/html'})
# def init():
# # 创建 web.Application ,web app的骨架
# app = web.Application()
# app.router.add_get('/', index)
# web.run_app(app, host='127.0.0.1', port=9000)
async def init(loop):
await orm.create_pool(loop=loop, host='127.0.0.1', port=3306, user='root', password='123456', db='demo')
app = web.Application(loop=loop, middlewares=[
logger_factory, response_factory
])
init_jinja2(app, filters=dict(datetime=datetime_filter))
add_routes(app, 'handlers')
add_static(app)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 9001)
logging.info('server started at http://127.0.0.1:9001...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
# 新版写法,会报错。
# async def init_db(app):
# # If on Linux, use another user instead of 'root'
# await orm.create_pool(
# host=configs.db.host,
# port=configs.db.port,
# user=configs.db.user,
# password=configs.db.password,
# db=configs.db.db
#
# )
#
#
# app = web.Application(middlewares=[
# logger_factory,
# response_factory
# ])
# init_jinja2(app, filters=dict(datatime=datetime_filter))
# add_routes(app, 'handlers')
# add_static(app)
# app.on_startup.append(init_db)
# web.run_app(app, host='localhost', port=9000)

49
www/config.py Normal file
View File

@ -0,0 +1,49 @@
import config_default
class Dict(dict):
'''
Simple dict but support access as x.y style.
'''
def __init__(self, names=(), values=(), **kw):
super(Dict, self).__init__(**kw)
# zip 可以把 names 和 values 合成为一个列表 ,这里为了方便给 self 赋值
for k, v in zip(names, values):
self[k] = v
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(r"'Dict' object has no attribute '%s'" % key)
def __setattr__(self, key, value):
self[key] = value
def merge(defaults, override):
r = {}
for k, v in defaults.items():
if k in override:
if isinstance(v, dict):
r[k] = merge(v, override[k])
else:
r[k] = override[k]
else:
r[k] = v
return r
def toDict(d):
D = Dict()
for k, v in d.items():
D[k] = toDict(v) if isinstance(v, dict) else v
return D
configs = config_default.configs
try:
import config_override
configs = merge(configs, config_override.configs)
except ImportError:
pass
configs = toDict(configs)

14
www/config_default.py Normal file
View File

@ -0,0 +1,14 @@
configs = {
'debug': True,
'db': {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': '123456',
'db': 'demo'
},
'session': {
'secret': 'demo'
}
}

6
www/config_override.py Normal file
View File

@ -0,0 +1,6 @@
configs = {
'db': {
'host': '127.0.0.1'
}
}

220
www/coroweb.py Normal file
View File

@ -0,0 +1,220 @@
import asyncio, os, inspect, logging, functools
from urllib import parse
from aiohttp import web
from apis import APIError
def get(path):
# 函数通过@get()的装饰就附带了URL信息。
'''
Define decorator @get('/path')
'''
# wrapper装饰器
# 装饰器本质上是一个Python函数它可以让其他函数在不需要做任何代码变动的前提下增加额外功能
# 装饰器的返回值也是一个函数对象。它经常用于有切面需求的场景,比如:插入日志、性能测试、事务处理、缓存、权限校验等场景。
# 装饰器是解决这类问题的绝佳设计,有了装饰器,我们就可以抽离出大量与函数功能本身无关的雷同代码并继续重用。
# 可变参数*args和关键字参数**kwargs有了这两个参数装饰器就可以用于任意目标函数
def decorator(func):
# 返回了一个partial对象这个对象对update_wrapper进行了包装固定了wrappedassignedupdated三个参数。
# wraps本省就是一个装饰器因为它返回的是一个“函数”即partial对象这个对象接收函数作为参数同时以函数作为返回值。
# 把wrapped函数的属性拷贝到wrapper函数中。
# wrapped是被装饰的原函数
# wrapper是被装饰器装饰后的新函数。
@functools.wraps(func)
def wrapper(*args, **kw):
return func(*args, **kw)
wrapper.__method__ = 'GET'
wrapper.__route__ = path
return wrapper
return decorator
def post(path):
'''
Define decorator @post('/path')
'''
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kw):
return func(*args, **kw)
wrapper.__method__ = 'POST'
wrapper.__route__ = path
return wrapper
return decorator
def get_required_kw_args(fn):
args = []
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty:
# 在 args 里加上仅包含关键字keyword的参数 且不包括默认值, 然后返回 args
args.append(name)
return tuple(args)
# 所以这个函数的作用和名称一样, 得到需要的关键字参数, 下面同理
def get_named_kw_args(fn):
args = []
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
args.append(name)
return tuple(args)
def has_named_kw_args(fn):
params = inspect.signature(fn).parameters
for _, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
return True
def has_var_kw_arg(fn):
params = inspect.signature(fn).parameters
for _, param in params.items():
if param.kind == inspect.Parameter.VAR_KEYWORD:
return True
def has_request_arg(fn):
sig = inspect.signature(fn)
params = sig.parameters
found = False
for name, param in params.items():
if name == 'request':
found = True
continue
if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD):
raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig)))
return found
# RequestHandler目的就是从URL函数中分析其需要接收的参数从request中获取必要的参数
# 调用URL函数然后把结果转换为web.Response对象
class RequestHandler(object):
def __init__(self, app, fn):
self._app = app
self._func = fn
self._has_request_arg = has_request_arg(fn)
self._has_var_kw_arg = has_var_kw_arg(fn)
self._has_named_kw_args = has_named_kw_args(fn)
self._named_kw_args = get_named_kw_args(fn)
self._required_kw_args = get_required_kw_args(fn)
async def __call__(self, request):
kw = None
if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args:
# 只要获取到了var 姓名 或者需要的关键字之一就判断是不是post请求
if request.method == 'POST':
# 如果没有内容类型 text/html等之类的参数
if not request.content_type:
return web.HTTPBadRequest(reason='Missing Content-Type.')
ct = request.content_type.lower() #全转小写
# 如果内容类型是以json开头json文件
if ct.startswith('application/json'):
params = await request.json()
# inspect库中的检查是否是对象的方法
if not isinstance(params, dict):
return web.HTTPBadRequest(reason='JSON body must be object.')
kw = params
#判断内容类型开头是不是原生表单或者表单请求
elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):
params = await request.post()
kw = dict(**params)
else:
# 不支持的类型
return web.HTTPBadRequest(reason='Unsupported Content-Type: %s' % request.content_type)
# 判断是不是get请求
if request.method == 'GET':
qs = request.query_string
if qs:
kw = dict()
for k, v in parse.parse_qs(qs, True).items():
kw[k] = v[0]
# 都没有获取到任何信息
if kw is None:
kw = dict(**request.match_info)
else:
if not self._has_var_kw_arg and self._named_kw_args:
# remove all unamed kw:
copy = dict()
for name in self._named_kw_args:
if name in kw:
copy[name] = kw[name]
kw = copy
# check named arg:
for k, v in request.match_info.items():
if k in kw:
logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
kw[k] = v
if self._has_request_arg:
kw['request'] = request
# check required kw:
if self._required_kw_args:
for name in self._required_kw_args:
if not name in kw:
return web.HTTPBadRequest(reason='Missing argument: %s' % name)
logging.info('call with args: %s' % str(kw))
try:
r = await self._func(**kw)
return r
except APIError as e:
return dict(error=e.error, data=e.data, message=e.message)
return RequestHandler.__call__()
def add_static(app):
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
app.router.add_static('/static/', path)
logging.info('add static %s => %s' % ('/static/', path))
# 改变静态网址下路径
def add_static(app):
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
app.router.add_static('/static/', path)
logging.info('add static %s => %s' % ('/static/', path))
# add_route函数用来注册一个URL处理函数
def add_route(app, fn):
# getattr() 函数用于返回一个对象属性值。
# 类型
# 路径
method = getattr(fn, '__method__', None)
path = getattr(fn, '__route__', None)
if path is None or method is None:
raise ValueError('@get or @post not defined in %s.' % str(fn))
# iscoroutinefunctionReturn True if the object is a ‎协程函数
# isgeneratorfunctionReturn True if the object is a Python 生成器函数
if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
# asyncio.coroutine
# 用来标记基于生成器的协程的装饰器。
# 此装饰器使得旧式的基于生成器的协程能与 async/await 代码相兼容
# 此装饰器不应该被用于 async def 协程
fn = asyncio.coroutine(fn)
#日志内容inspect.signature(fn).parameters获取函数参数的参数名参数的属性参数的默认值
logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys())))
app.router.add_route(method, path, RequestHandler(app, fn))
# 把很多次add_route()注册的调用
def add_routes(app, module_name):
# Python rfind() 返回字符串最后一次出现的位置,如果没有匹配项则返回 -1。
n = module_name.rfind('.')#对应python文件存为handles使用rfind找到handles.py文件
if n == (-1):#如果没有匹配项,返回(-1
# __import__() 函数用于动态加载类和函数 。
# 如果一个模块经常变化就可以使用 __import__() 来动态载入。
# globals() 函数会以字典类型返回当前位置的全部全局变量。
# locals() 函数会以字典类型返回当前位置的全部局部变量。
mod = __import__(module_name, globals(), locals())
else:#找到了,返回字符串出现的最后一次位置。
name = module_name[n+1:]
mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name)
for attr in dir(mod):
if attr.startswith('_'):
continue
fn = getattr(mod, attr)
# callable() 函数用于检查一个对象是否是可调用的。如果返回 Trueobject 仍然可能调用失败;但如果返回 False调用对象 object 绝对不会成功。
# 对于函数、方法、lambda 函式、 类以及实现了 __call__ 方法的类实例, 它都返回 True。
if callable(fn):#如果找到了对应的处理方法。
method = getattr(fn, '__method__', None)
path = getattr(fn, '__route__', None)
if method and path:
# logging.info('add rosute ')
add_route(app, fn)

16
www/handlers.py Normal file
View File

@ -0,0 +1,16 @@
# url handlers
import re, time, json, logging, hashlib, base64, asyncio
from coroweb import get, post
from models import User, Comment, Blog, next_id
@get('/')
async def index(request):
users = await User.findAll()
return {
'__template__': 'test.html',
'users': users
}

48
www/models.py Normal file
View File

@ -0,0 +1,48 @@
import time, uuid
from orm import Model, StringField, BooleanField, FloatField, TextField
def next_id():
# uuid.uuid4() 可以生成一个随机的 UUID 目的是区别不同事务(大概)
# hex 可以把自身返回为一个16进制整数 所以这个函数就是生成各种 id ,里面还包含时间
return '%015d%s000' % (int(time.time() * 1000), uuid.uuid4().hex)
class User(Model):
__table__ = 'users'
# varchar 为 MySQL 里的数据类型
id = StringField(primary_key=True, default=next_id, ddl='varchar(50)')
email = StringField(ddl='varchar(50)')
password = StringField(ddl='varchar(50)') #有改动passwd改成password
# admin = BooleanField()
name = StringField(ddl='varchar(50)')
image = StringField(ddl='varchar(500)')
# created_at = FloatField(default=time.time)
# time.time 可以设置当前日期和时间, 把日期和时间储存为 float 类型 记录到 create_at 里
class Blog(Model):
__table__ = 'blogs'
id = StringField(primary_key=True, default=next_id, ddl='varchar(50)')
user_id = StringField(ddl='varchar(50)')
user_name = StringField(ddl='varchar(50)')
user_image = StringField(ddl='varchar(500)')
name = StringField(ddl='varchar(50)')
summary = StringField(ddl='varchar(200)')
content = TextField()
created_at = FloatField(default=time.time)
class Comment(Model):
__table__ = 'comments'
id = StringField(primary_key=True, default=next_id, ddl='varchar(50)')
blog_id = StringField(ddl='varchar(50)')
user_id = StringField(ddl='varchar(50)')
user_name = StringField(ddl='varchar(50)')
user_image = StringField(ddl='varchar(500)')
content = TextField()
created_at = FloatField(default=time.time)
# 这些属性可以按照自己的需要进行增减
# 之后对数据库的测试在 www 文件夹的 test_sql.py 里

281
www/orm.py Normal file
View File

@ -0,0 +1,281 @@
import asyncio, logging, aiomysql
# 创建基本日志函数
def log(sql, args=()):
logging.info('SQL: %s' % sql)
# 创建连接池函数
async def create_pool(loop, **kw):
logging.info('create database connection pool...')
# 声明 __pool 为全局变量
global __pool
# 使用这些基本参数来创建连接池
# await 和 async 是联动的异步IO
# 连接池是一种标准技术,用于在内存中维护长时间运行的连接,以便有效地重用,
# 并未应用程序可能同时使用的连接总数提供管理。特别是对于服务器端Web应用程序
# 连接池是内存中维护活动数据库连接池的标准方法,这些活动数据库连接在请求之间重复使用。
# 使用这些基本参数来创建连接池
# await 和 async 是联动的异步IO
__pool = await aiomysql.create_pool(
host=kw.get('host', 'localhost'),
port=kw.get('port', 3306),
user=kw['user'],
password=kw['password'],
db=kw['db'],
charset=kw.get('charset', 'utf8'),
autocommit=kw.get('autocommit', True),
maxsize=kw.get('maxsize', 10),
minsize=kw.get('minsize', 1),
loop=loop
)
async def select(sql, args, size=None):
log(sql, args)
global __pool
# 防止多个程序同时执行,达到异步效果
with (await __pool) as conn:
# 'aiomysql.DictCursor'要求返回字典格式
cur = await conn.cursor(aiomysql.DictCursor)
# cursor 游标实例可以调用 execute 来执行一条单独的 SQL 语句
await cur.execute(sql.replace('?', '%s'), args or())
# size 为空时为 False上面定义了初始值为 None ,具体得看传入的参数有没有定义 size
if size:
# fetchmany 可以获取行数为 size 的多行查询结果集,返回一个列表
rs = await cur.fetchmany(size)
else:
# fetchall 可以获取一个查询结果的所有(剩余)行,返回一个列表
rs = await cur.fetchall()
# close() ,立即关闭 cursor ,从这一时刻起该 cursor 将不再可用
await cur.close()
# 日志:提示返回了多少行
logging.info('rows returned: %s' % len(rs))
# select 函数给我们从 SQL 返回了一个列表
return rs
# execute :执行
async def execute(sql, args):
log(sql)
global __pool
with (await __pool) as conn:
try:
cur = await conn.cursor()
await cur.execute(sql.replace('?', '%s'),args)
# rowcount 获取行数,应该表示的是该函数影响的行数
affected = cur.rowcount
await cur.close()
except BaseException as _:
# except BaseException as e:
# 将错误抛出BaseEXception 是异常不用管
raise
# 返回行数
return affected
def create_args_string(num):
L = []
for _ in range(num):
L.append('?')
return ', '.join(L)
# Model 是一个基类,所以先定义 ModelMetaclass ,再在定义 Model 时使用 metaclass 参数
class ModelMetaclass(type):
# __new__()方法接收到的参数依次是:
# cls当前准备创建的类的对象 class
# name类的名字 str
# bases类继承的父类集合 Tuple
# attrs类的方法集合
def __new__(cls, name, bases, attrs):
# 排除 Model 类本身,返回它自己
if name=='Model':
return type.__new__(cls, name, bases, attrs)
# 获取 table 名称
tableName = attrs.get('__table__', None) or name
# 日志:找到名为 name 的 model
logging.info('found model: %s (table: %s)' % (name, tableName))
# 获取 所有的 Field 和主键名
mappings = dict()
fields = []
primaryKey = None
# attrs.items 取决于 __new__ 传入的 attrs 参数
for k,v in attrs.items():
# isinstance 函数:如果 v 和 Field 类型相同则返回 True ,不相同则 False
if isinstance(v, Field):
logging.info(' found mapping: %s ==> %s' % (k,v))
mappings[k] = v
# 这里的 v.primary_key 我理解为 :只要 primary_key 为 True 则这个 field 为主键
if v.primary_key:
# 找到主键,如果主键 primaryKey 有值时,返回一个错误
if primaryKey:
raise RuntimeError('Duplicate primary key for field: %s' % k)
# 然后直接给主键赋值
primaryKey = k
else:
# 没找到主键的话,直接在 fields 里加上 k
fields.append(k)
if not primaryKey:
# 如果主键为 None 就报错
raise RuntimeError('Primary key not found.')
for k in mappings.keys():
# pop :如果 key 存在于字典中则将其移除并返回其值,否则返回 default
attrs.pop(k)
escaped_fields = list(map(lambda f: '`%s`' % f, fields))
attrs['__mappings__'] = mappings # 保存属性和列的映射关系
attrs['__table__'] = tableName # table 名
attrs['__primary_key__'] = primaryKey # 主键属性名
attrs['__fields__'] = fields # 除主键外的属性名
# 构造默认的 SELECT, INSERT, UPDAT E和 DELETE 语句
attrs['__select__'] = 'select `%s`, %s from `%s`' % (primaryKey, ', '.join(escaped_fields), tableName)
attrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ', '.join(escaped_fields), primaryKey, create_args_string(len(escaped_fields) + 1))
attrs['__update__'] = 'update `%s` set %s where `%s`=?' % (tableName, ', '.join(map(lambda f: '`%s`=?' % (mappings.get(f).name or f), fields)), primaryKey)
attrs['__delete__'] = 'delete from `%s` where `%s`=?' % (tableName, primaryKey)
return type.__new__(cls, name, bases, attrs)
# metaclass 参数提示 Model 要通过上面的 __new__ 来创建
class Model(dict, metaclass=ModelMetaclass):
def __init__(self, **kw):
# super 用来引用父类? 引用了 ModelMetaclass super 文档:
super(Model, self).__init__(**kw)
# 返回参数为 key 的自身属性, 如果出错则报具体错误
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(r"'Model' object has no attribute '%s'" % key)
# 设置自身属性
def __setattr__(self, key, value):
self[key] = value
# 通过属性返回想要的值
def getValue(self, key):
return getattr(self, key, None)
#
def getValueOrDefault(self, key):
value = getattr(self, key, None)
if value is None:
# 如果 value 为 None定位某个键 value 不为 None 就直接返回
field = self.__mappings__[key]
if field.default is not None:
# 如果 field.default 不是 None 就把它赋值给 value
value = field.default() if callable(field.default) else field.default
logging.debug('using default value for %s: %s' % (key,str(value)))
setattr(self, key, value)
return value
# *** 往 Model 类添加 class 方法,就可以让所有子类调用 class 方法
@classmethod
async def findAll(cls, where=None, args=None, **kw):
## find objects by where clause
sql = [cls.__select__]
# where 默认值为 None
# 如果 where 有值就在 sql 加上字符串 'where' 和 变量 where
if where:
sql.append('where')
sql.append(where)
if args is None:
# args 默认值为 None
# 如果 findAll 函数未传入有效的 where 参数,则将 '[]' 传入 args
args = []
orderBy = kw.get('orderBy', None)
if orderBy:
# get 可以返回 orderBy 的值,如果失败就返回 None ,这样失败也不会出错
# oederBy 有值时给 sql 加上它,为空值时什么也不干
sql.append('order by')
sql.append(orderBy)
# 开头和上面 orderBy 类似
limit = kw.get('limit', None)
if limit is not None:
sql.append('limit')
if isinstance(limit, int):
# 如果 limit 为整数
sql.append('?')
args.append(limit)
elif isinstance(limit, tuple) and len(limit) == 2:
# 如果 limit 是元组且里面只有两个元素
sql.append('?, ?')
# extend 把 limit 加到末尾
args.extend(limit)
else:
raise ValueError('Invalid limit value: %s' % str(limit))
rs = await select(' '.join(sql), args)
# 返回选择的列表里的所有值 ,完成 findAll 函数
return [cls(**r) for r in rs]
@classmethod
async def findNumber(cls, selectField, where=None, args=None):
## find number by select and where
#找到选中的数及其位置
sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)]
if where:
sql.append('where')
sql.append(where)
rs = await select(' '.join(sql), args, 1)
if len(rs) == 0:
# 如果 rs 内无元素,返回 None ;有元素就返回某个数
return None
return rs[0]['_num_']
@classmethod
async def find(cls, pk):
## find object by primary key
# 通过主键找对象
rs = await select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
if len(rs) == 0:
return None
return cls(**rs[0])
# *** 往 Model 类添加实例方法,就可以让所有子类调用实例方法
async def save(self):
args = list(map(self.getValueOrDefault, self.__fields__))
args.append(self.getValueOrDefault(self.__primary_key__))
rows = await execute(self.__insert__, args)
if rows != 1:
logging.warning('failed to insert record: affected rows: %s' % rows)
async def update(self):
args = list(map(self.getValue, self.__fields__))
args.append(self.getValue(self.__primary_key__))
rows = await execute(self.__update__, args)
if rows != 1:
logging.warning('failed to update by primary key: affected rows: %s' % rows)
async def remove(self):
args = [self.getValue(self.__primary_key__)]
rows = await execute(self.__delete__, args)
if rows != 1:
logging.warning('failed to remove by primary key: affected rows: %s' % rows)
# 定义 Field
class Field(object):
def __init__(self, name, column_type, primary_key, default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
def __str__(self):
return '<%s, %s:%s>' % (self.__class__.__name__, self.column_type, self.name)
# 定义 Field 子类及其子类的默认值
class StringField(Field):
def __init__(self, name=None, primary_key=False, default=None, ddl='varchar(100)'):
super().__init__(name, ddl, primary_key, default)
class BooleanField(Field):
def __init__(self, name=None, default=False):
super().__init__(name, 'boolean', False, default)
class IntegerField(Field):
def __init__(self, name=None, primary_key=False, default=0):
super().__init__(name, 'bigint', primary_key, default)
class FloatField(Field):
def __init__(self, name=None, primary_key=False, default=0):
super().__init__(name, 'real', primary_key,default)
class TextField(Field):
def __init__(self, name=None, default=None):
super().__init__(name, 'text', False, default)

13
www/templates/test.html Normal file
View File

@ -0,0 +1,13 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>Test users - Moe Python Webapp</title>
</head>
<body>
<h1>FUCK</h1>
{% for u in users %}
<p>{{ u.name }} / {{ u.email }} /{{u.password}}</p>
{% endfor %}
</body>
</html>

22
www/test.py Normal file
View File

@ -0,0 +1,22 @@
import asyncio
import orm
from models import User
async def test(loop):
global i
i=1
await orm.create_pool(user='root', password='123456', db='demo',loop=loop)
a = User(name='Administrator', email='admin@example.com', password='123456', image='about:blank',id=i)
i+=1
x = User(name='gjj', email='gjj@example.com', password='123456', image='about:blank',id=i)
i += 1
t = User(name='roro', email='roro@example.com', password='123456789', image='about:blank',id=i)
await a.save()
await x.save()
await t.save()
loop = asyncio.get_event_loop()
loop.run_until_complete(test(loop))