Wednesday, October 16, 2013

Using Semaphores In BPEL

I came upon a situation in which I needed to make a bunch of asynchronous calls to long running worker BPELs. However, I didn't want to necessarily rely on the connection pools to be the gatekeepers for fear that other processes may be starved as they may require more real-time responses and time-out. As a result, I was looking for a way to limit the number of asynchronous calls I made at one time.

My first thought was to use something built into BPEL like the "targets" and "sources". I was using a parallel forEach activity knew they had some flow control properties. Unfortunately, their nature seems to only be if you a one particular activity to be performed before another. I needed something a little more powerful; I needed a semaphore.

The only problem with using a semaphore is that they are a foreign concept to SOA Suite. Much of the concepts of concurrency and threading are handled by the individual composites and the overarching BPEL and Mediator engines. However we wanted to stay within SOA Suite, but gain a little more control over our process. BPEL fortunately has the ability to include Java packages and execute them through Java Activities, so I wrote a little Java code to create a singleton wrapper around a semaphore.

SharedSemaphore Code
package test;
import java.util.concurrent.Semaphore;

public class SemaphoreSingleton {
    private static SemaphoreSingleton instance;
    private Semaphore sem;
    private SemaphoreSingleton(int threads) {
        sem = new Semaphore(threads,true);
    }
    public static SemaphoreSingleton getSharedSemphore(){
        return getSharedSemphore(1);
    }
    public static SemaphoreSingleton getSharedSemphore(int threads){
        if(instance == null){
            instance = new SemaphoreSingleton(threads);
        }
        return instance;
    }
    public boolean acquire(){
        return sem.tryAcquire();
    }
    public void release(){
        sem.release();
    }
}

Once I had my class written all I needed to do was make sure that the class was imported into the BPEL process and utilize the class in the Java Activities. Below you will notice that there is an input on the number of threads necessary, because we wanted to make sure that we could tweak it over time to deal with our loads in the future. This was accomplished using a global variable with the number of threads based upon a number of input parameters.

Class Imports
...
  <import location="test.SemaphoreSingleton" importType="http://schemas.oracle.com/bpel/extension/java"/>
  <import location="java.lang.Integer" importType="http://schemas.oracle.com/bpel/extension/java"/>
  <partnerLinks>

Lock Java Activity
int threads = ((Integer)getVariableData("threads")).intValue();
SemaphoreSingleton impl = SemaphoreSingleton.getSharedSemaphore(threads);   
  
setVariableData("haveLock",impl.acquire());

Release Java Activity
SemaphoreSingleton impl = SemaphoreSingleton.getSharedSemaphore(); 
impl.release();

One additional thing to note is the use of tryAcquire() in the Java class. The reason for using it instead of acquire()or acquireUninterruptibly() was that it would cause the entire BPEL process to halt and it will just spin its wheels. As a result of only trying the acquire that means that it needs to be tried again until a lock is acquired. This can be seen in the picture below. To determine whether a lock was attained there is a scoped variably called haveLock which is just a boolean.


I hope this helps anyone looking to have a little more control over their asynchronous BPEL processes. Please leave comments and questions below.


No comments: