2015-09-18 6 views
1

Я разрабатываю пул потоков со следующими функциями.Как проверить, работает ли поток в данный момент?

  • Новый поток должен быть порожден только тогда, когда запущены все другие потоки.
  • Максимальное количество потоков должно быть настраиваемым.
  • Когда поток ожидает, он должен иметь возможность обрабатывать новые запросы.
  • Каждая операция ввода-вывода должна вызвать функцию обратного вызова по завершении
  • резьбы должны иметь способ управления запрос его сервировки и IO обратные вызовы

Вот код:

unit ThreadUtilities; 

interface 
uses 
Windows, SysUtils, Classes; 

type 
    EThreadStackFinalized = class(Exception); 
    TSimpleThread = class; 

    // Thread Safe Pointer Queue 
    TThreadQueue = class 
    private 
     FFinalized: Boolean; 
     FIOQueue: THandle; 
    public 
     constructor Create; 
     destructor Destroy; override; 
     procedure Finalize; 
     procedure Push(Data: Pointer); 
     function Pop(var Data: Pointer): Boolean; 
     property Finalized: Boolean read FFinalized; 
    end; 

    TThreadExecuteEvent = procedure (Thread: TThread) of object; 

    TSimpleThread = class(TThread) 
    private 
     FExecuteEvent: TThreadExecuteEvent; 
    protected 
     procedure Execute(); override; 
    public 
     constructor Create(CreateSuspended: Boolean; ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean); 
    end; 

    TThreadPoolEvent = procedure (Data: Pointer; AThread: TThread) of Object; 

    TThreadPool = class(TObject) 
    private 
     FThreads: TList; 
     fis32MaxThreadCount : Integer; 
     FThreadQueue: TThreadQueue; 
     FHandlePoolEvent: TThreadPoolEvent; 
     procedure DoHandleThreadExecute(Thread: TThread); 
     procedure SetMaxThreadCount(const pis32MaxThreadCount : Integer); 
     function GetMaxThreadCount : Integer; 

    public 
     constructor Create(HandlePoolEvent: TThreadPoolEvent; MaxThreads: Integer = 1); virtual; 
     destructor Destroy; override; 
     procedure Add(const Data: Pointer); 
     property MaxThreadCount : Integer read GetMaxThreadCount write SetMaxThreadCount; 
    end; 


implementation 



constructor  TThreadQueue.Create; 
begin   
    //-- Create IO Completion Queue 
    FIOQueue := CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); 
    FFinalized := False; 
end; 

destructor TThreadQueue.Destroy; 
begin 
    //-- Destroy Completion Queue 
    if (FIOQueue = 0) then 
     CloseHandle(FIOQueue); 
    inherited; 
end; 

procedure TThreadQueue.Finalize; 
begin 
    //-- Post a finialize pointer on to the queue 
    PostQueuedCompletionStatus(FIOQueue, 0, 0, Pointer($FFFFFFFF)); 
    FFinalized := True; 
end; 


function TThreadQueue.Pop(var Data: Pointer): Boolean; 
var 
    A: Cardinal; 
    OL: POverLapped; 
begin 
    Result := True; 
    if (not FFinalized) then 
     //-- Remove/Pop the first pointer from the queue or wait 
     GetQueuedCompletionStatus(FIOQueue, A, Cardinal(Data), OL, INFINITE); 

    //-- Check if we have finalized the queue for completion 
    if FFinalized or (OL = Pointer($FFFFFFFF)) then begin 
     Data := nil; 
     Result := False; 
     Finalize; 
    end; 
end; 

procedure TThreadQueue.Push(Data: Pointer); 
begin   
    if FFinalized then 
     Raise EThreadStackFinalized.Create('Stack is finalized'); 
    //-- Add/Push a pointer on to the end of the queue 
    PostQueuedCompletionStatus(FIOQueue, 0, Cardinal(Data), nil); 
end; 

{ TSimpleThread } 

constructor TSimpleThread.Create(CreateSuspended: Boolean; 
    ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean); 
begin 
    FreeOnTerminate := AFreeOnTerminate; 
    FExecuteEvent := ExecuteEvent; 
    inherited Create(CreateSuspended); 
end; 

Изменен код, предложенный J ... также добавил критические разделы, но проблема, с которой я сейчас сталкиваюсь, заключается в том, что, когда я пытаюсь вызвать многократную задачу, используется только один поток. Давайте скажем, если бы я добавил 5 потоков в пул, тогда используется только один поток который является нитью 1. Пожалуйста, проверьте мою клиентскую треску e, а также в следующем разделе.

procedure TSimpleThread.Execute; 
begin 
    // if Assigned(FExecuteEvent) then 
//  FExecuteEvent(Self); 
    while not self.Terminated do begin 
    try 
//  FGoEvent.WaitFor(INFINITE); 
//  FGoEvent.ResetEvent; 
     EnterCriticalSection(csCriticalSection); 
     if self.Terminated then break; 


     if Assigned(FExecuteEvent) then 
     FExecuteEvent(Self); 
    finally 
     LeaveCriticalSection(csCriticalSection); 
//  HandleException; 
    end; 
end; 
end; 

В методе Add, как я могу проверить, есть ли какой-либо поток, который не занят, если он не занят, то повторно он еще создать новую тему и добавить его в список ThreadPool?

{ TThreadPool } 
procedure TThreadPool.Add(const Data: Pointer); 
begin 
    FThreadQueue.Push(Data); 
// if FThreads.Count < MaxThreadCount then 
// begin 
// FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False)); 
// end; 
end; 

constructor TThreadPool.Create(HandlePoolEvent: TThreadPoolEvent; 
    MaxThreads: Integer); 
begin 
    FHandlePoolEvent := HandlePoolEvent; 
    FThreadQueue := TThreadQueue.Create; 
    FThreads := TList.Create; 
    FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False)); 
end; 

destructor TThreadPool.Destroy; 
var 
    t: Integer; 
begin 
    FThreadQueue.Finalize; 
    for t := 0 to FThreads.Count-1 do 
     TThread(FThreads[t]).Terminate; 
    while (FThreads.Count = 0) do begin 
     TThread(FThreads[0]).WaitFor; 
     TThread(FThreads[0]).Free; 
     FThreads.Delete(0); 
    end; 
    FThreadQueue.Free; 
    FThreads.Free; 
    inherited; 
end; 

procedure TThreadPool.DoHandleThreadExecute(Thread: TThread); 
var 
    Data: Pointer; 
begin 
    while FThreadQueue.Pop(Data) and (not TSimpleThread(Thread).Terminated) do begin 
     try 
      FHandlePoolEvent(Data, Thread); 
     except 
     end; 
    end; 
end; 

function TThreadPool.GetMaxThreadCount: Integer; 
begin 
    Result := fis32MaxThreadCount; 
end; 

procedure TThreadPool.SetMaxThreadCount(const pis32MaxThreadCount: Integer); 
begin 
    fis32MaxThreadCount := pis32MaxThreadCount; 
end; 

end. 

Client Код: Этот клиент я создал, чтобы регистрировать данные в текстовом файле: блок ThreadClient;

interface 

uses Windows, SysUtils, Classes, ThreadUtilities; 

type 
    PLogRequest = ^TLogRequest; 
    TLogRequest = record 
     LogText: String; 
    end; 

    TThreadFileLog = class(TObject) 
    private 
     FFileName: String; 
     FThreadPool: TThreadPool; 
     procedure HandleLogRequest(Data: Pointer; AThread: TThread); 
    public 
     constructor Create(const FileName: string); 
     destructor Destroy; override; 
     procedure Log(const LogText: string); 
     procedure SetMaxThreadCount(const pis32MaxThreadCnt : Integer); 
    end; 

implementation 

(* Simple reuse of a logtofile function for example *) 
procedure LogToFile(const FileName, LogString: String); 
var 
    F: TextFile; 
begin 
    AssignFile(F, FileName); 
    if not FileExists(FileName) then 
     Rewrite(F) 
    else 
     Append(F); 
    try 
     Writeln(F, DateTimeToStr(Now) + ': ' + LogString); 
    finally 
     CloseFile(F); 
    end; 
end; 

constructor TThreadFileLog.Create(const FileName: string); 
begin 
    FFileName := FileName; 
    //-- Pool of one thread to handle queue of logs 
    FThreadPool := TThreadPool.Create(HandleLogRequest, 5); 
end; 

destructor TThreadFileLog.Destroy; 
begin 
    FThreadPool.Free; 
    inherited; 
end; 

procedure TThreadFileLog.HandleLogRequest(Data: Pointer; AThread: TThread); 
var 
    Request: PLogRequest; 
    los32Idx : Integer; 
begin 
    Request := Data; 
    try 
    for los32Idx := 0 to 100 do 
    begin 
     LogToFile(FFileName, IntToStr(AThread.ThreadID) + Request^.LogText); 
    end; 
    finally 
    Dispose(Request); 
    end; 
end; 

procedure TThreadFileLog.Log(const LogText: string); 
var 
    Request: PLogRequest; 
begin 
    New(Request); 
    Request^.LogText := LogText; 
    FThreadPool.Add(Request); 
end; 
procedure TThreadFileLog.SetMaxThreadCount(const pis32MaxThreadCnt: Integer); 
begin 
    FThreadPool.MaxThreadCount := pis32MaxThreadCnt; 
end; 

end. 

Это форма приложения, где я добавил три кнопки, каждая кнопка клик записать некоторое значение в файл с ID потоком и текст сообщ. Но проблема в том, идентификатор потока всегда же

unit ThreadPool; 

interface 

uses 
    Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, 
    Dialogs, StdCtrls, ThreadClient; 

type 
    TForm5 = class(TForm) 
    Button1: TButton; 
    Button2: TButton; 
    Button3: TButton; 
    Edit1: TEdit; 
    procedure Button1Click(Sender: TObject); 
    procedure FormCreate(Sender: TObject); 
    procedure Button2Click(Sender: TObject); 
    procedure Button3Click(Sender: TObject); 
    procedure Edit1Change(Sender: TObject); 
    private 
    { Private declarations } 
    fiFileLog : TThreadFileLog; 
    public 
    { Public declarations } 
    end; 

var 
    Form5: TForm5; 

implementation 

{$R *.dfm} 

procedure TForm5.Button1Click(Sender: TObject); 
begin 
    fiFileLog.Log('Button one click'); 
end; 

procedure TForm5.Button2Click(Sender: TObject); 
begin 
    fiFileLog.Log('Button two click'); 
end; 

procedure TForm5.Button3Click(Sender: TObject); 
begin 
    fiFileLog.Log('Button three click'); 
end; 

procedure TForm5.Edit1Change(Sender: TObject); 
begin 
    fiFileLog.SetMaxThreadCount(StrToInt(Edit1.Text)); 
end; 

procedure TForm5.FormCreate(Sender: TObject); 
begin 
    fiFileLog := TThreadFileLog.Create('C:/test123.txt'); 
end; 

end. 
+0

Это не подходящий вопрос для переполнения стека. Пожалуйста, прочитайте темы в [help]. –

+1

Где я должен спрашивать тогда? Это технический вопрос, я пытаюсь создать threadpool в D2007, но столкнулся с некоторой проблемой при нерестах нового потока, когда запрос поступит в threadpool. Если вы можете повторить некоторые рекомендации, это будет очень полезно. Спасибо –

+1

«сталкиваются с какой-то проблемой при нерестах новой нити». Затем вы можете показать код, связанный с этим, и описать, как проблема проявляется, и где. – MartynA

ответ

2

