/**
* Copyright 2009 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS-IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.appjet.oui;
import java.text.SimpleDateFormat;
import java.io.{File, FileWriter, StringWriter, PrintWriter};
import java.util.Date;
import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentHashMap, CopyOnWriteArraySet};
import java.util.concurrent.atomic.AtomicInteger;
import scala.util.Sorting;
import scala.ref.WeakReference;
import scala.collection.mutable.{Map, HashMap};
import scala.collection.jcl.{SetWrapper, Conversions};
import org.json.{JSONObject, JSONArray};
import org.mozilla.javascript.{Scriptable, Context};
import Util.iteratorToRichIterator;
import scala.collection.jcl.Conversions._;
trait LoggablePropertyBag {
def date: Date;
def `type`: String = value("type").asInstanceOf[String];
def json: String;
def tabDelimited: String;
def keys: Array[String];
def value(k: String): Any;
}
class LoggableFromScriptable(
scr: Scriptable,
extra: Option[scala.collection.Map[String, String]])
extends LoggablePropertyBag {
def this(scr: Scriptable) = this(scr, None);
if (extra.isDefined) {
for ((k, v) <- extra.get if (! scr.has(k, scr))) {
scr.put(k, scr, v);
}
}
val keys =
scr.getIds()
.map(_.asInstanceOf[String])
.filter(scr.get(_, scr) != Context.getUndefinedValue());
Sorting.quickSort(keys);
if (! scr.has("date", scr)) {
scr.put("date", scr, System.currentTimeMillis());
}
val date = new Date(scr.get("date", scr).asInstanceOf[Number].longValue);
val json = FastJSON.stringify(scr);
val tabDelimited = GenericLoggerUtils.dateString(date) + "\t" +
keys.filter("date" != _).map(value(_)).mkString("\t");
def value(k: String) = {
scr.get(k, scr);
}
}
class LoggableFromMap[T](
map: scala.collection.Map[String, T],
extra: Option[scala.collection.Map[String, String]])
extends LoggablePropertyBag {
def this(map: scala.collection.Map[String, T]) = this(map, None);
val keys = map.keys.collect.toArray ++
extra.map(_.keys.collect.toArray).getOrElse(Array[String]());
Sorting.quickSort(keys);
def fillJson(json: JSONObject,
map: scala.collection.Map[String, T]): JSONObject = {
for ((k, v) <- map) {
v match {
case b: Boolean => json.put(k, b);
case d: Double => json.put(k, d);
case i: Int => json.put(k, i);
case l: Long => json.put(k, l);
case m: java.util.Map[_,_] => json.put(k, m);
case m: scala.collection.Map[String,T] =>
json.put(k, fillJson(new JSONObject(), m));
case c: java.util.Collection[_] => json.put(k, c);
case o: Object => json.put(k, o);
case _ => {};
}
}
json;
}
val json0 = fillJson(new JSONObject(), map);
if (extra.isDefined) {
for ((k, v) <- extra.get if (! json0.has(k))) {
json0.put(k, v);
}
}
if (! json0.has("date")) {
json0.put("date", System.currentTimeMillis());
}
val date = new Date(json0.getLong("date"));
val json = json0.toString;
val tabDelimited =
GenericLoggerUtils.dateString(date) + "\t" +
keys.filter("date" != _).map(value(_)).mkString("\t");
def value(k: String) = {
map.orElse(extra.getOrElse(Map[String, Any]()))(k);
}
}
class LoggableFromJson(val json: String) extends LoggablePropertyBag {
val obj = new JSONObject(json);
val date = new Date(obj.getLong("date"));
val keys = obj.sortedKeys().map(String.valueOf(_)).collect.toArray;
def value(k: String) = obj.get(k);
val tabDelimited =
GenericLoggerUtils.dateString(date) + "\t"+
keys.filter("date" != _).map(value(_)).mkString("\t");
}
object GenericLoggerUtils {
lazy val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZ");
def dateString(date: Date) = df.format(date);
var extraPropertiesFunction: Option[() => Map[String, String]] = None;
def setExtraPropertiesFunction(f: () => Map[String, String]) {
extraPropertiesFunction = Some(() => {
try {
f();
} catch {
case e => withoutExtraProperties {
exceptionlog(e);
Map[String, String]();
}
}
});
}
def getExtraProperties: Option[Map[String, String]] = {
if (shouldGetExtraProperties) {
withoutExtraProperties(extraPropertiesFunction.map(_()));
} else {
None;
}
}
val registeredWranglers =
new ConcurrentHashMap[String, SetWrapper[WeakReference[LogWrangler]]];
def registerWrangler(name: String, wrangler: LogWrangler) {
wranglers(name) += wrangler.ref;
}
def clearWrangler(name: String, wrangler: LogWrangler) {
wranglers(name) -= wrangler.ref;
}
def wranglers(name: String) = {
if (! registeredWranglers.containsKey(name)) {
val set1 = Conversions.convertSet(
new CopyOnWriteArraySet[WeakReference[LogWrangler]]);
val set2 = registeredWranglers.putIfAbsent(
name, set1);
if (set2 == null) {
set1
} else {
set2
}
} else {
registeredWranglers.get(name);
}
}
def tellWranglers(name: String, lpb: LoggablePropertyBag) {
for (w <- wranglers(name)) {
w.get.foreach(_.tell(lpb));
if (! w.isValid) {
wranglers(name) -= w;
}
}
}
val shouldGetExtraProperties_var =
new NoninheritedDynamicVariable[Boolean](true);
def withoutExtraProperties[E](block: => E): E = {
shouldGetExtraProperties_var.withValue(false)(block);
}
def shouldGetExtraProperties = shouldGetExtraProperties_var.value;
}
class GenericLogger(path: String, logName: String, rotateDaily: Boolean) {
val queue = new ConcurrentLinkedQueue[LoggablePropertyBag];
var loggerThread: Thread = null;
var currentLogDay:Date = null;
var logWriter: FileWriter = null;
var logBase = config.logDir;
def setLogBase(p: String) { logBase = p }
var echoToStdOut = false;
def setEchoToStdOut(e: Boolean) {
echoToStdOut = e;
}
def stdOutPrefix = logName+": "
def initLogWriter(logDay: Date) {
currentLogDay = logDay;
// if rotating, log filename is logBase/[path/]logName/logName-<date>.jslog
// otherwise, log filename is logBase/[path/]logName.jslog
var fileName =
if (rotateDaily) {
val df = new SimpleDateFormat("yyyy-MM-dd");
logName + "/" + logName + "-" + df.format(logDay) + ".jslog";
} else {
logName + ".jslog";
}
if (path != null && path.length > 0) {
fileName = path + "/" + fileName;
}
val f = new File(logBase+"/"+fileName);
if (! f.getParentFile.exists) {
f.getParentFile().mkdirs();
}
logWriter = new FileWriter(f, true);
}
def rotateIfNecessary(messageDate: Date) {
if (rotateDaily) {
if (!((messageDate.getYear == currentLogDay.getYear) &&
(messageDate.getMonth == currentLogDay.getMonth) &&
(messageDate.getDate == currentLogDay.getDate))) {
logWriter.flush();
logWriter.close();
initLogWriter(messageDate);
}
}
}
def flush() {
flush(java.lang.Integer.MAX_VALUE);
}
def close() {
logWriter.close();
}
def flush(n: Int) = synchronized {
var count = 0;
while (count < n && ! queue.isEmpty()) {
val lpb = queue.poll();
rotateIfNecessary(lpb.date);
logWriter.write(lpb.json+"\n");
if (echoToStdOut)
print(lpb.tabDelimited.split("\n").mkString(stdOutPrefix, "\n"+stdOutPrefix, "\n"));
count += 1;
}
if (count > 0) {
logWriter.flush();
}
count;
}
def start() {
initLogWriter(new Date());
loggerThread = new Thread("GenericLogger "+logName) {
this.setDaemon(true);
override def run() {
while (true) {
if (queue.isEmpty()) {
Thread.sleep(500);
} else {
flush(1000);
}
}
}
}
main.loggers += this;
loggerThread.start();
}
def log(lpb: LoggablePropertyBag) {
if (loggerThread != null) {
queue.offer(lpb);
GenericLoggerUtils.tellWranglers(logName, lpb);
}
}
def logObject(scr: Scriptable) {
log(new LoggableFromScriptable(
scr, GenericLoggerUtils.getExtraProperties));
}
def log[T](m: scala.collection.Map[String, T]) {
log(new LoggableFromMap(
m, GenericLoggerUtils.getExtraProperties));
}
def log(s: String) {
log(Map("message" -> s));
}
def apply(s: String) {
log(s);
}
def apply(scr: Scriptable) {
logObject(scr);
}
def apply[T](m: scala.collection.Map[String, T]) {
log(m);
}
}
object profiler extends GenericLogger("backend", "profile", false) {
def apply(id: String, op: String, method: String, path: String, countAndNanos: (Long, Long)) {
if (loggerThread != null)
log(id+":"+op+":"+method+":"+path+":"+
Math.round(countAndNanos._2/1000)+
(if (countAndNanos._1 > 1) ":"+countAndNanos._1 else ""));
}
// def apply(state: RequestState, op: String, nanos: long) {
// apply(state.requestId, op, state.req.getMethod(), state.req.getRequestURI(), nanos);
// }
def time =
System.nanoTime();
// thread-specific stuff.
val map = new ThreadLocal[HashMap[String, Any]] {
override def initialValue = new HashMap[String, Any];
}
val idGen = new java.util.concurrent.atomic.AtomicLong(0);
val id = new ThreadLocal[Long] {
override def initialValue = idGen.getAndIncrement();
}
def reset() = {
map.remove();
id.remove();
}
def record(key: String, time: Long) {
map.get()(key) = (1L, time);
}
def recordCumulative(key: String, time: Long) {
map.get()(key) = map.get().getOrElse(key, (0L, 0L)) match {
case (count: Long, time0: Long) => (count+1, time0+time);
case _ => { } // do nothing, but maybe shoud error.
}
}
def print() {
for ((k, t) <- map.get()) {
profiler(""+id.get(), k, "/", "/", t match {
case (count: Long, time0: Long) => (count, time0);
case _ => (-1L, -1L);
});
}
}
def printTiming[E](name: String)(block: => E): E = {
val startTime = time;
val r = block;
val endTime = time;
println(name+": "+((endTime - startTime)/1000)+" us.");
r;
}
}
object eventlog extends GenericLogger("backend", "server-events", true) {
start();
}
object streaminglog extends GenericLogger("backend", "streaming-events", true) {
start();
}
object exceptionlog extends GenericLogger("backend", "exceptions", true) {
def apply(e: Throwable) {
val s = new StringWriter;
e.printStackTrace(new PrintWriter(s));
log(Map(
"description" -> e.toString(),
"trace" -> s.toString()));
}
echoToStdOut = config.devMode
override def stdOutPrefix = "(exlog): ";
start();
}
// object dprintln extends GenericLogger("backend", "debug", true) {
// echoToStdOut = config.devMode;
// }
class STFULogger extends org.mortbay.log.Logger {
def debug(m: String, a0: Object, a1: Object) { }
def debug(m: String, t: Throwable) { }
def getLogger(m: String) = { this }
def info(m: String, a0: Object, a2: Object) { }
def isDebugEnabled() = { false }
def setDebugEnabled(t: Boolean) { }
def warn(m: String, a0: Object, a1: Object) { }
def warn(m: String, t: Throwable) { }
}
case class Percentile(count: Int, p50: Int, p90: Int, p95: Int, p99: Int, max: Int);
object cometlatencies {
var latencies = new java.util.concurrent.ConcurrentLinkedQueue[Int];
def register(t: Int) = latencies.offer(t);
var loggerThread: Thread = null;
var lastCount: Option[Map[String, Int]] = None;
var lastStats: Option[Percentile] = None;
def start() {
loggerThread = new Thread("latencies logger") {
this.setDaemon(true);
override def run() {
while(true) {
Thread.sleep(60*1000); // every minute
try {
val oldLatencies = latencies;
latencies = new java.util.concurrent.ConcurrentLinkedQueue[Int];
val latArray = oldLatencies.toArray().map(_.asInstanceOf[int]);
Sorting.quickSort(latArray);
def pct(p: Int) =
if (latArray.length > 0)
latArray(Math.floor((p/100.0)*latArray.length).toInt);
else
0;
def s(a: Any) = String.valueOf(a);
lastStats = Some(Percentile(latArray.length,
pct(50), pct(90), pct(95), pct(99),
if (latArray.length > 0) latArray.last else 0));
eventlog.log(Map(
"type" -> "streaming-message-latencies",
"count" -> s(lastStats.get.count),
"p50" -> s(lastStats.get.p50),
"p90" -> s(lastStats.get.p90),
"p95" -> s(lastStats.get.p95),
"p99" -> s(lastStats.get.p99),
"max" -> s(lastStats.get.max)));
lastCount = Some({
val c = Class.forName("net.appjet.ajstdlib.Comet$");
c.getDeclaredMethod("connectionStatus")
.invoke(c.getDeclaredField("MODULE$").get(null))
}.asInstanceOf[Map[String, Int]]);
eventlog.log(
Map("type" -> "streaming-connection-count") ++
lastCount.get.elements.map(p => (p._1, String.valueOf(p._2))));
} catch {
case e: Exception => {
exceptionlog(e);
}
}
}
}
}
loggerThread.start();
}
start();
}
object executionlatencies extends GenericLogger("backend", "latency", true) {
start();
def time = System.currentTimeMillis();
}
abstract class LogWrangler {
def tell(lpb: LoggablePropertyBag);
def tell(json: String) { tell(new LoggableFromJson(json)); }
lazy val ref = new WeakReference(this);
def watch(logName: String) {
GenericLoggerUtils.registerWrangler(logName, this);
}
}
// you probably want to subclass this, or at least set data.
class FilterWrangler(
`type`: String,
filter: LoggablePropertyBag => Boolean,
field: String) extends LogWrangler {
def tell(lpb: LoggablePropertyBag) {
if ((`type` == null || lpb.`type` == `type`) &&
(filter == null || filter(lpb))) {
val entry = lpb.value(field);
data(lpb.date, entry);
}
}
var data: (Date, Any) => Unit = null;
def setData(data0: (Date, Any) => Unit) {
data = data0;
}
}
class TopNWrangler(n: Int, `type`: String,
filter: LoggablePropertyBag => Boolean,
field: String)
extends FilterWrangler(`type`, filter, field) {
val entries = new ConcurrentHashMap[String, AtomicInteger]();
def sortedEntries = {
Sorting.stableSort(
convertMap(entries).toSeq,
(p1: (String, AtomicInteger), p2: (String, AtomicInteger)) =>
p1._2.get() > p2._2.get());
}
def count = {
(convertMap(entries) :\ 0) { (x, y) => x._2.get() + y }
}
def topNItems(n: Int): Array[(String, Int)] =
sortedEntries.take(n).map(p => (p._1, p._2.get())).toArray;
def topNItems: Array[(String, Int)] = topNItems(n);
data = (date: Date, value: Any) => {
val entry = value.asInstanceOf[String];
val i =
if (! entries.containsKey(entry)) {
val newInt = new AtomicInteger(0);
val oldInt = entries.putIfAbsent(entry, newInt);
if (oldInt == null) { newInt } else { oldInt }
} else {
entries.get(entry);
}
i.incrementAndGet();
}
}