diff --git a/src/main/java/net/brutex/xservices/util/LockCleanerJob.java b/src/main/java/net/brutex/xservices/util/LockCleanerJob.java new file mode 100644 index 0000000..cd1df01 --- /dev/null +++ b/src/main/java/net/brutex/xservices/util/LockCleanerJob.java @@ -0,0 +1,51 @@ +package net.brutex.xservices.util; + +import java.sql.*; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.h2.jdbcx.JdbcConnectionPool; +import org.quartz.*; + +@Slf4j +@DisallowConcurrentExecution +public class LockCleanerJob implements Job, InterruptableJob { + + private final AtomicBoolean isInterrupted = new AtomicBoolean(false); + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + log.debug("LockCleaner is executing now."); + final Instant d = Instant.now(); + final long ts = d.toEpochMilli(); + final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection"); + final String deleteMemTable = "DELETE FROM brutex.tbl_lock where btx_validity < " + (ts-5000); + + try (Connection mcon = mpool.getConnection()){ + Statement stmt = mcon.createStatement(); + stmt.execute(deleteMemTable); + int count = stmt.getUpdateCount(); + log.debug("LockCleaner deleted '{}' stale locks.", count); + + } catch (SQLException e) { + log.error("Exception in SQL execution: {}", e.getMessage()); + throw new JobExecutionException(e); + } + } + + + /** + *

+ * Called by the {@link Scheduler} when a user + * interrupts the Job. + *

+ * + * @throws UnableToInterruptJobException if there is an exception while interrupting the job. + */ + @Override + public synchronized void interrupt() throws UnableToInterruptJobException { + isInterrupted.set(true); + log.warn("LockCleanerJob received an interrupt."); + Thread.currentThread().interrupt(); + } +} diff --git a/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java b/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java index f617250..aa53901 100644 --- a/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java +++ b/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java @@ -185,6 +185,8 @@ public class MiscServiceServletContextListener implements ServletContextListener if(configuration.getCleaner_interval()>0) { startEventLogCleaner((Scheduler) context.getAttribute("scheduler")); } + + startLockCleaner((Scheduler) context.getAttribute("scheduler")); } private String getLinkSQL() { @@ -235,6 +237,31 @@ public class MiscServiceServletContextListener implements ServletContextListener } } + private void startLockCleaner(Scheduler scheduler) { + try { + if (!scheduler.checkExists(JobKey.jobKey("LockCleaner"))) { + JobDetail job2 = JobBuilder.newJob(LockCleanerJob.class).withIdentity("LockCleaner").build(); + job2.getJobDataMap().put("mdbConnection", mempool); + SimpleTrigger t = + (SimpleTrigger) + newTrigger() + .withIdentity("LockCleaner") + .withSchedule( + SimpleScheduleBuilder.simpleSchedule() + .withIntervalInMinutes(2) + .repeatForever()) + .startAt( + Date.from( + Instant.now() + .plus(2, ChronoUnit.MINUTES))) + .build(); + scheduler.scheduleJob(job2, t); + } + } catch (SchedulerException ex) { + log.error("Could not start LockCleaner: {}", ex.getMessage()); + } + } + private void readConfiguration(ServletContext ctx) { /* Configure ServletContext attributes using configuration object*/ EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig(); diff --git a/src/main/java/net/brutex/xservices/ws/MiscService.java b/src/main/java/net/brutex/xservices/ws/MiscService.java index 9777792..9b8c1be 100644 --- a/src/main/java/net/brutex/xservices/ws/MiscService.java +++ b/src/main/java/net/brutex/xservices/ws/MiscService.java @@ -72,7 +72,14 @@ public interface MiscService { @WebMethod(operationName="lock") @WSDLDocumentation("Get a lock.") public abstract BigInteger lock(@WebParam(name="id") @XmlElement(nillable = false) String id, - @WebParam(name="objectId") @XmlElement(nillable = false) String objectId) throws XServicesFault; + @WebParam(name="objectId") @XmlElement(nillable = false) String objectId, + @WebParam(name="validity") BigInteger validity) throws XServicesFault; + + @WebMethod(operationName="unlock") + @WSDLDocumentation("Remove a lock.") + public abstract Boolean unlock( @WebParam(name="id") @XmlElement(nillable = false) String id, + @WebParam(name="objectId") @XmlElement(nillable = false) String objectId) + throws XServicesFault; @WebMethod(operationName="", action = "EventNotice") @SOAPBinding(use = SOAPBinding.Use.LITERAL, style = SOAPBinding.Style.DOCUMENT, parameterStyle = SOAPBinding.ParameterStyle.BARE) diff --git a/src/main/java/net/brutex/xservices/ws/impl/MiscServiceImpl.java b/src/main/java/net/brutex/xservices/ws/impl/MiscServiceImpl.java index c9605aa..0ae08e5 100644 --- a/src/main/java/net/brutex/xservices/ws/impl/MiscServiceImpl.java +++ b/src/main/java/net/brutex/xservices/ws/impl/MiscServiceImpl.java @@ -63,6 +63,7 @@ import org.quartz.*; public class MiscServiceImpl implements MiscService { + final Object serializedDbAccess = new Object(); @Resource private WebServiceContext context; @@ -167,58 +168,83 @@ public class MiscServiceImpl return new RuntimeInfoType(); } + @Override - public BigInteger lock(String id, String objectId) throws XServicesFault { - - - final String conString = "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=10;" + - "INIT=CREATE SCHEMA IF NOT EXISTS brutex\\;" + - // "SET SCHEMA brutex\\;" + - "CREATE SEQUENCE IF NOT EXISTS brutex.btx_sequence1\\;" + - "CREATE TABLE IF NOT EXISTS brutex.tbl_lock (btx_seq BIGINT NOT NULL, btx_id VARCHAR(100) NOT NULL, btx_obj_id VARCHAR(100) NOT NULL, btx_timestamp BIGINT NOT NULL);"; - - //JdbcConnectionPool cp = JdbcConnectionPool.create(conString, "sa", ""); - //cp.setMaxConnections(1); - - Connection con = null; - long rows = 0L; - final long ts = new Date().getTime(); - try { - Class.forName("org.h2.Driver"); //Java 1.8 - con = DriverManager.getConnection(conString); - PreparedStatement prep = con.prepareStatement( - "SELECT btx_id from brutex.tbl_lock where btx_obj_id=? ORDER BY btx_seq DESC"); - prep.setString(1, objectId); - - ResultSet rs = prep.executeQuery(); - StringBuffer bf = new StringBuffer(); - while (rs.next()) { - //bf.append(rs.getString(1)); - rows++; - } - rs.close(); - - prep = con.prepareStatement("INSERT INTO brutex.tbl_lock values (NEXT VALUE FOR brutex.btx_sequence1, ?, ?, ?)"); + public Boolean unlock(String id, String objectId) throws XServicesFault { + MessageContext cont = context.getMessageContext(); + final ServletContext servletContext = + (ServletContext) cont.get(MessageContext.SERVLET_CONTEXT); + final JdbcConnectionPool pool = (JdbcConnectionPool) servletContext.getAttribute("mdbConnection"); + try(Connection con = pool.getConnection()) { + PreparedStatement prep = con.prepareStatement("DELETE FROM brutex.tbl_lock where btx_id = ? AND btx_obj_id = ?"); prep.setString(1, id); prep.setString(2, objectId); - prep.setLong(3, ts); prep.execute(); - - prep = con.prepareStatement("DELETE from brutex.tbl_lock WHERE btx_timestamp < ?"); - prep.setLong(1, ts - 10000); - prep.execute(); - prep.close(); - - con.close(); - //System.out.println(bf); - } catch (SQLException | ClassNotFoundException e) { + if(prep.getUpdateCount()>0) return Boolean.TRUE; + } catch (SQLException e) { throw new XServicesFault(e); } + return Boolean.FALSE; + } + final Object lockObj = new Object(); + @Override + public BigInteger lock(String id, String objectId, BigInteger validity) throws XServicesFault { + MessageContext cont = context.getMessageContext(); + final ServletContext servletContext = + (ServletContext) cont.get(MessageContext.SERVLET_CONTEXT); + + if(validity == null || validity.longValue()<0) validity = BigInteger.valueOf(60000*60*12); + + final JdbcConnectionPool pool = (JdbcConnectionPool) servletContext.getAttribute("mdbConnection"); + final long ts = Instant.now().toEpochMilli(); + final long validUntil = ts + validity.longValue(); + + long rows = 0L; + + try (Connection con = pool.getConnection()) { + PreparedStatement prep = + con.prepareStatement( + "SELECT btx_id from brutex.tbl_lock " + + " where btx_obj_id = ? " + + " AND btx_validity >= ? ORDER BY btx_seq asc"); + PreparedStatement prep2 = con.prepareStatement( + "INSERT INTO brutex.tbl_lock values (NEXT VALUE FOR brutex.btx_sequence1, ?, ?, ?, ?)"); + prep.setString(1, objectId); + prep.setLong(2, ts); + + prep2.setString(1, id); + prep2.setString(2, objectId); + prep2.setLong(3, ts); + prep2.setLong(4, validUntil); + + ResultSet rs; + synchronized (lockObj) { + rs = prep.executeQuery(); + if (rs.next()) { + /* lock already set and first in queue */ + String lock_id = rs.getString(1); + if (lock_id.equals(id)) { + /* return directly from here */ + return BigInteger.ZERO; + } + rows = 1; + while (rs.next()) { + /* count but not self. Using DB COUNT(*) might be faster */ + if (!rs.getString(1).equals(id)) rows++; + } + } + /* lock not found, need to insert */ + prep2.execute(); + } + + } catch (SQLException e) { + log.error("Error during 'lock' operation. SQL error: {}", e.getMessage()); + throw new XServicesFault(e); + } return BigInteger.valueOf(rows); } - final Object serializedDbAccess = new Object(); @Override public ALFEventResponseType mergeALFEvent(ALFEventType event) throws XServicesFault { final Instant d = Instant.now(); diff --git a/src/main/resources/ddl/BRTX_schema.ddl b/src/main/resources/ddl/BRTX_schema.ddl index a6833db..49e326a 100644 --- a/src/main/resources/ddl/BRTX_schema.ddl +++ b/src/main/resources/ddl/BRTX_schema.ddl @@ -54,4 +54,17 @@ CREATE CACHED TABLE IF NOT EXISTS brutex.tbl_events_all ); CREATE INDEX IF NOT EXISTS brutex.btx_idx_ssed ON brutex.tbl_events_all (btx_supersed_id); -CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events_all (btx_timestamp ASC); \ No newline at end of file +CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events_all (btx_timestamp ASC); + +-- MiscService lock operation +CREATE SEQUENCE IF NOT EXISTS brutex.btx_sequence1; +CREATE TABLE IF NOT EXISTS brutex.tbl_lock ( + btx_seq BIGINT NOT NULL, + btx_id VARCHAR(100) NOT NULL, + btx_obj_id VARCHAR(100) NOT NULL, + btx_timestamp BIGINT NOT NULL, + btx_validity BIGINT NOT NULL +); + +--CREATE INDEX IF NOT EXISTS brutex.btx_idx_seq ON brutex.tbl_lock (btx_seq ASC); +--CREATE INDEX IF NOT EXISTS brutex.btx_idx_key ON brutex.tbl_lock (btx_obj_id, btx_validity DESC);