root/cygnal/rtmp_server.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. _streamid
  2. processClientHandShake
  3. handShakeResponse
  4. serverFinish
  5. packetSend
  6. packetRead
  7. encodeResult
  8. encodeResult
  9. encodeResult
  10. encodeResult
  11. encodeResult
  12. encodePing
  13. encodePing
  14. encodeBWDone
  15. encodeAudio
  16. encodeVideo
  17. parseEchoRequest
  18. formatEchoResponse
  19. formatEchoResponse
  20. formatEchoResponse
  21. createClientID
  22. createClientID
  23. createStreamID
  24. sendFile
  25. sendToClient
  26. sendToClient
  27. rtmp_handler

// rtmp.cpp:  Adobe/Macromedia Real Time Message Protocol handler, for Gnash.
// 
//   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010,
//   2011 Free Software Foundation, Inc
// 
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
// 
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
//

#ifdef HAVE_CONFIG_H
#include "gnashconfig.h"
#endif

#include <iostream>
#include <string>
#include <map>
#include <cstdlib>
#include <cstdio>

#include <boost/cstdint.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/detail/endian.hpp>
#include <boost/random/uniform_real.hpp>
#include <boost/random/uniform_int.hpp>
#include <boost/random/mersenne_twister.hpp>
#include <boost/lexical_cast.hpp>

#if ! (defined(_WIN32) || defined(WIN32))
#       include <netinet/in.h>
#endif

#include "log.h"
#include "URL.h"
#include "amf.h"
#include "rtmp.h"
#include "rtmp_server.h"
#include "network.h"
#include "element.h"
#include "handler.h"
#include "utility.h"
#include "buffer.h"
#include "GnashSleep.h"
#include "crc.h"
#include "cache.h"
#include "diskstream.h"
#ifdef HAVE_SYS_TIME_H
# include <sys/time.h>
#endif 
using namespace gnash;
using namespace std;

namespace cygnal
{

// Get access to the global config data for Cygnal
static CRcInitFile& crcfile = CRcInitFile::getDefaultInstance();

// Get access to the global Cygnal cache
static Cache& cache = Cache::getDefaultInstance();

extern map<int, Handler *> handlers;

RTMPServer::RTMPServer() 
    : _filesize(0),
      _streamid(1)
{
//    GNASH_REPORT_FUNCTION;
//     _inbytes = 0;
//     _outbytes = 0;
    
//    _body = new unsigned char(RTMP_HANDSHAKE_SIZE+1);
//    memset(_body, 0, RTMP_HANDSHAKE_SIZE+1);
}

RTMPServer::~RTMPServer()
{
//    GNASH_REPORT_FUNCTION;
    _properties.clear();
//    delete _body;
}


boost::shared_ptr<cygnal::Element>
RTMPServer::processClientHandShake(int fd)
{
    GNASH_REPORT_FUNCTION;

    log_network(_("Processing RTMP Handshake for fd #%d"), fd);
    
#ifdef USE_STATISTICS
    struct timespec start;
    clock_gettime (CLOCK_REALTIME, &start);
#endif
    
    // Adjust the timeout for reading from the network
    RTMP::setTimeout(10);
    
    // These store the information we need from the initial
    /// NetConnection object.
    boost::scoped_ptr<cygnal::Element> nc;
    boost::shared_ptr<cygnal::Buffer>  pkt;
    boost::shared_ptr<cygnal::Element> tcurl;
    boost::shared_ptr<cygnal::Element> swfurl;
    boost::shared_ptr<cygnal::Element> encoding;

//     RTMP::rtmp_headersize_e response_head_size = RTMP::HEADER_12;
    
    // Read the handshake bytes sent by the client when requesting
    // a connection.
    boost::shared_ptr<cygnal::Buffer> handshake1 = RTMP::recvMsg(fd);
    // See if we have data in the handshake, we should have 1537 bytes
    if (!handshake1) {
        log_error("Failed to read the handshake from the client.");
        return tcurl;           // nc is empty
    } else {
        log_network("Read first handshake from the client.");
    }
    
    // Send our response to the handshake, which primarily is the bytes
    // we just received.
    handShakeResponse(fd, *handshake1);
    
    // Read the response from the client from the handshale reponse we
    // just sent.
    boost::shared_ptr<cygnal::Buffer> handshake2 = RTMP::recvMsg(fd);
    // See if we have data in the handshake, we should have 1536 bytes
    if (handshake2 == 0) {
        log_error("failed to read the handshake from the client.");
        return tcurl;           // nc is empty
    } else {
        log_network("Read second handshake from the client.");
    }
    
    // Don't assume the data we just read is a handshake.
    pkt = serverFinish(fd, *handshake1, *handshake2);
    // Wmake sure we got data before trying to process it
    if (!pkt) {
        log_error("Didn't receive any data in handshake!");
        tcurl.reset(new cygnal::Element);
        return tcurl;           // nc is empty
    }
    
    // the packet is a raw RTMP message. Since the header can be a
    // variety of sizes, and this effects the data size, we need to
    // decode that first.
    boost::shared_ptr<RTMP::rtmp_head_t> qhead = RTMP::decodeHeader(pkt->reference());

    if (!qhead) {
        log_error("RTMP header had parsing error!");
        return tcurl;           // nc is empty
    }

    // We know the first packet is always a NetConnection INVOKE of
    // the connect() method. These are usually around 300-400 bytes in
    // testing, so anything larger than that is suspicios.
    if (qhead->bodysize > 1024) {
        log_error("NetConnection unusually large! %d", qhead->bodysize);
    }

    // Get the actual start of the data
    boost::uint8_t *ptr = pkt->reference() + qhead->head_size;

    // See if we have enough data to go past the chunksize, which is
    // probable. If so, all chunks are the default size of 128, the
    // same size as used for video packets. This means every chunksize
    // boundary is an RTMP header byte that must be removed, or the
    // data in the NetConnection::connect() packet will be
    // corrupted. There is probably a better way to do this, but for
    // now build a copy of the data but skip over the RTMP header
    // bytes every chunk size biundary. All RTMP headers at this stage
    // are 1 byte ones.
    boost::scoped_ptr<cygnal::Buffer> newptr(new cygnal::Buffer(qhead->bodysize));
    if (qhead->bodysize > RTMP_VIDEO_PACKET_SIZE) {
        log_network("De chunkifying the NetConnection packet.");
        int nbytes = 0;
        while (nbytes < qhead->bodysize) {
            size_t chunk = RTMP_VIDEO_PACKET_SIZE;
            if ((qhead->bodysize - nbytes) < RTMP_VIDEO_PACKET_SIZE) {
                chunk = qhead->bodysize - nbytes;
            }
            newptr->append(ptr + nbytes, chunk);
            nbytes += chunk + 1;
        }
    } else {
        newptr->copy(ptr, qhead->bodysize);
    }

    // extract the body of the message from the packet
    _netconnect = RTMP::decodeMsgBody(newptr->begin(), qhead->bodysize);
    if (!_netconnect) {
        log_error("failed to read the body of the handshake data from the client.");
        return tcurl;           // nc is empty
    } else {
        log_network("Read handshake data body from the client.");
    }

    // make sure this is actually a NetConnection packet.
    if (_netconnect->getMethodName() != "connect") {
        log_error("Didn't receive NetConnection object in handshake!");
        return tcurl;           // nc is empty
    } else {
        log_network("Got NetConnection ::connect() INVOKE.");
        _netconnect->dump();    // FIXME: debug crap
    }
    
    // Get the data for the fields we want.
    tcurl  = _netconnect->findProperty("tcUrl");
    swfurl  = _netconnect->findProperty("swfUrl");
    encoding  = _netconnect->findProperty("objectEncoding");

    // based on the Red5 tests, I see two behaviours with this next
    // packet. If only gets sent when the "objectEncoding" field of
    // the NetConnection object is in the initial packet. When this is
    // supplied, it's more remoting than streaming, so sending this
    // causes Async I/O errors in the client.
    if (!encoding) {
        // Send a onBWDone to the client to start the new NetConnection,
        boost::shared_ptr<cygnal::Buffer> bwdone = encodeBWDone(2.0);
        if (RTMP::sendMsg(fd, qhead->channel, RTMP::HEADER_8,
                          bwdone->size(), RTMP::INVOKE, RTMPMsg::FROM_SERVER, *bwdone)) {
            log_network("Sent onBWDone to client");
        } else {
            log_error("Couldn't send onBWDone to client!");
            tcurl.reset();
            return tcurl;               // nc is empty
        }
    }
    
    // Send a Set Client Window Size to the client
    boost::shared_ptr<cygnal::Buffer> winsize(new cygnal::Buffer(sizeof(boost::uint32_t)));
    boost::uint32_t swapped = 0x20000;
    swapBytes(&swapped, sizeof(boost::uint32_t));
    *winsize += swapped;
    if (RTMP::sendMsg(fd, RTMP_SYSTEM_CHANNEL, RTMP::HEADER_12,
                      winsize->size(), RTMP::WINDOW_SIZE, RTMPMsg::FROM_CLIENT, *winsize)) {
        log_network("Sent set Client Window Size to client");
    } else {
        log_error("Couldn't send set Client Window Size to client!");
        tcurl.reset();
        return tcurl;           // nc is empty
    }

    // Send a ping to the client to reset the new NetConnection,
    boost::shared_ptr<cygnal::Buffer> ping_reset =
        encodePing(RTMP::PING_RESET, 0);
    if (RTMP::sendMsg(fd, RTMP_SYSTEM_CHANNEL, RTMP::HEADER_8,
                      ping_reset->size(), RTMP::USER, RTMPMsg::FROM_SERVER, *ping_reset)) {
        log_network("Sent Ping to client");
    } else {
        log_error("Couldn't send Ping to client!");
        tcurl.reset();
        return tcurl;           // nc is empty
    }

    // Send the packet to notify the client that the
    // NetConnection::connect() was sucessful. After the client
    // receives this, the handhsake is completed.
    boost::shared_ptr<cygnal::Buffer> response =
        encodeResult(RTMPMsg::NC_CONNECT_SUCCESS);
    if (RTMP::sendMsg(fd, 3, RTMP::HEADER_8, response->allocated(),
                      RTMP::INVOKE, RTMPMsg::FROM_SERVER, *response)) {
        log_network("Sent response to client.");
    } else {
        log_error("Couldn't send response to client!");
        tcurl.reset();
        return tcurl;           // nc is empty
    }

    return tcurl;
}

// The response is the gibberish sent back twice, preceeded by a byte
// with the value of 0x3. We have to very carefully send the handshake
// in one big packet as doing otherwise seems to cause subtle timing
// problems with the Adobe player. This way it connects every time.
bool
RTMPServer::handShakeResponse(int fd, cygnal::Buffer &handshake)
{
    GNASH_REPORT_FUNCTION;

    boost::uint8_t byte;
    byte = RTMP_VERSION;

    // the response handshake is twice the size of the one we just
    // received for a total of 3072 bytes, plus room for the version.
    boost::scoped_ptr<cygnal::Buffer> zeros(new cygnal::Buffer(RTMP_HANDSHAKE_SIZE*2
                                         + RTMP_HANDSHAKE_VERSION_SIZE));
    zeros->clear();             // set entire buffer to zeros

    boost::uint8_t *ptr = zeros->reference();

    // the first byte of the handshake response is the RTMP version
    // number.
    *ptr =  RTMP_VERSION;

    // the first half we make all zeros, as it doesn't appear to be
    // used for anything. More data is the second half of the
    // response.
    zeros->setSeekPointer(ptr + RTMP_HANDSHAKE_VERSION_SIZE + 
                          RTMP_HANDSHAKE_SIZE);

    // the handhshake has a two field header, which appears to be
    // timestamp, followed by another field that appears to be another
    // timestamp or version number, which is probably ignored.
    // the first field of the header is the timestamp
    boost::uint32_t timestamp;
    // Get the timestamp of when this message was read
    timestamp = RTMP::getTime();
    *zeros += timestamp;

    // the second field is always zero
    boost::uint32_t pad = 0;
    *zeros += pad;

    // the data starts after the vesion and header bytes
    size_t offset = RTMP_HANDSHAKE_VERSION_SIZE + RTMP_HANDSHAKE_HEADER_SIZE;

    // add the handshake data, which is 1528 byte of random stuff.
    zeros->append(handshake.reference() + offset, RTMP_RANDOM_SIZE);

    // send the handshake to the client
    size_t ret = writeNet(fd, *zeros);
    
    if (ret == zeros->allocated()) {
        log_network("Sent RTMP Handshake response at %d", timestamp);
    } else {
        log_error("Couldn't sent RTMP Handshake response at %d!", timestamp);
    }

    return true;    
}

boost::shared_ptr<cygnal::Buffer>
RTMPServer::serverFinish(int fd, cygnal::Buffer &handshake1, cygnal::Buffer &handshake2)
{
    GNASH_REPORT_FUNCTION;
    boost::shared_ptr<cygnal::Buffer> buf;

    // sanity check our input data. We do this seperately as an empty
    // buffer means data wasn't read correctly from the network. We
    // should never get this far with bad data, but when it comes to
    // network programming, a little caution is always good.
    if (handshake1.empty()) {
        log_error("No data in original handshake buffer.");
        return buf;             // return empty buffer
    }
    if (handshake2.empty()) {
        log_error("No data in response handshake buffer.");
        return buf;             // return empty buffer
    }

    // the first field of the header is the timestamp of the original
    // packet sent by this server.
    boost::uint32_t timestamp1 = *reinterpret_cast<boost::uint32_t *>
        (handshake1.reference() + RTMP_HANDSHAKE_VERSION_SIZE);

    // the second field of the header is the timestamp of the previous
    // packet sent by this server.
    boost::uint32_t timestamp2 = *reinterpret_cast<boost::uint32_t *>
        (handshake1.reference() + RTMP_HANDSHAKE_VERSION_SIZE + sizeof(boost::uint32_t));

    log_network("The timestamp delta is %d", timestamp2 - timestamp1);

    // This is the location in the second handshake to the random data
    // block used in the handshake.
    size_t pkt_size = RTMP_HANDSHAKE_VERSION_SIZE + RTMP_HANDSHAKE_SIZE;
    // the handshakes are supposed to match.
    int diff = std::memcmp(handshake1.begin()
           + RTMP_HANDSHAKE_VERSION_SIZE + RTMP_HANDSHAKE_HEADER_SIZE,
                           handshake2.begin()
           + pkt_size + RTMP_HANDSHAKE_HEADER_SIZE,
                           RTMP_RANDOM_SIZE);
    if (diff <= 1) {
        log_network (_("Handshake Finish Data matched"));
    } else {
        log_error (_("Handshake Finish Data didn't match by %d bytes"), diff);
//      return buf;             // return empty buffer
    }

    // Copy the extra data from the end of the handshake to the new
    // buffer. Normally we  try to avoid copying anything around, but
    // as this is only used once for each connection, there isn't a
    // real performance hit from it.
    size_t amf_size = handshake2.allocated() - pkt_size;
    if (handshake2.allocated() >= pkt_size) {
        log_network("Got AMF data in handshake, %d bytes for fd #%d",
                    amf_size, fd);
        buf.reset(new Buffer(amf_size));
        // populate the buffer with the AMF data
        boost::uint8_t *ptr = handshake2.reference() + RTMP_HANDSHAKE_SIZE;
        buf->copy(ptr, amf_size);
    }
    
    return buf;
}

bool
RTMPServer::packetSend(cygnal::Buffer &/* buf */)
 {
    GNASH_REPORT_FUNCTION;
    return false;
}

// This overrides using same method from the base RTMP class.
bool
RTMPServer::packetRead(cygnal::Buffer &buf)
{
    GNASH_REPORT_FUNCTION;

    boost::uint8_t amf_index, headersize;
    boost::uint8_t *ptr = buf.reference();
    AMF amf;
    
    if (ptr == 0) {
        return false;
    }

//     cerr << "FIXME3: " << buf.hexify(true) << endl;
    
//    ptr += 1;                 // skip past the header byte
    
    amf_index = *ptr & RTMP_INDEX_MASK;
    headersize = headerSize(*ptr);
    log_network (_("The Header size is: %d"), headersize);
    log_network (_("The AMF index is: 0x%x"), amf_index);

//     if (headersize > 1) {
//      packetsize = parseHeader(ptr);
//         if (packetsize) {
//             log_network (_("Read first RTMP packet header of size %d"), packetsize);
//         } else {
//             log_error (_("Couldn't read first RTMP packet header"));
//             return false;
//         }
//     }

// #if 1
//     boost::uint8_t *end = buf->remove(0xc3);
// #else
//     boost::uint8_t *end = buf->find(0xc3);
//     log_network("END is %x", (void *)end);
//     *end = '*';
// #endif
    decodeHeader(ptr);
    ptr += headersize;

    boost::uint8_t* tooFar = ptr+300+sizeof(int); // FIXME:
    
    AMF amf_obj;
    boost::shared_ptr<cygnal::Element> el1 = amf_obj.extractAMF(ptr, tooFar);
    ptr += amf_obj.totalsize();
    boost::shared_ptr<cygnal::Element> el2 = amf_obj.extractAMF(ptr, tooFar);

    int size = 0;
    boost::shared_ptr<cygnal::Element> el;
    while ( size < static_cast<boost::uint16_t>(_header.bodysize) - 24 ) {
        if (ptr) {
            el = amf_obj.extractProperty(ptr, tooFar);
            if (el != 0) {
                size += amf_obj.totalsize();
                ptr += amf_obj.totalsize();
//              _properties[el->getName()] = el;
            } else {
                break;
            }
//              log_network("Bodysize is: %d size is: %d for %s", _total_size, size, el->getName());
        } else {
            break;
        }
    }
    
# if 0
    Element el;
    ptr = amf.extractElement(&el, ptr);
    el.dump();
    ptr = amf.extractElement(&el, ptr) + 1;
    el.dump();
    log_network (_("Reading AMF packets till we're done..."));
    // buf->dump();
    while (ptr < end) {
        boost::shared_ptr<cygnal::Element> el(new cygnal::Element);
        ptr = amf.extractProperty(el, ptr);
        addProperty(el);
        // el->dump();
    }
    ptr += 1;
    size_t actual_size = _total_size - RTMP_HEADER_SIZE;
    log_network("Total size in header is %d, buffer size is: %d", _total_size, buf->size());
//    buf->dump();
    if (buf->size() < actual_size) {
        log_network("FIXME: MERGING");
        buf = _que->merge(buf);
    }
    while ((ptr - buf->begin()) < static_cast<int>(actual_size)) {
        boost::shared_ptr<cygnal::Element> el(new cygnal::Element);
        if (ptr) {
            ptr = amf.extractProperty(el, ptr);
            addProperty(el);
        } else {
            return true;
        }
        el->dump();             // FIXME: dump the AMF objects as they are read in
    }
    
    RTMPproto::dump();
#endif
    switch(_header.type) {
      case CHUNK_SIZE:
          decodeChunkSize();
          break;
      case BYTES_READ:
          decodeBytesRead();
          break;
      case USER:
      {
          boost::shared_ptr<rtmp_ping_t> ping = decodePing(ptr);
          switch (ping->type) {
            case PING_CLEAR:
                break;
            case PING_PLAY:
                break;
            case PING_TIME:
                break;
            case PING_RESET:
                break;
            case PING_CLIENT:
                break;
            case PONG_CLIENT:
                break;
            default:
                return 0;
                break;
          };
          break;
      }
      case WINDOW_SIZE:
          decodeServer();
          break;
      case SET_BANDWITH:
          decodeClient();
          break;
      case ROUTE:
          log_unimpl("Route");
          break;
      case AUDIO_DATA:    
          decodeAudioData();
          break;
      case VIDEO_DATA:
          decodeVideoData();
          break;
      case SHARED_OBJ:
          decodeSharedObj();
          break;
      case AMF3_NOTIFY:
          log_unimpl("AMF3 Notify");
          break;
      case AMF3_SHARED_OBJ:
          log_unimpl("AMF3 Shared Object");
          break;
      case AMF3_INVOKE:
          log_unimpl("AMF3 Invoke");
          break;
      case NOTIFY:
          decodeNotify();
          break;
      case INVOKE:
          decodeInvoke();
          break;
      case FLV_DATA:
          log_unimpl("FLV Dat");
          break;
      default:
          log_error (_("ERROR: Unidentified RTMP message content type 0x%x"), _header.type);
          break;
    };

    return true;
}

// A result packet looks like this:
// 
// 03 00 00 00 00 00 81 14 00 00 00 00 02 00 07 5f   ..............._
// 72 65 73 75 6c 74 00 3f f0 00 00 00 00 00 00 05   result.?........
// 03 00 0b 61 70 70 6c 69 63 61 74 69 6f 6e 05 00   ...application..
// 05 6c 65 76 65 6c 02 00 06 73 74 61 74 75 73 00   .level...status.
// 0b 64 65 73 63 72 69 70 74 69 6f 6e 02 00 15 43   .description...C
// 6f 6e 6e 65 63 74 69 6f 6e 20 73 75 63 63 65 65   onnection succee
// 64 65 64 2e 00 04 63 6f 64 65 02 00 1d 4e 65 74   ded...code...Net
// 43 6f 6e 6e 65 63 74 69 6f 6e 2e 43 6f 6e 6e 65   Connection.Conne
// 63 74 2e 53 75 63 63 65 73 73 00 00 c3 09         ct.Success....
//
// _result(double ClientStream, NULL, double ServerStream)
// These are handlers for the various types
boost::shared_ptr<Buffer>
RTMPServer::encodeResult(RTMPMsg::rtmp_status_e status)
{
//    GNASH_REPORT_FUNCTION;
    return encodeResult(status, _filespec, _streamid);
}
    
boost::shared_ptr<cygnal::Buffer> 
RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, const std::string &filename)
{
//    GNASH_REPORT_FUNCTION;
    double clientid = 0.0;
    return encodeResult(status, filename, _streamid, clientid);
}

boost::shared_ptr<cygnal::Buffer> 
RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, const std::string &filename, double &clientid)
{
//    GNASH_REPORT_FUNCTION;
    return encodeResult(status, filename, _streamid, clientid);
}

boost::shared_ptr<cygnal::Buffer> 
RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, double &transid)
{
//    GNASH_REPORT_FUNCTION;
    double clientid = 0.0;
    return encodeResult(status, "", transid, clientid);
}

boost::shared_ptr<cygnal::Buffer> 
RTMPServer::encodeResult(gnash::RTMPMsg::rtmp_status_e status, const std::string &filename, double &transid, double &clientid)
{
//    GNASH_REPORT_FUNCTION;
//    Buffer *buf = new Buffer;
//     boost::uint8_t *ptr = buf->reference();
//     buf->clear();            // default everything to zeros, real data gets optionally added.
//    ptr += sizeof(boost::uint16_t); // go past the first short
//     const char *capabilities = 0;
//     const char *description = 0;
//     const char *code = 0;
//     const char *status = 0;
    bool notobject = false;

    Element *str = new Element;
    str->makeString("_result");

    Element *number = new Element;
    // add the transaction ID
    number->makeNumber(transid);

    Element top;
//    top.makeObject("application");
    top.makeObject();
    
    switch (status) {
      case RTMPMsg::APP_GC:
      case RTMPMsg::APP_RESOURCE_LOWMEMORY:
      case RTMPMsg::APP_SCRIPT_ERROR:
      case RTMPMsg::APP_SCRIPT_WARNING:
      case RTMPMsg::APP_SHUTDOWN:
      case RTMPMsg::NC_CALL_BADVERSION:
      case RTMPMsg::NC_CALL_FAILED:
//        status = 0;
//        code = "NetConnection.Call.Failed";
      case RTMPMsg::NC_CONNECT_APPSHUTDOWN:
      case RTMPMsg::NC_CONNECT_CLOSED:
      case RTMPMsg::NC_CONNECT_FAILED:
      {
//        errstr = new Element;
//        errstr->makeString("error");
          boost::shared_ptr<cygnal::Element> level(new Element);
          level->makeString("level", "error");
          top.addProperty(level);

          boost::shared_ptr<cygnal::Element> description(new Element);
          description->makeString("description", "Connection Failed.");
          top.addProperty(description);
          
          boost::shared_ptr<cygnal::Element> code(new Element);
          code->makeString("code", "Connection.Connect.Failed");
          top.addProperty(code);
      }
      case RTMPMsg::NC_CONNECT_INVALID_APPLICATION:
      case RTMPMsg::NC_CONNECT_REJECTED:
      {
//        delete str;
//        str = new Element;
//        str->makeString("error");
          boost::shared_ptr<cygnal::Element> level(new Element);
          level->makeString("level", "error");
          top.addProperty(level);

          boost::shared_ptr<cygnal::Element> description(new Element);
          description->makeString("description", "Connection Rejected.");
          top.addProperty(description);
          
          boost::shared_ptr<cygnal::Element> code(new Element);
          code->makeString("code", "NetConnection.Connect.Rejected");
          top.addProperty(code);
      }
      case RTMPMsg::NC_CONNECT_SUCCESS:
      {
          boost::shared_ptr<cygnal::Element> level(new Element);
          level->makeString("level", "status");
          top.addProperty(level);
          
          boost::shared_ptr<cygnal::Element> code(new Element);
          code->makeString("code", "NetConnection.Connect.Success");
          top.addProperty(code);

          boost::shared_ptr<cygnal::Element> description(new Element);
          description->makeString("description", "Connection succeeded.");
          top.addProperty(description);
      }
      break;
      case RTMPMsg::NS_CLEAR_FAILED:
      case RTMPMsg::NS_CLEAR_SUCCESS:
          // After a successful NetConnection, we get a
          // NetStream::createStream.
      case RTMPMsg::NS_DATA_START:
      case RTMPMsg::NS_FAILED:
      case RTMPMsg::NS_INVALID_ARGUMENT:
          // The response to a successful pauseStream command is this
          // message.
      case RTMPMsg::NS_PAUSE_NOTIFY:
      {
          str->makeString("onStatus");

          boost::shared_ptr<cygnal::Element> level(new Element);
          level->makeString("level", "status");
          top.addProperty(level);

          boost::shared_ptr<cygnal::Element> code(new Element);
          code->makeString("code", "NetStream.Pause.Notify");
          top.addProperty(code);

          boost::shared_ptr<cygnal::Element> description(new Element);
          string field = "Pausing ";
          if (!filename.empty()) {
              field += filename;
          }
          description->makeString("description", field);
          top.addProperty(description);
          
          boost::shared_ptr<cygnal::Element> details(new Element);
          details->makeString("details", filename);
          top.addProperty(details);
  
          boost::shared_ptr<cygnal::Element> cid(new Element);
          cid->makeNumber("clientid", clientid);
          top.addProperty(cid);

          break;
      }
      case RTMPMsg::NS_PLAY_COMPLETE:
      case RTMPMsg::NS_PLAY_FAILED:
      case RTMPMsg::NS_PLAY_FILE_STRUCTURE_INVALID:
      case RTMPMsg::NS_PLAY_INSUFFICIENT_BW:
      case RTMPMsg::NS_PLAY_NO_SUPPORTED_TRACK_FOUND:
      case RTMPMsg::NS_PLAY_PUBLISHNOTIFY:
          break;
      // Reset the stream. We also do this after receiving a
      // NetStream::createStream() packet
      case RTMPMsg::NS_PLAY_RESET:
      {
          str->makeString("onStatus");
//        "clientid"
          boost::shared_ptr<cygnal::Element> level(new Element);
          level->makeString("level", "status");
          top.addProperty(level);

          boost::shared_ptr<cygnal::Element> code(new Element);
          code->makeString("code", "NetStream.Play.Reset");
          top.addProperty(code);

          boost::shared_ptr<cygnal::Element> description(new Element);
          string field = "Playing and resetting ";
          if (!filename.empty()) {
              field += filename;
          }
          description->makeString("description", field);
          top.addProperty(description);
          
          boost::shared_ptr<cygnal::Element> details(new Element);
          details->makeString("details", filename);
          top.addProperty(details);
          
          boost::shared_ptr<cygnal::Element> cid(new Element);
#ifdef CLIENT_ID_NUMERIC
          double clientid = createClientID();
          cid->makeNumber("clientid", clientid);
#else
          string clientid;
          if (!_clientids[transid].empty()) {
              clientid =_clientids[transid].c_str();
          } else {
              clientid = createClientID();
              _clientids[transid] = clientid;
          }
          cid->makeString("clientid", _clientids[transid]);
#endif
          top.addProperty(cid);

          break;
      }
      case RTMPMsg::NS_PLAY_START:
      {
          str->makeString("onStatus");

          boost::shared_ptr<cygnal::Element> level(new Element);
          level->makeString("level", "status");
          top.addProperty(level);

          boost::shared_ptr<cygnal::Element> code(new Element);
          code->makeString("code", "NetStream.Play.Start");
          top.addProperty(code);

          boost::shared_ptr<cygnal::Element> description(new Element);
          string field = "Started playing ";
          if (!filename.empty()) {
              field += filename;
          }
          description->makeString("description", field);
          top.addProperty(description);
          
          boost::shared_ptr<cygnal::Element> details(new Element);
          details->makeString("details", filename);
          top.addProperty(details);
  
          boost::shared_ptr<cygnal::Element> cid(new Element);
#ifdef CLIENT_ID_NUMERIC
          double clientid = createClientID();
          cid->makeNumber("clientid", clientid);
#else
          string clientid;
          if (!_clientids[transid].empty()) {
              clientid =_clientids[transid].c_str();
          } else {
              clientid = createClientID();
              _clientids[transid] = clientid;
          }
          cid->makeString("clientid", _clientids[transid]);
#endif
          top.addProperty(cid);

          break;
      }
      case RTMPMsg::NS_PLAY_STOP:
      case RTMPMsg::NS_PLAY_STREAMNOTFOUND:
      {
          boost::shared_ptr<cygnal::Element> level(new Element);
          level->makeString("level", "error");
          top.addProperty(level);

          boost::shared_ptr<cygnal::Element> description(new Element);
          description->makeString("description", "NetStream.Play.StreamNotFound.");
          top.addProperty(description);
          
          boost::shared_ptr<cygnal::Element> code(new Element);
          code->makeString("code", "NetStream.Play.StreamNotFound");
          top.addProperty(code);
          break;
      }
      case RTMPMsg::NS_PLAY_SWITCH:
      case RTMPMsg::NS_PLAY_UNPUBLISHNOTIFY:
      case RTMPMsg::NS_PUBLISH_BADNAME:
      case RTMPMsg::NS_PUBLISH_START:
      case RTMPMsg::NS_RECORD_FAILED:
      case RTMPMsg::NS_RECORD_NOACCESS:
      case RTMPMsg::NS_RECORD_START:
      case RTMPMsg::NS_RECORD_STOP:
          // The reponse to a failed seekStream is this message.
      case RTMPMsg::NS_SEEK_FAILED:
          // The reponse to a successful seekStream is this message.
      case RTMPMsg::NS_SEEK_NOTIFY:
          break;
          // The response to a successful pauseStream command is this
          // message when the stream is started again.
      case RTMPMsg::NS_UNPAUSE_NOTIFY:
      case RTMPMsg::NS_UNPUBLISHED_SUCCESS:
      case RTMPMsg::SO_CREATION_FAILED:
      case RTMPMsg::SO_NO_READ_ACCESS:
      case RTMPMsg::SO_NO_WRITE_ACCESS:
      case RTMPMsg::SO_PERSISTENCE_MISMATCH:
          break;
          // The response for a createStream message is the
          // transaction ID, followed by the command object (usually a
          // NULL object), and the Stream ID. The Stream ID is just a
          // simple incrementing counter of streams.
      case RTMPMsg::NS_CREATE_STREAM:
      {
          // Don't encode as an object, just the properties
          notobject = true;

          boost::shared_ptr<cygnal::Element> id2(new Element);
          
          double sid = createStreamID();
          id2->makeNumber(sid);
          top.addProperty(id2);
          
          break;
      }
      // There is no response to a deleteStream request.
      case RTMPMsg::NS_DELETE_STREAM:
      default:
          break;
    };
    
    boost::shared_ptr<cygnal::Buffer> strbuf = str->encode();
    boost::shared_ptr<cygnal::Buffer> numbuf = number->encode();
    boost::shared_ptr<cygnal::Buffer> topbuf = top.encode(notobject);

    boost::shared_ptr<cygnal::Buffer> buf(new Buffer(strbuf->size() + numbuf->size() + topbuf->size()));
    *buf += strbuf;
    *buf += numbuf;
    boost::uint8_t byte = static_cast<boost::uint8_t>(RTMP::WINDOW_SIZE & 0x000000ff);
    *buf += byte;
    *buf += topbuf;

    delete str;
    delete number;
    
    return buf;
}
// A Ping packet has two parameters that ae always specified, and 2 that are optional.
// The first two bytes are the ping type, as in rtmp_ping_e, the second is the ping
// target, which is always zero as far as we can tell.
//
// More notes from: http://jira.red5.org/confluence/display/docs/Ping
// type 0: Clear the stream. No third and fourth parameters. The second parameter could be 0.
// After the connection is established, a Ping 0,0 will be sent from server to client. The
// message will also be sent to client on the start of Play and in response of a Seek or
// Pause/Resume request. This Ping tells client to re-calibrate the clock with the timestamp
// of the next packet server sends.
// type 1: Tell the stream to clear the playing buffer.
// type 3: Buffer time of the client. The third parameter is the buffer time in millisecond.
// type 4: Reset a stream. Used together with type 0 in the case of VOD. Often sent before type 0.
// type 6: Ping the client from server. The second parameter is the current time.
// type 7: Pong reply from client. The second parameter is the time the server sent with his
//         ping request.

