async connections working

--HG--
branch : fastopen
This commit is contained in:
Matt Johnston 2015-02-18 22:46:15 +08:00
parent 8795d733ec
commit 755c1458f0
11 changed files with 108 additions and 105 deletions

View File

@ -73,6 +73,7 @@ struct Channel {
* to ensure we don't run it twice (nor type->checkclose()). */ * to ensure we don't run it twice (nor type->checkclose()). */
int close_handler_done; int close_handler_done;
struct dropbear_progress_connection *conn_pending;
int initconn; /* used for TCP forwarding, whether the channel has been int initconn; /* used for TCP forwarding, whether the channel has been
fully initialised */ fully initialised */
@ -100,6 +101,9 @@ struct ChanType {
void (*closehandler)(struct Channel*); void (*closehandler)(struct Channel*);
}; };
/* Callback for connect_remote */
void channel_connect_done(int result, int sock, void* user_data, const char* errstring);
void chaninitialise(const struct ChanType *chantypes[]); void chaninitialise(const struct ChanType *chantypes[]);
void chancleanup(); void chancleanup();
void setchannelfds(fd_set *readfd, fd_set *writefd); void setchannelfds(fd_set *readfd, fd_set *writefd);

View File

@ -72,12 +72,8 @@ int main(int argc, char ** argv) {
} else } else
#endif #endif
{ {
int sock = connect_remote(cli_opts.remotehost, cli_opts.remoteport, &error); connect_remote(cli_opts.remotehost, cli_opts.remoteport, cli_connected, NULL);
sock_in = sock_out = sock; sock_in = sock_out = -1;
}
if (sock_in < 0) {
dropbear_exit("%s", error);
} }
cli_session(sock_in, sock_out); cli_session(sock_in, sock_out);

View File

