chromium/third_party/zstd/src/lib/compress/zstdmt_compress.c

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */


/* ======   Compiler specifics   ====== */
#if defined(_MSC_VER)
#  pragma warning(disable : 4204)   /* disable: C4204: non-constant aggregate initializer */
#endif


/* ======   Dependencies   ====== */
#include "../common/allocations.h" /* ZSTD_customMalloc, ZSTD_customCalloc, ZSTD_customFree */
#include "../common/zstd_deps.h"   /* ZSTD_memcpy, ZSTD_memset, INT_MAX, UINT_MAX */
#include "../common/mem.h"         /* MEM_STATIC */
#include "../common/pool.h"        /* threadpool */
#include "../common/threading.h"   /* mutex */
#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstd_ldm.h"
#include "zstdmt_compress.h"

/* Guards code to support resizing the SeqPool.
 * We will want to resize the SeqPool to save memory in the future.
 * Until then, comment the code out since it is unused.
 */
#define ZSTD_RESIZE_SEQPOOL

/* ======   Debug   ====== */
#if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \
    && !defined(_MSC_VER) \
    && !defined(__MINGW32__)

#  include <stdio.h>
#  include <unistd.h>
#  include <sys/times.h>

#define DEBUG_PRINTHEX

static unsigned long long GetCurrentClockTimeMicroseconds(void)
{
   static clock_t _ticksPerSecond = 0;
   if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);

   {   struct tms junk; clock_t newTicks = (clock_t) times(&junk);
       return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);
}  }

#define MUTEX_WAIT_TIME_DLEVEL
#define ZSTD_PTHREAD_MUTEX_LOCK

#else

#define ZSTD_PTHREAD_MUTEX_LOCK(m)
#define DEBUG_PRINTHEX(l,p,n)

#endif


/* =====   Buffer Pool   ===== */
/* a single Buffer Pool can be invoked from multiple threads in parallel */

buffer_t;

static const buffer_t g_nullBuffer =;

ZSTDMT_bufferPool;

static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
{}

static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)
{}

/* only works at initialization, not during compression */
static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
{}

/* ZSTDMT_setBufferSize() :
 * all future buffers provided by this buffer pool will have _at least_ this size
 * note : it's better for all buffers to have same size,
 * as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */
static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)
{}


static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, unsigned maxNbBuffers)
{}

/** ZSTDMT_getBuffer() :
 *  assumption : bufPool must be valid
 * @return : a buffer, with start pointer and size
 *  note: allocation may fail, in this case, start==NULL and size==0 */
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
{}

#if ZSTD_RESIZE_SEQPOOL
/** ZSTDMT_resizeBuffer() :
 * assumption : bufPool must be valid
 * @return : a buffer that is at least the buffer pool buffer size.
 *           If a reallocation happens, the data in the input buffer is copied.
 */
static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
{
    size_t const bSize = bufPool->bufferSize;
    if (buffer.capacity < bSize) {
        void* const start = ZSTD_customMalloc(bSize, bufPool->cMem);
        buffer_t newBuffer;
        newBuffer.start = start;
        newBuffer.capacity = start == NULL ? 0 : bSize;
        if (start != NULL) {
            assert(newBuffer.capacity >= buffer.capacity);
            ZSTD_memcpy(newBuffer.start, buffer.start, buffer.capacity);
            DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize);
            return newBuffer;
        }
        DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!");
    }
    return buffer;
}
#endif

/* store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{}

/* We need 2 output buffers per worker since each dstBuff must be flushed after it is released.
 * The 3 additional buffers are as follows:
 *   1 buffer for input loading
 *   1 buffer for "next input" when submitting current one
 *   1 buffer stuck in queue */
#define BUF_POOL_MAX_NB_BUFFERS(nbWorkers)

/* After a worker releases its rawSeqStore, it is immediately ready for reuse.
 * So we only need one seq buffer per worker. */
#define SEQ_POOL_MAX_NB_BUFFERS(nbWorkers)

/* =====   Seq Pool Wrapper   ====== */

ZSTDMT_seqPool;

static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool)
{}

static rawSeqStore_t bufferToSeq(buffer_t buffer)
{}

static buffer_t seqToBuffer(rawSeqStore_t seq)
{}

static rawSeqStore_t ZSTDMT_getSeq(ZSTDMT_seqPool* seqPool)
{}

#if ZSTD_RESIZE_SEQPOOL
static rawSeqStore_t ZSTDMT_resizeSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
{
  return bufferToSeq(ZSTDMT_resizeBuffer(seqPool, seqToBuffer(seq)));
}
#endif

static void ZSTDMT_releaseSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
{}

static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
{}

static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
{}

static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
{}

static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
{}


/* =====   CCtx Pool   ===== */
/* a single CCtx Pool can be invoked from multiple threads in parallel */

ZSTDMT_CCtxPool;

/* note : all CCtx borrowed from the pool must be reverted back to the pool _before_ freeing the pool */
static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
{}

/* ZSTDMT_createCCtxPool() :
 * implies nbWorkers >= 1 , checked by caller ZSTDMT_createCCtx() */
static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(int nbWorkers,
                                              ZSTD_customMem cMem)
{}

static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
                                              int nbWorkers)
{}

/* only works during initialization phase, not during compression */
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{}

static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
{}

static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
{}

/* ====   Serial State   ==== */

range_t;

serialState_t;

static int
ZSTDMT_serialState_reset(serialState_t* serialState,
                         ZSTDMT_seqPool* seqPool,
                         ZSTD_CCtx_params params,
                         size_t jobSize,
                         const void* dict, size_t const dictSize,
                         ZSTD_dictContentType_e dictContentType)
{}

static int ZSTDMT_serialState_init(serialState_t* serialState)
{}

static void ZSTDMT_serialState_free(serialState_t* serialState)
{}

static void ZSTDMT_serialState_update(serialState_t* serialState,
                                      ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore,
                                      range_t src, unsigned jobID)
{}

static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
                                              unsigned jobID, size_t cSize)
{}


/* ------------------------------------------ */
/* =====          Worker thread         ===== */
/* ------------------------------------------ */

static const range_t kNullRange =;

ZSTDMT_jobDescription;

#define JOB_ERROR(e)

/* ZSTDMT_compressionJob() is a POOL_function type */
static void ZSTDMT_compressionJob(void* jobDescription)
{}


/* ------------------------------------------ */
/* =====   Multi-threaded compression   ===== */
/* ------------------------------------------ */

inBuff_t;

roundBuff_t;

static const roundBuff_t kNullRoundBuff =;

#define RSYNC_LENGTH
/* Don't create chunks smaller than the zstd block size.
 * This stops us from regressing compression ratio too much,
 * and ensures our output fits in ZSTD_compressBound().
 *
 * If this is shrunk < ZSTD_BLOCKSIZELOG_MIN then
 * ZSTD_COMPRESSBOUND() will need to be updated.
 */
#define RSYNC_MIN_BLOCK_LOG
#define RSYNC_MIN_BLOCK_SIZE

rsyncState_t;

struct ZSTDMT_CCtx_s {};

static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem)
{}

/* ZSTDMT_allocJobsTable()
 * allocate and init a job table.
 * update *nbJobsPtr to next power of 2 value, as size of table */
static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{}

static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {}


/* ZSTDMT_CCtxParam_setNbWorkers():
 * Internal use only */
static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
{}

MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
{}

ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
{}


/* ZSTDMT_releaseAllJobResources() :
 * note : ensure all workers are killed first ! */
static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
{}

static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
{}

size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{}

size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
{}


/* ZSTDMT_resize() :
 * @return : error code if fails, 0 on success */
static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
{}


/*! ZSTDMT_updateCParams_whileCompressing() :
 *  Updates a selected set of compression parameters, remaining compatible with currently active frame.
 *  New parameters will be applied to next compression job. */
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)
{}

/* ZSTDMT_getFrameProgression():
 * tells how much data has been consumed (input) and produced (output) for current frame.
 * able to count progression inside worker threads.
 * Note : mutex will be acquired during statistics collection inside workers. */
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
{}


size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
{}


/* ------------------------------------------ */
/* =====   Multi-threaded compression   ===== */
/* ------------------------------------------ */

static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params)
{}

static int ZSTDMT_overlapLog_default(ZSTD_strategy strat)
{}

static int ZSTDMT_overlapLog(int ovlog, ZSTD_strategy strat)
{}

static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)
{}

/* ====================================== */
/* =======      Streaming API     ======= */
/* ====================================== */

size_t ZSTDMT_initCStream_internal(
        ZSTDMT_CCtx* mtctx,
        const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
        const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
        unsigned long long pledgedSrcSize)
{}


/* ZSTDMT_writeLastEmptyBlock()
 * Write a single empty block with an end-of-frame to finish a frame.
 * Job must be created from streaming variant.
 * This function is always successful if expected conditions are fulfilled.
 */
static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{}

static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
{}


/*! ZSTDMT_flushProduced() :
 *  flush whatever data has been produced but not yet flushed in current job.
 *  move to next job if current one is fully flushed.
 * `output` : `pos` will be updated with amount of data flushed .
 * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
 * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
{}

/**
 * Returns the range of data used by the earliest job that is not yet complete.
 * If the data of the first job is broken up into two segments, we cover both
 * sections.
 */
static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)
{}

/**
 * Returns non-zero iff buffer and range overlap.
 */
static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range)
{}

static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
{}

static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
{}

/**
 * Attempts to set the inBuff to the next section to fill.
 * If any part of the new section is still in use we give up.
 * Returns non-zero if the buffer is filled.
 */
static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
{}

syncPoint_t;

/**
 * Searches through the input for a synchronization point. If one is found, we
 * will instruct the caller to flush, and return the number of bytes to load.
 * Otherwise, we will load as many bytes as possible and instruct the caller
 * to continue as normal.
 */
static syncPoint_t
findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
{}

size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
{}

/** ZSTDMT_compressStream_generic() :
 *  internal use only - exposed to be invoked from zstd_compress.c
 *  assumption : output and input are valid (pos <= size)
 * @return : minimum amount of data remaining to flush, 0 if none */
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                     ZSTD_outBuffer* output,
                                     ZSTD_inBuffer* input,
                                     ZSTD_EndDirective endOp)
{}