Во-первых, и, вероятно, наиболее сильно рекомендуется, вы можете рассмотреть вопрос об использовании библиотеки как OmniThread реализовать ThreadPool. Тяжелая работа выполняется для вас, и вы, скорее всего, в конечном итоге создадите некачественный и глючный продукт с помощью собственного решения. Если у вас нет особых требований, это, вероятно, самое быстрое и простое решение.

Тем не менее, если вы хотите, чтобы попытаться сделать это ...

То, что вы могли бы рассмотреть это просто сделать все потоки в пуле при запуске, а не по требованию. Если сервер будет занят в любой точке, то в конечном итоге в конечном итоге в итоге окажется пул MaxThreadCount.

В любом случае, если вы хотите сохранить пул потоков живыми и доступными для работы, тогда им нужно будет следовать немного другой модели, чем то, что вы написали.

Рассмотрим:

procedure TSimpleThread.Execute; 
begin 
    if Assigned(FExecuteEvent) then 
     FExecuteEvent(Self); 
end; 

Вот когда вы запускаете ваш поток будет выполнять эту функцию обратного вызова, а затем прекратить. Это не похоже на то, что вы хотите. То, что вам кажется нужным, - сохранить поток в ожидании следующего рабочего пакета.Я использую базовый класс резьбы (для бассейнов) с помощью метода выполнения, который выглядит примерно так (это несколько упрощенное):

procedure TMyCustomThread.Execute; 
begin 
    while not self.Terminated do begin 
    try 
     FGoEvent.WaitFor(INFINITE); 
     FGoEvent.ResetEvent; 
     if self.Terminated then break; 
     MainExecute;   
    except 
     HandleException; 
    end; 
    end; 
end; 

Здесь FGoEvent является TEvent. Класс реализации определяет, как выглядит рабочий пакет в абстрактном методе MainExecute, но независимо от того, что он представляет собой поток, он выполнит свою работу, а затем вернется в ожидании FGoEvent, чтобы сообщить, что ему нужно выполнить новую работу.

В вашем случае вам необходимо отслеживать, какие потоки ждут и которые работают. Вероятно, вам захочется, чтобы какой-то класс менеджера отслеживал эти объекты потоков. Назначение чего-то простого, как threadID для каждого, кажется разумным. Для каждого потока, непосредственно перед его запуском, сделайте запись, в которой он сейчас занят. В самом конце рабочего пакета вы можете отправить сообщение обратно в класс менеджера, указав, что работа выполнена (и что он может отмечать поток как доступный для работы).

Когда вы добавляете работу в очередь, вы можете сначала проверить доступные потоки для запуска работы (или создать новую, если хотите следовать описанной вами модели). Если есть потоки, тогда запустите задачу, если нет, то нажмите на работу в рабочей очереди. Когда отчет о рабочих потоках завершается, менеджер может проверить очередь на выдающуюся работу. Если есть работа, она может немедленно повторно развернуть поток. Если нет работы, он может пометить поток как доступный для работы (здесь вы можете использовать вторую очередь для доступных работников).

Полная реализация слишком сложна для документирования в одном ответе здесь - это цель только для того, чтобы вырвать некоторые общие идеи.

+0

Спасибо @J ... это действительно хорошее направление. Я попробую это :) –

+3

Вместо того, чтобы давать каждому потоку собственное событие, вместо этого у меня будет событие в очереди. Заблокируйте очередь, добавьте элемент, сообщите о событии и откройте очередь. Любой незанятый поток может ждать этого события. Когда сигнализируется, простаивающие потоки могут затем заблокировать очередь, вытащить элемент, если он доступен, сбросить событие, если очередь пуста, и разблокировать очередь. Если это недостаточно эффективно, используйте порт завершения ввода-вывода. Простаивающие потоки ждут на IOCP, а затем просто публикуют рабочие элементы в IOCP и позволяют ОС сообщать каждому потоку бездействия, к которому относится конкретный элемент. Не требуется блокировка или события. –

+1

@RemyLebeau Все хорошие вещи, согласованные - этот ответ был просто простым примером * одного * способа сделать это (чтобы проиллюстрировать концепцию). –