Python實戰-編寫Web App-Day8-編寫API

API就是把Web App的功能全部封裝了,所以,通過API操作數據,可以極大地把前端和後端的代碼隔離,使得後端代碼易於測試,前端代碼編寫更簡單。一個API也是一個URL的處理函數,我們希望能直接通過一個@api來把函數變成JSON格式的REST API,這樣,獲取註冊用戶可以用一個API實現如下:

<code>@get('/api/users')
def api_get_users(*, page='1'):
    page_index = get_page_index(page)
    num = await User.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, users=())
    users = await User.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    for u in users:
        u.passwd = '******'
    return dict(page=p, users=users)/<code>

只要返回一個dict,後續的response這個middleware就可以把結果序列化為JSON並返回。

以下是所有網站所需要的後端API, 前端頁面URL的列表:

後端API包括:

獲取日誌:GET /api/blogs創建日誌:POST /api/blogs修改日誌:POST /api/blogs/:blog_id刪除日誌:POST /api/blogs/:blog_id/delete獲取評論:GET /api/comments創建評論:POST /api/blogs/:blog_id/comments刪除評論:POST /api/comments/:comment_id/delete創建新用戶:POST /api/users獲取用戶:GET /api/users管理頁面包括:

評論列表頁:GET /manage/comments日誌列表頁:GET /manage/blogs創建日誌頁:GET /manage/blogs/create修改日誌頁:GET /manage/blogs/用戶列表頁:GET /manage/users用戶瀏覽頁面包括:

註冊頁:GET /register登錄頁:GET /signin註銷頁:GET /signout首頁:GET /日誌詳情頁:GET /blog/:blog_id我們將處理這些API和URL的函數統一放在handlers.py中:

<code>#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
__author__ = 'Woodman Zhang'
 
import re, time, json, logging, hashlib, base64, asyncio
 
## markdown 是處理日誌文本的一種格式語法,具體語法使用請百度
import markdown
from aiohttp import web
from coroweb import get, post
 
## 分頁管理以及調取API時的錯誤信息
from apis import Page, APIValueError, APIResourceNotFoundError
from models import User, Comment, Blog, next_id
from config import configs
 
COOKIE_NAME = 'awesession'
_COOKIE_KEY = configs.session.secret
 
## 查看是否是管理員用戶
def check_admin(request):
    if request.__user__ is None or not request.__user__.admin:
        raise APIPermissionError()
 
## 獲取頁碼信息
def get_page_index(page_str):
    p = 1
    try:
        p = int(page_str)
    except ValueError as e:
        pass
    if p < 1:
        p = 1
    return p
 
## 計算加密cookie
def user2cookie(user, max_age):
    # build cookie string by: id-expires-sha1
    expires = str(int(time.time() + max_age))
    s = '%s-%s-%s-%s' % (user.id, user.passwd, expires, _COOKIE_KEY)
    L = [user.id, expires, hashlib.sha1(s.encode('utf-8')).hexdigest()]
    return '-'.join(L)
 
