
5 Events Bus API - Reference Documentation

Authors: Marc Palmer (marc@grailsrocks.com), Stéphane Maldini (smaldini@vmware.com)

Version: 1.0.RC3

5 Events Bus API

Why an events bus? Today's applications rely more and more on non-blocking processing, elasticity and modularity. An events bus loosely couples modules, letting different code bases and frameworks work together, so that the "right tool for the right job" approach becomes practical. The bus may also support the publish/subscribe pattern, distributing the same message across the handling modules and making it easier to deploy the same application into a cluster or cloud.

Within the bus, an event is often as simple as a "callback" with no parameters, but usually some extra state is passed as an event object. An event can be sent to multiple listeners, and any result returned by a listener is passed back to the original sender of the event. An event belongs to a "topic" and often has a "subject". The topic is like a channel that identifies the kind of event. The optional subject is the object that the event "happened to". For example, a simple "app started" notification has no subject but may have the topic "grails", whereas a "user logged in" event may have the topic "security" and its subject set to the user principal supplied by the security plugin you are using.

With the Platform-Core plugin we have implemented several features and artifacts that let you manage an events bus simply and still get maximum flexibility when required:

  • Event-sending methods injected into your Domains, Controllers and Services
  • @Listener annotations for your Service methods
  • An Events mapping DSL artifact to select and control event topics
  • Spring beans with access to the underlying API
  • More features through the Events Spring Integration and Events Push plugins
  • Simple config keys

In a nutshell, you will:

  • Send events:

class UserController{

   def registration(){
         def user = new User(params).save()
         if(user){

            //non-blocking call, will trigger application listeners for this topic
            event('mailRegistration', user)

            //blocking call :
            //event('mailRegistration', user).waitFor()

            //can also be written like that
            //event topic:'mailRegistration', data:user

            //and if you need to reuse the current thread
            //event topic:'mailRegistration', data:user, fork:false

            render(view:'sendingRegistrationMail')
         }else{
            render(view:'errorRegistration')
         }
  }
}
  • Write listeners (or event handlers, or event reactors, or whatever you call them):

class UserService{

   //use method name 'mailRegistration' as topic name
   //can also use custom topic name using topic arg: @Listener(topic='test')
   @grails.events.Listener
   def mailRegistration(User user){
         sendMail{
            to user.mail
            subject "Confirmation"
            html g.render(template:"userMailConfirmation")
        }
  }

  //Can also receive an EventMessage to get more information on this particular event
  @grails.events.Listener(topic="mailRegistration")
  def mailRegistration2(org.grails.plugin.platform.events.EventMessage msg){
        sendMail{
           to msg.data.mail
           subject "Confirmation"
           html g.render(template:"userMailConfirmation")
        }
  }
}

5.1 Sending Events

Sending an event is simple. You only need to remember one method name and two signatures:
  • event(topic, [data, params, callbackClosure])
  • event(Map args, [callbackClosure])

We recommend the former signature if you don't have any params; otherwise the latter is more elegant.

Let's see what the key arguments are doing:

  • The topic argument is a String naming the channel that listeners subscribe to.
  • The optional data argument is an Object, preferably Serializable for IO facilities, which represents the subject of your event, such as a domain class instance.
  • The optional params argument is a Map of sending behaviors, including the namespace.
  • The optional callbackClosure is a Closure triggered after the event completes.
  • The map notation accepts the same keys as params, plus topic for the topic, data for the data and for (a shortcut for 'namespace'). If the map contains a params key, its value is used as the params argument; otherwise the first-level map itself is used as params.

There are several params arguments:

Key | Type | Default | Description
fork | Boolean | true | Set to false to force the event to reuse the caller thread, therefore executing the listeners synchronously and propagating any errors.
namespace / for | String | 'app' | Target a dedicated topic namespace. To avoid overlapping topic names, the events bus supports a scoping concept called namespace. E.g. 'gorm' is used by GORM events and 'browser' is used for JavaScript listeners in the events-push plugin.
onReply | Closure{EventReply reply} | | Same behavior as the callbackClosure argument; overrides it if both are defined.
onError | Closure{List<Exception> errors} | | Triggered if listeners have raised exceptions. If undefined, exceptions are propagated on EventReply.getValue(s).
gormSession | Boolean | true | Opens a GORM session for the new thread that carries the event execution.
timeout | Long | | Maximum time in milliseconds for the event execution.
headers | Map<String, Serializable> | | Additional headers for the event message envelope.

The event method returns an EventReply, which implements Future<Object> and provides some useful methods:

  • List<Object> getValues() : returns one value per listener that replied.
  • Object getValue() : first element of getValues().
  • int size() : number of invoked listeners.
  • List<Throwable> getErrors() : available errors.
  • boolean hasErrors() : checks whether any errors occurred.
  • EventReply waitFor() : blocks the current thread and returns this reply.
  • EventReply waitFor(long time) : blocks the current thread for at most the given number of milliseconds and returns this reply.

Events workflow

Events can be sent from domain, service and controller artefacts by using EventReply event(String topic, Object data). By default the Platform-Core events bus sends events in a non-blocking way; however, the following EventReply methods block until the reply is available:
  • size
  • waitFor
  • get
  • getValues
  • getValue

You therefore keep control over the execution flow. Just keep in mind that the bus does not block for processing right after the event() call, which seems a sensible default. Any exceptions are raised when one of the blocking methods above is called, unless the onError parameter is used.

class SomeController{

   def logout(){
      def reply = event("logout", session.user)
      //doesn't wait for event execution
      
      render reply.value  //wait and display value
      
      event(topic:"afterLogout").waitFor()
      
      //Only reached once "afterLogout" has finished
      def errorHandler = {errs -> }
      //Use a dedicated error handler
      event(topic:"afterAfterLogout", onError:errorHandler)
   }
}

Non forked events

If you want to reuse the current thread and force synchronous processing, set the fork param to false. Be aware that any exception is then propagated directly to the caller, even without calling a blocking method, unless the onError parameter is used.

class SomeController{

   def logout(){
      def reply = event('logout', session.user, [fork:false])
      //block for processing

      //no need to wait for reply since it has been populated on event call.
      render reply.value
   }
}

Assigning a namespace

All listeners have a namespace property, which prevents topic naming collisions and undesired events. By default, they are all assigned to app. This is the same default used when you send an event. To reach listeners in another namespace, such as the 'browser' ones if you use the events-push plugin, simply use the namespace argument, or for if you use the Map notation.

 class SomeController{

    def logout(){
        //we use the Map form, the namespace argument is identified by the 'for' key
       event for:'browser', topic:'logout', data:session.user
    }
 }

Declaring a namespace is mandatory when using the events bus from a plugin, in order to avoid conflicts.

Wildcard support

It's possible to target multiple topics/namespaces in a single call by using a wildcard as the last character.

class SomeController{

    def logout(){
        /*
          Send to every listener whose topic starts with "chat-"
          in every namespace starting with "role-"
        */
        event for:'role-*', topic:'chat-*', data:session.user

        //Trigger every listener in the default namespace 'app'
        event '*'
    }
}

This feature will probably evolve into a smarter implementation that behaves like UrlMappings and allows substring captures.
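
To make the trailing-wildcard rule concrete, here is a minimal plain-Groovy model of it. It is only an illustration of the rule as described in this section, not the plugin's matching code, and the wildcardMatches and eventMatches helpers are hypothetical names introduced for this sketch.

```groovy
// Illustrative model only: a pattern matches literally,
// or by prefix when its last character is '*'.
boolean wildcardMatches(String pattern, String value) {
    if (pattern.endsWith('*')) {
        String prefix = pattern.substring(0, pattern.length() - 1)
        return value.startsWith(prefix)
    }
    return pattern == value
}

// An event reaches a listener when both its namespace and its topic match.
boolean eventMatches(String nsPattern, String topicPattern, String namespace, String topic) {
    return wildcardMatches(nsPattern, namespace) && wildcardMatches(topicPattern, topic)
}

println eventMatches('role-*', 'chat-*', 'role-admin', 'chat-lobby')
println eventMatches('role-*', 'chat-*', 'app', 'chat-lobby')
```

Under this model a lone '*' has an empty prefix and therefore matches every topic, which is consistent with event '*' reaching every listener of the 'app' namespace.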

5.2 Listening Events

Listening for events simply requires registering the method that should receive the event notifications.

There are a few ways to register listeners.

Defining listeners at compile time

Within Grails services you can use the @Listener annotation. It takes a topic string, but you can omit it and use the method name as the topic to listen for:

class SomeService{

   @grails.events.Listener(topic = 'userLogged')
   def myMethod(User user){
   }

   //use 'mailSent' as topic name
   @grails.events.Listener
   def mailSent(){
  }
}

Listener methods can define a single argument, whose value is the object sent with the event. Usually this is the "subject" of the event. However, an event is carried by an envelope called EventMessage, which contains useful metadata such as additional headers and the current topic:

class SomeService{

   @grails.events.Listener(topic = 'userLogged')
   def myMethod(org.grails.plugin.platform.events.EventMessage userMessage){
    println userMessage.headers // display opt headers
    println userMessage.event // displays current topic
    println userMessage.data // displays data
   }
}

If the event data cannot be assigned to a listener's argument type, the event silently skips that listener. To catch every event type, declare the argument as Object or, if the argument is not needed, do not declare it.
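
The selection rule can be modelled in a few lines of plain Groovy. This is a sketch of the rule as stated, using closures as stand-ins for listener methods; the accepts helper is hypothetical, and the EventMessage case is not modelled.

```groovy
// Model only: would a listener (here a closure) be invoked for this data?
boolean accepts(Closure listener, Object data) {
    Class[] types = listener.parameterTypes
    if (types.length == 0) {
        return true                    // no argument declared: always invoked
    }
    return types[0].isInstance(data)   // data must be assignable to the argument type
}

def stringListener = { String name -> "hello $name" }
def anyListener = { Object payload -> payload }
def noArgListener = { -> 'called' }

println accepts(stringListener, 'bob')
println accepts(stringListener, 42)
println accepts(anyListener, 42)
println accepts(noArgListener, 42)
```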

Filtering on the EventMessage<D> generic type doesn't work: declaring EventMessage<Book> won't prevent invocation with an EventMessage<Author>. For such fine-grained control, rely on the Events artifact.

Namespacing

Your declared listeners belong to the app namespace, unless you change it using the namespace argument or the Events DSL introduced later.

class SomeService{

   @grails.events.Listener(topic = 'userLogged', namespace = 'security')
   def myMethod(User user){
   }

   //will subscribe this method to topic 'afterInsert' on namespace 'gorm'
   @grails.events.Listener(namespace = 'gorm')
   def afterInsert(User user){
  }
}

Remember that you need to specify the namespace when triggering events if the listeners use a value other than app:

class SomeController{
   def myAction(){
        event for:'security', topic:'userLogged', data:session.user
   }
}

Declaring a namespace is mandatory when using the events bus from a plugin, in order to avoid conflicts.

Proxy (AOP) support

By default, listeners try to call the original method on the unproxied bean. Use proxySupport to change this setting:

class SomeService{

   static transactional = true

   //Will invoke transactional logic, similar to someService.myMethod()
   @grails.events.Listener(proxySupport=true)
   def myMethod(User user){
   }

}

Dynamic listeners

Some edge cases need runtime registration. For these, use the injected on method:

class SomeController{

 def testInlineListener = {
        //register with 'logout' topic on 'app' default namespace
        def listener = on("logout") {User user ->
            println "test $user"
        }
        render "$listener registered"
 }

 def testInlineListener2 = {
        //register a 'gorm' namespaced handler on 'afterInsert' topic.
        def listener = on("gorm", "afterInsert") {Book book ->
            println "test $book"
        }
        render "$listener registered"
 }
}

Wildcard support

Capturing a wider group of events can be useful, especially for monitoring purposes. It's possible to listen for multiple topics/namespaces at once by using a wildcard as the last character.

class SomeService{

 @grails.events.Listener(namespace='role-*', topic = 'chat-*')
   def myMethod(org.grails.plugin.platform.events.EventMessage userMessage){
      println userMessage.namespace
      println userMessage.event
   }
}

This feature will probably evolve into a smarter implementation that behaves like UrlMappings and allows substring captures.

Listener ID

Each registered listener gets a unique id (ListenerId) built with the following pattern:
[namespace://]topic[:package.Class][#method][@hashcode]

The square brackets mark the optional parts of the id, which makes it possible to target a group of listeners depending on the known parts: namespace, class, method, hashcode.

This pattern is useful when using countListeners, removeListeners or extensions. For instance, overriding a generated channel with the events-si plugin requires namespace://topic if the namespace is different from "app". Another example, counting and removing listeners:

//count every listener subscribed to 'mytopic' inside TestService
countListeners("mytopic:my.TestService")

//count every listener using the gorm namespace
countListeners("gorm://*")

//remove every listener in TestService
removeListeners("*:my.TestService")
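
As a way to read the id pattern, the following plain-Groovy sketch assembles id strings from their optional parts. The listenerId helper and its map keys are hypothetical and exist only for this illustration; the plugin generates its ids internally.

```groovy
// Hypothetical helper, not part of the plugin: builds a string following
// [namespace://]topic[:package.Class][#method][@hashcode].
String listenerId(Map parts) {
    StringBuilder id = new StringBuilder()
    if (parts.namespace) {
        id.append(parts.namespace).append('://')
    }
    id.append(parts.topic)
    if (parts.className) {
        id.append(':').append(parts.className)
    }
    if (parts.method) {
        id.append('#').append(parts.method)
    }
    if (parts.hash != null) {
        id.append('@').append(parts.hash)
    }
    return id.toString()
}

println listenerId(topic: 'mytopic', className: 'my.TestService')
println listenerId(namespace: 'gorm', topic: '*')
```

The two printed strings are the same selectors passed to countListeners in the example above.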

Reloading in Development mode

It works.

5.3 Replying from Listeners

Usually, an event is fire-and-forget. In some cases, you may expect an answer, turning your messaging architecture into a controlled flow. For instance, a negative reply can be used in GORM events to veto the database write for the current subject. Another common example is aggregating the results of multiple workers.

Simple reply

Replying is a simple matter of returning an object from the listener method :

class SomeService{
    @Listener
    def logout(User user){
       Date disconnectDate = new Date()

       //do something with user

       return disconnectDate
    }
}

If listeners return non-null objects, the caller can access them through the EventReply envelope returned immediately after calling the event method. The other option is to use a reply handler:

import java.util.concurrent.TimeUnit

class SomeController{

   def logout(){
       def reply = event topic:"logout", data:session.user, fork:false
       render reply.value //display value

       //Using callback closure
       def replyHandler = {EventReply r-> }
       event topic:"logout", data:session.user, onReply:replyHandler

       //Or as last argument
       event(topic:"logout", data:session.user){ EventReply r-> }


       //EventReply object is a Future implementation
       def reply_future = event topic:"logout", data:session.user
       render reply_future.get(30, TimeUnit.SECONDS)
   }
}

Whenever an event is triggered, a task is submitted to the events bus thread pool and a Future is returned, wrapped in an EventReply. Full support for the reply-address pattern is planned for a future version (replyTo parameter). It would bring interesting features out of the box: non-blocking responses, streaming handler responses one by one, and forwarding by topic name instead of closure.
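
The submit-then-wait mechanics can be sketched with the standard java.util.concurrent classes alone. This is a plain-Groovy analogy, not the plugin's code: the pool, the listener closure and the 'bob' value are stand-ins, and the pool size of 10 simply mirrors the documented poolSize default.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit

// Stand-in for the bus thread pool.
def pool = Executors.newFixedThreadPool(10)

// Stand-in for a listener invoked with the event data.
def listener = { String user -> "bye $user".toString() }

// Submitting returns at once with a Future, much like event() returns an EventReply.
Future<String> reply = pool.submit({ listener('bob') } as Callable<String>)

// get() blocks until the listener has finished or the timeout expires.
def value = reply.get(30, TimeUnit.SECONDS)
println value

pool.shutdown()
```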

Multiple replies

Multiple listeners can return values for the same topic/namespace. In this case, EventReply waits for all handlers before returning any value. Remember that a valid result is a non-null value: if 3 handlers have reacted but only 2 returned something, you will only see 2 values in EventReply.values.

class SomeController{

   def logout(){
       def reply = event topic:"sendMails", data:session.user

       //wait for all listeners and then display the first value from the aggregated results
       render reply.value
       
       //display all results as List
       render reply.values
   }
}
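
The "non-null values only" rule can be pictured with a small plain-Groovy model. The three closures below are hypothetical listeners; the sketch only mirrors the aggregation described above and does not use the plugin.

```groovy
// Model only: three listeners react, but only two return something.
def listeners = [
    { user -> "mail sent to $user".toString() },  // returns a value
    { user -> null },                             // reacts but returns nothing
    { user -> 42 }                                // returns a value
]

// Every listener is invoked...
def results = listeners.collect { it.call('bob') }

// ...but only non-null results count as reply values.
def values = results.findAll { it != null }
def value = values ? values[0] : null

println values
println value
```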

Exceptions

Because no code is perfect, exceptions can occur while an event is processed, for 3 reasons:

  • RuntimeException in one or more handlers
  • InterruptedException if the process has been cancelled
  • TimeoutException if the maximum process time has been reached (timeout parameter)

An onError parameter is available and accepts a Closure{List<Throwable> errors}. If it is not set, exceptions are propagated to the caller when blocking on the EventReply object (getValue etc.) and/or when fork == false.

In a multiple-listener scenario, an exception doesn't interrupt the execution flow, so the other listeners still get a chance to execute. The return value of a failing listener is the raised exception.

class SomeController{

   def logout(){
       on('test'){
         sleep(5000)
         throw new MyException('haha')
       }

       def reply = event topic:"test"
       reply.values //throws MyException after 5s


       def errorsHandler = {println it}
       reply = event topic:"test", onError:errorsHandler
       reply.values //calls errorsHandler and returns values which contain a MyException

       event(topic:"test", onError:errorsHandler, timeout:1000){
          //executes both this and errorsHandler closures with a TimeoutException
       }

       reply = event(topic:"test", onError:errorsHandler, timeout:1000)
       reply.cancel() //executes errorsHandler closure with an InterruptedException

       event(topic:"test", fork:false)
       //wait 5s and raises an exception in the caller thread

   }
}
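
The multiple-listener behavior (every listener runs, and a failing one contributes its exception as its result) can be modelled in plain Groovy as follows. The closures are hypothetical listeners and the loop is a sketch of the described behavior, not the plugin's dispatcher.

```groovy
// Model only: one of three listeners fails.
def listeners = [
    { 'ok-1' },
    { throw new IllegalStateException('boom') },
    { 'ok-2' }
]

// A failure does not stop the loop; the exception becomes that listener's result.
def results = listeners.collect { listener ->
    try {
        return listener.call()
    } catch (Exception e) {
        return e
    }
}

// What an onError handler would receive in this model.
def errors = results.findAll { it instanceof Throwable }

println results.size()
println errors*.message
```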

Waiting replies

In domain, service and controller artefacts you can wait for events using "EventReply waitFor(EventReply… eventReplies)". It accepts as many event replies as you want and returns the same array, which suits a functional programming style. EventReply also has a waitFor method for one-line waiting.

import java.util.concurrent.TimeUnit

class SomeController{

   def logout(){
      def reply = event('logout', session.user)
      def reply2 = event('logout', session.user)
      def reply3 = event('logout', session.user)

      waitFor(reply,reply2,reply3).each{EventReply r->
        render r.value +'<br/>'
      }

      //same with 20 seconds timeout on each reply
      waitFor(20, TimeUnit.SECONDS, reply,reply2,reply3).each{EventReply r->
              render r.value
      }

      //other style :
      event('logout', session.user).waitFor() //blocks until the event completes
      event('logout', session.user).waitFor(2000) //blocks for at most 2 seconds

   }
}

5.4 Routing configuration -- The XxxEvents Artifact

An extensible Events DSL is available in grails-app/conf for routing configuration. This artifact refines the behavior of the event method by selecting the topics and namespaces to which the following apply:
  • Filtering
  • Disabling
  • Sending behaviors
  • Extensions
  • Security
  • Declarations

The DSL is intended to evolve. One of the most wanted features is topic/namespace declaration: assigning a definition to a property would generate an injectable bean of the same name with streaming methods.

The DSL requires assigning a closure to an events variable. Each call is a definition: the method name is parsed as a topic name, and key/value arguments are definition attributes. Wildcard topics/namespaces are supported as well.

An Events artifact is a script with some bound variables:

Variable | Description
grailsApplication | Grails application object, retrieves artifacts, context etc.
ctx | Spring context, useful for beans access, e.g. ctx.myService.method()
config | Configuration object

Each definition supports the following attributes:

Attribute name | Type | Default | Description
namespace | String | "app" | Namespace to which the current definition is bound
filter | Closure(Object), Closure(EventMessage) or Class | | If a closure is passed, its return value is used as the condition for event propagation. If a class is passed, the subject data type must match.
disabled | boolean | false | Disable event propagation
fork | boolean | true | Set to false to use the current thread for event processing (blocking call)
onError | Closure(List<Throwable>) | | Default onError handler for the current topic(s)
onReply | Closure(EventReply) | | Default onReply handler for the current topic(s)
timeout | Long | | Default timeout for execution time; when exceeded, a TimeoutException is thrown and the handlers are called

** Any other attribute can be set for use by plugins through EventDefinition.othersAttributes

events = {
    //prevents any events in gorm namespace
    '*' namespace:'gorm', disabled:true

    //only propagates events on 'testTopic' whose data is > 2
    testTopic filter:{it > 2}

    //only propagates events on 'testTopic2' whose data is a TestTopic instance
    testTopic2 filter:TestTopic

    //filters events on 'testTopicX' using a boolean method from a service
    testTopicX filter:ctx.myService.&someMethod

    //only if using the events-push plugin: allows client-side listeners on this topic
    testTopic3 browser:true

    //Default error handling, global reply handling, timeout and fork
    testTopicD onError:{}, onReply:{}, timeout:1000L
    testTopicD2 fork:false


    //-------------- roadmap --------------

    //not yet implemented: Assigning and merging definitions
    //myTopic = testTopic4(filter:{i>2})
    //myTopic = testTopic4(filter:{i<4})

    //not yet implemented: Enabling security context for target listeners
    //testTopic5 secured:true

    //not yet implemented: Topic Forwarding
    //testTopic6 to:'anotherTopic'

    //not yet implemented: Topic Handlers
    //testTopic9 onError:'anotherTopicErrors', onReply:'anotherTopicReplies'

}
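
The two filter forms (closure or class) can be pictured with a small plain-Groovy model. The passesFilter helper is hypothetical and only restates the rule from the attributes table; closures receiving an EventMessage are not modelled.

```groovy
// Model only: a Closure filter decides by its return value,
// a Class filter by the type of the event data.
boolean passesFilter(Object filter, Object data) {
    if (filter instanceof Closure) {
        return filter.call(data) as boolean
    }
    if (filter instanceof Class) {
        return filter.isInstance(data)
    }
    return true // no filter defined: the event is propagated
}

println passesFilter({ it > 2 }, 3)
println passesFilter({ it > 2 }, 2)
println passesFilter(String, 'some text')
println passesFilter(String, 42)
```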

Reloading in Development mode

It works.

5.5 Listening GORM events

Starting from Grails 2, the Events Bus supports GORM events.

GORM Listeners

To listen for GORM events, simply declare listeners on the gorm namespace, using the supported topics in the following table:

Event Type | Target Topic
PreInsertEvent | beforeInsert
PreUpdateEvent | beforeUpdate
PreDeleteEvent | beforeDelete
ValidationEvent | beforeValidate
PostInsertEvent | afterInsert
PostUpdateEvent | afterUpdate
PostDeleteEvent | afterDelete
SaveOrUpdateEvent | onSaveOrUpdate

The same listener behaviors apply, e.g. using EventMessage as the argument type, using wildcard topics, etc. Because a listener is called if it has no argument or if the event data is assignable to its argument type, declaring a domain class as the argument type is all you need to filter domain events.

class SomeService{

    @Listener(namespace = 'gorm')
    void afterInsert(Author author) {
        println "after save author -  $author.name"
    }

    @Listener(topic = 'beforeInsert', namespace = 'gorm')
    void beforeInsertBook(Book book) {
        println "will insert book - $book.title"
    }

    //Will catch everything since we don't filter on the subject by using EventMessage
    @Listener(topic = 'before*', namespace = 'gorm')
    void beforeEachGormEvent(EventMessage message) {
        println "gorm event $message.event on domain $message.data.class"
    }

}

Filtering with Events Artifact

Setting a filter through an Events artifact allows finer control and more efficient selection, since it prevents events from being propagated at all:

events = {
    'afterInsert' namespace:'gorm', filter:Book
    'afterDelete' namespace:'gorm', filter:{it.id > 5}
    'afterUpdate' namespace:'gorm', filter:{it in Book || it in Author}
    'beforeDelete' namespace:'gorm', disabled:true
}

GORM may generate a very large number of events. Use it wisely and combine it with routing filters. You can also disable the GORM bridge entirely with the events.gorm.disabled configuration key.

Threading behaviors

GORM listeners are executed in the same thread as the caller, in order to reuse the currently open session, if any. Avoid blocking logic where possible, or use the listener body to trigger another event.

Vetoing changes

If a listener handles one of the before* topics and returns a boolean value, it becomes part of the vetoing chain:

  • Returning false will cancel the current database write
  • Returning true will just let the chain continue

class SomeService{

    //veto any Book insert
    @Listener(topic = 'beforeInsert', namespace = 'gorm')
    boolean beforeInsertBook(Book book) {
        false
    }

}
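
The veto rule can be summarized with a short plain-Groovy model. The writeAllowed helper is hypothetical; it assumes, following the description above, that only boolean return values take part in the chain and that a single false is enough to cancel the write.

```groovy
// Model only: the write goes ahead unless some listener explicitly returned false.
// Non-boolean results (including null) are assumed not to take part in the chain.
boolean writeAllowed(List listenerResults) {
    return !listenerResults.any { it instanceof Boolean && !it }
}

println writeAllowed([true, null, 'ignored'])
println writeAllowed([true, false])
```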

5.6 Spring Beans

Plugin developers and adventurous tweakers may need to override one or more Events Bus beans, as the Spring Integration plugin does. The grailsEvents bean is also useful for injecting event methods into artifacts that are not handled by default (anything other than domains, services and controllers).

Bean Name | Type | Default Implementation | Description
grailsEvents | org.grails.plugin.platform.events.Events | org.grails.plugin.platform.events.EventsImpl | Main events gateway, which contains the methods injected in artifacts
grailsEventsPublisher | org.grails.plugin.platform.events.publisher.EventsPublisher | org.grails.plugin.platform.events.publisher.DefaultsEventsPublisher | Publisher bean, triggers events. To be implemented by extensions if required (e.g. events-si)
grailsEventsRegistry | org.grails.plugin.platform.events.registry.EventsRegistry | org.grails.plugin.platform.events.registry.DefaultsEventsRegistry | Registry bean, stores listeners and routes events. To be implemented by extensions if required (e.g. events-si)
gormTopicSupport | org.grails.plugin.platform.events.dispatcher.GormTopicSupport | org.grails.plugin.platform.events.dispatcher.GormTopicSupport2X | Translates GORM events to topic names and processes veto(s).
grailsEventsGormBridge | org.grails.plugin.platform.events.publisher.GormBridgePublisher | | Listens for GORM events and publishes them to the right bus using gormTopicSupport.
grailsTopicExecutor | org.springframework.core.task.TaskExecutor | org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor | Carries out event execution.

5.7 Securing events

To be implemented. In the meantime, you can use headers or data to pass a security context, for instance. The M4 release will bring a platform-security abstraction ready for events.

5.8 Extensions

Writing extensions is one of the great habits of Grails developers. The Groovy and Grails communities are like that: pragmatic and pleasant.

The beans referenced in the previous chapter should already give you ideas for customizing the bus to your needs. Two example extensions are available:

  • events-si : This plugin overrides the publisher and registry beans in order to replace the default mechanisms with the much more flexible Spring Integration channels and endpoints.
  • events-push : This plugin registers new listeners from your browsers using JavaScript, authorized through the new Events DSL attributes browser and browserFilter.

5.9 Configuration properties

Based on the Platform-Core configuration mechanism, the plugin provides the following Events Bus keys:

Configuration Key | Type | Default | Description
grails.plugin.platform.events.disabled | Boolean | false | Fully disable the Events Bus mechanism; no events methods will be injected
grails.plugin.platform.events.poolSize | Integer | 10 | Number of concurrent workers allowed to process events
grails.plugin.platform.events.gorm.disabled | Boolean | false | Disable the GORM bridge, stopping GORM events from being published
grails.plugin.platform.events.catchFlushException | Boolean | true | Catch GORM flushing exceptions, which can be noisy, especially when vetoing changes
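
As an illustration of how these keys might be set, the sketch below parses two of them with Groovy's ConfigSlurper so that it can run on its own. In an application the two assignments would presumably go straight into grails-app/conf/Config.groovy (an assumption, the file is not named in this section), and the values 20 and true are arbitrary.

```groovy
// Sketch only: the quoted lines are what would be written in the application's config.
def config = new ConfigSlurper().parse('''
    grails.plugin.platform.events.poolSize = 20
    grails.plugin.platform.events.gorm.disabled = true
''')

// Navigate to the Events Bus section of the parsed configuration.
def eventsConfig = config.grails.plugin.platform.events

println eventsConfig.poolSize
println eventsConfig.gorm.disabled
```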

In addition, you can override bean property values, such as those of gormTopicSupport:

beans{
    gormTopicSupport {
        //transform gorm Events Objects types into topics
        translateTable = [
            'PreInsertEvent': 'beforeInsert', 'PreUpdateEvent': 'beforeUpdate',
            /*'PreLoadEvent': 'beforeLoad',*/ 'PreDeleteEvent': 'beforeDelete',
            'ValidationEvent': 'beforeValidate', 'PostInsertEvent': 'afterInsert',
            'PostUpdateEvent': 'afterUpdate', 'PostDeleteEvent': 'afterDelete',
            /*'PostLoadEvent': 'afterLoad',*/ 'SaveOrUpdateEvent': 'onSaveOrUpdate'
        ]
    }
}