aboutsummaryrefslogtreecommitdiffstats
path: root/trunk/infrastructure/net.appjet.ajstdlib
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/infrastructure/net.appjet.ajstdlib')
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala253
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala563
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js920
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html76
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/streaming.scala892
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/timer.scala85
6 files changed, 2789 insertions, 0 deletions
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala b/trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala
new file mode 100644
index 0000000..8d285af
--- /dev/null
+++ b/trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala
@@ -0,0 +1,253 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS-IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.appjet.ajstdlib;
+
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.{ScheduledThreadPoolExecutor, Callable};
+import scala.collection.mutable.{HashMap, SynchronizedMap};
+
+import org.mozilla.javascript.{Context, ScriptableObject, Function, RhinoException, Scriptable};
+
+import net.appjet.oui.{SpecialJarOrNotFile, DiskLibrary, FixedDiskLibrary, VariableDiskLibrary, ExecutionContext, ExecutionContextUtils, ScopeReuseManager, config, exceptionlog};
+import net.appjet.bodylock.{BodyLock, ExecutionException};
+import net.appjet.common.util.LenientFormatter;
+
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.util.ajax.ContinuationSupport;
+
+object ajstdlib {
+ def runModuleInNewScope(cx: ExecutionContext, moduleName: String): Any = {
+ val newScope = BodyLock.subScope(cx.runner.globalScope);
+ if (! libraryExists(moduleName))
+ return Context.getUndefinedValue(); // unfortunately, returning "false" doesn't really do the right thing here.
+ try {
+ libraryExecutable(moduleName).execute(newScope);
+ } catch {
+ case e: ExecutionException => throw e;
+ case e => throw new ExecutionException("Error occurred while running module: "+moduleName, e);
+ // TODO: There was code here to print errors to the response if something didn't compile. Replace this code?
+ }
+ newScope;
+ }
+
+ private val modules = new HashMap[String, DiskLibrary] with SynchronizedMap[String, DiskLibrary];
+ private def library(name: String) = modules.getOrElseUpdate(name+".js", new VariableDiskLibrary(name+".js"));
+ private def libraryExists(name: String) = {
+ val lib = library(name);
+ // ScopeReuseManager.watch(lib);
+ lib.exists;
+ }
+ private def libraryExecutable(name: String) = {
+ val lib = library(name);
+ // ScopeReuseManager.watch(lib);
+ lib.executable;
+ }
+
+ val globalLock = new ReentrantLock();
+ val attributes = new HashMap[String, Any] with SynchronizedMap[String, Any];
+
+ def init() {
+ // any other ajstdlib initialization goes here.
+ Comet.init();
+ }
+}
+
+object printf {
+ def printf(format: String, list: Array[Object]): String = {
+// val list: Array[Object] = new Array[Object](argList.getLength)
+// for (i <- List.range(0, list.length))
+// list(i) = argList.getElement(i).getOrElse(null) match {
+// case AppVM.JSNumber(n) => n
+// case AppVM.JSString(s) => s
+// case AppVM.JSBoolean(b) => b
+// case _ => null
+// }
+ val args = list.map(x => Context.jsToJava(x, classOf[Object]));
+ try {
+ val fmt = new LenientFormatter()
+ fmt.format(format, args: _*)
+ fmt.toString()
+ } catch {
+ case e: java.util.IllegalFormatException =>
+ throw new ExecutionException("String Format Error: <tt>printf</tt> error: "+e.getMessage(), e)
+ }
+ }
+}
+
+
+import java.security.MessageDigest;
+
+object md5 {
+ def md5(input: String): String = {
+ val bytes = input.getBytes("UTF-8");
+ md5(bytes);
+ }
+ def md5(bytes: Array[byte]): String = {
+ var md = MessageDigest.getInstance("MD5");
+ var digest = md.digest(bytes);
+ var builder = new StringBuilder();
+ for (b <- digest) {
+ builder.append(Integer.toString((b >> 4) & 0xf, 16));
+ builder.append(Integer.toString(b & 0xf, 16));
+ }
+ builder.toString();
+ }
+}
+
+object execution {
+ def runAsync(ec: ExecutionContext, f: Function) {
+ ec.asyncs += f;
+ }
+
+ def executeCodeInNewScope(parentScope: Scriptable,
+ code: String, name: String,
+ startLine: Int): Scriptable = {
+ val ec = ExecutionContextUtils.currentContext;
+ val executable =
+ try {
+ BodyLock.compileString(code, name, startLine);
+ } catch {
+ case e: RhinoException =>
+ throw new ExecutionException(
+ "Failed to execute code in new scope.", e);
+ }
+ if (ec == null || ec.runner == null) {
+ Thread.dumpStack();
+ }
+ val scope = BodyLock.subScope(
+ if (parentScope != null) parentScope
+ else ec.runner.mainScope);
+ scope.setParentScope(ec.runner.mainScope);
+ executable.execute(scope);
+ scope;
+ }
+
+ def runTask(taskName: String, args: Array[Object]): AnyRef = {
+ val ec = net.appjet.oui.execution.runOutOfBand(
+ net.appjet.oui.execution.scheduledTaskExecutable,
+ "Task "+taskName,
+ Some(Map("taskName" -> taskName,
+ "taskArguments" -> args)),
+ { error =>
+ error match {
+ case e: Throwable => exceptionlog(e);
+ case _ => exceptionlog(error.toString);
+ }
+ });
+ ec.result;
+ }
+
+ def runTaskSimply(taskName: String, args: Array[Object]): AnyRef = {
+ net.appjet.oui.execution.runOutOfBandSimply(
+ net.appjet.oui.execution.scheduledTaskExecutable,
+ Some(Map("taskName" -> taskName,
+ "taskArguments" -> args)));
+ }
+
+ def wrapRunTask(taskName: String, args: Array[Object],
+ returnType: Class[_]): Function0[AnyRef] = {
+ new Function0[AnyRef] {
+ def apply = Context.jsToJava(runTaskSimply(taskName, args), returnType);
+ }
+ }
+
+ val threadpools = new HashMap[String, ScheduledThreadPoolExecutor]
+ with SynchronizedMap[String, ScheduledThreadPoolExecutor];
+
+ def createNamedTaskThreadPool(name: String, poolSize: Int) {
+ threadpools.put(name, new ScheduledThreadPoolExecutor(poolSize));
+ }
+
+ class TaskRunner(val taskName: String, args: Array[Object]) extends Callable[AnyRef] {
+ def call(): AnyRef = {
+ runTask(taskName, args);
+ }
+ }
+
+ def scheduleTaskInPool(poolName: String, taskName: String, delayMillis: Long, args: Array[Object]) = {
+ val pool = threadpools.getOrElse(poolName, throw new RuntimeException("No such task threadpool: "+poolName));
+ pool.schedule(new TaskRunner(taskName, args), delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
+ }
+
+ def shutdownAndWaitOnTaskThreadPool(poolName: String, timeoutMillis: Long) = {
+ val pool = threadpools.getOrElse(poolName, throw new RuntimeException("No such task threadpool: "+poolName));
+ pool.shutdown();
+ pool.awaitTermination(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
+ }
+
+ def getContinuation(ec: ExecutionContext) = {
+ val req = ec.request.req;
+ ContinuationSupport.getContinuation(req, req).asInstanceOf[SelectChannelConnector.RetryContinuation];
+ }
+
+ def sync[T](obj: AnyRef)(block: => T): T = {
+ obj.synchronized {
+ block;
+ }
+ }
+}
+
+import javax.mail._;
+import javax.mail.internet._;
+import java.util.Properties;
+
+object email {
+ def sendEmail(toAddr: Array[String], fromAddr: String, subject: String, headers: Scriptable, content: String): String = {
+ try {
+ val badAddresses = for (a <- toAddr if (a.indexOf("@") == -1)) yield a;
+ if (badAddresses.length > 0) {
+ "The email address"+(if (badAddresses.length > 1) "es" else "")+" "+
+ badAddresses.mkString("\"", "\", \"", "\"")+" do"+(if (badAddresses.length == 1) "es" else "")+
+ " not appear to be valid.";
+ } else {
+ val debug = false;
+
+ val props = new Properties;
+ props.put("mail.smtp.host", config.smtpServerHost);
+ props.put("mail.smtp.port", config.smtpServerPort.toString());
+ if (config.smtpUser != "")
+ props.put("mail.smtp.auth", "true");
+
+ val session = Session.getInstance(props, if (config.smtpUser != "") new Authenticator {
+ override def getPasswordAuthentication() =
+ new PasswordAuthentication(config.smtpUser, config.smtpPass);
+ } else null);
+ session.setDebug(debug);
+
+ val msg = new MimeMessage(session);
+ val fromIAddr = new InternetAddress(fromAddr);
+ msg.setFrom(fromIAddr);
+ val toIAddr: Array[Address] = toAddr.map(x => (new InternetAddress(x))); // new InternetAddress(toAddr);
+ msg.setRecipients(Message.RecipientType.TO, toIAddr);
+
+ if (headers != null)
+ for (o <- headers.getIds() if o.isInstanceOf[String]) {
+ val k = o.asInstanceOf[String]
+ msg.addHeader(k, headers.get(k, headers).asInstanceOf[String]);
+ }
+
+ msg.setSubject(subject);
+ msg.setContent(content, "text/plain");
+ Transport.send(msg);
+ "";
+ }
+ } catch {
+ case e: MessagingException => { exceptionlog(e); e.printStackTrace() ; "Messaging exception: "+e.getMessage+"."; }
+ case e: Exception => { exceptionlog(e); e.printStackTrace(); "Unknown error."; }
+ }
+ }
+}
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala b/trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala
new file mode 100644
index 0000000..047c086
--- /dev/null
+++ b/trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala
@@ -0,0 +1,563 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS-IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.appjet.ajstdlib;
+
+import scala.collection.mutable.ArrayBuffer;
+
+import java.sql.{DriverManager, SQLException, Statement};
+import net.appjet.oui.{profiler, config, NoninheritedDynamicVariable};
+import com.mchange.v2.c3p0._;
+
+class SQLBase(driverClass: String, url: String, userName: String, password: String) {
+
+ def isMysql:Boolean = (url.startsWith("jdbc:mysql:"));
+ def isDerby:Boolean = (url.startsWith("jdbc:derby:"));
+
+ if (isDerby) {
+ System.setProperty("derby.system.home", config.derbyHome);
+ val f = new java.io.File(config.derbyHome);
+ if (! f.exists) {
+ if (! f.mkdirs())
+ throw new RuntimeException("Couldn't create internal database storage directory: "+config.derbyHome);
+ }
+ if (! f.isDirectory)
+ throw new RuntimeException("Internal database storage directory is not a directory: "+config.derbyHome);
+ if (! f.canWrite)
+ throw new RuntimeException("Can't write to internal database storage directory: "+config.derbyHome);
+ }
+
+ val cpds = new ComboPooledDataSource();
+ cpds.setDriverClass(driverClass);
+ cpds.setJdbcUrl(url+(if (isMysql) "?useUnicode=true&characterEncoding=utf8" else ""));
+
+ // derby does not require a password
+ if (!isDerby) {
+ cpds.setUser(userName);
+ cpds.setPassword(password);
+ }
+
+ cpds.setMaxPoolSize(config.jdbcPoolSize);
+ cpds.setMaxConnectionAge(6*60*60); // 6 hours
+ if (config.devMode) {
+ cpds.setAutomaticTestTable("cpds_testtable");
+ cpds.setTestConnectionOnCheckout(true);
+ }
+
+// {
+// // register db driver
+// try {
+// new JDCConnectionDriver(driverClass, url+"?useUnicode=true&characterEncoding=utf8", userName, password);
+// } catch {
+// case e => {
+// e.printStackTrace();
+// Runtime.getRuntime.halt(1);
+// }
+// }
+// }
+
+ private def getConnectionFromPool = {
+ val c = cpds.getConnection();
+ c.setAutoCommit(true);
+ c;
+ }
+
+ // Creates a dynamic variable whose .value depends on the innermost
+ // .withValue(){} on the call-stack.
+ private val currentConnection = new NoninheritedDynamicVariable[Option[java.sql.Connection]](None);
+
+ def withConnection[A](block: java.sql.Connection=>A): A = {
+ currentConnection.value match {
+ case Some(c) => {
+ block(c);
+ }
+ case None => {
+ val t1 = profiler.time;
+ val c = getConnectionFromPool;
+ profiler.recordCumulative("getConnection", profiler.time-t1);
+ try {
+ currentConnection.withValue(Some(c)) {
+ block(c);
+ }
+ } finally {
+ c.close;
+ }
+ }
+ }
+ }
+
+ private val currentlyInTransaction = new NoninheritedDynamicVariable(false);
+
+ def inTransaction[A](block: java.sql.Connection=>A): A = {
+ withConnection(c => {
+ if (currentlyInTransaction.value) {
+ return block(c);
+ } else {
+ currentlyInTransaction.withValue(true) {
+ c.setAutoCommit(false);
+ c.setTransactionIsolation(java.sql.Connection.TRANSACTION_REPEATABLE_READ);
+
+ try {
+ val result = block(c);
+ c.commit();
+ c.setAutoCommit(true);
+ result;
+ } catch {
+ case e@net.appjet.oui.AppGeneratedStopException => {
+ c.commit();
+ c.setAutoCommit(true);
+ throw e;
+ }
+ case (e:org.mozilla.javascript.WrappedException) if (e.getWrappedException ==
+ net.appjet.oui.AppGeneratedStopException) => {
+ c.commit();
+ c.setAutoCommit(true);
+ throw e;
+ }
+ case e => {
+ //println("inTransaction() caught error:");
+ //e.printStackTrace();
+ try {
+ c.rollback();
+ c.setAutoCommit(true);
+ } catch {
+ case ex => {
+ println("Could not rollback transaction because: "+ex.toString());
+ }
+ }
+ throw e;
+ }
+ }
+ }
+ }
+ });
+ }
+
+ def closing[A](closable: java.sql.Statement)(block: =>A): A = {
+ try { block } finally { closable.close(); }
+ }
+
+ def closing[A](closable: java.sql.ResultSet)(block: =>A): A = {
+ try { block } finally { closable.close(); }
+ }
+
+ def tableName(t: String) = id(t);
+
+ val identifierQuoteString = withConnection(_.getMetaData.getIdentifierQuoteString);
+ def quoteIdentifier(s: String) = identifierQuoteString+s+identifierQuoteString;
+ private def id(s: String) = quoteIdentifier(s);
+
+ def longTextType = if (isDerby) "CLOB" else "MEDIUMTEXT";
+
+ // derby seems to do things intelligently w.r.t. case-sensitivity and unicode support.
+ def createTableOptions = if (isMysql) " ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE utf8_bin" else "";
+
+ // creates table if it doesn't exist already
+ def createJSONTable(table: String) {
+ withConnection { c=>
+ val s = c.createStatement;
+ if (! doesTableExist(c, table)) {
+ closing(s) {
+ s.execute("CREATE TABLE "+tableName(table)+" ("+
+ id("ID")+" VARCHAR(128) PRIMARY KEY NOT NULL, "+
+ id("JSON")+" "+longTextType+" NOT NULL"+
+ ")"+createTableOptions);
+ }
+ }
+ }
+ }
+
+ // requires: table exists
+ // returns null if key doesn't exist
+ def getJSON(table: String, key: String): String = {
+ withConnection { c=>
+ val s = c.prepareStatement("SELECT "+id("JSON")+" FROM "+tableName(table)+" WHERE "+id("ID")+" = ?");
+ closing(s) {
+ s.setString(1, key);
+ var resultSet = s.executeQuery();
+ closing(resultSet) {
+ if (! resultSet.next()) {
+ null;
+ }
+ else {
+ resultSet.getString(1);
+ }
+ }
+ }
+ }
+ }
+
+ def getAllJSON(table: String, start: Int, count: Int): Array[Object] = {
+ withConnection { c =>
+ val s = c.prepareStatement("SELECT "+id("ID")+","+id("JSON")+" FROM "+tableName(table)+
+ " ORDER BY "+id("ID")+" DESC"+
+ " LIMIT ? OFFSET ?");
+ closing(s) {
+ s.setInt(2, start);
+ s.setInt(1, count);
+ var resultSet = s.executeQuery();
+ var output = new ArrayBuffer[Object];
+ closing(resultSet) {
+ while (resultSet.next()) {
+ output += new { val id = resultSet.getString(1); val value = resultSet.getString(2) };
+ }
+ output.toArray;
+ }
+ }
+ }
+ }
+
+ def getAllJSONKeys(table: String): Array[String] = {
+ withConnection { c =>
+ val s = c.prepareStatement("SELECT "+id("ID")+" FROM "+tableName(table));
+ closing(s) {
+ var resultSet = s.executeQuery();
+ var output = new ArrayBuffer[String];
+ closing(resultSet) {
+ while (resultSet.next()) {
+ output += resultSet.getString(1);
+ }
+ output.toArray;
+ }
+ }
+ }
+ }
+
+ // requires: table exists
+ // inserts key if it doesn't exist
+ def putJSON(table: String, key: String, json: String) {
+ withConnection { c=>
+ val update = c.prepareStatement("UPDATE "+tableName(table)+" SET "+id("JSON")+"=? WHERE "+id("ID")+"=?");
+ closing(update) {
+ update.setString(1, json);
+ update.setString(2, key);
+ update.executeUpdate();
+ if (update.getUpdateCount == 0) {
+ val insert = c.prepareStatement(
+ "INSERT INTO "+tableName(table)+" ("+id("ID")+", "+id("JSON")+") values (?,?)");
+ closing(insert) {
+ insert.setString(1, key);
+ insert.setString(2, json);
+ insert.executeUpdate();
+ }
+ }
+ }
+ }
+ }
+
+ def deleteJSON(table: String, key: String) {
+ // requires: table exists
+ withConnection { c=>
+ val update = c.prepareStatement("DELETE FROM "+tableName(table)+" WHERE "+id("ID")+"=?");
+ closing(update) {
+ update.setString(1, key);
+ update.executeUpdate();
+ }
+ }
+ }
+
+ private def metaName(table: String) = table+"_META";
+ private def metaTableName(table: String) = tableName(metaName(table));
+ private def textTableName(table: String) = tableName(table+"_TEXT");
+ private def escapeSearchString(dbm: java.sql.DatabaseMetaData, s: String): String = {
+ val e = dbm.getSearchStringEscape();
+ s.replace("_", e+"_").replace("%", e+"%");
+ }
+
+ private final val PAGE_SIZE = 20;
+
+ def doesTableExist(connection: java.sql.Connection, table: String): Boolean = {
+ val databaseMetadata = connection.getMetaData;
+ val tables = databaseMetadata.getTables(null, null,
+ escapeSearchString(databaseMetadata, table), null);
+ closing(tables) {
+ tables.next();
+ }
+ }
+
+ def autoIncrementClause = if (isDerby) "GENERATED BY DEFAULT AS IDENTITY" else "AUTO_INCREMENT";
+
+ // creates table if it doesn't exist already
+ def createStringArrayTable(table: String) {
+ withConnection { c=>
+ if (! doesTableExist(c, metaName(table))) { // check to see if the *_META table exists
+ // create tables and indices
+ val s = c.createStatement;
+ closing(s) {
+ s.execute("CREATE TABLE "+metaTableName(table)+" ("+
+ id("ID")+" VARCHAR(128) PRIMARY KEY NOT NULL, "+
+ id("NUMID")+" INT UNIQUE "+autoIncrementClause+" "+
+ ")"+createTableOptions);
+ val defaultOffsets = (1 to PAGE_SIZE).map(x=>"").mkString(",");
+ s.execute("CREATE TABLE "+textTableName(table)+" ("+
+ ""+id("NUMID")+" INT, "+id("PAGESTART")+" INT, "+id("OFFSETS")+" VARCHAR(256) NOT NULL DEFAULT '"+defaultOffsets+
+ "', "+id("DATA")+" "+longTextType+" NOT NULL"+
+ ")"+createTableOptions);
+ s.execute("CREATE INDEX "+id(table+"-NUMID-PAGESTART")+" ON "+textTableName(table)+"("+id("NUMID")+", "+id("PAGESTART")+")");
+ }
+ }
+ }
+ }
+
+ // requires: table exists
+ // returns: null if key or (key,index) doesn't exist, else the value
+ def getStringArrayElement(table: String, key: String, index: Int): String = {
+ val (pageStart, offset) = getPageStartAndOffset(index);
+ val page = new StringArrayPage(table, key, pageStart, true);
+ page.data(offset);
+ }
+
+ // requires: table exists
+ // returns: an array of the mappings present in the page that should hold the
+ // particular (key,index) mapping. the array may be empty or otherwise not
+ // contain the given (key,index).
+ def getPageStringArrayElements(table: String, key: String, index: Int): Array[IndexValueMapping] = {
+ val (pageStart, offset) = getPageStartAndOffset(index);
+ val page = new StringArrayPage(table, key, pageStart, true);
+ val buf = new scala.collection.mutable.ListBuffer[IndexValueMapping];
+
+ for(i <- 0 until page.data.length) {
+ val s = page.data(i);
+ if (s ne null) {
+ val n = pageStart + i;
+ buf += IndexValueMapping(n, s);
+ }
+ }
+
+ buf.toArray;
+ }
+
+ // requires: table exists
+ // creates key if doesn't exist
+ // value may be null
+ def putStringArrayElement(table: String, key: String, index: Int, value: String) {
+ val (pageStart, offset) = getPageStartAndOffset(index);
+ val page = new StringArrayPage(table, key, pageStart, false);
+ page.data(offset) = value;
+ page.updateDB();
+ }
+
+ def putMultipleStringArrayElements(table: String, key: String): Multiputter = new Multiputter {
+ var currentPage = None:Option[StringArrayPage];
+ def flushPage() {
+ if (currentPage.isDefined) {
+ val page = currentPage.get;
+ page.updateDB();
+ currentPage = None;
+ }
+ }
+ def finish() {
+ flushPage();
+ }
+ def put(index: Int, value: String) {
+ try {
+ val (pageStart, offset) = getPageStartAndOffset(index);
+ if (currentPage.isEmpty || currentPage.get.pageStart != pageStart) {
+ flushPage();
+ currentPage = Some(new StringArrayPage(table, key, pageStart, false));
+ }
+ currentPage.get.data(offset) = value;
+ }
+ catch {
+ case e => { e.printStackTrace; throw e }
+ }
+ }
+ }
+
+ trait Multiputter {
+ def put(index: Int, value: String);
+ def finish();
+ }
+
+ case class IndexValueMapping(index: Int, value: String);
+
+ def clearStringArray(table: String, key: String) {
+ withConnection { c=>
+ val numid = getStringArrayNumId(c, table, key, false);
+ if (numid >= 0) {
+ {
+ val s = c.prepareStatement("DELETE FROM "+textTableName(table)+" WHERE "+id("NUMID")+"=?");
+ closing(s) {
+ s.setInt(1, numid);
+ s.executeUpdate();
+ }
+ }
+ {
+ val s = c.prepareStatement("DELETE FROM "+metaTableName(table)+" WHERE "+id("NUMID")+"=?");
+ closing(s) {
+ s.setInt(1, numid);
+ s.executeUpdate();
+ }
+ }
+ }
+ }
+ }
+
+ private def getPageStartAndOffset(index: Int): (Int,Int) = {
+ val pageStart = (index / PAGE_SIZE) * PAGE_SIZE;
+ (pageStart, index - pageStart);
+ }
+
+ // requires: table exists
+ // returns: numid of new string array
+ private def newStringArray(c: java.sql.Connection, table: String, key: String): Int = {
+ val s = c.prepareStatement("INSERT INTO "+metaTableName(table)+" ("+id("ID")+") VALUES (?)",
+ Statement.RETURN_GENERATED_KEYS);
+ closing(s) {
+ s.setString(1, key);
+ s.executeUpdate();
+ val resultSet = s.getGeneratedKeys;
+ if (resultSet == null)
+ error("No generated numid for insert");
+ closing(resultSet) {
+ if (! resultSet.next()) error("No generated numid for insert");
+ resultSet.getInt(1);
+ }
+ }
+ }
+
+ def getStringArrayNumId(c: java.sql.Connection, table: String, key: String, creating: Boolean): Int = {
+ val s = c.prepareStatement("SELECT "+id("NUMID")+" FROM "+metaTableName(table)+" WHERE "+id("ID")+"=?");
+ closing(s) {
+ s.setString(1, key);
+ val resultSet = s.executeQuery();
+ closing(resultSet) {
+ if (! resultSet.next()) {
+ if (creating) {
+ newStringArray(c, table, key);
+ }
+ else {
+ -1
+ }
+ }
+ else {
+ resultSet.getInt(1);
+ }
+ }
+ }
+ }
+
+ def getStringArrayAllKeys(table: String): Array[String] = {
+ withConnection { c=>
+ val s = c.prepareStatement("SELECT "+id("ID")+" FROM "+metaTableName(table));
+ closing(s) {
+ val resultSet = s.executeQuery();
+ closing(resultSet) {
+ val buf = new ArrayBuffer[String];
+ while (resultSet.next()) {
+ buf += resultSet.getString(1);
+ }
+ buf.toArray;
+ }
+ }
+ }
+ }
+
+ private class StringArrayPage(table: String, key: String, val pageStart: Int, readonly: Boolean) {
+
+ val data = new Array[String](PAGE_SIZE);
+
+ private val numid = withConnection { c=>
+ val nid = getStringArrayNumId(c, table, key, ! readonly);
+
+ if (nid >= 0) {
+ val s = c.prepareStatement(
+ "SELECT "+id("OFFSETS")+","+id("DATA")+" FROM "+textTableName(table)+" WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?");
+ closing(s) {
+ s.setInt(1, nid);
+ s.setInt(2, pageStart);
+ val resultSet = s.executeQuery();
+ closing(resultSet) {
+ if (! resultSet.next()) {
+ if (! readonly) {
+ val insert = c.prepareStatement("INSERT INTO "+textTableName(table)+
+ " ("+id("NUMID")+", "+id("PAGESTART")+", "+id("DATA")+") VALUES (?,?,'')");
+ closing(insert) {
+ insert.setInt(1, nid);
+ insert.setInt(2, pageStart);
+ insert.executeUpdate();
+ }
+ }
+ }
+ else {
+ val offsetsField = resultSet.getString(1);
+ val dataField = resultSet.getString(2);
+ val offsetStrings = offsetsField.split(",", -1);
+ var i = 0;
+ var idx = 0;
+ while (i < PAGE_SIZE) {
+ val nstr = offsetStrings(i);
+ if (nstr != "") {
+ val n = nstr.toInt;
+ data(i) = dataField.substring(idx, idx+n);
+ idx += n;
+ }
+ i += 1;
+ }
+ }
+ }
+ }
+ }
+ nid;
+ }
+
+ def updateDB() {
+ if (readonly) {
+ error("this is a readonly StringArrayPage");
+ }
+ // assert: the relevant row of the TEXT table exists
+ if (data.find(_ ne null).isEmpty) {
+ withConnection { c=>
+ val update = c.prepareStatement("DELETE FROM "+textTableName(table)+
+ " WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?");
+ closing(update) {
+ update.setInt(1, numid);
+ update.setInt(2, pageStart);
+ update.executeUpdate();
+ }
+ }
+ }
+ else {
+ val offsetsStr = data.map(s => if (s eq null) "" else s.length.toString).mkString(",");
+ val dataStr = data.map(s => if (s eq null) "" else s).mkString("");
+ withConnection { c=>
+ val s = c.prepareStatement("UPDATE "+textTableName(table)+
+ " SET "+id("OFFSETS")+"=?, "+id("DATA")+"=? WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?");
+ closing(s) {
+ s.setString(1, offsetsStr);
+ s.setString(2, dataStr);
+ s.setInt(3, numid);
+ s.setInt(4, pageStart);
+ s.executeUpdate();
+ }
+ }
+ }
+ }
+ }
+
+ def close {
+ if (isDerby) {
+ cpds.close();
+ try {
+ DriverManager.getConnection("jdbc:derby:;shutdown=true");
+ } catch {
+ case e: SQLException => if (e.getErrorCode() != 50000) throw e
+ }
+ }
+ }
+}
+
+
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js b/trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js
new file mode 100644
index 0000000..3bfa227
--- /dev/null
+++ b/trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js
@@ -0,0 +1,920 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS-IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+var host = window.location.host;
+
+function WebSocket(id) {
+ var self = this;
+ var socket = this;
+ var version = 2;
+
+ var timeouts = {};
+
+ this.onopen = function() { }
+ this.onclosed = function() { }
+ this.onmessage = function() { }
+ this.onhiccup = function() { }
+ this.onlogmessage = function() { }
+ this.CONNECTING = 0;
+ this.OPEN = 1;
+ this.CLOSED = 2;
+ this.readyState = -1;
+
+ var hiccupsLastMinute = 0;
+ var hiccupResetInterval = setInterval(function() {
+ hiccupsLastMinute = 0;
+ if (self.readyState == self.CLOSED)
+ clearInterval(hiccupResetInterval);
+ }, 60*1000);
+
+ var isHiccuping = false;
+ function hiccup(channel) {
+ if (channel != officialChannel && channel != self) return;
+ if (isHiccuping) return;
+ log("hiccup: "+channel.name);
+ if (hiccupsLastMinute++ > 10) {
+ doDisconnect({reconnect: true, reason: "Too many hiccups!"});
+ return;
+ }
+ closeAllChannels();
+ timeout(timeouts, "hiccup", 15000, function() {
+ isHiccuping = false;
+ doDisconnect({reconnect: false, reason: "Couldn't contact server to hiccup."});
+ });
+ isHiccuping = true;
+ function tryHiccup() {
+ if (! isHiccuping) return;
+ self.onhiccup({connected: false});
+ log("trying hiccup");
+ timeout(timeouts, "singleHiccup", 5000, function() {
+ tryHiccup();
+ });
+ simpleXhr('post', postPath(), true, [{key: "oob", value: "hiccup"}], function(sc, msg) {
+ if (! isHiccuping) return;
+ if (msg.substring(0, "restart-fail".length) == "restart-fail") {
+ doDisconnect({reconnect: true, reason: "Server restarted or socket timed out on server."});
+ } else if (sc != 200 || msg.substring(0, 2) != "ok") {
+ log("Failed to hiccup with error: "+sc+" / "+msg);
+ setTimeout(tryHiccup, 500);
+ } else {
+ isHiccuping = false;
+ timeouts.singleHiccup();
+ timeouts.hiccup();
+ doConnect();
+ }
+ });
+ }
+ tryHiccup();
+ }
+ function closeAllChannels() {
+ for (var i in activeChannels) {
+ if (activeChannels.hasOwnProperty(i)) {
+ activeChannels[i].disconnect();
+ }
+ }
+ officialChannel = undefined;
+ }
+
+ function doDisconnect(obj, silent, sync) {
+ log("disconnected: "+obj.reason+" / "+(obj.data !== undefined ? "data: "+obj.data : ""));
+ logAll();
+ closeAllChannels();
+ if (longPollingIFrame && longPollingIFrame.div) {
+ longPollingIFrame.div.innerHTML = "";
+ }
+ if (self.readyState != self.CLOSED) {
+ self.readyState = self.CLOSED;
+ if (! silent) {
+ postSingleMessageNow(true, "kill:"+obj.reason, sync, true);
+ }
+ self.onclosed(obj);
+ }
+ }
+
+ this.disconnect = function(sync) {
+ doDisconnect({reason: "Closed by client."}, false, sync);
+ }
+
+
+ function doBasicConnect() {
+ var type = getBasicChannel();
+ log("basic connect on type: "+type);
+ var channel = activeChannels[type] = new channelConstructors[type]();
+ channel.connect();
+ }
+
+ function doOtherConnect() {
+ var channels = getOtherChannels();
+ var channel; var type;
+ for (var i = 0; i < channels.length; ++i) {
+ type = channels[i];
+ log("other connect on type: "+type);
+ channel = activeChannels[type] = new channelConstructors[type]();
+ channel.connect();
+ }
+ }
+ function doConnect() {
+ log("doing connect!");
+ timeout(timeouts, "connect", 15000, function() {
+ doDisconnect({reconnect: false, reason: "Timeout connecting to server: no channel type was able to connect."});
+ });
+ doBasicConnect();
+ }
+
+ this.connect = function() {
+ log("socket connecting: "+id);
+ doConnect();
+ }
+
+ // util
+ function nicetime() { return Math.floor((new Date()).valueOf() / 100) % 10000000; }
+ function log(s) { self.onlogmessage("(comet @t: "+nicetime()+") "+s); }
+ function logAll() {
+ log(self.describe())
+ }
+ this.describe = function() {
+ function describeChannels() {
+ out = [];
+ for (var i in activeChannels) {
+ if (activeChannels.hasOwnProperty(i)) {
+ out.push(i+": "+activeChannels[i].describe());
+ }
+ }
+ return "[ "+out.join(", ")+" ]";
+ }
+ return ("socket state: { id: "+id+", readyState: "+self.readyState+", isHiccuping: "+isHiccuping+", timeouts: "+describeTimeouts(timeouts)+", officialChannel: "+(officialChannel?officialChannel.name:"none")+", channels: "+describeChannels()+", isPosting: "+isPosting+", lastReceivedSeqNumber: "+lastReceivedSeqNumber+", lastPost: "+lastPost+", postTimeouts: "+describeTimeouts(postTimeouts)+", channelSeq: "+channelSeq+" }");
+ }
+
+ function wrapMethod(obj, method) {
+ return function() {
+ var arr = [];
+ for (var i=0; i < arguments.length; i++) {
+ arr.push(arguments[i]);
+ }
+ method.apply(obj, arr);
+ }
+ }
+ var _wm = wrapMethod;
+
+ // cb should take statusCode, responseText, and optionally request
+ function simpleXhr(method, uri, async, params, cb, makeXhr) {
+// log("making simple Xhr: "+[method, uri, async, params].join(", "));
+ var request = (makeXhr || newRequestObject)();
+ request.open(method, uri, async);
+ if (async) {
+ request.onreadystatechange = function() {
+ if (request.readyState != 4) return;
+ var status;
+ var responseText;
+ try {
+ status = request.status;
+ responseText = request.responseText;
+ } catch (e) { /* absorb ff error accessing request properties */ }
+ cb(status, responseText, request);
+ }
+ }
+ var data = null;
+ if (params) {
+ data = [];
+ for (var i = 0; i < params.length; ++i) {
+ data.push(encodeURIComponent(params[i].key)+"="+encodeURIComponent(params[i].value));
+ }
+ data = data.join("&");
+ request.setRequestHeader("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
+ }
+ try {
+ request.send(data);
+ } catch (e) { request.abort(); cb(500, "Error sending data!", request); }
+ if (! async) {
+ var status;
+ var responseText;
+ try {
+ status = request.status;
+ responseText = request.responseText;
+ } catch (e) { /* absorb ff error accessing request properties */ }
+ cb(status, responseText, request);
+ }
+ return request;
+ }
+
+ var timeout_noop = function() { }
+ function timeout(timeoutObject, timeoutName, millis, timeoutCallback) {
+ function clearIt(timeoutObject, timeoutName) {
+ if (timeoutObject[timeoutName]) {
+ timeoutObject[timeoutName]();
+ timeoutObject[timeoutName] = timeout_noop;
+ }
+ }
+ var timeoutId = setTimeout(function() { clearIt(timeoutObject, timeoutName); timeoutCallback(); }, millis);
+ var f = function() {
+ clearTimeout(timeoutId);
+ }
+ clearIt(timeoutObject, timeoutName);
+ timeoutObject[timeoutName] = f;
+ return f;
+ }
+
+ // handling messages
+ var lastReceivedSeqNumber = 0;
+
+ function dataHandler(msg) {
+ if (msg.seqNumber > lastReceivedSeqNumber+1) {
+ log("bad sequence number. expecting: "+(lastReceivedSeqNumber+1)+", got: "+msg.seqNumber);
+ hiccup(self);
+ return false;
+ }
+ if (msg.seqNumber < lastReceivedSeqNumber+1) return true;
+ lastReceivedSeqNumber = msg.seqNumber;
+ if (! msg.isControl) {
+ self.onmessage({ data: msg.content });
+ return true;
+ } else {
+ if (msg.content == "kill") {
+ doDisconnect({reconnect: false, reason: "Killed by server."});
+ return false;
+ }
+ }
+ }
+
+ // client-server comm
+ var postPath = function() {
+ return "%contextPath%/post?r="+randomVar()+"&v="+version+"&id="+id+"&seq="+lastReceivedSeqNumber;
+ }
+
+ function SimpleQueue() {
+ var base = [];
+ var head = 0;
+ var tail = 0;
+ this.offer = function(data) {
+ base[tail++] = data;
+ }
+ this.poll = function() {
+ if (this.length() > 0) {
+ var n = base[head];
+ delete base[head++];
+ return n;
+ }
+ }
+ this.clear = function() {
+ head = 0;
+ tail = 0;
+ var oldBase = base;
+ base = [];
+ return oldBase;
+ }
+ this.length = function() {
+ return tail - head;
+ }
+ }
+ var outgoingMessageQueue = new SimpleQueue();
+ var isPosting = false;
+ var postTimeouts = {};
+ var lastPost;
+
+ function postSingleMessageNow(isControl, data, sync, force, cb) {
+ doPostMessages([{oob: isControl, data: data, cb: cb}], sync, force)
+ }
+
+ function doPostMessages(messages, sync, force, cb) {
+ if (! force && self.readyState == self.CLOSED) return;
+ if (messages.length == 0) {
+ if (cb) cb();
+ return;
+ }
+ var data = [];
+ var callbacks = [];
+ for (var i = 0; i < messages.length; ++i) {
+ data.push({key: (messages[i].oob ? "oob" : "m"),
+ value: messages[i].data});
+ if (messages[i].cb)
+ callbacks.push(messages[i].cb);
+ }
+ function postFailed(sc, msg, req, src) {
+ var str = "";
+ try {
+ str = sc + ": "+req.statusText+" - "+msg+" ("+src+")";
+ } catch (e) { /* absorb potential Firefox error accessing req */ }
+ doDisconnect({reconnect: true, reason: "Posting message failed.", data: str});
+ for (var i = 0; i < callbacks.length; ++i) {
+ callbacks[i](false, str);
+ }
+ }
+ function postCallback(sc, msg, request) {
+ postTimeouts.post();
+ if (sc != 200 || msg.substring(0, 2) != "ok") {
+ postFailed(sc, msg, request, "1");
+ } else {
+ for (var i = 0; i < callbacks.length; ++i) {
+ callbacks[i](true);
+ }
+ if (cb) cb();
+ }
+ }
+ timeout(postTimeouts, "post", 15000, function() {
+ doDisconnect({reconnect: true, reason: "Posting message timed out."});
+ });
+ simpleXhr('post', postPath(), ! sync, data, postCallback);
+ }
+
+ function postPendingMessages() {
+ if (isPosting == true)
+ return;
+ var messages = outgoingMessageQueue.clear();
+ if (messages.length == 0) {
+ return;
+ }
+ isPosting = true;
+ doPostMessages(messages, false, false, function() { isPosting = false; setTimeout(postPendingMessages, 0); });
+ lastPost = nicetime();
+ }
+ this.postMessage = function(data, cb) {
+ if (self.readyState != self.OPEN) {
+ return;
+ }
+ outgoingMessageQueue.offer({data: data, cb: cb});
+ setTimeout(function() { postPendingMessages() }, 0);
+ }
+
+ // transports
+ function getValidChannels() {
+ var channels = [];
+ for (var i = 0; i < validChannels.length; ++i) {
+ var type = validChannels[i];
+ if (window.location.hash.length > 0) {
+ if (window.location.hash != "#"+type) {
+ continue;
+ }
+ }
+ if ($ && $.browser.opera && type != 'shortpolling' && type != 'streaming') {
+ continue;
+ }
+ channels.push(type);
+ }
+ return channels;
+ }
+ function getBasicChannel() {
+ return getValidChannels()[0];
+ }
+
+ function getOtherChannels() {
+ return getValidChannels().slice(1);
+ }
+
+ var officialChannel;
+ this.getTransportType = function() {
+ return (officialChannel ? officialChannel.name : "none");
+ }
+ var validChannels = "%acceptableChannelTypes%";
+ var canUseSubdomains = "%canUseSubdomains%";
+ var activeChannels = {};
+ var channelConstructors = {
+ shortpolling: ShortPollingChannel,
+ longpolling: LongPollingChannel,
+ streaming: StreamingChannel
+ }
+
+ function describeTimeouts(timeouts) {
+ var out = [];
+ for (var i in timeouts) {
+ if (timeouts.hasOwnProperty(i)) {
+ out.push(i+": "+(timeouts[i] == timeout_noop ? "unset" : "set"));
+ }
+ }
+ return "{ "+out.join(", ")+" }";
+ }
+
+ var channelSeq = 1;
+ function notifyConnect(channel) {
+ timeouts.connect();
+ if (! officialChannel || channel.weight > officialChannel.weight) {
+ log("switching to use channel: "+channel.name);
+ var oldChannel = officialChannel;
+ officialChannel = channel;
+ setTimeout(function() {
+ postSingleMessageNow(true, "useChannel:"+(channelSeq++)+":"+channel.name, false, false, function(success, msg) {
+ if (success) {
+ if (oldChannel) {
+ oldChannel.disconnect();
+ } else {
+ // there was no old channel, so try connecting the other channels.
+ doOtherConnect();
+ }
+ if (self.readyState != self.OPEN) {
+ self.readyState = self.OPEN;
+ self.onopen({});
+ } else {
+ self.onhiccup({connected: true});
+ }
+ } else {
+ doDisconnect({reconnect: true, reason: "Failed to select channel on server.", data: msg});
+ }
+ });
+ }, 0);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ function randomVar() {
+ return String(Math.round(Math.random()*1e12));
+ }
+
+ function channelPath() {
+ return "%contextPath%/channel?v="+version+"&r="+randomVar()+"&id="+id;
+ }
+
+ function newRequestObject() {
+ var xmlhttp=false;
+ try {
+ xmlhttp = (window.ActiveXObject && new ActiveXObject("Msxml2.XMLHTTP"))
+ } catch (e) {
+ try {
+ xmlhttp = (window.ActiveXObject && new ActiveXObject("Microsoft.XMLHTTP"));
+ } catch (E) {
+ xmlhttp = false;
+ }
+ }
+ if (!xmlhttp && typeof XMLHttpRequest!='undefined') {
+ try {
+ xmlhttp = new XMLHttpRequest();
+ } catch (e) {
+ xmlhttp=false;
+ }
+ }
+ if (!xmlhttp && window.createRequest) {
+ try {
+ xmlhttp = window.createRequest();
+ } catch (e) {
+ xmlhttp=false;
+ }
+ }
+ return xmlhttp
+ }
+
+ function DataFormatError(message) {
+ this.message = message;
+ }
+
+ function readMessage(data, startIndex) {
+ if (! startIndex) startIndex = 0;
+ var sep = data.indexOf(":", startIndex);
+ if (sep < 0) return; // don't have all the bytes for this yet.
+ var chars = Number(data.substring(startIndex, sep));
+ if (isNaN(chars))
+ throw new DataFormatError("Bad length: "+data.substring(startIndex, sep));
+ if (data.length < sep+1+chars) return; // don't have all the bytes for this yet.
+ var msg = data.substr(sep+1, chars);
+ return { message: msg, lastConsumedChar: sep+1+chars }
+ }
+
+ function iframeReader(data, startIndex) {
+ if (startIndex == 0)
+ return { message: data, lastConsumedChar: data.length }
+ }
+
+ function parseWireFormat(data, startIndex, reader) {
+ if (! startIndex) startIndex = 0;
+ var msgs = [];
+ var readThroughIndex = startIndex;
+ while (true) {
+ var msgObj = (reader || readMessage)(data, readThroughIndex)
+ if (! msgObj) break;
+ readThroughIndex = msgObj.lastConsumedChar;
+ var msg = msgObj.message;
+ var split = msg.split(":");
+ if (split[0] == 'oob') {
+ msgs.push({oob: split.slice(1).join(":")});
+ continue;
+ }
+ var seq = Number(split[0]);
+ if (isNaN(seq))
+ throw new DataFormatError("Bad sequence number: "+split[0]);
+ var control = Number(split[1]);
+ if (isNaN(control))
+ throw new DataFormatError("Bad control: "+split[1]);
+ var msgContent = split.slice(2).join(":");
+ msgs.push({seqNumber: seq, isControl: (control == 1), content: msgContent});
+ }
+ return { messages: msgs, lastConsumedChar: readThroughIndex }
+ }
+
+ function handleMessages(data, cursor, channel, reader) {
+ try {
+ messages = parseWireFormat(data, cursor, reader);
+ } catch (e) {
+ if (e instanceof DataFormatError) {
+ log("Data format error: "+e.message);
+ hiccup(channel);
+ return;
+ } else {
+ log(e.toString()+" on line: "+e.lineNumber);
+ }
+ }
+ for (var i=0; i < messages.messages.length; i++) {
+ var oob = messages.messages[i].oob;
+ if (oob) {
+ if (oob == "restart-fail") {
+ doDisconnect({reconnect: true, reason: "Server restarted or socket timed out on server."});
+ return;
+ }
+ } else {
+ if (! dataHandler(messages.messages[i]))
+ break;
+ }
+ }
+ return messages.lastConsumedChar;
+ }
+
+ function ShortPollingChannel() {
+ this.weight = 0;
+ this.name = "shortpolling";
+
+ this.isConnected = false;
+ this.isClosed = false;
+ this.request;
+ this.clearRequest = function() {
+ if (this.request) {
+ this.request.abort();
+ this.request = null;
+ }
+ }
+ this.timeouts = {};
+
+ this.describe = function() {
+ return "{ isConnected: "+this.isConnected+", isClosed: "+this.isClosed+", timeouts: "+describeTimeouts(this.timeouts)+", request: "+(this.request?"set":"not set")+" }"
+ }
+
+ this.pollDataHandler = function(sc, response, request) {
+ if (request.readyState != 4) return;
+ if (this.timeouts.poll) this.timeouts.poll();
+ var messages;
+ if (! this.isConnected) {
+ this.timeouts.connectAttempt();
+ if (sc != 200) {
+ log(this.name+" connect failed: "+sc+" / "+response);
+ setTimeout(_wm(this, this.attemptConnect), 500);
+ return;
+ }
+ var msg = (response ? readMessage(response) : undefined);
+ if (msg && msg.message == "oob:ok") {
+ this.timeouts.initialConnect();
+ this.isConnected = true;
+ log(this.name+" transport connected!");
+ if (! notifyConnect(this)) {
+ // there are better options connected.
+ log(this.name+" transport not chosen for activation.");
+ this.disconnect();
+ return;
+ }
+ this.doPoll();
+ return;
+ } else {
+ log(this.name+" connect didn't get ok: "+sc+" / "+response);
+ setTimeout(_wm(this, this.attemptConnect), 500);
+ return;
+ }
+ }
+ var chars = handleMessages(request.responseText, 0, this);
+ if (sc != 200 || ((! chars) && this.emptyResponseBad)) {
+ hiccup(this);
+ }
+ setTimeout(_wm(this, this.doPoll), this.pollDelay);
+ this.clearRequest();
+ }
+
+ this.keepRetryingConnection = true;
+ this.cancelConnect = function() {
+ this.clearRequest();
+ this.keepRetryingConnection = false;
+ }
+ this.cancelPoll = function() {
+ this.clearRequest();
+ log("poll timed out.");
+ hiccup(this);
+ }
+
+ this.doPoll = function() {
+ if (this.isClosed) return;
+ timeout(this.timeouts, "poll", this.pollTimeout, _wm(this, this.cancelPoll));
+ this.request =
+ simpleXhr('GET',
+ channelPath()+"&channel="+this.name+"&seq="+lastReceivedSeqNumber+this.pollParams(),
+ true, undefined, _wm(this, this.pollDataHandler), this.xhrGenerator);
+ }
+
+ this.pollParams = function() {
+ return "";
+ }
+ this.pollTimeout = 5000;
+ this.pollDelay = 500;
+
+ this.attemptConnect = function() {
+ if (! this.keepRetryingConnection) return;
+ log(this.name+" attempting connect");
+ this.clearRequest();
+ timeout(this.timeouts, "connectAttempt", 5000, _wm(this, this.attemptConnect));
+ this.request = simpleXhr('GET', channelPath()+"&channel="+this.name+"&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber,
+ true, undefined, _wm(this, this.pollDataHandler), this.xhrGenerator);
+ }
+ this.connect = function() {
+ this.attemptConnect();
+ timeout(this.timeouts, "initialConnect", 15000, _wm(this, this.cancelConnect));
+ }
+ this.disconnect = function() {
+ log(this.name+" disconnected");
+ this.isClosed = true;
+ this.clearRequest();
+ }
+ }
+
+ function StreamingChannel() {
+ this.weight = 2;
+ this.name = "streaming";
+ var self = this;
+
+ var isConnected = false;
+ var request;
+ function clearRequest() {
+ if (request) {
+ request.abort();
+ request = null;
+ if (theStream) theStream = null;
+ if (ifrDiv) {
+ ifrDiv.innerHTML = "";
+ ifrDiv = null;
+ }
+ }
+ }
+ var isClosed = false;
+ var timeouts = {};
+ var cursor = 0;
+
+ this.describe = function() {
+ return "{ isConnected: "+isConnected+", isClosed: "+isClosed+", timeouts: "+describeTimeouts(timeouts)+", request: "+(request?"set":"not set")+", cursor: "+cursor+" }";
+ };
+
+ function connectOk() {
+ isConnected = true;
+ timeouts.initialConnect();
+ if (! notifyConnect(self)) {
+ log("streaming transport not chosen for activation");
+ self.disconnect();
+ return;
+ }
+ }
+
+ function streamDataHandler() {
+ if (timeouts.data) timeouts.data();
+ if (isClosed) return;
+ try {
+ if (! request.responseText) return;
+ } catch (e) { return; }
+ if (! isConnected) {
+ var msg = readMessage(request.responseText, cursor);
+ if (! msg) return;
+ cursor = msg.lastReceivedSeqNumber;
+ if (msg.message == "oob:ok") {
+ connectOk();
+ } else {
+ log("stream: incorrect channel connect message:"+msg.message);
+ self.disconnect();
+ return;
+ }
+ } else {
+ cursor = handleMessages(request.responseText, cursor, self);
+ }
+ if (! request || request.readyState == 4) {
+ clearRequest();
+ if (isConnected) {
+ log("stream connection unexpectedly closed.");
+ hiccup(self);
+ return;
+ }
+ }
+ timeout(timeouts, "data", 60*1000, function() { hiccup(self); });
+ }
+
+ function iframeDataHandler(data) {
+ if (isClosed) return;
+ if (! isConnected) {
+ if (data == "oob:ok") {
+ connectOk();
+ } else {
+ log("iframe stream: unexpected data on connect - "+data);
+ }
+ } else {
+ handleMessages(data, 0, self, iframeReader);
+ }
+ }
+
+ function cancelConnect() {
+ isClosed = true;
+ clearRequest();
+ log("stream: failed to connect.");
+ }
+
+ // IE Stuff.
+ var theStream;
+ var ifrDiv;
+ var iframeTestCount = 0;
+ function testIframe() {
+ var state;
+ try {
+ state = ifrDiv.firstChild.readyState;
+ } catch (e) {
+ hiccup(self);
+ return;
+ }
+ if (state == 'interactive' || iframeTestCount > 10) {
+ try { var tmp = ifrDiv.firstChild.contentWindow.document.getElementById("thebody") }
+ catch (e) { hiccup(self); }
+ } else {
+ iframeTestCount++;
+ setTimeout(testIframe, 500);
+ }
+ }
+
+ this.connect = function() {
+ timeout(timeouts, "initialConnect", 15000, cancelConnect)
+
+ if (canUseSubdomains) {
+ var streamurl = "//"+randomVar()+".comet."+host+channelPath()+"&channel=streaming&type=iframe&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber;
+ log("stream to: "+streamurl);
+ if ($ && $.browser.opera) {
+ // set up the opera stream; requires jquery because, why not?
+ ifrDiv = $('<div style="display: none;"></div>').get(0);
+ $('body').append(ifrDiv);
+ window.comet = {
+ pass_data: iframeDataHandler,
+ disconnect: function() { hiccup(self); }
+ }
+ $(ifrDiv).append($("<iframe src='"+streamurl+"'></iframe>"));
+ iframeTestCount = 0;
+ setTimeout(testIframe, 2000);
+ // if event-source supported disconnect notifications, fuck yeah we'd use it.
+// theStream = $('<event-source>');
+// var streamurl = channelPath()+"&channel=streaming&type=opera&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber;
+// theStream.get(0).addEventListener('message', function(event) {
+// iframeDataHandler(event.data);
+// }, false);
+// theStream.attr('src', streamurl);
+ log("stream connect sent!");
+ return;
+ }
+ try { // TODO: remove reference to both theStream and ifrDiv on unload!
+ theStream = (window.ActiveXObject && new ActiveXObject("htmlfile"));
+ if (theStream) {
+ theStream.open();
+ theStream.write("<html><head><title>f<\/title><\/head><body>")
+ theStream.write("<s"+"cript>document.domain='"+document.domain+"';<\/s"+"cript>")
+ theStream.write("<\/body><\/html>")
+ theStream.close();
+ ifrDiv = theStream.createElement("div")
+ theStream.appendChild(ifrDiv)
+ theStream.parentWindow.comet = {
+ pass_data: iframeDataHandler,
+ disconnect: function() { hiccup(self); }
+ }
+ ifrDiv.innerHTML = "<iframe src='"+streamurl+"'></iframe>";
+ iframeTestCount = 0;
+ setTimeout(testIframe, 2000);
+ }
+ } catch (e) {
+ theStream = false
+ }
+ } else if ($ && $.browser.opera) {
+ // opera thinks it can do a normal stream, but it can't.
+ log("opera - not trying xhr");
+ return;
+ }
+ // End IE Stuff.
+ if (! theStream) {
+ request = newRequestObject();
+ request.open('get', channelPath()+"&channel=streaming&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber);
+ request.onreadystatechange = streamDataHandler;
+ try {
+ request.send(null);
+ } catch (e) { }
+ }
+ log("stream connect sent!");
+ }
+
+ this.disconnect = function() {
+ log("stream disconnected");
+ isClosed = true;
+ clearRequest();
+ }
+ log("new streamchannel");
+ }
+
+ // long-polling related stuff.
+ function iframePath(key) {
+ return "//"+key+".comet."+host+"%contextPath%/xhrXdFrame";
+ }
+
+ function createHiddenDiv() {
+ if (! document.getElementById('newcomethidden')) {
+ var d = document.createElement('div');
+ d.setAttribute('id', 'newcomethidden');
+ d.style.display = 'none';
+ document.body.appendChild(d);
+ }
+ return document.getElementById('newcomethidden');
+ }
+
+ function ExtHostXHR(iframe) {
+ this.open = function(method, uri, async) {
+ this.method = method;
+ this.uri = uri;
+ this.async = async;
+ }
+ var headers = {};
+ this.setRequestHeader = function(name, value) {
+ headers[name] = value;
+ }
+ this.send = function(data) {
+ var self = this;
+ this.xhr = iframe.iframe.contentWindow.doAction(this.method, this.uri, this.async, headers, data || null, function(status, response) {
+ self.readyState = 4;
+ self.status = status;
+ self.responseText = response;
+ self.onreadystatechange();
+ });
+ }
+ this.abort = function() {
+ if (this.xhr)
+ iframe.contentWindow.doAbort(this.xhr);
+ }
+ }
+
+ function createRequestIframe(cb) {
+ var randomKey = randomVar();
+ try {
+ var activeXControl = (window.ActiveXObject && new ActiveXObject("htmlfile"));
+ var htmlfileDiv;
+ if (activeXControl) {
+ activeXControl.open();
+ activeXControl.write('<html><head><title>f</title></head><body>');
+ activeXControl.write('<scr'+'ipt>document.domain=\''+document.domain+'\';</scr'+'ipt>');
+ activeXControl.write('</body></html>');
+ activeXControl.close();
+ htmlfileDiv = activeXControl.createElement('div');
+ activeXControl.appendChild(htmlfileDiv);
+ activeXControl.parentWindow["done_"+randomKey] = cb;
+ htmlfileDiv.innerHTML = "<iframe src='"+iframePath(randomKey)+"'></iframe>";
+ return {iframe: htmlfileDiv.firstChild /* should be an iframe */, axc: activeXControl, div: htmlfileDiv};
+ }
+ } catch (e) {
+ activeXControl = false;
+ }
+ log("Not using IE setup.");
+ var requestIframe = document.createElement('iframe');
+ createHiddenDiv().appendChild(requestIframe);
+ window["done_"+randomKey] = function() { try { delete window["done_"+randomKey]; } catch (e) { }; cb(); }
+ requestIframe.src = iframePath(randomKey);
+ return {iframe: requestIframe};
+ }
+
+ function createIframeRequestObject() {
+ if (! longPollingIFrame) throw Error("WebSocket isn't properly set up!");
+ return new ExtHostXHR(longPollingIFrame);
+ }
+
+ var longPollingIFrame;
+ function LongPollingChannel() {
+ ShortPollingChannel.apply(this); // sets up other state.
+ this.weight = 1;
+ this.name = "longpolling";
+
+ this.pollDelay = 0;
+ this.pollTimeout = 15000;
+ this.pollParams = function() {
+ return "&timeout="+(this.pollTimeout-5000);
+ }
+ var connect = this.connect;
+ this.connect = function() {
+ if (! longPollingIFrame) {
+ longPollingIFrame =
+ createRequestIframe(_wm(this, connect)); // specifically *not* this.connect. we want the old one!
+ } else {
+ connect.apply(this);
+ }
+ }
+ this.xhrGenerator = createIframeRequestObject;
+ this.emptyResponseBad = true;
+ }
+} \ No newline at end of file
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html b/trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html
new file mode 100644
index 0000000..3bdb5c4
--- /dev/null
+++ b/trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html
@@ -0,0 +1,76 @@
+<html>
+<head>
+<script>
+function createRequestObject() {
+ var xmlhttp=false;
+ /*@cc_on @*/
+ /*@if (@_jscript_version >= 5)
+ try {
+ xmlhttp = new ActiveXObject("Msxml2.XMLHTTP");
+ } catch (e) {
+ try {
+ xmlhttp = new ActiveXObject("Microsoft.XMLHTTP");
+ } catch (E) {
+ xmlhttp = false;
+ }
+ }
+ @end @*/
+ if (!xmlhttp && typeof XMLHttpRequest!='undefined') {
+ try {
+ xmlhttp = new XMLHttpRequest();
+ } catch (e) {
+ xmlhttp=false;
+ }
+ }
+ if (!xmlhttp && window.createRequest) {
+ try {
+ xmlhttp = window.createRequest();
+ } catch (e) {
+ xmlhttp=false;
+ }
+ }
+ return xmlhttp
+}
+var host = window.location.host;
+var oldDomain = document.domain;
+var newDomain = oldDomain.substring(oldDomain.indexOf(".", oldDomain.indexOf(".")+1)+1);
+
+function doAction(method, uri, async, headers, body, cb) {
+ try {
+ document.domain = oldDomain;
+ } catch (e) { }
+ var req = createRequestObject();
+ req.open(method, '//'+host+uri, async);
+ for (var i in headers) {
+ req.setRequestHeader(i, headers[i]);
+ }
+ req.onreadystatechange = function() {
+ if (req.readyState == 4) {
+ try {
+ document.domain = newDomain;
+ cb(req.status, req.responseText);
+ } catch (e) {
+ // yikes. well, hopefully a timeout will notice this error.
+ }
+ }
+ }
+ req.send(body);
+}
+function doAbort(xhr) {
+ try {
+ document.domain = oldDomain;
+ } catch (e) { }
+ xhr.abort();
+}
+document.domain = newDomain;
+window.onload = function() {
+ var doneKey = 'done_'+oldDomain.split(".", 2)[0];
+ setTimeout(function() {
+ window.parent[doneKey]();
+ }, 0);
+}
+</script>
+</head>
+<body>
+</body>
+</html>
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/streaming.scala b/trunk/infrastructure/net.appjet.ajstdlib/streaming.scala
new file mode 100644
index 0000000..fbff137
--- /dev/null
+++ b/trunk/infrastructure/net.appjet.ajstdlib/streaming.scala
@@ -0,0 +1,892 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS-IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.appjet.ajstdlib;
+
+import scala.collection.mutable.{Queue, HashMap, SynchronizedMap, ArrayBuffer};
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse, HttpServlet};
+import org.mortbay.jetty.servlet.{ServletHolder, Context};
+import org.mortbay.jetty.{HttpConnection, Handler, RetryRequest};
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.io.nio.SelectChannelEndPoint;
+import org.mortbay.util.ajax.{ContinuationSupport, Continuation};
+
+import java.util.{Timer, TimerTask};
+import java.lang.ref.WeakReference;
+
+import org.mozilla.javascript.{Context => JSContext, Scriptable};
+
+import net.appjet.oui._;
+import net.appjet.oui.Util.enumerationToRichEnumeration;
+import net.appjet.common.util.HttpServletRequestFactory;
+
+trait SocketConnectionHandler {
+ def message(sender: StreamingSocket, data: String, req: HttpServletRequest);
+ def connect(socket: StreamingSocket, req: HttpServletRequest);
+ def disconnect(socket: StreamingSocket, req: HttpServletRequest);
+}
+
+object SocketManager {
+ val sockets = new HashMap[String, StreamingSocket] with SynchronizedMap[String, StreamingSocket];
+ val handler = new SocketConnectionHandler {
+ val cometLib = new FixedDiskLibrary(new SpecialJarOrNotFile(config.ajstdlibHome, "oncomet.js"));
+ def cometExecutable = cometLib.executable;
+
+ def message(socket: StreamingSocket, data: String, req: HttpServletRequest) {
+ val t1 = profiler.time;
+// println("Message from: "+socket.id+": "+data);
+ val runner = ScopeReuseManager.getRunner;
+ val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner);
+ ec.attributes("cometOperation") = "message";
+ ec.attributes("cometId") = socket.id;
+ ec.attributes("cometData") = data;
+ ec.attributes("cometSocket") = socket;
+ net.appjet.oui.execution.execute(
+ ec,
+ (sc: Int, msg: String) =>
+ throw new HandlerException(sc, msg, null),
+ () => {},
+ () => { ScopeReuseManager.freeRunner(runner); },
+ Some(cometExecutable));
+ cometlatencies.register(((profiler.time-t1)/1000).toInt);
+ }
+ def connect(socket: StreamingSocket, req: HttpServletRequest) {
+// println("Connect on: "+socket);
+ val runner = ScopeReuseManager.getRunner;
+ val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner);
+ ec.attributes("cometOperation") = "connect";
+ ec.attributes("cometId") = socket.id;
+ ec.attributes("cometSocket") = socket;
+ net.appjet.oui.execution.execute(
+ ec,
+ (sc: Int, msg: String) =>
+ throw new HandlerException(sc, msg, null),
+ () => {},
+ () => { ScopeReuseManager.freeRunner(runner); },
+ Some(cometExecutable));
+ }
+ def disconnect(socket: StreamingSocket, req: HttpServletRequest) {
+ val toRun = new Runnable {
+ def run() {
+ val runner = ScopeReuseManager.getRunner;
+ val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner);
+ ec.attributes("cometOperation") = "disconnect";
+ ec.attributes("cometId") = socket.id;
+ ec.attributes("cometSocket") = socket;
+ net.appjet.oui.execution.execute(
+ ec,
+ (sc: Int, msg: String) =>
+ throw new HandlerException(sc, msg, null),
+ () => {},
+ () => { ScopeReuseManager.freeRunner(runner); },
+ Some(cometExecutable));
+ }
+ }
+ main.server.getThreadPool().dispatch(toRun);
+ }
+ }
+ def apply(id: String, create: Boolean) = {
+ if (create) {
+ Some(sockets.getOrElseUpdate(id, new StreamingSocket(id, handler)));
+ } else {
+ if (id == null)
+ error("bad id: "+id);
+ sockets.get(id);
+ }
+ }
+ class HandlerException(val sc: Int, val msg: String, val cause: Exception)
+ extends RuntimeException("An error occurred while handling a request: "+sc+" - "+msg, cause);
+}
+
+// And this would be the javascript interface. Whee.
+object Comet extends CometSupport.CometHandler {
+ def init() {
+ CometSupport.cometHandler = this;
+ context.start();
+ }
+
+ val acceptableTransports = {
+ val t = new ArrayBuffer[String];
+ if (! config.disableShortPolling) {
+ t += "shortpolling";
+ }
+ if (config.transportUseWildcardSubdomains) {
+ t += "longpolling";
+ }
+ t += "streaming";
+ t.mkString("['", "', '", "']");
+ }
+
+
+ val servlet = new StreamingSocketServlet();
+ val holder = new ServletHolder(servlet);
+ val context = new Context(null, "/", Context.NO_SESSIONS | Context.NO_SECURITY);
+ context.addServlet(holder, "/*");
+ context.setMaxFormContentSize(1024*1024);
+
+ def handleCometRequest(req: HttpServletRequest, res: HttpServletResponse) {
+ context.handle(req.getRequestURI().substring(config.transportPrefix.length), req, res, Handler.FORWARD);
+ }
+
+ lazy val ccLib = new FixedDiskResource(new JarOrNotFile(config.ajstdlibHome, "streaming-client.js") {
+ override val classBase = "/net/appjet/ajstdlib/";
+ override val fileSep = "/../../net.appjet.ajstdlib/";
+ });
+ def clientCode(contextPath: String, acceptableChannelTypes: String) = {
+ ccLib.contents.replaceAll("%contextPath%", contextPath).replaceAll("\"%acceptableChannelTypes%\"", acceptableChannelTypes).replaceAll("\"%canUseSubdomains%\"", if (config.transportUseWildcardSubdomains) "true" else "false");
+ }
+ def clientMTime = ccLib.fileLastModified;
+
+ lazy val ccFrame = new FixedDiskResource(new JarOrNotFile(config.ajstdlibHome, "streaming-iframe.html") {
+ override val classBase = "/net/appjet/ajstdlib/";
+ override val fileSep = "/../../net.appjet.ajstdlib/";
+ });
+ def frameCode = {
+ if (! config.devMode)
+ ccFrame.contents.replace("<head>\n<script>", """<head>
+ <script>
+ window.onerror = function() { /* silently drop errors */ }
+ </script>
+ <script>""");
+ else
+ ccFrame.contents;
+ }
+
+
+ // public
+ def connections(ec: ExecutionContext): Scriptable = {
+ JSContext.getCurrentContext().newArray(ec.runner.globalScope, SocketManager.sockets.keys.toList.toArray[Object]);
+ }
+
+ // public
+ def connectionStatus = {
+ val m = new HashMap[String, Int];
+ for (socket <- SocketManager.sockets.values) {
+ val key = socket.currentChannel.map(_.kind.toString()).getOrElse("(unconnected)");
+ m(key) = m.getOrElse(key, 0) + 1;
+ }
+ m;
+ }
+
+ // public
+ def getNumCurrentConnections = SocketManager.sockets.size;
+
+ // public
+ def write(id: String, msg: String) {
+ SocketManager.sockets.get(id).foreach(_.sendMessage(false, msg));
+ }
+
+ // public
+ def isConnected(id: String): java.lang.Boolean = {
+ SocketManager.sockets.contains(id);
+ }
+
+ // public
+ def getTransportType(id: String): String = {
+ SocketManager.sockets.get(id).map(_.currentChannel.map(_.kind.toString()).getOrElse("none")).getOrElse("none");
+ }
+
+ // public
+ def disconnect(id: String) {
+ SocketManager.sockets.get(id).foreach(x => x.close());
+ }
+
+ // public
+ def setAttribute(ec: ExecutionContext, id: String, key: String, value: String) {
+ ec.attributes.get("cometSocket").map(x => Some(x.asInstanceOf[StreamingSocket])).getOrElse(SocketManager.sockets.get(id))
+ .foreach(_.attributes(key) = value);
+ }
+ // public
+ def getAttribute(ec: ExecutionContext, id: String, key: String): String = {
+ ec.attributes.get("cometSocket").map(x => Some(x.asInstanceOf[StreamingSocket])).getOrElse(SocketManager.sockets.get(id))
+ .map(_.attributes.getOrElse(key, null)).getOrElse(null);
+ }
+
+ // public
+ def getClientCode(ec: ExecutionContext) = {
+ clientCode(config.transportPrefix, acceptableTransports);
+ }
+ def getClientMTime(ec: ExecutionContext) = clientMTime;
+}
+
+class StreamingSocket(val id: String, handler: SocketConnectionHandler) {
+ var hasConnected = false;
+ var shutdown = false;
+ var killed = false;
+ var currentChannel: Option[Channel] = None;
+ val activeChannels = new HashMap[ChannelType.Value, Channel]
+ with SynchronizedMap[ChannelType.Value, Channel];
+
+ lazy val attributes = new HashMap[String, String] with SynchronizedMap[String, String];
+
+ def channel(typ: String, create: Boolean, subType: String): Option[Channel] = {
+ val channelType = ChannelType.valueOf(typ);
+ if (channelType.isEmpty) {
+ streaminglog(Map(
+ "type" -> "error",
+ "error" -> "unknown channel type",
+ "channelType" -> channelType));
+ None;
+ } else if (create) {
+ Some(activeChannels.getOrElseUpdate(channelType.get, Channels.createNew(channelType.get, this, subType)));
+ } else {
+ activeChannels.get(channelType.get);
+ }
+ }
+
+ val outgoingMessageQueue = new Queue[SocketMessage];
+ val unconfirmedMessages = new HashMap[Int, SocketMessage];
+
+ var lastSentSeqNumber = 0;
+ var lastConfirmedSeqNumber = 0;
+
+ // external API
+ def sendMessage(isControl: boolean, body: String) {
+ if (hasConnected && ! shutdown) {
+ synchronized {
+ lastSentSeqNumber += 1;
+ val msg = new SocketMessage(lastSentSeqNumber, isControl, body);
+ outgoingMessageQueue += msg;
+ unconfirmedMessages(msg.seq) = msg;
+ }
+ currentChannel.foreach(_.messageWaiting());
+ }
+ }
+ def close() {
+ synchronized {
+ sendMessage(true, "kill");
+ shutdown = true;
+ Channels.timer.schedule(new TimerTask {
+ def run() {
+ kill("server request, timeout");
+ }
+ }, 15000);
+ }
+ }
+
+ var creatingRequest: Option[HttpServletRequest] = None;
+ // internal API
+ def kill(reason: String) {
+ synchronized {
+ if (! killed) {
+ streaminglog(Map(
+ "type" -> "event",
+ "event" -> "connection-killed",
+ "connection" -> id,
+ "reason" -> reason));
+ killed = true;
+ SocketManager.sockets -= id;
+ activeChannels.foreach(_._2.close());
+ currentChannel = None;
+ if (hasConnected) {
+ handler.disconnect(this, creatingRequest.getOrElse(null));
+ }
+ }
+ }
+ }
+ def receiveMessage(body: String, req: HttpServletRequest) {
+// println("Message received on "+id+": "+body);
+ handler.message(this, body, req);
+ }
+ def getWaitingMessage(channel: Channel): Option[SocketMessage] = {
+ synchronized {
+ if (currentChannel.isDefined && currentChannel.get == channel &&
+ ! outgoingMessageQueue.isEmpty) {
+ Some(outgoingMessageQueue.dequeue);
+ } else {
+ None;
+ }
+ }
+ }
+ def getUnconfirmedMessages(channel: Channel): Collection[SocketMessage] = {
+ synchronized {
+ if (currentChannel.isDefined && currentChannel.get == channel) {
+ for (i <- lastConfirmedSeqNumber+1 until lastSentSeqNumber+1)
+ yield unconfirmedMessages(i);
+ } else {
+ List[SocketMessage]();
+ }
+ }
+ }
+ def updateConfirmedSeqNumber(channel: Channel, received: Int) {
+ synchronized {
+ if (received > lastConfirmedSeqNumber && (channel == null || (currentChannel.isDefined && channel == currentChannel.get))) {
+ val oldConfirmed = lastConfirmedSeqNumber;
+ lastConfirmedSeqNumber = received;
+ for (i <- oldConfirmed+1 until lastConfirmedSeqNumber+1) { // inclusive!
+ unconfirmedMessages -= i;
+ }
+ }
+ }
+ }
+
+ var lastChannelUpdate = 0;
+ def useChannel(seqNo: Int, channelType0: String, req: HttpServletRequest) = synchronized {
+ if (seqNo <= lastChannelUpdate) false else {
+ lastChannelUpdate = seqNo;
+ val channelType = ChannelType.valueOf(channelType0);
+ if (channelType.isDefined) {
+ val channel = activeChannels.get(channelType.get);
+ if (channel.isDefined) {
+ if (! hasConnected) {
+ hasConnected = true;
+ creatingRequest = Some(HttpServletRequestFactory.createRequest(req));
+ handler.connect(this, req);
+ }
+ currentChannel = channel;
+// println("switching "+id+" to channel: "+channelType0);
+ if (currentChannel.get.isConnected) {
+ revive(channel.get);
+ } else {
+ hiccup(channel.get);
+ }
+ currentChannel.get.messageWaiting();
+ true;
+ } else
+ false;
+ } else
+ false;
+ }
+ }
+// def handleReceivedMessage(seq: Int, data: String) {
+// synchronized {
+// handler.message(this, data)
+// // TODO(jd): add client->server sequence numbers.
+// // if (seq == lastReceivedSeqNumber+1){
+// // lastReceivedSeqNumber = seq;
+// // handler.message(this, data);
+// // } else {
+// // // handle error.
+// // }
+// }
+// }
+ def hiccup(channel: Channel) = synchronized {
+ if (currentChannel.isDefined && channel == currentChannel.get) {
+// println("hiccuping: "+id);
+ scheduleTimeout();
+ }
+ }
+ def revive(channel: Channel) = synchronized {
+ if (currentChannel.isDefined && channel == currentChannel.get) {
+// println("reviving: "+id);
+ cancelTimeout();
+ }
+ }
+ def prepareForReconnect() = synchronized {
+// println("client-side hiccup: "+id);
+ activeChannels.foreach(_._2.close());
+ activeChannels.clear();
+ currentChannel = None;
+ scheduleTimeout();
+ }
+
+ // helpers
+ var timeoutTask: TimerTask = null;
+
+ def scheduleTimeout() {
+ if (timeoutTask != null) return;
+ val p = new WeakReference(this);
+ timeoutTask = new TimerTask {
+ def run() {
+ val socket = p.get();
+ if (socket != null) {
+ socket.kill("timeout");
+ }
+ }
+ }
+ Channels.timer.schedule(timeoutTask, 15*1000);
+ }
+ def cancelTimeout() {
+ if (timeoutTask != null)
+ timeoutTask.cancel();
+ timeoutTask = null;
+ }
+ scheduleTimeout();
+
+ streaminglog(Map(
+ "type" -> "event",
+ "event" -> "connection-created",
+ "connection" -> id));
+}
+
+object ChannelType extends Enumeration("shortpolling", "longpolling", "streaming") {
+ val ShortPolling, LongPolling, Streaming = Value;
+}
+
+object Channels {
+ def createNew(typ: ChannelType.Value, socket: StreamingSocket, subType: String): Channel = {
+ typ match {
+ case ChannelType.ShortPolling => new ShortPollingChannel(socket);
+ case ChannelType.LongPolling => new LongPollingChannel(socket);
+ case ChannelType.Streaming => {
+ subType match {
+ case "iframe" => new StreamingChannel(socket) with IFrameChannel;
+ case "opera" => new StreamingChannel(socket) with OperaChannel;
+ case _ => new StreamingChannel(socket);
+ }
+ }
+ }
+ }
+
+ val timer = new Timer(true);
+}
+
+class SocketMessage(val seq: Int, val isControl: Boolean, val body: String) {
+ def payload = seq+":"+(if (isControl) "1" else "0")+":"+body;
+}
+
+trait Channel {
+ def messageWaiting();
+ def close();
+ def handle(req: HttpServletRequest, res: HttpServletResponse);
+ def isConnected: Boolean;
+
+ def kind: ChannelType.Value;
+ def sendRestartFailure(ec: ExecutionContext);
+}
+
+trait XhrChannel extends Channel {
+ def wrapBody(msg: String) = msg.length+":"+msg;
+
+ // wire format: msgLength:seq:[01]:msg
+ def wireFormat(msg: SocketMessage) = wrapBody(msg.payload);
+ def controlMessage(data: String) = wrapBody("oob:"+data);
+
+ def sendRestartFailure(ec: ExecutionContext) {
+ ec.response.write(controlMessage("restart-fail"));
+ }
+}
+
+// trait IFrameChannel extends Channel {
+// def wireFormat(msg: SocketMessage)
+// }
+
+class ShortPollingChannel(val socket: StreamingSocket) extends Channel with XhrChannel {
+ def kind = ChannelType.ShortPolling;
+
+ def messageWaiting() {
+ // do nothing.
+ }
+ def close() {
+ // do nothing
+ }
+ def isConnected = false;
+
+ def handle(req: HttpServletRequest, res: HttpServletResponse) {
+ val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
+ val out = new StringBuilder();
+ socket.synchronized {
+ socket.revive(this);
+ if (req.getParameter("new") == "yes") {
+ out.append(controlMessage("ok"));
+ } else {
+ val lastReceivedSeq = java.lang.Integer.parseInt(req.getParameter("seq"));
+ socket.updateConfirmedSeqNumber(this, lastReceivedSeq);
+ for (msg <- socket.getUnconfirmedMessages(this)) {
+ out.append(wireFormat(msg));
+ }
+ // ALL MESSAGES ARE UNCONFIRMED AT THIS POINT! JUST CLEAR QUEUE.
+ var msg = socket.getWaitingMessage(this);
+ while (msg.isDefined) {
+ msg = socket.getWaitingMessage(this);
+ }
+ }
+ }
+// println("Writing to "+socket.id+": "+out.toString);
+ ec.response.write(out.toString);
+ socket.synchronized {
+ socket.hiccup(this);
+ }
+ }
+}
+
+trait IFrameChannel extends StreamingChannel {
+ override def wrapBody(msgBody: String) = {
+ val txt = "<script type=\"text/javascript\">p('"+
+ msgBody.replace("\\","\\\\").replace("'", "\\'")+"');</script>";
+ if (txt.length < 256)
+ String.format("%256s", txt);
+ else
+ txt;
+ }
+
+ def header(req: HttpServletRequest) = {
+ val document_domain =
+ "\""+req.getHeader("Host").split("\\.").slice(2).mkString(".").split(":")(0)+"\"";
+ """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
+"http://www.w3.org/TR/html4/strict.dtd">
+<html><head><title>f</title></head><body id="thebody" onload="(!parent.closed)&&d()"><script type="text/javascript">document.domain = """+document_domain+""";
+var p = function(data) { try { parent.comet.pass_data } catch (err) { /* failed to pass data. no recourse. */ } };
+var d = parent.comet.disconnect;"""+(if(!config.devMode)"\nwindow.onerror = function() { /* silently drop errors */ }\n" else "")+"</script>"; // " - damn textmate mode!
+ }
+
+ override def sendRestartFailure(ec: ExecutionContext) {
+ ec.response.write(header(ec.request.req));
+ ec.response.write(controlMessage("restart-fail"));
+ }
+
+ override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
+ super.handleNewConnection(req, res, out);
+ res.setContentType("text/html");
+ out.append(header(req));
+ }
+}
+
+trait OperaChannel extends StreamingChannel {
+ override def wrapBody(msgBody: String) = {
+ "Event: message\ndata: "+msgBody+"\n\n";
+ }
+ override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
+ super.handleNewConnection(req, res, out);
+ res.setContentType("application/x-dom-event-stream");
+ }
+}
+
+class StreamingChannel(val socket: StreamingSocket) extends Channel with XhrChannel {
+ def kind = ChannelType.Streaming;
+
+ var c: Option[SelectChannelConnector.RetryContinuation] = None;
+ var doClose = false;
+
+ def messageWaiting() {
+ main.server.getThreadPool().dispatch(new Runnable() {
+ def run() {
+ socket.synchronized {
+ c.filter(_.isPending()).foreach(_.resume());
+ }
+ }
+ });
+ }
+
+ def setSequenceNumberIfAppropriate(req: HttpServletRequest) {
+ if (c.get.isNew) {
+ val lastReceivedSeq = java.lang.Integer.parseInt(req.getParameter("seq"));
+ socket.updateConfirmedSeqNumber(this, lastReceivedSeq);
+ }
+ }
+
+ def sendHandshake(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
+ out.append(controlMessage("ok"));
+ }
+
+ def sendUnconfirmedMessages(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
+ for (msg <- socket.getUnconfirmedMessages(this)) {
+ out.append(wireFormat(msg));
+ }
+ }
+
+ def sendWaitingMessages(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
+ var msg = socket.getWaitingMessage(this);
+ while (msg.isDefined) {
+ out.append(wireFormat(msg.get));
+ msg = socket.getWaitingMessage(this);
+ }
+ }
+
+ def handleUnexpectedDisconnect(req: HttpServletRequest, res: HttpServletResponse, ep: KnowsAboutDispatch) {
+ socket.synchronized {
+ socket.hiccup(this);
+ }
+ ep.close();
+ }
+
+ def writeAndFlush(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder, ep: KnowsAboutDispatch) {
+// println("Writing to "+socket.id+": "+out.toString);
+ res.getWriter.print(out.toString);
+ res.getWriter.flush();
+ }
+
+ def suspendIfNecessary(req: HttpServletRequest, res: HttpServletResponse,
+ out: StringBuilder, ep: KnowsAboutDispatch) {
+ scheduleKeepalive(50*1000);
+ ep.undispatch();
+ c.get.suspend(0);
+ }
+
+ def sendKeepaliveIfNecessary(out: StringBuilder, sendKeepalive: Boolean) {
+ if (out.length == 0 && sendKeepalive) {
+ out.append(controlMessage("keepalive"));
+ }
+ }
+
+ def shouldHandshake(req: HttpServletRequest, res: HttpServletResponse) = c.get.isNew;
+
+ var sendKeepalive = false;
+ var keepaliveTask: TimerTask = null;
+ def scheduleKeepalive(timeout: Int) {
+ if (keepaliveTask != null) {
+ keepaliveTask.cancel();
+ }
+ val p = new WeakReference(this);
+ keepaliveTask = new TimerTask {
+ def run() {
+ val channel = p.get();
+ if (channel != null) {
+ channel.synchronized {
+ channel.sendKeepalive = true;
+ channel.messageWaiting();
+ }
+ }
+ }
+ }
+ Channels.timer.schedule(keepaliveTask, timeout);
+ }
+
+ def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
+ req.setAttribute("StreamingSocketServlet_channel", this);
+ res.setHeader("Connection", "close");
+ for ((k, v) <- Util.noCacheHeaders) { res.setHeader(k, v); } // maybe this will help with proxies?
+ res.setContentType("text/messages; charset=utf-8");
+ }
+
+ def handle(req: HttpServletRequest, res: HttpServletResponse) {
+ val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
+ val ep = HttpConnection.getCurrentConnection.getEndPoint.asInstanceOf[KnowsAboutDispatch];
+ val out = new StringBuilder;
+ try {
+ socket.synchronized {
+ val sendKeepaliveNow = sendKeepalive;
+ sendKeepalive = false;
+ if (keepaliveTask != null) {
+ keepaliveTask.cancel();
+ keepaliveTask = null;
+ }
+ c = Some(ContinuationSupport.getContinuation(req, socket).asInstanceOf[SelectChannelConnector.RetryContinuation]);
+ setSequenceNumberIfAppropriate(req);
+ if (doClose) {
+ ep.close();
+ return;
+ }
+ if (c.get.isNew) {
+ handleNewConnection(req, res, out);
+ } else {
+ c.get.suspend(-1);
+ if (ep.isDispatched) {
+ handleUnexpectedDisconnect(req, res, ep);
+ return;
+ }
+ }
+ if (shouldHandshake(req, res)) {
+// println("new stream request: "+socket.id);
+ sendHandshake(req, res, out);
+ sendUnconfirmedMessages(req, res, out);
+ }
+ sendWaitingMessages(req, res, out);
+ sendKeepaliveIfNecessary(out, sendKeepaliveNow);
+ suspendIfNecessary(req, res, out, ep);
+ }
+ } finally {
+ writeAndFlush(req, res, out, ep);
+ }
+ }
+
+ def close() {
+ doClose = true;
+ messageWaiting();
+ }
+
+ def isConnected = ! doClose;
+}
+
+class LongPollingChannel(socket: StreamingSocket) extends StreamingChannel(socket) {
+// println("creating longpoll!");
+ override def kind = ChannelType.LongPolling;
+
+ override def shouldHandshake(req: HttpServletRequest, res: HttpServletResponse) =
+ req.getParameter("new") == "yes";
+
+ override def sendHandshake(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
+// println("sending handshake");
+ out.append(controlMessage("ok"));
+ }
+
+ override def suspendIfNecessary(req: HttpServletRequest, res: HttpServletResponse,
+ out: StringBuilder, ep: KnowsAboutDispatch) {
+ if (out.length == 0) {
+// println("suspending longpoll: "+socket.id);
+ val to = java.lang.Integer.parseInt(req.getParameter("timeout"));
+// println("LongPoll scheduling keepalive for: "+to);
+ scheduleKeepalive(to);
+ ep.undispatch();
+ c.get.suspend(0);
+ }
+ }
+
+ override def writeAndFlush(req: HttpServletRequest, res: HttpServletResponse,
+ out: StringBuilder, ep: KnowsAboutDispatch) {
+ if (out.length > 0) {
+// println("Writing to "+socket.id+": "+out.toString);
+// println("writing and flushing longpoll")
+ val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
+ for ((k, v) <- Util.noCacheHeaders) { ec.response.setHeader(k, v); } // maybe this will help with proxies?
+// println("writing: "+out);
+ ec.response.write(out.toString);
+ socket.synchronized {
+ socket.hiccup(this);
+ c = None;
+ }
+ }
+ }
+
+ override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
+ socket.revive(this);
+ req.setAttribute("StreamingSocketServlet_channel", this);
+ }
+
+ override def isConnected = socket.synchronized {
+ c.isDefined;
+ }
+}
+
+class StreamingSocketServlet extends HttpServlet {
+ val version = 2;
+
+ override def doGet(req: HttpServletRequest, res: HttpServletResponse) {
+// describeRequest(req);
+ val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
+ try {
+ if (req.getPathInfo() == "/js/client.js") {
+ val contextPath = config.transportPrefix;
+ val acceptableTransports = Comet.acceptableTransports;
+ ec.response.setContentType("application/x-javascript");
+ ec.response.write(Comet.clientCode(contextPath, acceptableTransports));
+ } else if (req.getPathInfo() == "/xhrXdFrame") {
+ ec.response.setContentType("text/html; charset=utf-8");
+ ec.response.write(Comet.frameCode);
+ } else {
+ val v = req.getParameter("v");
+ if (v == null || java.lang.Integer.parseInt(v) != version) {
+ res.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad version number!");
+ return;
+ }
+ val existingChannel = req.getAttribute("StreamingSocketServlet_channel");
+ if (existingChannel != null) {
+ existingChannel.asInstanceOf[Channel].handle(req, res);
+ } else {
+ val socketId = req.getParameter("id");
+ val channelType = req.getParameter("channel");
+ val isNew = req.getParameter("new") == "yes";
+ val shouldCreateSocket = req.getParameter("create") == "yes";
+ val subType = req.getParameter("type");
+ val channel = SocketManager(socketId, shouldCreateSocket).map(_.channel(channelType, isNew, subType)).getOrElse(None);
+ if (channel.isDefined) {
+ channel.get.handle(req, res);
+ } else {
+ streaminglog(Map(
+ "type" -> "event",
+ "event" -> "restart-failure",
+ "connection" -> socketId));
+ val failureChannel = ChannelType.valueOf(channelType).map(Channels.createNew(_, null, subType));
+ if (failureChannel.isDefined) {
+ failureChannel.get.sendRestartFailure(ec);
+ } else {
+ ec.response.setStatusCode(HttpServletResponse.SC_NOT_FOUND);
+ ec.response.write("So such socket, and/or unknown channel type: "+channelType);
+ }
+ }
+ }
+ }
+ } catch {
+ case e: RetryRequest => throw e;
+ case t: Throwable => {
+ exceptionlog("A comet error occurred: ");
+ exceptionlog(t);
+ ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ ec.response.write(t.getMessage());
+ }
+ }
+ }
+
+ def describeRequest(req: HttpServletRequest) {
+ println(req.getMethod+" on "+req.getRequestURI()+"?"+req.getQueryString());
+ for (pname <-
+ req.getParameterNames.asInstanceOf[java.util.Enumeration[String]]) {
+ println(" "+pname+" -> "+req.getParameterValues(pname).mkString("[", ",", "]"));
+ }
+ }
+
+ override def doPost(req: HttpServletRequest, res: HttpServletResponse) {
+ val v = req.getParameter("v");
+ if (v == null || java.lang.Integer.parseInt(v) != version) {
+ res.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad version number!");
+ return;
+ }
+ val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
+ val socketId = req.getParameter("id");
+ val socket = SocketManager(socketId, false);
+
+// describeRequest(req);
+
+ if (socket.isEmpty) {
+ ec.response.write("restart-fail");
+ streaminglog(Map(
+ "type" -> "event",
+ "event" -> "restart-failure",
+ "connection" -> socketId));
+// println("socket restart-fail: "+socketId);
+ } else {
+ val seq = java.lang.Integer.parseInt(req.getParameter("seq"));
+ socket.get.updateConfirmedSeqNumber(null, seq);
+ val messages = req.getParameterValues("m");
+ val controlMessages = req.getParameterValues("oob");
+ try {
+ if (messages != null)
+ for (msg <- messages) socket.get.receiveMessage(msg, req);
+ if (controlMessages != null)
+ for (msg <- controlMessages) {
+// println("Control message from "+socket.get.id+": "+msg);
+ msg match {
+ case "hiccup" => {
+ streaminglog(Map(
+ "type" -> "event",
+ "event" -> "hiccup",
+ "connection" -> socketId));
+ socket.get.prepareForReconnect();
+ }
+ case _ => {
+ if (msg.startsWith("useChannel")) {
+ val msgParts = msg.split(":");
+ socket.get.useChannel(java.lang.Integer.parseInt(msgParts(1)), msgParts(2), req);
+ } else if (msg.startsWith("kill")) {
+ socket.get.kill("client request: "+msg.substring(Math.min(msg.length, "kill:".length)));
+ } else {
+ streaminglog(Map(
+ "type" -> "error",
+ "error" -> "unknown control message",
+ "connection" -> socketId,
+ "message" -> msg));
+ }
+ }
+ }
+ }
+ ec.response.write("ok");
+ } catch {
+ case e: SocketManager.HandlerException => {
+ exceptionlog(e);
+ ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ ec.response.write(e.getMessage());
+ // log these?
+ }
+ case t: Throwable => {
+ // shouldn't happen...
+ exceptionlog(t);
+ ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ ec.response.write(t.getMessage());
+ }
+ }
+ }
+ }
+}
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/timer.scala b/trunk/infrastructure/net.appjet.ajstdlib/timer.scala
new file mode 100644
index 0000000..dac8fb6
--- /dev/null
+++ b/trunk/infrastructure/net.appjet.ajstdlib/timer.scala
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2009 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS-IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.appjet.ajstdlib;
+
+import scala.collection.mutable.{HashMap,ListBuffer};
+import java.util.concurrent.locks.ReentrantLock;
+
+object timer {
+
+ var _timings = new HashMap[String,ListBuffer[double]];
+ var _lock = new ReentrantLock;
+ var _callstack = new ThreadLocal[ListBuffer[String]];
+
+ def start(opname: String) = {
+ var _localcallstack = _callstack.get();
+ if (_localcallstack == null) {
+ _callstack.set(new ListBuffer[String]);
+ _localcallstack = _callstack.get();
+ }
+ _localcallstack += opname;
+ var _oplabel = _localcallstack.mkString(".");
+ val startTime: long = System.nanoTime();
+
+ new {
+ def done() {
+ val elapsedTimeMs: double = (System.nanoTime() - startTime) / 1.0e6;
+
+ _lock.lock();
+ try {
+ var times = _timings.getOrElse(_oplabel, new ListBuffer[double]);
+ /*
+ if (times.size > 100000) {
+ times = new ListBuffer[double];
+ }*/
+ times += elapsedTimeMs;
+ _timings.put(_oplabel, times);
+ _localcallstack.remove(_localcallstack.length-1);
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }
+ }
+
+ def getOpNames(): Array[String] = {
+ _lock.lock();
+ try {
+ return _timings.keys.toList.toArray;
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ def getStats(opname: String): Array[double] = {
+ _lock.lock();
+
+ try {
+ var times:ListBuffer[double] = _timings(opname);
+ var total = times.foldRight(0.0)(_ + _);
+ return Array(times.size, total, (total / times.size));
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ def reset() {
+ _lock.lock();
+ _timings = new HashMap[String,ListBuffer[double]];
+ _lock.unlock();
+ }
+}