/**
* Copyright 2009 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS-IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.appjet.oui;
import java.util.Date;
import scala.collection.mutable.{HashMap, HashSet, Set, Map, ArrayBuffer};
import scala.util.Sorting;
trait BucketMap extends scala.collection.mutable.Map[int, BucketedLastHits] {
def t = 1000*60;
override def apply(s: int) = synchronized { getOrElseUpdate(s, new BucketedLastHits(t)) };
def counts = { val p = this; new scala.collection.Map.Projection[int, int] {
def size = p.size;
def get(s: int) = p.get(s).map(_.count);
def elements = p.elements.map(o => (o._1, o._2.count));
}};
}
abstract class BucketKeeper[A, B](val size: Long, val numbuckets: int, val noUpdate: Boolean) {
def this(size: Long, noUpdate: Boolean) =
this(size, Math.max(100, if (noUpdate) 1 else (size/60000).toInt), noUpdate)
def this(size: Long) = this(size, false);
val buckets = new Array[A](numbuckets);
val millisPerBucket = size/numbuckets;
var lastSwitch = System.currentTimeMillis();
var currentBucket = 0;
def withSyncUpdate[E](block: E): E = synchronized {
updateBuckets();
block;
}
protected def bucketAtTime(d: Date) = {
val msAgo = lastSwitch - d.getTime();
val bucketsAgo = Math.floor(msAgo/millisPerBucket).asInstanceOf[Int];
if (bucketsAgo < numbuckets) {
val bucket = (currentBucket - bucketsAgo + numbuckets) % numbuckets
// println("Applying to old bucket: "+bucket+" / current: "+currentBucket+", old count: "+count);
Some(bucket);
} else {
// println("No bucket found for: "+d);
None;
}
}
protected def updateBuckets(): Unit = {
if (! noUpdate) {
val now = System.currentTimeMillis();
while (now > lastSwitch + millisPerBucket) {
lastSwitch += millisPerBucket;
currentBucket = (currentBucket + 1) % numbuckets;
bucketClear(currentBucket);
}
}
}
protected def bucketClear(index: Int);
protected def bucketsInOrder: Seq[A] =
buckets.slice((currentBucket+1)%numbuckets, numbuckets) ++
buckets.slice(0, currentBucket)
def mergeBuckets(b: Seq[A]): B;
def history(bucketsPerSample: Int, numSamples: Int): Array[B] = withSyncUpdate {
val bseq = bucketsInOrder.reverse.take(bucketsPerSample*numSamples);
val sampleCount = Math.min(numSamples, bseq.length);
val samples =
for (i <- 0 until sampleCount) yield {
mergeBuckets(bseq.slice(i*bucketsPerSample, (i+1)*bucketsPerSample));
}
samples.reverse.toArray;
}
def latest(bucketsPerSample: Int): B = history(bucketsPerSample, 1)(0);
def count: B = withSyncUpdate { mergeBuckets(buckets); }
for (i <- 0 until numbuckets) {
bucketClear(i);
}
}
class BucketedUniques(size: Long, noUpdate: Boolean)
extends BucketKeeper[Set[Any], Int](size, noUpdate) {
def this(size: Long) = this(size, false);
override protected def bucketClear(index: Int): Unit = {
buckets(index) = new HashSet[Any];
}
override def mergeBuckets(b: Seq[Set[Any]]) = {
b.foldLeft(scala.collection.immutable.Set[Any]())(_ ++ _).size;
}
def hit(d: Date, value: Any): Unit = withSyncUpdate {
for (bucket <- bucketAtTime(d)) {
buckets(bucket) += value;
}
}
}
class BucketedValueCounts(size: Long, noUpdate: Boolean)
extends BucketKeeper[HashMap[String, Int], (Int, Map[String, Int])](size, noUpdate) {
def this(size: Long) = this(size, false);
override protected def bucketClear(index: Int): Unit = {
buckets(index) = new HashMap[String, Int];
}
override def mergeBuckets(b: Seq[HashMap[String, Int]]) = {
val out = new HashMap[String, Int];
var total = 0;
for (m <- b) {
for ((k, v) <- m) {
out(k) = out.getOrElse(k, 0) + v;
total += v;
}
}
(total, out);
}
def hit(d: Date, value: String, increment: Int): Unit = withSyncUpdate {
for (bucket <- bucketAtTime(d)) {
buckets(bucket)(value) =
buckets(bucket).getOrElse(value, 0)+increment;
}
}
def hit(d: Date, value: String): Unit = hit(d, value, 1);
}
/**
* Keeps track of how many "hits" in the last size milliseconds.
* Has granularity speicified by numbuckets.
*/
class BucketedLastHits(size: Long, noUpdate: Boolean)
extends BucketKeeper[Int, Int](size, noUpdate) {
def this(size: Long) = this(size, false);
override protected def bucketClear(index: int): Unit = {
buckets(index) = 0;
}
override def mergeBuckets(b: Seq[Int]) = {
b.foldRight(0)(_+_);
}
def hit(d: Date): Unit = hit(d, 1);
def hit(d: Date, n: Int): Unit = withSyncUpdate {
for (bucket <- bucketAtTime(d)) {
buckets(bucket) = buckets(bucket) + n;
}
}
}
class BucketedLastHitsHistogram(size: Long, noUpdate: Boolean)
extends BucketKeeper[ArrayBuffer[Int], Function1[Float, Int]](size, noUpdate) {
def this(size: Long) = this(size, false);
override protected def bucketClear(index: Int): Unit = {
buckets(index) = new ArrayBuffer[Int];
}
// elements will end up sorted.
protected def histogramFunction(elements: Array[Int]): Function1[Float, Int] = {
Sorting.quickSort(elements);
(percentile: Float) => {
if (elements.length == 0) {
0
} else {
elements(
Math.round(percentile/100.0f*(elements.length-1)));
}
}
}
override def mergeBuckets(b: Seq[ArrayBuffer[Int]]) = {
val elements = new Array[Int](b.foldRight(0)(_.size + _));
var currentIndex = 0;
for (bucket <- b if bucket.length > 0) {
// copyToArray is broken through scala 2.7.5, fixed in trunk.
// bucket.copyToArray(allElements, currentIndex);
val bucketArray = bucket.toArray;
System.arraycopy(bucketArray, 0, elements, currentIndex, bucketArray.length);
currentIndex += bucket.size
}
histogramFunction(elements);
}
def hit(d: Date): Unit = hit(d, 1);
def hit(d: Date, n: Int): Unit = withSyncUpdate {
for (bucket <- bucketAtTime(d)) {
buckets(bucket) += n;
}
}
}
object appstats {
val minutelyStatus = new HashMap[int, BucketedLastHits] with BucketMap;
val hourlyStatus = new HashMap[int, BucketedLastHits] with BucketMap { override val t = 1000*60*60 };
val dailyStatus = new HashMap[int, BucketedLastHits] with BucketMap { override val t = 1000*60*60*24 };
val weeklyStatus = new HashMap[int, BucketedLastHits] with BucketMap { override val t = 1000*60*60*24*7 };
val stati = Array(minutelyStatus, hourlyStatus, dailyStatus, weeklyStatus);
}