diff --git a/Framework/Core/include/Framework/DataRelayer.h b/Framework/Core/include/Framework/DataRelayer.h index 1e010fc12f3d4..e5a2aecea1de4 100644 --- a/Framework/Core/include/Framework/DataRelayer.h +++ b/Framework/Core/include/Framework/DataRelayer.h @@ -102,7 +102,8 @@ class DataRelayer DataRelayer(CompletionPolicy const&, std::vector const& routes, TimesliceIndex&, - ServiceRegistryRef); + ServiceRegistryRef, + int); /// This invokes the appropriate `InputRoute::danglingChecker` on every /// entry in the cache and if it returns true, it creates a new diff --git a/Framework/Core/include/Framework/DefaultsHelpers.h b/Framework/Core/include/Framework/DefaultsHelpers.h index 16d41d03baa7f..68e64cc42a90e 100644 --- a/Framework/Core/include/Framework/DefaultsHelpers.h +++ b/Framework/Core/include/Framework/DefaultsHelpers.h @@ -12,16 +12,24 @@ #ifndef O2_FRAMEWORK_DEFAULTHELPERS_H_ #define O2_FRAMEWORK_DEFAULTHELPERS_H_ +namespace fair::mq +{ +class ProgOptions; +} + namespace o2::framework { enum struct DeploymentMode; +struct DeviceConfig; struct DefaultsHelpers { static DeploymentMode deploymentMode(); /// @true if running online static bool onlineDeploymentMode(); /// get max number of timeslices in the queue - static unsigned int pipelineLength(); + static unsigned int pipelineLength(unsigned int minLength); + static unsigned int pipelineLength(const fair::mq::ProgOptions& options); + static unsigned int pipelineLength(const DeviceConfig& dc); }; } // namespace o2::framework diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 450f31f4ba7d3..c5cc021a53478 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -564,7 +564,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) { config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as()); } else { - config->maxTimeframes = readers * DefaultsHelpers::pipelineLength(); + config->maxTimeframes = readers * DefaultsHelpers::pipelineLength(dc); } static bool once = false; // Until we guarantee this is called only once... diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index f786d99fd2c0d..6486406a06dca 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -414,11 +414,13 @@ o2::framework::ServiceSpec CommonServices::dataRelayer() .name = "datarelayer", .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle { auto& spec = services.get(); + int pipelineLength = DefaultsHelpers::pipelineLength(options); return ServiceHandle{TypeIdHelpers::uniqueId(), new DataRelayer(spec.completionPolicy, spec.inputs, services.get(), - services)}; + services, + pipelineLength)}; }, .configure = noConfiguration(), .kind = ServiceKind::Serial}; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index ccfb58db7559a..da04a23e81c0c 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1483,7 +1483,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) auto& infos = state.inputChannelInfos; if (context.balancingInputs) { - static int pipelineLength = DefaultsHelpers::pipelineLength(); + static int pipelineLength = DefaultsHelpers::pipelineLength(*ref.get().device()->fConfig); static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2)); auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool { return infos[a].oldestForChannel.value > limitNew; @@ -2259,12 +2259,14 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v return false; } - auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) { + int pipelineLength = DefaultsHelpers::pipelineLength(*ref.get().device()->fConfig); + + auto postUpdateStats = [ref, pipelineLength](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) { auto& stats = ref.get(); auto& states = ref.get(); std::atomic_thread_fence(std::memory_order_release); char relayerSlotState[1024]; - int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength()); + int written = snprintf(relayerSlotState, 1024, "%d ", pipelineLength); char* buffer = relayerSlotState + written; for (size_t ai = 0; ai != record.size(); ai++) { buffer[ai] = record.isValid(ai) ? '3' : '0'; @@ -2291,11 +2293,11 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v count++; }; - auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) { + auto preUpdateStats = [ref, pipelineLength](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) { auto& states = ref.get(); std::atomic_thread_fence(std::memory_order_release); char relayerSlotState[1024]; - snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength()); + snprintf(relayerSlotState, 1024, "%d ", pipelineLength); char* buffer = strchr(relayerSlotState, ' ') + 1; for (size_t ai = 0; ai != record.size(); ai++) { buffer[ai] = record.isValid(ai) ? '2' : '0'; diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 05b64b6ed1dad..cece5b343659f 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -37,6 +37,7 @@ #include "Framework/DataProcessingStates.h" #include "Framework/DataTakingContext.h" #include "Framework/DefaultsHelpers.h" +#include "Framework/RawDeviceService.h" #include "Headers/DataHeaderHelpers.h" #include "Framework/Formatters.h" @@ -48,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -70,7 +72,8 @@ constexpr int INVALID_INPUT = -1; DataRelayer::DataRelayer(const CompletionPolicy& policy, std::vector const& routes, TimesliceIndex& index, - ServiceRegistryRef services) + ServiceRegistryRef services, + int pipelineLength) : mContext{services}, mTimesliceIndex{index}, mCompletionPolicy{policy}, @@ -81,7 +84,17 @@ DataRelayer::DataRelayer(const CompletionPolicy& policy, std::scoped_lock lock(mMutex); if (policy.configureRelayer == nullptr) { - static int pipelineLength = DefaultsHelpers::pipelineLength(); + if (pipelineLength == -1) { + auto getPipelineLengthHelper = [&services]() { + try { + return DefaultsHelpers::pipelineLength(*services.get().device()->fConfig); + } catch (...) { + return DefaultsHelpers::pipelineLength(0); + } + }; + static int detectedPipelineLength = getPipelineLengthHelper(); + pipelineLength = detectedPipelineLength; + } setPipelineLength(pipelineLength); } else { policy.configureRelayer(*this); diff --git a/Framework/Core/src/DefaultsHelpers.cxx b/Framework/Core/src/DefaultsHelpers.cxx index 4dcc734216f0c..5fd1ed29e7af6 100644 --- a/Framework/Core/src/DefaultsHelpers.cxx +++ b/Framework/Core/src/DefaultsHelpers.cxx @@ -11,6 +11,9 @@ #include "Framework/DefaultsHelpers.h" #include "Framework/DataTakingContext.h" +#include "Framework/DeviceConfig.h" +#include + #include #include #include @@ -18,23 +21,35 @@ namespace o2::framework { -unsigned int DefaultsHelpers::pipelineLength() +unsigned int DefaultsHelpers::pipelineLength(unsigned int minLength) { static bool override = getenv("DPL_DEFAULT_PIPELINE_LENGTH"); if (override) { static unsigned int retval = atoi(getenv("DPL_DEFAULT_PIPELINE_LENGTH")); - return retval; + return std::max(minLength, retval); } DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode(); // just some reasonable numers // The number should really be tuned at runtime for each processor. if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS || deploymentMode == DeploymentMode::FST) { - return 512; + return std::max(minLength, 512u); } else { - return 64; + return std::max(minLength, 64u); } } +unsigned int DefaultsHelpers::pipelineLength(const DeviceConfig& dc) +{ + static unsigned int minLength = dc.options.count("timeframes-rate-limit") ? std::max(0, atoi(dc.options["timeframes-rate-limit"].as().c_str())) : 0; + return pipelineLength(minLength); +} + +unsigned int DefaultsHelpers::pipelineLength(const fair::mq::ProgOptions& options) +{ + static unsigned int minLength = options.Count("timeframes-rate-limit") ? std::max(0, atoi(options.GetValue("timeframes-rate-limit").c_str())) : 0; + return pipelineLength(minLength); +} + static DeploymentMode getDeploymentMode_internal() { char* explicitMode = getenv("O2_DPL_DEPLOYMENT_MODE"); diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 166f26878c363..ced884ebaa1ed 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -817,7 +817,8 @@ void spawnDevice(uv_loop_t* loop, .sendInitialValue = true, }); - for (size_t i = 0; i < DefaultsHelpers::pipelineLength(); ++i) { + unsigned int pipelineLength = DefaultsHelpers::pipelineLength(DeviceConfig{varmap}); + for (size_t i = 0; i < pipelineLength; ++i) { allStates.back().registerState(DataProcessingStates::StateSpec{ .name = fmt::format("matcher_variables/{}", i), .stateId = static_cast((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i), @@ -826,7 +827,7 @@ void spawnDevice(uv_loop_t* loop, }); } - for (size_t i = 0; i < DefaultsHelpers::pipelineLength(); ++i) { + for (size_t i = 0; i < pipelineLength; ++i) { allStates.back().registerState(DataProcessingStates::StateSpec{ .name = fmt::format("data_relayer/{}", i), .stateId = static_cast((short)(ProcessingStateId::DATA_RELAYER_BASE) + i), diff --git a/Framework/Core/test/benchmark_DataRelayer.cxx b/Framework/Core/test/benchmark_DataRelayer.cxx index dcff3930dbaad..3c3d2294fdd7e 100644 --- a/Framework/Core/test/benchmark_DataRelayer.cxx +++ b/Framework/Core/test/benchmark_DataRelayer.cxx @@ -65,7 +65,7 @@ static void BM_RelaySingleSlot(benchmark::State& state) TimesliceIndex index{1, infos}; auto policy = CompletionPolicyHelpers::consumeWhenAny(); ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -118,7 +118,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state) auto policy = CompletionPolicyHelpers::consumeWhenAny(); ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -177,7 +177,7 @@ static void BM_RelayMultipleRoutes(benchmark::State& state) auto policy = CompletionPolicyHelpers::consumeWhenAny(); ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -254,7 +254,7 @@ static void BM_RelaySplitParts(benchmark::State& state) auto policy = CompletionPolicyHelpers::consumeWhenAny(); ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -314,7 +314,7 @@ static void BM_RelayMultiplePayloads(benchmark::State& state) auto policy = CompletionPolicyHelpers::consumeWhenAny(); ServiceRegistry registry; - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); // DataHeader matching the one provided in the input diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index 7d5a3ded88e16..8957e361cb8a2 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -83,7 +83,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::consumeWhenAny(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -133,7 +133,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::consumeWhenAny(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); // Let's create a dummy O2 Message with two headers in the stack: @@ -195,7 +195,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::consumeWhenAll(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); @@ -276,7 +276,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::consumeWhenAll(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(3); auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); @@ -359,7 +359,7 @@ TEST_CASE("DataRelayer") std::vector infos{1}; TimesliceIndex index{1, infos}; ref.registerService(ServiceRegistryHelpers::handleForService(&index)); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); // Only two messages to fill the cache. relayer.setPipelineLength(2); @@ -437,7 +437,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::processWhenAny(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); // Only two messages to fill the cache. relayer.setPipelineLength(2); @@ -509,7 +509,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::processWhenAny(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); // Only two messages to fill the cache. relayer.setPipelineLength(3); @@ -568,7 +568,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::processWhenAny(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); // Only two messages to fill the cache. relayer.setPipelineLength(1); @@ -629,7 +629,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::processWhenAny(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); // Only two messages to fill the cache. relayer.setPipelineLength(1); @@ -698,7 +698,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::consumeWhenAny(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); DataHeader dh{"CLUSTERS", "TPC", 0}; @@ -752,7 +752,7 @@ TEST_CASE("DataRelayer") ref.registerService(ServiceRegistryHelpers::handleForService(&index)); auto policy = CompletionPolicyHelpers::consumeWhenAny(); - DataRelayer relayer(policy, inputs, index, {registry}); + DataRelayer relayer(policy, inputs, index, {registry}, -1); relayer.setPipelineLength(4); auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");