/**
* Copyright 2009 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS-IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import("execution");
import("profiler");
import("etherpad.pad.model");
import("etherpad.pad.model.accessPadGlobal");
import("etherpad.log");
import("etherpad.utils");
jimport("net.appjet.oui.exceptionlog");
jimport("java.util.concurrent.ConcurrentHashMap");
jimport("java.lang.System.out.println");
var MIN_WRITE_INTERVAL_MS = 2000; // 2 seconds
var MIN_WRITE_DELAY_NOTIFY_MS = 2000; // 2 seconds
var AGE_FOR_PAD_FLUSH_MS = 5*60*1000; // 5 minutes
var DBUNWRITABLE_WRITE_DELAY_MS = 30*1000; // 30 seconds
// state is { constant: true }, { constant: false }, { trueAfter: timeInMs }
function setWritableState(state) {
_dbwriter().dbWritable = state;
}
function getWritableState() {
return _dbwriter().dbWritable;
}
function isDBWritable() {
return _isDBWritable();
}
function _isDBWritable() {
var state = _dbwriter().dbWritable;
if (typeof state != "object") {
return true;
}
else if (state.constant !== undefined) {
return !! state.constant;
}
else if (state.trueAfter !== undefined) {
return (+new Date()) > state.trueAfter;
}
else return true;
}
function getWritableStateDescription(state) {
var v = _isDBWritable();
var restOfMessage = "";
if (state.trueAfter !== undefined) {
var now = +new Date();
var then = state.trueAfter;
var diffSeconds = java.lang.String.format("%.1f", Math.abs(now - then)/1000);
if (now < then) {
restOfMessage = " until "+diffSeconds+" seconds from now";
}
else {
restOfMessage = " since "+diffSeconds+" seconds ago";
}
}
return v+restOfMessage;
}
function _dbwriter() {
return appjet.cache.dbwriter;
}
function onStartup() {
appjet.cache.dbwriter = {};
var dbwriter = _dbwriter();
dbwriter.pendingWrites = new ConcurrentHashMap();
dbwriter.scheduledFor = new ConcurrentHashMap(); // padId --> long
dbwriter.dbWritable = { constant: true };
execution.initTaskThreadPool("dbwriter", 4);
// we don't wait for scheduled tasks in the infreq pool to run and complete
execution.initTaskThreadPool("dbwriter_infreq", 1);
_scheduleCheckForStalePads();
}
function _scheduleCheckForStalePads() {
execution.scheduleTask("dbwriter_infreq", "checkForStalePads", AGE_FOR_PAD_FLUSH_MS, []);
}
function onShutdown() {
log.info("Doing final DB writes before shutdown...");
var success = execution.shutdownAndWaitOnTaskThreadPool("dbwriter", 10000);
if (! success) {
log.warn("ERROR! DB WRITER COULD NOT SHUTDOWN THREAD POOL!");
}
}
function _logException(e) {
var exc = utils.toJavaException(e);
log.warn("writeAllToDB: Error writing to SQL! Written to exceptions.log: "+exc);
log.logException(exc);
exceptionlog.apply(exc);
}
function taskFlushPad(padId, reason) {
var dbwriter = _dbwriter();
if (! _isDBWritable()) {
// DB is unwritable, delay
execution.scheduleTask("dbwriter_infreq", "flushPad", DBUNWRITABLE_WRITE_DELAY_MS, [padId, reason]);
return;
}
model.accessPadGlobal(padId, function(pad) {
writePadNow(pad, true);
}, "r");
log.info("taskFlushPad: flushed "+padId+(reason?(" (reason: "+reason+")"):''));
}
function taskWritePad(padId) {
var dbwriter = _dbwriter();
if (! _isDBWritable()) {
// DB is unwritable, delay
dbwriter.scheduledFor.put(padId, (+(new Date)+DBUNWRITABLE_WRITE_DELAY_MS));
execution.scheduleTask("dbwriter", "writePad", DBUNWRITABLE_WRITE_DELAY_MS, [padId]);
return;
}
profiler.reset();
var t1 = profiler.rcb("lock wait");
model.accessPadGlobal(padId, function(pad) {
t1();
_dbwriter().pendingWrites.remove(padId); // do this first
var success = false;
try {
var t2 = profiler.rcb("write");
writePadNow(pad);
t2();
success = true;
}
finally {
if (! success) {
log.warn("DB WRITER FAILED TO WRITE PAD: "+padId);
}
profiler.print();
}
}, "r");
}
function taskCheckForStalePads() {
// do this first
_scheduleCheckForStalePads();
if (! _isDBWritable()) return;
// get "active" pads into an array
var padIter = appjet.cache.pads.meta.keySet().iterator();
var padList = [];
while (padIter.hasNext()) { padList.push(padIter.next()); }
var numStale = 0;
for (var i = 0; i < padList.length; i++) {
if (! _isDBWritable()) break;
var p = padList[i];
if (model.isPadLockHeld(p)) {
// skip it, don't want to lock up stale pad flusher
}
else {
accessPadGlobal(p, function(pad) {
if (pad.exists()) {
var padAge = (+new Date()) - pad._meta.status.lastAccess;
if (padAge > AGE_FOR_PAD_FLUSH_MS) {
writePadNow(pad, true);
numStale++;
}
}
}, "r");
}
}
log.info("taskCheckForStalePads: flushed "+numStale+" stale pads");
}
function notifyPadDirty(padId) {
var dbwriter = _dbwriter();
if (! dbwriter.pendingWrites.containsKey(padId)) {
dbwriter.pendingWrites.put(padId, "pending");
dbwriter.scheduledFor.put(padId, (+(new Date)+MIN_WRITE_INTERVAL_MS));
execution.scheduleTask("dbwriter", "writePad", MIN_WRITE_INTERVAL_MS, [padId]);
}
}
function scheduleFlushPad(padId, reason) {
execution.scheduleTask("dbwriter_infreq", "flushPad", 0, [padId, reason]);
}
/*function _dbwriterLoopBody(executor) {
try {
var info = writeAllToDB(executor);
if (!info.boring) {
log.info("DB writer: "+info.toSource());
}
java.lang.Thread.sleep(Math.max(0, MIN_WRITE_INTERVAL_MS - info.elapsed));
}
catch (e) {
_logException(e);
java.lang.Thread.sleep(MIN_WRITE_INTERVAL_MS);
}
}
function _startInThread(name, func) {
(new Thread(new Runnable({
run: function() {
func();
}
}), name)).start();
}
function killDBWriterThreadAndWait() {
appjet.cache.abortDBWriter = true;
while (appjet.cache.runningDBWriter) {
java.lang.Thread.sleep(100);
}
}*/
/*function writeAllToDB(executor, andFlush) {
if (!executor) {
executor = new ScheduledThreadPoolExecutor(NUM_WRITER_THREADS);
}
profiler.reset();
var startWriteTime = profiler.time();
var padCount = new AtomicInteger(0);
var writeCount = new AtomicInteger(0);
var removeCount = new AtomicInteger(0);
// get pads into an array
var padIter = appjet.cache.pads.meta.keySet().iterator();
var padList = [];
while (padIter.hasNext()) { padList.push(padIter.next()); }
var latch = new CountDownLatch(padList.length);
for (var i = 0; i < padList.length; i++) {
_spawnCall(executor, function(p) {
try {
var padWriteResult = {};
accessPadGlobal(p, function(pad) {
if (pad.exists()) {
padCount.getAndIncrement();
padWriteResult = writePad(pad, andFlush);
if (padWriteResult.didWrite) writeCount.getAndIncrement();
if (padWriteResult.didRemove) removeCount.getAndIncrement();
}
}, "r");
} catch (e) {
_logException(e);
} finally {
latch.countDown();
}
}, padList[i]);
}
// wait for them all to finish
latch.await();
var endWriteTime = profiler.time();
var elapsed = Math.round((endWriteTime - startWriteTime)/1000)/1000;
var interesting = (writeCount.get() > 0 || removeCount.get() > 0);
var obj = {padCount:padCount.get(), writeCount:writeCount.get(), elapsed:elapsed, removeCount:removeCount.get()};
if (! interesting) obj.boring = true;
if (interesting) {
profiler.record("writeAll", profiler.time()-startWriteTime);
profiler.print();
}
return obj;
}*/
function writePadNow(pad, andFlush) {
var didWrite = false;
var didRemove = false;
if (pad.exists()) {
var dbUpToDate = false;
if (pad._meta.status.dirty) {
/*log.info("Writing pad "+pad.getId());*/
pad._meta.status.dirty = false;
//var t1 = +new Date();
pad.writeToDB();
//var t2 = +new Date();
didWrite = true;
//log.info("Wrote pad "+pad.getId()+" in "+(t2-t1)+" ms.");
var now = +(new Date);
var sched = _dbwriter().scheduledFor.get(pad.getId());
if (sched) {
var delay = now - sched;
if (delay > MIN_WRITE_DELAY_NOTIFY_MS) {
log.warn("dbwriter["+pad.getId()+"] behind schedule by "+delay+"ms");
}
_dbwriter().scheduledFor.remove(pad.getId());
}
}
if (andFlush) {
// remove from cache
model.removeFromMemory(pad);
didRemove = true;
}
}
return {didWrite:didWrite, didRemove:didRemove};
}
/*function _spawnCall(executor, func, varargs) {
var args = Array.prototype.slice.call(arguments, 2);
var that = this;
executor.schedule(new Runnable({
run: function() {
func.apply(that, args);
}
}), 0, TimeUnit.MICROSECONDS);
}*/