/*
 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 47 Store Directory Routines */

#include "squid.h"
#include "base/IoManip.h"
#include "cache_cf.h"
#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockHeaderUpdater.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>
#include <limits>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

/// Creates an unconfigured "rock" cache_dir: no db file path, no shared map,
/// no I/O strategy, and a slot size that defaults to HeaderSize.
Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
    slotSize(HeaderSize), filePath(nullptr), map(nullptr), io(nullptr),
    waitingForPage(nullptr)
{
}

/// Releases the I/O strategy, the map, and the db file path string.
Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

// called when Squid core needs a StoreEntry with a given key
/// \returns a new StoreEntry anchored to the matching map entry or
/// nullptr if the db is not readable or has no readable entry for the key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return nullptr;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return nullptr;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->createMemObject();
    anchorEntry(*e, filen, *slot);
    trackReferences(*e);
    return e;
}

/// Attaches an existing, not yet disk-backed entry to its readable map entry.
/// \returns false if the db is not readable or the key has no readable entry
bool
Rock::SwapDir::anchorToCache(StoreEntry &entry)
{
    Assure(!entry.hasDisk());

    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
            reinterpret_cast<const cache_key *>(entry.key), filen);
    if (!slot)
        return false;

    anchorEntry(entry, filen, *slot);
    return true;
}

/// Refreshes entry.swap_file_sz from the map anchor of an entry that is
/// already attached to this cache_dir.
/// \returns false if the db is not readable
bool
Rock::SwapDir::updateAnchored(StoreEntry &entry)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    assert(entry.hasDisk(index));

    const auto &anchor = map->readableEntry(entry.swap_filen);
    entry.swap_file_sz = anchor.basics.swap_file_sz;
    return true;
}

/// Copies anchor basics into e and attaches e to slot filen of this cache_dir.
/// Incomplete anchors yield a STORE_PENDING/SWAPOUT_WRITING entry.
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    anchor.exportInto(e);

    const bool complete = anchor.complete();
    e.store_status = complete ? STORE_OK : STORE_PENDING;
    // SWAPOUT_WRITING: even though another worker writes?
    e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING);

    e.ping_status = PING_NONE;

    EBIT_SET(e.flags, ENTRY_VALIDATED);
}

/// Releases the map lock held on behalf of e and detaches e from this
/// cache_dir. A writer's lock is aborted; a reader's lock is simply closed.
void
Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.hasDisk(index));

    ignoreReferences(e);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e,
    // especially since we may switch from writing to reading. This code relies
    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
    if (e.mem_obj && e.mem_obj->swapout.sio != nullptr &&
            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
        map->abortWriting(e.swap_filen);
        e.detachFromDisk();
        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = nullptr;
        CollapsedForwarding::Broadcast(e);
        e.storeWriterDone();
    } else {
        map->closeForReading(e.swap_filen);
        e.detachFromDisk();
    }
}

/// \returns the number of db bytes not covered by free slots; zero when the
/// free slot index is not available
uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}