@ -93,6 +93,15 @@ static const struct ChanType *cli_chantypes[] = {
NULL /* Null termination */ NULL /* Null termination */
}; };
void cli_connected(int result, int sock, void* userdata, const char *errstring)
{
if (result == DROPBEAR_FAILURE)
{
dropbear_exit("Connect failed: %s", errstring);
}
ses.sock_in = ses.sock_out = sock;
}
void cli_session(int sock_in, int sock_out) { void cli_session(int sock_in, int sock_out) {
common_session_init(sock_in, sock_out); common_session_init(sock_in, sock_out);

View File

@ -254,19 +254,7 @@ static int newtcpforwarded(struct Channel * channel) {
} }
snprintf(portstring, sizeof(portstring), "%d", fwd->connectport); snprintf(portstring, sizeof(portstring), "%d", fwd->connectport);
sock = connect_remote(fwd->connectaddr, portstring, NULL); channel->conn_pending = connect_remote(fwd->connectaddr, portstring, channel_connect_done, channel);
if (sock < 0) {
TRACE(("leave newtcpdirect: sock failed"))
err = SSH_OPEN_CONNECT_FAILED;
goto out;
}
ses.maxfd = MAX(ses.maxfd, sock);
/* We don't set readfd, that will get set after the connection's
* progress succeeds */
channel->writefd = sock;
channel->initconn = 1;
channel->prio = DROPBEAR_CHANNEL_PRIO_UNKNOWABLE; channel->prio = DROPBEAR_CHANNEL_PRIO_UNKNOWABLE;

View File

@ -48,7 +48,6 @@ static void send_msg_channel_data(struct Channel *channel, int isextended);
static void send_msg_channel_eof(struct Channel *channel); static void send_msg_channel_eof(struct Channel *channel);
static void send_msg_channel_close(struct Channel *channel); static void send_msg_channel_close(struct Channel *channel);
static void remove_channel(struct Channel *channel); static void remove_channel(struct Channel *channel);
static void check_in_progress(struct Channel *channel);
static unsigned int write_pending(struct Channel * channel); static unsigned int write_pending(struct Channel * channel);
static void check_close(struct Channel *channel); static void check_close(struct Channel *channel);
static void close_chan_fd(struct Channel *channel, int fd, int how); static void close_chan_fd(struct Channel *channel, int fd, int how);
@ -163,7 +162,6 @@ static struct Channel* newchannel(unsigned int remotechan,
newchan->writefd = FD_UNINIT; newchan->writefd = FD_UNINIT;
newchan->readfd = FD_UNINIT; newchan->readfd = FD_UNINIT;
newchan->errfd = FD_CLOSED; /* this isn't always set to start with */ newchan->errfd = FD_CLOSED; /* this isn't always set to start with */
newchan->initconn = 0;
newchan->await_open = 0; newchan->await_open = 0;
newchan->flushing = 0; newchan->flushing = 0;
@ -242,12 +240,6 @@ void channelio(fd_set *readfds, fd_set *writefds) {
/* write to program/pipe stdin */ /* write to program/pipe stdin */
if (channel->writefd >= 0 && FD_ISSET(channel->writefd, writefds)) { if (channel->writefd >= 0 && FD_ISSET(channel->writefd, writefds)) {
if (channel->initconn) {
/* XXX should this go somewhere cleaner? */
check_in_progress(channel);
continue; /* Important not to use the channel after
check_in_progress(), as it may be NULL */
}
writechannel(channel, channel->writefd, channel->writebuf); writechannel(channel, channel->writefd, channel->writebuf);
do_check_close = 1; do_check_close = 1;
} }
@ -374,27 +366,27 @@ static void check_close(struct Channel *channel) {
* if so, set up the channel properly. Otherwise, the channel is cleaned up, so * if so, set up the channel properly. Otherwise, the channel is cleaned up, so
* it is important that the channel reference isn't used after a call to this * it is important that the channel reference isn't used after a call to this
* function */ * function */
static void check_in_progress(struct Channel *channel) { void channel_connect_done(int result, int sock, void* user_data, const char* UNUSED(errstring)) {
int val; struct Channel *channel = user_data;
socklen_t vallen = sizeof(val);
TRACE(("enter check_in_progress")) TRACE(("enter channel_connect_done"))
if (getsockopt(channel->writefd, SOL_SOCKET, SO_ERROR, &val, &vallen) if (result == DROPBEAR_SUCCESS)
|| val != 0) { {
send_msg_channel_open_failure(channel->remotechan, channel->readfd = channel->writefd = sock;
SSH_OPEN_CONNECT_FAILED, "", ""); channel->conn_pending = NULL;
close(channel->writefd);
remove_channel(channel);
TRACE(("leave check_in_progress: fail"))
} else {
chan_initwritebuf(channel); chan_initwritebuf(channel);
send_msg_channel_open_confirmation(channel, channel->recvwindow, send_msg_channel_open_confirmation(channel, channel->recvwindow,
channel->recvmaxpacket); channel->recvmaxpacket);
channel->readfd = channel->writefd; TRACE(("leave channel_connect_done: success"))
channel->initconn = 0; }
TRACE(("leave check_in_progress: success")) else
{
send_msg_channel_open_failure(channel->remotechan,
SSH_OPEN_CONNECT_FAILED, "", "");
remove_channel(channel);
TRACE(("leave check_in_progress: fail"))
} }
} }
@ -514,8 +506,7 @@ void setchannelfds(fd_set *readfds, fd_set *writefds) {
} }
/* Stuff from the wire */ /* Stuff from the wire */
if (channel->initconn if (channel->writefd >= 0 && cbuf_getused(channel->writebuf) > 0) {
||(channel->writefd >= 0 && cbuf_getused(channel->writebuf) > 0)) {
FD_SET(channel->writefd, writefds); FD_SET(channel->writefd, writefds);
} }
@ -599,6 +590,10 @@ static void remove_channel(struct Channel * channel) {
channel->close_handler_done = 1; channel->close_handler_done = 1;
} }
if (channel->conn_pending) {
cancel_connect(channel->conn_pending);
}
ses.channels[channel->index] = NULL; ses.channels[channel->index] = NULL;
m_free(channel); m_free(channel);
ses.chancount--; ses.chancount--;
@ -1149,7 +1144,7 @@ struct Channel* get_any_ready_channel() {
struct Channel *chan = ses.channels[i]; struct Channel *chan = ses.channels[i];
if (chan if (chan
&& !(chan->sent_eof || chan->recv_eof) && !(chan->sent_eof || chan->recv_eof)
&& !(chan->await_open || chan->initconn)) { && !(chan->await_open)) {
return chan; return chan;
} }
} }

View File

@ -167,6 +167,9 @@ void session_loop(void(*loophandler)()) {
/* set up for channels which can be read/written */ /* set up for channels which can be read/written */
setchannelfds(&readfd, &writefd); setchannelfds(&readfd, &writefd);
/* Pending connections to test */
set_connect_fds(&writefd);
val = select(ses.maxfd+1, &readfd, &writefd, NULL, &timeout); val = select(ses.maxfd+1, &readfd, &writefd, NULL, &timeout);
if (exitflag) { if (exitflag) {
@ -214,11 +217,13 @@ void session_loop(void(*loophandler)()) {
process_packet(); process_packet();
} }
} }
/* if required, flush out any queued reply packets that /* if required, flush out any queued reply packets that
were being held up during a KEX */ were being held up during a KEX */
maybe_flush_reply_queue(); maybe_flush_reply_queue();
handle_connect_fds(&writefd);
/* process pipes etc for the channels, ses.dataallowed == 0 /* process pipes etc for the channels, ses.dataallowed == 0
* during rekeying ) */ * during rekeying ) */
channelio(&readfd, &writefd); channelio(&readfd, &writefd);

