12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791 |
- //
- // Created by Mingliang Chen on 18/6/21.
- // illuspas[a]gmail.com
- // Copyright (c) 2018 Nodemedia. All rights reserved.
- //
-
- const EventEmitter = require('events');
- const Logger = require('./logger');
- const Crypto = require('crypto');
- const Url = require('url');
- const Net = require('net');
- const AMF = require('./amf');
-
// ---------------------------------------------------------------------------
// Client identity and connection defaults
// ---------------------------------------------------------------------------
const FLASHVER = "LNX 9,0,124,2";
const RTMP_OUT_CHUNK_SIZE = 60000;
const RTMP_PORT = 1935;

// ---------------------------------------------------------------------------
// Handshake: block size and the states of the S0/S1/S2 reader
// ---------------------------------------------------------------------------
const RTMP_HANDSHAKE_SIZE = 1536;
const RTMP_HANDSHAKE_UNINIT = 0;
const RTMP_HANDSHAKE_0 = 1;
const RTMP_HANDSHAKE_1 = 2;
const RTMP_HANDSHAKE_2 = 3;

// ---------------------------------------------------------------------------
// Chunk parser states
// ---------------------------------------------------------------------------
const RTMP_PARSE_INIT = 0;
const RTMP_PARSE_BASIC_HEADER = 1;
const RTMP_PARSE_MESSAGE_HEADER = 2;
const RTMP_PARSE_EXTENDED_TIMESTAMP = 3;
const RTMP_PARSE_PAYLOAD = 4;

// Size of the scratch buffer that holds one complete chunk header.
const RTMP_CHUNK_HEADER_MAX = 18;

// ---------------------------------------------------------------------------
// Chunk header formats (the `fmt` bits of the basic header)
// ---------------------------------------------------------------------------
const RTMP_CHUNK_TYPE_0 = 0; // 11 bytes: timestamp(3) + length(3) + stream type(1) + stream id(4)
const RTMP_CHUNK_TYPE_1 = 1; // 7 bytes: delta(3) + length(3) + stream type(1)
const RTMP_CHUNK_TYPE_2 = 2; // 3 bytes: delta(3)
const RTMP_CHUNK_TYPE_3 = 3; // 0 bytes

// ---------------------------------------------------------------------------
// Chunk stream ids used for outgoing messages
// ---------------------------------------------------------------------------
const RTMP_CHANNEL_PROTOCOL = 2;
const RTMP_CHANNEL_INVOKE = 3;
const RTMP_CHANNEL_AUDIO = 4;
const RTMP_CHANNEL_VIDEO = 5;
const RTMP_CHANNEL_DATA = 6;

// Message header length in bytes, indexed by chunk header format (0..3).
const rtmpHeaderSize = [11, 7, 3, 0];

// ---------------------------------------------------------------------------
// Message type ids
// ---------------------------------------------------------------------------

/* Protocol control messages */
const RTMP_TYPE_SET_CHUNK_SIZE = 1;
const RTMP_TYPE_ABORT = 2;
const RTMP_TYPE_ACKNOWLEDGEMENT = 3; // bytes read report
const RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE = 5; // server bandwidth
const RTMP_TYPE_SET_PEER_BANDWIDTH = 6; // client bandwidth

/* User control message event (4) */
const RTMP_TYPE_EVENT = 4;

/* Media messages */
const RTMP_TYPE_AUDIO = 8;
const RTMP_TYPE_VIDEO = 9;

/* Data messages */
const RTMP_TYPE_FLEX_STREAM = 15; // AMF3
const RTMP_TYPE_DATA = 18; // AMF0

/* Shared object messages */
const RTMP_TYPE_FLEX_OBJECT = 16; // AMF3
const RTMP_TYPE_SHARED_OBJECT = 19; // AMF0

/* Command messages */
const RTMP_TYPE_FLEX_MESSAGE = 17; // AMF3
const RTMP_TYPE_INVOKE = 20; // AMF0

/* Aggregate message */
const RTMP_TYPE_METADATA = 22;

// ---------------------------------------------------------------------------
// Defaults and timers
// ---------------------------------------------------------------------------
const RTMP_CHUNK_SIZE = 128;
const RTMP_PING_TIME = 60000;
const RTMP_PING_TIMEOUT = 30000;

// ---------------------------------------------------------------------------
// User control stream event codes
// ---------------------------------------------------------------------------
const STREAM_BEGIN = 0x00;
const STREAM_EOF = 0x01;
const STREAM_DRY = 0x02;
const STREAM_EMPTY = 0x1f;
const STREAM_READY = 0x20;

// ---------------------------------------------------------------------------
// Transaction ids attached to outgoing commands so replies can be matched
// ---------------------------------------------------------------------------
const RTMP_TRANSACTION_CONNECT = 1;
const RTMP_TRANSACTION_CREATE_STREAM = 2;
const RTMP_TRANSACTION_GET_STREAM_LENGTH = 3;
-
/**
 * Factory for the mutable packet record shared by the chunk reader and writer.
 */