/// \returns the number of entries in the map or zero if there is no map
uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode only the disker process reports stats to avoid
/// counting the same stats by multiple processes.
bool Rock::SwapDir::doReportStat() const { return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess()); } void Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &e) { // nothing to do; handleWriteCompletionSuccess() did everything for us assert(!e.mem_obj || !e.mem_obj->swapout.sio || !dynamic_cast(*e.mem_obj->swapout.sio).writeableAnchor_); } void Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry) { debugs(47, 5, entry); disconnect(entry); // calls abortWriting() to free the disk entry } int64_t Rock::SwapDir::slotLimitAbsolute() const { // the max value is an invalid one; all values must be below the limit assert(std::numeric_limits::max() == std::numeric_limits::max()); return std::numeric_limits::max(); } int64_t Rock::SwapDir::slotLimitActual() const { const int64_t sWanted = (maxSize() - HeaderSize)/slotSize; const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported const int64_t sLimitHi = slotLimitAbsolute(); return min(max(sLimitLo, sWanted), sLimitHi); } int64_t Rock::SwapDir::entryLimitActual() const { return min(slotLimitActual(), entryLimitAbsolute()); } // TODO: encapsulate as a tool void Rock::SwapDir::create() { assert(path); assert(filePath); if (UsingSmp() && !IamDiskProcess()) { debugs (47,3, "disker will create in " << path); return; } debugs (47,3, "creating in " << path); struct stat dir_sb; if (::stat(path, &dir_sb) == 0) { struct stat file_sb; if (::stat(filePath, &file_sb) == 0) { debugs (47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath); return; } // else the db file is not there or is not accessible, and we will try // to create it later below, generating a detailed error on failures. } else { // path does not exist or is inaccessible // If path exists but is not accessible, mkdir() below will fail, and // the admin should see the error and act accordingly, so there is // no need to distinguish ENOENT from other possible stat() errors. 
debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path); const int res = mkdir(path, 0700); if (res != 0) createError("mkdir"); } debugs (47, DBG_IMPORTANT, "Creating Rock db: " << filePath); const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600); if (swap < 0) createError("create"); #if SLOWLY_FILL_WITH_ZEROS char block[1024]; Must(maxSize() % sizeof(block) == 0); memset(block, '\0', sizeof(block)); for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) { if (write(swap, block, sizeof(block)) != sizeof(block)) createError("write"); } #else if (ftruncate(swap, maxSize()) != 0) createError("truncate"); char header[HeaderSize]; memset(header, '\0', sizeof(header)); if (write(swap, header, sizeof(header)) != sizeof(header)) createError("write"); #endif close(swap); } // report Rock DB creation error and exit void Rock::SwapDir::createError(const char *const msg) { int xerrno = errno; // XXX: where does errno come from? debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath << "; " << msg << " error: " << xstrerr(xerrno)); fatal("Rock Store db creation error"); } void Rock::SwapDir::init() { debugs(47,2, MYNAME); // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which // are refcounted. We up our count once to avoid implicit delete's. lock(); freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath()); Must(!map); map = new DirMap(inodeMapPath()); map->cleaner = this; const char *ioModule = needsDiskStrand() ? 
"IpcIo" : "Blocking"; if (DiskIOModule *m = DiskIOModule::Find(ioModule)) { debugs(47,2, "Using DiskIO module: " << ioModule); io = m->createStrategy(); io->init(); } else { debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " << ioModule); fatal("Rock Store missing a required DiskIO module"); } theFile = io->newFile(filePath); theFile->configure(fileConfig); theFile->open(O_RDWR, 0644, this); } bool Rock::SwapDir::needsDiskStrand() const { const bool wontEvenWorkWithoutDisker = Config.workers > 1; const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo"); return InDaemonMode() && (wontEvenWorkWithoutDisker || wouldWorkBetterWithDisker); } void Rock::SwapDir::parse(int anIndex, char *aPath) { index = anIndex; path = xstrdup(aPath); // cache store is located at path/db String fname(path); fname.append("/rock"); filePath = xstrdup(fname.termedBuf()); parseSize(false); parseOptions(0); // Current openForWriting() code overwrites the old slot if needed // and possible, so proactively removing old slots is probably useless. assert(!repl); // repl = createRemovalPolicy(Config.replPolicy); validateOptions(); } void Rock::SwapDir::reconfigure() { parseSize(true); parseOptions(1); // TODO: can we reconfigure the replacement policy (repl)? 
validateOptions(); } /// parse maximum db disk size void Rock::SwapDir::parseSize(const bool reconfig) { const int i = GetInteger(); if (i < 0) fatal("negative Rock cache_dir size value"); const uint64_t new_max_size = static_cast(i) << 20; // MBytes to Bytes if (!reconfig) max_size = new_max_size; else if (new_max_size != max_size) { debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size " "cannot be changed dynamically, value left unchanged (" << (max_size >> 20) << " MB)"); } } ConfigOption * Rock::SwapDir::getOptionTree() const { ConfigOption *copt = ::SwapDir::getOptionTree(); ConfigOptionVector *vector = dynamic_cast(copt); if (vector) { // if copt is actually a ConfigOptionVector vector->options.push_back(new ConfigOptionAdapter(*const_cast(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption)); vector->options.push_back(new ConfigOptionAdapter(*const_cast(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption)); vector->options.push_back(new ConfigOptionAdapter(*const_cast(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption)); } else { // we don't know how to handle copt, as it's not a ConfigOptionVector. // free it (and return nullptr) delete copt; copt = nullptr; } return copt; } bool Rock::SwapDir::allowOptionReconfigure(const char *const option) const { return strcmp(option, "slot-size") != 0 && ::SwapDir::allowOptionReconfigure(option); } /// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse() bool Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig) { // TODO: ::SwapDir or, better, Config should provide time-parsing routines, // including time unit handling. Same for size and rate. 
time_msec_t *storedTime; if (strcmp(option, "swap-timeout") == 0) storedTime = &fileConfig.ioTimeout; else return false; if (!value) { self_destruct(); return false; } // TODO: handle time units and detect parsing errors better const int64_t parsedValue = strtoll(value, nullptr, 10); if (parsedValue < 0) { debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue); self_destruct(); return false; } const time_msec_t newTime = static_cast(parsedValue); if (!reconfig) *storedTime = newTime; else if (*storedTime != newTime) { debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option << " cannot be changed dynamically, value left unchanged: " << *storedTime); } return true; } /// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump() void Rock::SwapDir::dumpTimeOption(StoreEntry * e) const { if (fileConfig.ioTimeout) storeAppendPrintf(e, " swap-timeout=%" PRId64, static_cast(fileConfig.ioTimeout)); } /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse() bool Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig) { int *storedRate; if (strcmp(option, "max-swap-rate") == 0) storedRate = &fileConfig.ioRate; else return false; if (!value) { self_destruct(); return false; } // TODO: handle time units and detect parsing errors better const int64_t parsedValue = strtoll(value, nullptr, 10); if (parsedValue < 0) { debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue); self_destruct(); return false; } const int newRate = static_cast(parsedValue); if (newRate < 0) { debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate); self_destruct(); return false; } if (!isaReconfig) *storedRate = newRate; else if (*storedRate != newRate) { debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option << " 
cannot be changed dynamically, value left unchanged: " << *storedRate); } return true; } /// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump() void Rock::SwapDir::dumpRateOption(StoreEntry * e) const { if (fileConfig.ioRate >= 0) storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate); } /// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse() bool Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig) { uint64_t *storedSize; if (strcmp(option, "slot-size") == 0) storedSize = &slotSize; else return false; if (!value) { self_destruct(); return false; } // TODO: handle size units and detect parsing errors better const uint64_t newSize = strtoll(value, nullptr, 10); if (newSize <= 0) { debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize); self_destruct(); return false; } if (newSize <= sizeof(DbCellHeader)) { debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize); self_destruct(); return false; } if (!reconfig) *storedSize = newSize; else if (*storedSize != newSize) { debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option << " cannot be changed dynamically, value left unchanged: " << *storedSize); } return true; } /// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump() void Rock::SwapDir::dumpSizeOption(StoreEntry * e) const { storeAppendPrintf(e, " slot-size=%" PRId64, slotSize); } /// check the results of the configuration; only level-0 debugging works here void Rock::SwapDir::validateOptions() { if (slotSize <= 0) fatal("Rock store requires a positive slot-size"); const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB const int64_t slotSizeRoundingWaste = slotSize; const int64_t maxRoundingWaste = max(maxSizeRoundingWaste, slotSizeRoundingWaste); // an entry consumes at least one slot; round up 
to reduce false warnings const int64_t blockSize = static_cast(slotSize); const int64_t maxObjSize = max(blockSize, ((maxObjectSize()+blockSize-1)/blockSize)*blockSize); // Does the "sfileno*max-size" limit match configured db capacity? const double entriesMayOccupy = entryLimitAbsolute()*static_cast(maxObjSize); if (entriesMayOccupy + maxRoundingWaste < maxSize()) { const int64_t diskWasteSize = maxSize() - static_cast(entriesMayOccupy); debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" << "\n\tconfigured db capacity: " << maxSize() << " bytes" << "\n\tconfigured db slot size: " << slotSize << " bytes" << "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" << "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() << "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" << "\n\tdisk space wasted: " << diskWasteSize << " bytes"); } // Does the "absolute slot count" limit match configured db capacity? 
const double slotsMayOccupy = slotLimitAbsolute()*static_cast(slotSize); if (slotsMayOccupy + maxRoundingWaste < maxSize()) { const int64_t diskWasteSize = maxSize() - static_cast(entriesMayOccupy); debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" << "\n\tconfigured db capacity: " << maxSize() << " bytes" << "\n\tconfigured db slot size: " << slotSize << " bytes" << "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() << "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" << "\n\tdisk space wasted: " << diskWasteSize << " bytes"); } } bool Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const { if (diskSpaceNeeded >= 0) diskSpaceNeeded += sizeof(DbCellHeader); if (!::SwapDir::canStore(e, diskSpaceNeeded, load)) return false; if (!theFile || !theFile->canWrite()) return false; if (!map) return false; // Do not start I/O transaction if there are less than 10% free pages left. // TODO: reserve page instead if (needsDiskStrand() && Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) { debugs(47, 5, "too few shared pages for IPC I/O left"); return false; } if (io->shedLoad()) return false; load = io->load(); return true; } StoreIOState::Pointer Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STIOCB * const cbIo, void * const cbData) { if (!theFile || theFile->error()) { debugs(47,4, theFile); return nullptr; } sfileno filen; Ipc::StoreMapAnchor *const slot = map->openForWriting(reinterpret_cast(e.key), filen); if (!slot) { debugs(47, 5, "map->add failed"); return nullptr; } assert(filen >= 0); slot->set(e); // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno. // If that does not happen, the entry will not decrement the read level! 
Rock::SwapDir::Pointer self(this); IoState *sio = new IoState(self, &e, cbIo, cbData); sio->swap_dirn = index; sio->swap_filen = filen; sio->writeableAnchor_ = slot; debugs(47,5, "dir " << index << " created new filen " << asHex(sio->swap_filen).upperCase().minDigits(8) << " starting at " << diskOffset(sio->swap_filen)); sio->file(theFile); trackReferences(e); return sio; } StoreIOState::Pointer Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STIOCB *cbIo, void *data) { if (!theFile || theFile->error()) { debugs(47,4, theFile); return nullptr; } Must(update.fresh); Must(update.fresh.fileNo >= 0); Rock::SwapDir::Pointer self(this); IoState *sio = new IoState(self, update.entry, cbIo, data); sio->swap_dirn = index; sio->swap_filen = update.fresh.fileNo; sio->writeableAnchor_ = update.fresh.anchor; debugs(47,5, "dir " << index << " updating filen " << asHex(sio->swap_filen).upperCase().minDigits(8) << " starting at " << diskOffset(sio->swap_filen)); sio->file(theFile); return sio; } int64_t Rock::SwapDir::diskOffset(const SlotId sid) const { assert(sid >= 0); return HeaderSize + slotSize*sid; } int64_t Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const { assert(pageId); return diskOffset(pageId.number - 1); } int64_t Rock::SwapDir::diskOffsetLimit() const { assert(map); return diskOffset(map->sliceLimit()); } Rock::SlotId Rock::SwapDir::reserveSlotForWriting() { Ipc::Mem::PageId pageId; if (freeSlots->pop(pageId)) { const auto slotId = pageId.number - 1; debugs(47, 5, "got a previously free slot: " << slotId); map->prepFreeSlice(slotId); return slotId; } // catch free slots delivered to noteFreeMapSlice() assert(!waitingForPage); waitingForPage = &pageId; if (map->purgeOne()) { assert(!waitingForPage); // noteFreeMapSlice() should have cleared it assert(pageId.set()); const auto slotId = pageId.number - 1; debugs(47, 5, "got a previously busy slot: " << slotId); map->prepFreeSlice(slotId); return slotId; } assert(waitingForPage == 
&pageId); waitingForPage = nullptr; // This may happen when the number of available db slots is close to the // number of concurrent requests reading or writing those slots, which may // happen when the db is "small" compared to the request traffic OR when we // are rebuilding and have not loaded "many" entries or empty slots yet. debugs(47, 3, "cannot get a slot; entries: " << map->entryCount()); throw TexcHere("ran out of free db slots"); } bool Rock::SwapDir::validSlotId(const SlotId slotId) const { return 0 <= slotId && slotId < slotLimitActual(); } void Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId) { Ipc::Mem::PageId pageId; pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(index); pageId.number = sliceId+1; if (waitingForPage) { *waitingForPage = pageId; waitingForPage = nullptr; } else { freeSlots->push(pageId); } } // tries to open an old entry with swap_filen for reading StoreIOState::Pointer Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STIOCB * const cbIo, void * const cbData) { if (!theFile || theFile->error()) { debugs(47,4, theFile); return nullptr; } if (!e.hasDisk()) { debugs(47,4, e); return nullptr; } // Do not start I/O transaction if there are less than 10% free pages left. // TODO: reserve page instead if (needsDiskStrand() && Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) { debugs(47, 5, "too few shared pages for IPC I/O left"); return nullptr; } // The are two ways an entry can get swap_filen: our get() locked it for // reading or our storeSwapOutStart() locked it for writing. Peeking at our // locked entry is safe, but no support for reading the entry we swap out. 
const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen); if (!slot) return nullptr; // we were writing after all Rock::SwapDir::Pointer self(this); IoState *sio = new IoState(self, &e, cbIo, cbData); sio->swap_dirn = index; sio->swap_filen = e.swap_filen; sio->readableAnchor_ = slot; sio->file(theFile); debugs(47,5, "dir " << index << " has old filen: " << asHex(sio->swap_filen).upperCase().minDigits(8)); // When StoreEntry::swap_filen for e was set by our anchorEntry(), e had a // public key, but it could have gone private since then (while keeping the // anchor lock). The stale anchor key is not (and cannot be) erased (until // the marked-for-deletion/release anchor/entry is unlocked is recycled). const auto ourAnchor = [&]() { if (const auto publicKey = e.publicKey()) return slot->sameKey(publicKey); return true; // cannot check }; assert(ourAnchor()); // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz // may still be zero and basics.swap_file_sz may grow. 
assert(slot->basics.swap_file_sz >= e.swap_file_sz); return sio; } void Rock::SwapDir::ioCompletedNotification() { if (!theFile) fatalf("Rock cache_dir failed to initialize db file: %s", filePath); if (theFile->error()) { int xerrno = errno; // XXX: where does errno come from fatalf("Rock cache_dir at %s failed to open db file: %s", filePath, xstrerr(xerrno)); } debugs(47, 2, "Rock cache_dir[" << index << "] limits: " << std::setw(12) << maxSize() << " disk bytes, " << std::setw(7) << map->entryLimit() << " entries, and " << std::setw(7) << map->sliceLimit() << " slots"); if (!Rebuild::Start(*this)) storeRebuildComplete(nullptr); } void Rock::SwapDir::closeCompleted() { theFile = nullptr; } void Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::ReadRequest> r) { ReadRequest *request = dynamic_cast(r.getRaw()); assert(request); IoState::Pointer sio = request->sio; sio->handleReadCompletion(*request, rlen, errflag); } void Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r) { // TODO: Move details into IoState::handleWriteCompletion() after figuring // out how to deal with map access. See readCompleted(). 
Rock::WriteRequest *request = dynamic_cast(r.getRaw()); assert(request); assert(request->sio != nullptr); IoState &sio = *request->sio; // quit if somebody called IoState::close() while we were waiting if (!sio.stillWaiting()) { debugs(79, 3, "ignoring closed entry " << sio.swap_filen); noteFreeMapSlice(request->sidCurrent); return; } debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof); if (errflag != DISK_OK) handleWriteCompletionProblem(errflag, *request); else if (!sio.expectedReply(request->id)) handleWriteCompletionProblem(DISK_ERROR, *request); else handleWriteCompletionSuccess(*request); if (sio.touchingStoreEntry()) CollapsedForwarding::Broadcast(*sio.e); } /// code shared by writeCompleted() success handling cases void Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest &request) { auto &sio = *(request.sio); sio.splicingPoint = request.sidCurrent; // do not increment sio.offset_ because we do it in sio->write() assert(sio.writeableAnchor_); if (sio.writeableAnchor_->start < 0) { // wrote the first slot Must(request.sidPrevious < 0); sio.writeableAnchor_->start = request.sidCurrent; } else { Must(request.sidPrevious >= 0); map->writeableSlice(sio.swap_filen, request.sidPrevious).next = request.sidCurrent; } // finalize the shared slice info after writing slice contents to disk; // the chain gets possession of the slice we were writing Ipc::StoreMap::Slice &slice = map->writeableSlice(sio.swap_filen, request.sidCurrent); slice.size = request.len - sizeof(DbCellHeader); Must(slice.next < 0); if (request.eof) { assert(sio.e); if (sio.touchingStoreEntry()) { sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz = sio.offset_; map->switchWritingToReading(sio.swap_filen); // sio.e keeps the (now read) lock on the anchor // storeSwapOutFileClosed() sets swap_status and calls storeWriterDone() } sio.writeableAnchor_ = nullptr; sio.finishedWriting(DISK_OK); } } /// code shared by writeCompleted() error 
handling cases void Rock::SwapDir::handleWriteCompletionProblem(const int errflag, const WriteRequest &request) { auto &sio = *request.sio; noteFreeMapSlice(request.sidCurrent); writeError(sio); sio.finishedWriting(errflag); // and hope that Core will call disconnect() to close the map entry } void Rock::SwapDir::writeError(StoreIOState &sio) { // Do not abortWriting here. The entry should keep the write lock // instead of losing association with the store and confusing core. map->freeEntry(sio.swap_filen); // will mark as unusable, just in case if (sio.touchingStoreEntry()) CollapsedForwarding::Broadcast(*sio.e); // else noop: a fresh entry update error does not affect stale entry readers // All callers must also call IoState callback, to propagate the error. } void Rock::SwapDir::updateHeaders(StoreEntry *updatedE) { if (!map) return; Ipc::StoreMapUpdate update(updatedE); if (!map->openForUpdating(update, updatedE->swap_filen)) return; try { AsyncJob::Start(new HeaderUpdater(this, update)); } catch (const std::exception &ex) { debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what()); map->abortUpdating(update); } } bool Rock::SwapDir::full() const { return freeSlots != nullptr && !freeSlots->size(); } // storeSwapOutFileClosed calls this nethod on DISK_NO_SPACE_LEFT, // but it should not happen for us void Rock::SwapDir::diskFull() { debugs(20, DBG_IMPORTANT, "ERROR: Squid BUG: No space left with rock cache_dir: " << filePath); } /// purge while full(); it should be sufficient to purge just one void Rock::SwapDir::maintain() { // The Store calls this to free some db space, but there is nothing wrong // with a full() db, except when db has to shrink after reconfigure, and // we do not support shrinking yet (it would have to purge specific slots). // TODO: Disable maintain() requests when they are pointless. 
} void Rock::SwapDir::reference(StoreEntry &e) { debugs(47, 5, &e << ' ' << e.swap_dirn << ' ' << e.swap_filen); if (repl && repl->Referenced) repl->Referenced(repl, &e, &e.repl); } bool Rock::SwapDir::dereference(StoreEntry &e) { debugs(47, 5, &e << ' ' << e.swap_dirn << ' ' << e.swap_filen); if (repl && repl->Dereferenced) repl->Dereferenced(repl, &e, &e.repl); // no need to keep e in the global store_table for us; we have our own map return false; } bool Rock::SwapDir::unlinkdUseful() const { // no entry-specific files to unlink return false; } void Rock::SwapDir::evictIfFound(const cache_key *key) { if (map) map->freeEntryByKey(key); // may not be there } void Rock::SwapDir::evictCached(StoreEntry &e) { debugs(47, 5, e); if (e.hasDisk(index)) { if (map->freeEntry(e.swap_filen)) CollapsedForwarding::Broadcast(e); if (!e.locked()) disconnect(e); } else if (const auto key = e.publicKey()) { evictIfFound(key); } } void Rock::SwapDir::trackReferences(StoreEntry &e) { debugs(47, 5, e); if (repl) repl->Add(repl, &e, &e.repl); } void Rock::SwapDir::ignoreReferences(StoreEntry &e) { debugs(47, 5, e); if (repl) repl->Remove(repl, &e, &e.repl); } void Rock::SwapDir::statfs(StoreEntry &e) const { storeAppendPrintf(&e, "\n"); storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10); storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n", currentSize() / 1024.0, Math::doublePercent(currentSize(), maxSize())); const int entryLimit = entryLimitActual(); const int slotLimit = slotLimitActual(); storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit); if (map && entryLimit > 0) { const int entryCount = map->entryCount(); storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n", entryCount, (100.0 * entryCount / entryLimit)); } storeAppendPrintf(&e, "Maximum slots: %9d\n", slotLimit); if (map && slotLimit > 0) { const unsigned int slotsFree = !freeSlots ? 
0 : freeSlots->size(); if (slotsFree <= static_cast(slotLimit)) { const int usedSlots = slotLimit - static_cast(slotsFree); storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n", usedSlots, (100.0 * usedSlots / slotLimit)); } if (slotLimit < 100) { // XXX: otherwise too expensive to count Ipc::ReadWriteLockStats stats; map->updateStats(stats); stats.dump(e); } } storeAppendPrintf(&e, "Pending operations: %d out of %d\n", store_open_disk_fd, Config.max_open_disk_fds); storeAppendPrintf(&e, "Flags:"); if (flags.selected) storeAppendPrintf(&e, " SELECTED"); if (flags.read_only) storeAppendPrintf(&e, " READ-ONLY"); storeAppendPrintf(&e, "\n"); } SBuf Rock::SwapDir::inodeMapPath() const { return Ipc::Mem::Segment::Name(SBuf(path), "map"); } const char * Rock::SwapDir::freeSlotsPath() const { static String spacesPath; spacesPath = path; spacesPath.append("_spaces"); return spacesPath.termedBuf(); } bool Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const { return map->hasReadableEntry(reinterpret_cast(e.key)); } DefineRunnerRegistratorIn(Rock, SwapDirRr); void Rock::SwapDirRr::create() { Must(mapOwners.empty() && freeSlotsOwners.empty()); for (int i = 0; i < Config.cacheSwap.n_configured; ++i) { if (const Rock::SwapDir *const sd = dynamic_cast(INDEXSD(i))) { rebuildStatsOwners.push_back(Rebuild::Stats::Init(*sd)); const int64_t capacity = sd->slotLimitActual(); SwapDir::DirMap::Owner *const mapOwner = SwapDir::DirMap::Init(sd->inodeMapPath(), capacity); mapOwners.push_back(mapOwner); // TODO: somehow remove pool id and counters from PageStack? 
Ipc::Mem::PageStack::Config config; config.poolId = Ipc::Mem::PageStack::IdForSwapDirSpace(i); config.pageSize = 0; // this is an index of slots on _disk_ config.capacity = capacity; config.createFull = false; // Rebuild finds and pushes free slots Ipc::Mem::Owner *const freeSlotsOwner = shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(), config); freeSlotsOwners.push_back(freeSlotsOwner); } } } Rock::SwapDirRr::~SwapDirRr() { for (size_t i = 0; i < mapOwners.size(); ++i) { delete rebuildStatsOwners[i]; delete mapOwners[i]; delete freeSlotsOwners[i]; } }