6-315/www/app.py

161 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from config import configs
import logging; logging.basicConfig(level=logging.INFO)
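# logging is Python's standard logging module; `level` sets the threshold of the root logger.
# Log levels:
# CRITICAL 50
# ERROR 40
# WARNING 30
# INFO 20
# DEBUG 10
# NOTSET 0
# The line above imports logging and, after the ';', configures it globally.
# basicConfig sets the level to INFO, so only records at INFO level or above are emitted.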
import asyncio, os, json, time
from datetime import datetime
from aiohttp import web
from jinja2 import Environment, FileSystemLoader
import orm
from coroweb import add_routes, add_static
def init_jinja2(app, **kw):
    """Create a Jinja2 ``Environment`` and store it under ``app['__templating__']``.

    Recognised keyword arguments (all optional):

    * ``autoescape``, ``block_start_string``, ``block_end_string``,
      ``variable_start_string``, ``variable_end_string``, ``auto_reload`` --
      forwarded to ``Environment``; each has the default listed below.
    * ``path`` -- template directory; defaults to the ``templates`` folder
      located next to this file.
    * ``filters`` -- mapping of filter name to callable, registered on the
      environment.
    """
    logging.info('init jinja2...')

    # (option name, fallback used when the caller did not supply it)
    option_defaults = (
        ('autoescape', True),
        ('block_start_string', '{%'),
        ('block_end_string', '%}'),
        ('variable_start_string', '{{'),
        ('variable_end_string', '}}'),
        ('auto_reload', True),
    )
    options = {key: kw.get(key, fallback) for key, fallback in option_defaults}

    template_dir = kw.get('path', None)
    if template_dir is None:
        module_dir = os.path.dirname(os.path.abspath(__file__))
        template_dir = os.path.join(module_dir, 'templates')
    logging.info('set jinja2 template path: %s' % template_dir)

    env = Environment(loader=FileSystemLoader(template_dir), **options)

    custom_filters = kw.get('filters', None)
    if custom_filters is not None:
        for filter_name, filter_func in custom_filters.items():
            env.filters[filter_name] = filter_func

    app['__templating__'] = env
async def logger_factory(app, handler):
    """Middleware factory: log the method and path of each request.

    Returns a coroutine function that logs the request line at INFO level
    and then delegates to ``handler``, returning whatever it returns.
    ``app`` is accepted for the middleware-factory signature but not used.
    """
    async def logger(request):
        request_line = 'Request: %s %s' % (request.method, request.path)
        logging.info(request_line)
        result = await handler(request)
        return result

    return logger
async def data_factory(app, handler):
    """Middleware factory: pre-parse the body of POST requests.

    For a POST whose content type starts with ``application/json`` the
    result of ``request.json()`` is stored on ``request.__data__``; for
    ``application/x-www-form-urlencoded`` the result of ``request.post()``
    is stored instead. Any other request is passed through untouched.
    The wrapped ``handler`` is always called and its result returned.
    ``app`` is accepted for the middleware-factory signature but not used.
    """
    async def parse_data(request):
        is_post = request.method == 'POST'
        if is_post and request.content_type.startswith('application/json'):
            request.__data__ = await request.json()
            logging.info('request json: %s' % str(request.__data__))
        elif is_post and request.content_type.startswith('application/x-www-form-urlencoded'):
            request.__data__ = await request.post()
            logging.info('request form: %s' % str(request.__data__))
        result = await handler(request)
        return result

    return parse_data
async def response_factory(app, handler):
    """Middleware factory: turn a handler's return value into a response.

    The value returned by ``handler`` is converted as follows:

    * ``web.StreamResponse`` -- returned unchanged.
    * ``bytes`` -- sent as ``application/octet-stream``.
    * ``str`` -- ``'redirect:<url>'`` becomes an ``HTTPFound`` to ``<url>``;
      any other string is sent as UTF-8 HTML.
    * ``dict`` -- without a ``'__template__'`` key it is serialised to JSON
      (objects that are not JSON-serialisable fall back to their
      ``__dict__``); with the key, the named template from
      ``app['__templating__']`` is rendered with the dict as context.
    * ``int`` in ``[100, 600)`` -- an empty response with that status code.
    * ``(int, message)`` with the int in ``[100, 600)`` -- a response with
      that status code and ``str(message)`` as its text.
    * anything else -- ``str(value)`` sent as UTF-8 plain text.
    """
    async def response(request):
        logging.info('Response handler...')
        r = await handler(request)
        if isinstance(r, web.StreamResponse):
            return r
        if isinstance(r, bytes):
            resp = web.Response(body=r)
            resp.content_type = 'application/octet-stream'
            return resp
        if isinstance(r, str):
            if r.startswith('redirect:'):
                # Everything after the 9-character 'redirect:' prefix is the target.
                return web.HTTPFound(r[9:])
            resp = web.Response(body=r.encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, dict):
            template = r.get('__template__')
            if template is None:
                resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
                resp.content_type = 'application/json;charset=utf-8'
                return resp
            else:
                resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
                resp.content_type = 'text/html;charset=utf-8'
                return resp
        if isinstance(r, int) and 100 <= r < 600:
            # The integer is an HTTP status code, so it belongs in `status=`;
            # `text=` expects a string body.
            return web.Response(status=r)
        if isinstance(r, tuple) and len(r) == 2:
            t, m = r
            if isinstance(t, int) and 100 <= t < 600:
                # Status code plus message. `text=` and `body=` must not be
                # combined, so the message is sent as the text body only.
                return web.Response(status=t, text=str(m))
        # default: stringify whatever the handler returned
        resp = web.Response(body=str(r).encode('utf-8'))
        resp.content_type = 'text/plain;charset=utf-8'
        return resp

    return response
def datetime_filter(t):
    """Format a POSIX timestamp as a short, human-readable age in Chinese.

    ``t`` is a timestamp in seconds, compared against ``time.time()``.

    Returns:
        * ``'1分钟前'`` when less than a minute has elapsed (this also covers
          timestamps in the future, whose delta is negative);
        * ``'N分钟前'`` / ``'N小时前'`` / ``'N天前'`` for ages under an hour,
          a day and a week respectively;
        * otherwise the local calendar date as ``'YYYY年M月D日'``.
    """
    delta = int(time.time() - t)
    if delta < 60:
        return u'1分钟前'
    if delta < 3600:
        return u'%s分钟前' % (delta // 60)
    if delta < 86400:
        return u'%s小时前' % (delta // 3600)
    if delta < 604800:
        return u'%s天前' % (delta // 86400)
    dt = datetime.fromtimestamp(t)
    # Label each component: bare concatenation made e.g. 2020-01-15 and
    # 2020-11-05 render identically as '2020115'.
    return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)
# async def index(request):
# return web.Response(body=b'<h1>....vv</h1>', headers={'content-type': 'text/html'})
# def init():
# # Create the web.Application, the skeleton of the web app
# app = web.Application()
# app.router.add_get('/', index)
# web.run_app(app, host='127.0.0.1', port=9000)
async def init(loop):
    """Set up the database pool and the web application, then start listening.

    Steps, in order: call ``orm.create_pool`` with the local connection
    settings, build the ``web.Application`` with the logging and response
    middlewares, initialise Jinja2 with the ``datetime`` filter, register the
    routes from the ``handlers`` module and the static files, and finally
    create a server on 127.0.0.1:9001.

    Returns the server object produced by ``loop.create_server``.
    """
    # NOTE(review): database credentials are hard-coded here.
    db_settings = dict(host='127.0.0.1', port=3306, user='root', password='123456', db='demo')
    await orm.create_pool(loop=loop, **db_settings)

    middlewares = [logger_factory, response_factory]
    app = web.Application(loop=loop, middlewares=middlewares)

    template_filters = {'datetime': datetime_filter}
    init_jinja2(app, filters=template_filters)
    add_routes(app, 'handlers')
    add_static(app)

    listen_host, listen_port = '127.0.0.1', 9001
    request_handler = app.make_handler()
    srv = await loop.create_server(request_handler, listen_host, listen_port)
    logging.info('server started at http://127.0.0.1:9001...')
    return srv
# Script start-up: obtain the event loop, run init() to completion so the
# server is created, then keep the loop running.
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
# Blocks here; the statements above must have finished before this call.
loop.run_forever()
# Newer-style startup (web.run_app with an on_startup hook); it raises an error, so it is left disabled.
# async def init_db(app):
# # If on Linux, use another user instead of 'root'
# await orm.create_pool(
# host=configs.db.host,
# port=configs.db.port,
# user=configs.db.user,
# password=configs.db.password,
# db=configs.db.db
#
# )
#
#
# app = web.Application(middlewares=[
# logger_factory,
# response_factory
# ])
# init_jinja2(app, filters=dict(datatime=datetime_filter))
# add_routes(app, 'handlers')
# add_static(app)
# app.on_startup.append(init_db)
# web.run_app(app, host='localhost', port=9000)