1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package ca.uhn.hl7v2.protocol.impl;
29
30 import javax.jms.Connection;
31 import javax.jms.JMSException;
32 import javax.jms.Message;
33 import javax.jms.Session;
34 import javax.jms.Topic;
35 import javax.jms.TopicConnection;
36 import javax.jms.TopicPublisher;
37 import javax.jms.TopicSession;
38 import javax.jms.TopicSubscriber;
39
40 import ca.uhn.hl7v2.protocol.TransportException;
41
42
43
44
45
46
47
48 public class JMSTopicTransport extends AbstractJMSTransport {
49
50 private TopicSession mySendingSession;
51 private TopicSession myReceivingSession;
52 private TopicPublisher myPublisher;
53 private TopicSubscriber mySubscriber;
54 private final TopicConnection myConnection;
55 private final Topic myTopic;
56 private String myMessageSelector;
57
58
59
60
61
62
63 public JMSTopicTransport(TopicConnection theConnection, Topic theDestination) {
64 myConnection = theConnection;
65 myTopic = theDestination;
66 }
67
68
69
70
71
72
73
74 public JMSTopicTransport(TopicConnection theConnection, Topic theDestination, String theMessageSelector) {
75 myConnection = theConnection;
76 myTopic = theDestination;
77 myMessageSelector = theMessageSelector;
78 }
79
80
81
82
83 protected String getDestinationName() throws JMSException {
84 return myTopic.getTopicName();
85 }
86
87
88
89
90 public Connection getConnection() {
91 return myConnection;
92 }
93
94
95
96
97 protected Message getMessage() throws JMSException {
98 return mySendingSession.createTextMessage();
99 }
100
101
102
103
104 protected void sendJMS(Message theMessage) throws JMSException {
105 myPublisher.publish(theMessage);
106 }
107
108
109
110
111 protected Message receiveJMS() throws JMSException {
112 return mySubscriber.receive();
113 }
114
115 public void doConnect() throws TransportException {
116 boolean transacted = false;
117 int ackMode = Session.AUTO_ACKNOWLEDGE;
118
119 doDisconnect();
120 try {
121 mySendingSession = myConnection.createTopicSession(transacted, ackMode);
122 myPublisher = mySendingSession.createPublisher(myTopic);
123
124 myReceivingSession = myConnection.createTopicSession(transacted, ackMode);
125 mySubscriber = myReceivingSession.createSubscriber(myTopic);
126 } catch (JMSException e) {
127 throw new TransportException(e);
128 }
129 }
130
131
132
133
134 public void doDisconnect() throws TransportException {
135 try {
136 if (mySendingSession != null) {
137 mySendingSession.close();
138 }
139 if (myReceivingSession != null) {
140 myReceivingSession.close();
141 }
142 } catch (JMSException e) {
143 throw new TransportException(e);
144 }
145 }
146
147 }