guile-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Guile-commits] 09/10: add examples/ethreads/memcached-{client, server}


From: Andy Wingo
Subject: [Guile-commits] 09/10: add examples/ethreads/memcached-{client, server}
Date: Fri, 3 Jun 2016 21:03:09 +0000 (UTC)

wingo pushed a commit to branch +wip-ethreads
in repository guile.

commit 4d24e8d87fce6890c412ca80fffea491593efbf9
Author: Andy Wingo <address@hidden>
Date:   Fri Mar 30 19:36:24 2012 +0200

    add examples/ethreads/memcached-{client,server}
    
    * examples/ethreads/memcached-client.scm:
    * examples/ethreads/memcached-server.scm: Two new ethreads examples.
      Not really optimized, no consideration for memory use, but perhaps
      instructive.
---
 examples/ethreads/memcached-client.scm |  147 +++++++++++++++++++++++++++++++
 examples/ethreads/memcached-server.scm |  150 ++++++++++++++++++++++++++++++++
 2 files changed, 297 insertions(+)

diff --git a/examples/ethreads/memcached-client.scm 
b/examples/ethreads/memcached-client.scm
new file mode 100644
index 0000000..59b4caa
--- /dev/null
+++ b/examples/ethreads/memcached-client.scm
@@ -0,0 +1,147 @@
+;;; Simple memcached client implementation
+
+;; Copyright (C)  2012 Free Software Foundation, Inc.
+
+;; This library is free software; you can redistribute it and/or
+;; modify it under the terms of the GNU Lesser General Public
+;; License as published by the Free Software Foundation; either
+;; version 3 of the License, or (at your option) any later version.
+;;
+;; This library is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Lesser General Public License for more details.
+;;
+;; You should have received a copy of the GNU Lesser General Public
+;; License along with this library; if not, write to the Free Software
+;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+;; 02110-1301 USA
+
+(use-modules (rnrs bytevectors)
+             (ice-9 ethreads)
+             (ice-9 binary-ports)
+             (ice-9 rdelim)
+             (ice-9 match))
+
+(define (set-nonblocking! port)
+  (fcntl port F_SETFL (logior O_NONBLOCK (fcntl port F_GETFL)))
+  (setvbuf port 'block 1024))
+
+(define (server-error port msg . args)
+  (apply format (current-error-port) msg args)
+  (newline (current-error-port))
+  (close-port port)
+  (suspend))
+
+(define (parse-int port val)
+  (let ((num (string->number val)))
+    (unless (and num (integer? num) (exact? num) (>= num 0))
+      (server-error port "Expected a non-negative integer: ~s" val))
+    num))
+
+(define (make-item flags bv)
+  (vector flags bv))
+(define (item-flags item)
+  (vector-ref item 0))
+(define (item-bv item)
+  (vector-ref item 1))
+
+(define (get port . keys)
+  (put-string port "get ")
+  (put-string port (string-join keys " "))
+  (put-string port "\r\n")
+  (force-output port)
+  (let lp ((vals '()))
+    (let ((line (read-line port)))
+      (when (eof-object? line)
+        (server-error port "Expected a response to 'get', got EOF"))
+      (match (string-split (string-trim-right line) #\space)
+        (("VALUE" key flags length)
+         (let* ((flags (parse-int port flags))
+                (length (parse-int port length)))
+           (unless (member key keys)
+             (server-error port "Unknown key: ~a" key))
+           (when (assoc key vals)
+             (server-error port "Already have response for key: ~a" key))
+           (let ((bv (get-bytevector-n port length)))
+             (unless (= (bytevector-length bv) length)
+               (server-error port "Expected ~A bytes, got ~A" length bv))
+             (when (eqv? (peek-char port) #\return)
+               (read-char port))
+             (unless (eqv? (read-char port) #\newline)
+               (server-error port "Expected \\n"))
+             (lp (acons key (make-item flags bv) vals)))))
+        (("END")
+         (reverse vals))
+        (_
+         (server-error port "Bad line: ~A" line))))))
+
+(define* (set port key flags exptime bytes #:key noreply?)
+  (put-string port "set ")
+  (put-string port key)
+  (put-char port #\space)
+  (put-string port (number->string flags))
+  (put-char port #\space)
+  (put-string port (number->string exptime))
+  (put-char port #\space)
+  (put-string port (number->string (bytevector-length bytes)))
+  (when noreply?
+    (put-string port " noreply"))
+  (put-string port "\r\n")
+  (put-bytevector port bytes)
+  (put-string port "\r\n")
+  (force-output port)
+  (let ((line (read-line port)))
+    (match line
+      ((? eof-object?)
+       (server-error port "EOF while expecting response from server"))
+      ("STORED\r" #t)
+      ("NOT_STORED\r" #t)
+      (_
+       (server-error port "Unexpected response from server: ~A" line)))))
+
+(define (connect-to-server addrinfo)
+  (let ((port (socket (addrinfo:fam addrinfo)
+                      (addrinfo:socktype addrinfo)
+                      (addrinfo:protocol addrinfo))))
+    ;; Disable Nagle's algorithm.  We buffer ourselves.
+    (setsockopt port IPPROTO_TCP TCP_NODELAY 0)
+    (set-nonblocking! port)
+    (connect port (addrinfo:addr addrinfo))
+    port))
+
+(define *active-clients* 0)
+
+(define (client-loop addrinfo n num-connections)
+  (set! *active-clients* (1+ *active-clients*))
+  (let ((port (connect-to-server addrinfo))
+        (key (string-append "test-" (number->string n))))
+    (let lp ((m 0))
+      (when (< m num-connections)
+        (let ((v (string->utf8 (number->string m))))
+          (set port key 0 0 v)
+          (let* ((response (get port key))
+                 (item (assoc-ref response key)))
+            (unless item
+              (server-error port "Not found: ~A" key))
+            (unless (equal? (item-bv item) v)
+              (server-error port "Bad response: ~A (expected ~A)" (item-bv 
item) v))
+            (lp (1+ m))))))
+    (close-port port))
+  (set! *active-clients* (1- *active-clients*))
+  (when (zero? *active-clients*)
+    (exit 0)))
+
+(define (run-memcached-test num-clients num-connections)
+  ;; The getaddrinfo call blocks, unfortunately.  Call it once before
+  ;; spawning clients.
+  (let ((addrinfo (car (getaddrinfo "localhost" (number->string 11211)))))
+    (let lp ((n 0))
+      (when (< n num-clients)
+        (spawn
+         (lambda ()
+           (client-loop addrinfo n num-connections)))
+        (lp (1+ n)))))
+  (run))
+
+(apply run-memcached-test (map string->number (cdr (program-arguments))))
diff --git a/examples/ethreads/memcached-server.scm 
b/examples/ethreads/memcached-server.scm
new file mode 100644
index 0000000..a3fb39b
--- /dev/null
+++ b/examples/ethreads/memcached-server.scm
@@ -0,0 +1,150 @@
+;;; Simple memcached server implementation
+
+;; Copyright (C)  2012 Free Software Foundation, Inc.
+
+;; This library is free software; you can redistribute it and/or
+;; modify it under the terms of the GNU Lesser General Public
+;; License as published by the Free Software Foundation; either
+;; version 3 of the License, or (at your option) any later version.
+;;
+;; This library is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Lesser General Public License for more details.
+;;
+;; You should have received a copy of the GNU Lesser General Public
+;; License along with this library; if not, write to the Free Software
+;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+;; 02110-1301 USA
+
+(use-modules (rnrs bytevectors)
+             (ice-9 ethreads)
+             (ice-9 binary-ports)
+             (ice-9 rdelim)
+             (ice-9 match))
+
+(define (set-nonblocking! port)
+  (fcntl port F_SETFL (logior O_NONBLOCK (fcntl port F_GETFL)))
+  (setvbuf port 'block 1024))
+
+(define (make-default-socket family addr port)
+  (let ((sock (socket PF_INET SOCK_STREAM 0)))
+    (setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
+    (fcntl sock F_SETFD FD_CLOEXEC)
+    (bind sock family addr port)
+    (set-nonblocking! sock)
+    sock))
+
+(define (client-error port msg . args)
+  (put-string port (apply format #f msg args))
+  (put-string port "\r\n")
+  (close-port port)
+  (suspend))
+
+(define (parse-int port val)
+  (let ((num (string->number val)))
+    (unless (and num (integer? num) (exact? num) (>= num 0))
+      (client-error port "Expected a non-negative integer: ~s" val))
+    num))
+
+(define (make-item flags bv)
+  (vector flags bv))
+(define (item-flags item)
+  (vector-ref item 0))
+(define (item-bv item)
+  (vector-ref item 1))
+
+(define *commands* (make-hash-table))
+
+(define-syntax-rule (define-command (name port store . pat)
+                      body body* ...)
+  (begin
+    (define (name port store args)
+      (match args
+        (pat body body* ...)
+        (else
+         (client-error port "Bad line: ~A ~S" 'name (string-join args " ")))))
+    (hashq-set! *commands* 'name name)))
+
+(define-command (get port store key* ...)
+  (let lp ((key* key*))
+    (match key*
+      ((key key* ...)
+       (let ((item (hash-ref store key)))
+         (when item
+           (put-string port "VALUE ")
+           (put-string port key)
+           (put-char port #\space)
+           (put-string port (number->string (item-flags item)))
+           (put-char port #\space)
+           (put-string port (number->string
+                              (bytevector-length (item-bv item))))
+           (put-char port #\return)
+           (put-char port #\newline)
+           (put-bytevector port (item-bv item))
+           (put-string port "\r\n"))
+         (lp key*)))
+      (()
+       (put-string port "END\r\n")))))
+
+(define-command (set port store key flags exptime bytes
+                     . (and noreply (or ("noreply") ())))
+  (let* ((flags (parse-int port flags))
+         (exptime (parse-int port exptime))
+         (bytes (parse-int port bytes)))
+    (let ((bv (get-bytevector-n port bytes)))
+      (unless (= (bytevector-length bv) bytes)
+        (client-error port "Tried to read ~A bytes, only read ~A"
+                      bytes (bytevector-length bv)))
+      (hash-set! store key (make-item flags bv))
+      (when (eqv? (peek-char port) #\return)
+        (read-char port))
+      (when (eqv? (peek-char port) #\newline)
+        (read-char port)))
+    (put-string port "STORED\r\n")))
+
+(define (client-loop port addr store)
+  (let loop ()
+    ;; TODO: Restrict read-line to 512 chars.
+    (let ((line (read-line port)))
+      (cond
+       ((eof-object? line)
+        (close-port port))
+       (else
+        (match (string-split (string-trim-right line) #\space)
+          ((verb . args)
+           (let ((proc (hashq-ref *commands* (string->symbol verb))))
+             (unless proc
+               (client-error port "Bad command: ~a" verb))
+             (proc port store args)
+             (force-output port)
+             (loop)))
+          (else (client-error port "Bad command line" line))))))))
+
+;; todo: accept and connect
+(define (socket-loop socket store)
+  (let loop ()
+    (match (accept socket)
+      ((client . addr)
+       (set-nonblocking! client)
+       ;; Disable Nagle's algorithm.  We buffer ourselves.
+       (setsockopt client IPPROTO_TCP TCP_NODELAY 0)
+       (spawn (lambda () (client-loop client addr store)))
+       (loop)))))
+
+(define* (run-memcached #:key
+                        (host #f)
+                        (family AF_INET)
+                        (addr (if host
+                                  (inet-pton family host)
+                                  INADDR_LOOPBACK))
+                        (port 11211)
+                        (socket (make-default-socket family addr port)))
+  (listen socket 128)
+  (sigaction SIGPIPE SIG_IGN)
+  (spawn
+   (lambda ()
+     (socket-loop socket (make-hash-table))))
+  (run))
+
+(run-memcached)



reply via email to

[Prev in Thread] Current Thread [Next in Thread]