SOA架构中的事件驱动服务

豆豆网   技术应用频道   2008年01月22日  【字号: 收藏本文

内容摘要:本文讨论如何使用Mule实现一个高效的、事件驱动的面向服务平台。Mule是一个基于企业服务总线(ESB)模式设计的轻量级事件消息框架。组件和应用程序可以借助Mule,通过公共的JMS或其他消息处理技术实现通信。

  事件管理器类被封装在一个工厂类中,因此可以根据需要更改它的实现,而不会影响到它的客户端。事件管理器的实现如下:

package com.jeffhanson.mule;
  
import org.mule.umo.*;
import org.mule.extras.client.MuleClient;
import org.mule.impl.endpoint.MuleEndpoint;
import org.mule.config.QuickConfigurationBuilder;
  
import java.util.HashMap;
import java.util.Map;
  
/**
 * Factory that hands out one {@link EventManager} per protocol prefix.
 *
 * The concrete manager is a private nested class, so its implementation can
 * be replaced without touching client code. Instances are created lazily and
 * cached for the lifetime of the class.
 */
public class EventManagerFactory
{
  // Cache of managers keyed by protocol string. All access goes through the
  // synchronized getInstance method. Raw types are kept on purpose: the file
  // is written against a pre-generics language level.
  private static final Map instances = new HashMap();

  /**
   * Retrieves the event manager instance for a given protocol, creating and
   * caching it on first request.
   *
   * Synchronized so that concurrent first requests for the same protocol
   * cannot each create (and later start) a separate manager.
   *
   * @param protocol   The protocol prefix to use; it is prepended verbatim to
   *                   the endpoint paths built in {@code start()}.
   * @return EventManager The event manager instance (not yet started unless
   *                   a previous caller started it).
   */
  public static synchronized EventManager getInstance(String protocol)
  {
    EventManager instance = (EventManager) instances.get(protocol);

    if (instance == null)
    {
      instance = new EventManagerImpl(protocol);
      instances.put(protocol, instance);
    }

    return instance;
  }

