diff options
Diffstat (limited to 'infrastructure/net.appjet.oui/stats.scala')
-rw-r--r-- | infrastructure/net.appjet.oui/stats.scala | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/infrastructure/net.appjet.oui/stats.scala b/infrastructure/net.appjet.oui/stats.scala new file mode 100644 index 0000000..075182f --- /dev/null +++ b/infrastructure/net.appjet.oui/stats.scala @@ -0,0 +1,220 @@ +/** + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS-IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.appjet.oui; + +import java.util.Date; + +import scala.collection.mutable.{HashMap, HashSet, Set, Map, ArrayBuffer}; +import scala.util.Sorting; + +trait BucketMap extends scala.collection.mutable.Map[int, BucketedLastHits] { + def t = 1000*60; + override def apply(s: int) = synchronized { getOrElseUpdate(s, new BucketedLastHits(t)) }; + def counts = { val p = this; new scala.collection.Map.Projection[int, int] { + def size = p.size; + def get(s: int) = p.get(s).map(_.count); + def elements = p.elements.map(o => (o._1, o._2.count)); + }}; +} + +abstract class BucketKeeper[A, B](val size: Long, val numbuckets: int, val noUpdate: Boolean) { + def this(size: Long, noUpdate: Boolean) = + this(size, Math.max(100, if (noUpdate) 1 else (size/60000).toInt), noUpdate) + def this(size: Long) = this(size, false); + + val buckets = new Array[A](numbuckets); + + val millisPerBucket = size/numbuckets; + var lastSwitch = System.currentTimeMillis(); + var currentBucket = 0; + + def withSyncUpdate[E](block: E): E = synchronized { + updateBuckets(); + block; + } + + protected def bucketAtTime(d: Date) = { + val msAgo = lastSwitch - d.getTime(); + val bucketsAgo = Math.floor(msAgo/millisPerBucket).asInstanceOf[Int]; + if (bucketsAgo < numbuckets) { + val bucket = (currentBucket - bucketsAgo + numbuckets) % numbuckets + // println("Applying to old bucket: "+bucket+" / current: "+currentBucket+", old count: "+count); + Some(bucket); + } else { + // println("No bucket found for: "+d); + None; + } + } + + protected def updateBuckets(): Unit = { + if (! noUpdate) { + val now = System.currentTimeMillis(); + while (now > lastSwitch + millisPerBucket) { + lastSwitch += millisPerBucket; + currentBucket = (currentBucket + 1) % numbuckets; + bucketClear(currentBucket); + } + } + } + + protected def bucketClear(index: Int); + protected def bucketsInOrder: Seq[A] = + buckets.slice((currentBucket+1)%numbuckets, numbuckets) ++ + buckets.slice(0, currentBucket) + + def mergeBuckets(b: Seq[A]): B; + + def history(bucketsPerSample: Int, numSamples: Int): Array[B] = withSyncUpdate { + val bseq = bucketsInOrder.reverse.take(bucketsPerSample*numSamples); + val sampleCount = Math.min(numSamples, bseq.length); + val samples = + for (i <- 0 until sampleCount) yield { + mergeBuckets(bseq.slice(i*bucketsPerSample, (i+1)*bucketsPerSample)); + } + samples.reverse.toArray; + } + def latest(bucketsPerSample: Int): B = history(bucketsPerSample, 1)(0); + def count: B = withSyncUpdate { mergeBuckets(buckets); } + + for (i <- 0 until numbuckets) { + bucketClear(i); + } +} + +class BucketedUniques(size: Long, noUpdate: Boolean) +extends BucketKeeper[Set[Any], Int](size, noUpdate) { + def this(size: Long) = this(size, false); + + override protected def bucketClear(index: Int): Unit = { + buckets(index) = new HashSet[Any]; + } + + override def mergeBuckets(b: Seq[Set[Any]]) = { + b.foldLeft(scala.collection.immutable.Set[Any]())(_ ++ _).size; + } + + def hit(d: Date, value: Any): Unit = withSyncUpdate { + for (bucket <- bucketAtTime(d)) { + buckets(bucket) += value; + } + } +} + +class BucketedValueCounts(size: Long, noUpdate: Boolean) +extends BucketKeeper[HashMap[String, Int], (Int, Map[String, Int])](size, noUpdate) { + def this(size: Long) = this(size, false); + + override protected def bucketClear(index: Int): Unit = { + buckets(index) = new HashMap[String, Int]; + } + + override def mergeBuckets(b: Seq[HashMap[String, Int]]) = { + val out = new HashMap[String, Int]; + var total = 0; + for (m <- b) { + for ((k, v) <- m) { + out(k) = out.getOrElse(k, 0) + v; + total += v; + } + } + (total, out); + } + + def hit(d: Date, value: String, increment: Int): Unit = withSyncUpdate { + for (bucket <- bucketAtTime(d)) { + buckets(bucket)(value) = + buckets(bucket).getOrElse(value, 0)+increment; + } + } + + def hit(d: Date, value: String): Unit = hit(d, value, 1); +} + + +/** + * Keeps track of how many "hits" in the last size milliseconds. + * Has granularity speicified by numbuckets. + */ +class BucketedLastHits(size: Long, noUpdate: Boolean) +extends BucketKeeper[Int, Int](size, noUpdate) { + def this(size: Long) = this(size, false); + + override protected def bucketClear(index: int): Unit = { + buckets(index) = 0; + } + + override def mergeBuckets(b: Seq[Int]) = { + b.foldRight(0)(_+_); + } + + def hit(d: Date): Unit = hit(d, 1); + def hit(d: Date, n: Int): Unit = withSyncUpdate { + for (bucket <- bucketAtTime(d)) { + buckets(bucket) = buckets(bucket) + n; + } + } +} + +class BucketedLastHitsHistogram(size: Long, noUpdate: Boolean) +extends BucketKeeper[ArrayBuffer[Int], Function1[Float, Int]](size, noUpdate) { + def this(size: Long) = this(size, false); + + override protected def bucketClear(index: Int): Unit = { + buckets(index) = new ArrayBuffer[Int]; + } + + // elements will end up sorted. + protected def histogramFunction(elements: Array[Int]): Function1[Float, Int] = { + Sorting.quickSort(elements); + (percentile: Float) => { + if (elements.length == 0) { + 0 + } else { + elements( + Math.round(percentile/100.0f*(elements.length-1))); + } + } + } + + override def mergeBuckets(b: Seq[ArrayBuffer[Int]]) = { + val elements = new Array[Int](b.foldRight(0)(_.size + _)); + var currentIndex = 0; + for (bucket <- b if bucket.length > 0) { + // copyToArray is broken through scala 2.7.5, fixed in trunk. + // bucket.copyToArray(allElements, currentIndex); + val bucketArray = bucket.toArray; + System.arraycopy(bucketArray, 0, elements, currentIndex, bucketArray.length); + currentIndex += bucket.size + } + histogramFunction(elements); + } + + def hit(d: Date): Unit = hit(d, 1); + def hit(d: Date, n: Int): Unit = withSyncUpdate { + for (bucket <- bucketAtTime(d)) { + buckets(bucket) += n; + } + } +} + +object appstats { + val minutelyStatus = new HashMap[int, BucketedLastHits] with BucketMap; + val hourlyStatus = new HashMap[int, BucketedLastHits] with BucketMap { override val t = 1000*60*60 }; + val dailyStatus = new HashMap[int, BucketedLastHits] with BucketMap { override val t = 1000*60*60*24 }; + val weeklyStatus = new HashMap[int, BucketedLastHits] with BucketMap { override val t = 1000*60*60*24*7 }; + val stati = Array(minutelyStatus, hourlyStatus, dailyStatus, weeklyStatus); +} |