const RtmpPacket = {
  /**
   * Build a blank packet.
   *
   * @param {number} [fmt=0] - Chunk header format stored in `header.fmt`.
   * @param {number} [cid=0] - Chunk stream id stored in `header.cid`.
   * @returns {object} A packet with a zeroed header, zero clock/delta,
   *   no payload buffer, zero capacity and zero received bytes.
   */
  create(fmt = 0, cid = 0) {
    const header = {
      fmt,
      cid,
      timestamp: 0,
      length: 0,
      type: 0,
      stream_id: 0,
    };

    return {
      header,
      clock: 0,
      delta: 0,
      payload: null,
      capacity: 0,
      bytes: 0,
    };
  },
};
-
/**
 * RTMP client that can either pull (play) a stream from, or push (publish)
 * a stream to, the server named by an `rtmp://host[:port]/app/stream` URL.
 *
 * Events delivered through `on(event, callback)`:
 *  - `'status'` (info): payload of `_result` (connect), `_error` and `onStatus` commands.
 *  - `'audio'`  (payload, clock): a received audio message.
 *  - `'video'`  (payload, clock): a received video message.
 *  - `'script'` (payload, clock): a received data (script) message.
 *  - `'close'`: emitted by `stop()` once a created stream has been torn down.
 */
class NodeRtmpClient {
  /**
   * @param {string} rtmpUrl - Full RTMP URL; parsed immediately by `rtmpUrlParser`.
   */
  constructor(rtmpUrl) {
    this.url = rtmpUrl;
    this.info = this.rtmpUrlParser(rtmpUrl);
    this.isPublish = false;
    this.launcher = new EventEmitter();

    // Handshake reader state.
    this.handshakePayload = Buffer.alloc(RTMP_HANDSHAKE_SIZE);
    this.handshakeState = RTMP_HANDSHAKE_UNINIT;
    this.handshakeBytes = 0;

    // Chunk parser state. `inPackets` keeps one packet record per chunk stream id
    // so header fields omitted by fmt 1/2/3 chunks are inherited from earlier chunks.
    this.parserBuffer = Buffer.alloc(RTMP_CHUNK_HEADER_MAX);
    this.parserState = RTMP_PARSE_INIT;
    this.parserBytes = 0;
    this.parserBasicBytes = 0;
    this.parserPacket = null;
    this.inPackets = new Map();

    this.inChunkSize = RTMP_CHUNK_SIZE;
    this.outChunkSize = RTMP_CHUNK_SIZE;

    this.streamId = 0;
    this.isSocketOpen = false;
  }

