Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 69 additions & 43 deletions reevent/src/main/java/gg/xp/reevent/events/BasicEventQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;

public class BasicEventQueue implements EventQueue {


private static final Logger log = LoggerFactory.getLogger(BasicEventQueue.class);
private final Deque<Tracker> backingQueue = new ArrayDeque<>();
private final Object queueLock = new Object();
private final BlockingQueue<Tracker> backingQueue = new ArrayBlockingQueue<>(65536);
// private final Object queueLock = new Object();
private final List<Event> delayedEvents = new ArrayList<>();
private volatile boolean delayedEventsDirtyFlag;
private static final ThreadFactory delayedEventProcessorThreadFactory = new BasicThreadFactory.Builder()
Expand Down Expand Up @@ -59,25 +61,37 @@ public void push(Event event) {
if (runAt == 0 || runAt <= System.currentTimeMillis()) {
event.setEnqueuedAt(TimeUtils.now());
Tracker tracker = new Tracker(event);
synchronized (queueLock) {
if (enableCombine) {
Tracker existingTracker = backingQueue.peekLast();
Event combined;
if (existingTracker == null || (combined = existingTracker.event.combineWith(event)) == null) {
backingQueue.add(existingTracker);
}
else {
// log.info("Combined!");
backingQueue.removeLast();
backingQueue.add(new Tracker(combined));
}
}
else {
backingQueue.add(tracker);
}
// log.info("Push: {}", event);
queueLock.notifyAll();
try {
backingQueue.put(tracker);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
// old impl
// synchronized (queueLock) {
// if (enableCombine) {
// Tracker existingTracker = backingQueue.peek();
// Event combined;
// if (existingTracker == null || (combined = existingTracker.event.combineWith(event)) == null) {
// backingQueue.add(existingTracker);
// }
// else {
//// log.info("Combined!");
// try {
// backingQueue.take();
// }
// catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// backingQueue.add(new Tracker(combined));
// }
// }
// else {
// backingQueue.add(tracker);
// }
//// log.info("Push: {}", event);
// queueLock.notifyAll();
// }
}
else {
queueDelayedEvent(event);
Expand All @@ -86,49 +100,61 @@ public void push(Event event) {

@Override
public Event pull() {
synchronized (queueLock) {
while (true) {
Tracker tracker = backingQueue.poll();
if (tracker == null) {
try {
queueLock.wait(5000);
}
catch (InterruptedException e) {
// ignored
}
}
else {
tracker.markExit();
queueLock.notifyAll();
// log.info("Pull: {}", tracker.event);
return tracker.event;
}
}
Tracker tracker;
try {
tracker = backingQueue.take();
tracker.markExit();
return tracker.event;
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
// TODO: see if a lockless queue + wait only if nothing there might be better for performance.
// Old impl
// synchronized (queueLock) {
// while (true) {
// Tracker tracker = backingQueue.poll();
// if (tracker == null) {
// try {
// queueLock.wait(5000);
// }
// catch (InterruptedException e) {
// // ignored
// }
// }
// else {
// tracker.markExit();
// queueLock.notifyAll();
//// log.info("Pull: {}", tracker.event);
// return tracker.event;
// }
// }
// }
}

@Override
public int pendingSize() {
synchronized (queueLock) {
// synchronized (queueLock) {
return backingQueue.size();
}
// }
}

// Should only be used for testing, or maybe hot reloads
// TODO: problem here is that it waits for queue to be empty, but doesn't wait for current
// event to be fully processed. This probably needs to be on EventMaster.
@Override
public void waitDrain() {
synchronized (queueLock) {
// synchronized (queueLock) {
while (pendingSize() > 0) {
try {
queueLock.wait(1000);
Thread.sleep(10);
// queueLock.wait(1000);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
// }
}

private void queueDelayedEvent(Event event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void pushEvent(Event event) {
queue.push(event);
}

// TODO: this is not thread safe
public void pushEventAndWait(Event event) {
CountDownLatch latch = new CountDownLatch(1);
drainCallback = latch::countDown;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
/**
* Example trigger pack for a duty
*/
// TODO: this is very outdated
// @CalloutRepo indicates that the system should scan for fields defined as ModifiableCallout. The user is presented
// with a UI to enable/disable them, and change the callout text under the Plugins > Callouts tab.
// The name chosen here will show in the UI.
Expand All @@ -24,11 +25,17 @@ public class Odin implements FilteredEventHandler {
// Since we have @CalloutRepo
private final ModifiableCallout<AbilityCastStart> valknut = ModifiableCallout.durationBasedCall("Valknut (Out)", "Out");

private final XivState state;

public Odin(XivState state) {
this.state = state;
}

// This comes from FilteredEventHandler. In this case, we want to restrict this set of triggers to a specific
// zone (Urth's Fount, in this case, Zone ID 394).
@Override
public boolean enabled(EventContext context) {
return context.getStateInfo().get(XivState.class).zoneIs(394);
return state.zoneIs(394);
}

// This is an actual callout. You can specify as many as you want, but you have to follow the usual Java conventions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public UCoB(XivState state) {
// Todo: add triggers for Akh morn and Morn Afah
@Override
public boolean enabled(EventContext context) {
return context.getStateInfo().get(XivStateImpl.class).zoneIs(0x2DD);
return state.dutyIs(KnownDuty.UCoB);
}

// @HandleEvents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,25 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class EnhancedReadWriteReentrantLock {
public final class EnhancedReadWriteReentrantLock {

private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final LockAdapter readAdp;
private final LockAdapter writeAdp;

public EnhancedReadWriteReentrantLock() {
ReadWriteLock lock = new ReentrantReadWriteLock();
readAdp = new LockAdapter(lock.readLock());
writeAdp = new LockAdapter(lock.writeLock());
}

public LockAdapter read() {
return new LockAdapter(lock.readLock());
readAdp.lock();
return readAdp;
}

public LockAdapter write() {
return new LockAdapter(lock.writeLock());
writeAdp.lock();
return writeAdp;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ public class LockAdapter implements AutoCloseable {
private final Lock lock;

public LockAdapter(Lock lock) {
lock.lock();
this.lock = lock;
}

void lock() {
lock.lock();
}

@Override
public void close() {
lock.unlock();
Expand Down
Loading
Loading