
registrations table.secondary argument to the association table.# hello_db.py
import os
from flask import Flask
from flask_script import Manager
from flask_sqlalchemy import SQLAlchemy
basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__)
manager = Manager(app)
app.config["SQLALCHEMY_DATABASE_URI"] =\
"sqlite:///" + os.path.join(basedir, "data-test-m2m.sqlite")
app.config["SQLALCHEMY_COMMIT_ON_TEARDOWN"] = True
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_ECHO"] = False
db = SQLAlchemy(app)
# ...
# hello_db.py
# ...
registrations = db.Table('registrations',
db.Column('student_id', db.Integer, db.ForeignKey('students.id')),
db.Column('class_id', db.Integer, db.ForeignKey('classes.id')))
class Student(db.Model):
__tablename__ = 'students'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String)
classes = db.relationship('Class', secondary=registrations,
backref=db.backref('students', lazy='dynamic'),
lazy='dynamic')
def __repr__(self):
return '<Student %r>' % self.name
class Class(db.Model):
__tablename__ = 'classes'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String)
def __repr__(self):
return '<Class %r>' % self.name
if __name__ == "__main__":
manager.run()
classes relationship uses list semantics.backref argument also has a lazy='dynamic' attribute, so both sides return a query that can accept additional filters.(venv) $ python hello_db.py shell
>>> from hello_db import db, Student, Class
>>> db.create_all()
>>> s = Student(name='Student_A')
>>> c = Class(name='Flask_Web')
>>> db.session.add_all([s, c])
>>> db.session.commit()
>>> s.classes.append(c)
>>> db.session.add(s)
>>> s.classes.all()
[<Class 'Flask_Web'>]
>>> c.students.all()
[<Student 'Student_A'>]
>>> s.classes.remove(c)
>>> s.classes.all()
[]
follows and each row represents a user following another user.
Follow model to represent the assication table.# app/models.py
class Follow(db.Model):
__tablename__ = 'follows'
follower_id = db.Column(db.Integer, db.ForeignKey('users.id'),
primary_key=True)
followed_id = db.Column(db.Integer, db.ForeignKey('users.id'),
primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
followed and followers relationships are defined as individual one-to-many relationships.followed: Users who are followed by this userfollowers: Users who follow this userforeign_keys optional argument.# app/models.py
class User(UserMixin, db.Model):
# ...
followed = db.relationship('Follow',
foreign_keys=[Follow.follower_id],
backref=db.backref('follower', lazy='joined'),
lazy='dynamic',
cascade='all, delete-orphan')
followers = db.relationship('Follow',
foreign_keys=[Follow.followed_id],
backref=db.backref('followed', lazy='joined'),
lazy='dynamic',
cascade='all, delete-orphan')
lazy argument¶Follow sidelazy='joined' causes the related object to be loaded immediatly from the join query.lazy is set to default value of select, then the follower and followed users are loaded lazily when they are first accessed and each attribute will require an individual query.User sidelazy argument is dynamic, so the relationship attributes return query objects instead of the items directly, so that additional filters can be added to the query.cascade argument¶cascade argument configures how actions performed on a parent object propagate to related objects.delete-orphan cascade option does.all, delete-orphan leaves the default cascade options enabled and adds the delete behavior for orphans.# app/models.py
class User(UserMixin, db.Model):
# ...
def follow(self, user):
if not self.is_following(user):
f = Follow(follower=self, followed=user)
self.followed.append(f)
def unfollow(self, user):
f = self.followed.filter_by(followed_id=user.id).first()
if f:
self.followed.remove(f)
def is_following(self, user):
if user.id is None:
return False
return self.followed.filter_by(followed_id=user.id).first() is not None
def is_followed_by(self, user):
if user.id is None:
return False
return self.followers.filter_by(follower_id=user.id).first() is not None
(venv) $ python manage.py db upgrade
# tests/test_user_model.py
from datetime import datetime
# ...
from app.models import User, Role, Permission, AnonymousUser, Follow
class UserModelTestCase(unittest.TestCase):
# ...
def test_follows(self):
u1 = User(email='john@example.com', password='cat')
u2 = User(email='susan@example.org', password='dog')
db.session.add(u1)
db.session.add(u2)
db.session.commit()
self.assertFalse(u1.is_following(u2))
self.assertFalse(u1.is_followed_by(u2))
timestamp_before = datetime.utcnow()
u1.follow(u2)
db.session.add(u1)
db.session.commit()
timestamp_after = datetime.utcnow()
self.assertTrue(u1.is_following(u2))
self.assertFalse(u1.is_followed_by(u2))
self.assertTrue(u2.is_followed_by(u1))
self.assertTrue(u1.followed.count() == 1)
self.assertTrue(u2.followers.count() == 1)
# ...
# tests/test_user_model.py
class UserModelTestCase(unittest.TestCase):
# ...
def test_follows(self):
# ...
f = u1.followed.all()[-1]
self.assertTrue(f.followed == u2)
self.assertTrue(timestamp_before <= f.timestamp <= timestamp_after)
f = u2.followers.all()[-1]
self.assertTrue(f.follower == u1)
u1.unfollow(u2)
db.session.add(u1)
db.session.commit()
self.assertTrue(u1.followed.count() == 0)
self.assertTrue(u2.followers.count() == 0)
self.assertTrue(Follow.query.count() == 0)
u2.follow(u1)
db.session.add(u2)
db.session.commit()
db.session.delete(u2)
db.session.commit()
self.assertTrue(Follow.query.count() == 0)

<!-- app/templates/user.html -->
{% block page_content %}
...
<div class="profile-header">
...
{% if user.about_me %}<p>{{ user.about_me }}</p>{% endif %}
<p>
Member since {{ moment(user.member_since).format('L') }}.
Last seen {{ moment(user.last_seen).fromNow() }}.
</p>
<p>{{ user.posts.count() }} blog posts.</p>
<p>
{% if current_user.can(Permission.FOLLOW) and user != current_user %}
{% if not current_user.is_following(user) %}
<a href="{{ url_for('.follow', username=user.username) }}" class="btn btn-primary">Follow</a>
{% else %}
<a href="{{ url_for('.unfollow', username=user.username) }}" class="btn btn-default">Unfollow</a>
{% endif %}
{% endif %}
<a href="{{ url_for('.followers', username=user.username) }}">Followers: <span class="badge">{{ user.followers.count() }}</span></a>
<a href="{{ url_for('.followed_by', username=user.username) }}">Following: <span class="badge">{{ user.followed.count() }}</span></a>
{% if current_user.is_authenticated and user != current_user and user.is_following(current_user) %}
| <span class="label label-default">Follows you</span>
{% endif %}
</p>
...
# app/main/views.py
@main.route('/follow/<username>')
@login_required
@permission_required(Permission.FOLLOW)
def follow(username):
user = User.query.filter_by(username=username).first()
if user is None:
flash('Invalid user.')
return redirect(url_for('.index'))
if current_user.is_following(user):
flash('You are already following this user.')
return redirect(url_for('.user', username=username))
current_user.follow(user)
flash('You are now following %s.' % username)
return redirect(url_for('.user_profile', username=username))
# app/main/views.py
@main.route('/unfollow/<username>')
@login_required
@permission_required(Permission.FOLLOW)
def unfollow(username):
user = User.query.filter_by(username=username).first()
if user is None:
flash('Invalid user.')
return redirect(url_for('.index'))
if not current_user.is_following(user):
flash('You are not following this user.')
return redirect(url_for('.user', username=username))
current_user.unfollow(user)
flash('You are not following %s anymore.' % username)
return redirect(url_for('.user_profile', username=username))
# app/main/views.py
@main.route('/followers/<username>')
def followers(username):
user = User.query.filter_by(username=username).first()
if user is None:
flash('Invalid user.')
return redirect(url_for('.index'))
page = request.args.get('page', 1, type=int)
pagination = user.followers.paginate(
page, per_page=current_app.config['FLASKY_FOLLOWERS_PER_PAGE'],
error_out=False)
follows = [{'user': item.follower, 'timestamp': item.timestamp}
for item in pagination.items]
return render_template('followers.html', user=user, title="Followers of",
endpoint='.followers', pagination=pagination,
follows=follows)
# app/main/views.py
@main.route('/followed-by/<username>')
def followed_by(username):
user = User.query.filter_by(username=username).first()
if user is None:
flash('Invalid user.')
return redirect(url_for('.index'))
page = request.args.get('page', 1, type=int)
pagination = user.followed.paginate(
page, per_page=current_app.config['FLASKY_FOLLOWERS_PER_PAGE'],
error_out=False)
follows = [{'user': item.followed, 'timestamp': item.timestamp}
for item in pagination.items]
return render_template('followers.html', user=user, title="Followed by",
endpoint='.followed_by', pagination=pagination,
follows=follows)
FLASKY_FOLLOWERS_PER_PAGE to config.py¶# config.py
class Config(object):
# ...
FLASKY_FOLLOWERS_PER_PAGE = 50
followers.html¶followers and followed_by use the same tempalte followers.html.<!-- app/templates/followers.html -->
{% extends "base.html" %}
{% import "_macros.html" as macros %}
{% block title %}Flasky - {{ title }} {{ user.username }}{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>{{ title }} {{ user.username }}</h1>
</div>
<table class="table table-hover followers">
<thead><tr><th>User</th><th>Since</th></tr></thead>
{% for follow in follows %}
<tr>
<td>
<a href="{{ url_for('.user_profile', username = follow.user.username) }}">
<img class="img-rounded" src="{{ follow.user.gravatar(size=32) }}">
{{ follow.user.username }}
</a>
</td>
<td>{{ moment(follow.timestamp).format('L') }}</td>
</tr>
{% endfor %}
</table>
<div class="pagination">
{{ macros.pagination_widget(pagination, endpoint, username = user.username) }}
</div>
{% endblock %}




$ git clone https://github.com/win911/flask_class.git$ git checkout 12a$ python manage.py db upgradejoin operation takes two or more tables and finds all the combination of rows that satisfy a given condition.join operation¶



return db.session.query(Post).select_from(Follow).\
filter_by(follower_id=self.id).\
join(Post, Follow.followed_id == Post.author_id)
db.session.query(Post): a query that returns Post objectsselect_from(Follow): the query begins with the Follow modelfilter_by(follower_id=self.id): performs the filtering of the follows table by the follower userjoin(Post, Follow.followed_id == Post.author_id): joins the results of filter_by() with the Post objects.return Post.query.join(Follow, Follow.followed_id == Post.author_id).\
filter(Follow.follower_id == self.id)
followed_posts() method is defined as a property so that it does not need the (). # app/models.py
class User(UserMixin, db.Model):
# ...
@property
def followed_posts(self):
return Post.query.join(Follow, Follow.followed_id == Post.author_id)\
.filter(Follow.follower_id == self.id)
show_followed.Post.query is used, and the User.followed_posts property is used when the list should be restricted to followers.# app/main/views.py
@main.route('/', methods=['GET', 'POST'])
def index():
form = PostForm()
if current_user.can(Permission.WRITE_ARTICLES) and form.validate_on_submit():
post = Post(body=form.body.data,
author=current_user._get_current_object())
db.session.add(post)
return redirect(url_for('.index'))
page = request.args.get('page', 1, type=int)
show_followed = False
if current_user.is_authenticated:
show_followed = bool(request.cookies.get('show_followed', ''))
if show_followed:
query = current_user.followed_posts
else:
query = Post.query
# ...
# app/main/views.py
@main.route('/', methods=['GET', 'POST'])
def index():
# ...
pagination = query.order_by(
Post.timestamp.desc()).paginate(page,
per_page=current_app.config['FLASKY_POSTS_PER_PAGE'], error_out=False)
posts = pagination.items
return render_template('index.html', form=form,
posts=posts, pagination=pagination)
show_followed cookie is set in two new routes.make_response().set_cookie() takes the cookie name and the value as the first two arguments. max_age optional argument sets the number of seconds until the cookie expires.# app/main/views.py
from flask import render_template, abort, flash, redirect, url_for
from flask import request, current_app, make_response
# ...
@main.route('/all')
@login_required
def show_all():
resp = make_response(redirect(url_for('.index')))
resp.set_cookie('show_followed', '', max_age=30*24*60*60)
return resp
@main.route('/followed')
@login_required
def show_followed():
resp = make_response(redirect(url_for('.index')))
resp.set_cookie('show_followed', '1', max_age=30*24*60*60)
return resp
<!-- app/templates/index.html -->
...
{% block page_content %}
...
<div>
{% if current_user.can(Permission.WRITE_ARTICLES) %}
{{ wtf.quick_form(form) }}
{% endif %}
</div>
<div class="post-tabs">
<ul class="nav nav-tabs">
<li{% if not show_followed %} class="active"{% endif %}><a href="{{ url_for('.show_all') }}">All</a></li>
{% if current_user.is_authenticated %}
<li{% if show_followed %} class="active"{% endif %}><a href="{{ url_for('.show_followed') }}">Followers</a></li>
{% endif %}
</ul>
{% include '_posts.html' %}
</div>
{% if pagination %}
<div class="pagination">
{{ macros.pagination_widget(pagination, '.index') }}
</div>
{% endif %}
{% endblock %}

$ git clone https://github.com/win911/flask_class.git$ git checkout 12b# app/models.py
class User(UserMixin, db.Model):
#...
def __init__(self, **kwargs):
#...
self.follow(self)
# app/models.py
class User(UserMixin, db.Model):
# ...
@staticmethod
def add_self_follows():
for user in User.query.all():
if not user.is_following(user):
user.follow(user)
db.session.add(user)
db.session.commit()
(venv) $ python manage.py shell
>>> User.add_self_follows()

user.html¶<!-- app/templates/user.html -->
...
{% block page_content %}
...
<a href="{{ url_for('.followers', username=user.username) }}">Followers:
<span class="badge">{{ user.followers.count() - 1 }}</span></a>
<a href="{{ url_for('.followed_by', username=user.username) }}">Following:
<span class="badge">{{ user.followed.count() - 1 }}</span></a>
...
<!-- app/templates/followers.html -->
{% extends "base.html" %}
{% import "_macros.html" as macros %}
{% block title %}Flasky - {{ title }} {{ user.username }}{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>{{ title }} {{ user.username }}</h1>
</div>
<table class="table table-hover followers">
<thead><tr><th>User</th><th>Since</th></tr></thead>
{% for follow in follows %}
{% if follow.user != user %} <!-- new -->
<tr>
<td>
<a href="{{ url_for('.user_profile', username = follow.user.username) }}">
<img class="img-rounded" src="{{ follow.user.gravatar(size=32) }}">
{{ follow.user.username }}
</a>
</td>
<td>{{ moment(follow.timestamp).format('L') }}</td>
</tr>
{% endif %} <!-- new -->
{% endfor %}
</table>
<div class="pagination">
{{ macros.pagination_widget(pagination, endpoint, username = user.username) }}
</div>
{% endblock %}
# tests/test_user_model.py
from datetime import datetime
# ...
from app.models import User, Role, Permission, AnonymousUser, Follow
class UserModelTestCase(unittest.TestCase):
# ...
def test_follows(self):
u1 = User(email='john@example.com', password='cat')
u2 = User(email='susan@example.org', password='dog')
db.session.add(u1)
db.session.add(u2)
db.session.commit()
self.assertFalse(u1.is_following(u2))
self.assertFalse(u1.is_followed_by(u2))
timestamp_before = datetime.utcnow()
u1.follow(u2)
db.session.add(u1)
db.session.commit()
timestamp_after = datetime.utcnow()
self.assertTrue(u1.is_following(u2))
self.assertFalse(u1.is_followed_by(u2))
self.assertTrue(u2.is_followed_by(u1))
self.assertTrue(u1.followed.count() == 2) # modified
self.assertTrue(u2.followers.count() == 2) # modified
# ...
# tests/test_user_model.py
class UserModelTestCase(unittest.TestCase):
# ...
def test_follows(self):
# ...
f = u1.followed.all()[-1]
self.assertTrue(f.followed == u2)
self.assertTrue(timestamp_before <= f.timestamp <= timestamp_after)
f = u2.followers.all()[-1]
self.assertTrue(f.follower == u1)
u1.unfollow(u2)
db.session.add(u1)
db.session.commit()
self.assertTrue(u1.followed.count() == 1) # modified
self.assertTrue(u2.followers.count() == 1) # modified
self.assertTrue(Follow.query.count() == 2) # modified
u2.follow(u1)
db.session.add(u2)
db.session.commit()
db.session.delete(u2)
db.session.commit()
self.assertTrue(Follow.query.count() == 1) # modified
$ git clone https://github.com/win911/flask_class.git$ git checkout 12cposts table is defined. comments table is also in a one-to-many relationship with the users table.Comment model are almost the same as those of Post. disabled field: a Boolean that will be used by moderators to suppress comments that are inappropriate or offensive.
# app/models.py
class Comment(db.Model):
__tablename__ = 'comments'
id = db.Column(db.Integer, primary_key=True)
body = db.Column(db.Text)
body_html = db.Column(db.Text)
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
disabled = db.Column(db.Boolean)
author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
post_id = db.Column(db.Integer, db.ForeignKey('posts.id'))
@staticmethod
def on_changed_body(target, value, oldvalue, initiator):
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'code', 'em', 'i',
'strong']
target.body_html = bleach.linkify(bleach.clean(
markdown(value, output_format='html'),
tags=allowed_tags, strip=True))
db.event.listen(Comment.body, 'set', Comment.on_changed_body)
# app/models.py
class Post(db.Model):
# ...
comments = db.relationship('Comment', backref='post', lazy='dynamic')
class User(UserMixin, db.Model):
# ...
comments = db.relationship('Comment', backref='author', lazy='dynamic')
(venv) $ python manage.py db upgrade
# app/main/forms.py
class CommentForm(FlaskForm):
body = StringField('', validators=[Required()])
submit = SubmitField('Submit')
post route to support comments¶url_for() function sets the page to -1, a special page number that is used to request the last page of comments so that the comment just entered is seen on the page. # app/main/views.py
#...
from .forms import EditProfileForm, EditProfileAdminForm, PostForm, CommentForm
from ..models import Permission, User, Role, Post, Comment
@main.route('/post/<int:id>', methods=['GET', 'POST'])
def post(id):
post = Post.query.get_or_404(id)
form = CommentForm()
if form.validate_on_submit():
comment = Comment(body=form.body.data,
post=post,
author=current_user._get_current_object())
db.session.add(comment)
db.session.commit()
flash('Your comment has been published.')
return redirect(url_for('.post', id=post.id, page=-1))
page = request.args.get('page', 1, type=int)
if page == -1:
page = (post.comments.count() - 1) // \
current_app.config['FLASKY_COMMENTS_PER_PAGE'] + 1
pagination = post.comments.order_by(Comment.timestamp.asc()).paginate(
page, per_page=current_app.config['FLASKY_COMMENTS_PER_PAGE'],
error_out=False)
comments = pagination.items
return render_template('post.html', posts=[post], form=form,
comments=comments, pagination=pagination)
FLASKY_COMMENTS_PER_PAGE to config.py¶# config.py
class Config(object):
#...
FLASKY_COMMENTS_PER_PAGE = 30
_comments.html¶_comments.html that is similar to _posts.html.<!-- app/templates/_comments.html -->
<ul class="posts">
{% for comment in comments %}
<li class="post">
<div class="post-thumbnail">
<a href="{{ url_for('.user_profile', username=comment.author.username) }}">
<img class="img-rounded profile-thumbnail" src="{{ comment.author.gravatar(size=40) }}">
</a>
</div>
<div class="post-content">
<div class="post-date">{{ moment(comment.timestamp).fromNow() }}</div>
<div class="post-author"><a href="{{ url_for('.user_profile', username=comment.author.username) }}">{{ comment.author.username }}</a></div>
<div class="post-body">
{% if comment.body_html %}
{{ comment.body_html | safe }}
{% else %}
{{ comment.body }}
{% endif %}
</div>
</div>
</li>
{% endfor %}
</ul>
post.html¶_comments.html is included by post.html below the body of the post, followed by a call to the pagination macro.<!-- app/templates/post.html -->
{% extends "base.html" %}
{% import "_macros.html" as macros %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Flasky - Post{% endblock %}
{% block page_content %}
{% include '_posts.html' %}
<h4 id="comments">Comments</h4>
{% if current_user.can(Permission.COMMENT) %}
<div class="comment-form">
{{ wtf.quick_form(form) }}
</div>
{% endif %}
{% include '_comments.html' %}
{% if pagination %}
<div class="pagination">
{{ macros.pagination_widget(pagination, '.post', fragment='#comments',
id=posts[0].id) }}
</div>
{% endif %}
{% endblock %}
#comments: it is an URL fragment and is used to indicate an initial scroll position for the page.post.html template, which is written as <h4 id="comments">Comments<h4>.<!-- app/templates/_post.html -->
<ul class="posts">
...
<div class="post-content">
...
<div class="post-footer">
...
{% endif %}
<a href="{{ url_for('.post', id=post.id) }}">
<span class="label label-default">Permalink</span>
</a>
<a href="{{ url_for('.post', id=post.id) }}#comments">
<span class="label label-primary">{{ post.comments.count() }} Comments</span>
</a>
...
#comments fragment added.<!-- app/tempaltes/_macros.html -->
{% macro pagination_widget(pagination, endpoint, fragment='') %}
...
{% for p in pagination.iter_pages() %}
{% if p %}
{% if p == pagination.page %}
<li class="active">
<a href="{{ url_for(endpoint, page = p, **kwargs) }}{{ fragment }}">{{ p }}</a>
</li>
{% else %}
<li>
<a href="{{ url_for(endpoint, page = p, **kwargs) }}{{ fragment }}">{{ p }}</a>
</li>
{% endif %}


$ git clone https://github.com/win911/flask_class.git$ git checkout 13a$ python manage.py db upgradePermission.MODERATE_COMMENTS, which gives users who have it in their roles the power to moderate comments made by others.<!-- app/templates/base.html -->
...
<div class="navbar-collapse collapse">
<ul class="nav navbar-nav">
...
{% if current_user.can(Permission.MODERATE_COMMENTS) %}
<li><a href="{{ url_for('main.moderate') }}">Moderate Comments</a></li>
{% endif %}
</ul>
...
# app/main/views.py
@main.route('/moderate')
@login_required
@permission_required(Permission.MODERATE_COMMENTS)
def moderate():
page = request.args.get('page', 1, type=int)
pagination = Comment.query.order_by(Comment.timestamp.desc()).paginate(
page, per_page=current_app.config['FLASKY_COMMENTS_PER_PAGE'],
error_out=False)
comments = pagination.items
return render_template('moderate.html', comments=comments,
pagination=pagination, page=page)
moderate.html¶set directive to define a moderate template variable. _comments.html template to determine whether the moderation features need to be rendered.<!-- app/templates/moderate.html -->
{% extends "base.html" %}
{% import "_macros.html" as macros %}
{% block title %}Flasky - Comment Moderation{% endblock %}
{% block page_content %}
<div class="page-header"> <h1>Comment Moderation</h1></div>
{% set moderate = True %}
{% include '_comments.html' %}
{% if pagination %}
<div class="pagination">
{{ macros.pagination_widget(pagination, '.moderate') }}
</div>
{% endif %}
{% endblock %}
_comments.html¶<!-- app/templates/_comments.html -->
...
<div class="post-content">
<div class="post-date">{{ moment(comment.timestamp).fromNow() }}</div>
<div class="post-author"><a href="{{ url_for('.user_profile', username=comment.author.username) }}">{{ comment.author.username }}</a></div>
<div class="post-body">
{% if comment.disabled %}
<p><i>This comment has been disabled by a moderator.</i></p>
{% endif %}
{% if moderate or not comment.disabled %}
{% if comment.body_html %}
{{ comment.body_html | safe }}
{% else %}
{{ comment.body }}
{% endif %}
{% endif %}
</div>
{% if moderate %}
<br>
{% if comment.disabled %}
<a class="btn btn-default btn-xs"
href="{{ url_for('.moderate_enable', id=comment.id, page=page) }}">Enable</a>
{% else %}
<a class="btn btn-danger btn-xs"
href="{{ url_for('.moderate_disable', id=comment.id, page=page) }}">Disable</a>
{% endif %}
{% endif %}
</div>
...
disabled field to the proper value, and write it back to the database.# app/main/views.py
@main.route('/moderate/enable/<int:id>')
@login_required
@permission_required(Permission.MODERATE_COMMENTS)
def moderate_enable(id):
comment = Comment.query.get_or_404(id)
comment.disabled = False
db.session.add(comment)
return redirect(url_for('.moderate', page=request.args.get('page', 1, type=int)))
@main.route('/moderate/disable/<int:id>')
@login_required
@permission_required(Permission.MODERATE_COMMENTS)
def moderate_disable(id):
comment = Comment.query.get_or_404(id)
comment.disabled = True
db.session.add(comment)
return redirect(url_for('.moderate', page=request.args.get('page', 1, type=int)))



$ git clone https://github.com/win911/flask_class.git$ git checkout 13b| Request method | Target | Description | HTTP status code |
|---|---|---|---|
| GET | Individual resource URL | Obtain the resource. | 200 |
| GET | Resource collection URL | Obtain the collection of resources (or one page from it if the server implements pagination). | 200 |
| POST | Resource collection URL | Createa new resource and add it to the collection. The server chooses the URL of the new resource and returns it in a Location header in the response. | 201 |
| Request method | Target | Description | HTTP status code |
|---|---|---|---|
| PUT | Individual resource URL | Modify an existing resource. Alternatively this method can also be used to create a new resource when the client can choose the resource URL. | 200 |
| DELETE | Individual resource URL | Delete a resource. | 200 |
| DELETE | Resource collection URL | Delete all resources in the collection. | 200 |
| -flasky/
| -app/
| -api_1_0/
| -__init__.py
| -users.py
| -posts.py
| -comments.py
| -authentication.py
| -errors.py
| -decorators.py
# app/api_1_0/__init__.py
from flask import Blueprint
api = Blueprint('api', __name__)
from . import authentication, posts, users, comments, errors
# app/_init_.py
# ...
def create_app(config_name):
# ...
from .api_1_0 import api as api_1_0_blueprint
app.register_blueprint(api_1_0_blueprint, url_prefix='/api/v1.0')
# ...
| HTTP status code | Name | Description |
|---|---|---|
| 200 | OK | The request was completed successfully. |
| 201 | Created | The request was completed successfully and a new resource was created as a result. |
| 400 | Bad request | The request is invalid or inconsistent. |
| 401 | Unauthorized | The request does not include authentication information. |
| 403 | Forbidden | The authentication credentials sent with the request are insufficient for the request. |
| 404 | Not found | The resource referenced in the URL was not found. |
| 405 | Method not allowed | The request method requested is not supported for the given resource. |
| 500 | Internal server error | An unexpected error has occurred while processing the request. |
# app/api_1_0/errors.py
from flask import jsonify, request, render_template
from . import api
@api.app_errorhandler(404)
def page_not_found(e):
if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html:
response = jsonify({'error': 'not found'})
response.status_code = 404
return response
return render_template('404.html'), 404
@api.app_errorhandler(500)
def internal_server_error(e):
if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html:
response = jsonify({'error': 'internal server error'})
response.status_code = 500
return response
return render_template('500.html'), 500
# app/api_1_0/errors.py
# ...
def unauthorized(message):
response = jsonify({'error': 'unauthorized', 'message': message})
response.status_code = 401
return response
def forbidden(message):
response = jsonify({'error': 'forbidden', 'message': message})
response.status_code = 403
return response
Authorization header with all requestsFlask-HTTPAuth extension provides a convenient wrapper that hides the protocol details in a decorator similar to Flask-Login’s login_required(venv) $ pip install flask-httpauth
# app/api_1_0/authentication.py
from flask import g
from flask_httpauth import HTTPBasicAuth
from .errors import unauthorized
from ..models import User
auth = HTTPBasicAuth()
@auth.verify_password
def verify_password(email, password):
if email == '':
return False
user = User.query.filter_by(email = email).first()
if not user:
return False
g.current_user = user
return user.verify_password(password)
@auth.error_handler
def auth_error():
return unauthorized('Invalid credentials')
# app/api_1_0/authentication.py
# ...
from . import api
from .errors import unauthorized, forbidden
# ...
@api.before_request
@auth.login_required
def before_request():
if not g.current_user.is_anonymous and not g.current_user.confirmed:
return forbidden('Unconfirmed account')
# app/models.py
class User(db.Model):
# ...
def generate_auth_token(self, expiration):
s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
return s.dumps({'id': self.id}).decode('utf-8')
@staticmethod
def verify_auth_token(token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return None
return User.query.get(data['id'])
# app/api_1_0/authentication.py
# ...
@auth.verify_password
def verify_password(email_or_token, password):
if email_or_token == '':
return False
if password == '':
g.current_user = User.verify_auth_token(email_or_token)
g.token_used = True
return g.current_user is not None
user = User.query.filter_by(email=email_or_token).first()
if not user:
return False
g.current_user = user
g.token_used = False
return user.verify_password(password)
# app/api_1_0/authentication.py
from flask import g, jsonify
# ...
@api.route('/token')
def get_token():
if g.current_user.is_anonymous or g.token_used:
return unauthorized('Invalid credentials')
return jsonify({'token': g.current_user.generate_auth_token(expiration=3600), 'expiration': 3600})
# app/models.py
# ...
from flask import current_app, request, url_for
# ...
class Post(db.Model):
# ...
def to_json(self):
json_post = {
'url': url_for('api.get_post', id=self.id, _external=True),
'body': self.body,
'body_html': self.body_html,
'timestamp': self.timestamp,
'author': url_for('api.get_user', id=self.author_id, _external=True),
'comments': url_for('api.get_post_comments', id=self.id, _external=True),
'comment_count': self.comments.count()
}
return json_post
# ...
# app/models.py
# ...
class User(UserMixin, db.Model):
# ...
def to_json(self):
json_user = {
'url': url_for('api.get_post', id=self.id, _external=True),
'username': self.username,
'member_since': self.member_since,
'last_seen': self.last_seen,
'posts': url_for('api.get_user_posts', id=self.id, _external=True),
'followed_posts': url_for('api.get_user_followed_posts', id=self.id, _external=True),
'post_count': self.posts.count()
}
return json_user
# ...
# app/models.py
# ...
from .exceptions import ValidationError
# ...
class Post(db.Model):
# ...
@staticmethod
def from_json(json_post):
body = json_post.get('body')
if body is None or body == '':
raise ValidationError('post does not have a body')
return Post(body=body)
# ...
# app/exceptions.py
class ValidationError(ValueError):
pass
# app/api_1_0/errors.py
# ...
from app.exceptions import ValidationError
# ...
def bad_request(message):
response = jsonify({'error': 'bad request', 'message': message})
response.status_code = 400
return response
@api.errorhandler(ValidationError)
def validation_error(e):
return bad_request(e.args[0])
# app/api_1_0/posts.py
from flask import jsonify
from . import api
from .authentication import auth
from ..models import Post
@api.route('/posts/')
@auth.login_required
def get_posts():
posts = Post.query.all()
return jsonify({'posts': [post.to_json() for post in posts]})
@api.route('/posts/<int:id>')
@auth.login_required
def get_post(id):
post = Post.query.get_or_404(id)
return jsonify(post.to_json())
# app/api_1_0/posts.py
from flask import jsonify, request, g, url_for
from . import api
from .authentication import auth
from .decorators import permission_required
from .. import db
from ..models import Post
# ...
@api.route('/posts/', methods=['POST'])
@permission_required(Permission.WRITE_ARTICLES)
def new_post():
post = Post.from_json(request.json)
post.author = g.current_user
db.session.add(post)
db.session.commit()
return jsonify(post.to_json()), 201, \
{'Location': url_for('api.get_post', id=post.id, _external=True)}
# app/api_1_0/decorators.py
from functools import wraps
from flask import g
from .errors import forbidden
def permission_required(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not g.current_user.can(permission):
return forbidden('Insufficient permissions')
return f(*args, **kwargs)
return decorated_function
return decorator
# app/api_1_0/posts.py
# ...
from .errors import forbidden
from ..models import Post, Permission
# ...
@api.route('/posts/<int:id>', methods=['PUT'])
@permission_required(Permission.WRITE_ARTICLES)
def edit_post(id):
post = Post.query.get_or_404(id)
if g.current_user != post.author and not g.current_user.can(Permission.ADMINISTER):
return forbidden('Insufficient permissions')
post.body = request.json.get('body', post.body)
db.session.add(post)
return jsonify(post.to_json())
# app/api_1_0/posts.py
from flask import jsonify, request, g, url_for, current_app
# ...
@api.route('/posts/')
def get_posts():
page = request.args.get('page', 1, type=int)
pagination = Post.query.paginate(
page, per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],
error_out=False)
posts = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_posts', page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_posts', page=page+1, _external=True)
return jsonify({
'posts': [post.to_json() for post in posts],
'prev': prev,
'next': next,
'count': pagination.total
})
# ...
# app/api_1_0/users.py
from . import api
@api.route('/users/<int:id>')
def get_user(id):
# TODO
pass
@api.route('/users/<int:id>/posts/')
def get_user_posts(id):
# TODO
pass
@api.route('/users/<int:id>/timeline/')
def get_user_followed_posts(id):
# TODO
pass
# app/api_1_0/comments.py
from . import api
from .decorators import permission_required
from ..models import Permission
@api.route('/comments/')
def get_comments():
# TODO
pass
@api.route('/comments/<int:id>')
def get_comment(id):
# TODO
pass
@api.route('/posts/<int:id>/comments/')
def get_post_comments(id):
# TODO
pass
@api.route('/posts/<int:id>/comments/', methods=['POST'])
@permission_required(Permission.COMMENT)
def new_post_comment(id):
# TODO
pass
(venv) $ pip install httpie
(venv) $ http --auth maomao@pyladies.com:123456 GET http://127.0.0.1:5000/api/v1.0/posts/


(venv) $ http --auth maomao@pyladies.com:123456 GET http://127.0.0.1:5000/api/v1.0/token

(venv) $ http --auth <token>: --json POST http://127.0.0.1:5000/api/v1.0/posts/ "body=I'm adding a post from the *command line*."

$ git clone https://github.com/win911/flask_class.git$ git checkout 14a| Resource | URL Methods | Description |
|---|---|---|
/users/<int:id> |
GET | A user |
/users/<int:id>/posts/ |
GET | The blog posts written by a user |
/users/<int:id>/timeline/ |
GET | The blog posts followed by a user |
| /posts/ | GET, POST | All the blog posts |
/posts/<int:id> |
GET, PUT | A blog post |
/posts/<int:id/>comments/ |
GET, POST | The comments on a blog post |
| /comments/ | GET | All the comments |
/comments/<int:id> |
GET | A comment |
# app/models.py
# ...
class Comment(db.Model):
# ...
def to_json(self):
json_comment = {
'url': url_for('api.get_comment', id=self.id),
'post_url': url_for('api.get_post', id=self.post_id),
'body': self.body,
'body_html': self.body_html,
'timestamp': self.timestamp,
'author_url': url_for('api.get_user', id=self.author_id),
}
return json_comment
@staticmethod
def from_json(json_comment):
body = json_comment.get('body')
if body is None or body == '':
raise ValidationError('comment does not have a body')
return Comment(body=body)
$ git clone https://github.com/win911/flask_class.git$ git checkout 14b