parent
39855ae8ba
commit
4777c132d4
|
@ -0,0 +1,18 @@
|
|||
package net.brutex.xservices.util;
|
||||
|
||||
import lombok.NonNull;
|
||||
import net.brutex.xservices.types.alfevent.ALFEventType;
|
||||
|
||||
/**
|
||||
* Provides filtering and modification of ALF Events
|
||||
*/
|
||||
public interface ALFEventFilter {
|
||||
|
||||
/**
|
||||
* Modify or filter the alf event.
|
||||
*
|
||||
* @param event
|
||||
* @return modified event or null if it should be filtered out
|
||||
*/
|
||||
ALFEventType filter(@NonNull ALFEventType event);
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package net.brutex.xservices.util;
|
||||
|
||||
import lombok.NonNull;
|
||||
import net.brutex.xservices.types.alfevent.ALFEventType;
|
||||
|
||||
/**
|
||||
* This example filter cuts the eventid to max of 32 characters
|
||||
*/
|
||||
public class EventIdTruncateFilter implements ALFEventFilter {
|
||||
/**
|
||||
* Modify or filter the alf event.
|
||||
*
|
||||
* @param event
|
||||
* @return modified event or null if it should be filtered out
|
||||
*/
|
||||
@Override
|
||||
public ALFEventType filter(@NonNull ALFEventType event) {
|
||||
if (event.getBase().getEventId().length() > 32) {
|
||||
event.getBase().setEventId(event.getBase().getEventId().substring(0, 32));
|
||||
}
|
||||
return event;
|
||||
}
|
||||
}
|
|
@ -13,6 +13,9 @@ import org.apache.commons.configuration2.event.EventListener;
|
|||
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A configuration object for the MiscService -> Eventmanager. Implemented as singleton.
|
||||
|
@ -43,6 +46,8 @@ public class EventmanagerConfiguration {
|
|||
private String jdbc_filedb;
|
||||
private boolean isEmitterActive = true;
|
||||
private int cleaner_interval;
|
||||
private List<ALFEventFilter> alf_filter = new ArrayList<>();
|
||||
private int init_delay;
|
||||
|
||||
|
||||
public synchronized EventmanagerConfiguration refreshConfig() {
|
||||
|
@ -61,6 +66,19 @@ public class EventmanagerConfiguration {
|
|||
this.jdbc_filedb = config.getString("fdb", "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;");
|
||||
this.isEmitterActive = config.getBoolean("emitter_active", true);
|
||||
this.cleaner_interval = config.getInt("cleaner_interval", 5);
|
||||
List<String> lst = config.getList(String.class,"alf_filter", new ArrayList<String>());
|
||||
|
||||
for ( String c :lst ) {
|
||||
try{
|
||||
alf_filter.add( (ALFEventFilter) Class.forName(c).getDeclaredConstructor().newInstance());
|
||||
} catch (ClassNotFoundException e) {
|
||||
log.error("Cannot find class for alf filter with name '{}'.", c);
|
||||
} catch (InvocationTargetException | InstantiationException | IllegalAccessException |
|
||||
NoSuchMethodException e) {
|
||||
log.error("Cannot instantiate class for alf filter with name '{}'.", c);
|
||||
}
|
||||
}
|
||||
this.init_delay = config.getInt("init_delay", 15);
|
||||
|
||||
|
||||
} catch (ConfigurationException e) {
|
||||
|
|
|
@ -205,7 +205,8 @@ public class MiscServiceServletContextListener implements ServletContextListener
|
|||
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
|
||||
job2.getJobDataMap().put("egres_counter", egres);
|
||||
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
|
||||
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
|
||||
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter")
|
||||
.startAt(Date.from(Instant.now().plusSeconds(configuration.getInit_delay()))).build();
|
||||
scheduler.scheduleJob(job2, t);
|
||||
}
|
||||
} catch (SchedulerException ex) {
|
||||
|
|
|
@ -23,10 +23,8 @@ import java.io.StringWriter;
|
|||
import java.math.BigInteger;
|
||||
import java.sql.*;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.Date;
|
||||
import java.util.Enumeration;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import javax.annotation.Resource;
|
||||
import javax.jws.WebService;
|
||||
|
@ -43,6 +41,7 @@ import net.brutex.xservices.types.alfevent.ALFEventResponseType;
|
|||
import net.brutex.xservices.types.alfevent.ALFEventType;
|
||||
import net.brutex.xservices.types.alfevent.ObjectFactory;
|
||||
import net.brutex.xservices.types.ant.FileSetResource;
|
||||
import net.brutex.xservices.util.ALFEventFilter;
|
||||
import net.brutex.xservices.util.EventEmitter;
|
||||
import net.brutex.xservices.util.EventmanagerConfiguration;
|
||||
import net.brutex.xservices.util.RunTask;
|
||||
|
@ -259,6 +258,11 @@ public class MiscServiceImpl
|
|||
|
||||
try (Connection con = pool.getConnection()) {
|
||||
|
||||
List<ALFEventFilter> filter = conf.getAlf_filter();
|
||||
for(ALFEventFilter f : filter) {
|
||||
event = f.filter(event);
|
||||
}
|
||||
|
||||
Marshaller m = JAXBContext.newInstance(ALFEventType.class).createMarshaller();
|
||||
//m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
|
||||
m.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
|
||||
|
|
|
@ -11,6 +11,10 @@ target.url = http://localhost:8085/eventmanager/services/ALFEventManagerDocLit
|
|||
# default: interval = 10
|
||||
interval = 30
|
||||
|
||||
# Initial delay in seconds before sending events recovered from the file based database. This should be
|
||||
# smaller than the 'interval' setting. Default is 20 seconds.
|
||||
init_delay = 20
|
||||
|
||||
# In-Memory Database (H2 in this case) to use for event processing
|
||||
# This is the JDBC connection string.
|
||||
# default: memdb = jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;
|
||||
|
@ -21,7 +25,6 @@ memdb = jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;
|
|||
# default: fdb = jdbc:h2:file:~/alf_event_db
|
||||
fdb = jdbc:h2:file:~/alf_event_db
|
||||
|
||||
|
||||
# Activate Emitter
|
||||
# Default is true, otherwise XServices will receive events, but not make any attempt to
|
||||
# forward them. Memory inbound queue will only be written to disk when the service is
|
||||
|
@ -32,3 +35,7 @@ emitter_active = true
|
|||
# storage. For trace and debugging purpose. Default value is every 5 minutes. A value of zero or smaller
|
||||
# deactivates the cleaner. Value in minutes.
|
||||
cleaner_interval = 4
|
||||
|
||||
# ALFEventFilter classes, executed in order. This can be a comma separated list of classes
|
||||
# default: empty
|
||||
alf_filter = net.brutex.xservices.util.EventIdTruncateFilter
|
Loading…
Reference in New Issue