##// END OF EJS Templates
perf: add command to benchmark bundle reading...
perf: add command to benchmark bundle reading Upcoming commits will be refactoring bundle2 I/O code. This commit establishes a `hg perfbundleread` command that measures how long it takes to read a bundle using various mechanisms. As a baseline, here's output from an uncompressed bundle1 bundle of my Firefox repo (7,098,622,890 bytes): ! read(8k) ! wall 0.763481 comb 0.760000 user 0.160000 sys 0.600000 (best of 6) ! read(16k) ! wall 0.644512 comb 0.640000 user 0.110000 sys 0.530000 (best of 16) ! read(32k) ! wall 0.581172 comb 0.590000 user 0.060000 sys 0.530000 (best of 18) ! read(128k) ! wall 0.535183 comb 0.530000 user 0.010000 sys 0.520000 (best of 19) ! cg1 deltaiter() ! wall 0.873500 comb 0.880000 user 0.840000 sys 0.040000 (best of 12) ! cg1 getchunks() ! wall 6.283797 comb 6.270000 user 5.570000 sys 0.700000 (best of 3) ! cg1 read(8k) ! wall 1.097173 comb 1.100000 user 0.400000 sys 0.700000 (best of 10) ! cg1 read(16k) ! wall 0.810750 comb 0.800000 user 0.200000 sys 0.600000 (best of 13) ! cg1 read(32k) ! wall 0.671215 comb 0.670000 user 0.110000 sys 0.560000 (best of 15) ! cg1 read(128k) ! wall 0.597857 comb 0.600000 user 0.020000 sys 0.580000 (best of 15) And from an uncompressed bundle2 bundle (6,070,036,163 bytes): ! read(8k) ! wall 0.676997 comb 0.680000 user 0.160000 sys 0.520000 (best of 15) ! read(16k) ! wall 0.592706 comb 0.590000 user 0.080000 sys 0.510000 (best of 17) ! read(32k) ! wall 0.529395 comb 0.530000 user 0.050000 sys 0.480000 (best of 16) ! read(128k) ! wall 0.491270 comb 0.490000 user 0.010000 sys 0.480000 (best of 19) ! bundle2 forwardchunks() ! wall 2.997131 comb 2.990000 user 2.270000 sys 0.720000 (best of 4) ! bundle2 iterparts() ! wall 12.247197 comb 10.670000 user 8.170000 sys 2.500000 (best of 3) ! bundle2 part seek() ! wall 11.761675 comb 10.500000 user 8.240000 sys 2.260000 (best of 3) ! bundle2 part read(8k) ! wall 9.116163 comb 9.110000 user 8.240000 sys 0.870000 (best of 3) ! bundle2 part read(16k) ! 
wall 8.984362 comb 8.970000 user 8.110000 sys 0.860000 (best of 3) ! bundle2 part read(32k) ! wall 8.758364 comb 8.740000 user 7.860000 sys 0.880000 (best of 3) ! bundle2 part read(128k) ! wall 8.749040 comb 8.730000 user 7.830000 sys 0.900000 (best of 3) We already see some interesting data. Notably, bundle2 has significant overhead compared to bundle1. This matters for e.g. stream clone bundles, which can be applied at >1Gbps. Differential Revision: https://phab.mercurial-scm.org/D1385

File last commit:

r30895:c32454d6 default
r35108:e9661304 default
Show More
pool.c
194 lines | 6.0 KiB | text/x-c | CLexer
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
/* ====== Dependencies ======= */
#include <stddef.h> /* size_t */
#include <stdlib.h> /* malloc, calloc, free */
#include "pool.h"
/* ====== Compiler specifics ====== */
#if defined(_MSC_VER)
# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
#endif
#ifdef ZSTD_MULTITHREAD
#include "threading.h" /* pthread adaptation */
/* A job is a function paired with the opaque argument it is called with.
 * Worker threads run a queued job as `function(opaque)`.
 */
typedef struct POOL_job_s {
/* Function to run */
POOL_function function;
/* Argument handed to `function` unchanged */
void *opaque;
} POOL_job;
/* State of a thread pool: the worker threads and the bounded job queue they
 * consume from.
 */
