;; Unit `pthreads': a small pool of native worker threads for CHICKEN.
;;
;; C code queues (function, data, callback) jobs through
;; start_asynchronous_request().  A worker runs `function(data)', publishes
;; the integer result together with the callback, and wakes the Scheme side
;; by writing one byte into a pipe.  The Scheme thread `external-wait'
;; sleeps on that pipe and applies the callback to the result.

(declare
 (unit pthreads)
 (uses srfi-18 srfi-34 srfi-35)
 (uses library) ; gc
 (fixnum-arithmetic)
 (disable-interrupts)
 (usual-integrations)
 (no-bound-checks)
 (no-procedure-checks-for-usual-bindings)
 (bound-to-procedure
  )
 (foreign-declare #<<EOF
/*
 * NOTE(review): in the copy this revision was made from, the heredoc
 * opener, the text preceding the first #endif and the names of all five
 * included headers had been lost (angle-bracketed text was stripped).
 * The prelude below is a reconstruction that supplies exactly what the
 * visible code needs (obj, FALSE_OBJ, stdio, stdlib, errno, pthread,
 * unistd).  Compare with the upstream file before merging; in particular
 * a non-CHICKEN (rscheme) build needs its own definitions of obj,
 * FALSE_OBJ, int2fx and rscheme_intr_call1.
 */
#ifdef ___CHICKEN
typedef C_word obj;
#define FALSE_OBJ C_SCHEME_FALSE
#endif

#include <stdio.h>
#include <stdlib.h>

#ifndef NO_THREAD_LOOP

#include <errno.h>
#include <pthread.h>
#include <unistd.h>

/* Sizing of the request ring and of the worker pool. */
enum {
  ASKEMOS_POOL_SLOTS = 50,
  ASKEMOS_POOL_WORKERS = 5
};

/* A job: called with `data' on a worker thread, returns an int result. */
typedef int (*askemos_request_function_t)(void *);

/* One slot of the request ring. */
typedef struct _askemos_pool_entry {
  askemos_request_function_t function;
  void *data;
  obj callback;
} askemos_pool_entry_t;

/*
 * Bounded ring buffer of pending jobs.
 *
 *   total - number of slots in r
 *   next  - index of the slot the next send will fill
 *   free  - number of unused slots
 *
 * The oldest pending job therefore lives at (next + free) % total.
 * All fields are protected by `mutex'.
 */
struct askemos_pool {
  pthread_mutex_t mutex;
  pthread_cond_t has_job;
  pthread_cond_t has_space;
  unsigned short int total, next, free;
  askemos_pool_entry_t *r;
};

static void *worker_thread_loop(void *arg);

/* Declared in the original source; no definition is visible in this file. */
static void askemos_pool_entry_init(struct askemos_pool *pool,
                                    askemos_pool_entry_t *r);

/*
 * Initialise `pool' and start the worker threads.
 *
 * Returns 0 when the ring was allocated and at least one worker is
 * running.  Returns -1 (with every resource released again) when the
 * ring could not be allocated or no worker could be started.
 */
static int
askemos_pool_init(struct askemos_pool *pool)
{
  int i;
  int started = 0;

  pool->next = 0;
  pool->total = pool->free = ASKEMOS_POOL_SLOTS;

  pool->r = malloc(sizeof *pool->r * pool->total);
  if (pool->r == NULL) {
    fprintf(stderr, "thread pool: out of memory\n");
    return -1;
  }

  pthread_mutex_init(&pool->mutex, NULL);
  pthread_cond_init(&pool->has_job, NULL);
  pthread_cond_init(&pool->has_space, NULL);

  for (i = 0; i < pool->total; ++i) {
    pool->r[i].function = NULL;
    pool->r[i].data = NULL;
    pool->r[i].callback = FALSE_OBJ;
  }

  for (i = 0; i < ASKEMOS_POOL_WORKERS; ++i) {
    pthread_t thread;
    pthread_attr_t attr;
    int e;

    pthread_attr_init(&attr);
    /* Kept joinable as in the original (PTHREAD_CREATE_DETACHED was
     * commented out there); the workers never terminate and are never
     * joined. */
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    e = pthread_create(&thread, &attr, worker_thread_loop, pool);
    pthread_attr_destroy(&attr);

    if (e == 0)
      ++started;
    else
      fprintf(stderr, "thread pool: pthread_create failed (error %d)\n", e);
  }

  if (started == 0) {
    /* Nobody references the pool yet, so it is safe to tear it down. */
    pthread_cond_destroy(&pool->has_space);
    pthread_cond_destroy(&pool->has_job);
    pthread_mutex_destroy(&pool->mutex);
    free(pool->r);
    pool->r = NULL;
    return -1;
  }
  return 0;
}

