Laden...

WCF: Großes Array übertragen (Streaming?)

Erstellt von AnTri vor 12 Jahren Letzter Beitrag vor 12 Jahren 13.867 Views
Hinweis von gfoidl vor 12 Jahren

Unten gibt es eine Komponente mit der jeder serialisierbarer Typ in einem IEnumerable<T> gestreamt werden kann.

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren
WCF: Großes Array übertragen (Streaming?)

Hallo,

ich entwickle eine Client-Server-Anwendung. Der Server hat Kontakt zu einer Datenbank. Der Client "frägt" dann die Daten vom Server ab. Das Problem ist jetzt nur die Datenmenge. Die Ergebnisse der Datenbankabfrage können leicht mehrere tausend Zeilen sein, die ich um Client "transportieren" muss. Wie stelle ich das am besten an?
Ich habe es mit WCF probiert, stoße da aber schnell an das Limit. Außerdem wird mir die Anwendung während der Übertragung geblockt.
ich bin über jede Hilfe sehr dankbar.

Viele Grüße

AnTri

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo AnTri,

Außerdem wird mir die Anwendung während der Übertragung geblockt.

Verwende asynchrone Vorgänge beim Client dann wird dieser nicht geblockt da die Übertragung in einem Hintergrundthread stattfindet und wenn die Daten da sind ein Ereignis gefeuert wird (das auch automatisch in den aufrufenden Context delegiert wird, also [FAQ] Controls von Thread aktualisieren lassen (Control.Invoke/Dispatcher.Invoke) wird automatisch erledigt). Das kannst du beim Erstellen der ServiceReference einstellen bzw auch nachher. Wenn du selbst einen Proxy geschrieben hast musst du dich selbst darum kümmern.

können leicht mehrere tausend Zeilen sein,

Müssen diese alle aufeinmal zum Client? Diese zB in einem DataGrid anzeigen bringt nix denn da ist der User überfordert. Das Thema haben wir aber schon sehr oft besprochen, siehe also Forensuche mit den Stichwörter Paging, Virtualization, etc.

Wenn es doch sein muss dann schraub die Quotas/Max-Werte in der config des Service hoch.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Vielen Dank für die schnelle Antwort. Ich hätte noch ein paar kurze Fragen:

Müssen diese alle aufeinmal zum Client?

Ja, den auf dem Client wird dann entwender ein Diagramm oder ein Datenfile erzeugt. Es handelt sich dabei um Zeitreihen von Messwerten.

Verwende asynchrone Vorgänge beim Client dann wird dieser nicht geblockt da die Übertragung in einem Hintergrundthread stattfindet und wenn die Daten da sind ein Ereignis gefeuert wird

Das heißt ich muss DualBinding verwenden, oder?

Ich werde mir das mit dem Paging mal anschauen. Bisher bin ich da noch nicht weitergekommen. Kann ich da bei der Übertragung einen "Fortschrittsbalken" anzeigen, dass der User weiß wie lange es noch dauert?

Gibt es vielleicht in WCF/.net 4 noch elegantere Lösungen für das Problem?

Danke!

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo AnTri,

Das heißt ich muss DualBinding verwenden, oder?

Nein. Das geht mit jeder Bindung. Siehe angehängtes Bild. Oder bei svcutil entsprechendes Argument mitgeben.

Nachdem diese generiert wurde müssen diese auch verwendet werden 😉

Kann ich da bei der Übertragung einen "Fortschrittsbalken" anzeigen, dass der User weiß wie lange es noch dauert?

Das ginge über Streaming - ist aber aufwändiger. Ein "Marquee"-Fortschritt kannst du aber auch so anzeigen lassen.

Gibt es vielleicht in WCF/.net 4 noch elegantere Lösungen für das Problem?

Speziell für große Datenmenge hat sich nix geändert.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Hallo gfoidl,

vielen Dank für die Ansätze. Eine Frage habe ich noch:

Das ginge über Streaming

Ich habe schon Files über WCF gestreamt. Wenn ich auf dem "Server" meine Daten in einer List<> habe, muss ich dann zuerst ein File erzeugen (z.B. XML) oder kann ich direkt die List<> streamen?

Nochmals vielen Dank!

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo AnTri,

es reicht ganz allgemein ein Stream, also würde ich hier einen MemoryStream verwenden. Die List<T> bekommt du mit dem BinaryFormatter in den Stream. Dabei ist darauf zu achten dass das T in der List<T> auch mit dem Serializable-Attribut markiert ist.

Der Weg über die Datei ist nicht notwendig.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

1.346 Beiträge seit 2008
vor 12 Jahren

Hallo,

