linux/drivers/md/dm-vdo/indexer/index.c

// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */


#include "index.h"

#include "logger.h"
#include "memory-alloc.h"

#include "funnel-requestqueue.h"
#include "hash-utils.h"
#include "sparse-cache.h"

static const u64 NO_LAST_SAVE = U64_MAX;

/*
 * When searching for deduplication records, the index first searches the volume index, and then
 * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
 * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
 * committed (either the open chapter or a recently closed one), the index searches the in-memory
 * representation of the chapter. Finally, if the volume index does not find a record and the index
 * is sparse, the index will search the sparse cache.
 *
 * The index send two kinds of messages to coordinate between zones: chapter close messages for the
 * chapter writer, and sparse cache barrier messages for the sparse cache.
 *
 * The chapter writer is responsible for committing chapters of records to storage. Since zones can
 * get different numbers of records, some zones may fall behind others. Each time a zone fills up
 * its available space in a chapter, it informs the chapter writer that the chapter is complete,
 * and also informs all other zones that it has closed the chapter. Each other zone will then close
 * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
 * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
 *
 * The last zone to close the chapter also removes the oldest chapter from the volume index.
 * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
 * means that those zones will never ask the volume index about it. No zone is allowed to get more
 * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
 * chapter before the previous one has been closed by all zones, it is forced to wait.
 *
 * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
 * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
 * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
 * paused its operations, the cache membership is changed and each zone is then informed that it
 * can proceed. More details can be found in the sparse cache documentation.
 *
 * If a sparse cache has only one zone, it will not create a triage queue, but it still needs the
 * barrier message to change the sparse cache membership, so the index simulates the message by
 * invoking the handler directly.
 */

struct chapter_writer {
	/* The index to which we belong */
	struct uds_index *index;
	/* The thread to do the writing */
	struct thread *thread;
	/* The lock protecting the following fields */
	struct mutex mutex;
	/* The condition signalled on state changes */
	struct cond_var cond;
	/* Set to true to stop the thread */
	bool stop;
	/* The result from the most recent write */
	int result;
	/* The number of bytes allocated by the chapter writer */
	size_t memory_size;
	/* The number of zones which have submitted a chapter for writing */
	unsigned int zones_to_write;
	/* Open chapter index used by uds_close_open_chapter() */
	struct open_chapter_index *open_chapter_index;
	/* Collated records used by uds_close_open_chapter() */
	struct uds_volume_record *collated_records;
	/* The chapters to write (one per zone) */
	struct open_chapter_zone *chapters[];
};

static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
{
	return uds_is_chapter_sparse(zone->index->volume->geometry,
				     zone->oldest_virtual_chapter,
				     zone->newest_virtual_chapter, virtual_chapter);
}

static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
			       struct uds_index *index)
{
	int result;
	struct uds_request *request;

	result = vdo_allocate(1, struct uds_request, __func__, &request);
	if (result != VDO_SUCCESS)
		return result;

	request->index = index;
	request->unbatched = true;
	request->zone_number = zone;
	request->zone_message = message;

	uds_enqueue_request(request, STAGE_MESSAGE);
	return UDS_SUCCESS;
}

static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
{
	struct uds_zone_message message = {
		.type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
		.virtual_chapter = virtual_chapter,
	};
	unsigned int zone;

	for (zone = 0; zone < index->zone_count; zone++) {
		int result = launch_zone_message(message, zone, index);

		VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
	}
}

/*
 * Determine whether this request should trigger a sparse cache barrier message to change the
 * membership of the sparse cache. If a change in membership is desired, the function returns the
 * chapter number to add.
 */
static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
{
	u64 virtual_chapter;
	struct index_zone *zone;

	virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
						       &request->record_name);
	if (virtual_chapter == NO_CHAPTER)
		return NO_CHAPTER;

	zone = index->zones[request->zone_number];
	if (!is_zone_chapter_sparse(zone, virtual_chapter))
		return NO_CHAPTER;

	/*
	 * FIXME: Optimize for a common case by remembering the chapter from the most recent
	 * barrier message and skipping this chapter if is it the same.
	 */

	return virtual_chapter;
}

/*
 * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
 * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
 * of index does nothing here.
 */
static int simulate_index_zone_barrier_message(struct index_zone *zone,
					       struct uds_request *request)
{
	u64 sparse_virtual_chapter;

