Klaus Demo ~jonashaag/bjoern / 261d3a6 tests / keep-alive-behaviour.py
261d3a6

Tree @261d3a6 (Download .tar.gz)

keep-alive-behaviour.py @261d3a6raw · history · blame

import os
import re
import random
import httplib
import socket

HOST = ('127.0.0.1', 9000)

class Fail(Exception):
    pass

def dispatcher(environ, start_response):
    return dispatcher.app(environ, start_response)

def still_alive(sock):
    prev_timeout = sock.gettimeout()
    try:
        sock.settimeout(0.5)
        try:
            c = sock.recv(100)
            assert not c, repr(c)
            return False
        except socket.timeout:
            return True
    finally:
        sock.settimeout(prev_timeout)


class NeverClosedHTTPConnection(httplib.HTTPConnection):
    def close(self):
        backup = self.sock
        self.sock = None
        httplib.HTTPConnection.close(self)
        self.sock = backup

class Testcase(object):
    body_length = 1000

    def __init__(self):
        self.conn = None
        self.body = os.urandom(self.body_length)
        self.request_count = 5

    def run(self):
        if not self.conn:
            dispatcher.app = self
            self.conn = NeverClosedHTTPConnection(*HOST)

        self.send_request(self._tinker_request())
        response = self.get_response()
        if self.expect_chunked and not response.chunked:
            raise Fail("Response unexpectedly not chunked")
        body = response.read()

        if self.raise_error:
            if response.status != 500 or body:
                raise Fail("Expected 500 Internal Server Error")
        else:
            if response.status != 200:
                raise Fail("Status is %d, expected 200", response.status)

        if still_alive(self.conn.sock):
            if not self.expect_keep_alive:
                raise Fail("Expected connection not be kept-alive")
            self.request_count -= 1
            if self.request_count:
                if self.request_count == 1:
                    # Send Connection: close on last request
                    self.want_keep_alive = False
                    self.expect_keep_alive = False
                    self.expect_chunked = False
                self.run()
            else:
                if still_alive(self.conn.sock):
                    raise Fail("Connection still alive")
        else:
            if self.expect_keep_alive:
                raise Fail("Expected connection to be kept-alive")

        if not self.raise_error and body != self.body:
            raise Fail("Different bodies:\n%s\n%s" % (body, self.body))

    def _is_chunked(self, response):
        if not 'Transfer-Encoding: chunked' in response or not response.endswith('0\r\n\r\n'):
            return False
        # TODO: full-featured chunked validation here
        return True

    def _tinker_request(self):
        req = 'GET / HTTP/1.%d\r\n' % self.http_minor
        if self.want_keep_alive:
            req += 'Connection: Keep-Alive\r\n'
        req += '\r\n'
        return req

    def send_request(self, data):
        self.conn.send(data)

    def get_response(self):
        self.conn._HTTPConnection__state = httplib._CS_REQ_SENT
        return self.conn.getresponse()

    def __call__(self, environ, start_response):
        if self.raise_error and random.randint(0, 1):
            raise ValueError('foo')
        headers = []
        if self.give_content_length:
            headers.append(('Content-Length', str(len(self.body))))
        if self.raise_error:
            start_response('200 ok', headers)
            raise ValueError('bar')
        start_response('200 ok', headers)
        # second item is to trick bjoern's internal optimizations:
        return [self.body, '']

import thread
import bjoern
thread.start_new_thread(bjoern.run, (dispatcher,)+HOST)

import time; time.sleep(0.1)

for index, tpl in enumerate([
        (0, False,  False, False, False, False),
        (0, False,  False, True,  False, False),
        (0, False,  False, True,  False, False),
        (0, False,  True,  True,  False, True),
        (0, True,   False, False, False, False),
        (0, True,   False, True,  False, False),
        (0, True,   True,  False, False, False),
        (0, True,   True,  True,  False, False),

        (1, False,  False, False, False, False),
        (1, False,  False, True,  True,  True),
        (1, False,  True,  False, False, False),
        (1, False,  True,  True,  False, True),
        (1, True,   False, False, False, False),
        (1, True,   False, True,  False, False),
        (1, True,   True,  False, False, False),
        (1, True,   True,  True,  False, False)
]):
    print 'Running test %d: %r' % (index, tpl)
    class _test(Testcase):
        http_minor, \
        raise_error, \
        give_content_length, \
        want_keep_alive, \
        expect_chunked, \
        expect_keep_alive = tpl
    _test().run()

print '--- SUCCESS! ---'