'use strict' const check = require('check-types') const eventify = require('./eventify') const events = require('./events') const JsonStream = require('./jsonstream') const Hoopy = require('hoopy') const promise = require('./promise') const tryer = require('tryer') const DEFAULT_BUFFER_LENGTH = 1024 module.exports = streamify /** * Public function `streamify`. * * Asynchronously serialises a data structure to a stream of JSON * data. Sanely handles promises, buffers, maps and other iterables. * * @param data: The data to transform. * * @option space: Indentation string, or the number of spaces * to indent each nested level by. * * @option promises: 'resolve' or 'ignore', default is 'resolve'. * * @option buffers: 'toString' or 'ignore', default is 'toString'. * * @option maps: 'object' or 'ignore', default is 'object'. * * @option iterables: 'array' or 'ignore', default is 'array'. * * @option circular: 'error' or 'ignore', default is 'error'. * * @option yieldRate: The number of data items to process per timeslice, * default is 16384. * * @option bufferLength: The length of the buffer, default is 1024. * * @option highWaterMark: If set, will be passed to the readable stream constructor * as the value for the highWaterMark option. * * @option Promise: The promise constructor to use, defaults to bluebird. **/ function streamify (data, options = {}) { const emitter = eventify(data, options) const json = new Hoopy(options.bufferLength || DEFAULT_BUFFER_LENGTH) const Promise = promise(options) const space = normaliseSpace(options) let streamOptions const { highWaterMark } = options if (highWaterMark) { streamOptions = { highWaterMark } } const stream = new JsonStream(read, streamOptions) let awaitPush = true let index = 0 let indentation = '' let isEnded let isPaused = false let isProperty let length = 0 let mutex = Promise.resolve() let needsComma emitter.on(events.array, noRacing(array)) emitter.on(events.object, noRacing(object)) emitter.on(events.property, noRacing(property)) emitter.on(events.string, noRacing(string)) emitter.on(events.number, noRacing(value)) emitter.on(events.literal, noRacing(value)) emitter.on(events.endArray, noRacing(endArray)) emitter.on(events.endObject, noRacing(endObject)) emitter.on(events.end, noRacing(end)) emitter.on(events.error, noRacing(error)) emitter.on(events.dataError, noRacing(dataError)) return stream function read () { if (awaitPush) { awaitPush = false if (isEnded) { if (length > 0) { after() } return endStream() } } if (isPaused) { after() } } function after () { if (awaitPush) { return } let i for (i = 0; i < length && ! awaitPush; ++i) { if (! stream.push(json[i + index], 'utf8')) { awaitPush = true } } if (i === length) { index = length = 0 } else { length -= i index += i } } function endStream () { if (! awaitPush) { stream.push(null) } } function noRacing (handler) { return eventData => mutex = mutex.then(() => handler(eventData)) } function array () { return beforeScope() .then(() => addJson('[')) .then(() => afterScope()) } function beforeScope () { return before(true) } function before (isScope) { if (isProperty) { isProperty = false if (space) { return addJson(' ') } return Promise.resolve() } return Promise.resolve() .then(() => { if (needsComma) { if (isScope) { needsComma = false } return addJson(',') } if (! isScope) { needsComma = true } }) .then(() => { if (space && indentation) { return indent() } }) } function addJson (chunk) { if (length + 1 <= json.length) { json[index + length++] = chunk after() return Promise.resolve() } isPaused = true return new Promise(resolve => { const unpause = emitter.pause() tryer({ interval: -10, until () { return length + 1 <= json.length }, pass () { isPaused = false json[index + length++] = chunk resolve() setImmediate(unpause) } }) }) } function indent () { return addJson(`\n${indentation}`) } function afterScope () { needsComma = false if (space) { indentation += space } } function object () { return beforeScope() .then(() => addJson('{')) .then(() => afterScope()) } function property (name) { return before() .then(() => addJson(`"${name}":`)) .then(() => { isProperty = true }) } function string (s) { return value(`"${s}"`) } function value (v) { return before() .then(() => addJson(`${v}`)) } function endArray () { return beforeScopeEnd() .then(() => addJson(']')) .then(() => afterScopeEnd()) } function beforeScopeEnd () { if (space) { indentation = indentation.substr(space.length) return indent() } return Promise.resolve() } function afterScopeEnd () { needsComma = true } function endObject () { return beforeScopeEnd() .then(() => addJson('}')) .then(() => afterScopeEnd()) } function end () { after() isEnded = true endStream() } function error (err) { stream.emit('error', err) } function dataError (err) { stream.emit('dataError', err) } } function normaliseSpace (options) { if (check.positive(options.space)) { return new Array(options.space + 1).join(' ') } if (check.nonEmptyString(options.space)) { return options.space } }