	if ((zone->index->zone_count > 1) ||
	    !uds_is_sparse_index_geometry(zone->index->volume->geometry))
		return UDS_SUCCESS;

	sparse_virtual_chapter = triage_index_request(zone->index, request);
	if (sparse_virtual_chapter == NO_CHAPTER)
		return UDS_SUCCESS;

	return uds_update_sparse_cache(zone, sparse_virtual_chapter);
}

/* This is the request processing function for the triage queue. */
static void triage_request(struct uds_request *request)
{
	struct uds_index *index = request->index;
	u64 sparse_virtual_chapter = triage_index_request(index, request);

	if (sparse_virtual_chapter != NO_CHAPTER)
		enqueue_barrier_messages(index, sparse_virtual_chapter);

	uds_enqueue_request(request, STAGE_INDEX);
}

static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
{
	int result;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (index->newest_virtual_chapter < current_chapter_number)
		uds_wait_cond(&writer->cond, &writer->mutex);
	result = writer->result;
	mutex_unlock(&writer->mutex);

	if (result != UDS_SUCCESS)
		return vdo_log_error_strerror(result,
					      "Writing of previous open chapter failed");

	return UDS_SUCCESS;
}

static int swap_open_chapter(struct index_zone *zone)
{
	int result;

	result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
	if (result != UDS_SUCCESS)
		return result;

	swap(zone->open_chapter, zone->writing_chapter);
	return UDS_SUCCESS;
}

/*
 * Inform the chapter writer that this zone is done with this chapter. The chapter won't start
 * writing until all zones have closed it.
 */
static unsigned int start_closing_chapter(struct uds_index *index,
					  unsigned int zone_number,
					  struct open_chapter_zone *chapter)
{
	unsigned int finished_zones;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	finished_zones = ++writer->zones_to_write;
	writer->chapters[zone_number] = chapter;
	uds_broadcast_cond(&writer->cond);
	mutex_unlock(&writer->mutex);

	return finished_zones;
}

static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
{
	int result;
	unsigned int i;
	struct uds_zone_message zone_message = {
		.type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
		.virtual_chapter = closed_chapter,
	};

	for (i = 0; i < zone->index->zone_count; i++) {
		if (zone->id == i)
			continue;

		result = launch_zone_message(zone_message, i, zone->index);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

static int open_next_chapter(struct index_zone *zone)
{
	int result;
	u64 closed_chapter;
	u64 expiring;
	unsigned int finished_zones;
	u32 expire_chapters;

	vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
		      (unsigned long long) zone->newest_virtual_chapter, zone->id,
		      zone->open_chapter->size,
		      zone->open_chapter->capacity - zone->open_chapter->size);

	result = swap_open_chapter(zone);
	if (result != UDS_SUCCESS)
		return result;

	closed_chapter = zone->newest_virtual_chapter++;
	uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
					       zone->newest_virtual_chapter);
	uds_reset_open_chapter(zone->open_chapter);

	finished_zones = start_closing_chapter(zone->index, zone->id,
					       zone->writing_chapter);
	if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
		result = announce_chapter_closed(zone, closed_chapter);
		if (result != UDS_SUCCESS)
			return result;
	}

	expiring = zone->oldest_virtual_chapter;
	expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
						 zone->newest_virtual_chapter);
	zone->oldest_virtual_chapter += expire_chapters;

	if (finished_zones < zone->index->zone_count)
		return UDS_SUCCESS;

	while (expire_chapters-- > 0)
		uds_forget_chapter(zone->index->volume, expiring++);

	return UDS_SUCCESS;
}

static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
{
	if (zone->newest_virtual_chapter == virtual_chapter)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

static int dispatch_index_zone_control_request(struct uds_request *request)
{
	struct uds_zone_message *message = &request->zone_message;
	struct index_zone *zone = request->index->zones[request->zone_number];

	switch (message->type) {
	case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
		return uds_update_sparse_cache(zone, message->virtual_chapter);

	case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
		return handle_chapter_closed(zone, message->virtual_chapter);

	default:
		vdo_log_error("invalid message type: %d", message->type);
		return UDS_INVALID_ARGUMENT;
	}
}

static void set_request_location(struct uds_request *request,
				 enum uds_index_region new_location)
{
	request->location = new_location;
	request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
			  (new_location == UDS_LOCATION_IN_DENSE) ||
			  (new_location == UDS_LOCATION_IN_SPARSE));
}

static void set_chapter_location(struct uds_request *request,
				 const struct index_zone *zone, u64 virtual_chapter)
{
	request->found = true;
	if (virtual_chapter == zone->newest_virtual_chapter)
		request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
	else if (is_zone_chapter_sparse(zone, virtual_chapter))
		request->location = UDS_LOCATION_IN_SPARSE;
	else
		request->location = UDS_LOCATION_IN_DENSE;
}

static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
				       u64 virtual_chapter, bool *found)
{
	int result;
	struct volume *volume;
	u16 record_page_number;
	u32 chapter;

	result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
					 &record_page_number);
	if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
		return result;

	request->virtual_chapter = virtual_chapter;
	volume = zone->index->volume;
	chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
	return uds_search_cached_record_page(volume, request, chapter,
					     record_page_number, found);
}

static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
				bool *found)
{
	struct volume *volume;

	if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
		*found = true;
		return UDS_SUCCESS;
	} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
		*found = false;
		return UDS_SUCCESS;
	}

	if (request->virtual_chapter == zone->newest_virtual_chapter) {
		uds_search_open_chapter(zone->open_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	if ((zone->newest_virtual_chapter > 0) &&
	    (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
	    (zone->writing_chapter->size > 0)) {
		uds_search_open_chapter(zone->writing_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	volume = zone->index->volume;
	if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
	    uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
				      request->zone_number))
		return search_sparse_cache_in_zone(zone, request,
						   request->virtual_chapter, found);

	return uds_search_volume_page_cache(volume, request, found);
}

static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
			      const struct uds_record_data *metadata)
{
	unsigned int remaining;

	remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
					 metadata);
	if (remaining == 0)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

static int search_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;
	bool overflow_record, found = false;
	struct uds_record_data *metadata;
	u64 chapter;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;
	}

	if (found)
		set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * If a record has overflowed a chapter index in more than one chapter (or overflowed in
	 * one chapter and collided with an existing record), it will exist as a collision record
	 * in the volume index, but we won't find it in the volume. This case needs special
	 * handling.
	 */
	overflow_record = (record.is_found && record.is_collision && !found);
	chapter = zone->newest_virtual_chapter;
	if (found || overflow_record) {
		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && overflow_record)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		if (record.virtual_chapter != chapter) {
			/*
			 * Update the volume index to reference the new chapter for the block. If
			 * the record had been deleted or dropped from the chapter index, it will
			 * be back.
			 */
			result = uds_set_volume_index_record_chapter(&record, chapter);
		} else if (request->type != UDS_UPDATE) {
			/* The record is already in the open chapter. */
			return UDS_SUCCESS;
		}
	} else {
		/*
		 * The record wasn't in the volume index, so check whether the
		 * name is in a cached sparse chapter. If we found the name on
		 * a previous search, use that result instead.
		 */
		if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
			found = true;
		} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
			found = false;
		} else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
			   !uds_is_volume_index_sample(zone->index->volume_index,
						       &request->record_name)) {
			result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
							     &found);
			if (result != UDS_SUCCESS)
				return result;
		}

		if (found)
			set_request_location(request, UDS_LOCATION_IN_SPARSE);

		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && !found)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		/*
		 * Add a new entry to the volume index referencing the open chapter. This needs to
		 * be done both for new records, and for records from cached sparse chapters.
		 */
		result = uds_put_volume_index_record(&record, chapter);
	}

	if (result == UDS_OVERFLOW) {
		/*
		 * The volume index encountered a delta list overflow. The condition was already
		 * logged. We will go on without adding the record to the open chapter.
		 */
		return UDS_SUCCESS;
	}

	if (result != UDS_SUCCESS)
		return result;

	if (!found || (request->type == UDS_UPDATE)) {
		/* This is a new record or we're updating an existing record. */
		metadata = &request->new_metadata;
	} else {
		/* Move the existing record to the open chapter. */
		metadata = &request->old_metadata;
	}

	return put_record_in_zone(zone, request, metadata);
}

