Laden...

Wieso hat eine asynchrone Pipeline-Implementierung eine schlechtere Performance als nicht async?

Erstellt von R3turnz vor 7 Jahren Letzter Beitrag vor 7 Jahren 2.655 Views
R
R3turnz Themenstarter:in
125 Beiträge seit 2016
vor 7 Jahren
Wieso hat eine asynchrone Pipeline-Implementierung eine schlechtere Performance als nicht async?

Hallo,
ich habe nun die asynchrone Pipeline Implementation umgesetzt. Diese Variante kommt wenn dann auf gleiche, wenn nicht sogar schlechtere Performance als die ohne asynchrone Operationen. Da sie aber auf zwei Threads bzw. Tasks verteilt läuft, müsste sie doch eigentlich schneller laufen?(Ich poste nur den wichtigen Teil, damit der Code nicht unnötige lang wird):


 public async Task  CreateIndex(string[] directoryPaths, string[] excludeDirectories, string[] preferDirectories,CancellationToken ctoken)
        { 
               if (disposed) throw new ObjectDisposedException(null);
            bool hasPreferred = false;
            //Validierung//



            WriteIndexStart(SupportedFileVersion);
            _writer.WriteStartElement("entries");
            //create tasks//
            TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            BlockingCollection<string> fileBuffer = new BlockingCollection<string>(64);
            //prefered directories//
            if (hasPreferred)
                {
                    //makes reading easier//
                    _writer.WriteAttributeString("hasPreferred", "true");
                    _writer.WriteStartElement("preferred");
                    Task preferFileSearcher = factory.StartNew(delegate() 
                    {
                        EnumerateFolders(preferDirectories, excludeDirectories, fileBuffer, ctoken);
                    }
                    );
                    Task preferFileWriter = factory.StartNew(delegate () 
                    {
                         WriteFiles(fileBuffer, ctoken);
                    });
                    await Task.WhenAll(preferFileSearcher, preferFileWriter);
                    _writer.WriteEndElement();
                    fileBuffer = new BlockingCollection<string>(256);
                }
                else _writer.WriteAttributeString("hasPreferred", "false");
                _writer.WriteStartElement("other");
   
                Task fileSearcher = factory.StartNew(delegate ()
                {
                    EnumerateFolders(directoryPaths, excludeDirectories, fileBuffer, ctoken);
                });

              Task fileWriter = factory.StartNew(delegate () { WriteFiles(fileBuffer, ctoken); });
                await Task.WhenAll(fileSearcher, fileWriter);
                _writer.WriteEndElement();
                _writer.WriteEndElement();
                WriteIndexEnd();
        }


Ich verwende async await, um nicht einen Thread im Leerlauf zu haben, während er mit WaitAll() auf die IO-Threads wartet. Die eigentlichen Fragen sind wie zu erwarten:
Wo könnte man den Stil der Implementation verbessern?
Wieso ist die Performance im Verhältnis zur synchronen Variante "gleich"?

