Delphi-Help

Главная Статьи FireBird/Interbase Многопоточный доступ к базам данных

Многопоточный доступ к базам данных

Оцените материал
(0 голосов)


Многопоточный доступ к базам данных

По этой теме очень мало информации, особенно в части, касающейся доступа к SQL-серверам (например IB). Мне пришлось несколько дней активно заниматься всем этим – не нашел достойной замены для VirtualTree и решил заполнять дерево с помощью потока. Отмечу, что мои ранние попытки использовать потоки для обращения к Interbase не увенчались успехом, да и дискуссии по теме на форуме epsylon.public.interbase не особо вдохновляли. Жизнь заставила пересмотреть подходы к проблеме и вот что получилось.

Как известно, запрос в потоке должен выполняться в отдельном контексте, т.е. поток должен иметь как минимум IBDataset, IBTransaction и IBSQL. Можно использовать IBDataBase.InternalTransaction, но лучше таки создать в потоке отдельный IBTransaction.

Рассмотрим процесс создания потока, способного обеспечить требуемые условия – создадим базовый класс и затем породим от него поток, способный, например, подсчитать количество записей в запросе.

    unit basedbtr;
    interface
      uses Windows,Classes, Messages,IBDatabase,IBSQL;
    const
    {Идентификаторы сообщений, используемых для передачи информации форме, создавшей поток}
      WM_THREAD_TEXT=WM_USER+1000;
      WM_THREAD_INTEGER=WM_THREAD_TEXT+1;
    type
      TBaseDB_Thread = class(TThread)
        private
          {Локальные поля, хранящие информацию об окне владельца потока}
          fCurForm:HWND;
          {SQL текст}
          fNewSqlText:string;
          {Флаг нового запроса}
          fSetNewQuery:boolean;
          {Далее три компоненты, обеспечивающие наличие собственного контекста потока}
          Base:TIBdatabase;
          trWrite:TIbTransaction;
          ThreadSql:TIBSQL;
          {Процедуры для передачи информации владельцу потока}
          procedure Post_Text(MsgData:string);
          procedure Post_Integer(MsgData:integer);
        protected
          {Метод Execute пока ничего, кроме как перехода в состояние Suspend, не умеет}
          procedure Execute; override;
        public
          property CurForm:HWND read fCurForm write fCurForm;
          property NewSqlText:string read fNewSqlText write fNewSqlText;
          property SetNewQuery:boolean read fSetNewQuery write fSetNewQuery;
        constructor Create(OwnerHWND:HWND; //Дескриптор окна-родителя потока         aBaseName:String; //DatabaseName
            aBaseParams:TStrings; //Информация для IBDatabase.Params
            TransParams:TStrings); //Информация для IBTransactin.Params
        destructor Destroy; override;
      end; 
 
    {Породим от базового потока новый поток с требуемой функциональностью, которая реализуется в методе Execute}
    TGetRecordCount_Thread = class(TBaseDB_Thread)
      protected
        procedure Execute; override;
    end; 
 
    {Глобальная переменная, хранящая текст для передачи окну-владельцу}
      var MsgText:string;
    implementation 
 
    constructor TBaseDB_Thread.Create;
    begin
      fCurForm:=OwnerHWND;
      NewSqlText:='';
      FreeOnTerminate :=False;
      base:=TIBdatabase.Create(nil);
      Base.DatabaseName:=aBaseName;
      Base.LoginPrompt:=false;
      Base.Params.Assign(aBaseParams);
      trWrite:=TIBTransaction.Create(nil);
      trWrite.DefaultDatabase:=Base;
      trWrite.Params.Assign(TransParams);
      ThreadSql:=TIBSQL.Create(nil);
      ThreadSql.SQL.Text:='';
      ThreadSql.Transaction:=trWrite;
      Base.Connected:=true;
      inherited Create(true); {true - переведем поток в состояние Suspended}
    end;
 
    destructor TBaseDB_Thread.Destroy;
    begin
      {Остановим поток}
      if not Suspended then Suspend;
      fNewSqlText:='';
      fSetNewQuery:=false;
      Terminate;
      Resume;
      WaitFor;
      ThreadSql.Free;
      trWrite.Free;
      Base.Free;
      inherited Destroy;
    end; 
 
    {Ниже процедуры для передачи информации владельцу}
    procedure TBaseDB_Thread.Post_Text(MsgData:string);
    begin
      MsgText:=msgData;
      PostMessage(CurForm,WM_THREAD_TEXT,0,Integer(PChar(MsgText)));
    end;
 
    procedure TBaseDB_Thread.Post_Integer(MsgData:integer);
    begin
      PostMessage(CurForm,WM_THREAD_INTEGER,0,MsgData);
    end; 
 
    {"Пустой" метод базового класса}
    procedure TBaseDB_Thread.Execute;
    begin
      while not Terminated do
        suspend;
    end; 
 
    {Здесь Execute наследника}
    procedure TGetRecordCount_Thread.Execute;
      begin
        while not Terminated do begin
          {Если установлен флаг нового запроса}
          if fSetNewQuery then
            begin
              {Сбросим флаг}
              fSetNewQuery:=false;
              {Сообщим владельцу о начале работы потока }
              Post_text('Resume');
              try
                if not trWrite.InTransaction then trWrite.StartTransaction;
                ThreadSql.Sql.Text:=NewSqlText;
                ThreadSql.ExecQuery;
                {Сообщим владельцу результат}
                Post_Integer(ThreadSql.Fields[0].AsInteger);
                trWrite.Commit;
                ThreadSql.Close;
              except
                raise;
                Post_Text('Error');
              end;
            end;
          {Если в процессе выполнения не установлен флаг нового запроса, то остановимся в этой точке}
          if not Terminated and not SetNewQuery then begin
            {Вначале сообщим владельцу}
            Post_Text('Suspend');
            suspend;
          end;
        end;
      end;
    end. 

В прилагаемом к статье проекте на Delphi (10.4K) реализованы дополнительные функции для обеспечения нормальной работы потока. Рассмотрим их.

    procedure TForm1.IBQuery1AfterOpen(DataSet: TDataSet);
    begin
      if Trs.Suspended
        then SetTrsData
        else Timer1.Enabled:=true;
    end; 

Событие должно вызвать метод потока Resume для выполнения запроса о количестве записей. Но мы не знаем, стоит ли это делать, может прежний запрос касается миллионов записей и выражение WHERE столь сложен, чтоб его выполнить за секунды. Проверим состояние потока – если он Suspended, то можно сходу инициализировать поток новыми параметрами и запустить его, иначе – воспользуемся таймером Timer1.

    procedure TForm1.Timer1Timer(Sender: TObject);
    begin
      if Trs.Suspended then begin
        Timer1.Enabled:=false;
        SetTrsData;
      end;
    end; 

Здесь процедура рестарта потока

    procedure TForm1.SetTrsData;
    begin
      Trs.NewSqlText:=Memo2.Lines.Text;
      Trs.SetNewQuery:=true;
      Trs.Resume;
    end; 

Перед тем, как переоткрыть запрос для Grid, остановим таймер – вдруг он сработает и нарушит порядок

    procedure TForm1.IBQuery1BeforeOpen(DataSet: TDataSet);
    begin
      Timer1.Enabled:=false;
    end; 

Вот и все. Данный пример показывает, как создавать поток для использования его в качестве дополнительного инструмента как для мелких «поручений» типа рассмотренного выше, так и для исполнения тяжелых процедур, связанных с изменениями (INSERT or UPDATE), способных ввести основной поток приложения в ступор.

Предположим, что необходимо заполнять TREE данными из запроса.

    TMySql_Thread = class(TBaseDB_Thread)
      Private
        {Это флаг актуальности данных}
        FActual:boolean;
        {Набор параметров для IBSQL}
        FParam1:integer;
        FParam2:integer;
      protected
        procedure Execute; override;
      public
        property Actual:boolean read fActual write fActual;
        property Param1:integer read fParam1 write fParam1;
        property Param2:integer read fParam2 write fParam2;
      end;
 
    procedure TMySql_Thread.Execute;
    begin
      while not Terminated do begin
        {Если установлен флаг нового запроса}
        if fSetNewQuery then
          begin
          {Сбросим флаг}
          fSetNewQuery:=false;
          fActual:=true;
          {Сообщим владельцу о начале работы потока }
          Post_text('Resume');
          try
            if not trWrite.InTransaction then trWrite.StartTransaction;
            ThreadSql.Sql.Text:=NewSqlText;
            ThreadSql.Params[0].asInteger:=fParam1;
            ThreadSql.Params[1].asInteger:=fParam2;
            ThreadSql.ExecQuery;
            {Здесь цикл вставки данных в Tree, причем анализируются Terminated и флаг актуальности потока}
            While not Terminated and fActual and not ThreadSql.Eof do
              Begin
                {Вызываем процедуру вставки}
                Synchronize(AddToList);
                ThreadSql.Next;
              End;
            trWrite.Commit;
            ThreadSql.Close;
          except
            raise;
            Post_Text('Error');
          end;
          end;
        {Если в процессе выполнения не установлен флаг нового запроса, то остановимся в этой точке}
        if not Terminated and not SetNewQuery then begin
          {Вначале сообщим владельцу}
          Post_Text('Suspend');
          suspend;
        end;
      end;
    end; 
Прочитано 7747 раз

Авторизация



Счетчики