View Javadoc
1   /**
2    * Copyright 2018 Internet2
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package edu.internet2.middleware.grouper.app.messaging;
18  
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.regex.Matcher;
26  import java.util.regex.Pattern;
27  
28  import org.apache.commons.logging.Log;
29  import org.joda.time.DateTime;
30  import org.joda.time.format.ISODateTimeFormat;
31  import org.quartz.DisallowConcurrentExecution;
32  import org.quartz.Job;
33  import org.quartz.JobExecutionContext;
34  import org.quartz.JobExecutionException;
35  
36  import com.fasterxml.jackson.databind.JsonNode;
37  import com.fasterxml.jackson.databind.node.ObjectNode;
38  
39  import edu.internet2.middleware.grouper.GrouperSession;
40  import edu.internet2.middleware.grouper.app.loader.GrouperDaemonUtils;
41  import edu.internet2.middleware.grouper.app.loader.GrouperLoaderConfig;
42  import edu.internet2.middleware.grouper.cfg.GrouperConfig;
43  import edu.internet2.middleware.grouper.util.GrouperHttpClient;
44  import edu.internet2.middleware.grouper.util.GrouperHttpMethod;
45  import edu.internet2.middleware.grouper.util.GrouperUtil;
46  import edu.internet2.middleware.grouperClient.messaging.GrouperMessage;
47  import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeParam;
48  import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeType;
49  import edu.internet2.middleware.grouperClient.messaging.GrouperMessageQueueParam;
50  import edu.internet2.middleware.grouperClient.messaging.GrouperMessageQueueType;
51  import edu.internet2.middleware.grouperClient.messaging.GrouperMessageReceiveParam;
52  import edu.internet2.middleware.grouperClient.messaging.GrouperMessageReceiveResult;
53  import edu.internet2.middleware.grouperClient.messaging.GrouperMessageSendParam;
54  import edu.internet2.middleware.grouperClient.messaging.GrouperMessageSystemParam;
55  import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingEngine;
56  import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingSystem;
57  import edu.internet2.middleware.grouperClient.util.GrouperClientUtils;
58  import org.apache.commons.lang3.StringUtils;
59  
60  /**
61   * @author vsachdeva
62   */
63  // https://spaces.at.internet2.edu/display/Grouper/Grouper+messaging+to+web+service+API
64  @DisallowConcurrentExecution
65  public class MessageConsumerDaemon implements Job {
66    
67    /**
68     * logger 
69     */
70    private static final Log LOG = GrouperUtil.getLog(MessageConsumerDaemon.class);
71  
72    /**
73     * @see Job#execute(JobExecutionContext)
74     */
75    @Override
76    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
77      
78      Pattern pattern = Pattern.compile("^grouper\\.messaging\\.([^.]+)\\.messagingSystemName$");
79      GrouperLoaderConfig grouperLoaderConfig = GrouperLoaderConfig.retrieveConfig();
80      
81      String configName = null;
82      
83      String actAsSubjectId = null;
84      String actAsSubjectSourceId = null;
85      String messagingSystemName = null;
86      String queueOrTopicName = null;
87      String routingKey = null;
88      String exchangeType = null;
89      String messageQueueType = null;
90      Integer longPollingSeconds = null;
91      Map<String, Object> queueArguments = null;
92      
93      for (String propertyName : grouperLoaderConfig.propertyNames()) {
94        Matcher matcher = pattern.matcher(propertyName);
95        if (matcher.matches()) {
96  
97          configName = matcher.group(1);
98          
99          messagingSystemName = grouperLoaderConfig.propertyValueString(propertyName);
100         if (StringUtils.isBlank(messagingSystemName)) {
101           LOG.info("No messaging system name found so not going to connect to any queue or topic.");
102           return;
103         }
104         
105         queueOrTopicName = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".queueOrTopicName");
106         routingKey = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".routingKey");
107         exchangeType = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".exchangeType");
108         messageQueueType = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".messageQueueType");
109         actAsSubjectSourceId = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".actAsSubjectSourceId");
110         actAsSubjectId = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".actAsSubjectId");
111         longPollingSeconds = grouperLoaderConfig.propertyValueInt("grouper.messaging."+configName+".longPollingSeconds", 1);
112 
113         for (int i=0;i<100;i++) {
114           String key = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".queueArgs." + i + ".key");
115           if (key == null || "".equals(key)) {
116             break;
117           }
118 
119           String value = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".queueArgs." + i + ".value");
120           if (queueArguments == null) {
121             queueArguments = new HashMap<>();
122           }
123           queueArguments.put(key, value);
124         }
125       }
126     }
127     
128     if (StringUtils.isBlank(configName)) {
129       return;
130     }
131     
132     try {
133       GrouperSession.startBySubjectIdAndSource(actAsSubjectId, actAsSubjectSourceId);
134     } catch (Exception e) {
135       LOG.error("Error occurred while starting grouper session for subjectId" + actAsSubjectId +" and source id "+actAsSubjectSourceId, e);
136       return;
137     }
138     
139     GrouperMessagingSystem grouperMessageSystem = null;
140     
141     try {
142       grouperMessageSystem = GrouperMessagingEngine.retrieveGrouperMessageSystem(messagingSystemName);
143     } catch(Exception e) {
144       LOG.error("Error occurred while retrieving grouper message system for "+messagingSystemName, e);
145       return;
146     }
147     
148     Collection<GrouperMessage> grouperMessages = null;
149     try {
150       grouperMessages = receiveMessages(messagingSystemName, queueOrTopicName, routingKey, exchangeType, messageQueueType, longPollingSeconds, queueArguments, grouperMessageSystem);
151       LOG.info("Received "+grouperMessages.size() +" message(s) from "+queueOrTopicName +" for message system: "+messagingSystemName);
152       
153       GrouperDaemonUtils.stopProcessingIfJobPaused();
154     } catch (Exception e) {
155       LOG.error("Error occurred while receiving messages from "+queueOrTopicName, e);
156       return;
157     }
158     
159     processMessages(messagingSystemName, grouperMessageSystem, messageQueueType, queueOrTopicName, grouperMessages, configName);
160 
161   }
162 
163 
164   /**
165    * @param messagingSystemName
166    * @param grouperMessageSystem
167    * @param messageQueueType
168    * @param queueTopicName
169    * @param grouperMessages
170    * @param configName
171    */
172   protected void processMessages(String messagingSystemName, GrouperMessagingSystem grouperMessageSystem,
173       String messageQueueType, String queueTopicName,
174       Collection<GrouperMessage> grouperMessages, String configName) {
175     
176     List<GrouperMessage> messagesToBeAcknowledged = new ArrayList<GrouperMessage>();
177     
178     for (GrouperMessage inputMessage: grouperMessages) {
179       GrouperDaemonUtils.stopProcessingIfJobPaused();
180 
181       String messageBody = inputMessage.getMessageBody();
182       InputMessageGrouperHeader grouperHeader = null;
183       
184       JsonNode jsonObject = null;
185       
186       
187       try { 
188         jsonObject = GrouperUtil.jsonJacksonNode(messageBody);
189         
190       } catch(Exception e) {
191         LOG.error("Error occurred while building json object for "+messageBody);
192         continue;
193       }
194       
195       JsonNode grouperHeaderJson = null;
196       try {
197         grouperHeaderJson = jsonObject.get("grouperHeader");
198       } catch (Exception e) {
199         LOG.error("Error occurred while retrieving key grouperHeader out of message body: "+messageBody);
200         continue;
201       }
202       
203       try {
204         grouperHeader = GrouperUtil.jsonConvertFrom(grouperHeaderJson, InputMessageGrouperHeader.class);
205       } catch (Exception e) {
206         LOG.error("Error occurred while building Json object for "+grouperHeaderJson);
207         continue;
208       }
209       
210       Collection<String> errors = validate(grouperHeader);
211       
212       String replyToQueueOrTopic = grouperHeader.getReplyToQueueOrTopic();
213       String replyToQueueOrTopicName = grouperHeader.getReplyToQueueOrTopicName();
214       String routingKey = grouperHeader.getReplyToRoutingKey();
215       if (errors.size() > 0) {
216         if (canReplyToErrorMessages(replyToQueueOrTopicName, replyToQueueOrTopic)) {
217           String errorJson = buildErrorResponse(errors, grouperHeader);
218           sendReplyMessage(grouperMessageSystem, grouperHeader, messagingSystemName, errorJson, routingKey);
219         } else {
220           LOG.error("Invalid message received from the queue. Errors: "+GrouperUtil.collectionToString(errors));
221         }
222         continue;
223       }
224       
225       String endpoint = grouperHeader.getEndpoint();
226       
227       String wsRequestBody = GrouperUtil.jsonJacksonGetString(jsonObject, endpoint);
228       
229       String newJson = "{ \"" + endpoint + "\" :" + wsRequestBody + "}";
230       
231       String wsBaseUrl = GrouperLoaderConfig.retrieveConfig().propertyValueString("grouper.messaging."+configName+".ws.url");
232       
233       WsResponse wsReponse = null;
234       try {        
235         wsReponse = callWebService(newJson, wsBaseUrl + grouperHeader.getHttpPath(), configName);
236         messagesToBeAcknowledged.add(inputMessage);
237       } catch (Exception e) {
238         wsReponse = new WsResponse();
239         wsReponse.setHttpStatusCode(400);
240         wsReponse.setSuccess("F");
241         wsReponse.setResultCode("EXCEPTION");
242         wsReponse.setBody("\"errorMessage\": \"" + e.getMessage() + "\"");
243         LOG.error("Error occurred while calling web service: " + e.getMessage(), e);
244       }
245       
246       // now send the response to queue/topic if specified in the message
247       if (StringUtils.isNotBlank(replyToQueueOrTopic) && StringUtils.isNotBlank(replyToQueueOrTopicName)) {
248         String messageToBeSent = buildWsReplyToMessage(wsReponse, grouperHeader);
249         sendReplyMessage(grouperMessageSystem, grouperHeader, messagingSystemName, messageToBeSent, routingKey);
250       }
251             
252     }
253     
254     acknowledge(grouperMessageSystem, messagingSystemName, messageQueueType, queueTopicName, messagesToBeAcknowledged);
255     
256   }
257   
258   /**
259    * @param grouperMessageSystem
260    * @param messagingSystemName
261    * @param messageQueueType
262    * @param queueTopicName
263    * @param messagesToBeAcknowledged
264    */
265   private void acknowledge(GrouperMessagingSystem grouperMessageSystem,
266       String messagingSystemName, String messageQueueType, String queueTopicName, Collection<GrouperMessage> messagesToBeAcknowledged) {
267     
268     GrouperMessageAcknowledgeParam acknowledgeParam = new GrouperMessageAcknowledgeParam();
269     acknowledgeParam.assignQueueName(queueTopicName);
270     acknowledgeParam.assignAcknowledgeType(GrouperMessageAcknowledgeType.mark_as_processed);
271     acknowledgeParam.assignGrouperMessages(messagesToBeAcknowledged);
272     acknowledgeParam.assignGrouperMessageSystemName(messagingSystemName);
273     
274     GrouperMessageSystemParam systemParam = new GrouperMessageSystemParam();
275     systemParam.assignMesssageSystemName(messagingSystemName);
276     acknowledgeParam.assignGrouperMessageSystemParam(systemParam);
277     
278     GrouperMessageQueueParam queueParam = new GrouperMessageQueueParam();
279     queueParam.assignQueueOrTopicName(queueTopicName);
280     queueParam.assignQueueType(GrouperMessageQueueType.valueOfIgnoreCase(messageQueueType, true));
281     acknowledgeParam.assignGrouperMessageQueueParam(queueParam);
282     
283     grouperMessageSystem.acknowledge(acknowledgeParam);
284     
285   }
286   
287   /**
288    * @param queueOrTopicName
289    * @param queueType
290    * @return
291    */
292   private boolean canReplyToErrorMessages(String queueOrTopicName, String queueType) {
293     return StringUtils.isNotBlank(queueType) && StringUtils.isNotBlank(queueOrTopicName) && GrouperMessageQueueType.valueOfIgnoreCase(queueType, false) != null;
294   }
295 
296   /**
297    * @param grouperHeader
298    * @return
299    */
300   private List<String> validate(InputMessageGrouperHeader grouperHeader) {
301     
302     List<String> errors = new ArrayList<String>();
303     
304     if (StringUtils.isBlank(grouperHeader.getMessageVersion())) {
305       errors.add("grouperHeader.messageVersion is required.");
306     }
307     
308     if (StringUtils.isBlank(grouperHeader.getTimestampInput())) {
309       errors.add("grouperHeader.timestampInput is required.");
310     }
311     
312     try {
313       DateTime.parse(grouperHeader.getTimestampInput(), ISODateTimeFormat.dateTime());
314     } catch (Exception e) {
315       errors.add("Error converting "+grouperHeader.getTimestampInput() +" to datetime using "+ISODateTimeFormat.dateTime());
316     }
317     
318     if (StringUtils.isBlank(grouperHeader.getType())) {
319       errors.add("grouperHeader.type is required.");
320     }
321     
322     if (StringUtils.isBlank(grouperHeader.getEndpoint())) {
323       errors.add("grouperHeader.endpoint is required.");
324     }
325     
326     if (StringUtils.isBlank(grouperHeader.getMessageInputUuid())) {
327       errors.add("grouperHeader.messageInputUuid is required.");
328     }
329     
330     if (StringUtils.isBlank(grouperHeader.getHttpMethod())) {
331       errors.add("grouperHeader.httpMethod is required.");
332     }
333     
334     if (StringUtils.isBlank(grouperHeader.getHttpPath())) {
335       errors.add("grouperHeader.httpPath is required.");
336     }
337     
338     if (StringUtils.isNotBlank(grouperHeader.getReplyToQueueOrTopic()) &&
339         GrouperMessageQueueType.valueOfIgnoreCase(grouperHeader.getReplyToQueueOrTopic(), false) == null) {
340       errors.add("grouperHeader.replyToQueueOrTopic can only be queue or topic.");
341     }
342   
343     return errors;
344   }
345 
346 
347   private Collection<GrouperMessage> receiveMessages(String messagingSystemName,
348       String queueOrTopicName, String routingKey, String exchangeType, String messageQueueType, Integer longPollingSeconds,
349       Map<String, Object> queueArguments, GrouperMessagingSystem grouperMessageSystem) {
350     
351     GrouperMessageReceiveParam receiveParam = new GrouperMessageReceiveParam();
352     receiveParam.assignGrouperMessageSystemName(messagingSystemName);
353     
354     GrouperMessageQueueParam queueParam = new GrouperMessageQueueParam();
355     queueParam.assignQueueOrTopicName(queueOrTopicName);
356     queueParam.assignQueueType(GrouperMessageQueueType.valueOfIgnoreCase(messageQueueType, true));
357     receiveParam.assignGrouperMessageQueueParam(queueParam);
358     
359     GrouperMessageSystemParam systemParam = new GrouperMessageSystemParam();
360     systemParam.assignMesssageSystemName(messagingSystemName);
361     
362     receiveParam.assignQueueName(queueOrTopicName);
363     
364     receiveParam.assignGrouperMessageSystemParam(systemParam);
365     receiveParam.assignLongPollMillis(longPollingSeconds*1000);
366     receiveParam.assignAutocreateObjects(true);
367     receiveParam.assignRoutingKey(routingKey);
368     receiveParam.assignExchangeType(exchangeType);
369     receiveParam.assignQueueArguments(queueArguments);
370     
371     GrouperMessageReceiveResult grouperMessageReceiveResult = grouperMessageSystem.receive(receiveParam);
372     return grouperMessageReceiveResult.getGrouperMessages();
373   } 
374   
375   /**
376    * @param jsonInput
377    * @param url
378    * @param configName
379    * @return
380    * @throws Exception
381    */
382   private WsResponse callWebService(String jsonInput, String url, String configName) throws Exception {
383       
384       GrouperHttpClientrHttpClient.html#GrouperHttpClient">GrouperHttpClient grouperHttpClient = new GrouperHttpClient();
385       
386       grouperHttpClient.assignUrl(url);
387       grouperHttpClient.assignGrouperHttpMethod(GrouperHttpMethod.post);
388 
389       GrouperLoaderConfig grouperLoaderConfig = GrouperLoaderConfig.retrieveConfig();
390       
391       String username = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".ws.username");
392       String password = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".ws.password");
393       if (StringUtils.isNotBlank(password)) {        
394         password = GrouperClientUtils.decryptFromFileIfFileExists(password, null);
395         grouperHttpClient.assignUser(username);
396         grouperHttpClient.assignPassword(password);
397       }
398       
399       String proxyUrl = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".ws.proxyUrl");
400       String proxyType = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".ws.proxyType");
401       
402       grouperHttpClient.assignProxyUrl(proxyUrl);
403       grouperHttpClient.assignProxyType(proxyType);
404 
405       String actAsSubjectSourceId = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".actAsSubjectSourceId");
406       String actAsSubjectId = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".actAsSubjectId");
407       
408       grouperHttpClient.addHeader("X-Grouper-actAsSourceId", actAsSubjectSourceId);
409       grouperHttpClient.addHeader("X-Grouper-actAsSubjectId", actAsSubjectId);
410       
411       grouperHttpClient.addHeader("Connection", "close");
412       
413       grouperHttpClient.assignBody(jsonInput);
414 
415       grouperHttpClient.addHeader("Content-type", "text/x-json; charset=UTF-8");
416 
417       grouperHttpClient.executeRequest();
418       
419       String response = grouperHttpClient.getResponseBody();
420       
421       WsResponseer/app/messaging/WsResponse.html#WsResponse">WsResponse wsResponse = new WsResponse();
422       wsResponse.setBody(response);
423       wsResponse.setHttpStatusCode(grouperHttpClient.getResponseCode());
424       //make sure a request came back
425       String successString = grouperHttpClient.getResponseHeaders().get("X-Grouper-success");
426       wsResponse.setSuccess(successString);
427       String resultCode = grouperHttpClient.getResponseHeaders().get("X-Grouper-resultCode");
428       wsResponse.setResultCode(resultCode);
429       
430       String resultCode2 = grouperHttpClient.getResponseHeaders().get("X-Grouper-resultCode2");
431       wsResponse.setResultCode2(resultCode2);
432       
433       return wsResponse;
434   }
435   
436   /**
437    * @param errors
438    * @param inputGrouperHeader
439    * @return
440    */
441   private String buildErrorResponse(Collection<String> errors, InputMessageGrouperHeader inputGrouperHeader) {
442     
443     String timestampOutput = new DateTime().toString(ISODateTimeFormat.dateTime());
444     
445     OutputMessageGrouperHeaderutputMessageGrouperHeader.html#OutputMessageGrouperHeader">OutputMessageGrouperHeader outputHeader = new OutputMessageGrouperHeader();
446     outputHeader.setMessageVersion(inputGrouperHeader.getMessageVersion());
447     outputHeader.setTimestampInput(inputGrouperHeader.getTimestampInput());
448     outputHeader.setTimestampOutput(timestampOutput);
449     outputHeader.setType("grouperMessagingFromWebService");
450     outputHeader.setEndpoint(inputGrouperHeader.getEndpoint());
451     outputHeader.setMessageInputUuid(inputGrouperHeader.getMessageInputUuid());
452     
453     outputHeader.setHttpHeaderXGrouperSuccess("F");
454     outputHeader.setHttpResponseCode(400);
455     outputHeader.setHttpHeaderXGrouperResultCode2("NONE");
456     outputHeader.setHttpHeaderXGrouperResultCode("ERROR");
457 
458     ObjectNode objectNode = GrouperUtil.jsonConvertFromObjectToObjectNode(outputHeader);
459     
460     renameKeys(objectNode);
461 
462     String header = GrouperUtil.jsonConvertTo(objectNode, false);
463     
464     String errorMessages = GrouperUtil.jsonConvertTo(errors, false);
465     
466     String finalOuput = " { \"grouperHeader\":  "+header+", \"errors\": " + errorMessages + " }" ;
467         
468     return finalOuput;
469     
470   }
471   
472   private void renameKeys(ObjectNode jsonObject) {
473     
474     jsonObject.put("httpHeader_X-Grouper-resultCode", jsonObject.get("httpHeaderXGrouperResultCode"));
475     jsonObject.put("httpHeader_X-Grouper-success", jsonObject.get("httpHeaderXGrouperSuccess"));
476     jsonObject.put("httpHeader_X-Grouper-resultCode2", jsonObject.get("httpHeaderXGrouperResultCode2"));
477     
478     jsonObject.remove("httpHeaderXGrouperResultCode");
479     jsonObject.remove("httpHeaderXGrouperSuccess");
480     jsonObject.remove("httpHeaderXGrouperResultCode2");
481     
482     
483   }
484   
485   /**
486    * @param wsResponse
487    * @param inputGrouperHeader
488    * @return
489    */
490   private String buildWsReplyToMessage(WsResponse wsResponse, InputMessageGrouperHeader inputGrouperHeader) {
491     
492     String timestampOutput = new DateTime().toString(ISODateTimeFormat.dateTime());
493     
494     OutputMessageGrouperHeaderutputMessageGrouperHeader.html#OutputMessageGrouperHeader">OutputMessageGrouperHeader outputHeader = new OutputMessageGrouperHeader();
495     outputHeader.setMessageVersion(inputGrouperHeader.getMessageVersion());
496     outputHeader.setTimestampInput(inputGrouperHeader.getTimestampInput());
497     outputHeader.setTimestampOutput(timestampOutput);
498     outputHeader.setType("grouperMessagingFromWebService");
499     outputHeader.setEndpoint(inputGrouperHeader.getEndpoint());
500     outputHeader.setMessageInputUuid(inputGrouperHeader.getMessageInputUuid());
501     outputHeader.setHttpResponseCode(wsResponse.getHttpStatusCode());
502     outputHeader.setHttpHeaderXGrouperResultCode(wsResponse.getResultCode());
503     outputHeader.setHttpHeaderXGrouperSuccess(wsResponse.getSuccess());
504     outputHeader.setHttpHeaderXGrouperResultCode2(wsResponse.getResultCode2());
505 
506     ObjectNode objectNode = GrouperUtil.jsonConvertFromObjectToObjectNode(outputHeader);
507     renameKeys(objectNode);
508     
509     String header = GrouperUtil.jsonConvertTo(objectNode, false);
510     
511     String finalOuput = " { \"grouperHeader\":  "+header+", " + wsResponse.getBody() + " }" ;
512         
513     return finalOuput;
514     
515   }
516 
517   /**
518    * @param grouperMessagingSystem
519    * @param inputGrouperHeader
520    * @param messagingSystemName
521    * @param finalOuput
522    */
523   private void sendReplyMessage(GrouperMessagingSystem grouperMessagingSystem,
524       InputMessageGrouperHeader inputGrouperHeader, String messagingSystemName,
525       String finalOuput, String routingKey) {
526     GrouperMessageSendParam sendParam = new GrouperMessageSendParam();
527     sendParam.assignAutocreateObjects(true);
528     
529     GrouperMessageQueueParam queueParam = new GrouperMessageQueueParam();
530     queueParam.assignQueueOrTopicName(inputGrouperHeader.getReplyToQueueOrTopicName());
531     queueParam.assignQueueType(GrouperMessageQueueType.valueOfIgnoreCase(inputGrouperHeader.getReplyToQueueOrTopic(), true));
532     sendParam.assignGrouperMessageQueueParam(queueParam);
533     
534     sendParam.assignGrouperMessageSystemName(messagingSystemName);
535     sendParam.assignQueueOrTopicName(inputGrouperHeader.getReplyToQueueOrTopicName());
536     sendParam.assignQueueType(GrouperMessageQueueType.valueOfIgnoreCase(inputGrouperHeader.getReplyToQueueOrTopic(), true));
537     
538     GrouperMessageSystemParam systemParam = new GrouperMessageSystemParam();
539     systemParam.assignAutocreateObjects(true);
540     systemParam.assignMesssageSystemName(messagingSystemName);
541     sendParam.assignGrouperMessageSystemParam(systemParam);
542     
543     sendParam.assignMessageBodies(Collections.singleton(finalOuput));
544     sendParam.assignRoutingKey(routingKey);
545     sendParam.assignExchangeType(inputGrouperHeader.getReplyToExchangeType());
546     sendParam.assignQueueArguments(inputGrouperHeader.getReplyToQueueArguments());
547     try {
548       grouperMessagingSystem.send(sendParam);
549     } catch (Exception e) {
550       LOG.error("Error occurred while sending reply message "+ inputGrouperHeader.getMessageInputUuid()+" to "+inputGrouperHeader.getReplyToQueueOrTopicName(), e);
551     }
552   }
553 
554 }