001/* Copyright 2011-2012 the original author or authors: 002 * 003 * Marc Palmer (marc@grailsrocks.com) 004 * Stéphane Maldini (smaldini@vmware.com) 005 * 006 * Licensed under the Apache License, Version 2.0 (the "License"); 007 * you may not use this file except in compliance with the License. 008 * You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.grails.plugin.platform.events.publisher; 019 020import groovy.lang.Closure; 021import org.apache.log4j.Logger; 022import org.codehaus.groovy.grails.support.PersistenceContextInterceptor; 023import org.grails.plugin.platform.events.EventMessage; 024import org.grails.plugin.platform.events.EventReply; 025import org.grails.plugin.platform.events.registry.DefaultEventsRegistry; 026import org.springframework.beans.BeansException; 027import org.springframework.beans.factory.annotation.Autowired; 028import org.springframework.context.ApplicationContext; 029import org.springframework.context.ApplicationContextAware; 030import org.springframework.beans.factory.InitializingBean; 031import org.springframework.core.task.AsyncTaskExecutor; 032 033import java.util.Map; 034import java.util.concurrent.Callable; 035import java.util.concurrent.Future; 036import java.util.concurrent.TimeUnit; 037 038/** 039 * @author Stephane Maldini <smaldini@vmware.com> 040 * @version 1.0 041 * @file 042 * @date 16/01/12 043 * @section DESCRIPTION 044 * <p/> 045 * [Does stuff] 046 */ 047public class DefaultEventsPublisher implements EventsPublisher, ApplicationContextAware, InitializingBean { 048 049 050 private static final String EXECUTOR = "executor"; 051 private static final String DEFAULT_EXECUTOR = "grailsTopicExecutor"; 052 private static final String QUEUE_EXECUTOR = "grailsP2PExecutor"; 053 private final static Logger log = Logger.getLogger(DefaultEventsPublisher.class); 054 055 private DefaultEventsRegistry grailsEventsRegistry; 056 057 @Autowired 058 protected Map<String, AsyncTaskExecutor> taskExecutors; 059 060 private PersistenceContextInterceptor persistenceInterceptor; 061 private boolean catchFlushExceptions = false; 062 private ApplicationContext context; 063 064 public void setCatchFlushExceptions(boolean catchFlushExceptions) { 065 this.catchFlushExceptions = catchFlushExceptions; 066 } 067 068 public void setPersistenceInterceptor(PersistenceContextInterceptor persistenceInterceptor) { 069 this.persistenceInterceptor = persistenceInterceptor; 070 } 071 072 public void setGrailsEventsRegistry(DefaultEventsRegistry grailsEventsRegistry) { 073 this.grailsEventsRegistry = grailsEventsRegistry; 074 } 075 076 //API 077 078 public EventReply event(EventMessage event) { 079 DefaultEventsRegistry.InvokeResult invokeResult = grailsEventsRegistry.invokeListeners(event); 080 return new EventReply(invokeResult.getResult(), invokeResult.getInvoked()); 081 } 082 083 public EventReply eventAsync(final EventMessage event, final Map<String, Object> params) { 084 AsyncTaskExecutor taskExecutor = params != null && params.containsKey(EXECUTOR) ? 085 taskExecutors.get( params.get(EXECUTOR) ) : 086 taskExecutors.get(DEFAULT_EXECUTOR); 087 088 Future<DefaultEventsRegistry.InvokeResult> invokeResult = 089 taskExecutor.submit(new Callback(event)); 090 091 final WrappedFuture reply = new WrappedFuture(invokeResult, -1); 092 093 if (params != null) { 094 reply.setOnError((Closure)params.get(ON_ERROR)); 095 if (params.get(ON_REPLY) != null) { 096 taskExecutor.execute(new Runnable() { 097 098 public void run() { 099 try { 100 if (params.get(TIMEOUT) != null) 101 reply.get((Long) params.get(TIMEOUT), TimeUnit.MILLISECONDS); 102 else 103 reply.get(); 104 105 reply.throwError(); 106 ((Closure) params.get(ON_REPLY)).call(reply); 107 } catch (Throwable e) { 108 reply.setCallingError(e); 109 } 110 } 111 }); 112 113 } 114 } 115 return reply; 116 } 117 118 public void afterPropertiesSet() throws BeansException { 119 //try to lazy load contextInterceptor 120 if(persistenceInterceptor == null){ 121 try{ 122 persistenceInterceptor = context.getBean("persistenceInterceptor", PersistenceContextInterceptor.class); 123 }catch (BeansException ex){ 124 log.debug("No persistence context interceptor found", ex); 125 } 126 } 127 } 128 129 public void setApplicationContext(ApplicationContext context) { 130 this.context = context; 131 } 132 133 //INTERNAL 134 135 private class Callback implements Callable<DefaultEventsRegistry.InvokeResult> { 136 137 private EventMessage event; 138 139 public Callback(EventMessage event) { 140 this.event = event; 141 } 142 143 public DefaultEventsRegistry.InvokeResult call() { 144 boolean gormSession = persistenceInterceptor != null && event.isGormSession(); 145 if (gormSession) { 146 persistenceInterceptor.init(); 147 } 148 149 DefaultEventsRegistry.InvokeResult invokeResult = grailsEventsRegistry.invokeListeners(event); 150 151 if (gormSession) { 152 try { 153 persistenceInterceptor.flush(); 154 } catch (RuntimeException re) { 155 if (!catchFlushExceptions) 156 throw re; 157 } finally { 158 persistenceInterceptor.destroy(); 159 } 160 } 161 return invokeResult; 162 } 163 } 164 165 private static class WrappedFuture extends EventReply { 166 167 public WrappedFuture(Future<?> wrapped, int receivers) { 168 super(wrapped, receivers); 169 } 170 171 @Override 172 protected void initValues(Object val) { 173 DefaultEventsRegistry.InvokeResult message = (DefaultEventsRegistry.InvokeResult) val; 174 setReceivers(message.getInvoked()); 175 super.initValues(message.getResult()); 176 } 177 178 public void setCallingError(Throwable e) { 179 super.initValues(e); 180 if(getOnError() != null){ 181 getOnError().call(this); 182 } 183 } 184 185 } 186}