Written by Eduard Lebedyuk, Senior Cloud Architect at InterSystems
Discussion · Mar 5, 2024

Work Queue with bidirectional communication

Work Queue Manager (WQM) is a feature of InterSystems IRIS that lets you improve performance by programmatically distributing work across multiple concurrent processes. You split the work into chunks, WQM distributes the chunks across worker processes, and it can tell you when the work is done.
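The manager/worker split that WQM automates can be sketched in Python. All sketches added to this thread use Python because they illustrate the coordination pattern, not IRIS APIs. In this one, a thread pool stands in for WQM's worker processes, and `chunked`, `process_chunk`, and `run_in_parallel` are hypothetical names, not part of WQM. The flow is one-way: the manager hands out chunks and collects results, and workers never receive a follow-up instruction.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def process_chunk(chunk):
    """Hypothetical unit of work: here it just sums squares."""
    return sum(x * x for x in chunk)


def run_in_parallel(items, chunk_size=1000, workers=4):
    """Queue every chunk, then wait for all of them (roughly Queue() + wait)."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(process_chunk, chunked(items, chunk_size)))
    return sum(results)
```

Threads keep the sketch self-contained. For genuinely CPU-bound Python work you would need processes, which is what WQM uses on the IRIS side.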

Recently, however, I got an interesting question: there's a large logical transaction composed of ~1,000,000 individual object and SQL inserts and updates. Some updates are CPU-intensive, so the original idea was to use WQM to split the update into chunks to speed things up.

But here's the catch: if even one of the 1,000,000 individual changes fails (there's a variety of application-level checks that can make it fail, and a failure is not even that abnormal), the entire transaction must be rolled back. That creates a problem: each chunk must report success before committing its individual transaction, and someone must collect all these reports and decide whether to commit or roll back.
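The all-or-nothing rule the coordinator has to apply fits in a small pure function. This is a sketch with a hypothetical name (`decide`). Each report is modeled as a `(code, message)` pair, like the `$lb(code, message)` list that `$System.Event.WaitMsg()` returns in the code below, where only code 1 means a message actually arrived. A timeout or a missing report counts as a failure, because the coordinator cannot know whether that worker's changes are valid.

```python
SUCCESS = "SUCCESS"

# Wake-up codes, mirroring the comments in Init() below:
# -1 = resource deleted while waiting, 0 = timeout, 1 = woken by an event.
DELETED, TIMEOUT, WOKEN = -1, 0, 1


def decide(reports, expected):
    """Return True (commit) only if `expected` reports arrived and every one
    is a wake-up event carrying SUCCESS; otherwise return False (roll back)."""
    if len(reports) != expected:
        return False
    return all(code == WOKEN and message == SUCCESS for code, message in reports)
```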

Unfortunately, WQM does not seem to offer bidirectional communication between workers and the manager, so I suggested an approach using events:

  1. Start jobs.
  2. Wait for all jobs to report success using $System.Event.WaitMsg().
  3. Send Commit or Rollback to every worker using $System.Event.Signal().
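The three steps can be simulated in Python. Threads stand in for jobbed processes. `queue.Queue` objects stand in for the `$System.Event` mailboxes: one shared queue carries reports to the manager, and each worker has its own inbox. All names are hypothetical. The sketch models the message flow only, not real database transactions.

```python
import queue
import threading

SUCCESS, ERROR = "SUCCESS", "ERROR"


def worker(wid, fails, to_manager, inbox, outcome, timeout):
    """One job: do the work, report, then wait for the manager's verdict."""
    if fails:
        # The work itself failed: report it and roll back right away.
        to_manager.put((wid, ERROR))
        outcome[wid] = "rolled back"
        return
    to_manager.put((wid, SUCCESS))            # step 2: report success
    try:
        verdict = inbox.get(timeout=timeout)  # step 3: wait for the decision
    except queue.Empty:
        verdict = None                        # no answer: play it safe
    outcome[wid] = "committed" if verdict == SUCCESS else "rolled back"


def manager(failures, timeout=5.0):
    """Start one job per entry in `failures`, collect reports, broadcast the verdict."""
    to_manager = queue.Queue()
    inboxes = [queue.Queue() for _ in failures]
    outcome = {}
    threads = [
        threading.Thread(target=worker,
                         args=(i, fails, to_manager, inboxes[i], outcome, timeout))
        for i, fails in enumerate(failures)
    ]
    for t in threads:                         # step 1: start jobs
        t.start()
    commit = True
    for _ in failures:                        # step 2: wait for every report
        try:
            _, status = to_manager.get(timeout=timeout)
        except queue.Empty:
            status = None                     # a timeout counts as a failure
        if status != SUCCESS:
            commit = False
            break                             # one failure decides it
    verdict = SUCCESS if commit else ERROR
    for inbox in inboxes:                     # step 3: signal every worker
        inbox.put(verdict)
    for t in threads:
        t.join()
    return commit, outcome
```

As in the original Init(), the verdict also goes to workers that already failed, which is harmless: they never read it.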
 

Here's the code:

Class elebedyu.Transaction
{

Parameter SUCCESS = "SUCCESS";

Parameter ERROR = "ERROR";

/// zw ##class(elebedyu.Transaction).Init()
ClassMethod Init(count = {$SYSTEM.CPU.%New().nCores}, timeout As %Numeric = -1)
{
    // 1. Start jobs.
    set workers = ""
    set commit = $$$YES
    set sc = $$$OK
    for i=1:1:count {
        job ..Worker(timeout)
        set workers = workers _ $lb($zchild)
    }
    
    //zw workers
    // 2. Wait for all jobs to report success.
    for i=1:1:count {
        // return = $lb(code, message)
        // Where code is:
        // -1 implies a delete of the resource occurred while we were waiting
        // 0 implies a timeout occurred
        // 1 implies we were awakened due to a wakeup event
        set return = $System.Event.WaitMsg("", timeout)
        set code = $lg(return, 1)
        set message = $lg(return, 2)
        if ((code'=1) || (message'=..#SUCCESS)) {
            //zw code, message
            // At least one worker FAILED, so we're rolling back everything
            set sc = $$$ERROR($$$GeneralError, $$$FormatText("Code: %1, Message: %2", code, message))
            set commit = $$$NO
        }
        // At least one worker FAILED, no need to wait for the rest.
        quit:'commit
    }
    
    //zw commit
    // 3. Commit or Rollback
    // If all workers returned SUCCESS, send SUCCESS (commit) to all workers.
    // If at least one worker FAILED, send ERROR (rollback) to all workers.
    // It does not matter if we signal before a worker gets to WaitMsg(): the event will still get through.
    for i=1:1:count {
        set worker = $lg(workers, i)
        do $system.Event.Signal(worker, $case(commit, $$$YES:..#SUCCESS, :..#ERROR))
    }
    
    quit sc
}

ClassMethod Worker(timeout = -1)
{
    try {
        // Init root transaction
        TSTART
        // Init nested transaction
        TSTART
        // Do WORK
        set ^work($j) = $random(100)
        
        // Random failure simulation
        if ^work($j) > 95 {
            throw ##class(%Exception.General).%New("<TOO MUCH>")
        }
        
        // Commit nested transaction, but we still can rollback the root transaction (which will rollback the nested transaction)
        TCOMMIT
        // Report SUCCESS
        do $system.Event.Signal($zparent, ..#SUCCESS)
        
        // Wait for confirmation from the job manager
        set return = $system.Event.WaitMsg("", timeout)
        
        // Check directions from the job manager
        set code = $lg(return, 1)
        set message = $lg(return, 2)
        if ((code=1) && (message=..#SUCCESS)) {
            TCOMMIT
        } else {
            // Rollback nested and root transactions
            TROLLBACK
        }
        
    } catch ex {
        // Report ERROR
        do $system.Event.Signal($zparent, ..#ERROR _ ": " _ ex.DisplayString())
        
        // Rollback nested and root transactions
        TROLLBACK
    }
}

}
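A toy model of transaction levels shows why the worker can issue its inner `TCOMMIT` before the manager answers. The model assumes two behaviors of ObjectScript transactions: `TCOMMIT` only makes changes permanent when `$TLEVEL` drops to 0, and an argumentless `TROLLBACK` undoes every level. The class and its methods are hypothetical, and real rollback journaling is far more involved.

```python
class TransactionLevels:
    """Toy model of nested TSTART/TCOMMIT/TROLLBACK ($TLEVEL) behavior."""

    def __init__(self):
        self.level = 0       # plays the role of $TLEVEL
        self.committed = {}  # durable data
        self.pending = {}    # changes not yet made permanent

    def tstart(self):
        self.level += 1

    def set(self, key, value):
        if self.level == 0:
            self.committed[key] = value  # outside a transaction: durable at once
        else:
            self.pending[key] = value

    def tcommit(self):
        if self.level == 0:
            raise RuntimeError("TCOMMIT without an open transaction")  # treated as an error here
        self.level -= 1
        if self.level == 0:  # only the outermost TCOMMIT makes changes permanent
            self.committed.update(self.pending)
            self.pending.clear()

    def trollback(self):
        # Argumentless TROLLBACK undoes every level, not just the innermost one.
        self.pending.clear()
        self.level = 0
```

Mirroring Worker(): after TSTART, TSTART, a write, and the inner TCOMMIT, the level is 1 and nothing is durable yet. A TROLLBACK then discards the write, and only a second TCOMMIT would keep it.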

Is there a better approach? How do you resolve similar issues?

Comments

Timo Lindenschmid · Mar 5, 2024

Just wondering: my initial thought was to use the master process to start the transaction, check the returned statuses, and roll back if anything failed.

This can be done with WQM easily enough.

Set queue=$system.WorkMgr.%New()
If (queue="") {
  // Report Error, can check %objlasterror for %Status code
}

TSTART
For i=1:1:100 {
  Set sc=queue.Queue("##class(MyClass).ClassMethod",i)
  If $$$ISERR(sc) {
    // report error
  }
}

Set sc=queue.Sync()
If $$$ISERR(sc) {
  // A worker encountered an issue
  TROLLBACK
} Else {
  // No errors reported by workers
  TCOMMIT
}


That should work just fine, though I haven't tested it.

Eduard Lebedyuk  Mar 6, 2024 to Timo Lindenschmid

That won't roll back transactions inside the worker processes (they might be gone by the time we get past Sync()).
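The problem with master-side rollback can be reproduced in a small simulation. Each worker runs and commits its own transaction, so when the master decides to roll back, the only thing it can undo is its own (empty) transaction. Threads and dictionaries stand in for worker processes and globals, and `run_master_controlled` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor


def run_master_controlled(n_chunks, failing_chunk):
    database = {}        # durable storage shared by all "processes"
    master_pending = {}  # changes inside the master's own transaction (none here)

    def worker_chunk(i):
        pending = {f"row{i}": i}   # the worker's own transaction
        if i == failing_chunk:
            return False           # this worker rolls back only its own changes
        database.update(pending)   # other workers commit before returning
        return True

    # Master: TSTART, queue all chunks, wait for completion (like Sync()).
    with ThreadPoolExecutor(max_workers=4) as pool:
        ok = all(list(pool.map(worker_chunk, range(n_chunks))))
    if ok:
        database.update(master_pending)  # master's TCOMMIT
    else:
        master_pending.clear()           # master's TROLLBACK touches only its own changes
    return ok, database
```

With chunk 2 failing, the master rolls back, yet rows 0, 1, and 3 are already committed by their workers.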

Enrico Parisi · Mar 6, 2024

How can a process commit/rollback transactions of other process(es)??

Lorenzo Scalese  Mar 6, 2024 to Enrico Parisi

Technically this is not possible (I think). It can only send a message to the other process, which then decides whether to commit.

I needed this once (in an abnormal situation); see this post.

Eduard Lebedyuk  Mar 6, 2024 to Enrico Parisi

There's an example of that in the sample code.

Enrico Parisi  Mar 6, 2024 to Eduard Lebedyuk

Sure, that's fine: each process commits/rolls back its own transactions.

My message was for @Timo Lindenschmid, who was trying to commit/roll back the worker processes' transactions from the "main" process. That's not possible.

Alexey Maslov · Mar 6, 2024

Eduard,
what is the reason for having nested transactions inside the Worker method?
And how can you distribute the execution and control of a single ("root") transaction among several processes?

I'd take another approach:

  • the work manager master process just distributes work items among workers and waits for completion
  • each work item only prepares data in some ^IRISTEMP* global, without writing to the application database, and reports its status to the master process
  • the master process checks the completion statuses and chooses:
    • either TSTART, store the ^IRISTEMP* data in the application database, TCOMMIT, and kill ^IRISTEMP*
    • or just kill ^IRISTEMP*.
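In this approach every write to the application database, and therefore the single transaction, stays inside the master process. Below is a minimal sketch: one dictionary stands in for the ^IRISTEMP* scratch globals and another for the application database. `run_staged`, `should_fail`, and the doubling "work" are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def run_staged(chunks, should_fail):
    """Workers only stage results; the master alone writes to `database`."""
    staging = {}   # stands in for ^IRISTEMP* scratch globals
    database = {}  # stands in for the application database

    def worker(i):
        if should_fail(i):
            return False
        staging[i] = [x * 2 for x in chunks[i]]  # hypothetical "work"
        return True

    with ThreadPoolExecutor(max_workers=4) as pool:
        ok = all(list(pool.map(worker, range(len(chunks)))))
    if ok:
        # TSTART ... copy staged data ... TCOMMIT, all in the master process
        for i in sorted(staging):
            database[i] = staging[i]
    staging.clear()  # kill ^IRISTEMP* in both branches
    return ok, database
```

Because workers never touch the application database, there is nothing to roll back inside them. The cost is the extra copy from staging into the database.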

Pros: it can be implemented using WQM.
Cons: a huge amount of temporary data can be written into the IRISTEMP database. However, if the results of work items can be processed separately, the master process can store them without waiting for all items to complete, killing the temporary subglobals one by one.

Eduard Lebedyuk  Mar 6, 2024 to Alexey Maslov

> what is the reason for having nested transactions inside the Worker method?

The idea is to check that TCOMMIT works, for additional safety, but yes, the inner TSTART/TCOMMIT pair can be taken out.

> And how can you distribute the execution and control of a single ("root") transaction among several processes?

No problem there: the transaction iterates over historical data, so it can be chunked.

> another approach

Thank you. The locking situation would also be better with this approach.
