API就是把Web App的功能全部封裝了,所以,通過API操作數據,可以極大地把前端和後端的代碼隔離,使得後端代碼易於測試,前端代碼編寫更簡單。一個API也是一個URL的處理函數,我們希望能直接通過一個裝飾器(例如下面的@get)把函數變成JSON格式的REST API,這樣,獲取註冊用戶可以用一個API實現如下:
@get('/api/users')
async def api_get_users(*, page='1'):
    """Return one page of registered users as a JSON-serialisable dict.

    Args:
        page: Requested page number as a string (query parameter).

    Returns:
        ``dict(page=..., users=...)``; ``users`` is an empty tuple when
        there are no users at all.
    """
    # The handler awaits ORM calls, so it must be declared ``async def``.
    page_index = get_page_index(page)
    num = await User.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, users=())
    users = await User.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    for u in users:
        # Mask the stored password hash before it is sent to the client.
        u.passwd = '******'
    return dict(page=p, users=users)
只要返回一個dict,後續的response這個middleware就可以把結果序列化為JSON並返回。
以下是網站所需要的全部後端API和前端頁面URL的列表:
後端API包括:
- 獲取日誌:GET /api/blogs
- 創建日誌:POST /api/blogs
- 修改日誌:POST /api/blogs/:blog_id
- 刪除日誌:POST /api/blogs/:blog_id/delete
- 獲取評論:GET /api/comments
- 創建評論:POST /api/blogs/:blog_id/comments
- 刪除評論:POST /api/comments/:comment_id/delete
- 創建新用戶:POST /api/users
- 獲取用戶:GET /api/users

管理頁面包括:
- 評論列表頁:GET /manage/comments
- 日誌列表頁:GET /manage/blogs
- 創建日誌頁:GET /manage/blogs/create
- 修改日誌頁:GET /manage/blogs/edit
- 用戶列表頁:GET /manage/users

用戶瀏覽頁面包括:
- 註冊頁:GET /register
- 登錄頁:GET /signin
- 註銷頁:GET /signout
- 首頁:GET /
- 日誌詳情頁:GET /blog/:blog_id

