Laden...

Lesen von Live Datenstrom eines Drucksensors

Erstellt von Xodarap93 vor 6 Jahren Letzter Beitrag vor 6 Jahren 7.154 Views
X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren
Lesen von Live Datenstrom eines Drucksensors

Guten Tag,
ich beschäftige mich erst seit kurzem mit C#, habe aber schon jetzt großen Spaß daran.
Ich bin Student und aktuell in einer Firma als Praktikant tätig.

Meine Frage hier dreht sich um das Projekt an dem ich gerade arbeite. Meine direkten Arbeitskollegen haben leider wenig Wissen im Bereich C#, weshalb ich diese nicht fragen kann. Ich benutze dennoch C#, da bereits ein großteil des Programms von 2-3 Praktikantengenerationen vor mir erstellt wurde und ich es bloß um einige Funktionen erweitern muss.

Da natürlich alles unter NDA steht kann ich wenn nur exemplarische Probleme als Code mitteilen. Aber ich denke/hoffe, dass man mir hier vorerst auch einfach im Dialog über das generelle Problem helfen kann.

Darum geht es:
Das bestehende Programm kann Werte von einem Drucksensor A lesen. Der Drucksensor schickt jeweils nach einer Anfrage einen Druckwert zurück und kann dabei mit bis zu 40Hz arbeiten.
Für unsere heutigen Zwecke ist das allerdings viel zu langsam. Wir haben jetzt einen neuen Drucksensor B, welcher mit bis zu 250Hz arbeitet, diese Rate wollen wir maximal ausnutzen.
Standardmäßig arbeitet Sensor B aber in einem Burst modus, heißt er sendet ohne Anfrage ununterbrochen Druckwerte. Er lässt sich allerdings auch in den gleichen Modus wie Sensor A einstellen und sendet dann nur noch auf Anfrage.
Zusätzliches, notwendiges Feature ist, dass die Druckwerte live in einem Graph dargestellt werden.

Mein bisheriger Lösungsversuch:
Ich habe den Sensor B auf den Anfrage Modus geschaltet, wodurch er sich identisch wie Sensor A betreiben lässt. Das waren nur geringe Anpassungen und alles lief. Allerdings gibt es eine große Limitierung.
Und zwar ist das Programm timerbasiert, also es läuft ein timer mit und bei jedem Event wird ein Druckwert angefragt, gespeichert und einem Zeitpunkt zugeordnet. Allerdings habe ich festgestellt, dass diese Timerevents von der System clock abhängig sind. Auf meinem Windows 7 Rechner beträgt diese etwa 15,6ms. Ich kann also max. 64Hz erreichen. Das ist bei weitem nicht genug. In diesem Bereich funktioniert aber schonmal alles.

Am liebsten würde ich den Burst-modus des Sensors verwenden, die Druckwerte live einlesen und verarbeiten.
Ich habe schon versucht pro timer event mehrere Druckwerte einzulesen, während ich den Burst modus verwende. Das hat mir aber die Zeitzuordnung der Druckwerte zerhauen.

Ich hoffe, dass die Angaben reichen um einen Interessanten Austausch zu erhalten.
Was würdet ihr mir empfehlen?
Wie kann ich am besten Daten, die mit 250Hz übertragen werden, in echtzeit lesen, speichern und zeitlich in einem echtzeit chart darstellen?

Beste Grüße und im vorraus schon vielen Dank

16.807 Beiträge seit 2008
vor 6 Jahren

In Echtzeit auf Windows ohne Echtzeit-CPU gar nicht.
Dafür ist das Betriebssystem nicht ausgelgt. Hat also nichts mit .NET sondern mit der Hardware und dem OS zutun.

Wir verwenden für einen ähnlichen Anwendungsfall (Sensoren einer Industriemaschine im <1ms Bereich) eine entsprechende zusätzliche Hardware (Windows IPC mit einer Soft-Core), die die Daten in Echtzeit sammelt und dann an unsere Prozesse im 50ms Bereich überträgt.

Eine Aktualisierung der Darstellung in Echtzeit macht übrigens zusätzlich keinen Sinn, weil das das menschliche Auge gar nicht verarbeiten kann.
Wäre Ressourcenverschwendung. UI Aktualisierungen macht man eher im 50-300ms Bereich. Das ist Live genug für das Auge.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Hmm, eine unschöne Antwort. Die mich aber nicht wirklich überrascht.

Gut die Echtzeitdarstellung ist etwas überspitzt beschrieben.
Hauptsache ist, dass alle Daten gespeichert werden, die Darstellung soll zwar in Echtzeit mitlaufen, muss aber theoretisch nicht auf allen Daten basieren. Hauptsache Flanken sind eindeutig erkennbar, es reicht also auch wenn zum. Beispiel mit 250Hz Daten aufgenommen werden, der Chart aber "nur" z.B. 5mal pro Sekunde aktualisiert wird.
Graph ist nur eine visuelle Unterstützung. Die Aufgenommen Daten brauchen diese Auflösung, da diese noch weiter verarbeitet werden.

Meine jetzige Idee (damit werde ich mich morgen beschäftigen).
Die Daten kontinuierlich zu lesen (das sollte doch auch bei 250Hz) möglich sein und dann etwa alle 0,2s den Chart zu updaten.

Ich denke generell werde ich nicht weiter versuchen möglichst viel vom bestehenden Code zu behalten, sondern nehme mir die Zeit, setze meine eigenen Ideen um und passe den bestehenden Code an meinen an.

Weitere Ideen zu dem Thema sind gern gesehen.

16.807 Beiträge seit 2008
vor 6 Jahren

Wenn der Sensor kein Puffer hat, der es zulässt, dass Windows im Rahmen seiner Möglichkeit die Daten abruft, dann haste keine Chance ohne extra Hardware.
Ansonsten bekommst Du alles zwischen zwei Lesezyklen nicht mit. Keine Chance hier 250Hz zu erreichen. 50Hz stabil wäre schon Glück. Gerade weil für uns die Spitzen interessant sind und nicht der Rest, müssen wir über eine andere Hardware gehen.

C
2.121 Beiträge seit 2010
vor 6 Jahren

Wie werden die Daten gesendet? Das beeinflusst die Lösung sicher auch. Seriell? USB? Dabei ist ein Puffer im Rechner drin, denke ich. Dann kann man die Schnittstelle öffnen und lesen. Kameradaten kommen noch viel geballter und dabei schafft mans ja auch.

T
2.219 Beiträge seit 2008
vor 6 Jahren

@Xodarap93
Hier muss ich Abt zustimmen.
Auch das Thema mit der Darstellung wäre noch unklar.
Wie genau soll er die Daten in einem Chart in Echtzeit anzeigen?
Dazu müsste das Chart ja immer wieder gezeichnet werden, was aber auch die Neubindung bedeuten würde.
Dann würde deine UI wahrscheinlich durchgehend die CPU fressen.

Auch wäre, wie Chillic anmerkt, wichtig wie die Daten gelesen werden.
Wie genau funktioniert deine Timerverarbeitung genau?
Wird hier nur alle X Millisekunden gelesen oder was genau meinst du damit?

Für mich klingt es stark nach seriellen Lesen, da Sensoren sich meistens nur per Serielle Schnittstelle abfragen lassen.
Hatte auch mal ein Temperaturen Sensor, der per USB Adapter am Rechner angeschlossen wurde.
Dort musste auch im Abfrage Modus immer eine Abfrage gesendet werden und naütrlich die Verbindung aufgebaut werden mit einem Start Paket.

T-Virus

Developer, Developer, Developer, Developer....

99 little bugs in the code, 99 little bugs. Take one down, patch it around, 117 little bugs in the code.

16.807 Beiträge seit 2008
vor 6 Jahren

Das mag für 0815 Bedarfsfälle sein, dass diese RS232 haben; könnte hier durchaus der Fall sein.
Industrie-Sensoren sind BUS Systeme wie ProfiBus, profiNET, Ethernet etc.

USB ist explizit nicht als Echtzeit-Übertragung spezifiziert.
RS232 auch nicht; kann für manche Szenarien aber dafür missbraucht werden: mit Glück passt das hier ja.

PS: Für Windows 7 x64 gibt es RTX.
Kenne das aber auch ur aus der Theorie. Nie eine Industrieanlage damit gesehen.

3.170 Beiträge seit 2006
vor 6 Jahren

Hallo,

Für Windows 7 x64 gibt es RTX.
Kenne das aber auch ur aus der Theorie. Nie eine Industrieanlage damit gesehen.

Das haben wir bei einem unserer Geräte im Einsatz (nicht bei dem, an dem ich arbeite).

Ist aber wenn ich das richtig mitbekommen habe jetzt nix, wo man sich eben mal kurz einarbeiten kann.
Das ist Real-Time Programmierung mit C++, das hat wieder ganz eigene Anforderungen und ist quasi eine Welt für sich.
Wir haben da extra einen Spezialisten eingestellt - das ist für ein Praktikum bestimmt nicht geeignet.

Gruß, MarsStein

Non quia difficilia sunt, non audemus, sed quia non audemus, difficilia sunt! - Seneca

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Super, schonmal vielen Dank für all die Antworten.
Ich versuche mal alle aufgekommenen Unklarheiten aufzuklären.

Gesendet wird tatsächlich über Serial Ports also RS232.

Die Werte werden kontinuirlich gestreamt. 5 Bytes entsprechen dabei einem Druckwert.