/*
 * Queue a job.  Blocks while the ring is full, then wakes one worker.
 */
static void
askemos_pool_send(struct askemos_pool *pool,
                  askemos_request_function_t function,
                  void *data, obj callback)
{
  askemos_pool_entry_t *slot;

  pthread_mutex_lock(&pool->mutex);
  while (pool->free == 0)
    pthread_cond_wait(&pool->has_space, &pool->mutex);

  slot = &pool->r[pool->next];
  pool->next = (pool->next + 1) % pool->total;
  --pool->free;

  slot->function = function;
  slot->data = data;
  slot->callback = callback;
  pthread_mutex_unlock(&pool->mutex);

  pthread_cond_signal(&pool->has_job);
}

/*
 * Take the oldest pending job out of the ring, blocking while the ring
 * is empty, and copy it into *function, *data and *callback.  The slot
 * is handed back to the ring before the job runs, so delivering the
 * result to the interpreter needs no pool lock.
 */
static void
askemos_pool_receive(struct askemos_pool *pool,
                     askemos_request_function_t *function,
                     void **data, obj *callback)
{
  askemos_pool_entry_t *slot;

  pthread_mutex_lock(&pool->mutex);
  while (pool->free == pool->total)
    pthread_cond_wait(&pool->has_job, &pool->mutex);

  slot = &pool->r[(pool->next + pool->free) % pool->total];
  ++pool->free;

  *function = slot->function;
  *data = slot->data;
  *callback = slot->callback;
  pthread_mutex_unlock(&pool->mutex);

  pthread_cond_signal(&pool->has_space);
}

/* The process-wide pool; NULL until askemos_pre_init() succeeded. */
static struct askemos_pool *request_pool = NULL;

/*
 * Public entry point: run `function(data)' on a worker thread and hand
 * the result to `callback'.  Terminates the process when the pool has
 * not been initialised.
 */
void
start_asynchronous_request(askemos_request_function_t function,
                           void *data, obj callback)
{
  if (request_pool == NULL) {
    fprintf(stderr, "thread pool not initialised\n");
    exit(1);
  }
  askemos_pool_send(request_pool, function, data, callback);
}

/*
 * Serialises result delivery.  In the CHICKEN build a worker locks it
 * before publishing the_result/the_callback and the Scheme side unlocks
 * it once it has picked both up.
 *
 * NOTE(review): that means the mutex is unlocked by a different thread
 * than the one that locked it, which POSIX leaves undefined for default
 * mutexes.  A semaphore or a mutex/condition pair guarding a "pending"
 * flag would be the portable replacement.
 */
static pthread_mutex_t callback_mutex;

#ifdef ___CHICKEN
/* [0] is read by the Scheme thread, [1] is written by the workers. */
static int the_interrupt_pipe[2] = {0, 0};
/* Mailbox read by the Scheme side; valid while callback_mutex is held. */
C_word the_callback = C_SCHEME_FALSE;
C_word the_result = 0;

/*
 * Wake the Scheme side by writing one byte into the interrupt pipe.
 * Returns 0 on success, -1 when the byte could not be written.
 */
static int
C_notify_external_interrupt(void)
{
  static const char buf[1] = { (char) 254 };
  ssize_t n;

  do {
    n = write(the_interrupt_pipe[1], buf, 1);
  } while (n == -1 && errno == EINTR);

  return n == 1 ? 0 : -1;
}
#endif

/*
 * Body of every worker thread: fetch a job, run it, deliver the result.
 * `arg' is the pool the worker serves.  Never returns.
 */
