##// END OF EJS Templates
rust-pyo3: exposition of AncestorsIterator...
rust-pyo3: exposition of AncestorsIterator Compared to the early experiments, we have one less `RwLock` in the wrapping. Still that is somewhat redundant with `UnsafePyLeaked` being itself some kind of lock. In the original rust-cpython code, we were borrowing the `RefCell` with a method that can panic. Instead we are now converting the `PoisonError` that unlocking a `RwLock` can produce. Since all methods acquiring the `RwLock` are themselves protected by the GIL and do not release it before returning, nor do they leak RwLock guards, there is no risk of contention on these locks themselves.

File last commit:

r44446:de783805 default
r53428:4c9e3198 default
Show More
pool.c
344 lines | 10.8 KiB | text/x-c | CLexer
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 /*
* Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 * All rights reserved.
*
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 * This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 */
/* ====== Dependencies ======= */
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 #include <stddef.h> /* size_t */
#include "debug.h" /* assert */
#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 #include "pool.h"
/* ====== Compiler specifics ====== */
#if defined(_MSC_VER)
# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
#endif
#ifdef ZSTD_MULTITHREAD
#include "threading.h" /* pthread adaptation */
/* A job is a function and an opaque argument */
typedef struct POOL_job_s {
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 POOL_function function;
void *opaque;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 } POOL_job;
struct POOL_ctx_s {
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_customMem customMem;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Keep track of the threads */
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 ZSTD_pthread_t* threads;
size_t threadCapacity;
size_t threadLimit;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895
/* The queue is a circular buffer */
POOL_job *queue;
size_t queueHead;
size_t queueTail;
size_t queueSize;
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
/* The number of threads working on jobs */
size_t numThreadsBusy;
/* Indicates if the queue is empty */
int queueEmpty;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* The mutex protects the queue */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_mutex_t queueMutex;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Condition variable for pushers to wait on when the queue is full */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_cond_t queuePushCond;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Condition variables for poppers to wait on when the queue is empty */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_cond_t queuePopCond;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Indicates if the queue is shutting down */
int shutdown;
};
/* POOL_thread() :
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 * Work thread for the thread pool.
* Waits for jobs and executes them.
* @returns : NULL on failure else non-null.
*/
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 static void* POOL_thread(void* opaque) {
POOL_ctx* const ctx = (POOL_ctx*)opaque;
if (!ctx) { return NULL; }
for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_mutex_lock(&ctx->queueMutex);
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 while ( ctx->queueEmpty
|| (ctx->numThreadsBusy >= ctx->threadLimit) ) {
if (ctx->shutdown) {
/* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
* a few threads will be shutdown while !queueEmpty,
* but enough threads will remain active to finish the queue */
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 }
/* Pop a job off the queue */
{ POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ctx->numThreadsBusy++;
ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Unlock the mutex, signal a pusher, and run the job */
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 ZSTD_pthread_cond_signal(&ctx->queuePushCond);
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 job.function(job.opaque);
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
/* If the intended queue size was 0, signal after finishing job */
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 ZSTD_pthread_mutex_lock(&ctx->queueMutex);
ctx->numThreadsBusy--;
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 if (ctx->queueSize == 1) {
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 }
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 } /* for (;;) */
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 assert(0); /* Unreachable */
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 }
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
ZSTD_customMem customMem) {
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 POOL_ctx* ctx;
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 /* Check parameters */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 if (!numThreads) { return NULL; }
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Allocate the context and zero initialize */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 if (!ctx) { return NULL; }
/* Initialize the job queue.
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 * It needs one extra space since one space is wasted to differentiate
* empty and full queues.
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 */
ctx->queueSize = queueSize + 1;
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 ctx->queueHead = 0;
ctx->queueTail = 0;
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ctx->numThreadsBusy = 0;
ctx->queueEmpty = 1;
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 {
int error = 0;
error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
if (error) { POOL_free(ctx); return NULL; }
}
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 ctx->shutdown = 0;
/* Allocate space for the thread handles */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 ctx->threadCapacity = 0;
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ctx->customMem = customMem;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Check for errors */
if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
/* Initialize the threads */
{ size_t i;
for (i = 0; i < numThreads; ++i) {
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 ctx->threadCapacity = i;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 POOL_free(ctx);
return NULL;
} }
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 ctx->threadCapacity = numThreads;
ctx->threadLimit = numThreads;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 }
return ctx;
}
/*! POOL_join() :
Shutdown the queue, wake any sleeping threads, and join all of the threads.
*/
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 static void POOL_join(POOL_ctx* ctx) {
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Shut down the queue */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_mutex_lock(&ctx->queueMutex);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 ctx->shutdown = 1;
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Wake up sleeping threads */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* Join all of the threads */
{ size_t i;
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 for (i = 0; i < ctx->threadCapacity; ++i) {
ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 } }
}
void POOL_free(POOL_ctx *ctx) {
if (!ctx) { return; }
POOL_join(ctx);
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
ZSTD_free(ctx->queue, ctx->customMem);
ZSTD_free(ctx->threads, ctx->customMem);
ZSTD_free(ctx, ctx->customMem);
}
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 size_t POOL_sizeof(POOL_ctx *ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */
return sizeof(*ctx)
+ ctx->queueSize * sizeof(POOL_job)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 + ctx->threadCapacity * sizeof(ZSTD_pthread_t);
}
/* @return : 0 on success, 1 on error */
static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
if (numThreads <= ctx->threadCapacity) {
if (!numThreads) return 1;
ctx->threadLimit = numThreads;
return 0;
}
/* numThreads > threadCapacity */
{ ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
if (!threadPool) return 1;
/* replace existing thread pool */
memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
ZSTD_free(ctx->threads, ctx->customMem);
ctx->threads = threadPool;
/* Initialize additional threads */
{ size_t threadId;
for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
ctx->threadCapacity = threadId;
return 1;
} }
} }
/* successfully expanded */
ctx->threadCapacity = numThreads;
ctx->threadLimit = numThreads;
return 0;
}
/* @return : 0 on success, 1 on error */
int POOL_resize(POOL_ctx* ctx, size_t numThreads)
{
int result;
if (ctx==NULL) return 1;
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
result = POOL_resize_internal(ctx, numThreads);
ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return result;
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 }
/**
* Returns 1 if the queue is full and 0 otherwise.
*
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 * When queueSize is 1 (pool was created with an intended queueSize of 0),
* then a queue is empty if there is a thread free _and_ no job is waiting.
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 */
static int isQueueFull(POOL_ctx const* ctx) {
if (ctx->queueSize > 1) {
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
} else {
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 return (ctx->numThreadsBusy == ctx->threadLimit) ||
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 !ctx->queueEmpty;
}
}
static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{
POOL_job const job = {function, opaque};
assert(ctx != NULL);
if (ctx->shutdown) return;
ctx->queueEmpty = 0;
ctx->queue[ctx->queueTail] = job;
ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
ZSTD_pthread_cond_signal(&ctx->queuePopCond);
}
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
{
assert(ctx != NULL);
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
/* Wait until there is space in the queue for the new job */
while (isQueueFull(ctx) && (!ctx->shutdown)) {
ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
}
POOL_add_internal(ctx, function, opaque);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 }
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
{
assert(ctx != NULL);
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
if (isQueueFull(ctx)) {
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return 0;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 }
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 POOL_add_internal(ctx, function, opaque);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return 1;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 }
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 #else /* ZSTD_MULTITHREAD not defined */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
/* ========================== */
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 /* No multi-threading support */
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 /* ========================== */
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
/* We don't need any data, but if it is empty, malloc() might return NULL. */
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 struct POOL_ctx_s {
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 int dummy;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 };
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 static POOL_ctx g_ctx;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 }
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
(void)numThreads;
(void)queueSize;
(void)customMem;
return &g_ctx;
}
void POOL_free(POOL_ctx* ctx) {
assert(!ctx || ctx == &g_ctx);
(void)ctx;
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 }
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
(void)ctx; (void)numThreads;
return 0;
}
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
(void)ctx;
function(opaque);
}
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
(void)ctx;
function(opaque);
return 1;
}
size_t POOL_sizeof(POOL_ctx* ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */
assert(ctx == &g_ctx);
return sizeof(*ctx);
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 }
#endif /* ZSTD_MULTITHREAD */