Der Chart funktioniert in Echtzeit mit dem alten Sensor wie folgt:
Alle 25ms wird ein Timerevent ausgelöst. Dieses erfragt einen Druckwert beim Sensor und ordnet dem Wert einen Zeitpunkt zu. Dieser Wert wird dann in einen Chart gezeichnet (Druck über Zeit). Nach 10s wird im Chart an Position 0 wieder begonnen und die alten Werte überschrieben. Gleichzeitig werden alle Werte mit Zeitstempel in einem .txt-file geloggt.
Das klappt auch alles super und für den neuen Sensor konnte ich das wie gesagt auch implementieren bin da aber aufgrund der Timer von der System Clock auf 15.6ms pro timerevent beschränkt.

Jetzt möchte ich daher den Burst mode des Sensors nutzen um die maximale Performance zu erreichen.
Mein Plan dabei:
A) Alle Werte kontinuirlich über den Serial Port lesen
B) Jedem Wert einem spezifischen Zeitpunkt zuordnen
C) Alle Werte in einem File speichern. (inkl. Zeitstempel)
D) Alle X Zeitpunkte einen der Werte abgreifen und in den Chart schreiben

Ich schreibe jetzt mal ein bisschen was und versuche dann auch Codebeispiele einzuordnen. Denke dadurch kann einiges besser veranschaulicht werden.

W
872 Beiträge seit 2005
vor 6 Jahren

Vielleicht noch etwas grundsätzliches - Lesen vom Drucksensor solltest Du immer in einem eigenen Thread, der dann in einen Buffer schreibt. Ich benutze für den Buffer meist den Disruptor.
.NET erlaubt schon hohen Durchsatz, wenn man vor allem mit wiederverwerteten Objekten arbeitet, so daß man GC 2 umgeht. Das erfordert aber eine aufwändigere Programmierung und ist unmöglich, wenn Du gleichzeitig Server und Clientaufgaben in einem WPF Programm mischst.

C
2.121 Beiträge seit 2010
vor 6 Jahren

A) Alle Werte kontinuirlich über den Serial Port lesen
B) Jedem Wert einem spezifischen Zeitpunkt zuordnen
C) Alle Werte in einem File speichern. (inkl. Zeitstempel)
D) Alle X Zeitpunkte einen der Werte abgreifen und in den Chart schreiben

A sollte von sich aus gehen, ohne jeden Wert anzufragen. Probiers doch einfach aus.

B wird schon schwieriger denn du hast nicht zu jedem Messwert einen Zeitstempel. Höchstens wenn der Sensor wirklich immer mit der selben Rate liest, dann kannst du das hochrechnen.

C und D sind kein Problem, das ist ja wieder was ganz anderes als die Daten zu bekommen.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Mir hat es halt wirklich geholfen, mich nicht zu sehr anden bestehenden Code zu halten und alles in die Timerstruktur quetschen zu wollen.

A) bekomme ich ohne weiteres hin.
B) Genau das selbe ist mein Plan und habe ich auch mit meinem Betreuer schon besprochen.

C) und D) sollten auch selbstläufer sein.

Meine einzigen Probleme sind jetzt aktuell mein noch nicht sehr tiefgreifendes Wissen bezüglich C#, da ich damit erst seit 2 Wochen arbeite. Aber das sind alles dinge, die ich mit Eigenrecherche gelöst bekomme.

Wenn ich wieder größere Probleme habe melde ich mich gerne wieder.
Ihr konntet mir sehr mit euren Ideen helfen.

Beste Grüße

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Also, falls mal jemand ähnliche Probleme hat, gebe ich hier mal den Code rein, den ich aktuell habe.
Klassen-, Methoden- und Variablen-bezeichnungen sind nur sporadisch gewählt und werden noch angepasst 😉


using Mensor;

namespace Mensor2
{
    public class Program
    {
        static  void Main()
        {
            Port CallIt = new Port();
            CallIt.ReadPort();
        }
    }
}


using System;
using System.IO.Ports;
using System.Diagnostics;

namespace Mensor
{
    public class Port
    {
        Stopwatch clock = Stopwatch.StartNew();
        System.IO.StreamWriter file = new System.IO.StreamWriter(@"C:\Users\namename\Desktop\TestFile.txt");
        public void ReadPort()
        {

            SerialPort mySerialPort = new SerialPort("COM2");
            mySerialPort.BaudRate = 57600;

            mySerialPort.DataReceived += new SerialDataReceivedEventHandler(DataReceivedHandler);

            mySerialPort.Open();

            Console.WriteLine("Press any key to continue...");
            Console.WriteLine();
            Console.ReadKey();
            mySerialPort.Close();
        }

        public void DataReceivedHandler(
                            object sender,
                            SerialDataReceivedEventArgs e)
        {
            SerialPort sp = (SerialPort)sender;
            int byteCount = sp.BytesToRead;
            byte[] buffer = new byte[byteCount];
            int readBytes = sp.Read(buffer, 0, buffer.Length);
            Console.Write((clock.Elapsed.ToString() + " " + BitConverter.ToString(buffer) + "\n"));
            file.WriteLine((clock.Elapsed.ToString()+ " " + BitConverter.ToString(buffer) + "\n"));
        }
    }
}

Das ist ein Ausschnitt aus dem entsprechenden File:

00:00:00.0601398 97-D3-26-07-BB-6C-7D-6A-0E-BB-67-6D-28-B7-BB-76-2A-44-9F-BB-80-10-B9-04-BB-81-66-87-29-BB-86-40-5A-DB-BB-83-F7-F5-2A-BB-81-90-6C-38-BB-79-B3-8F-76

00:00:00.0617665 BB-7F-D0-67-71

00:00:00.0652682 BB-8F-01-4E-99

00:00:00.0695822 BB-97-18-33-9D

00:00:00.0736972 BB-92-7A-41-08

00:00:00.0778293 BB-8F-D2-6B-87

00:00:00.0819810 BB-8E-61-B7-61

00:00:00.0861751 BB-88-C9-52-5E

00:00:00.0903359 BB-85-E6-73-99

00:00:00.0944593 BB-85-73-BF-72

00:00:00.0986179 BB-83-73-01-B2

00:00:00.1028425 BB-62-80-12-AF

00:00:00.1069061 BB-45-3D-BC-F9

00:00:00.1110302 BB-3F-B4-16-C4

00:00:00.1152409 BB-2E-05-B4-A2

00:00:00.1193482 BB-27-B5-07-9E

00:00:00.1235170 BB-31-9E-C4-4E

00:00:00.1276789 BB-37-3E-1C-4C

00:00:00.1317807 BB-41-66-D7-39

00:00:00.1359741 BB-47-72-11-85

00:00:00.1402782 BB-5E-48-04-65

00:00:00.1443800 BB-6C-BC-1B-FE

00:00:00.1484444 BB-7C-71-3D-E5

00:00:00.1526942 BB-8D-FC-DA-1E

00:00:00.1568429 BB-99-07-3C-97

00:00:00.1610580 BB-A2-10-A0-0D

00:00:00.1651451 BB-AA-0C-49-BA

00:00:00.1692659 BB-B1-55-2C-ED

00:00:00.1733152 BB-B4-30-93-32

00:00:00.1775977 BB-B6-02-E1-54

00:00:00.1818011 BB-AE-C0-A4-CD

00:00:00.1858522 BB-A9-B8-25-41

00:00:00.1899096 BB-A9-A3-74-7B

00:00:00.1942875 BB-A4-77-83-59

00:00:00.1984321 BB-A0-E9-81-C5

00:00:00.2024899 BB-90-44-0A-99

00:00:00.2067082 BB-81-BB-E6-DD

00:00:00.2107968 BB-83-8D-59-24

00:00:00.2150126 BB-7D-7A-D0-82

00:00:00.2191503 BB-7D-04-12-4E

00:00:00.2231850 BB-77-E6-43-5B

00:00:00.2273996 BB-75-64-EB-7F

00:00:00.2315700 BB-81-C4-9B-9B

00:00:00.2357370 BB-85-3B-7F-FA

00:00:00.2398479 BB-8C-73-12-CC

00:00:00.2439592 BB-8A-D8-BC-D9

00:00:00.2482685 BB-89-9A-9F-7D

00:00:00.2523010 BB-8A-95-CE-A8

00:00:00.2564936 BB-7B-D7-A0-AD

00:00:00.2606299 BB-6C-F2-01-1A

00:00:00.2615552 

00:00:00.2648083 BB-67-89-4E-F9

00:00:00.2690094 BB-65-77-B9-50

00:00:00.2731383 BB-6F-1A-77-BB

00:00:00.2739896 

00:00:00.2772393 BB-74-E0-4B-5A

00:00:00.2813341 BB-71-DF-96-A1

00:00:00.2855209 BB-7D-5E-2C-C2

Damit ist es mir aktuell möglich sowohl in ein File als auch in die Konsole alle 250 Werte zu schreiben und jeweils mit einer Stopwatch einen Zeitpunkt zuzuordnen.

Wenn jemand was übernehmen will müsst ihr natürlich die Eigenheiten des verwendeten Sensors beachten.
Hier wurde ein Mensor CPT6140 verwendet.

Ich bin natürlich weiterhin offen für Anmerkungen.
Das verarbeiten der Daten und implementieren in das bestehende Programm gehe ich nächste Woche an. Das Vorgehen habe ich nun zumindest verstanden. Auch dank eurer Hilfe.

16.807 Beiträge seit 2008
vor 6 Jahren

Wie es bereits empfohlen wurde: einen eigenen Thread / Long-Running Task verwenden.
Zudem wenn Du mit Ressourcen arbeitest, die Disposed werden müssen ( Deine Streams ) dann sollte die Klasse (hier Port) ebenfalls Disposable sein, damit keine Speicherleichen entstehen können.

Willst Du wirklich eine Stopwatch als Zeitgebener nutzen?
Zumindestens in Industrieumgebungen ist das ein No-Go.
Hier müssen alle Systeme einen gemeinsamen NTP haben; Zeiten werden via ISO 8601 gespeichert und zugeordnet.
Sowas nur als Empfehlung; sollte man nicht unbedingt selbst neu erfinden sondern Standards nutzen.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Wie es bereits empfohlen wurde: einen eigenen Thread / Long-Running Task verwenden.

