Udostępnij przez


Przykładowy projekt koordynatora działań

W tym prostym przykładzie koordynatora działań pokazano, jak interfejs API można wykorzystać do ponownego trenowania modelu w tle po spełnieniu warunków systemowych.

Omówienie przykładowego projektu

Rozważmy przypadek aplikacji do edycji muzyki. Ta aplikacja ma zadania w tle o wysokim priorytcie, które obsługują żądania użytkowników, takie jak publikowanie zawartości w magazynie w chmurze. Istnieją również zadania w tle o niskim priorytcie, które obsługują interakcję użytkowników, takie jak dostarczanie automatycznych zaleceń w celu ulepszenia kompozycji podczas edytowania. Na koniec istnieje zestaw odroczonych zadań, które nie muszą być wykonywane w określonym czasie bez żądania użytkownika, co jest naszym celem w tym przykładzie. W szczególności chcemy okresowo ponownie trenować model rekomendacji, gdy wpływ użytkownika jest minimalny. Aby to osiągnąć, możemy użyć interfejsu API koordynatora działań.

W tym scenariuszu chcemy ponownie wytrenować model, gdy użytkownik nie jest obecny. Przepływ pracy ponownego trenowania w tym scenariuszu jest również konsumentem procesora GPU, dlatego chcemy również uruchomić, gdy jest to dobry moment na użycie procesora GPU. Możemy określić te wymagania przy użyciu zasad koordynatora działań. Interfejs API koordynatora działań użyje naszych zasad, aby określić, kiedy są spełnione wymagania i wysyłać powiadomienia o tym, kiedy rozpocząć lub przestać uruchamiać naszą pracę.

W tym przypadku szablon zasad GOOD spełnia większość naszych potrzeb, ponieważ śledzi procesor CPU, pamięć, dysk systemowy, zasilanie i bezczynność użytkownika. Po prostu musimy jawnie ustawić warunek dla procesora GPU. Należy pamiętać, że mimo że nasze obciążenie będzie korzystać głównie z procesora GPU, wykonanie naszej aktywności nadal zużywa procesor CPU, pamięć, dysk i moc. Wpływ na te zasoby może się również znacznie różnić w zależności od konfiguracji systemu. Na przykład szybszy procesor GPU może spowodować, że procesor CPU poświęca więcej czasu na przekazywanie procesora GPU przy użyciu danych, co może spowodować odczytanie lub zapisanie większej ilości danych na dysku. Szybkość tego dysku może również mieć wpływ na użycie procesora CPU w podobny sposób. Konfigurując wszystkie zasoby, które mamy wpływ, możemy mieć pewność, że nie zakłócamy przypadkowo środowiska użytkownika ani nie obniżamy wydajności systemu. Ponadto sama praca została podzielona na małe fragmenty, dzięki czemu możemy odpowiednio reagować na powiadomienia koordynacji, aby uniknąć działania poza żądanymi warunkami.

Aby zademonstrować, w jaki sposób deweloperzy mogą zmieniać lub obniżać zasady, dodamy również wymaganie ponownego trenowania w ciągu 48 godzin. Pierwsze 24 godziny, nasz miękki termin, staramy się uruchomić z naszą idealną polityką, a ostatnie 24 godziny obniżamy do mniejszej polityki.

Przykładowy kod projektu

Poniższy kod to przykładowa aplikacja do edycji muzyki. Korzysta on z interfejsu API koordynatora działań do wykonywania zadań w tle zgodnie z opisem w omówieniu.

#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>

// To use ActivityCoordinator, we must link to the OneCoreUAP library.

#pragma comment(lib, "OneCoreUAP.lib")

using namespace std;
using namespace chrono;
using namespace wil;

// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_POLICY,
        decltype(&DestroyActivityCoordinatorPolicy),
        DestroyActivityCoordinatorPolicy>
    unique_policy;

typedef wil::unique_any<
        ACTIVITY_COORDINATOR_SUBSCRIPTION,
        decltype(&UnsubscribeActivityCoordinatorPolicy),
        UnsubscribeActivityCoordinatorPolicy>
    unique_subscription;

struct WORKER_CONTEXT {
    mutex ContextLock;
    unique_threadpool_work Worker;
    bool ShouldRun;
    bool IsRunning;
    bool IsComplete;
    std::condition_variable CompletionSignal;
};

_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
    _In_ WORKER_CONTEXT* workerContext
    )
{
    workerContext->ShouldRun = true;
    if (!workerContext->IsRunning && !workerContext->IsComplete) {

        // No active workers, so start a new one.

        workerContext->IsRunning = true;
        SubmitThreadpoolWork(workerContext->Worker.get());
    }
}

void
DeferredWorkEventCallback(
    _In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
    _In_ void* callbackContext
    )
{
    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);

    // Use this callback thread to dispatch notifications to a worker thread
    // about whether or not it should process the next chunk of deferred work.

    // Note: Do not use this thread to perform your activity's workload.

    lock_guard<mutex> scopedLock(workerContext->ContextLock);
    switch (notificationType) {
    case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:

        // Allow deferred work to be processed.

        ResumeWorker(workerContext);

        break;

    case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:

        // Stop processing deferred work.

        workerContext->ShouldRun = false;

        break;

    default:
        FAIL_FAST();
        break;
    }
}

bool
TrainNextModelChunk(
    )
{
    //
    // Returns true if all work is completed, or false if there is more work.
    //

    return false;
}

void
DeferredModelTrainingWorker(
    _Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
    _Inout_opt_ PVOID callbackContext,
    _Inout_ PTP_WORK work
    )
{
    // Threadpool callback instance and work are not needed for this sample.

    UNREFERENCED_PARAMETER(callbackInstance);
    UNREFERENCED_PARAMETER(work);

    WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
    bool workComplete = false;

    // Keep processing work until being told to stop or all work has been completed.

    while (true) {
        {
            lock_guard<mutex> scopedLock(workerContext->ContextLock);

            if (workComplete) {
                workerContext->IsComplete = true;
            }

            if (!workerContext->ShouldRun || workerContext->IsComplete) {
                workerContext->IsRunning = false;
                break;
            }
        }

        // TrainNextModelChunk returns true when there is no more work to do.

        workComplete = TrainNextModelChunk();
    }

    workerContext->CompletionSignal.notify_all();
}

int
__cdecl
wmain(
    )
{
    WORKER_CONTEXT workerContext;
    workerContext.ShouldRun = false;
    workerContext.IsRunning = false;
    workerContext.IsComplete = false;

    // Create the worker that will be started by our subscription callback.

    workerContext.Worker.reset(CreateThreadpoolWork(
        DeferredModelTrainingWorker,
        &workerContext,
        nullptr));
    RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);

    // Allocate a policy suited for tasks that are best run when unlikely
    // to cause impact to the user or system performance.

    unique_policy policy;
    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
        &policy));

    // The model training in this sample consumes GPU.
    // The GOOD policy template doesn't currently include the GPU resource. We
    // therefore customize the policy to include good GPU conditions to minimize
    // the impact of running our work.

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_GOOD));

    // Subscribe to the policy for coordination notifications.

    unique_subscription subscription;
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));

    // Destroy the policy because we no longer need it.

    policy.reset();

    // We want our task to complete within 48h, so we allocate 24h under our
    // ideal policy and before falling back to a downgraded policy.

    bool workerCompleted;

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    if (workerCompleted) {

        // Since our work is complete, we should clean up our subscription by
        // unsubscribing. This would normally be handled quietly by our RAII
        // types, but we release them explicitly to demonstrate API flow for
        // developers manually managing resources.

        subscription.reset();
        return S_OK;
    }

    // We passed our soft deadline, so downgrade the policy and wait the
    // remaining 24h until our hard deadline has been reached. Since
    // Subscriptions and policies are independent of each other, we need to
    // create a new subscription with our downgraded policy to receive
    // notifications based on its configuration.
    // 
    // The downgraded policy uses medium conditions for all needed resources.
    // This gives us the best chance to run while helping to prevent us from
    // critically degrading the user experience, which we are more likely to do
    // when falling back to manual execution.

    RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
        ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
        &policy));

    RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
        policy.get(),
        ACTIVITY_COORDINATOR_RESOURCE_GPU,
        ACTIVITY_COORDINATOR_CONDITION_MEDIUM));

    subscription.reset();
    RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
        policy.get(),
        DeferredWorkEventCallback,
        &workerContext,
        &subscription));

    {
        unique_lock<mutex> scopedLock(workerContext.ContextLock);
        workerCompleted = workerContext.CompletionSignal.wait_for(
            scopedLock,
            hours(24),
            [&workerContext] { return workerContext.IsComplete; });
    }

    // We passed our deadline, so unsubscribe and manually resume our task.

    subscription.reset();
    ResumeWorker(&workerContext);

    // We destroyed our subscription, so we wait indefinitely for completion as
    // there's nothing to pause execution of our task.

    unique_lock<mutex> scopedLock(workerContext.ContextLock);
    workerContext.CompletionSignal.wait(
        scopedLock,
        [&workerContext] { return workerContext.IsComplete; });

    return S_OK;
}

Omówienie interfejsu API koordynatora działań

interfejs API i terminologia koordynatora działań

Wybieranie odpowiednich zasad koordynatora działań