Klaus Demo ~jonashaag/klaus / a6e3660
Add some tests for 'klaus.make_app' Jonas Haag 7 years ago
5 changed file(s) with 167 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
11 /bin/
22 env/
33 .idea
4 *.swp
5 tests/repos/
0 #!/bin/sh
1 ( rm -rf tests/repos/test_repo
2 mkdir -p tests/repos/test_repo
3 cd tests/repos/test_repo
4 git init
5 echo Hello World > README
6 git add README
7 git commit -a -m "First commit"
8 )
9
10 py.test tests/
(New empty file)
0 testuser:test:41e45fc24eaddd7545c4819e49d1198b
0 import os
1 import contextlib
2 import subprocess
3 import tempfile
4 import shutil
5 import requests
6 import requests.auth
7 import klaus
8
9 import werkzeug.serving
10 import threading
11
12
13 TEST_REPO = os.path.abspath("tests/repos/test_repo")
14 TEST_REPO_URL = "test_repo/"
15 TEST_SITE_NAME = "Some site"
16 HTDIGEST_FILE = "tests/credentials.htdigest"
17
18 UNAUTHORIZED_TEST_SERVER = "http://invalid:password@localhost:9876/"
19 UNAUTHORIZED_TEST_REPO_URL = UNAUTHORIZED_TEST_SERVER + TEST_REPO_URL
20 AUTHORIZED_TEST_SERVER = "http://testuser:testpassword@localhost:9876/"
21 AUTHORIZED_TEST_REPO_URL = AUTHORIZED_TEST_SERVER + TEST_REPO_URL
22
23
24 def GET_unauthorized(url=""):
25 return requests.get(UNAUTHORIZED_TEST_SERVER + url, auth=requests.auth.HTTPDigestAuth("invalid", "password"))
26
27 def GET_authorized(url=""):
28 return requests.get(AUTHORIZED_TEST_SERVER + url, auth=requests.auth.HTTPDigestAuth("testuser", "testpassword"))
29
30
31 @contextlib.contextmanager
32 def testserver(*args, **kwargs):
33 app = klaus.make_app([TEST_REPO], TEST_SITE_NAME, *args, **kwargs)
34 server = werkzeug.serving.make_server("localhost", 9876, app)
35 thread = threading.Thread(target=server.serve_forever)
36 thread.daemon = True
37 thread.start()
38 try:
39 yield
40 finally:
41 server.server_close()
42
43
44 def testserver_require_auth(*args, **kwargs):
45 kwargs['htdigest_file'] = open(HTDIGEST_FILE)
46 kwargs['require_browser_auth'] = True
47 return testserver(*args, **kwargs)
48
49
50 def test_no_smarthttp_no_require_browser_auth():
51 with testserver():
52 assert can_reach_site_unauthorized()
53 assert can_reach_site_authorized()
54 assert_cannot_clone()
55 assert not can_push_unauthorized()
56 assert not can_push_authorized()
57
58
59 def test_smarthttp_no_require_browser_auth():
60 # Push is disabled either if no credentials are given or the 'disable_push' option is set:
61 for server in [
62 testserver(use_smarthttp=True),
63 testserver(use_smarthttp=True, htdigest_file=open(HTDIGEST_FILE), disable_push=True)
64 ]:
65 with server:
66 assert can_clone_unauthorized()
67 assert not can_push_unauthorized()
68 assert not can_push_authorized()
69
70
71 def test_smarthttp_push_no_require_browser_auth():
72 with testserver(use_smarthttp=True, htdigest_file=open(HTDIGEST_FILE)):
73 assert can_clone_unauthorized()
74 assert not can_push_unauthorized()
75 assert can_push_authorized()
76
77
78 def test_no_smarthttp_require_browser_auth():
79 with testserver_require_auth():
80 assert not can_reach_site_unauthorized()
81 assert can_reach_site_authorized()
82 assert not can_clone_unauthorized()
83 assert not can_clone_authorized()
84 assert not can_push_unauthorized()
85 assert not can_push_authorized()
86
87
88 def test_smarthttp_require_browser_auth():
89 with testserver_require_auth(use_smarthttp=True, disable_push=True):
90 assert not can_clone_unauthorized()
91 assert can_clone_authorized()
92 assert not can_push_unauthorized()
93 assert not can_push_authorized()
94
95
96 def test_smarthttp_push_require_browser_auth():
97 with testserver_require_auth(use_smarthttp=True):
98 assert not can_clone_unauthorized()
99 assert can_clone_authorized()
100 assert not can_push_unauthorized()
101 assert can_push_authorized()
102
103
104 def assert_cannot_clone():
105 assert "git clone" not in GET_unauthorized(TEST_REPO_URL).content
106 assert 404 == GET_unauthorized(TEST_REPO_URL + "info/refs?service=git-upload-pack").status_code
107 assert 128 == subprocess.call(["git", "clone", AUTHORIZED_TEST_REPO_URL])
108
109
110 def assert_cannot_push_authorized(expected_status):
111 assert expected_status == GET_authorized(TEST_REPO_URL + "info/refs?service=git-receive-pack").status_code
112 assert expected_status == GET_authorized(TEST_REPO_URL + "git-receive-pack").status_code
113 assert 128 == subprocess.call(["git", "push", AUTHORIZED_TEST_REPO_URL, "master"], cwd=TEST_REPO)
114
115
116 def assert_cannot_push_unauthorized(expected_status):
117 assert expected_status == GET_unauthorized(TEST_REPO_URL + "info/refs?service=git-receive-pack").status_code
118 assert expected_status == GET_unauthorized(TEST_REPO_URL + "git-receive-pack").status_code
119 # XXX 'git push' asks for a new username/password and blocks
120 # assert 128 == subprocess.call(["git", "push", UNAUTHORIZED_TEST_REPO_URL, "master"], cwd=TEST_REPO)
121
122
123 # Clone
124 def can_clone_unauthorized():
125 tmp = tempfile.mkdtemp()
126 try:
127 return subprocess.call(["git", "clone", UNAUTHORIZED_TEST_REPO_URL, tmp]) == 0
128 finally:
129 shutil.rmtree(tmp, ignore_errors=True)
130
131 def can_clone_authorized():
132 tmp = tempfile.mkdtemp()
133 try:
134 return subprocess.call(["git", "clone", AUTHORIZED_TEST_REPO_URL, tmp]) == 0
135 finally:
136 shutil.rmtree(tmp, ignore_errors=True)
137
138
139 # Push
140 def can_push_unauthorized():
141 return subprocess.call(["git", "push", UNAUTHORIZED_TEST_REPO_URL, "master"], cwd=TEST_REPO) == 0
142
143 def can_push_authorized():
144 return subprocess.call(["git", "push", AUTHORIZED_TEST_REPO_URL, "master"], cwd=TEST_REPO) == 0
145
146
147 # Reach
148 def can_reach_site_unauthorized():
149 return GET_unauthorized("test_repo").status_code == 200
150
151 def can_reach_site_authorized():
152 return GET_authorized(TEST_REPO_URL).status_code == 200