Ich glaube nicht, dass hier ein MemoryStream so sinnvoll ist. Immerhin scheinen die Daten so groß zu sein, dass es sich lohnt einen Fortschrittsbalken zu haben. Und dann sollte man doch eher darauf achten, sie nur einmal im Speicher zu haben, und nicht nochmal als Serialisierte Form. Die Datei scheint doch ganz vernünftig, oder, wenn das geht(ich kenne mich mit WCF nicht sonderlich gut aus) das ganze direkt in den Ausgabestream zu serialisieren.

Gruß pdelvo

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo,

wenn das geht(ich kenne mich mit WCF nicht sonderlich gut aus) das ganze direkt in den Ausgabestream zu serialisieren.

Klar das ist das einzig vernüftige. Danke.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Hallo,

vielen Dank für Eure zahlreichen Antworten. Die Datenreihen sind (je Reihe) so zwischen 500k und 5MB groß. Daher finde ich die Möglichkeit eines Fortschrittbalken schon gut.

das ganze direkt in den Ausgabestream zu serialisieren

Wie wäre den da der Ansatz?

Vielen Dank AnTri

656 Beiträge seit 2008
vor 12 Jahren

Spricht was dagegen, das Datenfile am Server zu erzeugen, und das ganze dann per Stream zum Client zu schieben (damit kannst du dir dann auch den Fortschrittsbalken anzeigen lassen, wenn du eine entsprechende Stream-Implementierung verwendest, die dir Statusinformationen usw. liefert)?

Clientseitig generierst du dann basierend auf dem Datenfile dein Diagramm.

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo BhaaL,

genau um Streaming gehts momentan 😉

Hallo AnTri,

es geht genau gleich wie beim File-Streamen nur dass halt keine FileStream verwendet wird. Da alle WCF-Beispiel davon ausgehen dass der Stream bekannt ist kannst du hier - wie ich oben vorgeschlagen haben - einen MemoryStream verwenden.

Oder - was ich eleganter finde - wie pdelvo vorgeschlagen hat: direkt in den Response schreiben. Dafür findet sich aber noch* keine Information im www.
Hol dir über den OperationContext.Current den aktuellen Context und dort gibts dann die Response(Stream)-Eigenschaft (hoffentlich, muss erst nachgucken) und in diese kannst du wie in jeden anderen Stream schreiben.
Damit der OperationContract von WCF erfüllt ist müssen die Attribute noch angepasst werden so dass das Request/Response-Verhalten korrekt vorgegaukelt wird.

Ich weiß, das hört sich ein wenig kompliziert an. Wenn du es nicht verstehst werde ich versuchen eine Demo zu basteln.

* durch die jetztige Antwort schon 😃

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

1.378 Beiträge seit 2006
vor 12 Jahren

Hallo AnTri,

Müssen diese alle aufeinmal zum Client?
Ja, den auf dem Client wird dann entwender ein Diagramm oder ein Datenfile erzeugt. Es handelt sich dabei um Zeitreihen von Messwerten.

evt. könnte man die Daten die fürs Diagramm notwendig sind noch einmal bereits Server-Seitig auf ein notwendiges Minimum aggregieren sodass evt. nur mehr ein paar Hundert Datenzeilen für das Diagramm geladen werden müssen.

Lg, XXX

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Hallo,

Wenn du es nicht verstehst werde ich versuchen eine Demo zu basteln

Also ich wäre Dir für eine kleine Demo (um auf die richtige Spur zu kommen) unendlich dankbar. Ich habe mal gegoogelt, finde aber jetzt nicht so richtig etwas dass mir jetzt helfen könnte.

@xxxprod

Server-Seitig auf ein notwendiges Minimum aggregieren

das wird schon gemacht. In der Datenbank sind deutlich mehr Werte gespeichert als später übertragen werden...

Vielen Dank!

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo AnTri,

mach ich, dauert aber noch ein wenig. Muss vorher was anderes machen.
Endlicher Dank genügt mir auch schon 😃

Siehe unten.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Vielen Dank für Deine Hilfe!!!

656 Beiträge seit 2008
vor 12 Jahren

genau um Streaming gehts momentan 😉

Hab ich gemerkt 😃
Mein Post ging eher in die Richtung was man übertragen sollte.
Nachdem er sowohl Datei als auch Diagramm will, wird wohl eins der beiden alle Daten haben, die drin sein sollen - und das würde ich übertragen plus das jeweils andere ableiten.

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo,

bin nicht früher dazu gekommen und ich konnte auch nicht an einem Stück daran arbeiten. Daher erst jetzt die Antwort die ein wenig länger ausgefallen ist als ich ursprünglich gedacht habe.