Werde ich mich direkt Montag zu tiefer belesen.
Das erste bisschen was ich bisher gelesen habe klingt soweit sinnvoll.
Danke 😃

16.807 Beiträge seit 2008
vor 6 Jahren

Du solltest - wenn möglich - auch das Lesen vom Port und das Schreiben der Datei trennen bzw. zu entkoppeln; sprich einen Puffer dazwischen setzen.
zB via einer BlockingCollection.

So hat der Thread des Lesens am Port wieder schneller Zeit Daten entgegen zu nehmen.
Abstrakt gesehen ist dem "Port" ja egal, was nach dem Lesen passiert.
Der Port sagt nur "Hier, neue Daten!".

Und mit einem entsprechenden Consumer reagierst Du drauf und kannst dann Aktionen (hier Speichern) ausführen.
Gleichzeitig könnte eine Aktion natürlich auch an die UI gehen.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Ich komme voran, stehe aber wieder vor einem Problem.
Ich konnte schon mit 250Hz vom Sensor lesen und alle Werte speichern. Alls ich meinen Code allerdings in das bestehende Programm einfügen konnte haben immer wieder andere Dinge nicht funktioniert.

Deshalb versuche ich aktuell eine eigene Windows Forms Application zu schreiben, welche es mir erlaubt vom Sensor zu lesen und entsprechende Werte zu speichern, später auch einen Chart zu erstellen. Ich versuche alles möglichst modular mit einzelnen methoden aufzubauen, um später jede einzelne Funktion in das bestehende Programm möglichst einfach einfügen zu können.

Hier mein bisheriger Code:

Form1.cs


using System;
using System.Threading;
using System.IO;
using System.Windows.Forms;
using MENSORCPT6140;

namespace MensorCPT6140_constReadingWindowsForms
{

    public partial class Form1 : Form
    {

        //Stream zum Schreiben
        StreamWriter logfile = null;
        
        BurstMode PortObject = new BurstMode();
        bool LoopIsRunning = false;
        public Form1()
        {
            InitializeComponent();
        }

        private void Form1_Load(object sender, EventArgs e)
        {
            this.chart1.Series.Clear();
            chart1.Series.Add("series1");
        }

        private void button1_Click(object sender, EventArgs e)
        {
            //Create a new thread object
            Thread PortThread = new Thread(PortObject.ReadPort);

            //Start the PortThread
            PortThread.Start();
            
            tmrChart.Enabled = true;
        }
        

        private void tmrUpdateChart(object sender, EventArgs e)
        {
            
            chart1.Series[0].Points.AddXY(DateTime.Now,PortObject.ConvHexToDouble_IEEE754().pressure);
        }

        private void button2_Click(object sender, EventArgs e)
        {
            // create file
            logfile = null;
            logfile = new StreamWriter(@"C:\Users\namename\Desktop\\MensorCPT6140_" + DateTime.Now.Year + "_" + DateTime.Now.Month + "_" + DateTime.Now.Day + "_" + DateTime.Now.Hour + "_" + DateTime.Now.Minute + ".txt");

            //timer is used to check every 25ms if values are in queue
            System.Timers.Timer tmrLogfile = null;
            tmrLogfile = new System.Timers.Timer();
            tmrLogfile.Interval = 25;
            tmrLogfile.Elapsed += new System.Timers.ElapsedEventHandler(LogFileHandler);
            tmrLogfile.Start();
        }
        
        public void LogFileHandler(object sender, System.Timers.ElapsedEventArgs e)
        {
            if (LoopIsRunning == false) //protect loop from beeing called multiple times
            {
                while (PortObject.dataQueue.Count > 0)
                {
                    LoopIsRunning = true;
                    if (PortObject.CheckChecksum())
                    {
                        PortObject.Datapoint = PortObject.ConvHexToDouble_IEEE754();
                        //write file
                        logfile.WriteLine(PortObject.Datapoint.time.ToString("M/dd/yyyy h:mm:ss.fffffff tt") + " " + PortObject.Datapoint.pressure.ToString());

                        //write into console
                        Console.Write((PortObject.Datapoint.time.ToString("M/dd/yyyy h:mm:ss.fffffff tt") + " " + PortObject.Datapoint.pressure.ToString() + "\n"));

                        PortObject.dataQueue.Dequeue();
                    }
                    else
                    {
                        PortObject.dataQueue.Dequeue();        // remove invalid pressure value
                    }
                }
                LoopIsRunning = false;
            }
            
        }
    }
}

MENSORCPT6140.cs


using System;
using System.IO;
using System.IO.Ports;
using System.Diagnostics;
using System.Collections.Generic;

namespace MENSORCPT6140
{
    public class rawValue
    {
        public DateTime rawTime;
        public byte[] rawPressure;
        public rawValue(byte[] rawPressure, DateTime rawTime)
        {
            this.rawTime = rawTime;
            this.rawPressure = rawPressure;
        }
    }
    public class PSValue
    {
        public DateTime time;
        public double pressure;
        public string unit;

        public PSValue(double pressure, string unit, DateTime time)
        {
            this.time = time;
            this.pressure = pressure;
            this.unit = unit;
        }
    }

    public class BurstMode
    {
        // initialize clock for PS-value timestamps
        Stopwatch clock = Stopwatch.StartNew();
        //initialize Datetime
        DateTime FirstCallTime = DateTime.Now;

        public Queue<rawValue> dataQueue = new Queue<rawValue>();
        public System.Timers.Timer ChartUpdateTmr = null;
        public PSValue Datapoint;
        
        // method to check the transmitted pressure value
        public bool CheckChecksum()
        {
            byte[] CheckBuffer = dataQueue.Peek().rawPressure;
            string Checksum_transmitted = CheckBuffer[4].ToString("X2");                                                                            //transmitted Checksum
            string Checksum_result = (CheckBuffer[0] + CheckBuffer[1] + CheckBuffer[2] + CheckBuffer[3]).ToString("X1").Substring(1);               //calculated Checksum
            if (Checksum_result == Checksum_transmitted)
            {
                return true;
            }
            else
            {
                return false;
            }
        }

        // converting hex to floating (IEEE754) - using BitConverter.ToSingle
        public PSValue ConvHexToDouble_IEEE754()
        {
            byte[] ConvBuffer = new byte[4];
            if (dataQueue.Count > 0)
            {
                ConvBuffer[0] = dataQueue.Peek().rawPressure[0];
                ConvBuffer[1] = dataQueue.Peek().rawPressure[1];
                ConvBuffer[2] = dataQueue.Peek().rawPressure[2];
                ConvBuffer[3] = dataQueue.Peek().rawPressure[3];
                Array.Reverse(ConvBuffer);
                return new PSValue(BitConverter.ToSingle(ConvBuffer,0), "mbar", dataQueue.Peek().rawTime);
            }
            else
            {
                return new PSValue(0, "NaN", DateTime.MinValue); ;
            }
            
        }

        public virtual void ReadPort()
        {
            // initializing serial port
            SerialPort PSPort = new SerialPort("COM2");
            PSPort.BaudRate = 57600;
            PSPort.Open();

            // calling event (read bytes from serial port)
            PSPort.DataReceived += new SerialDataReceivedEventHandler(DataReceivedHandler);
            
        }
        

        public void DataReceivedHandler(
                            object sender,
                            SerialDataReceivedEventArgs e)
        {
            // reading from serial Port
            SerialPort sp = (SerialPort)sender;
            byte[] buffer = new byte[5];                            // reading 5 bytes (representing pressure value)
            int readBytes = sp.Read(buffer, 0, buffer.Length);
            dataQueue.Enqueue(new rawValue(buffer, (FirstCallTime + clock.Elapsed)));
        }
    }
}

Wird button 1 gedrückt wird der Serial Port geöffnet und alle Werte in eine Queue geschrieben.
Button 2 erstellt ein File, verarbeitet die Daten aus der Queue und schreibt diese kontinuierlich in das File.
Das habe ich damit verwirklicht, dass ein Timer alle 25ms prüft, ob Werte in der Queue sind, ist dies der Fall werden alle Werte in dieser Queue in das File geschrieben.

Mein Problem dabei ist, dass nach etwa 1,5 sekunden keine weiteren Werte mehr in das File geschrieben werden. Ich glaube ich habe irgendwo ganz grobe Fehler gemacht, finde diese aber nicht. Vermute das Problem in dem Zusammenspiel aus Timer und While-Loop.

Vielen Dank für eure Hilfe im Vorraus.

16.807 Beiträge seit 2008
vor 6 Jahren

Es ist, wie schon geschrieben, sehr zu empfehlen, dass Du das annehmen von Daten und das anschließende Verarbeiten dieser Daten trennst; auch im Sinne von Thread trennst.
Sprich Consumer Producer Pattern, zB mit Hilfe einer BlockingCollection.

Du musst dafür sorgen, dass der lesende Thread Zeit hat zu lesen, und keinen anderen Aufgaben nachgeht.
Hier bieten sich auch (long running) Tasks an, die sich leichter handlen lassen, als Threads.

Damit lässt sich Dein Code auch am Ende deutlich vereinfachen, weil es übersichtlicher wird und eine deutliche Trennung der Aufgaben des Codes sichtbar wird.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Es ist, wie schon geschrieben, sehr zu empfehlen, dass Du das annehmen von Daten und das anschließende Verarbeiten dieser Daten trennst; auch im Sinne von Thread trennst.
Sprich Consumer Producer Pattern, zB mit Hilfe einer BlockingCollection.

Das wollte ich nicht ignorieren.
Ich habe mich mit BlockingCollection befasst und es für sehr sinnvoll erachtet. Ich konnte es allerdings nicht in der Form implementieren wie ich es mir vorgestellt habe. Ich habe dieses Thema nicht komplett verstanden. Bin aber auf Queue() gestoßen, ich hätte gedacht, dass ich damit das Schreiben und Verarbeiten der Daten ausreichend "getrennt" hätte.

Consumer Producer Pattern scheint mir ein gutes Stichwort. Das arbeite ich nun mal durch und ich denke, das sollte ich nun hinbekommen. Man lernt ja auch kontinuierlich dabei, viele dinge an C# sind mir immernoch sehr ungewohnt.

Und an Long running Tasks setzte ich mich danach wohl auch nochmal. Threads schienen mir einfacher zu verwenden, als ich mich über beides belesen habe.

16.807 Beiträge seit 2008
vor 6 Jahren

Du trennst es prinzipiell mit der Queue physikalisch; bist aber im gleichen Thread womit der Effekt verpufft.

Im Endeffekt hast Du mit der BLockingCollection zwei Tasks (statt Threads): ein Task für das Lesen und ein Task für das Behandeln der Nachricht.

Beispielcode findest Du unter TPL Pipelines

Im Prinzip kann das vereinfacht so aussehen:

public class BurstRunner3000
{
    BlockingCollection<string> _queue = new BlockingCollection<string>();
    StreamWriter _sr;

    Task _consumerTask;
    public BurstRunner3000(string logFile)
    {
        _sr = new StreamWriter(logFile);
        _consumerTask = Task.Run(async () => await ConsumeMessageAsync());
    }

    public void AddMessage(string message)
    {
        _queue.Add(message);
    }

    private async Task ConsumeMessageAsync()
    {
        foreach(var entry in _queue.GetConsumingEnumerable())
        {
            await _sr.WriteLineAsync(entry);
        }
    }
}

Dein Port-Lese-Event ruft hier die Methode AddMessage auf und kippt die Nachricht ein.
Sobald eine Nachricht in die BLockingCollection kommt, läuft die foreach Schleife im ConsumeMessage weiter und die Nachricht wird verarbeitet.

Was fehlt:

  • korrektes Implementieren vin IDisposable, sodass die Tasks beendet werden, wenn die Anwendung beendet wird bzw. die Klasse BurstRunner3000 verworfen wird.
    Damit wird auch der Stream korrekt geschlossen
  • willst Du mehrere Consumer-Threads haben, um die Schreibperformance zu erhöhen - wenn nötig - brauchst Du ein Locking auf den Stream bzw. ein Locking auf alle Thread-Unsafe Operationen.
  • BlockingCollection muss finalisiert werden, damit beim Disposen der Klasse keine Nachrichten verloren gehen, sondern das Disposen erst abgeschlossen ist, wenn alle Nachrichten verarbeitet wurden (quasi asynchrones Canceln).

Aber das Konstrukt trennt schon das Einkippen einer Nachricht vom Verarbeiten einer Nachricht.

T
2.219 Beiträge seit 2008
vor 6 Jahren

@Abt
Danke für das Beispiel, kann ich gebrauchen.
Hab ein ähnliches Problem, bei dem ein Thread Objekte(Verbindungen) einreiht.
Diese werden dann in einem Batch eingefügt.
Der Batch arbeitet dann in einem zweiten Thread die Verbindungen dann einmal durch.
Aktuell nutze ich dafür eine Queue und im Batch einen extra Thread.
Dort muss ich aber das hinzufügen/entfernen noch per lock Absichern, vergesse leider immer wieder, dass es BlockingCollection gibt.

Ist also im Grunde das gleiche Konzept.
Ein Thread der Verbindungen hinzufügt und einer der diese Verarbeitet und aus dem Batch entfernt.

T-Virus

Developer, Developer, Developer, Developer....

99 little bugs in the code, 99 little bugs. Take one down, patch it around, 117 little bugs in the code.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Okay, auf ein Neues!
Abt, dein Beispiel und das unter https://msdn.microsoft.com/en-us/library/ff963548.aspx haben mir generell sehr helfen können.

Das generelle Prinzip der Pipelines habe ich verstanden und konnte ich auch umsetzen.
Mit dem Thema IDisposibles und generellem Pipeline-Cancelling habe ich große Probleme.

Habe jetzt einfach mal, ohne wirklich zu wissen was ich mache in jedem Consumer Block try/catch Anweisungen eingebaut. Denke aber ich habe da nonSense fabriziert.

Ich habe nun 4 Blöcke und 3 Buffer. Der Input wird im ersten Block gelesen und in einen Buffer geschrieben. Der Inhalt dieses Buffers wird im 2. Block in den Druckwert umgerechnet und in einen 2. Buffer geschrieben. Der Inhalt dieses Buffers wird in einem 3. Block in einen String umgewandelt, welcher in einen 3. Buffer geschrieben wird. In einem 4. Block wird der Inhalt dieses Buffers in das File geschrieben.
Hier in jedem Fall mal mein Code der Console Application:

Program.cs:

using System;
using System.Threading;
using Pipeline;

namespace MENSORCPT6140
{
    public class rawValue
    {
        public DateTime rawTime;
        public byte[] rawPressure;
        public rawValue(byte[] rawPressure, DateTime rawTime)
        {
            this.rawTime = rawTime;
            this.rawPressure = rawPressure;
        }
    }

    public class PSValue
    {
        public DateTime time;
        public double pressure;
        public string unit;

        public PSValue(double pressure, string unit, DateTime time)
        {
            this.time = time;
            this.pressure = pressure;
            this.unit = unit;
        }
    }

    public class Program
    {
        static void Main()
        {
            //Create a new thread object
            SensorRxTx PipelineObject = new SensorRxTx();
            Thread PortThread = new Thread(PipelineObject.ReadPort);

            //Start the PortThread
            PortThread.Start();
        }
    }
}

Pipeline.cs:

using System;
using System.IO;
using System.IO.Ports;
using System.Diagnostics;
using System.Threading;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using MENSORCPT6140;

namespace Pipeline
{
    public class SensorRxTx
    {
        BlockingCollection<rawValue> readSerial_buffer = new BlockingCollection<rawValue>();
        BlockingCollection<PSValue> IEEE754_buffer = new BlockingCollection<PSValue>();
        BlockingCollection<string> OutString_buffer = new BlockingCollection<string>();

        Task _consumerTask;

        StreamWriter _sr;
        // initialize clock for PS-value timestamps
        Stopwatch clock = Stopwatch.StartNew();
        //initialize Datetime
        DateTime FirstCallTime = DateTime.Now;

        //...
        public SensorRxTx()
        {
            CancellationTokenSource cts = new CancellationTokenSource();
            var token = cts.Token;
            _sr = new StreamWriter(@"C:\Users\sharifso\Desktop\\MensorCPT6140_" + DateTime.Now.Year + "_" + DateTime.Now.Month + "_" + DateTime.Now.Day + "_" + DateTime.Now.Hour + "_" + DateTime.Now.Minute + ".txt");
            _consumerTask = Task.Run(async () => await ConsumeMessageAsync(cts));
        }

        // method to check the transmitted pressure value
        public bool CheckChecksum(rawValue SensorInput)
        {
            byte[] CheckBuffer = SensorInput.rawPressure;
            string Checksum_transmitted = CheckBuffer[4].ToString("X2");                                                                            //transmitted Checksum
            string Checksum_result = (CheckBuffer[0] + CheckBuffer[1] + CheckBuffer[2] + CheckBuffer[3]).ToString("X1").Substring(1);               //calculated Checksum
            if (Checksum_result == Checksum_transmitted)
            {
                return true;
            }
            else
            {
                return false;
            }
        }

        // converting hex to floating (IEEE754) - using BitConverter.ToSingle
        public PSValue ConvHexToDouble_IEEE754(rawValue SensorInput)
        {
            byte[] ConvBuffer = new byte[4];
            ConvBuffer[0] = SensorInput.rawPressure[0];
            ConvBuffer[1] = SensorInput.rawPressure[1];
            ConvBuffer[2] = SensorInput.rawPressure[2];
            ConvBuffer[3] = SensorInput.rawPressure[3];
            Array.Reverse(ConvBuffer);

            return new PSValue(BitConverter.ToSingle(ConvBuffer, 0), "mbar", SensorInput.rawTime);
        }

        public virtual void ReadPort()
        {
            // initializing serial port
            SerialPort PSPort = new SerialPort("COM2");
            PSPort.BaudRate = 57600;
            PSPort.Open();

            // calling event (read bytes from serial port)
            PSPort.DataReceived += new SerialDataReceivedEventHandler(DataReceivedHandler);

        }


        public void DataReceivedHandler(
                            object sender,
                            SerialDataReceivedEventArgs e)
        {
            // reading from serial Port
            SerialPort sp = (SerialPort)sender;
            byte[] buffer = new byte[5];                            // reading 5 bytes (representing pressure value)
            int readBytes = sp.Read(buffer, 0, buffer.Length);
            CancellationTokenSource cts = new CancellationTokenSource();
            var token = cts.Token;
            ReadString(new rawValue(buffer, (FirstCallTime + clock.Elapsed)), cts);
            //dataQueue.Enqueue(new rawValue(buffer, (FirstCallTime + clock.Elapsed)));
        }
        public void ReadString(rawValue SensorInput, CancellationTokenSource cts)
        {
            try
            {
                var token = cts.Token;
                readSerial_buffer.Add(SensorInput, token);
            }
            catch (Exception e)
            {
                cts.Cancel();
                if (!(e is OperationCanceledException))
                    throw;
            }
            finally
            {
                readSerial_buffer.CompleteAdding();
            }
        }