我們將處理這些API和URL的函數統一放在handlers.py中:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""URL handlers for the blog web app: page views and the JSON REST API."""

__author__ = 'Woodman Zhang'

import re
import time
import json
import logging
import hashlib
import base64
import asyncio
import html

# markdown renders blog and comment text to HTML.
import markdown

from aiohttp import web

from coroweb import get, post

# Pagination helper and the API error types raised by the handlers below.
# NOTE(review): APIError and APIPermissionError were used but not imported in
# the original; this assumes `apis` defines them - confirm.
from apis import (
    Page,
    APIError,
    APIValueError,
    APIResourceNotFoundError,
    APIPermissionError,
)

from models import User, Comment, Blog, next_id
from config import configs

COOKIE_NAME = 'awesession'
_COOKIE_KEY = configs.session.secret

# Accepted formats for the e-mail address and the SHA1 hex password digest.
_RE_EMAIL = re.compile(r'^[a-z0-9\.\-\_]+\@[a-z0-9\-\_]+(\.[a-z0-9\-\_]+){1,4}$')
_RE_SHA1 = re.compile(r'^[0-9a-f]{40}$')


def check_admin(request):
    """Raise APIPermissionError unless the request carries an admin user."""
    if request.__user__ is None or not request.__user__.admin:
        raise APIPermissionError()


def get_page_index(page_str):
    """Parse a page number, falling back to 1 for invalid or < 1 input."""
    try:
        p = int(page_str)
    except (ValueError, TypeError):
        return 1
    return max(p, 1)


def user2cookie(user, max_age):
    """Build the signed session cookie value for *user*.

    The cookie has the form ``id-expires-sha1`` where the digest covers the
    user id, the stored password hash, the expiry time and the secret key.
    """
    expires = str(int(time.time() + max_age))
    s = '%s-%s-%s-%s' % (user.id, user.passwd, expires, _COOKIE_KEY)
    L = [user.id, expires, hashlib.sha1(s.encode('utf-8')).hexdigest()]
    return '-'.join(L)


def text2html(text):
    """Convert plain text to HTML: one escaped ``<p>`` per non-blank line."""
    # quote=False escapes exactly '&', '<' and '>'.
    return ''.join(
        '<p>%s</p>' % html.escape(line, quote=False)
        for line in text.split('\n')
        if line.strip() != ''
    )


async def cookie2user(cookie_str):
    """Return the user identified by a session cookie, or None if invalid.

    The cookie is rejected when it is empty, malformed, expired, refers to an
    unknown user, or its digest does not match the recomputed one.
    """
    if not cookie_str:
        return None
    try:
        L = cookie_str.split('-')
        if len(L) != 3:
            return None
        uid, expires, sha1 = L
        if int(expires) < time.time():
            return None
        user = await User.find(uid)
        if user is None:
            return None
        s = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY)
        if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest():
            logging.info('invalid sha1')
            return None
        # Mask the password hash on the object handed to request handlers.
        user.passwd = '******'
        return user
    except Exception as e:
        # Best effort: any parsing/lookup failure means "not signed in".
        logging.exception(e)
        return None


@get('/')
async def index(*, page='1'):
    """Home page: one page of blogs, newest first."""
    page_index = get_page_index(page)
    num = await Blog.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        blogs = []
    else:
        blogs = await Blog.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    return {
        '__template__': 'blogs.html',
        'page': p,
        'blogs': blogs
    }


@get('/blog/{id}')
async def get_blog(id):
    """Blog detail page with its comments rendered to HTML."""
    blog = await Blog.find(id)
    if blog is None:
        # Previously a missing blog crashed on ``blog.content`` below.
        raise web.HTTPNotFound()
    comments = await Comment.findAll('blog_id=?', [id], orderBy='created_at desc')
    for c in comments:
        # NOTE(review): markdown does not escape raw HTML in user comments;
        # consider text2html() or sanitising the output.
        c.html_content = markdown.markdown(c.content)
    blog.html_content = markdown.markdown(blog.content)
    return {
        '__template__': 'blog.html',
        'blog': blog,
        'comments': comments
    }


@get('/register')
def register():
    """Registration page."""
    return {
        '__template__': 'register.html'
    }


@get('/signin')
def signin():
    """Sign-in page."""
    return {
        '__template__': 'signin.html'
    }


@post('/api/authenticate')
async def authenticate(*, email, passwd):
    """Sign-in API: verify credentials and set the session cookie.

    Raises:
        APIValueError: if the e-mail or password is missing or wrong.
    """
    if not email:
        raise APIValueError('email', 'Invalid email.')
    if not passwd:
        raise APIValueError('passwd', 'Invalid password.')
    users = await User.findAll('email=?', [email])
    if len(users) == 0:
        raise APIValueError('email', 'Email not exist.')
    user = users[0]
    # The stored hash is sha1("<user id>:<client-side password digest>").
    sha1 = hashlib.sha1()
    sha1.update(user.id.encode('utf-8'))
    sha1.update(b':')
    sha1.update(passwd.encode('utf-8'))
    if user.passwd != sha1.hexdigest():
        raise APIValueError('passwd', 'Invalid password.')
    # Authentication succeeded: set the cookie and return the masked user.
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    user.passwd = '******'
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r


@get('/signout')
def signout(request):
    """Sign out: clear the session cookie and redirect to the referer or '/'."""
    referer = request.headers.get('Referer')
    r = web.HTTPFound(referer or '/')
    r.set_cookie(COOKIE_NAME, '-deleted-', max_age=0, httponly=True)
    logging.info('user signed out.')
    return r


@get('/manage/')
def manage():
    """Management entry point: redirect to the comment list."""
    return 'redirect:/manage/comments'


@get('/manage/comments')
def manage_comments(*, page='1'):
    """Comment management page."""
    return {
        '__template__': 'manage_comments.html',
        'page_index': get_page_index(page)
    }


@get('/manage/blogs')
def manage_blogs(*, page='1'):
    """Blog management page."""
    return {
        '__template__': 'manage_blogs.html',
        'page_index': get_page_index(page)
    }


@get('/manage/blogs/create')
def manage_create_blog():
    """Blog creation page (form posts to /api/blogs)."""
    return {
        '__template__': 'manage_blog_edit.html',
        'id': '',
        'action': '/api/blogs'
    }


@get('/manage/blogs/edit')
def manage_edit_blog(*, id):
    """Blog editing page (form posts to /api/blogs/<id>)."""
    return {
        '__template__': 'manage_blog_edit.html',
        'id': id,
        'action': '/api/blogs/%s' % id
    }


@get('/manage/users')
def manage_users(*, page='1'):
    """User management page."""
    return {
        '__template__': 'manage_users.html',
        'page_index': get_page_index(page)
    }


@get('/api/comments')
async def api_comments(*, page='1'):
    """API: one page of comments, newest first."""
    page_index = get_page_index(page)
    num = await Comment.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, comments=())
    comments = await Comment.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    return dict(page=p, comments=comments)


@post('/api/blogs/{id}/comments')
async def api_create_comment(id, request, *, content):
    """API: the signed-in user posts a comment on blog *id*.

    Raises:
        APIPermissionError: if nobody is signed in.
        APIValueError: if the content is empty.
        APIResourceNotFoundError: if the blog does not exist.
    """
    user = request.__user__
    if user is None:
        raise APIPermissionError('Please signin first.')
    if not content or not content.strip():
        raise APIValueError('content')
    blog = await Blog.find(id)
    if blog is None:
        raise APIResourceNotFoundError('Blog')
    comment = Comment(blog_id=blog.id, user_id=user.id, user_name=user.name,
                      user_image=user.image, content=content.strip())
    await comment.save()
    return comment


@post('/api/comments/{id}/delete')
async def api_delete_comments(id, request):
    """API: an admin deletes comment *id*."""
    check_admin(request)
    c = await Comment.find(id)
    if c is None:
        raise APIResourceNotFoundError('Comment')
    await c.remove()
    return dict(id=id)


@get('/api/users')
async def api_get_users(*, page='1'):
    """API: one page of users, newest first, with password hashes masked."""
    page_index = get_page_index(page)
    num = await User.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, users=())
    users = await User.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    for u in users:
        u.passwd = '******'
    return dict(page=p, users=users)


@post('/api/users')
async def api_register_user(*, email, name, passwd):
    """API: register a new user and sign them in.

    Raises:
        APIValueError: if name, e-mail or password digest is invalid.
        APIError: if the e-mail is already registered.
    """
    if not name or not name.strip():
        raise APIValueError('name')
    if not email or not _RE_EMAIL.match(email):
        raise APIValueError('email')
    if not passwd or not _RE_SHA1.match(passwd):
        raise APIValueError('passwd')
    users = await User.findAll('email=?', [email])
    if len(users) > 0:
        raise APIError('register:failed', 'email', 'Email is already in use.')
    uid = next_id()
    sha1_passwd = '%s:%s' % (uid, passwd)
    user = User(id=uid, name=name.strip(), email=email,
                passwd=hashlib.sha1(sha1_passwd.encode('utf-8')).hexdigest(),
                image='http://www.gravatar.com/avatar/%s?d=mm&s=120' % hashlib.md5(email.encode('utf-8')).hexdigest())
    await user.save()
    # Create the session cookie and return the masked user as JSON.
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    user.passwd = '******'
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r


@get('/api/blogs')
async def api_blogs(*, page='1'):
    """API: one page of blogs, newest first."""
    page_index = get_page_index(page)
    num = await Blog.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, blogs=())
    blogs = await Blog.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    return dict(page=p, blogs=blogs)


@get('/api/blogs/{id}')
async def api_get_blog(*, id):
    """API: return blog *id*; raises APIResourceNotFoundError if missing."""
    blog = await Blog.find(id)
    if blog is None:
        raise APIResourceNotFoundError('Blog')
    return blog


@post('/api/blogs')
async def api_create_blog(request, *, name, summary, content):
    """API: an admin creates a blog.

    Raises:
        APIValueError: if name, summary or content is empty.
    """
    check_admin(request)
    if not name or not name.strip():
        raise APIValueError('name', 'name cannot be empty.')
    if not summary or not summary.strip():
        raise APIValueError('summary', 'summary cannot be empty.')
    if not content or not content.strip():
        raise APIValueError('content', 'content cannot be empty.')
    blog = Blog(user_id=request.__user__.id, user_name=request.__user__.name,
                user_image=request.__user__.image, name=name.strip(),
                summary=summary.strip(), content=content.strip())
    await blog.save()
    return blog


@post('/api/blogs/{id}')
async def api_update_blog(id, request, *, name, summary, content):
    """API: an admin updates blog *id*.

    Raises:
        APIResourceNotFoundError: if the blog does not exist.
        APIValueError: if name, summary or content is empty.
    """
    check_admin(request)
    blog = await Blog.find(id)
    if blog is None:
        raise APIResourceNotFoundError('Blog')
    if not name or not name.strip():
        raise APIValueError('name', 'name cannot be empty.')
    if not summary or not summary.strip():
        raise APIValueError('summary', 'summary cannot be empty.')
    if not content or not content.strip():
        raise APIValueError('content', 'content cannot be empty.')
    blog.name = name.strip()
    blog.summary = summary.strip()
    blog.content = content.strip()
    await blog.update()
    return blog


@post('/api/blogs/{id}/delete')
async def api_delete_blog(request, *, id):
    """API: an admin deletes blog *id*."""
    check_admin(request)
    blog = await Blog.find(id)
    if blog is None:
        raise APIResourceNotFoundError('Blog')
    await blog.remove()
    return dict(id=id)


@post('/api/users/{id}/delete')
async def api_delete_users(id, request):
    """API: an admin deletes user *id* and tags that user's comments.

    Raises:
        APIResourceNotFoundError: if the user does not exist.
    """
    check_admin(request)
    user = await User.find(id)
    if user is None:
        raise APIResourceNotFoundError('User')
    await user.remove()
    # Mark the deleted user's comments so readers can tell the author is gone.
    comments = await Comment.findAll('user_id=?', [id])
    for comment in comments or ():
        comment.user_name = comment.user_name + ' (該用戶已被刪除)'
        await comment.update()
    return dict(id=id)