So wie pdelvo vorgeschlagen geht es leider nicht so einfach. In den zugrunde liegenden NetworkStream kommt man nur sehr schwer dran und selbst dann muss man alles selbst umsetzen wie das ganze SOAP-Protokoll-Zeugs. Wenn man an diesem Punkt ankommt drängt sich - für mich unweigerlich - die Frage auf: Warum überhaupt WCF und nicht stattdessen mit TcpServer/TcpClient streamen? Da ist der Zugriff auf den Stream trivial. Wenn das also eine Alternative ist, wie zB in einem Intranet, dann würde ich das nehmen. Auf Server- und Clientseite entsprechende Methoden die dann jeweils die Serialisierung bzw. Deserialisierung durchführen und fertig.

Trotzdem hab ich eine Lösung gefunden wie mit WCF das umsetzbar ist 😃
Bevor ich aber erleutere wie das funktioniert noch ein paar Randbemerkungen:*MTOM sollte unbedingt verwendet werden, da es sich bereits ab ca. 1KB als effizienter herausstellt. Die übertragene Datenmange sinkt dabei um ~33%. Dies kann entwder in der *.confg wie folgt angegeben werden:


<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <system.serviceModel>
        <bindings>
            <basicHttpBinding>
                <binding messageEncoding="Mtom"
                         transferMode="Streamed" />
            </basicHttpBinding>
        </bindings>
    </system.serviceModel>
</configuration>

wobei das für .net 4.0 gilt, vorher muss der Binding ein Name gegeben werden und so entsprechend dem Endpunkt bekannt gemacht werden. In .net 4.0 wird das automatisch für alle BasicHttpEndpunkte angewandt (da hier für basicHttpBinding konfiguriert).
Oder per Code wie ich es auf Client-Seite gemacht habe:


BasicHttpBinding binding = new BasicHttpBinding
{
    TransferMode    = TransferMode.Streamed,
    MessageEncoding = WSMessageEncoding.Mtom
};

var factory = new ChannelFactory<IMyStreamingServiceChannel>(
    binding,
    new EndpointAddress("http://localhost:8085/streamingTest"));

*Es entsteht ein gewissen Overhead durch das Streaming. Ob es also insgesamt schneller/effizienter ist weiß ich nicht - ich kenn auch die Daten und die Zielsysteme nicht - und sollte jedenfalls evaluiert werden.

*Das was BhaaL in der letzten Antwort beschrieben hat sollte berücksichtigt werden. So könnte zB ein OperationContract angeboten der das fertige Bild überträgt und einer der die aufbereiteten Rohdaten überträgt.

*Wenn die Daten im Strom zum Client fließen bleibt natürlich noch die Frage wie die Daten zur Stromquelle kommen. Wenn sie alle aufeinmal in den Speicher geladen werden entsteht hier ein Bottleneck. Für die Aufgabe des Themas wird es wohl schon besser sein die Daten per DataReader zu lesen und dann sofort in den Strom zu schreiben.
Da die Daten über das Netzwerk übertragen werden könnte es passieren dass auf Datenbankseite ein Timeout passiert oder sonst was. Von daher macht es Sinn die Daten nicht direkt von der DB in den Stream zu schreiben sondern diese Schritte in einer Pipeline (auch als Producer/Consumer bezeichnet) durchzuführen. Somit werden die einzelenen Schritte - Lesen bzw. Ereugen der Daten und Schreiben in den Stream - entkoppelt und das ist bei IO-lastigen Aufgaben eigentlich immer sinnvoll. Im konkreten Fall ist der Producer der DataReader und der Consumer jener Teil der in den Strom schreibt.
Dies hat zusätzlich den Vorteil dass ein wiederverwendbarer Code erstellt werden kann. Und genau das habe ich gemacht. Etwas vergleichbares habe ich bei einer Recherche auch nicht gefunden und somit ist das hier die Prämiere für sowas 😃

