Ver Mensaje Individual
  #1  
Antiguo 13-11-2017
bucanero bucanero is offline
Miembro
 
Registrado: nov 2013
Ubicación: Almería, España
Posts: 208
Reputación: 11
bucanero Va camino a la fama
Cola de elementos multi-thread

Aquí dejo una clase genérica que facilita mucho el trabajo con multi-thread. El funcionamiento es relativamente sencillo: se van insertando elementos y la clase se encarga de crear tantos hilos como se le indique (MaxThreads).

Los hilos se ejecutarán extrayendo y procesando cada uno de los elementos de la cola de elementos hasta que esta quede totalmente vacía.


Código Delphi [-]
unit UQueueMultiThreads;

///  Esta clase desciende de TList y funciona como una cola de elementos, insertando al final
///  y sacando de la cola los primeros. Además encapsula una lista de TThread para múltiples
///  hilos de ejecución paralelos sobre los propios elementos de la cola. Los hilos se inician
///  de forma automática al insertar los elementos en la cola y finalizan
///  también de forma automática cuando la cola se queda sin más elementos para procesar.


interface

uses
  System.SysUtils, System.Classes,
  Vcl.Dialogs, System.Generics.Collections, System.TypInfo;

type
  // Event signature for per-item notifications. The item is passed by reference
  // (var), so a handler is able to modify it.
  // NOTE(review): `T` is not declared anywhere in this listing; the generic
  // parameter list (e.g. <T>) looks like it was lost when the post was rendered.
  // Confirm against the original unit.
  TNotifyItemEvent=procedure(Sender:TObject; var item:T) of object;
  // Metaclass reference; the queue stores one and calls Create on it to build
  // its worker threads.
  TClassThread = class of TThread;

  /// List with the threads that are currently running.
  /// MaxThreads limits how many threads may be in the list at once (0 means
  /// "no limit", see CanAddThread). Terminated turns True once TerminateAll
  /// has been called.
  TListThreads=class(TList)
  private
    FTerminated:boolean;
    FMaxThreads: Integer;
  protected
    /// True while the list is not terminated and the thread limit is not reached.
    function CanAddThread:Boolean;
    /// True when the list is empty or TerminateAll has been called.
    function finished:Boolean;
  public
    constructor Create; overload;
    /// Terminates the remaining threads and waits until the list is empty.
    destructor destroy; override;
  published
    property  MaxThreads:Integer read FMaxThreads write FMaxThreads default 1;
    property Terminated:boolean read FTerminated;
    /// Marks the list as terminated and calls Terminate on every unfinished thread.
    procedure TerminateAll;
  end;

  /// Type-independent view of a multi-thread queue: thread count, completion
  /// state and a way to stop all worker threads.
  IInterfaceQueue = Interface
    ['{DDBEC426-1114-439D-AA61-15498893BC5F}']
    /// Getter for ThreadCount.
    function GetThreadCount: Integer;
    /// Reports whether the queue has finished its work.
    function finished:Boolean;
    /// Requests termination of all worker threads.
    procedure TerminateAll;
    /// Number of worker threads currently registered.
    property  ThreadCount:Integer read GetThreadCount;
  End;

  /// Queue of items that are consumed by a pool of worker threads.
  /// Add appends an item and starts threads of class FClassThread as needed;
  /// the threads pull work with ExtractItem and report back through FinishItem.
  /// NOTE(review): the ancestor is printed as plain TList and items are typed
  /// `T`; presumably the class was generic in the original post. Confirm.
  TQueueMultiThread=class abstract(TList, IInterfaceQueue)
  private
    FClassThread:TClassThread;      // class used to instantiate worker threads
    FListThreads:TListThreads;      // threads that are currently running
    FOnTerminateThread: TNotifyEvent;
    FOnStartThread: TNotifyEvent;
    FOnAddItem: TNotifyItemEvent;
    FOnFinishItem: TNotifyItemEvent;
    FOnStartItem: TNotifyItemEvent;
    function GetMaxThreads: Integer;
    procedure SetMaxThreads(const Value: Integer);
    function GetThreadCount: Integer;
    /// OnTerminate handler installed on every worker thread by NewThread.
    procedure TerminateThread(sender:TObject); virtual;
    /// Starts worker threads while items are queued and the limit allows it.
    procedure AddNewThreads;
    /// Private parameterless constructor; creates the internal thread list.
    constructor Create; overload;

    //----------------------------------------------
    //  To use interfaces it has been necessary to include these procedures.
    //  The implementations in this unit do no reference counting
    //  (_AddRef and _Release both return -1).
    //  https://sergworks.wordpress.com/2010...ence-counting/
    function QueryInterface(const IID: TGUID; out Obj): HResult; stdcall;
    function _AddRef: Integer; stdcall;
    function _Release: Integer; stdcall;
    //----------------------------------------------
  public
    /// Creates the queue and stores the thread class used to process items.
    constructor Create(AClassThread:TClassThread); overload;
    destructor Destroy; override;
    /// Appends an item (after firing OnAddItem) and starts threads if possible.
    function  Add(const value:T):Integer; overload;
    /// Creates, registers and starts one worker thread; True on success.
    function  NewThread:boolean; overload; virtual;
    /// Clears the pending items and asks every worker thread to terminate.
    procedure TerminateAll;
    /// True when no thread is registered and the queue is empty or terminated.
    function finished:Boolean;
    /// Removes the first item from the queue into `item`; False if the queue is empty.
    function ExtractItem(var item:T):Boolean;
    /// Fires OnFinishItem for `item`.
    function FinishItem(var item:T):Boolean;
  published
    property  MaxThreads:Integer read GetMaxThreads write SetMaxThreads default 1;
    property  ThreadCount:Integer read GetThreadCount;
    property  OnStartThread:TNotifyEvent read FOnStartThread write FOnStartThread;
    property  OnTerminateThread:TNotifyEvent read FOnTerminateThread write FOnTerminateThread;
    property  OnAddItem:TNotifyItemEvent read FOnAddItem write FOnAddItem;
    property  OnStartItem:TNotifyItemEvent read FOnStartItem write FOnStartItem;
    property  OnFinishItem:TNotifyItemEvent read FOnFinishItem write FOnFinishItem;
  end;

  /// Worker thread base class: repeatedly takes an item from ListItems,
  /// calls ProcessItem on it and reports it as finished (see Execute).
  /// Descendants override ProcessItem.
  /// NOTE(review): as printed the class has the same name as its ancestor;
  /// presumably it was declared with a generic parameter in the original. Confirm.
  TThread = class(TThread)
  private
    FListItems:TQueueMultiThread;   // queue this thread takes its items from
    /// Pulls the next item from the queue into FItem and updates FListEmpty.
    procedure GetNewItem;
    /// Notifies the queue that FItem has been processed.
    procedure FinishItem;
  protected
    FItem:T;                        // item currently being processed
    FListEmpty:Boolean;             // True when the last extraction found no item
    /// Work to perform on FItem; must be implemented by descendants.
    procedure ProcessItem; virtual; abstract;
    procedure execute; override;
    property Item:T read FItem write FItem;
  public
    property ListItems:TQueueMultiThread read FListItems write FListItems;
  end;

implementation

{ TListThreads }
/// Marks the list as terminated and asks every thread that has not finished
/// yet to stop.
procedure TListThreads.TerminateAll;
var
  idx:longint;
begin
  // Once this flag is set CanAddThread answers False.
  FTerminated := true;
  // Walk the list from the last entry to the first. Each entry is handled in
  // its own try..except so that a failure on one thread does not prevent the
  // remaining ones from being signalled.
  idx := count - 1;
  while idx >= 0 do begin
    try
      if not items[idx].Finished then
        items[idx].terminate;
    except
    end;
    Dec(idx);
  end;
end;

/// Tells whether one more thread may be added to the list.
/// Returns False once the list has been terminated; otherwise True when
/// MaxThreads is 0 (no limit) or fewer than MaxThreads threads are registered.
function TListThreads.CanAddThread: Boolean;
begin
  if FTerminated then
    Result := false
  else
    Result := (MaxThreads = 0) or (Count < MaxThreads);
end;

/// Creates an empty thread list limited to a single thread.
constructor TListThreads.Create;
begin
  inherited;
  // The MaxThreads property writes straight to this field.
  FMaxThreads := 1;
end;

/// Terminates every thread still registered and waits until the list is empty
/// before releasing the list itself.
///
/// Entries are removed from this list by the OnTerminate handler of the owning
/// queue, and the worker threads themselves call Synchronize. Both are serviced
/// by the main thread, so when the destructor runs on the main thread a plain
/// Sleep would never let Count reach zero. In that case the wait is done with
/// CheckSynchronize, which processes pending synchronized calls while waiting.
destructor TListThreads.destroy;
begin
  // If there are threads running, ask all of them to finish.
  if Count>0 then
    TerminateAll;
  // Wait until every thread has left the list.
  while Count>0 do begin
    if System.Classes.TThread.CurrentThread.ThreadID = MainThreadID then
      // Main thread: pump Synchronize/OnTerminate requests (300 ms timeout).
      CheckSynchronize(300)
    else
      // Any other thread: the main thread is free to service the requests.
      Sleep(300);
  end;
  inherited;
end;

/// True when TerminateAll has been called or no thread is registered.
function TListThreads.finished: Boolean;
begin
  if FTerminated then
    Result := true
  else
    Result := Count <= 0;
end;

{ Tthread }

{ TQueueThread }
/// Appends an item to the queue and starts worker threads if allowed.
/// OnAddItem, when assigned, is fired first and may modify the item that is
/// actually stored. Returns the value of the inherited Add.
function TQueueMultiThread.Add(const value: T): Integer;
var
  Pending:T;
begin
  // `value` is const, while the event takes its item by var: use a local copy.
  Pending:=value;
  if assigned(OnAddItem) then
    OnAddItem(Self, Pending);
  Result:=inherited Add(Pending);
  // Start threads for the queued work, if the limit permits.
  AddNewThreads;
end;

/// Keeps starting worker threads while there are queued items, the thread list
/// accepts another thread and NewThread succeeds. Exceptions are swallowed.
procedure TQueueMultiThread.AddNewThreads;
begin
  try
    // All the work happens in the loop condition; the body is empty.
    repeat
    until not ((Count > 0) and FListThreads.CanAddThread and NewThread);
  except
  end;
end;

/// Creates the queue and remembers the thread class used to process its items.
/// AClassThread: class that NewThread instantiates for every worker thread.
constructor TQueueMultiThread.Create(AClassThread: TClassThread);
begin
  // Delegate to the parameterless overload, which creates the thread list.
  Create;
  FClassThread := AClassThread;
end;

/// Parameterless constructor (declared private): runs the inherited constructor
/// and creates the internal list of running threads.
constructor TQueueMultiThread.Create;
begin
  inherited;
  FListThreads:=TListThreads.Create;
end;

/// Releases the internal thread list and then the queue itself.
/// The thread list's destructor terminates the running threads and waits for
/// them to leave the list.
destructor TQueueMultiThread.Destroy;
begin
  // Free instead of a direct Destroy call: Free is a no-op on nil, so a
  // partially constructed queue can be destroyed safely. The field is
  // deliberately not set to nil first, because TerminateThread still uses
  // FListThreads while the thread list is waiting for its threads.
  FListThreads.Free;
  inherited;
end;

