Многопоточный доступ к базам данных
По этой теме очень мало информации, особенно в части, касающейся доступа к SQL-серверам (например IB). Мне пришлось несколько дней активно заниматься всем этим – не нашел достойной замены для VirtualTree и решил заполнять дерево с помощью потока. Отмечу, что мои ранние попытки использовать потоки для обращения к Interbase не увенчались успехом, да и дискуссии по теме на форуме epsylon.public.interbase не особо вдохновляли. Жизнь заставила пересмотреть подходы к проблеме и вот что получилось.
Как известно, запрос в потоке должен выполняться в отдельном контексте, т.е. поток должен иметь как минимум IBDataset, IBTransaction и IBSQL. Можно использовать IBDataBase.InternalTransaction, но лучше таки создать в потоке отдельный IBTransaction.
Рассмотрим процесс создания потока, способного обеспечить требуемые условия – создадим базовый класс и затем породим от него поток, способный, например, подсчитать количество записей в запросе.
unit basedbtr;
interface
uses Windows,Classes, Messages,IBDatabase,IBSQL;
const
{Идентификаторы сообщений, используемых для передачи информации форме, создавшей поток}
WM_THREAD_TEXT=WM_USER+1000; WM_THREAD_INTEGER=WM_THREAD_TEXT+1;
type
TBaseDB_Thread = class(TThread)
private
{Локальные поля, хранящие информацию об окне владельца потока}
fCurForm:HWND; {SQL текст} fNewSqlText:string;
{Флаг нового запроса} fSetNewQuery:boolean;
{Далее три компоненты, обеспечивающие наличие собственного контекста потока}
Base:TIBdatabase; trWrite:TIbTransaction;
ThreadSql:TIBSQL;
{Процедуры для передачи информации владельцу потока}
procedure Post_Text(MsgData:string); procedure Post_Integer(MsgData:integer);
protected
{Метод Execute пока ничего, кроме как перехода в состояние Suspend, не умеет}
procedure Execute; override; public
property CurForm:HWND read fCurForm write fCurForm;
property NewSqlText:string read fNewSqlText write fNewSqlText;
property SetNewQuery:boolean read fSetNewQuery write fSetNewQuery;
constructor Create(OwnerHWND:HWND; //Дескриптор окна-родителя потока aBaseName:String; //DatabaseName aBaseParams:TStrings; //Информация для IBDatabase.Params TransParams:TStrings); //Информация для IBTransactin.Params destructor Destroy; override;
end;
{Породим от базового потока новый поток с требуемой функциональностью, которая реализуется в методе Execute}
TGetRecordCount_Thread = class(TBaseDB_Thread) protected
procedure Execute; override;
end;
{Глобальная переменная, хранящая текст для передачи окну-владельцу}
var MsgText:string; implementation
constructor TBaseDB_Thread.Create;
begin
fCurForm:=OwnerHWND;
NewSqlText:='';
FreeOnTerminate :=False;
base:=TIBdatabase.Create(nil);
Base.DatabaseName:=aBaseName;
Base.LoginPrompt:=false;
Base.Params.Assign(aBaseParams);
trWrite:=TIBTransaction.Create(nil);
trWrite.DefaultDatabase:=Base;
trWrite.Params.Assign(TransParams);
ThreadSql:=TIBSQL.Create(nil);
ThreadSql.SQL.Text:='';
ThreadSql.Transaction:=trWrite;
Base.Connected:=true;
inherited Create(true); {true - переведем поток в состояние Suspended} end;
destructor TBaseDB_Thread.Destroy;
begin
{Остановим поток} if not Suspended then Suspend;
fNewSqlText:='';
fSetNewQuery:=false;
Terminate;
Resume;
WaitFor;
ThreadSql.Free;
trWrite.Free;
Base.Free;
inherited Destroy;
end;
{Ниже процедуры для передачи информации владельцу}
procedure TBaseDB_Thread.Post_Text(MsgData:string); begin
MsgText:=msgData;
PostMessage(CurForm,WM_THREAD_TEXT,0,Integer(PChar(MsgText)));
end;
procedure TBaseDB_Thread.Post_Integer(MsgData:integer);
begin
PostMessage(CurForm,WM_THREAD_INTEGER,0,MsgData);
end;
{"Пустой" метод базового класса}
procedure TBaseDB_Thread.Execute; begin
while not Terminated do
suspend;
end;
{Здесь Execute наследника} procedure TGetRecordCount_Thread.Execute;
begin
while not Terminated do begin
{Если установлен флаг нового запроса}
if fSetNewQuery then begin
{Сбросим флаг} fSetNewQuery:=false;
{Сообщим владельцу о начале работы потока }
Post_text('Resume'); try
if not trWrite.InTransaction then trWrite.StartTransaction;
ThreadSql.Sql.Text:=NewSqlText;
ThreadSql.ExecQuery;
{Сообщим владельцу результат} Post_Integer(ThreadSql.Fields[0].AsInteger);
trWrite.Commit;
ThreadSql.Close;
except
raise;
Post_Text('Error');
end;
end;
{Если в процессе выполнения не установлен флаг нового запроса, то остановимся в этой точке}
if not Terminated and not SetNewQuery then begin {Вначале сообщим владельцу}
Post_Text('Suspend');
suspend; end;
end;
end;
end.
|
В прилагаемом к статье проекте на Delphi (10.4K) реализованы дополнительные функции для обеспечения нормальной работы потока. Рассмотрим их.
procedure TForm1.IBQuery1AfterOpen(DataSet: TDataSet);
begin
if Trs.Suspended
then SetTrsData
else Timer1.Enabled:=true;
end;
|
Событие должно вызвать метод потока Resume для выполнения запроса о количестве записей. Но мы не знаем, стоит ли это делать, может прежний запрос касается миллионов записей и выражение WHERE столь сложен, чтоб его выполнить за секунды. Проверим состояние потока – если он Suspended, то можно сходу инициализировать поток новыми параметрами и запустить его, иначе – воспользуемся таймером Timer1.
procedure TForm1.Timer1Timer(Sender: TObject);
begin
if Trs.Suspended then begin
Timer1.Enabled:=false;
SetTrsData;
end;
end;
|
Здесь процедура рестарта потока
procedure TForm1.SetTrsData;
begin
Trs.NewSqlText:=Memo2.Lines.Text;
Trs.SetNewQuery:=true;
Trs.Resume;
end;
|
Перед тем, как переоткрыть запрос для Grid, остановим таймер – вдруг он сработает и нарушит порядок
procedure TForm1.IBQuery1BeforeOpen(DataSet: TDataSet);
begin
Timer1.Enabled:=false;
end;
|
Вот и все. Данный пример показывает, как создавать поток для использования его в качестве дополнительного инструмента как для мелких «поручений» типа рассмотренного выше, так и для исполнения тяжелых процедур, связанных с изменениями (INSERT or UPDATE), способных ввести основной поток приложения в ступор.
Предположим, что необходимо заполнять TREE данными из запроса.
TMySql_Thread = class(TBaseDB_Thread)
Private
{Это флаг актуальности данных}
FActual:boolean;
{Набор параметров для IBSQL}
FParam1:integer; FParam2:integer;
protected
procedure Execute; override;
public
property Actual:boolean read fActual write fActual;
property Param1:integer read fParam1 write fParam1;
property Param2:integer read fParam2 write fParam2;
end;
procedure TMySql_Thread.Execute;
begin
while not Terminated do begin
{Если установлен флаг нового запроса}
if fSetNewQuery then begin
{Сбросим флаг} fSetNewQuery:=false;
fActual:=true;
{Сообщим владельцу о начале работы потока } Post_text('Resume'); try
if not trWrite.InTransaction then trWrite.StartTransaction;
ThreadSql.Sql.Text:=NewSqlText;
ThreadSql.Params[0].asInteger:=fParam1;
ThreadSql.Params[1].asInteger:=fParam2;
ThreadSql.ExecQuery;
{Здесь цикл вставки данных в Tree, причем анализируются Terminated и флаг актуальности потока}
While not Terminated and fActual and not ThreadSql.Eof do Begin
{Вызываем процедуру вставки} Synchronize(AddToList);
ThreadSql.Next;
End;
trWrite.Commit;
ThreadSql.Close;
except
raise;
Post_Text('Error');
end;
end;
{Если в процессе выполнения не установлен флаг нового запроса, то остановимся в этой точке}
if not Terminated and not SetNewQuery then begin {Вначале сообщим владельцу}
Post_Text('Suspend');
suspend;
end;
end;
end;
|