Laden...

Throttling -- Begrenzung der max. Anzahl an gleichzeitig ausgeführten asynchronen Vorgängen

Erstellt von gfoidl vor 6 Jahren Letzter Beitrag vor 6 Jahren 6.505 Views
gfoidl Themenstarter:in
6.911 Beiträge seit 2009
vor 6 Jahren
Throttling -- Begrenzung der max. Anzahl an gleichzeitig ausgeführten asynchronen Vorgängen

Beschreibung:

Mit dieser Erweiterungsmethode für System.Threading.Tasks.TaskFactory kann die max. Anzahl an gleichzeitigen asynchronen Vorgänge begrenzt werden. Sobald ein Task fertig ist, wird ein neuer gestartet, damit die max. Anzahl so lange es geht aufrecht erhalten wird.

Die Verwendung bietet sich v.a. bei asynchronen IO-Vorgängen wie Webrequests, Webservice-Aufrufen und Datenbank-Aktionen an, wenn viele gleichzeitig aber nicht alle auf einmal durchgeführt werden sollen.

Wenn beispielsweise eine Webrequest oft und maximal 18x gleichzeitig durchgeführt werden soll, so kann mein Beispielcode in Wie mehrere HttpWebRequest Parallel durchführen ?
mit dieser Erweiterungsmethode direkter und ohne Infrastrukturrauschen geschrieben werden.


public async Task<IList<string>> DownloadDataAsync(string url, int count, int maxParallelRequests = 18)
{
    using (HttpClient httpClient = new HttpClient())
    {
        // Ein Func die den Download-Task erzeugt, sobald die Func ausgeführt wird
        Func<string, Func<Task<string>>> taskFunc = u => () => httpClient.GetStringAsync(u);

        var resultList = new List<string>(count);
        var taskFuncs  = Enumerable.Repeat(url, count).Select(taskFunc);

        foreach (var completedTask in Task.Factory.Throttling(taskFuncs, maxParallelRequests))
        {
            string result = await completedTask.ConfigureAwait(false);

            resultList.Add(result);
        }

        return resultList;
    }
}

Außerdem ist diese Variante ressourcenschonender als der "triviale" Ansatz im verlinkten Code. Dort muss für Task.WhenAny und Task.WhenAll zu jedem Task immer wieder eine Continuation hinzugefügt werden (intern durch die Infrastruktur von Task.WhenXXX). Hier wird zu jedem Task nur einmal eine Continuation angehängt. Weiters ist keine List<Task> nötig der Task hinzugefügt und entfernt werden.


using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks
{
    public static class TaskFactoryExtensions
    {
        public static IEnumerable<Task<T>> Throttling<T>(this TaskFactory factory, IEnumerable<Func<Task<T>>> taskFuncs, int concurrencyLevel = int.MaxValue)
        {
            if (factory   == null) throw new ArgumentNullException(nameof(factory));
            if (taskFuncs == null) throw new ArgumentNullException(nameof(taskFuncs));

            return ThrottlingCore(taskFuncs.ToList(), concurrencyLevel);
        }
        //---------------------------------------------------------------------
        private static IEnumerable<Task<T>> ThrottlingCore<T>(List<Func<Task<T>>> taskFuncs, int concurrencyLevel)
        {
            var sources              = taskFuncs.Select(t => new TaskCompletionSource<T>()).ToList();
            var nextTaskEnumerator   = taskFuncs.GetEnumerator();
            var nextSourceEnumerator = sources.GetEnumerator();
            var syncRoot             = new object();

            Action<Task<T>> continuation = null;
            continuation = completed =>
            {
                lock (syncRoot)
                {
                    if (nextSourceEnumerator.MoveNext())
                    {
                        var source = nextSourceEnumerator.Current;

                        if (completed.IsFaulted)
                            source.TrySetException(completed.Exception.InnerExceptions);
                        else if (completed.IsCanceled)
                            source.TrySetCanceled();
                        else
                            source.TrySetResult(completed.Result);
                    }

                    if (nextTaskEnumerator.MoveNext())
                    {
                        Func<Task<T>> taskFunc = nextTaskEnumerator.Current;

                        taskFunc().ContinueWith(
                            continuation,
                            CancellationToken.None,
                            TaskContinuationOptions.ExecuteSynchronously,
                            TaskScheduler.Default);
                    }
                }
            };

            lock (syncRoot)
            {
                int counter = 0;
                while (counter++ < concurrencyLevel && nextTaskEnumerator.MoveNext())
                {
                    Func<Task<T>> taskFunc = nextTaskEnumerator.Current;

                    Task<T> task = taskFunc();
                    task.ContinueWith(
                        continuation,
                        CancellationToken.None,
                        TaskContinuationOptions.ExecuteSynchronously,
                        TaskScheduler.Default);
                }
            }

            return sources.Select(s => s.Task);
        }
    }
}

Cancellation ist in diesem Snippet nicht berücksichtig, kann jedoch leicht nachgerüstet werden.

Anmerkung: Inspiration zu diesem Snippet, das bei mir schon länger herumlag, hab ich aus Task-based Asynchronous Pattern Abschnitte Throttling / Interleaving

Schlagwörter: Task, Throttling, Begrenzung, Speedski, async/await

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"