/// Takes the first item out of the queue.
/// item: receives the extracted item; left untouched when the queue is empty.
/// Returns True when an item was extracted, False when the queue was empty.
/// OnStartItem, when assigned, is fired with the extracted item.
function TQueueMultiThread.ExtractItem(var item: T): Boolean;
begin
  Result := Count > 0;
  if not Result then
    exit;
  item := Items[0];
  Extract(item);
  if Assigned(FOnStartItem) then
    FOnStartItem(Self, item);
end;

/// True when no worker thread is registered any more and either the thread
/// list has been terminated or the queue holds no items.
function TQueueMultiThread.finished: Boolean;
begin
  if FListThreads.count > 0 then
    Result := false
  else
    Result := FListThreads.Terminated or (Count <= 0);
end;

/// Reports that an item has been processed.
/// item: the processed item, passed by reference to the OnFinishItem handler.
/// Returns True when an OnFinishItem handler was assigned and has been called,
/// False otherwise. The original left the result unassigned.
function TQueueMultiThread.FinishItem(var item: T): Boolean;
begin
  Result := Assigned(FOnFinishItem);
  if Result then
    FOnFinishItem(Self, item);
end;

/// Getter for MaxThreads: forwards the limit held by the internal thread list.
function TQueueMultiThread.GetMaxThreads: Integer;
begin
  Result := FListThreads.MaxThreads;
end;

/// Getter for ThreadCount: number of threads held by the internal thread list.
function TQueueMultiThread.GetThreadCount: Integer;
begin
  Result := FListThreads.Count;
end;

/// Creates one worker thread of class FClassThread, registers it in the thread
/// list, fires OnStartThread and starts it.
/// Returns True when the thread was started, False when anything failed.
///
/// On failure the thread is removed from the list and released. The original
/// left it in the list without ever starting it, so it could never terminate
/// and the thread list's destructor would wait for it indefinitely.
function TQueueMultiThread.NewThread: boolean;
var
  AThread:TThread;
begin
  Result:=false;
  // Created suspended so that it can be configured before it runs.
  AThread := FClassThread.Create(True);
  try
    if AThread is TThread then
      TThread(AThread).ListItems:=Self;
    FListThreads.Add(AThread);
    AThread.FreeOnTerminate:=true;
    AThread.Priority := tpLower;
    // TerminateThread removes the thread from the list when it ends.
    AThread.OnTerminate := Self.TerminateThread;
    if Assigned(FOnStartThread) then
      FOnStartThread(Self);
    AThread.Start;
    result:=true;
  except
    // Roll back: the thread never ran. Detach it from the list and release it
    // here; FreeOnTerminate and OnTerminate are cleared first so that it is
    // not freed a second time or reported as a terminated worker.
    FListThreads.Extract(AThread);
    AThread.OnTerminate := nil;
    AThread.FreeOnTerminate := false;
    AThread.Free;
  end;
end;

/// Interface lookup: S_OK when GetInterface finds IID on this object,
/// E_NOINTERFACE otherwise.
function TQueueMultiThread.QueryInterface(const IID: TGUID; out Obj): HResult;
begin
  Result := E_NOINTERFACE;
  if GetInterface(IID, Obj) then
    Result := S_OK;
end;

/// Setter for MaxThreads: stores the new limit in the internal thread list.
procedure TQueueMultiThread.SetMaxThreads(const Value: Integer);
begin
  FListThreads.MaxThreads := Value;
end;

/// Discards every item still waiting in the queue and then asks all worker
/// threads to terminate.
procedure TQueueMultiThread.TerminateAll;
begin
  Self.Clear;                   // drop the pending items first
  FListThreads.TerminateAll;    // then signal the running threads
end;