static void *
worker_thread_loop(void *arg)
{
  struct askemos_pool *pool = arg;
  askemos_request_function_t function;
  void *data;
  obj callback;
  int result;

  while (1) {
    askemos_pool_receive(pool, &function, &data, &callback);
    result = (*function)(data);

#ifdef ___CHICKEN
    /* The Scheme side only releases callback_mutex when it finds a
     * callback.  Publishing a false callback would therefore leave the
     * mutex locked forever, so there is nothing to deliver here. */
    if ((C_word) callback == C_SCHEME_FALSE)
      continue;

    pthread_mutex_lock(&callback_mutex);
    the_result = result;
    the_callback = callback;
    if (C_notify_external_interrupt() != 0) {
      /* Scheme was not woken and will never unlock for this result:
       * withdraw it instead of blocking all workers. */
      fprintf(stderr, "thread pool: failed to notify interpreter, result dropped\n");
      the_callback = C_SCHEME_FALSE;
      pthread_mutex_unlock(&callback_mutex);
    }
    /* On success the mutex is released by the Scheme side. */
#else
    pthread_mutex_lock(&callback_mutex);
    rscheme_intr_call1(callback, int2fx(result));
    pthread_mutex_unlock(&callback_mutex);
#endif
  }
  return NULL;
}

/*
 * One-time set-up: callback mutex, interrupt pipe (CHICKEN only) and the
 * request pool.  When the pool cannot be created, request_pool stays
 * NULL and start_asynchronous_request() reports the problem.
 */
void
askemos_pre_init(void)
{
  struct askemos_pool *pool;

  pthread_mutex_init(&callback_mutex, NULL);

#ifdef ___CHICKEN
  if (pipe(the_interrupt_pipe) == -1)
    fprintf(stderr, "Failed to open interrupt pipe\n");
#endif

  pool = malloc(sizeof *pool);
  if (pool == NULL) {
    fprintf(stderr, "thread pool: out of memory\n");
    return;
  }
  if (askemos_pool_init(pool) != 0) {
    free(pool);
    return;
  }
  request_pool = pool;
}

#else /* NO_THREAD_LOOP */

/*
 * Stubs for builds without the worker pool.
 * NOTE(review): the Scheme module below still refers to the_callback,
 * the_result, the_interrupt_pipe and callback_mutex, none of which are
 * defined in this branch.
 */
typedef int (*askemos_request_function_t)(void *);

void
start_asynchronous_request(askemos_request_function_t function,
                           void *data, obj callback)
{
}

void
askemos_pre_init(void)
{
  fprintf(stderr, "thread pool not initialised\n");
}

#endif

/* Example of a request function (kept from the original, disabled):

int test_askemos_thread_sleep(void *data)
{
  int time = (int) data;
  sleep(time);
  return time;
}

*/
EOF
)
)

(module pthreads
 (
  external-wait
  )

 (import scheme (except chicken condition?) foreign lolevel extras
         srfi-18 srfi-34 srfi-35)

 ;; Mailbox filled by the worker threads (see the C code above).
 (define-foreign-variable interrupt-callback scheme-object "the_callback")
 (define-foreign-variable callback-result int "the_result")

 ;; Set up the mutex, the interrupt pipe and the pool at load time.
 ((foreign-lambda* void () "askemos_pre_init();"))

 ;; Deliver one published result: fetch callback and result, clear the
 ;; mailbox, release the worker-side lock, then apply the callback to
 ;; the result.  Does nothing when no callback is pending.
 (define (handle-callback)
   (if interrupt-callback
       (let ((cb ((foreign-lambda*
                   scheme-object ()
                   "return(CHICKEN_gc_root_ref((void *) the_callback));")))
             (rc callback-result))
         (set! interrupt-callback #f)
         ;; The lock was taken by the worker that published this result.
         ((foreign-lambda*
           void ()
           "pthread_mutex_unlock(&callback_mutex);"))
         (cb rc))))

 ;; Scheme thread that sleeps on the read end of the interrupt pipe,
 ;; consumes one wake-up byte per iteration and dispatches the pending
 ;; callback.  Conditions raised by a callback are discarded so that the
 ;; loop keeps running.
 (define external-wait
   (thread-start!
    (make-thread
     (lambda ()
       (let ((fd ((foreign-lambda* int () "return(the_interrupt_pipe[0]);"))))
         (do ()
             (#f)
           (thread-wait-for-i/o! fd)
           ((foreign-lambda*
             void ()
             "static int buf[1]; int r = read(the_interrupt_pipe[0], buf, 1);"))
           (guard (ex (else #f)) (handle-callback)))))
     "external-wait")))

 ) ;; module pthreads