From 39855ae8badbbadd7cc3be33ccedbeae96dfe595 Mon Sep 17 00:00:00 2001 From: brian Date: Tue, 26 Sep 2023 15:07:42 +0200 Subject: [PATCH] Synchronising MERGE INTO alf event inbound queue. This fixes #1 --- .../xservices/ws/impl/MiscServiceImpl.java | 81 ++++++++++--------- src/main/resources/ddl/BRTX_schema.ddl | 8 +- 2 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/main/java/net/brutex/xservices/ws/impl/MiscServiceImpl.java b/src/main/java/net/brutex/xservices/ws/impl/MiscServiceImpl.java index 6420065..440c3a9 100644 --- a/src/main/java/net/brutex/xservices/ws/impl/MiscServiceImpl.java +++ b/src/main/java/net/brutex/xservices/ws/impl/MiscServiceImpl.java @@ -16,6 +16,27 @@ package net.brutex.xservices.ws.impl; +import static org.quartz.TriggerBuilder.newTrigger; + +import java.io.StringReader; +import java.io.StringWriter; +import java.math.BigInteger; +import java.sql.*; +import java.time.Instant; +import java.util.Date; +import java.util.Enumeration; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Resource; +import javax.jws.WebService; +import javax.servlet.ServletContext; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.ws.WebServiceContext; +import javax.xml.ws.handler.MessageContext; import lombok.extern.slf4j.Slf4j; import net.brutex.xservices.types.*; import net.brutex.xservices.types.alfevent.ALFEventResponseType; @@ -32,31 +53,6 @@ import org.apache.tools.ant.taskdefs.Sleep; import org.apache.tools.ant.taskdefs.email.EmailTask; import org.h2.jdbcx.JdbcConnectionPool; import org.quartz.*; -import org.quartz.impl.StdSchedulerFactory; - -import javax.annotation.Resource; -import javax.jws.WebService; -import javax.servlet.ServletContext; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.namespace.QName; -import javax.xml.ws.WebServiceContext; -import javax.xml.ws.handler.MessageContext; -import java.io.StringReader; -import java.io.StringWriter; -import java.math.BigInteger; -import java.sql.*; -import java.time.Instant; -import java.util.Date; -import java.util.Enumeration; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - -import static org.quartz.TriggerBuilder.newTrigger; - /** * Implements the web service @@ -223,19 +219,20 @@ public class MiscServiceImpl return BigInteger.valueOf(rows); } + final Object serializedDbAccess = new Object(); @Override public ALFEventResponseType mergeALFEvent(ALFEventType event) throws XServicesFault { final Instant d = Instant.now(); final long ts = d.toEpochMilli(); - MessageContext cont = context.getMessageContext(); - //Get Parameters from the Servlet Context + MessageContext cont = context.getMessageContext(); final ServletContext servletContext = - (ServletContext) context.getMessageContext().get(MessageContext.SERVLET_CONTEXT); + (ServletContext) cont.get(MessageContext.SERVLET_CONTEXT); final EventmanagerConfiguration conf = (EventmanagerConfiguration) servletContext .getAttribute(EventmanagerConfiguration.KEY); + final JdbcConnectionPool pool = (JdbcConnectionPool) servletContext.getAttribute("mdbConnection"); final JdbcConnectionPool fpool = (JdbcConnectionPool) servletContext.getAttribute("fdbConnection"); final AtomicLong egres_counter = (AtomicLong) servletContext.getAttribute("egres_counter"); @@ -256,14 +253,14 @@ public class MiscServiceImpl final String mergeStatememt = "SELECT btx_id FROM OLD TABLE (MERGE INTO brutex.tbl_events " + "KEY (btx_event_type, btx_obj_type, btx_obj_id) " + - "VALUES (?,?,?,?,?,?));"; + "VALUES (?,?,?,?,?,?))"; final String insertAll = "INSERT INTO brutex.tbl_events_all VALUES (?,?,?,?,?,?,?)"; + try (Connection con = pool.getConnection()) { - try (Connection con = pool.getConnection()) { Marshaller m = JAXBContext.newInstance(ALFEventType.class).createMarshaller(); - m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); + //m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); m.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); m.setProperty(Marshaller.JAXB_ENCODING, "UTF-8"); JAXBElement e = of.createALFEventNoticeDoc(event); @@ -280,6 +277,7 @@ public class MiscServiceImpl sb.append(""); sb.append(""); + con.setAutoCommit(false); PreparedStatement prep = con.prepareStatement(mergeStatememt); prep.setString(1, eventType); @@ -287,9 +285,20 @@ public class MiscServiceImpl prep.setString(3, objectType); prep.setString(4, objectId); prep.setLong(5, ts); - prep.setClob(6, new StringReader(sb.toString())); - ResultSet r = prep.executeQuery(); - con.commit(); + //prep.setClob(6, new StringReader(sb.toString())); + ResultSet r = null; + prep.setString(6, sb.toString()); + + /* We need to synchronize MERGE INTO statements, as it does not seem + like H2 is able to execute them properly in a heavy multi-threaded + environment resulting those statements to insert with the same key + twice (and then throw KEY violation errors. */ + + synchronized (serializedDbAccess) { + r = prep.executeQuery(); + con.commit(); + } + String supersed_id = null; if (r.next()) { supersed_id = r.getString(1); @@ -308,11 +317,11 @@ public class MiscServiceImpl prep.setString(4, objectId); prep.setLong(5, ts); prep.setString(6, supersed_id); - prep.setClob(7, new StringReader(sb.toString())); + prep.setString(7, sb.toString()); prep.execute(); con.commit(); - con.close(); + //con.close(); ingres_counter.incrementAndGet(); diff --git a/src/main/resources/ddl/BRTX_schema.ddl b/src/main/resources/ddl/BRTX_schema.ddl index 2f1dbc1..a6833db 100644 --- a/src/main/resources/ddl/BRTX_schema.ddl +++ b/src/main/resources/ddl/BRTX_schema.ddl @@ -1,7 +1,7 @@ -- Create Schema for Brutex CREATE SCHEMA IF NOT EXISTS brutex; -CREATE TABLE IF NOT EXISTS brutex.tbl_events +CREATE MEMORY TABLE IF NOT EXISTS brutex.tbl_events ( btx_event_type VARCHAR(128) NOT NULL, btx_id VARCHAR(128) NOT NULL, @@ -15,7 +15,7 @@ CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events (btx_timestamp CREATE INDEX IF NOT EXISTS brutex.btx_idx_id ON brutex.tbl_events (btx_id); -CREATE TABLE IF NOT EXISTS brutex.tbl_events_snap +CREATE MEMORY TABLE IF NOT EXISTS brutex.tbl_events_snap ( btx_event_type VARCHAR(128) NOT NULL, btx_id VARCHAR(128) NOT NULL, @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS brutex.tbl_events_snap CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events_snap (btx_timestamp ASC); -CREATE TABLE IF NOT EXISTS brutex.tbl_events_errors +CREATE CACHED TABLE IF NOT EXISTS brutex.tbl_events_errors ( btx_event_type VARCHAR(128) NOT NULL, btx_id VARCHAR(128) NOT NULL, @@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS brutex.tbl_events_errors CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events_errors (btx_timestamp ASC); CREATE INDEX IF NOT EXISTS brutex.btx_idx_retry ON brutex.tbl_events_errors (btx_retry); -CREATE TABLE IF NOT EXISTS brutex.tbl_events_all +CREATE CACHED TABLE IF NOT EXISTS brutex.tbl_events_all ( btx_event_type VARCHAR(128) NOT NULL, btx_id VARCHAR(128) NOT NULL,