static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (!record.is_found)
		return UDS_SUCCESS;

	/* If the request was requeued, check whether the saved state is still valid. */

	if (record.is_collision) {
		set_chapter_location(request, zone, record.virtual_chapter);
	} else {
		/* Non-collision records are hints, so resolve the name in the chapter. */
		bool found;

		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;

		if (!found) {
			/* There is no record to remove. */
			return UDS_SUCCESS;
		}
	}

	set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * Delete the volume index entry for the named record only. Note that a later search might
	 * later return stale advice if there is a colliding name in the same chapter, but it's a
	 * very rare case (1 in 2^21).
	 */
	result = uds_remove_volume_index_record(&record);
	if (result != UDS_SUCCESS)
		return result;

	/*
	 * If the record is in the open chapter, we must remove it or mark it deleted to avoid
	 * trouble if the record is added again later.
	 */
	if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
		uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);

	return UDS_SUCCESS;
}

static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
{
	int result;
	struct index_zone *zone = index->zones[request->zone_number];

	if (!request->requeued) {
		result = simulate_index_zone_barrier_message(zone, request);
		if (result != UDS_SUCCESS)
			return result;
	}

	switch (request->type) {
	case UDS_POST:
	case UDS_UPDATE:
	case UDS_QUERY:
	case UDS_QUERY_NO_UPDATE:
		result = search_index_zone(zone, request);
		break;

	case UDS_DELETE:
		result = remove_from_index_zone(zone, request);
		break;

	default:
		result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
						  "invalid request type: %d",
						  request->type);
		break;
	}

	return result;
}

/* This is the request processing function invoked by each zone's thread. */
static void execute_zone_request(struct uds_request *request)
{
	int result;
	struct uds_index *index = request->index;

	if (request->zone_message.type != UDS_MESSAGE_NONE) {
		result = dispatch_index_zone_control_request(request);
		if (result != UDS_SUCCESS) {
			vdo_log_error_strerror(result, "error executing message: %d",
					       request->zone_message.type);
		}

		/* Once the message is processed it can be freed. */
		vdo_free(vdo_forget(request));
		return;
	}

	index->need_to_save = true;
	if (request->requeued && (request->status != UDS_SUCCESS)) {
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);
		index->callback(request);
		return;
	}

	result = dispatch_index_request(index, request);
	if (result == UDS_QUEUED) {
		/* The request has been requeued so don't let it complete. */
		return;
	}

	if (!request->found)
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);

	request->status = result;
	index->callback(request);
}

static int initialize_index_queues(struct uds_index *index,
				   const struct index_geometry *geometry)
{
	int result;
	unsigned int i;

	for (i = 0; i < index->zone_count; i++) {
		result = uds_make_request_queue("indexW", &execute_zone_request,
						&index->zone_queues[i]);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* The triage queue is only needed for sparse multi-zone indexes. */
	if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
		result = uds_make_request_queue("triageW", &triage_request,
						&index->triage_queue);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

/* This is the driver function for the chapter writer thread. */
static void close_chapters(void *arg)
{
	int result;
	struct chapter_writer *writer = arg;
	struct uds_index *index = writer->index;

	vdo_log_debug("chapter writer starting");
	mutex_lock(&writer->mutex);
	for (;;) {
		while (writer->zones_to_write < index->zone_count) {
			if (writer->stop && (writer->zones_to_write == 0)) {
				/*
				 * We've been told to stop, and all of the zones are in the same
				 * open chapter, so we can exit now.
				 */
				mutex_unlock(&writer->mutex);
				vdo_log_debug("chapter writer stopping");
				return;
			}
			uds_wait_cond(&writer->cond, &writer->mutex);
		}

		/*
		 * Release the lock while closing a chapter. We probably don't need to do this, but
		 * it seems safer in principle. It's OK to access the chapter and chapter_number
		 * fields without the lock since those aren't allowed to change until we're done.
		 */
		mutex_unlock(&writer->mutex);

		if (index->has_saved_open_chapter) {
			/*
			 * Remove the saved open chapter the first time we close an open chapter
			 * after loading from a clean shutdown, or after doing a clean save. The
			 * lack of the saved open chapter will indicate that a recovery is
			 * necessary.
			 */
			index->has_saved_open_chapter = false;
			result = uds_discard_open_chapter(index->layout);
			if (result == UDS_SUCCESS)
				vdo_log_debug("Discarding saved open chapter");
		}

		result = uds_close_open_chapter(writer->chapters, index->zone_count,
						index->volume,
						writer->open_chapter_index,
						writer->collated_records,
						index->newest_virtual_chapter);

		mutex_lock(&writer->mutex);
		index->newest_virtual_chapter++;
		index->oldest_virtual_chapter +=
			uds_chapters_to_expire(index->volume->geometry,
					       index->newest_virtual_chapter);
		writer->result = result;
		writer->zones_to_write = 0;
		uds_broadcast_cond(&writer->cond);
	}
}

static void stop_chapter_writer(struct chapter_writer *writer)
{
	struct thread *writer_thread = NULL;

	mutex_lock(&writer->mutex);
	if (writer->thread != NULL) {
		writer_thread = writer->thread;
		writer->thread = NULL;
		writer->stop = true;
		uds_broadcast_cond(&writer->cond);
	}
	mutex_unlock(&writer->mutex);

	if (writer_thread != NULL)
		vdo_join_threads(writer_thread);
}

static void free_chapter_writer(struct chapter_writer *writer)
{
	if (writer == NULL)
		return;

	stop_chapter_writer(writer);
	uds_free_open_chapter_index(writer->open_chapter_index);
	vdo_free(writer->collated_records);
	vdo_free(writer);
}

static int make_chapter_writer(struct uds_index *index,
			       struct chapter_writer **writer_ptr)
{
	int result;
	struct chapter_writer *writer;
	size_t collated_records_size =
		(sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);

	result = vdo_allocate_extended(struct chapter_writer, index->zone_count,
				       struct open_chapter_zone *, "Chapter Writer",
				       &writer);
	if (result != VDO_SUCCESS)
		return result;

	writer->index = index;
	mutex_init(&writer->mutex);
	uds_init_cond(&writer->cond);

	result = vdo_allocate_cache_aligned(collated_records_size, "collated records",
					    &writer->collated_records);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	result = uds_make_open_chapter_index(&writer->open_chapter_index,
					     index->volume->geometry,
					     index->volume->nonce);
	if (result != UDS_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	writer->memory_size = (sizeof(struct chapter_writer) +
			       index->zone_count * sizeof(struct open_chapter_zone *) +
			       collated_records_size +
			       writer->open_chapter_index->memory_size);

	result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	*writer_ptr = writer;
	return UDS_SUCCESS;
}

static int load_index(struct uds_index *index)
{
	int result;
	u64 last_save_chapter;

	result = uds_load_index_state(index->layout, index);
	if (result != UDS_SUCCESS)
		return UDS_INDEX_NOT_SAVED_CLEANLY;

	last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);

	vdo_log_info("loaded index from chapter %llu through chapter %llu",
		     (unsigned long long) index->oldest_virtual_chapter,
		     (unsigned long long) last_save_chapter);

	return UDS_SUCCESS;
}

static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
{
	int result;
	struct delta_index_page *chapter_index_page;
	struct index_geometry *geometry = index->volume->geometry;
	u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
	u32 expected_list_number = 0;
	u32 index_page_number;
	u32 lowest_delta_list;
	u32 highest_delta_list;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		result = uds_get_volume_index_page(index->volume, chapter,
						   index_page_number,
						   &chapter_index_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result,
						      "failed to read index page %u in chapter %u",
						      index_page_number, chapter);
		}

		lowest_delta_list = chapter_index_page->lowest_list_number;
		highest_delta_list = chapter_index_page->highest_list_number;
		if (lowest_delta_list != expected_list_number) {
			return vdo_log_error_strerror(UDS_CORRUPT_DATA,
						      "chapter %u index page %u is corrupt",
						      chapter, index_page_number);
		}

		uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
					  index_page_number, highest_delta_list);
		expected_list_number = highest_delta_list + 1;
	}

	return UDS_SUCCESS;
}

static int replay_record(struct uds_index *index, const struct uds_record_name *name,
			 u64 virtual_chapter, bool will_be_sparse_chapter)
{
	int result;
	struct volume_index_record record;
	bool update_record;

	if (will_be_sparse_chapter &&
	    !uds_is_volume_index_sample(index->volume_index, name)) {
		/*
		 * This entry will be in a sparse chapter after the rebuild completes, and it is
		 * not a sample, so just skip over it.
		 */
		return UDS_SUCCESS;
	}

	result = uds_get_volume_index_record(index->volume_index, name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (record.is_collision) {
			if (record.virtual_chapter == virtual_chapter) {
				/* The record is already correct. */
				return UDS_SUCCESS;
			}

			update_record = true;
		} else if (record.virtual_chapter == virtual_chapter) {
			/*
			 * There is a volume index entry pointing to the current chapter, but we
			 * don't know if it is for the same name as the one we are currently
			 * working on or not. For now, we're just going to assume that it isn't.
			 * This will create one extra collision record if there was a deleted
			 * record in the current chapter.
			 */
			update_record = false;
		} else {
			/*
			 * If we're rebuilding, we don't normally want to go to disk to see if the
			 * record exists, since we will likely have just read the record from disk
			 * (i.e. we know it's there). The exception to this is when we find an
			 * entry in the volume index that has a different chapter. In this case, we
			 * need to search that chapter to determine if the volume index entry was
			 * for the same record or a different one.
			 */
			result = uds_search_volume_page_cache_for_rebuild(index->volume,
									  name,
									  record.virtual_chapter,
									  &update_record);
			if (result != UDS_SUCCESS)
				return result;
			}
	} else {
		update_record = false;
	}

	if (update_record) {
		/*
		 * Update the volume index to reference the new chapter for the block. If the
		 * record had been deleted or dropped from the chapter index, it will be back.
		 */
		result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
	} else {
		/*
		 * Add a new entry to the volume index referencing the open chapter. This should be
		 * done regardless of whether we are a brand new record or a sparse record, i.e.
		 * one that doesn't exist in the index but does on disk, since for a sparse record,
		 * we would want to un-sparsify if it did exist.
		 */
		result = uds_put_volume_index_record(&record, virtual_chapter);
	}

	if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
		/* The rebuilt index will lose these records. */
		return UDS_SUCCESS;
	}

	return result;
}

static bool check_for_suspend(struct uds_index *index)
{
	bool closing;

	if (index->load_context == NULL)
		return false;

	mutex_lock(&index->load_context->mutex);
	if (index->load_context->status != INDEX_SUSPENDING) {
		mutex_unlock(&index->load_context->mutex);
		return false;
	}

	/* Notify that we are suspended and wait for the resume. */
	index->load_context->status = INDEX_SUSPENDED;
	uds_broadcast_cond(&index->load_context->cond);

	while ((index->load_context->status != INDEX_OPENING) &&
	       (index->load_context->status != INDEX_FREEING))
		uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);

	closing = (index->load_context->status == INDEX_FREEING);
	mutex_unlock(&index->load_context->mutex);
	return closing;
}

static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
{
	int result;
	u32 i;
	u32 j;
	const struct index_geometry *geometry;
	u32 physical_chapter;

	if (check_for_suspend(index)) {
		vdo_log_info("Replay interrupted by index shutdown at chapter %llu",
			     (unsigned long long) virtual);
		return -EBUSY;
	}

	geometry = index->volume->geometry;
	physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
	uds_prefetch_volume_chapter(index->volume, physical_chapter);
	uds_set_volume_index_open_chapter(index->volume_index, virtual);

	result = rebuild_index_page_map(index, virtual);
	if (result != UDS_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "could not rebuild index page map for chapter %u",
					      physical_chapter);
	}

	for (i = 0; i < geometry->record_pages_per_chapter; i++) {
		u8 *record_page;
		u32 record_page_number;

		record_page_number = geometry->index_pages_per_chapter + i;
		result = uds_get_volume_record_page(index->volume, physical_chapter,
						    record_page_number, &record_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result, "could not get page %d",
						      record_page_number);
		}

		for (j = 0; j < geometry->records_per_page; j++) {
			const u8 *name_bytes;
			struct uds_record_name name;

			name_bytes = record_page + (j * BYTES_PER_RECORD);
			memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
			result = replay_record(index, &name, virtual, sparse);
			if (result != UDS_SUCCESS)
				return result;
		}
	}

	return UDS_SUCCESS;
}

