diff --git a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h index 340027642b74c..eeabf8e8d4117 100644 --- a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h +++ b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h @@ -21,6 +21,8 @@ #include #include +#include "Framework/DataSpecUtils.h" +#include "Framework/OutputSpec.h" #include "Framework/Logger.h" namespace o2 @@ -151,13 +153,13 @@ struct SubTimeFrameFileMeta { /// std::uint64_t mWriteTimeMs; - auto getTimePoint() + auto getTimePoint() const { using namespace std::chrono; return time_point{milliseconds{mWriteTimeMs}}; } - std::string getTimeString() + std::string getTimeString() const { using namespace std::chrono; std::time_t lTime = system_clock::to_time_t(getTimePoint()); @@ -167,6 +169,11 @@ struct SubTimeFrameFileMeta { return lTimeStream.str(); } + const std::string info() const + { + return fmt::format("Size in file: {} Time: {} Version: {}", mStfSizeInFile, getTimeString(), mStfFileVersion); + } + SubTimeFrameFileMeta(const std::uint64_t pStfSize) : SubTimeFrameFileMeta() { @@ -220,6 +227,11 @@ struct SubTimeFrameFileDataIndex { static_assert(sizeof(DataIndexElem) == 48, "DataIndexElem changed -> Binary compatibility is lost!"); } + + const std::string info() const + { + return fmt::format("DH: {} Cnt:{} Size:{} Offset:{}", o2::framework::DataSpecUtils::describe(o2::framework::OutputSpec{mDataOrigin, mDataDescription, mSubSpecification}), mDataBlockCnt, mSize, mOffset); + } }; SubTimeFrameFileDataIndex() = default; @@ -240,6 +252,8 @@ struct SubTimeFrameFileDataIndex { return sizeof(o2::header::DataHeader) + (sizeof(DataIndexElem) * mDataIndex.size()); } + const std::vector& getDataIndex() const { return mDataIndex; } + friend std::ostream& operator<<(std::ostream& pStream, const SubTimeFrameFileDataIndex& pIndex); private: diff --git a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h index 3b926e0a79206..7fba597252ba8 100644 --- a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h +++ b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h @@ -50,7 +50,7 @@ class SubTimeFrameFileReader ~SubTimeFrameFileReader(); /// Read a single TF from the file - std::unique_ptr read(fair::mq::Device* device, const std::vector& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, int verbosity); + std::unique_ptr read(fair::mq::Device* device, const std::vector& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repairHeaders, int verbosity); /// Tell the current position of the file inline std::uint64_t position() const { return mFileMapOffset; } diff --git a/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx b/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx index 64c39fa7ef75a..513edf236b0fe 100644 --- a/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx +++ b/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx @@ -97,6 +97,7 @@ class RawTFDump : public Task bool mCreateRunEnvDir = true; bool mAcceptCurrentTF = false; bool mRejectDEADBEEF = false; + bool mRejectDistSTF = true; int mVerbose = 0; std::vector mTFOrbits{}; // 1st orbits of TF accumulated in current file o2::framework::DataTakingContext mDataTakingContext{}; @@ -185,6 +186,7 @@ void RawTFDump::init(InitContext& ic) mWriteTF = false; mStoreMetaFile = false; } + mRejectDistSTF = !ic.options().get("include-dist-stf"); mRejectDEADBEEF = !ic.options().get("include-deadbeef"); mCreateRunEnvDir = !ic.options().get("ignore-partition-run-dir"); mMinFileSize = ic.options().get("min-file-size"); @@ -520,6 +522,9 @@ void RawTFDump::prepareTFForWriting(ProcessingContext& pc) if (dh->subSpecification == 0xdeadbeef && mRejectDEADBEEF) { continue; } + if (dh->dataOrigin == o2::header::gDataOriginFLP && dh->dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF) { + continue; + } const auto lHdrDataSize = sizeof(DataHeader) + dh->payloadSize; mTFSize += lHdrDataSize; @@ -577,6 +582,7 @@ DataProcessorSpec getRawTFDumpSpec(const std::string& inpconfig, const std::stri AlgorithmSpec{adaptFromTask(trigger)}, Options{ {"include-deadbeef", VariantType::Bool, false, {"Include DPL-generated 0xdeadbeef subspecs for missing data"}}, + {"include-dist-stf", VariantType::Bool, false, {"Include FLP/DISTSUBTIMEFRAME input"}}, {"exclude-trigger-specs", VariantType::String, "", {"Ignore trigger seen in these inputs of triggerspec"}}, {"max-dump-rate", VariantType::Float, 0.f, {"%-age of TFs to dump. W/o external trigger: random(>0) or periodic(<0) rejection, with: max limit"}}, {"rate-est-conf-limit", VariantType::Float, 0.05f, {"quantile for the lowest rate estimate confidence limit"}}, diff --git a/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx b/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx index 5f862dffe512f..c0de91d623c3a 100644 --- a/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx +++ b/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx @@ -184,7 +184,7 @@ std::uint64_t sCreationTime = 0; std::mutex stfMtx; std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector& outputRoutes, - const std::string& rawChannel, size_t slice, bool sup0xccdb, int verbosity) + const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repaireHeaders, int verbosity) { std::unique_ptr messagesPerRoute = std::make_unique(); auto& msgMap = *messagesPerRoute.get(); @@ -252,10 +252,15 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* return nullptr; } lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first()); - LOGP(debug, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta)); + if (verbosity > 0) { + LOGP(info, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta)); + } if (!read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) { return nullptr; } + if (verbosity > 0) { + LOGP(info, "TFMeta : {}", lStfFileMeta.info()); + } if (lStfFileMeta.mWriteTimeMs == 0 && creationFallBack != 0) { if (!creation0Notified) { creation0Notified = true; @@ -319,6 +324,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* std::int64_t lLeftToRead = lStfDataSize; STFHeader stfHeader{tfID, -1u, -1u}; + DataHeader prevHeader; // read pairs while (lLeftToRead > 0) { // allocate and read the Headers @@ -335,6 +341,28 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* return nullptr; } DataHeader locDataHeader(*lDataHeader); + + if (repaireHeaders) { + auto descHeader = [&locDataHeader]() { + return o2f::DataSpecUtils::describe(o2f::OutputSpec{locDataHeader.dataOrigin, locDataHeader.dataDescription, locDataHeader.subSpecification}); + }; + if (locDataHeader == prevHeader) { + if (prevHeader.tfCounter == locDataHeader.tfCounter && (prevHeader.splitPayloadIndex + 1) != locDataHeader.splitPayloadIndex) { + if (verbosity > 3) { + LOGP(warn, "Repairing wrong index {}/{} to {} for {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts, descHeader()); + } + locDataHeader.splitPayloadIndex = (++prevHeader.splitPayloadIndex) % prevHeader.splitPayloadParts; + } + } else { // new header + if (locDataHeader.splitPayloadIndex != 0) { + if (verbosity > 2) { + LOGP(warn, "Repairing wrong index {}/{} to 0 for new {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, descHeader()); + } + locDataHeader.splitPayloadIndex = 0; + } + } + prevHeader = locDataHeader; + } // sanity check if (int(locDataHeader.firstTForbit) == -1) { if (!negativeOrbitNotified) { diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx index 919e76083f595..f94edf8540ac8 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx @@ -118,6 +118,8 @@ void TFReaderSpec::init(o2f::InitContext& ic) mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff; mInput.maxTFCache = std::max(1, ic.options().get("max-cached-tf")); mInput.maxFileCache = std::max(1, ic.options().get("max-cached-files")); + mInput.repairHeaders = !ic.options().get("ignore-repair-headers"); + if (!mInput.fileRunTimeSpans.empty()) { loadRunTimeSpans(mInput.fileRunTimeSpans); } @@ -421,7 +423,7 @@ void TFReaderSpec::TFBuilder() std::this_thread::sleep_for(sleepTime); continue; } - auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.verbosity); + auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.repairHeaders, mInput.verbosity); bool acceptTF = true; if (tf) { if (mRunTimeRanges.size()) { @@ -675,6 +677,7 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp) } spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}}); + spec.options.emplace_back(o2f::ConfigParamSpec{"ignore-repair-headers", o2f::VariantType::Bool, false, {"do not check/repair headers"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}}); diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h index 2c1c62ecbb414..57dd283d9a1d5 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h @@ -49,6 +49,7 @@ struct TFReaderInp { bool sendDummyForMissing = true; bool sup0xccdb = false; bool invertIRFramesSelection = false; + bool repairHeaders = true; std::vector hdVec; std::vector tfIDs{}; }; diff --git a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx index b424353531de7..a29b4dadfdb25 100644 --- a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx +++ b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx @@ -34,7 +34,7 @@ void customize(std::vector& workflowOptions) options.push_back(ConfigParamSpec{"copy-dir", VariantType::String, "/tmp/", {"copy base directory for remote files"}}); options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}}); options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access - options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH)"}}); + options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH), report repairs"}}); options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}}); options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}}); options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}});