/// OnTerminate handler installed on every worker thread by NewThread.
/// sender: the thread that has ended; anything that is not a TThread is ignored.
/// Removes the thread from the thread list, fires OnTerminateThread and tries
/// to start replacement threads for any items still queued. A thread that is
/// not in the list is reported with a warning dialog.
procedure TQueueMultiThread.TerminateThread(sender: TObject);
begin
  if not (sender is TThread) then
    exit;
  if FListThreads.IndexOf(TThread(sender)) < 0 then begin
    MessageDlg('Thread not found', mtWarning, [mbOK], 0);
    exit;
  end;
  FListThreads.Extract(TThread(sender));
  if Assigned(FOnTerminateThread) then
    FOnTerminateThread(sender);
  AddNewThreads;
end;

/// No reference counting is performed; always returns -1.
function TQueueMultiThread._AddRef: Integer;
begin
  Result := -1;
end;

/// No reference counting is performed; always returns -1 and never frees the object.
function TQueueMultiThread._Release: Integer;
begin
  Result := -1;
end;

{ TThread }
/// Thread body: takes items from the queue one at a time and processes them
/// until the thread is terminated or the queue reports that it is empty.
/// Access to the queue (GetNewItem, FinishItem) goes through Synchronize;
/// ProcessItem runs in this thread.
procedure TThread.execute;
begin
  inherited;
  // Nothing to do without a queue.
  if Assigned(FListItems) then begin
    Synchronize(GetNewItem);
    while not (Terminated or FListEmpty) do begin
      ProcessItem;
      Synchronize(FinishItem);
      Synchronize(GetNewItem);
    end;
  end;
end;

/// Asks the queue for its next item, storing it in FItem; FListEmpty is set
/// to True when the queue had nothing to hand out.
procedure TThread.GetNewItem;
var
  GotItem:Boolean;
begin
  GotItem := FListItems.ExtractItem(FItem);
  FListEmpty := not GotItem;
end;

/// Hands the processed item (FItem) back to the queue's FinishItem.
procedure TThread.FinishItem;
begin
  FListItems.FinishItem(Self.FItem);
end;

end.

Forma de uso:

Código Delphi [-]
uses UQueueMultiThreads;

type
  /// Example item type. Any data type can be used: class, record, string,
  /// integer, etc.
  TUnDato = record
    cadena:String;
  end;

  /// The list of items is declared just as if it descended from class(TList).
  /// NOTE(review): presumably the original named the item type as a generic
  /// argument here (lost in rendering). Confirm.
  TColaMultiThread=class(TQueueMultiThread);

  /// A thread class for the same item type as the queue is declared,
  /// overriding the ProcessItem procedure.
  TMyThread = class(TThread)
  private
    procedure ProcessItem; override;
  end;



implementation


{ TMyThread }

/// Example work: replaces the item's string with its reverse followed by the
/// original text, then pauses for half a second.
procedure TMyThread.ProcessItem;
begin
  inherited;
  // Write through the protected FItem field rather than `with Item do`:
  // Item is a record-valued property, and modifying a record through a
  // property inside `with` hides which storage is actually updated.
  // NOTE(review): ReverseString lives in System.StrUtils; make sure that unit
  // is in the uses clause of the unit holding this example.
  FItem.cadena:=ReverseString(FItem.cadena)+FItem.cadena;
  sleep(500);
end;


// Queue instance shared by the example; created in the initialization section
// and released in finalization.
var
  FColaMultiThread:TColaMultiThread;

/// Example driver: queues 101 items whose strings are the numbers 0..100.
procedure test;
var
  n:longint;
  entry:TUnDato;
begin
  n := 0;
  while n <= 100 do begin
    entry.cadena:=IntToStr(n);
    FColaMultiThread.Add(entry);
    Inc(n);
  end;
end;

initialization
  /// The queue is created and told which thread class has to process the
  /// items that get inserted; up to 5 threads may run at the same time.
  FColaMultiThread:=TColaMultiThread.Create(TMyThread);
  FColaMultiThread.MaxThreads:=5;
finalization
  // Releasing the queue also terminates and waits for its worker threads
  // (see the queue and thread-list destructors).
  FColaMultiThread.free;
end.
Responder Con Cita