Skip to content

Commit 7506506

Browse files
Added support for enqueuing and dequeing AQ messages as JSON.
1 parent 3a13440 commit 7506506

File tree

17 files changed

+398
-144
lines changed

17 files changed

+398
-144
lines changed

doc/src/api_manual/aq.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ Queue Attributes
106106
.. attribute:: Queue.payload_type
107107

108108
This read-only attribute returns the object type for payloads that can be
109-
enqueued and dequeued. If using a raw queue, this returns the value None.
109+
enqueued and dequeued. If using a JSON queue, this returns the value
110+
``"JSON"``. If using a raw queue, this returns the value ``None``.
110111

111112
For consistency and compliance with the PEP 8 naming style, the name of
112113
the attribute was changed from `payloadType`. The old name will

doc/src/api_manual/connection.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ Connection Methods
193193

194194
The ``payload_type`` parameter, if specified, is expected to be an
195195
:ref:`object type <dbobjecttype>` that identifies the type of payload the
196-
queue expects. If not specified, RAW data is enqueued and dequeued.
196+
queue expects. If the string "JSON" is specified, JSON data is enqueued and
197+
dequeued. If not specified, RAW data is enqueued and dequeued.
197198

198199
For consistency and compliance with the PEP 8 naming style, the
199200
parameter `payloadType` was renamed to `payload_type`. The old name

doc/src/release_notes.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Thick Mode Changes
3131

3232
#) Added support for getting the message id of the AQ message which generated
3333
a notification.
34+
#) Added support for enqueuing and dequeing AQ messages as JSON.
3435
#) Added the ability to use `externalauth` as a connection parameter for
3536
standalone connections in addition to creating pools. For standalone
3637
connections, this parameter is optional.

doc/src/user_guide/aq.rst

Lines changed: 79 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,22 @@ architecture.
1818
Python-oracledb uses the updated interface for Oracle Advanced Queuing that
1919
was first introduced in cx_Oracle 7.2.
2020

21+
Starting from Oracle Database 21c, Advanced Queuing also supports the JSON
22+
payload type. To use the JSON payload type, the Oracle Client libraries must
23+
be version 21 or later.
24+
2125
There are Advanced Queuing examples in the `GitHub examples
2226
<https://github.com/oracle/python-oracledb/tree/main/samples>`__ directory.
2327

2428

2529
Creating a Queue
2630
================
2731

28-
Before being used, queues need to be created in the database, for example in
32+
Before being used, queues need to be created in the database.
33+
34+
**Using RAW Payloads**
35+
36+
Queues can be created using the RAW payload type, for example in
2937
SQL*Plus:
3038

3139
.. code-block:: sql
@@ -37,16 +45,37 @@ SQL*Plus:
3745
end;
3846
/
3947
40-
This examples creates a RAW queue suitable for sending string or raw bytes
48+
This example creates a RAW queue suitable for sending string or bytes
4149
messages.
4250

51+
**Using JSON Payloads**
52+
53+
Also, queues can be created using the JSON payload type. For example,
54+
in SQL*Plus:
55+
56+
.. code-block:: sql
57+
58+
begin
59+
dbms_aqadm.create_queue_table('JSON_QUEUE_TABLE', 'JSON');
60+
dbms_aqadm.create_queue('DEMO_JSON_QUEUE', 'JSON_QUEUE_TABLE');
61+
dbms_aqadm.start_queue('DEMO_JSON_QUEUE');
62+
end;
63+
/
64+
65+
This example creates a JSON queue suitable for sending JSON data
66+
messages.
4367

4468
Enqueuing Messages
4569
==================
4670

4771
To send messages in Python, you connect and get a :ref:`queue <queue>`. The
4872
queue can be used for enqueuing, dequeuing, or both as needed.
4973

74+
**Using RAW Payloads**
75+
76+
You can connect to the database and get the queue that was created with RAW
77+
payload type by using:
78+
5079
.. code-block:: python
5180
5281
queue = connection.queue("DEMO_RAW_QUEUE")
@@ -66,24 +95,63 @@ messages:
6695
connection.commit()
6796
6897
Since the queue sending the messages is a RAW queue, the strings in this
69-
example will be internally encoded to bytes using :attr:`Connection.encoding`
98+
example will be internally encoded to bytes using ``message.encode()``
7099
before being enqueued.
71100

101+
**Using JSON Payloads**
102+
103+
You can connect to the database and get the queue that was created with JSON
104+
payload type by using:
105+
106+
.. code-block:: python
107+
108+
queue = connection.queue("DEMO_JSON_QUEUE", "JSON")
109+
# The second argument (JSON) indicates that the queue is of JSON payload type.
110+
111+
Now the message can be enqueued using :meth:`~Queue.enqone()`.
112+
113+
.. code-block:: python
114+
115+
json_data = [
116+
[
117+
2.75,
118+
True,
119+
'Ocean Beach',
120+
b'Some bytes',
121+
{'keyA': 1.0, 'KeyB': 'Melbourne'},
122+
datetime.datetime(2022, 8, 1, 0, 0)
123+
],
124+
dict(name="John", age=30, city="New York")
125+
]
126+
for data in json_data:
127+
queue.enqone(connection.msgproperties(payload=data))
128+
connection.commit()
72129
73130
Dequeuing Messages
74131
==================
75132

76133
Dequeuing is performed similarly. To dequeue a message call the method
77-
:meth:`~Queue.deqone()` as shown. Note that if the message is expected to be a
78-
string, the bytes must be decoded using :attr:`Connection.encoding`.
134+
:meth:`~Queue.deqone()` as shown in the examples below.
135+
136+
**Using RAW Payload Type**
79137

80138
.. code-block:: python
81139
82140
queue = connection.queue("DEMO_RAW_QUEUE")
83-
msg = queue.deqOne()
141+
message = queue.deqOne()
84142
connection.commit()
85-
print(msg.payload.decode(connection.encoding))
143+
print(message.payload.decode())
144+
145+
Note that if the message is expected to be a string, the bytes must
146+
be decoded using ``message.payload.decode()``, as shown.
147+
148+
**Using JSON Payload Type**
86149

150+
.. code-block:: python
151+
152+
queue = connection.queue("DEMO_JSON_QUEUE", "JSON")
153+
message = queue.deqOne()
154+
connection.commit()
87155
88156
Using Object Queues
89157
===================
@@ -133,9 +201,9 @@ Dequeuing is done like this:
133201
book_type = connection.gettype("UDT_BOOK")
134202
queue = connection.queue("DEMO_BOOK_QUEUE", book_type)
135203
136-
msg = queue.deqone()
204+
message = queue.deqone()
137205
connection.commit()
138-
print(msg.payload.TITLE) # will print Quick Brown Fox
206+
print(message.payload.TITLE) # will print Quick Brown Fox
139207
140208
141209
Using Recipient Lists
@@ -241,8 +309,8 @@ time:
241309

242310
.. code-block:: python
243311
244-
for m in queue.deqmany(10):
245-
print(m.payload.decode(connection.encoding))
312+
for message in queue.deqmany(10):
313+
print(message.payload.decode())
246314
247315
Depending on the queue properties and the number of messages available to
248316
dequeue, this code will print out from zero to ten messages.

src/oracledb/aq.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,11 @@ def payload_type(self) -> Union[DbObjectType, None]:
175175
If using a raw queue, this returns the value None.
176176
"""
177177
if self._payload_type is None:
178-
self._payload_type = \
179-
DbObjectType._from_impl(self._impl.payload_type)
178+
if self._impl.is_json:
179+
self._payload_type = "JSON"
180+
elif self._impl.payload_type is not None:
181+
self._payload_type = \
182+
DbObjectType._from_impl(self._impl.payload_type)
180183
return self._payload_type
181184

182185
@property
@@ -488,16 +491,16 @@ def payload(self) -> Union[bytes, DbObject]:
488491
return self._impl.payload
489492

490493
@payload.setter
491-
def payload(self, value: Union[bytes, DbObject]) -> None:
494+
def payload(self, value: object) -> None:
492495
if isinstance(value, DbObject):
493496
self._impl.set_payload_object(value._impl)
497+
elif not isinstance(value, (str, bytes)):
498+
self._impl.set_payload_json(value)
494499
else:
495500
if isinstance(value, str):
496501
value_bytes = value.encode()
497502
elif isinstance(value, bytes):
498503
value_bytes = value
499-
else:
500-
raise TypeError("payload must be a DbObject, string or bytes")
501504
self._impl.set_payload_bytes(value_bytes)
502505
self._impl.payload = value
503506

src/oracledb/base_impl.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,7 @@ cdef class BaseQueueImpl:
395395
readonly BaseDbObjectTypeImpl payload_type
396396
readonly BaseDeqOptionsImpl deq_options_impl
397397
readonly BaseEnqOptionsImpl enq_options_impl
398+
readonly bint is_json
398399

399400

400401
cdef class BaseDeqOptionsImpl:

src/oracledb/connection.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ def prepare(self) -> bool:
603603
"""
604604
return self.tpc_prepare()
605605

606-
def queue(self, name: str, payload_type: DbObjectType=None, *,
606+
def queue(self, name: str, payload_type: [DbObjectType, str]=None, *,
607607
payloadType: DbObjectType=None) -> Queue:
608608
"""
609609
Creates and returns a queue which is used to enqueue and dequeue
@@ -612,24 +612,29 @@ def queue(self, name: str, payload_type: DbObjectType=None, *,
612612
The name parameter is expected to be a string identifying the queue in
613613
which messages are to be enqueued or dequeued.
614614
615-
The payload_type parameter, if specified, is expected to be an object
616-
type that identifies the type of payload the queue expects. If not
617-
specified, RAW data is enqueued and dequeued.
615+
The payload_type parameter, if specified, is expected to be an
616+
object type that identifies the type of payload the queue expects.
617+
If the string "JSON" is specified, JSON data is enqueued and dequeued.
618+
If not specified, RAW data is enqueued and dequeued.
618619
"""
619620
self._verify_connected()
620621
payload_type_impl = None
622+
is_json = False
621623
if payloadType is not None:
622624
if payload_type is not None:
623625
errors._raise_err(errors.ERR_DUPLICATED_PARAMETER,
624626
deprecated_name="payloadType",
625627
new_name="payload_type")
626628
payload_type = payloadType
627629
if payload_type is not None:
628-
if not isinstance(payload_type, DbObjectType):
630+
if payload_type == "JSON":
631+
is_json = True
632+
elif not isinstance(payload_type, DbObjectType):
629633
raise TypeError("expecting DbObjectType")
630-
payload_type_impl = payload_type._impl
634+
else:
635+
payload_type_impl = payload_type._impl
631636
impl = self._impl.create_queue_impl()
632-
impl.initialize(self._impl, name, payload_type_impl)
637+
impl.initialize(self._impl, name, payload_type_impl, is_json)
633638
return Queue._from_impl(self, impl)
634639

635640
def rollback(self) -> None:

src/oracledb/impl/base/queue.pyx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ cdef class BaseQueueImpl:
4949

5050
@utils.CheckImpls("initializing a queue")
5151
def initialize(self, BaseConnImpl conn_impl, str name,
52-
BaseDbObjectImpl payload_type):
52+
BaseDbObjectImpl payload_type, bint is_json):
5353
pass
5454

5555

src/oracledb/impl/thick/connection.pyx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ cdef class ThickConnImpl(BaseConnImpl):
404404
def create_msg_props_impl(self):
405405
cdef ThickMsgPropsImpl impl
406406
impl = ThickMsgPropsImpl.__new__(ThickMsgPropsImpl)
407+
impl._conn_impl = self
407408
if dpiConn_newMsgProps(self._handle, &impl._handle) < 0:
408409
_raise_from_odpi()
409410
return impl

0 commit comments

Comments
 (0)