gRPC-C++ Source Code Analysis (Part 4): ServerCompletionQueue, Continued

2.1.2 grpc_determine_iomgr_platform

Let's look at the call logic:

<code>// iomgr_internal.cc
void grpc_determine_iomgr_platform() {
  if (iomgr_platform_vtable == nullptr) {
    grpc_set_default_iomgr_platform();
  }
}</code>

The implementation of grpc_set_default_iomgr_platform:

<code>// iomgr_posix.cc
void grpc_set_default_iomgr_platform() {
  grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable);
  grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable);
  grpc_set_timer_impl(&grpc_generic_timer_vtable);
  grpc_set_pollset_vtable(&grpc_posix_pollset_vtable);
  grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable);
  grpc_set_resolver_impl(&grpc_posix_resolver_vtable);
  grpc_set_iomgr_platform_vtable(&vtable);
}</code>
  • First, note grpc_set_iomgr_platform_vtable: as the name suggests, it sets something called &vtable
  • We care about it because the upcoming grpc_iomgr_platform_init function uses it right away
  • The other grpc_set_* functions are just as important; we will see what they are for later
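The registration pattern above can be reduced to a small sketch. Everything below (the struct name, the helper names, the return value 42) is invented for illustration and is not the real gRPC code; it only shows the idea: a global vtable pointer is installed once, and later calls dispatch through it.

```cpp
#include <cassert>

// Reduced sketch of the iomgr vtable-registration pattern.
struct iomgr_platform_vtable_t {
  int (*init)();
};

static const iomgr_platform_vtable_t* g_platform_vtable = nullptr;

static void set_iomgr_platform_vtable(const iomgr_platform_vtable_t* v) {
  g_platform_vtable = v;
}

static int posix_init() { return 42; }

// Plays the role of the file-static `vtable` that
// grpc_set_default_iomgr_platform registers.
static const iomgr_platform_vtable_t posix_vtable = {posix_init};

// Mirrors grpc_determine_iomgr_platform: install the default only when
// nothing has been registered yet.
static void determine_platform() {
  if (g_platform_vtable == nullptr) {
    set_iomgr_platform_vtable(&posix_vtable);
  }
}

// Mirrors grpc_iomgr_platform_init: dispatch through the pointer.
static int platform_init() {
  determine_platform();
  return g_platform_vtable->init();
}
```

Calling platform_init() twice is safe: the second call finds the vtable already installed and skips the registration step.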

2.1.3 grpc_iomgr_platform_init

<code>void grpc_iomgr_platform_init() { iomgr_platform_vtable->init(); }</code>
  • Point of interest: iomgr_platform_vtable, which is exactly the pointer assigned by the grpc_set_iomgr_platform_vtable call in section 2.1.2
  • The trail now leads to iomgr_platform_vtable->init()

2.1.4 grpc_iomgr_platform_init----iomgr_platform_vtable->init

The vtable behind iomgr_platform_vtable is defined as:

<code>static grpc_iomgr_platform_vtable vtable = {
    iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
    iomgr_platform_shutdown_background_closure,
    iomgr_platform_is_any_background_poller_thread};</code>
  • So iomgr_platform_vtable->init calls the iomgr_platform_init function; the call path is shown below
(figure: call path of grpc_iomgr_platform_init)

  • The figure above makes it clear that the ultimate purpose of grpc_iomgr_platform_init is to assign the global variable grpc_event_engine_vtable* g_event_engine
<code>// ev_posix.cc
static const grpc_event_engine_vtable* g_event_engine = nullptr;</code>
  • try_engine calls g_factories[i].factory, and the return value is assigned to grpc_event_engine_vtable* g_event_engine
  • g_factories is defined in ev_posix.cc
<code>// ev_posix.cc
static event_engine_factory g_factories[] = {
    {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
    {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
    {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
    {"poll", grpc_init_poll_posix}, {"none", init_non_polling},
    {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
    {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
};</code>
  • Which factory gets picked is decided by the GRPC_POLL_STRATEGY environment variable, which is read in grpc_event_engine_init
<code>// ev_posix.cc
void grpc_event_engine_init(void) {
  char* s = gpr_getenv("GRPC_POLL_STRATEGY");
  if (s == nullptr) {
    s = gpr_strdup("all");
  }</code>
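To illustrate the selection, here is a hypothetical miniature of the try_engine loop. The mini_* names and the two-entry factory table are invented, but the shape matches the description above: walk the factory table in order and take the first factory that matches the requested strategy ("all" accepts anything) and returns a non-null vtable.

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Hypothetical miniature of try_engine; only the selection shape
// matches the real ev_posix.cc.
struct mini_engine_vtable {
  const char* name;
};

static const mini_engine_vtable epollex_vtable = {"epollex"};
static const mini_engine_vtable poll_vtable = {"poll"};

static const mini_engine_vtable* init_epollex() { return &epollex_vtable; }
static const mini_engine_vtable* init_poll() { return &poll_vtable; }

struct mini_engine_factory {
  const char* name;
  const mini_engine_vtable* (*factory)();
};

static const mini_engine_factory g_mini_factories[] = {
    {"epollex", init_epollex},
    {"poll", init_poll},
};

// Walk the table in order; "all" accepts any entry, otherwise the name
// must match; the first factory returning a non-null vtable wins.
static const mini_engine_vtable* try_engines(const std::string& strategy) {
  for (const auto& f : g_mini_factories) {
    if (strategy == "all" || strategy == f.name) {
      if (const mini_engine_vtable* v = f.factory()) return v;
    }
  }
  return nullptr;
}
```

With strategy "all", the first table entry wins; with an explicit name, only the matching entry is tried; an unknown name yields no engine.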
  • On my CentOS 7.5 box, the g_factories entry picked by default is {"epollex", grpc_init_epollex_linux}
  • The grpc_init_epollex_linux function returns a vtable
<code>// ev_epollex_linux.cc
const grpc_event_engine_vtable* grpc_init_epollex_linux(
    bool explicitly_requested) {
  ………………
  return &vtable;
}</code>
  • Let's look at the definition of this vtable
<code>// ev_epollex_linux.cc
static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),
    true,
    false,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_unref,  // destroy ==> unref 1 public ref
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    is_any_background_poller_thread,
    shutdown_background_closure,
    shutdown_engine,
};</code>

Even if all of section 2.1 didn't sink in, that's fine: just remember that we now hold a grpc_event_engine_vtable* g_event_engine pointer, and its contents are the static const grpc_event_engine_vtable vtable shown above.

2.2 grpc::g_core_codegen_interface

Back to the start of section 2: besides g_glip, another global variable, grpc::g_core_codegen_interface, is also initialized inside GrpcLibraryInitializer. It is used in the CompletionQueue constructor. Here is the code once more:

<code>// grpc_library.h
class GrpcLibraryInitializer final {
 public:
  GrpcLibraryInitializer() {
    if (grpc::g_glip == nullptr) {
      static auto* const g_gli = new GrpcLibrary();
      grpc::g_glip = g_gli;
    }
    if (grpc::g_core_codegen_interface == nullptr) {
      static auto* const g_core_codegen = new CoreCodegen();
      grpc::g_core_codegen_interface = g_core_codegen;
    }
  }</code>
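The constructor above relies on a C++ function-local static: no matter how many GrpcLibraryInitializer objects are created, the singleton is built only once. A stripped-down sketch, where GrpcLibrary is an empty stand-in and g_construct_count is an invented counter added purely to make the behavior observable:

```cpp
#include <cassert>

// Sketch of the GrpcLibraryInitializer idiom: the function-local
// static runs its initializer exactly once.
struct GrpcLibrary {};

static GrpcLibrary* g_glip = nullptr;
static int g_construct_count = 0;

struct GrpcLibraryInitializer {
  GrpcLibraryInitializer() {
    if (g_glip == nullptr) {
      // Constructed exactly once, even across many initializer objects.
      static auto* const g_gli = [] {
        ++g_construct_count;
        return new GrpcLibrary();
      }();
      g_glip = g_gli;
    }
  }
};

// Demo helper: create two initializers, report how many GrpcLibrary
// objects were actually constructed.
static int construct_twice() {
  GrpcLibraryInitializer a;
  GrpcLibraryInitializer b;
  return g_construct_count;
}
```

Since C++11 this initialization is also thread-safe ("magic statics"), which is why the real code can lean on it for global setup.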

3 CompletionQueue

Back to the figure from section 1:

(figure: the class diagram from section 1)

  • Section 2 analyzed GrpcLibraryCodegen
  • This section starts analyzing CompletionQueue

Now let's look at the implementation of the CompletionQueue constructor:

<code>CompletionQueue(const grpc_completion_queue_attributes& attributes) {
  cq_ = g_core_codegen_interface->grpc_completion_queue_create(
      g_core_codegen_interface->grpc_completion_queue_factory_lookup(
          &attributes),
      &attributes, NULL);
  InitialAvalanching();  // reserve this for the future shutdown
}</code>
  • The ultimate goal is to produce a cq_
  • First, g_core_codegen_interface->grpc_completion_queue_factory_lookup is called to return a grpc_completion_queue_factory* factory
  • Then g_core_codegen_interface->grpc_completion_queue_create is called to produce cq_
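The lookup-then-create two-step can be sketched with invented miniature types (the mini_* names below are simplified stand-ins, not the real gRPC structs): a lookup returns a statically defined default factory, and create dispatches through factory->vtable.

```cpp
#include <cassert>

// Miniature of the lookup-then-create flow.
struct mini_cq {
  int completion_type;
};

struct mini_cq_factory_vtable {
  mini_cq* (*create)(int completion_type);
};

static mini_cq* default_create(int completion_type) {
  return new mini_cq{completion_type};
}

static const mini_cq_factory_vtable g_default_vtable = {default_create};

struct mini_cq_factory {
  const char* name;
  const mini_cq_factory_vtable* vtable;
};

// Plays the role of g_default_cq_factory.
static const mini_cq_factory g_default_cq_factory = {"default",
                                                     &g_default_vtable};

// Like grpc_completion_queue_factory_lookup: always the default here.
static const mini_cq_factory* factory_lookup() {
  return &g_default_cq_factory;
}

// Like grpc_completion_queue_create: dispatch through factory->vtable.
static mini_cq* cq_create(const mini_cq_factory* factory,
                          int completion_type) {
  return factory->vtable->create(completion_type);
}

// Demo helper: create a cq, read back its type, clean up.
static int create_and_get_type(int completion_type) {
  mini_cq* cq = cq_create(factory_lookup(), completion_type);
  int t = cq->completion_type;
  delete cq;
  return t;
}
```

The indirection lets a different factory (and hence a different create function) be swapped in without touching the constructor's call site.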

3.1 grpc_completion_queue_factory_lookup

Section 2.2 already showed how g_core_codegen_interface is initialized. grpc_completion_queue_factory_lookup is a pure virtual function in the base class CoreCodegenInterface; the concrete implementation lives in the CoreCodegen class:

<code>// core_codegen.h
class CoreCodegen final : public CoreCodegenInterface {
 private:
  virtual const grpc_completion_queue_factory*
  grpc_completion_queue_factory_lookup(
      const grpc_completion_queue_attributes* attributes) override;
  virtual grpc_completion_queue* grpc_completion_queue_create(
      const grpc_completion_queue_factory* factory,
      const grpc_completion_queue_attributes* attributes,
      void* reserved) override;</code>
(figure: call path of grpc_completion_queue_factory_lookup)

  • The lookup ultimately returns the grpc_completion_queue_factory variable g_default_cq_factory

3.2 grpc_completion_queue_create

(figure: call path of grpc_completion_queue_create)

  • grpc_completion_queue_create calls factory->vtable->create, where factory is the g_default_cq_factory variable returned by grpc_completion_queue_factory_lookup in section 3.1
  • The figure shows that grpc_completion_queue_create ultimately calls grpc_completion_queue_create_internal
<code>// completion_queue.cc
grpc_completion_queue* grpc_completion_queue_create_internal(
    grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
    grpc_experimental_completion_queue_functor* shutdown_callback) {
  GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);

  // the cq that is ultimately returned
  grpc_completion_queue* cq;

  GRPC_API_TRACE(
      "grpc_completion_queue_create_internal(completion_type=%d, "
      "polling_type=%d)",
      2, (completion_type, polling_type));

  // these two vtables are the core of the cq
  const cq_vtable* vtable = &g_cq_vtable[completion_type];
  const cq_poller_vtable* poller_vtable =
      &g_poller_vtable_by_poller_type[polling_type];

  grpc_core::ExecCtx exec_ctx;
  GRPC_STATS_INC_CQS_CREATED();

  // the allocation is tricky: besides the struct itself, it reserves an
  // extra vtable->data_size + poller_vtable->size() bytes
  cq = static_cast<grpc_completion_queue*>(
      gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
                 poller_vtable->size()));

  // store both vtables into the cq's fields
  cq->vtable = vtable;
  cq->poller_vtable = poller_vtable;

  /* One for destroy(), one for pollset_shutdown */
  gpr_ref_init(&cq->owning_refs, 2);

  // initialize the vtables
  poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
  vtable->init(DATA_FROM_CQ(cq), shutdown_callback);

  GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
                    grpc_schedule_on_exec_ctx);
  return cq;
}</code>
  • completion_type and polling_type are both 0; they were set when the ServerCompletionQueue was initialized in section 1.1
  • g_cq_vtable and g_poller_vtable_by_poller_type are both defined in completion_queue.cc and are easy to locate
  • What needs explaining here is the cq allocation and the two vtable init calls (poller_vtable->init and vtable->init)
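The allocation deserves a sketch: gpr_zalloc reserves one zeroed block holding the cq struct, then the vtable's private data, then the pollset, so macros like DATA_FROM_CQ and POLLSET_FROM_CQ are plain pointer offsets into that block. The miniature below uses invented field names, a layout_ok helper added for the demo, and calloc in place of gpr_zalloc:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>

// Miniature of the single-allocation layout:
// [mini_cq][vtable-private data][pollset], all in one zeroed block.
struct mini_cq {
  size_t data_size;     // size of the vtable-private region
  size_t pollset_size;  // size of the pollset region
};

static mini_cq* cq_alloc(size_t data_size, size_t pollset_size) {
  void* mem = calloc(1, sizeof(mini_cq) + data_size + pollset_size);
  mini_cq* cq = static_cast<mini_cq*>(mem);
  cq->data_size = data_size;
  cq->pollset_size = pollset_size;
  return cq;
}

// Like DATA_FROM_CQ: the private data sits right after the cq struct.
static void* data_from_cq(mini_cq* cq) {
  return reinterpret_cast<char*>(cq) + sizeof(mini_cq);
}

// Like POLLSET_FROM_CQ: the pollset sits after the private data.
static void* pollset_from_cq(mini_cq* cq) {
  return reinterpret_cast<char*>(data_from_cq(cq)) + cq->data_size;
}

// Check that the offsets line up for a given pair of region sizes.
static bool layout_ok(size_t data_size, size_t pollset_size) {
  mini_cq* cq = cq_alloc(data_size, pollset_size);
  char* base = reinterpret_cast<char*>(cq);
  bool ok =
      reinterpret_cast<char*>(data_from_cq(cq)) == base + sizeof(mini_cq) &&
      reinterpret_cast<char*>(pollset_from_cq(cq)) ==
          base + sizeof(mini_cq) + data_size;
  free(cq);
  return ok;
}
```

Packing everything into one allocation means a single free releases the cq, its vtable data, and its pollset together, and the regions stay cache-adjacent.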

