guile-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Guile-commits] 01/02: Only ptob->close() after read/write finish


From: Andy Wingo
Subject: [Guile-commits] 01/02: Only ptob->close() after read/write finish
Date: Wed, 31 Aug 2016 17:33:07 +0000 (UTC)

wingo pushed a commit to branch master
in repository guile.

commit b8a53b98b33dc89b0ed526ca66232655d24f2ce8
Author: Andy Wingo <address@hidden>
Date:   Wed Aug 31 19:00:27 2016 +0200

    Only ptob->close() after read/write finish
    
    * libguile/Makefile.am (noinst_HEADERS): Add atomics-internal.h.
    * libguile/atomics-internal.h: New file.
    * libguile/ports-internal.h (refcount): New member.
    * libguile/ports.c (release_port, scm_dynwind_acquire_port): New
      facility for acquiring a port within a dynwind.
      (scm_port_poll, scm_i_read_bytes, scm_setvbuf, scm_end_input)
      (scm_i_write_bytes, scm_char_ready_p, scm_seek)
      (scm_truncate_file, trampoline_to_c_read)
      (trampoline_to_c_write): Acquire port.
      (scm_c_make_port_with_encoding): Init refcount to 1.
      (scm_close_port): Release port.
    * doc/ref/api-io.texi (I/O Extensions): Add documentation
---
 doc/ref/api-io.texi         |    7 +++
 libguile/Makefile.am        |    1 +
 libguile/atomics-internal.h |   85 ++++++++++++++++++++++++++
 libguile/ports-internal.h   |   13 +++-
 libguile/ports.c            |  138 +++++++++++++++++++++++++++++++++++--------
 5 files changed, 216 insertions(+), 28 deletions(-)

diff --git a/doc/ref/api-io.texi b/doc/ref/api-io.texi
index e4e4f36..9facb38 100644
--- a/doc/ref/api-io.texi
+++ b/doc/ref/api-io.texi
@@ -1694,6 +1694,13 @@ operating system inform Guile about the appropriate buffer sizes for the
 particular file opened by the port.
 @end table
 
+Note that calls to all of these methods can proceed in parallel and
+concurrently and from any thread up until the point that the port is
+closed.  The call to @code{close} will happen when no other method is
+running, and no method will be called after the @code{close} method is
+called.  If your port implementation needs mutual exclusion to prevent
+concurrency, it is responsible for locking appropriately.
+
 @node Non-Blocking I/O
 @subsection Non-Blocking I/O
 
diff --git a/libguile/Makefile.am b/libguile/Makefile.am
index 8161ade..ba6be20 100644
--- a/libguile/Makefile.am
+++ b/libguile/Makefile.am
@@ -507,6 +507,7 @@ noinst_HEADERS = conv-integer.i.c conv-uinteger.i.c         \
                  elf.h                                         \
                  srfi-14.i.c                                   \
                  quicksort.i.c                                  \
+                 atomics-internal.h                            \
                  posix-w32.h                                   \
                 private-options.h ports-internal.h
 
diff --git a/libguile/atomics-internal.h b/libguile/atomics-internal.h
new file mode 100644
index 0000000..1859daa
--- /dev/null
+++ b/libguile/atomics-internal.h
@@ -0,0 +1,85 @@
+#ifndef SCM_ATOMICS_INTERNAL_H
+#define SCM_ATOMICS_INTERNAL_H
+
+/* Copyright (C) 2016
+ * Free Software Foundation, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+
+
+
+#include <stdint.h>
+
+
+
+
+#define HAVE_C11_ATOMICS (__STDC_VERSION__ >= 201112L && !defined(__STDC_NO_ATOMICS__))
+
+#if HAVE_C11_ATOMICS
+
+#include <stdatomic.h>
+static inline uint32_t
+scm_atomic_subtract_uint32 (uint32_t *obj, uint32_t arg)
+{
+  return atomic_fetch_sub (obj, arg);
+}
+static inline _Bool
+scm_atomic_compare_and_swap_uint32 (uint32_t *obj, uint32_t *expected,
+                                    uint32_t desired)
+{
+  return atomic_compare_exchange_weak (obj, expected, desired);
+}
+
+#else /* HAVE_C11_ATOMICS */
+
+/* Fallback implementation using locks.  */
+#include "libguile/threads.h"
+static scm_i_pthread_mutex_t atomics_lock = SCM_I_PTHREAD_MUTEX_INITIALIZER;
+static inline uint32_t
+scm_atomic_subtract_uint32 (uint32_t *obj, uint32_t arg)
+{
+  uint32_t ret;
+  scm_i_pthread_mutex_lock (&atomics_lock);
+  ret = *obj;
+  *obj -= arg;
+  scm_i_pthread_mutex_unlock (&atomics_lock);
+  return ret;
+}
+static inline int
+scm_atomic_compare_and_swap_uint32 (uint32_t *obj, uint32_t *expected,
+                                    uint32_t desired)
+{
+  int ret;
+  scm_i_pthread_mutex_lock (&atomics_lock);
+  if (*obj == *expected)
+    {
+      *obj = desired;
+      ret = 1;
+    }
+  else
+    {
+      *expected = *obj;
+      ret = 0;
+    }
+  scm_i_pthread_mutex_unlock (&atomics_lock);
+  return ret;
+}
+
+#endif /* HAVE_C11_ATOMICS */
+
+#endif /* SCM_ATOMICS_INTERNAL_H */
diff --git a/libguile/ports-internal.h b/libguile/ports-internal.h
index d014415..4203a5c 100644
--- a/libguile/ports-internal.h
+++ b/libguile/ports-internal.h
@@ -323,12 +323,19 @@ struct scm_t_port
      `unwrite-byte'.  */
   size_t read_buffering;
 
+  /* Reads and writes can proceed concurrently, but we don't want to
+     start any read or write after close() has been called.  So we have
+     a refcount which is positive if close has not yet been called.
+     Reading, writing, and the like temporarily increments this
+     refcount, provided it was nonzero to start with.  */
+  scm_t_uint32 refcount;
+
   /* True if the port is random access.  Implies that the buffers must
      be flushed before switching between reading and writing, seeking,
      and so on.  */
-  unsigned rw_random : 1;
-  unsigned at_stream_start_for_bom_read  : 1;
-  unsigned at_stream_start_for_bom_write : 1;
+  scm_t_uint32 rw_random : 1;
+  scm_t_uint32 at_stream_start_for_bom_read  : 1;
+  scm_t_uint32 at_stream_start_for_bom_write : 1;
 
   /* Character encoding support.  */
   SCM encoding;  /* A symbol of upper-case ASCII.  */
diff --git a/libguile/ports.c b/libguile/ports.c
index 9e5211f..278bbe9 100644
--- a/libguile/ports.c
+++ b/libguile/ports.c
@@ -27,6 +27,7 @@
 #  include <config.h>
 #endif
 
+#include <assert.h>
 #include <stdio.h>
 #include <errno.h>
 #include <fcntl.h>  /* for chsize on mingw */
@@ -37,10 +38,9 @@
 #include <unistr.h>
 #include <striconveh.h>
 
-#include <assert.h>
-
 #include "libguile/_scm.h"
 #include "libguile/async.h"
+#include "libguile/atomics-internal.h"
 #include "libguile/deprecation.h"
 #include "libguile/eval.h"
 #include "libguile/fports.h"  /* direct access for seek and truncate */
@@ -131,6 +131,63 @@ static const scm_t_wchar UNICODE_REPLACEMENT_CHARACTER = 0xFFFD;
 
 
 
+static void
+release_port (SCM port)
+{
+  scm_t_port *pt = SCM_PORT (port);
+  scm_t_uint32 prev;
+
+  prev = scm_atomic_subtract_uint32 (&pt->refcount, 1);
+  if (prev == 0)
+    /* Logic failure.  */
+    abort ();
+
+  if (prev > 1)
+    /* Port still alive.  */
+    return;
+
+  /* FIXME: `catch' around the close call?  It could throw an exception,
+     and in that case we'd leak the iconv descriptors, if any.  */
+  if (SCM_PORT_TYPE (port)->close)
+    SCM_PORT_TYPE (port)->close (port);
+
+  scm_i_pthread_mutex_lock (&iconv_lock);
+  pt = SCM_PORT (port);
+  if (scm_is_true (pt->precise_encoding))
+    {
+      if (pt->input_cd != (iconv_t) -1)
+        iconv_close (pt->input_cd);
+      if (pt->output_cd != (iconv_t) -1)
+        iconv_close (pt->output_cd);
+      pt->precise_encoding = SCM_BOOL_F;
+      pt->input_cd = pt->output_cd = (iconv_t) -1;
+    }
+  scm_i_pthread_mutex_unlock (&iconv_lock);
+}
+
+static void
+scm_dynwind_acquire_port (SCM port)
+{
+  scm_t_port *pt = SCM_PORT (port);
+  /* We're acquiring a lease on the port so that we only close it when
+     no one is using it.  The normal case is that it's open with a
+     refcount of 1 and we're going to push it to 2.  Otherwise perhaps
+     there is someone else using it; that's fine, we just add our
+     refcount.  However if the current refcount is 0 then the port has
+     been closed or is closing and we must throw an error.  */
+  scm_t_uint32 cur = 1, next = 2;
+  while (!scm_atomic_compare_and_swap_uint32 (&pt->refcount, &cur, next))
+    {
+      if (cur == 0)
+        scm_wrong_type_arg_msg (NULL, 0, port, "open port");
+      next = cur + 1;
+    }
+  scm_dynwind_unwind_handler_with_scm (release_port, port,
+                                       SCM_F_WIND_EXPLICITLY);
+}
+
+
+
 static SCM trampoline_to_c_read_subr;
 static SCM trampoline_to_c_write_subr;
 