## 文本轉HTML
def text2html(text):
    lines = map(lambda s: '

%s

' % s.replace('&', '&').replace('', '>'), filter(lambda s: s.strip() != '', text.split('\n'))) return ''.join(lines) ## 解密cookie async def cookie2user(cookie_str): if not cookie_str: return None try: L = cookie_str.split('-') if len(L) != 3: return None uid, expires, sha1 = L if int(expires) < time.time(): return None user = await User.find(uid) if user is None: return None s = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY) if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest(): logging.info('invalid sha1') return None user.passwd = '******' return user except Exception as e: logging.exception(e) return None ## 處理首頁URL @get('/') async def index(*, page='1'): page_index = get_page_index(page) num = await Blog.findNumber('count(id)') p = Page(num, page_index) if num == 0: blogs = [] else: blogs = await Blog.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) return { '__template__': 'blogs.html', 'page': p, 'blogs': blogs } ## 處理日誌詳情頁面URL @get('/blog/{id}') async def get_blog(id): blog = await Blog.find(id) comments = await Comment.findAll('blog_id=?', [id], orderBy='created_at desc') for c in comments: c.html_content = markdown.markdown(c.content) blog.html_content = markdown.markdown(blog.content) return { '__template__': 'blog.html', 'blog': blog, 'comments': comments } ## 處理註冊頁面URL @get('/register') def register(): return { '__template__': 'register.html' } ## 處理登錄頁面URL @get('/signin') def signin(): return { '__template__': 'signin.html' } ## 用戶登錄驗證API @post('/api/authenticate') async def authenticate(*, email, passwd): if not email: raise APIValueError('email', 'Invalid email.') if not passwd: raise APIValueError('passwd', 'Invalid password.') users = await User.findAll('email=?', [email]) if len(users) == 0: raise APIValueError('email', 'Email not exist.') user = users[0] # check passwd: sha1 = hashlib.sha1() sha1.update(user.id.encode('utf-8')) sha1.update(b':') sha1.update(passwd.encode('utf-8')) if user.passwd != sha1.hexdigest(): raise APIValueError('passwd', 'Invalid password.') # authenticate ok, set cookie: r = web.Response() r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True) user.passwd = '******' r.content_type = 'application/json' r.body = json.dumps(user, ensure_ascii=False).encode('utf-8') return r ## 用戶註銷 @get('/signout') def signout(request): referer = request.headers.get('Referer') r = web.HTTPFound(referer or '/') r.set_cookie(COOKIE_NAME, '-deleted-', max_age=0, httponly=True) logging.info('user signed out.') return r ## 獲取管理頁面 @get('/manage/') def manage(): return 'redirect:/manage/comments' ## 評論管理頁面 @get('/manage/comments') def manage_comments(*, page='1'): return { '__template__': 'manage_comments.html', 'page_index': get_page_index(page) } ## 日誌管理頁面 @get('/manage/blogs') def manage_blogs(*, page='1'): return { '__template__': 'manage_blogs.html', 'page_index': get_page_index(page) } ## 創建日誌頁面 @get('/manage/blogs/create') def manage_create_blog(): return { '__template__': 'manage_blog_edit.html', 'id': '', 'action': '/api/blogs' } ## 編輯日誌頁面 @get('/manage/blogs/edit') def manage_edit_blog(*, id): return { '__template__': 'manage_blog_edit.html', 'id': id, 'action': '/api/blogs/%s' % id } ## 用戶管理頁面 @get('/manage/users') def manage_users(*, page='1'): return { '__template__': 'manage_users.html', 'page_index': get_page_index(page) } ## 獲取評論信息API @get('/api/comments') async def api_comments(*, page='1'): page_index = get_page_index(page) num = await Comment.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page=p, comments=()) comments = await Comment.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) return dict(page=p, comments=comments) ## 用戶發表評論API @post('/api/blogs/{id}/comments') async def api_create_comment(id, request, *, content): user = request.__user__ if user is None: raise APIPermissionError('Please signin first.') if not content or not content.strip(): raise APIValueError('content') blog = await Blog.find(id) if blog is None: raise APIResourceNotFoundError('Blog') comment = Comment(blog_id=blog.id, user_id=user.id, user_name=user.name, user_image=user.image, content=content.strip()) await comment.save() return comment ## 管理員刪除評論API @post('/api/comments/{id}/delete') async def api_delete_comments(id, request): check_admin(request) c = await Comment.find(id) if c is None: raise APIResourceNotFoundError('Comment') await c.remove() return dict(id=id) ## 獲取用戶信息API @get('/api/users') async def api_get_users(*, page='1'): page_index = get_page_index(page) num = await User.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page=p, users=()) users = await User.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) for u in users: u.passwd = '******' return dict(page=p, users=users) ## 定義EMAIL和HASH的格式規範 _RE_EMAIL = re.compile(r'^[a-z0-9\.\-\_]+\@[a-z0-9\-\_]+(\.[a-z0-9\-\_]+){1,4}#39;) _RE_SHA1 = re.compile(r'^[0-9a-f]{40}#39;) ## 用戶註冊API @post('/api/users') async def api_register_user(*, email, name, passwd): if not name or not name.strip(): raise APIValueError('name') if not email or not _RE_EMAIL.match(email): raise APIValueError('email') if not passwd or not _RE_SHA1.match(passwd): raise APIValueError('passwd') users = await User.findAll('email=?', [email]) if len(users) > 0: raise APIError('register:failed', 'email', 'Email is already in use.') uid = next_id() sha1_passwd = '%s:%s' % (uid, passwd) user = User(id=uid, name=name.strip(), email=email, passwd=hashlib.sha1(sha1_passwd.encode('utf-8')).hexdigest(), image='http://www.gravatar.com/avatar/%s?d=mm&s=120' % hashlib.md5(email.encode('utf-8')).hexdigest()) await user.save() # make session cookie: r = web.Response() r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True) user.passwd = '******' r.content_type = 'application/json' r.body = json.dumps(user, ensure_ascii=False).encode('utf-8') return r ## 獲取日誌列表API @get('/api/blogs') async def api_blogs(*, page='1'): page_index = get_page_index(page) num = await Blog.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page=p, blogs=()) blogs = await Blog.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) return dict(page=p, blogs=blogs) ## 獲取日誌詳情API @get('/api/blogs/{id}') async def api_get_blog(*, id): blog = await Blog.find(id) return blog ## 發表日誌API @post('/api/blogs') async def api_create_blog(request, *, name, summary, content): check_admin(request) if not name or not name.strip(): raise APIValueError('name', 'name cannot be empty.') if not summary or not summary.strip(): raise APIValueError('summary', 'summary cannot be empty.') if not content or not content.strip(): raise APIValueError('content', 'content cannot be empty.') blog = Blog(user_id=request.__user__.id, user_name=request.__user__.name, user_image=request.__user__.image, name=name.strip(), summary=summary.strip(), content=content.strip()) await blog.save() return blog ## 編輯日誌API @post('/api/blogs/{id}') async def api_update_blog(id, request, *, name, summary, content): check_admin(request) blog = await Blog.find(id) if not name or not name.strip(): raise APIValueError('name', 'name cannot be empty.') if not summary or not summary.strip(): raise APIValueError('summary', 'summary cannot be empty.') if not content or not content.strip(): raise APIValueError('content', 'content cannot be empty.') blog.name = name.strip() blog.summary = summary.strip() blog.content = content.strip() await blog.update() return blog ## 刪除日誌API @post('/api/blogs/{id}/delete') async def api_delete_blog(request, *, id): check_admin(request) blog = await Blog.find(id) await blog.remove() return dict(id=id) ## 刪除用戶API @post('/api/users/{id}/delete') async def api_delete_users(id, request): check_admin(request) id_buff = id user = await User.find(id) if user is None: raise APIResourceNotFoundError('Comment') await user.remove() # 給被刪除的用戶在評論中標記 comments = await Comment.findAll('user_id=?',[id]) if comments: for comment in comments: id = comment.id c = await Comment.find(id) c.user_name = c.user_name + ' (該用戶已被刪除)' await c.update() id = id_buff return dict(id=id)/<code>


分享到:


相關文章: