From 6bf22e18d1357f11048902e2c5ac9f814cd123fa Mon Sep 17 00:00:00 2001 From: Sergiy Date: Fri, 4 Dec 2009 16:52:16 +0000 Subject: [PATCH] Implement RTMP output (publishing FLV stream to RTMP server). Patch by Sergiy (piratfm at `do-no-evil-mail`.com) Originally committed as revision 20731 to svn://svn.ffmpeg.org/ffmpeg/trunk --- doc/general.texi | 3 +- libavformat/avformat.h | 2 +- libavformat/rtmpproto.c | 254 +++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 244 insertions(+), 15 deletions(-) diff --git a/doc/general.texi b/doc/general.texi index 09cd03a24..17c080488 100644 --- a/doc/general.texi +++ b/doc/general.texi @@ -201,7 +201,8 @@ library: @item RL2 @tab @tab X @tab Audio and video format used in some games by Entertainment Software Partners. @item RPL/ARMovie @tab @tab X -@item RTMP @tab @tab X +@item RTMP @tab X @tab X + @tab Output is performed by publishing stream to RTMP server @item RTP @tab @tab X @item RTSP @tab @tab X @item SDP @tab @tab X diff --git a/libavformat/avformat.h b/libavformat/avformat.h index d0d9b8d22..507a49110 100644 --- a/libavformat/avformat.h +++ b/libavformat/avformat.h @@ -22,7 +22,7 @@ #define AVFORMAT_AVFORMAT_H #define LIBAVFORMAT_VERSION_MAJOR 52 -#define LIBAVFORMAT_VERSION_MINOR 40 +#define LIBAVFORMAT_VERSION_MINOR 41 #define LIBAVFORMAT_VERSION_MICRO 0 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \ diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c index e8f5c2df9..837905cd6 100644 --- a/libavformat/rtmpproto.c +++ b/libavformat/rtmpproto.c @@ -47,9 +47,12 @@ typedef enum { STATE_START, ///< client has not done anything yet STATE_HANDSHAKED, ///< client has performed handshake + STATE_RELEASING, ///< client releasing stream before publish it (for output) + STATE_FCPUBLISH, ///< client FCPublishing stream (for output) STATE_CONNECTING, ///< client connected to server successfully STATE_READY, ///< client has sent all needed commands and waits for server reply STATE_PLAYING, ///< client has started receiving multimedia data from server + STATE_PUBLISHING, ///< client has started sending multimedia data to server (for output) } ClientState; /** protocol handler context */ @@ -65,6 +68,7 @@ typedef struct RTMPContext { uint8_t* flv_data; ///< buffer with data for demuxer int flv_size; ///< current buffer size int flv_off; ///< number of bytes read from current buffer + RTMPPacket out_pkt; ///< rtmp packet, created from flv a/v or metadata (for output) } RTMPContext; #define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing @@ -97,7 +101,7 @@ static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto, const char *host, int port) { RTMPPacket pkt; - uint8_t ver[32], *p; + uint8_t ver[64], *p; char tcurl[512]; ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 4096); @@ -110,12 +114,19 @@ static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto, ff_amf_write_field_name(&p, "app"); ff_amf_write_string(&p, rt->app); + if (rt->is_input) { snprintf(ver, sizeof(ver), "%s %d,%d,%d,%d", RTMP_CLIENT_PLATFORM, RTMP_CLIENT_VER1, RTMP_CLIENT_VER2, RTMP_CLIENT_VER3, RTMP_CLIENT_VER4); + } else { + snprintf(ver, sizeof(ver), "FMLE/3.0 (compatible; %s)", LIBAVFORMAT_IDENT); + ff_amf_write_field_name(&p, "type"); + ff_amf_write_string(&p, "nonprivate"); + } ff_amf_write_field_name(&p, "flashVer"); ff_amf_write_string(&p, ver); ff_amf_write_field_name(&p, "tcUrl"); ff_amf_write_string(&p, tcurl); + if (rt->is_input) { ff_amf_write_field_name(&p, "fpad"); ff_amf_write_bool(&p, 0); ff_amf_write_field_name(&p, "capabilities"); @@ -126,6 +137,7 @@ static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto, ff_amf_write_number(&p, 252.0); ff_amf_write_field_name(&p, "videoFunction"); ff_amf_write_number(&p, 1.0); + } ff_amf_write_object_end(&p); pkt.data_size = p - pkt.data; @@ -134,6 +146,75 @@ static void gen_connect(URLContext *s, RTMPContext *rt, const char *proto, } /** + * Generates 'releaseStream' call and sends it to the server. It should make + * the server release some channel for media streams. + */ +static void gen_release_stream(URLContext *s, RTMPContext *rt) +{ + RTMPPacket pkt; + uint8_t *p; + + ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, + 29 + strlen(rt->playpath)); + + av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Releasing stream...\n"); + p = pkt.data; + ff_amf_write_string(&p, "releaseStream"); + ff_amf_write_number(&p, 2.0); + ff_amf_write_null(&p); + ff_amf_write_string(&p, rt->playpath); + + ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); + ff_rtmp_packet_destroy(&pkt); +} + +/** + * Generates 'FCPublish' call and sends it to the server. It should make + * the server preapare for receiving media streams. + */ +static void gen_fcpublish_stream(URLContext *s, RTMPContext *rt) +{ + RTMPPacket pkt; + uint8_t *p; + + ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, + 25 + strlen(rt->playpath)); + + av_log(LOG_CONTEXT, AV_LOG_DEBUG, "FCPublish stream...\n"); + p = pkt.data; + ff_amf_write_string(&p, "FCPublish"); + ff_amf_write_number(&p, 3.0); + ff_amf_write_null(&p); + ff_amf_write_string(&p, rt->playpath); + + ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); + ff_rtmp_packet_destroy(&pkt); +} + +/** + * Generates 'FCUnpublish' call and sends it to the server. It should make + * the server destroy stream. + */ +static void gen_fcunpublish_stream(URLContext *s, RTMPContext *rt) +{ + RTMPPacket pkt; + uint8_t *p; + + ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, + 27 + strlen(rt->playpath)); + + av_log(LOG_CONTEXT, AV_LOG_DEBUG, "UnPublishing stream...\n"); + p = pkt.data; + ff_amf_write_string(&p, "FCUnpublish"); + ff_amf_write_number(&p, 5.0); + ff_amf_write_null(&p); + ff_amf_write_string(&p, rt->playpath); + + ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); + ff_rtmp_packet_destroy(&pkt); +} + +/** * Generates 'createStream' call and sends it to the server. It should make * the server allocate some channel for media streams. */ @@ -147,8 +228,31 @@ static void gen_create_stream(URLContext *s, RTMPContext *rt) p = pkt.data; ff_amf_write_string(&p, "createStream"); - ff_amf_write_number(&p, 3.0); + ff_amf_write_number(&p, rt->is_input ? 3.0 : 4.0); + ff_amf_write_null(&p); + + ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); + ff_rtmp_packet_destroy(&pkt); +} + + +/** + * Generates 'deleteStream' call and sends it to the server. It should make + * the server remove some channel for media streams. + */ +static void gen_delete_stream(URLContext *s, RTMPContext *rt) +{ + RTMPPacket pkt; + uint8_t *p; + + av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Deleting stream...\n"); + ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, 34); + + p = pkt.data; + ff_amf_write_string(&p, "deleteStream"); + ff_amf_write_number(&p, 0.0); ff_amf_write_null(&p); + ff_amf_write_number(&p, rt->main_channel_id); ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); @@ -190,6 +294,30 @@ static void gen_play(URLContext *s, RTMPContext *rt) } /** + * Generates 'publish' call and sends it to the server. + */ +static void gen_publish(URLContext *s, RTMPContext *rt) +{ + RTMPPacket pkt; + uint8_t *p; + + av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Sending publish command for '%s'\n", rt->playpath); + ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE, 0, + 30 + strlen(rt->playpath)); + pkt.extra = rt->main_channel_id; + + p = pkt.data; + ff_amf_write_string(&p, "publish"); + ff_amf_write_number(&p, 0.0); + ff_amf_write_null(&p); + ff_amf_write_string(&p, rt->playpath); + ff_amf_write_string(&p, "live"); + + ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, rt->prev_pkt[1]); + ff_rtmp_packet_destroy(&pkt); +} + +/** * Generates ping reply and sends it to the server. */ static void gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt) @@ -349,6 +477,7 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt) av_log(LOG_CONTEXT, AV_LOG_DEBUG, "Server version %d.%d.%d.%d\n", serverdata[5], serverdata[6], serverdata[7], serverdata[8]); + if (rt->is_input) { server_pos = rtmp_validate_digest(serverdata + 1, 772); if (!server_pos) { server_pos = rtmp_validate_digest(serverdata + 1, 8); @@ -380,6 +509,10 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt) // write reply back to the server url_write(rt->stream, tosend, RTMP_HANDSHAKE_PACKET_SIZE); + } else { + url_write(rt->stream, serverdata+1, RTMP_HANDSHAKE_PACKET_SIZE); + } + return 0; } @@ -401,6 +534,8 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) "Chunk size change packet is not 4 bytes long (%d)\n", pkt->data_size); return -1; } + if (!rt->is_input) + ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size, rt->prev_pkt[1]); rt->chunk_size = AV_RB32(pkt->data); if (rt->chunk_size <= 0) { av_log(LOG_CONTEXT, AV_LOG_ERROR, "Incorrect chunk size %d\n", rt->chunk_size); @@ -425,9 +560,29 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) } else if (!memcmp(pkt->data, "\002\000\007_result", 10)) { switch (rt->state) { case STATE_HANDSHAKED: + if (!rt->is_input) { + gen_release_stream(s, rt); + gen_fcpublish_stream(s, rt); + rt->state = STATE_RELEASING; + } else { + rt->state = STATE_CONNECTING; + } gen_create_stream(s, rt); + break; + case STATE_FCPUBLISH: rt->state = STATE_CONNECTING; break; + case STATE_RELEASING: + rt->state = STATE_FCPUBLISH; + /* hack for Wowza Media Server, it does not send result for + * releaseStream and FCPublish calls */ + if (!pkt->data[10]) { + int pkt_id = (int) av_int2dbl(AV_RB64(pkt->data + 11)); + if (pkt_id == 4) + rt->state = STATE_CONNECTING; + } + if(rt->state != STATE_CONNECTING) + break; case STATE_CONNECTING: //extract a number from the result if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) { @@ -435,7 +590,11 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) } else { rt->main_channel_id = (int) av_int2dbl(AV_RB64(pkt->data + 21)); } + if (rt->is_input) { gen_play(s, rt); + } else { + gen_publish(s, rt); + } rt->state = STATE_READY; break; } @@ -459,10 +618,8 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) } t = ff_amf_get_field_value(ptr, data_end, "code", tmpstr, sizeof(tmpstr)); - if (!t && !strcmp(tmpstr, "NetStream.Play.Start")) { - rt->state = STATE_PLAYING; - return 0; - } + if (!t && !strcmp(tmpstr, "NetStream.Play.Start")) rt->state = STATE_PLAYING; + if (!t && !strcmp(tmpstr, "NetStream.Publish.Start")) rt->state = STATE_PUBLISHING; } break; } @@ -501,11 +658,11 @@ static int get_packet(URLContext *s, int for_header) ff_rtmp_packet_destroy(&rpkt); return -1; } - if (for_header && rt->state == STATE_PLAYING) { + if (for_header && (rt->state == STATE_PLAYING || rt->state == STATE_PUBLISHING)) { ff_rtmp_packet_destroy(&rpkt); return 0; } - if (!rpkt.data_size) { + if (!rpkt.data_size || !rt->is_input) { ff_rtmp_packet_destroy(&rpkt); continue; } @@ -545,6 +702,14 @@ static int rtmp_close(URLContext *h) { RTMPContext *rt = h->priv_data; + if(!rt->is_input) { + rt->flv_data = NULL; + if (rt->out_pkt.data_size) + ff_rtmp_packet_destroy(&rt->out_pkt); + gen_fcunpublish_stream(h, rt); + } + gen_delete_stream(h, rt); + av_freep(&rt->flv_data); url_close(rt->stream); av_free(rt); @@ -586,10 +751,6 @@ static int rtmp_open(URLContext *s, const char *uri, int flags) goto fail; } - if (!rt->is_input) { - av_log(LOG_CONTEXT, AV_LOG_ERROR, "RTMP output is not supported yet.\n"); - goto fail; - } else { rt->state = STATE_START; if (rtmp_handshake(s, rt)) return -1; @@ -635,11 +796,17 @@ static int rtmp_open(URLContext *s, const char *uri, int flags) } while (ret == EAGAIN); if (ret < 0) goto fail; + + if (rt->is_input) { // generate FLV header for demuxer rt->flv_size = 13; rt->flv_data = av_realloc(rt->flv_data, rt->flv_size); rt->flv_off = 0; memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size); + } else { + rt->flv_size = 0; + rt->flv_data = NULL; + rt->flv_off = 0; } s->max_packet_size = url_get_max_packet_size(rt->stream); @@ -679,7 +846,68 @@ static int rtmp_read(URLContext *s, uint8_t *buf, int size) static int rtmp_write(URLContext *h, uint8_t *buf, int size) { - return 0; + RTMPContext *rt = h->priv_data; + int size_temp = size; + int pktsize, pkttype; + uint32_t ts; + const uint8_t *buf_temp = buf; + + if (size < 11) { + av_log(LOG_CONTEXT, AV_LOG_DEBUG, "FLV packet too small %d\n", size); + return 0; + } + + do { + if (!rt->flv_off) { + //skip flv header + if (buf_temp[0] == 'F' && buf_temp[1] == 'L' && buf_temp[2] == 'V') { + buf_temp += 9 + 4; + size_temp -= 9 + 4; + } + + pkttype = bytestream_get_byte(&buf_temp); + pktsize = bytestream_get_be24(&buf_temp); + ts = bytestream_get_be24(&buf_temp); + ts |= bytestream_get_byte(&buf_temp) << 24; + bytestream_get_be24(&buf_temp); + size_temp -= 11; + rt->flv_size = pktsize; + + //force 12bytes header + if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) || + pkttype == RTMP_PT_NOTIFY) { + if (pkttype == RTMP_PT_NOTIFY) + pktsize += 16; + rt->prev_pkt[1][RTMP_SOURCE_CHANNEL].channel_id = 0; + } + + //this can be a big packet, it's better to send it right here + ff_rtmp_packet_create(&rt->out_pkt, RTMP_SOURCE_CHANNEL, pkttype, ts, pktsize); + rt->out_pkt.extra = rt->main_channel_id; + rt->flv_data = rt->out_pkt.data; + + if (pkttype == RTMP_PT_NOTIFY) + ff_amf_write_string(&rt->flv_data, "@setDataFrame"); + } + + if (rt->flv_size - rt->flv_off > size_temp) { + bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, size_temp); + rt->flv_off += size_temp; + } else { + bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, rt->flv_size - rt->flv_off); + rt->flv_off += rt->flv_size - rt->flv_off; + } + + if (rt->flv_off == rt->flv_size) { + bytestream_get_be32(&buf_temp); + + ff_rtmp_packet_write(rt->stream, &rt->out_pkt, rt->chunk_size, rt->prev_pkt[1]); + ff_rtmp_packet_destroy(&rt->out_pkt); + rt->flv_size = 0; + rt->flv_off = 0; + } + } while (buf_temp - buf < size_temp); + return size; } URLProtocol rtmp_protocol = { -- 2.11.0