/******************************************************************************* * Copyright (c) 2013 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Andrew Banks - initial API and implementation and initial documentation *******************************************************************************/ // Only expose a single object name in the global namespace. // Everything must go through this module. Global Messaging module // only has a single public function, client, which returns // a Messaging client object given connection details. /** * @namespace Messaging * Send and receive messages using web browsers. *
* This programming interface lets a JavaScript client application use the MQTT V3.1 protocol to * connect to an MQTT-supporting messaging server. * * The function supported includes: *
*
* The Messaging.Message object. This encapsulates the payload of the message along with various attributes * associated with its delivery, in particular the destination to which it has been (or is about to be) sent. *
* The programming interface validates parameters passed to it, and will throw an Error containing an error message * intended for developer use, if it detects an error with any parameter. *
* Example:
*
*
*
client = new Messaging.Client(location.hostname, Number(location.port), "clientId");
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
client.connect({onSuccess:onConnect});
function onConnect() {
// Once a connection has been made, make a subscription and send a message.
console.log("onConnect");
client.subscribe("/World");
message = new Messaging.Message("Hello");
message.destinationName = "/World";
client.send(message);
};
function onConnectionLost(responseObject) {
if (responseObject.errorCode !== 0)
console.log("onConnectionLost:"+responseObject.errorMessage);
};
function onMessageArrived(message) {
console.log("onMessageArrived:"+message.payloadString);
client.disconnect();
};
*
* Other programming languages,
* Java,
* C.
*/
Messaging = (function (global) {
// Private variables below, these are only visible inside the function closure
// which is used to define the module.
// Version string of this module as hard-coded in this source.
var version = "0.0.0.0";
// NOTE(review): "@BUILDLEVEL@" looks like a token substituted by the build — confirm against the build scripts.
var buildLevel = "@BUILDLEVEL@";
/**
 * Unique message type identifiers, with their
 * associated integer values (1 = CONNECT through 14 = DISCONNECT).
 * @private
 */
var MESSAGE_TYPE = {
CONNECT: 1,
CONNACK: 2,
PUBLISH: 3,
PUBACK: 4,
PUBREC: 5,
PUBREL: 6,
PUBCOMP: 7,
SUBSCRIBE: 8,
SUBACK: 9,
UNSUBSCRIBE: 10,
UNSUBACK: 11,
PINGREQ: 12,
PINGRESP: 13,
DISCONNECT: 14
};
// Collection of utility methods used to simplify module code
// and promote the DRY pattern.
/**
 * Validate the property names and value types of a user-supplied options object.
 * Every own property of obj must be listed in keys, and the typeof its value
 * must equal the type name recorded in keys. Used to ensure that option objects
 * passed into the API don't contain erroneous parameters.
 * @param {Object} obj User options object.
 * @param {Object} keys Map of permitted property name to expected typeof string,
 *                      for example {timeout:"number", onSuccess:"function"}.
 * @throws {Error} A property has the wrong type, or is not one of the permitted properties.
 * @private
 */
var validate = function(obj, keys) {
    // Borrow hasOwnProperty so objects that shadow it, or have no prototype, still work.
    var hasOwn = Object.prototype.hasOwnProperty;
    // Loop variables are declared locally: assigning to an undeclared name would
    // leak a global and throws a ReferenceError in strict mode.
    for (var key in obj) {
        if (!hasOwn.call(obj, key))
            continue;
        if (hasOwn.call(keys, key)) {
            if (typeof obj[key] !== keys[key])
                throw new Error(format(ERROR.INVALID_TYPE, [typeof obj[key], key]));
        } else {
            // Unknown property: list every permitted name to help the developer.
            var errorStr = "Unknown property, " + key + ". Valid properties are:";
            for (var validKey in keys)
                if (hasOwn.call(keys, validKey))
                    errorStr = errorStr+" "+validKey;
            throw new Error(errorStr);
        }
    }
};
/**
 * Wrap a user function so that it always executes with a fixed "this" value.
 * The wrapper forwards whatever arguments it receives and hands back the
 * wrapped function's return value.
 * @param {function} f User function
 * @param {object} scope Value used as "this" on every invocation of f
 * @return {function} Wrapper that invokes f against the fixed scope
 * @private
 */
var scope = function (f, scope) {
    return function () {
        // The "this" of the wrapper call is deliberately ignored in favour of scope.
        var forwarded = arguments;
        var outcome = f.apply(scope, forwarded);
        return outcome;
    };
};
/**
 * Error definitions used by this module. Each entry pairs a numeric code
 * with a message text; "{0}", "{1}", ... in the text are placeholders for
 * values substituted when the message is formatted.
 * NOTE(review): the OK and CONNECT_TIMEOUT texts use the prefix "AMQJSC" while
 * every other entry uses "AMQJS" — confirm whether that is intentional.
 * @private
 */
var ERROR = {
OK: {code:0, text:"AMQJSC0000I OK."},
CONNECT_TIMEOUT: {code:1, text:"AMQJSC0001E Connect timed out."},
SUBSCRIBE_TIMEOUT: {code:2, text:"AMQJS0002E Subscribe timed out."},
UNSUBSCRIBE_TIMEOUT: {code:3, text:"AMQJS0003E Unsubscribe timed out."},
PING_TIMEOUT: {code:4, text:"AMQJS0004E Ping timed out."},
INTERNAL_ERROR: {code:5, text:"AMQJS0005E Internal error."},
CONNACK_RETURNCODE: {code:6, text:"AMQJS0006E Bad Connack return code:{0} {1}."},
SOCKET_ERROR: {code:7, text:"AMQJS0007E Socket error:{0}."},
SOCKET_CLOSE: {code:8, text:"AMQJS0008I Socket closed."},
MALFORMED_UTF: {code:9, text:"AMQJS0009E Malformed UTF data:{0} {1} {2}."},
UNSUPPORTED: {code:10, text:"AMQJS0010E {0} is not supported by this browser."},
INVALID_STATE: {code:11, text:"AMQJS0011E Invalid state {0}."},
INVALID_TYPE: {code:12, text:"AMQJS0012E Invalid type {0} for {1}."},
INVALID_ARGUMENT: {code:13, text:"AMQJS0013E Invalid argument {0} for {1}."},
UNSUPPORTED_OPERATION: {code:14, text:"AMQJS0014E Unsupported operation."},
INVALID_STORED_DATA: {code:15, text:"AMQJS0015E Invalid data in local storage key={0} value={1}."},
INVALID_MQTT_MESSAGE_TYPE: {code:16, text:"AMQJS0016E Invalid MQTT message type {0}."},
MALFORMED_UNICODE: {code:17, text:"AMQJS0017E Malformed Unicode string:{0} {1}."},
};
/**
 * Human-readable meaning of each CONNACK return code, keyed by the
 * numeric return code (0 to 5).
 */
var CONNACK_RC = {
0:"Connection Accepted",
1:"Connection Refused: unacceptable protocol version",
2:"Connection Refused: identifier rejected",
3:"Connection Refused: server unavailable",
4:"Connection Refused: bad user name or password",
5:"Connection Refused: not authorized"
};
/**
* Format an error message text.
* @private
* @param {error} ERROR.KEY value above.
* @param {substitutions} [array] substituted into the text.
* @return the text with the substitutions made.
*/
var format = function(error, substitutions) {
var text = error.text;
if (substitutions) {
for (var i=0; i
* Most applications will create just one Client object and then call its connect() method,
* however applications can create more than one Client object if they wish.
* In this case the combination of host, port and clientId attributes must be different for each Client object.
*
* The send, subscribe and unsubscribe methods are implemented as asynchronous JavaScript methods
* (even though the underlying protocol exchange might be synchronous in nature).
* This means they signal their completion by calling back to the application,
* via Success or Failure callback functions provided by the application on the method in question.
* Such callbacks are called at most once per method invocation and do not persist beyond the lifetime
* of the script that made the invocation.
*
* In contrast there are some callback functions most notably onMessageArrived
* that are defined on the Messaging.Client object.
* These may get called multiple times, and aren't directly related to specific method invocations made by the client.
*
* @name Messaging.Client
*
* @constructor
* Creates a Messaging.Client object that can be used to communicate with a Messaging server.
*
* @param {string} host the address of the messaging server, as a DNS name or dotted decimal IP address.
* @param {number} port the port number in the host to connect to.
* @param {string} clientId the Messaging client identifier, between 1 and 23 characters in length.
*
* @property {string} host read only the server's DNS hostname or dotted decimal IP address.
* @property {number} port read only the server's port.
* @property {string} clientId read only used when connecting to the server.
* @property {function} onConnectionLost called when a connection has been lost,
* after a connect() method has succeeded.
* Establish the call back used when a connection has been lost. The connection may be
* lost because the client initiates a disconnect or because the server or network
* cause the client to be disconnected. The disconnect call back may be called without
* the connectionComplete call back being invoked if, for example the client fails to
* connect.
* A single response object parameter is passed to the onConnectionLost callback containing the following fields:
*
* Properties of the connect options are:
* @config {number} [timeout] If the connect has not succeeded within this number of seconds, it is deemed to have failed.
* The default is 30 seconds.
* @config {string} [userName] Authentication username for this connection.
* @config {string} [password] Authentication password for this connection.
* @config {Messaging.Message} [willMessage] sent by the server when the client disconnects abnormally.
* @config {Number} [keepAliveInterval] the server disconnects this client if there is no activity for this
* number of seconds. The default value of 60 seconds is assumed if not set.
* @config {boolean} [cleanSession] if true(default) the client and server persistent state is deleted on successful connect.
* @config {boolean} [useSSL] if present and true, use an SSL Websocket connection.
* @config {object} [invocationContext] passed to the onSuccess callback or onFailure callback.
* @config {function} [onSuccess] called when the connect acknowledgement has been received from the server.
* A single response object parameter is passed to the onSuccess callback containing the following fields:
*
* @config {number} [qos] the maximum qos of any publications sent as a result of making this subscription.
* @config {object} [invocationContext] passed to the onSuccess callback or onFailure callback.
* @config {function} [onSuccess] called when the subscribe acknowledgement has been received from the server.
* A single response object parameter is passed to the onSuccess callback containing the following fields:
*
* @config {object} [invocationContext] passed to the onSuccess callback or onFailure callback.
* @config {function} [onSuccess] called when the unsubscribe acknowledgement has been received from the server.
* A single response object parameter is passed to the onSuccess callback containing the following fields:
*
* Other programming languages,
* Java,
* C.
*
* All attributes may be null, which implies the default values.
*
* @name Messaging.Message
* @constructor
* @param {String|ArrayBuffer} payload The message data to be sent.
*
* @property {string} payloadString read only The payload as a string if the payload consists of valid UTF-8 characters.
* @property {ArrayBuffer} payloadBytes read only The payload as an ArrayBuffer.
*
* @property {string} destinationName mandatory The name of the destination to which the message is to be sent
* (for messages about to be sent) or the name of the destination from which the message has been received
* (for messages received by the onMessageArrived callback).
*
* @property {number} qos The Quality of Service used to deliver the message.
*
* @property {Boolean} retained If true, the message is to be retained by the server and delivered
* to both current and future subscriptions.
* If false the server only delivers the message to current subscribers, this is the default for new Messages.
* A received message has the retained boolean set to true if the message was published
* with the retained boolean set to true
* and the subscription was made after the message has been published.
*
* @property {Boolean} duplicate read only If true, this message might be a duplicate of one which has already been received.
* This is only set on messages received from the server.
*
*/
var Message = function (newPayload) {
    // Constructor for an application message. The payload and the delivery
    // attributes (destinationName, qos, retained, duplicate) are held in closure
    // variables and reached only through the _get*/_set* accessors defined here.
    //
    // newPayload must be a string, an ArrayBuffer or one of the typed arrays
    // listed below; anything else is rejected with an Error.
    var payload;
    if (   typeof newPayload === "string"
        || newPayload instanceof ArrayBuffer
        || newPayload instanceof Int8Array
        || newPayload instanceof Uint8Array
        || newPayload instanceof Int16Array
        || newPayload instanceof Uint16Array
        || newPayload instanceof Int32Array
        || newPayload instanceof Uint32Array
        || newPayload instanceof Float32Array
        || newPayload instanceof Float64Array
       ) {
        payload = newPayload;
    } else {
        // Throw a real Error (not a bare string) so callers get a stack trace and
        // the same exception type as every other validation failure in this module.
        throw new Error(format(ERROR.INVALID_ARGUMENT, [newPayload, "newPayload"]));
    }

    // Payload as a string: returned as-is when it was supplied as a string,
    // otherwise decoded from the binary payload via parseUTF8.
    this._getPayloadString = function () {
        if (typeof payload === "string")
            return payload;
        // NOTE(review): an ArrayBuffer has byteLength but no length property, so
        // payload.length is undefined for ArrayBuffer payloads — confirm how
        // parseUTF8 handles that case.
        return parseUTF8(payload, 0, payload.length);
    };

    // Payload as bytes: a string payload is encoded into a new Uint8Array via
    // UTF8Length/stringToUTF8; a binary payload is returned exactly as supplied.
    this._getPayloadBytes = function () {
        if (typeof payload === "string") {
            var buffer = new ArrayBuffer(UTF8Length(payload));
            var byteStream = new Uint8Array(buffer);
            stringToUTF8(payload, byteStream, 0);
            return byteStream;
        } else {
            return payload;
        }
    };

    // Destination name: undefined until set; only strings are accepted.
    var destinationName = undefined;
    this._getDestinationName = function () { return destinationName; };
    this._setDestinationName = function (newDestinationName) {
        if (typeof newDestinationName === "string")
            destinationName = newDestinationName;
        else
            throw new Error(format(ERROR.INVALID_ARGUMENT, [newDestinationName, "newDestinationName"]));
    };

    // Quality of service: defaults to 0; only the numbers 0, 1 and 2 are accepted.
    var qos = 0;
    this._getQos = function () { return qos; };
    this._setQos = function (newQos) {
        if (newQos === 0 || newQos === 1 || newQos === 2)
            qos = newQos;
        else
            throw new Error("Invalid argument:"+newQos);
    };

    // Retained flag: defaults to false; only booleans are accepted.
    var retained = false;
    this._getRetained = function () { return retained; };
    this._setRetained = function (newRetained) {
        if (typeof newRetained === "boolean")
            retained = newRetained;
        else
            throw new Error(format(ERROR.INVALID_ARGUMENT, [newRetained, "newRetained"]));
    };

    // Duplicate flag: defaults to false; stored without validation.
    var duplicate = false;
    this._getDuplicate = function () { return duplicate; };
    this._setDuplicate = function (newDuplicate) { duplicate = newDuplicate; };
};
// Public accessor properties of a Message. Each property forwards to the
// matching _get<Name>/_set<Name> method installed on the instance by the
// constructor. payloadString and payloadBytes have a getter only.
Message.prototype = (function () {
    var proto = {};
    var defineAccessor = function (property, suffix, readOnly) {
        var descriptor = {
            enumerable: true,
            configurable: true,
            get: function () { return this["_get" + suffix](); }
        };
        if (!readOnly) {
            descriptor.set = function (newValue) { this["_set" + suffix](newValue); };
        }
        Object.defineProperty(proto, property, descriptor);
    };
    defineAccessor("payloadString", "PayloadString", true);
    defineAccessor("payloadBytes", "PayloadBytes", true);
    defineAccessor("destinationName", "DestinationName", false);
    defineAccessor("qos", "Qos", false);
    defineAccessor("retained", "Retained", false);
    defineAccessor("duplicate", "Duplicate", false);
    return proto;
})();
// Module contents.
// The object returned here exposes exactly two members, the Client and
// Message constructors.
return {
Client: Client,
Message: Message
};
})(window);
*
* @property {function} onMessageDelivered called when a message has been delivered.
* All processing that this Client will ever do has been completed. So, for example,
* in the case of a Qos=2 message sent by this client, the PubComp flow has been received from the server
* and the message has been removed from persistent storage before this callback is invoked.
* Parameters passed to the onMessageDelivered callback are:
*
*
* @property {function} onMessageArrived called when a message has arrived in this Messaging.client.
* Parameters passed to the onMessageArrived callback are:
*
*
*/
var Client = function (host, port, clientId) {
if (typeof host !== "string")
throw new Error(format(ERROR.INVALID_TYPE, [typeof host, "host"]));
if (typeof port !== "number" || port < 0)
throw new Error(format(ERROR.INVALID_TYPE, [typeof port, "port"]));
var clientIdLength = 0;
for (var i = 0; i
*
* @config {function} [onFailure] called when the connect request has failed or timed out.
* A single response object parameter is passed to the onFailure callback containing the following fields:
*
*
* @config {Array} [hosts] If present, this set of hostnames is tried in order in place
* of the host and port parameters on the constructor. The hosts and the matching ports are tried one at a time, in order, until
* one of them succeeds.
* @config {Array} [ports] If present, the set of ports matching the hosts.
* @throws {InvalidState} if the client is not in disconnected state. The client must have received connectionLost
* or disconnected before calling connect for a second or subsequent time.
*/
this.connect = function (connectOptions) {
connectOptions = connectOptions || {} ;
validate(connectOptions, {timeout:"number",
userName:"string",
password:"string",
willMessage:"object",
keepAliveInterval:"number",
cleanSession:"boolean",
useSSL:"boolean",
invocationContext:"object",
onSuccess:"function",
onFailure:"function",
hosts:"object",
ports:"object"});
// If no keep alive interval is set, assume 60 seconds.
if (connectOptions.keepAliveInterval === undefined)
connectOptions.keepAliveInterval = 60;
if (connectOptions.willMessage) {
if (!(connectOptions.willMessage instanceof Message))
throw new Error(format(ERROR.INVALID_TYPE, [connectOptions.willMessage, "connectOptions.willMessage"]));
// The will message must have a payload that can be represented as a string.
// Cause the willMessage to throw an exception if this is not the case.
connectOptions.willMessage.stringPayload;
if (typeof connectOptions.willMessage.destinationName === "undefined")
throw new Error(format(ERROR.INVALID_TYPE, [typeof connectOptions.willMessage.destinationName, "connectOptions.willMessage.destinationName"]));
}
if (typeof connectOptions.cleanSession === "undefined")
connectOptions.cleanSession = true;
if (connectOptions.hosts) {
if (!connectOptions.ports)
throw new Error(format(ERROR.INVALID_ARGUMENT, [connectOptions.ports, "connectOptions.ports"]));
if (!(connectOptions.hosts instanceof Array) )
throw new Error(format(ERROR.INVALID_ARGUMENT, [connectOptions.hosts, "connectOptions.hosts"]));
if (!(connectOptions.ports instanceof Array) )
throw new Error(format(ERROR.INVALID_ARGUMENT, [connectOptions.ports, "connectOptions.ports"]));
if (connectOptions.hosts.length <1 )
throw new Error(format(ERROR.INVALID_ARGUMENT, [connectOptions.hosts, "connectOptions.hosts"]));
if (connectOptions.hosts.length != connectOptions.ports.length)
throw new Error(format(ERROR.INVALID_ARGUMENT, [connectOptions.ports, "connectOptions.ports"]));
for (var i = 0; i
*
* @config {function} [onFailure] called when the subscribe request has failed or timed out.
* A single response object parameter is passed to the onFailure callback containing the following fields:
*
*
* @config {number} [timeout] which, if present, determines the number of seconds after which the onFailure callback is called;
* the presence of a timeout does not prevent the onSuccess callback from being called when the MQTT Suback is eventually received.
* @throws {InvalidState} if the client is not in connected state.
*/
this.subscribe = function (filter, subscribeOptions) {
    // Checks the arguments, then hands the request to the client object from the
    // enclosing scope. Checks run in a fixed order, so the first failing one
    // decides which Error is thrown.
    if (typeof filter !== "string") {
        throw new Error("Invalid argument:"+filter);
    }

    // A missing options object is treated as "no options".
    subscribeOptions = subscribeOptions || {};
    var permittedOptions = {
        qos: "number",
        invocationContext: "object",
        onSuccess: "function",
        onFailure: "function",
        timeout: "number"
    };
    validate(subscribeOptions, permittedOptions);

    // A timeout can only be reported through onFailure, so one requires the other.
    var wantsTimeout = Boolean(subscribeOptions.timeout);
    if (wantsTimeout && !subscribeOptions.onFailure) {
        throw new Error("subscribeOptions.timeout specified with no onFailure callback.");
    }

    // When a qos is given it must be exactly 0, 1 or 2.
    var requestedQos = subscribeOptions.qos;
    if (typeof requestedQos !== "undefined"
        && requestedQos !== 0
        && requestedQos !== 1
        && requestedQos !== 2) {
        throw new Error(format(ERROR.INVALID_ARGUMENT, [requestedQos, "subscribeOptions.qos"]));
    }

    client.subscribe(filter, subscribeOptions);
};
/**
 * Unsubscribe for messages, stop receiving messages sent to destinations described by the filter.
 *
 * @name Messaging.Client#unsubscribe
 * @function
 * @param {string} filter describing the destinations to stop receiving messages from.
 * @param {object} [unsubscribeOptions] used to control the unsubscribe request, as follows:
 * @config {object} [invocationContext] accepted option of type object.
 * @config {function} [onSuccess] accepted option of type function.
 * @config {function} [onFailure] called when the unsubscribe request has failed or timed out.
 * A single response object parameter is passed to the onFailure callback.
 * @config {number} [timeout] which, if present, determines the number of seconds after which the onFailure callback is called;
 * the presence of a timeout does not prevent the onSuccess callback from being called when the MQTT UnSuback is eventually received.
 * A timeout may only be given together with an onFailure callback.
 * @throws {Error} if filter is not a string, an option is unknown or has the wrong type,
 * or a timeout is given without an onFailure callback.
 * @throws {InvalidState} if the client is not in connected state.
 */
this.unsubscribe = function (filter, unsubscribeOptions) {
    if (typeof filter !== "string") {
        throw new Error("Invalid argument:"+filter);
    }

    // A missing options object is treated as "no options".
    unsubscribeOptions = unsubscribeOptions || {};
    var permittedOptions = {
        invocationContext: "object",
        onSuccess: "function",
        onFailure: "function",
        timeout: "number"
    };
    validate(unsubscribeOptions, permittedOptions);

    // A timeout can only be reported through onFailure, so one requires the other.
    var wantsTimeout = Boolean(unsubscribeOptions.timeout);
    if (wantsTimeout && !unsubscribeOptions.onFailure) {
        throw new Error("unsubscribeOptions.timeout specified with no onFailure callback.");
    }

    client.unsubscribe(filter, unsubscribeOptions);
};
/**
 * Send a message to the consumers of the destination in the Message.
 * The message is checked first and then passed to the send() method of the
 * client object from the enclosing scope.
 *
 * @name Messaging.Client#send
 * @function
 * @param {Messaging.Message} message to send; its destinationName must have been set.
 * @throws {Error} if message is not a Messaging.Message, or its destinationName is undefined.
 * @throws {InvalidState} if the client is not in connected state.
 */
this.send = function (message) {
    var isMessage = message instanceof Message;
    if (!isMessage) {
        throw new Error("Invalid argument:"+typeof message);
    }

    var destination = message.destinationName;
    if (typeof destination === "undefined") {
        throw new Error("Invalid parameter Message.destinationName:"+destination);
    }

    client.send(message);
};
/**
 * Normal disconnect of this Messaging client from its server.
 * Calls disconnect() on the client object referenced from the enclosing scope
 * (its definition is not visible in this section) and returns nothing.
 *
 * @name Messaging.Client#disconnect
 * @function
 * @throws {InvalidState} if the client is not in connected or connecting state.
 */
this.disconnect = function () {
client.disconnect();
};
/**
 * Get the contents of the trace log.
 * Returns whatever getTraceLog() of the client object from the enclosing
 * scope returns.
 *
 * @name Messaging.Client#getTraceLog
 * @function
 * @return {Object[]} tracebuffer containing the time ordered trace records.
 */
this.getTraceLog = function () {
    return client.getTraceLog();
};
/**
 * Start tracing.
 * Calls startTrace() on the client object referenced from the enclosing scope
 * and returns nothing.
 *
 * @name Messaging.Client#startTrace
 * @function
 */
this.startTrace = function () {
client.startTrace();
};
/**
 * Stop tracing.
 * Calls stopTrace() on the client object referenced from the enclosing scope
 * and returns nothing.
 *
 * @name Messaging.Client#stopTrace
 * @function
 */
this.stopTrace = function () {
client.stopTrace();
};
};
// Public accessor properties of a Client. Reading or writing a property
// forwards to the matching _get<Name>/_set<Name> method on the instance.
Client.prototype = (function () {
    var accessorNames = [
        "host",
        "port",
        "clientId",
        "onConnectionLost",
        "onMessageDelivered",
        "onMessageArrived"
    ];
    var proto = {};
    accessorNames.forEach(function (name) {
        // "host" -> "Host", giving the method names "_getHost" and "_setHost".
        var suffix = name.charAt(0).toUpperCase() + name.slice(1);
        Object.defineProperty(proto, name, {
            enumerable: true,
            configurable: true,
            get: function () { return this["_get" + suffix](); },
            set: function (newValue) { this["_set" + suffix](newValue); }
        });
    });
    return proto;
})();
/**
* An application message, sent or received.
*
*
*