1 // Copyright 2012 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.api
.datastore
;
5 import static com
.google
.appengine
.api
.datastore
.FetchOptions
.Builder
.withLimit
;
7 import com
.google
.appengine
.api
.utils
.SystemProperty
;
8 import com
.google
.apphosting
.api
.ApiProxy
.OverQuotaException
;
9 import com
.google
.apphosting
.api
.DeadlineExceededException
;
10 import com
.google
.common
.base
.Preconditions
;
11 import com
.google
.common
.base
.Ticker
;
12 import com
.google
.common
.collect
.Lists
;
13 import com
.google
.common
.collect
.Sets
;
15 import java
.util
.Collection
;
16 import java
.util
.Date
;
17 import java
.util
.Iterator
;
18 import java
.util
.LinkedHashMap
;
19 import java
.util
.List
;
22 import java
.util
.concurrent
.TimeUnit
;
23 import java
.util
.logging
.Level
;
24 import java
.util
.logging
.Logger
;
27 * This class is used to log usages of indexes that have been selected for usage monitoring
28 * by the application administrator.
31 class MonitoredIndexUsageTracker
{
33 private static final int DEFAULT_USAGE_REFRESH_PERIOD_SECS
= 60;
34 private static final double DEFAULT_REFRESH_QUERY_DEADLINE_SECS
= 0.2;
35 private static final double DEFAULT_PUT_DEADLINE_SECS
= 0.2;
36 private static final String
[] DEFAULT_API_PACKAGE_PREFIXES
=
37 {"com.google.appengine", "com.google.apphosting", "org.datanucleus"};
38 static final String REFRESH_PERIOD_SECS_SYS_PROP
=
39 "appengine.datastore.indexMonitoring.persistedUsageRefreshPeriodSecs";
40 private static final String REFRESH_QUERY_DEADLINE_SECS_SYS_PROP
=
41 "appengine.datastore.indexMonitoring.refreshUsageInfoQueryDeadlineSecs";
42 private static final String PUT_DEADLINE_SECS_SYS_PROP
=
43 "appengine.datastore.indexMonitoring.newUsagePutDeadlineSecs";
44 private static final String PACKAGE_PREFIXES_TO_SKIP_SYS_PROP
=
45 "appengine.datastore.apiPackagePrefixes";
46 private static final String NEW_USAGE_LOGGING_THRESHOLD_SECS_SYS_PROP
=
47 "appengine.datastore.indexMonitoring.newUsageLoggingThresholdSecs";
49 static final int REFRESH_QUERY_FAILURE_LOGGING_THRESHOLD
= 10;
50 private static final int MAX_MONITORED_INDEXES
= 100;
51 private static final int MAX_TRACKED_USAGES_PER_INDEX
= 30;
52 private static final int MAX_STACK_FRAMES_SAVED
= 200;
54 private static final String USAGE_ENTITY_KIND_PREFIX
= "_ah_datastore_monitored_index_";
55 private static final String USAGE_ENTITY_QUERY_PROPERTY
= "query";
56 private static final String USAGE_ENTITY_CAPTURE_TIME_PROPERTY
=
57 "diagnosticCaptureDurationNanos";
58 private static final String USAGE_ENTITY_OCCURRENCE_TIME_PROPERTY
= "occurrenceTime";
59 private static final String USAGE_ENTITY_STACK_TRACE_PROPERTY
= "stackTrace";
60 private static final String USAGE_ENTITY_APP_VERSION_PROPERTY
= "appVersion";
62 static Logger logger
= Logger
.getLogger(MonitoredIndexUsageTracker
.class.getName());
63 private final int maxUsagesTrackedPerIndex
;
64 private final UsageIdCache perIndexUsageIds
;
65 private final Ticker ticker
;
66 private final long usageRefreshPeriodNanos
;
67 private final double refreshQueryDeadlineSecs
;
68 private final double putDeadlineSecs
;
69 private final long newUsageLoggingThresholdNanos
;
70 private final PrefixTrie
<Boolean
> apiPackagePrefixTrie
;
72 MonitoredIndexUsageTracker() {
73 this(MAX_MONITORED_INDEXES
, MAX_TRACKED_USAGES_PER_INDEX
, Ticker
.systemTicker());
76 MonitoredIndexUsageTracker(int maxIndexesTracked
, int maxUsagesPerIndex
, Ticker ticker
) {
77 this.maxUsagesTrackedPerIndex
= maxUsagesPerIndex
;
79 usageRefreshPeriodNanos
= getUsageRefreshPeriodNanos();
80 refreshQueryDeadlineSecs
= getRefreshQueryDeadlineSecs();
81 putDeadlineSecs
= getPutDeadlineSecs();
82 newUsageLoggingThresholdNanos
= getNewUsageLoggingThresholdNanos();
83 perIndexUsageIds
= new UsageIdCache(maxIndexesTracked
);
84 apiPackagePrefixTrie
= getApiPackagePrefixesTrie();
88 * Log that the specified {@code monitoredIndexes} were used to execute the {@code query}.
90 public void addNewUsage(Collection
<Index
> monitoredIndexes
, Query query
) {
91 Preconditions
.checkNotNull(monitoredIndexes
);
92 Preconditions
.checkNotNull(query
);
94 long methodInvocationTimeNanos
= ticker
.read();
95 Date occurenceDate
= newDate();
96 LazyApiInvokerStackTrace lazyStackTrace
= new LazyApiInvokerStackTrace();
98 List
<ExpiringPersistedUsageIds
> usageIdsPerIndex
=
99 Lists
.newArrayListWithExpectedSize(monitoredIndexes
.size());
100 for (Index index
: monitoredIndexes
) {
101 usageIdsPerIndex
.add(perIndexUsageIds
.get(index
.getId()));
104 List
<Entity
> newUsagesToPersist
= Lists
.newArrayList();
105 Iterator
<ExpiringPersistedUsageIds
> usageIdsPerIndexIter
= usageIdsPerIndex
.iterator();
106 for (Index index
: monitoredIndexes
) {
107 PersistedUsageIds persistedUsageIds
= usageIdsPerIndexIter
.next().get();
108 if (persistedUsageIds
.addNewUsage(getUsageEntityKeyName(query
))) {
109 newUsagesToPersist
.add(newUsageEntity(index
, query
, occurenceDate
, lazyStackTrace
));
113 if (!newUsagesToPersist
.isEmpty()) {
114 persistNewUsages(newUsagesToPersist
);
117 long methodElapsedTimeNanos
= ticker
.read() - methodInvocationTimeNanos
;
118 if (methodElapsedTimeNanos
> newUsageLoggingThresholdNanos
) {
119 long elapsedTimeSecs
= methodElapsedTimeNanos
/ (1000 * 1000 * 1000);
120 long elapsedTimeRemNanos
= methodElapsedTimeNanos
% (1000 * 1000 * 1000);
121 logger
.severe(String
.format(
122 "WARNING: tracking usage of monitored indexes took %d.%09d secs",
123 elapsedTimeSecs
, elapsedTimeRemNanos
));
127 void persistNewUsages(List
<Entity
> newUsagesToPersist
) {
128 AsyncDatastoreService asyncDatastore
= newAsyncDatastoreService(putDeadlineSecs
);
130 asyncDatastore
.put((Transaction
) null, newUsagesToPersist
);
131 } catch (RuntimeException e
) {
132 logger
.log(Level
.SEVERE
,
133 String
.format("Failed to record monitored index usage: %s",
134 newUsagesToPersist
.get(0).toString()),
139 static class QueryAndFetchOptions
{
141 final FetchOptions fetchOptions
;
143 QueryAndFetchOptions(Query query
, FetchOptions fetchOptions
) {
145 this.fetchOptions
= fetchOptions
;
149 QueryAndFetchOptions
getPersistedUsageRefreshQuery(Long indexId
) {
150 Query refreshQuery
= new Query(getUsageEntityKind(indexId
));
151 refreshQuery
.setKeysOnly();
152 return new QueryAndFetchOptions(refreshQuery
, withLimit(maxUsagesTrackedPerIndex
));
155 private static String
getUsageEntityKind(long compositeIndexId
) {
156 return USAGE_ENTITY_KIND_PREFIX
+ compositeIndexId
;
159 private static String
getUsageEntityKeyName(Query query
) {
160 return Integer
.toString(query
.hashCodeNoFilterValues());
163 Entity
newUsageEntity(Index index
, Query query
, Date occurenceTime
,
164 LazyApiInvokerStackTrace lazyStackTrace
) {
165 String kind
= getUsageEntityKind(index
.getId());
166 Key key
= KeyFactory
.createKey(kind
, getUsageEntityKeyName(query
));
167 StackTraceInfo stackTraceInfo
= lazyStackTrace
.get();
169 Entity entity
= new Entity(key
);
170 entity
.setProperty(USAGE_ENTITY_QUERY_PROPERTY
, new Text(query
.toString()));
171 entity
.setProperty(USAGE_ENTITY_CAPTURE_TIME_PROPERTY
, stackTraceInfo
.captureTimeNanos
);
172 entity
.setProperty(USAGE_ENTITY_OCCURRENCE_TIME_PROPERTY
, occurenceTime
);
173 entity
.setProperty(USAGE_ENTITY_STACK_TRACE_PROPERTY
, new Text(stackTraceInfo
.stackTrace
));
174 entity
.setProperty(USAGE_ENTITY_APP_VERSION_PROPERTY
, SystemProperty
.applicationVersion
.get());
178 AsyncDatastoreService
newAsyncDatastoreService(double deadlineSecs
) {
179 return DatastoreServiceFactory
.getAsyncDatastoreService(
180 DatastoreServiceConfig
.Builder
181 .withDatastoreCallbacks(DatastoreCallbacks
.NoOpDatastoreCallbacks
.INSTANCE
)
182 .deadline(deadlineSecs
));
190 * This class caches usage ids on a per index basis.
192 * Note that initially {@code CacheBuilder} was used to implement this cache, but unfortunately
193 * the datastore api library can not currently depend on {@code com.google.common.cache}.
195 private class UsageIdCache
{
196 final UsageIdCacheMap usageIdMap
;
198 private UsageIdCache(final int capacity
) {
199 usageIdMap
= new UsageIdCacheMap(capacity
);
202 private synchronized ExpiringPersistedUsageIds
get(Long indexId
) {
203 ExpiringPersistedUsageIds usageIds
= usageIdMap
.get(indexId
);
204 if ((usageIds
== null) || (usageIds
.isExpired())) {
205 usageIds
= new ExpiringPersistedUsageIds(indexId
, usageIds
);
206 usageIdMap
.put(indexId
, usageIds
);
213 * This class contains the core datastructure for the {@code UsageIdCache}.
214 * It extends from LinkedHashMap which provides a bounded HashMap. It sorts the contents by
215 * LRU access order (get() and put() count as accesses). Entries that are infrequently
216 * accessed are the first to be evicted when the size bound for the map is exceeded.
218 * This class had to be named (vs. being an anonymous class) because {@code SerializationTest}
219 * requires every serializable object (anonymous or otherwise) to have a golden file.
220 * Note that this class is never actually serialized. Therefore it's okay to break
221 * serialization compatibility in the future if needed.
223 static class UsageIdCacheMap
extends LinkedHashMap
<Long
, ExpiringPersistedUsageIds
> {
224 static final long serialVersionUID
= -5010587885037930115L;
225 private static final int DEFAULT_INITIAL_CAPACITY
= 16;
226 private static final float DEFAULT_LOAD_FACTOR
= 0.75f
;
227 private static final boolean SORT_ELEMENTS_BY_ACCESS_ORDER
= true;
229 private final int capacity
;
231 UsageIdCacheMap(int capacity
) {
232 super(DEFAULT_INITIAL_CAPACITY
, DEFAULT_LOAD_FACTOR
, SORT_ELEMENTS_BY_ACCESS_ORDER
);
233 this.capacity
= capacity
;
237 protected boolean removeEldestEntry(Map
.Entry
<Long
, ExpiringPersistedUsageIds
> eldest
) {
238 return size() > capacity
;
243 * This class gets an up to date snapshot of the persisted usage ids for a monitored
246 private class ExpiringPersistedUsageIds
{
247 private final Long creationTimeNanos
;
248 private final Thread usageLoaderThread
;
250 private volatile int numContiguousRefreshQueryFailures
;
252 private volatile PersistedUsageIds usageIds
;
254 private Iterable
<Entity
> refreshQueryEntities
;
257 * Constructs a new expiring persisted usage id set. The contents of the set are filled in
258 * by the thread invoking the constructor. In the constructor an asynchronous query is issued
259 * to fetch the set of usages that are currently persisted. The results for the query
260 * are reaped when the thread invoking the constructor calls {@link #get}.
262 * @param indexId the id of the index for which to load usage information.
263 * @param prevExpiringUsageIds the prior usage id set associated with {@code indexId}.
264 * If NULL this is the first time usage information is being loaded by this clone for the
267 private ExpiringPersistedUsageIds(Long indexId
, ExpiringPersistedUsageIds prevExpiringUsageIds
) {
268 this.creationTimeNanos
= ticker
.read();
269 this.usageLoaderThread
= Thread
.currentThread();
271 if (prevExpiringUsageIds
!= null) {
272 this.usageIds
= prevExpiringUsageIds
.usageIds
;
273 numContiguousRefreshQueryFailures
= prevExpiringUsageIds
.numContiguousRefreshQueryFailures
;
275 usageIds
= PersistedUsageIds
.TOMBSTONE_INSTANCE
;
276 numContiguousRefreshQueryFailures
= 0;
279 QueryAndFetchOptions refreshQuery
= getPersistedUsageRefreshQuery(indexId
);
280 AsyncDatastoreService asyncDatastore
= newAsyncDatastoreService(refreshQueryDeadlineSecs
);
281 PreparedQuery refreshPQ
= asyncDatastore
.prepare(refreshQuery
.query
);
283 refreshQueryEntities
= null;
285 refreshQueryEntities
= refreshPQ
.asIterable(refreshQuery
.fetchOptions
);
286 } catch (RuntimeException e
) {
287 numContiguousRefreshQueryFailures
++;
288 logger
.log(Level
.SEVERE
,
289 String
.format("Failed to query existing monitored index usages: %s",
290 refreshPQ
.toString()),
295 private PersistedUsageIds
get() {
296 if ((usageLoaderThread
!= Thread
.currentThread())
297 || (null == refreshQueryEntities
)) {
301 boolean logException
= false;
302 Throwable throwable
= null;
304 PersistedUsageIds existingKeys
= new PersistedUsageIds(maxUsagesTrackedPerIndex
);
305 for (Entity persistedEntity
: refreshQueryEntities
) {
306 existingKeys
.addNewUsage(persistedEntity
.getKey().getName());
308 usageIds
= existingKeys
;
309 numContiguousRefreshQueryFailures
= 0;
310 } catch (OverQuotaException e
) {
312 } catch (DeadlineExceededException e
) {
314 } catch (DatastoreTimeoutException e
) {
316 } catch (DatastoreFailureException e
) {
318 } catch (RuntimeException e
) {
323 if (throwable
!= null) {
324 numContiguousRefreshQueryFailures
++;
326 if ((numContiguousRefreshQueryFailures
% REFRESH_QUERY_FAILURE_LOGGING_THRESHOLD
) == 0) {
331 logger
.log(Level
.SEVERE
, "Failed to query existing monitored index usage information",
336 refreshQueryEntities
= null;
341 private boolean isExpired() {
342 return (ticker
.read() - creationTimeNanos
) > usageRefreshPeriodNanos
;
347 * This class holds the set of persisted usage ids for a monitored index. Call
348 * {@link #addNewUsage} to conditionally add an element to the persisted usage set. Only
349 * items that are added to the set should be persisted by the caller.
351 private static class PersistedUsageIds
{
353 * The tombstone instance always returns FALSE from {@link #addNewUsage}. It is used
354 * to prevent the persistence of new usages for an index.
356 private static final PersistedUsageIds TOMBSTONE_INSTANCE
= new PersistedUsageIds(0);
358 private final Set
<String
> persistedIds
;
359 private final int maxIdsPersisted
;
361 PersistedUsageIds(int maxIdsPersisted
) {
362 persistedIds
= Sets
.newHashSetWithExpectedSize(maxIdsPersisted
);
363 this.maxIdsPersisted
= maxIdsPersisted
;
367 * @return {@code true} if the {@code usageId} was added to the set and should be persisted
370 synchronized boolean addNewUsage(String usageId
) {
371 if (persistedIds
.size() >= maxIdsPersisted
) {
374 return persistedIds
.add(usageId
);
378 private static long getUsageRefreshPeriodNanos() {
379 String usageRefreshPeriodSecsStr
= System
.getProperty(REFRESH_PERIOD_SECS_SYS_PROP
);
380 if (usageRefreshPeriodSecsStr
!= null) {
381 double usageRefreshPeriodSecs
= Double
.parseDouble(usageRefreshPeriodSecsStr
);
382 return (long) (usageRefreshPeriodSecs
* 1000 * 1000 * 1000);
384 return TimeUnit
.SECONDS
.toNanos(DEFAULT_USAGE_REFRESH_PERIOD_SECS
);
388 private static double getRefreshQueryDeadlineSecs() {
389 String refreshDeadlineSecsStr
= System
.getProperty(REFRESH_QUERY_DEADLINE_SECS_SYS_PROP
);
390 if (refreshDeadlineSecsStr
!= null) {
391 return Double
.parseDouble(refreshDeadlineSecsStr
);
393 return DEFAULT_REFRESH_QUERY_DEADLINE_SECS
;
397 private static double getPutDeadlineSecs() {
398 String putDeadlineSecsStr
= System
.getProperty(PUT_DEADLINE_SECS_SYS_PROP
);
399 if (putDeadlineSecsStr
!= null) {
400 return Double
.parseDouble(putDeadlineSecsStr
);
402 return DEFAULT_PUT_DEADLINE_SECS
;
406 private static long getNewUsageLoggingThresholdNanos() {
407 String newUsageLoggingThreshSecsStr
=
408 System
.getProperty(NEW_USAGE_LOGGING_THRESHOLD_SECS_SYS_PROP
);
409 if (newUsageLoggingThreshSecsStr
!= null) {
410 double newUsageLoggingThreshSecs
= Double
.parseDouble(newUsageLoggingThreshSecsStr
);
411 return (long) (newUsageLoggingThreshSecs
* 1000 * 1000 * 1000);
413 return Long
.MAX_VALUE
;
418 private static class StackTraceInfo
{
419 final String stackTrace
;
420 final long captureTimeNanos
;
422 StackTraceInfo(String stackTrace
, long captureTimeNanos
) {
423 this.stackTrace
= stackTrace
;
424 this.captureTimeNanos
= captureTimeNanos
;
429 * This class lazily captures the stack trace on the first call to {@link #get} and
430 * returns the same stack trace on future calls to {@code get}. The purpose of this class is to
431 * avoid any repeated overhead due to capturing the stack trace multiple times in the
432 * same api invocation.
434 class LazyApiInvokerStackTrace
{
435 private StackTraceInfo stackTraceInfo
;
438 * @return a {@code Pair} containing the stack trace string and the time it took to capture the
439 * stack trace in nanoseconds.
441 private StackTraceInfo
get() {
442 if (stackTraceInfo
== null) {
443 long startNs
= ticker
.read();
444 String stackTrace
= getApiInvokerStackTrace(apiPackagePrefixTrie
);
445 long captureTimeNanos
= ticker
.read() - startNs
;
446 stackTraceInfo
= new StackTraceInfo(stackTrace
, captureTimeNanos
);
448 return stackTraceInfo
;
452 private static PrefixTrie
<Boolean
> getApiPackagePrefixesTrie() {
453 PrefixTrie
<Boolean
> prefixTrie
= new PrefixTrie
<Boolean
>();
455 for (String apiPrefix
: DEFAULT_API_PACKAGE_PREFIXES
) {
456 prefixTrie
.put(apiPrefix
, Boolean
.TRUE
);
459 String sysPropApiPrefixesString
= System
.getProperty(PACKAGE_PREFIXES_TO_SKIP_SYS_PROP
);
460 if (sysPropApiPrefixesString
!= null) {
461 String
[] sysPropApiPrefixes
= sysPropApiPrefixesString
.split("\\,");
462 for (String apiPrefix
: sysPropApiPrefixes
) {
463 prefixTrie
.put(apiPrefix
, Boolean
.TRUE
);
471 * Return the stack trace of the invoker excluding stack frames from the top of the stack (most
472 * recently invoked) that have a package prefix that matches a prefix in
473 * {@code apiPackagePrefixes}. At max {@code MAX_STACK_FRAMES_SAVED} stack frames are
476 private static String
getApiInvokerStackTrace(
477 PrefixTrie
<Boolean
> apiPackagePrefixes
) {
478 StackTraceElement
[] stack
= Thread
.currentThread().getStackTrace();
480 for (frameIdx
= 1; frameIdx
< stack
.length
; frameIdx
++) {
481 String className
= stack
[frameIdx
].getClassName();
482 if (apiPackagePrefixes
.get(className
) == null) {
487 if (frameIdx
>= stack
.length
) {
491 int numFrames
= stack
.length
- frameIdx
;
492 numFrames
= Math
.min(numFrames
, MAX_STACK_FRAMES_SAVED
);
493 StringBuilder sb
= new StringBuilder();
494 for (; frameIdx
< stack
.length
; frameIdx
++) {
495 sb
.append(stack
[frameIdx
]);
498 if (sb
.length() > 0) {
499 sb
.deleteCharAt(sb
.length() - 1);
501 return sb
.toString();