// A RTMP Ping packet looks like this: "02 00 00 00 00 00 06 04 00 00 00 00 00 00 00 00 00 0",
// which is the Ping type byte, followed by two shorts that are the parameters. Only the first
// two paramters are required.
boost::shared_ptr<Buffer>
RTMPServer::encodePing(rtmp_ping_e type)
{
//    GNASH_REPORT_FUNCTION;
    return encodePing(type, 0);
}

boost::shared_ptr<Buffer>
RTMPServer::encodePing(rtmp_ping_e type, boost::uint32_t milliseconds)
{
//    GNASH_REPORT_FUNCTION;

    // An encoded ping message 
    boost::shared_ptr<cygnal::Buffer> buf(new Buffer(sizeof(boost::uint16_t) * 3));
//    boost::uint8_t *ptr = buf->reference();

    // Set the type of this ping message
    boost::uint16_t typefield = htons(type);
    *buf = typefield;
    
//     // go past the first short, which is the type field
//    ptr += sizeof(boost::uint16_t);

    boost::uint32_t swapped = 0;
    switch (type) {
        // These two don't appear to have any paramaters
      case PING_CLEAR:
      case PING_PLAY:
          break;
          // the third parameter is the buffer time in milliseconds
      case PING_TIME:
      {
//        ptr += sizeof(boost::uint16_t); // go past the second short
          swapped = milliseconds;
          swapBytes(&swapped, sizeof(boost::uint32_t));
          *buf += swapped;
          break;
      }
      // reset doesn't have any parameters but zeros
      case PING_RESET:
      {
          boost::uint16_t zero = 0;
          *buf += zero;
          *buf += zero;
          break;
      }
      // For Ping and Pong, the second parameter is always the milliseconds
      case PING_CLIENT:
      case PONG_CLIENT:
      {
//        swapped = htonl(milliseconds);
          swapped = milliseconds;
          swapBytes(&swapped, sizeof(boost::uint32_t));
          *buf += swapped;
          break;
      }
      default:
          break;
    };
    
    // Manually adjust the seek pointer since we added the data by
    // walking ou own temporary pointer, so none of the regular ways
    // of setting the seek pointer are appropriate.
//    buf->setSeekPointer(buf->reference() + buf->size());
    
    return buf;
}

// Encode a onBWDone message for the client. These are of a fixed size.
boost::shared_ptr<cygnal::Buffer>
RTMPServer::encodeBWDone(double id)
{
//    GNASH_REPORT_FUNCTION;
    string command = "onBWDone";

    Element cmd;
    cmd.makeString(command);

    Element num;
    num.makeNumber(id);

    Element null;
    null.makeNull();

    boost::shared_ptr<cygnal::Buffer> enccmd  = cmd.encode();
    boost::shared_ptr<cygnal::Buffer> encnum  = num.encode();
    boost::shared_ptr<cygnal::Buffer> encnull  = null.encode();

    boost::shared_ptr<cygnal::Buffer> buf(new cygnal::Buffer(enccmd->size()
                                                       + encnum->size()
                                                       + encnull->size()));

    *buf += enccmd;
    *buf += encnum;
    *buf += encnull;
        
    return buf;
}

boost::shared_ptr<cygnal::Buffer>
RTMPServer::encodeAudio(boost::uint8_t *data, size_t size)
{
    GNASH_REPORT_FUNCTION;
    
    boost::shared_ptr<cygnal::Buffer> buf;
    
    if (size) {
        if (data) {
            buf.reset(new cygnal::Buffer(size));
            buf->copy(data, size);
        }
    }

    return buf;
}

boost::shared_ptr<cygnal::Buffer>
RTMPServer::encodeVideo(boost::uint8_t *data, size_t size)
{
    GNASH_REPORT_FUNCTION;
}

#if 0
// Parse an Echo Request message coming from the Red5 echo_test. This
// method should only be used for testing purposes.
vector<boost::shared_ptr<cygnal::Element > >
RTMPServer::parseEchoRequest(boost::uint8_t *ptr, size_t size)
{
//    GNASH_REPORT_FUNCTION;
    AMF amf;
    vector<boost::shared_ptr<cygnal::Element > > headers;

    // The first element is the name of the test, 'echo'
    boost::shared_ptr<cygnal::Element> el1 = amf.extractAMF(ptr, ptr+size);
    ptr += amf.totalsize();
    headers.push_back(el1);

    // The second element is the number of the test,
    boost::shared_ptr<cygnal::Element> el2 = amf.extractAMF(ptr, ptr+size);
    ptr += amf.totalsize();
    headers.push_back(el2);

    // This one has always been a NULL object from my tests
    boost::shared_ptr<cygnal::Element> el3 = amf.extractAMF(ptr, ptr+size);
    ptr += amf.totalsize();
    headers.push_back(el3);

    // This one has always been an NULL or Undefined object from my tests
    boost::shared_ptr<cygnal::Element> el4 = amf.extractAMF(ptr, ptr+size);
    if (!el4) {
        log_error("Couldn't reliably extract the echo data!");
    }
    ptr += amf.totalsize();
    headers.push_back(el4);
    
    return headers;
}

// format a response to the 'echo' test used for testing Gnash. This
// is only used for testing by developers. The format appears to be
// a string '_result', followed by the number of the test, and then two
// NULL objects.
boost::shared_ptr<cygnal::Buffer>
RTMPServer::formatEchoResponse(double num, cygnal::Element &el)
{
//    GNASH_REPORT_FUNCTION;
    boost::shared_ptr<cygnal::Buffer> data = amf::AMF::encodeElement(el);
    return formatEchoResponse(num, data->reference(), data->allocated());
}

boost::shared_ptr<cygnal::Buffer>
RTMPServer::formatEchoResponse(double num, cygnal::Buffer &data)
{
//    GNASH_REPORT_FUNCTION;
    return formatEchoResponse(num, data.reference(), data.allocated());
}

boost::shared_ptr<cygnal::Buffer>
RTMPServer::formatEchoResponse(double num, boost::uint8_t *data, size_t size)
{
//    GNASH_REPORT_FUNCTION;

    string result = "_result";
    Element echo;
    echo.makeString(result);

    Element index;
    index.makeNumber(num);

    Element null;
    null.makeNull();

    boost::shared_ptr<cygnal::Buffer> encecho = echo.encode();
    boost::shared_ptr<cygnal::Buffer> encidx  = index.encode();   
    boost::shared_ptr<cygnal::Buffer> encnull  = null.encode();   

    boost::shared_ptr<cygnal::Buffer> buf(new cygnal::Buffer(encecho->size()
                                                       + encidx->size()
                                                       + encnull->size() + size));

    *buf = encecho;
    *buf += encidx;
    *buf += encnull;
    buf->append(data, size);

    return buf;
}
#endif

// Create a new client ID, which appears to be a random double,
// although I also see a temporary 8 character string used often as
// well.
#ifdef CLIENT_ID_NUMERIC
double 
RTMPServer::createClientID()
{
//    GNASH_REPORT_FUNCTION;
    
    boost::mt19937 seed;
    // Pick the number of errors to create based on the Buffer's data size
    boost::uniform_real<> numbers(1, 65535);

    double id = numbers(seed);
    _clientids.push_back(id);

    return id;
}
#else
std::string
RTMPServer::createClientID()
{
//    GNASH_REPORT_FUNCTION;
    string id;

// FIXME: This turns out to be a crappy random number generator,
    // and should be replaced with something less repititous.
#if 0
    boost::mt19937 seed;
    for (size_t i=0; i < 8; i++) {
        boost::uniform_int<> numbers(0x30, 0x7a);
        id += numbers(seed);
    }
#else
    char letters[] =
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    boost::uint64_t random_time_bits = 0;
    boost::uint64_t value = 0;
# ifdef HAVE_GETTIMEOFDAY
    timeval tv;
    gettimeofday(&tv, NULL);
    random_time_bits = ((uint64_t)tv.tv_usec << 16) ^ tv.tv_sec;
# else
    random_time_bits = time(NULL);
# endif
    value += random_time_bits ^ getpid();
    boost::uint64_t v = value; 
    id = letters[v % 62];
    v /= 62;
    id += letters[v % 62];
    v /= 62;
    id += letters[v % 62];
    v /= 62;
    id += letters[v % 62];
    v /= 62;
    id += letters[v % 62];
    v /= 62;
    id += letters[v % 62];
    v /= 62;
    id += letters[v % 62];
    v /= 62;
#endif
    
    return id;
}
#endif

// Get the next streamID
double 
RTMPServer::createStreamID()
{
//    GNASH_REPORT_FUNCTION;
    return _streamid++;
}

bool
RTMPServer::sendFile(int fd, const std::string &filespec)
{
    GNASH_REPORT_FUNCTION;
    // See if the file is in the cache and already opened.
    boost::shared_ptr<DiskStream> filestream(cache.findFile(filespec));
    if (filestream) {
        cerr << "FIXME: found file in cache!" << endl;
    } else {
        filestream.reset(new DiskStream);
//          cerr << "New Filestream at 0x" << hex << filestream.get() << endl;
        
//          cache.addFile(url, filestream);     FIXME: always reload from disk for now.
        
        // Open the file and read the first chunk into memory
        if (!filestream->open(filespec)) {
            return false;
        } else {
            // Get the file size for the HTTP header
            if (filestream->getFileType() == DiskStream::FILETYPE_NONE) {
                return false;
            } else {
                cache.addPath(filespec, filestream->getFilespec());
            }
        }
    }
    
    size_t filesize = filestream->getFileSize();
    size_t bytes_read = 0;
    int ret = 0;
    size_t page = 0;
    if (filesize) {
#ifdef USE_STATS_CACHE
        struct timespec start;
        clock_gettime (CLOCK_REALTIME, &start);
#endif
        size_t getbytes = 0;
        if (filesize <= filestream->getPagesize()) {
            getbytes = filesize;
        } else {
            getbytes = filestream->getPagesize();
        }
        if (filesize >= CACHE_LIMIT) {
            if (sendMsg(fd, getChannel(), RTMP::HEADER_12, filesize,
                        RTMP::NOTIFY, RTMPMsg::FROM_SERVER, filestream->get(),
                        filesize)) {
            }
            do {
                filestream->loadToMem(page);
//              ret = writeNet(fd, filestream->get(), getbytes);
//              if (ret <= 0) {
//                  break;
//              }
                if (sendMsg(fd, getChannel(), RTMP::HEADER_4, filesize,
                            RTMP::NOTIFY, RTMPMsg::FROM_SERVER, filestream->get(),
                            getbytes)) {
                }
                bytes_read += ret;
                page += filestream->getPagesize();
            } while (bytes_read <= filesize);
        } else {
            filestream->loadToMem(filesize, 0);
//          ret = writeNet(fd, filestream->get(), filesize);
            if (sendMsg(fd, getChannel(), RTMP::HEADER_12, filesize,
                        RTMP::NOTIFY, RTMPMsg::FROM_SERVER, filestream->get()+24,
                        filesize-24)) {
            }
                                        
        }
        filestream->close();
#ifdef USE_STATS_CACHE
        struct timespec end;
        clock_gettime (CLOCK_REALTIME, &end);
        double time = (end.tv_sec - start.tv_sec) + ((end.tv_nsec - start.tv_nsec)/1e9);
        cerr << "File " << _filespec
             << " transferred " << filesize << " bytes in: " << fixed
             << time << " seconds for net fd #" << fd << endl;
#endif
    }    

    return true;
}

size_t
RTMPServer::sendToClient(std::vector<int> &fds, cygnal::Buffer &data)
{
//    GNASH_REPORT_FUNCTION;
    return sendToClient(fds, data.reference(), data.allocated());
}

