erp5.org has permanently moved to wiki.erp5.org ! Current status of ERP5 community websites:
Note: if you created content in this ancient portal, please migrate it to the wiki. The old website will stay online until all contents have been migrated to the wiki.
Solving SQL locks in ERP5
ERP5 users commit writes in parallel. Under heavy load, this tends to generate locks in the SQL message or index tables and makes the whole system unusable. This article discusses the solutions implemented in ERP5 to provide better write parallelism.
This technical note is still being updated and revised. It is provided here for peer review.
SQL locks: an example
Before proceeding further, let us first explain what an SQL lock is. Let us open two MySQL connections and create two transactions:
MySQL Connection A                              MySQL Connection B
------------------                              ------------------
begin;
                                                begin;
insert into message set method_id = "foo";
                                                insert into message set method_id = "bar";
delete from message where method_id = "bar";
                                                delete from message where method_id = "foo";
                                                ERROR 1213: Deadlock found when trying to get lock; Try restarting transaction
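The example above can be pictured as a cycle in a "wait-for" graph: A waits for a row held by B while B waits for a row held by A. The following is only a toy model of that idea, not how MySQL detects deadlocks internally; the function name `find_deadlock` and the dictionary layout are assumptions made for illustration.

```python
def find_deadlock(waits_for):
    """Return the transactions forming a wait cycle, or None.

    waits_for maps a blocked transaction to the transaction it waits on
    (hypothetical representation, one blocker per transaction).
    """
    for start in waits_for:
        seen = []
        current = start
        # Follow the chain of blockers until it ends or loops back.
        while current in waits_for and current not in seen:
            seen.append(current)
            current = waits_for[current]
        if current in seen:
            return seen[seen.index(current):]
    return None


# State after the last DELETE of the example:
# A needs the row inserted by B, B needs the row inserted by A.
cycle = find_deadlock({"A": "B", "B": "A"})

# A plain lock wait (no deadlock): A waits for B, B is not blocked.
no_cycle = find_deadlock({"A": "B"})
```

When a cycle is found, one of the transactions in it has to be rolled back so the others can proceed, which is what the ERROR 1213 in Connection B reports.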
If Connection A and Connection B were attached to Zope HTTP requests A and B, the user who started B would get an error message on screen. Under light load with few users, this situation would be very rare. However, with 30 users working in parallel on a ZEO cluster with a central SQL repository and Zope transactions which can last up to 30 seconds, this situation can become frequent. From time to time, the majority of Zope HTTP requests end up with deadlock errors, making the whole system unusable.
Understanding SQL locks in ERP5
If two transactions update / insert / delete the same rows, one transaction has to wait for the other. A timeout exception is raised if the transactions last too long. If, additionally, the two transactions produce sequences of update / insert / delete which are incompatible, a deadlock exception is raised. In large ERP5 systems, this situation can happen in two cases:
Solving message locks in ERP5
Message locks in ERP5 usually happen when users launch long transactions, or whenever a long transaction is being executed by a batch process. Long transactions in ERP5 last from 10 seconds to 2 minutes and may generate a large number of insert / delete statements in the message queue. The idea to solve this problem is to defer the execution of SQL insert / delete queries to the end of the Zope transaction. Two approaches have been implemented:
Deferred SQL Connection
The DeferredDB class simply queues SQL query strings (def query) and defers their execution until the end of the Zope transaction (def _finish). This approach is only valid for INSERT / UPDATE SQL statements. SELECT statements currently raise an error:
class DeferredDB(DB):
    """
    An experimental MySQL DA which implements deferred execution
    of SQL code in order to reduce locks
    """

    def __init__(self,connection):
        DB.__init__(self, connection)
        self.sql_string_list = []

    def query(self,query_string, max_rows=1000):
        self._use_TM and self._register()
        desc=None
        result=()
        db=self.db
        try:
            self._lock.acquire()
            for qs in filter(None, map(strip,split(query_string, '\0'))):
                qtype = upper(split(qs, None, 1)[0])
                if qtype == "SELECT":
                    raise NotSupportedError, "can not SELECT in deferred connections"
                #LOG('ZMySQLDA', 0, "insert string %s" % qs )
                self.sql_string_list.append(qs)
        finally:
            self._lock.release()
        return (),()

    def _begin(self, *ignored):
        from thread import get_ident
        self._tlock.acquire()
        self._tthread = get_ident()

    def _finish(self, *ignored):
        from thread import get_ident
        if not self._tlock.locked() or self._tthread != get_ident():
            LOG('ZMySQLDA', INFO, "ignoring _finish")
            return
        # BEGIN commit
        LOG('ZMySQLDA', INFO, "BEGIN commit")
        try:
            if self._transactions:
                self.db.query("BEGIN")
                self.db.store_result()
            if self._mysql_lock:
                self.db.query("SELECT GET_LOCK('%s',0)" % self._mysql_lock)
                self.db.store_result()
        except:
            LOG('ZMySQLDA', ERROR, "exception during _begin",
                error=sys.exc_info())
            self._tlock.release()
            raise
        # Execute SQL
        db = self.db
        for qs in self.sql_string_list:
            try:
                db.query(qs)
                c=db.store_result()
            except OperationalError, m:
                if m[0] not in hosed_connection: raise
                # Hm. maybe the db is hosed. Let's restart it.
                db=self.db=apply(self.Database_Connection, (), self.kwargs)
                try:
                    db.query(qs)
                    c=db.store_result()
                except OperationalError, m:
                    raise
            LOG('ZMySQLDA', INFO, "Execute %s" % qs)
        # Finish commit
        LOG('ZMySQLDA', INFO, "FINISH commit")
        try:
            try:
                if self._mysql_lock:
                    self.db.query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
                    self.db.store_result()
                if self._transactions:
                    self.db.query("COMMIT")
                    self.db.store_result()
            except:
                LOG('ZMySQLDA', ERROR, "exception during _finish",
                    error=sys.exc_info())
                raise
        finally:
            self._tlock.release()

    def _abort(self, *ignored):
        from thread import get_ident
        if not self._tlock.locked() or self._tthread != get_ident():
            LOG('ZMySQLDA', INFO, "ignoring _abort")
            return
        self._tlock.release()
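The deferral idea can be tried outside Zope. The sketch below is not the ERP5 code: it uses Python 3 and the standard sqlite3 module as a stand-in for MySQL, and the class name `DeferredConnection` and its `finish` / `abort` methods are hypothetical names chosen to mirror `_finish` and `_abort`. It shows that queued writes touch the database only during the short finish step.

```python
import sqlite3


class DeferredConnection:
    """Queue write statements and run them only when finish() is called."""

    def __init__(self, connection):
        self.connection = connection
        self.pending = []  # (sql, params) pairs waiting for finish()

    def query(self, sql, params=()):
        # Like DeferredDB, refuse SELECT: its result cannot be deferred.
        if sql.lstrip().upper().startswith("SELECT"):
            raise ValueError("cannot SELECT in a deferred connection")
        self.pending.append((sql, params))

    def finish(self):
        # Take the queue first so a failed run is not replayed later.
        pending, self.pending = self.pending, []
        try:
            for sql, params in pending:
                self.connection.execute(sql, params)
            self.connection.commit()
        except Exception:
            self.connection.rollback()
            raise

    def abort(self):
        # Nothing was sent to the database yet, so dropping the queue is enough.
        self.pending = []


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE message (method_id TEXT)")
deferred = DeferredConnection(db)

deferred.query("INSERT INTO message (method_id) VALUES (?)", ("foo",))
rows_before = db.execute("SELECT COUNT(*) FROM message").fetchone()[0]
deferred.finish()
rows_after = db.execute("SELECT COUNT(*) FROM message").fetchone()[0]
```

Because rows are written only inside `finish`, the time during which they are locked no longer depends on how long the surrounding application transaction lasts.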
It would have been possible to execute SELECT statements immediately and defer only the other statements, but this was viewed as risky: reordering SQL statements may lead to confusing results, and immediate execution might generate some locks. Although MySQL provides transactional tables (InnoDB for example), one should notice that this approach allows near-transactional behaviour for UPDATE / INSERT in a Zope environment which uses non-transactional MySQL tables.
Transactional Activities and Reentrant Zope Transactions
Another approach consists in registering CMFActivity with the Zope transaction manager. Message insert / delete calls are buffered in a queue and executed at the end of the Zope transaction. The methods queueMessage and deleteMessage forward the method call to the singleton activity_tool instance:
def queueMessage(self, activity_tool, m):
    activity_tool.deferredQueueMessage(self, m)

def deleteMessage(self, activity_tool, m):
    if not self.isMessageDeleted(activity_tool, m):
        activity_tool.deferredDeleteMessage(self, m)
        # We must never delete twice
which in turn forwards the method call to a volatile activity_buffer instance (one instance per thread):
def deferredQueueMessage(self, activity, message):
    self._v_activity_buffer.deferredQueueMessage(self, activity, message)

def deferredDeleteMessage(self, activity, message):
    self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
activity_buffer is an instance of class ActivityBuffer:
class ActivityBuffer(TM):
    _p_oid=_p_changed=_registered=None

    def __init__(self):
        from thread import allocate_lock
        self._use_TM = self._transactions = 1
        if self._use_TM:
            self._tlock = allocate_lock()
            self._tthread = None
        self._lock = allocate_lock()

    # Keeps a list of messages to add and remove
    # at end of transaction
    def _begin(self, *ignored):
        from thread import get_ident
        from ActivityTool import activity_list
        self._tlock.acquire()
        self._tthread = get_ident()
        self.requires_prepare = 1
        try:
            self.queued_activity = []
            self.flushed_activity = []
            for activity in activity_list: # Reset registration for each transaction
                activity.registerActivityBuffer(self)
        except:
            LOG('ActivityBuffer', ERROR, "exception during _begin",
                error=sys.exc_info())
            self._tlock.release()
            raise

    def _finish(self, *ignored):
        from thread import get_ident
        if not self._tlock.locked() or self._tthread != get_ident():
            LOG('ActivityBuffer', INFO, "ignoring _finish")
            return
        try:
            try:
                # Try to push / delete all messages
                for (activity, activity_tool, message) in self.flushed_activity:
                    activity.finishDeleteMessage(activity_tool, message)
                for (activity, activity_tool, message) in self.queued_activity:
                    activity.finishQueueMessage(activity_tool, message)
            except:
                LOG('ActivityBuffer', ERROR, "exception during _finish",
                    error=sys.exc_info())
                raise
        finally:
            self._tlock.release()

    def _abort(self, *ignored):
        from thread import get_ident
        if not self._tlock.locked() or self._tthread != get_ident():
            LOG('ActivityBuffer', 0, "ignoring _abort")
            return
        self._tlock.release()

    def tpc_prepare(self, *ignored):
        if not self.requires_prepare: return
        self.requires_prepare = 0
        from thread import get_ident
        if not self._tlock.locked() or self._tthread != get_ident():
            LOG('ActivityBuffer', 0, "ignoring tpc_prepare")
            return
        try:
            # Try to push / delete all messages
            for (activity, activity_tool, message) in self.flushed_activity:
                activity.prepareDeleteMessage(activity_tool, message)
            for (activity, activity_tool, message) in self.queued_activity:
                activity.prepareQueueMessage(activity_tool, message)
        except:
            LOG('ActivityBuffer', ERROR, "exception during tpc_prepare",
                error=sys.exc_info())
            raise

    def deferredQueueMessage(self, activity_tool, activity, message):
        self._register()
        # Activity is called to prevent queuing some messages (useful for example
        # to prevent reindexing objects multiple times)
        if not activity.isMessageRegistered(self, activity_tool, message):
            self.queued_activity.append((activity, activity_tool, message))
            # We register queued messages so that we can
            # unregister them
            activity.registerMessage(self, activity_tool, message)

    def deferredDeleteMessage(self, activity_tool, activity, message):
        self._register()
        self.flushed_activity.append((activity, activity_tool, message))
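The buffering pattern used by ActivityBuffer can be reduced to a few lines. The following Python 3 sketch is an illustration under assumptions, not ERP5 code: `MessageBuffer`, its method names and the plain dictionary standing in for the message table are all hypothetical. It keeps one pending list per thread, applies deletions before insertions as `_finish` does, and discards everything on abort.

```python
import threading


class MessageBuffer:
    """Collect queue / delete requests and apply them only on commit."""

    def __init__(self, store):
        self.store = store                  # shared dict standing in for the message table
        self.store_lock = threading.Lock()
        self.local = threading.local()      # separate pending lists for each thread

    def _pending(self):
        if not hasattr(self.local, "queued"):
            self.local.queued = []
            self.local.deleted = []
        return self.local.queued, self.local.deleted

    def queue(self, key, message):
        self._pending()[0].append((key, message))

    def delete(self, key):
        self._pending()[1].append(key)

    def commit(self):
        queued, deleted = self._pending()
        # The shared store is locked only for this short step.
        with self.store_lock:
            for key in deleted:
                self.store.pop(key, None)
            for key, message in queued:
                self.store[key] = message
        self.abort()  # reset the pending lists for the next transaction

    def abort(self):
        self.local.queued = []
        self.local.deleted = []


store = {}
buffer = MessageBuffer(store)
buffer.queue(("/doc/1", "reindex"), "m1")
visible_before_commit = dict(store)   # nothing has reached the store yet
buffer.commit()
visible_after_commit = dict(store)
```

A long transaction can call `queue` and `delete` as often as it likes without holding any lock on the shared store; contention is limited to `commit`.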
All transaction handling is therefore centralised by ActivityBuffer:
If we look at the code of activity classes, 4 new methods are defined to handle the real processing of messages:
# Transaction Management
def prepareQueueMessage(self, activity_tool, m):
    # Called to prepare transaction commit for queued messages
    pass

def finishQueueMessage(self, activity_tool, m):
    # Called to commit queued messages
    pass

def prepareDeleteMessage(self, activity_tool, m):
    # Called to prepare transaction commit for deleted messages
    pass

def finishDeleteMessage(self, activity_tool, m):
    # Called to commit deleted messages
    pass
In RAMDict, all message handling is performed during the finish step. Messages are simply added to or deleted from the message dictionary:
def finishQueueMessage(self, activity_tool, m):
    if m.is_registered:
        self.dict[(m.object_path, m.method_id)] = m

def finishDeleteMessage(self, activity_tool, message):
    for key, m in self.dict.items():
        if m.object_path == message.object_path and m.method_id == message.method_id:
            del self.dict[(m.object_path, m.method_id)]
Besides the transaction API, a registration API is added:
def registerActivityBuffer(self, activity_buffer):
    class_name = self.__class__.__name__
    setattr(activity_buffer, '_%s_message_list' % class_name, [])

def isMessageRegistered(self, activity_buffer, activity_tool, m):
    class_name = self.__class__.__name__
    return m in getattr(activity_buffer, '_%s_message_list' % class_name)

def registerMessage(self, activity_buffer, activity_tool, m):
    class_name = self.__class__.__name__
    getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
    m.is_registered = 1

def unregisterMessage(self, activity_buffer, activity_tool, m):
    m.is_registered = 0

def getRegisteredMessageList(self, activity_buffer, activity_tool):
    class_name = self.__class__.__name__
    return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))
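To illustrate what such a registration step buys, here is a minimal Python 3 sketch of per-transaction deduplication. It is an assumption-laden simplification: the class `DedupQueue` is hypothetical, and keying messages by (object_path, method_id) is borrowed from the RAMDict code above rather than from the registration methods themselves, which compare message objects.

```python
class DedupQueue:
    """Remember what was queued in the current transaction to avoid duplicates."""

    def __init__(self):
        # (object_path, method_id) -> message, for the current transaction only
        self.registered = {}

    def queue(self, object_path, method_id, message):
        key = (tuple(object_path), method_id)
        if key in self.registered:
            return False  # already queued in this transaction, skip it
        self.registered[key] = message
        return True

    def flush(self, object_path, method_id):
        # Unregister a message queued earlier in the same transaction.
        return self.registered.pop((tuple(object_path), method_id), None)

    def pending(self):
        return list(self.registered.values())


q = DedupQueue()
first = q.queue(["doc", "1"], "reindexObject", "m1")
second = q.queue(["doc", "1"], "reindexObject", "m2")   # same object, same method
pending_messages = q.pending()
```

With this bookkeeping, reindexing the same object several times in one transaction results in a single queued message, and a flush can also reach messages that have not been committed yet.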
The registration API can be partly or completely overridden by each Activity class. Its purpose is to make sure that messages are not queued more often than necessary, and that flush operations flush both already committed messages and messages which have been queued during the same transaction. Incidentally, this implementation provides a nice solution to make RAM based activities transactional. However, for SQL based activities, it requires modifying the Zope transaction manager to implement a 3 phase commit instead of a 2 phase commit. We modified the Zope Transaction class so that:
ncommitted = 0
jars = self._get_jars(objects, subtransaction)
becomes:
ncommitted = 0
# Do proceed until number of jars is stable
# WARNING: this could create infinite loop
jars_len = -1
jars = self._get_jars(objects, subtransaction)
while len(jars) != jars_len:
    jars_len = len(jars)
    self._commit_prepare(jars, subjars, subtransaction)
    jars = self._get_jars(objects, subtransaction)
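The loop above is a fixed-point iteration: it keeps preparing until a round adds no new jar. A toy Python 3 model, with hypothetical `Jar` and `ToyTransaction` classes that only mimic the shape of the Zope objects, shows why a second round is needed. The `max_rounds` guard is an addition of this sketch, prompted by the infinite loop warning in the comment.

```python
class Jar:
    """Toy resource manager; `registers` lists jars it joins while preparing."""

    def __init__(self, name, registers=()):
        self.name = name
        self.registers = list(registers)
        self.prepared = False

    def tpc_prepare(self, transaction):
        if self.prepared:          # prepare only once, like requires_prepare
            return
        self.prepared = True
        for jar in self.registers:
            transaction.join(jar)  # e.g. a ZSQL method called during prepare


class ToyTransaction:
    def __init__(self):
        self.jars = []

    def join(self, jar):
        if jar not in self.jars:
            self.jars.append(jar)

    def prepare_all(self, max_rounds=100):
        known = -1
        rounds = 0
        # Repeat until a full round registers no new jar.
        while len(self.jars) != known:
            if rounds >= max_rounds:
                raise RuntimeError("jar list did not stabilise")
            known = len(self.jars)
            for jar in list(self.jars):
                jar.tpc_prepare(self)
            rounds += 1
        return rounds


sql = Jar("sql")
activity_buffer = Jar("activity_buffer", registers=[sql])
txn = ToyTransaction()
txn.join(activity_buffer)
rounds = txn.prepare_all()
jar_names = [jar.name for jar in txn.jars]
```

In this model the first round prepares the buffer, which registers the SQL jar; the second round prepares that new jar and finds nothing further, so the loop stops.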
The underlying idea is that objects registered with the Zope transaction manager should be allowed to register new jars just before a transaction is committed. For example, calling ZSQL methods registers new jars with the transaction manager. If ZSQL methods are called during tpc_commit, they will never be committed. We then added a new _commit_prepare method to the Zope Transaction class (Transaction.py):
def _commit_prepare(self, jars, subjars, subtransaction):
    if subtransaction:
        assert not subjars
        for jar in jars:
            try:
                jar.tpc_prepare(self, subtransaction)
            except TypeError:
                # Assume that TypeError means that tpc_prepare() only
                # takes one argument, and that the jar doesn't
                # support subtransactions.
                jar.tpc_prepare(self)
            except AttributeError:
                # Assume that AttributeError means that tpc_prepare
                # is not available
                pass
    else:
        # Merge in all the jars used by one of the subtransactions.
        # When the top-level subtransaction commits, the tm must
        # call commit_sub() for each jar involved in one of the
        # subtransactions. The commit_sub() method should call
        # tpc_begin() on the storage object.
        # It must also call tpc_begin() on jars that were used in
        # a subtransaction but don't support subtransactions.
        # These operations must be performed on the jars in order.
        # Modify jars inplace to include the subjars, too.
        jars += subjars
        jars.sort(jar_cmp)
        # assume that subjars is small, so that it's cheaper to test
        # whether jar in subjars than to make a dict and do has_key.
        for jar in jars:
            try:
                jar.tpc_prepare(self)
            except AttributeError:
                # Assume that AttributeError means that tpc_prepare
                # is not available
                pass
The _commit_prepare method simply calls tpc_prepare on all jars. In SQLDict, all message handling is performed during the prepare step:
def prepareQueueMessage(self, activity_tool, m):
    if m.is_registered:
        activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) ,
                                           method_id = m.method_id,
                                           priority = m.activity_kw.get('priority', 1),
                                           message = self.dumpMessage(m))
        # Also store uid of activity

def prepareDeleteMessage(self, activity_tool, m):
    # Erase all messages in a single transaction
    uid_list = activity_tool.SQLDict_readUidList(path=m.object_path, method_id=m.method_id,processing_node=None)
    uid_list = map(lambda x:x.uid, uid_list)
    activity_tool.SQLDict_delMessage(uid = uid_list)
This way, SQLDict can issue SQL method calls which will extend the list of jars of the Zope transaction. All SQL transactions are committed during the finish phase of the Zope transaction.
Results
To be done
Solving unique uid locks in ERP5
Many methods or forms in ERP5 require documents to be assigned a valid unique uid. Currently, uids are generated by reindexing objects. A central SQL table guarantees that uid values are unique. This approach requires immediate reindexing in different cases:
In a parallel reindexing environment, locks may happen whenever a single document is being reindexed by different threads on different nodes and assigned different uid values. This happens in particular whenever we reindex trees of newly created objects: each object might be assigned a different reindexing node. Access to parent_uid would then lead to different values on each node and cause conflicts during transaction commit.
Reserved uid values
A solution consists in reserving uid values for each ZEO client and using reserved uid values whenever an object needs to be assigned a new uid, rather than reindexing it. The Zope read conflict mechanism guarantees that a uid is only assigned once to each object. SQL access is drastically reduced, which reduces the risk of locks. Involved steps are:
Implementation
The catalog table stores a snapshot of reserved uid values:
uid      id     path
123456   ZEO1   reserved
123457   ZEO1   reserved
123458   ZEO1   reserved
123459   ZEO1   reserved
223456   ZEO2   reserved
323457   ZEO2   reserved
423458   ZEO2   reserved
523459   ZEO2   reserved
id stores an identifier of the ZEO client. path stores an arbitrary value which identifies reserved rows. Here is what the SQLCatalog class looks like now:
def produceUid(self):
    """
    Produces reserved uids in advance
    """
    thread_id = get_ident()
    uid_list = getattr(self, '_v_uid_buffer', [])
    if len(uid_list) < UID_BUFFER_SIZE:
        date = DateTime()
        new_uid_list = self.zProduceReservedUidList(count = UID_BUFFER_SIZE, thread_id=thread_id, date=date)
        uid_list.extend( map(lambda x: x.uid, new_uid_list ))
    self._v_uid_buffer = uid_list

def newUid(self):
    self.produceUid()
    uid_list = getattr(self, '_v_uid_buffer', [])
    if len(uid_list) > 0:
        return uid_list.pop()
    else:
        index=getattr(self, '_v_nextid', 0)
        if index%4000 == 0: index = randid()
        while self.hasUid(index):
            index=randid()
        self._v_nextid=index+1 # This is a volatile value (stored in RAM)
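The benefit of reserving uids in blocks can be shown with a small Python 3 model. Everything here is hypothetical: `CentralUidTable` stands in for the SQL catalog table, `UidBuffer` for the volatile buffer of one ZEO client, and, unlike produceUid, the buffer is refilled only when it is empty, which keeps the sketch short.

```python
import itertools
import threading


class CentralUidTable:
    """Stand-in for the central SQL table that hands out reserved uids."""

    def __init__(self):
        self._counter = itertools.count(1)
        self._lock = threading.Lock()
        self.calls = 0  # number of round trips to the "database"

    def reserve(self, count):
        with self._lock:
            self.calls += 1
            return [next(self._counter) for _ in range(count)]


class UidBuffer:
    """Per-client buffer that contacts the central table only when empty."""

    def __init__(self, table, buffer_size=4):
        self.table = table
        self.buffer_size = buffer_size
        self.uids = []

    def new_uid(self):
        if not self.uids:
            self.uids.extend(self.table.reserve(self.buffer_size))
        return self.uids.pop(0)


table = CentralUidTable()
zeo1 = UidBuffer(table)
zeo2 = UidBuffer(table)
uids_zeo1 = [zeo1.new_uid() for _ in range(4)]
uids_zeo2 = [zeo2.new_uid() for _ in range(4)]
round_trips = table.calls
```

In this model eight uids are handed out with two accesses to the central table, and the two clients never receive the same value.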
The newUid method tries to allocate new uids using produceUid and otherwise falls back to the default uid generation (this part still needs to be done).
Other issues
A list of other issues which generate locks: