Stream: session completion check code moved to a separate function.
The code refactored to simplify the ngx_stream_proxy_process() function
and facilitate adding new session termination conditions.
Vladimir Homutov
3 years ago
72 | 72 | static ngx_int_t ngx_stream_proxy_test_connect(ngx_connection_t *c); |
73 | 73 | static void ngx_stream_proxy_process(ngx_stream_session_t *s, |
74 | 74 | ngx_uint_t from_upstream, ngx_uint_t do_write); |
75 | static ngx_int_t ngx_stream_proxy_test_finalize(ngx_stream_session_t *s, | |
76 | ngx_uint_t from_upstream); | |
75 | 77 | static void ngx_stream_proxy_next_upstream(ngx_stream_session_t *s); |
76 | 78 | static void ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_uint_t rc); |
77 | 79 | static u_char *ngx_stream_proxy_log_error(ngx_log_t *log, u_char *buf, |
1645 | 1647 | |
1646 | 1648 | c->log->action = "proxying connection"; |
1647 | 1649 | |
1648 | if (c->type == SOCK_DGRAM | |
1649 | && pscf->responses != NGX_MAX_INT32_VALUE | |
1650 | && u->responses >= pscf->responses * u->requests | |
1651 | && !src->buffered && dst && !dst->buffered) | |
1652 | { | |
1650 | if (ngx_stream_proxy_test_finalize(s, from_upstream) == NGX_OK) { | |
1651 | return; | |
1652 | } | |
1653 | ||
1654 | flags = src->read->eof ? NGX_CLOSE_EVENT : 0; | |
1655 | ||
1656 | if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) { | |
1657 | ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); | |
1658 | return; | |
1659 | } | |
1660 | ||
1661 | if (dst) { | |
1662 | if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) { | |
1663 | ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); | |
1664 | return; | |
1665 | } | |
1666 | ||
1667 | if (!c->read->delayed && !pc->read->delayed) { | |
1668 | ngx_add_timer(c->write, pscf->timeout); | |
1669 | ||
1670 | } else if (c->write->timer_set) { | |
1671 | ngx_del_timer(c->write); | |
1672 | } | |
1673 | } | |
1674 | } | |
1675 | ||
1676 | ||
1677 | static ngx_int_t | |
1678 | ngx_stream_proxy_test_finalize(ngx_stream_session_t *s, | |
1679 | ngx_uint_t from_upstream) | |
1680 | { | |
1681 | ngx_connection_t *c, *pc; | |
1682 | ngx_log_handler_pt handler; | |
1683 | ngx_stream_upstream_t *u; | |
1684 | ngx_stream_proxy_srv_conf_t *pscf; | |
1685 | ||
1686 | pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); | |
1687 | ||
1688 | c = s->connection; | |
1689 | u = s->upstream; | |
1690 | pc = u->connected ? u->peer.connection : NULL; | |
1691 | ||
1692 | if (c->type == SOCK_DGRAM) { | |
1693 | ||
1694 | if (pscf->responses == NGX_MAX_INT32_VALUE | |
1695 | || u->responses < pscf->responses * u->requests) | |
1696 | { | |
1697 | return NGX_DECLINED; | |
1698 | } | |
1699 | ||
1700 | if (pc == NULL || c->buffered || pc->buffered) { | |
1701 | return NGX_DECLINED; | |
1702 | } | |
1703 | ||
1653 | 1704 | handler = c->log->handler; |
1654 | 1705 | c->log->handler = NULL; |
1655 | 1706 | |
1664 | 1715 | c->log->handler = handler; |
1665 | 1716 | |
1666 | 1717 | ngx_stream_proxy_finalize(s, NGX_STREAM_OK); |
1667 | return; | |
1668 | } | |
1669 | ||
1670 | if (c->type == SOCK_STREAM | |
1671 | && src->read->eof && dst && (dst->read->eof || !dst->buffered)) | |
1718 | ||
1719 | return NGX_OK; | |
1720 | } | |
1721 | ||
1722 | /* c->type == SOCK_STREAM */ | |
1723 | ||
1724 | if (pc == NULL | |
1725 | || (!c->read->eof && !pc->read->eof) | |
1726 | || (!c->read->eof && c->buffered) | |
1727 | || (!pc->read->eof && pc->buffered)) | |
1672 | 1728 | { |
1673 | handler = c->log->handler; | |
1674 | c->log->handler = NULL; | |
1675 | ||
1676 | ngx_log_error(NGX_LOG_INFO, c->log, 0, | |
1677 | "%s disconnected" | |
1678 | ", bytes from/to client:%O/%O" | |
1679 | ", bytes from/to upstream:%O/%O", | |
1680 | from_upstream ? "upstream" : "client", | |
1681 | s->received, c->sent, u->received, pc ? pc->sent : 0); | |
1682 | ||
1683 | c->log->handler = handler; | |
1684 | ||
1685 | ngx_stream_proxy_finalize(s, NGX_STREAM_OK); | |
1686 | return; | |
1687 | } | |
1688 | ||
1689 | flags = src->read->eof ? NGX_CLOSE_EVENT : 0; | |
1690 | ||
1691 | if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) { | |
1692 | ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); | |
1693 | return; | |
1694 | } | |
1695 | ||
1696 | if (dst) { | |
1697 | if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) { | |
1698 | ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); | |
1699 | return; | |
1700 | } | |
1701 | ||
1702 | if (!c->read->delayed && !pc->read->delayed) { | |
1703 | ngx_add_timer(c->write, pscf->timeout); | |
1704 | ||
1705 | } else if (c->write->timer_set) { | |
1706 | ngx_del_timer(c->write); | |
1707 | } | |
1708 | } | |
1729 | return NGX_DECLINED; | |
1730 | } | |
1731 | ||
1732 | handler = c->log->handler; | |
1733 | c->log->handler = NULL; | |
1734 | ||
1735 | ngx_log_error(NGX_LOG_INFO, c->log, 0, | |
1736 | "%s disconnected" | |
1737 | ", bytes from/to client:%O/%O" | |
1738 | ", bytes from/to upstream:%O/%O", | |
1739 | from_upstream ? "upstream" : "client", | |
1740 | s->received, c->sent, u->received, pc ? pc->sent : 0); | |
1741 | ||
1742 | c->log->handler = handler; | |
1743 | ||
1744 | ngx_stream_proxy_finalize(s, NGX_STREAM_OK); | |
1745 | ||
1746 | return NGX_OK; | |
1709 | 1747 | } |
1710 | 1748 | |
1711 | 1749 |