001/* Copyright 2011-2012 the original author or authors:
002 *
003 *    Marc Palmer (marc@grailsrocks.com)
004 *    Stéphane Maldini (smaldini@vmware.com)
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.grails.plugin.platform.events.publisher;
019
020import groovy.lang.Closure;
021import org.apache.log4j.Logger;
022import org.codehaus.groovy.grails.support.PersistenceContextInterceptor;
023import org.grails.plugin.platform.events.EventMessage;
024import org.grails.plugin.platform.events.EventReply;
025import org.grails.plugin.platform.events.registry.DefaultEventsRegistry;
026import org.springframework.beans.BeansException;
027import org.springframework.beans.factory.annotation.Autowired;
028import org.springframework.context.ApplicationContext;
029import org.springframework.context.ApplicationContextAware;
030import org.springframework.beans.factory.InitializingBean;
031import org.springframework.core.task.AsyncTaskExecutor;
032
033import java.util.Map;
034import java.util.concurrent.Callable;
035import java.util.concurrent.Future;
036import java.util.concurrent.TimeUnit;
037
038/**
039 * @author Stephane Maldini <smaldini@vmware.com>
040 * @version 1.0
041 * @file
042 * @date 16/01/12
043 * @section DESCRIPTION
044 * <p/>
045 * [Does stuff]
046 */
047public class DefaultEventsPublisher implements EventsPublisher, ApplicationContextAware, InitializingBean {
048
049
050    private static final String EXECUTOR = "executor";
051    private static final String DEFAULT_EXECUTOR = "grailsTopicExecutor";
052    private static final String QUEUE_EXECUTOR = "grailsP2PExecutor";
053    private final static Logger log = Logger.getLogger(DefaultEventsPublisher.class);
054
055    private DefaultEventsRegistry grailsEventsRegistry;
056
057    @Autowired
058    protected Map<String, AsyncTaskExecutor> taskExecutors;
059
060    private PersistenceContextInterceptor persistenceInterceptor;
061    private boolean catchFlushExceptions = false;
062    private ApplicationContext context;
063
064    public void setCatchFlushExceptions(boolean catchFlushExceptions) {
065        this.catchFlushExceptions = catchFlushExceptions;
066    }
067
068    public void setPersistenceInterceptor(PersistenceContextInterceptor persistenceInterceptor) {
069        this.persistenceInterceptor = persistenceInterceptor;
070    }
071
072    public void setGrailsEventsRegistry(DefaultEventsRegistry grailsEventsRegistry) {
073        this.grailsEventsRegistry = grailsEventsRegistry;
074    }
075
076    //API
077
078    public EventReply event(EventMessage event) {
079        DefaultEventsRegistry.InvokeResult invokeResult = grailsEventsRegistry.invokeListeners(event);
080        return new EventReply(invokeResult.getResult(), invokeResult.getInvoked());
081    }
082
083    public EventReply eventAsync(final EventMessage event, final Map<String, Object> params) {
084        AsyncTaskExecutor taskExecutor = params != null && params.containsKey(EXECUTOR) ?
085                taskExecutors.get( params.get(EXECUTOR) ) :
086                taskExecutors.get(DEFAULT_EXECUTOR);
087
088        Future<DefaultEventsRegistry.InvokeResult> invokeResult =
089                taskExecutor.submit(new Callback(event));
090
091        final WrappedFuture reply = new WrappedFuture(invokeResult, -1);
092
093        if (params != null) {
094            reply.setOnError((Closure)params.get(ON_ERROR));
095            if (params.get(ON_REPLY) != null) {
096                taskExecutor.execute(new Runnable() {
097
098                    public void run() {
099                        try {
100                            if (params.get(TIMEOUT) != null)
101                                reply.get((Long) params.get(TIMEOUT), TimeUnit.MILLISECONDS);
102                            else
103                                reply.get();
104
105                            reply.throwError();
106                            ((Closure) params.get(ON_REPLY)).call(reply);
107                        } catch (Throwable e) {
108                            reply.setCallingError(e);
109                        }
110                    }
111                });
112
113            }
114        }
115        return reply;
116    }
117
118    public void afterPropertiesSet() throws BeansException {
119        //try to lazy load contextInterceptor
120        if(persistenceInterceptor == null){
121            try{
122                persistenceInterceptor = context.getBean("persistenceInterceptor", PersistenceContextInterceptor.class);
123            }catch (BeansException ex){
124                log.debug("No persistence context interceptor found", ex);
125            }
126        }
127    }
128
129    public void setApplicationContext(ApplicationContext context) {
130        this.context = context;
131    }
132
133    //INTERNAL
134
135    private class Callback implements Callable<DefaultEventsRegistry.InvokeResult> {
136
137        private EventMessage event;
138
139        public Callback(EventMessage event) {
140            this.event = event;
141        }
142
143        public DefaultEventsRegistry.InvokeResult call() {
144            boolean gormSession = persistenceInterceptor != null && event.isGormSession();
145            if (gormSession) {
146                persistenceInterceptor.init();
147            }
148
149            DefaultEventsRegistry.InvokeResult invokeResult = grailsEventsRegistry.invokeListeners(event);
150
151            if (gormSession) {
152                try {
153                    persistenceInterceptor.flush();
154                } catch (RuntimeException re) {
155                    if (!catchFlushExceptions)
156                        throw re;
157                } finally {
158                    persistenceInterceptor.destroy();
159                }
160            }
161            return invokeResult;
162        }
163    }
164
165    private static class WrappedFuture extends EventReply {
166
167        public WrappedFuture(Future<?> wrapped, int receivers) {
168            super(wrapped, receivers);
169        }
170
171        @Override
172        protected void initValues(Object val) {
173            DefaultEventsRegistry.InvokeResult message = (DefaultEventsRegistry.InvokeResult) val;
174            setReceivers(message.getInvoked());
175            super.initValues(message.getResult());
176        }
177
178        public void setCallingError(Throwable e) {
179            super.initValues(e);
180            if(getOnError() != null){
181                getOnError().call(this);
182            }
183        }
184
185    }
186}