Cache2k:Guava Cache及Caffeine之外的新選擇
序
本文主要研究一下cache2k這款新型緩存。
示例
Cache<String,String> cache = new Cache2kBuilder<String, String>() {}
.eternal(true)
.expireAfterWrite(5, TimeUnit.MINUTES) // expire/refresh after 5 minutes
.setupWith(UniversalResiliencePolicy::enable, b -> b // enable resilience policy
.resilienceDuration(30, TimeUnit.SECONDS) // cope with at most 30 seconds
// outage before propagating
// exceptions
)
.refreshAhead(true) // keep fresh when expiring
.loader(k -> expensiveOperation(k)) // auto populating function
.build();
常見(jiàn)問(wèn)題的解決方案
空值問(wèn)題
JCache規(guī)范不支持null,所以cache2k默認(rèn)也不支持,不過(guò)可以通過(guò)permitNullValues(true)來(lái)開(kāi)啟,這樣子緩存就可以存儲(chǔ)null值。
cache stampede問(wèn)題
又稱作cache miss storm,指的是高并發(fā)場(chǎng)景緩存同時(shí)失效導(dǎo)致大面積回源,cache2k采用的是block的請(qǐng)求方式,避免對(duì)同一個(gè)key并發(fā)回源。
org/cache2k/core/HeapCache.java。
protected Entry<K, V> getEntryInternal(K key, int hc, int val) {
if (loader == null) {
return peekEntryInternal(key, hc, val);
}
Entry<K, V> e;
for (;;) {
e = lookupOrNewEntry(key, hc, val);
if (e.hasFreshData(clock)) {
return e;
}
synchronized (e) {
e.waitForProcessing();
if (e.hasFreshData(clock)) {
return e;
}
if (e.isGone()) {
metrics.heapHitButNoRead();
metrics.goneSpin();
continue;
}
e.startProcessing(Entry.ProcessingState.LOAD, null);
break;
}
}
boolean finished = false;
try {
load(e);
finished = true;
} finally {
e.ensureAbort(finished);
}
if (e.getValueOrException() == null && isRejectNullValues()) {
return null;
}
return e;
}
同步回源造成的接口穩(wěn)定性問(wèn)題
cache2k提供了refreshAhead參數(shù),在新數(shù)據(jù)沒(méi)有拉取成功之前,過(guò)期數(shù)據(jù)仍然可以訪問(wèn),避免請(qǐng)求到來(lái)時(shí)發(fā)現(xiàn)數(shù)據(jù)過(guò)期觸發(fā)同步回源造成接口延時(shí)增大問(wèn)題。不過(guò)具體底層還依賴prefetchExecutor,如果refresh的時(shí)候沒(méi)有足夠的線程可以使用則會(huì)立馬過(guò)期,等待下次get出發(fā)同步回源。
org/cache2k/core/HeapCache.java。
public void timerEventRefresh(Entry<K, V> e, Object task) {
metrics.timerEvent();
synchronized (e) {
if (e.getTask() != task) { return; }
try {
refreshExecutor.execute(createFireAndForgetAction(e, Operations.SINGLETON.refresh));
} catch (RejectedExecutionException ex) {
metrics.refreshRejected();
expireOrScheduleFinalExpireEvent(e);
}
}
}
默認(rèn)的executor如下,采用的是SynchronousQueue隊(duì)列,可以通過(guò)builder自己去設(shè)置refreshExecutor
Executor provideDefaultLoaderExecutor(int threadCount) {
int corePoolThreadSize = 0;
return new ThreadPoolExecutor(corePoolThreadSize, threadCount,
21, TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactoryProvider.newThreadFactory(getThreadNamePrefix()),
new ThreadPoolExecutor.AbortPolicy());
}
回源故障問(wèn)題
針對(duì)回源的下游出現(xiàn)故障的問(wèn)題,cache2k提供了ResiliencePolicy策略,其實(shí)現(xiàn)類為UniversalResiliencePolicy
當(dāng)load方法拋出異常且cache里頭還有數(shù)據(jù)的時(shí)候,異常不會(huì)拋給client,用當(dāng)前的數(shù)據(jù)返回,這里有個(gè)resilienceDuration時(shí)間,如果超過(guò)這個(gè)時(shí)間load方法還繼續(xù)拋出異常則異常會(huì)拋給client。如果沒(méi)有單獨(dú)設(shè)置resilienceDuration,則默認(rèn)取的是expiryAfterWrite時(shí)間。
org/cache2k/core/HeapCache.java。
private Object loadGotException(Entry<K, V> e, long t0, long t, Throwable wrappedException) {
ExceptionWrapper<K, V> exceptionWrapper =
new ExceptionWrapper(keyObjFromEntry(e), wrappedException, t0, e, exceptionPropagator);
long expiry = 0;
long refreshTime = 0;
boolean suppressException = false;
RefreshAheadPolicy.Context<Object> refreshCtx;
try {
if (e.isValidOrExpiredAndNoException()) {
expiry = timing.suppressExceptionUntil(e, exceptionWrapper);
}
if (expiry > t0) {
suppressException = true;
} else {
expiry = timing.cacheExceptionUntil(e, exceptionWrapper);
}
refreshCtx = getContext(e, t0, t, true, true, false, expiry);
refreshTime = timing.calculateRefreshTime(refreshCtx);
} catch (Exception ex) {
return resiliencePolicyException(e, t0, t, new ResiliencePolicyException(ex), null);
}
exceptionWrapper = new ExceptionWrapper<>(exceptionWrapper, Math.abs(expiry));
Object wrappedValue = exceptionWrapper;
if (expiry != 0) {
wrappedValue = timing.wrapLoadValueForRefresh(refreshCtx, e, exceptionWrapper);
}
Object loadResult;
synchronized (e) {
insertUpdateStats(e, (V) wrappedValue, t0, t, true, expiry, suppressException);
if (suppressException) {
e.setSuppressedLoadExceptionInformation(exceptionWrapper);
loadResult = e.getValueOrException();
} else {
if (isRecordModificationTime()) {
e.setModificationTime(t0);
}
e.setValueOrWrapper(exceptionWrapper);
loadResult = exceptionWrapper;
}
finishLoadOrEviction(e, expiry, refreshTime);
}
return loadResult;
}
這里timing.suppressExceptionUntil是委托給了ResiliencePolicy#suppressExceptionUntil。
cache2k-addon/src/main/java/org/cache2k/addon/UniversalResiliencePolicy.java。
public long suppressExceptionUntil(K key,
LoadExceptionInfo<K, V> loadExceptionInfo,
CacheEntry<K, V> cachedEntry) {
if (resilienceDuration == 0 || resilienceDuration == Long.MAX_VALUE) {
return resilienceDuration;
}
long maxSuppressUntil = loadExceptionInfo.getSinceTime() + resilienceDuration;
long deltaMs = calculateRetryDelta(loadExceptionInfo);
return Math.min(loadExceptionInfo.getLoadTime() + deltaMs, maxSuppressUntil);
}
UniversalResiliencePolicy還提供了異常重試的功能,重試間隔為retryInterval,如果沒(méi)有配置則為resilienceDuration的5%,采取的是指數(shù)退避的模式,factor為1.5。
小結(jié)
cache2k提供了Guava Cache及Caffeine沒(méi)有的ResiliencePolicy,針對(duì)C端高并發(fā)場(chǎng)景提供了容錯(cuò)的功能,值得借鑒一下。