104
dbutil.c
View File

@ -995,6 +995,8 @@ struct dropbear_progress_connection
or NULL. */ or NULL. */
int sock; int sock;
char* errstring;
}; };
/* Deallocate a progress connection. Removes from the pending list if iter!=NULL. /* Deallocate a progress connection. Removes from the pending list if iter!=NULL.
@ -1005,6 +1007,7 @@ static void remove_connect(struct dropbear_progress_connection *c, m_list_elem *
} }
m_free(c->remotehost); m_free(c->remotehost);
m_free(c->remoteport); m_free(c->remoteport);
m_free(c->errstring);
m_free(c); m_free(c);
if (iter) { if (iter) {
@ -1012,12 +1015,24 @@ static void remove_connect(struct dropbear_progress_connection *c, m_list_elem *
} }
} }
static int connect_try_next(struct dropbear_progress_connection *c) { static void cancel_callback(int result, int sock, void* UNUSED(data), const char* UNUSED(errstring)) {
if (result == DROPBEAR_SUCCESS)
{
m_close(sock);
}
}
void cancel_connect(struct dropbear_progress_connection *c) {
c->cb = cancel_callback;
c->cb_data = NULL;
}
static void connect_try_next(struct dropbear_progress_connection *c) {
int err = EADDRNOTAVAIL; int err = EADDRNOTAVAIL;
struct addrinfo *r; struct addrinfo *r;
if (!c->res_iter) { if (!c->res_iter) {
return DROPBEAR_FAILURE; return;
} }
for (r = c->res_iter; r; r = r->ai_next) for (r = c->res_iter; r; r = r->ai_next)
@ -1030,6 +1045,7 @@ static int connect_try_next(struct dropbear_progress_connection *c) {
continue; continue;
} }
ses.maxfd = MAX(ses.maxfd, c->sock);
setnonblocking(c->sock); setnonblocking(c->sock);
#if defined(__linux__) && defined(TCP_DEFER_ACCEPT) #if defined(__linux__) && defined(TCP_DEFER_ACCEPT)
@ -1060,8 +1076,12 @@ static int connect_try_next(struct dropbear_progress_connection *c) {
if (c->sock >= 0 || (errno == EINPROGRESS)) { if (c->sock >= 0 || (errno == EINPROGRESS)) {
/* Success */ /* Success */
set_sock_nodelay(c->sock); set_sock_nodelay(c->sock);
return DROPBEAR_SUCCESS; return;
} else { } else {
if (!c->res_iter)
{
}
/* XXX - returning error message through */ /* XXX - returning error message through */
#if 0 #if 0
/* Failed */ /* Failed */
@ -1073,15 +1093,10 @@ static int connect_try_next(struct dropbear_progress_connection *c) {
} }
TRACE(("Error connecting: %s", strerror(err))) TRACE(("Error connecting: %s", strerror(err)))
#endif #endif
return DROPBEAR_FAILURE;
} }
} }
/* Connect via TCP to a host. Connection will try ipv4 or ipv6, will /* Connect via TCP to a host. */
* return immediately if nonblocking is set. On failure, if errstring
* wasn't null, it will be a newly malloced error message */
/* TODO: maxfd */
struct dropbear_progress_connection *connect_remote(const char* remotehost, const char* remoteport, struct dropbear_progress_connection *connect_remote(const char* remotehost, const char* remoteport,
connect_callback cb, void* cb_data) connect_callback cb, void* cb_data)
{ {
@ -1096,6 +1111,8 @@ struct dropbear_progress_connection *connect_remote(const char* remotehost, cons
c->cb = cb; c->cb = cb;
c->cb_data = cb_data; c->cb_data = cb_data;
list_append(&ses.conn_pending, c);
memset(&hints, 0, sizeof(hints)); memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_STREAM; hints.ai_socktype = SOCK_STREAM;
hints.ai_family = PF_UNSPEC; hints.ai_family = PF_UNSPEC;
@ -1103,29 +1120,18 @@ struct dropbear_progress_connection *connect_remote(const char* remotehost, cons
err = getaddrinfo(remotehost, remoteport, &hints, &c->res); err = getaddrinfo(remotehost, remoteport, &hints, &c->res);
if (err) { if (err) {
int len; int len;
char *errstring;
len = 100 + strlen(gai_strerror(err)); len = 100 + strlen(gai_strerror(err));
errstring = (char*)m_malloc(len); c->errstring = (char*)m_malloc(len);
snprintf(errstring, len, "Error resolving '%s' port '%s'. %s", snprintf(c->errstring, len, "Error resolving '%s' port '%s'. %s",
remotehost, remoteport, gai_strerror(err)); remotehost, remoteport, gai_strerror(err));
c->cb(DROPBEAR_FAILURE, -1, c->cb_data, errstring);
m_free(errstring);
TRACE(("Error resolving: %s", gai_strerror(err))) TRACE(("Error resolving: %s", gai_strerror(err)))
remove_connect(c, NULL);
return NULL; return NULL;
} }
c->res_iter = c->res; c->res_iter = c->res;
if (connect_try_next(c) == DROPBEAR_FAILURE) { /* Set one going */
/* Should not happen - getaddrinfo() should return failure if there are no addresses */ connect_try_next(c);
c->cb(DROPBEAR_FAILURE, -1, c->cb_data, "No address to try");
TRACE(("leave handle_connect_fds - failed"))
remove_connect(c, NULL);
return NULL;
}
list_append(&ses.conn_pending, c);
return c; return c;
} }
@ -1136,12 +1142,24 @@ void set_connect_fds(fd_set *writefd) {
TRACE(("enter handle_connect_fds")) TRACE(("enter handle_connect_fds"))
for (iter = ses.conn_pending.first; iter; iter = iter->next) { for (iter = ses.conn_pending.first; iter; iter = iter->next) {
struct dropbear_progress_connection *c = iter->item; struct dropbear_progress_connection *c = iter->item;
/* Set one going */
while (c->res_iter && c->sock < 0)
{
connect_try_next(c);
}
if (c->sock >= 0) { if (c->sock >= 0) {
FD_SET(c->sock, writefd); FD_SET(c->sock, writefd);
} } else {
else m_list_elem *remove_iter;
{ /* Final failure */
if (!c->errstring) {
c->errstring = m_strdup("unexpected failure");
}
c->cb(DROPBEAR_FAILURE, -1, c->cb_data, c->errstring);
/* Safely remove without invalidating iter */
remove_iter = iter;
iter = iter->prev;
remove_connect(c, remove_iter);
} }
} }
} }
@ -1162,31 +1180,25 @@ void handle_connect_fds(fd_set *writefd) {
if (getsockopt(c->sock, SOL_SOCKET, SO_ERROR, &val, &vallen) != 0) { if (getsockopt(c->sock, SOL_SOCKET, SO_ERROR, &val, &vallen) != 0) {
TRACE(("handle_connect_fds getsockopt(%d) SO_ERROR failed: %s", c->sock, strerror(errno))) TRACE(("handle_connect_fds getsockopt(%d) SO_ERROR failed: %s", c->sock, strerror(errno)))
/* This isn't expected to happen - Unix has surprises though, continue gracefully. */
m_close(c->sock);
c->sock = -1;
} else if (val != 0) { } else if (val != 0) {
/* Connect failed */ /* Connect failed */
TRACE(("connect to %s port %s failed.", c->remotehost, c->remoteport)) TRACE(("connect to %s port %s failed.", c->remotehost, c->remoteport))
m_close(c->sock); m_close(c->sock);
c->sock = -1; c->sock = -1;
if (connect_try_next(c) == DROPBEAR_FAILURE) { m_free(c->errstring);
c->cb(DROPBEAR_FAILURE, -1, c->cb_data, strerror(val)); c->errstring = strerror(val);
TRACE(("leave handle_connect_fds - failed")) } else {
remove_connect(c, iter); /* New connection has been established */
/* Must return here - remove_connect() invalidates iter */ c->cb(DROPBEAR_SUCCESS, c->sock, c->cb_data, NULL);
return; remove_connect(c, iter);
} else { TRACE(("leave handle_connect_fds - success"))
/* new connection try was successfuly started, will be finished by a /* Must return here - remove_connect() invalidates iter */
later call to handle_connect_fds() */ return;
TRACE(("leave handle_connect_fds - new try"))
continue;
}
} }
/* New connection has been established */
c->cb(DROPBEAR_SUCCESS, c->sock, c->cb_data, "");
remove_connect(c, iter);
TRACE(("leave handle_connect_fds - success"))
/* Must return here - remove_connect() invalidates iter */
return;
} }
TRACE(("leave handle_connect_fds - end iter")) TRACE(("leave handle_connect_fds - end iter"))
} }

