Make main socket nonblocking. Limit writequeue size.

This commit is contained in:
Matt Johnston 2015-03-20 23:36:42 +08:00
parent a070159cc5
commit 275611fbaa
6 changed files with 41 additions and 17 deletions

View File

@ -106,7 +106,7 @@ void channel_connect_done(int result, int sock, void* user_data, const char* err
void chaninitialise(const struct ChanType *chantypes[]); void chaninitialise(const struct ChanType *chantypes[]);
void chancleanup(); void chancleanup();
void setchannelfds(fd_set *readfd, fd_set *writefd); void setchannelfds(fd_set *readfds, fd_set *writefds, int allow_reads);
void channelio(fd_set *readfd, fd_set *writefd); void channelio(fd_set *readfd, fd_set *writefd);
struct Channel* getchannel(); struct Channel* getchannel();
/* Returns an arbitrary channel that is in a ready state - not /* Returns an arbitrary channel that is in a ready state - not

View File

@ -531,7 +531,7 @@ static void writechannel(struct Channel* channel, int fd, circbuffer *cbuf,
/* Set the file descriptors for the main select in session.c /* Set the file descriptors for the main select in session.c
* This avoid channels which don't have any window available, are closed, etc*/ * This avoid channels which don't have any window available, are closed, etc*/
void setchannelfds(fd_set *readfds, fd_set *writefds) { void setchannelfds(fd_set *readfds, fd_set *writefds, int allow_reads) {
unsigned int i; unsigned int i;
struct Channel * channel; struct Channel * channel;
@ -549,7 +549,7 @@ void setchannelfds(fd_set *readfds, fd_set *writefds) {
FD if there's the possibility of "~."" to kill an FD if there's the possibility of "~."" to kill an
interactive session (the read_mangler) */ interactive session (the read_mangler) */
if (channel->transwindow > 0 if (channel->transwindow > 0
&& (ses.dataallowed || channel->read_mangler)) { && ((ses.dataallowed && allow_reads) || channel->read_mangler)) {
if (channel->readfd >= 0) { if (channel->readfd >= 0) {
FD_SET(channel->readfd, readfds); FD_SET(channel->readfd, readfds);

View File

@ -64,6 +64,13 @@ void common_session_init(int sock_in, int sock_out) {
ses.sock_out = sock_out; ses.sock_out = sock_out;
ses.maxfd = MAX(sock_in, sock_out); ses.maxfd = MAX(sock_in, sock_out);
if (sock_in >= 0) {
setnonblocking(sock_in);
}
if (sock_out >= 0) {
setnonblocking(sock_out);
}
ses.socket_prio = DROPBEAR_PRIO_DEFAULT; ses.socket_prio = DROPBEAR_PRIO_DEFAULT;
/* Sets it to lowdelay */ /* Sets it to lowdelay */
update_channel_prio(); update_channel_prio();
@ -145,6 +152,7 @@ void session_loop(void(*loophandler)()) {
/* main loop, select()s for all sockets in use */ /* main loop, select()s for all sockets in use */
for(;;) { for(;;) {
const int writequeue_has_space = (ses.writequeue_len <= 2*TRANS_MAX_PAYLOAD_LEN);
timeout.tv_sec = select_timeout(); timeout.tv_sec = select_timeout();
timeout.tv_usec = 0; timeout.tv_usec = 0;
@ -155,8 +163,12 @@ void session_loop(void(*loophandler)()) {
/* We delay reading from the input socket during initial setup until /* We delay reading from the input socket during initial setup until
after we have written out our initial KEXINIT packet (empty writequeue). after we have written out our initial KEXINIT packet (empty writequeue).
This means our initial packet can be in-flight while we're doing a blocking This means our initial packet can be in-flight while we're doing a blocking
read for the remote ident */ read for the remote ident.
if (ses.sock_in != -1 && (ses.remoteident || isempty(&ses.writequeue))) { We also avoid reading from the socket if the writequeue is full, that avoids
replies backing up */
if (ses.sock_in != -1
&& (ses.remoteident || isempty(&ses.writequeue))
&& writequeue_has_space) {
FD_SET(ses.sock_in, &readfd); FD_SET(ses.sock_in, &readfd);
} }
if (ses.sock_out != -1 && !isempty(&ses.writequeue)) { if (ses.sock_out != -1 && !isempty(&ses.writequeue)) {
@ -168,7 +180,7 @@ void session_loop(void(*loophandler)()) {
FD_SET(ses.signal_pipe[0], &readfd); FD_SET(ses.signal_pipe[0], &readfd);
/* set up for channels which can be read/written */ /* set up for channels which can be read/written */
setchannelfds(&readfd, &writefd); setchannelfds(&readfd, &writefd, writequeue_has_space);
/* Pending connections to test */ /* Pending connections to test */
set_connect_fds(&writefd); set_connect_fds(&writefd);
@ -318,9 +330,7 @@ void session_cleanup() {
void send_session_identification() { void send_session_identification() {
buffer *writebuf = buf_new(strlen(LOCAL_IDENT "\r\n") + 1); buffer *writebuf = buf_new(strlen(LOCAL_IDENT "\r\n") + 1);
buf_putbytes(writebuf, LOCAL_IDENT "\r\n", strlen(LOCAL_IDENT "\r\n")); buf_putbytes(writebuf, LOCAL_IDENT "\r\n", strlen(LOCAL_IDENT "\r\n"));
buf_putbyte(writebuf, 0x0); /* packet type */ writebuf_enqueue(writebuf, 0);
buf_setpos(writebuf, 0);
enqueue(&ses.writequeue, writebuf);
} }
static void read_session_identification() { static void read_session_identification() {

View File

@ -59,7 +59,7 @@ void write_packet() {
ssize_t written; ssize_t written;
#ifdef HAVE_WRITEV #ifdef HAVE_WRITEV
/* 50 is somewhat arbitrary */ /* 50 is somewhat arbitrary */
int iov_count = 50; unsigned int iov_count = 50;
struct iovec iov[50]; struct iovec iov[50];
#endif #endif
@ -83,6 +83,7 @@ void write_packet() {
} }
packet_queue_consume(&ses.writequeue, written); packet_queue_consume(&ses.writequeue, written);
ses.writequeue_len -= written;
if (written == 0) { if (written == 0) {
ses.remoteclosed(); ses.remoteclosed();
@ -113,6 +114,8 @@ void write_packet() {
ses.remoteclosed(); ses.remoteclosed();
} }
ses.writequeue_len -= written;
if (written == len) { if (written == len) {
/* We've finished with the packet, free it */ /* We've finished with the packet, free it */
dequeue(&ses.writequeue); dequeue(&ses.writequeue);
@ -570,15 +573,12 @@ void encrypt_packet() {
/* stick the MAC on it */ /* stick the MAC on it */
buf_putbytes(writebuf, mac_bytes, mac_size); buf_putbytes(writebuf, mac_bytes, mac_size);
/* The last byte of the buffer stores the cleartext packet_type. It is not
* transmitted but is used for transmit timeout purposes */
buf_putbyte(writebuf, packet_type);
/* enqueue the packet for sending. It will get freed after transmission. */
buf_setpos(writebuf, 0);
enqueue(&ses.writequeue, (void*)writebuf);
/* Update counts */ /* Update counts */
ses.kexstate.datatrans += writebuf->len; ses.kexstate.datatrans += writebuf->len;
writebuf_enqueue(writebuf, packet_type);
/* Update counts */
ses.transseq++; ses.transseq++;
now = monotonic_now(); now = monotonic_now();
@ -596,6 +596,16 @@ void encrypt_packet() {
TRACE2(("leave encrypt_packet()")) TRACE2(("leave encrypt_packet()"))
} }
void writebuf_enqueue(buffer * writebuf, unsigned char packet_type) {
/* The last byte of the buffer stores the cleartext packet_type. It is not
* transmitted but is used for transmit timeout purposes */
buf_putbyte(writebuf, packet_type);
/* enqueue the packet for sending. It will get freed after transmission. */
buf_setpos(writebuf, 0);
enqueue(&ses.writequeue, (void*)writebuf);
ses.writequeue_len += writebuf->len-1;
}
/* Create the packet mac, and append H(seqno|clearbuf) to the output */ /* Create the packet mac, and append H(seqno|clearbuf) to the output */
/* output_mac must have ses.keys->trans.algo_mac->hashsize bytes. */ /* output_mac must have ses.keys->trans.algo_mac->hashsize bytes. */

View File

@ -28,12 +28,15 @@
#include "includes.h" #include "includes.h"
#include "queue.h" #include "queue.h"
#include "buffer.h"
void write_packet(); void write_packet();
void read_packet(); void read_packet();
void decrypt_packet(); void decrypt_packet();
void encrypt_packet(); void encrypt_packet();
void writebuf_enqueue(buffer * writebuf, unsigned char packet_type);
void process_packet(); void process_packet();
void maybe_flush_reply_queue(); void maybe_flush_reply_queue();

View File

@ -125,6 +125,7 @@ struct sshsession {
throughout the code, as handlers fill out this throughout the code, as handlers fill out this
buffer with the packet to send. */ buffer with the packet to send. */
struct Queue writequeue; /* A queue of encrypted packets to send */ struct Queue writequeue; /* A queue of encrypted packets to send */
unsigned int writequeue_len; /* Number of bytes pending to send in writequeue */
buffer *readbuf; /* From the wire, decrypted in-place */ buffer *readbuf; /* From the wire, decrypted in-place */
buffer *payload; /* Post-decompression, the actual SSH packet. buffer *payload; /* Post-decompression, the actual SSH packet.
May have extra data at the beginning, will be May have extra data at the beginning, will be