  /**
   * Socket 'data' handler: consumes S0, S1 and S2, then forwards everything
   * that follows to the chunk parser.
   *
   * @param {Buffer} data - Bytes received from the socket.
   */
  onSocketData(data) {
    let bytes = data.length; // bytes of `data` not consumed yet
    let p = 0; // read position inside `data`

    while (bytes > 0) {
      switch (this.handshakeState) {
        case RTMP_HANDSHAKE_UNINIT:
          // S0: a single byte whose value is not inspected.
          this.handshakeState = RTMP_HANDSHAKE_0;
          this.handshakeBytes = 0;
          bytes -= 1;
          p += 1;
          break;

        case RTMP_HANDSHAKE_0: {
          // S1: accumulate until the full block is available, then echo it back as C2.
          const n = Math.min(RTMP_HANDSHAKE_SIZE - this.handshakeBytes, bytes);
          data.copy(this.handshakePayload, this.handshakeBytes, p, p + n);
          this.handshakeBytes += n;
          bytes -= n;
          p += n;
          if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) {
            this.handshakeState = RTMP_HANDSHAKE_1;
            this.handshakeBytes = 0;
            this.socket.write(this.handshakePayload); // write c2
          }
          break;
        }

        case RTMP_HANDSHAKE_1: {
          // S2: accumulated only to count bytes; its content is not validated.
          const n = Math.min(RTMP_HANDSHAKE_SIZE - this.handshakeBytes, bytes);
          // The source range must end at p + n; the previous code passed n alone.
          data.copy(this.handshakePayload, this.handshakeBytes, p, p + n);
          this.handshakeBytes += n;
          bytes -= n;
          p += n;
          if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) {
            this.handshakeState = RTMP_HANDSHAKE_2;
            this.handshakeBytes = 0;
            this.handshakePayload = null;

            this.rtmpSendConnect();
          }
          break;
        }

        case RTMP_HANDSHAKE_2:
          // Handshake finished: the rest of this buffer is chunk data.
          return this.rtmpChunkRead(data, p, bytes);
      }
    }
  }

  /**
   * Socket 'error' handler: logs the error and stops the client.
   *
   * @param {Error} e - Error reported by the socket.
   */
  onSocketError(e) {
    Logger.error('rtmp_client', "onSocketError", e);
    this.isSocketOpen = false;
    this.stop();
  }

  /** Socket 'close' handler: marks the socket closed and stops the client. */
  onSocketClose() {
    this.isSocketOpen = false;
    this.stop();
  }

  /** Socket 'timeout' handler: marks the socket closed and stops the client. */
  onSocketTimeout() {
    this.isSocketOpen = false;
    this.stop();
  }

  /**
   * Subscribe to a client event (see the class description for the list).
   *
   * @param {string} event - Event name.
   * @param {Function} callback - Listener.
   */
  on(event, callback) {
    this.launcher.on(event, callback);
  }

  /** Connect and play the stream named in the URL. */
  startPull() {
    this._start();
  }

  /** Connect and publish to the stream named in the URL. */
  startPush() {
    this.isPublish = true;
    this._start();
  }

  /**
   * Open the TCP connection, send C0+C1 once connected, and wire the socket
   * events to this instance. A 60 s idle timeout is armed on the socket.
   */
  _start() {
    this.socket = Net.createConnection(this.info.port, this.info.hostname, () => {
      // The flag was cleared on error/close/timeout but never raised before.
      this.isSocketOpen = true;

      // C0 (version byte 3) followed by C1 (4-byte time, 4 zero bytes, random filler).
      const c0c1 = Crypto.randomBytes(RTMP_HANDSHAKE_SIZE + 1);
      c0c1.writeUInt8(3, 0);
      c0c1.writeUInt32BE(Math.floor(Date.now() / 1000), 1);
      c0c1.writeUInt32BE(0, 5);
      this.socket.write(c0c1);
    });
    this.socket.on('data', this.onSocketData.bind(this));
    this.socket.on('error', this.onSocketError.bind(this));
    this.socket.on('close', this.onSocketClose.bind(this));
    this.socket.on('timeout', this.onSocketTimeout.bind(this));
    this.socket.setTimeout(60000);
  }

  /**
   * Tear the session down.
   *
   * If a stream was created, FCUnpublish (publish mode only) and deleteStream
   * are sent while the socket is still usable, `streamId` is reset and
   * `'close'` is emitted — exactly as before. In addition the socket is now
   * destroyed even when no stream exists yet, so a timeout or a manual stop
   * during the handshake/connect phase no longer leaks the connection.
   * Safe to call repeatedly and before `_start()`.
   */
  stop() {
    const socket = this.socket;
    const hadStream = this.streamId > 0;

    if (socket && !socket.destroyed) {
      if (hadStream) {
        if (this.isPublish) {
          this.rtmpSendFCUnpublish();
        }
        this.rtmpSendDeleteStream();
      }
      socket.destroy();
    }

    if (hadStream) {
      this.streamId = 0;
      this.launcher.emit('close');
    }
  }

  /**
   * Send one audio message. Ignored until a stream has been created.
   *
   * @param {Buffer} audioData - Message payload.
   * @param {number} timestamp - Message timestamp.
   */
  pushAudio(audioData, timestamp) {
    this._pushMediaMessage(RTMP_CHANNEL_AUDIO, RTMP_TYPE_AUDIO, audioData, timestamp);
  }

  /**
   * Send one video message. Ignored until a stream has been created.
   *
   * @param {Buffer} videoData - Message payload.
   * @param {number} timestamp - Message timestamp.
   */
  pushVideo(videoData, timestamp) {
    this._pushMediaMessage(RTMP_CHANNEL_VIDEO, RTMP_TYPE_VIDEO, videoData, timestamp);
  }

  /**
   * Send one AMF0 data (script) message. Ignored until a stream has been created.
   *
   * @param {Buffer} scriptData - Message payload.
   * @param {number} timestamp - Message timestamp.
   */
  pushScript(scriptData, timestamp) {
    this._pushMediaMessage(RTMP_CHANNEL_DATA, RTMP_TYPE_DATA, scriptData, timestamp);
  }

  /**
   * Shared body of pushAudio/pushVideo/pushScript: wrap `payload` in a fmt 0
   * message on chunk stream `cid` and write it to the socket.
   *
   * NOTE(review): as in the original code, `header.stream_id` is left at 0
   * rather than set to `this.streamId` — confirm against the target servers.
   */
  _pushMediaMessage(cid, type, payload, timestamp) {
    if (this.streamId === 0) return;

    const packet = RtmpPacket.create();
    packet.header.fmt = RTMP_CHUNK_TYPE_0;
    packet.header.cid = cid;
    packet.header.type = type;
    packet.payload = payload;
    packet.header.length = packet.payload.length;
    packet.header.timestamp = timestamp;
    this.socket.write(this.rtmpChunksCreate(packet));
  }

  /**
   * Parse an RTMP URL and add the RTMP-specific fields to the result.
   *
   * @param {string} url - e.g. `rtmp://host:1935/app/stream?key=value`.
   * @returns {object} The `Url.parse` result extended with:
   *   `app` (first path segment), `port` (defaults to RTMP_PORT),
   *   `tcurl` (`rtmp://host[:port]/app`) and `stream` (everything after the
   *   app segment, query string included).
   * @throws {TypeError} When the URL does not match `rtmp://host/app`
   *   (the regular-expression match returns null).
   */
  rtmpUrlParser(url) {
    const urlInfo = Url.parse(url, true);
    urlInfo.app = urlInfo.path.split('/')[1];
    urlInfo.port = urlInfo.port ? urlInfo.port : RTMP_PORT;
    urlInfo.tcurl = urlInfo.href.match(/rtmp:\/\/([^\/]+)\/([^\/]+)/)[0];
    // Skip the leading '/', the app name and the '/' that follows it.
    urlInfo.stream = urlInfo.path.slice(urlInfo.app.length + 2);
    return urlInfo;
  }

  /**
   * Encode a chunk basic header.
   *
   * @param {number} fmt - Chunk header format (0..3).
   * @param {number} cid - Chunk stream id.
   * @returns {Buffer} 1 byte for cid < 64, 2 bytes for 64 <= cid < 319,
   *   otherwise 3 bytes (low byte of `cid - 64` first).
   */
  rtmpChunkBasicHeaderCreate(fmt, cid) {
    let out;
    if (cid >= 64 + 255) {
      out = Buffer.alloc(3);
      out[0] = (fmt << 6) | 1;
      out[1] = (cid - 64) & 0xFF;
      out[2] = ((cid - 64) >> 8) & 0xFF;
    } else if (cid >= 64) {
      out = Buffer.alloc(2);
      out[0] = (fmt << 6) | 0;
      out[1] = (cid - 64) & 0xFF;
    } else {
      out = Buffer.alloc(1);
      out[0] = (fmt << 6) | cid;
    }
    return out;
  }

  /**
   * Encode a chunk message header for `header.fmt`.
   *
   * @param {object} header - Packet header (`fmt`, `timestamp`, `length`, `type`, `stream_id`).
   * @returns {Buffer} 11, 7, 3 or 0 bytes. Timestamps >= 0xFFFFFF are written
   *   as the 0xFFFFFF marker; the real value goes in the extended timestamp.
   */
  rtmpChunkMessageHeaderCreate(header) {
    const out = Buffer.alloc(rtmpHeaderSize[header.fmt % 4]);
    if (header.fmt <= RTMP_CHUNK_TYPE_2) {
      out.writeUIntBE(header.timestamp >= 0xffffff ? 0xffffff : header.timestamp, 0, 3);
    }

    if (header.fmt <= RTMP_CHUNK_TYPE_1) {
      out.writeUIntBE(header.length, 3, 3);
      out.writeUInt8(header.type, 6);
    }

    if (header.fmt === RTMP_CHUNK_TYPE_0) {
      out.writeUInt32LE(header.stream_id, 7); // the only little-endian field
    }
    return out;
  }

  /**
   * Serialize a packet into one buffer of chunks of at most `outChunkSize`
   * payload bytes each. The first chunk carries the full header; every
   * following chunk starts with a fmt 3 basic header, plus the extended
   * timestamp when the timestamp is >= 0xFFFFFF.
   *
   * @param {object} packet - Packet with `header` and a `payload` Buffer of
   *   at least `header.length` bytes.
   * @returns {Buffer} The wire bytes for the whole message.
   */
  rtmpChunksCreate(packet) {
    const header = packet.header;
    const payload = packet.payload;
    const payloadSize = header.length;
    const chunkSize = this.outChunkSize;

    const chunkBasicHeader = this.rtmpChunkBasicHeaderCreate(header.fmt, header.cid);
    const chunkBasicHeader3 = this.rtmpChunkBasicHeaderCreate(RTMP_CHUNK_TYPE_3, header.cid);
    const chunkMessageHeader = this.rtmpChunkMessageHeaderCreate(header);
    const useExtendedTimestamp = header.timestamp >= 0xffffff;
    const extendedSize = useExtendedTimestamp ? 4 : 0;

    // Number of chunks after the first one. An empty payload needs none; the
    // previous arithmetic subtracted one byte in that case and truncated the header.
    const continuationCount = payloadSize > 0 ? Math.ceil(payloadSize / chunkSize) - 1 : 0;
    // Use the real fmt 3 header length so chunk stream ids >= 64 are sized correctly.
    const continuationHeaderSize = chunkBasicHeader3.length + extendedSize;
    const total = chunkBasicHeader.length + chunkMessageHeader.length + extendedSize
      + payloadSize + continuationCount * continuationHeaderSize;

    const chunks = Buffer.alloc(total);
    let chunksOffset = 0;
    let payloadOffset = 0;

    chunkBasicHeader.copy(chunks, chunksOffset);
    chunksOffset += chunkBasicHeader.length;
    chunkMessageHeader.copy(chunks, chunksOffset);
    chunksOffset += chunkMessageHeader.length;
    if (useExtendedTimestamp) {
      chunks.writeUInt32BE(header.timestamp, chunksOffset);
      chunksOffset += 4;
    }

    let remaining = payloadSize;
    while (remaining > 0) {
      const size = Math.min(remaining, chunkSize);
      payload.copy(chunks, chunksOffset, payloadOffset, payloadOffset + size);
      remaining -= size;
      chunksOffset += size;
      payloadOffset += size;

      // More payload follows: start the next chunk.
      if (remaining > 0) {
        chunkBasicHeader3.copy(chunks, chunksOffset);
        chunksOffset += chunkBasicHeader3.length;
        if (useExtendedTimestamp) {
          chunks.writeUInt32BE(header.timestamp, chunksOffset);
          chunksOffset += 4;
        }
      }
    }
    return chunks;
  }

  /**
   * Copy bytes from `data` into the header scratch buffer until it holds
   * `size` bytes or the input is exhausted.
   *
   * @returns {number} The updated read offset.
   */
  _fillParserBuffer(data, p, offset, bytes, size) {
    while (this.parserBytes < size && offset < bytes) {
      this.parserBuffer[this.parserBytes++] = data[p + offset++];
    }
    return offset;
  }

  /**
   * Incremental chunk parser. State is kept on the instance, so a header or
   * payload split across several socket reads resumes where it stopped.
   * Each completed message is dispatched through `rtmpHandler`.
   *
   * @param {Buffer} data - Buffer received from the socket.
   * @param {number} p - Index in `data` of the first byte to parse.
   * @param {number} bytes - Number of bytes to parse starting at `p`.
   */
  rtmpChunkRead(data, p, bytes) {
    let offset = 0;

    while (offset < bytes) {
      switch (this.parserState) {
        case RTMP_PARSE_INIT: {
          // First header byte: its low 6 bits select the basic header length.
          this.parserBytes = 1;
          this.parserBuffer[0] = data[p + offset++];
          const cidBits = this.parserBuffer[0] & 0x3F;
          if (cidBits === 0) {
            this.parserBasicBytes = 2;
          } else if (cidBits === 1) {
            this.parserBasicBytes = 3;
          } else {
            this.parserBasicBytes = 1;
          }
          this.parserState = RTMP_PARSE_BASIC_HEADER;
          break;
        }

        case RTMP_PARSE_BASIC_HEADER:
          offset = this._fillParserBuffer(data, p, offset, bytes, this.parserBasicBytes);
          if (this.parserBytes >= this.parserBasicBytes) {
            this.parserState = RTMP_PARSE_MESSAGE_HEADER;
          }
          break;

        case RTMP_PARSE_MESSAGE_HEADER: {
          const size = rtmpHeaderSize[this.parserBuffer[0] >> 6] + this.parserBasicBytes;
          offset = this._fillParserBuffer(data, p, offset, bytes, size);
          if (this.parserBytes >= size) {
            this.rtmpPacketParse();
            this.parserState = RTMP_PARSE_EXTENDED_TIMESTAMP;
          }
          break;
        }

        case RTMP_PARSE_EXTENDED_TIMESTAMP: {
          const packet = this.parserPacket;
          const headerEnd = rtmpHeaderSize[packet.header.fmt] + this.parserBasicBytes;
          const hasExtended = packet.header.timestamp === 0xFFFFFF;
          const size = hasExtended ? headerEnd + 4 : headerEnd;
          offset = this._fillParserBuffer(data, p, offset, bytes, size);
          if (this.parserBytes >= size) {
            const timestamp = hasExtended
              ? this.parserBuffer.readUInt32BE(headerEnd)
              : packet.header.timestamp;

            // Only the first chunk of a message updates the clock/delta and
            // (re)allocates the payload buffer.
            if (packet.bytes === 0) {
              if (packet.header.fmt === RTMP_CHUNK_TYPE_0) {
                packet.clock = timestamp; // absolute timestamp
                packet.delta = 0;
              } else {
                packet.delta = timestamp; // relative to the previous message
              }
              this.rtmpPacketAlloc();
            }
            this.parserState = RTMP_PARSE_PAYLOAD;
          }
          break;
        }

        case RTMP_PARSE_PAYLOAD: {
          const packet = this.parserPacket;
          // Limited by: space left in the current chunk, bytes left in the
          // message, and bytes left in this input buffer.
          const size = Math.min(
            this.inChunkSize - (packet.bytes % this.inChunkSize),
            packet.header.length - packet.bytes,
            bytes - offset
          );
          if (size > 0) {
            data.copy(packet.payload, packet.bytes, p + offset, p + offset + size);
          }
          packet.bytes += size;
          offset += size;

          if (packet.bytes >= packet.header.length) {
            // Message complete.
            this.parserState = RTMP_PARSE_INIT;
            packet.bytes = 0;
            packet.clock += packet.delta;
            this.rtmpHandler();
          } else if (packet.bytes % this.inChunkSize === 0) {
            // Chunk complete, message not yet: expect another chunk header.
            this.parserState = RTMP_PARSE_INIT;
          }
          break;
        }
      }
    }
  }

  /**
   * Decode the basic header held in `parserBuffer`, select (or create) the
   * packet record for its chunk stream id, and read the message header into it.
   */
  rtmpPacketParse() {
    const fmt = this.parserBuffer[0] >> 6;
    let cid;
    if (this.parserBasicBytes === 2) {
      cid = 64 + this.parserBuffer[1];
    } else if (this.parserBasicBytes === 3) {
      // Parentheses are required: `+` binds tighter than `<<`, so the previous
      // expression shifted the whole sum instead of only the high byte.
      cid = 64 + this.parserBuffer[1] + (this.parserBuffer[2] << 8);
    } else {
      cid = this.parserBuffer[0] & 0x3F;
    }

    if (this.inPackets.has(cid)) {
      this.parserPacket = this.inPackets.get(cid);
    } else {
      this.parserPacket = RtmpPacket.create(fmt, cid);
      this.inPackets.set(cid, this.parserPacket);
    }
    this.parserPacket.header.fmt = fmt;
    this.parserPacket.header.cid = cid;
    this.rtmpChunkMessageHeaderRead();
  }

  /**
   * Read the message header fields present for the current packet's `fmt`
   * from `parserBuffer` into `parserPacket.header`. Fields absent for that
   * format keep the values from the previous chunk on the same chunk stream.
   *
   * @returns {number} Offset in `parserBuffer` just past the message header.
   */
  rtmpChunkMessageHeaderRead() {
    const header = this.parserPacket.header;
    let offset = this.parserBasicBytes;

    // timestamp / delta
    if (header.fmt <= RTMP_CHUNK_TYPE_2) {
      header.timestamp = this.parserBuffer.readUIntBE(offset, 3);
      offset += 3;
    }

    // message length + type
    if (header.fmt <= RTMP_CHUNK_TYPE_1) {
      header.length = this.parserBuffer.readUIntBE(offset, 3);
      header.type = this.parserBuffer[offset + 3];
      offset += 4;
    }

    // message stream id (little-endian)
    if (header.fmt === RTMP_CHUNK_TYPE_0) {
      header.stream_id = this.parserBuffer.readUInt32LE(offset);
      offset += 4;
    }
    return offset;
  }

  /**
   * Make sure the current packet's payload buffer can hold `header.length`
   * bytes; grows it with 1024 bytes of slack when it cannot.
   */
  rtmpPacketAlloc() {
    const packet = this.parserPacket;
    if (packet.capacity < packet.header.length) {
      packet.payload = Buffer.alloc(packet.header.length + 1024);
      packet.capacity = packet.header.length + 1024;
    }
  }

  /**
   * Dispatch the completed message in `parserPacket` by message type.
   * Unknown types are ignored.
   *
   * @returns {number|undefined} Result of the selected handler; for control
   *   and event messages -1 if that handler returned 0, otherwise 0.
   */
  rtmpHandler() {
    switch (this.parserPacket.header.type) {
      case RTMP_TYPE_SET_CHUNK_SIZE:
      case RTMP_TYPE_ABORT:
      case RTMP_TYPE_ACKNOWLEDGEMENT:
      case RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE:
      case RTMP_TYPE_SET_PEER_BANDWIDTH:
        return 0 === this.rtmpControlHandler() ? -1 : 0;
      case RTMP_TYPE_EVENT:
        return 0 === this.rtmpEventHandler() ? -1 : 0;
      case RTMP_TYPE_AUDIO:
        return this.rtmpAudioHandler();
      case RTMP_TYPE_VIDEO:
        return this.rtmpVideoHandler();
      case RTMP_TYPE_FLEX_MESSAGE:
      case RTMP_TYPE_INVOKE:
        return this.rtmpInvokeHandler();
      case RTMP_TYPE_FLEX_STREAM: // AMF3
      case RTMP_TYPE_DATA: // AMF0
        return this.rtmpDataHandler();
    }
  }

  /**
   * Apply a protocol control message: Set Chunk Size updates `inChunkSize`,
   * Window Acknowledgement Size updates `ackSize`; the others are ignored.
   */
  rtmpControlHandler() {
    const payload = this.parserPacket.payload;
    switch (this.parserPacket.header.type) {
      case RTMP_TYPE_SET_CHUNK_SIZE:
        this.inChunkSize = payload.readUInt32BE();
        break;
      case RTMP_TYPE_ABORT:
        break;
      case RTMP_TYPE_ACKNOWLEDGEMENT:
        break;
      case RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE:
        this.ackSize = payload.readUInt32BE();
        break;
      case RTMP_TYPE_SET_PEER_BANDWIDTH:
        break;
    }
  }

  /**
   * Handle a user control message. Only event 6 is acted upon: its 32-bit
   * value is echoed back through `rtmpSendPingResponse`.
   */
  rtmpEventHandler() {
    const payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length);
    const event = payload.readUInt16BE();
    const value = payload.readUInt32BE(2);
    switch (event) {
      case 6:
        this.rtmpSendPingResponse(value);
        break;
    }
  }

  /**
   * Decode a command message and route `_result`, `_error` and `onStatus`
   * to their handlers. For AMF3 (flex) messages the first payload byte is
   * skipped before AMF0 decoding.
   */
  rtmpInvokeHandler() {
    const offset = this.parserPacket.header.type === RTMP_TYPE_FLEX_MESSAGE ? 1 : 0;
    const payload = this.parserPacket.payload.slice(offset, this.parserPacket.header.length);
    const invokeMessage = AMF.decodeAmf0Cmd(payload);

    switch (invokeMessage.cmd) {
      case '_result':
        this.rtmpCommandOnresult(invokeMessage);
        break;
      case '_error':
        this.rtmpCommandOnerror(invokeMessage);
        break;
      case 'onStatus':
        this.rtmpCommandOnstatus(invokeMessage);
        break;
    }
  }

  /**
   * Handle a `_result` reply, matched to its request by transaction id.
   *
   * @param {object} invokeMessage - Decoded command (`transId`, `info`).
   */
  rtmpCommandOnresult(invokeMessage) {
    switch (invokeMessage.transId) {
      case RTMP_TRANSACTION_CONNECT:
        this.launcher.emit('status', invokeMessage.info);
        this.rtmpOnconnect();
        break;
      case RTMP_TRANSACTION_CREATE_STREAM:
        // For createStream the reply's `info` is used as the new stream id.
        this.rtmpOncreateStream(invokeMessage.info);
        break;
    }
  }

  /** Forward the `info` of an `_error` reply as a `'status'` event. */
  rtmpCommandOnerror(invokeMessage) {
    this.launcher.emit('status', invokeMessage.info);
  }

  /** Forward the `info` of an `onStatus` command as a `'status'` event. */
  rtmpCommandOnstatus(invokeMessage) {
    this.launcher.emit('status', invokeMessage.info);
  }

  /**
   * Continue after a successful connect: in publish mode send releaseStream
   * and FCPublish first, then always request a stream.
   */
  rtmpOnconnect() {
    if (this.isPublish) {
      this.rtmpSendReleaseStream();
      this.rtmpSendFCPublish();
    }
    this.rtmpSendCreateStream();
  }

  /**
   * Record the new stream id and start publishing or playing on it.
   *
   * @param {number} sid - Stream id taken from the createStream reply.
   */
  rtmpOncreateStream(sid) {
    this.streamId = sid;
    if (this.isPublish) {
      this.rtmpSendPublish();
      this.rtmpSendSetChunkSize();
    } else {
      this.rtmpSendPlay();
      this.rtmpSendSetBufferLength(1000);
    }
  }

  /** Emit `'audio'` with the message payload and the packet clock. */
  rtmpAudioHandler() {
    const payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length);
    this.launcher.emit('audio', payload, this.parserPacket.clock);
  }

  /** Emit `'video'` with the message payload and the packet clock. */
  rtmpVideoHandler() {
    const payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length);
    this.launcher.emit('video', payload, this.parserPacket.clock);
  }

  /** Emit `'script'` with the message payload and the packet clock. */
  rtmpDataHandler() {
    const payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length);
    this.launcher.emit('script', payload, this.parserPacket.clock);
  }

  /**
   * Encode `opt` as an AMF0 command and send it on the invoke chunk stream.
   *
   * @param {number} sid - Message stream id placed in the header.
   * @param {object} opt - Command description handed to `AMF.encodeAmf0Cmd`.
   */
  sendInvokeMessage(sid, opt) {
    const packet = RtmpPacket.create();
    packet.header.fmt = RTMP_CHUNK_TYPE_0;
    packet.header.cid = RTMP_CHANNEL_INVOKE;
    packet.header.type = RTMP_TYPE_INVOKE;
    packet.header.stream_id = sid;
    packet.payload = AMF.encodeAmf0Cmd(opt);
    packet.header.length = packet.payload.length;
    this.socket.write(this.rtmpChunksCreate(packet));
  }

  /**
   * Send a user control message (type 4) on the protocol chunk stream.
   *
   * @param {Buffer} payload - Complete event payload (event type + data).
   */
  _sendUserControl(payload) {
    const packet = RtmpPacket.create();
    packet.header.fmt = RTMP_CHUNK_TYPE_0;
    packet.header.cid = RTMP_CHANNEL_PROTOCOL;
    packet.header.type = RTMP_TYPE_EVENT;
    packet.payload = payload;
    packet.header.length = packet.payload.length;
    this.socket.write(this.rtmpChunksCreate(packet));
  }

  /** Send the `connect` command for the app named in the URL. */
  rtmpSendConnect() {
    const opt = {
      cmd: 'connect',
      transId: RTMP_TRANSACTION_CONNECT,
      cmdObj: {
        app: this.info.app,
        flashVer: FLASHVER,
        tcUrl: this.info.tcurl,
        fpad: 0,
        capabilities: 15,
        audioCodecs: 3191,
        videoCodecs: 252,
        videoFunction: 1,
        encoding: 0
      }
    };
    this.sendInvokeMessage(0, opt);
  }

  /** Send `releaseStream` for the stream named in the URL. */
  rtmpSendReleaseStream() {
    const opt = {
      cmd: 'releaseStream',
      transId: 0,
      cmdObj: null,
      streamName: this.info.stream,
    };
    this.sendInvokeMessage(this.streamId, opt);
  }

  /** Send `FCPublish` for the stream named in the URL. */
  rtmpSendFCPublish() {
    const opt = {
      cmd: 'FCPublish',
      transId: 0,
      cmdObj: null,
      streamName: this.info.stream,
    };
    this.sendInvokeMessage(this.streamId, opt);
  }

  /** Send `createStream`; the reply is handled in `rtmpCommandOnresult`. */
  rtmpSendCreateStream() {
    const opt = {
      cmd: 'createStream',
      transId: RTMP_TRANSACTION_CREATE_STREAM,
      cmdObj: null
    };
    this.sendInvokeMessage(0, opt);
  }

  /** Send `play` for the stream named in the URL. */
  rtmpSendPlay() {
    const opt = {
      cmd: 'play',
      transId: 0,
      cmdObj: null,
      streamName: this.info.stream,
      start: -2,
      duration: -1,
      reset: 1
    };
    this.sendInvokeMessage(this.streamId, opt);
  }

  /**
   * Send a user control event 3 carrying the current stream id and a buffer length.
   *
   * @param {number} bufferTime - Buffer length value written as a 32-bit integer.
   */
  rtmpSendSetBufferLength(bufferTime) {
    const payload = Buffer.alloc(10);
    payload.writeUInt16BE(0x03);
    payload.writeUInt32BE(this.streamId, 2);
    payload.writeUInt32BE(bufferTime, 6);
    this._sendUserControl(payload);
  }

  /** Send `publish` (type 'live') for the stream named in the URL. */
  rtmpSendPublish() {
    const opt = {
      cmd: 'publish',
      transId: 0,
      cmdObj: null,
      streamName: this.info.stream,
      type: 'live'
    };
    this.sendInvokeMessage(this.streamId, opt);
  }

  /**
   * Announce our outgoing chunk size and start using it.
   *
   * The template is a pre-built fmt 0 chunk on chunk stream 2: zero timestamp,
   * length 4, type 1 (Set Chunk Size), stream id 0, followed by the 4-byte size.
   *
   * NOTE(review): the announced size is `inChunkSize` (what the server sends
   * us), not RTMP_OUT_CHUNK_SIZE — kept as in the original; confirm intent.
   */
  rtmpSendSetChunkSize() {
    const rtmpBuffer = Buffer.from('02000000000004010000000000000000', 'hex');
    rtmpBuffer.writeUInt32BE(this.inChunkSize, 12);
    this.socket.write(rtmpBuffer);
    this.outChunkSize = this.inChunkSize;
  }

  /** Send `FCUnpublish` for the stream named in the URL. */
  rtmpSendFCUnpublish() {
    const opt = {
      cmd: 'FCUnpublish',
      transId: 0,
      cmdObj: null,
      streamName: this.info.stream,
    };
    this.sendInvokeMessage(this.streamId, opt);
  }

  /** Send `deleteStream` for the current stream id. */
  rtmpSendDeleteStream() {
    const opt = {
      cmd: 'deleteStream',
      transId: 0,
      cmdObj: null,
      streamId: this.streamId
    };
    this.sendInvokeMessage(this.streamId, opt);
  }

  /**
   * Answer a ping: send a user control event 7 echoing the received value.
   *
   * @param {number} time - 32-bit value taken from the ping request.
   */
  rtmpSendPingResponse(time) {
    const payload = Buffer.alloc(6);
    payload.writeUInt16BE(0x07);
    payload.writeUInt32BE(time, 2);
    this._sendUserControl(payload);
  }
}
-
// The client class is the module's sole export.
module.exports = NodeRtmpClient;
|