@@ -191,7 +248,10 @@ trampoline_to_c_read (SCM port, SCM dst, SCM start, SCM count)
   SCM_ASSERT_RANGE (3, start, c_start <= SCM_BYTEVECTOR_LENGTH (dst));
   SCM_ASSERT_RANGE (4, count, c_count <= SCM_BYTEVECTOR_LENGTH (dst) - c_start);
 
+  scm_dynwind_begin (0);
+  scm_dynwind_acquire_port (port);
   ret = SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count);
+  scm_dynwind_end ();
 
   return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret);
 }
@@ -218,7 +278,10 @@ trampoline_to_c_write (SCM port, SCM src, SCM start, SCM count)
   SCM_ASSERT_RANGE (3, start, c_start <= SCM_BYTEVECTOR_LENGTH (src));
   SCM_ASSERT_RANGE (4, count, c_count <= SCM_BYTEVECTOR_LENGTH (src) - c_start);
 
+  scm_dynwind_begin (0);
+  scm_dynwind_acquire_port (port);
   ret = SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count);
+  scm_dynwind_end ();
 
   return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret);
 }
@@ -691,6 +754,8 @@ scm_c_make_port_with_encoding (scm_t_port_type *ptob, unsigned long mode_bits,
   pt->file_name = SCM_BOOL_F;
   pt->position = scm_cons (SCM_INUM0, SCM_INUM0);
 
+  pt->refcount = 1;
+
   pt->at_stream_start_for_bom_read  = 1;
   pt->at_stream_start_for_bom_write = 1;
 
@@ -797,11 +862,9 @@ SCM_DEFINE (scm_close_port, "close-port", 1, 0, 0,
            "descriptors.")
 #define FUNC_NAME s_scm_close_port
 {
-  scm_t_port *pt;
-
   port = SCM_COERCE_OUTPORT (port);
-
   SCM_VALIDATE_PORT (1, port);
+
   if (SCM_CLOSEDP (port))
     return SCM_BOOL_F;
 
@@ -809,28 +872,12 @@ SCM_DEFINE (scm_close_port, "close-port", 1, 0, 0,
   if (SCM_OUTPUT_PORT_P (port))
     scm_flush (port);
 
-  pt = SCM_PORT (port);
   SCM_CLR_PORT_OPEN_FLAG (port);
 
   if (SCM_PORT_TYPE (port)->flags & SCM_PORT_TYPE_NEEDS_CLOSE_ON_GC)
     scm_weak_set_remove_x (scm_i_port_weak_set, port);
 
-  if (SCM_PORT_TYPE (port)->close)
-    /* Note!  This may throw an exception.  Anything after this point
-       should be resilient to non-local exits.  */
-    SCM_PORT_TYPE (port)->close (port);
-
-  scm_i_pthread_mutex_lock (&iconv_lock);
-  if (scm_is_true (pt->precise_encoding))
-    {
-      if (pt->input_cd != (iconv_t) -1)
-        iconv_close (pt->input_cd);
-      if (pt->output_cd != (iconv_t) -1)
-        iconv_close (pt->output_cd);
-      pt->precise_encoding = SCM_BOOL_F;
-      pt->input_cd = pt->output_cd = (iconv_t) -1;
-    }
-  scm_i_pthread_mutex_unlock (&iconv_lock);
+  release_port (port);
 
   return SCM_BOOL_T;
 }
@@ -1314,6 +1361,7 @@ SCM_DEFINE (scm_port_write_wait_fd, "port-write-wait-fd", 1, 0, 0,
 }
 #undef FUNC_NAME
 
+/* Call while having acquired the port.  */
 static int
 port_poll (SCM port, short events, int timeout)
 #define FUNC_NAME "port-poll"
@@ -1358,6 +1406,7 @@ SCM_DEFINE (scm_port_poll, "port-poll", 2, 1, 0,
 {
   short c_events = 0;
   int c_timeout;
+  SCM ret;
 
   port = SCM_COERCE_OUTPORT (port);
   SCM_VALIDATE_PORT (1, port);
@@ -1371,7 +1420,12 @@ SCM_DEFINE (scm_port_poll, "port-poll", 2, 1, 0,
   if (scm_i_string_contains_char (events, 'w'))
     c_events |= POLLIN;
 
-  return scm_from_int (port_poll (port, c_events, c_timeout));
+  scm_dynwind_begin (0);
+  scm_dynwind_acquire_port (port);
+  ret = scm_from_int (port_poll (port, c_events, c_timeout));
+  scm_dynwind_end ();
+
+  return ret;
 }
 #undef FUNC_NAME
 
@@ -1476,6 +1530,9 @@ scm_i_read_bytes (SCM port, SCM dst, size_t start, size_t count)
   assert (count <= SCM_BYTEVECTOR_LENGTH (dst));
   assert (start + count <= SCM_BYTEVECTOR_LENGTH (dst));
 
+  scm_dynwind_begin (0);
+  scm_dynwind_acquire_port (port);
+
  retry:
   filled = ptob->c_read (port, dst, start, count);
 
@@ -1485,6 +1542,8 @@ scm_i_read_bytes (SCM port, SCM dst, size_t start, size_t count)
       goto retry;
     }
 
+  scm_dynwind_end ();
+
   assert (filled <= count);
 
   return filled;
@@ -2220,8 +2279,11 @@ SCM_DEFINE (scm_setvbuf, "setvbuf", 2, 1, 0,
   else
     {
       read_buf_size = write_buf_size = default_buffer_size;
+      scm_dynwind_begin (0);
+      scm_dynwind_acquire_port (port);
       if (ptob->get_natural_buffer_sizes)
         ptob->get_natural_buffer_sizes (port, &read_buf_size, &write_buf_size);
+      scm_dynwind_end ();
     }
 
   /* Minimum buffer size is one byte.  */
@@ -2310,7 +2372,12 @@ scm_end_input (SCM port)
   offset = - (scm_t_off) discarded;
 
   if (offset != 0)
-    SCM_PORT_TYPE (port)->seek (port, offset, SEEK_CUR);
+    {
+      scm_dynwind_begin (0);
+      scm_dynwind_acquire_port (port);
+      SCM_PORT_TYPE (port)->seek (port, offset, SEEK_CUR);
+      scm_dynwind_end ();
+    }
 }
 
 SCM_DEFINE (scm_force_output, "force-output", 0, 1, 0,
@@ -2722,6 +2789,9 @@ scm_i_write_bytes (SCM port, SCM src, size_t start, size_t count)
   assert (count <= SCM_BYTEVECTOR_LENGTH (src));
   assert (start + count <= SCM_BYTEVECTOR_LENGTH (src));
 
+  scm_dynwind_begin (0);
+  scm_dynwind_acquire_port (port);
+
   do
     {
       size_t ret = ptob->c_write (port, src, start + written, count - written);
@@ -2733,6 +2803,8 @@ scm_i_write_bytes (SCM port, SCM src, size_t start, size_t count)
     }
   while (written < count);
 
+  scm_dynwind_end ();
+
   assert (written == count);
 }
 
@@ -3495,7 +3567,14 @@ SCM_DEFINE (scm_char_ready_p, "char-ready?", 0, 1, 0,
       scm_t_port_type *ptob = SCM_PORT_TYPE (port);
       
       if (ptob->input_waiting)
-       return scm_from_bool (ptob->input_waiting (port));
+        {
+          SCM ret;
+          scm_dynwind_begin (0);
+          scm_dynwind_acquire_port (port);
+          ret = scm_from_bool (ptob->input_waiting (port));
+          scm_dynwind_end ();
+          return ret;
+        }
       else
        return SCM_BOOL_T;
     }
@@ -3549,7 +3628,10 @@ SCM_DEFINE (scm_seek, "seek", 3, 0, 0,
           /* If we are just querying the current position, avoid
              flushing buffers.  We don't even need to require that the
              port supports random access.  */
+          scm_dynwind_begin (0);
+          scm_dynwind_acquire_port (fd_port);
           rv = ptob->seek (fd_port, off, how);
+          scm_dynwind_end ();
           rv -= scm_port_buffer_can_take (pt->read_buf);
           rv += scm_port_buffer_can_take (pt->write_buf);
           return scm_from_off_t_or_off64_t (rv);
@@ -3562,7 +3644,10 @@ SCM_DEFINE (scm_seek, "seek", 3, 0, 0,
       scm_end_input (fd_port);
       scm_flush (fd_port);
 
+      scm_dynwind_begin (0);
+      scm_dynwind_acquire_port (fd_port);
       rv = ptob->seek (fd_port, off, how);
+      scm_dynwind_end ();
 
       /* Set stream-start flags according to new position. */
       pt->at_stream_start_for_bom_read  = (rv == 0);
@@ -3668,7 +3753,10 @@ SCM_DEFINE (scm_truncate_file, "truncate-file", 1, 1, 0,
         scm_end_input (object);
       scm_flush (object);
 
+      scm_dynwind_begin (0);
+      scm_dynwind_acquire_port (object);
       ptob->truncate (object, c_length);
+      scm_dynwind_end ();
       rv = 0;
     }
   else



reply via email to

[Prev in Thread] Current Thread [Next in Thread]