您现在的位置是:亿华云 > 数据库

面试官:Semaphore在项目中有使用过吗?

亿华云2025-10-09 12:50:58【数据库】5人已围观

简介Semaphore它就是我们之前在讲源码的时候提到的信号量,下面看下它的构造函数public Semaphore(int permits) {sync = new NonfairSync(permit

Semaphore

它就是面试我们之前在讲源码的时候提到的信号量,下面看下它的构造函数

public Semaphore(int permits) {

sync = new NonfairSync(permits);

}

public Semaphore(int permits, boolean fair) {

sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}

​从构造函数可以看出,它可以传入指定数量的官S过资源和指定公平和非公平锁,公平和非公平就不多阐述了。

我们重点关注的项目是acquire()和release(), 这两个方法字面意思很好理解, Semaphore往往用于资源有限的场景,比如我们需要限制某个操作的使用线程数量。下面通过例子感受一下。面试

public class SemaphoreTest {

public static final class Task implements Runnable {

private int num;

private Semaphore semaphore;

public Task(int num,官S过 Semaphore semaphore) {

this.num = num;

this.semaphore = semaphore;

}

@Override

public void run() {

try {

// 获取

semaphore.acquire();

System.out.println(String.format("num: %d, 剩余%d个资源, 还有%d个线程在等待", num, semaphore.availablePermits(), semaphore.getQueueLength()));

System.out.println(System.currentTimeMillis());

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

// 释放

System.out.println("释放资源");

semaphore.release();

}

}

}

public static void main(String[] args) {

Semaphore semaphore = new Semaphore(2);

IntStream.range(0, 20).forEach(i -> new Thread(new Task(i, semaphore)).start());

}

}

实际输出:

num: 1, 剩余0个资源, 还有0个线程在等待

1657591518171

num: 0, 剩余1个资源, 还有0个线程在等待

1657591518172

释放资源

....

释放资源

num: 18, 剩余0个资源, 还有1个线程在等待

1657591545235

num: 19, 剩余0个资源, 还有0个线程在等待

1657591545236

释放资源

释放资源

进程已结束,退出代码0

源码剖析

​我们重点看下acquire()源码实现。

从这个信号量获取一个许可,项目阻塞直到有一个可用,使用或者线程被中断。面试获得一个许可,官S过如果一个可用并立即返回,项目将可用许可的使用数量减少一个​。

public void acquire() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

重点是面试这个sync。

// 首先它继承 AbstractQueuedSynchronizer 这个大家肯定不陌生了 就是官S过AQS

abstract static class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 1192457210091910933L;

// 初始化的时候会写入一个状态

Sync(int permits) {

setState(permits);

}

// 获取当前的状态

final int getPermits() {

return getState();

}

// 非公平方式获取信号量

final int nonfairTryAcquireShared(int acquires) {

for (;;) {

// 当前可获取的

int available = getState();

// 计算剩余数量

int remaining = available - acquires;

// 如果剩余数量大于0 就是进行cas修改

if (remaining < 0 ||

compareAndSetState(available, remaining))

return remaining;

}

}

// 释放信号量

protected final boolean tryReleaseShared(int releases) {

for (;;) {

int current = getState();

// 释放后剩余的数量

int next = current + releases;

// 如果超出就

if (next < current) // overflow

throw new Error("Maximum permit count exceeded");

if (compareAndSetState(current, next))

return true;

}

}

final void reducePermits(int reductions) {

for (;;) {

int current = getState();

int next = current - reductions;

// 超出最大限量抛异常

if (next > current) // underflow

throw new Error("Permit count underflow");

if (compareAndSetState(current, next))

return;

}

}

final int drainPermits() {

for (;;) {

int current = getState();

if (current == 0 || compareAndSetState(current, 0))

return current;

}

}

}

在构造函数中FairSync和NonfairSync他们都继承Sync。

sync = fair ?项目 new FairSync(permits) : new NonfairSync(permits);

默认情况下非公平的Semaphore会去调用Sync的nonfairTryAcquireShared。香港云服务器

static final class NonfairSync extends Sync {

private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {

super(permits);

}

protected int tryAcquireShared(int acquires) {

return nonfairTryAcquireShared(acquires);

}

}

公平的Semaphore内部实现了tryAcquireShared()。

static final class FairSync extends Sync {

private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {

super(permits);

}

protected int tryAcquireShared(int acquires) {

for (;;) {

if (hasQueuedPredecessors())

return -1;

int available = getState();

int remaining = available - acquires;

if (remaining < 0 ||

compareAndSetState(available, remaining))

return remaining;

}

}

}

下面我们再回过头看下acquire(), 内部方法acquireSharedInterruptibly是AQS的内部方法。

public void acquire() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

如果线程中断,直接抛异常, 如果没拿到资源就进入排队机制。

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

// 可获取的资源数小于0进入排队 这里的实现在子类,就是上边提到的

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

重点看下这个doAcquireSharedInterruptibly()。

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

// 以共享模式加入到阻塞队列 之前讲源码的时候都讲过

final Node node = addWaiter(Node.SHARED);

// 默认失败

boolean failed = true;

try {

for (;;) {

// 获取前置节点

final Node p = node.predecessor();

// 如果上一个节点就是头部节点 再次尝试获取 (原因是头部节点可能释放资源了)

if (p == head) {

int r = tryAcquireShared(arg);

// 如果获取到了 并且还有剩余资源

if (r >= 0) {

// 1. 将当前节点设置为头部节点

// 2. 判断后续节点是否是共享等待节点

// 3. 唤醒后续的节点

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

// 这一步主要是检查未能获取到资源的节点状态

// 如果线程需要阻塞返回true

// parkAndCheckInterrupt 如果线程中断了 抛出异常

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

// 如果失败 取消正在进行的获取

if (failed)

cancelAcquire(node);

}

}

shouldParkAfterFailedAcquire()的细节我们也来看下,可能有的同学不大清楚。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;

// 只要前置节点释放锁,就会通知标识为SIGNAL(-1)状态的后续节点的线程

// 如果前置节点为SIGNAL,只需要等待其他前置节点的线程被释放,

if (ws == Node.SIGNAL)

return true;

// 这里的高防服务器判断指的是取消状态, 如果取消了就讲这个节点移除掉

if (ws > 0) {

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

// cas 更新

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

这里的SIGNAL一类的常量,大家可以自行到源码查看,这也是细节地方。发现这段代码主要的作用就是检查节点状态,对后续节点做一些操作,这里并没有阻塞操作,下面我们看下parkAndCheckInterrupt()。

private final boolean parkAndCheckInterrupt() {

LockSupport.park(this);

return Thread.interrupted();

}

这里我们可以看到加了锁,所以阻塞发生在这。那么释放锁在哪呢?其实在release阶段。

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

​可以看到在unparkSuccessor中进行了锁的释放,这个过程发生在释放阶段。

release()相对简单一些,大家可以自己对着源码看下,实现有些类似。

结束语

其实本节带大家看源码,主要是想给大家讲下共享锁的知识,Semaphore其实就是使用了共享锁。另外AQS这个类很值得大家好好研究一下,你会发现很多的好用的类都是基于它实现,之前我们讲源码的时候也都遇到了,有兴趣可以了解一下。云南idc服务商

很赞哦!(5)