Was muss also berücksichtigt werden um die Consumer-Seite umzusetzen? Die Anforderungen der WCF sind dass ein Stream zurückgegeben wird. Bei einem FileStream ist das kein Problem. Bei einem MemoryStream auch nicht. Dieser hat aber den Nachteil dass alle Daten aufeinmal geladen werden müssen und dann in den Speicher serialisiert gehören. Somit ist der doppelte Speicherplatz belegt. Das ist also auch nicht unbedingt von Vorteil.
Das Problem beim MemoryStream ist auch jenes: der Stream wird zurückgegeben und dann kann nicht mehr in ihn geschrieben werden. Ein Streaming von der ursprünglichen Quelle zur Senke ist somit nicht möglich.
Daher muss ein Stream gesucht werden der es gestattet an den Client ausgehändigt zu werden und gleichzeitig verfügbar ist um Daten in ihm zu schreiben. Diesen gibt es (in .net) aber nicht. Allerdings kann mittels PipeStreams das nachgebildet werden. Es wird ein anoynmer Pipeserver erstellt indem geschrieben werden kann und der auf dem WCF-Server verbleit, während der zugehörige Pipeclient zum WCF-Client ausgehändigt wird. Mit diesser Konstellation kann das gewünschte Verhalten nachgeahmt werden.
Dabei gebe ich aber zu bedenken dass der Overhead schon ziemlich groß wird: Pipeserver und -client erstellen + der übliche Overhead der beim Streaming gegenüber gepufferten Transfer entsteht.
Eine named Pipe könnte auch verwendet werden, allerdings müsste der Name dynamsich generiert werden damit mehrere gleichzeitige Zugriffe auf den WCF-Service möglich sind. Und anstatt den Namen dynamsich zu generieren kann gleich eine anonyme Pipe verwendet werden - die macht im Prinzip auch nix anderes.

Zuerst zeige ich eher beispielhaft wie das mit den PipeStreams umgesetzt wird und anschließend zeige ich die Implementierung der wiederverwendbaren Komponente.

Die ganze "Magie" passiert dabei in folgender Methode:


private static Stream GetWritableStream(Action<Stream> writeAction)
{
    AnonymousPipeServerStream pipeServer = new AnonymousPipeServerStream();
    Task.Factory.StartNew(() =>
    {
        using (pipeServer)
        {
            writeAction(pipeServer);
            pipeServer.WaitForPipeDrain();
        }
    }, TaskCreationOptions.LongRunning);

    string clientHandle = pipeServer.GetClientHandleAsString();
    return new AnonymousPipeClientStream(clientHandle);
}

In der Methode wird zuerst der PipeServerStream erstellt und ein asynchroner Vorgang der in diesen Stream schreibt. Das muss asynchron passieren, andernfalls wäre es nicht möglich zeitgleich den Stream an den Client zu schicken. Um eben den Stream zum Client senden zu können wird vom PipeServer der zugehörige ClientStream erstellt und dieser ist auch der Rückgabewert dieser Methode.
Anmerkung: der Task wird mit LongRunning erstellt, d.h. es wird ein dezidierter Thread dafür verwendet anstatt einer aus dem ThreadPool. Dies v.a. deshalb das die Operation für das Streaming wohl lange dauern soll - sonst wäre es eh sinnlos - und deshalb der ThreadPool nicht verhungern soll.

Die Verwendung dieser Methode in einer WCF-Service-Methode wird nachfolgend gezeigt:


public Stream GetStream()
{
    Stream pipeStream = GetWritableStream(s =>
    {
        //StreamWriter sw = new StreamWriter(s);
        //sw.AutoFlush = true;
        //for (int i = 0; i < 100; ++i)
        //{
        //    Thread.Sleep(200);
        //    sw.WriteLine("juhu #{0,2}", i);
        //}

        // Die Position wird im Stream wird einfach vorgerückt und
        // somit kann der BinaryFormatter mehrmals hintereinander
        // schreiben.
        BinaryFormatter binFormatter = new BinaryFormatter();
        for (int i = 0; i < 5; ++i)
        {
            TestData testData = new TestData { Name = "test " + i.ToString() };
            binFormatter.Serialize(s, testData);
        }
    });

    EventHandler handler = null;
    handler = (s, e) =>
    {
        OperationContext.Current.OperationCompleted -= handler;
        pipeStream.Dispose();
    };
    OperationContext.Current.OperationCompleted += handler;

    return pipeStream;
}

Im oberen Teil wird die vorher vorgestellt Methode verwendet und dieser eine anonyme Methode übergeben welche (asynchron, siehe oben) in den (Pipeserver-) Stream schreibt. Im auskommentierten Teil ist gezeigt wie mit einem StreamWriter in den Stream geschrieben werden kann. Im nicht auskommentierten Teil schreibe ich beispielhaft mit dem BinaryFormatter einen Objektgraphen (hier: nur ein Objekt und die Klasse ist in der Contract-Assembly für den Service definiert, da sie WCF-Server und -Client bekannt sein muss). Dieses Vorgehen unterstützt auch weitestgehend streaming, da der BinaryFormatter direkt in den Stream schreibt ohne die Ausgabe der Serialisierung komplett zu puffern.

Wichtig ist dass am Ende der Operation die Ressourcen freigegeben werden. Hier ist das besonders wichtig das anonyme Pipes verwendet werden und der GC bzw. der Finalizer einen (schweren) Fehler wirft wenn das nicht korrekt geschlossen wird. Hierzu wird das OperationCompleted-Ereignis des aktuellen OperationContext registiert und im Handler der Stream geschlossen. Wichtig ist auch dass der Handler wieder deregistriert wird, sonst kann ein Speicherleck entstehen. Das ist v.a. wichtig wenn der Handler eine statische Methode ist und da der Compiler anonyme Methoden so generiert trifft dies zu. Der OperationContext hat somit eine aktive Referenz zum Handler und kann nicht freigegeben vom GC freigegeben werden bis der Handler freigegeben wird -> dieser ist statisch, also erst beim entladen der AppDomain -> ist die Default-Domain -> also erst am Programmende.

Somit ist das Beispiel funktionsfähig, aber das weitere Ziel soll sein eine wiedervewendbar Komponente zu haben. Die Rahmenbedingung dazu wurde weiter oben gelegt: ein Consumer in einer Pipeline der die in den Stream schreibt. Für die Umsetzung der Pipeline gibt es praktisch zwei Möglichkeiten. Die SyncQueue <T> - Eine praktische Job-Queue oder ab .net 4.0 die BlockingCollection<T>. Nachfolgend verwende ich letztere, der Code sollte sich aber leicht auf erstere umschreiben lassen. Für nähere Info zu Producer/Consumer mit .net 4.0 verweise ich auf Patterns for Parallel Programming with the .NET Framework.

Damit jeder beliebige Typ gestreamt werden kann hab ich auch einen eigenen Serializer geschrieben der speziell für diesen Anwendungsfall entwickelt wurde. Die einzige Einschränkung für den Typ ist dass dieser als serialisierbar gekennzeichnet sein muss -> [Serializable]-Attribut muss vorhanden sein.
Für die Verwendung bietet diese Klasse die beiden Methoden Write (auf Serverseite) und Read (auf Clientseite) und schaut wie folgt aus:


/* Copyright © Günther M. FOIDL 2011
\*
\* Lizenz: Common Development and Distribution License (CDDL)
\* [URL]http://www.opensource.org/licenses/cddl1.php[/URL]
\*/

using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

namespace gfoidl.WcfService.Utils.Streaming
{
    public class WcfStreamingSerializer<T> : DisposableObject
    {
        private readonly BinaryFormatter _binFormatter = new BinaryFormatter();
        private readonly Stream          _stream;
        //---------------------------------------------------------------------
        public Stream Stream { get { return _stream; } }
        //---------------------------------------------------------------------
        public WcfStreamingSerializer(Stream stream)
        {
            Contract.Requires<ArgumentNullException>(stream != null);
            //-----------------------------------------------------------------
            _stream = stream;
        }
        //---------------------------------------------------------------------
        public void Write(T item)
        {
            this.ThrowIfDisposed();
            _binFormatter.Serialize(_stream, item);
            _stream.Flush();
        }
        //---------------------------------------------------------------------
        public IEnumerable<T> Read()
        {
            this.ThrowIfDisposed();
            while (true)
            {
                // yield return darf as designed nicht in einem try-catch sein
                // daher das Flag.
                bool isFaulty = false;
                T value       = default(T);
                try
                {
                    value = (T)_binFormatter.Deserialize(_stream);
                }
                // Mir ist nix schöneres eingefallen um das Ende des Streams zu
                // erkennen. Das CanSeek, etc. nicht unterstützt wird geht nicht viel.
                catch (SerializationException)
                {
                    isFaulty = true;
                }

                if (isFaulty) yield break;
                else yield return value;
            }
        }
        //---------------------------------------------------------------------
        #region IDisposable Members
        protected override void DisposeCore()
        {
            _stream.Dispose();
        }
        #endregion
    }
}

Bevor ich zur "StreamingPipeline" schreit noch kurz wie somit die Implementierung des OperationContracts ausschaut. Gezeigt wird das am Beispiel einer Text-Datei bei welcher der Inhalt zeilenweise gestreamt wird.


public Stream GetBigFile()
{
    StreamingPipeline<string> streamingPipeline = new StreamingPipeline<string>();
    HandleCompletion(streamingPipeline);
    return streamingPipeline.StartPipeline(File.ReadLines("lorem.txt"));
}
//---------------------------------------------------------------------
private static void HandleCompletion(IDisposable disposable)
{
    EventHandler handler = null;
    handler = (s, e) =>
    {
        Debug.WriteLine("OperationCompleted");
        OperationContext.Current.OperationCompleted -= handler;
        disposable.Dispose();
    };
    OperationContext.Current.OperationCompleted += handler;
}