View File

@ -129,4 +129,7 @@ struct dropbear_progress_connection * connect_remote (const char* remotehost, co
void set_connect_fds(fd_set *writefd); void set_connect_fds(fd_set *writefd);
void handle_connect_fds(fd_set *writefd); void handle_connect_fds(fd_set *writefd);
/* Doesn't actually stop the connect, but adds a dummy callback instead */
void cancel_connect(struct dropbear_progress_connection *c);
#endif /* _DBUTIL_H_ */ #endif /* _DBUTIL_H_ */

View File

@ -52,6 +52,7 @@ static buffer* buf_decompress(buffer* buf, unsigned int len);
static void buf_compress(buffer * dest, buffer * src, unsigned int len); static void buf_compress(buffer * dest, buffer * src, unsigned int len);
#endif #endif
#if 0
struct iovec * dropbear_queue_to_iovec(struct Queue *queue) { struct iovec * dropbear_queue_to_iovec(struct Queue *queue) {
struct iovec *iov = NULL; struct iovec *iov = NULL;
@ -69,6 +70,7 @@ struct iovec * dropbear_queue_to_iovec(struct Queue *queue) {
void dropbear_queue_consume(struct Queue *queue, ssize_t written) { void dropbear_queue_consume(struct Queue *queue, ssize_t written) {
} }
#endif
/* non-blocking function writing out a current encrypted packet */ /* non-blocking function writing out a current encrypted packet */
void write_packet() { void write_packet() {

View File

@ -61,6 +61,7 @@ void svr_dropbear_log(int priority, const char* format, va_list param);
/* Client */ /* Client */
void cli_session(int sock_in, int sock_out); void cli_session(int sock_in, int sock_out);
void cli_connected(int result, int sock, void* userdata, const char *errstring);
void cleantext(unsigned char* dirtytext); void cleantext(unsigned char* dirtytext);
/* crypto parameters that are stored individually for transmit and receive */ /* crypto parameters that are stored individually for transmit and receive */

View File

@ -270,19 +270,7 @@ static int newtcpdirect(struct Channel * channel) {
} }
snprintf(portstring, sizeof(portstring), "%d", destport); snprintf(portstring, sizeof(portstring), "%d", destport);
sock = connect_remote(desthost, portstring, NULL); channel->conn_pending = connect_remote(desthost, portstring, channel_connect_done, channel);
if (sock < 0) {
err = SSH_OPEN_CONNECT_FAILED;
TRACE(("leave newtcpdirect: sock failed"))
goto out;
}
ses.maxfd = MAX(ses.maxfd, sock);
/* We don't set readfd, that will get set after the connection's
* progress succeeds */
channel->writefd = sock;
channel->initconn = 1;
channel->prio = DROPBEAR_CHANNEL_PRIO_UNKNOWABLE; channel->prio = DROPBEAR_CHANNEL_PRIO_UNKNOWABLE;