5 Events Bus API - Reference Documentation
Authors: Marc Palmer (marc@grailsrocks.com), Stéphane Maldini (smaldini@vmware.com)
Version: 1.0.RC3
5 Events Bus API
Why an events bus?
Today's applications rely more and more on non-blocking processing, elasticity and modularity. An events bus loosely couples modules, enabling different code bases and frameworks to work together, because the "right tool for the right purpose" paradigm is becoming a reality. The bus may also support the publish/subscribe pattern, distributing the same message across the handling modules and giving an excellent opportunity to deploy the same application into a cluster or cloud.
Within the bus, an event is often as simple as a "callback" with no parameters, but usually there is some extra state passed as an event object.
An event can be sent to multiple listeners, and any result returned by a listener is passed back to the original sender of the event.
An event belongs to a "topic" and often has a "subject". The topic is like a channel that identifies the kind of events. The optional subject is the object that the event "happened to".
So, for example, a simple "app started" notification has no subject but may have the topic "grails", whereas a "user logged in" event may have the topic "security" and its subject set to the user principal supplied by the security plugin you are using.
With the Platform-Core plugin, we have implemented a couple of features and artifacts to let you manage an event bus simply and get maximum flexibility when required:
- Sending Events methods injected in your Domains, Controllers and Services
- @Listener annotations for your Services methods.
- Events mapping DSL artifact to select and control events topics
- Spring beans with access to underlying API
- More cool stuff with Events Spring Integration and Events Push
- Simple config keys
In a nutshell, you will:
- Send events:
class UserController{
    def registration(){
        def user = new User(params).save()
        if(user){
            //non-blocking call, will trigger application listeners for this topic
            event('mailRegistration', user)
            //blocking call:
            //event('mailRegistration', user).waitFor()
            //can also be written like this:
            //event topic:'mailRegistration', data:user
            //and if you need to reuse the current thread:
            //event topic:'mailRegistration', data:user, fork:false
            render(view:'sendingRegistrationMail')
        }else{
            render(view:'errorRegistration')
        }
    }
}
- Write listeners (or event handler, or event reactor or whatever you call it):
class UserService{
    //uses the method name 'mailRegistration' as the topic name
    //a custom topic name can also be set with the topic argument: @Listener(topic='test')
    @grails.events.Listener
    def mailRegistration(User user){
        sendMail{
            to user.mail
            subject "Confirmation"
            html g.render(template:"userMailConfirmation")
        }
    }
    //can also receive an EventMessage to get more information on this particular event
    @grails.events.Listener(topic="mailRegistration")
    def mailRegistration2(org.grails.plugin.platform.events.EventMessage msg){
        sendMail{
            to msg.data.mail
            subject "Confirmation"
            html g.render(template:"userMailConfirmation")
        }
    }
}
5.1 Sending Events
Sending an event is simple. You only need to remember one method name and two signatures:
- event(topic, [data, params, callbackClosure])
- event(Map args, [callbackClosure])
We recommend the former signature if you don't have any params; otherwise the latter is more elegant.
Let's see what the key arguments do:
- The topic argument is a String which represents the channel subscribed to by listeners.
- The optional data argument is an Object, preferably Serializable for IO facilities, which represents the subject of your event, such as a domain class.
- The optional params argument is a Map which describes sending behaviors, including the namespace.
- The optional callbackClosure is a Closure triggered after the event completes.
- The map notation accepts the same keys as params, plus topic for the topic, data for the data and for (a shortcut for namespace). If the map contains a params key, its value is used as the params argument; otherwise the first-level map itself is used as params.
The params argument supports the following keys:
Key | Type | Default | Description |
---|---|---|---|
fork | Boolean | true | When set to false, forces the event to reuse the caller thread, therefore executing the listeners synchronously and propagating any errors. |
namespace / for | String | 'app' | Targets a dedicated topic namespace. To avoid overlapping topic names, the events bus supports a scoping concept called namespace. E.g. 'gorm' is used by GORM events and 'browser' is used for JavaScript listeners in the events-push plugin. |
onReply | Closure{EventReply reply} | | Same behavior as the callbackClosure argument; overrides it if both are defined. |
onError | Closure{List<Exception> errors} | | If exceptions have been raised by listeners, this callback is triggered. If undefined, exceptions are propagated when calling EventReply.getValue() or getValues(). |
gormSession | Boolean | true | Opens a GORM session for the new thread which carries the event execution. |
timeout | Long | | Defines a maximum time in milliseconds for the event execution. |
headers | Map<String, Serializable> | | Additional headers for the event message envelope. |
The event method returns an EventReply, which implements Future<Object> and provides useful methods:
- List<Object> getValues() : Returns as many values as listeners have replied.
- Object getValue() : First element of getValues().
- int size() : Invoked listeners count.
- List<Throwable> getErrors() : Available errors.
- boolean hasErrors() : Scans for any errors.
- EventReply waitFor() : Blocks the current thread and returns this reply.
- EventReply waitFor(long time) : Blocks the current thread for at most the given number of milliseconds and returns this reply.
Events workflow
Events can be sent from domain, service and controller artefacts by using EventReply event(String topic, Object data).
The Platform-Core events bus sends events in a non-blocking way by default; however, you can block on several methods of EventReply:
- size
- waitFor
- get
- getValues
- getValue
You therefore keep control over the execution flow. Just keep in mind that the event() call itself does not block for processing, which seems to be a sensible default for the bus. Any exceptions are raised when one of the blocking methods above is called, unless the onError parameter is used.
class SomeController{
    def logout(){
        def reply = event("logout", session.user)
        //doesn't wait for event execution
        render reply.value //waits and displays the value
        event(topic:"afterLogout").waitFor()
        //only reached once "afterLogout" has finished
        def errorHandler = {errs -> }
        //use a dedicated error handler
        event(topic:"afterAfterLogout", onError:errorHandler)
    }
}
Non forked events
If you want to reuse the current thread and force synchronous processing, set the fork param to false. Be aware that each exception is then propagated directly to the caller, even without using blocking methods, unless the onError parameter is used.
class SomeController{
    def logout(){
        def reply = event('logout', session.user, [fork:false])
        //blocks for processing
        //no need to wait for the reply since it has been populated during the event call
        render reply.value
    }
}
Assigning a namespace
All listeners get a property called namespace, which prevents topic naming collisions and undesired events. By default, they are all assigned to app.
This is the same default used when you send an event. But what if you want to reach listeners in another namespace, such as the 'browser' ones if you use the events-push plugin? Simply use the namespace argument, or for if you stick with the Map notation.
class SomeController{
    def logout(){
        //we use the Map form, the namespace argument is identified by the 'for' key
        event for:'browser', topic:'logout', data:session.user
    }
}
It's mandatory to declare a namespace when using the events bus from a plugin, in order to avoid any conflicts.
Wildcard support
It's possible to call multiple topics/namespaces in a single shot by using a wildcard as the last character.
class SomeController{
    def logout(){
        /*
        We send to every listener whose topic starts with "chat-"
        on every namespace starting with "role-"
        */
        event for:'role-*', topic:'chat-*', data:session.user
        //here we trigger every listener in the default namespace 'app'
        event '*'
    }
}
This feature will probably evolve into a smarter implementation behaving like UrlMappings and allowing substring captures.
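To make the "wildcard as the last character" rule concrete, here is a minimal plain Groovy sketch. It is only a model of the documented behavior, not the plugin's actual matching code, and matchesPattern is a hypothetical helper name introduced for this illustration.

```groovy
// Hypothetical helper: models "wildcard as the last character" matching.
// Illustration only, not the plugin's implementation.
boolean matchesPattern(String pattern, String value) {
    if (pattern.endsWith('*')) {
        // a trailing '*' matches any value that shares the prefix before it
        String prefix = pattern.substring(0, pattern.length() - 1)
        return value.startsWith(prefix)
    }
    // without a wildcard, only an exact match counts
    return pattern == value
}

println matchesPattern('chat-*', 'chat-room1')
println matchesPattern('chat-*', 'mail')
```

Under this model, a lone '*' has an empty prefix and therefore matches every topic, which is consistent with the event '*' call above.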
5.2 Listening Events
Listening for events simply requires registering the method that should receive the event notifications.
There are a few ways to register listeners.
Defining listeners at compile time
Within Grails services you can use the @Listener annotation. It takes a topic string, but you can omit it and use the method name as the topic to listen for:
class SomeService{
    @grails.events.Listener(topic = 'userLogged')
    def myMethod(User user){
    }
    //uses 'mailSent' as the topic name
    @grails.events.Listener
    def mailSent(){
    }
}
Event methods can define a single argument, and the value is the object sent with the event. Usually this is the "subject" of the event.
However, an event is carried by an envelope called EventMessage, which contains useful metadata such as additional headers and the current topic:
class SomeService{
    @grails.events.Listener(topic = 'userLogged')
    def myMethod(org.grails.plugin.platform.events.EventMessage userMessage){
        println userMessage.headers // displays optional headers
        println userMessage.event // displays current topic
        println userMessage.data // displays data
    }
}
If the event data cannot be assigned to a listener's argument type, the event silently skips the mismatching listener. If you want to catch every event type, use the Object type or, if the argument is not necessary, do not declare it.
Filtering on the EventMessage<D> generic type doesn't work, e.g. EventMessage<Book> won't prevent EventMessage<Author> invocation. For such fine-grained control, you can rely on the Events artifact.
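The skipping rule can be pictured with a small plain Groovy model. This is a sketch under the assumption that matching is a simple instance check against the declared argument type; dispatch is a hypothetical helper, not part of the plugin API, and closures stand in for listener methods.

```groovy
// Illustration only: a plain Groovy model of the "silently skip" rule.
// `dispatch` is a hypothetical helper, not the plugin's code.
List dispatch(Object data, List<Closure> listeners) {
    listeners.findAll { Closure listener ->
        Class[] types = listener.parameterTypes
        // keep listeners with no declared argument, or whose argument type accepts the data
        types.length == 0 || types[0].isInstance(data)
    }.collect { Closure listener ->
        listener.maximumNumberOfParameters == 0 ? listener.call() : listener.call(data)
    }
}

def onString = { String s -> "string:$s".toString() }
def onNumber = { Number n -> "number:$n".toString() }
def onAnything = { Object o -> "object:$o".toString() }

// the Number listener is skipped because 'hello' is not a Number
def results = dispatch('hello', [onString, onNumber, onAnything])
println results
```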
Namespacing
Your declared listeners belong to the app namespace, unless you tune it using the namespace argument or the Events DSL we will introduce later.
class SomeService{
    @grails.events.Listener(topic = 'userLogged', namespace = 'security')
    def myMethod(User user){
    }
    //will subscribe this method to topic 'afterInsert' on namespace 'gorm'
    @grails.events.Listener(namespace = 'gorm')
    def afterInsert(User user){
    }
}
Remember that you need to specify the namespace when triggering events if you customize it with a value other than app:
class SomeController{
    def myAction(){
        event for:'security', topic:'userLogged', data:session.user
    }
}
It's mandatory to declare a namespace when using the events bus from a plugin, in order to avoid any conflicts.
Proxy (AOP) support
By default, listeners try to call the original method (on the unproxied bean). You can tweak this setting with proxySupport:
class SomeService{
    static transactional = true
    //will invoke transactional logic, similar to calling someService.myMethod()
    @grails.events.Listener(proxySupport=true)
    def myMethod(User user){
    }
}
Dynamic listeners
Some edge cases need runtime registration. In that case, use the injected on method:
class SomeController{
    def testInlineListener = {
        //register with the 'logout' topic on the default 'app' namespace
        def listener = on("logout") {User user ->
            println "test $user"
        }
        render "$listener registered"
    }
    def testInlineListener2 = {
        //register a 'gorm' namespaced handler on the 'afterInsert' topic
        def listener = on("gorm", "afterInsert") {Book book ->
            println "test $book"
        }
        render "$listener registered"
    }
}
Wildcard support
Capturing a wider group of events can be useful, especially for monitoring purposes. It's possible to listen for multiple topics/namespaces in a single shot by using a wildcard as the last character.
class SomeService{
    @grails.events.Listener(namespace='role-*', topic = 'chat-*')
    def myMethod(org.grails.plugin.platform.events.EventMessage userMessage){
        println userMessage.namespace
        println userMessage.event
    }
}
This feature will probably evolve into a smarter implementation behaving like UrlMappings and allowing substring captures.
Listener ID
Registered listeners generate a unique id (ListenerId) that follows this pattern:
[namespace://]topic[:package.Class][#method][@hashcode]
The square brackets mark each optional part of the id, which makes it possible to target a group of listeners depending on the known arguments: namespace, class, method, hashcode.
This pattern is useful when using countListeners, removeListeners or extensions. For instance, overriding a generated channel with the events-si plugin requires using namespace://topic if the namespace is different from "app".
Another example, counting and removing listeners:
//count every listener subscribed to 'mytopic' inside TestService
countListeners("mytopic:my.TestService")
//count every listener using the gorm namespace
countListeners("gorm://*")
//remove every listener in TestService
removeListeners("*:my.TestService")
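To show how the optional parts of the id pattern fit together, the following plain Groovy sketch splits an id string into its components. parseListenerId and the regular expression are assumptions made for this illustration; the plugin's own parsing may differ, and absent parts are simply returned as null here.

```groovy
// Hypothetical parser for the documented pattern:
// [namespace://]topic[:package.Class][#method][@hashcode]
Map parseListenerId(String id) {
    // each optional part is an optional group; no anchors are needed with matches()
    String pattern = '(?:([^:]+)://)?([^:#@]+)(?::([^#@]+))?(?:#([^@]+))?(?:@(.+))?'
    def matcher = (id =~ pattern)
    if (!matcher.matches()) {
        throw new IllegalArgumentException("Not a listener id: " + id)
    }
    [namespace: matcher.group(1), topic: matcher.group(2), className: matcher.group(3),
     method: matcher.group(4), hashcode: matcher.group(5)]
}

println parseListenerId('gorm://*')
println parseListenerId('mytopic:my.TestService')
```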
Reloading in Development mode
It works.
5.3 Replying from Listeners
Usually, an event is fire-and-forget. In some cases, you may expect an answer, turning your messaging architecture into a controlled flow. For instance, a negative reply can be used in GORM events to veto the database write for the current subject.
Another common example is aggregating the results of multiple workers.
Simple reply
Replying is a simple matter of returning an object from the listener method:
class SomeService{
    @Listener
    def logout(User user){
        Date disconnectDate = new Date()
        //do something with user
        return disconnectDate
    }
}
If listeners return non-null objects, the caller can access them through the EventReply envelope returned immediately after calling the event method. The other option is to use a reply handler:
class SomeController{
    def logout(){
        def reply = event topic:"logout", data:session.user, fork:false
        render reply.value //display value
        //using a callback closure (the parameter must not reuse the name of the local variable 'reply')
        def replyHandler = {EventReply r-> }
        event topic:"logout", data:session.user, onReply:replyHandler
        //or as the last argument
        event(topic:"logout", data:session.user){ EventReply r-> }
        //the EventReply object is a Future implementation
        def reply_future = event topic:"logout", data:session.user
        render reply_future.get(30, java.util.concurrent.TimeUnit.SECONDS)
    }
}
Whenever an event is triggered, a task is submitted to the events bus thread pool and a Future is returned, wrapped in an EventReply.
Full support for the reply-address pattern (replyTo parameter) is also planned for a future version, which brings interesting features out of the box: non-blocking responses, streaming handler responses one by one, forwarding using a topic name instead of a closure…
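The "submit a task, get a Future back" mechanism can be sketched with plain java.util.concurrent classes. This is only a model of the idea, not the plugin's code: the pool size of 10 mirrors the documented poolSize default, and the listener closure is hypothetical.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit

// Illustration only: a plain java.util.concurrent model of a forked event.
def pool = Executors.newFixedThreadPool(10)
def listener = { String user -> "bye $user".toString() }

// submitting returns immediately, like a forked event() call
Future<String> future = pool.submit({ listener('bob') } as Callable<String>)

// blocking on the Future is what actually waits for the listener to finish
String value = future.get(30, TimeUnit.SECONDS)
println value
pool.shutdown()
```

The caller only pays the waiting cost when it asks for the result, which is the same trade-off the blocking EventReply methods expose.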
Multiple replies
Multiple listeners can return values for the same topic/namespace. In this case, EventReply waits for all handlers before returning any value.
Remember that a valid result is a non-null value: if 3 handlers have reacted but only 2 returned something, you will only see 2 values in EventReply.values.
class SomeController{
    def logout(){
        def reply = event topic:"sendMails", data:session.user
        //wait for all listeners and then display the first value from the aggregated results
        render reply.value
        //display all results as a List
        render reply.values
    }
}
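The aggregation rule (three handlers react, two values come back) can be modelled in a few lines of plain Groovy. collectReplies is a hypothetical helper used only for this sketch, with closures standing in for listeners.

```groovy
// Illustration only: models how non-null listener results are aggregated.
// `collectReplies` is a hypothetical helper, not the plugin's code.
List collectReplies(Object data, List<Closure> listeners) {
    // call every listener, then keep only the non-null results
    listeners.collect { it.call(data) }.findAll { it != null }
}

def listeners = [
    { user -> "mail sent to $user".toString() },
    { user -> null },   // reacts but returns nothing
    { user -> 42 }
]
def values = collectReplies('bob', listeners)
println values
```

The explicit null check matters in this sketch: filtering with Groovy truth alone would also drop legitimate results such as 0 or an empty string.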
Exceptions
Because no code is perfect, exceptions can happen during event processing for 3 reasons:
- RuntimeException in one or more handlers
- InterruptedException if the process has been cancelled
- TimeoutException if the maximum process time has been reached (timeout parameter)
An onError parameter is available and accepts a Closure{List<Throwable> errors}. If it is not set, exceptions are propagated to the caller when blocking on the EventReply object (getValue etc.) and/or when fork == false.
In a multiple-listeners scenario, an exception doesn't interrupt the execution flow, so the other listeners get a chance to execute as well.
The return value of a failing listener becomes the raised exception.
class SomeController{
    def logout(){
        on('test'){
            sleep(5000)
            throw new MyException('haha')
        }
        def reply = event topic:"test"
        reply.values //throws MyException after 5s
        def errorsHandler = {println it}
        reply = event topic:"test", onError:errorsHandler
        reply.values //calls errorsHandler and returns values which contain MyException
        event(topic:"test", onError:errorsHandler, timeout:1000){
            //executes both this and the errorsHandler closures with a TimeoutException
        }
        reply = event(topic:"test", onError:errorsHandler, timeout:1000)
        reply.cancel() //executes the errorsHandler closure with an InterruptedException
        event(topic:"test", fork:false)
        //waits 5s and raises an exception in the caller thread
    }
}
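The two rules above (a failure does not stop the other listeners, and the failing listener's value becomes the exception) can be sketched in plain Groovy. runAll is a hypothetical helper for this illustration and makes no claim about how the plugin implements error collection.

```groovy
// Illustration only: each listener runs in its own try/catch, so one failure
// does not stop the others, and the failing listener's "value" is the exception.
Map runAll(Object data, List<Closure> listeners) {
    def values = []
    def errors = []
    listeners.each { Closure listener ->
        try {
            def result = listener.call(data)
            if (result != null) values << result
        } catch (Exception e) {
            errors << e
            values << e
        }
    }
    [values: values, errors: errors]
}

def outcome = runAll('bob', [
    { user -> "ok $user".toString() },
    { user -> throw new IllegalStateException('haha') },
    { user -> 'still called' }
])
println outcome.values.size()
```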
Waiting replies
In domain, service and controller artefacts, you can wait for events using "EventReply waitFor(EventReply… eventReplies)".
It accepts as many event replies as you want and returns the same array, which allows a functional programming style.
EventReply also has a waitFor method for one-line waiting.
class SomeController{
    def logout(){
        def reply = event('logout', session.user)
        def reply2 = event('logout', session.user)
        def reply3 = event('logout', session.user)
        waitFor(reply,reply2,reply3).each{EventReply r->
            render r.value +'<br/>'
        }
        //same with a 20 seconds timeout on each reply
        waitFor(20, java.util.concurrent.TimeUnit.SECONDS, reply,reply2,reply3).each{EventReply r->
            render r.value
        }
        //other style:
        event('logout', session.user).waitFor() //blocks until the event completes
        event('logout', session.user).waitFor(2000) //blocks for a maximum of 2 seconds
    }
}
5.4 Routing configuration -- The XxxEvents Artifact
An extensible Events DSL is available in grails-app/conf for routing configuration.
This artifact details the behavior of the event method by selecting the topics and namespaces to which it applies:
- Filtering
- Disabling
- Sending behaviors
- Extensions
- Security
- Declarations
The DSL is intended to evolve. One of the most wanted features is topic/namespace declaration: assigning a definition to a property would generate an injectable bean of the same name with streaming methods.
The DSL requires assigning a closure to an events variable.
Each call is a definition: the method name is parsed as a topic name and the key/value arguments are the definition's attributes.
Wildcard topics/namespaces are supported as well.
An Events artifact is a script with some bound variables:
Variable | Description |
---|---|
grailsApplication | Grails application object, retrieves artifacts, context etc. |
ctx | Spring context, useful for beans access, e.g. ctx.myService.method() |
config | Configuration object |
Each definition supports the following attributes:
Attribute name | Type | Default | Description |
---|---|---|---|
namespace | String | "app" | Defines the namespace the current definition is bound to |
filter | Closure(Object), Closure(EventMessage) or Class | | If a closure is passed, its return value is used as the condition for event propagation. If a class is passed, the subject data type must match. |
disabled | boolean | false | Disables event propagation |
fork | boolean | true | Set to false to use the current thread for event processing (blocking call) |
onError | Closure(List<Throwable>) | | Default onError handler for the current topic(s) |
onReply | Closure(EventReply) | | Default onReply handler for the current topic(s) |
timeout | Long | | Default timeout for the execution time; when it is reached, a TimeoutException is raised and the handlers are called |
* | * | | Any other attribute can be declared, to be used by plugins through EventDefinition.othersAttributes |
events = {
    //prevents any events in gorm namespace
    '*' namespace:'gorm', disabled:true
    //filters any events on 'testTopic' where data <= 2
    testTopic filter:{it > 2}
    //filters any events on 'testTopic2' where data is not a TestTopic class type
    testTopic2 filter:TestTopic
    //filters any events on 'testTopicX' using boolean method from service
    testTopicX filter:ctx.myService.&someMethod
    //only if using events-push plugin, allows client-side listener on this topic
    testTopic3 browser:true
    //Default Error Handling, Global Reply Handling, timeout and fork
    testTopicD onError:{}, onReply:{}, timeout:1000L
    testTopicD2 fork:false
    //-------------- roadmap --------------
    //not yet implemented: Assigning and merging definitions
    //myTopic = testTopic4(filter:{i>2})
    //myTopic = testTopic4(filter:{i<4})
    //not yet implemented: Enabling security context for target listeners
    //testTopic5 secured:true
    //not yet implemented: Topic Forwarding
    //testTopic6 to:'anotherTopic'
    //not yet implemented: Topic Handlers
    //testTopic9 onError:'anotherTopicErrors', onReply:'anotherTopicReplies'
}
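The two documented filter kinds (closure and class) can be summarized with a short plain Groovy model. passesFilter is a hypothetical helper written for this sketch; it only mirrors the semantics described in the attributes table and is not the plugin's code.

```groovy
// Illustration only: models the two documented filter kinds.
// `passesFilter` is a hypothetical helper, not the plugin's code.
boolean passesFilter(Object filter, Object data) {
    if (filter instanceof Class) {
        // class filter: the data type must match
        return ((Class) filter).isInstance(data)
    }
    if (filter instanceof Closure) {
        // closure filter: the return value decides whether the event propagates
        return ((Closure) filter).call(data) as boolean
    }
    // no filter declared: always propagate
    return true
}

println passesFilter({ it > 2 }, 3)
println passesFilter(String, 5)
```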
Reloading in Development mode
It works.
5.5 Listening GORM events
Starting from Grails 2, the Events Bus supports GORM events.
GORM Listeners
To listen for GORM events, simply declare listeners on the gorm namespace, using the topics from the following table:
Event Type | Target Topic |
---|---|
PreInsertEvent | beforeInsert |
PreUpdateEvent | beforeUpdate |
PreDeleteEvent | beforeDelete |
ValidationEvent | beforeValidate |
PostInsertEvent | afterInsert |
PostUpdateEvent | afterUpdate |
PostDeleteEvent | afterDelete |
SaveOrUpdateEvent | onSaveOrUpdate |
The same listener behaviors apply, e.g. using EventMessage as the argument type, using wildcard topics etc.
Because listeners are called if they take no argument or if the event data is assignable to the argument type, specifying a domain class is the only step required to filter domain events.
class SomeService{
    @Listener(namespace = 'gorm')
    void afterInsert(Author author) {
        println "after save author - $author.name"
    }
    @Listener(topic = 'beforeInsert', namespace = 'gorm')
    void beforeInsertBook(Book book) {
        println "will insert book - $book.title"
    }
    //Will catch everything since we don't filter on the subject by using EventMessage
    @Listener(topic = 'before*', namespace = 'gorm')
    void beforeEachGormEvent(EventMessage message) {
        println "gorm event $message.event on domain $message.data.class"
    }
}
Filtering with Events Artifact
Setting a filter through an Events artifact allows finer control and more efficient selection, since it prevents events from being propagated:
events = {
    'afterInsert' namespace:'gorm', filter:Book
    'afterDelete' namespace:'gorm', filter:{it.id > 5}
    'afterUpdate' namespace:'gorm', filter:{it in Book || it in Author}
    'beforeDelete' namespace:'gorm', disabled:true
}
GORM may generate tons of events. Use it wisely and combine it with routing filters.
You can also disable the GORM bridge entirely by using the events.gorm.disabled configuration key.
Threading behaviors
GORM listeners are executed in the same thread as the caller in order to reuse the currently open session, if any.
Avoid blocking logic if possible, or use the listener body to send another event.
Vetoing changes
If a listener handles one of the before* topics and returns a boolean value, it becomes part of the vetoing chain:
- Returning false cancels the current database write
- Returning true lets the chain continue
class SomeService{
    //veto any Book insert
    @Listener(topic = 'beforeInsert', namespace = 'gorm')
    boolean beforeInsertBook(Book book) {
        false
    }
}
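As a rough mental model of the vetoing chain, the plain Groovy sketch below treats only an explicit boolean false as a veto. isVetoed is a hypothetical helper, and stopping at the first veto is an assumption of this sketch; the text above does not say whether the plugin still calls the remaining listeners.

```groovy
// Illustration only: models a veto chain over listener return values.
// `isVetoed` is a hypothetical helper, not the plugin's code.
boolean isVetoed(Object entity, List<Closure> beforeListeners) {
    beforeListeners.any { Closure listener ->
        def result = listener.call(entity)
        // only an explicit boolean false vetoes; other return values are ignored
        result instanceof Boolean && !result
    }
}

println isVetoed('book', [{ true }, { false }])
println isVetoed('book', [{ true }, { 'not a boolean' }])
```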
5.6 Spring Beans
Plugin developers and any crazy tweakers may need to override one or more Events Bus beans, as the Spring Integration plugin does.
The grailsEvents bean is also useful to inject events methods into unhandled artifacts (other than domain, service, controller).
Bean Name | Type | Default Implementation | Description |
---|---|---|---|
grailsEvents | org.grails.plugin.platform.events.Events | org.grails.plugin.platform.events.EventsImpl | Main events gateway which contains the methods injected in artifacts |
grailsEventsPublisher | org.grails.plugin.platform.events.publisher.EventsPublisher | org.grails.plugin.platform.events.publisher.DefaultsEventsPublisher | Publisher bean, triggers events. To be implemented by extensions if required (e.g. events-si) |
grailsEventsRegistry | org.grails.plugin.platform.events.registry.EventsRegistry | org.grails.plugin.platform.events.registry.DefaultsEventsRegistry | Registry bean, stores listeners and routes events. To be implemented by extensions if required (e.g. events-si) |
gormTopicSupport | org.grails.plugin.platform.events.dispatcher.GormTopicSupport | org.grails.plugin.platform.events.dispatcher.GormTopicSupport2X | Translates GORM events to topic names and processes veto(s). |
grailsEventsGormBridge | org.grails.plugin.platform.events.publisher.GormBridgePublisher | | Listens for GORM events and publishes to the right bus using gormTopicSupport. |
grailsTopicExecutor | org.springframework.core.task.TaskExecutor | org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor | Carries out events execution. |
5.7 Securing events
To be implemented. You can still use headers or data to pass security context for instance. M4 release will bring platform-security abstraction ready for events.
5.8 Extensions
Writing extensions is one of the greatest habits of Grails developers. The Groovy and Grails communities are like that, pragmatic and pleasant.
The beans referenced in the previous chapter should already give you ideas for improving or customizing the bus to your needs.
There are two available examples of extensions:
- events-si : This plugin overrides the publisher and registry beans in order to replace the default mechanisms with the much more flexible Spring Integration channels and endpoints.
- events-push : This plugin registers new listeners from your cool browsers using JavaScript, authorized through the new Events DSL attributes browser and browserFilter.
5.9 Configuration properties
Based on the Platform-Core configuration mechanism, the plugin provides the following Events Bus related keys:
Configuration Key | Type | Default | Description |
---|---|---|---|
grails.plugin.platform.events.disabled | Boolean | false | Fully disables the Events Bus mechanism; no events methods will be injected |
grails.plugin.platform.events.poolSize | Integer | 10 | Allows X concurrent workers to process events |
grails.plugin.platform.events.gorm.disabled | Boolean | false | Disables the GORM bridge, stopping GORM events from being published |
grails.plugin.platform.events.catchFlushException | Boolean | true | Catches any GORM flushing exceptions, which could be noisy especially when vetoing changes |
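As a sketch of how these keys might be set, the fragment below assumes they are assigned in grails-app/conf/Config.groovy exactly as they are spelled in the table. Both the file location and the values are illustrative assumptions, so check them against your Platform-Core version.

```groovy
// grails-app/conf/Config.groovy (assumed location; values are illustrative)
// allow 20 concurrent workers instead of the documented default of 10
grails.plugin.platform.events.poolSize = 20
// stop GORM events from being published on the bus
grails.plugin.platform.events.gorm.disabled = true
// let GORM flushing exceptions surface instead of being caught
grails.plugin.platform.events.catchFlushException = false
```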
In addition, you can override bean values, such as those of gormTopicSupport:
beans{
    gormTopicSupport {
        //transform gorm Events Objects types into topics
        translateTable = [
            'PreInsertEvent': 'beforeInsert', 'PreUpdateEvent': 'beforeUpdate',
            /*'PreLoadEvent': 'beforeLoad',*/ 'PreDeleteEvent': 'beforeDelete',
            'ValidationEvent': 'beforeValidate', 'PostInsertEvent': 'afterInsert',
            'PostUpdateEvent': 'afterUpdate', 'PostDeleteEvent': 'afterDelete',
            /*'PostLoadEvent': 'afterLoad',*/ 'SaveOrUpdateEvent': 'onSaveOrUpdate'
        ]
    }
}