欢迎光临
我们一直在努力

Java并发——基石篇(上)

概要

并行是这个时代的主旋律,也是很多现代操作系统需要提供的必备功能,在过去摩尔定律催生下,单个CPU核心计算的速度越来越快。但是随着产业的发展,单个CPU核心的计算上限已经难以突破,传统的加强单核的思维模式已经不能满足需求。在古代,人们需要强大的战马来驱动战车,为了能够使得战斗力越来越强,人们驯化了越来越强劲的战马,但是单匹马的力量始终是有限的,因此人们发明了多马并驾的战车结构。同样地,在现代计算机领域,人们在单个CPU核心能力有限的情况下,使用多个核心的CPU进行并行计算以驱动强大的算力。
但是,多CPU和多战马是远远不同的,在现实世界中的计算任务大多需要相互协调,其根本原因是人类的思维方式是线性串行的,设计一个完全并行的计算逻辑体系还是有相当大难度的。

如何设计一个高并发的程序,不仅仅是工程界的难题,在计算机学术界也是一个需要不断突破的研究领域。从学术理论提出,到算法设计,再到工程实施,再到长夜验证调优,整个流程都需要比较长的时间来进行迭代,究其根本,并行计算本身救赎非常复杂,不确定的,不可预测的逻辑系统。

多核系统中的一致性

Java号称一次编写,到处运行,其本身也是构建在不同的系统之上的,以其运行时JVM来屏蔽系统底层的差异。因此,在介绍Java并发体系之前,有必要简要介绍依稀计算机系统层面上的并发,以及面对的问题。

我们的目的其实很简单,就是让计算机在同一时刻,能够运行更多的任务。而并行计算,提供了非常不错的解决方案。虽然这看起来很自然,但实际上面临着众多的问题,其中一个重大的问题就是绝大多数的计算不仅仅是CPU一个人的事,而是需要很多计算机系统部件共同参与。但是我们知道,计算机系统中运行速度最快的就是CPU,其他部件例如:内存、磁盘、网络等等都是及其缓慢的,同时这些操作在目前的计算机体系中是很难消除的,因为我们不可能仅仅靠寄存器就完成所有的计算任务。面对高速CPU和低速存储之间的鸿沟,如果想要实现高效数据通讯,一个良好的解决方案就是在他们之间台南佳一个cache层,这个cache层的速度和整体的速度关系如下:

CPU --> cache --> 存储

通过cache这个缓冲地带,实现CPU和存储之间的高效沟通,这是计算机和软件领域通用的一个问题解决问题:增加中间层,如果一个中间层解决不了,那就两层。在运算的时候,CPU将需要使用到的数据复制到cache中,以后每次获取数据都较为快速的从cache中获取,加快访问速度。

所谓理想很丰满,现实很骨感。这种计算体系有一个重要的问题需要解决,那就是:缓存一致性(cache coherence)问题。在现代的计算机系统中,主要都是多核系统为主。在这些计算机系统中,每一个CPU都拥有自己独立的高速缓存,但是因为主存只有一个,因此他们之间只能共享,这种系统也被称为:共享内存多核系统(Shared-Menory multiprocessors System)。

同时为了保证CPU数据存储的一致性,需要定义一个统一的缓存一致性协议,这类协议有很多,例如:MSI、MESI、MOSI、Synapse、Firefly以及Dragon Protocol等等。所以,通常情况下,共享内存多核系统的架构如下:
image

除了使用高速cache来缓存CPU和存储设备之间的速度鸿沟,为了能够充分利用多核CPU的处理性能,处理在实际执行机器指令时并不一定会按照程序设定的指令顺序执行,可能存在代码乱序执行(Out-Of_Order Execution)优化。但是,仅仅只是在代码层面上乱序执行,系统会保证执行的结果逻辑正确,从宏观上看就好像是顺序执行一样。

Java内存模型

上面我们探讨了共享内存多核系统的内存模型,我们提到了高速缓存以及缓存一致性问题,同时还介绍了指令乱序执行的问题。其实,这些概念在Java中也是存在的。因为Java的目标是:一次编写,到处运行,因此必须在JVM层面上将系统之间的差异屏蔽掉。面对如此多的系统,最好的方式就是定义一套Java自己的内存访问模型,然后在不同的硬件平台和操作系统上分别利用本地接口来实现。这里的思想其实和增加cache是一样的,通过增加中间层来解决系统差异带来的协作问题。

Java工作内存和主存之间的一致性保证主要通过以下4种操作完成:

  1. read:Java执行引擎访问引擎访问本地工作内存中的变量副本,如果变量副本无效(变量副本不存在也是无效的一种),那就去主存中获取,同时在本地工作内存中缓存一份
  2. write:Java执行引擎将最新的变量值赋值给工作内存中的变量副本,同时需要判断是否需要将这个新的值立即同步到主内存,如果需要同步的话,还需要配合lock操作
  3. lock:Java执行引擎将主内存中的变量锁定,锁定的含义有:其他的线程在此之后不能访问这个变量直到本线程unlock;一旦锁定,其他线程对这个变量的操作必须等待
  4. unlock:Java执行引擎将主内存中的变量解锁,解锁之后各个线程才能重新并发访问这个变量,直到变量被某个线程再次锁定

Java Thread创建

在Java中,我们都知道,一个线程直接对应了一个Thread对象。创建和启动一个线程是比较容易的,我们只需要创建一个Thread对象,然后调用对象的start方法即可。但是在创建一个Thread对象和启动线程JVM中究竟发生了什么?本节我们就来看下。

在创建一个Thread对象的时候,除了一些初始化设置之外就没有其他实质性的操作,真正的工作其实是在start方法调用中产生的。

Java通过registerNatives方法将Thread类中的java方法和一个本地的C/C++函数进行对应,同时registerNatives方法是类加载的时候调用的,因此在类首次加载的时候(Bootstarp类加载)就会注册这些native方法。

/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
    registerNatives();
}
static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

再看看对应JNI的结构体

/*
 * used in RegisterNatives to describe native method name, signature, * and function pointer. */
typedef struct {
    char *name;
    char *signature;
    void *fnPtr;
} JNINativeMethod;

即第一列是Java中定义的native方法名称,第二列是Java方法签名,第三列是本地方法对应函数。因此,Java中的start方法就是对应native的JVM——StartThread函数:

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;

  // We cannot hold the Threads_lock when we throw an exception,   // due to rank ordering issues. Example:  we might need to grab the   // Heap_lock while we construct the exception.   bool throw_illegal_thread_state = false;

  // We must release the Threads_lock before we can post a jvmti event   // in Thread::start.   {
    // Ensure that the C++ Thread and OSThread structures aren't freed before     // we operate.     MutexLocker mu(Threads_lock);

    // Since JDK 5 the java.lang.Thread threadStatus is used to prevent     // re-starting an already started thread, so we should usually find     // that the JavaThread is null. However for a JNI attached thread     // there is a small window between the Thread object being created     // (with its JavaThread set) and the update to its threadStatus, so we     // have to check for this     if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;
    } else {
      // We could also check the stillborn flag to see if this thread was already stopped, but       // for historical reasons we let the thread detect that itself when it starts running 
      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      // Allocate the C++ Thread structure and create the native thread.  The       // stack size retrieved from java is 64-bit signed, but the constructor takes       // size_t (an unsigned type), which may be 32 or 64-bit depending on the platform.       //  - Avoid truncating on 32-bit platforms if size is greater than UINT_MAX.       //  - Avoid passing negative values which would result in really large stacks.       NOT_LP64(if (size > SIZE_MAX) size = SIZE_MAX;)
      size_t sz = size > 0 ? (size_t) size : 0;
      // 重点看这里!!!       native_thread = new JavaThread(&thread_entry, sz);

      // At this point it may be possible that no osthread was created for the       // JavaThread due to lack of memory. Check for this situation and throw       // an exception if necessary. Eventually we may want to change this so       // that we only grab the lock if the thread was created successfully -       // then we can also do this check and throw the exception in the       // JavaThread constructor.       if (native_thread->osthread() != NULL) {
        // Note: the current thread is not being used within "prepare".         native_thread->prepare(jthread);
      }
    }
  }

  if (throw_illegal_thread_state) {
    THROW(vmSymbols::java_lang_IllegalThreadStateException());
  }

  assert(native_thread != NULL, "Starting null thread?");

  if (native_thread->osthread() == NULL) {
    // No one should hold a reference to the 'native_thread'.     native_thread->smr_delete();
    if (JvmtiExport::should_post_resource_exhausted()) {
      JvmtiExport::post_resource_exhausted(
        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
        os::native_thread_creation_failed_msg());
    }
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              os::native_thread_creation_failed_msg());
  }

  Thread::start(native_thread);

JVM_END

这段代码的主要作用是创建一个JavaThread对象并启动。我们进入创建JavaThread构造函数

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
                       Thread() {
  initialize();
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point);
  // Create the native thread itself.   // %note runtime_23   os::ThreadType thr_type = os::java_thread;
  thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
                                                     os::java_thread;
  // 通过 os 类的 create_thread 函数来创建一个线程   os::create_thread(this, thr_type, stack_sz);
  // The _osthread may be NULL here because we ran out of memory (too many threads active).   // We need to throw and OutOfMemoryError - however we cannot do this here because the caller   // may hold a lock and all locks must be unlocked before throwing the exception (throwing   // the exception consists of creating the exception object & initializing it, initialization   // will leave the VM via a JavaCall and then all locks must be unlocked).   //   // The thread is still suspended when we reach here. Thread must be explicit started   // by creator! Furthermore, the thread must also explicitly be added to the Threads list   // by calling Threads:add. The reason why this is not done here, is because the thread   // object must be fully initialized (take a look at JVM_Start) }

