ceph中的SafeTimer类详解

it2022-05-05  76

ceph中的SafeTimer类详解

定时器事件或者任务SafeTimer分析两个关键的mapSafeTimerThreadSafeTimer类 SafeTimer的使用

定时器事件或者任务

ceph中的事件都是集成自Context类,事件处理方法是finish(),这是一个纯虚函数,由不同的事件完成自己的处理方法。 ceph的定时器原理是开启线程,循环检查是否有事件需要执行,如果没有就挂起,等待触发条件到来。主要用于集群中的某些特定任务,比如心跳机制,心跳机制就是在finish()中每隔一段事件ping一次。

SafeTimer分析

两个关键的map

SafeTimer中是循环检查需要执行的事件,所以维护了两个map,一个是包含时间和事件信息的schedule,其中的事件按照时间排序,SafeTimer会查看schedule中的第一个任务是否到期,如果到了就调用callback函数执行,如果没有就直接退出循环,是任务按时间执行的依据;另一个是events,主要用来检查任务是否正确添加或取消。

名称类型keyvalueschedulemultimaputime_tContexteventsmapContextschedule 的迭代器

SafeTimerThread

该类是从Thread类继承,拥有一个SafeTimer类成员变量和entry()成员函数,SafeTimer中的成员thread为此类型,承担了定时器循环检查是否有事件触发需要执行相应程序的任务。

class SafeTimerThread : public Thread { SafeTimer *parent; public: explicit SafeTimerThread(SafeTimer *s) : parent(s) {} void *entry() override { parent->timer_thread(); return NULL; } };

其中entry函数的意义是将SafeTimer中的timer_thread函数作为定时器线程的入口函数,原理是在基类Thread中利用entry_wrapper进行设置。

SafeTimer类

SafeTimer类中有若干成员函数,包括

名称作用参数init()定时器初始化noneshutdown()关闭定时器nonetimer_thread()定时器的循环线程noneadd_event_after()一段时间后添加事件seconds ,callbackadd_event_at()在某个时间点添加事件when, callbackcancel_event()取消某个事件callbackcancel_all_events()取消所有事件nonedump()caller

init() 初始化并创建定时器的线程

void SafeTimer::init() { ldout(cct,10) << "init" << dendl; thread = new SafeTimerThread(this); thread->create("safe_timer"); //新建一个叫SafeTimer的线程,线程函数为time_thread }

shutdown() 关闭定时器,join()函数应该会使得日志中先显示timer_thread awake然后才是timer_thread exiting。

void SafeTimer::shutdown() { ldout(cct,10) << "shutdown" << dendl; if (thread) { assert(lock.is_locked()); cancel_all_events(); //取消所有任务 stopping = true; //停止循环 cond.Signal(); //唤醒挂起的timer_thread lock.Unlock(); thread->join(); //等待子线程,也就是定时器线程结束后才执行后面的语句 lock.Lock(); //为什么又lock?不是已经结束了? delete thread; thread = NULL; } }

为什么线程都结束了还要lock一下?

timer_thread() 这里是SafeTimer线程执行的核心部分,配合流程图分析。该线程只要定时器不停止就一直存在,循环检查,有事件需要执行就去调用相应的callback函数执行;没有的时候就处于挂起状态,直达有新的事件添加进来,或者时间到了需要执行任务,该线程将会被变量cond的signal唤醒

yes no yes no yes no 循环执行schedule中的任务 开始 定时器stop? 线程退出 获取当前时间 schedule为空? 挂起线程等待有任务添加 时间还没到? 挂起线程等待时间到 获取schedule中的第一个任务 删去schedule和events中的callback函数 执行callback函数 线程唤醒 void SafeTimer::timer_thread() { lock.Lock(); ldout(cct,10) << "timer_thread starting" << dendl; //这个宏的写法是啥意思?cct包含了系统信息,10代表日志等级 while (!stopping) { //循环检查 utime_t now = ceph_clock_now(); //在condvar.h中包含 //如果没有事件需要执行那就挂起 while (!schedule.empty()) { scheduled_map_t::iterator p = schedule.begin(); // is the future now?如果第一个任务执行时间还没到就直接退出 if (p->first > now) break; Context *callback = p->second; events.erase(callback);//执行完了就从map中删去 schedule.erase(p); ldout(cct,10) << "timer_thread executing " << callback << dendl; //如果不是safe callback,那就不用lock if (!safe_callbacks) lock.Unlock(); callback->complete(0); //实际触发的事件执行 if (!safe_callbacks) lock.Lock(); } // recheck stopping if we dropped the lock if (!safe_callbacks && stopping) break; ldout(cct,20) << "timer_thread going to sleep" << dendl; if (schedule.empty()) cond.Wait(lock); //这里线程会进入等待队列,挂起不执行后面的程序?直到由任务添加进来需要执行 else cond.WaitUntil(lock, schedule.begin()->first);、而如果是执行事件还未到,那就挂起到时间条件满足 ldout(cct,20) << "timer_thread awake" << dendl; } ldout(cct,10) << "timer_thread exiting" << dendl; lock.Unlock(); }

add_event_after,add_event_at,cancel_event,cancel_all_events作用就是schedule和events中的事件添加和删除,保证事件都能够执行到。

SafeTimer的使用

SafeTimer在ceph中常用于定时的任务,比如monitor节点对集群的监控,OSD之间的心跳机制等等。下面从ceph的OSD.cc中的源码来分析SafeTimer的使用。

通常SafeTimer定义之后,会调用SafeTimer的调用函数,确定其将响应的事件cct,然后会调用init()函数初始化并启动定时器。如osd.cc中

agent_timer(osd->client_messenger->cct, agent_timer_lock); agent_timer.init();

如果需要持续不断执行某一任务,那么就需要不断地调用add_event_after()函数来为schedule添加事件,如

void OSDService::agent_entry() { dout(10) << __func__ << " start" << dendl; agent_lock.Lock(); while (!agent_stop_flag) { if (agent_queue.empty()) { dout(20) << __func__ << " empty queue" << dendl; agent_cond.Wait(agent_lock); continue; } uint64_t level = agent_queue.rbegin()->first; set<PGRef>& top = agent_queue.rbegin()->second; dout(10) << __func__ << " tiers " << agent_queue.size() << ", top is " << level << " with pgs " << top.size() << ", ops " << agent_ops << "/" << cct->_conf->osd_agent_max_ops << (agent_active ? " active" : " NOT ACTIVE") << dendl; dout(20) << __func__ << " oids " << agent_oids << dendl; int max = cct->_conf->osd_agent_max_ops - agent_ops; int agent_flush_quota = max; if (!flush_mode_high_count) agent_flush_quota = cct->_conf->osd_agent_max_low_ops - agent_ops; if (agent_flush_quota <= 0 || top.empty() || !agent_active) { agent_cond.Wait(agent_lock); continue; } if (!agent_valid_iterator || agent_queue_pos == top.end()) { agent_queue_pos = top.begin(); agent_valid_iterator = true; } PGRef pg = *agent_queue_pos; dout(10) << "high_count " << flush_mode_high_count << " agent_ops " << agent_ops << " flush_quota " << agent_flush_quota << dendl; agent_lock.Unlock(); if (!pg->agent_work(max, agent_flush_quota)) { dout(10) << __func__ << " " << pg->get_pgid() << " no agent_work, delay for " << cct->_conf->osd_agent_delay_time << " seconds" << dendl; osd->logger->inc(l_osd_tier_delay); // Queue a timer to call agent_choose_mode for this pg in 5 seconds agent_timer_lock.Lock(); Context *cb = new AgentTimeoutCB(pg); agent_timer.add_event_after(cct->_conf->osd_agent_delay_time, cb); agent_timer_lock.Unlock(); } agent_lock.Lock(); } agent_lock.Unlock(); dout(10) << __func__ << " finish" << dendl; }

如果不再需要定时器,那么直接shutdown()就可以

agent_timer.shutdown();

最新回复(0)