Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions Framework/AnalysisSupport/src/AODWriterHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "Framework/EndOfStreamContext.h"
#include "Framework/ProcessingContext.h"
#include "Framework/InitContext.h"
#include "Framework/Output.h"
#include "Framework/CallbackService.h"
#include "Framework/AnalysisSupportHelpers.h"
#include "Framework/TableConsumer.h"
Expand All @@ -31,6 +32,10 @@
#include <TMap.h>
#include <TObjString.h>
#include <arrow/table.h>
#include <algorithm>
#include <string>
#include <utility>
#include <vector>

O2_DECLARE_DYNAMIC_LOG(histogram_registry);

Expand Down Expand Up @@ -477,4 +482,46 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ct
};
}};
}

AlgorithmSpec AODWriterHelpers::getMetadataCollector(ConfigContext const& /*ctx*/)
{
return AlgorithmSpec{[](InitContext&) -> std::function<void(ProcessingContext&)> {
// Accumulated metadata, last writer wins per key.
auto merged = std::make_shared<std::vector<std::pair<std::string, std::string>>>();
return [merged](ProcessingContext& pc) -> void {
auto nParts = pc.inputs().getNofParts(0);
for (auto pi = 0U; pi < nParts; ++pi) {
auto part = pc.inputs().get<TMap*>("meta", pi);
if (!part) {
continue;
}
TIter next(part.get());
while (TObject* key = next()) {
TObject* value = part->GetValue(key);
std::string k = key->GetName();
std::string v = value != nullptr ? value->GetName() : "";
auto it = std::find_if(merged->begin(), merged->end(),
[&k](auto const& e) { return e.first == k; });
if (it != merged->end()) {
it->second = std::move(v);
} else {
merged->emplace_back(std::move(k), std::move(v));
}
}
}
// Emit the keys/vals vectors the AOD writer already turns into the AO2D
// metaData TMap, so no special handling is needed there.
std::vector<TString> keys, vals;
keys.reserve(merged->size());
vals.reserve(merged->size());
for (auto const& [k, v] : *merged) {
keys.emplace_back(k);
vals.emplace_back(v);
}
LOG(debug) << "metadata-collector: emitting " << keys.size() << " aggregated metadata entries";
pc.outputs().snapshot(Output{"AMD", "AODMetadataKeys", 0}, keys);
pc.outputs().snapshot(Output{"AMD", "AODMetadataVals", 0}, vals);
};
}};
}
} // namespace o2::framework::writers
1 change: 1 addition & 0 deletions Framework/AnalysisSupport/src/AODWriterHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace o2::framework::writers
struct AODWriterHelpers {
static AlgorithmSpec getOutputObjHistWriter(ConfigContext const& context);
static AlgorithmSpec getOutputTTreeWriter(ConfigContext const& context);
static AlgorithmSpec getMetadataCollector(ConfigContext const& context);
};

} // namespace o2::framework::writers
Expand Down
8 changes: 8 additions & 0 deletions Framework/AnalysisSupport/src/Plugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ struct ROOTTTreeWriter : o2::framework::AlgorithmPlugin {
}
};

struct ROOTMetadataCollector : o2::framework::AlgorithmPlugin {
o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override
{
return o2::framework::writers::AODWriterHelpers::getMetadataCollector(config);
}
};

using namespace o2::framework;
struct RunSummary : o2::framework::ServicePlugin {
o2::framework::ServiceSpec* create() final
Expand Down Expand Up @@ -286,6 +293,7 @@ DEFINE_DPL_PLUGINS_BEGIN
DEFINE_DPL_PLUGIN_INSTANCE(ROOTFileReader, CustomAlgorithm);
DEFINE_DPL_PLUGIN_INSTANCE(ROOTObjWriter, CustomAlgorithm);
DEFINE_DPL_PLUGIN_INSTANCE(ROOTTTreeWriter, CustomAlgorithm);
DEFINE_DPL_PLUGIN_INSTANCE(ROOTMetadataCollector, CustomAlgorithm);
DEFINE_DPL_PLUGIN_INSTANCE(RunSummary, CustomService);
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInAOD, ConfigDiscovery);
DEFINE_DPL_PLUGINS_END
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/AnalysisSupportHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ struct AnalysisSupportHelpers {
static DataProcessorSpec getOutputObjHistSink(ConfigContext const&);
/// writes inputs of kind AOD to file
static DataProcessorSpec getGlobalAODSink(ConfigContext const&);
/// Match all inputs of kind META, merge them and republish as the AOD
/// metadata keys/vals consumed by the AOD writer.
static DataProcessorSpec getMetadataCollectorSink(ConfigContext const&);
/// Get the data director
static std::shared_ptr<DataOutputDirector> getDataOutputDirector(ConfigContext const& ctx);
};
Expand Down
19 changes: 19 additions & 0 deletions Framework/Core/src/AnalysisSupportHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,23 @@ DataProcessorSpec

return spec;
}

DataProcessorSpec
AnalysisSupportHelpers::getMetadataCollectorSink(ConfigContext const& ctx)
{
// Lifetime is sporadic because META messages are not produced every
// timeframe. The oldest-possible-timeframe completion policy (registered
// in CompletionPolicy::createDefaultPolicies) decides when the collected
// parts are merged and republished as the AOD metadata keys/vals that the
// AOD writer turns into the AO2D metaData object.
DataProcessorSpec spec{
.name = "internal-dpl-metadata-collector",
.inputs = {InputSpec("meta", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"META"}), Lifetime::Sporadic)},
.outputs = {OutputSpec{OutputLabel{"keys"}, header::DataOrigin{"AMD"}, header::DataDescription{"AODMetadataKeys"}, 0, Lifetime::Sporadic},
OutputSpec{OutputLabel{"vals"}, header::DataOrigin{"AMD"}, header::DataDescription{"AODMetadataVals"}, 0, Lifetime::Sporadic}},
.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTMetadataCollector", ctx),
};

return spec;
}
} // namespace o2::framework
1 change: 1 addition & 0 deletions Framework/Core/src/CompletionPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ std::vector<CompletionPolicy>
return {
CompletionPolicyHelpers::consumeWhenAllOrdered("internal-dpl-aod-writer"),
CompletionPolicyHelpers::consumeWhenAnyZeroCount("internal-dpl-injected-dummy-sink", [](DeviceSpec const& s) { return s.name.find("internal-dpl-injected-dummy-sink") != std::string::npos; }),
CompletionPolicyHelpers::consumeWhenPastOldestPossibleTimeframe("internal-dpl-metadata-collector", [](DeviceSpec const& s) { return s.name == "internal-dpl-metadata-collector"; }),
CompletionPolicyHelpers::consumeWhenAll()};
}

Expand Down
9 changes: 9 additions & 0 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext

std::vector<InputSpec> requestedCCDBs;
std::vector<OutputSpec> providedCCDBs;
bool hasMetaOutput = false;

for (size_t wi = 0; wi < workflow.size(); ++wi) {
auto& processor = workflow[wi];
Expand Down Expand Up @@ -392,6 +393,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
} else {
it->bindings.push_back(output.binding.value);
}
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"META"})) {
hasMetaOutput = true;
}

if (output.lifetime == Lifetime::Condition) {
Expand Down Expand Up @@ -583,6 +586,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
extraSpecs.push_back(rootSink);
}

// Inject a collector which merges all META messages and republishes them as
// the AOD metadata keys/vals the AOD writer writes into the AO2D file.
if (hasMetaOutput) {
extraSpecs.push_back(AnalysisSupportHelpers::getMetadataCollectorSink(ctx));
}

workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
extraSpecs.clear();

Expand Down
55 changes: 37 additions & 18 deletions Framework/TestWorkflows/src/o2TestHistograms.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
#include "Framework/runDataProcessing.h"
#include "Framework/AnalysisTask.h"
#include <TH2F.h>
#include <TMap.h>
#include <TObjString.h>
#include <TString.h>

using namespace o2;
using namespace o2::framework;
using namespace o2::framework::expressions;

// pT cut applied when producing the skimmed derived dataset; the same value is
// published as run metadata so it lands in the derived AO2D's metaData object.
static constexpr float kSkimPtCut = 1.5f;

namespace o2::aod
{
O2ORIGIN("EMB");
Expand All @@ -38,8 +45,8 @@ DECLARE_SOA_TABLE(SkimmedExampleTrack, "AOD", "SKIMEXTRK", //!
struct EtaAndClsHistogramsSimple {
OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
Produces<o2::aod::SkimmedExampleTrack> skimEx;
Configurable<std::string> trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"};
Filter trackFilter = o2::aod::track::pt < 10.f;
Configurable<std::string> trackFilterString{"track-filter", "", "Track filter string (overrides the pT cut when set)"};
Filter trackFilter = o2::aod::track::pt < kSkimPtCut;

HistogramRegistry registry{
"registry",
Expand Down Expand Up @@ -88,8 +95,8 @@ struct EtaAndClsHistogramsSimple {
struct EtaAndClsHistogramsIUSimple {
OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
Produces<o2::aod::SkimmedExampleTrack> skimEx;
Configurable<std::string> trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"};
Filter trackFilter = o2::aod::track::pt < 10.f;
Configurable<std::string> trackFilterString{"track-filter", "", "Track filter string (overrides the pT cut when set)"};
Filter trackFilter = o2::aod::track::pt < kSkimPtCut;

HistogramRegistry registry{
"registry",
Expand Down Expand Up @@ -136,8 +143,8 @@ struct EtaAndClsHistogramsFull {
} //
};

Configurable<std::string> trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"};
Filter trackFilter = o2::aod::track::pt < 10.f;
Configurable<std::string> trackFilterString{"track-filter", "", "Track filter string (overrides the pT cut when set)"};
Filter trackFilter = o2::aod::track::pt < kSkimPtCut;

void init(InitContext&)
{
Expand Down Expand Up @@ -183,25 +190,37 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
LOGP(info, "- {} present.", table);
}
// Notice it's important for the tasks to use the same name, otherwise topology generation will be confused.
WorkflowSpec specs;
if (runType == "2" || !hasTrackCov) {
LOGP(info, "Using only tracks {}", runType);
if (hasTrackIU) {
return WorkflowSpec{
adaptAnalysisTask<EtaAndClsHistogramsIUSimple>(cfgc, TaskName{"simple-histos"}),
};
specs = WorkflowSpec{adaptAnalysisTask<EtaAndClsHistogramsIUSimple>(cfgc, TaskName{"simple-histos"})};
} else {
specs = WorkflowSpec{adaptAnalysisTask<EtaAndClsHistogramsSimple>(cfgc, TaskName{"simple-histos"})};
}
return WorkflowSpec{
adaptAnalysisTask<EtaAndClsHistogramsSimple>(cfgc, TaskName{"simple-histos"}),
};
} else {
LOGP(info, "Using tracks extra {}", runType);
if (hasTrackIU) {
return WorkflowSpec{
adaptAnalysisTask<EtaAndClsHistogramsIUSimple>(cfgc, TaskName{"simple-histos"}),
};
specs = WorkflowSpec{adaptAnalysisTask<EtaAndClsHistogramsIUSimple>(cfgc, TaskName{"simple-histos"})};
} else {
specs = WorkflowSpec{adaptAnalysisTask<EtaAndClsHistogramsFull>(cfgc, TaskName{"simple-histos"})};
}
return WorkflowSpec{
adaptAnalysisTask<EtaAndClsHistogramsFull>(cfgc, TaskName{"simple-histos"}),
};
}

// Publish the skimming pT cut as run metadata, once per data frame so it is
// aligned with the derived tables. The auto-injected metadata collector merges
// all META messages (oldest-possible completion) and the AOD writer stores the
// result as the metaData object of the derived AO2D file.
specs.push_back(DataProcessorSpec{
.name = "skim-metadata",
.inputs = {InputSpec{"tfn", "TFN", "TFNumber"}},
.outputs = {OutputSpec{{"meta"}, "META", "SKIMINFO", 0, Lifetime::Sporadic}},
.algorithm = adaptStateless([](ProcessingContext& pc) {
TMap m;
m.SetOwnerKeyValue();
m.Add(new TObjString("SkimTrackPtCut"), new TObjString(TString::Format("%g", kSkimPtCut)));
pc.outputs().snapshot(Output{"META", "SKIMINFO", 0}, m);
}),
});
return specs;
}