aboutsummaryrefslogtreecommitdiffstats
path: root/trunk/infrastructure/net.appjet.ajstdlib
diff options
context:
space:
mode:
authorElliot Kroo <kroo@appjet.com>2010-03-11 15:21:30 -0800
committerElliot Kroo <kroo@appjet.com>2010-03-11 15:21:30 -0800
commit98e2821b38a775737e42a2479a6bc65107210859 (patch)
tree55939a8ba1dce4f4e48ebb13b658061d62bf1b9a /trunk/infrastructure/net.appjet.ajstdlib
parentc1894c8e0a52f4e3d2f89fa92f0066bbf0fcf1b1 (diff)
downloadetherpad-98e2821b38a775737e42a2479a6bc65107210859.tar.gz
etherpad-98e2821b38a775737e42a2479a6bc65107210859.tar.xz
etherpad-98e2821b38a775737e42a2479a6bc65107210859.zip
reorganizing the first level of folders (trunk/branch folders are not the git way :)
Diffstat (limited to 'trunk/infrastructure/net.appjet.ajstdlib')
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala253
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala563
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js920
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html76
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/streaming.scala892
-rw-r--r--trunk/infrastructure/net.appjet.ajstdlib/timer.scala85
6 files changed, 0 insertions, 2789 deletions
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala b/trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala
deleted file mode 100644
index 8d285af..0000000
--- a/trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Copyright 2009 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS-IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package net.appjet.ajstdlib;
-
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.{ScheduledThreadPoolExecutor, Callable};
-import scala.collection.mutable.{HashMap, SynchronizedMap};
-
-import org.mozilla.javascript.{Context, ScriptableObject, Function, RhinoException, Scriptable};
-
-import net.appjet.oui.{SpecialJarOrNotFile, DiskLibrary, FixedDiskLibrary, VariableDiskLibrary, ExecutionContext, ExecutionContextUtils, ScopeReuseManager, config, exceptionlog};
-import net.appjet.bodylock.{BodyLock, ExecutionException};
-import net.appjet.common.util.LenientFormatter;
-
-import org.mortbay.jetty.nio.SelectChannelConnector;
-import org.mortbay.util.ajax.ContinuationSupport;
-
-object ajstdlib {
- def runModuleInNewScope(cx: ExecutionContext, moduleName: String): Any = {
- val newScope = BodyLock.subScope(cx.runner.globalScope);
- if (! libraryExists(moduleName))
- return Context.getUndefinedValue(); // unfortunately, returning "false" doesn't really do the right thing here.
- try {
- libraryExecutable(moduleName).execute(newScope);
- } catch {
- case e: ExecutionException => throw e;
- case e => throw new ExecutionException("Error occurred while running module: "+moduleName, e);
- // TODO: There was code here to print errors to the response if something didn't compile. Replace this code?
- }
- newScope;
- }
-
- private val modules = new HashMap[String, DiskLibrary] with SynchronizedMap[String, DiskLibrary];
- private def library(name: String) = modules.getOrElseUpdate(name+".js", new VariableDiskLibrary(name+".js"));
- private def libraryExists(name: String) = {
- val lib = library(name);
- // ScopeReuseManager.watch(lib);
- lib.exists;
- }
- private def libraryExecutable(name: String) = {
- val lib = library(name);
- // ScopeReuseManager.watch(lib);
- lib.executable;
- }
-
- val globalLock = new ReentrantLock();
- val attributes = new HashMap[String, Any] with SynchronizedMap[String, Any];
-
- def init() {
- // any other ajstdlib initialization goes here.
- Comet.init();
- }
-}
-
-object printf {
- def printf(format: String, list: Array[Object]): String = {
-// val list: Array[Object] = new Array[Object](argList.getLength)
-// for (i <- List.range(0, list.length))
-// list(i) = argList.getElement(i).getOrElse(null) match {
-// case AppVM.JSNumber(n) => n
-// case AppVM.JSString(s) => s
-// case AppVM.JSBoolean(b) => b
-// case _ => null
-// }
- val args = list.map(x => Context.jsToJava(x, classOf[Object]));
- try {
- val fmt = new LenientFormatter()
- fmt.format(format, args: _*)
- fmt.toString()
- } catch {
- case e: java.util.IllegalFormatException =>
- throw new ExecutionException("String Format Error: <tt>printf</tt> error: "+e.getMessage(), e)
- }
- }
-}
-
-
-import java.security.MessageDigest;
-
-object md5 {
- def md5(input: String): String = {
- val bytes = input.getBytes("UTF-8");
- md5(bytes);
- }
- def md5(bytes: Array[byte]): String = {
- var md = MessageDigest.getInstance("MD5");
- var digest = md.digest(bytes);
- var builder = new StringBuilder();
- for (b <- digest) {
- builder.append(Integer.toString((b >> 4) & 0xf, 16));
- builder.append(Integer.toString(b & 0xf, 16));
- }
- builder.toString();
- }
-}
-
-object execution {
- def runAsync(ec: ExecutionContext, f: Function) {
- ec.asyncs += f;
- }
-
- def executeCodeInNewScope(parentScope: Scriptable,
- code: String, name: String,
- startLine: Int): Scriptable = {
- val ec = ExecutionContextUtils.currentContext;
- val executable =
- try {
- BodyLock.compileString(code, name, startLine);
- } catch {
- case e: RhinoException =>
- throw new ExecutionException(
- "Failed to execute code in new scope.", e);
- }
- if (ec == null || ec.runner == null) {
- Thread.dumpStack();
- }
- val scope = BodyLock.subScope(
- if (parentScope != null) parentScope
- else ec.runner.mainScope);
- scope.setParentScope(ec.runner.mainScope);
- executable.execute(scope);
- scope;
- }
-
- def runTask(taskName: String, args: Array[Object]): AnyRef = {
- val ec = net.appjet.oui.execution.runOutOfBand(
- net.appjet.oui.execution.scheduledTaskExecutable,
- "Task "+taskName,
- Some(Map("taskName" -> taskName,
- "taskArguments" -> args)),
- { error =>
- error match {
- case e: Throwable => exceptionlog(e);
- case _ => exceptionlog(error.toString);
- }
- });
- ec.result;
- }
-
- def runTaskSimply(taskName: String, args: Array[Object]): AnyRef = {
- net.appjet.oui.execution.runOutOfBandSimply(
- net.appjet.oui.execution.scheduledTaskExecutable,
- Some(Map("taskName" -> taskName,
- "taskArguments" -> args)));
- }
-
- def wrapRunTask(taskName: String, args: Array[Object],
- returnType: Class[_]): Function0[AnyRef] = {
- new Function0[AnyRef] {
- def apply = Context.jsToJava(runTaskSimply(taskName, args), returnType);
- }
- }
-
- val threadpools = new HashMap[String, ScheduledThreadPoolExecutor]
- with SynchronizedMap[String, ScheduledThreadPoolExecutor];
-
- def createNamedTaskThreadPool(name: String, poolSize: Int) {
- threadpools.put(name, new ScheduledThreadPoolExecutor(poolSize));
- }
-
- class TaskRunner(val taskName: String, args: Array[Object]) extends Callable[AnyRef] {
- def call(): AnyRef = {
- runTask(taskName, args);
- }
- }
-
- def scheduleTaskInPool(poolName: String, taskName: String, delayMillis: Long, args: Array[Object]) = {
- val pool = threadpools.getOrElse(poolName, throw new RuntimeException("No such task threadpool: "+poolName));
- pool.schedule(new TaskRunner(taskName, args), delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
- }
-
- def shutdownAndWaitOnTaskThreadPool(poolName: String, timeoutMillis: Long) = {
- val pool = threadpools.getOrElse(poolName, throw new RuntimeException("No such task threadpool: "+poolName));
- pool.shutdown();
- pool.awaitTermination(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
- }
-
- def getContinuation(ec: ExecutionContext) = {
- val req = ec.request.req;
- ContinuationSupport.getContinuation(req, req).asInstanceOf[SelectChannelConnector.RetryContinuation];
- }
-
- def sync[T](obj: AnyRef)(block: => T): T = {
- obj.synchronized {
- block;
- }
- }
-}
-
-import javax.mail._;
-import javax.mail.internet._;
-import java.util.Properties;
-
-object email {
- def sendEmail(toAddr: Array[String], fromAddr: String, subject: String, headers: Scriptable, content: String): String = {
- try {
- val badAddresses = for (a <- toAddr if (a.indexOf("@") == -1)) yield a;
- if (badAddresses.length > 0) {
- "The email address"+(if (badAddresses.length > 1) "es" else "")+" "+
- badAddresses.mkString("\"", "\", \"", "\"")+" do"+(if (badAddresses.length == 1) "es" else "")+
- " not appear to be valid.";
- } else {
- val debug = false;
-
- val props = new Properties;
- props.put("mail.smtp.host", config.smtpServerHost);
- props.put("mail.smtp.port", config.smtpServerPort.toString());
- if (config.smtpUser != "")
- props.put("mail.smtp.auth", "true");
-
- val session = Session.getInstance(props, if (config.smtpUser != "") new Authenticator {
- override def getPasswordAuthentication() =
- new PasswordAuthentication(config.smtpUser, config.smtpPass);
- } else null);
- session.setDebug(debug);
-
- val msg = new MimeMessage(session);
- val fromIAddr = new InternetAddress(fromAddr);
- msg.setFrom(fromIAddr);
- val toIAddr: Array[Address] = toAddr.map(x => (new InternetAddress(x))); // new InternetAddress(toAddr);
- msg.setRecipients(Message.RecipientType.TO, toIAddr);
-
- if (headers != null)
- for (o <- headers.getIds() if o.isInstanceOf[String]) {
- val k = o.asInstanceOf[String]
- msg.addHeader(k, headers.get(k, headers).asInstanceOf[String]);
- }
-
- msg.setSubject(subject);
- msg.setContent(content, "text/plain");
- Transport.send(msg);
- "";
- }
- } catch {
- case e: MessagingException => { exceptionlog(e); e.printStackTrace() ; "Messaging exception: "+e.getMessage+"."; }
- case e: Exception => { exceptionlog(e); e.printStackTrace(); "Unknown error."; }
- }
- }
-}
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala b/trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala
deleted file mode 100644
index 047c086..0000000
--- a/trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala
+++ /dev/null
@@ -1,563 +0,0 @@
-/**
- * Copyright 2009 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS-IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package net.appjet.ajstdlib;
-
-import scala.collection.mutable.ArrayBuffer;
-
-import java.sql.{DriverManager, SQLException, Statement};
-import net.appjet.oui.{profiler, config, NoninheritedDynamicVariable};
-import com.mchange.v2.c3p0._;
-
-class SQLBase(driverClass: String, url: String, userName: String, password: String) {
-
- def isMysql:Boolean = (url.startsWith("jdbc:mysql:"));
- def isDerby:Boolean = (url.startsWith("jdbc:derby:"));
-
- if (isDerby) {
- System.setProperty("derby.system.home", config.derbyHome);
- val f = new java.io.File(config.derbyHome);
- if (! f.exists) {
- if (! f.mkdirs())
- throw new RuntimeException("Couldn't create internal database storage directory: "+config.derbyHome);
- }
- if (! f.isDirectory)
- throw new RuntimeException("Internal database storage directory is not a directory: "+config.derbyHome);
- if (! f.canWrite)
- throw new RuntimeException("Can't write to internal database storage directory: "+config.derbyHome);
- }
-
- val cpds = new ComboPooledDataSource();
- cpds.setDriverClass(driverClass);
- cpds.setJdbcUrl(url+(if (isMysql) "?useUnicode=true&characterEncoding=utf8" else ""));
-
- // derby does not require a password
- if (!isDerby) {
- cpds.setUser(userName);
- cpds.setPassword(password);
- }
-
- cpds.setMaxPoolSize(config.jdbcPoolSize);
- cpds.setMaxConnectionAge(6*60*60); // 6 hours
- if (config.devMode) {
- cpds.setAutomaticTestTable("cpds_testtable");
- cpds.setTestConnectionOnCheckout(true);
- }
-
-// {
-// // register db driver
-// try {
-// new JDCConnectionDriver(driverClass, url+"?useUnicode=true&characterEncoding=utf8", userName, password);
-// } catch {
-// case e => {
-// e.printStackTrace();
-// Runtime.getRuntime.halt(1);
-// }
-// }
-// }
-
- private def getConnectionFromPool = {
- val c = cpds.getConnection();
- c.setAutoCommit(true);
- c;
- }
-
- // Creates a dynamic variable whose .value depends on the innermost
- // .withValue(){} on the call-stack.
- private val currentConnection = new NoninheritedDynamicVariable[Option[java.sql.Connection]](None);
-
- def withConnection[A](block: java.sql.Connection=>A): A = {
- currentConnection.value match {
- case Some(c) => {
- block(c);
- }
- case None => {
- val t1 = profiler.time;
- val c = getConnectionFromPool;
- profiler.recordCumulative("getConnection", profiler.time-t1);
- try {
- currentConnection.withValue(Some(c)) {
- block(c);
- }
- } finally {
- c.close;
- }
- }
- }
- }
-
- private val currentlyInTransaction = new NoninheritedDynamicVariable(false);
-
- def inTransaction[A](block: java.sql.Connection=>A): A = {
- withConnection(c => {
- if (currentlyInTransaction.value) {
- return block(c);
- } else {
- currentlyInTransaction.withValue(true) {
- c.setAutoCommit(false);
- c.setTransactionIsolation(java.sql.Connection.TRANSACTION_REPEATABLE_READ);
-
- try {
- val result = block(c);
- c.commit();
- c.setAutoCommit(true);
- result;
- } catch {
- case e@net.appjet.oui.AppGeneratedStopException => {
- c.commit();
- c.setAutoCommit(true);
- throw e;
- }
- case (e:org.mozilla.javascript.WrappedException) if (e.getWrappedException ==
- net.appjet.oui.AppGeneratedStopException) => {
- c.commit();
- c.setAutoCommit(true);
- throw e;
- }
- case e => {
- //println("inTransaction() caught error:");
- //e.printStackTrace();
- try {
- c.rollback();
- c.setAutoCommit(true);
- } catch {
- case ex => {
- println("Could not rollback transaction because: "+ex.toString());
- }
- }
- throw e;
- }
- }
- }
- }
- });
- }
-
- def closing[A](closable: java.sql.Statement)(block: =>A): A = {
- try { block } finally { closable.close(); }
- }
-
- def closing[A](closable: java.sql.ResultSet)(block: =>A): A = {
- try { block } finally { closable.close(); }
- }
-
- def tableName(t: String) = id(t);
-
- val identifierQuoteString = withConnection(_.getMetaData.getIdentifierQuoteString);
- def quoteIdentifier(s: String) = identifierQuoteString+s+identifierQuoteString;
- private def id(s: String) = quoteIdentifier(s);
-
- def longTextType = if (isDerby) "CLOB" else "MEDIUMTEXT";
-
- // derby seems to do things intelligently w.r.t. case-sensitivity and unicode support.
- def createTableOptions = if (isMysql) " ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE utf8_bin" else "";
-
- // creates table if it doesn't exist already
- def createJSONTable(table: String) {
- withConnection { c=>
- val s = c.createStatement;
- if (! doesTableExist(c, table)) {
- closing(s) {
- s.execute("CREATE TABLE "+tableName(table)+" ("+
- id("ID")+" VARCHAR(128) PRIMARY KEY NOT NULL, "+
- id("JSON")+" "+longTextType+" NOT NULL"+
- ")"+createTableOptions);
- }
- }
- }
- }
-
- // requires: table exists
- // returns null if key doesn't exist
- def getJSON(table: String, key: String): String = {
- withConnection { c=>
- val s = c.prepareStatement("SELECT "+id("JSON")+" FROM "+tableName(table)+" WHERE "+id("ID")+" = ?");
- closing(s) {
- s.setString(1, key);
- var resultSet = s.executeQuery();
- closing(resultSet) {
- if (! resultSet.next()) {
- null;
- }
- else {
- resultSet.getString(1);
- }
- }
- }
- }
- }
-
- def getAllJSON(table: String, start: Int, count: Int): Array[Object] = {
- withConnection { c =>
- val s = c.prepareStatement("SELECT "+id("ID")+","+id("JSON")+" FROM "+tableName(table)+
- " ORDER BY "+id("ID")+" DESC"+
- " LIMIT ? OFFSET ?");
- closing(s) {
- s.setInt(2, start);
- s.setInt(1, count);
- var resultSet = s.executeQuery();
- var output = new ArrayBuffer[Object];
- closing(resultSet) {
- while (resultSet.next()) {
- output += new { val id = resultSet.getString(1); val value = resultSet.getString(2) };
- }
- output.toArray;
- }
- }
- }
- }
-
- def getAllJSONKeys(table: String): Array[String] = {
- withConnection { c =>
- val s = c.prepareStatement("SELECT "+id("ID")+" FROM "+tableName(table));
- closing(s) {
- var resultSet = s.executeQuery();
- var output = new ArrayBuffer[String];
- closing(resultSet) {
- while (resultSet.next()) {
- output += resultSet.getString(1);
- }
- output.toArray;
- }
- }
- }
- }
-
- // requires: table exists
- // inserts key if it doesn't exist
- def putJSON(table: String, key: String, json: String) {
- withConnection { c=>
- val update = c.prepareStatement("UPDATE "+tableName(table)+" SET "+id("JSON")+"=? WHERE "+id("ID")+"=?");
- closing(update) {
- update.setString(1, json);
- update.setString(2, key);
- update.executeUpdate();
- if (update.getUpdateCount == 0) {
- val insert = c.prepareStatement(
- "INSERT INTO "+tableName(table)+" ("+id("ID")+", "+id("JSON")+") values (?,?)");
- closing(insert) {
- insert.setString(1, key);
- insert.setString(2, json);
- insert.executeUpdate();
- }
- }
- }
- }
- }
-
- def deleteJSON(table: String, key: String) {
- // requires: table exists
- withConnection { c=>
- val update = c.prepareStatement("DELETE FROM "+tableName(table)+" WHERE "+id("ID")+"=?");
- closing(update) {
- update.setString(1, key);
- update.executeUpdate();
- }
- }
- }
-
- private def metaName(table: String) = table+"_META";
- private def metaTableName(table: String) = tableName(metaName(table));
- private def textTableName(table: String) = tableName(table+"_TEXT");
- private def escapeSearchString(dbm: java.sql.DatabaseMetaData, s: String): String = {
- val e = dbm.getSearchStringEscape();
- s.replace("_", e+"_").replace("%", e+"%");
- }
-
- private final val PAGE_SIZE = 20;
-
- def doesTableExist(connection: java.sql.Connection, table: String): Boolean = {
- val databaseMetadata = connection.getMetaData;
- val tables = databaseMetadata.getTables(null, null,
- escapeSearchString(databaseMetadata, table), null);
- closing(tables) {
- tables.next();
- }
- }
-
- def autoIncrementClause = if (isDerby) "GENERATED BY DEFAULT AS IDENTITY" else "AUTO_INCREMENT";
-
- // creates table if it doesn't exist already
- def createStringArrayTable(table: String) {
- withConnection { c=>
- if (! doesTableExist(c, metaName(table))) { // check to see if the *_META table exists
- // create tables and indices
- val s = c.createStatement;
- closing(s) {
- s.execute("CREATE TABLE "+metaTableName(table)+" ("+
- id("ID")+" VARCHAR(128) PRIMARY KEY NOT NULL, "+
- id("NUMID")+" INT UNIQUE "+autoIncrementClause+" "+
- ")"+createTableOptions);
- val defaultOffsets = (1 to PAGE_SIZE).map(x=>"").mkString(",");
- s.execute("CREATE TABLE "+textTableName(table)+" ("+
- ""+id("NUMID")+" INT, "+id("PAGESTART")+" INT, "+id("OFFSETS")+" VARCHAR(256) NOT NULL DEFAULT '"+defaultOffsets+
- "', "+id("DATA")+" "+longTextType+" NOT NULL"+
- ")"+createTableOptions);
- s.execute("CREATE INDEX "+id(table+"-NUMID-PAGESTART")+" ON "+textTableName(table)+"("+id("NUMID")+", "+id("PAGESTART")+")");
- }
- }
- }
- }
-
- // requires: table exists
- // returns: null if key or (key,index) doesn't exist, else the value
- def getStringArrayElement(table: String, key: String, index: Int): String = {
- val (pageStart, offset) = getPageStartAndOffset(index);
- val page = new StringArrayPage(table, key, pageStart, true);
- page.data(offset);
- }
-
- // requires: table exists
- // returns: an array of the mappings present in the page that should hold the
- // particular (key,index) mapping. the array may be empty or otherwise not
- // contain the given (key,index).
- def getPageStringArrayElements(table: String, key: String, index: Int): Array[IndexValueMapping] = {
- val (pageStart, offset) = getPageStartAndOffset(index);
- val page = new StringArrayPage(table, key, pageStart, true);
- val buf = new scala.collection.mutable.ListBuffer[IndexValueMapping];
-
- for(i <- 0 until page.data.length) {
- val s = page.data(i);
- if (s ne null) {
- val n = pageStart + i;
- buf += IndexValueMapping(n, s);
- }
- }
-
- buf.toArray;
- }
-
- // requires: table exists
- // creates key if doesn't exist
- // value may be null
- def putStringArrayElement(table: String, key: String, index: Int, value: String) {
- val (pageStart, offset) = getPageStartAndOffset(index);
- val page = new StringArrayPage(table, key, pageStart, false);
- page.data(offset) = value;
- page.updateDB();
- }
-
- def putMultipleStringArrayElements(table: String, key: String): Multiputter = new Multiputter {
- var currentPage = None:Option[StringArrayPage];
- def flushPage() {
- if (currentPage.isDefined) {
- val page = currentPage.get;
- page.updateDB();
- currentPage = None;
- }
- }
- def finish() {
- flushPage();
- }
- def put(index: Int, value: String) {
- try {
- val (pageStart, offset) = getPageStartAndOffset(index);
- if (currentPage.isEmpty || currentPage.get.pageStart != pageStart) {
- flushPage();
- currentPage = Some(new StringArrayPage(table, key, pageStart, false));
- }
- currentPage.get.data(offset) = value;
- }
- catch {
- case e => { e.printStackTrace; throw e }
- }
- }
- }
-
- trait Multiputter {
- def put(index: Int, value: String);
- def finish();
- }
-
- case class IndexValueMapping(index: Int, value: String);
-
- def clearStringArray(table: String, key: String) {
- withConnection { c=>
- val numid = getStringArrayNumId(c, table, key, false);
- if (numid >= 0) {
- {
- val s = c.prepareStatement("DELETE FROM "+textTableName(table)+" WHERE "+id("NUMID")+"=?");
- closing(s) {
- s.setInt(1, numid);
- s.executeUpdate();
- }
- }
- {
- val s = c.prepareStatement("DELETE FROM "+metaTableName(table)+" WHERE "+id("NUMID")+"=?");
- closing(s) {
- s.setInt(1, numid);
- s.executeUpdate();
- }
- }
- }
- }
- }
-
- private def getPageStartAndOffset(index: Int): (Int,Int) = {
- val pageStart = (index / PAGE_SIZE) * PAGE_SIZE;
- (pageStart, index - pageStart);
- }
-
- // requires: table exists
- // returns: numid of new string array
- private def newStringArray(c: java.sql.Connection, table: String, key: String): Int = {
- val s = c.prepareStatement("INSERT INTO "+metaTableName(table)+" ("+id("ID")+") VALUES (?)",
- Statement.RETURN_GENERATED_KEYS);
- closing(s) {
- s.setString(1, key);
- s.executeUpdate();
- val resultSet = s.getGeneratedKeys;
- if (resultSet == null)
- error("No generated numid for insert");
- closing(resultSet) {
- if (! resultSet.next()) error("No generated numid for insert");
- resultSet.getInt(1);
- }
- }
- }
-
- def getStringArrayNumId(c: java.sql.Connection, table: String, key: String, creating: Boolean): Int = {
- val s = c.prepareStatement("SELECT "+id("NUMID")+" FROM "+metaTableName(table)+" WHERE "+id("ID")+"=?");
- closing(s) {
- s.setString(1, key);
- val resultSet = s.executeQuery();
- closing(resultSet) {
- if (! resultSet.next()) {
- if (creating) {
- newStringArray(c, table, key);
- }
- else {
- -1
- }
- }
- else {
- resultSet.getInt(1);
- }
- }
- }
- }
-
- def getStringArrayAllKeys(table: String): Array[String] = {
- withConnection { c=>
- val s = c.prepareStatement("SELECT "+id("ID")+" FROM "+metaTableName(table));
- closing(s) {
- val resultSet = s.executeQuery();
- closing(resultSet) {
- val buf = new ArrayBuffer[String];
- while (resultSet.next()) {
- buf += resultSet.getString(1);
- }
- buf.toArray;
- }
- }
- }
- }
-
- private class StringArrayPage(table: String, key: String, val pageStart: Int, readonly: Boolean) {
-
- val data = new Array[String](PAGE_SIZE);
-
- private val numid = withConnection { c=>
- val nid = getStringArrayNumId(c, table, key, ! readonly);
-
- if (nid >= 0) {
- val s = c.prepareStatement(
- "SELECT "+id("OFFSETS")+","+id("DATA")+" FROM "+textTableName(table)+" WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?");
- closing(s) {
- s.setInt(1, nid);
- s.setInt(2, pageStart);
- val resultSet = s.executeQuery();
- closing(resultSet) {
- if (! resultSet.next()) {
- if (! readonly) {
- val insert = c.prepareStatement("INSERT INTO "+textTableName(table)+
- " ("+id("NUMID")+", "+id("PAGESTART")+", "+id("DATA")+") VALUES (?,?,'')");
- closing(insert) {
- insert.setInt(1, nid);
- insert.setInt(2, pageStart);
- insert.executeUpdate();
- }
- }
- }
- else {
- val offsetsField = resultSet.getString(1);
- val dataField = resultSet.getString(2);
- val offsetStrings = offsetsField.split(",", -1);
- var i = 0;
- var idx = 0;
- while (i < PAGE_SIZE) {
- val nstr = offsetStrings(i);
- if (nstr != "") {
- val n = nstr.toInt;
- data(i) = dataField.substring(idx, idx+n);
- idx += n;
- }
- i += 1;
- }
- }
- }
- }
- }
- nid;
- }
-
- def updateDB() {
- if (readonly) {
- error("this is a readonly StringArrayPage");
- }
- // assert: the relevant row of the TEXT table exists
- if (data.find(_ ne null).isEmpty) {
- withConnection { c=>
- val update = c.prepareStatement("DELETE FROM "+textTableName(table)+
- " WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?");
- closing(update) {
- update.setInt(1, numid);
- update.setInt(2, pageStart);
- update.executeUpdate();
- }
- }
- }
- else {
- val offsetsStr = data.map(s => if (s eq null) "" else s.length.toString).mkString(",");
- val dataStr = data.map(s => if (s eq null) "" else s).mkString("");
- withConnection { c=>
- val s = c.prepareStatement("UPDATE "+textTableName(table)+
- " SET "+id("OFFSETS")+"=?, "+id("DATA")+"=? WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?");
- closing(s) {
- s.setString(1, offsetsStr);
- s.setString(2, dataStr);
- s.setInt(3, numid);
- s.setInt(4, pageStart);
- s.executeUpdate();
- }
- }
- }
- }
- }
-
- def close {
- if (isDerby) {
- cpds.close();
- try {
- DriverManager.getConnection("jdbc:derby:;shutdown=true");
- } catch {
- case e: SQLException => if (e.getErrorCode() != 50000) throw e
- }
- }
- }
-}
-
-
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js b/trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js
deleted file mode 100644
index 3bfa227..0000000
--- a/trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js
+++ /dev/null
@@ -1,920 +0,0 @@
-/**
- * Copyright 2009 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS-IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-var host = window.location.host;
-
-function WebSocket(id) {
- var self = this;
- var socket = this;
- var version = 2;
-
- var timeouts = {};
-
- this.onopen = function() { }
- this.onclosed = function() { }
- this.onmessage = function() { }
- this.onhiccup = function() { }
- this.onlogmessage = function() { }
- this.CONNECTING = 0;
- this.OPEN = 1;
- this.CLOSED = 2;
- this.readyState = -1;
-
- var hiccupsLastMinute = 0;
- var hiccupResetInterval = setInterval(function() {
- hiccupsLastMinute = 0;
- if (self.readyState == self.CLOSED)
- clearInterval(hiccupResetInterval);
- }, 60*1000);
-
- var isHiccuping = false;
- function hiccup(channel) {
- if (channel != officialChannel && channel != self) return;
- if (isHiccuping) return;
- log("hiccup: "+channel.name);
- if (hiccupsLastMinute++ > 10) {
- doDisconnect({reconnect: true, reason: "Too many hiccups!"});
- return;
- }
- closeAllChannels();
- timeout(timeouts, "hiccup", 15000, function() {
- isHiccuping = false;
- doDisconnect({reconnect: false, reason: "Couldn't contact server to hiccup."});
- });
- isHiccuping = true;
- function tryHiccup() {
- if (! isHiccuping) return;
- self.onhiccup({connected: false});
- log("trying hiccup");
- timeout(timeouts, "singleHiccup", 5000, function() {
- tryHiccup();
- });
- simpleXhr('post', postPath(), true, [{key: "oob", value: "hiccup"}], function(sc, msg) {
- if (! isHiccuping) return;
- if (msg.substring(0, "restart-fail".length) == "restart-fail") {
- doDisconnect({reconnect: true, reason: "Server restarted or socket timed out on server."});
- } else if (sc != 200 || msg.substring(0, 2) != "ok") {
- log("Failed to hiccup with error: "+sc+" / "+msg);
- setTimeout(tryHiccup, 500);
- } else {
- isHiccuping = false;
- timeouts.singleHiccup();
- timeouts.hiccup();
- doConnect();
- }
- });
- }
- tryHiccup();
- }
- function closeAllChannels() {
- for (var i in activeChannels) {
- if (activeChannels.hasOwnProperty(i)) {
- activeChannels[i].disconnect();
- }
- }
- officialChannel = undefined;
- }
-
- function doDisconnect(obj, silent, sync) {
- log("disconnected: "+obj.reason+" / "+(obj.data !== undefined ? "data: "+obj.data : ""));
- logAll();
- closeAllChannels();
- if (longPollingIFrame && longPollingIFrame.div) {
- longPollingIFrame.div.innerHTML = "";
- }
- if (self.readyState != self.CLOSED) {
- self.readyState = self.CLOSED;
- if (! silent) {
- postSingleMessageNow(true, "kill:"+obj.reason, sync, true);
- }
- self.onclosed(obj);
- }
- }
-
- this.disconnect = function(sync) {
- doDisconnect({reason: "Closed by client."}, false, sync);
- }
-
-
- function doBasicConnect() {
- var type = getBasicChannel();
- log("basic connect on type: "+type);
- var channel = activeChannels[type] = new channelConstructors[type]();
- channel.connect();
- }
-
- function doOtherConnect() {
- var channels = getOtherChannels();
- var channel; var type;
- for (var i = 0; i < channels.length; ++i) {
- type = channels[i];
- log("other connect on type: "+type);
- channel = activeChannels[type] = new channelConstructors[type]();
- channel.connect();
- }
- }
- function doConnect() {
- log("doing connect!");
- timeout(timeouts, "connect", 15000, function() {
- doDisconnect({reconnect: false, reason: "Timeout connecting to server: no channel type was able to connect."});
- });
- doBasicConnect();
- }
-
- this.connect = function() {
- log("socket connecting: "+id);
- doConnect();
- }
-
- // util
- function nicetime() { return Math.floor((new Date()).valueOf() / 100) % 10000000; }
- function log(s) { self.onlogmessage("(comet @t: "+nicetime()+") "+s); }
- function logAll() {
- log(self.describe())
- }
- this.describe = function() {
- function describeChannels() {
- out = [];
- for (var i in activeChannels) {
- if (activeChannels.hasOwnProperty(i)) {
- out.push(i+": "+activeChannels[i].describe());
- }
- }
- return "[ "+out.join(", ")+" ]";
- }
- return ("socket state: { id: "+id+", readyState: "+self.readyState+", isHiccuping: "+isHiccuping+", timeouts: "+describeTimeouts(timeouts)+", officialChannel: "+(officialChannel?officialChannel.name:"none")+", channels: "+describeChannels()+", isPosting: "+isPosting+", lastReceivedSeqNumber: "+lastReceivedSeqNumber+", lastPost: "+lastPost+", postTimeouts: "+describeTimeouts(postTimeouts)+", channelSeq: "+channelSeq+" }");
- }
-
- function wrapMethod(obj, method) {
- return function() {
- var arr = [];
- for (var i=0; i < arguments.length; i++) {
- arr.push(arguments[i]);
- }
- method.apply(obj, arr);
- }
- }
- var _wm = wrapMethod;
-
- // cb should take statusCode, responseText, and optionally request
- function simpleXhr(method, uri, async, params, cb, makeXhr) {
-// log("making simple Xhr: "+[method, uri, async, params].join(", "));
- var request = (makeXhr || newRequestObject)();
- request.open(method, uri, async);
- if (async) {
- request.onreadystatechange = function() {
- if (request.readyState != 4) return;
- var status;
- var responseText;
- try {
- status = request.status;
- responseText = request.responseText;
- } catch (e) { /* absorb ff error accessing request properties */ }
- cb(status, responseText, request);
- }
- }
- var data = null;
- if (params) {
- data = [];
- for (var i = 0; i < params.length; ++i) {
- data.push(encodeURIComponent(params[i].key)+"="+encodeURIComponent(params[i].value));
- }
- data = data.join("&");
- request.setRequestHeader("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- }
- try {
- request.send(data);
- } catch (e) { request.abort(); cb(500, "Error sending data!", request); }
- if (! async) {
- var status;
- var responseText;
- try {
- status = request.status;
- responseText = request.responseText;
- } catch (e) { /* absorb ff error accessing request properties */ }
- cb(status, responseText, request);
- }
- return request;
- }
-
- var timeout_noop = function() { }
- function timeout(timeoutObject, timeoutName, millis, timeoutCallback) {
- function clearIt(timeoutObject, timeoutName) {
- if (timeoutObject[timeoutName]) {
- timeoutObject[timeoutName]();
- timeoutObject[timeoutName] = timeout_noop;
- }
- }
- var timeoutId = setTimeout(function() { clearIt(timeoutObject, timeoutName); timeoutCallback(); }, millis);
- var f = function() {
- clearTimeout(timeoutId);
- }
- clearIt(timeoutObject, timeoutName);
- timeoutObject[timeoutName] = f;
- return f;
- }
-
- // handling messages
- var lastReceivedSeqNumber = 0;
-
- function dataHandler(msg) {
- if (msg.seqNumber > lastReceivedSeqNumber+1) {
- log("bad sequence number. expecting: "+(lastReceivedSeqNumber+1)+", got: "+msg.seqNumber);
- hiccup(self);
- return false;
- }
- if (msg.seqNumber < lastReceivedSeqNumber+1) return true;
- lastReceivedSeqNumber = msg.seqNumber;
- if (! msg.isControl) {
- self.onmessage({ data: msg.content });
- return true;
- } else {
- if (msg.content == "kill") {
- doDisconnect({reconnect: false, reason: "Killed by server."});
- return false;
- }
- }
- }
-
- // client-server comm
- var postPath = function() {
- return "%contextPath%/post?r="+randomVar()+"&v="+version+"&id="+id+"&seq="+lastReceivedSeqNumber;
- }
-
- function SimpleQueue() {
- var base = [];
- var head = 0;
- var tail = 0;
- this.offer = function(data) {
- base[tail++] = data;
- }
- this.poll = function() {
- if (this.length() > 0) {
- var n = base[head];
- delete base[head++];
- return n;
- }
- }
- this.clear = function() {
- head = 0;
- tail = 0;
- var oldBase = base;
- base = [];
- return oldBase;
- }
- this.length = function() {
- return tail - head;
- }
- }
- var outgoingMessageQueue = new SimpleQueue();
- var isPosting = false;
- var postTimeouts = {};
- var lastPost;
-
- function postSingleMessageNow(isControl, data, sync, force, cb) {
- doPostMessages([{oob: isControl, data: data, cb: cb}], sync, force)
- }
-
- function doPostMessages(messages, sync, force, cb) {
- if (! force && self.readyState == self.CLOSED) return;
- if (messages.length == 0) {
- if (cb) cb();
- return;
- }
- var data = [];
- var callbacks = [];
- for (var i = 0; i < messages.length; ++i) {
- data.push({key: (messages[i].oob ? "oob" : "m"),
- value: messages[i].data});
- if (messages[i].cb)
- callbacks.push(messages[i].cb);
- }
- function postFailed(sc, msg, req, src) {
- var str = "";
- try {
- str = sc + ": "+req.statusText+" - "+msg+" ("+src+")";
- } catch (e) { /* absorb potential Firefox error accessing req */ }
- doDisconnect({reconnect: true, reason: "Posting message failed.", data: str});
- for (var i = 0; i < callbacks.length; ++i) {
- callbacks[i](false, str);
- }
- }
- function postCallback(sc, msg, request) {
- postTimeouts.post();
- if (sc != 200 || msg.substring(0, 2) != "ok") {
- postFailed(sc, msg, request, "1");
- } else {
- for (var i = 0; i < callbacks.length; ++i) {
- callbacks[i](true);
- }
- if (cb) cb();
- }
- }
- timeout(postTimeouts, "post", 15000, function() {
- doDisconnect({reconnect: true, reason: "Posting message timed out."});
- });
- simpleXhr('post', postPath(), ! sync, data, postCallback);
- }
-
- function postPendingMessages() {
- if (isPosting == true)
- return;
- var messages = outgoingMessageQueue.clear();
- if (messages.length == 0) {
- return;
- }
- isPosting = true;
- doPostMessages(messages, false, false, function() { isPosting = false; setTimeout(postPendingMessages, 0); });
- lastPost = nicetime();
- }
- this.postMessage = function(data, cb) {
- if (self.readyState != self.OPEN) {
- return;
- }
- outgoingMessageQueue.offer({data: data, cb: cb});
- setTimeout(function() { postPendingMessages() }, 0);
- }
-
- // transports
- function getValidChannels() {
- var channels = [];
- for (var i = 0; i < validChannels.length; ++i) {
- var type = validChannels[i];
- if (window.location.hash.length > 0) {
- if (window.location.hash != "#"+type) {
- continue;
- }
- }
- if ($ && $.browser.opera && type != 'shortpolling' && type != 'streaming') {
- continue;
- }
- channels.push(type);
- }
- return channels;
- }
- function getBasicChannel() {
- return getValidChannels()[0];
- }
-
- function getOtherChannels() {
- return getValidChannels().slice(1);
- }
-
- var officialChannel;
- this.getTransportType = function() {
- return (officialChannel ? officialChannel.name : "none");
- }
- var validChannels = "%acceptableChannelTypes%";
- var canUseSubdomains = "%canUseSubdomains%";
- var activeChannels = {};
- var channelConstructors = {
- shortpolling: ShortPollingChannel,
- longpolling: LongPollingChannel,
- streaming: StreamingChannel
- }
-
- function describeTimeouts(timeouts) {
- var out = [];
- for (var i in timeouts) {
- if (timeouts.hasOwnProperty(i)) {
- out.push(i+": "+(timeouts[i] == timeout_noop ? "unset" : "set"));
- }
- }
- return "{ "+out.join(", ")+" }";
- }
-
- var channelSeq = 1;
- function notifyConnect(channel) {
- timeouts.connect();
- if (! officialChannel || channel.weight > officialChannel.weight) {
- log("switching to use channel: "+channel.name);
- var oldChannel = officialChannel;
- officialChannel = channel;
- setTimeout(function() {
- postSingleMessageNow(true, "useChannel:"+(channelSeq++)+":"+channel.name, false, false, function(success, msg) {
- if (success) {
- if (oldChannel) {
- oldChannel.disconnect();
- } else {
- // there was no old channel, so try connecting the other channels.
- doOtherConnect();
- }
- if (self.readyState != self.OPEN) {
- self.readyState = self.OPEN;
- self.onopen({});
- } else {
- self.onhiccup({connected: true});
- }
- } else {
- doDisconnect({reconnect: true, reason: "Failed to select channel on server.", data: msg});
- }
- });
- }, 0);
- return true;
- } else {
- return false;
- }
- }
-
- function randomVar() {
- return String(Math.round(Math.random()*1e12));
- }
-
- function channelPath() {
- return "%contextPath%/channel?v="+version+"&r="+randomVar()+"&id="+id;
- }
-
- function newRequestObject() {
- var xmlhttp=false;
- try {
- xmlhttp = (window.ActiveXObject && new ActiveXObject("Msxml2.XMLHTTP"))
- } catch (e) {
- try {
- xmlhttp = (window.ActiveXObject && new ActiveXObject("Microsoft.XMLHTTP"));
- } catch (E) {
- xmlhttp = false;
- }
- }
- if (!xmlhttp && typeof XMLHttpRequest!='undefined') {
- try {
- xmlhttp = new XMLHttpRequest();
- } catch (e) {
- xmlhttp=false;
- }
- }
- if (!xmlhttp && window.createRequest) {
- try {
- xmlhttp = window.createRequest();
- } catch (e) {
- xmlhttp=false;
- }
- }
- return xmlhttp
- }
-
- function DataFormatError(message) {
- this.message = message;
- }
-
- function readMessage(data, startIndex) {
- if (! startIndex) startIndex = 0;
- var sep = data.indexOf(":", startIndex);
- if (sep < 0) return; // don't have all the bytes for this yet.
- var chars = Number(data.substring(startIndex, sep));
- if (isNaN(chars))
- throw new DataFormatError("Bad length: "+data.substring(startIndex, sep));
- if (data.length < sep+1+chars) return; // don't have all the bytes for this yet.
- var msg = data.substr(sep+1, chars);
- return { message: msg, lastConsumedChar: sep+1+chars }
- }
-
- function iframeReader(data, startIndex) {
- if (startIndex == 0)
- return { message: data, lastConsumedChar: data.length }
- }
-
- function parseWireFormat(data, startIndex, reader) {
- if (! startIndex) startIndex = 0;
- var msgs = [];
- var readThroughIndex = startIndex;
- while (true) {
- var msgObj = (reader || readMessage)(data, readThroughIndex)
- if (! msgObj) break;
- readThroughIndex = msgObj.lastConsumedChar;
- var msg = msgObj.message;
- var split = msg.split(":");
- if (split[0] == 'oob') {
- msgs.push({oob: split.slice(1).join(":")});
- continue;
- }
- var seq = Number(split[0]);
- if (isNaN(seq))
- throw new DataFormatError("Bad sequence number: "+split[0]);
- var control = Number(split[1]);
- if (isNaN(control))
- throw new DataFormatError("Bad control: "+split[1]);
- var msgContent = split.slice(2).join(":");
- msgs.push({seqNumber: seq, isControl: (control == 1), content: msgContent});
- }
- return { messages: msgs, lastConsumedChar: readThroughIndex }
- }
-
- function handleMessages(data, cursor, channel, reader) {
- try {
- messages = parseWireFormat(data, cursor, reader);
- } catch (e) {
- if (e instanceof DataFormatError) {
- log("Data format error: "+e.message);
- hiccup(channel);
- return;
- } else {
- log(e.toString()+" on line: "+e.lineNumber);
- }
- }
- for (var i=0; i < messages.messages.length; i++) {
- var oob = messages.messages[i].oob;
- if (oob) {
- if (oob == "restart-fail") {
- doDisconnect({reconnect: true, reason: "Server restarted or socket timed out on server."});
- return;
- }
- } else {
- if (! dataHandler(messages.messages[i]))
- break;
- }
- }
- return messages.lastConsumedChar;
- }
-
- function ShortPollingChannel() {
- this.weight = 0;
- this.name = "shortpolling";
-
- this.isConnected = false;
- this.isClosed = false;
- this.request;
- this.clearRequest = function() {
- if (this.request) {
- this.request.abort();
- this.request = null;
- }
- }
- this.timeouts = {};
-
- this.describe = function() {
- return "{ isConnected: "+this.isConnected+", isClosed: "+this.isClosed+", timeouts: "+describeTimeouts(this.timeouts)+", request: "+(this.request?"set":"not set")+" }"
- }
-
- this.pollDataHandler = function(sc, response, request) {
- if (request.readyState != 4) return;
- if (this.timeouts.poll) this.timeouts.poll();
- var messages;
- if (! this.isConnected) {
- this.timeouts.connectAttempt();
- if (sc != 200) {
- log(this.name+" connect failed: "+sc+" / "+response);
- setTimeout(_wm(this, this.attemptConnect), 500);
- return;
- }
- var msg = (response ? readMessage(response) : undefined);
- if (msg && msg.message == "oob:ok") {
- this.timeouts.initialConnect();
- this.isConnected = true;
- log(this.name+" transport connected!");
- if (! notifyConnect(this)) {
- // there are better options connected.
- log(this.name+" transport not chosen for activation.");
- this.disconnect();
- return;
- }
- this.doPoll();
- return;
- } else {
- log(this.name+" connect didn't get ok: "+sc+" / "+response);
- setTimeout(_wm(this, this.attemptConnect), 500);
- return;
- }
- }
- var chars = handleMessages(request.responseText, 0, this);
- if (sc != 200 || ((! chars) && this.emptyResponseBad)) {
- hiccup(this);
- }
- setTimeout(_wm(this, this.doPoll), this.pollDelay);
- this.clearRequest();
- }
-
- this.keepRetryingConnection = true;
- this.cancelConnect = function() {
- this.clearRequest();
- this.keepRetryingConnection = false;
- }
- this.cancelPoll = function() {
- this.clearRequest();
- log("poll timed out.");
- hiccup(this);
- }
-
- this.doPoll = function() {
- if (this.isClosed) return;
- timeout(this.timeouts, "poll", this.pollTimeout, _wm(this, this.cancelPoll));
- this.request =
- simpleXhr('GET',
- channelPath()+"&channel="+this.name+"&seq="+lastReceivedSeqNumber+this.pollParams(),
- true, undefined, _wm(this, this.pollDataHandler), this.xhrGenerator);
- }
-
- this.pollParams = function() {
- return "";
- }
- this.pollTimeout = 5000;
- this.pollDelay = 500;
-
- this.attemptConnect = function() {
- if (! this.keepRetryingConnection) return;
- log(this.name+" attempting connect");
- this.clearRequest();
- timeout(this.timeouts, "connectAttempt", 5000, _wm(this, this.attemptConnect));
- this.request = simpleXhr('GET', channelPath()+"&channel="+this.name+"&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber,
- true, undefined, _wm(this, this.pollDataHandler), this.xhrGenerator);
- }
- this.connect = function() {
- this.attemptConnect();
- timeout(this.timeouts, "initialConnect", 15000, _wm(this, this.cancelConnect));
- }
- this.disconnect = function() {
- log(this.name+" disconnected");
- this.isClosed = true;
- this.clearRequest();
- }
- }
-
- function StreamingChannel() {
- this.weight = 2;
- this.name = "streaming";
- var self = this;
-
- var isConnected = false;
- var request;
- function clearRequest() {
- if (request) {
- request.abort();
- request = null;
- if (theStream) theStream = null;
- if (ifrDiv) {
- ifrDiv.innerHTML = "";
- ifrDiv = null;
- }
- }
- }
- var isClosed = false;
- var timeouts = {};
- var cursor = 0;
-
- this.describe = function() {
- return "{ isConnected: "+isConnected+", isClosed: "+isClosed+", timeouts: "+describeTimeouts(timeouts)+", request: "+(request?"set":"not set")+", cursor: "+cursor+" }";
- };
-
- function connectOk() {
- isConnected = true;
- timeouts.initialConnect();
- if (! notifyConnect(self)) {
- log("streaming transport not chosen for activation");
- self.disconnect();
- return;
- }
- }
-
- function streamDataHandler() {
- if (timeouts.data) timeouts.data();
- if (isClosed) return;
- try {
- if (! request.responseText) return;
- } catch (e) { return; }
- if (! isConnected) {
- var msg = readMessage(request.responseText, cursor);
- if (! msg) return;
- cursor = msg.lastReceivedSeqNumber;
- if (msg.message == "oob:ok") {
- connectOk();
- } else {
- log("stream: incorrect channel connect message:"+msg.message);
- self.disconnect();
- return;
- }
- } else {
- cursor = handleMessages(request.responseText, cursor, self);
- }
- if (! request || request.readyState == 4) {
- clearRequest();
- if (isConnected) {
- log("stream connection unexpectedly closed.");
- hiccup(self);
- return;
- }
- }
- timeout(timeouts, "data", 60*1000, function() { hiccup(self); });
- }
-
- function iframeDataHandler(data) {
- if (isClosed) return;
- if (! isConnected) {
- if (data == "oob:ok") {
- connectOk();
- } else {
- log("iframe stream: unexpected data on connect - "+data);
- }
- } else {
- handleMessages(data, 0, self, iframeReader);
- }
- }
-
- function cancelConnect() {
- isClosed = true;
- clearRequest();
- log("stream: failed to connect.");
- }
-
- // IE Stuff.
- var theStream;
- var ifrDiv;
- var iframeTestCount = 0;
- function testIframe() {
- var state;
- try {
- state = ifrDiv.firstChild.readyState;
- } catch (e) {
- hiccup(self);
- return;
- }
- if (state == 'interactive' || iframeTestCount > 10) {
- try { var tmp = ifrDiv.firstChild.contentWindow.document.getElementById("thebody") }
- catch (e) { hiccup(self); }
- } else {
- iframeTestCount++;
- setTimeout(testIframe, 500);
- }
- }
-
- this.connect = function() {
- timeout(timeouts, "initialConnect", 15000, cancelConnect)
-
- if (canUseSubdomains) {
- var streamurl = "//"+randomVar()+".comet."+host+channelPath()+"&channel=streaming&type=iframe&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber;
- log("stream to: "+streamurl);
- if ($ && $.browser.opera) {
- // set up the opera stream; requires jquery because, why not?
- ifrDiv = $('<div style="display: none;"></div>').get(0);
- $('body').append(ifrDiv);
- window.comet = {
- pass_data: iframeDataHandler,
- disconnect: function() { hiccup(self); }
- }
- $(ifrDiv).append($("<iframe src='"+streamurl+"'></iframe>"));
- iframeTestCount = 0;
- setTimeout(testIframe, 2000);
- // if event-source supported disconnect notifications, fuck yeah we'd use it.
-// theStream = $('<event-source>');
-// var streamurl = channelPath()+"&channel=streaming&type=opera&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber;
-// theStream.get(0).addEventListener('message', function(event) {
-// iframeDataHandler(event.data);
-// }, false);
-// theStream.attr('src', streamurl);
- log("stream connect sent!");
- return;
- }
- try { // TODO: remove reference to both theStream and ifrDiv on unload!
- theStream = (window.ActiveXObject && new ActiveXObject("htmlfile"));
- if (theStream) {
- theStream.open();
- theStream.write("<html><head><title>f<\/title><\/head><body>")
- theStream.write("<s"+"cript>document.domain='"+document.domain+"';<\/s"+"cript>")
- theStream.write("<\/body><\/html>")
- theStream.close();
- ifrDiv = theStream.createElement("div")
- theStream.appendChild(ifrDiv)
- theStream.parentWindow.comet = {
- pass_data: iframeDataHandler,
- disconnect: function() { hiccup(self); }
- }
- ifrDiv.innerHTML = "<iframe src='"+streamurl+"'></iframe>";
- iframeTestCount = 0;
- setTimeout(testIframe, 2000);
- }
- } catch (e) {
- theStream = false
- }
- } else if ($ && $.browser.opera) {
- // opera thinks it can do a normal stream, but it can't.
- log("opera - not trying xhr");
- return;
- }
- // End IE Stuff.
- if (! theStream) {
- request = newRequestObject();
- request.open('get', channelPath()+"&channel=streaming&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber);
- request.onreadystatechange = streamDataHandler;
- try {
- request.send(null);
- } catch (e) { }
- }
- log("stream connect sent!");
- }
-
- this.disconnect = function() {
- log("stream disconnected");
- isClosed = true;
- clearRequest();
- }
- log("new streamchannel");
- }
-
- // long-polling related stuff.
- function iframePath(key) {
- return "//"+key+".comet."+host+"%contextPath%/xhrXdFrame";
- }
-
- function createHiddenDiv() {
- if (! document.getElementById('newcomethidden')) {
- var d = document.createElement('div');
- d.setAttribute('id', 'newcomethidden');
- d.style.display = 'none';
- document.body.appendChild(d);
- }
- return document.getElementById('newcomethidden');
- }
-
- function ExtHostXHR(iframe) {
- this.open = function(method, uri, async) {
- this.method = method;
- this.uri = uri;
- this.async = async;
- }
- var headers = {};
- this.setRequestHeader = function(name, value) {
- headers[name] = value;
- }
- this.send = function(data) {
- var self = this;
- this.xhr = iframe.iframe.contentWindow.doAction(this.method, this.uri, this.async, headers, data || null, function(status, response) {
- self.readyState = 4;
- self.status = status;
- self.responseText = response;
- self.onreadystatechange();
- });
- }
- this.abort = function() {
- if (this.xhr)
- iframe.contentWindow.doAbort(this.xhr);
- }
- }
-
- function createRequestIframe(cb) {
- var randomKey = randomVar();
- try {
- var activeXControl = (window.ActiveXObject && new ActiveXObject("htmlfile"));
- var htmlfileDiv;
- if (activeXControl) {
- activeXControl.open();
- activeXControl.write('<html><head><title>f</title></head><body>');
- activeXControl.write('<scr'+'ipt>document.domain=\''+document.domain+'\';</scr'+'ipt>');
- activeXControl.write('</body></html>');
- activeXControl.close();
- htmlfileDiv = activeXControl.createElement('div');
- activeXControl.appendChild(htmlfileDiv);
- activeXControl.parentWindow["done_"+randomKey] = cb;
- htmlfileDiv.innerHTML = "<iframe src='"+iframePath(randomKey)+"'></iframe>";
- return {iframe: htmlfileDiv.firstChild /* should be an iframe */, axc: activeXControl, div: htmlfileDiv};
- }
- } catch (e) {
- activeXControl = false;
- }
- log("Not using IE setup.");
- var requestIframe = document.createElement('iframe');
- createHiddenDiv().appendChild(requestIframe);
- window["done_"+randomKey] = function() { try { delete window["done_"+randomKey]; } catch (e) { }; cb(); }
- requestIframe.src = iframePath(randomKey);
- return {iframe: requestIframe};
- }
-
- function createIframeRequestObject() {
- if (! longPollingIFrame) throw Error("WebSocket isn't properly set up!");
- return new ExtHostXHR(longPollingIFrame);
- }
-
- var longPollingIFrame;
- function LongPollingChannel() {
- ShortPollingChannel.apply(this); // sets up other state.
- this.weight = 1;
- this.name = "longpolling";
-
- this.pollDelay = 0;
- this.pollTimeout = 15000;
- this.pollParams = function() {
- return "&timeout="+(this.pollTimeout-5000);
- }
- var connect = this.connect;
- this.connect = function() {
- if (! longPollingIFrame) {
- longPollingIFrame =
- createRequestIframe(_wm(this, connect)); // specifically *not* this.connect. we want the old one!
- } else {
- connect.apply(this);
- }
- }
- this.xhrGenerator = createIframeRequestObject;
- this.emptyResponseBad = true;
- }
-} \ No newline at end of file
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html b/trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html
deleted file mode 100644
index 3bdb5c4..0000000
--- a/trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html
+++ /dev/null
@@ -1,76 +0,0 @@
-<html>
-<head>
-<script>
-function createRequestObject() {
- var xmlhttp=false;
- /*@cc_on @*/
- /*@if (@_jscript_version >= 5)
- try {
- xmlhttp = new ActiveXObject("Msxml2.XMLHTTP");
- } catch (e) {
- try {
- xmlhttp = new ActiveXObject("Microsoft.XMLHTTP");
- } catch (E) {
- xmlhttp = false;
- }
- }
- @end @*/
- if (!xmlhttp && typeof XMLHttpRequest!='undefined') {
- try {
- xmlhttp = new XMLHttpRequest();
- } catch (e) {
- xmlhttp=false;
- }
- }
- if (!xmlhttp && window.createRequest) {
- try {
- xmlhttp = window.createRequest();
- } catch (e) {
- xmlhttp=false;
- }
- }
- return xmlhttp
-}
-var host = window.location.host;
-var oldDomain = document.domain;
-var newDomain = oldDomain.substring(oldDomain.indexOf(".", oldDomain.indexOf(".")+1)+1);
-
-function doAction(method, uri, async, headers, body, cb) {
- try {
- document.domain = oldDomain;
- } catch (e) { }
- var req = createRequestObject();
- req.open(method, '//'+host+uri, async);
- for (var i in headers) {
- req.setRequestHeader(i, headers[i]);
- }
- req.onreadystatechange = function() {
- if (req.readyState == 4) {
- try {
- document.domain = newDomain;
- cb(req.status, req.responseText);
- } catch (e) {
- // yikes. well, hopefully a timeout will notice this error.
- }
- }
- }
- req.send(body);
-}
-function doAbort(xhr) {
- try {
- document.domain = oldDomain;
- } catch (e) { }
- xhr.abort();
-}
-document.domain = newDomain;
-window.onload = function() {
- var doneKey = 'done_'+oldDomain.split(".", 2)[0];
- setTimeout(function() {
- window.parent[doneKey]();
- }, 0);
-}
-</script>
-</head>
-<body>
-</body>
-</html>
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/streaming.scala b/trunk/infrastructure/net.appjet.ajstdlib/streaming.scala
deleted file mode 100644
index fbff137..0000000
--- a/trunk/infrastructure/net.appjet.ajstdlib/streaming.scala
+++ /dev/null
@@ -1,892 +0,0 @@
-/**
- * Copyright 2009 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS-IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package net.appjet.ajstdlib;
-
-import scala.collection.mutable.{Queue, HashMap, SynchronizedMap, ArrayBuffer};
-import javax.servlet.http.{HttpServletRequest, HttpServletResponse, HttpServlet};
-import org.mortbay.jetty.servlet.{ServletHolder, Context};
-import org.mortbay.jetty.{HttpConnection, Handler, RetryRequest};
-import org.mortbay.jetty.nio.SelectChannelConnector;
-import org.mortbay.io.nio.SelectChannelEndPoint;
-import org.mortbay.util.ajax.{ContinuationSupport, Continuation};
-
-import java.util.{Timer, TimerTask};
-import java.lang.ref.WeakReference;
-
-import org.mozilla.javascript.{Context => JSContext, Scriptable};
-
-import net.appjet.oui._;
-import net.appjet.oui.Util.enumerationToRichEnumeration;
-import net.appjet.common.util.HttpServletRequestFactory;
-
-trait SocketConnectionHandler {
- def message(sender: StreamingSocket, data: String, req: HttpServletRequest);
- def connect(socket: StreamingSocket, req: HttpServletRequest);
- def disconnect(socket: StreamingSocket, req: HttpServletRequest);
-}
-
-object SocketManager {
- val sockets = new HashMap[String, StreamingSocket] with SynchronizedMap[String, StreamingSocket];
- val handler = new SocketConnectionHandler {
- val cometLib = new FixedDiskLibrary(new SpecialJarOrNotFile(config.ajstdlibHome, "oncomet.js"));
- def cometExecutable = cometLib.executable;
-
- def message(socket: StreamingSocket, data: String, req: HttpServletRequest) {
- val t1 = profiler.time;
-// println("Message from: "+socket.id+": "+data);
- val runner = ScopeReuseManager.getRunner;
- val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner);
- ec.attributes("cometOperation") = "message";
- ec.attributes("cometId") = socket.id;
- ec.attributes("cometData") = data;
- ec.attributes("cometSocket") = socket;
- net.appjet.oui.execution.execute(
- ec,
- (sc: Int, msg: String) =>
- throw new HandlerException(sc, msg, null),
- () => {},
- () => { ScopeReuseManager.freeRunner(runner); },
- Some(cometExecutable));
- cometlatencies.register(((profiler.time-t1)/1000).toInt);
- }
- def connect(socket: StreamingSocket, req: HttpServletRequest) {
-// println("Connect on: "+socket);
- val runner = ScopeReuseManager.getRunner;
- val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner);
- ec.attributes("cometOperation") = "connect";
- ec.attributes("cometId") = socket.id;
- ec.attributes("cometSocket") = socket;
- net.appjet.oui.execution.execute(
- ec,
- (sc: Int, msg: String) =>
- throw new HandlerException(sc, msg, null),
- () => {},
- () => { ScopeReuseManager.freeRunner(runner); },
- Some(cometExecutable));
- }
- def disconnect(socket: StreamingSocket, req: HttpServletRequest) {
- val toRun = new Runnable {
- def run() {
- val runner = ScopeReuseManager.getRunner;
- val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner);
- ec.attributes("cometOperation") = "disconnect";
- ec.attributes("cometId") = socket.id;
- ec.attributes("cometSocket") = socket;
- net.appjet.oui.execution.execute(
- ec,
- (sc: Int, msg: String) =>
- throw new HandlerException(sc, msg, null),
- () => {},
- () => { ScopeReuseManager.freeRunner(runner); },
- Some(cometExecutable));
- }
- }
- main.server.getThreadPool().dispatch(toRun);
- }
- }
- def apply(id: String, create: Boolean) = {
- if (create) {
- Some(sockets.getOrElseUpdate(id, new StreamingSocket(id, handler)));
- } else {
- if (id == null)
- error("bad id: "+id);
- sockets.get(id);
- }
- }
- class HandlerException(val sc: Int, val msg: String, val cause: Exception)
- extends RuntimeException("An error occurred while handling a request: "+sc+" - "+msg, cause);
-}
-
-// And this would be the javascript interface. Whee.
-object Comet extends CometSupport.CometHandler {
- def init() {
- CometSupport.cometHandler = this;
- context.start();
- }
-
- val acceptableTransports = {
- val t = new ArrayBuffer[String];
- if (! config.disableShortPolling) {
- t += "shortpolling";
- }
- if (config.transportUseWildcardSubdomains) {
- t += "longpolling";
- }
- t += "streaming";
- t.mkString("['", "', '", "']");
- }
-
-
- val servlet = new StreamingSocketServlet();
- val holder = new ServletHolder(servlet);
- val context = new Context(null, "/", Context.NO_SESSIONS | Context.NO_SECURITY);
- context.addServlet(holder, "/*");
- context.setMaxFormContentSize(1024*1024);
-
- def handleCometRequest(req: HttpServletRequest, res: HttpServletResponse) {
- context.handle(req.getRequestURI().substring(config.transportPrefix.length), req, res, Handler.FORWARD);
- }
-
- lazy val ccLib = new FixedDiskResource(new JarOrNotFile(config.ajstdlibHome, "streaming-client.js") {
- override val classBase = "/net/appjet/ajstdlib/";
- override val fileSep = "/../../net.appjet.ajstdlib/";
- });
- def clientCode(contextPath: String, acceptableChannelTypes: String) = {
- ccLib.contents.replaceAll("%contextPath%", contextPath).replaceAll("\"%acceptableChannelTypes%\"", acceptableChannelTypes).replaceAll("\"%canUseSubdomains%\"", if (config.transportUseWildcardSubdomains) "true" else "false");
- }
- def clientMTime = ccLib.fileLastModified;
-
- lazy val ccFrame = new FixedDiskResource(new JarOrNotFile(config.ajstdlibHome, "streaming-iframe.html") {
- override val classBase = "/net/appjet/ajstdlib/";
- override val fileSep = "/../../net.appjet.ajstdlib/";
- });
- def frameCode = {
- if (! config.devMode)
- ccFrame.contents.replace("<head>\n<script>", """<head>
- <script>
- window.onerror = function() { /* silently drop errors */ }
- </script>
- <script>""");
- else
- ccFrame.contents;
- }
-
-
- // public
- def connections(ec: ExecutionContext): Scriptable = {
- JSContext.getCurrentContext().newArray(ec.runner.globalScope, SocketManager.sockets.keys.toList.toArray[Object]);
- }
-
- // public
- def connectionStatus = {
- val m = new HashMap[String, Int];
- for (socket <- SocketManager.sockets.values) {
- val key = socket.currentChannel.map(_.kind.toString()).getOrElse("(unconnected)");
- m(key) = m.getOrElse(key, 0) + 1;
- }
- m;
- }
-
- // public
- def getNumCurrentConnections = SocketManager.sockets.size;
-
- // public
- def write(id: String, msg: String) {
- SocketManager.sockets.get(id).foreach(_.sendMessage(false, msg));
- }
-
- // public
- def isConnected(id: String): java.lang.Boolean = {
- SocketManager.sockets.contains(id);
- }
-
- // public
- def getTransportType(id: String): String = {
- SocketManager.sockets.get(id).map(_.currentChannel.map(_.kind.toString()).getOrElse("none")).getOrElse("none");
- }
-
- // public
- def disconnect(id: String) {
- SocketManager.sockets.get(id).foreach(x => x.close());
- }
-
- // public
- def setAttribute(ec: ExecutionContext, id: String, key: String, value: String) {
- ec.attributes.get("cometSocket").map(x => Some(x.asInstanceOf[StreamingSocket])).getOrElse(SocketManager.sockets.get(id))
- .foreach(_.attributes(key) = value);
- }
- // public
- def getAttribute(ec: ExecutionContext, id: String, key: String): String = {
- ec.attributes.get("cometSocket").map(x => Some(x.asInstanceOf[StreamingSocket])).getOrElse(SocketManager.sockets.get(id))
- .map(_.attributes.getOrElse(key, null)).getOrElse(null);
- }
-
- // public
- def getClientCode(ec: ExecutionContext) = {
- clientCode(config.transportPrefix, acceptableTransports);
- }
- def getClientMTime(ec: ExecutionContext) = clientMTime;
-}
-
-class StreamingSocket(val id: String, handler: SocketConnectionHandler) {
- var hasConnected = false;
- var shutdown = false;
- var killed = false;
- var currentChannel: Option[Channel] = None;
- val activeChannels = new HashMap[ChannelType.Value, Channel]
- with SynchronizedMap[ChannelType.Value, Channel];
-
- lazy val attributes = new HashMap[String, String] with SynchronizedMap[String, String];
-
- def channel(typ: String, create: Boolean, subType: String): Option[Channel] = {
- val channelType = ChannelType.valueOf(typ);
- if (channelType.isEmpty) {
- streaminglog(Map(
- "type" -> "error",
- "error" -> "unknown channel type",
- "channelType" -> channelType));
- None;
- } else if (create) {
- Some(activeChannels.getOrElseUpdate(channelType.get, Channels.createNew(channelType.get, this, subType)));
- } else {
- activeChannels.get(channelType.get);
- }
- }
-
- val outgoingMessageQueue = new Queue[SocketMessage];
- val unconfirmedMessages = new HashMap[Int, SocketMessage];
-
- var lastSentSeqNumber = 0;
- var lastConfirmedSeqNumber = 0;
-
- // external API
- def sendMessage(isControl: boolean, body: String) {
- if (hasConnected && ! shutdown) {
- synchronized {
- lastSentSeqNumber += 1;
- val msg = new SocketMessage(lastSentSeqNumber, isControl, body);
- outgoingMessageQueue += msg;
- unconfirmedMessages(msg.seq) = msg;
- }
- currentChannel.foreach(_.messageWaiting());
- }
- }
- def close() {
- synchronized {
- sendMessage(true, "kill");
- shutdown = true;
- Channels.timer.schedule(new TimerTask {
- def run() {
- kill("server request, timeout");
- }
- }, 15000);
- }
- }
-
- var creatingRequest: Option[HttpServletRequest] = None;
- // internal API
- def kill(reason: String) {
- synchronized {
- if (! killed) {
- streaminglog(Map(
- "type" -> "event",
- "event" -> "connection-killed",
- "connection" -> id,
- "reason" -> reason));
- killed = true;
- SocketManager.sockets -= id;
- activeChannels.foreach(_._2.close());
- currentChannel = None;
- if (hasConnected) {
- handler.disconnect(this, creatingRequest.getOrElse(null));
- }
- }
- }
- }
- def receiveMessage(body: String, req: HttpServletRequest) {
-// println("Message received on "+id+": "+body);
- handler.message(this, body, req);
- }
- def getWaitingMessage(channel: Channel): Option[SocketMessage] = {
- synchronized {
- if (currentChannel.isDefined && currentChannel.get == channel &&
- ! outgoingMessageQueue.isEmpty) {
- Some(outgoingMessageQueue.dequeue);
- } else {
- None;
- }
- }
- }
- def getUnconfirmedMessages(channel: Channel): Collection[SocketMessage] = {
- synchronized {
- if (currentChannel.isDefined && currentChannel.get == channel) {
- for (i <- lastConfirmedSeqNumber+1 until lastSentSeqNumber+1)
- yield unconfirmedMessages(i);
- } else {
- List[SocketMessage]();
- }
- }
- }
- def updateConfirmedSeqNumber(channel: Channel, received: Int) {
- synchronized {
- if (received > lastConfirmedSeqNumber && (channel == null || (currentChannel.isDefined && channel == currentChannel.get))) {
- val oldConfirmed = lastConfirmedSeqNumber;
- lastConfirmedSeqNumber = received;
- for (i <- oldConfirmed+1 until lastConfirmedSeqNumber+1) { // inclusive!
- unconfirmedMessages -= i;
- }
- }
- }
- }
-
- var lastChannelUpdate = 0;
- def useChannel(seqNo: Int, channelType0: String, req: HttpServletRequest) = synchronized {
- if (seqNo <= lastChannelUpdate) false else {
- lastChannelUpdate = seqNo;
- val channelType = ChannelType.valueOf(channelType0);
- if (channelType.isDefined) {
- val channel = activeChannels.get(channelType.get);
- if (channel.isDefined) {
- if (! hasConnected) {
- hasConnected = true;
- creatingRequest = Some(HttpServletRequestFactory.createRequest(req));
- handler.connect(this, req);
- }
- currentChannel = channel;
-// println("switching "+id+" to channel: "+channelType0);
- if (currentChannel.get.isConnected) {
- revive(channel.get);
- } else {
- hiccup(channel.get);
- }
- currentChannel.get.messageWaiting();
- true;
- } else
- false;
- } else
- false;
- }
- }
-// def handleReceivedMessage(seq: Int, data: String) {
-// synchronized {
-// handler.message(this, data)
-// // TODO(jd): add client->server sequence numbers.
-// // if (seq == lastReceivedSeqNumber+1){
-// // lastReceivedSeqNumber = seq;
-// // handler.message(this, data);
-// // } else {
-// // // handle error.
-// // }
-// }
-// }
- def hiccup(channel: Channel) = synchronized {
- if (currentChannel.isDefined && channel == currentChannel.get) {
-// println("hiccuping: "+id);
- scheduleTimeout();
- }
- }
- def revive(channel: Channel) = synchronized {
- if (currentChannel.isDefined && channel == currentChannel.get) {
-// println("reviving: "+id);
- cancelTimeout();
- }
- }
- def prepareForReconnect() = synchronized {
-// println("client-side hiccup: "+id);
- activeChannels.foreach(_._2.close());
- activeChannels.clear();
- currentChannel = None;
- scheduleTimeout();
- }
-
- // helpers
- var timeoutTask: TimerTask = null;
-
- def scheduleTimeout() {
- if (timeoutTask != null) return;
- val p = new WeakReference(this);
- timeoutTask = new TimerTask {
- def run() {
- val socket = p.get();
- if (socket != null) {
- socket.kill("timeout");
- }
- }
- }
- Channels.timer.schedule(timeoutTask, 15*1000);
- }
- def cancelTimeout() {
- if (timeoutTask != null)
- timeoutTask.cancel();
- timeoutTask = null;
- }
- scheduleTimeout();
-
- streaminglog(Map(
- "type" -> "event",
- "event" -> "connection-created",
- "connection" -> id));
-}
-
-object ChannelType extends Enumeration("shortpolling", "longpolling", "streaming") {
- val ShortPolling, LongPolling, Streaming = Value;
-}
-
-object Channels {
- def createNew(typ: ChannelType.Value, socket: StreamingSocket, subType: String): Channel = {
- typ match {
- case ChannelType.ShortPolling => new ShortPollingChannel(socket);
- case ChannelType.LongPolling => new LongPollingChannel(socket);
- case ChannelType.Streaming => {
- subType match {
- case "iframe" => new StreamingChannel(socket) with IFrameChannel;
- case "opera" => new StreamingChannel(socket) with OperaChannel;
- case _ => new StreamingChannel(socket);
- }
- }
- }
- }
-
- val timer = new Timer(true);
-}
-
-class SocketMessage(val seq: Int, val isControl: Boolean, val body: String) {
- def payload = seq+":"+(if (isControl) "1" else "0")+":"+body;
-}
-
-trait Channel {
- def messageWaiting();
- def close();
- def handle(req: HttpServletRequest, res: HttpServletResponse);
- def isConnected: Boolean;
-
- def kind: ChannelType.Value;
- def sendRestartFailure(ec: ExecutionContext);
-}
-
-trait XhrChannel extends Channel {
- def wrapBody(msg: String) = msg.length+":"+msg;
-
- // wire format: msgLength:seq:[01]:msg
- def wireFormat(msg: SocketMessage) = wrapBody(msg.payload);
- def controlMessage(data: String) = wrapBody("oob:"+data);
-
- def sendRestartFailure(ec: ExecutionContext) {
- ec.response.write(controlMessage("restart-fail"));
- }
-}
-
-// trait IFrameChannel extends Channel {
-// def wireFormat(msg: SocketMessage)
-// }
-
-class ShortPollingChannel(val socket: StreamingSocket) extends Channel with XhrChannel {
- def kind = ChannelType.ShortPolling;
-
- def messageWaiting() {
- // do nothing.
- }
- def close() {
- // do nothing
- }
- def isConnected = false;
-
- def handle(req: HttpServletRequest, res: HttpServletResponse) {
- val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
- val out = new StringBuilder();
- socket.synchronized {
- socket.revive(this);
- if (req.getParameter("new") == "yes") {
- out.append(controlMessage("ok"));
- } else {
- val lastReceivedSeq = java.lang.Integer.parseInt(req.getParameter("seq"));
- socket.updateConfirmedSeqNumber(this, lastReceivedSeq);
- for (msg <- socket.getUnconfirmedMessages(this)) {
- out.append(wireFormat(msg));
- }
- // ALL MESSAGES ARE UNCONFIRMED AT THIS POINT! JUST CLEAR QUEUE.
- var msg = socket.getWaitingMessage(this);
- while (msg.isDefined) {
- msg = socket.getWaitingMessage(this);
- }
- }
- }
-// println("Writing to "+socket.id+": "+out.toString);
- ec.response.write(out.toString);
- socket.synchronized {
- socket.hiccup(this);
- }
- }
-}
-
-trait IFrameChannel extends StreamingChannel {
- override def wrapBody(msgBody: String) = {
- val txt = "<script type=\"text/javascript\">p('"+
- msgBody.replace("\\","\\\\").replace("'", "\\'")+"');</script>";
- if (txt.length < 256)
- String.format("%256s", txt);
- else
- txt;
- }
-
- def header(req: HttpServletRequest) = {
- val document_domain =
- "\""+req.getHeader("Host").split("\\.").slice(2).mkString(".").split(":")(0)+"\"";
- """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
-"http://www.w3.org/TR/html4/strict.dtd">
-<html><head><title>f</title></head><body id="thebody" onload="(!parent.closed)&&d()"><script type="text/javascript">document.domain = """+document_domain+""";
-var p = function(data) { try { parent.comet.pass_data } catch (err) { /* failed to pass data. no recourse. */ } };
-var d = parent.comet.disconnect;"""+(if(!config.devMode)"\nwindow.onerror = function() { /* silently drop errors */ }\n" else "")+"</script>"; // " - damn textmate mode!
- }
-
- override def sendRestartFailure(ec: ExecutionContext) {
- ec.response.write(header(ec.request.req));
- ec.response.write(controlMessage("restart-fail"));
- }
-
- override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
- super.handleNewConnection(req, res, out);
- res.setContentType("text/html");
- out.append(header(req));
- }
-}
-
-trait OperaChannel extends StreamingChannel {
- override def wrapBody(msgBody: String) = {
- "Event: message\ndata: "+msgBody+"\n\n";
- }
- override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
- super.handleNewConnection(req, res, out);
- res.setContentType("application/x-dom-event-stream");
- }
-}
-
-class StreamingChannel(val socket: StreamingSocket) extends Channel with XhrChannel {
- def kind = ChannelType.Streaming;
-
- var c: Option[SelectChannelConnector.RetryContinuation] = None;
- var doClose = false;
-
- def messageWaiting() {
- main.server.getThreadPool().dispatch(new Runnable() {
- def run() {
- socket.synchronized {
- c.filter(_.isPending()).foreach(_.resume());
- }
- }
- });
- }
-
- def setSequenceNumberIfAppropriate(req: HttpServletRequest) {
- if (c.get.isNew) {
- val lastReceivedSeq = java.lang.Integer.parseInt(req.getParameter("seq"));
- socket.updateConfirmedSeqNumber(this, lastReceivedSeq);
- }
- }
-
- def sendHandshake(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
- out.append(controlMessage("ok"));
- }
-
- def sendUnconfirmedMessages(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
- for (msg <- socket.getUnconfirmedMessages(this)) {
- out.append(wireFormat(msg));
- }
- }
-
- def sendWaitingMessages(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
- var msg = socket.getWaitingMessage(this);
- while (msg.isDefined) {
- out.append(wireFormat(msg.get));
- msg = socket.getWaitingMessage(this);
- }
- }
-
- def handleUnexpectedDisconnect(req: HttpServletRequest, res: HttpServletResponse, ep: KnowsAboutDispatch) {
- socket.synchronized {
- socket.hiccup(this);
- }
- ep.close();
- }
-
- def writeAndFlush(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder, ep: KnowsAboutDispatch) {
-// println("Writing to "+socket.id+": "+out.toString);
- res.getWriter.print(out.toString);
- res.getWriter.flush();
- }
-
- def suspendIfNecessary(req: HttpServletRequest, res: HttpServletResponse,
- out: StringBuilder, ep: KnowsAboutDispatch) {
- scheduleKeepalive(50*1000);
- ep.undispatch();
- c.get.suspend(0);
- }
-
- def sendKeepaliveIfNecessary(out: StringBuilder, sendKeepalive: Boolean) {
- if (out.length == 0 && sendKeepalive) {
- out.append(controlMessage("keepalive"));
- }
- }
-
- def shouldHandshake(req: HttpServletRequest, res: HttpServletResponse) = c.get.isNew;
-
- var sendKeepalive = false;
- var keepaliveTask: TimerTask = null;
- def scheduleKeepalive(timeout: Int) {
- if (keepaliveTask != null) {
- keepaliveTask.cancel();
- }
- val p = new WeakReference(this);
- keepaliveTask = new TimerTask {
- def run() {
- val channel = p.get();
- if (channel != null) {
- channel.synchronized {
- channel.sendKeepalive = true;
- channel.messageWaiting();
- }
- }
- }
- }
- Channels.timer.schedule(keepaliveTask, timeout);
- }
-
- def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
- req.setAttribute("StreamingSocketServlet_channel", this);
- res.setHeader("Connection", "close");
- for ((k, v) <- Util.noCacheHeaders) { res.setHeader(k, v); } // maybe this will help with proxies?
- res.setContentType("text/messages; charset=utf-8");
- }
-
- def handle(req: HttpServletRequest, res: HttpServletResponse) {
- val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
- val ep = HttpConnection.getCurrentConnection.getEndPoint.asInstanceOf[KnowsAboutDispatch];
- val out = new StringBuilder;
- try {
- socket.synchronized {
- val sendKeepaliveNow = sendKeepalive;
- sendKeepalive = false;
- if (keepaliveTask != null) {
- keepaliveTask.cancel();
- keepaliveTask = null;
- }
- c = Some(ContinuationSupport.getContinuation(req, socket).asInstanceOf[SelectChannelConnector.RetryContinuation]);
- setSequenceNumberIfAppropriate(req);
- if (doClose) {
- ep.close();
- return;
- }
- if (c.get.isNew) {
- handleNewConnection(req, res, out);
- } else {
- c.get.suspend(-1);
- if (ep.isDispatched) {
- handleUnexpectedDisconnect(req, res, ep);
- return;
- }
- }
- if (shouldHandshake(req, res)) {
-// println("new stream request: "+socket.id);
- sendHandshake(req, res, out);
- sendUnconfirmedMessages(req, res, out);
- }
- sendWaitingMessages(req, res, out);
- sendKeepaliveIfNecessary(out, sendKeepaliveNow);
- suspendIfNecessary(req, res, out, ep);
- }
- } finally {
- writeAndFlush(req, res, out, ep);
- }
- }
-
- def close() {
- doClose = true;
- messageWaiting();
- }
-
- def isConnected = ! doClose;
-}
-
-class LongPollingChannel(socket: StreamingSocket) extends StreamingChannel(socket) {
-// println("creating longpoll!");
- override def kind = ChannelType.LongPolling;
-
- override def shouldHandshake(req: HttpServletRequest, res: HttpServletResponse) =
- req.getParameter("new") == "yes";
-
- override def sendHandshake(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
-// println("sending handshake");
- out.append(controlMessage("ok"));
- }
-
- override def suspendIfNecessary(req: HttpServletRequest, res: HttpServletResponse,
- out: StringBuilder, ep: KnowsAboutDispatch) {
- if (out.length == 0) {
-// println("suspending longpoll: "+socket.id);
- val to = java.lang.Integer.parseInt(req.getParameter("timeout"));
-// println("LongPoll scheduling keepalive for: "+to);
- scheduleKeepalive(to);
- ep.undispatch();
- c.get.suspend(0);
- }
- }
-
- override def writeAndFlush(req: HttpServletRequest, res: HttpServletResponse,
- out: StringBuilder, ep: KnowsAboutDispatch) {
- if (out.length > 0) {
-// println("Writing to "+socket.id+": "+out.toString);
-// println("writing and flushing longpoll")
- val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
- for ((k, v) <- Util.noCacheHeaders) { ec.response.setHeader(k, v); } // maybe this will help with proxies?
-// println("writing: "+out);
- ec.response.write(out.toString);
- socket.synchronized {
- socket.hiccup(this);
- c = None;
- }
- }
- }
-
- override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) {
- socket.revive(this);
- req.setAttribute("StreamingSocketServlet_channel", this);
- }
-
- override def isConnected = socket.synchronized {
- c.isDefined;
- }
-}
-
-class StreamingSocketServlet extends HttpServlet {
- val version = 2;
-
- override def doGet(req: HttpServletRequest, res: HttpServletResponse) {
-// describeRequest(req);
- val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
- try {
- if (req.getPathInfo() == "/js/client.js") {
- val contextPath = config.transportPrefix;
- val acceptableTransports = Comet.acceptableTransports;
- ec.response.setContentType("application/x-javascript");
- ec.response.write(Comet.clientCode(contextPath, acceptableTransports));
- } else if (req.getPathInfo() == "/xhrXdFrame") {
- ec.response.setContentType("text/html; charset=utf-8");
- ec.response.write(Comet.frameCode);
- } else {
- val v = req.getParameter("v");
- if (v == null || java.lang.Integer.parseInt(v) != version) {
- res.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad version number!");
- return;
- }
- val existingChannel = req.getAttribute("StreamingSocketServlet_channel");
- if (existingChannel != null) {
- existingChannel.asInstanceOf[Channel].handle(req, res);
- } else {
- val socketId = req.getParameter("id");
- val channelType = req.getParameter("channel");
- val isNew = req.getParameter("new") == "yes";
- val shouldCreateSocket = req.getParameter("create") == "yes";
- val subType = req.getParameter("type");
- val channel = SocketManager(socketId, shouldCreateSocket).map(_.channel(channelType, isNew, subType)).getOrElse(None);
- if (channel.isDefined) {
- channel.get.handle(req, res);
- } else {
- streaminglog(Map(
- "type" -> "event",
- "event" -> "restart-failure",
- "connection" -> socketId));
- val failureChannel = ChannelType.valueOf(channelType).map(Channels.createNew(_, null, subType));
- if (failureChannel.isDefined) {
- failureChannel.get.sendRestartFailure(ec);
- } else {
- ec.response.setStatusCode(HttpServletResponse.SC_NOT_FOUND);
- ec.response.write("So such socket, and/or unknown channel type: "+channelType);
- }
- }
- }
- }
- } catch {
- case e: RetryRequest => throw e;
- case t: Throwable => {
- exceptionlog("A comet error occurred: ");
- exceptionlog(t);
- ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- ec.response.write(t.getMessage());
- }
- }
- }
-
- def describeRequest(req: HttpServletRequest) {
- println(req.getMethod+" on "+req.getRequestURI()+"?"+req.getQueryString());
- for (pname <-
- req.getParameterNames.asInstanceOf[java.util.Enumeration[String]]) {
- println(" "+pname+" -> "+req.getParameterValues(pname).mkString("[", ",", "]"));
- }
- }
-
- override def doPost(req: HttpServletRequest, res: HttpServletResponse) {
- val v = req.getParameter("v");
- if (v == null || java.lang.Integer.parseInt(v) != version) {
- res.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad version number!");
- return;
- }
- val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext];
- val socketId = req.getParameter("id");
- val socket = SocketManager(socketId, false);
-
-// describeRequest(req);
-
- if (socket.isEmpty) {
- ec.response.write("restart-fail");
- streaminglog(Map(
- "type" -> "event",
- "event" -> "restart-failure",
- "connection" -> socketId));
-// println("socket restart-fail: "+socketId);
- } else {
- val seq = java.lang.Integer.parseInt(req.getParameter("seq"));
- socket.get.updateConfirmedSeqNumber(null, seq);
- val messages = req.getParameterValues("m");
- val controlMessages = req.getParameterValues("oob");
- try {
- if (messages != null)
- for (msg <- messages) socket.get.receiveMessage(msg, req);
- if (controlMessages != null)
- for (msg <- controlMessages) {
-// println("Control message from "+socket.get.id+": "+msg);
- msg match {
- case "hiccup" => {
- streaminglog(Map(
- "type" -> "event",
- "event" -> "hiccup",
- "connection" -> socketId));
- socket.get.prepareForReconnect();
- }
- case _ => {
- if (msg.startsWith("useChannel")) {
- val msgParts = msg.split(":");
- socket.get.useChannel(java.lang.Integer.parseInt(msgParts(1)), msgParts(2), req);
- } else if (msg.startsWith("kill")) {
- socket.get.kill("client request: "+msg.substring(Math.min(msg.length, "kill:".length)));
- } else {
- streaminglog(Map(
- "type" -> "error",
- "error" -> "unknown control message",
- "connection" -> socketId,
- "message" -> msg));
- }
- }
- }
- }
- ec.response.write("ok");
- } catch {
- case e: SocketManager.HandlerException => {
- exceptionlog(e);
- ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- ec.response.write(e.getMessage());
- // log these?
- }
- case t: Throwable => {
- // shouldn't happen...
- exceptionlog(t);
- ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- ec.response.write(t.getMessage());
- }
- }
- }
- }
-}
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/timer.scala b/trunk/infrastructure/net.appjet.ajstdlib/timer.scala
deleted file mode 100644
index dac8fb6..0000000
--- a/trunk/infrastructure/net.appjet.ajstdlib/timer.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Copyright 2009 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS-IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package net.appjet.ajstdlib;
-
-import scala.collection.mutable.{HashMap,ListBuffer};
-import java.util.concurrent.locks.ReentrantLock;
-
-object timer {
-
- var _timings = new HashMap[String,ListBuffer[double]];
- var _lock = new ReentrantLock;
- var _callstack = new ThreadLocal[ListBuffer[String]];
-
- def start(opname: String) = {
- var _localcallstack = _callstack.get();
- if (_localcallstack == null) {
- _callstack.set(new ListBuffer[String]);
- _localcallstack = _callstack.get();
- }
- _localcallstack += opname;
- var _oplabel = _localcallstack.mkString(".");
- val startTime: long = System.nanoTime();
-
- new {
- def done() {
- val elapsedTimeMs: double = (System.nanoTime() - startTime) / 1.0e6;
-
- _lock.lock();
- try {
- var times = _timings.getOrElse(_oplabel, new ListBuffer[double]);
- /*
- if (times.size > 100000) {
- times = new ListBuffer[double];
- }*/
- times += elapsedTimeMs;
- _timings.put(_oplabel, times);
- _localcallstack.remove(_localcallstack.length-1);
- } finally {
- _lock.unlock();
- }
- }
- }
- }
-
- def getOpNames(): Array[String] = {
- _lock.lock();
- try {
- return _timings.keys.toList.toArray;
- } finally {
- _lock.unlock();
- }
- }
-
- def getStats(opname: String): Array[double] = {
- _lock.lock();
-
- try {
- var times:ListBuffer[double] = _timings(opname);
- var total = times.foldRight(0.0)(_ + _);
- return Array(times.size, total, (total / times.size));
- } finally {
- _lock.unlock();
- }
- }
-
- def reset() {
- _lock.lock();
- _timings = new HashMap[String,ListBuffer[double]];
- _lock.unlock();
- }
-}