        public void ConvStringIEEE754(CancellationTokenSource cts)
        {
            try
            {
                var token = cts.Token;
                foreach (var entry in readSerial_buffer.GetConsumingEnumerable())
                {
                    if (token.IsCancellationRequested) break;
                    if (CheckChecksum(entry))
                    {
                        IEEE754_buffer.Add(ConvHexToDouble_IEEE754(entry), token);
                    }
                    else
                    {
                        IEEE754_buffer.Add(new PSValue(0, "NaN", entry.rawTime), token);
                    }
                }
            }
            catch (Exception e)
            {
                cts.Cancel();
                if (!(e is OperationCanceledException))
                    throw;
            }
            finally
            {
                IEEE754_buffer.CompleteAdding();
            }
        }

        public void CreateOutputString(CancellationTokenSource cts)
        {
            try
            {
                var token = cts.Token;
                foreach (var entry in IEEE754_buffer.GetConsumingEnumerable())
                {
                    if (token.IsCancellationRequested) break;
                    OutString_buffer.Add(entry.time.ToString("M/dd/yyyy h:mm:ss.fffffff tt") + " " + entry.pressure.ToString() + entry.unit);
                }
            }
            catch (Exception e)
            {
                cts.Cancel();
                if (!(e is OperationCanceledException))
                    throw;
            }
            finally
            {
                OutString_buffer.CompleteAdding();
            }

        }

        private async Task ConsumeMessageAsync(CancellationTokenSource cts)
        {
            try
            {
                var token = cts.Token;
                foreach (var entry in OutString_buffer.GetConsumingEnumerable())
                {
                    if (token.IsCancellationRequested) break;
                    await _sr.WriteLineAsync(entry);
                }
            }
            catch(Exception e)
            {
                cts.Cancel();
                if (!(e is OperationCanceledException))
                    throw;
            }
            finally
            {
                OutString_buffer.CompleteAdding();
            }
        }
    }
}

Das mit den Pipelines ist im Prinzip super und genau das was ich brauche/möchte.
Allerdings schließt sich das Programm nach dem Start auch direkt wieder und es wird nur das leere logfile erstellt.

Ich bräuchte mal noch ein Beispiel wie ich mit IDisposibles umzugehen habe. Geht es dabei um das Thema "Canceling a Pipeline"? Das ist mir alles noch ziemlich schleierhaft.

Bin ich den ansonsten mit dem Aufbau der Pipeline korrekt vorgegangen?

D
985 Beiträge seit 2014
vor 6 Jahren

Nun ja, was soll dein Programm auch daran hindern sich zu beenden.

Die letzte Aktion ist das Starten des Threads und dann kommt nix mehr. Und wenn nix mehr kommt, wird die Main-Methode verlassen. Und wenn die verlassen wird, dann beendet sich das Programm.

16.807 Beiträge seit 2008
vor 6 Jahren

Du brauchst hier nicht einen einzigen Threads, alles viel einfacher mit Tasks umzusetzen.

Im Endeffekt ruft Deine Anwendung eine (nennen wir sie einfach mal) DoWork() Methode auf, die einen Task startet und diesen zurück gibt (mit Task.Run, damit es ein long-running Task wird).
Deine Main-Methode muss nun mit Hilfe von Task.Wait() einfach warten, bis sich dieser Haupttask beendet hat.

Der Haupttask darf Child-Tasks haben, wie zB. Dein Listener oder Deine Consumer-Tasks.
Der Haupttask darf sich aber erst beenden ( durch ein Cancel ) wenn sich alle Child-Tasks beendet haben.

Sobald Cancel aufgerufen wurde und sich alle Tasks sauber beendet haben, dann blockiert Wait() nicht mehr und die Applikation schließt sich.

PS: damit Methoden auf ein Cancel reagieren übergibt man nur den Token und nicht die Token-Source.
Die Source kennt nur das Parent.

Evtl. finde ich heute Abend Zeit für ein konkreteres Beispiel.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Und ich behaupte noch ich hätte die Idee hinter Pipelines verstanden 😄
Das man alle Tasks starten muss und bis zu Beendigung dieser warten muss sollte auf der Hand liegen.

Naja das beachtet funktioniert es tatsächlich. Ich kann mit entsprechender Geschwindigkeit schreiben. Nun muss ich nur noch mein wildes mit token umhergewerfe bereinigen.

Ich melde mich nochmal.
Weiterhin vielen Dank!

16.807 Beiträge seit 2008
vor 6 Jahren

Und modularisier mal Deinen Code (und vergess das Aufräumen der Ressourcen nicht -> Dispose).

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Und modularisier mal Deinen Code (und vergess das Aufräumen der Ressourcen nicht -> Dispose).

Was meinst du nun noch mit modularisieren? Jede Methode in sich ist ja schon sehr minimalistisch als einzelne Methode und wird als eigener Task aufgerufen. Meinem Verständnis nach habe ich also ein Modul zum lesen, eines zum verarbeiten, eines zum schreiben,...
Oder is sowas gemeint? How To: Modules

Bezüglich Dispose.
Das soll verwendet werden um "unmanaged resources" zu bereinigen, da in dem Fall der garbage collector nicht greift.
Was sind in meinem Fall beispielsweise solche Ressourcen? Und was würde vom Garbage Collector erfasst werden?

16.807 Beiträge seit 2008
vor 6 Jahren

Deine Ressourcen sind:

  • Streams
  • Die BlockingCollection (Du musst hier AddedComplete beachten, womit die BlockingCollection nicht mehr befüllt werden kann und die Tasks den Rest abarbeiten, bevor diese sich beenden; aktuell behandelst Du das falsch)

Dein Code sollte nicht nur in einzelne Methoden, sondern einzelne Klassen separiert werden.
Damit erzielst Du eine viel bessere Wiederverwendbarkeit und Testbarkeit. Dein Code so aktuell ist ja überhaupt nicht automatisch testbar ( [Artikel] Unit-Tests: Einführung in das Unit-Testing mit VisualStudio ).
Sollte man sich so also gar nicht erst angewöhnen.

Sprich:

Das macht den Code nicht wirklich umfangreicher, aber deutlich effizienter.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Das klingt super. Also ist der Link, den ich gepostet habe schon das richtige Stichwort.
Der Modulare Aufbau sollte ja dann auch die Implementierung in das bestehende Programm deutlich vereinfachen.
Das war ja aktuell auch mein Vorhaben, aber halbherzig ausgeführt. Anstatt nur einzelne Methoden auch einzelne Klassen klingt vernünftig. Vorallem mit dem Stichwort Unit-Testing vor den Augen.

Eigentlich eine Schande, dass ich das aktuell überhaupt nicht beachtet habe. Denn in meinem 6Monatigen Praktikum im bachelor Studium war ich genau für das Erstellen von automatisierten Tests zuständig. Da habe ich damals mit C programmiert und die SPUT Unit Testing Framework verwendet. Soviel aber nur nebenbei.

Weiterhin bedanke ich mich für die vielen wirklich sehr hilfreichen Tips

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Ich habe 2 weitere Code Versionen. Den einen habe ich gestern noch vor dem Tip mit dem Modularisieren geschrieben. Der erstellt ein logfile und schreibt korrekt hinein.
Diese Version ist identisch zu meinem zuletzt geposteten Code außer, dass ich jetzt auf die einzelnen Tasks warte. Die Tasks werden nicht sauber beendet, aber es werden kontinuirlich Daten in das File geschrieben.

Ich habe heute eine neue Version geschrieben, diese beruht auf mehreren Klassen zum modularisieren. Nun habe ich die Blocking Collection Buffer in der Program Klasse als public deklariert, damit ich auf diese von allen Klassen zugreifen kann.
Das funktioniert augenscheinlich auch, lasse ich allerdings das Programm laufen, welches hier der einfachheit halber nur aus Daten lesen und in double umwandeln besteht, passiert nichts.
Beim Debuggen sehe ich, dass Task1 nach dem ersten Durchlauf direkt den Status Completed bekommt (das passiert auch in der anderen, prinzipiell funktionierenden Version) aber trotzdem der DataReceivedHandler arbeitet und Werte in den Buffer schreibt.
Task2 bleibt, wie auch in der Anderen Codeversion, auf dem Status Running, aber nach einem ersten "Aufruf" wird Task2 nie wieder durchlaufen. Obwohl der Buffer gefüllt ist, arbeitet die foreach-loop nicht.

Wo ist mein Fehler?
Ich glaube, das beim Start von Task2 der readSerial_buffer noch leer ist und deshalb die foreach Schleife direkt durch ist. Allerdings wundert mich dabei dann, warum das in der alten Version nicht passiert und warum Task2 nicht auch auf Completed gesetzt wird wie Task1.

Program:


using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using MensorCPT6140;

namespace Con_CPT6140_ModularPipeline
{
    public class rawValue
    {
        public DateTime rawTime;
        public byte[] rawPressure;
        public rawValue(byte[] rawPressure, DateTime rawTime)
        {
            this.rawTime = rawTime;
            this.rawPressure = rawPressure;
        }
    }

    public class PSValue
    {
        public DateTime time;
        public double pressure;
        public string unit;

        public PSValue(double pressure, string unit, DateTime time)
        {
            this.time = time;
            this.pressure = pressure;
            this.unit = unit;
        }
    }

    public class Program
    {
        //initialize BlockingCollection buffers
        public BlockingCollection<rawValue> readSerial_buffer = new BlockingCollection<rawValue>();
        public BlockingCollection<PSValue> IEEE754_buffer = new BlockingCollection<PSValue>();

        // initialize clock for PS-value timestamps
        public Stopwatch clock = Stopwatch.StartNew();
        //initialize Datetime
        public DateTime FirstCallTime = DateTime.Now;

        static void Main(string[] args)
        {
            //Cancellation tokens
            var cts1 = new CancellationTokenSource();
            var token1 = cts1.Token;

            //Create objects
            Program MainObject = new Program();
            ReadDataFromCPT6140_Mode6 ReadSerial = new ReadDataFromCPT6140_Mode6();
            ConvertDataToPSValue ConvData = new ConvertDataToPSValue();

            //new longrunning task factory
            var SensorTaskFactory = new TaskFactory(token1, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, TaskScheduler.Current);

            MainObject.FirstCallTime = DateTime.Now;
            //Start tasks
            var task1 = SensorTaskFactory.StartNew(() => ReadSerial.ReadPort());
            var task2 = SensorTaskFactory.StartNew(() => ConvData.ConvStringIEEE754());

            Task.WaitAll(task1, task2, );
        }
    }
}

MensorCPT6140:


using System;
using System.IO;
using System.IO.Ports;
using System.Threading;
using System.Threading.Tasks;
using Con_CPT6140_ModularPipeline;

namespace MensorCPT6140
{
    //class with methods to read data transmitted from the MENSOR CPT6140 pressure sensor via RS232
    //each value is transmitted with 4 bytes representing the pressure value and a 1 byte (truncated) checksum - see sensor description for further information
    //this class is written for data transmitted in the burst mode (Mode6) which transmits data with up to 250Hz
    class ReadDataFromCPT6140_Mode6 : Program
    {
        public void ReadPort()
        {
            SerialPort PSPort = new SerialPort("COM2");
            // initializing serial port
            PSPort.BaudRate = 57600;
            PSPort.Open();

            // calling event (read bytes from serial port)
            PSPort.DataReceived += new SerialDataReceivedEventHandler(DataReceivedHandler);
        }

        public void DataReceivedHandler(
                            object sender,
                            SerialDataReceivedEventArgs e)
        {
            // reading from serial Port
            SerialPort sp = (SerialPort)sender;
            byte[] buffer = new byte[5];                            // reading 5 bytes (representing pressure value)
            int readBytes = sp.Read(buffer, 0, buffer.Length);
            readSerial_buffer.Add(new rawValue(buffer, (FirstCallTime + clock.Elapsed)));
        }
    }

    //Class containing methods to convert the 5byte data to a pressure value after checking the checksum
    //IEEE754 is used to convert data
    class ConvertDataToPSValue : Program
    {
        public bool CheckChecksum(rawValue SensorInput)
        {
            byte[] CheckBuffer = SensorInput.rawPressure;
            string Checksum_transmitted = CheckBuffer[4].ToString("X2");                                                                            //transmitted Checksum
            string Checksum_result = (CheckBuffer[0] + CheckBuffer[1] + CheckBuffer[2] + CheckBuffer[3]).ToString("X1").Substring(1);               //calculated Checksum
            if (Checksum_result == Checksum_transmitted)
            {
                return true;
            }
            else
            {
                return false;
            }
        }

        // converting hex to floating (IEEE754) - using BitConverter.ToSingle
        public PSValue ConvHexToDouble_IEEE754(rawValue SensorInput)
        {
            byte[] ConvBuffer = new byte[4];
            ConvBuffer[0] = SensorInput.rawPressure[0];
            ConvBuffer[1] = SensorInput.rawPressure[1];
            ConvBuffer[2] = SensorInput.rawPressure[2];
            ConvBuffer[3] = SensorInput.rawPressure[3];
            Array.Reverse(ConvBuffer);

            return new PSValue(BitConverter.ToSingle(ConvBuffer, 0), "mbar", SensorInput.rawTime);
        }
        public void ConvStringIEEE754()
        {
            foreach (var entry in readSerial_buffer.GetConsumingEnumerable())
            {
                if (CheckChecksum(entry))
                {
                    IEEE754_buffer.Add(ConvHexToDouble_IEEE754(entry));
                }
                else
                {
                    IEEE754_buffer.Add(new PSValue(0, "NaN", entry.rawTime));
                }
            }
        }
    }
}

PS: Tokens und Dispose würde ich nachträglich noch implementieren wollen, wenn das hier erstmal läuft...

16.807 Beiträge seit 2008
vor 6 Jahren

Die Schleife kann nicht durch gehen.
GetConsumingEnumerable blockiert, bis die Collection ein Eintrag hat oder finalisiert wird.

Aber Deine Vererbung sieht komplett falsch aus.
Eine Vererbung führt dazu, dass hier mehrere unabhängige Collections existieren, weil unterschiedliche Instanzen.
Von Collection A liest Du, schreibst aber in B - das kann nicht klappen.
Glaube Du solltest Dir nochmal anschauen, was Vererbung ist; es ist jedenfalls nicht was Du hier aktuell vor hast: das Teilen von gemeinsamen Referenzen.

Vererbung ist hier auch überhaupt nicht praktikabel. Bin ehrlich gesagt auch überfragt, wieso Du hier überhaupt eine Vererbung von Program in Betracht gezogen hast!? Man erbt niemals vom logischen Parent.
Ich glaub ich mach Dir echt heute Abend mal ein Beispiel, weil so ist das ein Holzweg 😉

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Aber Deine Vererbung sieht komplett falsch aus.
Eine Vererbung führt dazu, dass hier mehrere unabhängige Collections existieren, weil unterschiedliche Instanzen.
Von Collection A liest Du, schreibst aber in B - das kann nicht klappen.

Das Problem habe ich auch vermutet. An sich wollte ich auch nicht zwingend mit Vererbung arbeiten, ich habe das eher "zufällig" implementiert, da ich ohne groß zu überlegen bemerkt hatte, das so alle Zugriff auf die Collection haben. Bei den jetzt aufgetretenen Problemen habe ich mir aber fast gedacht, dass ich quasi mehrere Collections erzeuge.
Ich versuche das mal zu bereinigen..

Ich glaub ich mach Dir echt heute Abend mal ein Beispiel, weil so ist das ein Holzweg 😉

Das wäre eine Riesenhilfe. Solang versuche ich noch weiter dran zu arbeiten.
Und wunder dich bitte nicht, wenn ich nicht antworte. Aber ich bin bis Dienstag im Ausland.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Eine Frage vorweg: Wenn ich nur eine Konsolenapplikation schreibe, dann ist die UI-Layer ja quasi nicht vorhanden, oder? (natürlich soll mein Program eines Tages eine FormsApplikation sein)

DataAccess-layer würde dann eine Klasse darstellen, in der alle Methoden unterkommen, die sich mit dem Lesen vom Serial Port befassen und

BusinessLogic-Layer wären dann Methoden wie verarbeiten der Daten.

Mein Problem genauer ausformuliert ist dann, wie bekomme ich es hin, dass die Daten die ich in der einen Klasse lese so abgespeichert werden, dass ich auf diese in der Logik-Klasse zugreifen kann. In dem Fall also auf den BlockingCollection-Buffer.

Ich habe natürlich noch mehr Probleme, das ist aber das aktuelle was sich in meinem Kopf festgesetzt hat.

Ich habe auch einfach Probleme dadurch, dass ich erst seit 3 Wochen mit C# arbeite und nicht sinnvoll aufbauend den Umgang damit gelernt habe. Sondern ich habe ein bestehendes großes Projekt und möchte das etwas umfangreiches hinzufügen und sehe mich daher direkt vielen großen Programmierkonzepten und Problemen gleichzeitig gegenüber gestellt.
Ich bearbeite zeitgleich, auch in meiner Freizeit, C#-Tutorials, da ich den Umfang und die Vorteile dieser Sprache schnell erkannt habe und Spaß an der Programmierung damit habe.
Allerdings braucht das wohl seine Zeit und in meiner aktuellen Situation muss ich leider mich auch mit dem großen Projekt direkt beschäftigen und versuchen die Probleme vorerst einfach so zu bewerkstelligen.
Gerade deshalb bin ich für eure Mitarbeit und Hilfe enorm dankbar.

Ich habe schon viel durch die ganzen hier vorgestellten Tips gelernt, aber nun habt ihr vielleicht Verständnis dafür, dass ich so unaufhaltsam gegen eine Wand nach der anderen fahre.
Ich verspräche ich werde mich bessern. Ich arbeite weiter grundlegende Konzepte durch um dann auch besseren Überblick über die großen zu haben 😃

4.931 Beiträge seit 2008
vor 6 Jahren

Auch eine Konsolenapplikation hat eine UI, nämlich Eingabe und Ausgabe mittels der Klasse Console, so daß du diese von deiner BusinessLogic-Schicht trennen solltest - so daß man bei einer anderen UI eben nur diese UI-Schicht austauschen muß.

Gerade bei einer Konsolenapplikation vermischen einige Anfänger UI und Logik, da diese eher prozedural (sequentiell) entwickelt wird, jedoch dann Schwierigkeiten haben, daraus dann eine WinForms oder WPF-Applikation zu entwickeln, da diese dann ereignisorientiert entwickelt werden.

T
2.219 Beiträge seit 2008
vor 6 Jahren

@Th69
Korrekt.
Gerade wenn man die ganzen Console Befehle in die Logik haut, nimmt man sich die Protierbarkeit des Codes auf eine andere UI bzw. muss mit Aufwand diese Befehle wieder entfernen/verschieben etc.

T-Virus

Developer, Developer, Developer, Developer....

99 little bugs in the code, 99 little bugs. Take one down, patch it around, 117 little bugs in the code.

16.807 Beiträge seit 2008
vor 6 Jahren

Vorweg: konnte den Code nicht testen, denn ich habe keinen COM-Port an meinen Rechnern (hat mich selbst verwundert).
Ich hab mir eine Ersatzklasse für den SerialPort gebaut, dass ich überhaupt testen konnte, dass dieses Konstrukt so funktioniert.

Der Code soll grundlegend zeigen, wie man lesen und schreiben mit Hilfe von zwei Tasks und einer BlockingCollection separiert.
Wer mehr Consumer Tasks will / braucht, der muss natürlich entsprechend erweitern.
Das Lesen findet in einem Task statt (inkl. Init des Ports) sowie das Schreiben der Logdatei in einem zweiten Task.

Die Initialisierung der Business Logik Klasse erzeugt das Grundkonstrukt; gestartet wird via Run - abgebrochen via Cancel.
Run erzeugt die Tasks durch die beiden Hilfsmethoden, die in einem größeren Code-Konstrukt eigene Klassen sein sollten (zB. wegen der Berechnung der CheckSumme etc).
Cancel führt das CompleteAdding aus, mit dem die Queue keine weiteren Einträge animmt.
Ich bin mir ehrlich gesagt nicht sicher, ob das Task.Cancel für den Serial Port zum Abschluss ausreicht; aber der Consumer beendet sich automatisch, da beim CompleteAdding die Einträge abgearbeitet werden und dann einfach die Schleife verlassen und somit der Thread beendet wird.

namespace ConsoleAppPort
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Initializing...");
            MyFancyBusinessService b = new MyFancyBusinessService("COM2", 57600, "test.txt");

            Console.WriteLine("Starting...");
            b.Run();


            Console.WriteLine("Started. Press key to request cancel...");
            Console.Read();

            Console.WriteLine("Requesting cancellation...");
            b.Cancel(TimeSpan.FromSeconds(30));
            Console.WriteLine("Cancelled.");


            Console.WriteLine("Press key to shutdown application");
            Console.Read();
        }
    }

    public class DataPortReader
    {
        public DataPortReader(string portName, int baudRate)
        {
            SerialPort port = new SerialPort(portName) { BaudRate = baudRate };
            port.Open();

            port.DataReceived += (sender, eventArgs) =>
            {
                SerialPort sourcePort = (SerialPort)sender;

                const int bufSize = 5;
                Byte[] buf = new Byte[bufSize];

                int bytesRead = sourcePort.Read(buf, 0, bufSize);


                OnNewData?.Invoke(this, new DataItem { Data = buf, Timestamp = DateTime.UtcNow });
            };
        }

        public delegate void NewDataHandler(object sender, DataItem e);

        public event NewDataHandler OnNewData;


    }

    public class DataItem
    {
        public Byte[] Data { get; set; }
        public DateTime Timestamp { get; set; }
    }


    public class MyFancyBusinessService
    {
        private readonly string _comPort;
        private readonly int _baudRate;
        private readonly string _logFile;

        public MyFancyBusinessService(string comPort, int baudRate, string logFile)
        {
            _comPort = comPort;
            _baudRate = baudRate;
            _logFile = logFile;
        }

        private readonly BlockingCollection<DataItem> _queue = new BlockingCollection<DataItem>();

        private readonly CancellationTokenSource _cts = new CancellationTokenSource();

        private Task ListenAsync()
        {
            return Task.Run(() =>
            {
                DataPortReader dpr = new DataPortReader(_comPort, _baudRate);
                dpr.OnNewData += (sender, item) => _queue.Add(item);


                Console.WriteLine("Listening waiting for Cancellation.");

                _cts.Token.WaitHandle.WaitOne();

                Console.WriteLine("Listening is finished.");
            });
        }

        private Task ConsumeAsync()
        {
            return Task.Run(async () =>
            {
                using (StreamWriter sr = new StreamWriter(_logFile))
                {
                    foreach (var entry in _queue.GetConsumingEnumerable()) // GetConsumingEnumerable blockiert solang kein CompleteAdding erfolgt
                    {
                        await sr.WriteLineAsync($"Got {entry.Data.Length} bytes on {entry.Timestamp}");
                    }
                }

                Console.WriteLine("Consume is finished.");

            });
        }

        private Task[] _tasks;

        public void Run()
        {
            if (!_cts.IsCancellationRequested && _tasks == null)
            {
                var produceTask = ListenAsync();
                var consumeTask = ConsumeAsync();

                _tasks = new[] { produceTask, consumeTask };
            }
        }


        public void Cancel(TimeSpan cancelWaitTime)
        {
            _queue.CompleteAdding();
            _cts.Cancel();

            Task.WhenAll(_tasks).Wait(cancelWaitTime);
        }
    }
}

Was noch gesagt sein soll:
Wenn man sich einen neuen Pattern aneignet, dann macht es, sinn dieses Konstrukt anhand eines Minimalsbeispiels zu programmieren und zu verstehen und nicht direkt versucht die ganze Programmlogik darauf umzumünzen.
So kommt man viel schneller an Lernergebnisse und schlussendlich das Ziel.


namespace ConsoleAppPort
{
    class Program
    {
        static void Main(string[] args)
        {
            CancellationTokenSource cts = new CancellationTokenSource();
            BlockingCollection<string> queue = new BlockingCollection<string>();
            Task writeTask = Task.Run(() =>
            {
                foreach (var entry in queue.GetConsumingEnumerable())
                {
                    Console.WriteLine("[ECHO]: " + entry);
                }
            }, cts.Token);

            Console.WriteLine("Write text to get your echo or leave empty to cancel. ");
            while (true)
            {
                var text = Console.ReadLine();
                if (text == "")
                {
                    break;
                }


                queue.Add(text);
            }

            Console.WriteLine("Requesting cancel...");

            queue.CompleteAdding();
            cts.Cancel();

            Console.WriteLine("Waiting for task finish...");
            writeTask.Wait();
            Console.WriteLine("Cancelled.");


            Console.WriteLine("Press key to shutdown application");
            Console.Read();
        }
    }
}
X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Vielen Dank für dieses ausführliche Beispiel. Das hilft ungemein.
Auch ich blicke wirklich schnell durch alles durch, was du da anwendest.

Eine kleine Frage aber noch, dein Kommentar "// GetConsumingEnumerable blockiert solang kein CompleteAdding erfolgt".
Willst du damit einfach sagen, dass man GetConsumingEnumerable nutzt, anstatt einfach nur die _queue, da sonst gewartet werden würde bis die _queue mit CompleteAdding abgeschlossen wurde?

Und wenn ich meinen Code nun um Funktionen erweitere sollte ich zum Beispiel für die Hilfsmethoden ConsumeAsync() und ListenAsync() eigene Klassen erstellen (Stichwort Drei-Schichten-Architektur) und in diesen dann Methoden zum Checksumme prüfen oder Datendekodierung erstellen?

Beste Grüße

D
985 Beiträge seit 2014
vor 6 Jahren

BlockingCollection<T>.GetConsumingEnumerable() blockiert schon mal gar nicht, sondern liefert nur eine IEnumerable<T> Instanz zurück.

Wenn man darüber jetzt iteriert, dann wird solange blockiert, bis entweder ein Wert vorhanden ist, oder BlockingCollection<T>.CompleteAdding() aufgerufen wurde. Blockierend ist nur der Aufruf von GetNext() des Enumerators.

Das ist es aber auch was Abt mit dem Kommentar meinte.

16.807 Beiträge seit 2008
vor 6 Jahren

Das yield im IEnumerable wird erst ausgelöst, wenn ein Element der Schleife hinzugefügt wurde oder die Collection als Abgeschlossen markiert wurde.
Das schont die CPU im Idle, da die Schleife nicht "leer durchlaufen" wird.

Die Alternative wäre ja ein while(true) mit nem Sleep und ner Abfrage, ob was neues in der Queue ist.
Das wird damit verhindert.

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Ich habe nun eigene Producer- und Consumer-Klassen.
Das läuft soweit auch, heißt das logfile wird erstellt und beschrieben.

Aktuell läuft auf diese Weise aber natürlich das Canceln der Tasks nicht mehr. Natürlich, da an der Stelle, an der ich die Tasks beende die Tasks gar nicht bekannt sind.

Ich habe schon ein paar Dinge versucht um das zu beheben und wahrscheinlich ist es auch ganz einfach.
Zum Beispiel habe ich probiert mit einem 2. Konstruktor in der jeweiligen Klasse durch Überladen einen Abbruch zu erzeugen. Aber auch da waren die Tasks dann nicht ohne weiteres abzubrechen.

Ein Tip wie ich das hier am besten machen sollte?
Und ist das mit den Klassen für Producer und Consumer überhaupt Sinnvoll umgesetzt?

using System;
using System.IO;
using System.IO.Ports;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleAppPort
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Initializing...");
            MyFancyBusinessService b = new MyFancyBusinessService("COM2", 57600, @"C:\Users\sharifso\Desktop\\MensorCPT6140_" + DateTime.Now.Year + "_" + DateTime.Now.Month + "_" + DateTime.Now.Day + "_" + DateTime.Now.Hour + "_" + DateTime.Now.Minute + ".txt");

            Console.WriteLine("Starting...");
            b.Run();


            Console.WriteLine("Started. Press key to request cancel...");
            Console.Read();

            Console.WriteLine("Requesting cancellation...");
            b.Cancel(TimeSpan.FromSeconds(30));
            Console.WriteLine("Cancelled.");


            Console.WriteLine("Press key to shutdown application");
            Console.Read();
        }
    }

    public class DataPortReader
    {
        public DataPortReader(string portName, int baudRate)
        {
            SerialPort port = new SerialPort(portName) { BaudRate = baudRate };
            port.Open();

            port.DataReceived += (sender, eventArgs) =>
            {
                SerialPort sourcePort = (SerialPort)sender;

                const int bufSize = 5;
                Byte[] buf = new Byte[bufSize];

                int bytesRead = sourcePort.Read(buf, 0, bufSize);


                OnNewData?.Invoke(this, new DataItem { Data = buf, Timestamp = DateTime.UtcNow });
            };
        }

        public delegate void NewDataHandler(object sender, DataItem e);

        public event NewDataHandler OnNewData;


    }

    public class DataItem
    {
        public Byte[] Data { get; set; }
        public DateTime Timestamp { get; set; }
    }

    public class ProducerClass
    {
        private readonly string _comPort;
        private readonly int _baudRate;
        private readonly string _logFile;
        private BlockingCollection<DataItem> _queue;
        private CancellationTokenSource _cts;
        private Task[] _tasks;

        public ProducerClass(BlockingCollection<DataItem> _queue, CancellationTokenSource _cts, string comPort, int baudRate, string logFile)
        {
            this._queue = _queue;
            this._cts = _cts;
            _comPort = comPort;
            _baudRate = baudRate;
            _logFile = logFile;

            if (!_cts.IsCancellationRequested && _tasks == null)
            {
                var produceTask = ListenAsync();

                _tasks = new[] { produceTask };
            }
        }

        private Task ListenAsync()
        {
            return Task.Run(() =>
            {
                DataPortReader dpr = new DataPortReader(_comPort, _baudRate);
                dpr.OnNewData += (sender, item) => _queue.Add(item);


                Console.WriteLine("Listening waiting for Cancellation.");

                _cts.Token.WaitHandle.WaitOne();

                Console.WriteLine("Listening is finished.");
            });
        }
    }

    public class ConsumerClass
    {
        private readonly string _logFile;
        private BlockingCollection<DataItem> _queue;
        private CancellationTokenSource _cts;
        private Task[] _tasks;

        public ConsumerClass(BlockingCollection<DataItem> _queue, CancellationTokenSource _cts, string logFile)
        {
            this._queue = _queue;
            this._cts = _cts;
            _logFile = logFile;

            if (!_cts.IsCancellationRequested && _tasks == null)
            {
                var consumeTask = ConsumeAsync();

                _tasks = new[] { consumeTask };
            }
        }

        private Task ConsumeAsync()
        {
            return Task.Run(async () =>
            {
                using (StreamWriter sr = new StreamWriter(_logFile))
                {
                    foreach (var entry in _queue.GetConsumingEnumerable()) // GetConsumingEnumerable blockiert solang kein CompleteAdding erfolgt
                    {
                        await sr.WriteLineAsync($"Got {entry.Data.Length} bytes on {entry.Timestamp}");
                    }
                }

                Console.WriteLine("Consume is finished.");

            });
        }
    }

    public class MyFancyBusinessService
    {
        private readonly string _comPort;
        private readonly int _baudRate;
        private readonly string _logFile;

        public MyFancyBusinessService(string comPort, int baudRate, string logFile)
        {
            _comPort = comPort;
            _baudRate = baudRate;
            _logFile = logFile;
        }

        private readonly BlockingCollection<DataItem> _queue = new BlockingCollection<DataItem>();
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();

        private Task[] _tasks;

        public void Run()
        {
            string LogfileString = @"C:\Users\sharifso\Desktop\\MensorCPT6140_" + DateTime.Now.Year + "_" + DateTime.Now.Month + "_" + DateTime.Now.Day + "_" + DateTime.Now.Hour + "_" + DateTime.Now.Minute + ".txt";
            ProducerClass PC = new ProducerClass(_queue, _cts, "COM2", 57600, LogfileString);
            ConsumerClass CC = new ConsumerClass(_queue, _cts, LogfileString);
        }
        
        public void Cancel(TimeSpan cancelWaitTime)
        {
            _queue.CompleteAdding();
            _cts.Cancel();

            Task.WhenAll(_tasks).Wait(cancelWaitTime);
        }
    }
}
16.807 Beiträge seit 2008
vor 6 Jahren

Ich verstehe den Sinn Deiner Consumer Class so nicht.
Was hast Du vor? Warum folgst Du nicht einfach meinem Beispiel?

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

...Run erzeugt die Tasks durch die beiden Hilfsmethoden, die in einem größeren Code-Konstrukt eigene Klassen sein sollten (zB. wegen der Berechnung der CheckSumme etc).

Das hatte ich so verstanden, dass ich eine Klasse für Producer-Methoden (in diesem Fall nur der ListenAsync-Task) und eine für Consumer-Methoden(aktuell nur ConsumeAsync()-Task, später noch um Checksumme und Dekodierung erweitert) erstelle.

Das habe ich hier umsetzen wollen, generell wollte ich also dein Beispiel beibehalten, bloß Consumer und Producer als eigene Klassen haben.

D
985 Beiträge seit 2014
vor 6 Jahren

Dann wäre z.B. so ein Ansatz sinnvoller


public interface IProducer<T>
{
    Task ProduceAsync(BlockingCollection<T> queue, CancellationToken cancellationToken);
}

public interface IConsumer<T>
{
    Task ConsumeAsync(IEnumerable<T> source, CancellationToken cancellationToken);
}

public class ProducerConsumerService<T>
{
    private readonly IProducer<T> _producer;
    private readonly IConsumer<T> _consumer;
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task[] _tasks;

    public ProducerConsumerService(IProducer<T> producer, IConsumer<T> consumer)
    {
        this._producer = producer ?? throw new ArgumentNullException(nameof(producer));
        this._consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
    }

    public void Run()
    {
        if (!_cts.IsCancellationRequested && _tasks == null)
        {
            var producerTask = _producer.ProduceAsync(_queue, _cts.Token);
            var consumerTask = _consumer.ConsumeAsync(_queue.GetConsumingEnumerable(), _cts.Token);

            _tasks = new[] { producerTask, consumerTask };
        }
    }

    public void Cancel(TimeSpan cancelWaitTime)
    {
        _queue.CompleteAdding();
        _cts.Cancel();
        Task.WhenAll(_tasks).Wait(cancelWaitTime);
    }
}

16.807 Beiträge seit 2008
vor 6 Jahren

Ja, eine Consumer Class kann durchaus sinn machen - aber nicht so, wie Du sie implementiert hattest.
So hat sie nämlich den Sinn und die Technik hinter Consumer Producer einfach kurz durchbrochen 😉

Da ist zB Sir Rufo's Beispiel besser und auch sehr schlank.

D
985 Beiträge seit 2014
vor 6 Jahren

Da ist zB Sir Rufo's Beispiel besser und auch sehr schlank.

Ist ja einfach nur dein Beispiel leicht veredelt 😉

X
Xodarap93 Themenstarter:in
19 Beiträge seit 2017
vor 6 Jahren

Dann wäre z.B. so ein Ansatz sinnvoller

  
public interface IProducer<T>  
{  
    Task ProduceAsync(BlockingCollection<T> queue, CancellationToken cancellationToken);  
}  
  
public interface IConsumer<T>  
{  
    Task ConsumeAsync(IEnumerable<T> source, CancellationToken cancellationToken);  
}  
  
public class ProducerConsumerService<T>  
{  
    private readonly IProducer<T> _producer;  
    private readonly IConsumer<T> _consumer;  
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();  
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();  
    private Task[] _tasks;  
  
    public ProducerConsumerService(IProducer<T> producer, IConsumer<T> consumer)  
    {  
        this._producer = producer ?? throw new ArgumentNullException(nameof(producer));  
        this._consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));  
    }  
  
    public void Run()  
    {  
        if (!_cts.IsCancellationRequested && _tasks == null)  
        {  
            var producerTask = _producer.ProduceAsync(_queue, _cts.Token);  
            var consumerTask = _consumer.ConsumeAsync(_queue.GetConsumingEnumerable(), _cts.Token);  
  
            _tasks = new[] { producerTask, consumerTask };  
        }  
    }  
  
    public void Cancel(TimeSpan cancelWaitTime)  
    {  
        _queue.CompleteAdding();  
        _cts.Cancel();  
        Task.WhenAll(_tasks).Wait(cancelWaitTime);  
    }  
}  
  

Das sieht wirklich sehr Schlank und Sinnvoll aus. Aber vor der Umsetzung habe ich noch ein paar Fragen.
Ich habe nun vor auf das Beispiel von Abt zurückzurudern aber dann die Interfaces zu nutzen, wie du es beispielhaft aufzeigst.

Die Klasse ProducerConsumerService<T> war vorher MyFancyBusinessService, richtig?
Ich würde weiterhin eine Consumer- und eine Producer-Klasse haben, diese erben aber vom jeweiligen Interface. Und in der jeweiligen Klasse finden sich die im Interface erwähnten Tasks.

So müsste es möglich sein die jeweiligen Consumer- und Producer-Tasks , trotz eigener Klassen, von der Service-Klasse aus zu starten und auch zu Canceln. Und das natürlich nun ohne den Sinn der Producer/Consumer-Trennung zu verlieren, wie es bei mir der Fall war.

Das klingt soweit einleuchtend.

Aber warum übergebe ich beim Aufruf von ProducerConsumerService schon die beiden Interfaces? Der Aufruf, bzw. die Initialisierung erfolgt doch von Main() aus. Wie ich dabei vorgehe ist mir nicht ganz klar.

Und noch eine kleine Frage bezüglich des Beispiels von Abt.
Ab und zu wirft er eine "InvalidOperationException" nachdem Canceln. Diese Tritt in ListenAsync() auf, da offensichtlich die _queue mit Complete markiert ist, aber noch Daten zur Verfügung stehen.
Passiert also in folgender Zeile:

dpr.OnNewData += (sender, item) => _queue.Add(item);

Ich habe schon vermutet, dass das passiert da der Port natürlich weiterhin Offen ist und entsprechend weiter neue Daten zur Verfügung stehen. Aber auch wenn ich beim Cancel() den Port extra schließe tritt dieser Fall ab und zu auf.
Sehr verwunderlich ist eben, dass ich die Ausnahme nicht direkt reproduzieren kann. Bei gleichem Verhalten tritt diese mal auf und mal wieder nicht.

Weiterhin vielen Dank an euch beide.