storm定时器与java.util.Timer定时器比较相似。java.util.Timer定时器实际上是个线程,定时调度所拥有的TimerTasks;storm定时器也有一个线程负责调度所拥有的"定时任务"。storm定时器的"定时任务"是一个vector类型的数据[time, callback, uuid],内有会有三个值,分别是时间、函数、和uuid,很好理解,时间表示该定时任务什么时候执行,函数表示要执行的函数,uuid用于标识该"定时任务"。"定时任务"被存放到定时器的PriorityQueue队列中(和PriorityBlockingQueue区别,在于没有阻塞机制,不是线程安全的)。优先级队列是堆数据结构的典型应用,如果不提供Comparator的话,优先队列中元素默认按自然顺序排列,也就是数字默认是小的在队列头,字符串则按字典序排列(参阅 Comparable),也可以根据 Comparator 来指定,这取决于使用哪种构造方法。优先级队列不允许null元素。依靠自然排序的优先级队列还不允许插入不可比较的对象(这样做可能导致 ClassCastException)。当然也可以自己重新实现Comparator接口, 比如storm定时器就用reify重新实现了Comparator接口。storm定时器的执行过程比较简单,通过timer-thread,不断检查PriorityQueue里面时间最小的"定时任务"是否已经可以触发了, 如果可以(当前时间>=执行时间),就poll出来,调用callback,并sleep。storm定时器相关的函数均定义在timer.clj文件中,storm定时器是由mk-timer函数创建的,mk-timer函数定义如下:
mk-timer函数
;; kill-fn函数会在timer-thread发生exception的时候被调用,timer-name标识定时器的名称
(
defnk
mk-timer
[
:kill-fn (
fn
[
&
_
] )
:timer-name
nil
]
;; queue绑定PriorityQueue队列,创建PriorityQueue队列时指定队列初始容量为10,并指定一个Comparator比较器,Comparator比较器比较"定时任务"执行时间的大小,这样每次poll出执行时间最小的"定 时任务",
PriorityQueue队列是一个依赖执行时间的小顶堆
(
let
[
queue (
PriorityQueue.
10 (
reify
Comparator
(
compare
[
this
o1
o2
]
(
- (
first
o1) (
first
o2)))
(
equals
[
this
obj
]
true)))
;; active标识timer-thread是"active"的
active (
atom
true)
;; 创建一个锁,因为PriorityQueue并不是线程安全的,所以通过这个锁,可以使多线程互斥访问PriorityQueue
lock (
Object.)
;; notifier是一个java信号量,初始值为0,notifier信号量的主要功能就是当我们调用cancel-timer函数中断一个timer-thread时,等待timer-thread结束,当timer-thread结束前会release ;; notifier信号量
notifier (
Semaphore.
0)
;; thread-name绑定timer-thread线程名,没有指定时默认为"timer"
thread-name (
if
timer-name
timer-name
"timer")
;; timer-thread线程
timer-thread (
Thread.
(
fn
[]
;; 当timer-thread为"active"即active=true时,进入while循环
(
while
@
active
(
try
;; peek函数从PriorityQueue获取执行时间最小的"定时任务",但并不出队列。time-millis绑定执行时间,elem绑定"定时任务"数据
(
let
[[
time-millis
_
_
:as
elem
] (
locking
lock (
.peek
queue
))]
;; 如果elem不为nil且当前时间>=执行时间,那么先加锁,然后poll出该"定时任务",并将"定时任务"的callback函数绑定到afn,最后调用该函数;否则判断time-millis ;; 是否为nil
;; 我们可以发现该定时器是软时间执行"定时任务"的,也就是说"定时任务"有可能被延迟执行,同时如果afn函数执行时间比较长,那么会影响下一个"定时任务"的执行
(
if (
and
elem (
>= (
current-time-millis)
time-millis))
;; It is imperative to not run the function
;; inside the timer lock. Otherwise, it is
;; possible to deadlock if the fn deals with
;; other locks, like the submit lock.
(
let
[
afn (
locking
lock (
second (
.poll
queue
)))]
;; 执行"定时任务"的callback函数
(
afn))
;; 该if语句是上面if语句的else分支,判断time-millis是否为nil,如果time-millis不为nil,则timer-thread线程sleep(执行时间-当前时间);否则 ;; sleep(1000),表明PriorityQueue中没有"定时任务"
(
if
time-millis
;; If any events are scheduled, sleep until
;; event generation. If any recurring events
;; are scheduled then we will always go
;; through this branch, sleeping only the
;; exact necessary amount of time.
(
Time/sleep (
-
time-millis (
current-time-millis)))
;; Otherwise poll to see if any new event
;; was scheduled. This is, in essence, the
;; response time for detecting any new event
;; schedulings when there are no scheduled
;; events.
(
Time/sleep
1000))))
(
catch
Throwable
t
;; Because the interrupted exception can be
;; wrapped in a RuntimeException.
;; 检查是否是InterruptedException,如果是InterruptedException,说明线程是由于接收interrupt信号而中断的,不做异常处理,否则调用kill-fn函数、修改线程状 ;; 态并抛出该异常
(
when-not (
exception-cause?
InterruptedException
t)
(
kill-fn
t)
(
reset!
active
false)
(
throw
t)))))
;; release notifier信号量,标识timer—thread运行结束
(
.release
notifier))
thread-name
)]
;; 设置timer-thread为守护线程
(
.setDaemon
timer-thread
true)
;; 设置timer-thread为最高优先级
(
.setPriority
timer-thread
Thread/MAX_PRIORITY)
;; 启动timer-thread线程
(
.start
timer-thread)
;; 返回该定时器的"属性"
{
:timer-thread
timer-thread
:queue
queue
:active
active
:lock
lock
:cancel-notifier
notifier
}))
我们可以通过调用cancel-timer函数中断一个timer-thread线程,cancel-timer函数定义如下:
cancel-timer函数
(
defn
cancel-timer
[
timer
]
;; 检查timer状态是否是"active",如果不是则抛出异常
(
check-active!
timer)
;; 加锁
(
locking (
:lock
timer)
;; 将timer的状态active设置成false,即"dead"
(
reset! (
:active
timer)
false)
;; 调用interrupt方法,中断线程,通过mk-timer函数我们可以知道在线程的run方法内调用了sleep方法,当接收到中断新号后会抛出InterruptedException异常使线程退出
(
.interrupt (
:timer-thread
timer)))
;; acquire timer中的notifier信号量,因为只有当线程结束前才会release notifier信号量,所以此处是等待线程结束
(
.acquire (
:cancel-notifier
timer)))
check-active!函数定义如下:
check-active!函数
(
defn-
check-active!
[
timer
]
(
when-not
@(
:active
timer)
(
throw (
IllegalStateException.
"Timer is not active"))))
通过调用schedule函数和schedule-recurring函数我们可以向storm定时器中添加"定时任务"。schedule函数定义如下:
schedule函数
(
defnk
schedule
;; timer绑定定时器,delay-secs绑定"定时任务"相对当前时间的延迟时间,afn绑定callback函数,check-active是否需要检查定时器
[
timer
delay-secs
afn
:check-active
true
]
;; 检查定时器状态
(
when
check-active (
check-active!
timer))
(
let
[
id (
uuid)
^
PriorityQueue
queue (
:queue
timer
)]
;; 加锁,执行时间=当前时间+延迟时间,将"定时任务"的vector类型数据添加到PriorityQueue队列中
(
locking (
:lock
timer)
(
.add
queue
[(
+ (
current-time-millis) (
secs-to-millis-long
delay-secs))
afn
id
]))))
schedule-recurring函数定义如下:schedule-recurring函数也很简单,与schedule函数的区别就是在"定时任务"的callback函数中又添加了一个相同的"定时任务"。schedule函数的语义可以理解成向定时器添加
一个"一次性任务",schedule-recurring函数的语义可以理解成向定时器添加"一个周期执行的定时任务"(开始执行时间=当前时间+延迟时间,然后每隔recur-secs执行一次),
schedule-recurring函数
(
defn
schedule-recurring
[
timer
delay-secs
recur-secs
afn
]
(
schedule
timer
delay-secs
(
fn
this
[]
(
afn)
; This avoids a race condition with cancel-timer.
(
schedule
timer
recur-secs
this
:check-active
false))))
nimbus检查心跳和重分配任务的实现就是通过schedule-recurring函数向storm定时器添加了一个"周期任务"实现的。
(
schedule-recurring (
:timer
nimbus)
0
(
conf
NIMBUS-MONITOR-FREQ-SECS)
(
fn
[]
(
when (
conf
NIMBUS-REASSIGN)
(
locking (
:submit-lock
nimbus)
(
mk-assignments
nimbus)))
(
do-cleanup
nimbus)
))
转载于:https://www.cnblogs.com/ierbar0604/p/3948558.html
相关资源:各显卡算力对照表!