-
Notifications
You must be signed in to change notification settings - Fork 640
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
137 changed files
with
8,751 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,4 +47,5 @@ bld/ | |
[Bb]in/ | ||
[Oo]bj/ | ||
[Ll]og/ | ||
[Ll]ogs/ | ||
[Ll]ogs/ | ||
**/org/apache/eventmesh/connector/jdbc/antlr4/autogeneration/* |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
180 changes: 180 additions & 0 deletions
180
eventmesh-common/src/main/java/org/apache/eventmesh/common/ResetCountDownLatch.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.eventmesh.common; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.locks.AbstractQueuedSynchronizer; | ||
|
||
/** | ||
* ResetCountDownLatch can reset | ||
* | ||
* @see java.util.concurrent.CountDownLatch | ||
*/ | ||
public class ResetCountDownLatch { | ||
|
||
private final RestSync restSync; | ||
|
||
public ResetCountDownLatch(int count) { | ||
this.restSync = new RestSync(count); | ||
} | ||
|
||
|
||
/** | ||
* Causes the current thread to wait until the latch has counted down to zero, unless the thread is {@linkplain Thread#interrupt interrupted}. | ||
* | ||
* <p>If the current count is zero then this method returns immediately. | ||
* | ||
* <p>If the current count is greater than zero then the current | ||
* thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happen: | ||
* <ul> | ||
* <li>The count reaches zero due to invocations of the | ||
* {@link #countDown} method; or | ||
* <li>Some other thread {@linkplain Thread#interrupt interrupts} | ||
* the current thread. | ||
* </ul> | ||
* | ||
* <p>If the current thread: | ||
* <ul> | ||
* <li>has its interrupted status set on entry to this method; or | ||
* <li>is {@linkplain Thread#interrupt interrupted} while waiting, | ||
* </ul> | ||
* then {@link InterruptedException} is thrown and the current thread's | ||
* interrupted status is cleared. | ||
* | ||
* @throws InterruptedException if the current thread is interrupted while waiting | ||
*/ | ||
public void await() throws InterruptedException { | ||
restSync.acquireSharedInterruptibly(1); | ||
} | ||
|
||
/** | ||
* Causes the current thread to wait until the latch has counted down to zero, unless the thread is {@linkplain Thread#interrupt interrupted}, or | ||
* the specified waiting time elapses. | ||
* | ||
* <p>If the current count is zero then this method returns immediately | ||
* with the value {@code true}. | ||
* | ||
* <p>If the current count is greater than zero then the current | ||
* thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happen: | ||
* <ul> | ||
* <li>The count reaches zero due to invocations of the | ||
* {@link #countDown} method; or | ||
* <li>Some other thread {@linkplain Thread#interrupt interrupts} | ||
* the current thread; or | ||
* <li>The specified waiting time elapses. | ||
* </ul> | ||
* | ||
* <p>If the count reaches zero then the method returns with the | ||
* value {@code true}. | ||
* | ||
* <p>If the current thread: | ||
* <ul> | ||
* <li>has its interrupted status set on entry to this method; or | ||
* <li>is {@linkplain Thread#interrupt interrupted} while waiting, | ||
* </ul> | ||
* then {@link InterruptedException} is thrown and the current thread's | ||
* interrupted status is cleared. | ||
* | ||
* <p>If the specified waiting time elapses then the value {@code false} | ||
* is returned. If the time is less than or equal to zero, the method | ||
* will not wait at all. | ||
* | ||
* @param timeout the maximum time to wait | ||
* @param unit the time unit of the {@code timeout} argument | ||
* @return {@code true} if the count reached zero and {@code false} if the waiting time elapsed before the count reached zero | ||
* @throws InterruptedException if the current thread is interrupted while waiting | ||
*/ | ||
public boolean await(long timeout, TimeUnit unit) | ||
throws InterruptedException { | ||
return restSync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); | ||
} | ||
|
||
|
||
/** | ||
* Decrements the count of the latch, releasing all waiting threads if the count reaches zero. | ||
* | ||
* <p>If the current count is greater than zero then it is decremented. | ||
* If the new count is zero then all waiting threads are re-enabled for thread scheduling purposes. | ||
* | ||
* <p>If the current count equals zero then nothing happens. | ||
*/ | ||
public void countDown() { | ||
restSync.releaseShared(1); | ||
} | ||
|
||
/** | ||
* Returns the current count. | ||
* | ||
* <p>This method is typically used for debugging and testing purposes. | ||
* | ||
* @return the current count | ||
*/ | ||
public int getCount() { | ||
return restSync.getCount(); | ||
} | ||
|
||
/** | ||
* Reset the CountDownLatch | ||
*/ | ||
public void reset() { | ||
restSync.reset(); | ||
} | ||
|
||
/** | ||
* Synchronization control For ResetCountDownLatch. Uses AQS state to represent count. | ||
*/ | ||
private static final class RestSync extends AbstractQueuedSynchronizer { | ||
|
||
private final int initCount; | ||
|
||
RestSync(int count) { | ||
if (count < 0) { | ||
throw new IllegalArgumentException("count must be greater than or equal to 0"); | ||
} | ||
this.initCount = count; | ||
setState(count); | ||
} | ||
|
||
protected void reset() { | ||
setState(initCount); | ||
} | ||
|
||
int getCount() { | ||
return getState(); | ||
} | ||
|
||
@Override | ||
protected int tryAcquireShared(int acquires) { | ||
return (getState() == 0) ? 1 : -1; | ||
} | ||
|
||
@Override | ||
protected boolean tryReleaseShared(int releases) { | ||
for (; ; ) { | ||
int count = getState(); | ||
if (count == 0) { | ||
return false; | ||
} | ||
int nextCount = count - 1; | ||
if (compareAndSetState(count, nextCount)) { | ||
return nextCount == 0; | ||
} | ||
} | ||
} | ||
} | ||
} |
130 changes: 130 additions & 0 deletions
130
eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadWrapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.eventmesh.common; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
public abstract class ThreadWrapper implements Runnable { | ||
|
||
private final AtomicBoolean started = new AtomicBoolean(false); | ||
protected Thread thread; | ||
protected final ResetCountDownLatch waiter = new ResetCountDownLatch(1); | ||
protected volatile AtomicBoolean hasWakeup = new AtomicBoolean(false); | ||
protected boolean isDaemon = false; | ||
protected volatile boolean isRunning = false; | ||
|
||
public ThreadWrapper() { | ||
|
||
} | ||
|
||
public abstract String getThreadName(); | ||
|
||
public void start() { | ||
|
||
if (!started.compareAndSet(false, true)) { | ||
log.warn("Start thread:{} fail", getThreadName()); | ||
return; | ||
} | ||
this.thread = new Thread(this, getThreadName()); | ||
this.thread.setDaemon(isDaemon); | ||
this.thread.start(); | ||
this.isRunning = true; | ||
log.info("Start thread:{} success", getThreadName()); | ||
} | ||
|
||
public void await() { | ||
if (hasWakeup.compareAndSet(true, false)) { | ||
return; | ||
} | ||
//reset count | ||
waiter.reset(); | ||
try { | ||
waiter.await(); | ||
} catch (InterruptedException e) { | ||
log.error("Thread[{}] Interrupted", getThreadName(), e); | ||
} finally { | ||
hasWakeup.set(false); | ||
} | ||
} | ||
|
||
public void await(long timeout) { | ||
await(timeout, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
public void await(long timeout, TimeUnit timeUnit) { | ||
if (hasWakeup.compareAndSet(true, false)) { | ||
return; | ||
} | ||
//reset count | ||
waiter.reset(); | ||
try { | ||
waiter.await(timeout, timeUnit == null ? TimeUnit.MILLISECONDS : timeUnit); | ||
} catch (InterruptedException e) { | ||
log.error("Thread[{}] Interrupted", getThreadName(), e); | ||
} finally { | ||
hasWakeup.set(false); | ||
} | ||
} | ||
|
||
public void wakeup() { | ||
if (hasWakeup.compareAndSet(false, true)) { | ||
waiter.countDown(); | ||
} | ||
} | ||
|
||
public void shutdownImmediately() { | ||
shutdown(true); | ||
} | ||
|
||
public void shutdown() { | ||
shutdown(false); | ||
} | ||
|
||
private void shutdown(final boolean interruptThread) { | ||
if (!started.compareAndSet(true, false)) { | ||
return; | ||
} | ||
this.isRunning = false; | ||
//wakeup the thread to run | ||
wakeup(); | ||
|
||
try { | ||
if (interruptThread) { | ||
this.thread.interrupt(); | ||
} | ||
if (!this.isDaemon) { | ||
//wait main thread to wait this thread finish | ||
this.thread.join(TimeUnit.SECONDS.toMillis(60)); | ||
} | ||
} catch (InterruptedException e) { | ||
log.error("Thread[{}] Interrupted", getThreadName(), e); | ||
} | ||
} | ||
|
||
public void setDaemon(boolean daemon) { | ||
isDaemon = daemon; | ||
} | ||
|
||
public boolean isStated() { | ||
return this.started.get(); | ||
} | ||
} |
Oops, something went wrong.