可以看到,重点是通过os类的create_thread函数来创建一个线程,因为JVM是跨平台的,并且不同操作系统上的线程实现机制可能不太一样,因此这里的create_thread肯定会有多个针对不同平台的实现,我们查看这个函数的实现就知道了:
image
可以看到,HotSpot提供了主要的操作系统上的实现,因为在服务器上,linux的占比是很高的,因此我们这里就看下linux上的实现:

bool os::create_thread(Thread* thread, ThreadType thr_type,
                       size_t req_stack_size) {
  ...
  // init thread attributes   pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  // Calculate stack size if it's not specified by caller.   size_t stack_size = os::Posix::get_initial_stack_size(thr_type, req_stack_size);
  // In the Linux NPTL pthread implementation the guard size mechanism   // is not implemented properly. The posix standard requires adding   // the size of the guard pages to the stack size, instead Linux   // takes the space out of 'stacksize'. Thus we adapt the requested   // stack_size by the size of the guard pages to mimick proper   // behaviour. However, be careful not to end up with a size   // of zero due to overflow. Don't add the guard page in that case.   size_t guard_size = os::Linux::default_guard_size(thr_type);
  if (stack_size <= SIZE_MAX - guard_size) {
    stack_size += guard_size;
  }
  assert(is_aligned(stack_size, os::vm_page_size()), "stack_size not aligned");

  int status = pthread_attr_setstacksize(&attr, stack_size);
  assert_status(status == 0, status, "pthread_attr_setstacksize");

  // Configure glibc guard page.   pthread_attr_setguardsize(&attr, os::Linux::default_guard_size(thr_type));
  ...
  pthread_t tid;
  // 创建并启动线程   int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
  ...
}

这个函数比较长,这里就省略部分,只保留和线程创建启动相关的部分,可以看到,在linux平台上,JVM的线程是通过大名鼎鼎的pthread库来创建启动线程的,这里需要注意的是,在指定线程栈大小的时候,并不是程序员指定多少就是多少,而是要根据系统平台的限制来综合决定的。我们也可以得出结论,Java Thread在底层对应一个pthread线程。我们看下pthread创建并启动线程的接口:

int thread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);

第一个是pthread_t结构体数据指针,存放线程信息,第二个是线程的属性,第三个是线程体,也就是线程实际执行的函数,第四个是线程体的参数列表。
上面调用这个接口的地方,我们指定了线程体函数是thread_native_entry,参数是thread指针。我们先看下thread_native_entry这个函数的定义:

// Thread start routine for all newly created threads static void *thread_native_entry(Thread *thread) {
  ...
  // call one more level start routine   thread->run();
  ...
}

同样,这里只保留了重点代码,通过注释我们可以知道,thread->run()这一行是最可能执行我们run方法的地方。我们看一下代码:

// The first routine called by a new Java thread void JavaThread::run() {
  ...
  // We call another function to do the rest so we are sure that the stack addresses used   // from there will be lower than the stack base just computed   thread_main_inner();
}

这里重点是调用了thread_main_inner函数:

void JavaThread::thread_main_inner() {
  assert(JavaThread::current() == this, "sanity check");
  assert(this->threadObj() != NULL, "just checking");

  // Execute thread entry point unless this thread has a pending exception   // or has been stopped before starting.   // Note: Due to JVM_StopThread we can have pending exceptions already!   if (!this->has_pending_exception() &&
      !java_lang_Thread::is_stillborn(this->threadObj())) {
    {
      ResourceMark rm(this);
      this->set_native_thread_name(this->get_thread_name());
    }
    HandleMark hm(this);
    // 这里开始调用 java thread 的 run 方法啦~~~     this->entry_point()(this, this);
  }

  DTRACE_THREAD_PROBE(stop, this);

  // java 中的 run 方法执行完毕了,这里需要退出线程并清理资源   this->exit(false);
  // delete cpp 的对象   this->smr_delete();
}

可以看到,Java Thread中的run方法就是在this->entry_point()(this,this);这里调用的。看这里的调用方式就知道,entry_point()返回的是一个函数指针,然后直接调用,entry_point函数实现如下:

ThreadFunction entry_point() const             { return _entry_point; }

那么_entry_point是哪里来的?我们再看上面JavaThread的构造函数,我们发现了一个方法set_entry_point(entry_point),_entry_point就是我们创建JavaThread对象时传入的函数指针。

https://segmentfault.com/a/1190000039771963

赞(0)
未经允许不得转载:ITyet » Java并发——基石篇(上)
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址