Synchronising MERGE INTO alf event inbound queue. This fixes #1

master
Brian Rosenberger 2023-09-26 15:07:42 +02:00
parent adcefa1bba
commit 39855ae8ba
2 changed files with 49 additions and 40 deletions

View File

@ -16,6 +16,27 @@
package net.brutex.xservices.ws.impl; package net.brutex.xservices.ws.impl;
import static org.quartz.TriggerBuilder.newTrigger;
import java.io.StringReader;
import java.io.StringWriter;
import java.math.BigInteger;
import java.sql.*;
import java.time.Instant;
import java.util.Date;
import java.util.Enumeration;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Resource;
import javax.jws.WebService;
import javax.servlet.ServletContext;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.ws.WebServiceContext;
import javax.xml.ws.handler.MessageContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.brutex.xservices.types.*; import net.brutex.xservices.types.*;
import net.brutex.xservices.types.alfevent.ALFEventResponseType; import net.brutex.xservices.types.alfevent.ALFEventResponseType;
@ -32,31 +53,6 @@ import org.apache.tools.ant.taskdefs.Sleep;
import org.apache.tools.ant.taskdefs.email.EmailTask; import org.apache.tools.ant.taskdefs.email.EmailTask;
import org.h2.jdbcx.JdbcConnectionPool; import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*; import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import javax.annotation.Resource;
import javax.jws.WebService;
import javax.servlet.ServletContext;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.namespace.QName;
import javax.xml.ws.WebServiceContext;
import javax.xml.ws.handler.MessageContext;
import java.io.StringReader;
import java.io.StringWriter;
import java.math.BigInteger;
import java.sql.*;
import java.time.Instant;
import java.util.Date;
import java.util.Enumeration;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import static org.quartz.TriggerBuilder.newTrigger;
/** /**
* Implements the web service * Implements the web service
@ -223,19 +219,20 @@ public class MiscServiceImpl
return BigInteger.valueOf(rows); return BigInteger.valueOf(rows);
} }
final Object serializedDbAccess = new Object();
@Override @Override
public ALFEventResponseType mergeALFEvent(ALFEventType event) throws XServicesFault { public ALFEventResponseType mergeALFEvent(ALFEventType event) throws XServicesFault {
final Instant d = Instant.now(); final Instant d = Instant.now();
final long ts = d.toEpochMilli(); final long ts = d.toEpochMilli();
MessageContext cont = context.getMessageContext();
//Get Parameters from the Servlet Context //Get Parameters from the Servlet Context
MessageContext cont = context.getMessageContext();
final ServletContext servletContext = final ServletContext servletContext =
(ServletContext) context.getMessageContext().get(MessageContext.SERVLET_CONTEXT); (ServletContext) cont.get(MessageContext.SERVLET_CONTEXT);
final EventmanagerConfiguration conf = (EventmanagerConfiguration) servletContext final EventmanagerConfiguration conf = (EventmanagerConfiguration) servletContext
.getAttribute(EventmanagerConfiguration.KEY); .getAttribute(EventmanagerConfiguration.KEY);
final JdbcConnectionPool pool = (JdbcConnectionPool) servletContext.getAttribute("mdbConnection"); final JdbcConnectionPool pool = (JdbcConnectionPool) servletContext.getAttribute("mdbConnection");
final JdbcConnectionPool fpool = (JdbcConnectionPool) servletContext.getAttribute("fdbConnection"); final JdbcConnectionPool fpool = (JdbcConnectionPool) servletContext.getAttribute("fdbConnection");
final AtomicLong egres_counter = (AtomicLong) servletContext.getAttribute("egres_counter"); final AtomicLong egres_counter = (AtomicLong) servletContext.getAttribute("egres_counter");
@ -256,14 +253,14 @@ public class MiscServiceImpl
final String mergeStatememt = "SELECT btx_id FROM OLD TABLE (MERGE INTO brutex.tbl_events " + final String mergeStatememt = "SELECT btx_id FROM OLD TABLE (MERGE INTO brutex.tbl_events " +
"KEY (btx_event_type, btx_obj_type, btx_obj_id) " + "KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
"VALUES (?,?,?,?,?,?));"; "VALUES (?,?,?,?,?,?))";
final String insertAll = "INSERT INTO brutex.tbl_events_all VALUES (?,?,?,?,?,?,?)"; final String insertAll = "INSERT INTO brutex.tbl_events_all VALUES (?,?,?,?,?,?,?)";
try (Connection con = pool.getConnection()) { try (Connection con = pool.getConnection()) {
Marshaller m = JAXBContext.newInstance(ALFEventType.class).createMarshaller(); Marshaller m = JAXBContext.newInstance(ALFEventType.class).createMarshaller();
m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); //m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
m.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); m.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
m.setProperty(Marshaller.JAXB_ENCODING, "UTF-8"); m.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
JAXBElement<ALFEventType> e = of.createALFEventNoticeDoc(event); JAXBElement<ALFEventType> e = of.createALFEventNoticeDoc(event);
@ -280,6 +277,7 @@ public class MiscServiceImpl
sb.append("</soapenv:Body>"); sb.append("</soapenv:Body>");
sb.append("</soapenv:Envelope>"); sb.append("</soapenv:Envelope>");
con.setAutoCommit(false);
PreparedStatement prep = con.prepareStatement(mergeStatememt); PreparedStatement prep = con.prepareStatement(mergeStatememt);
prep.setString(1, eventType); prep.setString(1, eventType);
@ -287,9 +285,20 @@ public class MiscServiceImpl
prep.setString(3, objectType); prep.setString(3, objectType);
prep.setString(4, objectId); prep.setString(4, objectId);
prep.setLong(5, ts); prep.setLong(5, ts);
prep.setClob(6, new StringReader(sb.toString())); //prep.setClob(6, new StringReader(sb.toString()));
ResultSet r = prep.executeQuery(); ResultSet r = null;
prep.setString(6, sb.toString());
/* We need to synchronize MERGE INTO statements, as it does not seem
like H2 is able to execute them properly in a heavy multi-threaded
environment resulting those statements to insert with the same key
twice (and then throw KEY violation errors. */
synchronized (serializedDbAccess) {
r = prep.executeQuery();
con.commit(); con.commit();
}
String supersed_id = null; String supersed_id = null;
if (r.next()) { if (r.next()) {
supersed_id = r.getString(1); supersed_id = r.getString(1);
@ -308,11 +317,11 @@ public class MiscServiceImpl
prep.setString(4, objectId); prep.setString(4, objectId);
prep.setLong(5, ts); prep.setLong(5, ts);
prep.setString(6, supersed_id); prep.setString(6, supersed_id);
prep.setClob(7, new StringReader(sb.toString())); prep.setString(7, sb.toString());
prep.execute(); prep.execute();
con.commit(); con.commit();
con.close(); //con.close();
ingres_counter.incrementAndGet(); ingres_counter.incrementAndGet();

View File

@ -1,7 +1,7 @@
-- Create Schema for Brutex -- Create Schema for Brutex
CREATE SCHEMA IF NOT EXISTS brutex; CREATE SCHEMA IF NOT EXISTS brutex;
CREATE TABLE IF NOT EXISTS brutex.tbl_events CREATE MEMORY TABLE IF NOT EXISTS brutex.tbl_events
( (
btx_event_type VARCHAR(128) NOT NULL, btx_event_type VARCHAR(128) NOT NULL,
btx_id VARCHAR(128) NOT NULL, btx_id VARCHAR(128) NOT NULL,
@ -15,7 +15,7 @@ CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events (btx_timestamp
CREATE INDEX IF NOT EXISTS brutex.btx_idx_id ON brutex.tbl_events (btx_id); CREATE INDEX IF NOT EXISTS brutex.btx_idx_id ON brutex.tbl_events (btx_id);
CREATE TABLE IF NOT EXISTS brutex.tbl_events_snap CREATE MEMORY TABLE IF NOT EXISTS brutex.tbl_events_snap
( (
btx_event_type VARCHAR(128) NOT NULL, btx_event_type VARCHAR(128) NOT NULL,
btx_id VARCHAR(128) NOT NULL, btx_id VARCHAR(128) NOT NULL,
@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS brutex.tbl_events_snap
CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events_snap (btx_timestamp ASC); CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events_snap (btx_timestamp ASC);
CREATE TABLE IF NOT EXISTS brutex.tbl_events_errors CREATE CACHED TABLE IF NOT EXISTS brutex.tbl_events_errors
( (
btx_event_type VARCHAR(128) NOT NULL, btx_event_type VARCHAR(128) NOT NULL,
btx_id VARCHAR(128) NOT NULL, btx_id VARCHAR(128) NOT NULL,
@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS brutex.tbl_events_errors
CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events_errors (btx_timestamp ASC); CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events_errors (btx_timestamp ASC);
CREATE INDEX IF NOT EXISTS brutex.btx_idx_retry ON brutex.tbl_events_errors (btx_retry); CREATE INDEX IF NOT EXISTS brutex.btx_idx_retry ON brutex.tbl_events_errors (btx_retry);
CREATE TABLE IF NOT EXISTS brutex.tbl_events_all CREATE CACHED TABLE IF NOT EXISTS brutex.tbl_events_all
( (
btx_event_type VARCHAR(128) NOT NULL, btx_event_type VARCHAR(128) NOT NULL,
btx_id VARCHAR(128) NOT NULL, btx_id VARCHAR(128) NOT NULL,