size_t
RTMPServer::sendToClient(std::vector<int> &fds, boost::uint8_t *data,
                      size_t size)
{
//    GNASH_REPORT_FUNCTION;
    size_t ret = 0;
    
    std::vector<int>::iterator it;
    for (it=fds.begin(); it< fds.end(); it++) {
        ret = writeNet(data, size);
    }
    
    return ret;
}

// This is the thread for all incoming RTMP connections
bool
rtmp_handler(Network::thread_params_t *args)
{
    GNASH_REPORT_FUNCTION;

    Handler *hand = reinterpret_cast<Handler *>(args->handler);
    RTMPServer *rtmp = reinterpret_cast<RTMPServer *>(args->entry);
    // RTMPServer *rtmp = new RTMPServer;
    
    string docroot = args->filespec;
    string url, filespec;
    url = docroot;
    bool done = false;
    boost::shared_ptr<RTMPMsg> body;
    static bool initialize = true;
//     bool sendfile = false;
    log_network(_("Starting RTMP Handler for fd #%d, cgi-bin is \"%s\""),
                args->netfd, args->filespec);
    
#ifdef USE_STATISTICS
    struct timespec start;
    clock_gettime (CLOCK_REALTIME, &start);
#endif

    // Adjust the timeout
    rtmp->setTimeout(10);
    
    boost::shared_ptr<cygnal::Buffer>  pkt;
    boost::shared_ptr<cygnal::Element> tcurl;
    boost::shared_ptr<cygnal::Element> swfurl;
    boost::shared_ptr<cygnal::Buffer> response;

    // Keep track of the network statistics
    // See if we have any messages waiting. After the initial connect, this is
    // the main loop for processing messages.
    
    // Adjust the timeout
    rtmp->setTimeout(30);
//    boost::shared_ptr<cygnal::Buffer> buf;

    // If we have active disk streams, send those packets first.
    // 0 is a reserved stream, so we start with 1, as the reserved
    // stream isn't one we care about here.
    log_network("%d active disk streams", hand->getActiveDiskStreams());
    for (int i=1; i <= hand->getActiveDiskStreams(); i++) {
        hand->getDiskStream(i)->dump();
        if (hand->getDiskStream(i)->getState() == DiskStream::PLAY) {
            boost::uint8_t *ptr = hand->getDiskStream(i)->get();
            if (ptr) {
                if (rtmp->sendMsg(hand->getClient(i), 8,
                        RTMP::HEADER_8, 4096,
                        RTMP::NOTIFY, RTMPMsg::FROM_SERVER,
                        ptr, 4096)) {
                }
            } else {
                log_network("ERROR: No stream for client %d", i);
            }
        }
    }
    
    // This is the main message processing loop for rtmp. Most
    // messages received require a response.
    do {
        // If there is no data left from the previous chunk, process
        // that before reading more data.
        if (pkt != 0) {
            log_network("data left from previous packet");
        } else {
            pkt = rtmp->recvMsg(args->netfd);
        }
        
        if (pkt != 0) {
            boost::uint8_t *tmpptr = 0;
            if (pkt->allocated()) {
                boost::shared_ptr<RTMP::queues_t> que = rtmp->split(*pkt);
                if (!que) {
                    // FIXME: send _error result
                    return false;
                }
                boost::shared_ptr<RTMP::rtmp_head_t> qhead;
                for (size_t i=0; i<que->size(); i++) {
                    boost::shared_ptr<cygnal::Buffer> bufptr = que->at(i)->pop();
                    // que->at(i)->dump();
                    if (bufptr) {
                        // bufptr->dump();
                        qhead = rtmp->decodeHeader(bufptr->reference());
                        if (!qhead) {
                            return false;
                        }
                        // log_network("Message for channel #%d", qhead->channel);
                        tmpptr = bufptr->reference() + qhead->head_size;
                        if (qhead->channel == RTMP_SYSTEM_CHANNEL) {
                            if (qhead->type == RTMP::USER) {
                                boost::shared_ptr<RTMP::user_event_t> user
                                    = rtmp->decodeUserControl(tmpptr);
                                switch (user->type) {
                                  case RTMP::STREAM_START:
                                      log_unimpl("Stream Start");
                                      break;
                                  case RTMP::STREAM_EOF:
                                      log_unimpl("Stream EOF");
                                      break;
                                  case RTMP::STREAM_NODATA:
                                      log_unimpl("Stream No Data");
                                      break;
                                  case RTMP::STREAM_BUFFER:
                                      log_unimpl("Stream Set Buffer: %d", user->param2);
                                      break;
                                  case RTMP::STREAM_LIVE:
                                      log_unimpl("Stream Live");
                                      break;
                                  case RTMP::STREAM_PING:
                                  {
                                      boost::shared_ptr<RTMP::rtmp_ping_t> ping
                                          = rtmp->decodePing(tmpptr);
                                      log_network("Processed Ping message from client, type %d",
                                                  ping->type);
                                      break;
                                  }
                                  case RTMP::STREAM_PONG:
                                      log_unimpl("Stream Pong");
                                      break;
                                  default:
                                      break;
                                };
                            } else if (qhead->type == RTMP::AUDIO_DATA) {
                                log_network("Got the 1st Audio packet!");
                            } else if (qhead->type == RTMP::VIDEO_DATA) {
                                log_network("Got the 1st Video packet!");
                            } else if (qhead->type == RTMP::WINDOW_SIZE) {
                                log_network("Got the Window Set Size packet!");
                            } else {
                                log_network("Got unknown system message!");
                                bufptr->dump();
                            }
                        }
                    }
                    switch (qhead->type) {
                      case RTMP::CHUNK_SIZE:
                          log_unimpl("Set Chunk Size");
                          break;
                      case RTMP::BYTES_READ:
                          log_unimpl("Bytes Read");
                          break;
                      case RTMP::ABORT:
                      case RTMP::USER:
                          // already handled as this is a system channel message
                          return true;
                          break;
                      case RTMP::WINDOW_SIZE:
                          log_unimpl("Set Window Size");
                          break;
                      case RTMP::SET_BANDWITH:
                          log_unimpl("Set Bandwidth");
                          break;
                      case RTMP::ROUTE:
                      case RTMP::AUDIO_DATA:
                      case RTMP::VIDEO_DATA:
                      case RTMP::SHARED_OBJ:
                          body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
                          log_network("SharedObject name is \"%s\"", body->getMethodName());
                          break;
                      case RTMP::AMF3_NOTIFY:
                          log_unimpl("RTMP type %d", qhead->type);
                          break;
                      case RTMP::AMF3_SHARED_OBJ:
                          log_unimpl("RTMP type %d", qhead->type);
                          break;
                      case RTMP::AMF3_INVOKE:
                          log_unimpl("RTMP type %d", qhead->type);
                          break;
                      case RTMP::NOTIFY:
                          log_unimpl("RTMP type %d", qhead->type);
                          break;
                      case RTMP::INVOKE:
                      {
                          body = rtmp->decodeMsgBody(tmpptr, qhead->bodysize);
                          if (!body) {
                              log_error("Error INVOKING method \"%s\"!", body->getMethodName());
                              continue;
                          }
                          log_network("INVOKEing method \"%s\"", body->getMethodName());
                          // log_network("%s", hexify(tmpptr, qhead->bodysize, true));
                          
                          // These next Invoke methods are for the
                          // NetStream class, which like NetConnection,
                          // is a speacial one handled directly by the
                          // server instead of any cgi-bin plugins.
                          double transid  = body->getTransactionID();
                          log_network("The Transaction ID from the client is: %g", transid);
                          if (body->getMethodName() == "createStream") {
                              hand->createStream(transid);
                              response = rtmp->encodeResult(RTMPMsg::NS_CREATE_STREAM, transid);
                              if (rtmp->sendMsg(args->netfd, qhead->channel,
                                        RTMP::HEADER_8, response->allocated(),
                                        RTMP::INVOKE, RTMPMsg::FROM_SERVER,
                                        *response)) {
                              }
                          } else if (body->getMethodName() == "play") {
                              string filespec;
                              boost::shared_ptr<gnash::RTMPMsg> nc = rtmp->getNetConnection();
                              boost::shared_ptr<cygnal::Element> tcurl = nc->findProperty("tcUrl");
                              URL url(tcurl->to_string());
                              filespec += url.hostname() + url.path();
                              filespec += '/';
                              filespec += body->at(1)->to_string();

                              if (hand->playStream(filespec)) {
                                  // Send the Set Chunk Size response
#if 1
                                  response = rtmp->encodeChunkSize(4096);
                                  if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
                                        RTMP::HEADER_12, response->allocated(),
                                        RTMP::CHUNK_SIZE, RTMPMsg::FROM_SERVER,
                                        *response)) {
                                  }
#endif
                              // Send the Play.Resetting response
                                  response = rtmp->encodeResult(RTMPMsg::NS_PLAY_RESET, body->at(1)->to_string(), transid);
                                  if (rtmp->sendMsg(args->netfd, qhead->channel,
                                        RTMP::HEADER_8, response->allocated(),
                                        RTMP::INVOKE, RTMPMsg::FROM_SERVER,
                                        *response)) {
                                  }
                                  // Send the Play.Start response
                                  response = rtmp->encodeResult(RTMPMsg::NS_PLAY_START, body->at(1)->to_string(), transid);
                                  if (rtmp->sendMsg(args->netfd, qhead->channel,
                                        RTMP::HEADER_8, response->allocated(),
                                        RTMP::INVOKE, RTMPMsg::FROM_SERVER,
                                        *response)) {
                                  }
                              } else {
                                  response = rtmp->encodeResult(RTMPMsg::NS_PLAY_STREAMNOTFOUND, body->at(1)->to_string(), transid);
                                  if (rtmp->sendMsg(args->netfd, qhead->channel,
                                        RTMP::HEADER_8, response->allocated(),
                                        RTMP::INVOKE, RTMPMsg::FROM_SERVER,
                                        *response)) {
                                  }
                              }
                              sleep(1); // FIXME: debugging crap
                              // Send the User Control - Stream Live
                              response = rtmp->encodeUserControl(RTMP::STREAM_LIVE, 1);
                              if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
                                        RTMP::HEADER_12, response->allocated(),
                                        RTMP::USER, RTMPMsg::FROM_SERVER,
                                        *response)) {
                              }
                              sleep(1); // FIXME: debugging crap
                              // Send an empty Audio packet to get
                              // things started.
                              if (rtmp->sendMsg(args->netfd, 6,
                                        RTMP::HEADER_12, 0,
                                        RTMP::AUDIO_DATA, RTMPMsg::FROM_SERVER,
                                        0, 0)) {
                              }
                              // Send an empty Video packet to get
                              // things started.
                              if (rtmp->sendMsg(args->netfd, 5,
                                        RTMP::HEADER_12, 0,
                                        RTMP::VIDEO_DATA, RTMPMsg::FROM_SERVER,
                                        0, 0)) {
                              }
                              sleep(1); // FIXME: debugging crap
                              // Send the User Control - Stream Start
                              response = rtmp->encodeUserControl(RTMP::STREAM_START, 1);
                              if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
                                        RTMP::HEADER_12, response->allocated(),
                                        RTMP::USER, RTMPMsg::FROM_SERVER,
                                        *response)) {
                              }                       
                              int active_stream = hand->getActiveDiskStreams();
                              boost::uint8_t *ptr = hand->getDiskStream(active_stream)->get();
                              if (ptr) {
                                  log_network("Sending %s to client",
                                              hand->getDiskStream(active_stream)->getFilespec());
                                  if (rtmp->sendMsg(args->netfd, 5,
                                        RTMP::HEADER_12, 400,
                                        RTMP::NOTIFY, RTMPMsg::FROM_SERVER,
                                        ptr, 400)) {
                                      log_network("Sent first page to client");
                                  }
                              }  
                          } else if (body->getMethodName() == "seek") {
                              hand->seekStream();
                          } else if (body->getMethodName() == "pause") {
                              hand->pauseStream(transid);
                          } else if (body->getMethodName() == "close") {
                              hand->closeStream(transid);
                          } else if (body->getMethodName() == "resume") {
                              hand->resumeStream(transid);
                          } else if (body->getMethodName() == "delete") {
                              hand->deleteStream(transid);
                          } else if (body->getMethodName() == "publish") {
                              hand->publishStream();
                          } else if (body->getMethodName() == "togglePause") {
                              hand->togglePause(transid);
                              // This is a server installation specific  method.
                          } else if (body->getMethodName() == "FCSubscribe") {
                              hand->setFCSubscribe(body->at(0)->to_string());
                          } else if (body->getMethodName() == "_error") {
                              log_error("Received an _error message from the client!");
                          } else {
                              /* size_t ret = */ hand->writeToPlugin(tmpptr, qhead->bodysize);
                              boost::shared_ptr<cygnal::Buffer> result = hand->readFromPlugin();
                              if (result) {
                                  if (rtmp->sendMsg(args->netfd, qhead->channel,
                                                    RTMP::HEADER_8, result->allocated(),
                                                    RTMP::INVOKE, RTMPMsg::FROM_SERVER,
                                                    *result)) {
                                      log_network("Sent response to client.");
                                  }
                              }
                              done = true;
                          }
                          break;
                      }
                      case RTMP::FLV_DATA:
                          log_unimpl("RTMP type %d", qhead->type);
                          break;
                      default:
                          log_error (_("ERROR: Unidentified AMF header data type 0x%x"), qhead->type);
                          break;
                    };
    
//                  body->dump();

                    // size_t ret = hand->writeToPlugin(tmpptr, qhead->bodysize);
#if 0
                    boost::shared_ptr<cygnal::Buffer> result = hand->readFromPlugin();
                    if (result) { // FIXME: this needs a real channel number
                        if (rtmp->sendMsg(args->netfd, 0x3, RTMP::HEADER_8, ret,
                                          RTMP::INVOKE, RTMPMsg::FROM_SERVER, *result)) {
                            log_network("Sent response to client.");
                        }
                    }
#endif              
//                  log_network("RET is: %d", ret);
                } // end of processing all the messages in the que
                
                // we're done processing these packets, so get rid of them
                pkt.reset();

                
            } else {
                log_network("Never read any data from fd #%d", args->netfd);
#if 0
                // Send a ping to reset the new stream
                boost::shared_ptr<cygnal::Buffer> ping_reset =
                    rtmp->encodePing(RTMP::PING_CLEAR, 0);
                if (rtmp->sendMsg(args->netfd, RTMP_SYSTEM_CHANNEL,
                          RTMP::HEADER_12, ping_reset->size(),
                          RTMP::PING, RTMPMsg::FROM_SERVER, *ping_reset)) {
                    log_network("Sent Ping to client");
                } else {
                    log_error("Couldn't send Ping to client!");
                }
#endif
                initialize = true;
                return true;
            }
        } else {
            // log_error("Communication error with client using fd #%d", args->netfd);
            rtmp->closeNet(args->netfd);
            initialize = true;
            return false;
        }
    } while (!done);
    
    return true;
}

} // end of gnash namespace

// local Variables:
// mode: C++
// indent-tabs-mode: t
// End:

/* [<][>][^][v][top][bottom][index][help] */