This app provides monitoring and information features for the common freifunk user and the technical stuff of a freifunk community.
Code base is taken from a TUM Practical Course project and added here to see if Freifunk Altdorf can use it.
https://www.freifunk-altdorf.de
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
136 lines
3.0 KiB
136 lines
3.0 KiB
var stream = require("stream"), |
|
constants = require("./constants"), |
|
util = require("util"); |
|
|
|
var ReadableStreamBuffer = module.exports = function(opts) { |
|
var that = this; |
|
|
|
stream.Stream.call(this); |
|
|
|
opts = opts || {}; |
|
var frequency = opts.hasOwnProperty("frequency") ? opts.frequency : constants.DEFAULT_FREQUENCY; |
|
var chunkSize = opts.chunkSize || constants.DEFAULT_CHUNK_SIZE; |
|
var initialSize = opts.initialSize || constants.DEFAULT_INITIAL_SIZE; |
|
var incrementAmount = opts.incrementAmount || constants.DEFAULT_INCREMENT_AMOUNT; |
|
|
|
var size = 0; |
|
var buffer = new Buffer(initialSize); |
|
var encoding = null; |
|
|
|
this.readable = true; |
|
this.writable = false; |
|
|
|
var sendData = function() { |
|
var amount = Math.min(chunkSize, size); |
|
|
|
if (amount > 0) { |
|
var chunk = null; |
|
if(encoding) { |
|
chunk = buffer.toString(encoding, 0, amount); |
|
} |
|
else { |
|
chunk = new Buffer(amount); |
|
buffer.copy(chunk, 0, 0, amount); |
|
} |
|
|
|
that.emit("data", chunk); |
|
|
|
if(amount < buffer.length) |
|
buffer.copy(buffer, 0, amount, size); |
|
size -= amount; |
|
} |
|
|
|
if(size === 0 && !that.readable) { |
|
that.emit("end"); |
|
that.emit("close"); |
|
if (sendData && sendData.interval) { |
|
clearInterval(sendData.interval); |
|
sendData.interval = null; |
|
} |
|
} |
|
}; |
|
|
|
this.size = function() { |
|
return size; |
|
}; |
|
|
|
this.maxSize = function() { |
|
return buffer.length; |
|
}; |
|
|
|
var increaseBufferIfNecessary = function(incomingDataSize) { |
|
if((buffer.length - size) < incomingDataSize) { |
|
var factor = Math.ceil((incomingDataSize - (buffer.length - size)) / incrementAmount); |
|
|
|
var newBuffer = new Buffer(buffer.length + (incrementAmount * factor)); |
|
buffer.copy(newBuffer, 0, 0, size); |
|
buffer = newBuffer; |
|
} |
|
}; |
|
|
|
this.put = function(data, encoding) { |
|
if(!that.readable) return; |
|
|
|
var wasEmpty = size === 0; |
|
if(Buffer.isBuffer(data)) { |
|
increaseBufferIfNecessary(data.length); |
|
data.copy(buffer, size, 0); |
|
size += data.length; |
|
} |
|
else { |
|
data = data + ""; |
|
var dataSizeInBytes = Buffer.byteLength(data); |
|
increaseBufferIfNecessary(dataSizeInBytes); |
|
buffer.write(data, size, encoding || "utf8"); |
|
size += dataSizeInBytes; |
|
} |
|
|
|
if (wasEmpty && size > 0) { |
|
this.emit('readable') |
|
} |
|
|
|
if (!this.isPaused && !frequency) { |
|
while (size > 0) { |
|
sendData(); |
|
} |
|
} |
|
}; |
|
|
|
this.pause = function() { |
|
this.isPaused = true; |
|
if(sendData && sendData.interval) { |
|
clearInterval(sendData.interval); |
|
delete sendData.interval; |
|
} |
|
}; |
|
|
|
this.resume = function() { |
|
this.isPaused = false; |
|
if(sendData && !sendData.interval && frequency > 0) { |
|
sendData.interval = setInterval(sendData, frequency); |
|
} |
|
}; |
|
|
|
this.destroy = function() { |
|
that.emit("end"); |
|
if(sendData.interval) clearInterval(sendData.interval); |
|
sendData = null; |
|
that.readable = false; |
|
that.emit("close"); |
|
}; |
|
|
|
this.destroySoon = function() { |
|
that.readable = false; |
|
if (!sendData.interval) { |
|
that.emit("end"); |
|
that.emit("close"); |
|
} |
|
}; |
|
|
|
this.setEncoding = function(_encoding) { |
|
encoding = _encoding; |
|
}; |
|
|
|
this.resume(); |
|
}; |
|
util.inherits(ReadableStreamBuffer, stream.Stream);
|
|
|