Dienstag, 17. Mai 2016

FDK - Parallel-Multithreading-Pipeline


Ein kleiner Codeauszug aus unser Multithread Pipeline:

TPipeline.Configure( )
  {} .Throttle( 10 ) // max. 10 Items in der OutputCollection
  {} .Stage<Integer>(
    procedure( Output: IBlockingCollection<Integer> )
    var
      i: Integer;
    begin
      for i := 1 to 100 do
        Output.Add( GetDataFromWebservice( i ) )
    end )
  {} .NumTasks( 5 ) // Anzahl der Tasks für die nächste Stage
  {} .Stage(
    procedure( const Input: Integer; out Output: Integer )
    begin
      Output := CalculateData( Input );
    end )
  {} .Stage(
    procedure( Input, Output: IBlockingCollection<Integer> )
    var
      v: Integer;
    begin
      for v in Input do
        begin
          WriteDataInDatabase( v );
        end;
    end )
  {} .Run( );

Der abgebildete Arbeitsprozess holt in diesem Beispiel 100 Daten (hier zu Demonstration Integer-Werte) von einem theoretischen Webserver (Im Thread). Maximal ( 10 ) Werte werden hierbei in die Verarbeitungspipeline eingestellt.  Wenn die nachfolgende Stage ( In 5 Threads ) diese Werte nicht schnell genug abarbeiten kann, werden die Threads die die Daten von Webserver holen, nach Erreichen des Throttlewertes „schlafen“ gelegt. Das „Aufwachen“ passiert natürlich sofort wenn wieder Platz in der Eingangsqueue ist. Dieses Verhalten kontrolliert die BlockingCollection für alle Stages. Falls die Anzahl der Threads nicht angegeben ist, gilt der default Wert.