diff options
author | Egil Moeller <egil.moller@freecode.no> | 2010-03-21 23:33:06 +0100 |
---|---|---|
committer | Egil Moeller <egil.moller@freecode.no> | 2010-03-21 23:33:06 +0100 |
commit | d56b9b3b82cdebcaeb00eec0fcb4326ad21adaa8 (patch) | |
tree | b1dfe31956f3fc86e3408f1efac5e12acf65b11a /infrastructure/net.appjet.ajstdlib | |
parent | c1894c8e0a52f4e3d2f89fa92f0066bbf0fcf1b1 (diff) | |
parent | 103d4926ae6c61824dc0b48be7bf66f08830ed47 (diff) | |
download | etherpad-d56b9b3b82cdebcaeb00eec0fcb4326ad21adaa8.tar.gz etherpad-d56b9b3b82cdebcaeb00eec0fcb4326ad21adaa8.tar.xz etherpad-d56b9b3b82cdebcaeb00eec0fcb4326ad21adaa8.zip |
Merge branch 'master' of git@github.com:ether/pad
Diffstat (limited to 'infrastructure/net.appjet.ajstdlib')
-rw-r--r-- | infrastructure/net.appjet.ajstdlib/ajstdlib.scala | 253 | ||||
-rw-r--r-- | infrastructure/net.appjet.ajstdlib/sqlbase.scala | 563 | ||||
-rw-r--r-- | infrastructure/net.appjet.ajstdlib/streaming-client.js | 920 | ||||
-rw-r--r-- | infrastructure/net.appjet.ajstdlib/streaming-iframe.html | 76 | ||||
-rw-r--r-- | infrastructure/net.appjet.ajstdlib/streaming.scala | 892 | ||||
-rw-r--r-- | infrastructure/net.appjet.ajstdlib/timer.scala | 85 |
6 files changed, 2789 insertions, 0 deletions
diff --git a/infrastructure/net.appjet.ajstdlib/ajstdlib.scala b/infrastructure/net.appjet.ajstdlib/ajstdlib.scala new file mode 100644 index 0000000..8d285af --- /dev/null +++ b/infrastructure/net.appjet.ajstdlib/ajstdlib.scala @@ -0,0 +1,253 @@ +/** + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS-IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.appjet.ajstdlib; + +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.{ScheduledThreadPoolExecutor, Callable}; +import scala.collection.mutable.{HashMap, SynchronizedMap}; + +import org.mozilla.javascript.{Context, ScriptableObject, Function, RhinoException, Scriptable}; + +import net.appjet.oui.{SpecialJarOrNotFile, DiskLibrary, FixedDiskLibrary, VariableDiskLibrary, ExecutionContext, ExecutionContextUtils, ScopeReuseManager, config, exceptionlog}; +import net.appjet.bodylock.{BodyLock, ExecutionException}; +import net.appjet.common.util.LenientFormatter; + +import org.mortbay.jetty.nio.SelectChannelConnector; +import org.mortbay.util.ajax.ContinuationSupport; + +object ajstdlib { + def runModuleInNewScope(cx: ExecutionContext, moduleName: String): Any = { + val newScope = BodyLock.subScope(cx.runner.globalScope); + if (! libraryExists(moduleName)) + return Context.getUndefinedValue(); // unfortunately, returning "false" doesn't really do the right thing here. + try { + libraryExecutable(moduleName).execute(newScope); + } catch { + case e: ExecutionException => throw e; + case e => throw new ExecutionException("Error occurred while running module: "+moduleName, e); + // TODO: There was code here to print errors to the response if something didn't compile. Replace this code? + } + newScope; + } + + private val modules = new HashMap[String, DiskLibrary] with SynchronizedMap[String, DiskLibrary]; + private def library(name: String) = modules.getOrElseUpdate(name+".js", new VariableDiskLibrary(name+".js")); + private def libraryExists(name: String) = { + val lib = library(name); + // ScopeReuseManager.watch(lib); + lib.exists; + } + private def libraryExecutable(name: String) = { + val lib = library(name); + // ScopeReuseManager.watch(lib); + lib.executable; + } + + val globalLock = new ReentrantLock(); + val attributes = new HashMap[String, Any] with SynchronizedMap[String, Any]; + + def init() { + // any other ajstdlib initialization goes here. + Comet.init(); + } +} + +object printf { + def printf(format: String, list: Array[Object]): String = { +// val list: Array[Object] = new Array[Object](argList.getLength) +// for (i <- List.range(0, list.length)) +// list(i) = argList.getElement(i).getOrElse(null) match { +// case AppVM.JSNumber(n) => n +// case AppVM.JSString(s) => s +// case AppVM.JSBoolean(b) => b +// case _ => null +// } + val args = list.map(x => Context.jsToJava(x, classOf[Object])); + try { + val fmt = new LenientFormatter() + fmt.format(format, args: _*) + fmt.toString() + } catch { + case e: java.util.IllegalFormatException => + throw new ExecutionException("String Format Error: <tt>printf</tt> error: "+e.getMessage(), e) + } + } +} + + +import java.security.MessageDigest; + +object md5 { + def md5(input: String): String = { + val bytes = input.getBytes("UTF-8"); + md5(bytes); + } + def md5(bytes: Array[byte]): String = { + var md = MessageDigest.getInstance("MD5"); + var digest = md.digest(bytes); + var builder = new StringBuilder(); + for (b <- digest) { + builder.append(Integer.toString((b >> 4) & 0xf, 16)); + builder.append(Integer.toString(b & 0xf, 16)); + } + builder.toString(); + } +} + +object execution { + def runAsync(ec: ExecutionContext, f: Function) { + ec.asyncs += f; + } + + def executeCodeInNewScope(parentScope: Scriptable, + code: String, name: String, + startLine: Int): Scriptable = { + val ec = ExecutionContextUtils.currentContext; + val executable = + try { + BodyLock.compileString(code, name, startLine); + } catch { + case e: RhinoException => + throw new ExecutionException( + "Failed to execute code in new scope.", e); + } + if (ec == null || ec.runner == null) { + Thread.dumpStack(); + } + val scope = BodyLock.subScope( + if (parentScope != null) parentScope + else ec.runner.mainScope); + scope.setParentScope(ec.runner.mainScope); + executable.execute(scope); + scope; + } + + def runTask(taskName: String, args: Array[Object]): AnyRef = { + val ec = net.appjet.oui.execution.runOutOfBand( + net.appjet.oui.execution.scheduledTaskExecutable, + "Task "+taskName, + Some(Map("taskName" -> taskName, + "taskArguments" -> args)), + { error => + error match { + case e: Throwable => exceptionlog(e); + case _ => exceptionlog(error.toString); + } + }); + ec.result; + } + + def runTaskSimply(taskName: String, args: Array[Object]): AnyRef = { + net.appjet.oui.execution.runOutOfBandSimply( + net.appjet.oui.execution.scheduledTaskExecutable, + Some(Map("taskName" -> taskName, + "taskArguments" -> args))); + } + + def wrapRunTask(taskName: String, args: Array[Object], + returnType: Class[_]): Function0[AnyRef] = { + new Function0[AnyRef] { + def apply = Context.jsToJava(runTaskSimply(taskName, args), returnType); + } + } + + val threadpools = new HashMap[String, ScheduledThreadPoolExecutor] + with SynchronizedMap[String, ScheduledThreadPoolExecutor]; + + def createNamedTaskThreadPool(name: String, poolSize: Int) { + threadpools.put(name, new ScheduledThreadPoolExecutor(poolSize)); + } + + class TaskRunner(val taskName: String, args: Array[Object]) extends Callable[AnyRef] { + def call(): AnyRef = { + runTask(taskName, args); + } + } + + def scheduleTaskInPool(poolName: String, taskName: String, delayMillis: Long, args: Array[Object]) = { + val pool = threadpools.getOrElse(poolName, throw new RuntimeException("No such task threadpool: "+poolName)); + pool.schedule(new TaskRunner(taskName, args), delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS); + } + + def shutdownAndWaitOnTaskThreadPool(poolName: String, timeoutMillis: Long) = { + val pool = threadpools.getOrElse(poolName, throw new RuntimeException("No such task threadpool: "+poolName)); + pool.shutdown(); + pool.awaitTermination(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS); + } + + def getContinuation(ec: ExecutionContext) = { + val req = ec.request.req; + ContinuationSupport.getContinuation(req, req).asInstanceOf[SelectChannelConnector.RetryContinuation]; + } + + def sync[T](obj: AnyRef)(block: => T): T = { + obj.synchronized { + block; + } + } +} + +import javax.mail._; +import javax.mail.internet._; +import java.util.Properties; + +object email { + def sendEmail(toAddr: Array[String], fromAddr: String, subject: String, headers: Scriptable, content: String): String = { + try { + val badAddresses = for (a <- toAddr if (a.indexOf("@") == -1)) yield a; + if (badAddresses.length > 0) { + "The email address"+(if (badAddresses.length > 1) "es" else "")+" "+ + badAddresses.mkString("\"", "\", \"", "\"")+" do"+(if (badAddresses.length == 1) "es" else "")+ + " not appear to be valid."; + } else { + val debug = false; + + val props = new Properties; + props.put("mail.smtp.host", config.smtpServerHost); + props.put("mail.smtp.port", config.smtpServerPort.toString()); + if (config.smtpUser != "") + props.put("mail.smtp.auth", "true"); + + val session = Session.getInstance(props, if (config.smtpUser != "") new Authenticator { + override def getPasswordAuthentication() = + new PasswordAuthentication(config.smtpUser, config.smtpPass); + } else null); + session.setDebug(debug); + + val msg = new MimeMessage(session); + val fromIAddr = new InternetAddress(fromAddr); + msg.setFrom(fromIAddr); + val toIAddr: Array[Address] = toAddr.map(x => (new InternetAddress(x))); // new InternetAddress(toAddr); + msg.setRecipients(Message.RecipientType.TO, toIAddr); + + if (headers != null) + for (o <- headers.getIds() if o.isInstanceOf[String]) { + val k = o.asInstanceOf[String] + msg.addHeader(k, headers.get(k, headers).asInstanceOf[String]); + } + + msg.setSubject(subject); + msg.setContent(content, "text/plain"); + Transport.send(msg); + ""; + } + } catch { + case e: MessagingException => { exceptionlog(e); e.printStackTrace() ; "Messaging exception: "+e.getMessage+"."; } + case e: Exception => { exceptionlog(e); e.printStackTrace(); "Unknown error."; } + } + } +} diff --git a/infrastructure/net.appjet.ajstdlib/sqlbase.scala b/infrastructure/net.appjet.ajstdlib/sqlbase.scala new file mode 100644 index 0000000..047c086 --- /dev/null +++ b/infrastructure/net.appjet.ajstdlib/sqlbase.scala @@ -0,0 +1,563 @@ +/** + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS-IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.appjet.ajstdlib; + +import scala.collection.mutable.ArrayBuffer; + +import java.sql.{DriverManager, SQLException, Statement}; +import net.appjet.oui.{profiler, config, NoninheritedDynamicVariable}; +import com.mchange.v2.c3p0._; + +class SQLBase(driverClass: String, url: String, userName: String, password: String) { + + def isMysql:Boolean = (url.startsWith("jdbc:mysql:")); + def isDerby:Boolean = (url.startsWith("jdbc:derby:")); + + if (isDerby) { + System.setProperty("derby.system.home", config.derbyHome); + val f = new java.io.File(config.derbyHome); + if (! f.exists) { + if (! f.mkdirs()) + throw new RuntimeException("Couldn't create internal database storage directory: "+config.derbyHome); + } + if (! f.isDirectory) + throw new RuntimeException("Internal database storage directory is not a directory: "+config.derbyHome); + if (! f.canWrite) + throw new RuntimeException("Can't write to internal database storage directory: "+config.derbyHome); + } + + val cpds = new ComboPooledDataSource(); + cpds.setDriverClass(driverClass); + cpds.setJdbcUrl(url+(if (isMysql) "?useUnicode=true&characterEncoding=utf8" else "")); + + // derby does not require a password + if (!isDerby) { + cpds.setUser(userName); + cpds.setPassword(password); + } + + cpds.setMaxPoolSize(config.jdbcPoolSize); + cpds.setMaxConnectionAge(6*60*60); // 6 hours + if (config.devMode) { + cpds.setAutomaticTestTable("cpds_testtable"); + cpds.setTestConnectionOnCheckout(true); + } + +// { +// // register db driver +// try { +// new JDCConnectionDriver(driverClass, url+"?useUnicode=true&characterEncoding=utf8", userName, password); +// } catch { +// case e => { +// e.printStackTrace(); +// Runtime.getRuntime.halt(1); +// } +// } +// } + + private def getConnectionFromPool = { + val c = cpds.getConnection(); + c.setAutoCommit(true); + c; + } + + // Creates a dynamic variable whose .value depends on the innermost + // .withValue(){} on the call-stack. + private val currentConnection = new NoninheritedDynamicVariable[Option[java.sql.Connection]](None); + + def withConnection[A](block: java.sql.Connection=>A): A = { + currentConnection.value match { + case Some(c) => { + block(c); + } + case None => { + val t1 = profiler.time; + val c = getConnectionFromPool; + profiler.recordCumulative("getConnection", profiler.time-t1); + try { + currentConnection.withValue(Some(c)) { + block(c); + } + } finally { + c.close; + } + } + } + } + + private val currentlyInTransaction = new NoninheritedDynamicVariable(false); + + def inTransaction[A](block: java.sql.Connection=>A): A = { + withConnection(c => { + if (currentlyInTransaction.value) { + return block(c); + } else { + currentlyInTransaction.withValue(true) { + c.setAutoCommit(false); + c.setTransactionIsolation(java.sql.Connection.TRANSACTION_REPEATABLE_READ); + + try { + val result = block(c); + c.commit(); + c.setAutoCommit(true); + result; + } catch { + case e@net.appjet.oui.AppGeneratedStopException => { + c.commit(); + c.setAutoCommit(true); + throw e; + } + case (e:org.mozilla.javascript.WrappedException) if (e.getWrappedException == + net.appjet.oui.AppGeneratedStopException) => { + c.commit(); + c.setAutoCommit(true); + throw e; + } + case e => { + //println("inTransaction() caught error:"); + //e.printStackTrace(); + try { + c.rollback(); + c.setAutoCommit(true); + } catch { + case ex => { + println("Could not rollback transaction because: "+ex.toString()); + } + } + throw e; + } + } + } + } + }); + } + + def closing[A](closable: java.sql.Statement)(block: =>A): A = { + try { block } finally { closable.close(); } + } + + def closing[A](closable: java.sql.ResultSet)(block: =>A): A = { + try { block } finally { closable.close(); } + } + + def tableName(t: String) = id(t); + + val identifierQuoteString = withConnection(_.getMetaData.getIdentifierQuoteString); + def quoteIdentifier(s: String) = identifierQuoteString+s+identifierQuoteString; + private def id(s: String) = quoteIdentifier(s); + + def longTextType = if (isDerby) "CLOB" else "MEDIUMTEXT"; + + // derby seems to do things intelligently w.r.t. case-sensitivity and unicode support. + def createTableOptions = if (isMysql) " ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE utf8_bin" else ""; + + // creates table if it doesn't exist already + def createJSONTable(table: String) { + withConnection { c=> + val s = c.createStatement; + if (! doesTableExist(c, table)) { + closing(s) { + s.execute("CREATE TABLE "+tableName(table)+" ("+ + id("ID")+" VARCHAR(128) PRIMARY KEY NOT NULL, "+ + id("JSON")+" "+longTextType+" NOT NULL"+ + ")"+createTableOptions); + } + } + } + } + + // requires: table exists + // returns null if key doesn't exist + def getJSON(table: String, key: String): String = { + withConnection { c=> + val s = c.prepareStatement("SELECT "+id("JSON")+" FROM "+tableName(table)+" WHERE "+id("ID")+" = ?"); + closing(s) { + s.setString(1, key); + var resultSet = s.executeQuery(); + closing(resultSet) { + if (! resultSet.next()) { + null; + } + else { + resultSet.getString(1); + } + } + } + } + } + + def getAllJSON(table: String, start: Int, count: Int): Array[Object] = { + withConnection { c => + val s = c.prepareStatement("SELECT "+id("ID")+","+id("JSON")+" FROM "+tableName(table)+ + " ORDER BY "+id("ID")+" DESC"+ + " LIMIT ? OFFSET ?"); + closing(s) { + s.setInt(2, start); + s.setInt(1, count); + var resultSet = s.executeQuery(); + var output = new ArrayBuffer[Object]; + closing(resultSet) { + while (resultSet.next()) { + output += new { val id = resultSet.getString(1); val value = resultSet.getString(2) }; + } + output.toArray; + } + } + } + } + + def getAllJSONKeys(table: String): Array[String] = { + withConnection { c => + val s = c.prepareStatement("SELECT "+id("ID")+" FROM "+tableName(table)); + closing(s) { + var resultSet = s.executeQuery(); + var output = new ArrayBuffer[String]; + closing(resultSet) { + while (resultSet.next()) { + output += resultSet.getString(1); + } + output.toArray; + } + } + } + } + + // requires: table exists + // inserts key if it doesn't exist + def putJSON(table: String, key: String, json: String) { + withConnection { c=> + val update = c.prepareStatement("UPDATE "+tableName(table)+" SET "+id("JSON")+"=? WHERE "+id("ID")+"=?"); + closing(update) { + update.setString(1, json); + update.setString(2, key); + update.executeUpdate(); + if (update.getUpdateCount == 0) { + val insert = c.prepareStatement( + "INSERT INTO "+tableName(table)+" ("+id("ID")+", "+id("JSON")+") values (?,?)"); + closing(insert) { + insert.setString(1, key); + insert.setString(2, json); + insert.executeUpdate(); + } + } + } + } + } + + def deleteJSON(table: String, key: String) { + // requires: table exists + withConnection { c=> + val update = c.prepareStatement("DELETE FROM "+tableName(table)+" WHERE "+id("ID")+"=?"); + closing(update) { + update.setString(1, key); + update.executeUpdate(); + } + } + } + + private def metaName(table: String) = table+"_META"; + private def metaTableName(table: String) = tableName(metaName(table)); + private def textTableName(table: String) = tableName(table+"_TEXT"); + private def escapeSearchString(dbm: java.sql.DatabaseMetaData, s: String): String = { + val e = dbm.getSearchStringEscape(); + s.replace("_", e+"_").replace("%", e+"%"); + } + + private final val PAGE_SIZE = 20; + + def doesTableExist(connection: java.sql.Connection, table: String): Boolean = { + val databaseMetadata = connection.getMetaData; + val tables = databaseMetadata.getTables(null, null, + escapeSearchString(databaseMetadata, table), null); + closing(tables) { + tables.next(); + } + } + + def autoIncrementClause = if (isDerby) "GENERATED BY DEFAULT AS IDENTITY" else "AUTO_INCREMENT"; + + // creates table if it doesn't exist already + def createStringArrayTable(table: String) { + withConnection { c=> + if (! doesTableExist(c, metaName(table))) { // check to see if the *_META table exists + // create tables and indices + val s = c.createStatement; + closing(s) { + s.execute("CREATE TABLE "+metaTableName(table)+" ("+ + id("ID")+" VARCHAR(128) PRIMARY KEY NOT NULL, "+ + id("NUMID")+" INT UNIQUE "+autoIncrementClause+" "+ + ")"+createTableOptions); + val defaultOffsets = (1 to PAGE_SIZE).map(x=>"").mkString(","); + s.execute("CREATE TABLE "+textTableName(table)+" ("+ + ""+id("NUMID")+" INT, "+id("PAGESTART")+" INT, "+id("OFFSETS")+" VARCHAR(256) NOT NULL DEFAULT '"+defaultOffsets+ + "', "+id("DATA")+" "+longTextType+" NOT NULL"+ + ")"+createTableOptions); + s.execute("CREATE INDEX "+id(table+"-NUMID-PAGESTART")+" ON "+textTableName(table)+"("+id("NUMID")+", "+id("PAGESTART")+")"); + } + } + } + } + + // requires: table exists + // returns: null if key or (key,index) doesn't exist, else the value + def getStringArrayElement(table: String, key: String, index: Int): String = { + val (pageStart, offset) = getPageStartAndOffset(index); + val page = new StringArrayPage(table, key, pageStart, true); + page.data(offset); + } + + // requires: table exists + // returns: an array of the mappings present in the page that should hold the + // particular (key,index) mapping. the array may be empty or otherwise not + // contain the given (key,index). + def getPageStringArrayElements(table: String, key: String, index: Int): Array[IndexValueMapping] = { + val (pageStart, offset) = getPageStartAndOffset(index); + val page = new StringArrayPage(table, key, pageStart, true); + val buf = new scala.collection.mutable.ListBuffer[IndexValueMapping]; + + for(i <- 0 until page.data.length) { + val s = page.data(i); + if (s ne null) { + val n = pageStart + i; + buf += IndexValueMapping(n, s); + } + } + + buf.toArray; + } + + // requires: table exists + // creates key if doesn't exist + // value may be null + def putStringArrayElement(table: String, key: String, index: Int, value: String) { + val (pageStart, offset) = getPageStartAndOffset(index); + val page = new StringArrayPage(table, key, pageStart, false); + page.data(offset) = value; + page.updateDB(); + } + + def putMultipleStringArrayElements(table: String, key: String): Multiputter = new Multiputter { + var currentPage = None:Option[StringArrayPage]; + def flushPage() { + if (currentPage.isDefined) { + val page = currentPage.get; + page.updateDB(); + currentPage = None; + } + } + def finish() { + flushPage(); + } + def put(index: Int, value: String) { + try { + val (pageStart, offset) = getPageStartAndOffset(index); + if (currentPage.isEmpty || currentPage.get.pageStart != pageStart) { + flushPage(); + currentPage = Some(new StringArrayPage(table, key, pageStart, false)); + } + currentPage.get.data(offset) = value; + } + catch { + case e => { e.printStackTrace; throw e } + } + } + } + + trait Multiputter { + def put(index: Int, value: String); + def finish(); + } + + case class IndexValueMapping(index: Int, value: String); + + def clearStringArray(table: String, key: String) { + withConnection { c=> + val numid = getStringArrayNumId(c, table, key, false); + if (numid >= 0) { + { + val s = c.prepareStatement("DELETE FROM "+textTableName(table)+" WHERE "+id("NUMID")+"=?"); + closing(s) { + s.setInt(1, numid); + s.executeUpdate(); + } + } + { + val s = c.prepareStatement("DELETE FROM "+metaTableName(table)+" WHERE "+id("NUMID")+"=?"); + closing(s) { + s.setInt(1, numid); + s.executeUpdate(); + } + } + } + } + } + + private def getPageStartAndOffset(index: Int): (Int,Int) = { + val pageStart = (index / PAGE_SIZE) * PAGE_SIZE; + (pageStart, index - pageStart); + } + + // requires: table exists + // returns: numid of new string array + private def newStringArray(c: java.sql.Connection, table: String, key: String): Int = { + val s = c.prepareStatement("INSERT INTO "+metaTableName(table)+" ("+id("ID")+") VALUES (?)", + Statement.RETURN_GENERATED_KEYS); + closing(s) { + s.setString(1, key); + s.executeUpdate(); + val resultSet = s.getGeneratedKeys; + if (resultSet == null) + error("No generated numid for insert"); + closing(resultSet) { + if (! resultSet.next()) error("No generated numid for insert"); + resultSet.getInt(1); + } + } + } + + def getStringArrayNumId(c: java.sql.Connection, table: String, key: String, creating: Boolean): Int = { + val s = c.prepareStatement("SELECT "+id("NUMID")+" FROM "+metaTableName(table)+" WHERE "+id("ID")+"=?"); + closing(s) { + s.setString(1, key); + val resultSet = s.executeQuery(); + closing(resultSet) { + if (! resultSet.next()) { + if (creating) { + newStringArray(c, table, key); + } + else { + -1 + } + } + else { + resultSet.getInt(1); + } + } + } + } + + def getStringArrayAllKeys(table: String): Array[String] = { + withConnection { c=> + val s = c.prepareStatement("SELECT "+id("ID")+" FROM "+metaTableName(table)); + closing(s) { + val resultSet = s.executeQuery(); + closing(resultSet) { + val buf = new ArrayBuffer[String]; + while (resultSet.next()) { + buf += resultSet.getString(1); + } + buf.toArray; + } + } + } + } + + private class StringArrayPage(table: String, key: String, val pageStart: Int, readonly: Boolean) { + + val data = new Array[String](PAGE_SIZE); + + private val numid = withConnection { c=> + val nid = getStringArrayNumId(c, table, key, ! readonly); + + if (nid >= 0) { + val s = c.prepareStatement( + "SELECT "+id("OFFSETS")+","+id("DATA")+" FROM "+textTableName(table)+" WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?"); + closing(s) { + s.setInt(1, nid); + s.setInt(2, pageStart); + val resultSet = s.executeQuery(); + closing(resultSet) { + if (! resultSet.next()) { + if (! readonly) { + val insert = c.prepareStatement("INSERT INTO "+textTableName(table)+ + " ("+id("NUMID")+", "+id("PAGESTART")+", "+id("DATA")+") VALUES (?,?,'')"); + closing(insert) { + insert.setInt(1, nid); + insert.setInt(2, pageStart); + insert.executeUpdate(); + } + } + } + else { + val offsetsField = resultSet.getString(1); + val dataField = resultSet.getString(2); + val offsetStrings = offsetsField.split(",", -1); + var i = 0; + var idx = 0; + while (i < PAGE_SIZE) { + val nstr = offsetStrings(i); + if (nstr != "") { + val n = nstr.toInt; + data(i) = dataField.substring(idx, idx+n); + idx += n; + } + i += 1; + } + } + } + } + } + nid; + } + + def updateDB() { + if (readonly) { + error("this is a readonly StringArrayPage"); + } + // assert: the relevant row of the TEXT table exists + if (data.find(_ ne null).isEmpty) { + withConnection { c=> + val update = c.prepareStatement("DELETE FROM "+textTableName(table)+ + " WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?"); + closing(update) { + update.setInt(1, numid); + update.setInt(2, pageStart); + update.executeUpdate(); + } + } + } + else { + val offsetsStr = data.map(s => if (s eq null) "" else s.length.toString).mkString(","); + val dataStr = data.map(s => if (s eq null) "" else s).mkString(""); + withConnection { c=> + val s = c.prepareStatement("UPDATE "+textTableName(table)+ + " SET "+id("OFFSETS")+"=?, "+id("DATA")+"=? WHERE "+id("NUMID")+"=? AND "+id("PAGESTART")+"=?"); + closing(s) { + s.setString(1, offsetsStr); + s.setString(2, dataStr); + s.setInt(3, numid); + s.setInt(4, pageStart); + s.executeUpdate(); + } + } + } + } + } + + def close { + if (isDerby) { + cpds.close(); + try { + DriverManager.getConnection("jdbc:derby:;shutdown=true"); + } catch { + case e: SQLException => if (e.getErrorCode() != 50000) throw e + } + } + } +} + + diff --git a/infrastructure/net.appjet.ajstdlib/streaming-client.js b/infrastructure/net.appjet.ajstdlib/streaming-client.js new file mode 100644 index 0000000..3bfa227 --- /dev/null +++ b/infrastructure/net.appjet.ajstdlib/streaming-client.js @@ -0,0 +1,920 @@ +/** + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS-IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +var host = window.location.host; + +function WebSocket(id) { + var self = this; + var socket = this; + var version = 2; + + var timeouts = {}; + + this.onopen = function() { } + this.onclosed = function() { } + this.onmessage = function() { } + this.onhiccup = function() { } + this.onlogmessage = function() { } + this.CONNECTING = 0; + this.OPEN = 1; + this.CLOSED = 2; + this.readyState = -1; + + var hiccupsLastMinute = 0; + var hiccupResetInterval = setInterval(function() { + hiccupsLastMinute = 0; + if (self.readyState == self.CLOSED) + clearInterval(hiccupResetInterval); + }, 60*1000); + + var isHiccuping = false; + function hiccup(channel) { + if (channel != officialChannel && channel != self) return; + if (isHiccuping) return; + log("hiccup: "+channel.name); + if (hiccupsLastMinute++ > 10) { + doDisconnect({reconnect: true, reason: "Too many hiccups!"}); + return; + } + closeAllChannels(); + timeout(timeouts, "hiccup", 15000, function() { + isHiccuping = false; + doDisconnect({reconnect: false, reason: "Couldn't contact server to hiccup."}); + }); + isHiccuping = true; + function tryHiccup() { + if (! isHiccuping) return; + self.onhiccup({connected: false}); + log("trying hiccup"); + timeout(timeouts, "singleHiccup", 5000, function() { + tryHiccup(); + }); + simpleXhr('post', postPath(), true, [{key: "oob", value: "hiccup"}], function(sc, msg) { + if (! isHiccuping) return; + if (msg.substring(0, "restart-fail".length) == "restart-fail") { + doDisconnect({reconnect: true, reason: "Server restarted or socket timed out on server."}); + } else if (sc != 200 || msg.substring(0, 2) != "ok") { + log("Failed to hiccup with error: "+sc+" / "+msg); + setTimeout(tryHiccup, 500); + } else { + isHiccuping = false; + timeouts.singleHiccup(); + timeouts.hiccup(); + doConnect(); + } + }); + } + tryHiccup(); + } + function closeAllChannels() { + for (var i in activeChannels) { + if (activeChannels.hasOwnProperty(i)) { + activeChannels[i].disconnect(); + } + } + officialChannel = undefined; + } + + function doDisconnect(obj, silent, sync) { + log("disconnected: "+obj.reason+" / "+(obj.data !== undefined ? "data: "+obj.data : "")); + logAll(); + closeAllChannels(); + if (longPollingIFrame && longPollingIFrame.div) { + longPollingIFrame.div.innerHTML = ""; + } + if (self.readyState != self.CLOSED) { + self.readyState = self.CLOSED; + if (! silent) { + postSingleMessageNow(true, "kill:"+obj.reason, sync, true); + } + self.onclosed(obj); + } + } + + this.disconnect = function(sync) { + doDisconnect({reason: "Closed by client."}, false, sync); + } + + + function doBasicConnect() { + var type = getBasicChannel(); + log("basic connect on type: "+type); + var channel = activeChannels[type] = new channelConstructors[type](); + channel.connect(); + } + + function doOtherConnect() { + var channels = getOtherChannels(); + var channel; var type; + for (var i = 0; i < channels.length; ++i) { + type = channels[i]; + log("other connect on type: "+type); + channel = activeChannels[type] = new channelConstructors[type](); + channel.connect(); + } + } + function doConnect() { + log("doing connect!"); + timeout(timeouts, "connect", 15000, function() { + doDisconnect({reconnect: false, reason: "Timeout connecting to server: no channel type was able to connect."}); + }); + doBasicConnect(); + } + + this.connect = function() { + log("socket connecting: "+id); + doConnect(); + } + + // util + function nicetime() { return Math.floor((new Date()).valueOf() / 100) % 10000000; } + function log(s) { self.onlogmessage("(comet @t: "+nicetime()+") "+s); } + function logAll() { + log(self.describe()) + } + this.describe = function() { + function describeChannels() { + out = []; + for (var i in activeChannels) { + if (activeChannels.hasOwnProperty(i)) { + out.push(i+": "+activeChannels[i].describe()); + } + } + return "[ "+out.join(", ")+" ]"; + } + return ("socket state: { id: "+id+", readyState: "+self.readyState+", isHiccuping: "+isHiccuping+", timeouts: "+describeTimeouts(timeouts)+", officialChannel: "+(officialChannel?officialChannel.name:"none")+", channels: "+describeChannels()+", isPosting: "+isPosting+", lastReceivedSeqNumber: "+lastReceivedSeqNumber+", lastPost: "+lastPost+", postTimeouts: "+describeTimeouts(postTimeouts)+", channelSeq: "+channelSeq+" }"); + } + + function wrapMethod(obj, method) { + return function() { + var arr = []; + for (var i=0; i < arguments.length; i++) { + arr.push(arguments[i]); + } + method.apply(obj, arr); + } + } + var _wm = wrapMethod; + + // cb should take statusCode, responseText, and optionally request + function simpleXhr(method, uri, async, params, cb, makeXhr) { +// log("making simple Xhr: "+[method, uri, async, params].join(", ")); + var request = (makeXhr || newRequestObject)(); + request.open(method, uri, async); + if (async) { + request.onreadystatechange = function() { + if (request.readyState != 4) return; + var status; + var responseText; + try { + status = request.status; + responseText = request.responseText; + } catch (e) { /* absorb ff error accessing request properties */ } + cb(status, responseText, request); + } + } + var data = null; + if (params) { + data = []; + for (var i = 0; i < params.length; ++i) { + data.push(encodeURIComponent(params[i].key)+"="+encodeURIComponent(params[i].value)); + } + data = data.join("&"); + request.setRequestHeader("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); + } + try { + request.send(data); + } catch (e) { request.abort(); cb(500, "Error sending data!", request); } + if (! async) { + var status; + var responseText; + try { + status = request.status; + responseText = request.responseText; + } catch (e) { /* absorb ff error accessing request properties */ } + cb(status, responseText, request); + } + return request; + } + + var timeout_noop = function() { } + function timeout(timeoutObject, timeoutName, millis, timeoutCallback) { + function clearIt(timeoutObject, timeoutName) { + if (timeoutObject[timeoutName]) { + timeoutObject[timeoutName](); + timeoutObject[timeoutName] = timeout_noop; + } + } + var timeoutId = setTimeout(function() { clearIt(timeoutObject, timeoutName); timeoutCallback(); }, millis); + var f = function() { + clearTimeout(timeoutId); + } + clearIt(timeoutObject, timeoutName); + timeoutObject[timeoutName] = f; + return f; + } + + // handling messages + var lastReceivedSeqNumber = 0; + + function dataHandler(msg) { + if (msg.seqNumber > lastReceivedSeqNumber+1) { + log("bad sequence number. expecting: "+(lastReceivedSeqNumber+1)+", got: "+msg.seqNumber); + hiccup(self); + return false; + } + if (msg.seqNumber < lastReceivedSeqNumber+1) return true; + lastReceivedSeqNumber = msg.seqNumber; + if (! msg.isControl) { + self.onmessage({ data: msg.content }); + return true; + } else { + if (msg.content == "kill") { + doDisconnect({reconnect: false, reason: "Killed by server."}); + return false; + } + } + } + + // client-server comm + var postPath = function() { + return "%contextPath%/post?r="+randomVar()+"&v="+version+"&id="+id+"&seq="+lastReceivedSeqNumber; + } + + function SimpleQueue() { + var base = []; + var head = 0; + var tail = 0; + this.offer = function(data) { + base[tail++] = data; + } + this.poll = function() { + if (this.length() > 0) { + var n = base[head]; + delete base[head++]; + return n; + } + } + this.clear = function() { + head = 0; + tail = 0; + var oldBase = base; + base = []; + return oldBase; + } + this.length = function() { + return tail - head; + } + } + var outgoingMessageQueue = new SimpleQueue(); + var isPosting = false; + var postTimeouts = {}; + var lastPost; + + function postSingleMessageNow(isControl, data, sync, force, cb) { + doPostMessages([{oob: isControl, data: data, cb: cb}], sync, force) + } + + function doPostMessages(messages, sync, force, cb) { + if (! force && self.readyState == self.CLOSED) return; + if (messages.length == 0) { + if (cb) cb(); + return; + } + var data = []; + var callbacks = []; + for (var i = 0; i < messages.length; ++i) { + data.push({key: (messages[i].oob ? "oob" : "m"), + value: messages[i].data}); + if (messages[i].cb) + callbacks.push(messages[i].cb); + } + function postFailed(sc, msg, req, src) { + var str = ""; + try { + str = sc + ": "+req.statusText+" - "+msg+" ("+src+")"; + } catch (e) { /* absorb potential Firefox error accessing req */ } + doDisconnect({reconnect: true, reason: "Posting message failed.", data: str}); + for (var i = 0; i < callbacks.length; ++i) { + callbacks[i](false, str); + } + } + function postCallback(sc, msg, request) { + postTimeouts.post(); + if (sc != 200 || msg.substring(0, 2) != "ok") { + postFailed(sc, msg, request, "1"); + } else { + for (var i = 0; i < callbacks.length; ++i) { + callbacks[i](true); + } + if (cb) cb(); + } + } + timeout(postTimeouts, "post", 15000, function() { + doDisconnect({reconnect: true, reason: "Posting message timed out."}); + }); + simpleXhr('post', postPath(), ! sync, data, postCallback); + } + + function postPendingMessages() { + if (isPosting == true) + return; + var messages = outgoingMessageQueue.clear(); + if (messages.length == 0) { + return; + } + isPosting = true; + doPostMessages(messages, false, false, function() { isPosting = false; setTimeout(postPendingMessages, 0); }); + lastPost = nicetime(); + } + this.postMessage = function(data, cb) { + if (self.readyState != self.OPEN) { + return; + } + outgoingMessageQueue.offer({data: data, cb: cb}); + setTimeout(function() { postPendingMessages() }, 0); + } + + // transports + function getValidChannels() { + var channels = []; + for (var i = 0; i < validChannels.length; ++i) { + var type = validChannels[i]; + if (window.location.hash.length > 0) { + if (window.location.hash != "#"+type) { + continue; + } + } + if ($ && $.browser.opera && type != 'shortpolling' && type != 'streaming') { + continue; + } + channels.push(type); + } + return channels; + } + function getBasicChannel() { + return getValidChannels()[0]; + } + + function getOtherChannels() { + return getValidChannels().slice(1); + } + + var officialChannel; + this.getTransportType = function() { + return (officialChannel ? officialChannel.name : "none"); + } + var validChannels = "%acceptableChannelTypes%"; + var canUseSubdomains = "%canUseSubdomains%"; + var activeChannels = {}; + var channelConstructors = { + shortpolling: ShortPollingChannel, + longpolling: LongPollingChannel, + streaming: StreamingChannel + } + + function describeTimeouts(timeouts) { + var out = []; + for (var i in timeouts) { + if (timeouts.hasOwnProperty(i)) { + out.push(i+": "+(timeouts[i] == timeout_noop ? "unset" : "set")); + } + } + return "{ "+out.join(", ")+" }"; + } + + var channelSeq = 1; + function notifyConnect(channel) { + timeouts.connect(); + if (! officialChannel || channel.weight > officialChannel.weight) { + log("switching to use channel: "+channel.name); + var oldChannel = officialChannel; + officialChannel = channel; + setTimeout(function() { + postSingleMessageNow(true, "useChannel:"+(channelSeq++)+":"+channel.name, false, false, function(success, msg) { + if (success) { + if (oldChannel) { + oldChannel.disconnect(); + } else { + // there was no old channel, so try connecting the other channels. + doOtherConnect(); + } + if (self.readyState != self.OPEN) { + self.readyState = self.OPEN; + self.onopen({}); + } else { + self.onhiccup({connected: true}); + } + } else { + doDisconnect({reconnect: true, reason: "Failed to select channel on server.", data: msg}); + } + }); + }, 0); + return true; + } else { + return false; + } + } + + function randomVar() { + return String(Math.round(Math.random()*1e12)); + } + + function channelPath() { + return "%contextPath%/channel?v="+version+"&r="+randomVar()+"&id="+id; + } + + function newRequestObject() { + var xmlhttp=false; + try { + xmlhttp = (window.ActiveXObject && new ActiveXObject("Msxml2.XMLHTTP")) + } catch (e) { + try { + xmlhttp = (window.ActiveXObject && new ActiveXObject("Microsoft.XMLHTTP")); + } catch (E) { + xmlhttp = false; + } + } + if (!xmlhttp && typeof XMLHttpRequest!='undefined') { + try { + xmlhttp = new XMLHttpRequest(); + } catch (e) { + xmlhttp=false; + } + } + if (!xmlhttp && window.createRequest) { + try { + xmlhttp = window.createRequest(); + } catch (e) { + xmlhttp=false; + } + } + return xmlhttp + } + + function DataFormatError(message) { + this.message = message; + } + + function readMessage(data, startIndex) { + if (! startIndex) startIndex = 0; + var sep = data.indexOf(":", startIndex); + if (sep < 0) return; // don't have all the bytes for this yet. + var chars = Number(data.substring(startIndex, sep)); + if (isNaN(chars)) + throw new DataFormatError("Bad length: "+data.substring(startIndex, sep)); + if (data.length < sep+1+chars) return; // don't have all the bytes for this yet. + var msg = data.substr(sep+1, chars); + return { message: msg, lastConsumedChar: sep+1+chars } + } + + function iframeReader(data, startIndex) { + if (startIndex == 0) + return { message: data, lastConsumedChar: data.length } + } + + function parseWireFormat(data, startIndex, reader) { + if (! startIndex) startIndex = 0; + var msgs = []; + var readThroughIndex = startIndex; + while (true) { + var msgObj = (reader || readMessage)(data, readThroughIndex) + if (! msgObj) break; + readThroughIndex = msgObj.lastConsumedChar; + var msg = msgObj.message; + var split = msg.split(":"); + if (split[0] == 'oob') { + msgs.push({oob: split.slice(1).join(":")}); + continue; + } + var seq = Number(split[0]); + if (isNaN(seq)) + throw new DataFormatError("Bad sequence number: "+split[0]); + var control = Number(split[1]); + if (isNaN(control)) + throw new DataFormatError("Bad control: "+split[1]); + var msgContent = split.slice(2).join(":"); + msgs.push({seqNumber: seq, isControl: (control == 1), content: msgContent}); + } + return { messages: msgs, lastConsumedChar: readThroughIndex } + } + + function handleMessages(data, cursor, channel, reader) { + try { + messages = parseWireFormat(data, cursor, reader); + } catch (e) { + if (e instanceof DataFormatError) { + log("Data format error: "+e.message); + hiccup(channel); + return; + } else { + log(e.toString()+" on line: "+e.lineNumber); + } + } + for (var i=0; i < messages.messages.length; i++) { + var oob = messages.messages[i].oob; + if (oob) { + if (oob == "restart-fail") { + doDisconnect({reconnect: true, reason: "Server restarted or socket timed out on server."}); + return; + } + } else { + if (! dataHandler(messages.messages[i])) + break; + } + } + return messages.lastConsumedChar; + } + + function ShortPollingChannel() { + this.weight = 0; + this.name = "shortpolling"; + + this.isConnected = false; + this.isClosed = false; + this.request; + this.clearRequest = function() { + if (this.request) { + this.request.abort(); + this.request = null; + } + } + this.timeouts = {}; + + this.describe = function() { + return "{ isConnected: "+this.isConnected+", isClosed: "+this.isClosed+", timeouts: "+describeTimeouts(this.timeouts)+", request: "+(this.request?"set":"not set")+" }" + } + + this.pollDataHandler = function(sc, response, request) { + if (request.readyState != 4) return; + if (this.timeouts.poll) this.timeouts.poll(); + var messages; + if (! this.isConnected) { + this.timeouts.connectAttempt(); + if (sc != 200) { + log(this.name+" connect failed: "+sc+" / "+response); + setTimeout(_wm(this, this.attemptConnect), 500); + return; + } + var msg = (response ? readMessage(response) : undefined); + if (msg && msg.message == "oob:ok") { + this.timeouts.initialConnect(); + this.isConnected = true; + log(this.name+" transport connected!"); + if (! notifyConnect(this)) { + // there are better options connected. + log(this.name+" transport not chosen for activation."); + this.disconnect(); + return; + } + this.doPoll(); + return; + } else { + log(this.name+" connect didn't get ok: "+sc+" / "+response); + setTimeout(_wm(this, this.attemptConnect), 500); + return; + } + } + var chars = handleMessages(request.responseText, 0, this); + if (sc != 200 || ((! chars) && this.emptyResponseBad)) { + hiccup(this); + } + setTimeout(_wm(this, this.doPoll), this.pollDelay); + this.clearRequest(); + } + + this.keepRetryingConnection = true; + this.cancelConnect = function() { + this.clearRequest(); + this.keepRetryingConnection = false; + } + this.cancelPoll = function() { + this.clearRequest(); + log("poll timed out."); + hiccup(this); + } + + this.doPoll = function() { + if (this.isClosed) return; + timeout(this.timeouts, "poll", this.pollTimeout, _wm(this, this.cancelPoll)); + this.request = + simpleXhr('GET', + channelPath()+"&channel="+this.name+"&seq="+lastReceivedSeqNumber+this.pollParams(), + true, undefined, _wm(this, this.pollDataHandler), this.xhrGenerator); + } + + this.pollParams = function() { + return ""; + } + this.pollTimeout = 5000; + this.pollDelay = 500; + + this.attemptConnect = function() { + if (! this.keepRetryingConnection) return; + log(this.name+" attempting connect"); + this.clearRequest(); + timeout(this.timeouts, "connectAttempt", 5000, _wm(this, this.attemptConnect)); + this.request = simpleXhr('GET', channelPath()+"&channel="+this.name+"&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber, + true, undefined, _wm(this, this.pollDataHandler), this.xhrGenerator); + } + this.connect = function() { + this.attemptConnect(); + timeout(this.timeouts, "initialConnect", 15000, _wm(this, this.cancelConnect)); + } + this.disconnect = function() { + log(this.name+" disconnected"); + this.isClosed = true; + this.clearRequest(); + } + } + + function StreamingChannel() { + this.weight = 2; + this.name = "streaming"; + var self = this; + + var isConnected = false; + var request; + function clearRequest() { + if (request) { + request.abort(); + request = null; + if (theStream) theStream = null; + if (ifrDiv) { + ifrDiv.innerHTML = ""; + ifrDiv = null; + } + } + } + var isClosed = false; + var timeouts = {}; + var cursor = 0; + + this.describe = function() { + return "{ isConnected: "+isConnected+", isClosed: "+isClosed+", timeouts: "+describeTimeouts(timeouts)+", request: "+(request?"set":"not set")+", cursor: "+cursor+" }"; + }; + + function connectOk() { + isConnected = true; + timeouts.initialConnect(); + if (! notifyConnect(self)) { + log("streaming transport not chosen for activation"); + self.disconnect(); + return; + } + } + + function streamDataHandler() { + if (timeouts.data) timeouts.data(); + if (isClosed) return; + try { + if (! request.responseText) return; + } catch (e) { return; } + if (! isConnected) { + var msg = readMessage(request.responseText, cursor); + if (! msg) return; + cursor = msg.lastReceivedSeqNumber; + if (msg.message == "oob:ok") { + connectOk(); + } else { + log("stream: incorrect channel connect message:"+msg.message); + self.disconnect(); + return; + } + } else { + cursor = handleMessages(request.responseText, cursor, self); + } + if (! request || request.readyState == 4) { + clearRequest(); + if (isConnected) { + log("stream connection unexpectedly closed."); + hiccup(self); + return; + } + } + timeout(timeouts, "data", 60*1000, function() { hiccup(self); }); + } + + function iframeDataHandler(data) { + if (isClosed) return; + if (! isConnected) { + if (data == "oob:ok") { + connectOk(); + } else { + log("iframe stream: unexpected data on connect - "+data); + } + } else { + handleMessages(data, 0, self, iframeReader); + } + } + + function cancelConnect() { + isClosed = true; + clearRequest(); + log("stream: failed to connect."); + } + + // IE Stuff. + var theStream; + var ifrDiv; + var iframeTestCount = 0; + function testIframe() { + var state; + try { + state = ifrDiv.firstChild.readyState; + } catch (e) { + hiccup(self); + return; + } + if (state == 'interactive' || iframeTestCount > 10) { + try { var tmp = ifrDiv.firstChild.contentWindow.document.getElementById("thebody") } + catch (e) { hiccup(self); } + } else { + iframeTestCount++; + setTimeout(testIframe, 500); + } + } + + this.connect = function() { + timeout(timeouts, "initialConnect", 15000, cancelConnect) + + if (canUseSubdomains) { + var streamurl = "//"+randomVar()+".comet."+host+channelPath()+"&channel=streaming&type=iframe&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber; + log("stream to: "+streamurl); + if ($ && $.browser.opera) { + // set up the opera stream; requires jquery because, why not? + ifrDiv = $('<div style="display: none;"></div>').get(0); + $('body').append(ifrDiv); + window.comet = { + pass_data: iframeDataHandler, + disconnect: function() { hiccup(self); } + } + $(ifrDiv).append($("<iframe src='"+streamurl+"'></iframe>")); + iframeTestCount = 0; + setTimeout(testIframe, 2000); + // if event-source supported disconnect notifications, fuck yeah we'd use it. +// theStream = $('<event-source>'); +// var streamurl = channelPath()+"&channel=streaming&type=opera&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber; +// theStream.get(0).addEventListener('message', function(event) { +// iframeDataHandler(event.data); +// }, false); +// theStream.attr('src', streamurl); + log("stream connect sent!"); + return; + } + try { // TODO: remove reference to both theStream and ifrDiv on unload! + theStream = (window.ActiveXObject && new ActiveXObject("htmlfile")); + if (theStream) { + theStream.open(); + theStream.write("<html><head><title>f<\/title><\/head><body>") + theStream.write("<s"+"cript>document.domain='"+document.domain+"';<\/s"+"cript>") + theStream.write("<\/body><\/html>") + theStream.close(); + ifrDiv = theStream.createElement("div") + theStream.appendChild(ifrDiv) + theStream.parentWindow.comet = { + pass_data: iframeDataHandler, + disconnect: function() { hiccup(self); } + } + ifrDiv.innerHTML = "<iframe src='"+streamurl+"'></iframe>"; + iframeTestCount = 0; + setTimeout(testIframe, 2000); + } + } catch (e) { + theStream = false + } + } else if ($ && $.browser.opera) { + // opera thinks it can do a normal stream, but it can't. + log("opera - not trying xhr"); + return; + } + // End IE Stuff. + if (! theStream) { + request = newRequestObject(); + request.open('get', channelPath()+"&channel=streaming&new=yes&create="+(socket.readyState == socket.OPEN ? "no" : "yes")+"&seq="+lastReceivedSeqNumber); + request.onreadystatechange = streamDataHandler; + try { + request.send(null); + } catch (e) { } + } + log("stream connect sent!"); + } + + this.disconnect = function() { + log("stream disconnected"); + isClosed = true; + clearRequest(); + } + log("new streamchannel"); + } + + // long-polling related stuff. + function iframePath(key) { + return "//"+key+".comet."+host+"%contextPath%/xhrXdFrame"; + } + + function createHiddenDiv() { + if (! document.getElementById('newcomethidden')) { + var d = document.createElement('div'); + d.setAttribute('id', 'newcomethidden'); + d.style.display = 'none'; + document.body.appendChild(d); + } + return document.getElementById('newcomethidden'); + } + + function ExtHostXHR(iframe) { + this.open = function(method, uri, async) { + this.method = method; + this.uri = uri; + this.async = async; + } + var headers = {}; + this.setRequestHeader = function(name, value) { + headers[name] = value; + } + this.send = function(data) { + var self = this; + this.xhr = iframe.iframe.contentWindow.doAction(this.method, this.uri, this.async, headers, data || null, function(status, response) { + self.readyState = 4; + self.status = status; + self.responseText = response; + self.onreadystatechange(); + }); + } + this.abort = function() { + if (this.xhr) + iframe.contentWindow.doAbort(this.xhr); + } + } + + function createRequestIframe(cb) { + var randomKey = randomVar(); + try { + var activeXControl = (window.ActiveXObject && new ActiveXObject("htmlfile")); + var htmlfileDiv; + if (activeXControl) { + activeXControl.open(); + activeXControl.write('<html><head><title>f</title></head><body>'); + activeXControl.write('<scr'+'ipt>document.domain=\''+document.domain+'\';</scr'+'ipt>'); + activeXControl.write('</body></html>'); + activeXControl.close(); + htmlfileDiv = activeXControl.createElement('div'); + activeXControl.appendChild(htmlfileDiv); + activeXControl.parentWindow["done_"+randomKey] = cb; + htmlfileDiv.innerHTML = "<iframe src='"+iframePath(randomKey)+"'></iframe>"; + return {iframe: htmlfileDiv.firstChild /* should be an iframe */, axc: activeXControl, div: htmlfileDiv}; + } + } catch (e) { + activeXControl = false; + } + log("Not using IE setup."); + var requestIframe = document.createElement('iframe'); + createHiddenDiv().appendChild(requestIframe); + window["done_"+randomKey] = function() { try { delete window["done_"+randomKey]; } catch (e) { }; cb(); } + requestIframe.src = iframePath(randomKey); + return {iframe: requestIframe}; + } + + function createIframeRequestObject() { + if (! longPollingIFrame) throw Error("WebSocket isn't properly set up!"); + return new ExtHostXHR(longPollingIFrame); + } + + var longPollingIFrame; + function LongPollingChannel() { + ShortPollingChannel.apply(this); // sets up other state. + this.weight = 1; + this.name = "longpolling"; + + this.pollDelay = 0; + this.pollTimeout = 15000; + this.pollParams = function() { + return "&timeout="+(this.pollTimeout-5000); + } + var connect = this.connect; + this.connect = function() { + if (! longPollingIFrame) { + longPollingIFrame = + createRequestIframe(_wm(this, connect)); // specifically *not* this.connect. we want the old one! + } else { + connect.apply(this); + } + } + this.xhrGenerator = createIframeRequestObject; + this.emptyResponseBad = true; + } +}
\ No newline at end of file diff --git a/infrastructure/net.appjet.ajstdlib/streaming-iframe.html b/infrastructure/net.appjet.ajstdlib/streaming-iframe.html new file mode 100644 index 0000000..3bdb5c4 --- /dev/null +++ b/infrastructure/net.appjet.ajstdlib/streaming-iframe.html @@ -0,0 +1,76 @@ +<html> +<head> +<script> +function createRequestObject() { + var xmlhttp=false; + /*@cc_on @*/ + /*@if (@_jscript_version >= 5) + try { + xmlhttp = new ActiveXObject("Msxml2.XMLHTTP"); + } catch (e) { + try { + xmlhttp = new ActiveXObject("Microsoft.XMLHTTP"); + } catch (E) { + xmlhttp = false; + } + } + @end @*/ + if (!xmlhttp && typeof XMLHttpRequest!='undefined') { + try { + xmlhttp = new XMLHttpRequest(); + } catch (e) { + xmlhttp=false; + } + } + if (!xmlhttp && window.createRequest) { + try { + xmlhttp = window.createRequest(); + } catch (e) { + xmlhttp=false; + } + } + return xmlhttp +} +var host = window.location.host; +var oldDomain = document.domain; +var newDomain = oldDomain.substring(oldDomain.indexOf(".", oldDomain.indexOf(".")+1)+1); + +function doAction(method, uri, async, headers, body, cb) { + try { + document.domain = oldDomain; + } catch (e) { } + var req = createRequestObject(); + req.open(method, '//'+host+uri, async); + for (var i in headers) { + req.setRequestHeader(i, headers[i]); + } + req.onreadystatechange = function() { + if (req.readyState == 4) { + try { + document.domain = newDomain; + cb(req.status, req.responseText); + } catch (e) { + // yikes. well, hopefully a timeout will notice this error. + } + } + } + req.send(body); +} +function doAbort(xhr) { + try { + document.domain = oldDomain; + } catch (e) { } + xhr.abort(); +} +document.domain = newDomain; +window.onload = function() { + var doneKey = 'done_'+oldDomain.split(".", 2)[0]; + setTimeout(function() { + window.parent[doneKey](); + }, 0); +} +</script> +</head> +<body> +</body> +</html> diff --git a/infrastructure/net.appjet.ajstdlib/streaming.scala b/infrastructure/net.appjet.ajstdlib/streaming.scala new file mode 100644 index 0000000..fbff137 --- /dev/null +++ b/infrastructure/net.appjet.ajstdlib/streaming.scala @@ -0,0 +1,892 @@ +/** + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS-IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.appjet.ajstdlib; + +import scala.collection.mutable.{Queue, HashMap, SynchronizedMap, ArrayBuffer}; +import javax.servlet.http.{HttpServletRequest, HttpServletResponse, HttpServlet}; +import org.mortbay.jetty.servlet.{ServletHolder, Context}; +import org.mortbay.jetty.{HttpConnection, Handler, RetryRequest}; +import org.mortbay.jetty.nio.SelectChannelConnector; +import org.mortbay.io.nio.SelectChannelEndPoint; +import org.mortbay.util.ajax.{ContinuationSupport, Continuation}; + +import java.util.{Timer, TimerTask}; +import java.lang.ref.WeakReference; + +import org.mozilla.javascript.{Context => JSContext, Scriptable}; + +import net.appjet.oui._; +import net.appjet.oui.Util.enumerationToRichEnumeration; +import net.appjet.common.util.HttpServletRequestFactory; + +trait SocketConnectionHandler { + def message(sender: StreamingSocket, data: String, req: HttpServletRequest); + def connect(socket: StreamingSocket, req: HttpServletRequest); + def disconnect(socket: StreamingSocket, req: HttpServletRequest); +} + +object SocketManager { + val sockets = new HashMap[String, StreamingSocket] with SynchronizedMap[String, StreamingSocket]; + val handler = new SocketConnectionHandler { + val cometLib = new FixedDiskLibrary(new SpecialJarOrNotFile(config.ajstdlibHome, "oncomet.js")); + def cometExecutable = cometLib.executable; + + def message(socket: StreamingSocket, data: String, req: HttpServletRequest) { + val t1 = profiler.time; +// println("Message from: "+socket.id+": "+data); + val runner = ScopeReuseManager.getRunner; + val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner); + ec.attributes("cometOperation") = "message"; + ec.attributes("cometId") = socket.id; + ec.attributes("cometData") = data; + ec.attributes("cometSocket") = socket; + net.appjet.oui.execution.execute( + ec, + (sc: Int, msg: String) => + throw new HandlerException(sc, msg, null), + () => {}, + () => { ScopeReuseManager.freeRunner(runner); }, + Some(cometExecutable)); + cometlatencies.register(((profiler.time-t1)/1000).toInt); + } + def connect(socket: StreamingSocket, req: HttpServletRequest) { +// println("Connect on: "+socket); + val runner = ScopeReuseManager.getRunner; + val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner); + ec.attributes("cometOperation") = "connect"; + ec.attributes("cometId") = socket.id; + ec.attributes("cometSocket") = socket; + net.appjet.oui.execution.execute( + ec, + (sc: Int, msg: String) => + throw new HandlerException(sc, msg, null), + () => {}, + () => { ScopeReuseManager.freeRunner(runner); }, + Some(cometExecutable)); + } + def disconnect(socket: StreamingSocket, req: HttpServletRequest) { + val toRun = new Runnable { + def run() { + val runner = ScopeReuseManager.getRunner; + val ec = ExecutionContext(new RequestWrapper(req), new ResponseWrapper(null), runner); + ec.attributes("cometOperation") = "disconnect"; + ec.attributes("cometId") = socket.id; + ec.attributes("cometSocket") = socket; + net.appjet.oui.execution.execute( + ec, + (sc: Int, msg: String) => + throw new HandlerException(sc, msg, null), + () => {}, + () => { ScopeReuseManager.freeRunner(runner); }, + Some(cometExecutable)); + } + } + main.server.getThreadPool().dispatch(toRun); + } + } + def apply(id: String, create: Boolean) = { + if (create) { + Some(sockets.getOrElseUpdate(id, new StreamingSocket(id, handler))); + } else { + if (id == null) + error("bad id: "+id); + sockets.get(id); + } + } + class HandlerException(val sc: Int, val msg: String, val cause: Exception) + extends RuntimeException("An error occurred while handling a request: "+sc+" - "+msg, cause); +} + +// And this would be the javascript interface. Whee. +object Comet extends CometSupport.CometHandler { + def init() { + CometSupport.cometHandler = this; + context.start(); + } + + val acceptableTransports = { + val t = new ArrayBuffer[String]; + if (! config.disableShortPolling) { + t += "shortpolling"; + } + if (config.transportUseWildcardSubdomains) { + t += "longpolling"; + } + t += "streaming"; + t.mkString("['", "', '", "']"); + } + + + val servlet = new StreamingSocketServlet(); + val holder = new ServletHolder(servlet); + val context = new Context(null, "/", Context.NO_SESSIONS | Context.NO_SECURITY); + context.addServlet(holder, "/*"); + context.setMaxFormContentSize(1024*1024); + + def handleCometRequest(req: HttpServletRequest, res: HttpServletResponse) { + context.handle(req.getRequestURI().substring(config.transportPrefix.length), req, res, Handler.FORWARD); + } + + lazy val ccLib = new FixedDiskResource(new JarOrNotFile(config.ajstdlibHome, "streaming-client.js") { + override val classBase = "/net/appjet/ajstdlib/"; + override val fileSep = "/../../net.appjet.ajstdlib/"; + }); + def clientCode(contextPath: String, acceptableChannelTypes: String) = { + ccLib.contents.replaceAll("%contextPath%", contextPath).replaceAll("\"%acceptableChannelTypes%\"", acceptableChannelTypes).replaceAll("\"%canUseSubdomains%\"", if (config.transportUseWildcardSubdomains) "true" else "false"); + } + def clientMTime = ccLib.fileLastModified; + + lazy val ccFrame = new FixedDiskResource(new JarOrNotFile(config.ajstdlibHome, "streaming-iframe.html") { + override val classBase = "/net/appjet/ajstdlib/"; + override val fileSep = "/../../net.appjet.ajstdlib/"; + }); + def frameCode = { + if (! config.devMode) + ccFrame.contents.replace("<head>\n<script>", """<head> + <script> + window.onerror = function() { /* silently drop errors */ } + </script> + <script>"""); + else + ccFrame.contents; + } + + + // public + def connections(ec: ExecutionContext): Scriptable = { + JSContext.getCurrentContext().newArray(ec.runner.globalScope, SocketManager.sockets.keys.toList.toArray[Object]); + } + + // public + def connectionStatus = { + val m = new HashMap[String, Int]; + for (socket <- SocketManager.sockets.values) { + val key = socket.currentChannel.map(_.kind.toString()).getOrElse("(unconnected)"); + m(key) = m.getOrElse(key, 0) + 1; + } + m; + } + + // public + def getNumCurrentConnections = SocketManager.sockets.size; + + // public + def write(id: String, msg: String) { + SocketManager.sockets.get(id).foreach(_.sendMessage(false, msg)); + } + + // public + def isConnected(id: String): java.lang.Boolean = { + SocketManager.sockets.contains(id); + } + + // public + def getTransportType(id: String): String = { + SocketManager.sockets.get(id).map(_.currentChannel.map(_.kind.toString()).getOrElse("none")).getOrElse("none"); + } + + // public + def disconnect(id: String) { + SocketManager.sockets.get(id).foreach(x => x.close()); + } + + // public + def setAttribute(ec: ExecutionContext, id: String, key: String, value: String) { + ec.attributes.get("cometSocket").map(x => Some(x.asInstanceOf[StreamingSocket])).getOrElse(SocketManager.sockets.get(id)) + .foreach(_.attributes(key) = value); + } + // public + def getAttribute(ec: ExecutionContext, id: String, key: String): String = { + ec.attributes.get("cometSocket").map(x => Some(x.asInstanceOf[StreamingSocket])).getOrElse(SocketManager.sockets.get(id)) + .map(_.attributes.getOrElse(key, null)).getOrElse(null); + } + + // public + def getClientCode(ec: ExecutionContext) = { + clientCode(config.transportPrefix, acceptableTransports); + } + def getClientMTime(ec: ExecutionContext) = clientMTime; +} + +class StreamingSocket(val id: String, handler: SocketConnectionHandler) { + var hasConnected = false; + var shutdown = false; + var killed = false; + var currentChannel: Option[Channel] = None; + val activeChannels = new HashMap[ChannelType.Value, Channel] + with SynchronizedMap[ChannelType.Value, Channel]; + + lazy val attributes = new HashMap[String, String] with SynchronizedMap[String, String]; + + def channel(typ: String, create: Boolean, subType: String): Option[Channel] = { + val channelType = ChannelType.valueOf(typ); + if (channelType.isEmpty) { + streaminglog(Map( + "type" -> "error", + "error" -> "unknown channel type", + "channelType" -> channelType)); + None; + } else if (create) { + Some(activeChannels.getOrElseUpdate(channelType.get, Channels.createNew(channelType.get, this, subType))); + } else { + activeChannels.get(channelType.get); + } + } + + val outgoingMessageQueue = new Queue[SocketMessage]; + val unconfirmedMessages = new HashMap[Int, SocketMessage]; + + var lastSentSeqNumber = 0; + var lastConfirmedSeqNumber = 0; + + // external API + def sendMessage(isControl: boolean, body: String) { + if (hasConnected && ! shutdown) { + synchronized { + lastSentSeqNumber += 1; + val msg = new SocketMessage(lastSentSeqNumber, isControl, body); + outgoingMessageQueue += msg; + unconfirmedMessages(msg.seq) = msg; + } + currentChannel.foreach(_.messageWaiting()); + } + } + def close() { + synchronized { + sendMessage(true, "kill"); + shutdown = true; + Channels.timer.schedule(new TimerTask { + def run() { + kill("server request, timeout"); + } + }, 15000); + } + } + + var creatingRequest: Option[HttpServletRequest] = None; + // internal API + def kill(reason: String) { + synchronized { + if (! killed) { + streaminglog(Map( + "type" -> "event", + "event" -> "connection-killed", + "connection" -> id, + "reason" -> reason)); + killed = true; + SocketManager.sockets -= id; + activeChannels.foreach(_._2.close()); + currentChannel = None; + if (hasConnected) { + handler.disconnect(this, creatingRequest.getOrElse(null)); + } + } + } + } + def receiveMessage(body: String, req: HttpServletRequest) { +// println("Message received on "+id+": "+body); + handler.message(this, body, req); + } + def getWaitingMessage(channel: Channel): Option[SocketMessage] = { + synchronized { + if (currentChannel.isDefined && currentChannel.get == channel && + ! outgoingMessageQueue.isEmpty) { + Some(outgoingMessageQueue.dequeue); + } else { + None; + } + } + } + def getUnconfirmedMessages(channel: Channel): Collection[SocketMessage] = { + synchronized { + if (currentChannel.isDefined && currentChannel.get == channel) { + for (i <- lastConfirmedSeqNumber+1 until lastSentSeqNumber+1) + yield unconfirmedMessages(i); + } else { + List[SocketMessage](); + } + } + } + def updateConfirmedSeqNumber(channel: Channel, received: Int) { + synchronized { + if (received > lastConfirmedSeqNumber && (channel == null || (currentChannel.isDefined && channel == currentChannel.get))) { + val oldConfirmed = lastConfirmedSeqNumber; + lastConfirmedSeqNumber = received; + for (i <- oldConfirmed+1 until lastConfirmedSeqNumber+1) { // inclusive! + unconfirmedMessages -= i; + } + } + } + } + + var lastChannelUpdate = 0; + def useChannel(seqNo: Int, channelType0: String, req: HttpServletRequest) = synchronized { + if (seqNo <= lastChannelUpdate) false else { + lastChannelUpdate = seqNo; + val channelType = ChannelType.valueOf(channelType0); + if (channelType.isDefined) { + val channel = activeChannels.get(channelType.get); + if (channel.isDefined) { + if (! hasConnected) { + hasConnected = true; + creatingRequest = Some(HttpServletRequestFactory.createRequest(req)); + handler.connect(this, req); + } + currentChannel = channel; +// println("switching "+id+" to channel: "+channelType0); + if (currentChannel.get.isConnected) { + revive(channel.get); + } else { + hiccup(channel.get); + } + currentChannel.get.messageWaiting(); + true; + } else + false; + } else + false; + } + } +// def handleReceivedMessage(seq: Int, data: String) { +// synchronized { +// handler.message(this, data) +// // TODO(jd): add client->server sequence numbers. +// // if (seq == lastReceivedSeqNumber+1){ +// // lastReceivedSeqNumber = seq; +// // handler.message(this, data); +// // } else { +// // // handle error. +// // } +// } +// } + def hiccup(channel: Channel) = synchronized { + if (currentChannel.isDefined && channel == currentChannel.get) { +// println("hiccuping: "+id); + scheduleTimeout(); + } + } + def revive(channel: Channel) = synchronized { + if (currentChannel.isDefined && channel == currentChannel.get) { +// println("reviving: "+id); + cancelTimeout(); + } + } + def prepareForReconnect() = synchronized { +// println("client-side hiccup: "+id); + activeChannels.foreach(_._2.close()); + activeChannels.clear(); + currentChannel = None; + scheduleTimeout(); + } + + // helpers + var timeoutTask: TimerTask = null; + + def scheduleTimeout() { + if (timeoutTask != null) return; + val p = new WeakReference(this); + timeoutTask = new TimerTask { + def run() { + val socket = p.get(); + if (socket != null) { + socket.kill("timeout"); + } + } + } + Channels.timer.schedule(timeoutTask, 15*1000); + } + def cancelTimeout() { + if (timeoutTask != null) + timeoutTask.cancel(); + timeoutTask = null; + } + scheduleTimeout(); + + streaminglog(Map( + "type" -> "event", + "event" -> "connection-created", + "connection" -> id)); +} + +object ChannelType extends Enumeration("shortpolling", "longpolling", "streaming") { + val ShortPolling, LongPolling, Streaming = Value; +} + +object Channels { + def createNew(typ: ChannelType.Value, socket: StreamingSocket, subType: String): Channel = { + typ match { + case ChannelType.ShortPolling => new ShortPollingChannel(socket); + case ChannelType.LongPolling => new LongPollingChannel(socket); + case ChannelType.Streaming => { + subType match { + case "iframe" => new StreamingChannel(socket) with IFrameChannel; + case "opera" => new StreamingChannel(socket) with OperaChannel; + case _ => new StreamingChannel(socket); + } + } + } + } + + val timer = new Timer(true); +} + +class SocketMessage(val seq: Int, val isControl: Boolean, val body: String) { + def payload = seq+":"+(if (isControl) "1" else "0")+":"+body; +} + +trait Channel { + def messageWaiting(); + def close(); + def handle(req: HttpServletRequest, res: HttpServletResponse); + def isConnected: Boolean; + + def kind: ChannelType.Value; + def sendRestartFailure(ec: ExecutionContext); +} + +trait XhrChannel extends Channel { + def wrapBody(msg: String) = msg.length+":"+msg; + + // wire format: msgLength:seq:[01]:msg + def wireFormat(msg: SocketMessage) = wrapBody(msg.payload); + def controlMessage(data: String) = wrapBody("oob:"+data); + + def sendRestartFailure(ec: ExecutionContext) { + ec.response.write(controlMessage("restart-fail")); + } +} + +// trait IFrameChannel extends Channel { +// def wireFormat(msg: SocketMessage) +// } + +class ShortPollingChannel(val socket: StreamingSocket) extends Channel with XhrChannel { + def kind = ChannelType.ShortPolling; + + def messageWaiting() { + // do nothing. + } + def close() { + // do nothing + } + def isConnected = false; + + def handle(req: HttpServletRequest, res: HttpServletResponse) { + val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; + val out = new StringBuilder(); + socket.synchronized { + socket.revive(this); + if (req.getParameter("new") == "yes") { + out.append(controlMessage("ok")); + } else { + val lastReceivedSeq = java.lang.Integer.parseInt(req.getParameter("seq")); + socket.updateConfirmedSeqNumber(this, lastReceivedSeq); + for (msg <- socket.getUnconfirmedMessages(this)) { + out.append(wireFormat(msg)); + } + // ALL MESSAGES ARE UNCONFIRMED AT THIS POINT! JUST CLEAR QUEUE. + var msg = socket.getWaitingMessage(this); + while (msg.isDefined) { + msg = socket.getWaitingMessage(this); + } + } + } +// println("Writing to "+socket.id+": "+out.toString); + ec.response.write(out.toString); + socket.synchronized { + socket.hiccup(this); + } + } +} + +trait IFrameChannel extends StreamingChannel { + override def wrapBody(msgBody: String) = { + val txt = "<script type=\"text/javascript\">p('"+ + msgBody.replace("\\","\\\\").replace("'", "\\'")+"');</script>"; + if (txt.length < 256) + String.format("%256s", txt); + else + txt; + } + + def header(req: HttpServletRequest) = { + val document_domain = + "\""+req.getHeader("Host").split("\\.").slice(2).mkString(".").split(":")(0)+"\""; + """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" +"http://www.w3.org/TR/html4/strict.dtd"> +<html><head><title>f</title></head><body id="thebody" onload="(!parent.closed)&&d()"><script type="text/javascript">document.domain = """+document_domain+"""; +var p = function(data) { try { parent.comet.pass_data } catch (err) { /* failed to pass data. no recourse. */ } }; +var d = parent.comet.disconnect;"""+(if(!config.devMode)"\nwindow.onerror = function() { /* silently drop errors */ }\n" else "")+"</script>"; // " - damn textmate mode! + } + + override def sendRestartFailure(ec: ExecutionContext) { + ec.response.write(header(ec.request.req)); + ec.response.write(controlMessage("restart-fail")); + } + + override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { + super.handleNewConnection(req, res, out); + res.setContentType("text/html"); + out.append(header(req)); + } +} + +trait OperaChannel extends StreamingChannel { + override def wrapBody(msgBody: String) = { + "Event: message\ndata: "+msgBody+"\n\n"; + } + override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { + super.handleNewConnection(req, res, out); + res.setContentType("application/x-dom-event-stream"); + } +} + +class StreamingChannel(val socket: StreamingSocket) extends Channel with XhrChannel { + def kind = ChannelType.Streaming; + + var c: Option[SelectChannelConnector.RetryContinuation] = None; + var doClose = false; + + def messageWaiting() { + main.server.getThreadPool().dispatch(new Runnable() { + def run() { + socket.synchronized { + c.filter(_.isPending()).foreach(_.resume()); + } + } + }); + } + + def setSequenceNumberIfAppropriate(req: HttpServletRequest) { + if (c.get.isNew) { + val lastReceivedSeq = java.lang.Integer.parseInt(req.getParameter("seq")); + socket.updateConfirmedSeqNumber(this, lastReceivedSeq); + } + } + + def sendHandshake(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { + out.append(controlMessage("ok")); + } + + def sendUnconfirmedMessages(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { + for (msg <- socket.getUnconfirmedMessages(this)) { + out.append(wireFormat(msg)); + } + } + + def sendWaitingMessages(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { + var msg = socket.getWaitingMessage(this); + while (msg.isDefined) { + out.append(wireFormat(msg.get)); + msg = socket.getWaitingMessage(this); + } + } + + def handleUnexpectedDisconnect(req: HttpServletRequest, res: HttpServletResponse, ep: KnowsAboutDispatch) { + socket.synchronized { + socket.hiccup(this); + } + ep.close(); + } + + def writeAndFlush(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder, ep: KnowsAboutDispatch) { +// println("Writing to "+socket.id+": "+out.toString); + res.getWriter.print(out.toString); + res.getWriter.flush(); + } + + def suspendIfNecessary(req: HttpServletRequest, res: HttpServletResponse, + out: StringBuilder, ep: KnowsAboutDispatch) { + scheduleKeepalive(50*1000); + ep.undispatch(); + c.get.suspend(0); + } + + def sendKeepaliveIfNecessary(out: StringBuilder, sendKeepalive: Boolean) { + if (out.length == 0 && sendKeepalive) { + out.append(controlMessage("keepalive")); + } + } + + def shouldHandshake(req: HttpServletRequest, res: HttpServletResponse) = c.get.isNew; + + var sendKeepalive = false; + var keepaliveTask: TimerTask = null; + def scheduleKeepalive(timeout: Int) { + if (keepaliveTask != null) { + keepaliveTask.cancel(); + } + val p = new WeakReference(this); + keepaliveTask = new TimerTask { + def run() { + val channel = p.get(); + if (channel != null) { + channel.synchronized { + channel.sendKeepalive = true; + channel.messageWaiting(); + } + } + } + } + Channels.timer.schedule(keepaliveTask, timeout); + } + + def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { + req.setAttribute("StreamingSocketServlet_channel", this); + res.setHeader("Connection", "close"); + for ((k, v) <- Util.noCacheHeaders) { res.setHeader(k, v); } // maybe this will help with proxies? + res.setContentType("text/messages; charset=utf-8"); + } + + def handle(req: HttpServletRequest, res: HttpServletResponse) { + val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; + val ep = HttpConnection.getCurrentConnection.getEndPoint.asInstanceOf[KnowsAboutDispatch]; + val out = new StringBuilder; + try { + socket.synchronized { + val sendKeepaliveNow = sendKeepalive; + sendKeepalive = false; + if (keepaliveTask != null) { + keepaliveTask.cancel(); + keepaliveTask = null; + } + c = Some(ContinuationSupport.getContinuation(req, socket).asInstanceOf[SelectChannelConnector.RetryContinuation]); + setSequenceNumberIfAppropriate(req); + if (doClose) { + ep.close(); + return; + } + if (c.get.isNew) { + handleNewConnection(req, res, out); + } else { + c.get.suspend(-1); + if (ep.isDispatched) { + handleUnexpectedDisconnect(req, res, ep); + return; + } + } + if (shouldHandshake(req, res)) { +// println("new stream request: "+socket.id); + sendHandshake(req, res, out); + sendUnconfirmedMessages(req, res, out); + } + sendWaitingMessages(req, res, out); + sendKeepaliveIfNecessary(out, sendKeepaliveNow); + suspendIfNecessary(req, res, out, ep); + } + } finally { + writeAndFlush(req, res, out, ep); + } + } + + def close() { + doClose = true; + messageWaiting(); + } + + def isConnected = ! doClose; +} + +class LongPollingChannel(socket: StreamingSocket) extends StreamingChannel(socket) { +// println("creating longpoll!"); + override def kind = ChannelType.LongPolling; + + override def shouldHandshake(req: HttpServletRequest, res: HttpServletResponse) = + req.getParameter("new") == "yes"; + + override def sendHandshake(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { +// println("sending handshake"); + out.append(controlMessage("ok")); + } + + override def suspendIfNecessary(req: HttpServletRequest, res: HttpServletResponse, + out: StringBuilder, ep: KnowsAboutDispatch) { + if (out.length == 0) { +// println("suspending longpoll: "+socket.id); + val to = java.lang.Integer.parseInt(req.getParameter("timeout")); +// println("LongPoll scheduling keepalive for: "+to); + scheduleKeepalive(to); + ep.undispatch(); + c.get.suspend(0); + } + } + + override def writeAndFlush(req: HttpServletRequest, res: HttpServletResponse, + out: StringBuilder, ep: KnowsAboutDispatch) { + if (out.length > 0) { +// println("Writing to "+socket.id+": "+out.toString); +// println("writing and flushing longpoll") + val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; + for ((k, v) <- Util.noCacheHeaders) { ec.response.setHeader(k, v); } // maybe this will help with proxies? +// println("writing: "+out); + ec.response.write(out.toString); + socket.synchronized { + socket.hiccup(this); + c = None; + } + } + } + + override def handleNewConnection(req: HttpServletRequest, res: HttpServletResponse, out: StringBuilder) { + socket.revive(this); + req.setAttribute("StreamingSocketServlet_channel", this); + } + + override def isConnected = socket.synchronized { + c.isDefined; + } +} + +class StreamingSocketServlet extends HttpServlet { + val version = 2; + + override def doGet(req: HttpServletRequest, res: HttpServletResponse) { +// describeRequest(req); + val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; + try { + if (req.getPathInfo() == "/js/client.js") { + val contextPath = config.transportPrefix; + val acceptableTransports = Comet.acceptableTransports; + ec.response.setContentType("application/x-javascript"); + ec.response.write(Comet.clientCode(contextPath, acceptableTransports)); + } else if (req.getPathInfo() == "/xhrXdFrame") { + ec.response.setContentType("text/html; charset=utf-8"); + ec.response.write(Comet.frameCode); + } else { + val v = req.getParameter("v"); + if (v == null || java.lang.Integer.parseInt(v) != version) { + res.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad version number!"); + return; + } + val existingChannel = req.getAttribute("StreamingSocketServlet_channel"); + if (existingChannel != null) { + existingChannel.asInstanceOf[Channel].handle(req, res); + } else { + val socketId = req.getParameter("id"); + val channelType = req.getParameter("channel"); + val isNew = req.getParameter("new") == "yes"; + val shouldCreateSocket = req.getParameter("create") == "yes"; + val subType = req.getParameter("type"); + val channel = SocketManager(socketId, shouldCreateSocket).map(_.channel(channelType, isNew, subType)).getOrElse(None); + if (channel.isDefined) { + channel.get.handle(req, res); + } else { + streaminglog(Map( + "type" -> "event", + "event" -> "restart-failure", + "connection" -> socketId)); + val failureChannel = ChannelType.valueOf(channelType).map(Channels.createNew(_, null, subType)); + if (failureChannel.isDefined) { + failureChannel.get.sendRestartFailure(ec); + } else { + ec.response.setStatusCode(HttpServletResponse.SC_NOT_FOUND); + ec.response.write("So such socket, and/or unknown channel type: "+channelType); + } + } + } + } + } catch { + case e: RetryRequest => throw e; + case t: Throwable => { + exceptionlog("A comet error occurred: "); + exceptionlog(t); + ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + ec.response.write(t.getMessage()); + } + } + } + + def describeRequest(req: HttpServletRequest) { + println(req.getMethod+" on "+req.getRequestURI()+"?"+req.getQueryString()); + for (pname <- + req.getParameterNames.asInstanceOf[java.util.Enumeration[String]]) { + println(" "+pname+" -> "+req.getParameterValues(pname).mkString("[", ",", "]")); + } + } + + override def doPost(req: HttpServletRequest, res: HttpServletResponse) { + val v = req.getParameter("v"); + if (v == null || java.lang.Integer.parseInt(v) != version) { + res.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad version number!"); + return; + } + val ec = req.getAttribute("executionContext").asInstanceOf[ExecutionContext]; + val socketId = req.getParameter("id"); + val socket = SocketManager(socketId, false); + +// describeRequest(req); + + if (socket.isEmpty) { + ec.response.write("restart-fail"); + streaminglog(Map( + "type" -> "event", + "event" -> "restart-failure", + "connection" -> socketId)); +// println("socket restart-fail: "+socketId); + } else { + val seq = java.lang.Integer.parseInt(req.getParameter("seq")); + socket.get.updateConfirmedSeqNumber(null, seq); + val messages = req.getParameterValues("m"); + val controlMessages = req.getParameterValues("oob"); + try { + if (messages != null) + for (msg <- messages) socket.get.receiveMessage(msg, req); + if (controlMessages != null) + for (msg <- controlMessages) { +// println("Control message from "+socket.get.id+": "+msg); + msg match { + case "hiccup" => { + streaminglog(Map( + "type" -> "event", + "event" -> "hiccup", + "connection" -> socketId)); + socket.get.prepareForReconnect(); + } + case _ => { + if (msg.startsWith("useChannel")) { + val msgParts = msg.split(":"); + socket.get.useChannel(java.lang.Integer.parseInt(msgParts(1)), msgParts(2), req); + } else if (msg.startsWith("kill")) { + socket.get.kill("client request: "+msg.substring(Math.min(msg.length, "kill:".length))); + } else { + streaminglog(Map( + "type" -> "error", + "error" -> "unknown control message", + "connection" -> socketId, + "message" -> msg)); + } + } + } + } + ec.response.write("ok"); + } catch { + case e: SocketManager.HandlerException => { + exceptionlog(e); + ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + ec.response.write(e.getMessage()); + // log these? + } + case t: Throwable => { + // shouldn't happen... + exceptionlog(t); + ec.response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + ec.response.write(t.getMessage()); + } + } + } + } +} diff --git a/infrastructure/net.appjet.ajstdlib/timer.scala b/infrastructure/net.appjet.ajstdlib/timer.scala new file mode 100644 index 0000000..dac8fb6 --- /dev/null +++ b/infrastructure/net.appjet.ajstdlib/timer.scala @@ -0,0 +1,85 @@ +/** + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS-IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.appjet.ajstdlib; + +import scala.collection.mutable.{HashMap,ListBuffer}; +import java.util.concurrent.locks.ReentrantLock; + +object timer { + + var _timings = new HashMap[String,ListBuffer[double]]; + var _lock = new ReentrantLock; + var _callstack = new ThreadLocal[ListBuffer[String]]; + + def start(opname: String) = { + var _localcallstack = _callstack.get(); + if (_localcallstack == null) { + _callstack.set(new ListBuffer[String]); + _localcallstack = _callstack.get(); + } + _localcallstack += opname; + var _oplabel = _localcallstack.mkString("."); + val startTime: long = System.nanoTime(); + + new { + def done() { + val elapsedTimeMs: double = (System.nanoTime() - startTime) / 1.0e6; + + _lock.lock(); + try { + var times = _timings.getOrElse(_oplabel, new ListBuffer[double]); + /* + if (times.size > 100000) { + times = new ListBuffer[double]; + }*/ + times += elapsedTimeMs; + _timings.put(_oplabel, times); + _localcallstack.remove(_localcallstack.length-1); + } finally { + _lock.unlock(); + } + } + } + } + + def getOpNames(): Array[String] = { + _lock.lock(); + try { + return _timings.keys.toList.toArray; + } finally { + _lock.unlock(); + } + } + + def getStats(opname: String): Array[double] = { + _lock.lock(); + + try { + var times:ListBuffer[double] = _timings(opname); + var total = times.foldRight(0.0)(_ + _); + return Array(times.size, total, (total / times.size)); + } finally { + _lock.unlock(); + } + } + + def reset() { + _lock.lock(); + _timings = new HashMap[String,ListBuffer[double]]; + _lock.unlock(); + } +} |