  /**
   * A concrete implementation for a simple event manager.
   *
   * All Mule-related fields stay {@code null} until {@link #start()} has
   * completed; every operation that needs them checks this first and reports
   * an {@code EventException} instead of failing with a
   * {@code NullPointerException}.
   */
  private static class EventManagerImpl
    implements EventManager
  {
    private final String protocol;

    private UMOManager manager = null;
    private QuickConfigurationBuilder builder = null;
    private MuleClient eventClient = null;
    private MuleEndpoint receiveEndpoint = null;
    private MuleEndpoint sendEndpoint = null;

    private EventManagerImpl(String protocol)
    {
      this.protocol = protocol;
    }

    /**
     * Starts this event manager: creates a started Mule manager, a client,
     * and the receive/send endpoints derived from the protocol prefix.
     *
     * A failure is reported on stderr only (the method does not declare any
     * exception). In that case the manager is left not fully started and
     * subsequent register/send calls fail with an {@code EventException}.
     */
    public void start()
    {
      try
      {
        builder = new QuickConfigurationBuilder();
        manager = builder.createStartedManager(true,
                                               protocol + "tmp/events");
        eventClient = new MuleClient();
        receiveEndpoint = new MuleEndpoint(protocol
                                           + "tmp/events/receive");
        sendEndpoint = new MuleEndpoint(protocol + "tmp/events/send");
      }
      catch (UMOException e)
      {
        System.err.println(e);
      }
    }

    /**
     * Stops this event manager. Does nothing if no Mule manager was ever
     * created, i.e. {@link #start()} was not called or failed early.
     */
    public void stop()
    {
      if (manager == null)
      {
        return;
      }

      try
      {
        manager.stop();
      }
      catch (UMOException e)
      {
        System.err.println(e);
      }
    }

    /**
     * Retrieves the protocol this event manager uses.
     *
     * @return String The protocol prefix passed to the factory.
     */
    public String getProtocol()
    {
      return protocol;
    }

    /**
     * Registers a service to receive event messages. A service name that is
     * already registered is left untouched.
     *
     * @param serviceName   The name to associate with the service.
     * @param implementation  Either a container reference to the service
     *             or a fully-qualified class name
     *             to use as the component implementation.
     * @throws EventException if the manager has not been started or Mule
     *             rejects the registration.
     */
    public void registerService(String serviceName,
                                String implementation)
      throws EventException
    {
      requireStarted();

      if (manager.getModel().isComponentRegistered(serviceName))
      {
        return;
      }

      try
      {
        builder.registerComponent(implementation,
                                  serviceName,
                                  receiveEndpoint,
                                  sendEndpoint);
      }
      catch (UMOException e)
      {
        throw wrap(e);
      }
    }

    /**
     * Unregisters a service from receiving event messages.
     *
     * @param serviceName The name associated with the service to unregister.
     * @throws EventException if the manager has not been started or Mule
     *             fails to unregister the component.
     */
    public void unregisterService(String serviceName)
      throws EventException
    {
      requireStarted();

      try
      {
        builder.unregisterComponent(serviceName);
      }
      catch (UMOException e)
      {
        throw wrap(e);
      }
    }

    /**
     * Sends an event message synchronously to a given service.
     *
     * @param serviceName  The name of the service to which the event
     *            message is to be sent.
     * @param payload    The content of the event message
     * @return Object    The payload of the reply, or {@code null} when the
     *            service returned no message.
     * @throws EventException if the manager has not been started, the
     *            service is not registered, or the send fails.
     */
    public Object sendSynchronousEvent(String serviceName,
                                       Object payload)
      throws EventException
    {
      requireStarted();

      try
      {
        requireRegistered(serviceName);

        // Typed null locals (rather than bare null literals) keep the call
        // bound to the (String, String, Object, Map) overload.
        String transformers = null;
        Map messageProperties = null;
        UMOMessage result = eventClient.sendDirect(serviceName,
                                                   transformers,
                                                   payload,
                                                   messageProperties);
        return (result == null) ? null : result.getPayload();
      }
      catch (EventException e)
      {
        // Our own "not registered" error: pass it through unchanged instead
        // of letting the broad handler below re-wrap it.
        throw e;
      }
      catch (Exception e)
      {
        throw wrap(e);
      }
    }

    /**
     * Sends an event message asynchronously.
     *
     * @param serviceName  The name of the service to which the event
     *            message is to be sent.
     * @param payload    The content of the event message.
     * @return FutureMessageResult The handle for the pending result.
     * @throws EventException if the manager has not been started, the
     *            service is not registered, or the send fails.
     */
    public FutureMessageResult sendAsynchronousEvent(String serviceName,
                                                     Object payload)
      throws EventException
    {
      requireStarted();
      requireRegistered(serviceName);

      try
      {
        String transformers = null;
        Map messageProperties = null;
        return eventClient.sendDirectAsync(serviceName,
                                           transformers,
                                           payload,
                                           messageProperties);
      }
      catch (UMOException e)
      {
        throw wrap(e);
      }
    }

    /** Fails with an EventException unless start() fully initialised this manager. */
    private void requireStarted()
      throws EventException
    {
      if (manager == null || builder == null || eventClient == null
          || receiveEndpoint == null || sendEndpoint == null)
      {
        throw new EventException("Event manager for protocol '" + protocol
                                 + "' has not been started.");
      }
    }

    /** Fails with an EventException unless the named service is registered. */
    private void requireRegistered(String serviceName)
      throws EventException
    {
      if (!manager.getModel().isComponentRegistered(serviceName))
      {
        throw new EventException("Service: " + serviceName
                                 + " is not registered.");
      }
    }

    /**
     * Converts a lower-level failure into an EventException carrying the same
     * message as before (the failure's toString()) and, additionally, the
     * failure itself as the cause so its stack trace is not lost.
     */
    private static EventException wrap(Throwable failure)
    {
      EventException wrapped = new EventException(failure.toString());
      try
      {
        wrapped.initCause(failure);
      }
      catch (IllegalStateException ignored)
      {
        // The EventException constructor already fixed its cause; keep the
        // exception as constructed rather than masking the real failure.
      }
      return wrapped;
    }
  }
}

责编:豆豆技术应用

正在加载评论...