myCSharp.de - DIE C# und .NET Community
Willkommen auf myCSharp.de! Anmelden | kostenlos registrieren
 
 | Suche | FAQ

» Hauptmenü
myCSharp.de
» Startseite
» Forum
» FAQ
» Artikel
» C#-Snippets
» Jobbörse
» Suche
» Regeln
» Wie poste ich richtig?
» Forum-FAQ

Mitglieder
» Liste / Suche
» Wer ist wo online?

Ressourcen
» openbook: Visual C#
» openbook: OO
» Microsoft Docs

Team
» Kontakt
» Übersicht
» Wir über uns

» myCSharp.de Diskussionsforum
Du befindest Dich hier: Community-Index » Diskussionsforum » Entwicklung » Basistechnologien und allgemeine .NET-Klassen » Mehrere Enumerables ohne großen Buffer zusammenfassen
Letzter Beitrag | Erster ungelesener Beitrag Druckvorschau | Thema zu Favoriten hinzufügen

Antwort erstellen
Zum Ende der Seite springen  

Mehrere Enumerables ohne großen Buffer zusammenfassen

 
Autor
Beitrag « Vorheriges Thema | Nächstes Thema »
Uwe81 Uwe81 ist männlich
myCSharp.de-Mitglied

Dabei seit: 26.07.2008
Beiträge: 282
Entwicklungsumgebung: Visual Studio 2010/13
Herkunft: Ludwigshafen


Uwe81 ist offline

Mehrere Enumerables ohne großen Buffer zusammenfassen

Beitrag: beantworten | zitieren | editieren | melden/löschen       | Top

Hallo,

Ich habe ein Array von Enumerables

C#-Code:
IEnumerable<T>[] sources;

Typischerweise iterieren diese Enumerables nicht schnell, sind z.B. Dateien aus einem Document Management System. Der Overhead ist hoch, sodass sich wegen der hohen Latenz Parallelisierung lohnen würde, z.B. weil ich bei drei verschiedenen Repositories Dokumente runterlade.

Das Interface gibt mir für ein Repository alle Dokumente als IEnumerable<T>


Jetzt suche ich ein Enumerable, was sich wiefolgt berechnet.
Für jedes Sourceelement gibt es einen kleinen Buffer.
Die Enumerables werden parallel durchlaufen, d.h. wenn der Buffer für ein Enumerable nicht mehr gefüllt ist, wird das nächste Element bereits abgerufen.
Wenn ich für das "merged Enumerable" das nächste Element abfrage, erhalte ich irgend eines aus den buffern der unterliegenen enumerables, oder halt das nächste was da ist.


Ich bin neun in Tasks, PLINQ und parallel foreach. Daher verliere ich mich bei der Implementierung.

Das beste was ich bisher hinbekommen habe ist eine manuelle Implementierung mit Blocking Collections (noch nicht getestet)
Geht das wirklich nicht eleganter?

C#-Code:
        public int MaxCapacity { get; set; }
        public IList<IIterateStrategy> IterateStrategies { get; set; };

        public IEnumerable<DSDownloadEntry> Iterate() {
            var collection = new BlockingCollection<DSDownloadEntry>(MaxCapacity);
            var tasks = new Task[IterateStrategies.Count];
            for(int i = 0; i<IterateStrategies.Count; i++) {
                int index = i;
                var strategy = IterateStrategies[index];
                tasks[i] = Task.Factory.StartNew(() => {
                    foreach (var item in IterateStrategies[index].Iterate()) {
                        collection.Add(new DSDownloadEntry() { Container = item });
                    }
                });
            }
            Task.Factory.ContinueWhenAll(tasks, _ => collection.CompleteAdding());
            return collection.GetConsumingEnumerable();
        }

Ergänzung:

Ich habe jetzt nochmal PLINQ versucht:

