root/trunk/examples/divan/divan/comments.py @ 201

Revision 201, 10.8 KB (checked in by cmlenz, 2 years ago)

Divan: Add some very basic tests for the OpenID support.

  • Property svn:eol-style set to native
Line 
1# -*- coding: utf-8 -*-
2
3from cgi import parse_header
4from datetime import datetime
5import logging
6from urllib import urlencode
7from urlparse import urlsplit, urlunsplit
8
9from BeautifulSoup import BeautifulSoup
10from diva import app
11from diva.forms import Form, TextValidator, ValidationError
12from diva.http import abort
13from diva.routing import abs_url, path_to, redirect_to
14from diva.templating import output, render
15from genshi import escape, HTML, ParseError
16from genshi.filters import HTMLSanitizer
17from httplib2 import Http, HttpLib2Error
18
19from divan import moderation, view
20from divan.model import Post, Comment
21
22log = logging.getLogger('divan.comments')
23
24
25def to_xhtml(text):
26    if text is None:
27        text = ''
28    try:
29        from markdown2 import markdown
30    except ImportError:
31        log.warning('python-markdown2 import failed', exc_info=True)
32        return escape(text)
33    else:
34        return HTML(markdown(text, html4tags=True)) | HTMLSanitizer()
35
36
37class CommentForm(Form):
38    openid_url = TextValidator(required=True)
39    resolved_url = TextValidator()
40    author = TextValidator()
41    content = TextValidator(required=True)
42
43    def validate_openid(self, data):
44        openid = normalize_openid(data['openid_url'])
45        if openid == data.get('resolved_url'):
46            return
47
48        http = Http()
49        http.follow_redirects = False
50        try:
51            log.info('Looking up OpenID URL %r', openid)
52            while True: # resolve permanent redirectionss
53                resp, body = http.request(openid, method='HEAD', headers={
54                    'Host': get_hostname(openid)
55                })
56                if resp.status != 301:
57                    break
58                openid = resp['location']
59            if resp.status >= 400:
60                raise ValidationError(resp.reason)
61            log.info('OpenID resolved to URL %r', openid)
62        except HttpLib2Error, e:
63            raise ValidationError(e.args[0])
64
65        data['openid_url'] = data['resolved_url'] = openid
66
67
68class AuthenticationError(Exception):
69    """Exception raised on errors with OpenID authentication."""
70
71
72@output('comment.html')
73def authenticate(request, response, id):
74    comment = Comment.load(app.db, id) or abort(404)
75    post = Post.load(app.db, comment.post_id) or abort(404)
76    if not post.allow_comments:
77        abort(403)
78    mode = request.GET['openid.mode']
79
80    if mode == 'id_res':
81        log.info('OpenID %r authenticated by %r', comment.openid,
82                 comment.openid_server)
83        params = {
84            'openid.mode': 'check_authentication',
85            'openid.sig': request.GET['openid.sig'],
86            'openid.signed': request.GET['openid.signed'],
87            'openid.assoc_handle': request.GET['openid.assoc_handle'],
88        }
89        for name in request.GET['openid.signed'].split(','):
90            params.setdefault('openid.%s' % name,
91                              request.GET['openid.%s' % name])
92
93        http = Http()
94        resp, body = http.request(comment.openid_server, method='POST',
95                                  body=urlencode(params),
96                                  headers={
97            'Content-Type': 'application/x-www-form-urlencoded',
98            'Host': get_hostname(comment.openid_server)
99        })
100        result = decode_kvform(body.strip())
101        if result['is_valid'] == 'true':
102            log.info('OpenID authentication for %r validated by %r',
103                     comment.openid, comment.openid_server)
104            comment.authenticated = True
105
106            if Comment.is_whitelisted(comment.openid):
107                # OpenID of commenter is on the whitelist, no moderation needed
108                comment.published = True
109                post.num_comments += 1
110                app.db.update([comment._data, post._data])
111                log.info('Published comment by %r <%s>', comment.author,
112                         comment.openid)
113                redirect_to('post', *post.path())
114
115            comment.store(app.db)
116            moderation.notify(post, comment)
117            log.info('Comment by %r <%s> awaiting moderation', comment.author,
118                     comment.openid)
119            return render('thanks.html', post=post, comment=comment)
120
121        else:
122            log.warning('Validation of OpenID authentication for %r by %r '
123                        'failed: %r', comment.openid, comment.openid_server,
124                        body.strip())
125
126    elif mode == 'cancel':
127        del app.db[comment.id]
128
129    redirect_to('post', *post.path())
130
131
132@output('comment.html')
133def comment(request, response, refid):
134    post = Post.load(app.db, refid) or abort(404)
135    if not post.allow_comments:
136        abort(403)
137    form = CommentForm()
138    preview = None
139
140    if request.method == 'POST':
141        if 'cancel' in request.POST:
142            return redirect_to(view.post, *post.path())
143        if form.validate(request.POST) and 'preview' not in request.POST:
144            claimed_id = form['openid_url']
145            log.info('Attempting OpenID authentication for %r', claimed_id)
146            try:
147                provider, local_id, version = resolve_openid(claimed_id)
148            except AuthenticationError, e:
149                form.errors[None] = [e.args[0]]
150            else:
151                log.info('OpenID resolved to %r via %r', local_id, provider)
152
153                try:
154                    markup = to_xhtml(form['content']).render('xhtml',
155                                                              encoding=None)
156                except ParseError, e:
157                    form.errors['content'] = ['HTML parsing error: %s' % e]
158                else:
159                    comment = Comment(post_id=post.id, time=datetime.utcnow(),
160                                      openid=claimed_id,
161                                      openid_server=provider,
162                                      openid_identity=local_id,
163                                      markup=markup,
164                                      **form.data)
165                    comment.store(app.db)
166
167                    response.set_cookie('author', comment.author)
168                    response.set_cookie('openid', claimed_id)
169
170                    params = {
171                        'openid.mode': 'checkid_setup',
172                        'openid.identity': local_id,
173                        'openid.return_to': abs_url(path_to('authenticate', comment.id))
174                    }
175                    if version >= 2:
176                        params.update({
177                            'openid.ns': 'http://specs.openid.net/auth/2.0',
178                            'openid.claimed_id': claimed_id,
179                            'openid.identity': local_id,
180                            'openid.realm': abs_url(path_to('index'))
181                        })
182                    else:
183                        params.update({
184                            'openid.trust_root': abs_url(path_to('index'))
185                        })
186
187                    return redirect_to(provider, **params)
188
189        try:
190            preview = to_xhtml(form['content'])
191        except ParseError, e:
192            form.errors['content'] = ['HTML parsing error: %s' % e]
193
194    return render(
195        errors = form.errors,
196        form = form,
197        post = post,
198        preview = preview
199    )
200
201
202def get_hostname(url):
203    """Extract the host name from the given URL, which does not include the
204    port for HTTP and HTTPS URLs using the respective default port.
205   
206    >>> get_hostname('http://me.yahoo.com/')
207    'me.yahoo.com'
208    >>> get_hostname('http://me.yahoo.com:80/')
209    'me.yahoo.com'
210    >>> get_hostname('https://me.yahoo.com:443/')
211    'me.yahoo.com'
212   
213    :param url: the URL from which to extract the host name
214    :return: the extracted host name
215    """
216    scheme, netloc, path, query, fragment = list(urlsplit(url))
217    if ':' in netloc:
218        hostname, port = netloc.split(':')
219        if port == '80' and scheme == 'http':
220            port = None
221        elif port == '443' and scheme == 'https':
222            port = None
223        return ':'.join(filter(None, [hostname, port]))
224    return netloc
225
226
227def normalize_openid(url):
228    """Normalize the given OpenID URL.
229   
230    >>> normalize_openid('me.yahoo.com')
231    'http://me.yahoo.com/'
232    >>> normalize_openid('https://me.yahoo.com')
233    'https://me.yahoo.com/'
234    >>> normalize_openid('https://me.yahoo.com/joe')
235    'https://me.yahoo.com/joe'
236   
237    :param url: the URL to normalize
238    :return: the normalized URL
239    """
240    if '://' not in url:
241        url = 'http://%s' % url
242    scheme, netloc, path, query, fragment = list(urlsplit(url))
243    if not path:
244        path = '/'
245    return urlunsplit((scheme, netloc, path, query, ''))
246
247
248def decode_kvform(string):
249    """Parse a key-value form encoded string as defined in the OpenID
250    specification, and return a dictionary mapping values to keys.
251   
252    >>> encoded = 'ns:http://specs.openid.net/auth/2.0\\nis_valid:true'
253    >>> parsed = decode_kvform(encoded)
254    >>> parsed['ns']
255    u'http://specs.openid.net/auth/2.0'
256    >>> parsed['is_valid']
257    u'true'
258   
259    :param string: the key-value form encoded string
260    :return: dictionary mapping values to keys
261    """
262    if not isinstance(string, unicode):
263        string = string.decode('utf-8')
264    string = string.strip()
265    retval = {}
266    for line in string.split('\n'):
267        key, value = line.split(':', 1)
268        retval[key.strip()] = value.strip()
269    return retval
270
271
272def resolve_openid(url):
273    """Resolve the given OpenID URL and return a tuple of the form
274    `(provider_url, identity_url, version)`.
275    """
276    http = Http()
277    resp, body = http.request(url, method='GET', headers={
278        'Host': get_hostname(url)
279    })
280    if resp.status != 200:
281        raise AuthenticationError('Could not retrieve OpenID URL: %d %s' %
282                                  (resp.status, resp.reason))
283
284    mimetype = resp.get('content-type')
285    if mimetype:
286        mimetype, params = parse_header(mimetype)
287    if mimetype not in ('application/xhtml+xml', 'text/html'):
288        raise AuthenticationError('Resource for OpenID URL is not an HTML '
289                                  'page, but %r' % mimetype)
290
291    soup = BeautifulSoup(body, fromEncoding=params.get('charset'))
292    provider_tag = soup.findAll('link', rel='openid2.provider')
293    if provider_tag:
294        version = 2
295    else:
296        provider_tag = soup.findAll('link', rel='openid.server')
297        if not provider_tag:
298            raise AuthenticationError('OpenID HTML resource contains no '
299                                      '"openid.server" link')
300        version = 1
301    provider_url = provider_tag[0]['href']
302
303    if version == 2:
304        identity_tag = soup.findAll('link', rel='openid2.local_id')
305    else:
306        identity_tag = soup.findAll('link', rel='openid.delegate')
307    if not identity_tag:
308        identity_url = url
309    else:
310        identity_url = identity_tag[0]['href']
311
312    return provider_url, identity_url, version
Note: See TracBrowser for help on using the browser.