Da in diesem Forum funktionierender Code verlangt wird:


    public class IndexerThreaded : IDisposable,INotifyPropertyChanged
    {
        #region fields
        XmlWriter _writer;
        #endregion
        #region properties

        public long FileCount { get; private set; }
       
        private string _currentDirectory;

        public string CurrentDirectory
        {
            get { return _currentDirectory; }
            set
            {
                _currentDirectory = value;
                OnPropertyChanged(new PropertyChangedEventArgs("CurrentDirectory"));
            }
        }
        public static Version SupportedFileVersion = new Version(1, 0);
        public const string IndexNamespaceURI = "http://www.r3turnz.com/index";
        #endregion
        #region events
        public event PropertyChangedEventHandler PropertyChanged;
        protected virtual void OnPropertyChanged(PropertyChangedEventArgs args)
        {
            var handler = PropertyChanged;
            if(handler != null)
            {
                PropertyChanged(this, args);
            }
        }
        #endregion
        #region constructors
        public IndexerThreaded(string indexPath)
        {
            XmlWriterSettings settings = new XmlWriterSettings();
            settings.Indent = true;
            settings.NamespaceHandling = NamespaceHandling.OmitDuplicates;
            settings.WriteEndDocumentOnClose = true;
            settings.Encoding = Encoding.UTF8;
            _writer = XmlWriter.Create(indexPath,settings); 
        }
          public IndexerThreaded(string indexPath ,XmlWriterSettings settings)
        {
            _writer = XmlWriter.Create(indexPath, settings);
        }
        #endregion
         public async Task  CreateIndex(string[] directoryPaths, string[] excludeDirectories, string[] preferDirectories,CancellationToken ctoken)
        {
            if (disposed) throw new ObjectDisposedException(null);
            bool hasPreferred = false;
            //validation and optimization of the parameters//
            if (directoryPaths.Length < 1) throw new ArgumentException("A search path must be provided");
            directoryPaths.Distinct();

            if (excludeDirectories != null && excludeDirectories.Length > 0)
            {
                excludeDirectories.Distinct();
               if(directoryPaths.Intersect(excludeDirectories).Count() > 0) throw new ArgumentException("A directory is contained in the exclude list!");
            }
            if(preferDirectories != null && preferDirectories.Length > 0)
            {
                preferDirectories.Distinct();
                if (preferDirectories.Intersect(excludeDirectories).Count() > 0) throw new ArgumentException("A directory is contained in the exclude list!");
                hasPreferred = true;
            }

            WriteIndexStart(SupportedFileVersion);
            _writer.WriteStartElement("entries");
            //create tasks//
            TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            BlockingCollection<string> fileBuffer = new BlockingCollection<string>(64);
            //prefered directories//
            if (hasPreferred)
                {
                    //makes reading easier//
                    _writer.WriteAttributeString("hasPreferred", "true");
                    _writer.WriteStartElement("preferred");
                    Task preferFileSearcher = factory.StartNew(delegate() 
                    {
                        EnumerateFolders(preferDirectories, excludeDirectories, fileBuffer, ctoken);
                    }
                    );
                    Task preferFileWriter = factory.StartNew(delegate () 
                    {
                         WriteFiles(fileBuffer, ctoken);
                    });
                    await Task.WhenAll(preferFileSearcher, preferFileWriter);
                    _writer.WriteEndElement();
                    fileBuffer = new BlockingCollection<string>(256);
                }
                else _writer.WriteAttributeString("hasPreferred", "false");
                _writer.WriteStartElement("other");
   
                Task fileSearcher = factory.StartNew(delegate ()
                {
                    EnumerateFolders(directoryPaths, excludeDirectories, fileBuffer, ctoken);
                });

              Task fileWriter = factory.StartNew(delegate () { WriteFiles(fileBuffer, ctoken); });
                await Task.WhenAll(fileSearcher, fileWriter);
                _writer.WriteEndElement();
                _writer.WriteEndElement();
                WriteIndexEnd();
        }
        #endregion
        #region xml methods
        private void WriteIndexStart(Version fileVersion)
        {
            _writer.WriteStartDocument();
            _writer.WriteStartElement("in", "index", IndexNamespaceURI);
            _writer.WriteAttributeString("formatVersion", fileVersion.ToString());
        }   
        private void WriteIndexEnd()
        {
            _writer.WriteStartElement("summary");
            _writer.WriteStartElement("fileCount");
            _writer.WriteValue(FileCount);
            _writer.WriteEndDocument();
        }

        //writer file entry//
        private void WriteFileToIndex(string file,string fileName)
        {
            _writer.WriteStartElement("e");
            _writer.WriteAttributeString("n", file);
            _writer.WriteCData(file);
            _writer.WriteEndElement();
        }
        #endregion
        #region enumeration
        private void EnumerateFolder(string folder, string[] excludeDirectories, BlockingCollection<string> fileBuffer,CancellationToken ctoken)
        {
                    fileBuffer.Add(folder, ctoken);
                    foreach (var file in Directory.EnumerateFiles(folder))
                    {
                        ctoken.ThrowIfCancellationRequested();
                        fileBuffer.Add(file, ctoken);
                        FileCount++;
                    }
                    string[] subDirs = Directory.GetDirectories(folder).ToArray();
                    if (excludeDirectories != null)
                    {
                        subDirs = Directory.EnumerateDirectories(folder).Where(fold => !excludeDirectories.Contains(folder)).ToArray();
                    }
                    foreach (var subDir in subDirs)
                    {
                try
                {
                    EnumerateFolder(subDir, excludeDirectories, fileBuffer, ctoken);
                }
                catch (UnauthorizedAccessException) { }
                catch(FileNotFoundException) { }
                catch(DirectoryNotFoundException) { }
            }
        }
        private void EnumerateFolders(string[] folders,string[] excludeDirectories, BlockingCollection<string> fileBuffer,CancellationToken ctoken)
        {
            try
            {
                foreach (var folder in folders)
                {
                    CurrentDirectory = folder;
                    EnumerateFolder(folder, excludeDirectories, fileBuffer, ctoken);
                }
            }
            catch (OperationCanceledException) { }
            finally
            {
                fileBuffer.CompleteAdding();
            }
        }
        private void WriteFiles(BlockingCollection<string> fileBuffer, CancellationToken ctoken)
        {
            try
            {
                foreach (var file in fileBuffer.GetConsumingEnumerable(ctoken))
                {
                    ctoken.ThrowIfCancellationRequested();
                    WriteFileToIndex(file,Path.GetFileNameWithoutExtension(file));
                }
            }
            catch(OperationCanceledException) { }
        }
        #endregion
        #region disposing
        bool disposed = false;
        public void Dispose()
        {
            if (!disposed)
            {
                Dispose(true);
                GC.SuppressFinalize(this);
            }
        }
        protected virtual void Dispose(bool isSafetoDisposeObjects)
        {
            if (isSafetoDisposeObjects)
            {
                _writer.Close();
            }
            //unmanaged//
        }
        ~IndexerThreaded()
        {
            Dispose(false);
        }
        #endregion
    }

