# # # patch "netxx_pipe.cc" # from [1212f6dce28e21811c42d88e081cbfe047e3ceb2] # to [3e2ea05091c4c0c1aa92b77b778329ecc297b208] # # patch "netxx_pipe.hh" # from [e3cae796264771899b8ccf76aca8d7593419f66a] # to [401f074f42591d282bb09ea4f84ec37e645a6bbc] # ============================================================ --- netxx_pipe.cc 1212f6dce28e21811c42d88e081cbfe047e3ceb2 +++ netxx_pipe.cc 3e2ea05091c4c0c1aa92b77b778329ecc297b208 @@ -36,49 +36,58 @@ using std::strerror; using std::perror; using std::strerror; -Netxx::StdioStream::StdioStream(void) +#ifdef WIN32 + +signed_size_type PipeStream::read (void *buffer, size_type length) +{ + +} // read + +signed_size_type PipeStream::write (const void *buffer, size_type length) +{ + +} // write + +void PipeStream::close (void) +{ + +} // close + +#endif + +Netxx::StdioStream::StdioStream (int const readfd; int const writefd) : #ifdef WIN32 - readfd ((int)GetStdHandle(STD_INPUT_HANDLE)), - writefd ((int)GetStdHandle(STD_OUTPUT_HANDLE)) + named_pipe (readfd), + writefd (writefd) #else - readfd (STDIN_FILENO), - writefd (STDOUT_FILENO) + readfd (readfd), + writefd (writefd) #endif { - // This allows netxx to call select() on these file descriptors. On Win32, - // this will fail unless they are actually a socket (ie, we are spawned - // from 'mtn sync file:...', or some other program that sets stdin, stdout - // to a socket). +#ifndef WIN32 + // This allows Netxx::Probe::ready to call 'select' on these file + // descriptors. On Win32, 'select' would fail. probe_info.add_socket (readfd); probe_info.add_socket (writefd); - -#ifdef WIN32 - { - WSADATA wsdata; - if (WSAStartup(MAKEWORD(2,2), &wsdata) != 0) - L(FL("failed to load WinSock")); - } - #endif } Netxx::signed_size_type Netxx::StdioStream::read (void *buffer, size_type length) { +#ifdef WIN32 + return named_pipe.read (buffer, length); + +#else // based on netxx/socket.cxx read signed_size_type rc; char *buffer_ptr = static_cast(buffer); for (;;) { -#ifdef WIN32 - // readfd must be a socket, and 'read' doesn't work on sockets - rc = recv(readfd, buffer_ptr, length, 0); -#else - // On Unix, this works for sockets as well as files and pipes. rc = ::read(readfd, buffer, length); -#endif + if (rc < 0) { error_type error_code = get_last_error(); @@ -94,19 +103,6 @@ Netxx::StdioStream::read (void *buffer, case EAGAIN: return -1; -#ifdef WIN32 - - case WSAEMSGSIZE: - return length; - - case WSAENETRESET: - case WSAESHUTDOWN: - case WSAECONNABORTED: - case WSAETIMEDOUT: // timed out shouldn't happen - return 0; - -#endif - default: { std::string error("recv failure: "); @@ -132,13 +128,8 @@ Netxx::StdioStream::write (const void *b while (length) { -#ifdef WIN32 - // writefd must be a socket, and 'write' doesn't work on sockets - rc = send(writefd, buffer_ptr, length, 0); -#else - // On Unix, this works for sockets as well as files and pipes. rc = ::write(writefd, buffer, length); -#endif + if (rc < 0) { Netxx::error_type error_code = get_last_error(); @@ -184,35 +175,29 @@ Netxx::StdioStream::close (void) void Netxx::StdioStream::close (void) { - // close socket so client knows we disconnected + // close stdio so client knows we disconnected L(FL("closing StdioStream")); #ifdef WIN32 - // readfd, writefd are the same socket; only close it once - if (readfd != -1) - { - closesocket (readfd); - readfd = -1; - } + named_pipe.close(); #else - // On Unix, they might be separate pipes; close both if (readfd != -1) { ::close (readfd); readfd = -1; } +#endif if (writefd != -1) { ::close (writefd); writefd = -1; } -#endif } Netxx::socket_type Netxx::StdioStream::get_socketfd (void) const { - return readfd; // only used for informative messages + return -1; } Netxx::socket_type @@ -230,23 +215,13 @@ Netxx::StdioStream::get_probe_info (void const Netxx::ProbeInfo* Netxx::StdioStream::get_probe_info (void) const { +#ifdef Win32 + I(0); // FIXME: not clear what to do here yet +#else return &probe_info; +#endif } -void -Netxx::StdioStream::set_socketfd (socket_type sock) -{ - readfd = sock; - writefd = sock; - - probe_info.clear(); - probe_info.add_socket (readfd); - probe_info.add_socket (writefd); - - // We don't set binary on WIN32, because it is not necessary for send/recv - // on a socket. -} - Netxx::SpawnedStream::SpawnedStream (const string & cmd, const vector & args) : #ifdef WIN32 @@ -255,8 +230,6 @@ Netxx::SpawnedStream::SpawnedStream (con child(-1) #endif { - socket_type socks[2]; // 0 is for child, 1 is for parent - // Unfortunately neither munge_argv_into_cmdline nor execvp take // a vector as argument. @@ -271,70 +244,119 @@ Netxx::SpawnedStream::SpawnedStream (con newargv[newargc++] = i->c_str(); newargv[newargc] = 0; - E(0 == dumb_socketpair (socks, 0), F("socketpair failed")); +#ifdef WIN32 - Child_Socket.set_socketfd (socks[0]); - Parent_Socket.set_socketfd (socks[1]); + // Create a named pipe to serve as the child process stdio; it must have a + // unique name. - probe_info.add_socket (socks[1]); + static unsigned long serial = 0; + string pipename = (F("\\\\.\\pipe\\netxx_pipe_%ld_%d") + % GetCurrentProcessId() + % (++serial)).str(); -#ifdef WIN32 + HANDLE named_pipe = + CreateNamedPipe (pipename.c_str(), + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, // dwOpenMode + PIPE_TYPE_BYTE | PIPE_WAIT, // dwPipeMode + 1, // nMaxInstances + readfd.buffer_size(), // nOutBufferSize + readfd.buffer_size(), //nInBufferSize + 1000, //nDefaultTimeout, milliseconds + 0); // lpSecurityAttributes + E(named_pipe != INVALID_HANDLE_VALUE, + F("CreateNamedPipe(%s,...) call failed: %s") + % pipename % win32_last_err_msg()); + + // Open the child's handle to the named pipe. + + SECURITY_ATTRIBUTES inherit; + memset (&inherit,0,sizeof inherit); + inherit.nLength = sizeof inherit; + inherit.bInheritHandle = TRUE; + + HANDLE child_pipe = + CreateFile(pipename.c_str(), + GENERIC_READ|GENERIC_WRITE, 0, + &inherit, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0); + + E(hpipe != INVALID_HANDLE_VALUE, + F("CreateFile(%s,...) call failed: %s") + % pipename % win32_last_err_msg()); + + // Spawn the child process PROCESS_INFORMATION piProcInfo; - STARTUPINFO siStartInfo; + STARTUPINFO siStartInfo; memset(&piProcInfo, 0, sizeof(piProcInfo)); memset(&siStartInfo, 0, sizeof(siStartInfo)); - // Dup handles before using for child in/out/err so child can legally - // close one without losing the others - HANDLE hin, hout, herr; - assert(DuplicateHandle(GetCurrentProcess(), (HANDLE) socks[0], GetCurrentProcess(), &hin, 0, TRUE, DUPLICATE_SAME_ACCESS) != 0); - assert(DuplicateHandle(GetCurrentProcess(), (HANDLE) socks[0], GetCurrentProcess(), &hout, 0, TRUE, DUPLICATE_SAME_ACCESS) != 0); - assert(DuplicateHandle(GetCurrentProcess(), GetStdHandle(STD_ERROR_HANDLE), GetCurrentProcess(), &herr, 0, TRUE, DUPLICATE_SAME_ACCESS) != 0); - CloseHandle((HANDLE) socks[0]); - siStartInfo.cb = sizeof(siStartInfo); - siStartInfo.hStdError = herr; - siStartInfo.hStdOutput = hout; - siStartInfo.hStdInput = hin; + siStartInfo.hStdError = (HANDLE)(_get_osfhandle(STDERR_FILENO)); // parent stderr + siStartInfo.hStdOutput = child_pipe; + siStartInfo.hStdInput = child_pipe; siStartInfo.dwFlags |= STARTF_USESTDHANDLES; string cmdline = munge_argv_into_cmdline(newargv); L(FL("Subprocess command line: '%s'") % cmdline); - BOOL started = CreateProcess(NULL, // Application name - const_cast(cmdline.c_str()), - NULL, // Process attributes - NULL, // Thread attributes - TRUE, // Inherit handles - 0, // Creation flags - NULL, // Environment - NULL, // Current directory - &siStartInfo, - &piProcInfo); + BOOL started = + CreateProcess(NULL, // Application name + const_cast(cmdline.c_str()), + NULL, // Process attributes + NULL, // Thread attributes + TRUE, // Inherit handles + 0, // Creation flags + NULL, // Environment + NULL, // Current directory + &siStartInfo, + &piProcInfo); if (!started) { - closesocket(socks[0]); - closesocket(socks[1]); + ::close (named_pipe); + ::close (childe_pipe); } E(started, F("CreateProcess(%s,...) call failed: %s") % cmdline % win32_last_err_msg()); - child = piProcInfo.hProcess; + readfd.set_named_pipe (named_pipe); + writefd = named_pipe; + child = piProcInfo.hProcess; #else // !WIN32 + // Create pipes to serve as stdio for child process + + // These are unidirectional pipes; child_*[0] accepts read, child_*[1] accepts write. + int child_stdin[2]; + int child_stdout[2]; + + E(!pipe(child_stdin), + F("pipe call failed: %s") % strerror (errno)); + + if (pipe(child_stdout)) + { + ::close(child_stdin[0]); + ::close(child_stdin[1]); + E(0, F("pipe call failed: %s") % strerror (errno)); + } + + // Create the child process + child = fork(); if (child < 0) { // fork failed - ::close (socks[0]); - ::close (socks[1]); + ::close (child_stdin[0]); + ::close (child_stdin[1]); + ::close (child_stdout[0]); + ::close (child_stdout[1]); E(0, F("fork failed %s") % strerror(errno)); } @@ -343,45 +365,56 @@ Netxx::SpawnedStream::SpawnedStream (con { // We are in the child process; run the command, then exit. - int old_stdio[2]; + // Close the end of the pipes not used in the child + ::close (child_stdin[1]); + ::close (child_stdout[0]); - // Set the child socket as stdin and stdout. dup2 clobbers its first - // arg, so copy it first. - - old_stdio[0] = socks[0]; - old_stdio[1] = socks[0]; - - if (dup2(old_stdio[0], 0) != 0 || - dup2(old_stdio[1], 1) != 1) + // Replace our stdin, stdout with the pipes + if (dup2 (child_stdin[0], STDIN_FILENO) != STDIN_FILENO || + dup2(child_stdout[1], STDOUT_FILENO) != STDOUT_FILENO) { - // We don't have the mtn error handling infrastructure here. + // We don't have the mtn error handling infrastructure yet. perror("dup2 failed"); exit(-1); } - // old_stdio now holds the file descriptors for our old stdin, stdout, so close them - ::close (old_stdio[0]); - ::close (old_stdio[1]); + // close our old stdin, stdout + ::close (child_stdin[0]); + ::close (child_stdout[1]); + // Run the command execvp(newargv[0], const_cast(newargv)); perror(newargv[0]); exit(errno); } - // else we are in the parent process; continue. + else + { + // we are in the parent process + readfd = child_stdout[0]; + writefd = child_stdin[1]; + } #endif } Netxx::signed_size_type Netxx::SpawnedStream::read (void *buffer, size_type length) { - return Parent_Socket.read (buffer, length, get_timeout()); +#ifdef WIN32 + return readfd.read (buffer, length); +#else + return ::read (readfd, buffer, length); +#endif } Netxx::signed_size_type Netxx::SpawnedStream::write (const void *buffer, size_type length) { - return Parent_Socket.write (buffer, length, get_timeout()); +#ifdef WIN32 + return writefd.write (buffer, length); +#else + return ::write (writefd, buffer, length); +#endif } void @@ -390,36 +423,86 @@ Netxx::SpawnedStream::close (void) // We need to wait for the child to exit, so it reads our last message, // releases the database, closes redirected output files etc. before we do // whatever is next. - L(FL("waiting for spawned child")); + L(FL("waiting for spawned child ...")); #ifdef WIN32 if (child != INVALID_HANDLE_VALUE) WaitForSingleObject(child, INFINITE); child = INVALID_HANDLE_VALUE; + + readfd.close(); + writefd = -1; #else if (child != -1) while (waitpid(child,0,0) == -1 && errno == EINTR); child = -1; + + if (readfd != -1) + ::close(readfd); + readfd = -1; + + if (writefd != -1) + ::close(writefd); + writefd = -1; #endif - L(FL("...done")); + L(FL("waiting for spawned child ... done")); +} - Child_Socket.close(); - Parent_Socket.close(); +Netxx::socket_type +Netxx::SpawnedStream::get_socketfd (void) const +{ +#ifdef WIN32 + return pipe.get_pipefd (); +#else + return -1; +#endif; +} +Netxx::socket_type +Netxx::SpawnedStream::get_readfd (void) const +{ +#ifdef WIN32 + return -1; +#else + return readfd; +#endif; } Netxx::socket_type -Netxx::SpawnedStream::get_socketfd (void) const +Netxx::SpawnedStream::get_writefd (void) const { - return Parent_Socket.get_socketfd (); +#ifdef WIN32 + return -1; +#else + return writefd; +#endif; } const Netxx::ProbeInfo* Netxx::SpawnedStream::get_probe_info (void) const { +#ifdef WIN32 + I(0); +#else return &probe_info; +#endif } void +Netxx::StdioProbe::ready(const StreamBase &sb, ready_type rt) +{ + try + { + // See if it's a StdioStream + add(const_cast(dynamic_cast(sb)),rt); + } + catch (...) + { + // Not a StdioStream; see if it's a SpawnedStream. YUCK! fix Netxx::Probe to be dispatching. + Probe::add(sb,rt); + } +} + +void Netxx::StdioProbe::add(const StdioStream &ps, ready_type rt) { if (rt == ready_none || rt & ready_read) @@ -545,12 +628,11 @@ UNIT_TEST(pipe, stdio_stream) } -UNIT_TEST(pipe, spawn_stdio) +void unit_test_spawn (char *cmd, int will_disconnect) { try { - // netxx_pipe_stdio_main uses StdioStream, StdioProbe - Netxx::SpawnedStream spawned ("./netxx_pipe_stdio_main", vector()); + Netxx::SpawnedStream spawned (cmd, vector()); char write_buf[1024]; char read_buf[1024]; @@ -596,20 +678,27 @@ UNIT_TEST(pipe, spawn_stdio) I(static_cast(result[1]) == 255 - c); } - // Tell netxx_pipe_stdio_main to quit, closing its socket - write_buf[0] = 'q'; - write_buf[1] = 'u'; - write_buf[2] = 'i'; - write_buf[3] = 't'; - spawned.write(write_buf, 4); + if (will_disconnect) + { + // Tell netxx_pipe_stdio_main to quit, closing its stdin + write_buf[0] = 'q'; + write_buf[1] = 'u'; + write_buf[2] = 'i'; + write_buf[3] = 't'; + spawned.write(write_buf, 4); - // Note that probe.ready here returns timeout, _not_ ready_read. - probe.clear(); - probe.add(spawned, Netxx::Probe::ready_read); - res = probe.ready(timeout); - I(res.second==Netxx::Probe::ready_none); - I(res.first == -1); + // Wait for stdin to close; should be reported as ready_read, _not_ as timeout + probe.clear(); + probe.add(spawned, Netxx::Probe::ready_read); + res = probe.ready(timeout); + I(res.second==Netxx::Probe::ready_read); + I(res.first == spawned.get_socketfd()); + // now read should return 0 bytes + bytes = spawned.read(read_buf, sizeof(read_buf)); + I(bytes == 0); + } + // This waits for the child to exit spawned.close(); } @@ -620,6 +709,20 @@ UNIT_TEST(pipe, spawn_stdio) } } +UNIT_TEST(pipe, spawn_cat) +{ + // We must be able to spawn a "normal" program, that uses 'read' and + // 'write' on stdio; the real use case is 'mtn sync ssh:...'. + unit_test_spawn ("cat", 0); +} + +UNIT_TEST(pipe, spawn_stdio) +{ + // We must also be able to spawn a program that uses StdioStream; this + // also tests StdioStream. + unit_test_spawn ("./netxx_pipe_stdio_main", 1); +} + #endif // Local Variables: ============================================================ --- netxx_pipe.hh e3cae796264771899b8ccf76aca8d7593419f66a +++ netxx_pipe.hh 401f074f42591d282bb09ea4f84ec37e645a6bbc @@ -23,61 +23,140 @@ in two different ways. The use cases are 'mtn sync file:...', 'mtn sync ssh:...', and 'mtn serve - --stdio'. + --stdio'. These are simple on Unix, but pose challenges on Win32. The netsync code uses StreamBase objects to perform all communications between the local and server mtns. - In 'mtn sync file:...', the local mtn spawns the server directly as 'mtn - serve --stdio'. Thus the local mtn needs a StreamBase object that can - serve as the stdin/stdout of a spawned process; SpawnedStream. The server - mtn needs to construct a StreamBase object from the existing stdin/stdout; - StdioStream. + In 'mtn sync file:...', the client mtn spawns the server as 'mtn serve + --stdio'. Thus the client mtn needs a StreamBase object that can serve as + the stdin/stdout of a spawned process; that is provided by SpawnedStream + (which also performs the spawn operation). The server mtn needs to + construct a StreamBase object from the existing stdin/stdout; that is + provided by StdioStream. - In 'mtn sync ssh:...' the local mtn spawns ssh, connecting to it via - SpawnedStream. On the server, ssh spawns 'mtn serve --stdio', which uses - StdioStream. + In 'mtn sync ssh:...' the client mtn spawns ssh, connecting to it via + SpawnedStream. On the server machine, ssh spawns 'mtn serve --stdio', + which uses StdioStream. - We also need StdioProbe objects that work with StdioStream objects, since - Netxx::Probe doesn't. Netxx does not provide for child classes of Probe, - nor of ProbeInfo; none of the methods are virtual. We handle this by - having the netsync code always use a StdioProbe object when the StreamBase - object might be a StdioStream. StdioProbe acts like Probe unless the - stream is StdioStream. IMPROVEME: we only need a StdioProbe in - serve_single_connection; should fix Netxx to allow for derived classes. + The original netsync code performed five operations on streams; read + available with timeout, write available with timeout, detecting a closed + connection with timeout, immediate read, and immediate write. For normal + client-server connections (not via stdio), Netxx::Sockets provides the + connection and the immediate read/write on both Win32 and Unix (via + 'recv', 'send'), and Netxx::Probe supports read/write available with + timeout and detect closed connection with timeout on both Win32 and Unix + (via 'select'). - We use socket pairs to implement SpawnedStream. On Unix and Win32, a - socket can serve as stdin and stdout for a spawned process. + For stdio connections using SpawnedStream, there are two problem cases + with sockets. First, closing a server stdio file descriptor that is + actually a socket does not cause the client 'select' to return with a + 'socket closed' indication on either Unix or Win32. Second, in 'mtn sync + ssh:...', the spawned ssh process uses the 'read' and 'write' functions on + stdio; these do not work with sockets on Win32, so the stdio file + descriptors cannot actually be sockets. - The sockets in the pair must be connected to each other; socketpair() does - that nicely. + The first problem can be worked around by changing the netsync protocol + slightly to not require detecting closed connections; thus 'mtn sync + file:...' can be provided on Win32 and Unix using a SpawnedStream + implemented with a socket. However, there are then failure situations + where the client does not terminate. See the + 'net.venge.monotone.experimental.win32_pipes' branch for an example of + this. - On Win32, select works on stdin only if it is actually a socket. We just - live with that restriction. It may mean Win32 can't be an ssh server for - mtn. + However, to support 'mtn sync ssh:...' on Win32 clients, we must use named + pipes to implement SpawnedStream; they support the 'read' and 'write' + functions. They do not directly support 'select', but the essential + operation of a Probe can be supported via overlapped operations. - An earlier implementation (a single class named PipeStream) tried to use - Win32 overlapped IO via named pipes on Win32, and Unix select with Unix - pipes on unix, but we couldn't make it work, because the semantics of the - two implementations are too different. Now we always use socket select on - sockets. + We also use pipes on Unix for SpawnedStream, to allow detecting closed + stdio connections. The original netsync code generally assumes there is + only one file descriptor for a connection; we modify it where necessary to + allow for two file descriptors. Note that Win32 named pipes need only one + file descriptor. + + A Probe object provides three operations; read available, write available, + and closed connection detection - all with optional timeout. 'select' + performs all three operations on stdio or sockets Unix, but only on + sockets on Win32. If a Probe object reports either read or write is + available within the specified timeout, netsync then performs a read or + write which is expected to complete immediately. In order to implement the + timeout without a busy wait, we must use Win32 overlapped operations. + + Overlapped operations can provide a read available with timeout. There is + no corresponding operation for write; we must simply assume write is + always available. Since pipes are only used in a SpawnedStream, there are + two cases where write would not actually be available, and the write call + would block. + + The first case is when the client is on Win32, and the server (either + local via 'mtn sync file:...' or remote via 'mtn sync ssh:...') falls far + behind in reading, so that the pipe buffer fills up. In this case, it is + safe for the client to block on a write until the server catches up. If + the connection to the server terminates due to error, the write will + terminate with an error. + + The second case is a local or remote server on Win32, when the client + falls far behind in reading. In that case, it is safe for the server to + block on write until the client catches up, since the server is not + serving other connections. + + Note that we are assuming it is not possible for both the server and the + client to fall far enough behind in reading for write to block on both + sides simultaneously. + + This implementation does not support the case of an ssh server on Win32; + ssh will spawn mtn with stdio being normal pipes, and PeekNamedPipe will + fail. In that case, the Cygwin implementation of mtn is recommended. + + Netxx does not provide for child classes of Probe, nor of ProbeInfo; none + of the methods are virtual. We handle this by having the netsync code + always use a StdioProbe object; it behaves identically to Probe when the + connection is a socket. IMPROVEME: should fix Netxx to allow for derived + classes. + */ namespace Netxx { class StdioProbe; class StreamServer; - class StdioStreamTest; +#ifdef WIN32 + class PipeStream + { + HANDLE named_pipe; + char readbuf[1024]; + DWORD bytes_available; + bool read_in_progress; + OVERLAPPED overlap; + + public: + // Nothing is constructed + PipeStream (void); + + // Get the named pipe from the existing file descriptor + explicit PipeStream (int readfd); + + // Use an existing named pipe + void set_named_pipe (HANDLE named_pipe); + + Netxx::socket_type get_pipefd(); + signed_size_type read (void *buffer, size_type length); + void close (void); + }; +#endif + class SpawnedStream : public StreamBase { - Socket Parent_Socket; - Socket Child_Socket; - ProbeInfo probe_info; #ifdef WIN32 - HANDLE child; + PipeStream pipe; + HANDLE child; #else - pid_t child; + int readfd; + int writefd; + ProbeInfo probe_info; + pid_t child; #endif public: @@ -89,22 +168,30 @@ namespace Netxx virtual signed_size_type read (void *buffer, size_type length); virtual signed_size_type write (const void *buffer, size_type length); virtual void close (void); + virtual const ProbeInfo* get_probe_info (void) const; + + // get_socketfd will return -1 on Unix virtual socket_type get_socketfd (void) const; - virtual const ProbeInfo* get_probe_info (void) const; + virtual socket_type get_writefd (void) const; + virtual socket_type get_readfd (void) const; }; class StdioStream : public StreamBase { friend class StdioProbe; - int readfd; - int writefd; - ProbeInfo probe_info; +#ifdef WIN32 + PipeStream readfd; + int writefd; +#else + int readfd; + int writefd; + ProbeInfo probe_info; +#endif public: - explicit StdioStream (void); - // Construct a Stream object from stdout and stdin of the current - // process. + explicit StdioStream (int const readfd; int const writefd); + // Construct a Stream object from the given file descriptors virtual ~StdioStream() { close(); } virtual signed_size_type read (void *buffer, size_type length); @@ -112,28 +199,20 @@ namespace Netxx virtual void close (void); virtual const ProbeInfo* get_probe_info (void) const; - // In general, we have two file descriptors that netsync needs to know - // about. Netsync should never call get_socketfd to identify this - // stream; it will throw an error. + // get_socketfd will return -1 virtual socket_type get_socketfd (void) const; virtual socket_type get_writefd (void) const; virtual socket_type get_readfd (void) const; - private: - friend class StdioStreamTest; - // Unit test facilities - - // noop constructor - explicit StdioStream (int test) {}; - - // Set socket for unit testing - void set_socketfd (socket_type sock); }; struct StdioProbe : Probe { public: + result_type ready (const Timeout &timeout=Timeout(), ready_type rt=ready_none); + // Note that Netxx::Probe::add is a template, not virtual; we provide // the versions needed by netsync code. + void add(const SpawnedStream &ps, ready_type rt=ready_none); void add(const StdioStream &ps, ready_type rt=ready_none); void add(const StreamBase &sb, ready_type rt=ready_none); void add(const StreamServer &ss, ready_type rt=ready_none);