Initial release of lock/ unlock operation in MiscService

master
Brian Rosenberger 2023-09-28 09:09:36 +02:00
parent 485dae7bb1
commit b2cd9fbda7
5 changed files with 168 additions and 44 deletions

View File

@ -0,0 +1,51 @@
package net.brutex.xservices.util;
import java.sql.*;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;
@Slf4j
@DisallowConcurrentExecution
public class LockCleanerJob implements Job, InterruptableJob {
private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
log.debug("LockCleaner is executing now.");
final Instant d = Instant.now();
final long ts = d.toEpochMilli();
final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
final String deleteMemTable = "DELETE FROM brutex.tbl_lock where btx_validity < " + (ts-5000);
try (Connection mcon = mpool.getConnection()){
Statement stmt = mcon.createStatement();
stmt.execute(deleteMemTable);
int count = stmt.getUpdateCount();
log.debug("LockCleaner deleted '{}' stale locks.", count);
} catch (SQLException e) {
log.error("Exception in SQL execution: {}", e.getMessage());
throw new JobExecutionException(e);
}
}
/**
* <p>
* Called by the <code>{@link Scheduler}</code> when a user
* interrupts the <code>Job</code>.
* </p>
*
* @throws UnableToInterruptJobException if there is an exception while interrupting the job.
*/
@Override
public synchronized void interrupt() throws UnableToInterruptJobException {
isInterrupted.set(true);
log.warn("LockCleanerJob received an interrupt.");
Thread.currentThread().interrupt();
}
}

View File

@ -185,6 +185,8 @@ public class MiscServiceServletContextListener implements ServletContextListener
if(configuration.getCleaner_interval()>0) {
startEventLogCleaner((Scheduler) context.getAttribute("scheduler"));
}
startLockCleaner((Scheduler) context.getAttribute("scheduler"));
}
private String getLinkSQL() {
@ -235,6 +237,31 @@ public class MiscServiceServletContextListener implements ServletContextListener
}
}
private void startLockCleaner(Scheduler scheduler) {
try {
if (!scheduler.checkExists(JobKey.jobKey("LockCleaner"))) {
JobDetail job2 = JobBuilder.newJob(LockCleanerJob.class).withIdentity("LockCleaner").build();
job2.getJobDataMap().put("mdbConnection", mempool);
SimpleTrigger t =
(SimpleTrigger)
newTrigger()
.withIdentity("LockCleaner")
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMinutes(2)
.repeatForever())
.startAt(
Date.from(
Instant.now()
.plus(2, ChronoUnit.MINUTES)))
.build();
scheduler.scheduleJob(job2, t);
}
} catch (SchedulerException ex) {
log.error("Could not start LockCleaner: {}", ex.getMessage());
}
}
private void readConfiguration(ServletContext ctx) {
/* Configure ServletContext attributes using configuration object*/
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();

View File

@ -72,7 +72,14 @@ public interface MiscService {
@WebMethod(operationName="lock")
@WSDLDocumentation("Get a lock.")
public abstract BigInteger lock(@WebParam(name="id") @XmlElement(nillable = false) String id,
@WebParam(name="objectId") @XmlElement(nillable = false) String objectId) throws XServicesFault;
@WebParam(name="objectId") @XmlElement(nillable = false) String objectId,
@WebParam(name="validity") BigInteger validity) throws XServicesFault;
@WebMethod(operationName="unlock")
@WSDLDocumentation("Remove a lock.")
public abstract Boolean unlock( @WebParam(name="id") @XmlElement(nillable = false) String id,
@WebParam(name="objectId") @XmlElement(nillable = false) String objectId)
throws XServicesFault;
@WebMethod(operationName="", action = "EventNotice")
@SOAPBinding(use = SOAPBinding.Use.LITERAL, style = SOAPBinding.Style.DOCUMENT, parameterStyle = SOAPBinding.ParameterStyle.BARE)

View File

@ -63,6 +63,7 @@ import org.quartz.*;
public class MiscServiceImpl
implements MiscService {
final Object serializedDbAccess = new Object();
@Resource
private WebServiceContext context;
@ -167,58 +168,83 @@ public class MiscServiceImpl
return new RuntimeInfoType();
}
@Override
public BigInteger lock(String id, String objectId) throws XServicesFault {
final String conString = "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=10;" +
"INIT=CREATE SCHEMA IF NOT EXISTS brutex\\;" +
// "SET SCHEMA brutex\\;" +
"CREATE SEQUENCE IF NOT EXISTS brutex.btx_sequence1\\;" +
"CREATE TABLE IF NOT EXISTS brutex.tbl_lock (btx_seq BIGINT NOT NULL, btx_id VARCHAR(100) NOT NULL, btx_obj_id VARCHAR(100) NOT NULL, btx_timestamp BIGINT NOT NULL);";
//JdbcConnectionPool cp = JdbcConnectionPool.create(conString, "sa", "");
//cp.setMaxConnections(1);
Connection con = null;
long rows = 0L;
final long ts = new Date().getTime();
try {
Class.forName("org.h2.Driver"); //Java 1.8
con = DriverManager.getConnection(conString);
PreparedStatement prep = con.prepareStatement(
"SELECT btx_id from brutex.tbl_lock where btx_obj_id=? ORDER BY btx_seq DESC");
prep.setString(1, objectId);
ResultSet rs = prep.executeQuery();
StringBuffer bf = new StringBuffer();
while (rs.next()) {
//bf.append(rs.getString(1));
rows++;
}
rs.close();
prep = con.prepareStatement("INSERT INTO brutex.tbl_lock values (NEXT VALUE FOR brutex.btx_sequence1, ?, ?, ?)");
public Boolean unlock(String id, String objectId) throws XServicesFault {
MessageContext cont = context.getMessageContext();
final ServletContext servletContext =
(ServletContext) cont.get(MessageContext.SERVLET_CONTEXT);
final JdbcConnectionPool pool = (JdbcConnectionPool) servletContext.getAttribute("mdbConnection");
try(Connection con = pool.getConnection()) {
PreparedStatement prep = con.prepareStatement("DELETE FROM brutex.tbl_lock where btx_id = ? AND btx_obj_id = ?");
prep.setString(1, id);
prep.setString(2, objectId);
prep.setLong(3, ts);
prep.execute();
prep = con.prepareStatement("DELETE from brutex.tbl_lock WHERE btx_timestamp < ?");
prep.setLong(1, ts - 10000);
prep.execute();
prep.close();
con.close();
//System.out.println(bf);
} catch (SQLException | ClassNotFoundException e) {
if(prep.getUpdateCount()>0) return Boolean.TRUE;
} catch (SQLException e) {
throw new XServicesFault(e);
}
return Boolean.FALSE;
}
final Object lockObj = new Object();
@Override
public BigInteger lock(String id, String objectId, BigInteger validity) throws XServicesFault {
MessageContext cont = context.getMessageContext();
final ServletContext servletContext =
(ServletContext) cont.get(MessageContext.SERVLET_CONTEXT);
if(validity == null || validity.longValue()<0) validity = BigInteger.valueOf(60000*60*12);
final JdbcConnectionPool pool = (JdbcConnectionPool) servletContext.getAttribute("mdbConnection");
final long ts = Instant.now().toEpochMilli();
final long validUntil = ts + validity.longValue();
long rows = 0L;
try (Connection con = pool.getConnection()) {
PreparedStatement prep =
con.prepareStatement(
"SELECT btx_id from brutex.tbl_lock "
+ " where btx_obj_id = ? "
+ " AND btx_validity >= ? ORDER BY btx_seq asc");
PreparedStatement prep2 = con.prepareStatement(
"INSERT INTO brutex.tbl_lock values (NEXT VALUE FOR brutex.btx_sequence1, ?, ?, ?, ?)");
prep.setString(1, objectId);
prep.setLong(2, ts);
prep2.setString(1, id);
prep2.setString(2, objectId);
prep2.setLong(3, ts);
prep2.setLong(4, validUntil);
ResultSet rs;
synchronized (lockObj) {
rs = prep.executeQuery();
if (rs.next()) {
/* lock already set and first in queue */
String lock_id = rs.getString(1);
if (lock_id.equals(id)) {
/* return directly from here */
return BigInteger.ZERO;
}
rows = 1;
while (rs.next()) {
/* count but not self. Using DB COUNT(*) might be faster */
if (!rs.getString(1).equals(id)) rows++;
}
}
/* lock not found, need to insert */
prep2.execute();
}
} catch (SQLException e) {
log.error("Error during 'lock' operation. SQL error: {}", e.getMessage());
throw new XServicesFault(e);
}
return BigInteger.valueOf(rows);
}
final Object serializedDbAccess = new Object();
@Override
public ALFEventResponseType mergeALFEvent(ALFEventType event) throws XServicesFault {
final Instant d = Instant.now();

View File

@ -55,3 +55,16 @@ CREATE CACHED TABLE IF NOT EXISTS brutex.tbl_events_all
CREATE INDEX IF NOT EXISTS brutex.btx_idx_ssed ON brutex.tbl_events_all (btx_supersed_id);
CREATE INDEX IF NOT EXISTS brutex.btx_idx_ts ON brutex.tbl_events_all (btx_timestamp ASC);
-- MiscService lock operation
CREATE SEQUENCE IF NOT EXISTS brutex.btx_sequence1;
CREATE TABLE IF NOT EXISTS brutex.tbl_lock (
btx_seq BIGINT NOT NULL,
btx_id VARCHAR(100) NOT NULL,
btx_obj_id VARCHAR(100) NOT NULL,
btx_timestamp BIGINT NOT NULL,
btx_validity BIGINT NOT NULL
);
--CREATE INDEX IF NOT EXISTS brutex.btx_idx_seq ON brutex.tbl_lock (btx_seq ASC);
--CREATE INDEX IF NOT EXISTS brutex.btx_idx_key ON brutex.tbl_lock (btx_obj_id, btx_validity DESC);