Also ziemlich klar zu verwenden. Wichtig dabei ist dass der Methode StartPipeline ein IEnumerable<T> übergeben wird. Mit dem DataReader und yield return sollte das kein Problem sein.
Auch die Verwendung am Client ist ziemlich straight-forward:


using (var proxy  = new MyStreamingServiceProxy())
using (var stream = proxy.GetBigFile())
using (var sr     = new WcfStreamingSerializer<string>(stream))
    foreach (var value in sr.Read())
        Console.WriteLine(value);

So - nun endlich die Klasse StreamingPipeline:


/* Copyright © Günther M. FOIDL 2011
\*
\* Lizenz: Common Development and Distribution License (CDDL)
\* [URL]http://www.opensource.org/licenses/cddl1.php[/URL]
\*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace gfoidl.WcfService.Utils.Streaming
{
    public class StreamingPipeline<T> : DisposableObject
    {
        private readonly BlockingCollection<T>   _buffer = new BlockingCollection<T>();
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
        //---------------------------------------------------------------------
        public Stream StartPipeline(IEnumerable<T> source)
        {
            Contract.Requires<ArgumentNullException>(source != null);
            //-----------------------------------------------------------------
            this.ThrowIfDisposed();

            AnonymousPipeServerStream pipeServer = new AnonymousPipeServerStream();
            Task producer = this.CreateAndStartProducer(source);
            Task consumer = this.CreateAndStartConsumer(pipeServer);

            // Eine andere Art der Fehlerbehandlung geht hier wegen dem
            // statuslosen Zustand von WCF nicht. Die Message würde nie zum
            // Client kommen wegen der Threads.
            // Dieser bemerkt es aber wenn die Daten nicht korrekt sind sowieso
            // (irgendwie ;-)).
            producer.ContinueWith(
                t => Trace.TraceError(t.Exception.InnerException.ToString()),
                TaskContinuationOptions.OnlyOnFaulted);
            consumer.ContinueWith(
                t => Trace.TraceError(t.Exception.InnerException.ToString()),
                TaskContinuationOptions.OnlyOnFaulted);

            string clientHandle = pipeServer.GetClientHandleAsString();
            return new AnonymousPipeClientStream(clientHandle);
        }
        //---------------------------------------------------------------------
        public void CancelPipeline()
        {
            this.ThrowIfDisposed();
            _cts.Cancel();
        }
        //---------------------------------------------------------------------
        private Task CreateAndStartProducer(IEnumerable<T> source)
        {
            CancellationToken token = _cts.Token;
            return Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (T item in source)
                    {
                        if (token.IsCancellationRequested) break;
                        _buffer.Add(item, token);
                    }
                }
                catch (OperationCanceledException) { }
                catch (Exception)
                {
                    _cts.Cancel();
                    throw;
                }
                finally
                {
                    _buffer.CompleteAdding();
                    Debug.WriteLine("Producer finished.");
                }
            }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
        //---------------------------------------------------------------------
        private Task CreateAndStartConsumer(AnonymousPipeServerStream pipeServer)
        {
            CancellationToken token = _cts.Token;
            return Task.Factory.StartNew(() =>
            {
                var serializer = new WcfStreamingSerializer<T>(pipeServer);
                try
                {
                    foreach (T item in _buffer.GetConsumingEnumerable())
                    {
                        if (token.IsCancellationRequested) break;
                        serializer.Write(item);
                    }
                }
                catch (OperationCanceledException) { }
                catch (Exception)
                {
                    _cts.Cancel();
                    throw;
                }
                finally
                {
                    //serializer.Dispose();     hier nicht!! ist piperServer.Dispose!
                    pipeServer.WaitForPipeDrain();
                    pipeServer.Dispose();
                }
            }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
        //---------------------------------------------------------------------
        #region IDisposable Members
        protected override void DisposeCore()
        {
            // Denn Buffer leeren und ev. die restlichen Items im
            // Buffer disposen:
            if (!_cts.IsCancellationRequested)
            {
                _buffer.CompleteAdding();
                _cts.Cancel();
            }

            // Hier ist GetConsumingEnumerable nicht notwendig denn
            // IsAddingCompleted = true.
            var disposableElements = _buffer
                .Select(item       => item as IDisposable)
                .Where(item        => item != null);

            foreach (IDisposable item in disposableElements)
                item.Dispose();

            _buffer.Dispose();
            _cts.Dispose();
        }
        #endregion
    }
}

Jetzt hoffe ich dass in der Beschreibung nix vergessen wurde. Sonst einfach nachfragen.

Angehängt die VS-Solution mit der ich das probiert und erarbeitet habe. Die Beispiel-Klassen sind frei verwendbar. Die Klassen StreamingPipeline<T> und WcfStreamingSerializer<T> stehen jedoch unter der Common Development and Distribution License (CDDL-1.0) insbesondere dürfen die Namespaces und Bezeichner nicht geändert werden. Das hab ich mir wohl verdient 😃
Ich hätte auch den WCF: Basisklasse für einen Proxy verwenden können, darauf habe ich aber verzichtet damit der Aufwand zum Betrachten des Beispiels nicht zu groß wird. Der Text ist eh schon lang 😉 In einer produktiven Umgebung würde ich jedoch die Basisklasse verwenden (bzw. ein leichte Abwandlung davon).

Anmerkung 1: Ab und zu kommt es zu einer "SEHException" beim Versuch ein Safe-Handle freizugeben. Die Ursache dafür habe ich noch nicht erforscht und behoben auch nicht. Höchstwahrscheinlich das am PipeStream. Jedenfalls kommt es dann zu einem Deadlock in VS bzw. wenn ein Debugger angehängt ist 😦
Vllt. weiß ja jemand warum das so ist od. findet es heraus. Es wäre dann nett wenn er das hier als Antwort reinschreibt.

Anmerkung 2: In den Beispielen kann es den Anschein haben dass die Daten nicht gestreamt werden wenn man mit dem Debugger durchgeht. Tatsächlich werden die Daten schon gestreamt, dieser Anschein kommt aber zu Stande da trotz Streaming mehrere Buffer im Spiel sind wie zB jene der Netzwerkübertragung.

mfG Gü

WCF, Streaming, Pipe, PipeStream, AnonymousPipe, Pipeline, Producer/Consumer, Producer, Consumer, Speedski

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Hallo gfoidl,

ich bin fast sprachlos...

Vielen Dank für die Hilfe. Da ich die nächsten Tage unterwegs sein werde, komme ich jetzt nicht sofort dazu Deine Umsetzung zu implementieren. Allerdings habe ich das Projekt mal kurz 😃 ausprobiert. Im Prinzip scheint es zu laufen, ich bekomme aber beim Client ständig Asserts.

Ich denke ich melde mich auf jeden Fall nochmals, sobald ich das ganze Testen kann.
Nochmald danke...

AnTri

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo AnTri,

ich bekomme aber beim Client ständig Asserts.

Das könnte von den CodeContracts sein die ich verwendet habe. Wenn du diese nicht hast ersetz diese einfach durch throw new ArgumentNullException (und die anderen Exception-Typen die auftreten).

Deine

deine geht auch -> ich bevorzuge geduzt zu werden.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Hallo gfoidl,

also ich hab nun deine zwei Klassen bei mir testweise implementiert. Beim erkennen des Streamende muss ich noch ein bisschen hirnen, da ich hier noch immer eine Exception bekomme, obwohl Du/ich diese doch eigentlich abfange...

Ich hab jetzt noch eine Frage 😉
Das erzeugen der Liste (teilweise müssen vorher berechnungen angestellt werden, wenn nicht die Daten direkt aus der DB gelesen werden) kann unter Umständen einige Zeit in Anspruch nehmen. Meine Idee wäre jetzt, dass der Client an den Server den Auftrag gibt, die Liste zu erzeugen (berechnen), irgendwie? (evtl. Callback) Bescheid bekommt wann die Liste bereitsteht und dann den Stream "abholt".
Würdest Du hier mit Sessions arbeiten oder hast du einen anderen (wahrscheinichen besseren) Vorschlag?

Vielen Dank!

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo AnTri,

ad Fehler:
stell in VS unter Debug -> Exception -> Thrown ein dann siehst du welcher Fehler es ist. Wenns mit den Klassen was zu tun lass es mich bitte wissen - dann muss ich das ausbessern.

ad weitere Frage:
bei Callbacks müssen zwangsläufig Sessions vorhanden, weswegen zB BasicHttpBinding wegfällt.
Bei der Variante mit den Callback würde ich aber nicht die gesamte Liste übertragen, sondern sofern möglich jedes Item für sich oder noch besser batch-weise. Wenn die Reihenfolge erhalten bleiben muss - diese ist bei der batchweisen Übertragung ja nicht sichergestellt - muss die Liste in eine neue Klasse gewrappt werden in der auf eine fortlaufende "Pakte-Nummer" enthalten ist und dann am Client nach dieser sortiert.

Wenn nur die gesamte Liste geht wäre das möglich. Aber ich habe noch nie Streaming mit Callbacks probiert - kann dazu also gar nicht sagen ob das geht und welche Probleme auftreten können. Aber probiers mal und berichte. Dann kann ich was von dir lernen 😃

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Hallo,

die Ausnahme tritt in der Klasse "WcfStreamingSerializer" in der Methode Read auf dem Client auf (z.B. wenn IEnumerable<T> keine Elemente hat, oder eben alle schon gelesen sind.


try
{
   value = (T)_binFormatter.Deserialize(_stream);
}
// Mir ist nix schöneres eingefallen um das Ende des Streams zu
// erkennen. Das CanSeek, etc. nicht unterstützt wird geht nicht viel.
                
catch (SerializationException execp)        
{                  
    Console.WriteLine(execp.Message);
    isFaulty = true;
}

Fehlermeldung:
Eine Ausnahme (erste Chance) des Typs "System.Runtime.Serialization.SerializationException" ist in mscorlib.dll aufgetreten.
Das Streamende wurde erreicht, bevor die Verarbeitung abgeschlossen wurde.

Nochmal auf meine Frage. Wie würdest Du das Problem lösen?
Seither (bisher war der "Server" eine eigene Anwendung mit UI) wurde für die Berechnung ein extra Thread verwendet und nach dessen Ende die Liste im Diagramm dargestellt.

Im Fall des Client soll dieser zuerst eine ID an den Server senden, dieser berechnet die Liste (wahrscheinlich auch in einem Thread) und gibt dem Client Bescheid die Liste jetzt "anzufordern"...

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo AnTri,

diese Fehlermeldung ist normal und dafür hab ich ja das catch eingebaut um genau diese zu fangen. Sozusagen: works as designed 😉

Wie würdest Du das Problem lösen?

Hab ich eh oben geschrieben, allerdings ein paar Optionen offen gelassen und indirekt gefragt

nicht die gesamte Liste übertragen, sondern sofern möglich jedes Item für sich oder noch besser batch-weise.

Ginge das? Weiter oben hast du geschrieben

Die Datenreihen sind (je Reihe) so zwischen 500k und 5MB groß.

also würde ich jede Datenreihe per Callback übertragen. Einen Fortschritt kannst du so auch an die UI melden. Streaming ist dabei nicht unbedingt notwendig. MTOM aber rentiert sich schon (ist aber 1KB binär besser).

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Hallo,

nicht die gesamte Liste übertragen, sondern sofern möglich jedes Item für sich oder noch besser batch-weise.

vielleicht bin ich jetzt ein bisschen blöd...

Wenn ich Dich richtig vestehe meinst du, dass ich für jedes Elenent der Liste eine Anfrage an den Server stelle.

  1. Anfrage des Client an den Server (CreateList)-> Liste wird generiet
  2. Übertragung des ersten Item (GetItem) -> wie komme ich jetzt wieder an Liste (genau des Clients) aus dem vorhergehenden Aufruf?

daher war mein Gedanke:
Wenn ich die Items der Liste einzeln übertrage brauch ich ja auf jeden die Sessions, oder?

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo AnTri,

Code sagt mehr als 1000 Wörte:


// Server:
public void AufrufvomClientDieDatenZuSenden(...)
{
    ICallback callback = OperationContext.Current.GetCallbackChannel...
    IEnumerable<T> daten = HoleDaten(...);
    foreach (T item in daten)
    {
        callback.SendeItemZumClient(item);
    }
}

Hier kann statt HoleDaten auch wieder ein Producer/Consumer erstellt werden. Da du sagst dass die Daten aufbereitet werden müssen kann das von Vorteil sein.

Wenn ich Dich richtig vestehe meinst du, dass ich für jedes Elenent der Liste eine Anfrage an den Server stelle.

Client -> Server 1x
Server -> Client nx per Callback

Wenn ich die Items der Liste einzeln übertrage brauch ich ja auf jeden die Sessions, oder?

Bei Callbacks zwangsläufig ja. Siehe oben.

Hinweis: Die ganzen Varianten machen aber wirklich nur Sinn wenn die Daten wirklich groß sind und es sich beim Testen als besser herausgestellt hat. Sonst ist "normale" Kommunikation vorteilhafter.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

A
AnTri Themenstarter:in
119 Beiträge seit 2009
vor 12 Jahren

Hallo,
dann siehst du also in diesem Fall auch keine andere Möglichkeit als das Problem mit Callback und Sessions zu lösen, oder?

6.911 Beiträge seit 2009
vor 12 Jahren

Hallo AnTri,

Callback und Session gehören unweigerlich zusammen!

Andere Möglichkeiten sehe ich schon - siehe gesamten Thread hier 😃
und

es sich beim Testen als besser herausgestellt hat.

Mehr kann ich ohne exakte Kenntnis auch nicht sagen - und selbst dann müssten die Varianten probiert werden um einen Sieger ermitteln zu können. Einen Königsweg gibt es halt nicht, alles ist problemspezifisch.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"