总结JVM相关的知识
1.JVM实例对应了一个独立运行的java程序它是进程级别
2.JVM执行引擎实例则对应了属于用户运行程序的线程它是线程级别的
注: 绿色部分是线程共享的区域,白色部分是线程私有的区域
a) 程序计数器
一块较小的内存空间,它是当前线程所执行的字节码的行号指示器,字节码解释器工作时通过改变该计数器的值来选择下一条需要执行的字节码指令,分支、跳转、循环等基础功能都要依赖它来实现。每条线程都有一个独立的的程序计数器,各线程间的计数器互不影响,因此该区域是线程私有的。
由于在JVM中,多线程是通过线程轮流切换来获得CPU执行时间的,因此,在任一具体时刻,一个CPU的内核只会执行一条线程中的指令,因此,为了能够使得每个线程都在线程切换后能够恢复在切换之前的程序执行位置,每个线程都需要有自己独立的程序计数器,并且不能互相被干扰,否则就会影响到程序的正常执行次序。因此,可以这么说,程序计数器是每个线程所私有的。由于程序计数器中存储的数据所占空间的大小不会随程序的执行而发生改变,因此,对于程序计数器是不会发生内存溢出现象(OutOfMemory)的。
b) JVM栈
JVM栈中存放的是一个个的栈帧,每个栈帧对应一个被调用的方法,在栈帧中包括局部变量表(Local Variables)、操作数栈(Operand Stack)、指向当前方法所属的类的运行时常量池(运行时常量池的概念在方法区部分会谈到)的引用(Reference to runtime constant pool)、方法返回地址(Return Address)和一些额外的附加信息。当线程执行一个方法时,就会随之创建一个对应的栈帧,并将建立的栈帧压栈。当方法执行完毕之后,便会将栈帧出栈。
c) 本地方法栈
本地方法栈与Java栈的作用和原理非常相似。区别只不过是Java栈是为执行Java方法服务的,而本地方法栈则是为执行本地方法(Native Method)服务的
d) 堆
Java中的堆是用来存储对象本身的以及数组(数组引用是存放在Java栈中的)。堆是被所有线程共享的,在JVM中只有一个堆。
e) 方法区
与堆一样,是被线程共享的区域。在方法区中,存储了每个类的信息(包括类的名称、方法信息、字段信息)、静态变量、常量以及编译器编译后的代码等。
在Class文件中除了类的字段、方法、接口等描述信息外,还有一项信息是常量池,用来存储编译期间生成的字面量和符号引用。
在方法区中有一个非常重要的部分就是运行时常量池,它是每一个类或接口的常量池的运行时表示形式,在类和接口被加载到JVM后,对应的运行时常量池就被创建出来。当然并非Class文件常量池中的内容才能进入运行时常量池,在运行期间也可将新的常量放入运行时常量池中,比如String的intern方法。
在JDK8之后,方法区已经取消,方法区被一个叫MetaSpace,它和和堆合并到一起管理
jmm中的主内存、工作内存与jvm中的Java堆、栈、方法区等并不是同一个层次的内存划分,这两者基本上是没有关系的,如果两者一定要勉强对应起来,那从变量、主内存、工作内存的定义来看,主内存主要对应于Java堆中的对象实例数据部分,而工作内存则对应于虚拟机栈中的部分区域。从更低层次上说,主内存就直接对应于物理硬件的内存,而为了获取更好的运行速度,虚拟机(甚至是硬件系统本身的优化措施)可能会让工作内存优先存储于寄存器和高速缓存中,因为程序运行时主要访问读写的是工作内存。
总结java线程基础知识
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
1.几个核心参数概念:
2.注意事项
- ThreadPoolExecutor将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小 当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
- 如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,超过数量的任务将被拒绝。
- 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
- 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。
- 在大多数情况下,核心和最大池大小仅基于构造函数来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。
- 并不是先加入任务就一定会先执行,假设队列大小为 4,corePoolSize为2,maximumPoolSize为6,那么当加入15个任务时, 执行的顺序类似这样:首先执行任务 1、2,然后任务3~6被放入队列。这时候队列满了,任务7、8、9、10 会被马上执行,而任务 11~15 则会抛出异常。最终顺序是:1、2、7、8、9、10、3、4、5、6。 当然这个过程是针对指定大小的
ArrayBlockingQueue<Runnable>
来说,如果是默认的LinkedBlockingQueue,因为该队列无大小限制,maximumPoolSize会无效,队列大小不受控制,会有资源耗尽的风险。 - 最多能执行多少个任务??? maximumPoolSize+队列长度,超过这个数字之后就拒绝接受任务
//简单示例
public static void main(String[] args) {
ThreadPoolExecutor executor=new ThreadPoolExecutor(
2,
6,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(4));
for(int i=0;i<15;i++){
MyRunnable myRunnable = new MyRunnable(i);
executor.execute(myRunnable);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+
",队列中等待执行的任务数目:"+executor.getQueue().size()+
",已执行完别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
class MyRunnable implements Runnable {
private int taskNum;
public MyRunnable(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}
以上代码,最大线程数为6,队列长度为4,可执行6+4=10个任务,如果执行15个任务,最后5个会被拒绝
使用ThreadFactory自定义工厂能替代默认的DefaultThreadFactory,DefaultThreadFactory默认创建出来的线程都是用户线程而非守护线程,且线程的优先级都是Thread.NORM_PRIORITY。在自定义工厂里面,我们能创建定制化的Thread,并且计数,或则限制创建Thread的数量,给每个Thread设置对应的好听的名字,或者其他的很多很多事情。
示例:
import java.util.concurrent.ThreadFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
public class MyThreadFactory implements ThreadFactory {
private int counter;
private String name;
private List<String> stats;
public MyThreadFactory(String name) {
counter = 0;
this.name = name;
stats = new ArrayList<>();
}
@Override
public Thread newThread(Runnable run) {
Thread t = new Thread(run, name + "-Thread-" + counter);
counter++;
stats.add(String.format("Created thread %d with name %s on%s\n" ,t.getId() ,t.getName() ,new Date()));
return t;
}
public String getStas() {
StringBuffer buffer = new StringBuffer();
Iterator<String> it = stats.iterator();
while(it.hasNext()) {
buffer.append(it.next());
// buffer.append("\n");
}
return buffer.toString();
}
public static void main(String[] args) {
MyThreadFactory factory = new MyThreadFactory("MyThreadFactory");
Task task = new Task();
Thread thread = null;
for(int i = 0; i < 10; i++) {
thread = factory.newThread(task);
thread.start();
}
System.out.printf("Factory stats:\n");
System.out.printf("%s\n",factory.getStas());
}
}
class Task implements Runnable{
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static final int RUNNING = -1 << COUNT_BITS;
//如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕
private static final int SHUTDOWN = 0 << COUNT_BITS;
//如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
//当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态
private static final int TERMINATED = 3 << COUNT_BITS;
Executors
创建4种线程池在java doc中,并不提倡我们直接使用ThreadPoolExecutor来创建线程池,而是使用Executors类中提供的几个静态方法来创建线程池(底层其实还是调用ThreadPoolExecutor来创建线程池),实际项目中手动创建ThreadPoolExecutor,指定参数会更好,理由是写代码的人可以更清楚线程池的运行规则,避免资源耗尽的风险
Executors各个方法的弊端:
1)
newFixedThreadPool
和newSingleThreadExecutor
主要的问题是队列长度不受控制,可能会有OOM风险 2)newCachedThreadPool
和newScheduledThreadPool
主要问题是maximumPoolSize=Integer.MAX_VALUE,可能会有OOM风险
//1. 创建一个可缓存线程池,应用中存在的线程数可以无限大
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
System.out.println("****************************newCachedThreadPool*******************************");
for(int i=0;i<4;i++)
{
final int index=i;
newCachedThreadPool.execute(new FourThreadPoolsRunnableImpl(index));
}
//2. 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待,corePoolSize和maximumPoolSize值相等
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
System.out.println("****************************newFixedThreadPool*******************************");
for(int i=0;i<4;i++)
{
final int index=i;
newFixedThreadPool.execute(new FourThreadPoolsRunnableImpl(index));
}
//3. 创建一个定长线程池,支持定时及周期性任务执行
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2);
System.out.println("****************************newFixedThreadPool*******************************");
for(int i=0;i<4;i++)
{
final int index=i;
//延迟三秒执行
newScheduledThreadPool.schedule(new FourThreadPoolsRunnableImpl(index),3, TimeUnit.SECONDS);
}
// dubbo的延迟暴露服务特性就是通过这种方式来实现的
//**************************************************************************************************************
private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
//**************************************************************************************************************
//4. 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,corePoolSize和maximumPoolSize都被设置为1
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
System.out.println("****************************newFixedThreadPool*******************************");
for(int i=0;i<4;i++)
{
final int index=i;
newSingleThreadExecutor.execute(new FourThreadPoolsRunnableImpl(index));
}
/*newCachedThreadPool.shutdown();
newFixedThreadPool.shutdown();
newScheduledThreadPool.shutdown();
newSingleThreadExecutor.shutdown();*/
//执行的任务类
public class FourThreadPoolsRunnableImpl implements Runnable{
private Integer index;
public FourThreadPoolsRunnableImpl(Integer index)
{
this.index=index;
}
@Override
public void run() {
try {
System.out.println("开始处理线程,index="+index);
Thread.sleep(3000);
System.out.println("执行完毕,index="+index+" 线程标识:"+this.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//定义线程安全线程池
private static AtomicReference<ExecutorService> serviceRef = new AtomicReference();
//第一步,初始化线程池
public static void init(String maxPoolSizeStr) {
if (serviceRef.get() == null) {
synchronized(ThreadPoolExecutorDemo.class) {
if (serviceRef.get() == null) {
int maxPoolSize = 50;
if (maxPoolSizeStr != null) {
maxPoolSize = Integer.parseInt(maxPoolSizeStr);
}
//设定固定大小的线程池, 默认线程池大小50
serviceRef.set(new ThreadPoolExecutor(maxPoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue()));
}
}
}
}
//第二步,使用线程池执行callable任务, 返回future结果集
public static <T> Map<String, Future<T>> execute(){
Map<String, Future<T>> resultFutures = new HashMap();
Map<String, Callable<T>> tasks = createTasks();
Iterator i$ = tasks.keySet().iterator();
while(i$.hasNext()) {
String key = (String)i$.next();
resultFutures.put(key, ((ExecutorService)serviceRef.get()).submit((Callable)tasks.get(key)));
}
return resultFutures;
}
private static <T> Map<String,Callable<T>> createTasks() {
//示例创建多个待执行的callable任务,供线程池执行
Map<String,Callable<T>> taskMap = new HashMap<>();
taskMap.put("task1", new MyCallable(1));
taskMap.put("task2",new MyCallable(2));
return taskMap;
}
//第三步, 提供使用完成之后停止线程池的方法
public static void shutdown() {
if (serviceRef.get() != null) {
synchronized(ThreadPoolExecutorDemo.class) {
if (serviceRef.get() != null) {
((ExecutorService)serviceRef.get()).shutdown();
serviceRef.set(null);
}
}
}
}
总结java线程CountDownLatch基础知识
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
//伪代码
//Main thread start
//Create CountDownLatch for N threads
//Create and start N threads
//Main thread wait on latch
//N threads completes there tasks are returns
//Main thread resume execution
有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
应用程序启动前检查各个外部服务的状态
//入口程序
public class MyApplication {
public static void main(String[] args) {
boolean result = false;
try {
result = ApplicationStartupUtil.checkExternalServices();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("External services validation completed !! Result was :: "+ result);
}
}
//服务检查工具类
public class ApplicationStartupUtil
{
//List of service checkers
private static List<BaseHealthChecker> _services;
//This latch will be used to wait on
private static CountDownLatch _latch;
private ApplicationStartupUtil()
{
}
private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
public static ApplicationStartupUtil getInstance()
{
return INSTANCE;
}
public static boolean checkExternalServices() throws Exception
{
//Initialize the latch with number of service checkers
_latch = new CountDownLatch(3);
//All add checker in lists
_services = new ArrayList<BaseHealthChecker>();
_services.add(new NetworkHealthChecker(_latch));
_services.add(new CacheHealthChecker(_latch));
_services.add(new DatabaseHealthChecker(_latch));
//Start service checkers using executor framework
Executor executor = Executors.newFixedThreadPool(_services.size());
for(final BaseHealthChecker v : _services)
{
executor.execute(v);
}
//Now wait till all services are checked
_latch.await();
//Services are file and now proceed startup
for(final BaseHealthChecker v : _services)
{
if( ! v.isServiceUp())
{
return false;
}
}
return true;
}
}
//服务检查的基类
public abstract class BaseHealthChecker implements Runnable {
private CountDownLatch _latch;
private String _serviceName;
private boolean _serviceUp;
//Get latch object in constructor so that after completing the task, thread can countDown() the latch
public BaseHealthChecker(String serviceName, CountDownLatch latch)
{
super();
this._latch = latch;
this._serviceName = serviceName;
this._serviceUp = false;
}
@Override
public void run() {
try {
verifyService();
_serviceUp = true;
} catch (Throwable t) {
t.printStackTrace(System.err);
_serviceUp = false;
} finally {
if(_latch != null) {
_latch.countDown();
}
}
}
public String getServiceName() {
return _serviceName;
}
public boolean isServiceUp() {
return _serviceUp;
}
//This methos needs to be implemented by all specific service checker
public abstract void verifyService();
}
//检查网络的实现类
public class NetworkHealthChecker extends BaseHealthChecker
{
public NetworkHealthChecker (CountDownLatch latch) {
super("Network Service", latch);
}
@Override
public void verifyService()
{
System.out.println("Checking " + this.getServiceName());
try
{
Thread.sleep(7000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
}
//检查缓存的实现类
public class CacheHealthChecker extends BaseHealthChecker{//实现同上}
//检查数据库的实现类
public class DatabaseHealthChecker extends BaseHealthChecker{//实现同上}
public class CountDownLatch {
//利用了AQS内部类的共享锁的机制来实现的
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
//调用await的时候其实就是判断state是否被减到0了
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
//1. 初始化的时候直接设置了state为一个数值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
//2. 调用了内部类的Sync.tryAcquireShared()方法
//其实就是判断state是否等于0了
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//3. 将state数值减1
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
不可重入,支持公平信号量、非公平信号量,默认非公平
可以用于资源访问限流,连接池,缓存池等
Semaphore只是控制线程的数量,并不能实现同步,所以如果需要同步还是加锁或使用同步机制。信号量只是在信号不够的时候挂起线程,但是并不能保证信号量足够的时候获取对象和返还对象是线程安全的。
非公平信号量的吞吐量总是要比公平信号量的吞吐量要大,但是需要强调的是非公平信号量和非公平锁一样存在“饥渴死”的现象,也就是说活跃线程可能总是拿到信号量,而非活跃线程可能难以拿到信号量。而对于公平信号量由于总是靠请求的线程的顺序来获取信号量,所以不存在此问题。
final Semaphore semaphore = new Semaphore(2);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("线程:" + Thread.currentThread().getName() + "获得许可:" + index);
TimeUnit.SECONDS.sleep(1);
semaphore.release();
System.out.println("TASK个数:" + semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
CyclicBarrier翻译过来也叫栅栏,意思很明显,就是一组线程相互等待,均到达栅栏的时候,再运行。CyclicBarrier是可以重复使用的,而之前的CountDownLatch是一次性的。CyclicBarrier允许一组线程相互等待,直到到达某个公共屏障点,屏障点即一组任务执行完毕的时候。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
public class RunningMan extends Thread{
private String name;
private long speed;
private CyclicBarrier cyclicBarrier;
private CountDownLatch countDownLatch;
public RunningMan(String name,long speed,CyclicBarrier cyclicBarrier,CountDownLatch countDownLatch){
this.name=name;
this.speed=speed;
this.cyclicBarrier=cyclicBarrier;
this.countDownLatch=countDownLatch;
}
@Override
public void run(){
running();
celebrate();
sayGoodBy();
}
private void running(){
System.out.println("Waiting "+name);
try {
//所有人开始等待人到期,最后一个人到期后就开始跑了
int index=cyclicBarrier.await();
System.out.println("index is "+index);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Running "+speed);
try {
Thread.sleep(speed);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void celebrate(){
System.out.println("Celebrate man comming "+name);
try {
int index=cyclicBarrier.await(); //等待人到场后开始庆祝喝酒
System.out.println("index is "+index);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Drinking "+speed);
try {
Thread.sleep(speed);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void sayGoodBy(){
try {
System.out.println(name+":say good bye");
//等待每个人say good bye
int index=cyclicBarrier.await();
//CountDownLatch减1
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public class CyclicBarrierTest {
// 定义一个CyclicBarrier,可以重复使用
private static CyclicBarrier cyclicBarrier=new CyclicBarrier(5);
// 定义一个CountDownLatch,用于最后每个人say bye后打印,bye bye
private static CountDownLatch countDownLatch=new CountDownLatch(5);
public static void main(String[] args){
for(int i=1;i<=5;i++){
new RunningMan("name"+i,Long.valueOf(i*1000),cyclicBarrier,countDownLatch).start();
}
try {
countDownLatch.await(); //主线程等待最后bye bye
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ByeBye");
}
}
总结java线程基础知识
示例代码:利用Condition来创建自定义的阻塞队列
//实现一个阻塞队列
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyBlockingQueue<E> {
//阻塞队列最大容量
int size;
ReentrantLock lock = new ReentrantLock();
//队列底层实现
LinkedList<E> list=new LinkedList<>();
//队列满时的等待条件
Condition notFull = lock.newCondition();
//队列空时的等待条件
Condition notEmpty = lock.newCondition();
public MyBlockingQueue(int size) {
this.size = size;
}
public void enqueue(E e) throws InterruptedException {
try {
lock.lock();
String threadName = Thread.currentThread().getName();
System.out.println(threadName+" 开始生产,锁的hashCode: "+lock.hashCode());
//队列已满,在notFull条件上等待
while (list.size() ==size)
{
notFull.await();
}
//入队:加入链表末尾
list.add(e);
System.out.println("入队:" +e);
//通知在notEmpty条件上等待的线程
notEmpty.signal();
} finally {
lock.unlock();
}
}
public E dequeue() throws InterruptedException {
try {
lock.lock();
String threadName = Thread.currentThread().getName();
System.out.println(threadName+" 开始消费,锁的hashCode: "+lock.hashCode());
//队列为空,在notEmpty条件上等待
while (list.size() == 0)
{
notEmpty.await();
}
//出队:移除链表首元素
E e = list.removeFirst();
System.out.println(threadName + " 出队:"+e);
//通知在notFull条件上等待的线程
notFull.signal();
return e;
} finally {
lock.unlock();
}
}
}
ReentrantLock默认是非公平锁,直接先通过CAS将state+1尝试获取锁,多线程会争抢锁,而公平锁是直接插入到CLH队列的尾部等待锁,先到先得
读写锁的机制:
锁的升级、降级:
写锁可以降级为读锁,反过来读锁不可以升级为写锁。就是说同一个线程获取写锁之后还可以再次获取读锁,不会死锁,但是反过来不行。
//同一个线程写锁通过重入的方式可以降级为读锁,反过来读锁不支持升级到写锁
ReentrantReadWriteLock rwLock=new ReentrantReadWriteLock();
rwLock.writeLock().lock();
System.out.println("get write lock");
rwLock.readLock().lock();
System.out.println("get read lock");
rwLock.readLock().unlock();
System.out.println("read unlock");
rwLock.writeLock().unlock();
System.out.println("write unlock");
System.out.println(rwLock.getReadHoldCount());
System.out.println(rwLock.getWriteHoldCount());
读写缓存的示例,锁降级的目的是为了提高执行效率:
public class CacheDemo {
private Map<String, Object> map = new HashMap<>(128);
private ReadWriteLock rwl = new ReentrantReadWriteLock();
public Object get(String id){
Object value = null;
//首先开启读锁,开始读缓存数据
rwl.readLock().lock();
try{
//如果缓存中没有释放读锁,上写锁
if(map.get(id) == null){
rwl.readLock().unlock();
rwl.writeLock().lock();
try{
//防止多写线程重复查询赋值
if(map.get(id) == null){
//此时可以去数据库中查找,这里简单的模拟一下
value = "value from thread: "+Thread.currentThread().getName();
map.put(id,value);
}
//写完成之后立即加读锁进行降级,目的是让写操作之后的逻辑不会影响其他线程的读操作(提高效率),但读锁这个时候还没有释放,可以继续执行读操作或者其它逻辑
rwl.readLock().lock();
}finally{
//写操作完成之后立即降级为读锁,然后释放写锁
rwl.writeLock().unlock();
}
}
//进行读操作
return map.get(id);
}finally{
rwl.readLock().unlock(); //最终释放读锁
}
}
}
读锁之间可以共享,同一把锁多个线程可以同时加锁。写锁是独占的,写锁与写锁,写锁与读锁,都不能同时加锁。
public class ReadAndWriteLockTest {
public static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static void main(String[] args) {
//同时写
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new Runnable() {
@Override
public void run() {
writeFile(Thread.currentThread());
}
});
service.execute(new Runnable() {
@Override
public void run() {
writeFile(Thread.currentThread());
}
});
service.shutdown();
}
// 读操作
public static void readFile(Thread thread) {
lock.readLock().lock();
boolean isWriteLocked = lock.isWriteLocked();
if (!isWriteLocked) {
System.out.println("当前为读锁!");
}
try {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(thread.getName() + ":正在进行读操作……");
}
System.out.println(thread.getName() + ":读操作完毕!");
} finally {
System.out.println("释放读锁!");
lock.readLock().unlock();
}
}
// 写操作
public static void writeFile(Thread thread) {
lock.writeLock().lock();
boolean isWriteLocked = lock.isWriteLocked();
if (isWriteLocked) {
System.out.println("当前为写锁!");
}
try {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(thread.getName() + ":正在进行写操作……");
}
System.out.println(thread.getName() + ":写操作完毕!");
} finally {
System.out.println("释放写锁!");
lock.writeLock().unlock();
}
}
}
总结java线程基础知识
synchronized,是Java中用于解决并发情况下数据同步访问的一个很重要的关键字。当我们想要保证一个共享资源在同一时间只会被一个线程访问到时,我们可以在代码中使用synchronized关键字对类或者对象加锁。那么,本文来介绍一下synchronized关键字的实现原理是什么。在阅读本文之间,建议先看下Java虚拟机是如何执行线程同步的 。
众所周知,在Java中,synchronized有两种使用形式,同步方法和同步代码块。代码如下:
/**
* @author Hollis 17/11/9.
*/
public class SynchronizedTest {
public synchronized void doSth(){
System.out.println("Hello World");
}
public void doSth1(){
synchronized (SynchronizedTest.class){
System.out.println("Hello World");
}
}
}
我们先来使用Javap来反编译以上代码,结果如下(部分无用信息过滤掉了):
public synchronized void doSth();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=2, locals=1, args_size=1
0: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #3 // String Hello World
5: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return
public void doSth1();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=3, args_size=1
0: ldc #5 // class com/hollis/SynchronizedTest
2: dup
3: astore_1
4: monitorenter
5: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
8: ldc #3 // String Hello World
10: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
13: aload_1
14: monitorexit
15: goto 23
18: astore_2
19: aload_1
20: monitorexit
21: aload_2
22: athrow
23: return
反编译后,我们可以看到Java编译器为我们生成的字节码。在对于doSth和doSth1的处理上稍有不同。也就是说。JVM对于同步方法和同步代码块的处理方式不同。
对于同步方法,JVM采用ACC_SYNCHRONIZED标记符来实现同步。 对于同步代码块。JVM采用monitorenter、monitorexit两个指令来实现同步。
关于这部分内容,在JVM规范中也可以找到相关的描述。
The Java® Virtual Machine Specification中有关于方法级同步的介绍:
Method-level synchronization is performed implicitly, as part of method invocation and return. A synchronized method is distinguished in the run-time constant pool’s method_info structure by the ACC_SYNCHRONIZED flag, which is checked by the method invocation instructions. When invoking a method for which ACC_SYNCHRONIZED is set, the executing thread enters a monitor, invokes the method itself, and exits the monitor whether the method invocation completes normally or abruptly. During the time the executing thread owns the monitor, no other thread may enter it. If an exception is thrown during invocation of the synchronized method and the synchronized method does not handle the exception, the monitor for the method is automatically exited before the exception is rethrown out of the synchronized method.
主要说的是: 方法级的同步是隐式的。同步方法的常量池中会有一个ACC_SYNCHRONIZED
标志。当某个线程要访问某个方法的时候,会检查是否有ACC_SYNCHRONIZED
,如果有设置,则需要先获得监视器锁,然后开始执行方法,方法执行之后再释放监视器锁。这时如果其他线程来请求执行方法,会因为无法获得监视器锁而被阻断住。值得注意的是,如果在方法执行过程中,发生了异常,并且方法内部并没有处理该异常,那么在异常被抛到方法外面之前监视器锁会被自动释放。
同步代码块使用monitorenter
和monitorexit
两个指令实现。 The Java® Virtual Machine Specification 中有关于这两个指令的介绍:
monitorenter
Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:
If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.
monitorexit
The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.
大致内容如下: 可以把执行monitorenter
指令理解为加锁,执行monitorexit
理解为释放锁。 每个对象维护着一个记录着被锁次数的计数器。未被锁定的对象的该计数器为0,当一个线程获得锁(执行monitorenter
)后,该计数器自增变为 1 ,当同一个线程再次获得该对象的锁的时候,计数器再次自增。当同一个线程释放锁(执行monitorexit
指令)的时候,计数器再自减。当计数器为0的时候。锁将被释放,其他线程便可以获得锁。
同步方法通过ACC_SYNCHRONIZED
关键字隐式的对方法进行加锁。当线程要执行的方法被标注上ACC_SYNCHRONIZED
时,需要先获得锁才能执行该方法。
同步代码块通过monitorenter
和monitorexit
执行来进行加锁。当线程执行到monitorenter
的时候要先获得所锁,才能执行后面的方法。当线程执行到monitorexit
的时候则要释放锁。
每个对象自身维护这一个被加锁次数的计数器,当计数器数字为0时表示可以被任意线程获得锁。当计数器不为0时,只有获得锁的线程才能再次获得锁。即可重入锁。
两个指令的执行是JVM通过调用操作系统的互斥原语mutex来实现,被阻塞的线程会被挂起、等待重新调度,会导致“用户态和内核态”两个态之间来回切换,对性能有较大影响。
锁偏向是一种针对加锁操作的优化手段。它的核心思想是:如果一个线程获得了锁,那么锁就进入偏向模式。当这个线程再次请求锁时,无须再做任何同步操作。这样就节省了大量有关锁申请的操作,从而提高了程序性能。因此,对于几乎没有锁竞争的场合,偏向锁有比较好的优化效果,因为连续多次极有可能是同一个线程请求相同的锁。而对于锁竞争比较激烈的场合,其效果不佳。因为在竞争激烈的场合,最有可能的情况是每次都是不同的线程来请求相同的锁。这样偏向模式会失效,因此还不如不启用偏向锁。
偏向锁的撤销在上述第四步骤中有提到,偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态,撤销偏向锁后恢复到未锁定(标志位为“01”)或轻量级锁(标志位为“00”)的状态。
(1)在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,官方称之为 Displaced Mark Word。这时候线程堆栈与对象头的状态如图所示。
(2)拷贝对象头中的Mark Word复制到锁记录中。
(3)拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock record里的owner指针指向object mark word。如果更新成功,则执行步骤(4),否则执行步骤(5)。
(4)如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,即表示此对象处于轻量级锁定状态,这时候线程堆栈与对象头的状态如图所示。
(5)如果这个更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行。否则说明多个线程竞争锁,轻量级锁就要膨胀为重量级锁,锁标志的状态值变为“10”,Mark Word中存储的就是指向重量级锁(互斥量)的指针,后面等待锁的线程也要进入阻塞状态。 而当前线程便尝试使用自旋来获取锁,自旋就是为了不让线程阻塞,而采用循环去获取锁的过程。
(1)通过CAS操作尝试把线程中复制的Displaced Mark Word对象替换当前的Mark Word。
(2)如果替换成功,整个同步过程就完成了。
(3)如果替换失败,说明有其他线程尝试过获取该锁(此时锁已膨胀),那就要在释放锁的同时,唤醒被挂起的线程。
总结java线程基础知识
在多核多CPU的环境下并发执行指令的时候,由于每个CPU核心都有自己的高速缓存,CPU缓存之间是相互隔离的,但是各个缓存中数据可能对应主内存中相同的数据,在CPU进行数据读取、写入的过程中就会存在缓存一致性的问题。解决缓存一致性的问题有两种方式:
- 在总线上加锁,同一时间只能有一个CPU核心操作主内存的同一份数据内存
- 缓存一致性协议,当CPU写数据时,如果发现操作的变量是共享变量,即在其他CPU中也存在该变量的副本,会发出信号通知其他CPU将该变量的缓存行置为无效状态,因此当其他CPU需要读取这个变量时,发现自己缓存中缓存该变量的缓存行是无效的,那么它就会从内存重新读取。
缓存一致性协议(MESI):
下面分析两核cpu的读写操作 假如cpuA要读取一个数据x的流程 1.cpuA先在自己的缓存里找x,看是否缓存过,如果是则查看状态,如果不是Invalid状态,则直接读取 2.若状态为无效,则cpuA向总线发送一个请求,表示需要读取x,由于cpu之间是互相监听的状态,其他cpu获取到这个请求,到自己的缓存中找x,如果找到这个数据且不为Invalid状态,向总线发送一个消息通知cpuA去获取,如果是Exclusive(独占)状态,首先要将它变为Shared(共享态),如果该数据为Modifield(被修改)状态,则要把该数据写到内存中去,把状态改为Shared,再通知cpuA,如果其他cpu的状态都为Invalid态,则向主内存中读取 cpuA要写入一个数据的流程 1.首先查看自己的缓存中是否有这个数据,若存在且为Exclusive或Modified状态,则直接写入 2.如果时Shared态,则通过总线向其他cpu发送消息,其他cpu接受到消息把自己的数据置为无效 3,如果为Invalid态,则先向主内存中读取,然后写入,再把其他cpu内的数据置为Invalid 总结:cpu除了和内存传输时需要进行总线交互,还会不断地监听和嗅探总线上发生的数据交换,当一个缓存代表它所在的处理器去读写内存时,其他处理器都会得到通知来使自己的缓存保持同步
Java内存模型(Java Memory Model ,JMM)就是一种符合内存模型规范的,屏蔽了各种硬件和操作系统的访问差异的,保证了Java程序在各种平台下对内存的访问都能保证效果一致的机制及规范。JMM决定一个线程对共享变量的写入何时对另一个线程可见。
Java内存模型规定了所有的变量都存储在主内存中,每条线程还有自己的工作内存,线程的工作内存中保存了该线程中是用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存。不同的线程之间也无法直接访问对方工作内存中的变量,线程间变量的传递均需要自己的工作内存和主存之间进行数据同步进行。
而JMM就作用于工作内存和主存之间数据同步过程。他规定了如何做数据同步以及什么时候做数据同步。
总结下,JMM是一种规范,目的是解决由于多线程通过共享内存进行通信时,存在的本地内存数据不一致、编译器会对代码指令重排序、处理器会对代码乱序执行等带来的问题。
Java内存模型,除了定义了一套规范,还提供了一系列原语,封装了底层实现后,供开发者直接使用。 比如volatile、synchronized、final、concurrent包等
JMM规定线程间通信必须要经过主内存。如果线程A与线程B之间要通信的话,必须要经历下面2个步骤:
关于主内存与工作内存之间的具体交互协议,即一个变量如何从主内存拷贝到工作内存、如何从工作内存同步到主内存之间的实现细节,Java内存模型定义了以下八种操作来完成: