diff options
author | alexanders@b2ef00c0-3703-41da-baef-cfe82387ac0c <none@none> | 2010-02-03 00:50:41 +0000 |
---|---|---|
committer | alexanders@b2ef00c0-3703-41da-baef-cfe82387ac0c <none@none> | 2010-02-03 00:50:41 +0000 |
commit | 89bda83e0570ab87c6e449f5955613d5385e90b3 (patch) | |
tree | beae82eff98e4b6e18e1521c49d48d087a8cef55 /trunk/infrastructure/net.appjet.ajstdlib | |
parent | d912ef9675f2e516df4eba081107729afbffe10c (diff) | |
download | etherpad-89bda83e0570ab87c6e449f5955613d5385e90b3.tar.gz etherpad-89bda83e0570ab87c6e449f5955613d5385e90b3.tar.xz etherpad-89bda83e0570ab87c6e449f5955613d5385e90b3.zip |
removed obsolete svn folder from hg tree
--HG--
extra : convert_revision : svn%3Ab2ef00c0-3703-41da-baef-cfe82387ac0c/trunk%408
Diffstat (limited to 'trunk/infrastructure/net.appjet.ajstdlib')
6 files changed, 0 insertions, 2789 deletions
diff --git a/trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala b/trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala deleted file mode 100644 index 8d285af..0000000 --- a/trunk/infrastructure/net.appjet.ajstdlib/ajstdlib.scala +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Copyright 2009 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS-IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.appjet.ajstdlib; - -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.{ScheduledThreadPoolExecutor, Callable}; -import scala.collection.mutable.{HashMap, SynchronizedMap}; - -import org.mozilla.javascript.{Context, ScriptableObject, Function, RhinoException, Scriptable}; - -import net.appjet.oui.{SpecialJarOrNotFile, DiskLibrary, FixedDiskLibrary, VariableDiskLibrary, ExecutionContext, ExecutionContextUtils, ScopeReuseManager, config, exceptionlog}; -import net.appjet.bodylock.{BodyLock, ExecutionException}; -import net.appjet.common.util.LenientFormatter; - -import org.mortbay.jetty.nio.SelectChannelConnector; -import org.mortbay.util.ajax.ContinuationSupport; - -object ajstdlib { - def runModuleInNewScope(cx: ExecutionContext, moduleName: String): Any = { - val newScope = BodyLock.subScope(cx.runner.globalScope); - if (! libraryExists(moduleName)) - return Context.getUndefinedValue(); // unfortunately, returning "false" doesn't really do the right thing here. - try { - libraryExecutable(moduleName).execute(newScope); - } catch { - case e: ExecutionException => throw e; - case e => throw new ExecutionException("Error occurred while running module: "+moduleName, e); - // TODO: There was code here to print errors to the response if something didn't compile. Replace this code? - } - newScope; - } - - private val modules = new HashMap[String, DiskLibrary] with SynchronizedMap[String, DiskLibrary]; - private def library(name: String) = modules.getOrElseUpdate(name+".js", new VariableDiskLibrary(name+".js")); - private def libraryExists(name: String) = { - val lib = library(name); - // ScopeReuseManager.watch(lib); - lib.exists; - } - private def libraryExecutable(name: String) = { - val lib = library(name); - // ScopeReuseManager.watch(lib); - lib.executable; - } - - val globalLock = new ReentrantLock(); - val attributes = new HashMap[String, Any] with SynchronizedMap[String, Any]; - - def init() { - // any other ajstdlib initialization goes here. - Comet.init(); - } -} - -object printf { - def printf(format: String, list: Array[Object]): String = { -// val list: Array[Object] = new Array[Object](argList.getLength) -// for (i <- List.range(0, list.length)) -// list(i) = argList.getElement(i).getOrElse(null) match { -// case AppVM.JSNumber(n) => n -// case AppVM.JSString(s) => s -// case AppVM.JSBoolean(b) => b -// case _ => null -// } - val args = list.map(x => Context.jsToJava(x, classOf[Object])); - try { - val fmt = new LenientFormatter() - fmt.format(format, args: _*) - fmt.toString() - } catch { - case e: java.util.IllegalFormatException => - throw new ExecutionException("String Format Error: <tt>printf</tt> error: "+e.getMessage(), e) - } - } -} - - -import java.security.MessageDigest; - -object md5 { - def md5(input: String): String = { - val bytes = input.getBytes("UTF-8"); - md5(bytes); - } - def md5(bytes: Array[byte]): String = { - var md = MessageDigest.getInstance("MD5"); - var digest = md.digest(bytes); - var builder = new StringBuilder(); - for (b <- digest) { - builder.append(Integer.toString((b >> 4) & 0xf, 16)); - builder.append(Integer.toString(b & 0xf, 16)); - } - builder.toString(); - } -} - -object execution { - def runAsync(ec: ExecutionContext, f: Function) { - ec.asyncs += f; - } - - def executeCodeInNewScope(parentScope: Scriptable, - code: String, name: String, - startLine: Int): Scriptable = { - val ec = ExecutionContextUtils.currentContext; - val executable = - try { - BodyLock.compileString(code, name, startLine); - } catch { - case e: RhinoException => - throw new ExecutionException( - "Failed to execute code in new scope.", e); - } - if (ec == null || ec.runner == null) { - Thread.dumpStack(); - } - val scope = BodyLock.subScope( - if (parentScope != null) parentScope - else ec.runner.mainScope); - scope.setParentScope(ec.runner.mainScope); - executable.execute(scope); - scope; - } - - def runTask(taskName: String, args: Array[Object]): AnyRef = { - val ec = net.appjet.oui.execution.runOutOfBand( - net.appjet.oui.execution.scheduledTaskExecutable, - "Task "+taskName, - Some(Map("taskName" -> taskName, - "taskArguments" -> args)), - { error => - error match { - case e: Throwable => exceptionlog(e); - case _ => exceptionlog(error.toString); - } - }); - ec.result; - } - - def runTaskSimply(taskName: String, args: Array[Object]): AnyRef = { - net.appjet.oui.execution.runOutOfBandSimply( - net.appjet.oui.execution.scheduledTaskExecutable, - Some(Map("taskName" -> taskName, - "taskArguments" -> args))); - } - - def wrapRunTask(taskName: String, args: Array[Object], - returnType: Class[_]): Function0[AnyRef] = { - new Function0[AnyRef] { - def apply = Context.jsToJava(runTaskSimply(taskName, args), returnType); - } - } - - val threadpools = new HashMap[String, ScheduledThreadPoolExecutor] - with SynchronizedMap[String, ScheduledThreadPoolExecutor]; - - def createNamedTaskThreadPool(name: String, poolSize: Int) { - threadpools.put(name, new ScheduledThreadPoolExecutor(poolSize)); - } - - class TaskRunner(val taskName: String, args: Array[Object]) extends Callable[AnyRef] { - def call(): AnyRef = { - runTask(taskName, args); - } - } - - def scheduleTaskInPool(poolName: String, taskName: String, delayMillis: Long, args: Array[Object]) = { - val pool = threadpools.getOrElse(poolName, throw new RuntimeException("No such task threadpool: "+poolName)); - pool.schedule(new TaskRunner(taskName, args), delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS); - } - - def shutdownAndWaitOnTaskThreadPool(poolName: String, timeoutMillis: Long) = { - val pool = threadpools.getOrElse(poolName, throw new RuntimeException("No such task threadpool: "+poolName)); - pool.shutdown(); - pool.awaitTermination(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS); - } - - def getContinuation(ec: ExecutionContext) = { - val req = ec.request.req; - ContinuationSupport.getContinuation(req, req).asInstanceOf[SelectChannelConnector.RetryContinuation]; - } - - def sync[T](obj: AnyRef)(block: => T): T = { - obj.synchronized { - block; - } - } -} - -import javax.mail._; -import javax.mail.internet._; -import java.util.Properties; - -object email { - def sendEmail(toAddr: Array[String], fromAddr: String, subject: String, headers: Scriptable, content: String): String = { - try { - val badAddresses = for (a <- toAddr if (a.indexOf("@") == -1)) yield a; - if (badAddresses.length > 0) { - "The email address"+(if (badAddresses.length > 1) "es" else "")+" "+ - badAddresses.mkString("\"", "\", \"", "\"")+" do"+(if (badAddresses.length == 1) "es" else "")+ - " not appear to be valid."; - } else { - val debug = false; - - val props = new Properties; - props.put("mail.smtp.host", config.smtpServerHost); - props.put("mail.smtp.port", config.smtpServerPort.toString()); - if (config.smtpUser != "") - props.put("mail.smtp.auth", "true"); - - val session = Session.getInstance(props, if (config.smtpUser != "") new Authenticator { - override def getPasswordAuthentication() = - new PasswordAuthentication(config.smtpUser, config.smtpPass); - } else null); - session.setDebug(debug); - - val msg = new MimeMessage(session); - val fromIAddr = new InternetAddress(fromAddr); - msg.setFrom(fromIAddr); - val toIAddr: Array[Address] = toAddr.map(x => (new InternetAddress(x))); // new InternetAddress(toAddr); - msg.setRecipients(Message.RecipientType.TO, toIAddr); - - if (headers != null) - for (o <- headers.getIds() if o.isInstanceOf[String]) { - val k = o.asInstanceOf[String] - msg.addHeader(k, headers.get(k, headers).asInstanceOf[String]); - } - - msg.setSubject(subject); - msg.setContent(content, "text/plain"); - Transport.send(msg); - ""; - } - } catch { - case e: MessagingException => { exceptionlog(e); e.printStackTrace() ; "Messaging exception: "+e.getMessage+"."; } - case e: Exception => { exceptionlog(e); e.printStackTrace(); "Unknown error."; } - } - } -} diff --git a/trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala b/trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala deleted file mode 100644 index 047c086..0000000 --- a/trunk/infrastructure/net.appjet.ajstdlib/sqlbase.scala +++ /dev/null @@ -1,563 +0,0 @@ -/** - * Copyright 2009 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS-IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.appjet.ajstdlib; - -import scala.collection.mutable.ArrayBuffer; - -import java.sql.{DriverManager, SQLException, Statement}; -import net.appjet.oui.{profiler, config, NoninheritedDynamicVariable}; -import com.mchange.v2.c3p0._; - -class SQLBase(driverClass: String, url: String, userName: String, password: String) { - - def isMysql:Boolean = (url.startsWith("jdbc:mysql:")); - def isDerby:Boolean = (url.startsWith("jdbc:derby:")); - - if (isDerby) { - System.setProperty("derby.system.home", config.derbyHome); - val f = new java.io.File(config.derbyHome); - if (! f.exists) { - if (! f.mkdirs()) - throw new RuntimeException("Couldn't create internal database storage directory: "+config.derbyHome); - } - if (! f.isDirectory) - throw new RuntimeException("Internal database storage directory is not a directory: "+config.derbyHome); - if (! f.canWrite) - throw new RuntimeException("Can't write to internal database storage directory: "+config.derbyHome); - } - - val cpds = new ComboPooledDataSource(); - cpds.setDriverClass(driverClass); - cpds.setJdbcUrl(url+(if (isMysql) "?useUnicode=true&characterEncoding=utf8" else "")); - - // derby does not require a password - if (!isDerby) { - cpds.setUser(userName); - cpds.setPassword(password); - } - - cpds.setMaxPoolSize(config.jdbcPoolSize); - cpds.setMaxConnectionAge(6*60*60); // 6 hours - if (config.devMode) { - cpds.setAutomaticTestTable("cpds_testtable"); - cpds.setTestConnectionOnCheckout(true); - } - -// { -// // register db driver -// try { -// new JDCConnectionDriver(driverClass, url+"?useUnicode=true&characterEncoding=utf8", userName, password); -// } catch { -// case e => { -// e.printStackTrace(); -// Runtime.getRuntime.halt(1); -// } -// } -// } - - private def getConnectionFromPool = { - val c = cpds.getConnection(); - c.setAutoCommit(true); - c; - } - - // Creates a dynamic variable whose .value depends on the innermost - // .withValue(){} on the call-stack. - private val currentConnection = new NoninheritedDynamicVariable[Option[java.sql.Connection]](None); - - def withConnection[A](block: java.sql.Connection=>A): A = { - currentConnection.value match { - case Some(c) => { - block(c); - } - case None => { - val t1 = profiler.time; - val c = getConnectionFromPool; - profiler.recordCumulative("getConnection", profiler.time-t1); - try { - currentConnection.withValue(Some(c)) { - block(c); - } - } finally { - c.close; - } - } - } - } - - private val currentlyInTransaction = new NoninheritedDynamicVariable(false); - - def inTransaction[A](block: java.sql.Connection=>A): A = { - withConnection(c => { - if (currentlyInTransaction.value) { - return block(c); - } else { - currentlyInTransaction.withValue(true) { - c.setAutoCommit(false); - c.setTransactionIsolation(java.sql.Connection.TRANSACTION_REPEATABLE_READ); - - try { - val result = block(c); - c.commit(); - c.setAutoCommit(true); - result; - } catch { - case e@net.appjet.oui.AppGeneratedStopException => { - c.commit(); - c.setAutoCommit(true); - throw e; - } - case (e:org.mozilla.javascript.WrappedException) if (e.getWrappedException == - net.appjet.oui.AppGeneratedStopException) => { - c.commit(); - c.setAutoCommit(true); - throw e; - } - case e => { - //println("inTransaction() caught error:"); - //e.printStackTrace(); - try { - c.rollback(); - c.setAutoCommit(true); - } catch { - case ex => { - println("Could not rollback transaction because: "+ex.toString()); - } - } - throw e; - } - } - } - } - }); - } - - def closing[A](closable: java.sql.Statement)(block: =>A): A = { - try { block } finally { closable.close(); } - } - - def closing[A](closable: java.sql.ResultSet)(block: =>A): A = { - try { block } finally { closable.close(); } - } - - def tableName(t: String) = id(t); - - val identifierQuoteString = withConnection(_.getMetaData.getIdentifierQuoteString); - def quoteIdentifier(s: String) = identifierQuoteString+s+identifierQuoteString; - private def id(s: String) = quoteIdentifier(s); - - def longTextType = if (isDerby) "CLOB" else "MEDIUMTEXT"; - - // derby seems to do things intelligently w.r.t. case-sensitivity and unicode support. - def createTableOptions = if (isMysql) " ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE utf8_bin" else ""; - - // creates table if it doesn't exist already - def createJSONTable(table: String) { - withConnection { c=> - val s = c.createStatement; - if (! doesTableExist(c, table)) { - closing(s) { - s.execute("CREATE TABLE "+tableName(table)+" ("+ - id("ID")+" VARCHAR(128) PRIMARY KEY NOT NULL, "+ - id("JSON")+" "+longTextType+" NOT NULL"+ - ")"+createTableOptions); - } - } - } - } - - // requires: table exists - // returns null if key doesn't exist - def getJSON(table: String, key: String): String = { - withConnection { c=> - val s = c.prepareStatement("SELECT "+id("JSON")+" FROM "+tableName(table)+" WHERE "+id("ID")+" = ?"); - closing(s) { - s.setString(1, key); - var resultSet = s.executeQuery(); - closing(resultSet) { - if (! resultSet.next()) { - null; - } - else { - resultSet.getString(1); - } - } - } - } - } - - def getAllJSON(table: String, start: Int, count: Int): Array[Object] = { - withConnection { c => - val s = c.prepareStatement("SELECT "+id("ID")+","+id("JSON")+" FROM "+tableName(table)+ - " ORDER BY "+id("ID")+" DESC"+ - " LIMIT ? OFFSET ?"); - closing(s) { - s.setInt(2, start); - s.setInt(1, count); - var resultSet = s.executeQuery(); - var output = new ArrayBuffer[Object]; - closing(resultSet) { - while (resultSet.next()) { - output += new { val id = resultSet.getString(1); val value = resultSet.getString(2) }; - } - output.toArray; - } - } - } - } - - def getAllJSONKeys(table: String): Array[String] = { - withConnection { c => - val s = c.prepareStatement("SELECT "+id("ID")+" FROM "+tableName(table)); - closing(s) { - var resultSet = s.executeQuery(); - var output = new ArrayBuffer[String]; - closing(resultSet) { - while (resultSet.next()) { - output += resultSet.getString(1); - } - output.toArray; - } - } - } - } - - // requires: table exists - // inserts key if it doesn't exist - def putJSON(table: String, key: String, json: String) { - withConnection { c=> - val update = c.prepareStatement("UPDATE "+tableName(table)+" SET "+id("JSON")+"=? WHERE "+id("ID")+"=?"); - closing(update) { - update.setString(1, json); - update.setString(2, key); - update.executeUpdate(); - if (update.getUpdateCount == 0) { - val insert = c.prepareStatement( - "INSERT INTO "+tableName(table)+" ("+id("ID")+", "+id("JSON")+") values (?,?)"); - closing(insert) { - insert.setString(1, key); - insert.setString(2, json); - insert.executeUpdate(); - } - } - } - } - } - - def deleteJSON(table: String, key: String) { - // requires: table exists - withConnection { c=> - val update = c.prepareStatement("DELETE FROM "+tableName(table)+" WHERE "+id("ID")+"=?"); - closing(update) { - update.setString(1, key); - update.executeUpdate(); - } - } - } - - private def metaName(table: String) = table+"_META"; - private def metaTableName(table: String) = tableName(metaName(table)); - private def textTableName(table: String) = tableName(table+"_TEXT"); - private def escapeSearchString(dbm: java.sql.DatabaseMetaData, s: String): String = { - val e = dbm.getSearchStringEscape(); - s.replace("_", e+"_").replace("%", e+"%"); - } - - private final val PAGE_SIZE = 20; - - def doesTableExist(connection: java.sql.Connection, table: String): Boolean = { - val databaseMetadata = connection.getMetaData; - val tables = databaseMetadata.getTables(null, null, - escapeSearchString(databaseMetadata, table), null); - closing(tables) { - tables.next(); - } - } - - def autoIncrementClause = if (isDerby) "GENERATED BY DEFAULT AS IDENTITY" else "AUTO_INCREMENT"; - - // creates table if it doesn't exist already - def createStringArrayTable(table: String) { - withConnection { c=> - if (! doesTableExist(c, metaName(table))) { // check to see if the *_META table exists - // create tables and indices - val s = c.createStatement; - closing(s) { - s.execute("CREATE TABLE "+metaTableName(table)+" ("+ - id("ID")+" VARCHAR(128) PRIMARY KEY NOT NULL, "+ - id("NUMID")+" INT UNIQUE "+autoIncrementClause+" "+ - ")"+createTableOptions); - val defaultOffsets = (1 to PAGE_SIZE).map(x=>"").mkString(","); - s.execute("CREATE TABLE "+textTableName(table)+" ("+ - ""+id("NUMID")+" INT, "+id("PAGESTART")+" INT, "+id("OFFSETS")+" VARCHAR(256) NOT NULL DEFAULT '"+defaultOffsets+ - "', "+id("DATA")+" "+longTextType+" NOT NULL"+ - ")"+createTableOptions); - s.execute("CREATE INDEX "+id(table+"-NUMID-PAGESTART")+" ON "+textTableName(table)+"("+id("NUMID")+", "+id("PAGESTART")+")"); - } - } - } - } - - // requires: table exists - // returns: null if key or (key,index) doesn't exist, else the value - def getStringArrayElement(table: String, key: String, index: Int): String = { - val (pageStart, offset) = getPageStartAndOffset(index); - val page = new StringArrayPage(table, key, pageStart, true); - page.data(offset); - } - - // requires: table exists - // returns: an array of the mappings present in the page that should hold the - // particular (key,index) mapping. the array may be empty or otherwise not - // contain the given (key,index). - def getPageStringArrayElements(table: String, key: String, index: Int): Array[IndexValueMapping] = { - val (pageStart, offset) = getPageStartAndOffset(index); - val page = new StringArrayPage(table, key, pageStart, true); - val buf = new scala.collection.mutable.ListBuffer[IndexValueMapping]; - - for(i <- 0 until page.data.length) { - val s = page.data(i); - if (s ne null) { - val n = pageStart + i; - buf += IndexValueMapping(n, s); - } - } - - buf.toArray; - } - - // requires: table exists - // creates key if doesn't exist - // value may be null - def putStringArrayElement(table: String, key: String, index: Int, value: String) { - val (pageStart, offset) = getPageStartAndOffset(index); - val page = new StringArrayPage(table, key, pageStart, false); - page.data(offset) = value; - page.updateDB(); - } - - def putMultipleStringArrayElements(table: String, key: String): Multiputter = new Multiputter { - var currentPage = None:Option[StringArrayPage]; - def flushPage() { - if (currentPage.isDefined) { - val page = currentPage.get; - page.updateDB(); - currentPage = None; - } - } - def finish() { - flushPage(); - } - def put(index: Int, value: String) { - try { - val (pageStart, offset) = getPageStartAndOffset(index); - if (currentPage.isEmpty || currentPage.get.pageStart != pageStart) { - flushPage(); - currentPage = Some(new StringArrayPage(table, key, pageStart, false)); - } - currentPage.get.data(offset) = value; - } - catch { - case e => { e.printStackTrace; throw e } - } - } - } - - trait Multiputter { - def put(index: Int, value: String); - def finish(); - } - - case class IndexValueMapping(index: Int, value: String); - - def clearStringArray(table: String, key: String) { - withConnection { c=> - val numid = getStringArrayNumId(c, table, key, false); - if (numid >= 0) { - { - val s = c.prepareStatement("DELETE FROM "+textTableName(table)+" WHERE "+id("NUMID")+"=?"); - closing(s) { - s.setInt(1, numid); - s.executeUpdate(); - } - } - { - val s = c.prepareStatement("DELETE FROM "+metaTableName(table)+" WHERE "+id("NUMID")+"=?"); - closing(s) { - s.setInt(1, numid); - s.executeUpdate(); - } - } - } - } - } - - private def getPageStartAndOffset(index: Int): (Int,Int) = { - val pageStart = (index / PAGE_SIZE) * PAGE_SIZE; - (pageStart, index - pageStart); - } - - // requires: table exists - // returns: numid of new string array - private def newStringArray(c: java.sql.Connection, table: String, key: String): Int = { - val s = c.prepareStatement("INSERT INTO "+metaTableName(table)+" ("+id("ID")+") VALUES (?)", - Statement.RETURN_GENERATED_KEYS); - closing(s) { - s.setString(1, key); - s.executeUpdate(); - val resultSet = s.getGeneratedKeys; - if (resultSet == null) - error("No generated numid for insert"); - closing(resultSet) { - if (! resultSet.next()) error("No generated numid for insert"); - resultSet.getInt(1); - } - } - } - - def getStringArrayNumId(c: java.sql.Connection, table: String, key: String, creating: Boolean): Int = { - val s = c.prepareStatement("SELECT "+id("NUMID")+" FROM "+metaTableName(table)+" WHERE "+id("ID")+"=?"); - closing(s) { - s.setString(1, key); - val resultSet = s.executeQuery(); - closing(resultSet) { - if (! resultSet.next()) { - if (creating) { - newStringArray(c, table, key); - } - else { - -1 - } - } - else { - resultSet.getInt(1); - } - } - } - } - - def getStringArrayAllKeys(table: String): Array[String] = { - withConnection { c=> - val s = c.prepareStatement("SELECT "+id("ID")+" FROM "+metaTableName(table)); - closing(s) { - val resultSet = s.executeQuery(); - closing(resultSet) { - val buf = new ArrayBuffer[String]; - while (resultSet.next()) { - buf += resultSet.getString(1); - } - buf.toArray; - } - } - } - } - - private class StringArrayPage(table: String, key: String, val pageStart: Int, readonly: Boolean) { - - val data = new Array[String](PAGE_SIZE); - - private val numid = withConnection { c=> - val nid = getStringArrayNumId(c, table, key, ! readonly); - - if (nid >= 0) { - val s = c.prepareStatement( - "SELECT "+id("OFFSETS")+","+id("DATA")+" FROM "+textTableName(table)+" WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?"); - closing(s) { - s.setInt(1, nid); - s.setInt(2, pageStart); - val resultSet = s.executeQuery(); - closing(resultSet) { - if (! resultSet.next()) { - if (! readonly) { - val insert = c.prepareStatement("INSERT INTO "+textTableName(table)+ - " ("+id("NUMID")+", "+id("PAGESTART")+", "+id("DATA")+") VALUES (?,?,'')"); - closing(insert) { - insert.setInt(1, nid); - insert.setInt(2, pageStart); - insert.executeUpdate(); - } - } - } - else { - val offsetsField = resultSet.getString(1); - val dataField = resultSet.getString(2); - val offsetStrings = offsetsField.split(",", -1); - var i = 0; - var idx = 0; - while (i < PAGE_SIZE) { - val nstr = offsetStrings(i); - if (nstr != "") { - val n = nstr.toInt; - data(i) = dataField.substring(idx, idx+n); - idx += n; - } - i += 1; - } - } - } - } - } - nid; - } - - def updateDB() { - if (readonly) { - error("this is a readonly StringArrayPage"); - } - // assert: the relevant row of the TEXT table exists - if (data.find(_ ne null).isEmpty) { - withConnection { c=> - val update = c.prepareStatement("DELETE FROM "+textTableName(table)+ - " WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?"); - closing(update) { - update.setInt(1, numid); - update.setInt(2, pageStart); - update.executeUpdate(); - } - } - } - else { - val offsetsStr = data.map(s => if (s eq null) "" else s.length.toString).mkString(","); - val dataStr = data.map(s => if (s eq null) "" else s).mkString(""); - withConnection { c=> - val s = c.prepareStatement("UPDATE "+textTableName(table)+ - " SET "+id("OFFSETS")+"=?, "+id("DATA")+"=? WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?"); - closing(s) { - s.setString(1, offsetsStr); - s.setString(2, dataStr); - s.setInt(3, numid); - s.setInt(4, pageStart); - s.executeUpdate(); - } - } - } - } - } - - def close { - if (isDerby) { - cpds.close(); - try { - DriverManager.getConnection("jdbc:derby:;shutdown=true"); - } catch { - case e: SQLException => if (e.getErrorCode() != 50000) throw e - } - } - } -} - - diff --git a/trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js b/trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js deleted file mode 100644 index 3bfa227..0000000 --- a/trunk/infrastructure/net.appjet.ajstdlib/streaming-client.js +++ /dev/null @@ -1,920 +0,0 @@ -/** - * Copyright 2009 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS-IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -var host = window.location.host; - -function WebSocket(id) { - var self = this; - var socket = this; - var version = 2; - - var timeouts = {}; - - this.onopen = function() { } - this.onclosed = function() { } - this.onmessage = function() { } - this.onhiccup = function() { } - this.onlogmessage = function() { } - this.CONNECTING = 0; - this.OPEN = 1; - this.CLOSED = 2; - this.readyState = -1; - - var hiccupsLastMinute = 0; - var hiccupResetInterval = setInterval(function() { - hiccupsLastMinute = 0; - if (self.readyState == self.CLOSED) - clearInterval(hiccupResetInterval); - }, 60*1000); - - var isHiccuping = false; - function hiccup(channel) { - if (channel != officialChannel && channel != self) return; - if (isHiccuping) return; - log("hiccup: "+channel.name); - if (hiccupsLastMinute++ > 10) { - doDisconnect({reconnect: true, reason: "Too many hiccups!"}); - return; - } - closeAllChannels(); - timeout(timeouts, "hiccup", 15000, function() { - isHiccuping = false; - doDisconnect({reconnect: false, reason: "Couldn't contact server to hiccup."}); - }); - isHiccuping = true; - function tryHiccup() { - if (! isHiccuping) return; - self.onhiccup({connected: false}); - log("trying hiccup"); - timeout(timeouts, "singleHiccup", 5000, function() { - tryHiccup(); - }); - simpleXhr('post', postPath(), true, [{key: "oob", value: "hiccup"}], function(sc, msg) { - if (! isHiccuping) return; - if (msg.substring(0, "restart-fail".length) == "restart-fail") { - doDisconnect({reconnect: true, reason: "Server restarted or socket timed out on server."}); - } else if (sc != 200 || msg.substring(0, 2) != "ok") { - log("Failed to hiccup with error: "+sc+" / "+msg); - setTimeout(tryHiccup, 500); - } else { - isHiccuping = false; - timeouts.singleHiccup(); - timeouts.hiccup(); - doConnect(); - } - }); - } - tryHiccup(); - } - function closeAllChannels() { - for (var i in activeChannels) { - if (activeChannels.hasOwnProperty(i)) { - activeChannels[i].disconnect(); - } - } - officialChannel = undefined; - } - - function doDisconnect(obj, silent, sync) { - log("disconnected: "+obj.reason+" / "+(obj.data !== undefined ? "data: "+obj.data : "")); - logAll(); - closeAllChannels(); - if (longPollingIFrame && longPollingIFrame.div) { - longPollingIFrame.div.innerHTML = ""; - } - if (self.readyState != self.CLOSED) { - self.readyState = self.CLOSED; - if (! silent) { - postSingleMessageNow(true, "kill:"+obj.reason, sync, true); - } - self.onclosed(obj); - } - } - - this.disconnect = function(sync) { - doDisconnect({reason: "Closed by client."}, false, sync); - } - - - function doBasicConnect() { - var type = getBasicChannel(); - log("basic connect on type: "+type); - var channel = activeChannels[type] = new channelConstructors[type](); - channel.connect(); - } - - function doOtherConnect() { - var channels = getOtherChannels(); - var channel; var type; - for (var i = 0; i < channels.length; ++i) { - type = channels[i]; - log("other connect on type: "+type); - channel = activeChannels[type] = new channelConstructors[type](); - channel.connect(); - } - } - function doConnect() { - log("doing connect!"); - timeout(timeouts, "connect", 15000, function() { - doDisconnect({reconnect: false, reason: "Timeout connecting to server: no channel type was able to connect."}); - }); - doBasicConnect(); - } - - this.connect = function() { - log("socket connecting: "+id); - doConnect(); - } - - // util - function nicetime() { return Math.floor((new Date()).valueOf() / 100) % 10000000; } - function log(s) { self.onlogmessage("(comet @t: "+nicetime()+") "+s); } - function logAll() { - log(self.describe()) - } - this.describe = function() { - function describeChannels() { - out = []; - for (var i in activeChannels) { - if (activeChannels.hasOwnProperty(i)) { - out.push(i+": "+activeChannels[i].describe()); - } - } - return "[ "+out.join(", ")+" ]"; - } - return ("socket state: { id: "+id+", readyState: "+self.readyState+", isHiccuping: "+isHiccuping+", timeouts: "+describeTimeouts(timeouts)+", officialChannel: "+(officialChannel?officialChannel.name:"none")+", channels: "+describeChannels()+", isPosting: "+isPosting+", lastReceivedSeqNumber: "+lastReceivedSeqNumber+", lastPost: "+lastPost+", postTimeouts: "+describeTimeouts(postTimeouts)+", channelSeq: "+channelSeq+" }"); - } - - function wrapMethod(obj, method) { - return function() { - var arr = []; - for (var i=0; i < arguments.length; i++) { - arr.push(arguments[i]); - } - method.apply(obj, arr); - } - } - var _wm = wrapMethod; - - // cb should take statusCode, responseText, and optionally request - function simpleXhr(method, uri, async, params, cb, makeXhr) { -// log("making simple Xhr: "+[method, uri, async, params].join(", ")); - var request = (makeXhr || newRequestObject)(); - request.open(method, uri, async); - if (async) { - request.onreadystatechange = function() { - if (request.readyState != 4) return; - var status; - var responseText; - try { - status = request.status; - responseText = request.responseText; - } catch (e) { /* absorb ff error accessing request properties */ } - cb(status, responseText, request); - } - } - var data = null; - if (params) { - data = []; - for (var i = 0; i < params.length; ++i) { - data.push(encodeURIComponent(params[i].key)+"="+encodeURIComponent(params[i].value)); - } - data = data.join("&"); - request.setRequestHeader("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); - } - try { - request.send(data); - } catch (e) { request.abort(); cb(500, "Error sending data!", request); } - if (! async) { - var status; - var responseText; - try { - status = request.status; - responseText = request.responseText; - } catch (e) { /* absorb ff error accessing request properties */ } - cb(status, responseText, request); - } - return request; - } - - var timeout_noop = function() { } - function timeout(timeoutObject, timeoutName, millis, timeoutCallback) { - function clearIt(timeoutObject, timeoutName) { - if (timeoutObject[timeoutName]) { - timeoutObject[timeoutName](); - timeoutObject[timeoutName] = timeout_noop; - } - } - var timeoutId = setTimeout(function() { clearIt(timeoutObject, timeoutName); timeoutCallback(); }, millis); - var f = function() { - clearTimeout(timeoutId); - } - clearIt(timeoutObject, timeoutName); - timeoutObject[timeoutName] = f; - return f; - } - - // handling messages - var lastReceivedSeqNumber = 0; - - function dataHandler(msg) { - if (msg.seqNumber > lastReceivedSeqNumber+1) { - log("bad sequence number. expecting: "+(lastReceivedSeqNumber+1)+", got: "+msg.seqNumber); - hiccup(self); - return false; - } - if (msg.seqNumber < lastReceivedSeqNumber+1) return true; - lastReceivedSeqNumber = msg.seqNumber; - if (! msg.isControl) { - self.onmessage({ data: msg.content }); - return true; - } else { - if (msg.content == "kill") { - doDisconnect({reconnect: false, reason: "Killed by server."}); - return false; - } - } - } - - // client-server comm - var postPath = function() { - return "%contextPath%/post?r="+randomVar()+"&v="+version+"&id="+id+"&seq="+lastReceivedSeqNumber; - } - - function SimpleQueue() { - var base = []; - var head = 0; - var tail = 0; - this.offer = function(data) { - base[tail++] = data; - } - this.poll = function() { - if (this.length() > 0) { - var n = base[head]; - delete base[head++]; - return n; - } - } - this.clear = function() { - head = 0; - tail = 0; - var oldBase = base; - base = []; - return oldBase; - } - this.length = function() { - return tail - head; - } - } - var outgoingMessageQueue = new SimpleQueue(); - var isPosting = false; - var postTimeouts = {}; - var lastPost; - - function postSingleMessageNow(isControl, data, sync, force, cb) { - doPostMessages([{oob: isControl, data: data, cb: cb}], sync, force) - } - - function doPostMessages(messages, sync, force, cb) { - if (! force && self.readyState == self.CLOSED) return; - if (messages.length == 0) { - if (cb) cb(); - return; - } - var data = []; - var callbacks = []; - for (var i = 0; i < messages.length; ++i) { - data.push({key: (messages[i].oob ? "oob" : "m"), - value: messages[i].data}); - if (messages[i].cb) - callbacks.push(messages[i].cb); - } - function postFailed(sc, msg, req, src) { - var str = ""; - try { - str = sc + ": "+req.statusText+" - "+msg+" ("+src+")"; - } catch (e) { /* absorb potential Firefox error accessing req */ } - doDisconnect({reconnect: true, reason: "Posting message failed.", data: str}); - for (var i = 0; i < callbacks.length; ++i) { - callbacks[i](false, str); - } - } - function postCallback(sc, msg, request) { - postTimeouts.post(); - if (sc != 200 || msg.substring(0, 2) != "ok") { - postFailed(sc, msg, request, "1"); - } else { - for (var i = 0; i < callbacks.length; ++i) { - callbacks[i](true); - } - if (cb) cb(); - } - } - timeout(postTimeouts, "post", 15000, function() { - doDisconnect({reconnect: true, reason: "Posting message timed out."}); - }); - simpleXhr('post', postPath(), ! sync, data, postCallback); - } - - function postPendingMessages() { - if (isPosting == true) - return; - var messages = outgoingMessageQueue.clear(); - if (messages.length == 0) { - return; - } - isPosting = true; - doPostMessages(messages, false, false, function() { isPosting = false; setTimeout(postPendingMessages, 0); }); - lastPost = nicetime(); - } - this.postMessage = function(data, cb) { - if (self.readyState != self.OPEN) { - return; - } - outgoingMessageQueue.offer({data: data, cb: cb}); - setTimeout(function() { postPendingMessages() }, 0); - } - - // transports - function getValidChannels() { - var channels = []; - for (var i = 0; i < validChannels.length; ++i) { - var type = validChannels[i]; - if (window.location.hash.length > 0) { - if (window.location.hash != "#"+type) { - continue; - } - } - if ($ && $.browser.opera && type != 'shortpolling' && type != 'streaming') { - continue; - } - channels.push(type); - } - return channels; - } - function getBasicChannel() { - return getValidChannels()[0]; - } - - function getOtherChannels() { - return getValidChannels().slice(1); - } - - var officialChannel; - this.getTransportType = function() { - return (officialChannel ? officialChannel.name : "none"); - } - var validChannels = "%acceptableChannelTypes%"; - var canUseSubdomains = "%canUseSubdomains%"; - var activeChannels = {}; - var channelConstructors = { - shortpolling: ShortPollingChannel, - longpolling: LongPollingChannel, - streaming: StreamingChannel - } - - function describeTimeouts(timeouts) { - var out = []; - for (var i in timeouts) { - if (timeouts.hasOwnProperty(i)) { - out.push(i+": "+(timeouts[i] == timeout_noop ? "unset" : "set")); - } - } - return "{ "+out.join(", ")+" }"; - } - - var channelSeq = 1; - function notifyConnect(channel) { - timeouts.connect(); - if (! officialChannel || channel.weight > officialChannel.weight) { - log("switching to use channel: "+channel.name); - var oldChannel = officialChannel; - officialChannel = channel; - setTimeout(function() { - postSingleMessageNow(true, "useChannel:"+(channelSeq++)+":"+channel.name, false, false, function(success, msg) { - if (success) { - if (oldChannel) { - oldChannel.disconnect(); - } else { - // there was no old channel, so try connecting the other channels. - doOtherConnect(); - } - if (self.readyState != self.OPEN) { - self.readyState = self.OPEN; - self.onopen({}); - } else { - self.onhiccup({connected: true}); - } - } else { - doDisconnect({reconnect: true, reason: "Failed to select channel on server.", data: msg}); - } - }); - }, 0); - return true; - } else { - return false; - } - } - - function randomVar() { - return String(Math.round(Math.random()*1e12)); - } - - function channelPath() { - return "%contextPath%/channel?v="+version+"&r="+randomVar()+"&id="+id; - } - - function newRequestObject() { - var xmlhttp=false; - try { - xmlhttp = (window.ActiveXObject && new ActiveXObject("Msxml2.XMLHTTP")) - } catch (e) { - try { - xmlhttp = (window.ActiveXObject && new ActiveXObject("Microsoft.XMLHTTP")); - } catch (E) { - xmlhttp = false; - } - } - if (!xmlhttp && typeof XMLHttpRequest!='undefined') { - try { - xmlhttp = new XMLHttpRequest(); - } catch (e) { - xmlhttp=false; - } - } - if (!xmlhttp && window.createRequest) { - try { - xmlhttp = window.createRequest(); - } catch (e) { - xmlhttp=false; - } - } - return xmlhttp - } - - function DataFormatError(message) { - this.message = message; - } - - function readMessage(data, startIndex) { - if (! startIndex) startIndex = 0; - var sep = data.indexOf(":", startIndex); - if (sep < 0) return; // don't have all the bytes for this yet. - var chars = Number(data.substring(startIndex, sep)); - if (isNaN(chars)) - throw new DataFormatError("Bad length: "+data.substring(startIndex, sep)); - if (data.length < sep+1+chars) return; // don't have all the bytes for this yet. - var msg = data.substr(sep+1, chars); - return { message: msg, lastConsumedChar: sep+1+chars } - } - - function iframeReader(data, startIndex) { - if (startIndex == 0) - return { message: data, lastConsumedChar: data.length } - } - - function parseWireFormat(data, startIndex, reader) { - if (! startIndex) startIndex = 0; - var msgs = []; - var readThroughIndex = startIndex; - while (true) { - var msgObj = (reader || readMessage)(data, readThroughIndex) - if (! msgObj) break; - readThroughIndex = msgObj.lastConsumedChar; - var msg = msgObj.message; - var split = msg.split(":"); - if (split[0] == 'oob') { - msgs.push({oob: split.slice(1).join(":")}); - continue; - } - var seq = Number(split[0]); - if (isNaN(seq)) - throw new DataFormatError("Bad sequence number: "+split[0]); - var control = Number(split[1]); - if (isNaN(control)) - throw new DataFormatError("Bad control: "+split[1]); - var msgContent = split.slice(2).join(":"); - msgs.push({seqNumber: seq, isControl: (control == 1), content: msgContent}); - } - return { messages: msgs, lastConsumedChar: readThroughIndex } - } - - function handleMessages(data, cursor, channel, reader) { - try { - messages = parseWireFormat(data, cursor, reader); - } catch (e) { - if (e instanceof DataFormatError) { - log("Data format error: "+e.message); - hiccup(channel); - return; - } else { - log(e.toString()+" on line: "+e.lineNumber); - } - } - for (var i=0; i < messages.messages.length; i++) { - var oob = messages.messages[i].oob; - if (oob) { - if (oob == "restart-fail") { - doDisconnect({reconnect: true, reason: "Server restarted or socket timed out on server."}); - return; - } - } else { - if (! dataHandler(messages.messages[i])) - break; - } - } - return messages.lastConsumedChar; - } - - function ShortPollingChannel() { - this.weight = 0; - this.name = "shortpolling"; - - this.isConnected = false; - this.isClosed = false; - this.request; - this.clearRequest = function() { - if (this.request) { - this.request.abort(); - this.request = null; - } - } - this.timeouts = {}; - - this.describe = function() { - return "{ isConnected: "+this.isConnected+", isClosed: "+this.isClosed+", timeouts: "+describeTimeouts(this.timeouts)+", request: "+(this.request?"set":"not set")+" }" - } - - this.pollDataHandler = function(sc, response, request) { - if (request.readyState != 4) return; - if (this.timeouts.poll) this.timeouts.poll(); - var messages; - if (! this.isConnected) { - this.timeouts.connectAttempt(); - if (sc != 200) { - log(this.name+" connect failed: "+sc+" / "+response); - setTimeout(_wm(this, this.attemptConnect), 500); - return; - } - var msg = (response ? readMessage(response) : undefined); - if (msg && msg.message == "oob:ok") { - this.timeouts.initialConnect(); - this.isConnected = true; - log(this.name+" transport connected!"); - if (! notifyConnect(this)) { - // there are better options connected. - log(this.name+" transport not chosen for activation."); - this.disconnect(); - return; - } - this.doPoll(); - return; - } else { - log(this.name+" connect didn't get ok: "+sc+" / "+response); - setTimeout(_wm(this, this.attemptConnect), 500); - return; - } - } - var chars = handleMessages(request.responseText, 0, this); - if (sc != 200 || ((! chars) && this.emptyResponseBad)) { - hiccup(this); - } - setTimeout(_wm(this, this.doPoll), this.pollDelay); - this.clearRequest(); - } - - this.keepRetryingConnection = true; - this.cancelConnect = function() { - this.clearRequest(); - this.keepRetryingConnection = false; - } - this.cancelPoll = function() { - this.clearRequest(); - log("poll timed out."); - hiccup(this); - } - - this.doPoll = function() { - if (this.isClosed) return; - timeout(this.timeouts, "poll", this.pollTimeout, _wm(this, this.cancelPoll)); - this.request = - simpleXhr('GET', - channelPath()+"&channel="+this.name+"&seq="+lastReceivedSeqNumber+this.pollParams(), - true, undefined, _wm(this, this.pollDataHandler), this.xhrGenerator); - } - - this.pollParams = function() { - return ""; - } - this.pollTimeout = 5000; - this.pollDelay = 500; - - this.attemptConnect = function() { - if (! this.keepRetryingConnection) return; - log(this.name+" attempting connect"); - this.clearRequest(); - timeout(this.timeouts, "connectAttempt", 5000, _wm(this, this.attemptConnect)); - this.request = simpleXhr('GET', channelPath()+"&channel="+this.name+"&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber, - true, undefined, _wm(this, this.pollDataHandler), this.xhrGenerator); - } - this.connect = function() { - this.attemptConnect(); - timeout(this.timeouts, "initialConnect", 15000, _wm(this, this.cancelConnect)); - } - this.disconnect = function() { - log(this.name+" disconnected"); - this.isClosed = true; - this.clearRequest(); - } - } - - function StreamingChannel() { - this.weight = 2; - this.name = "streaming"; - var self = this; - - var isConnected = false; - var request; - function clearRequest() { - if (request) { - request.abort(); - request = null; - if (theStream) theStream = null; - if (ifrDiv) { - ifrDiv.innerHTML = ""; - ifrDiv = null; - } - } - } - var isClosed = false; - var timeouts = {}; - var cursor = 0; - - this.describe = function() { - return "{ isConnected: "+isConnected+", isClosed: "+isClosed+", timeouts: "+describeTimeouts(timeouts)+", request: "+(request?"set":"not set")+", cursor: "+cursor+" }"; - }; - - function connectOk() { - isConnected = true; - timeouts.initialConnect(); - if (! notifyConnect(self)) { - log("streaming transport not chosen for activation"); - self.disconnect(); - return; - } - } - - function streamDataHandler() { - if (timeouts.data) timeouts.data(); - if (isClosed) return; - try { - if (! request.responseText) return; - } catch (e) { return; } - if (! isConnected) { - var msg = readMessage(request.responseText, cursor); - if (! msg) return; - cursor = msg.lastReceivedSeqNumber; - if (msg.message == "oob:ok") { - connectOk(); - } else { - log("stream: incorrect channel connect message:"+msg.message); - self.disconnect(); - return; - } - } else { - cursor = handleMessages(request.responseText, cursor, self); - } - if (! request || request.readyState == 4) { - clearRequest(); - if (isConnected) { - log("stream connection unexpectedly closed."); - hiccup(self); - return; - } - } - timeout(timeouts, "data", 60*1000, function() { hiccup(self); }); - } - - function iframeDataHandler(data) { - if (isClosed) return; - if (! isConnected) { - if (data == "oob:ok") { - connectOk(); - } else { - log("iframe stream: unexpected data on connect - "+data); - } - } else { - handleMessages(data, 0, self, iframeReader); - } - } - - function cancelConnect() { - isClosed = true; - clearRequest(); - log("stream: failed to connect."); - } - - // IE Stuff. - var theStream; - var ifrDiv; - var iframeTestCount = 0; - function testIframe() { - var state; - try { - state = ifrDiv.firstChild.readyState; - } catch (e) { - hiccup(self); - return; - } - if (state == 'interactive' || iframeTestCount > 10) { - try { var tmp = ifrDiv.firstChild.contentWindow.document.getElementById("thebody") } - catch (e) { hiccup(self); } - } else { - iframeTestCount++; - setTimeout(testIframe, 500); - } - } - - this.connect = function() { - timeout(timeouts, "initialConnect", 15000, cancelConnect) - - if (canUseSubdomains) { - var streamurl = "//"+randomVar()+".comet."+host+channelPath()+"&channel=streaming&type=iframe&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber; - log("stream to: "+streamurl); - if ($ && $.browser.opera) { - // set up the opera stream; requires jquery because, why not? - ifrDiv = $('<div style="display: none;"></div>').get(0); - $('body').append(ifrDiv); - window.comet = { - pass_data: iframeDataHandler, - disconnect: function() { hiccup(self); } - } - $(ifrDiv).append($("<iframe src='"+streamurl+"'></iframe>")); - iframeTestCount = 0; - setTimeout(testIframe, 2000); - // if event-source supported disconnect notifications, fuck yeah we'd use it. -// theStream = $('<event-source>'); -// var streamurl = channelPath()+"&channel=streaming&type=opera&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber; -// theStream.get(0).addEventListener('message', function(event) { -// iframeDataHandler(event.data); -// }, false); -// theStream.attr('src', streamurl); - log("stream connect sent!"); - return; - } - try { // TODO: remove reference to both theStream and ifrDiv on unload! - theStream = (window.ActiveXObject && new ActiveXObject("htmlfile")); - if (theStream) { - theStream.open(); - theStream.write("<html><head><title>f<\/title><\/head><body>") - theStream.write("<s"+"cript>document.domain='"+document.domain+"';<\/s"+"cript>") - theStream.write("<\/body><\/html>") - theStream.close(); - ifrDiv = theStream.createElement("div") - theStream.appendChild(ifrDiv) - theStream.parentWindow.comet = { - pass_data: iframeDataHandler, - disconnect: function() { hiccup(self); } - } - ifrDiv.innerHTML = "<iframe src='"+streamurl+"'></iframe>"; - iframeTestCount = 0; - setTimeout(testIframe, 2000); - } - } catch (e) { - theStream = false - } - } else if ($ && $.browser.opera) { - // opera thinks it can do a normal stream, but it can't. - log("opera - not trying xhr"); - return; - } - // End IE Stuff. - if (! theStream) { - request = newRequestObject(); - request.open('get', channelPath()+"&channel=streaming&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber); - request.onreadystatechange = streamDataHandler; - try { - request.send(null); - } catch (e) { } - } - log("stream connect sent!"); - } - - this.disconnect = function() { - log("stream disconnected"); - isClosed = true; - clearRequest(); - } - log("new streamchannel"); - } - - // long-polling related stuff. - function iframePath(key) { - return "//"+key+".comet."+host+"%contextPath%/xhrXdFrame"; - } - - function createHiddenDiv() { - if (! document.getElementById('newcomethidden')) { - var d = document.createElement('div'); - d.setAttribute('id', 'newcomethidden'); - d.style.display = 'none'; - document.body.appendChild(d); - } - return document.getElementById('newcomethidden'); - } - - function ExtHostXHR(iframe) { - this.open = function(method, uri, async) { - this.method = method; - this.uri = uri; - this.async = async; - } - var headers = {}; - this.setRequestHeader = function(name, value) { - headers[name] = value; - } - this.send = function(data) { - var self = this; - this.xhr = iframe.iframe.contentWindow.doAction(this.method, this.uri, this.async, headers, data || null, function(status, response) { - self.readyState = 4; - self.status = status; - self.responseText = response; - self.onreadystatechange(); - }); - } - this.abort = function() { - if (this.xhr) - iframe.contentWindow.doAbort(this.xhr); - } - } - - function createRequestIframe(cb) { - var randomKey = randomVar(); - try { - var activeXControl = (window.ActiveXObject && new ActiveXObject("htmlfile")); - var htmlfileDiv; - if (activeXControl) { - activeXControl.open(); - activeXControl.write('<html><head><title>f</title></head><body>'); - activeXControl.write('<scr'+'ipt>document.domain=\''+document.domain+'\';</scr'+'ipt>'); - activeXControl.write('</body></html>'); - activeXControl.close(); - htmlfileDiv = activeXControl.createElement('div'); - activeXControl.appendChild(htmlfileDiv); - activeXControl.parentWindow["done_"+randomKey] = cb; - htmlfileDiv.innerHTML = "<iframe src='"+iframePath(randomKey)+"'></iframe>"; - return {iframe: htmlfileDiv.firstChild /* should be an iframe */, axc: activeXControl, div: htmlfileDiv}; - } - } catch (e) { - activeXControl = false; - } - log("Not using IE setup."); - var requestIframe = document.createElement('iframe'); - createHiddenDiv().appendChild(requestIframe); - window["done_"+randomKey] = function() { try { delete window["done_"+randomKey]; } catch (e) { }; cb(); } - requestIframe.src = iframePath(randomKey); - return {iframe: requestIframe}; - } - - function createIframeRequestObject() { - if (! longPollingIFrame) throw Error("WebSocket isn't properly set up!"); - return new ExtHostXHR(longPollingIFrame); - } - - var longPollingIFrame; - function LongPollingChannel() { - ShortPollingChannel.apply(this); // sets up other state. - this.weight = 1; - this.name = "longpolling"; - - this.pollDelay = 0; - this.pollTimeout = 15000; - this.pollParams = function() { - return "&timeout="+(this.pollTimeout-5000); - } - var connect = this.connect; - this.connect = function() { - if (! longPollingIFrame) { - longPollingIFrame = - createRequestIframe(_wm(this, connect)); // specifically *not* this.connect. we want the old one! - } else { - connect.apply(this); - } - } - this.xhrGenerator = createIframeRequestObject; - this.emptyResponseBad = true; - } -}
\ No newline at end of file diff --git a/trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html b/trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html deleted file mode 100644 index 3bdb5c4..0000000 --- a/trunk/infrastructure/net.appjet.ajstdlib/streaming-iframe.html +++ /dev/null @@ -1,76 +0,0 @@ -<html> -<head> -<script> -function createRequestObject() { - var xmlhttp=false; - /*@cc_on @*/ - /*@if (@_jscript_version >= 5) - try { - xmlhttp = new ActiveXObject("Msxml2.XMLHTTP"); - } catch (e) { - try { - xmlhttp = new ActiveXObject("Microsoft.XMLHTTP"); - } catch (E) { - xmlhttp = false; - } - } - @end @*/ - if (!xmlhttp && typeof XMLHttpRequest!='undefined') { - try { - xmlhttp = new XMLHttpRequest(); - } catch (e) { - xmlhttp=false; - } - } - if (!xmlhttp && window.createRequest) { - try { - xmlhttp = window.createRequest(); - } catch (e) { - xmlhttp=false; - } - } - return xmlhttp -} -var host = window.location.host; -var oldDomain = document.domain; -var newDomain = oldDomain.substring(oldDomain.indexOf(".", oldDomain.indexOf(".")+1)+1); - -function doAction(method, uri, async, headers, body, cb) { - try { - document.domain = oldDomain; - } catch (e) { } - var req = createRequestObject(); - req.open(method, '//'+host+uri, async); - for (var i in headers) { - req.setRequestHeader(i, headers[i]); - } - req.onreadystatechange = function() { - if (req.readyState == 4) { - try { - document.domain = newDomain; - cb(req.status, req.responseText); - } catch (e) { - // yikes. well, hopefully a timeout will notice this error. - } - } - } - req.send(body); -} -function doAbort(xhr) { - try { - document.domain = oldDomain; - } catch (e) { } - xhr.abort(); -} -document.domain = newDomain; -window.onload = function() { - var doneKey = 'done_'+oldDomain.split(".", 2)[0]; - setTimeout(function() { - window.parent[doneKey](); - }, 0); -} -</script> -</head> -<body> -</body> -</html> diff --git a/trunk/infrastructure/net.appjet.ajstdlib/streaming.scala b/trunk/infrastructure/net.appjet.ajstdlib/streaming.scala deleted file mode 100644 index fbff137..0000000 --- a/trunk/infrastructure/net.appjet.ajstdlib/streaming.scala +++ /dev/null @@ -1,892 +0,0 @@ -/** - * Copyright 2009 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS-IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.appjet.ajstdlib; - -import scala.collection.mutable.{Queue, HashMap, SynchronizedMap, ArrayBuffer}; -import javax.servlet.http.{HttpServletRequest, HttpServletResponse, HttpServlet}; -import org.mortbay.jetty.servlet.{ServletHolder, Context}; -import org.mortbay.jetty.{HttpConnection, Handler, RetryRequest}; -import org.mortbay.jetty.nio.SelectChannelConnector; -import org.mortbay.io.nio.SelectChannelEndPoint; -import org.mortbay.util.ajax.{ContinuationSupport, Continuation}; - -import java.util.{Timer, TimerTask}; -import java.lang.ref.WeakReference; - -import org.mozilla.javascript.{Context => JSContext, Scriptable}; - -import net.appjet.oui._; -import net.appjet.oui.Util.enumerationToRichEnumeration; -import net.appjet.common.util.HttpServletRequestFactory; - -trait SocketConnectionHandler { - def message(sender: StreamingSocket, data: String, req: HttpServletRequest); - def connect(socket: StreamingSocket, req: HttpServletRequest); - def disconnect(socket: StreamingSocket, req: HttpServletRequest); -} - -object SocketManager { - val sockets = new HashMap[String, StreamingSocket] with SynchronizedMap[String, StreamingSocket]; - val handler = new SocketConnectionHandler { - val cometLib = new FixedDiskLibrary(new SpecialJarOrNotFile(config.ajstdlibHome, "oncomet.js")); - def cometExecutable = cometLib.executable; - - def message(socket: StreamingSocket, data: String, req: HttpServletRequest) { - val t1 = profiler.time; -// println("Message from: "+socket.id+": "+data); - val runner = ScopeReuseManager.getRunner; - val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner); - ec.attributes("cometOperation") = "message"; - ec.attributes("cometId") = socket.id; - ec.attributes("cometData") = data; - ec.attributes("cometSocket") = socket; - net.appjet.oui.execution.execute( - ec, - (sc: Int, msg: String) => - throw new HandlerException(sc, msg, null), - () => {}, - () => { ScopeReuseManager.freeRunner(runner); }, - Some(cometExecutable)); - cometlatencies.register(((profiler.time-t1)/1000).toInt); - } - def connect(socket: StreamingSocket, req: HttpServletRequest) { -// println("Connect on: "+socket); - val runner = ScopeReuseManager.getRunner; - val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner); - ec.attributes("cometOperation") = "connect"; - ec.attributes("cometId") = socket.id; - ec.attributes("cometSocket") = socket; - net.appjet.oui.execution.execute( - ec, - (sc: Int, msg: String) => - throw new HandlerException(sc, msg, null), - () => {}, - () => { ScopeReuseManager.freeRunner(runner); }, - Some(cometExecutable)); - } - def disconnect(socket: StreamingSocket, req: HttpServletRequest) { - val toRun = new Runnable { - def run() { - val runner = ScopeReuseManager.getRunner; - val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner); - ec.attributes("cometOperation") = "disconnect"; - ec.attributes("cometId") = socket.id; - ec.attributes("cometSocket") = socket; - net.appjet.oui.execution.execute( - ec, - (sc: Int, msg: String) => - throw new HandlerException(sc, msg, null), - () => {}, - () => { ScopeReuseManager.freeRunner(runner); }, - Some(cometExecutable)); - } - } - main.server.getThreadPool().dispatch(toRun); - } - } - def apply(id: String, create: Boolean) = { - if (create) { - Some(sockets.getOrElseUpdate(id, new StreamingSocket(id, handler))); - } else { - if (id == null) - error("bad id: "+id); - sockets.get(id); - } - } - class HandlerException(val sc: Int, val msg: String, val cause: Exception) - extends RuntimeException("An error occurred while handling a request: "+sc+" - "+msg, cause); -} - -// And this would be the javascript interface. Whee. -object Comet extends CometSupport.CometHandler { - def init() { - CometSupport.cometHandler = this; - context.start(); - } - - val acceptableTransports = { - val t = new ArrayBuffer[String]; - if (! config.disableShortPolling) { - t += "shortpolling"; - } - if (config.transportUseWildcardSubdomains) { - t += "longpolling"; - } - t += "streaming"; - t.mkString("['", "', '", "']"); - } - - - val servlet = new StreamingSocketServlet(); - val holder = new ServletHolder(servlet); - val context = new Context(null, "/", Context.NO_SESSIONS | Context.NO_SECURITY); - context.addServlet(holder, "/*"); - context.setMaxFormContentSize(1024*1024); - - def handleCometRequest(req: HttpServletRequest, res: HttpServletResponse) { - context.handle(req.getRequestURI().substring(config.transportPrefix.length), req, res, Handler.FORWARD); - } - - lazy val ccLib = new FixedDiskResource(new JarOrNotFile(config.ajstdlibHome, "streaming-client.js") { - override val classBase = "/net/appjet/ajstdlib/"; - override val fileSep = "/../../net.appjet.ajstdlib/"; - }); - def clientCode(contextPath: String, acceptableChannelTypes: String) = { - ccLib.contents.replaceAll("%contextPath%", contextPath).replaceAll("\"%acceptableChannelTypes%\"", acceptableChannelTypes).replaceAll("\"%canUseSubdomains%\"", if (config.transportUseWildcardSubdomains) "true" else "false"); - } - def clientMTime = ccLib.fileLastModified; - - lazy val ccFrame = new FixedDiskResource(new JarOrNotFile(config.ajstdlibHome, "streaming-iframe.html") { - override val classBase = "/net/appjet/ajstdlib/"; - override val fileSep = "/../../net.appjet.ajstdlib/"; - }); - def frameCode = { - if (! config.devMode) - ccFrame.contents.replace("<head>\n<script>", """<head> - <script> - window.onerror = function() { /* silently drop errors */ } - </script> - <script>"""); - else - ccFrame.contents; - } - - - // public - def connections(ec: ExecutionContext): Scriptable = { - JSContext.getCurrentContext().newArray(ec.runner.globalScope, SocketManager.sockets.keys.toList.toArray[Object]); - } - - // public - def connectionStatus = { - val m = new HashMap[String, Int]; - for (socket <- SocketManager.sockets.values) { - val key = socket.currentChannel.map(_.kind.toString()).getOrElse("(unconnected)"); - m(key) = m.getOrElse(key, 0) + 1; - } - m; - } - - // public - def getNumCurrentConnections = SocketManager.sockets.size; - - // public - def write(id: String, msg: String) { - SocketManager.sockets.get(id).foreach(_.sendMessage(false, msg)); - } - - // public - def isConnected(id: String): java.lang.Boolean = { - SocketManager.sockets.contains(id); - } - - // public - def getTransportType(id: String): String = { - SocketManager.sockets.get(id).map(_.currentChannel.map(_.kind.toString()).getOrElse("none")).getOrElse("none"); - } - - // public - def disconnect(id: String) { - SocketManager.sockets.get(id).foreach(x => x.close()); - } - - // public - def setAttribute(ec: ExecutionContext, id: String, key: String, value: String) { - ec.attributes.get("cometSocket").map(x => Some(x.asInstanceOf[StreamingSocket])).getOrElse(SocketManager.sockets.get(id)) - .foreach(_.attributes(key) = value); - } - // public - def getAttribute(ec: ExecutionContext, id: String, key: String): String = { - ec.attributes.get("cometSocket").map(x => Some(x.asInstanceOf[StreamingSocket])).getOrElse(SocketManager.sockets.get(id)) - .map(_.attributes.getOrElse(key, null)).getOrElse(null); - } - - // public - def getClientCode(ec: ExecutionContext) = { - clientCode(config.transportPrefix, acceptableTransports); - } - def getClientMTime(ec: ExecutionContext) = clientMTime; -} - -class StreamingSocket(val id: String, handler: SocketConnectionHandler) { - var hasConnected = false; - var shutdown = false; - var killed = false; - var currentChannel: Option[Channel] = None; - val activeChannels = new HashMap[ChannelType.Value, Channel] - with SynchronizedMap[ChannelType.Value, Channel]; - - lazy val attributes = new HashMap[String, String] with SynchronizedMap[String, String]; - - def channel(typ: String, create: Boolean, subType: String): Option[Channel] = { - val channelType = ChannelType.valueOf(typ); - if (channelType.isEmpty) { - streaminglog(Map( - "type" -> "error", - "error" -> "unknown channel type", - "channelType" -> channelType)); - None; - } else if (create) { - Some(activeChannels.getOrElseUpdate(channelType.get, Channels.createNew(channelType.get, this, subType))); - } else { - activeChannels.get(channelType.get); - } - } - - val outgoingMessageQueue = new Queue[SocketMessage]; - val unconfirmedMessages = new HashMap[Int, SocketMessage]; - - var lastSentSeqNumber = 0; - var lastConfirmedSeqNumber = 0; - - // external API - def sendMessage(isControl: boolean, body: String) { - if (hasConnected && ! shutdown) { - synchronized { - lastSentSeqNumber += 1; - val msg = new SocketMessage(lastSentSeqNumber, isControl, body); - outgoingMessageQueue += msg; - unconfirmedMessages(msg.seq) = msg; - } - currentChannel.foreach(_.messageWaiting()); - } - } - def close() { - synchronized { - sendMessage(true, "kill"); - shutdown = true; - Channels.timer.schedule(new TimerTask { - def run() { - kill("server request, timeout"); - } - }, 15000); - } - } - - var creatingRequest: Option[HttpServletRequest] = None; - // internal API - def kill(reason: String) { - synchronized { - if (! killed) { - streaminglog(Map( - "type" -> "event", - "event" -> "connection-killed", - "connection" -> id, - "reason" -> reason)); - killed = true; - SocketManager.sockets -= id; - activeChannels.foreach(_._2.close()); - currentChannel = None; - if (hasConnected) { - handler.disconnect(this, creatingRequest.getOrElse(null)); - } - } - } - } - def receiveMessage(body: String, req: HttpServletRequest) { -// println("Message received on "+id+": "+body); - handler.message(this, body, req); - } - def getWaitingMessage(channel: Channel): Option[SocketMessage] = { - synchronized { - if (currentChannel.isDefined && currentChannel.get == channel && - ! outgoingMessageQueue.isEmpty) { - Some(outgoingMessageQueue.dequeue); - } else { - None; - } - } - } - def getUnconfirmedMessages(channel: Channel): Collection[SocketMessage] = { - synchronized { - if (currentChannel.isDefined && currentChannel.get == channel) { - for (i <- lastConfirmedSeqNumber+1 until lastSentSeqNumber+1) - yield unconfirmedMessages(i); - } else { - List[SocketMessage](); - } - } - } - def updateConfirmedSeqNumber(channel: Channel, received: Int) { - synchronized { - if (received > lastConfirmedSeqNumber && (channel == null || (currentChannel.isDefined && channel == currentChannel.get))) { - val oldConfirmed = lastConfirmedSeqNumber; - lastConfirmedSeqNumber = received; - for (i <- oldConfirmed+1 until lastConfirmedSeqNumber+1) { // inclusive! - unconfirmedMessages -= i; - } - } - } - } - - var lastChannelUpdate = 0; - def useChannel(seqNo: Int, channelType0: String, req: HttpServletRequest) = synchronized { - if (seqNo <= lastChannelUpdate) false else { - lastChannelUpdate = seqNo; - val channelType = ChannelType.valueOf(channelType0); - if (channelType.isDefined) { - val channel = activeChannels.get(channelType.get); - if (channel.isDefined) { - if (! hasConnected) { - hasConnected = true; - creatingRequest = Some(HttpServletRequestFactory.createRequest(req)); - handler.connect(this, req); - } - currentChannel = channel; -// println("switching "+id+" to channel: "+channelType0); - if (currentChannel.get.isConnected) { - revive(channel.get); - } else { - hiccup(channel.get); - } - currentChannel.get.messageWaiting(); - true; - } else - false; - } else - false; - } - } -// def handleReceivedMessage(seq: Int, data: String) { -// synchronized { -// handler.message(this, data) -// // TODO(jd): add client->server sequence numbers. -// // if (seq == lastReceivedSeqNumber+1){ -// // lastReceivedSeqNumber = seq; -// // handler.message(this, data); -// // } else { -// // // handle error. -// // } -// } -// } - def hiccup(channel: Channel) = synchronized { - if (currentChannel.isDefined && channel == currentChannel.get) { -// println("hiccuping: "+id); - scheduleTimeout(); - } - } - def revive(channel: Channel) = synchronized { - if (currentChannel.isDefined && channel == currentChannel.get) { -// println("reviving: "+id); - cancelTimeout(); - } - } - def prepareForReconnect() = synchronized { -// println("client-side hiccup: "+id); - activeChannels.foreach(_._2.close()); - activeChannels.clear(); - currentChannel = None; - scheduleTimeout(); - } - - // helpers - var timeoutTask: TimerTask = null; - - def scheduleTimeout() { - if (timeoutTask != null) return; - val p = new WeakReference(this); - timeoutTask = new TimerTask { - def run() { - val socket = p.get(); - if (socket != null) { - socket.kill("timeout"); - } - } - } - Channels.timer.schedule(timeoutTask, 15*1000); - } - def cancelTimeout() { - if (timeoutTask != null) - timeoutTask.cancel(); - timeoutTask = null; - } - scheduleTimeout(); - - streaminglog(Map( - "type" -> "event", - "event" -> "connection-created", - "connection" -> id)); -} - -object ChannelType extends Enumeration("shortpolling", "longpolling", "streaming") { - val ShortPolling, LongPolling, Streaming = Value; -} - -object Channels { - def createNew(typ: ChannelType.Value, socket: StreamingSocket, subType: String): Channel = { - typ match { - case ChannelType.ShortPolling => new ShortPollingChannel(socket); - case ChannelType.LongPolling => new LongPollingChannel(socket); - case ChannelType.Streaming => { - subType match { - case "iframe" => new StreamingChannel(socket) with IFrameChannel; - case "opera" => new StreamingChannel(socket) with OperaChannel; - case _ => new StreamingChannel(socket); - } - } - } - } - - val timer = new Timer(true); -} - -class SocketMessage(val seq: Int, val isControl: Boolean, val body: String) { - def payload = seq+":"+(if (isControl) "1" else "0")+":"+body; -} - -trait Channel { - def messageWaiting(); - def close(); - def handle(req: HttpServletRequest, res: HttpServletResponse); - def isConnected: Boolean; - - def kind: ChannelType.Value; - def sendRestartFailure(ec: ExecutionContext); -} - -trait XhrChannel extends Channel { - def wrapBody(msg: String) = msg.length+":"+msg; - - // wire format: msgLength:seq:[01]:msg - def wireFormat(msg: SocketMessage) = wrapBody(msg.payload); - def controlMessage(data: String) = wrapBody("oob:"+data); - - def sendRestartFailure(ec: ExecutionContext) { - ec.response.write(controlMessage("restart-fail")); - } -} - -// trait IFrameChannel extends Channel { -// def wireFormat(msg: SocketMessage) -// } - -class ShortPollingChannel(val socket: StreamingSocket) extends Channel with XhrChannel { - def kind = ChannelType.ShortPolling; - - def messageWaiting() { - // do nothing. - } - def close() { - // do nothing - } - def isConnected = false; - - def handle(req: HttpServletRequest, res: HttpServletResponse) { - val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; - val out = new StringBuilder(); - socket.synchronized { - socket.revive(this); - if (req.getParameter("new") == "yes") { - out.append(controlMessage("ok")); - } else { - val lastReceivedSeq = java.lang.Integer.parseInt(req.getParameter("seq")); - socket.updateConfirmedSeqNumber(this, lastReceivedSeq); - for (msg <- socket.getUnconfirmedMessages(this)) { - out.append(wireFormat(msg)); - } - // ALL MESSAGES ARE UNCONFIRMED AT THIS POINT! JUST CLEAR QUEUE. - var msg = socket.getWaitingMessage(this); - while (msg.isDefined) { - msg = socket.getWaitingMessage(this); - } - } - } -// println("Writing to "+socket.id+": "+out.toString); - ec.response.write(out.toString); - socket.synchronized { - socket.hiccup(this); - } - } -} - -trait IFrameChannel extends StreamingChannel { - override def wrapBody(msgBody: String) = { - val txt = "<script type=\"text/javascript\">p('"+ - msgBody.replace("\\","\\\\").replace("'", "\\'")+"');</script>"; - if (txt.length < 256) - String.format("%256s", txt); - else - txt; - } - - def header(req: HttpServletRequest) = { - val document_domain = - "\""+req.getHeader("Host").split("\\.").slice(2).mkString(".").split(":")(0)+"\""; - """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" -"http://www.w3.org/TR/html4/strict.dtd"> -<html><head><title>f</title></head><body id="thebody" onload="(!parent.closed)&&d()"><script type="text/javascript">document.domain = """+document_domain+"""; -var p = function(data) { try { parent.comet.pass_data } catch (err) { /* failed to pass data. no recourse. */ } }; -var d = parent.comet.disconnect;"""+(if(!config.devMode)"\nwindow.onerror = function() { /* silently drop errors */ }\n" else "")+"</script>"; // " - damn textmate mode! - } - - override def sendRestartFailure(ec: ExecutionContext) { - ec.response.write(header(ec.request.req)); - ec.response.write(controlMessage("restart-fail")); - } - - override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { - super.handleNewConnection(req, res, out); - res.setContentType("text/html"); - out.append(header(req)); - } -} - -trait OperaChannel extends StreamingChannel { - override def wrapBody(msgBody: String) = { - "Event: message\ndata: "+msgBody+"\n\n"; - } - override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { - super.handleNewConnection(req, res, out); - res.setContentType("application/x-dom-event-stream"); - } -} - -class StreamingChannel(val socket: StreamingSocket) extends Channel with XhrChannel { - def kind = ChannelType.Streaming; - - var c: Option[SelectChannelConnector.RetryContinuation] = None; - var doClose = false; - - def messageWaiting() { - main.server.getThreadPool().dispatch(new Runnable() { - def run() { - socket.synchronized { - c.filter(_.isPending()).foreach(_.resume()); - } - } - }); - } - - def setSequenceNumberIfAppropriate(req: HttpServletRequest) { - if (c.get.isNew) { - val lastReceivedSeq = java.lang.Integer.parseInt(req.getParameter("seq")); - socket.updateConfirmedSeqNumber(this, lastReceivedSeq); - } - } - - def sendHandshake(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { - out.append(controlMessage("ok")); - } - - def sendUnconfirmedMessages(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { - for (msg <- socket.getUnconfirmedMessages(this)) { - out.append(wireFormat(msg)); - } - } - - def sendWaitingMessages(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { - var msg = socket.getWaitingMessage(this); - while (msg.isDefined) { - out.append(wireFormat(msg.get)); - msg = socket.getWaitingMessage(this); - } - } - - def handleUnexpectedDisconnect(req: HttpServletRequest, res: HttpServletResponse, ep: KnowsAboutDispatch) { - socket.synchronized { - socket.hiccup(this); - } - ep.close(); - } - - def writeAndFlush(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder, ep: KnowsAboutDispatch) { -// println("Writing to "+socket.id+": "+out.toString); - res.getWriter.print(out.toString); - res.getWriter.flush(); - } - - def suspendIfNecessary(req: HttpServletRequest, res: HttpServletResponse, - out: StringBuilder, ep: KnowsAboutDispatch) { - scheduleKeepalive(50*1000); - ep.undispatch(); - c.get.suspend(0); - } - - def sendKeepaliveIfNecessary(out: StringBuilder, sendKeepalive: Boolean) { - if (out.length == 0 && sendKeepalive) { - out.append(controlMessage("keepalive")); - } - } - - def shouldHandshake(req: HttpServletRequest, res: HttpServletResponse) = c.get.isNew; - - var sendKeepalive = false; - var keepaliveTask: TimerTask = null; - def scheduleKeepalive(timeout: Int) { - if (keepaliveTask != null) { - keepaliveTask.cancel(); - } - val p = new WeakReference(this); - keepaliveTask = new TimerTask { - def run() { - val channel = p.get(); - if (channel != null) { - channel.synchronized { - channel.sendKeepalive = true; - channel.messageWaiting(); - } - } - } - } - Channels.timer.schedule(keepaliveTask, timeout); - } - - def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { - req.setAttribute("StreamingSocketServlet_channel", this); - res.setHeader("Connection", "close"); - for ((k, v) <- Util.noCacheHeaders) { res.setHeader(k, v); } // maybe this will help with proxies? - res.setContentType("text/messages; charset=utf-8"); - } - - def handle(req: HttpServletRequest, res: HttpServletResponse) { - val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; - val ep = HttpConnection.getCurrentConnection.getEndPoint.asInstanceOf[KnowsAboutDispatch]; - val out = new StringBuilder; - try { - socket.synchronized { - val sendKeepaliveNow = sendKeepalive; - sendKeepalive = false; - if (keepaliveTask != null) { - keepaliveTask.cancel(); - keepaliveTask = null; - } - c = Some(ContinuationSupport.getContinuation(req, socket).asInstanceOf[SelectChannelConnector.RetryContinuation]); - setSequenceNumberIfAppropriate(req); - if (doClose) { - ep.close(); - return; - } - if (c.get.isNew) { - handleNewConnection(req, res, out); - } else { - c.get.suspend(-1); - if (ep.isDispatched) { - handleUnexpectedDisconnect(req, res, ep); - return; - } - } - if (shouldHandshake(req, res)) { -// println("new stream request: "+socket.id); - sendHandshake(req, res, out); - sendUnconfirmedMessages(req, res, out); - } - sendWaitingMessages(req, res, out); - sendKeepaliveIfNecessary(out, sendKeepaliveNow); - suspendIfNecessary(req, res, out, ep); - } - } finally { - writeAndFlush(req, res, out, ep); - } - } - - def close() { - doClose = true; - messageWaiting(); - } - - def isConnected = ! doClose; -} - -class LongPollingChannel(socket: StreamingSocket) extends StreamingChannel(socket) { -// println("creating longpoll!"); - override def kind = ChannelType.LongPolling; - - override def shouldHandshake(req: HttpServletRequest, res: HttpServletResponse) = - req.getParameter("new") == "yes"; - - override def sendHandshake(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { -// println("sending handshake"); - out.append(controlMessage("ok")); - } - - override def suspendIfNecessary(req: HttpServletRequest, res: HttpServletResponse, - out: StringBuilder, ep: KnowsAboutDispatch) { - if (out.length == 0) { -// println("suspending longpoll: "+socket.id); - val to = java.lang.Integer.parseInt(req.getParameter("timeout")); -// println("LongPoll scheduling keepalive for: "+to); - scheduleKeepalive(to); - ep.undispatch(); - c.get.suspend(0); - } - } - - override def writeAndFlush(req: HttpServletRequest, res: HttpServletResponse, - out: StringBuilder, ep: KnowsAboutDispatch) { - if (out.length > 0) { -// println("Writing to "+socket.id+": "+out.toString); -// println("writing and flushing longpoll") - val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; - for ((k, v) <- Util.noCacheHeaders) { ec.response.setHeader(k, v); } // maybe this will help with proxies? -// println("writing: "+out); - ec.response.write(out.toString); - socket.synchronized { - socket.hiccup(this); - c = None; - } - } - } - - override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { - socket.revive(this); - req.setAttribute("StreamingSocketServlet_channel", this); - } - - override def isConnected = socket.synchronized { - c.isDefined; - } -} - -class StreamingSocketServlet extends HttpServlet { - val version = 2; - - override def doGet(req: HttpServletRequest, res: HttpServletResponse) { -// describeRequest(req); - val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; - try { - if (req.getPathInfo() == "/js/client.js") { - val contextPath = config.transportPrefix; - val acceptableTransports = Comet.acceptableTransports; - ec.response.setContentType("application/x-javascript"); - ec.response.write(Comet.clientCode(contextPath, acceptableTransports)); - } else if (req.getPathInfo() == "/xhrXdFrame") { - ec.response.setContentType("text/html; charset=utf-8"); - ec.response.write(Comet.frameCode); - } else { - val v = req.getParameter("v"); - if (v == null || java.lang.Integer.parseInt(v) != version) { - res.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad version number!"); - return; - } - val existingChannel = req.getAttribute("StreamingSocketServlet_channel"); - if (existingChannel != null) { - existingChannel.asInstanceOf[Channel].handle(req, res); - } else { - val socketId = req.getParameter("id"); - val channelType = req.getParameter("channel"); - val isNew = req.getParameter("new") == "yes"; - val shouldCreateSocket = req.getParameter("create") == "yes"; - val subType = req.getParameter("type"); - val channel = SocketManager(socketId, shouldCreateSocket).map(_.channel(channelType, isNew, subType)).getOrElse(None); - if (channel.isDefined) { - channel.get.handle(req, res); - } else { - streaminglog(Map( - "type" -> "event", - "event" -> "restart-failure", - "connection" -> socketId)); - val failureChannel = ChannelType.valueOf(channelType).map(Channels.createNew(_, null, subType)); - if (failureChannel.isDefined) { - failureChannel.get.sendRestartFailure(ec); - } else { - ec.response.setStatusCode(HttpServletResponse.SC_NOT_FOUND); - ec.response.write("So such socket, and/or unknown channel type: "+channelType); - } - } - } - } - } catch { - case e: RetryRequest => throw e; - case t: Throwable => { - exceptionlog("A comet error occurred: "); - exceptionlog(t); - ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - ec.response.write(t.getMessage()); - } - } - } - - def describeRequest(req: HttpServletRequest) { - println(req.getMethod+" on "+req.getRequestURI()+"?"+req.getQueryString()); - for (pname <- - req.getParameterNames.asInstanceOf[java.util.Enumeration[String]]) { - println(" "+pname+" -> "+req.getParameterValues(pname).mkString("[", ",", "]")); - } - } - - override def doPost(req: HttpServletRequest, res: HttpServletResponse) { - val v = req.getParameter("v"); - if (v == null || java.lang.Integer.parseInt(v) != version) { - res.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad version number!"); - return; - } - val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; - val socketId = req.getParameter("id"); - val socket = SocketManager(socketId, false); - -// describeRequest(req); - - if (socket.isEmpty) { - ec.response.write("restart-fail"); - streaminglog(Map( - "type" -> "event", - "event" -> "restart-failure", - "connection" -> socketId)); -// println("socket restart-fail: "+socketId); - } else { - val seq = java.lang.Integer.parseInt(req.getParameter("seq")); - socket.get.updateConfirmedSeqNumber(null, seq); - val messages = req.getParameterValues("m"); - val controlMessages = req.getParameterValues("oob"); - try { - if (messages != null) - for (msg <- messages) socket.get.receiveMessage(msg, req); - if (controlMessages != null) - for (msg <- controlMessages) { -// println("Control message from "+socket.get.id+": "+msg); - msg match { - case "hiccup" => { - streaminglog(Map( - "type" -> "event", - "event" -> "hiccup", - "connection" -> socketId)); - socket.get.prepareForReconnect(); - } - case _ => { - if (msg.startsWith("useChannel")) { - val msgParts = msg.split(":"); - socket.get.useChannel(java.lang.Integer.parseInt(msgParts(1)), msgParts(2), req); - } else if (msg.startsWith("kill")) { - socket.get.kill("client request: "+msg.substring(Math.min(msg.length, "kill:".length))); - } else { - streaminglog(Map( - "type" -> "error", - "error" -> "unknown control message", - "connection" -> socketId, - "message" -> msg)); - } - } - } - } - ec.response.write("ok"); - } catch { - case e: SocketManager.HandlerException => { - exceptionlog(e); - ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - ec.response.write(e.getMessage()); - // log these? - } - case t: Throwable => { - // shouldn't happen... - exceptionlog(t); - ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - ec.response.write(t.getMessage()); - } - } - } - } -} diff --git a/trunk/infrastructure/net.appjet.ajstdlib/timer.scala b/trunk/infrastructure/net.appjet.ajstdlib/timer.scala deleted file mode 100644 index dac8fb6..0000000 --- a/trunk/infrastructure/net.appjet.ajstdlib/timer.scala +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Copyright 2009 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS-IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.appjet.ajstdlib; - -import scala.collection.mutable.{HashMap,ListBuffer}; -import java.util.concurrent.locks.ReentrantLock; - -object timer { - - var _timings = new HashMap[String,ListBuffer[double]]; - var _lock = new ReentrantLock; - var _callstack = new ThreadLocal[ListBuffer[String]]; - - def start(opname: String) = { - var _localcallstack = _callstack.get(); - if (_localcallstack == null) { - _callstack.set(new ListBuffer[String]); - _localcallstack = _callstack.get(); - } - _localcallstack += opname; - var _oplabel = _localcallstack.mkString("."); - val startTime: long = System.nanoTime(); - - new { - def done() { - val elapsedTimeMs: double = (System.nanoTime() - startTime) / 1.0e6; - - _lock.lock(); - try { - var times = _timings.getOrElse(_oplabel, new ListBuffer[double]); - /* - if (times.size > 100000) { - times = new ListBuffer[double]; - }*/ - times += elapsedTimeMs; - _timings.put(_oplabel, times); - _localcallstack.remove(_localcallstack.length-1); - } finally { - _lock.unlock(); - } - } - } - } - - def getOpNames(): Array[String] = { - _lock.lock(); - try { - return _timings.keys.toList.toArray; - } finally { - _lock.unlock(); - } - } - - def getStats(opname: String): Array[double] = { - _lock.lock(); - - try { - var times:ListBuffer[double] = _timings(opname); - var total = times.foldRight(0.0)(_ + _); - return Array(times.size, total, (total / times.size)); - } finally { - _lock.unlock(); - } - } - - def reset() { - _lock.lock(); - _timings = new HashMap[String,ListBuffer[double]]; - _lock.unlock(); - } -} |