C#-Code:
    class Producer {
        public string Prefix;
        public int Count;

        public IEnumerable<string> GetResults() {
            Random random = new Random();
            for (int i = 0; i < Count; i++) {
                var text = string.Format("{0}{1:000000}", Prefix, i);
                Console.WriteLine("Producing " + text);
                yield return text;
            }
            Console.WriteLine("############### DONE " + Prefix);
        }
    }
    class Program {
        public static void Main() {
            var producers = new[] {
                new Producer() {Prefix = "A", Count = 100000},
                new Producer() {Prefix = "B", Count = 100000},
                new Producer() {Prefix = "C", Count = 100000},
            };

            var merged = from p in producers.AsParallel()
                         from r in p.GetResults()
                         select r;

            foreach (var r in merged) {
                Console.WriteLine("Consumint " + r);
            }
        }
    }

Das Problem hier ist, das der Producer bereits weiterläuft, auch wenn nur wenig abgerufen wird.
Eine ergänzung "AsParallel().WithMergeOptions" hat nichts verändert.

Mein Wunsch wäre eben, dass pro Producer nur wenige Einträge gebuffert werden.

Dieser Beitrag wurde 2 mal editiert, zum letzten Mal von Uwe81 am 06.09.2018 11:09.

05.09.2018 23:00 E-Mail | Beiträge des Benutzers | zu Buddylist hinzufügen
witte
myCSharp.de-Mitglied

Dabei seit: 03.09.2010
Beiträge: 920


witte ist offline

Beitrag: beantworten | zitieren | editieren | melden/löschen       | Top

Hallo,
fassen wir mal zusammen:
* du hast mehrere Producer welche Objekte aufwändig von jeweils einer zugeordneten Datenquelle abrufen.
* diese Producer sollen vorab Objekte laden da dies lange dauern kann. Sie sollen aber nicht zuviele Objekte laden das dies zuviel Speicher verbrauchen kann.
* ein oder mehrere Konsumenten verarbeiten diese Objekte in dem die verschiedenen Ladepuffer zusammengeführt werden.
Was kann man machen?
* vllt  TPL Dataflow verwenden. Die Idee dort ist die Datenströme wie ein Fließband/Materialflußanlage aufzuteilen. Es gibt dort z.B. ActionBlocks die zu einem Ganzen zusammengesteckt werden können und die auch begrenzt/gedrosselt werden können. Hier noch  ein Artikel mit einer Beispielimplementierung.
* RX / Reactive Extensions / Reactive Framework: Dort werden Actionen definiert die ausgeführt werden wenn ein Ereignis (ein Dokument wurde heruntergeladen) eingetreten ist.
* self-made: Task.WhenAny, Concurent Queues ...

Dieser Beitrag wurde 2 mal editiert, zum letzten Mal von witte am 06.09.2018 11:48.

06.09.2018 11:42 E-Mail | Beiträge des Benutzers | zu Buddylist hinzufügen
Programmierhans
myCSharp.de-Poweruser/ Experte

avatar-1651.gif


Dabei seit: 05.04.2005
Beiträge: 4.221
Entwicklungsumgebung: VS2003-VS2013 / SAP WebIDE
Herkunft: Zentralschweiz


Programmierhans ist offline

Beitrag: beantworten | zitieren | editieren | melden/löschen       | Top

(ich habe nicht alles im Detail durchgelesen).

Aber wie wäre es mit einem ganz anderen Ansatz.

Mit einem ContinueWith.

 https://msdn.microsoft.com/de-de/library...(v=vs.110).aspx

Die Objekte würden sich dann selber für den nächsten Prozesschritt anmelden sobald sie verfügbar sind.
07.09.2018 11:24 E-Mail | Beiträge des Benutzers | zu Buddylist hinzufügen
Baumstruktur | Brettstruktur       | Top 
myCSharp.de | Forum Der Startbeitrag ist älter als 2 Jahre.
Der letzte Beitrag ist älter als 2 Jahre.
Antwort erstellen


© Copyright 2003-2020 myCSharp.de-Team | Impressum | Datenschutz | Alle Rechte vorbehalten. | Dieses Portal verwendet zum korrekten Betrieb Cookies. 20.10.2020 08:25