JavaとJavaEEプログラマのブログ

JavaEEを中心にしたをソフトウェア開発についてのブログ

JavaEE6入門〜Message Driven Bean〜

Message Driven Bean略してMDBはJMS(JavaMessaeService)を利用して非同期処理を実現する仕組み。

JMSの配信モデルは次の二種類。

  • Queueを使って送信側(プロデューサー)と受信側(コンシューマー)が一対一で送受信を行うP2P(Point to Point)モデル
  • Topicを使って一つの送信側(プロデューサー)と複数の受信側(コンシューマー)が一対多で送受信を行うpub-sub(パブリッシュ・サブスクライブ)モデル。

JMSが使用するQueueやTopicはGlassFishに組み込まれているメッセージング・プロバイダ(OpenMQ)が提供する。
MDBEJBの一種、状態を持たないStateless Beanとして管理される。そのためトランザクションアノテーションによる依存性注入もEJBと同じ。

MDBはWebプロファイルに含まれないため、warに含めてデプロイすることは出来ない。jarファイルかearアーカイブにパッケージ化してデプロイする。


受信側(コンシューマー)MDBの例

/**
 @MessageDrivenアノテーションとMessageListenerインターフェースを実装したクラスがMDB。
 このクラスをjarにしてGlassFishのメッセージングプロバイダにデプロイすると、メッセージを受信する度に、onMessage()が実行される。
 mappedName属性でJMS接続先を指定。
 activationConfig属性でJMSのプロパティを設定している。
 activationConfig属性で受信するメッセージを絞り込むことができる。
 activationConfig属性はメッセージング・プロバイダ依存。
*/
@MessageDriven(mappedName="jms/javaee6/ToipcM" 
        ,activationConfig={@ActivationConfigProperty(propertyName="acknowledgeMode",propertyValue="Auto-acknowledgeMode")
        ,@ActivationConfigProperty(propertyName="messageSelector",propertyValue="orderAmout > 1000")
        })
public class ConsumerMBean implements MessageListener{
    
    // EJBと同じようにリソースを注入できる。
    @PersistenceContext
    private EntityManager em;
    
    @EJB
    private BusinessEJB ejb;
    
    // コンテナが提供するランタイム・コンテキスト
    @Resource
    private MessageDrivenContext ctx; 
    
    @Override
    public void onMessage(Message message) {
        TextMessage msg = (TextMessage)message;
        try {
            System.out.println(msg.getText());
            // JMSを使って新たにメッセージを送信することも出来る。

        } catch (JMSException ex) {
            Logger.getLogger(ConsumerMBean.class.getName()).log(Level.SEVERE, null, ex);
            
            // メソッド終了後にロールバックを指定している。実行時例外を投げてもロールバックが実行される。
            ctx.setRollbackOnly();
        }
    }
}


送信側(プロデューサー)の例

/**
 普通のJavaクラス。ここではステートレスのEJBにしている。
*/
@Stateless
public class ProducerEJB {
    // JMSのリソースを注入。
    @Resource(lookup = "jms/javaee6/ConnectionFactory")
    private static ConnectionFactory connectionFactory;
    
    @Resource(lookup = "jms/javaee6/Queue")
    private static Queue queue;

    public void sendMessage(Message msg) {
        try {

            /*
                OrderDTOはメッセージ送信用のオブジェクト。
                OrderDTOはSerializableを実装している。
            */
            Float totalAmount = Float.valueOf(129000);
            OrderDTO order = new OrderDTO(1234l, new Date(), "Serge Gainsbourg", totalAmount);

            // Creates the needed artifacts to connect to the queue
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);

            // Sends an object message to the queue
            ObjectMessage message = session.createObjectMessage();
            message.setObject(order);
            message.setFloatProperty("orderAmount", totalAmount);
            producer.send(message);
            System.out.println("\nMessage sent : " + order.toString());

            connection.close();
        } catch (JMSException ex) {
            Logger.getLogger(ProducerEJB.class.getName()).log(Level.SEVERE, null, ex);
        }

    }
}