Ich habe jetzt mal alle Dokumentationselemente entfernt, mit ihnen wurde sehr unübersichtlich.

16.807 Beiträge seit 2008
vor 7 Jahren

Du hast den klassischen Denkfehler: nur weil etwas parallel (oder asynchron läuft), läuft es nicht schneller.

Gerade Aktionen auf Festplatten laufen langsamer, wenn parallel zugegriffen wird.
Bei HDDs springt der Festplattenkopf die ganze Zeit hin und her und bei SSDs muss der Controller ständig andere Tables der Indexierung laden.
Bei Festplatten (HDD) ist paralleler Zugriff kontraproduktiv für die Geschwindigkeit.
Bei Netzwerkfestplatten kommt es drauf an, wie der Zugriff erfolgt und was die Gegenstelle darstellt. SMB Zugriffe auf RAID Systeme lohnen sich parallel. SAN nur, wenn die Platten auch die Leistung bringen.

async ist technisch gesehen sowieso langsamer.
Es ist auch nicht dazu da, dass etwas schneller läuft, sondern dass etwas asynchron läuft.
async hat immer einen Overhead. Bei korrekter async Implementierung ist der Overhead marginal; messbar aber sehr sehr sehr sehr selten sprübar.

Mit Threads zu arbeiten, ohne zu wissen was der Unterschied parallel vs. async ist: geht zu 99,9% in die Hose.
Lektüre für Multi Threaded Programmierung: [Artikel] Multi-Threaded Programmierung

Bzgl. IO Operationen kannst Du mein Projekt QuickIO.NET auf github oder unter http://quickio.net anschauen.
Quellcode ist offen. Parallel erfolgt auch hier nichts, weil langsamer.

Ansonsten zeigt Dein Code auch grundlegende Architekturfehler: man mischt keinen Business Logik Code mit UI Code.
[Artikel] Drei-Schichten-Architektur