#if defined(_MSC_VER)
# pragma warning(disable : 4204)
#endif
#include "../common/allocations.h"
#include "../common/zstd_deps.h"
#include "../common/mem.h"
#include "../common/pool.h"
#include "../common/threading.h"
#include "zstd_compress_internal.h"
#include "zstd_ldm.h"
#include "zstdmt_compress.h"
#define ZSTD_RESIZE_SEQPOOL …
#if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \
&& !defined(_MSC_VER) \
&& !defined(__MINGW32__)
# include <stdio.h>
# include <unistd.h>
# include <sys/times.h>
#define DEBUG_PRINTHEX …
/* GetCurrentClockTimeMicroseconds() :
 * Debug-only helper : converts the tick count returned by times()
 * into microseconds.
 * The tick rate comes from sysconf(_SC_CLK_TCK); it is cached in a
 * function-local static and queried again whenever the cached value
 * is not strictly positive.
 * @return : (ticks * 1000000) / ticksPerSecond
 */
static unsigned long long GetCurrentClockTimeMicroseconds(void)
{
    static clock_t ticksPerSec = 0;
    struct tms unusedTimes;   /* times() requires an output struct; its content is ignored */
    clock_t nowTicks;

    if (ticksPerSec <= 0) {
        ticksPerSec = sysconf(_SC_CLK_TCK);
    }

    nowTicks = (clock_t) times(&unusedTimes);
    return ((unsigned long long)nowTicks * 1000000) / ticksPerSec;
}
#define MUTEX_WAIT_TIME_DLEVEL …
#define ZSTD_PTHREAD_MUTEX_LOCK …
#else
#define ZSTD_PTHREAD_MUTEX_LOCK(m) …
#define DEBUG_PRINTHEX(l,p,n) …
#endif
buffer_t;
static const buffer_t g_nullBuffer = …;
ZSTDMT_bufferPool;
static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
{ … }
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)
{ … }
static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
{ … }
static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)
{ … }
static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, unsigned maxNbBuffers)
{ … }
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
{ … }
#if ZSTD_RESIZE_SEQPOOL
/** ZSTDMT_resizeBuffer() :
 *  Returns a buffer whose capacity is at least bufPool->bufferSize.
 *  - If `buffer` is already large enough, it is returned unchanged.
 *  - Otherwise, a new buffer of bufPool->bufferSize bytes is allocated with
 *    the pool's custom allocator, the first buffer.capacity bytes of `buffer`
 *    are copied into it, and the new buffer is returned.
 *  - If that allocation fails, `buffer` is returned unchanged
 *    (its capacity is then still smaller than requested).
 *  assumption : bufPool is a valid pointer (it is dereferenced unconditionally).
 *  NOTE(review): when a new buffer is returned, the previous one is not
 *  released here; presumably the caller stays responsible for it - confirm
 *  against callers.
 */
static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
{
    size_t const bSize = bufPool->bufferSize;
    if (buffer.capacity < bSize) {
        void* const start = ZSTD_customMalloc(bSize, bufPool->cMem);
        if (start != NULL) {
            buffer_t newBuffer;
            newBuffer.start = start;
            newBuffer.capacity = bSize;
            assert(newBuffer.capacity >= buffer.capacity);
            /* An empty source buffer may carry a NULL start pointer :
             * passing NULL to memcpy is undefined even for a length of 0,
             * so only copy when there is actual content to preserve. */
            if (buffer.start != NULL && buffer.capacity > 0) {
                ZSTD_memcpy(newBuffer.start, buffer.start, buffer.capacity);
            }
            DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize);
            return newBuffer;
        }
        DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!");
    }
    /* already large enough, or allocation failed : hand back the input as-is */
    return buffer;
}
#endif
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{ … }
#define BUF_POOL_MAX_NB_BUFFERS(nbWorkers) …
#define SEQ_POOL_MAX_NB_BUFFERS(nbWorkers) …
ZSTDMT_seqPool;
static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool)
{ … }
static rawSeqStore_t bufferToSeq(buffer_t buffer)
{ … }
static buffer_t seqToBuffer(rawSeqStore_t seq)
{ … }
static rawSeqStore_t ZSTDMT_getSeq(ZSTDMT_seqPool* seqPool)
{ … }
#if ZSTD_RESIZE_SEQPOOL
/** ZSTDMT_resizeSeq() :
 *  Sequence-store wrapper around ZSTDMT_resizeBuffer() :
 *  converts `seq` to a buffer_t, resizes it through `seqPool`,
 *  then converts the resulting buffer back to a rawSeqStore_t.
 */
static rawSeqStore_t ZSTDMT_resizeSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
{
    buffer_t const seqAsBuffer = seqToBuffer(seq);
    buffer_t const resizedBuffer = ZSTDMT_resizeBuffer(seqPool, seqAsBuffer);
    return bufferToSeq(resizedBuffer);
}
#endif
static void ZSTDMT_releaseSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
{ … }
static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
{ … }
static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
{ … }
static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
{ … }
static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
{ … }
ZSTDMT_CCtxPool;
static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
{ … }
static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(int nbWorkers,
ZSTD_customMem cMem)
{ … }
static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
int nbWorkers)
{ … }
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{ … }
static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
{ … }
static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
{ … }
range_t;
serialState_t;
static int
ZSTDMT_serialState_reset(serialState_t* serialState,
ZSTDMT_seqPool* seqPool,
ZSTD_CCtx_params params,
size_t jobSize,
const void* dict, size_t const dictSize,
ZSTD_dictContentType_e dictContentType)
{ … }
static int ZSTDMT_serialState_init(serialState_t* serialState)
{ … }
static void ZSTDMT_serialState_free(serialState_t* serialState)
{ … }
static void ZSTDMT_serialState_update(serialState_t* serialState,
ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore,
range_t src, unsigned jobID)
{ … }
static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
unsigned jobID, size_t cSize)
{ … }
static const range_t kNullRange = …;
ZSTDMT_jobDescription;
#define JOB_ERROR(e) …
static void ZSTDMT_compressionJob(void* jobDescription)
{ … }
inBuff_t;
roundBuff_t;
static const roundBuff_t kNullRoundBuff = …;
#define RSYNC_LENGTH …
#define RSYNC_MIN_BLOCK_LOG …
#define RSYNC_MIN_BLOCK_SIZE …
rsyncState_t;
struct ZSTDMT_CCtx_s { … };
static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem)
{ … }
static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{ … }
static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) { … }
static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
{ … }
MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
{ … }
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
{ … }
static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
{ … }
static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
{ … }
size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{ … }
size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
{ … }
static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
{ … }
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)
{ … }
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
{ … }
size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
{ … }
static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params)
{ … }
static int ZSTDMT_overlapLog_default(ZSTD_strategy strat)
{ … }
static int ZSTDMT_overlapLog(int ovlog, ZSTD_strategy strat)
{ … }
static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)
{ … }
size_t ZSTDMT_initCStream_internal(
ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize)
{ … }
static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{ … }
static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
{ … }
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
{ … }
static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)
{ … }
static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range)
{ … }
static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
{ … }
static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
{ … }
static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
{ … }
syncPoint_t;
static syncPoint_t
findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
{ … }
size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
{ … }
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
ZSTD_outBuffer* output,
ZSTD_inBuffer* input,
ZSTD_EndDirective endOp)
{ … }