File: utils/GridSQL.js
"use strict";
var Promise = require("bluebird");
var debug = require("debug")("app:utils/GridSQL");
var through2 = require("through2");
/**
 * Data storage for Knex inspired by MongoDB's GridFS. A file is split into
 * chunks that are stored as separate rows (one binary `chunk` value per row),
 * which makes it possible to stream the file in and out of the database.
 *
 * GridSQL works like a key value store: every file must be written with a
 * unique file id.
 *
 * The constructor may be called with or without `new`.
 *
 * @namespace utils
 * @class GridSQL
 * @constructor
 * @param {Object} options
 * @param {Object} options.knex Knex instance (required, an Error is thrown
 *      when it is missing)
 * @param {Number} [options.chunkSize=(1024*255)] Chunk size
 * @param {String} [options.tableName=chunks] Table name
 */
function GridSQL(options) {
    // Support factory-style calls: GridSQL(opts) behaves like new GridSQL(opts).
    if (!(this instanceof GridSQL)) {
        return new GridSQL(options);
    }

    // A missing options argument is treated like an empty options object so
    // that the "Missing options.knex" check below reports the problem.
    var settings = options || {};

    this.knex = settings.knex;
    this.tableName = settings.tableName || "chunks";
    this.chunkSize = settings.chunkSize || (1024 * 255);

    if (!this.knex) {
        throw new Error("Missing options.knex");
    }
}
/**
 * Read a stored file as a node.js readable stream.
 *
 * The rows of `fileId` are streamed from the database in ascending `sequence`
 * order and the `chunk` buffer of every row is pushed to the returned stream
 * (created with `through2.obj`). A row whose `chunk` is not a Buffer makes the
 * returned stream emit an "Invalid GridSQL row" error that carries the
 * offending row as `err.row`. Errors emitted by the database row stream are
 * forwarded to the returned stream as well.
 *
 * This method has a little bug https://github.com/tgriesser/knex/issues/484
 *
 * @method read
 * @param {String} fileId
 * @return {stream.Readable}
 */
GridSQL.prototype.read = function(fileId) {
    debug("reading %s", fileId);

    var rowStream = this.knex.select("chunk")
        .from(this.tableName)
        .where({ fileId: fileId })
        .orderBy("sequence", "asc")
        .stream({highWaterMark: 5});

    var chunkStream = through2.obj(function (row, enc, cb) {
        if (!row || !Buffer.isBuffer(row.chunk)) {
            var err = new Error("Invalid GridSQL row");
            err.row = row;
            return cb(err);
        }

        debug("Reading chunk %s byte chunk", row.chunk.length);
        this.push(row.chunk);
        cb();
    });

    // pipe() does not forward errors from the source stream. Without this
    // handler a failing query would emit "error" on a stream nobody listens
    // to (an uncaught exception) and the caller could never observe it.
    rowStream.on("error", function (err) {
        if (typeof chunkStream.destroy === "function") {
            chunkStream.destroy(err);
        } else {
            chunkStream.emit("error", err);
        }
    });

    return rowStream.pipe(chunkStream);
};
/**
 * Write a node.js readable stream to the database under a unique file id.
 *
 * The stream is consumed in paused mode: chunks are pulled with
 * `readable.read(chunkSize)` and inserted one at a time, in order, inside a
 * single Knex transaction. Each chunk becomes one row with the columns
 * `fileId`, `chunk` and `sequence` (starting from 1).
 *
 * If the stream emits an error or an insert fails, the promise returned from
 * the transaction handler is rejected (which Knex treats as a rollback) and
 * the promise returned from this method is rejected with that error. The
 * source stream is not destroyed by this method.
 *
 * @method write
 * @param {String} fileId
 * @param {stream.Readable} readable
 * @param {Object} [options]
 * @param {Number} [options.chunkSize] Custom chunk size for this file
 * @return {Promise} Resolved with `{fileId, stream, bytesWritten, chunkCount}`
 *      when the readable stream is fully saved to the database
 */
GridSQL.prototype.write = function(fileId, readable, options) {
    var knex = this.knex;
    var tableName = this.tableName;

    debug("Writing %s", fileId);

    var tableChunkSize = (options && options.chunkSize) || this.chunkSize;

    if (!knex) throw new Error("Missing options.knex");

    var sequence = 0;
    var bytesWritten = 0;

    return knex.transaction(function(t) {
        return new Promise(function(resolve, reject) {
            // State is tracked with plain flags instead of Bluebird's
            // isPending() so that this works with whatever promise
            // implementation the Knex query builder returns.
            var inserting = false;
            var ended = false;
            var settled = false;

            function detach() {
                readable.removeListener("readable", pump);
                readable.removeListener("end", onEnd);
                // The "error" listener is intentionally left attached: a late
                // stream error must not turn into an uncaught exception.
            }

            function fail(err) {
                if (settled) return;
                settled = true;
                detach();
                reject(err);
            }

            // Resolve only when the stream has ended AND the last insert is
            // done.
            function finishIfDone() {
                if (settled || !ended || inserting) return;
                settled = true;
                detach();
                resolve();
            }

            function onEnd() {
                ended = true;
                finishIfDone();
            }

            function onChunkStored() {
                inserting = false;
                debug("Chunk %s written for %s", sequence, fileId);
                if (ended) {
                    finishIfDone();
                } else {
                    // A "readable" event may have been skipped while the
                    // insert was in flight; reading again also lets the
                    // stream emit "end" once it is drained.
                    process.nextTick(pump);
                }
            }

            function pump() {
                if (settled || inserting) return;

                var chunk = readable.read(tableChunkSize);
                if (chunk === null) return;

                inserting = true;
                sequence += 1;
                bytesWritten += chunk.length;

                debug(
                    "Got %s chunk of %s (%s) of bytes for %s",
                    sequence, chunk.length, bytesWritten, fileId
                );

                t.insert({
                    fileId: fileId,
                    chunk: chunk,
                    sequence: sequence
                })
                .into(tableName)
                .then(onChunkStored)
                // A failed insert must fail the whole write; otherwise the
                // pump stops, "end" never fires and the transaction hangs.
                .then(null, fail);
            }

            readable.on("readable", pump);
            readable.on("end", onEnd);
            readable.on("error", fail);

            // Pick up data that is already buffered in the stream.
            pump();
        });
    })
    .then(function() {
        return {
            fileId: fileId,
            stream: readable,
            bytesWritten: bytesWritten,
            chunkCount: sequence
        };
    });
};
module.exports = GridSQL;