struct POOL_ctx_s {
/* Keep track of the threads */
/* Array of thread handles; numThreads is the number of handles that
 * POOL_join() will join. */
pthread_t *threads;
size_t numThreads;
/* The queue is a circular buffer */
/* queueSize is the number of slots in `queue`, which POOL_create() sets to
 * one more than the requested capacity.
 * Jobs are popped at queueHead and pushed at queueTail.
 * queueHead == queueTail means empty;
 * (queueTail + 1) % queueSize == queueHead means full. */
POOL_job *queue;
size_t queueHead;
size_t queueTail;
size_t queueSize;
/* The mutex protects the queue */
pthread_mutex_t queueMutex;
/* Condition variable for pushers to wait on when the queue is full */
/* Signalled by a worker each time it pops a job */
pthread_cond_t queuePushCond;
/* Condition variables for poppers to wait on when the queue is empty */
/* Signalled by POOL_add() after each call */
pthread_cond_t queuePopCond;
/* Indicates if the queue is shutting down */
/* Set to 1 by POOL_join(); workers exit once it is set and the queue is
 * empty, and POOL_add() stops accepting jobs. */
int shutdown;
};
/* POOL_thread() :
Work thread for the thread pool.
Waits for jobs and executes them.
@returns : NULL on failure else non-null.
*/
static void* POOL_thread(void* opaque) {
POOL_ctx* const ctx = (POOL_ctx*)opaque;
if (!ctx) { return NULL; }
for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
pthread_mutex_lock(&ctx->queueMutex);
while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
/* empty => shutting down: so stop */
if (ctx->queueHead == ctx->queueTail) {
pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
/* Pop a job off the queue */
{ POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
/* Unlock the mutex, signal a pusher, and run the job */
pthread_mutex_unlock(&ctx->queueMutex);
pthread_cond_signal(&ctx->queuePushCond);
job.function(job.opaque);
}
}
/* Unreachable */
}
/*! POOL_create() :
    Create a thread pool with `numThreads` worker threads and a job queue able
    to hold `queueSize` pending jobs.
    @return : the new pool (release it with POOL_free()), or NULL when a
              parameter is zero, when a size is too large to be allocated
              without wrapping around size_t, when an allocation fails, or
              when a thread cannot be created.
*/
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
    size_t const sizeMax = (size_t)-1;
    POOL_ctx *ctx;
    /* Check the parameters */
    if (!numThreads || !queueSize) { return NULL; }
    /* Refuse sizes whose byte counts would wrap around size_t.
     * The queue needs (queueSize + 1) jobs and the handle array needs
     * numThreads entries; a wrapped product would yield an undersized (or
     * zero-length) buffer that is later indexed as if it were full size.
     */
    if (queueSize > sizeMax / sizeof(POOL_job) - 1) { return NULL; }
    if (numThreads > sizeMax / sizeof(pthread_t)) { return NULL; }
    /* Allocate the context and zero initialize */
    ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx));
    if (!ctx) { return NULL; }
    /* Initialize the job queue.
     * It needs one extra space since one space is wasted to differentiate empty
     * and full queues.
     */
    ctx->queueSize = queueSize + 1;
    ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job));
    ctx->queueHead = 0;
    ctx->queueTail = 0;
    /* NOTE(review): the results of these initializers are not checked;
     * whether they can be depends on the adapters in threading.h - confirm.
     */
    pthread_mutex_init(&ctx->queueMutex, NULL);
    pthread_cond_init(&ctx->queuePushCond, NULL);
    pthread_cond_init(&ctx->queuePopCond, NULL);
    ctx->shutdown = 0;
    /* Allocate space for the thread handles */
    ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
    /* No thread exists yet, so POOL_free() below has nothing to join */
    ctx->numThreads = 0;
    /* Check for errors */
    if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
    /* Initialize the threads */
    {   size_t i;
        for (i = 0; i < numThreads; ++i) {
            if (pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
                /* Only the first i threads exist: join just those */
                ctx->numThreads = i;
                POOL_free(ctx);
                return NULL;
        }   }
        ctx->numThreads = numThreads;
    }
    return ctx;
}
/*! POOL_join() :
    Mark the pool as shutting down, wake every thread blocked on either
    condition variable, and wait for all worker threads to finish.
*/
static void POOL_join(POOL_ctx *ctx) {
    size_t t = 0;
    /* Raise the shutdown flag while holding the queue mutex */
    pthread_mutex_lock(&ctx->queueMutex);
    ctx->shutdown = 1;
    pthread_mutex_unlock(&ctx->queueMutex);
    /* Release blocked pushers and idle workers so they observe the flag */
    pthread_cond_broadcast(&ctx->queuePushCond);
    pthread_cond_broadcast(&ctx->queuePopCond);
    /* Wait for each worker in turn */
    while (t < ctx->numThreads) {
        pthread_join(ctx->threads[t], NULL);
        ++t;
    }
}
/*! POOL_free() :
    Shut the pool down, join its threads, and release everything it owns.
    Passing NULL is a no-op.
*/
void POOL_free(POOL_ctx *ctx) {
    if (ctx == NULL) { return; }
    /* Stop the workers before tearing down what they use */
    POOL_join(ctx);
    pthread_cond_destroy(&ctx->queuePopCond);
    pthread_cond_destroy(&ctx->queuePushCond);
    pthread_mutex_destroy(&ctx->queueMutex);
    /* free(NULL) does nothing, so arrays that were never allocated are fine */
    free(ctx->threads);
    free(ctx->queue);
    free(ctx);
}
/*! POOL_add() :
    Queue `function(opaque)` for execution by a worker thread.
    Blocks while the queue is full. If the pool is shutting down, the job is
    dropped without being run. A NULL `ctxVoid` is ignored.
*/
void POOL_add(void *ctxVoid, POOL_function function, void *opaque) {
    POOL_ctx *const pool = (POOL_ctx *)ctxVoid;
    if (pool == NULL) { return; }
    pthread_mutex_lock(&pool->queueMutex);
    /* Wait while the ring is full, unless the pool starts shutting down.
     * Full means that advancing the tail would make it meet the head.
     */
    while (!pool->shutdown
        && (pool->queueTail + 1) % pool->queueSize == pool->queueHead) {
        pthread_cond_wait(&pool->queuePushCond, &pool->queueMutex);
    }
    /* Not shutting down => the wait ended because a slot is free */
    if (!pool->shutdown) {
        POOL_job *const slot = &pool->queue[pool->queueTail];
        slot->function = function;
        slot->opaque = opaque;
        pool->queueTail = (pool->queueTail + 1) % pool->queueSize;
    }
    pthread_mutex_unlock(&pool->queueMutex);
    /* Wake one worker to pick the job up */
    pthread_cond_signal(&pool->queuePopCond);
}
#else /* ZSTD_MULTITHREAD not defined */
/* No multi-threading support */
/* No state is needed when jobs run directly in the caller. The dummy member
 * only gives the struct a non-zero size for POOL_create() to allocate:
 * malloc() may return NULL for a zero-size request.
 */
struct POOL_ctx_s {
int data;
};
/*! POOL_create() :
    Single-threaded variant: allocate a placeholder context.
    Both parameters are ignored.
    @return : the context, or NULL if the allocation fails.
*/
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
    POOL_ctx *const ctx = (POOL_ctx *)malloc(sizeof(POOL_ctx));
    (void)queueSize;
    (void)numThreads;
    return ctx;
}
/*! POOL_free() :
    Single-threaded variant: release the placeholder context.
    Passing NULL is a no-op, since free(NULL) does nothing.
*/
void POOL_free(POOL_ctx *ctx) {
    free(ctx);
}
/*! POOL_add() :
    Single-threaded variant: run `function(opaque)` immediately in the calling
    thread. `ctx` is not used.
*/
void POOL_add(void *ctx, POOL_function function, void *opaque) {
    (void)ctx;
    (*function)(opaque);
}
#endif /* ZSTD_MULTITHREAD */