erp5.org => wiki.erp5.org !

erp5.org has permanently moved to wiki.erp5.org !

Current status of ERP5 community websites:

  • www.erp5.org should redirect to wiki.erp5.org automatically.
  • wiki.erp5.org is the place where fresh news and documentation are published.
  • cps.erp5.org is the old erp5 community website.

Note: if you created content in this old portal, please migrate it to the wiki. The old website will stay online until all content has been migrated to the wiki.

Solving SQL locks in ERP5
ERP5 users commit writes in parallel. Under heavy load, this tends to generate locks in the SQL message and index tables, which can make the whole system unusable. This article discusses the solutions implemented in ERP5 to provide better write parallelism.

This technical note is still being updated and revised. It is provided here for peer review.

SQL locks: an example

Before proceeding further, let us first explain what an SQL lock is. Let us open two MySQL connections and create two transactions:

    MySQL Connection A                            MySQL Connection B
    ------------------                            ------------------    

    begin;                                
                                                  begin;
    insert into message set method_id = "foo";    
                                                  insert into message set method_id = "bar";                           
    delete from message where method_id = "bar";
                                                  delete from message where method_id = "foo"; 
                                                  ERROR 1213: Deadlock found when trying to get lock; Try restarting transaction

If Connection A and Connection B were attached to Zope HTTP requests A and B, the user who started B would get an error message on screen. Under light load with few users, this situation would be very rare. However, with 30 users working in parallel on a ZEO cluster with a central SQL repository, and Zope transactions which can last up to 30 seconds, this situation can become frequent.

From time to time, the majority of Zope HTTP requests end up with deadlock errors, making the whole system unusable.

Understanding SQL locks in ERP5

If two transactions update, insert or delete the same rows, one transaction has to wait for the other. A lock wait timeout error is raised if the wait lasts too long.

If, additionally, the two transactions acquire their locks in incompatible orders, so that each one waits for a row locked by the other, neither can proceed and a deadlock error is raised. This is what happened to Connection B above.
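
A deadlock can be pictured as a cycle in a "wait-for" graph: each blocked transaction points to the transaction holding the lock it needs. The toy sketch below (find_deadlock is a hypothetical helper, not MySQL code) finds such a cycle; InnoDB performs a comparable check and rolls back one of the transactions involved, whereas a wait without a cycle only ends when the lock is released or the wait times out.

```python
from typing import Dict, List, Optional


def find_deadlock(waits_for: Dict[str, str]) -> Optional[List[str]]:
    """Return a cycle of transactions waiting on each other, or None.

    waits_for maps each blocked transaction to the transaction holding
    the lock it is waiting for (toy model: one lock wait per transaction).
    """
    for start in waits_for:
        path = [start]
        current = waits_for[start]
        # Follow the chain of waits until it ends or loops back
        while current in waits_for:
            if current in path:
                return path[path.index(current):]
            path.append(current)
            current = waits_for[current]
    return None
```

In the example above, A's DELETE waits for the row inserted by B and B's DELETE waits for the row inserted by A, which is the graph {"A": "B", "B": "A"}; a single wait such as {"B": "A"} contains no cycle.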

In large ERP5 systems, this situation can happen in two cases:

  • whenever two Zope transactions try to insert / delete messages in the message activity queues. This is currently the worst source of locks.
  • whenever two Zope transactions try to add a new unique uid for the same object. This source of locks has not yet been observed much, probably because it is hidden by the first source.

Solving message locks in ERP5

Message locks in ERP5 usually happen when users launch long transactions, or whenever a long transaction is being executed by a batch process. Long transactions in ERP5 last from 10 seconds to 2 minutes and may generate a large number of inserts / deletes in the message queue.

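The idea to solve this problem is to defer the execution of SQL insert / delete queries until the end of the Zope transaction, so that row locks are held only for the short time needed to run these queries rather than for the whole duration of the Zope transaction.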

Two approaches have been implemented:

  • Deferred SQL connection: a new SQL connection class was added to ZMySQLDA so that queries are simply stored in a list and are then executed at once at the end of the transaction.
  • Transactional Activities and Reentrant Zope Transactions

Deferred SQL Connection

The DeferredDB class simply queues SQL query strings (def query) and defers their execution until the end of the Zope transaction (def _finish). This approach is only valid for write statements (INSERT / UPDATE / DELETE). SELECT statements currently raise an error:

      # Module context assumed from ZMySQLDA's db.py
      # (DB and hosed_connection are defined in that module)
      import sys
      from string import strip, split, upper
      from thread import get_ident
      from zLOG import LOG, INFO, ERROR
      from _mysql_exceptions import OperationalError, NotSupportedError

      class DeferredDB(DB):
          """
              An experimental MySQL DA which implements deferred execution
              of SQL code in order to reduce locks
          """

          def __init__(self, connection):
              DB.__init__(self, connection)
              self.sql_string_list = []

          def query(self, query_string, max_rows=1000):
              # Join the Zope transaction (TM._register calls _begin once)
              self._use_TM and self._register()
              self._lock.acquire()
              try:
                  # A query string may hold several statements separated by '\0'
                  for qs in filter(None, map(strip, split(query_string, '\0'))):
                      qtype = upper(split(qs, None, 1)[0])
                      if qtype == "SELECT":
                          raise NotSupportedError, "can not SELECT in deferred connections"
                      # Queue the statement; it is executed in _finish
                      self.sql_string_list.append(qs)
              finally:
                  self._lock.release()
              # Deferred statements return no column description and no rows
              return (), ()

          def _begin(self, *ignored):
              self._tlock.acquire()
              self._tthread = get_ident()
              # Each Zope transaction starts with an empty queue
              self.sql_string_list = []

          def _finish(self, *ignored):
              if not self._tlock.locked() or self._tthread != get_ident():
                  LOG('ZMySQLDA', INFO, "ignoring _finish")
                  return
              try:
                  try:
                      # BEGIN commit: the SQL transaction is only opened now
                      LOG('ZMySQLDA', INFO, "BEGIN commit")
                      if self._transactions:
                          self.db.query("BEGIN")
                          self.db.store_result()
                      if self._mysql_lock:
                          self.db.query("SELECT GET_LOCK('%s',0)" % self._mysql_lock)
                          self.db.store_result()
                      # Execute all queued statements at once
                      for qs in self.sql_string_list:
                          try:
                              self.db.query(qs)
                              self.db.store_result()
                          except OperationalError, m:
                              if m[0] not in hosed_connection: raise
                              # The connection was lost: reconnect and retry once.
                              # Statements already run on the old connection are
                              # lost if the tables are transactional.
                              self.db = apply(self.Database_Connection, (), self.kwargs)
                              self.db.query(qs)
                              self.db.store_result()
                          LOG('ZMySQLDA', INFO, "Execute %s" % qs)
                      # FINISH commit
                      LOG('ZMySQLDA', INFO, "FINISH commit")
                      if self._mysql_lock:
                          self.db.query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
                          self.db.store_result()
                      if self._transactions:
                          self.db.query("COMMIT")
                          self.db.store_result()
                  except:
                      error = sys.exc_info()
                      LOG('ZMySQLDA', ERROR, "exception during _finish", error=error)
                      if self._transactions:
                          try:
                              # Undo the statements already executed, otherwise the
                              # next BEGIN would implicitly commit them
                              self.db.query("ROLLBACK")
                              self.db.store_result()
                          except:
                              pass
                      raise error[0], error[1], error[2]
              finally:
                  # Always empty the queue and release the transaction lock
                  self.sql_string_list = []
                  self._tlock.release()

          def _abort(self, *ignored):
              if not self._tlock.locked() or self._tthread != get_ident():
                  LOG('ZMySQLDA', INFO, "ignoring _abort")
                  return
              # Nothing has been sent to MySQL yet: drop the queued statements
              self.sql_string_list = []
              self._tlock.release()

It would have been possible to execute SELECT statements immediately and defer the other statements, but this was considered risky: reordering SQL statements may lead to confusing results (a SELECT would not see the writes queued earlier in the same transaction), and immediate execution might generate some locks.

MySQL does provide transactional tables (InnoDB, for example), but note that this approach also gives near-transactional behaviour for INSERT / UPDATE in a Zope environment which uses non-transactional MySQL tables: if the Zope transaction is aborted, the queued statements are simply never executed.
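
Outside Zope, the same queue-then-flush behaviour can be sketched with Python's standard sqlite3 module. DeferredConnection and its query/commit/abort methods are hypothetical names; sqlite3 stands in for MySQL (and locks the whole database rather than rows), so this only illustrates the queuing, SELECT rejection and abort semantics described above, not ZMySQLDA itself.

```python
import sqlite3
from typing import List


class DeferredConnection:
    """Toy analogue of DeferredDB built on sqlite3 (illustration only)."""

    def __init__(self, db: sqlite3.Connection) -> None:
        self.db = db
        self.queue: List[str] = []

    def query(self, sql: str) -> None:
        # Like DeferredDB.query: reject reads, queue everything else
        words = sql.split(None, 1)
        if words and words[0].upper() == "SELECT":
            raise ValueError("can not SELECT in deferred connections")
        self.queue.append(sql)

    def commit(self) -> None:
        # Run all queued statements in one short SQL transaction
        try:
            with self.db:  # commits on success, rolls back on error
                for sql in self.queue:
                    self.db.execute(sql)
        finally:
            self.queue = []

    def abort(self) -> None:
        # Nothing has reached the database yet: just forget the queue
        self.queue = []
```

Because abort() has nothing to undo, it behaves the same on non-transactional storage; only a failure half-way through commit() needs a transactional backend to be rolled back.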

Transactional Activities and Reentrant Zope Transactions

Another approach consists in registering CMFActivity with the Zope transaction manager. Message insert / delete calls are buffered in a queue and executed at the end of the Zope transaction. The queueMessage and deleteMessage methods of each activity class forward the call to activity_tool, the singleton ActivityTool instance:

      def queueMessage(self, activity_tool, m):
        activity_tool.deferredQueueMessage(self, m)

      def deleteMessage(self, activity_tool, m):
        # A message must never be deleted twice
        if not self.isMessageDeleted(activity_tool, m):
          activity_tool.deferredDeleteMessage(self, m)

which in turn forwards the call to a volatile activity buffer, _v_activity_buffer (one instance per thread):

      def deferredQueueMessage(self, activity, message):
        self._v_activity_buffer.deferredQueueMessage(self, activity, message)

      def deferredDeleteMessage(self, activity, message):
        self._v_activity_buffer.deferredDeleteMessage(self, activity, message)

activity_buffer is an instance of class ActivityBuffer:

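      # Module-level imports assumed by this excerpt
      import sys
      from Shared.DC.ZRDB.TM import TM
      from zLOG import LOG, INFO, ERROR

      class ActivityBuffer(TM):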

          _p_oid=_p_changed=_registered=None

          def __init__(self):
              from thread import allocate_lock
              self._use_TM = self._transactions = 1
              if self._use_TM:
                  self._tlock = allocate_lock()
                  self._tthread = None
              self._lock = allocate_lock()

          # Keeps a list of messages to add and remove
          # at end of transaction
          def _begin(self, *ignored):
              from thread import get_ident
              from ActivityTool import activity_list
              self._tlock.acquire()
              self._tthread = get_ident()
              self.requires_prepare = 1
              try:
                  self.queued_activity = []
                  self.flushed_activity = []
                  for activity in activity_list:              # Reset registration for each transaction
                      activity.registerActivityBuffer(self)
              except:
                  LOG('ActivityBuffer', ERROR, "exception during _begin",
                      error=sys.exc_info())
                  self._tlock.release()
                  raise

          def _finish(self, *ignored):
              from thread import get_ident
              if not self._tlock.locked() or self._tthread != get_ident():
                  LOG('ActivityBuffer', INFO, "ignoring _finish")
                  return
              try:
                  try:
                      # Try to push / delete all messages
                      for (activity, activity_tool, message) in self.flushed_activity:
                          activity.finishDeleteMessage(activity_tool, message)
                      for (activity, activity_tool, message) in self.queued_activity:
                          activity.finishQueueMessage(activity_tool, message)
                  except:
                      LOG('ActivityBuffer', ERROR, "exception during _finish",
                          error=sys.exc_info())
                      raise
              finally:
                  self._tlock.release()

          def _abort(self, *ignored):
              from thread import get_ident
              if not self._tlock.locked() or self._tthread != get_ident():
                  LOG('ActivityBuffer', INFO, "ignoring _abort")
                  return
              self._tlock.release()

          def tpc_prepare(self, *ignored):
              # The modified commit loop may call tpc_prepare several times
              # in one transaction: only prepare the messages once
              if not self.requires_prepare: return
              self.requires_prepare = 0
              from thread import get_ident
              if not self._tlock.locked() or self._tthread != get_ident():
                  LOG('ActivityBuffer', INFO, "ignoring tpc_prepare")
                  return
              try:
                  # Try to push / delete all messages
                  for (activity, activity_tool, message) in self.flushed_activity:
                      activity.prepareDeleteMessage(activity_tool, message)
                  for (activity, activity_tool, message) in self.queued_activity:
                      activity.prepareQueueMessage(activity_tool, message)
              except:
                  LOG('ActivityBuffer', ERROR, "exception during tpc_prepare",
                      error=sys.exc_info())
                  raise

          def deferredQueueMessage(self, activity_tool, activity, message):
              # Join the Zope transaction (TM._register calls _begin the first time)
              self._register()
              # Activity is called to prevent queuing some messages (useful for example
              # to prevent reindexing objects multiple times)
              if not activity.isMessageRegistered(self, activity_tool, message):
                  self.queued_activity.append((activity, activity_tool, message))
                  # We register queued messages so that we can
                  # unregister them
                  activity.registerMessage(self, activity_tool, message)

          def deferredDeleteMessage(self, activity_tool, activity, message):
              self._register()
              self.flushed_activity.append((activity, activity_tool, message))

All transaction handling is therefore centralised by ActivityBuffer:

  • a list of queued / deleted messages is initialised when the transaction begins
  • queued / deleted messages are prepared in tpc_prepare, before the two-phase commit starts, and committed when the transaction finishes
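
The life cycle summarised above can be modelled in a few lines of plain Python. The sketch below is a simplified, hypothetical model (ToyActivityBuffer and ToyRamActivity are not CMFActivity classes, and messages are reduced to (object_path, method_id) keys): buffered work is reset at begin, prepared only once even if tpc_prepare is called repeatedly, applied at finish (deletions first, then insertions) and simply dropped on abort.

```python
from typing import Dict, List, Tuple

Key = Tuple[str, str]  # (object_path, method_id)


class ToyRamActivity:
    """Stand-in for RAMDict: messages are kept in a plain dict."""

    def __init__(self) -> None:
        self.dict: Dict[Key, str] = {}

    def finish_queue(self, key: Key) -> None:
        self.dict[key] = "message"

    def finish_delete(self, key: Key) -> None:
        self.dict.pop(key, None)


class ToyActivityBuffer:
    """Simplified, hypothetical model of the ActivityBuffer life cycle."""

    def __init__(self, activity: ToyRamActivity) -> None:
        self.activity = activity
        self.prepare_count = 0
        self.begin()

    def begin(self) -> None:
        # A fresh pair of lists for every transaction
        self.queued: List[Key] = []
        self.flushed: List[Key] = []
        self.requires_prepare = True

    def deferred_queue(self, key: Key) -> None:
        self.queued.append(key)

    def deferred_delete(self, key: Key) -> None:
        self.flushed.append(key)

    def tpc_prepare(self) -> None:
        # May be called several times per commit: only prepare once
        if not self.requires_prepare:
            return
        self.requires_prepare = False
        self.prepare_count += 1  # SQL-based activities would write here

    def finish(self) -> None:
        # Deletions first, then insertions, as in ActivityBuffer._finish
        for key in self.flushed:
            self.activity.finish_delete(key)
        for key in self.queued:
            self.activity.finish_queue(key)
        self.begin()

    def abort(self) -> None:
        # The activity itself was never modified: drop the buffered work
        self.begin()
```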

If we look at the code of activity classes, 4 new methods are defined to handle the real processing of messages:

      # Transaction Management
      def prepareQueueMessage(self, activity_tool, m):
        # Called to prepare transaction commit for queued messages
        pass

      def finishQueueMessage(self, activity_tool, m):
        # Called to commit queued messages
        pass

      def prepareDeleteMessage(self, activity_tool, m):
        # Called to prepare transaction commit for deleted messages
        pass

      def finishDeleteMessage(self, activity_tool, m):
        # Called to commit deleted messages
        pass

In RAMDict, all message handling is done during the finish step. Messages are simply added to or deleted from the message dictionary:

      def finishQueueMessage(self, activity_tool, m):
        # Skip messages which were unregistered during this transaction
        if m.is_registered:
          self.dict[(m.object_path, m.method_id)] = m

      def finishDeleteMessage(self, activity_tool, message):
        # Messages are keyed by (object_path, method_id): delete the match directly
        key = (message.object_path, message.method_id)
        if key in self.dict:
          del self.dict[key]

Besides the transaction API, a registration API is added:

      def registerActivityBuffer(self, activity_buffer):
        class_name = self.__class__.__name__
        setattr(activity_buffer, '_%s_message_list' % class_name, [])  

      def isMessageRegistered(self, activity_buffer, activity_tool, m):
        class_name = self.__class__.__name__
        return m in getattr(activity_buffer, '_%s_message_list' % class_name)

      def registerMessage(self, activity_buffer, activity_tool, m):
        class_name = self.__class__.__name__
        getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
        m.is_registered = 1

      def unregisterMessage(self, activity_buffer, activity_tool, m):
        m.is_registered = 0

      def getRegisteredMessageList(self, activity_buffer, activity_tool):
        class_name = self.__class__.__name__
        return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))                      

This API can be partly or completely overridden by each Activity class. Its purpose is to make sure that messages are not queued more often than necessary, and that flush operations apply both to already committed messages and to messages queued earlier in the same transaction.
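
The in-transaction part of this registration logic can be sketched as follows. ToyRegistry is a hypothetical class, and unlike the original (which keeps a list of Message objects and tests membership with `in`), it assumes that two messages are duplicates when they share the same (object_path, method_id) key. Deleting already committed messages is left to deferredDeleteMessage and is not modelled here.

```python
from typing import Dict, List, Tuple

Key = Tuple[str, str]  # (object_path, method_id): an assumption of this sketch


class ToyRegistry:
    """Per-transaction registration of queued messages (toy model)."""

    def __init__(self) -> None:
        # Insertion-ordered mapping: key -> still registered?
        self.registered: Dict[Key, bool] = {}

    def queue(self, key: Key) -> bool:
        # Like isMessageRegistered + registerMessage: skip duplicates
        if self.registered.get(key):
            return False
        self.registered[key] = True
        return True

    def flush(self, key: Key) -> None:
        # Like unregisterMessage: cancel a message queued in this transaction
        if key in self.registered:
            self.registered[key] = False

    def to_commit(self) -> List[Key]:
        # Like getRegisteredMessageList: only still-registered messages
        return [key for key, ok in self.registered.items() if ok]
```

Keeping a flag instead of removing entries mirrors the original is_registered attribute: a flushed message stays known to the buffer but is skipped when the transaction commits.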

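Incidentally, this implementation provides a nice way to make RAM-based activities transactional: RAMDict only modifies its dictionary during the finish step, so an aborted transaction leaves it untouched.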

However, for SQL-based activities, it requires modifying the Zope transaction manager to implement a "3 phase commit" (an extra prepare phase run before the usual 2 phase commit).

We modified the Zope Transaction class (Transaction.py) so that:

            ncommitted = 0
            jars = self._get_jars(objects, subtransaction)            

becomes:

            ncommitted = 0
            # Call tpc_prepare on all jars until the number of jars is stable
            # (preparing a jar may register new jars, e.g. through ZSQL calls)
            # WARNING: this loops forever if every pass registers new jars
            jars_len = -1
            jars = self._get_jars(objects, subtransaction)
            while len(jars) != jars_len:
                jars_len = len(jars)
                self._commit_prepare(jars, subjars, subtransaction)
                jars = self._get_jars(objects, subtransaction)

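The underlying idea is that objects registered with the Zope transaction manager should be allowed to register new jars just before a transaction is committed. For example, calling a ZSQL method registers its database connection as a new jar with the transaction manager. If ZSQL methods were called during the two-phase commit itself, their jars would be registered too late and their queries would never be committed. The loop therefore keeps preparing jars until no new jar appears; since tpc_prepare may be called several times on the same jar, it must be safe to call repeatedly (ActivityBuffer uses its requires_prepare flag for this).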

We then added a new _commit_prepare method to the Zope Transaction class (Transaction.py):

      def _commit_prepare(self, jars, subjars, subtransaction):
          # Call tpc_prepare on every jar which implements it. Jars without
          # tpc_prepare (such as ordinary ZODB connections) are skipped.
          if subtransaction:
              assert not subjars
              all_jars = jars
          else:
              # Merge in the jars used by committed subtransactions,
              # without modifying the caller's list
              all_jars = jars + list(subjars)
              all_jars.sort(jar_cmp)
          for jar in all_jars:
              # getattr avoids hiding AttributeErrors raised inside tpc_prepare
              tpc_prepare = getattr(jar, 'tpc_prepare', None)
              if tpc_prepare is None:
                  continue
              if subtransaction:
                  try:
                      tpc_prepare(self, subtransaction)
                  except TypeError:
                      # Assume that TypeError means that tpc_prepare() only
                      # takes one argument, and that the jar doesn't
                      # support subtransactions.
                      tpc_prepare(self)
              else:
                  tpc_prepare(self)

This method simply calls tpc_prepare on every jar which provides it.
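
The prepare loop can be modelled outside Zope to see why it converges and why tpc_prepare must tolerate repeated calls. In this sketch, Jar, ToyTransaction and commit_prepare_loop are hypothetical names, a jar's on_prepare callback plays the role of a ZSQL call registering a new jar, and the max_rounds guard is an addition that is not part of the modified Transaction class.

```python
from typing import Callable, List


class Jar:
    """Toy jar; on_prepare may register further jars (like a ZSQL call)."""

    def __init__(self, name: str,
                 on_prepare: Callable[[], None] = lambda: None) -> None:
        self.name = name
        self.on_prepare = on_prepare
        self.prepared = 0

    def tpc_prepare(self) -> None:
        self.prepared += 1
        self.on_prepare()


class ToyTransaction:
    def __init__(self) -> None:
        self.jars: List[Jar] = []

    def register(self, jar: Jar) -> None:
        if jar not in self.jars:
            self.jars.append(jar)

    def commit_prepare_loop(self, max_rounds: int = 100) -> List[Jar]:
        # Prepare all jars until no new jar is registered (a fixpoint)
        jars_len = -1
        jars = list(self.jars)
        rounds = 0
        while len(jars) != jars_len:
            rounds += 1
            if rounds > max_rounds:
                # Guard against the infinite loop mentioned in the WARNING
                raise RuntimeError("jar list never stabilised")
            jars_len = len(jars)
            for jar in jars:
                jar.tpc_prepare()
            jars = list(self.jars)
        return jars
```

In the usage below, the activity jar is prepared twice because a second pass is needed to confirm that the jar list is stable, which is exactly why ActivityBuffer guards tpc_prepare with requires_prepare.

```python
t = ToyTransaction()
sql = Jar("sql")
activity = Jar("activity", on_prepare=lambda: t.register(sql))
t.register(activity)
jars = t.commit_prepare_loop()
```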

In SQLDict, all message handling is achieved during the prepare step:

      def prepareQueueMessage(self, activity_tool, m):
        if m.is_registered:
          # Also store uid of activity
          activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path),
                                              method_id = m.method_id,
                                              priority = m.activity_kw.get('priority', 1),
                                              message = self.dumpMessage(m))

      def prepareDeleteMessage(self, activity_tool, m):
        # Erase all matching messages in a single SQL transaction,
        # using the same path encoding as prepareQueueMessage
        path = '/'.join(m.object_path)
        uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=m.method_id, processing_node=None)
        uid_list = map(lambda x: x.uid, uid_list)
        if uid_list:
          # Avoid issuing a DELETE with an empty uid list
          activity_tool.SQLDict_delMessage(uid = uid_list)

This way, SQLDict can issue SQL method calls which will extend the list of jars of the Zope transaction. All SQL transactions are committed during the finish phase of the Zope transaction.

Results

To be done

Solving unique uid locks in ERP5

Many methods or forms in ERP5 require documents to be assigned a valid unique uid. Currently, uids are generated by reindexing objects. A central SQL table guarantees that uid values are unique.

This approach requires immediate reindexing in different cases:

  • creation of a new document
  • access to the uid value of a related document (e.g. parent_uid, source_uid, etc.) which has not yet been indexed

In a parallel reindexing environment, locks may happen whenever a single document is being reindexed by different threads on different nodes and assigned different uid values. This happens in particular whenever we reindex trees of newly created objects: each object might be assigned a different reindexing node. Access to parent_uid would then lead to different values on each node and cause conflicts during transaction commit.

Reserved uid values

A solution consists in reserving uid values for each ZEO client, and using these reserved values whenever an object needs to be assigned a new uid, rather than reindexing the object.

The Zope read conflict mechanism guarantees that a uid is only assigned once to each object. SQL access is drastically reduced, which reduces the risk of locks.

Involved steps are:

  • assigning a uid to an object should take no time and require no SQL access
  • the uid is taken out of a buffer of reserved uids maintained by the portal_catalog of each ZEO client
  • if the buffer is empty, it is refilled immediately
  • the buffer is also refilled by a producer task which checks every minute (15 seconds) whether more reserved uids should be added for each ZEO client. As soon as the number of reserved uids for a given thread gets too low, more uids are created. This avoids reserving uids inside long transactions, which is a source of locks.
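
The steps above amount to a producer/consumer buffer. The sketch below is a toy model under stated assumptions: ReservedUidBuffer, make_fake_reserve, size and low_water are hypothetical names, and the reserve callable stands in for the SQL reservation (here a simple in-memory counter).

```python
import itertools
import threading
from typing import Callable, List


def make_fake_reserve(start: int = 1) -> Callable[[int], List[int]]:
    """Hypothetical stand-in for the central SQL table of reserved uids."""
    counter = itertools.count(start)

    def reserve(count: int) -> List[int]:
        return [next(counter) for _ in range(count)]

    return reserve


class ReservedUidBuffer:
    """Per-client buffer of reserved uids (toy model of the steps above)."""

    def __init__(self, reserve: Callable[[int], List[int]],
                 size: int = 100, low_water: int = 20) -> None:
        self.reserve = reserve      # SQL access happens only through this
        self.size = size            # target number of reserved uids
        self.low_water = low_water  # the producer refills below this level
        self.uids: List[int] = []
        self.lock = threading.Lock()

    def new_uid(self) -> int:
        with self.lock:
            if not self.uids:
                # Buffer empty: reserve more uids immediately (slow path)
                self.uids.extend(self.reserve(self.size))
            # Normal case: no SQL access, just take a reserved value
            return self.uids.pop()

    def produce(self) -> None:
        # Called periodically by a producer task, outside user transactions
        with self.lock:
            if len(self.uids) < self.low_water:
                self.uids.extend(self.reserve(self.size - len(self.uids)))
```

With a producer running regularly, new_uid() rarely hits the slow path, so user transactions almost never touch the central table.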

Implementation

Here is a snapshot of reserved uid values in the catalog table:

        uid               id                  path
        123456            ZEO1                reserved
        123457            ZEO1                reserved
        123458            ZEO1                reserved
        123459            ZEO1                reserved
        223456            ZEO2                reserved
        323457            ZEO2                reserved
        423458            ZEO2                reserved
        523459            ZEO2                reserved

The id column stores an identifier of the ZEO client; the path column stores an arbitrary value (here reserved) which identifies reserved rows.
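
How the zProduceReservedUidList method fills this table is not described here. As one possible shape, the sketch below (produce_reserved_uids is a hypothetical function) uses Python's sqlite3 module and SQLite's AUTOINCREMENT to obtain unique uids, inserting one placeholder row per reserved uid with the client identifier in id and the marker value reserved in path, all in one short transaction.

```python
import sqlite3
from typing import List

# Simplified catalog table: uid, id (ZEO client) and path, as in the snapshot
SCHEMA = """CREATE TABLE IF NOT EXISTS catalog (
    uid INTEGER PRIMARY KEY AUTOINCREMENT,
    id TEXT,
    path TEXT)"""


def produce_reserved_uids(db: sqlite3.Connection, client_id: str,
                          count: int) -> List[int]:
    """Reserve `count` new uids for one ZEO client in one short transaction."""
    uids: List[int] = []
    with db:  # commits on success, rolls back on error
        cursor = db.cursor()
        for _ in range(count):
            # The database assigns a fresh unique uid to each placeholder row
            cursor.execute(
                "INSERT INTO catalog (id, path) VALUES (?, 'reserved')",
                (client_id,))
            uids.append(cursor.lastrowid)
    return uids
```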

Here is what the SQLCatalog class looks like now:

        # Assumes get_ident (thread module), DateTime, UID_BUFFER_SIZE
        # and randid are available at module level

        def produceUid(self):
            """
              Produces reserved uids in advance
            """
            thread_id = get_ident()
            uid_list = getattr(self, '_v_uid_buffer', [])
            if len(uid_list) < UID_BUFFER_SIZE:
                date = DateTime()
                new_uid_list = self.zProduceReservedUidList(count=UID_BUFFER_SIZE,
                                                            thread_id=thread_id,
                                                            date=date)
                uid_list.extend(map(lambda x: x.uid, new_uid_list))
            self._v_uid_buffer = uid_list

        def newUid(self):
            uid_list = getattr(self, '_v_uid_buffer', [])
            if not uid_list:
                # Buffer empty: reserve more uids immediately (SQL access)
                self.produceUid()
                uid_list = getattr(self, '_v_uid_buffer', [])
            if uid_list:
                # Normal case: take a reserved uid without any SQL access
                return uid_list.pop()
            # Fallback: default uid generation
            index = getattr(self, '_v_nextid', 0)
            if index % 4000 == 0: index = randid()
            while self.hasUid(index):
                index = randid()
            self._v_nextid = index + 1    # This is a volatile value (stored in RAM)
            return index

The newUid method takes uids from the buffer filled by produceUid, and falls back to the default uid generation otherwise.

(this part still needs to be done)

Other issues

A list of other issues which generate locks:

  • parallel reindexing may generate locks whenever the same document is reindexed by two different nodes / threads. Enforcing serialisation of method calls is a must.
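
Within a single process, serialising calls per document can be sketched with one lock per key. KeyedSerializer is a hypothetical class and the document path is assumed to be the serialisation key; it does not cover reindexing running on other ZEO nodes, which would need a lock shared by all nodes (for example in the SQL database).

```python
import threading
from collections import defaultdict
from typing import Callable, TypeVar

T = TypeVar("T")


class KeyedSerializer:
    """Run calls one at a time per key, e.g. per document path."""

    def __init__(self) -> None:
        self._guard = threading.Lock()
        # One lock per key, created on first use
        self._locks = defaultdict(threading.Lock)

    def run(self, key: str, func: Callable[[], T]) -> T:
        with self._guard:
            lock = self._locks[key]
        # Calls for the same key wait here; other keys proceed in parallel
        with lock:
            return func()
```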

(c) 2001-2004 ERP5 Foundation
www.erp5.org
All Content Published Under Free Licenses
Powered by ERP5 Open Source ERP, Zope, CPS and Nexedi