Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1410,10 +1410,13 @@ private void addNotificationLogBatch(List<NotificationEvent> eventList, List<Lis

event.setEventId(nextEventId);
// Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
if (event.isSetEventId()) {
if (event.isSetEventId()
&& !listenerEvent
.getParameters()
.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
listenerEvent.putParameter(
MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
Long.toString(event.getEventId()));
MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
Long.toString(event.getEventId()));
}
nextNLId++;
nextEventId++;
Expand All @@ -1431,17 +1434,19 @@ private void addNotificationLogBatch(List<NotificationEvent> eventList, List<Lis
* Process this notification by adding it to metastore DB.
*
* @param event NotificationEvent is the object written to the metastore DB.
* @param listenerEvent ListenerEvent (from which NotificationEvent was based) used only to set the
* DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
* @param listenerEvent ListenerEvent (from which NotificationEvent was based) used only to set
* the DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
*/
private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
event.setMessageFormat(msgEncoder.getMessageFormat());
LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
event.getMessage());
LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), event.getMessage());
HMSHandler.getMSForConf(conf).addNotificationEvent(event);

// Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
if (event.isSetEventId()) {
if (event.isSetEventId()
&& !listenerEvent
.getParameters()
.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
listenerEvent.putParameter(
MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
Long.toString(event.getEventId()));
Expand Down
1 change: 1 addition & 0 deletions itests/src/test/resources/testconfiguration.properties
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ minillap.query.files=\
cte_4.q,\
cttl.q,\
custom_udf_vectorization.q,\
db_notification_batch_insert.q,\
dynamic_partition_pruning_2.q,\
dynamic_semijoin_user_level.q,\
dynpart_cast.q,\
Expand Down
25 changes: 25 additions & 0 deletions ql/src/test/queries/clientpositive/db_notification_batch_insert.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
set hive.metastore.transactional.event.listeners=org.apache.hive.hcatalog.listener.DbNotificationListener;
set metastore.jdbc.max.batch.size=10;

set hive.stats.autogather=true;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.stats.reliable=true;

DROP TABLE IF EXISTS repro_batch_test;

CREATE TABLE repro_batch_test (
id INT,
name STRING
)
PARTITIONED BY (part_key INT);

INSERT INTO TABLE repro_batch_test PARTITION (part_key)
SELECT
val as id,
'dummy_data' as name,
val as part_key
FROM (
SELECT (a.pos + 1) as val
FROM (SELECT posexplode(split(repeat(',', 10), ','))) a
) dummy;
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
PREHOOK: query: DROP TABLE IF EXISTS repro_batch_test
PREHOOK: type: DROPTABLE
PREHOOK: Output: database:default
POSTHOOK: query: DROP TABLE IF EXISTS repro_batch_test
POSTHOOK: type: DROPTABLE
POSTHOOK: Output: database:default
PREHOOK: query: CREATE TABLE repro_batch_test (
id INT,
name STRING
)
PARTITIONED BY (part_key INT)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@repro_batch_test
POSTHOOK: query: CREATE TABLE repro_batch_test (
id INT,
name STRING
)
PARTITIONED BY (part_key INT)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@repro_batch_test
PREHOOK: query: INSERT INTO TABLE repro_batch_test PARTITION (part_key)
SELECT
val as id,
'dummy_data' as name,
val as part_key
FROM (
SELECT (a.pos + 1) as val
FROM (SELECT posexplode(split(repeat(',', 10), ','))) a
) dummy
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@repro_batch_test
POSTHOOK: query: INSERT INTO TABLE repro_batch_test PARTITION (part_key)
SELECT
val as id,
'dummy_data' as name,
val as part_key
FROM (
SELECT (a.pos + 1) as val
FROM (SELECT posexplode(split(repeat(',', 10), ','))) a
) dummy
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@repro_batch_test
POSTHOOK: Output: default@repro_batch_test@part_key=1
POSTHOOK: Output: default@repro_batch_test@part_key=10
POSTHOOK: Output: default@repro_batch_test@part_key=11
POSTHOOK: Output: default@repro_batch_test@part_key=2
POSTHOOK: Output: default@repro_batch_test@part_key=3
POSTHOOK: Output: default@repro_batch_test@part_key=4
POSTHOOK: Output: default@repro_batch_test@part_key=5
POSTHOOK: Output: default@repro_batch_test@part_key=6
POSTHOOK: Output: default@repro_batch_test@part_key=7
POSTHOOK: Output: default@repro_batch_test@part_key=8
POSTHOOK: Output: default@repro_batch_test@part_key=9
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=10).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=10).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=11).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=11).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=1).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=1).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=2).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=2).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=3).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=3).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=4).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=4).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=5).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=5).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=6).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=6).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=7).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=7).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=8).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=8).name SIMPLE []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=9).id SCRIPT []
POSTHOOK: Lineage: repro_batch_test PARTITION(part_key=9).name SIMPLE []
Loading