본문 바로가기

MS/tip

Lock-Free Queue

출처 : http://serverprogramminggem.springnote.com/pages/940824

Lock-Free Queue

  • 일반적인 방식의 Thread Safe Queue

    1. #include "ace/Thread_Mutex.h"
      template <typename T>
      class NormalQueueT
      {
      private:
          int    _size;
          ACE_Thread_Mutex _lock;

          // NODE데이타를 저장할 구조체
          struct Node
          {
              T Data;
              Node *NextNode;
          };

          Node* _mNode;        // 노드 할당을 위해서 사용한다.
          Node* _nextNode;    // 다음노드를 포인트한다. 

          Node* _startNode;    // 시작노드를 포인트한다.
          Node* _endNode;        // 마지막노드를 포인트한다.

      public:
          NormalQueueT()
          {
              _size = 0;
          }

          void Enqueue(T t)
          {
              ACE_Guard<ACE_Thread_Mutex> guard(_lock);
              _mNode = new Node;
              _mNode->Data = t;
              _mNode->NextNode = NULL;
              if (_size == 0)
              {
                  _startNode = _mNode;
                  _endNode = _mNode;
              }
              else
              {
                  _endNode->NextNode = _mNode;
                  _endNode = _mNode;
              }
              _size++;
          }

          int Dequeue(T& t)
          {
              ACE_Guard<ACE_Thread_Mutex> guard(_lock);
              if (_size == 0)
                  return -1;
              t = _startNode->Data;
              _nextNode = _startNode->NextNode;
              delete _startNode;
              _startNode = _nextNode;
              _size--;
              return 0;
          }
      };
  • Lock-Free 방식으로 구현한 Thread Safe Queue

    1. #include <windows.h>
      template <typename T>
      class QueueT
      {
      private:
          long _size;
          // NODE데이타를 저장할 구조체
          struct Node
          {
              T data;
              Node* next;

              Node& operator=(const Node& ot)
              {
                  data = ot.data;
                  next = ot.next;
                  return *this;
              }

              bool operator==(const Node& ot)
              {
                  return (data == ot.data && next == ot.next);
              }
          };

          Node* _head;        // 시작노드를 포인트한다.
          Node* _tail;        // 마지막노드를 포인트한다.

      public:
          QueueT()
          {
              _size = 0;
              _head = new Node;
              _head->next = NULL;
              _tail = _head;
          }

          void Enqueue(T t)
          {
              Node* node = new Node;
              node->data = t;
              node->next = NULL;

              while(true)
              {
                  Node* tail = _tail;
                  Node* next = tail->next;
                 
                  if (tail == _tail)
                  {
                      if (next == NULL)
                      {
                          if ( InterlockedCompareExchangePointer((PVOID*)&tail->next, node, next) == next )
                          {
                              InterlockedCompareExchangePointer((PVOID*)&_tail, node, tail);
                              break;
                          }
                      }
                      else
                      {
                          InterlockedCompareExchangePointer((PVOID*)&_tail, next, tail);
                      }

                  }
              }

              InterlockedExchangeAdd(&_size, 1);
          }

          int Dequeue(T& t)
          {
              if (_size == 0)
                  return -1;

              while(true)
              {
                  Node* head = _head;
                  Node* tail = _tail;
                  Node* next = head->next;
                  if (head == _head)
                  {
                      if (head == tail)
                      {
                          if (next == NULL)
                              return -1;
                      }
                      else
                      {
                          t = next->data;
                          if ( InterlockedCompareExchangePointer((PVOID*)&_head, next, head) == head )
                          {
                              delete head;
                              break;
                          }
                      }
                  }
              }
              InterlockedExchangeAdd(&_size, -1);
              return 0;
          }
      };
  • 스레드 10개씩을 생성하여 Enqueue 10만개, Dequeue 10만개 하여 작업이 완료되는 시점으로 성능을 비교하였다.

    • Enqueue는 Lock-Free 방식이 10배 정도 빠름.
    • Dequeue는 Lock-Free 방식이 2배 정도 빠름.
    • 스레드 개수를 4개씩으로 변경하여도 비슷한 수치가 나옴.
  • 주의사항

    • Dequeue함수의 delete head; 부분을 Hazard Pointer 등의 기법으로 대체하여야 합니다. 그렇지 않을 경우 exception이 발생할 수 있습니다.

 

작성자 :   임영기 ( javawork93@gmail.com )



boost에도 lock free 코드가 있는 것 같은데 적용 해봐도 괜찮을 것 같습니다.