static int replay_volume(struct uds_index *index)
{
	int result;
	u64 old_map_update;
	u64 new_map_update;
	u64 virtual;
	u64 from_virtual = index->oldest_virtual_chapter;
	u64 upto_virtual = index->newest_virtual_chapter;
	bool will_be_sparse;

	vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
		     (unsigned long long) from_virtual,
		     (unsigned long long) upto_virtual);

	/*
	 * The index failed to load, so the volume index is empty. Add records to the volume index
	 * in order, skipping non-hooks in chapters which will be sparse to save time.
	 *
	 * Go through each record page of each chapter and add the records back to the volume
	 * index. This should not cause anything to be written to either the open chapter or the
	 * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
	 * would have already been purged from the volume index when the chapter was opened.
	 *
	 * Also, go through each index page for each chapter and rebuild the index page map.
	 */
	old_map_update = index->volume->index_page_map->last_update;
	for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
		will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
						       from_virtual, upto_virtual,
						       virtual);
		result = replay_chapter(index, virtual, will_be_sparse);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* Also reap the chapter being replaced by the open chapter. */
	uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);

	new_map_update = index->volume->index_page_map->last_update;
	if (new_map_update != old_map_update) {
		vdo_log_info("replay changed index page map update from %llu to %llu",
			     (unsigned long long) old_map_update,
			     (unsigned long long) new_map_update);
	}

	return UDS_SUCCESS;
}

static int rebuild_index(struct uds_index *index)
{
	int result;
	u64 lowest;
	u64 highest;
	bool is_empty = false;
	u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;

	index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
	result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
						    &is_empty);
	if (result != UDS_SUCCESS) {
		return vdo_log_fatal_strerror(result,
					      "cannot rebuild index: unknown volume chapter boundaries");
	}

	if (is_empty) {
		index->newest_virtual_chapter = 0;
		index->oldest_virtual_chapter = 0;
		index->volume->lookup_mode = LOOKUP_NORMAL;
		return UDS_SUCCESS;
	}

	index->newest_virtual_chapter = highest + 1;
	index->oldest_virtual_chapter = lowest;
	if (index->newest_virtual_chapter ==
	    (index->oldest_virtual_chapter + chapters_per_volume)) {
		/* Skip the chapter shadowed by the open chapter. */
		index->oldest_virtual_chapter++;
	}

	result = replay_volume(index);
	if (result != UDS_SUCCESS)
		return result;

	index->volume->lookup_mode = LOOKUP_NORMAL;
	return UDS_SUCCESS;
}

static void free_index_zone(struct index_zone *zone)
{
	if (zone == NULL)
		return;

	uds_free_open_chapter(zone->open_chapter);
	uds_free_open_chapter(zone->writing_chapter);
	vdo_free(zone);
}

static int make_index_zone(struct uds_index *index, unsigned int zone_number)
{
	int result;
	struct index_zone *zone;

	result = vdo_allocate(1, struct index_zone, "index zone", &zone);
	if (result != VDO_SUCCESS)
		return result;

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->open_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->writing_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	zone->index = index;
	zone->id = zone_number;
	index->zones[zone_number] = zone;

	return UDS_SUCCESS;
}

int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type,
		   struct index_load_context *load_context, index_callback_fn callback,
		   struct uds_index **new_index)
{
	int result;
	bool loaded = false;
	bool new = (open_type == UDS_CREATE);
	struct uds_index *index = NULL;
	struct index_zone *zone;
	u64 nonce;
	unsigned int z;

	result = vdo_allocate_extended(struct uds_index, config->zone_count,
				       struct uds_request_queue *, "index", &index);
	if (result != VDO_SUCCESS)
		return result;

	index->zone_count = config->zone_count;

	result = uds_make_index_layout(config, new, &index->layout);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = vdo_allocate(index->zone_count, struct index_zone *, "zones",
			      &index->zones);
	if (result != VDO_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = uds_make_volume(config, index->layout, &index->volume);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	index->volume->lookup_mode = LOOKUP_NORMAL;
	for (z = 0; z < index->zone_count; z++) {
		result = make_index_zone(index, z);
		if (result != UDS_SUCCESS) {
			uds_free_index(index);
			return vdo_log_error_strerror(result,
						      "Could not create index zone");
		}
	}

	nonce = uds_get_volume_nonce(index->layout);
	result = uds_make_volume_index(config, nonce, &index->volume_index);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "could not make volume index");
	}

	index->load_context = load_context;
	index->callback = callback;

	result = initialize_index_queues(index, config->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = make_chapter_writer(index, &index->chapter_writer);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	if (!new) {
		result = load_index(index);
		switch (result) {
		case UDS_SUCCESS:
			loaded = true;
			break;
		case -ENOMEM:
			/* We should not try a rebuild for this error. */
			vdo_log_error_strerror(result, "index could not be loaded");
			break;
		default:
			vdo_log_error_strerror(result, "index could not be loaded");
			if (open_type == UDS_LOAD) {
				result = rebuild_index(index);
				if (result != UDS_SUCCESS) {
					vdo_log_error_strerror(result,
							       "index could not be rebuilt");
				}
			}
			break;
		}
	}

	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "fatal error in %s()", __func__);
	}

	for (z = 0; z < index->zone_count; z++) {
		zone = index->zones[z];
		zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
		zone->newest_virtual_chapter = index->newest_virtual_chapter;
	}

	if (index->load_context != NULL) {
		mutex_lock(&index->load_context->mutex);
		index->load_context->status = INDEX_READY;
		/*
		 * If we get here, suspend is meaningless, but notify any thread trying to suspend
		 * us so it doesn't hang.
		 */
		uds_broadcast_cond(&index->load_context->cond);
		mutex_unlock(&index->load_context->mutex);
	}

	index->has_saved_open_chapter = loaded;
	index->need_to_save = !loaded;
	*new_index = index;
	return UDS_SUCCESS;
}

void uds_free_index(struct uds_index *index)
{
	unsigned int i;

	if (index == NULL)
		return;

	uds_request_queue_finish(index->triage_queue);
	for (i = 0; i < index->zone_count; i++)
		uds_request_queue_finish(index->zone_queues[i]);

	free_chapter_writer(index->chapter_writer);

	uds_free_volume_index(index->volume_index);
	if (index->zones != NULL) {
		for (i = 0; i < index->zone_count; i++)
			free_index_zone(index->zones[i]);
		vdo_free(index->zones);
	}

	uds_free_volume(index->volume);
	uds_free_index_layout(vdo_forget(index->layout));
	vdo_free(index);
}

/* Wait for the chapter writer to complete any outstanding writes. */
void uds_wait_for_idle_index(struct uds_index *index)
{
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (writer->zones_to_write > 0)
		uds_wait_cond(&writer->cond, &writer->mutex);
	mutex_unlock(&writer->mutex);
}

/* This function assumes that all requests have been drained. */
int uds_save_index(struct uds_index *index)
{
	int result;

	if (!index->need_to_save)
		return UDS_SUCCESS;

	uds_wait_for_idle_index(index);
	index->prev_save = index->last_save;
	index->last_save = ((index->newest_virtual_chapter == 0) ?
			    NO_LAST_SAVE : index->newest_virtual_chapter - 1);
	vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);

	result = uds_save_index_state(index->layout, index);
	if (result != UDS_SUCCESS) {
		vdo_log_info("save index failed");
		index->last_save = index->prev_save;
	} else {
		index->has_saved_open_chapter = true;
		index->need_to_save = false;
		vdo_log_info("finished save (vcn %llu)",
			     (unsigned long long) index->last_save);
	}

	return result;
}

int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
{
	return uds_replace_volume_storage(index->volume, index->layout, bdev);
}

/* Accessing statistics should be safe from any thread. */
void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
{
	struct volume_index_stats stats;

	uds_get_volume_index_stats(index->volume_index, &stats);
	counters->entries_indexed = stats.record_count;
	counters->collisions = stats.collision_count;
	counters->entries_discarded = stats.discard_count;

	counters->memory_used = (index->volume_index->memory_size +
				 index->volume->cache_size +
				 index->chapter_writer->memory_size);
}

void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
{
	struct uds_index *index = request->index;
	struct uds_request_queue *queue;

	switch (stage) {
	case STAGE_TRIAGE:
		if (index->triage_queue != NULL) {
			queue = index->triage_queue;
			break;
		}

		fallthrough;

	case STAGE_INDEX:
		request->zone_number =
			uds_get_volume_index_zone(index->volume_index, &request->record_name);
		fallthrough;

	case STAGE_MESSAGE:
		queue = index->zone_queues[request->zone_number];
		break;

	default:
		VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
		return;
	}

	uds_request_queue_enqueue(queue, request);
}