#
# patch "ChangeLog"
# from [935b6b18ae73c892ace4c8fd4471d1db0d3d8e12]
# to [e3fedf7bec874242af4435ee5bb81be25f508a9a]
#
# patch "contrib/usher.cc"
# from [f772b459368204c6ae5095edc8531377bf7d28da]
# to [8f667fecdd0c69ee5e67fb3df9a0818bba2d529f]
#
========================================================================
--- ChangeLog 935b6b18ae73c892ace4c8fd4471d1db0d3d8e12
+++ ChangeLog e3fedf7bec874242af4435ee5bb81be25f508a9a
@@ -1,3 +1,11 @@
+2005-10-22 Timothy Brownawell
+
+ * contrib/usher.cc: Support friendly shutdown (no new connections, but
+ don't kill existing ones) of local servers and the usher as a whole.
+ Simple admin interface to start/stop or get the status of individual
+ servers or the whole usher (listens for connections on a separate
+ port, only enabled if specified on the command line).
+
2005-10-18 Timothy Brownawell
* contrib/usher.cc: Update comment about client versions
========================================================================
--- contrib/usher.cc f772b459368204c6ae5095edc8531377bf7d28da
+++ contrib/usher.cc 8f667fecdd0c69ee5e67fb3df9a0818bba2d529f
@@ -12,11 +12,14 @@
// a post-0.23 client is needed (0.23 clients can only be matched against
// their include pattern).
//
-// Usage: usher [-a ] [-p ]
+// Usage: usher [-l address[:port]] [-a address:port] <config-file>
//
-// is the address to listen on
-// is the local port to listen on
-// is a file that looks like
+// options:
+// -l address and port to listen on, defaults to 0.0.0.0:5253
+// -a address and port to listen for admin commands
+// <config-file> a file that looks like
+// userpass username password
+//
// server monotone
// host localhost
// pattern net.venge.monotone
@@ -27,7 +30,9 @@
// pattern *
// local -d /usr/local/src/managed/mt.db~ *
//
-// or in general, blocks of a
+// or in general, one block of one or more lines of
+// userpass
+// followed by any number of blocks of a
// server
// line followed by one or more
// host
@@ -38,6 +43,8 @@
// local
// , with blocks separated by blank lines
//
+// "userpass" lines specify who is allowed to use the administrative port.
+//
// A request to server "hostname" will be directed to the
// server at <ip-address>:<port-number>, if that stem is marked as remote,
// and to a local server managed by the usher, started with the given
@@ -47,6 +54,68 @@
// to connect to, but does not have to match exactly. This means that you don't
// have to know in advance whether clients will be asking for
// <host> or <host>:<port>.
+//
+//
+// Admin commands
+//
+// If the -a option is given, the usher will listen for administrative
+// connections on that port. The connecting client gives commands of the form
+// COMMAND [arguments]
+// , and after any command except USERPASS the usher will send a reply and
+// close the connection. The reply will always end with a newline.
+//
+// USERPASS username password
+// Required before any other command, so random people can't do bad things.
+// If incorrect, the connection will be closed immediately.
+//
+// STATUS [servername]
+// Get the status of a server, as named by the "server" lines in the
+// config file. If a server is specified, the result will be one of:
+// REMOTE - this is a remote server without active connections
+// ACTIVE n - this server currently has n active connections
+// WAITING - this (local) server is running, but has no connections
+// SLEEPING - this (local) server is not running, but is available
+// STOPPING n - this (local) server has been asked to stop, but still has
+// n active connections. It will not accept further connections.
+// STOPPED - this (local) server has been stopped, and will not accept
+// connections. The server process is not running.
+// SHUTTINGDOWN - the usher has been shut down, no servers are accepting
+// connections.
+// SHUTDOWN - the usher has been shut down, all connections have been closed,
+// and all local server processes have been stopped.
+// If no server is specified, the response will be SHUTTINGDOWN, SHUTDOWN,
+// WAITING, or ACTIVE (with n being the total number of open connections,
+// across all servers).
+//
+// STOP servername
+// Prevent the given local server from receiving further connections, and stop
+// it once all connections are closed. The result will be the new status of
+// that server: ACTIVE local servers become STOPPING, and WAITING and SLEEPING
+// servers become STOPPED. Servers in other states are not affected.
+//
+// START servername
+// Allow a stopped or stopping server to receive connections again. The result
+// will be the new status of that server. (A server in the "STOPPING" state
+// becomes ACTIVE, and a STOPPED server becomes SLEEPING. A server in some
+// other state is not affected.)
+//
+// LIST [state]
+// Returns a space-separated list of all servers. If a state is given, only
+// list the servers that are in that state.
+//
+// SHUTDOWN
+// Do not accept new connections for any servers, local or remote. Returns "ok".
+//
+// STARTUP
+// Begin accepting connections again after a SHUTDOWN. Returns "ok".
+//
+// CONNECTIONS
+// Returns the number of connections currently open.
+//
+// RELOAD
+// Reload the config file (same as sending SIGHUP). The reply will be "ok",
+// and will not be given until the config file has been reloaded.
+//
#include
#include
@@ -65,6 +134,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -82,6 +152,7 @@
using boost::lexical_cast;
using boost::shared_ptr;
using std::cerr;
+using std::pair;
using std::make_pair;
// defaults, overridden by command line
@@ -104,13 +175,17 @@
string const greeting = " Hello! This is the monotone usher at localhost. What would you like?";
-string const errmsg = "!Sorry, I don't know where to find that.";
+string const notfound = "!Sorry, I don't know where to find that.";
+string const disabled = "!Sorry, this usher is not currently accepting connections.";
+string const srvdisabled = "!Sorry, that server is currently disabled.";
+
struct errstr
{
std::string name;
int err;
+ errstr(std::string const & s): name(s), err(0) {}
errstr(std::string const & s, int e): name(s), err(e) {}
};
@@ -247,27 +322,39 @@
if (s)
s[1]++;
}
- ~sock()
+ void deref()
{
- if (!s || s[1]--)
- return;
- try {
- close();
- } catch(errstr & e) {
- // if you want it to throw errors, call close manually
+ if (s && !(--s[1])) {
+ try {
+ close();
+ } catch(errstr & e) {
+ // if you want it to throw errors, call close manually
+ }
+ delete[] s;
+ all_socks.erase(all_socks.find(s));
}
- delete[] s;
- all_socks.erase(all_socks.find(s));
s = 0;
}
- sock operator=(int ss)
+ ~sock()
{
- if (!s) {
- s = new int[2];
- all_socks.insert(s);
- }
+ deref();
+ }
+ sock const & operator=(int ss)
+ {
+ deref();
+ s = new int[2];
+ all_socks.insert(s);
s[0]=ss;
+ return *this;
}
+ sock const & operator=(sock const & ss)
+ {
+ deref();
+ s = ss.s;
+ if (s)
+ ++s[1];
+ return *this;
+ }
void close()
{
if (!s || s[0] == -1)
@@ -345,7 +432,6 @@
currport = minport-1;
for(int i = 0; i < 4; ++i)
curraddr[i] = minaddr[i];
- curraddr[0];
}
do {
// get the next address in our list
@@ -421,7 +507,7 @@
}
char ** a = new char*[args.size()+1];
- for (int i = 0; i < args.size(); ++i) {
+ for (unsigned int i = 0; i < args.size(); ++i) {
a[i] = new char[args[i].size()+1];
memcpy(a[i], args[i].c_str(), args[i].size()+1);
}
@@ -456,8 +542,78 @@
}
}
+bool connections_allowed = true;
+int total_connections = 0;
+
+struct serverstate
+{
+ enum ss {remote, active, waiting, sleeping, stopping,
+ stopped, shuttingdown, shutdown, unknown};
+ ss state;
+ int num;
+ serverstate(): state(unknown), num(0) {}
+ serverstate const & operator=(string const & s)
+ {
+ if (s == "REMOTE")
+ state = remote;
+ else if (s == "ACTIVE")
+ state = active;
+ else if (s == "WAITING")
+ state = waiting;
+ else if (s == "SLEEPING")
+ state = sleeping;
+ else if (s == "STOPPING")
+ state = stopping;
+ else if (s == "STOPPED")
+ state = stopped;
+ else if (s == "SHUTTINGDOWN")
+ state = shuttingdown;
+ else if (s == "SHUTDOWN")
+ state = shutdown;
+ return *this;
+ }
+ bool operator==(string const & s)
+ {
+ serverstate foo;
+ foo = s;
+ return foo.state == state;
+ }
+};
+std::ostream & operator<<(std::ostream & os, serverstate const & ss)
+{
+ switch (ss.state) {
+ case serverstate::remote:
+ os<<"REMOTE";
+ break;
+ case serverstate::active:
+ os<<"ACTIVE "< >::iterator> by_host, by_pat;
map >::iterator by_name;
static map > servers_by_host;
@@ -471,7 +627,7 @@
int port;
int connection_count;
int last_conn_time;
- server() : pid(-1), local(false), port(0),
+ server() : enabled(true), local(false), pid(-1), port(0),
connection_count(0), last_conn_time(0)
{
}
@@ -479,6 +635,30 @@
{
yeskill();
}
+ serverstate get_state()
+ {
+ serverstate ss;
+ ss.num = connection_count;
+ if (!connections_allowed) {
+ if (!total_connections)
+ ss.state = serverstate::shutdown;
+ else
+ ss.state = serverstate::shuttingdown;
+ } else if (connection_count) {
+ if (enabled)
+ ss.state = serverstate::active;
+ else
+ ss.state = serverstate::stopping;
+ } else if (!local)
+ ss.state = serverstate::remote;
+ else if (!enabled)
+ ss.state = serverstate::stopped;
+ else if (pid == -1)
+ ss.state = serverstate::sleeping;
+ else
+ ss.state = serverstate::waiting;
+ return ss;
+ }
void delist()
{
vector foo;
@@ -539,6 +719,10 @@
}
sock connect()
{
+ if (!connections_allowed)
+ throw errstr("all servers disabled");
+ if (!enabled)
+ throw errstr("server disabled");
if (local && pid == -1) {
// server needs to be started
// we'll try 3 times, since there's a delay between our checking that
@@ -550,7 +734,7 @@
args.push_back("monotone");
args.push_back("serve");
args.push_back("--bind=" + addr + ":" + lexical_cast(port));
- int n = 0, m = 0;
+ unsigned int n = 0, m = 0;
n = arguments.find_first_not_of(" \t");
while (n != string::npos && m != string::npos) {
m = arguments.find_first_of(" ", n);
@@ -561,23 +745,32 @@
}
}
sock s = make_outgoing(port, addr);
+ if (local && !connection_count) {
+ live_servers.insert(by_name->second);
+ }
++connection_count;
+ ++total_connections;
return s;
}
void disconnect()
{
+ --total_connections;
if (--connection_count || !local)
return;
last_conn_time = time(0);
+ maybekill();
}
void maybekill()
{
if (!local)
return;
+ if (pid == -1)
+ return;
int difftime = time(0) - last_conn_time;
- if (difftime > server_idle_timeout && !connection_count)
- yeskill();
- else if (waitpid(pid, 0, WNOHANG) == 0) {
+ if (!connection_count
+ && (difftime > server_idle_timeout || !connections_allowed))
+ yeskill();
+ else if (waitpid(pid, 0, WNOHANG) > 0) {
pid = -1;
port = 0;
}
@@ -590,6 +783,7 @@
do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR);
pid = -1;
port = 0;
+ live_servers.erase(live_servers.find(by_name->second));
}
}
string name()
@@ -659,11 +853,17 @@
}
if (name.empty())
return string();
- shared_ptr srv(new server);
+ shared_ptr srv;
map >::iterator
i = server::servers_by_name.find(name);
- if (i != server::servers_by_name.end())
+ if (i != server::servers_by_name.end()) {
+ if (local && i->second->local && i->second->arguments == desc)
+ srv = i->second;
+ else
+ srv = shared_ptr(new server);
i->second->delist();
+ } else
+ srv = shared_ptr(new server);
srv->by_name = server::servers_by_name.insert(make_pair(name, srv)).first;
srv->set_hosts(hosts);
srv->set_patterns(patterns);
@@ -672,7 +872,7 @@
srv->arguments = desc;
} else {
srv->local = false;
- int c = desc.find(":");
+ unsigned int c = desc.find(":");
if (c != desc.npos) {
srv->addr = desc.substr(0, c);
srv->port = lexical_cast(desc.substr(c+1));
@@ -703,6 +903,18 @@
return shared_ptr();
}
+shared_ptr get_server(string const & name)
+{
+ map >::iterator i;
+ for (i = server::servers_by_name.begin();
+ i != server::servers_by_name.end(); ++i) {
+ if (name == i->first) {
+ return i->second;
+ }
+ }
+ return shared_ptr();
+}
+
void kill_old_servers()
{
set >::iterator i;
@@ -722,7 +934,7 @@
got = p[b];
out += ((int)(p[b] & 0x7f))<<(b*7);
++b;
- } while (b*7 < sizeof(int)*8-1 && (got & 0x80));
+ } while ((unsigned int)(b*7) < sizeof(int)*8-1 && (got & 0x80));
return b;
}
@@ -800,7 +1012,7 @@
}
~channel()
{
- if (who)
+ if (who && !no_server)
who->disconnect();
}
bool is_finished()
@@ -860,20 +1072,23 @@
string reply_srv, reply_pat;
if (extract_reply(cbuf, reply_srv, reply_pat)) {
who = get_server(reply_srv, reply_pat);
- if (who) {
+ if (who && who->enabled) {
try {
srv = who->connect();
have_routed = true;
s = srv;
} catch (errstr & e) {
- std::cerr<<"cannot contact server "<name()<<"\n";
+ cerr<<"cannot contact server "<name()<<"\n";
no_server = true;
}
} else {
char * dat;
int size;
sbuf.getwrite(p, n);
- make_packet(errmsg, dat, size);
+ if (who->enabled)
+ make_packet(notfound, dat, size);
+ else
+ make_packet(srvdisabled, dat, size);
if (n < size) size = n;
memcpy(p, dat, size);
sbuf.fixwrite(size);
@@ -901,19 +1116,32 @@
if ((no_server || have_routed && s < 0) && !sbuf.canread()) {
cli.close(), c = -1;
}
+ return true;
}
};
int channel::counter = 0;
bool reload_pending;
+map admins;
+string conffile;
void reload_conffile(string const & file)
{
reload_pending = false;
cerr<<"Reloading config file...\n";
std::ifstream cf(file.c_str());
+
+ string line = getline(cf);
+ while (!line.empty()) {
+ std::istringstream iss(line);
+ string a, b, c;
+ iss>>a>>b>>c;
+ if (a == "userpass")
+ admins.insert(make_pair(b, c));
+ line = getline(cf);
+ }
+
set names;
- int pos = 0;
while(cf) {
string n = read_server_record(cf);
if(!n.empty())
@@ -943,22 +1171,217 @@
reload_pending = true;
}
+struct administrator
+{
+ sock port;
+ struct cstate
+ {
+ bool auth;
+ bool rdone;
+ string buf;
+ cstate(): auth(false), rdone(false) {}
+ };
+ list > conns;
+ administrator(): port(-1)
+ {}
+ bool process(cstate & cs)
+ {
+ unsigned int n = cs.buf.find("\n");
+ if (n == cs.buf.npos)
+ return true;
+ string l = cs.buf.substr(0, n);
+ cs.buf.erase(0, n+1);
+ std::istringstream iss(l);
+ string cmd;
+ iss>>cmd;
+ if (cmd == "USERPASS") {
+ string user, pass;
+ iss>>user>>pass;
+ map::iterator i = admins.find(user);
+ if (i == admins.end() || i->second != pass) {
+ cerr<<"Failed admin login.\n";
+ return false;
+ } else {
+ if (cs.auth == true)
+ return false;
+ cs.auth = true;
+ process(cs);
+ }
+ } else if (cmd == "STATUS") {
+ string srv;
+ iss>>srv;
+ std::ostringstream oss;
+ if (srv.empty()) {
+ serverstate ss;
+ ss.num = total_connections;
+ if (connections_allowed) {
+ if (total_connections)
+ ss.state = serverstate::active;
+ else
+ ss.state = serverstate::waiting;
+ } else {
+ if (total_connections)
+ ss.state = serverstate::shuttingdown;
+ else
+ ss.state = serverstate::shutdown;
+ }
+ oss< >::iterator i;
+ i = server::servers_by_name.find(srv);
+ if (i != server::servers_by_name.end())
+ oss<second->get_state()<<"\n";
+ else
+ oss<<"No such server.\n";
+ }
+ cs.buf = oss.str();
+ } else if (cmd == "START") {
+ string srv;
+ iss>>srv;
+ std::ostringstream oss;
+ map >::iterator i;
+ i = server::servers_by_name.find(srv);
+ if (i != server::servers_by_name.end()) {
+ i->second->enabled = true;
+ oss<second->get_state()<<"\n";
+ } else
+ oss<<"No such server.\n";
+ cs.buf = oss.str();
+ } else if (cmd == "STOP") {
+ string srv;
+ iss>>srv;
+ std::ostringstream oss;
+ map >::iterator i;
+ i = server::servers_by_name.find(srv);
+ if (i != server::servers_by_name.end()) {
+ i->second->enabled = false;
+ i->second->maybekill();
+ oss<second->get_state()<<"\n";
+ } else
+ oss<<"No such server.\n";
+ cs.buf = oss.str();
+ } else if (cmd == "LIST") {
+ string state;
+ iss>>state;
+ map >::iterator i;
+ for (i = server::servers_by_name.begin();
+ i != server::servers_by_name.end(); ++i) {
+ if (state.empty() || i->second->get_state() == state)
+ cs.buf += (cs.buf.empty()?"":" ") + i->first;
+ }
+ cs.buf += "\n";
+ } else if (cmd == "SHUTDOWN") {
+ connections_allowed = false;
+ kill_old_servers();
+ cs.buf = "ok\n";
+ } else if (cmd == "CONNECTIONS") {
+ cs.buf = lexical_cast(total_connections) + "\n";
+ } else if (cmd == "RELOAD") {
+ reload_conffile(conffile);
+ cs.buf = "ok\n";
+ } else if (cmd == "STARTUP") {
+ connections_allowed = true;
+ cs.buf = "ok\n";
+ }
+ cs.rdone = true;
+ return true;
+ }
+ void initialize(string const & ap)
+ {
+ try {
+ int c = ap.find(":");
+ string a = ap.substr(0, c);
+ int p = lexical_cast(ap.substr(c+1));
+ port = start(a, p);
+ } catch (errstr & s) {
+ cerr<<"Could not initialize admin port: "< >::iterator i = conns.begin();
+ i != conns.end(); ++i) {
+ int c = i->second;
+ if (i->first.rdone)
+ FD_SET(c, &wr);
+ else
+ FD_SET(c, &rd);
+ maxfd = max(maxfd, int(c));
+ }
+ }
+ void process_selected(fd_set & rd, fd_set & wr, fd_set & er)
+ {
+ if (int(port) == -1)
+ return;
+ if (FD_ISSET(port, &rd)) {
+ try {
+ struct sockaddr_in addr;
+ unsigned int l = sizeof(addr);
+ memset(&addr, 0, l);
+ sock nc = tosserr(accept(port, (struct sockaddr *)
+ &addr, &l), "accept()");
+ conns.push_back(make_pair(cstate(), nc));
+ } catch(errstr & s) {
+ cerr<<"During new admin connection: "< >::iterator i = conns.begin();
+ i != conns.end(); ++i) {
+ int c = i->second;
+ if (c <= 0)
+ conns.erase(i);
+ else if (FD_ISSET(c, &rd)) {
+ char buf[120];
+ int n;
+ n = read(c, buf, 120);
+ if (n < 1)
+ conns.erase(i);
+ i->first.buf.append(buf, n);
+ if (!process(i->first)) {
+ cerr<<"Closing connection...\n";
+// i->second.close();
+ conns.erase(i);
+ }
+ }
+ else if (FD_ISSET(c, &wr)) {
+ int n = write(c, i->first.buf.c_str(), i->first.buf.size());
+ if (n < 1)
+ conns.erase(i);
+ else {
+ i->first.buf.erase(0, n);
+ if (i->first.buf.empty())
+ conns.erase(i);
+ }
+ }
+ }
+ }
+};
+
int main (int argc, char **argv)
{
- string conffile;
+ administrator admin;
{
int i;
for (i = 1; i < argc; ++i) {
- if (string(argv[i]) == "-a")
- listenaddr = argv[++i];
- else if (string(argv[i]) == "-p")
- listenport = lexical_cast(argv[++i]);
+ if (string(argv[i]) == "-l") {
+ string lp(argv[++i]);
+ unsigned int c = lp.find(":");
+ listenaddr = lp.substr(0, c);
+ if (c != lp.npos)
+ listenport = lexical_cast(lp.substr(c+1));
+ } else if (string(argv[i]) == "-a")
+ admin.initialize(argv[++i]);
else
conffile = argv[i];
}
if (conffile.empty() || i != argc) {
cerr<<"Usage:\n";
- cerr<<"\tusher [-a ] [-p ] \n";
+ cerr<<"\tusher [-l addr[:port]] [-a addr:port] \n";
exit (1);
}
}
@@ -998,14 +1421,14 @@
i != channels.end(); ++i)
i->add_to_select(nfds, rd, wr, er);
+ admin.add_to_select(nfds, rd, wr, er);
+
timeval timeout;
timeout.tv_sec = 10;
timeout.tv_usec = 0;
int r = select(nfds+1, &rd, &wr, &er, &timeout);
- if (r == -1 && errno == EINTR)
- continue;
- if (r < 0) {
+ if (r < 0 && errno != EINTR) {
perror ("select()");
exit (1);
}
@@ -1016,7 +1439,15 @@
memset(&client_address, 0, l);
sock cli = tosserr(accept(h, (struct sockaddr *)
&client_address, &l), "accept()");
- newchan = new channel(cli);
+ if (connections_allowed)
+ newchan = new channel(cli);
+ else {
+ char * dat;
+ int size;
+ make_packet(disabled, dat, size);
+ write(cli, dat, size);
+ delete[] dat;
+ }
} catch(errstr & s) {
cerr<<"During new connection: "<