# # patch "ChangeLog" # from [935b6b18ae73c892ace4c8fd4471d1db0d3d8e12] # to [e3fedf7bec874242af4435ee5bb81be25f508a9a] # # patch "contrib/usher.cc" # from [f772b459368204c6ae5095edc8531377bf7d28da] # to [8f667fecdd0c69ee5e67fb3df9a0818bba2d529f] # ======================================================================== --- ChangeLog 935b6b18ae73c892ace4c8fd4471d1db0d3d8e12 +++ ChangeLog e3fedf7bec874242af4435ee5bb81be25f508a9a @@ -1,3 +1,11 @@ +2005-10-22 Timothy Brownawell + + * contrib/usher.cc: Support friendly shutdown (no new connections, but + don't kill existing ones) of local servers and the usher as a whole. + Simple admin interface to start/stop or get the status of individual + servers or the whole usher (listens for connections on a separate + port, only enabled if specified on the command line). + 2005-10-18 Timothy Brownawell * contrib/usher.cc: Update comment about client versions ======================================================================== --- contrib/usher.cc f772b459368204c6ae5095edc8531377bf7d28da +++ contrib/usher.cc 8f667fecdd0c69ee5e67fb3df9a0818bba2d529f @@ -12,11 +12,14 @@ // a post-0.23 client is needed (0.23 clients can only be matched against // their include pattern). // -// Usage: usher [-a ] [-p ] +// Usage: usher [-l address[:port]] [-a address:port] // -// is the address to listen on -// is the local port to listen on -// is a file that looks like +// options: +// -l address and port to listen on, defaults to 0.0.0.0:5253 +// -a address and port to listen for admin commands +// a file that looks like +// userpass username password +// // server monotone // host localhost // pattern net.venge.monotone @@ -27,7 +30,9 @@ // pattern * // local -d /usr/local/src/managed/mt.db~ * // -// or in general, blocks of a +// or in general, one block of one or more lines of +// userpass +// followed by any number of blocks of a // server // line followed by one or more // host @@ -38,6 +43,8 @@ // local // , with blocks separated by blank lines // +// "userpass" lines specify who is allowed to use the administrative port. +// // A request to server "hostname" will be directed to the // server at :, if that stem is marked as remote, // and to a local server managed by the usher, started with the given @@ -47,6 +54,68 @@ // to connect to, but does not have to match exactly. This means that you don't // have to know in advance whether clients will be asking for // or : . +// +// +// Admin commands +// +// If the -a option is given, the usher will listen for administrative +// connections on that port. The connecting client gives commands of the form +// COMMAND [arguments] +// , and after any command except USERPASS the usher will send a reply and +// close the connection. The reply will always end with a newline. +// +// USERPASS username password +// Required before any other command, so random people can't do bad things. +// If incorrect, the connection will be closed immediately. +// +// STATUS [servername] +// Get the status of a server, as named by the "server" lines in the +// config file. If a server is specified, the result will be one of: +// REMOTE - this is a remote server without active connections +// ACTIVE n - this server currently has n active connections +// WAITING - this (local) server is running, but has no connections +// SLEEPING - this (local) server is not running, but is available +// STOPPING n - this (local) server has been asked to stop, but still has +// n active connections. It will not accept further connections. +// STOPPED - this (local) server has been stopped, and will not accept +// connections. The server process is not running. +// SHUTTINGDOWN - the usher has been shut down, no servers are accepting +// connections. +// SHUTDOWN - the usher has been shut down, all connections have been closed, +// and all local server processes have been stopped. +// If no server is specified, the repsonse will be SHUTTINGDOWN, SHUTDOWN, +// WAITING, or ACTIVE (with n being the total number of open connections, +// across all servers). +// +// STOP servername +// Prevent the given local server from receiving further connections, and stop +// it once all connections are closed. The result will be the new status of +// that server: ACTIVE local servers become STOPPING, and WAITING and SLEEPING +// servers become STOPPED. Servers in other states are not affected. +// +// START servername +// Allow a stopped or stopping server to receive connections again. The result +// will be the new status of that server. (A server in the "STOPPING" state +// becomes ACTIVE, and a STOPPED server becomes SLEEPING. A server in some +// other state is not affected.) +// +// LIST [state] +// Returns a space-separated list of all servers. If a state is given, only +// list the servers that are in that state. +// +// SHUTDOWN +// Do not accept new connections for any servers, local or remote. Returns "ok". +// +// STARTUP +// Begin accepting connections again after a SHUTDOWN. Returns "ok". +// +// CONNECTIONS +// Returns the number of connections currently open. +// +// RELOAD +// Reload the config file (same as sending SIGHUP). The reply will be "ok", +// and will not be given until the config file has been reloaded. +// #include #include @@ -65,6 +134,7 @@ #include #include #include +#include #include #include #include @@ -82,6 +152,7 @@ using boost::lexical_cast; using boost::shared_ptr; using std::cerr; +using std::pair; using std::make_pair; // defaults, overridden by command line @@ -104,13 +175,17 @@ string const greeting = " Hello! This is the monotone usher at localhost. What would you like?"; -string const errmsg = "!Sorry, I don't know where to find that."; +string const notfound = "!Sorry, I don't know where to find that."; +string const disabled = "!Sorry, this usher is not currently accepting connections."; +string const srvdisabled = "!Sorry, that server is currently disabled."; + struct errstr { std::string name; int err; + errstr(std::string const & s): name(s), err(0) {} errstr(std::string const & s, int e): name(s), err(e) {} }; @@ -247,27 +322,39 @@ if (s) s[1]++; } - ~sock() + void deref() { - if (!s || s[1]--) - return; - try { - close(); - } catch(errstr & e) { - // if you want it to throw errors, call close manually + if (s && !(--s[1])) { + try { + close(); + } catch(errstr & e) { + // if you want it to throw errors, call close manually + } + delete[] s; + all_socks.erase(all_socks.find(s)); } - delete[] s; - all_socks.erase(all_socks.find(s)); s = 0; } - sock operator=(int ss) + ~sock() { - if (!s) { - s = new int[2]; - all_socks.insert(s); - } + deref(); + } + sock const & operator=(int ss) + { + deref(); + s = new int[2]; + all_socks.insert(s); s[0]=ss; + return *this; } + sock const & operator=(sock const & ss) + { + deref(); + s = ss.s; + if (s) + ++s[1]; + return *this; + } void close() { if (!s || s[0] == -1) @@ -345,7 +432,6 @@ currport = minport-1; for(int i = 0; i < 4; ++i) curraddr[i] = minaddr[i]; - curraddr[0]; } do { // get the next address in our list @@ -421,7 +507,7 @@ } char ** a = new char*[args.size()+1]; - for (int i = 0; i < args.size(); ++i) { + for (unsigned int i = 0; i < args.size(); ++i) { a[i] = new char[args[i].size()+1]; memcpy(a[i], args[i].c_str(), args[i].size()+1); } @@ -456,8 +542,78 @@ } } +bool connections_allowed = true; +int total_connections = 0; + +struct serverstate +{ + enum ss {remote, active, waiting, sleeping, stopping, + stopped, shuttingdown, shutdown, unknown}; + ss state; + int num; + serverstate(): state(unknown), num(0) {} + serverstate const & operator=(string const & s) + { + if (s == "REMOTE") + state = remote; + else if (s == "ACTIVE") + state = active; + else if (s == "WAITING") + state = waiting; + else if (s == "SLEEPING") + state = sleeping; + else if (s == "STOPPING") + state = stopping; + else if (s == "STOPPED") + state = stopped; + else if (s == "SHUTTINGDOWN") + state = shuttingdown; + else if (s == "SHUTDOWN") + state = shutdown; + return *this; + } + bool operator==(string const & s) + { + serverstate foo; + foo = s; + return foo.state == state; + } +}; +std::ostream & operator<<(std::ostream & os, serverstate const & ss) +{ + switch (ss.state) { + case serverstate::remote: + os<<"REMOTE"; + break; + case serverstate::active: + os<<"ACTIVE "< >::iterator> by_host, by_pat; map >::iterator by_name; static map > servers_by_host; @@ -471,7 +627,7 @@ int port; int connection_count; int last_conn_time; - server() : pid(-1), local(false), port(0), + server() : enabled(true), local(false), pid(-1), port(0), connection_count(0), last_conn_time(0) { } @@ -479,6 +635,30 @@ { yeskill(); } + serverstate get_state() + { + serverstate ss; + ss.num = connection_count; + if (!connections_allowed) { + if (!total_connections) + ss.state = serverstate::shutdown; + else + ss.state = serverstate::shuttingdown; + } else if (connection_count) { + if (enabled) + ss.state = serverstate::active; + else + ss.state = serverstate::stopping; + } else if (!local) + ss.state = serverstate::remote; + else if (!enabled) + ss.state = serverstate::stopped; + else if (pid == -1) + ss.state = serverstate::sleeping; + else + ss.state = serverstate::waiting; + return ss; + } void delist() { vector foo; @@ -539,6 +719,10 @@ } sock connect() { + if (!connections_allowed) + throw errstr("all servers disabled"); + if (!enabled) + throw errstr("server disabled"); if (local && pid == -1) { // server needs to be started // we'll try 3 times, since there's a delay between our checking that @@ -550,7 +734,7 @@ args.push_back("monotone"); args.push_back("serve"); args.push_back("--bind=" + addr + ":" + lexical_cast(port)); - int n = 0, m = 0; + unsigned int n = 0, m = 0; n = arguments.find_first_not_of(" \t"); while (n != string::npos && m != string::npos) { m = arguments.find_first_of(" ", n); @@ -561,23 +745,32 @@ } } sock s = make_outgoing(port, addr); + if (local && !connection_count) { + live_servers.insert(by_name->second); + } ++connection_count; + ++total_connections; return s; } void disconnect() { + --total_connections; if (--connection_count || !local) return; last_conn_time = time(0); + maybekill(); } void maybekill() { if (!local) return; + if (pid == -1) + return; int difftime = time(0) - last_conn_time; - if (difftime > server_idle_timeout && !connection_count) - yeskill(); - else if (waitpid(pid, 0, WNOHANG) == 0) { + if (!connection_count + && (difftime > server_idle_timeout || !connections_allowed)) + yeskill(); + else if (waitpid(pid, 0, WNOHANG) > 0) { pid = -1; port = 0; } @@ -590,6 +783,7 @@ do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR); pid = -1; port = 0; + live_servers.erase(live_servers.find(by_name->second)); } } string name() @@ -659,11 +853,17 @@ } if (name.empty()) return string(); - shared_ptr srv(new server); + shared_ptr srv; map >::iterator i = server::servers_by_name.find(name); - if (i != server::servers_by_name.end()) + if (i != server::servers_by_name.end()) { + if (local && i->second->local && i->second->arguments == desc) + srv = i->second; + else + srv = shared_ptr(new server); i->second->delist(); + } else + srv = shared_ptr(new server); srv->by_name = server::servers_by_name.insert(make_pair(name, srv)).first; srv->set_hosts(hosts); srv->set_patterns(patterns); @@ -672,7 +872,7 @@ srv->arguments = desc; } else { srv->local = false; - int c = desc.find(":"); + unsigned int c = desc.find(":"); if (c != desc.npos) { srv->addr = desc.substr(0, c); srv->port = lexical_cast(desc.substr(c+1)); @@ -703,6 +903,18 @@ return shared_ptr(); } +shared_ptr get_server(string const & name) +{ + map >::iterator i; + for (i = server::servers_by_name.begin(); + i != server::servers_by_name.end(); ++i) { + if (name == i->first) { + return i->second; + } + } + return shared_ptr(); +} + void kill_old_servers() { set >::iterator i; @@ -722,7 +934,7 @@ got = p[b]; out += ((int)(p[b] & 0x7f))<<(b*7); ++b; - } while (b*7 < sizeof(int)*8-1 && (got & 0x80)); + } while ((unsigned int)(b*7) < sizeof(int)*8-1 && (got & 0x80)); return b; } @@ -800,7 +1012,7 @@ } ~channel() { - if (who) + if (who && !no_server) who->disconnect(); } bool is_finished() @@ -860,20 +1072,23 @@ string reply_srv, reply_pat; if (extract_reply(cbuf, reply_srv, reply_pat)) { who = get_server(reply_srv, reply_pat); - if (who) { + if (who && who->enabled) { try { srv = who->connect(); have_routed = true; s = srv; } catch (errstr & e) { - std::cerr<<"cannot contact server "<name()<<"\n"; + cerr<<"cannot contact server "<name()<<"\n"; no_server = true; } } else { char * dat; int size; sbuf.getwrite(p, n); - make_packet(errmsg, dat, size); + if (who->enabled) + make_packet(notfound, dat, size); + else + make_packet(srvdisabled, dat, size); if (n < size) size = n; memcpy(p, dat, size); sbuf.fixwrite(size); @@ -901,19 +1116,32 @@ if ((no_server || have_routed && s < 0) && !sbuf.canread()) { cli.close(), c = -1; } + return true; } }; int channel::counter = 0; bool reload_pending; +map admins; +string conffile; void reload_conffile(string const & file) { reload_pending = false; cerr<<"Reloading config file...\n"; std::ifstream cf(file.c_str()); + + string line = getline(cf); + while (!line.empty()) { + std::istringstream iss(line); + string a, b, c; + iss>>a>>b>>c; + if (a == "userpass") + admins.insert(make_pair(b, c)); + line = getline(cf); + } + set names; - int pos = 0; while(cf) { string n = read_server_record(cf); if(!n.empty()) @@ -943,22 +1171,217 @@ reload_pending = true; } +struct administrator +{ + sock port; + struct cstate + { + bool auth; + bool rdone; + string buf; + cstate(): auth(false), rdone(false) {} + }; + list > conns; + administrator(): port(-1) + {} + bool process(cstate & cs) + { + unsigned int n = cs.buf.find("\n"); + if (n == cs.buf.npos) + return true; + string l = cs.buf.substr(0, n); + cs.buf.erase(0, n+1); + std::istringstream iss(l); + string cmd; + iss>>cmd; + if (cmd == "USERPASS") { + string user, pass; + iss>>user>>pass; + map::iterator i = admins.find(user); + if (i == admins.end() || i->second != pass) { + cerr<<"Failed admin login.\n"; + return false; + } else { + if (cs.auth == true) + return false; + cs.auth = true; + process(cs); + } + } else if (cmd == "STATUS") { + string srv; + iss>>srv; + std::ostringstream oss; + if (srv.empty()) { + serverstate ss; + ss.num = total_connections; + if (connections_allowed) { + if (total_connections) + ss.state = serverstate::active; + else + ss.state = serverstate::waiting; + } else { + if (total_connections) + ss.state = serverstate::shuttingdown; + else + ss.state = serverstate::shutdown; + } + oss< >::iterator i; + i = server::servers_by_name.find(srv); + if (i != server::servers_by_name.end()) + oss<second->get_state()<<"\n"; + else + oss<<"No such server.\n"; + } + cs.buf = oss.str(); + } else if (cmd == "START") { + string srv; + iss>>srv; + std::ostringstream oss; + map >::iterator i; + i = server::servers_by_name.find(srv); + if (i != server::servers_by_name.end()) { + i->second->enabled = true; + oss<second->get_state()<<"\n"; + } else + oss<<"No such server.\n"; + cs.buf = oss.str(); + } else if (cmd == "STOP") { + string srv; + iss>>srv; + std::ostringstream oss; + map >::iterator i; + i = server::servers_by_name.find(srv); + if (i != server::servers_by_name.end()) { + i->second->enabled = false; + i->second->maybekill(); + oss<second->get_state()<<"\n"; + } else + oss<<"No such server.\n"; + cs.buf = oss.str(); + } else if (cmd == "LIST") { + string state; + iss>>state; + map >::iterator i; + for (i = server::servers_by_name.begin(); + i != server::servers_by_name.end(); ++i) { + if (state.empty() || i->second->get_state() == state) + cs.buf += (cs.buf.empty()?"":" ") + i->first; + } + cs.buf += "\n"; + } else if (cmd == "SHUTDOWN") { + connections_allowed = false; + kill_old_servers(); + cs.buf = "ok\n"; + } else if (cmd == "CONNECTIONS") { + cs.buf = lexical_cast(total_connections) + "\n"; + } else if (cmd == "RELOAD") { + reload_conffile(conffile); + cs.buf = "ok\n"; + } else if (cmd == "STARTUP") { + connections_allowed = true; + cs.buf = "ok\n"; + } + cs.rdone = true; + return true; + } + void initialize(string const & ap) + { + try { + int c = ap.find(":"); + string a = ap.substr(0, c); + int p = lexical_cast(ap.substr(c+1)); + port = start(a, p); + } catch (errstr & s) { + cerr<<"Could not initialize admin port: "< >::iterator i = conns.begin(); + i != conns.end(); ++i) { + int c = i->second; + if (i->first.rdone) + FD_SET(c, &wr); + else + FD_SET(c, &rd); + maxfd = max(maxfd, int(c)); + } + } + void process_selected(fd_set & rd, fd_set & wr, fd_set & er) + { + if (int(port) == -1) + return; + if (FD_ISSET(port, &rd)) { + try { + struct sockaddr_in addr; + unsigned int l = sizeof(addr); + memset(&addr, 0, l); + sock nc = tosserr(accept(port, (struct sockaddr *) + &addr, &l), "accept()"); + conns.push_back(make_pair(cstate(), nc)); + } catch(errstr & s) { + cerr<<"During new admin connection: "< >::iterator i = conns.begin(); + i != conns.end(); ++i) { + int c = i->second; + if (c <= 0) + conns.erase(i); + else if (FD_ISSET(c, &rd)) { + char buf[120]; + int n; + n = read(c, buf, 120); + if (n < 1) + conns.erase(i); + i->first.buf.append(buf, n); + if (!process(i->first)) { + cerr<<"Closing connection...\n"; +// i->second.close(); + conns.erase(i); + } + } + else if (FD_ISSET(c, &wr)) { + int n = write(c, i->first.buf.c_str(), i->first.buf.size()); + if (n < 1) + conns.erase(i); + else { + i->first.buf.erase(0, n); + if (i->first.buf.empty()) + conns.erase(i); + } + } + } + } +}; + int main (int argc, char **argv) { - string conffile; + administrator admin; { int i; for (i = 1; i < argc; ++i) { - if (string(argv[i]) == "-a") - listenaddr = argv[++i]; - else if (string(argv[i]) == "-p") - listenport = lexical_cast(argv[++i]); + if (string(argv[i]) == "-l") { + string lp(argv[++i]); + unsigned int c = lp.find(":"); + listenaddr = lp.substr(0, c); + if (c != lp.npos) + listenport = lexical_cast(lp.substr(c+1)); + } else if (string(argv[i]) == "-a") + admin.initialize(argv[++i]); else conffile = argv[i]; } if (conffile.empty() || i != argc) { cerr<<"Usage:\n"; - cerr<<"\tusher [-a ] [-p ] \n"; + cerr<<"\tusher [-l addr[:port]] [-a addr:port] \n"; exit (1); } } @@ -998,14 +1421,14 @@ i != channels.end(); ++i) i->add_to_select(nfds, rd, wr, er); + admin.add_to_select(nfds, rd, wr, er); + timeval timeout; timeout.tv_sec = 10; timeout.tv_usec = 0; int r = select(nfds+1, &rd, &wr, &er, &timeout); - if (r == -1 && errno == EINTR) - continue; - if (r < 0) { + if (r < 0 && errno != EINTR) { perror ("select()"); exit (1); } @@ -1016,7 +1439,15 @@ memset(&client_address, 0, l); sock cli = tosserr(accept(h, (struct sockaddr *) &client_address, &l), "accept()"); - newchan = new channel(cli); + if (connections_allowed) + newchan = new channel(cli); + else { + char * dat; + int size; + make_packet(disabled, dat, size); + write(cli, dat, size); + delete[] dat; + } } catch(errstr & s) { cerr<<"During new connection: "<