Klaus Demo ~jonashaag/bjoern / 7bbd8fe
Fast file transfers, part 2: Profit from sendfile(). Jonas Haag 9 years ago
8 changed file(s) with 106 addition(s) and 57 deletion(s). Raw diff Collapse all Expand all
9191 watch -n 0.5 \
9292 'cat /proc/$$(pgrep -n python)/cmdline | tr "\0" " " | head -c -1; \
9393 echo; echo; \
94 tail -n +25 /proc/$$(pidof -s python)/smaps'
94 tail -n +25 /proc/$$(pgrep -n python)/smaps'
9595
9696 upload:
9797 python setup.py sdist upload
1111 }
1212 Py_INCREF(file);
1313 FileWrapper* wrapper = PyObject_NEW(FileWrapper, &FileWrapper_Type);
14 PyFile_IncUseCount((PyFileObject*)file);
1415 wrapper->file = file;
1516 return (PyObject*)wrapper;
1617 }
3031 static void
3132 FileWrapper_dealloc(PyObject* self)
3233 {
34 PyFile_DecUseCount((PyFileObject*)((FileWrapper*)self)->file);
3335 Py_DECREF(((FileWrapper*)self)->file);
3436 PyObject_FREE(self);
3537 }
00 #include "common.h"
1
2 #define FileWrapper_CheckExact(x) ((x)->ob_type == &FileWrapper_Type)
13
24 PyTypeObject FileWrapper_Type;
35
6060 Py_DECREF(request->iterable);
6161 }
6262 Py_XDECREF(request->iterator);
63 if(request->headers)
64 assert(request->headers->ob_refcnt >= 1);
65 if(request->status)
66 assert(request->status->ob_refcnt >= 1);
6763 Py_XDECREF(request->headers);
6864 Py_XDECREF(request->status);
6965 }
1414 unsigned keep_alive : 1;
1515 unsigned response_length_unknown : 1;
1616 unsigned chunked_response : 1;
17 unsigned use_sendfile : 1;
1718 } request_state;
1819
1920 typedef struct {
3536
3637 request_state state;
3738
39 PyObject* status;
3840 PyObject* headers;
3941 PyObject* current_chunk;
4042 Py_ssize_t current_chunk_p;
4143 PyObject* iterable;
4244 PyObject* iterator;
43 PyObject* status;
4445 } Request;
4546
4647 #define REQUEST_FROM_WATCHER(watcher) \
44 #ifdef WANT_SIGINT_HANDLING
55 # include <sys/signal.h>
66 #endif
7 #include <sys/sendfile.h>
78 #include <ev.h>
89 #include "common.h"
910 #include "wsgi.h"
3233 static ev_io_callback ev_io_on_read;
3334 static ev_io_callback ev_io_on_write;
3435 static bool send_chunk(Request*);
36 static bool do_sendfile(Request*);
37 static bool handle_nonzero_errno(Request*);
3538
3639 void server_run(const char* hostaddr, const int port)
3740 {
186189 Request* request = REQUEST_FROM_WATCHER(watcher);
187190
188191 GIL_LOCK(0);
189 assert(request->current_chunk);
190
191 if(send_chunk(request))
192 goto out;
193
194 if(request->iterator) {
195 PyObject* next_chunk;
196 next_chunk = wsgi_iterable_get_next_chunk(request);
197 if(next_chunk) {
198 if(request->state.chunked_response) {
199 request->current_chunk = wrap_http_chunk_cruft_around(next_chunk);
200 Py_DECREF(next_chunk);
192
193 if(request->state.use_sendfile) {
194 /* sendfile */
195 if(request->current_chunk && send_chunk(request))
196 goto out;
197 /* abuse current_chunk_p to store the file fd */
198 request->current_chunk_p = PyObject_AsFileDescriptor(request->iterable);
199 if(do_sendfile(request))
200 goto out;
201 } else {
202 /* iterable */
203 if(send_chunk(request))
204 goto out;
205
206 if(request->iterator) {
207 PyObject* next_chunk;
208 next_chunk = wsgi_iterable_get_next_chunk(request);
209 if(next_chunk) {
210 if(request->state.chunked_response) {
211 request->current_chunk = wrap_http_chunk_cruft_around(next_chunk);
212 Py_DECREF(next_chunk);
213 } else {
214 request->current_chunk = next_chunk;
215 }
216 assert(request->current_chunk_p == 0);
217 goto out;
201218 } else {
202 request->current_chunk = next_chunk;
219 if(PyErr_Occurred()) {
220 PyErr_Print();
221 /* We can't do anything graceful here because at least one
222 * chunk is already sent... just close the connection */
223 DBG_REQ(request, "Exception in iterator, can not recover");
224 ev_io_stop(mainloop, &request->ev_watcher);
225 close(request->client_fd);
226 Request_free(request);
227 goto out;
228 }
229 Py_CLEAR(request->iterator);
203230 }
231 }
232
233 if(request->state.chunked_response) {
234 /* We have to send a terminating empty chunk + \r\n */
235 request->current_chunk = PyString_FromString("0\r\n\r\n");
204236 assert(request->current_chunk_p == 0);
237 request->state.chunked_response = false;
205238 goto out;
206 } else {
207 if(PyErr_Occurred()) {
208 PyErr_Print();
209 /* We can't do anything graceful here because at least one
210 * chunk is already sent... just close the connection */
211 DBG_REQ(request, "Exception in iterator, can not recover");
212 ev_io_stop(mainloop, &request->ev_watcher);
213 close(request->client_fd);
214 Request_free(request);
215 goto out;
216 }
217 Py_CLEAR(request->iterator);
218239 }
219 }
220
221 if(request->state.chunked_response) {
222 /* We have to send a terminating empty chunk + \r\n */
223 request->current_chunk = PyString_FromString("0\r\n\r\n");
224 assert(request->current_chunk_p == 0);
225 request->state.chunked_response = false;
226 goto out;
227240 }
228241
229242 ev_io_stop(mainloop, &request->ev_watcher);
248261 send_chunk(Request* request)
249262 {
250263 Py_ssize_t chunk_length;
251 Py_ssize_t sent_bytes;
264 Py_ssize_t bytes_sent;
252265
253266 assert(request->current_chunk != NULL);
254267 assert(!(request->current_chunk_p == PyString_GET_SIZE(request->current_chunk)
255268 && PyString_GET_SIZE(request->current_chunk) != 0));
256269
257 sent_bytes = write(
270 bytes_sent = write(
258271 request->client_fd,
259272 PyString_AS_STRING(request->current_chunk) + request->current_chunk_p,
260273 PyString_GET_SIZE(request->current_chunk) - request->current_chunk_p
261274 );
262275
263 if(sent_bytes == -1) {
264 error:
265 if(errno == EAGAIN || errno == EWOULDBLOCK) {
266 /* Try again later */
267 return true;
268 } else {
269 /* Serious transmission failure. Hang up. */
270 fprintf(stderr, "Client %d hit errno %d\n", request->client_fd, errno);
271 Py_DECREF(request->current_chunk);
272 Py_XCLEAR(request->iterator);
273 request->state.keep_alive = false;
274 return false;
275 }
276 }
277
278 request->current_chunk_p += sent_bytes;
276 if(bytes_sent == -1)
277 return handle_nonzero_errno(request);
278
279 request->current_chunk_p += bytes_sent;
279280 if(request->current_chunk_p == PyString_GET_SIZE(request->current_chunk)) {
280281 Py_CLEAR(request->current_chunk);
281282 request->current_chunk_p = 0;
283284 }
284285 return true;
285286 }
287
288 #define SENDFILE_CHUNK_SIZE 16*1024
289
290 static bool
291 do_sendfile(Request* request)
292 {
293 Py_ssize_t bytes_sent = sendfile(
294 request->client_fd,
295 request->current_chunk_p, /* current_chunk_p stores the file fd */
296 NULL, SENDFILE_CHUNK_SIZE
297 );
298 if(bytes_sent == -1)
299 return handle_nonzero_errno(request);
300 return bytes_sent != 0;
301 }
302
303 static bool
304 handle_nonzero_errno(Request* request)
305 {
306 if(errno == EAGAIN || errno == EWOULDBLOCK) {
307 /* Try again later */
308 return true;
309 } else {
310 /* Serious transmission failure. Hang up. */
311 fprintf(stderr, "Client %d hit errno %d\n", request->client_fd, errno);
312 Py_XDECREF(request->current_chunk);
313 Py_XCLEAR(request->iterator);
314 request->state.keep_alive = false;
315 return false;
316 }
317 }
00 #include "common.h"
11 #include "bjoernmodule.h"
2 #include "filewrapper.h"
23 #include "wsgi.h"
34
45 static PyObject* (start_response)(PyObject* self, PyObject* args, PyObject *kwargs);
8384 Py_DECREF(retval);
8485 first_chunk = NULL;
8586 }
87 } else if(FileWrapper_CheckExact(retval)) {
88 request->state.use_sendfile = true;
89 request->iterable = ((FileWrapper*)retval)->file;
90 Py_INCREF(request->iterable);
91 Py_DECREF(retval);
92 request->iterator = NULL;
93 first_chunk = NULL;
8694 } else {
8795 /* Generic iterable (list of length != 1, generator, ...) */
8896 request->iterable = retval;
00 #include "common.h"
11 #include "bjoernmodule.h"
2 #include "filewrapper.h"
23 #include "wsgi.h"
34
45 static PyObject* (start_response)(PyObject* self, PyObject* args, PyObject *kwargs);
8384 Py_DECREF(retval);
8485 first_chunk = NULL;
8586 }
87 } else if(FileWrapper_CheckExact(retval)) {
88 request->state.use_sendfile = true;
89 request->iterable = ((FileWrapper*)retval)->file;
90 Py_INCREF(request->iterable);
91 Py_DECREF(retval);
92 request->iterator = NULL;
93 first_chunk = NULL;
8694 } else {
8795 /* Generic iterable (list of length != 1, generator, ...) */
8896 request->iterable = retval;