Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <vector>

#include <Headers/DataHeader.h>
#include "Framework/DataSpecUtils.h"
#include "Framework/OutputSpec.h"
#include "Framework/Logger.h"

namespace o2
Expand Down Expand Up @@ -151,13 +153,13 @@ struct SubTimeFrameFileMeta {
///
std::uint64_t mWriteTimeMs;

auto getTimePoint()
auto getTimePoint() const
{
using namespace std::chrono;
return time_point<system_clock, milliseconds>{milliseconds{mWriteTimeMs}};
}

std::string getTimeString()
std::string getTimeString() const
{
using namespace std::chrono;
std::time_t lTime = system_clock::to_time_t(getTimePoint());
Expand All @@ -167,6 +169,11 @@ struct SubTimeFrameFileMeta {
return lTimeStream.str();
}

const std::string info() const
{
return fmt::format("Size in file: {} Time: {} Version: {}", mStfSizeInFile, getTimeString(), mStfFileVersion);
}

SubTimeFrameFileMeta(const std::uint64_t pStfSize)
: SubTimeFrameFileMeta()
{
Expand Down Expand Up @@ -220,6 +227,11 @@ struct SubTimeFrameFileDataIndex {
static_assert(sizeof(DataIndexElem) == 48,
"DataIndexElem changed -> Binary compatibility is lost!");
}

const std::string info() const
{
return fmt::format("DH: {} Cnt:{} Size:{} Offset:{}", o2::framework::DataSpecUtils::describe(o2::framework::OutputSpec{mDataOrigin, mDataDescription, mSubSpecification}), mDataBlockCnt, mSize, mOffset);
}
};

SubTimeFrameFileDataIndex() = default;
Expand All @@ -240,6 +252,8 @@ struct SubTimeFrameFileDataIndex {
return sizeof(o2::header::DataHeader) + (sizeof(DataIndexElem) * mDataIndex.size());
}

const std::vector<DataIndexElem>& getDataIndex() const { return mDataIndex; }

friend std::ostream& operator<<(std::ostream& pStream, const SubTimeFrameFileDataIndex& pIndex);

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class SubTimeFrameFileReader
~SubTimeFrameFileReader();

/// Read a single TF from the file
std::unique_ptr<MessagesPerRoute> read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, int verbosity);
std::unique_ptr<MessagesPerRoute> read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repairHeaders, int verbosity);

/// Tell the current position of the file
inline std::uint64_t position() const { return mFileMapOffset; }
Expand Down
6 changes: 6 additions & 0 deletions Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class RawTFDump : public Task
bool mCreateRunEnvDir = true;
bool mAcceptCurrentTF = false;
bool mRejectDEADBEEF = false;
bool mRejectDistSTF = true;
int mVerbose = 0;
std::vector<uint32_t> mTFOrbits{}; // 1st orbits of TF accumulated in current file
o2::framework::DataTakingContext mDataTakingContext{};
Expand Down Expand Up @@ -185,6 +186,7 @@ void RawTFDump::init(InitContext& ic)
mWriteTF = false;
mStoreMetaFile = false;
}
mRejectDistSTF = !ic.options().get<bool>("include-dist-stf");
mRejectDEADBEEF = !ic.options().get<bool>("include-deadbeef");
mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
mMinFileSize = ic.options().get<int64_t>("min-file-size");
Expand Down Expand Up @@ -520,6 +522,9 @@ void RawTFDump::prepareTFForWriting(ProcessingContext& pc)
if (dh->subSpecification == 0xdeadbeef && mRejectDEADBEEF) {
continue;
}
if (dh->dataOrigin == o2::header::gDataOriginFLP && dh->dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF) {
continue;
}
const auto lHdrDataSize = sizeof(DataHeader) + dh->payloadSize;
mTFSize += lHdrDataSize;

Expand Down Expand Up @@ -577,6 +582,7 @@ DataProcessorSpec getRawTFDumpSpec(const std::string& inpconfig, const std::stri
AlgorithmSpec{adaptFromTask<RawTFDump>(trigger)},
Options{
{"include-deadbeef", VariantType::Bool, false, {"Include DPL-generated 0xdeadbeef subspecs for missing data"}},
{"include-dist-stf", VariantType::Bool, false, {"Include FLP/DISTSUBTIMEFRAME input"}},
{"exclude-trigger-specs", VariantType::String, "", {"Ignore trigger seen in these inputs of triggerspec"}},
{"max-dump-rate", VariantType::Float, 0.f, {"%-age of TFs to dump. W/o external trigger: random(>0) or periodic(<0) rejection, with: max limit"}},
{"rate-est-conf-limit", VariantType::Float, 0.05f, {"quantile for the lowest rate estimate confidence limit"}},
Expand Down
32 changes: 30 additions & 2 deletions Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ std::uint64_t sCreationTime = 0;
std::mutex stfMtx;

std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes,
const std::string& rawChannel, size_t slice, bool sup0xccdb, int verbosity)
const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repaireHeaders, int verbosity)
{
std::unique_ptr<MessagesPerRoute> messagesPerRoute = std::make_unique<MessagesPerRoute>();
auto& msgMap = *messagesPerRoute.get();
Expand Down Expand Up @@ -252,10 +252,15 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
return nullptr;
}
lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first());
LOGP(debug, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta));
if (verbosity > 0) {
LOGP(info, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta));
}
if (!read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) {
return nullptr;
}
if (verbosity > 0) {
LOGP(info, "TFMeta : {}", lStfFileMeta.info());
}
if (lStfFileMeta.mWriteTimeMs == 0 && creationFallBack != 0) {
if (!creation0Notified) {
creation0Notified = true;
Expand Down Expand Up @@ -319,6 +324,7 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*

std::int64_t lLeftToRead = lStfDataSize;
STFHeader stfHeader{tfID, -1u, -1u};
DataHeader prevHeader;
// read <hdrStack + data> pairs
while (lLeftToRead > 0) {
// allocate and read the Headers
Expand All @@ -335,6 +341,28 @@ std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device*
return nullptr;
}
DataHeader locDataHeader(*lDataHeader);

if (repaireHeaders) {
auto descHeader = [&locDataHeader]() {
return o2f::DataSpecUtils::describe(o2f::OutputSpec{locDataHeader.dataOrigin, locDataHeader.dataDescription, locDataHeader.subSpecification});
};
if (locDataHeader == prevHeader) {
if (prevHeader.tfCounter == locDataHeader.tfCounter && (prevHeader.splitPayloadIndex + 1) != locDataHeader.splitPayloadIndex) {
if (verbosity > 3) {
LOGP(warn, "Repairing wrong index {}/{} to {} for {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts, descHeader());
}
locDataHeader.splitPayloadIndex = (++prevHeader.splitPayloadIndex) % prevHeader.splitPayloadParts;
}
} else { // new header
if (locDataHeader.splitPayloadIndex != 0) {
if (verbosity > 2) {
LOGP(warn, "Repairing wrong index {}/{} to 0 for new {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, descHeader());
}
locDataHeader.splitPayloadIndex = 0;
}
}
prevHeader = locDataHeader;
}
// sanity check
if (int(locDataHeader.firstTForbit) == -1) {
if (!negativeOrbitNotified) {
Expand Down
5 changes: 4 additions & 1 deletion Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ void TFReaderSpec::init(o2f::InitContext& ic)
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
mInput.repairHeaders = !ic.options().get<bool>("ignore-repair-headers");

if (!mInput.fileRunTimeSpans.empty()) {
loadRunTimeSpans(mInput.fileRunTimeSpans);
}
Expand Down Expand Up @@ -421,7 +423,7 @@ void TFReaderSpec::TFBuilder()
std::this_thread::sleep_for(sleepTime);
continue;
}
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.verbosity);
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.repairHeaders, mInput.verbosity);
bool acceptTF = true;
if (tf) {
if (mRunTimeRanges.size()) {
Expand Down Expand Up @@ -675,6 +677,7 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp)
}
spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"ignore-repair-headers", o2f::VariantType::Bool, false, {"do not check/repair headers"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});
Expand Down
1 change: 1 addition & 0 deletions Detectors/Raw/TFReaderDD/src/TFReaderSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct TFReaderInp {
bool sendDummyForMissing = true;
bool sup0xccdb = false;
bool invertIRFramesSelection = false;
bool repairHeaders = true;
std::vector<o2::header::DataHeader> hdVec;
std::vector<int> tfIDs{};
};
Expand Down
2 changes: 1 addition & 1 deletion Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
options.push_back(ConfigParamSpec{"copy-dir", VariantType::String, "/tmp/", {"copy base directory for remote files"}});
options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}});
options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH)"}});
options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH), report repairs"}});
options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}});
options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}});
Expand Down