peerwireclient.cpp Example File
torrent/peerwireclient.cpp/**************************************************************************** ** ** Copyright (C) 2016 The Qt Company Ltd. ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the examples of the Qt Toolkit. ** ** $QT_BEGIN_LICENSE:BSD$ ** Commercial License Usage ** Licensees holding valid commercial Qt licenses may use this file in ** accordance with the commercial license agreement provided with the ** Software or, alternatively, in accordance with the terms contained in ** a written agreement between you and The Qt Company. For licensing terms ** and conditions see https://www.qt.io/terms-conditions. For further ** information use the contact form at https://www.qt.io/contact-us. ** ** BSD License Usage ** Alternatively, you may use this file under the terms of the BSD license ** as follows: ** ** "Redistribution and use in source and binary forms, with or without ** modification, are permitted provided that the following conditions are ** met: ** * Redistributions of source code must retain the above copyright ** notice, this list of conditions and the following disclaimer. ** * Redistributions in binary form must reproduce the above copyright ** notice, this list of conditions and the following disclaimer in ** the documentation and/or other materials provided with the ** distribution. ** * Neither the name of The Qt Company Ltd nor the names of its ** contributors may be used to endorse or promote products derived ** from this software without specific prior written permission. ** ** ** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ** "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT ** LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR ** A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT ** OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, ** SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT ** LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, ** DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY ** THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT ** (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE ** OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE." ** ** $QT_END_LICENSE$ ** ****************************************************************************/ #include "peerwireclient.h" #include <QHostAddress> #include <QTimerEvent> static const int PendingRequestTimeout = 60 * 1000; static const int ClientTimeout = 120 * 1000; static const int ConnectTimeout = 60 * 1000; static const int KeepAliveInterval = 30 * 1000; static const int RateControlTimerDelay = 2000; static const int MinimalHeaderSize = 48; static const char ProtocolId[] = "BitTorrent protocol"; static const char ProtocolIdSize = 19; // Reads a 32bit unsigned int from data in network order. static inline quint32 fromNetworkData(const char *data) { const unsigned char *udata = (const unsigned char *)data; return (quint32(udata[0]) << 24) | (quint32(udata[1]) << 16) | (quint32(udata[2]) << 8) | (quint32(udata[3])); } // Writes a 32bit unsigned int from num to data in network order. static inline void toNetworkData(quint32 num, char *data) { unsigned char *udata = (unsigned char *)data; udata[3] = (num & 0xff); udata[2] = (num & 0xff00) >> 8; udata[1] = (num & 0xff0000) >> 16; udata[0] = (num & 0xff000000) >> 24; } // Constructs an unconnected PeerWire client and starts the connect timer. PeerWireClient::PeerWireClient(const QByteArray &peerId, QObject *parent) : QTcpSocket(parent), pendingBlockSizes(0), pwState(ChokingPeer | ChokedByPeer), receivedHandShake(false), gotPeerId(false), sentHandShake(false), nextPacketLength(-1), pendingRequestTimer(0), invalidateTimeout(false), keepAliveTimer(0), torrentPeer(0) { memset(uploadSpeedData, 0, sizeof(uploadSpeedData)); memset(downloadSpeedData, 0, sizeof(downloadSpeedData)); transferSpeedTimer = startTimer(RateControlTimerDelay); timeoutTimer = startTimer(ConnectTimeout); peerIdString = peerId; connect(this, SIGNAL(readyRead()), this, SIGNAL(readyToTransfer())); connect(this, SIGNAL(connected()), this, SIGNAL(readyToTransfer())); connect(&socket, SIGNAL(connected()), this, SIGNAL(connected())); connect(&socket, SIGNAL(readyRead()), this, SIGNAL(readyRead())); connect(&socket, SIGNAL(disconnected()), this, SIGNAL(disconnected())); connect(&socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SIGNAL(error(QAbstractSocket::SocketError))); connect(&socket, SIGNAL(bytesWritten(qint64)), this, SIGNAL(bytesWritten(qint64))); connect(&socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(socketStateChanged(QAbstractSocket::SocketState))); } // Registers the peer ID and SHA1 sum of the torrent, and initiates // the handshake. void PeerWireClient::initialize(const QByteArray &infoHash, int pieceCount) { this->infoHash = infoHash; peerPieces.resize(pieceCount); if (!sentHandShake) sendHandShake(); } void PeerWireClient::setPeer(TorrentPeer *peer) { torrentPeer = peer; } TorrentPeer *PeerWireClient::peer() const { return torrentPeer; } QBitArray PeerWireClient::availablePieces() const { return peerPieces; } QList<TorrentBlock> PeerWireClient::incomingBlocks() const { return incoming; } // Sends a "choke" message, asking the peer to stop requesting blocks. void PeerWireClient::chokePeer() { const char message[] = {0, 0, 0, 1, 0}; write(message, sizeof(message)); pwState |= ChokingPeer; // After receiving a choke message, the peer will assume all // pending requests are lost. pendingBlocks.clear(); pendingBlockSizes = 0; } // Sends an "unchoke" message, allowing the peer to start/resume // requesting blocks. void PeerWireClient::unchokePeer() { const char message[] = {0, 0, 0, 1, 1}; write(message, sizeof(message)); pwState &= ~ChokingPeer; if (pendingRequestTimer) killTimer(pendingRequestTimer); } // Sends a "keep-alive" message to prevent the peer from closing // the connection when there's no activity void PeerWireClient::sendKeepAlive() { const char message[] = {0, 0, 0, 0}; write(message, sizeof(message)); } // Sends an "interested" message, informing the peer that it has got // pieces that we'd like to download. void PeerWireClient::sendInterested() { const char message[] = {0, 0, 0, 1, 2}; write(message, sizeof(message)); pwState |= InterestedInPeer; // After telling the peer that we're interested, we expect to get // unchoked within a certain timeframe; otherwise we'll drop the // connection. if (pendingRequestTimer) killTimer(pendingRequestTimer); pendingRequestTimer = startTimer(PendingRequestTimeout); } // Sends a "not interested" message, informing the peer that it does // not have any pieces that we'd like to download. void PeerWireClient::sendNotInterested() { const char message[] = {0, 0, 0, 1, 3}; write(message, sizeof(message)); pwState &= ~InterestedInPeer; } // Sends a piece notification / a "have" message, informing the peer // that we have just downloaded a new piece. void PeerWireClient::sendPieceNotification(int piece) { if (!sentHandShake) sendHandShake(); char message[] = {0, 0, 0, 5, 4, 0, 0, 0, 0}; toNetworkData(piece, &message[5]); write(message, sizeof(message)); } // Sends the complete list of pieces that we have downloaded. void PeerWireClient::sendPieceList(const QBitArray &bitField) { // The bitfield message may only be sent immediately after the // handshaking sequence is completed, and before any other // messages are sent. if (!sentHandShake) sendHandShake(); // Don't send the bitfield if it's all zeros. if (bitField.count(true) == 0) return; int bitFieldSize = bitField.size(); int size = (bitFieldSize + 7) / 8; QByteArray bits(size, '\0'); for (int i = 0; i < bitFieldSize; ++i) { if (bitField.testBit(i)) { quint32 byte = quint32(i) / 8; quint32 bit = quint32(i) % 8; bits[byte] = uchar(bits.at(byte)) | (1 << (7 - bit)); } } char message[] = {0, 0, 0, 1, 5}; toNetworkData(bits.size() + 1, &message[0]); write(message, sizeof(message)); write(bits); } // Sends a request for a block. void PeerWireClient::requestBlock(int piece, int offset, int length) { char message[] = {0, 0, 0, 1, 6}; toNetworkData(13, &message[0]); write(message, sizeof(message)); char numbers[4 * 3]; toNetworkData(piece, &numbers[0]); toNetworkData(offset, &numbers[4]); toNetworkData(length, &numbers[8]); write(numbers, sizeof(numbers)); incoming << TorrentBlock(piece, offset, length); // After requesting a block, we expect the block to be sent by the // other peer within a certain number of seconds. Otherwise, we // drop the connection. if (pendingRequestTimer) killTimer(pendingRequestTimer); pendingRequestTimer = startTimer(PendingRequestTimeout); } // Cancels a request for a block. void PeerWireClient::cancelRequest(int piece, int offset, int length) { char message[] = {0, 0, 0, 1, 8}; toNetworkData(13, &message[0]); write(message, sizeof(message)); char numbers[4 * 3]; toNetworkData(piece, &numbers[0]); toNetworkData(offset, &numbers[4]); toNetworkData(length, &numbers[8]); write(numbers, sizeof(numbers)); incoming.removeAll(TorrentBlock(piece, offset, length)); } // Sends a block to the peer. void PeerWireClient::sendBlock(int piece, int offset, const QByteArray &data) { QByteArray block; char message[] = {0, 0, 0, 1, 7}; toNetworkData(9 + data.size(), &message[0]); block += QByteArray(message, sizeof(message)); char numbers[4 * 2]; toNetworkData(piece, &numbers[0]); toNetworkData(offset, &numbers[4]); block += QByteArray(numbers, sizeof(numbers)); block += data; BlockInfo blockInfo; blockInfo.pieceIndex = piece; blockInfo.offset = offset; blockInfo.length = data.size(); blockInfo.block = block; pendingBlocks << blockInfo; pendingBlockSizes += block.size(); if (pendingBlockSizes > 32 * 16384) { chokePeer(); unchokePeer(); return; } emit readyToTransfer(); } // Attempts to write 'bytes' bytes to the socket from the buffer. // This is used by RateController, which precisely controls how much // each client can write. qint64 PeerWireClient::writeToSocket(qint64 bytes) { qint64 totalWritten = 0; do { if (outgoingBuffer.isEmpty() && !pendingBlocks.isEmpty()) { BlockInfo block = pendingBlocks.takeFirst(); pendingBlockSizes -= block.length; outgoingBuffer += block.block; } qint64 written = socket.write(outgoingBuffer.constData(), qMin<qint64>(bytes - totalWritten, outgoingBuffer.size())); if (written <= 0) return totalWritten ? totalWritten : written; totalWritten += written; uploadSpeedData[0] += written; outgoingBuffer.remove(0, written); } while (totalWritten < bytes && (!outgoingBuffer.isEmpty() || !pendingBlocks.isEmpty())); return totalWritten; } // Attempts to read at most 'bytes' bytes from the socket. qint64 PeerWireClient::readFromSocket(qint64 bytes) { char buffer[1024]; qint64 totalRead = 0; do { qint64 bytesRead = socket.read(buffer, qMin<qint64>(sizeof(buffer), bytes - totalRead)); if (bytesRead <= 0) break; qint64 oldSize = incomingBuffer.size(); incomingBuffer.resize(oldSize + bytesRead); memcpy(incomingBuffer.data() + oldSize, buffer, bytesRead); totalRead += bytesRead; } while (totalRead < bytes); if (totalRead > 0) { downloadSpeedData[0] += totalRead; emit bytesReceived(totalRead); processIncomingData(); } return totalRead; } // Returns the average number of bytes per second this client is // downloading. qint64 PeerWireClient::downloadSpeed() const { qint64 sum = 0; for (unsigned int i = 0; i < sizeof(downloadSpeedData) / sizeof(qint64); ++i) sum += downloadSpeedData[i]; return sum / (8 * 2); } // Returns the average number of bytes per second this client is // uploading. qint64 PeerWireClient::uploadSpeed() const { qint64 sum = 0; for (unsigned int i = 0; i < sizeof(uploadSpeedData) / sizeof(qint64); ++i) sum += uploadSpeedData[i]; return sum / (8 * 2); } void PeerWireClient::setReadBufferSize(qint64 size) { socket.setReadBufferSize(size); } bool PeerWireClient::canTransferMore() const { return bytesAvailable() > 0 || socket.bytesAvailable() > 0 || !outgoingBuffer.isEmpty() || !pendingBlocks.isEmpty(); } void PeerWireClient::connectToHost(const QHostAddress &address, quint16 port, OpenMode openMode) { setOpenMode(openMode); socket.connectToHost(address, port, openMode); } void PeerWireClient::diconnectFromHost() { socket.disconnectFromHost(); } void PeerWireClient::timerEvent(QTimerEvent *event) { if (event->timerId() == transferSpeedTimer) { // Rotate the upload / download records. for (int i = 6; i >= 0; --i) { uploadSpeedData[i + 1] = uploadSpeedData[i]; downloadSpeedData[i + 1] = downloadSpeedData[i]; } uploadSpeedData[0] = 0; downloadSpeedData[0] = 0; } else if (event->timerId() == timeoutTimer) { // Disconnect if we timed out; otherwise the timeout is // restarted. if (invalidateTimeout) { invalidateTimeout = false; } else { abort(); emit infoHashReceived(QByteArray()); } } else if (event->timerId() == pendingRequestTimer) { abort(); } else if (event->timerId() == keepAliveTimer) { sendKeepAlive(); } QTcpSocket::timerEvent(event); } // Sends the handshake to the peer. void PeerWireClient::sendHandShake() { sentHandShake = true; // Restart the timeout if (timeoutTimer) killTimer(timeoutTimer); timeoutTimer = startTimer(ClientTimeout); // Write the 68 byte PeerWire handshake. write(&ProtocolIdSize, 1); write(ProtocolId, ProtocolIdSize); write(QByteArray(8, '\0')); write(infoHash); write(peerIdString); } void PeerWireClient::processIncomingData() { invalidateTimeout = true; if (!receivedHandShake) { // Check that we received enough data if (bytesAvailable() < MinimalHeaderSize) return; // Sanity check the protocol ID QByteArray id = read(ProtocolIdSize + 1); if (id.at(0) != ProtocolIdSize || !id.mid(1).startsWith(ProtocolId)) { abort(); return; } // Discard 8 reserved bytes, then read the info hash and peer ID (void) read(8); // Read infoHash QByteArray peerInfoHash = read(20); if (!infoHash.isEmpty() && peerInfoHash != infoHash) { abort(); return; } emit infoHashReceived(peerInfoHash); if (infoHash.isEmpty()) { abort(); return; } // Send handshake if (!sentHandShake) sendHandShake(); receivedHandShake = true; } // Handle delayed peer id arrival if (!gotPeerId) { if (bytesAvailable() < 20) return; gotPeerId = true; if (read(20) == peerIdString) { // We connected to ourself abort(); return; } } // Initialize keep-alive timer if (!keepAliveTimer) keepAliveTimer = startTimer(KeepAliveInterval); do { // Find the packet length if (nextPacketLength == -1) { if (bytesAvailable() < 4) return; char tmp[4]; read(tmp, sizeof(tmp)); nextPacketLength = fromNetworkData(tmp); if (nextPacketLength < 0 || nextPacketLength > 200000) { // Prevent DoS abort(); return; } } // KeepAlive if (nextPacketLength == 0) { nextPacketLength = -1; continue; } // Wait with parsing until the whole packet has been received if (bytesAvailable() < nextPacketLength) return; // Read the packet QByteArray packet = read(nextPacketLength); if (packet.size() != nextPacketLength) { abort(); return; } switch (packet.at(0)) { case ChokePacket: // We have been choked. pwState |= ChokedByPeer; incoming.clear(); if (pendingRequestTimer) killTimer(pendingRequestTimer); emit choked(); break; case UnchokePacket: // We have been unchoked. pwState &= ~ChokedByPeer; emit unchoked(); break; case InterestedPacket: // The peer is interested in downloading. pwState |= PeerIsInterested; emit interested(); break; case NotInterestedPacket: // The peer is not interested in downloading. pwState &= ~PeerIsInterested; emit notInterested(); break; case HavePacket: { // The peer has a new piece available. quint32 index = fromNetworkData(&packet.data()[1]); if (index < quint32(peerPieces.size())) { // Only accept indexes within the valid range. peerPieces.setBit(int(index)); } emit piecesAvailable(availablePieces()); break; } case BitFieldPacket: // The peer has the following pieces available. for (int i = 1; i < packet.size(); ++i) { for (int bit = 0; bit < 8; ++bit) { if (packet.at(i) & (1 << (7 - bit))) { int bitIndex = int(((i - 1) * 8) + bit); if (bitIndex >= 0 && bitIndex < peerPieces.size()) { // Occasionally, broken clients claim to have // pieces whose index is outside the valid range. // The most common mistake is the index == size // case. peerPieces.setBit(bitIndex); } } } } emit piecesAvailable(availablePieces()); break; case RequestPacket: { // The peer requests a block. quint32 index = fromNetworkData(&packet.data()[1]); quint32 begin = fromNetworkData(&packet.data()[5]); quint32 length = fromNetworkData(&packet.data()[9]); emit blockRequested(int(index), int(begin), int(length)); break; } case PiecePacket: { int index = int(fromNetworkData(&packet.data()[1])); int begin = int(fromNetworkData(&packet.data()[5])); incoming.removeAll(TorrentBlock(index, begin, packet.size() - 9)); // The peer sends a block. emit blockReceived(index, begin, packet.mid(9)); // Kill the pending block timer. if (pendingRequestTimer) { killTimer(pendingRequestTimer); pendingRequestTimer = 0; } break; } case CancelPacket: { // The peer cancels a block request. quint32 index = fromNetworkData(&packet.data()[1]); quint32 begin = fromNetworkData(&packet.data()[5]); quint32 length = fromNetworkData(&packet.data()[9]); for (int i = 0; i < pendingBlocks.size(); ++i) { const BlockInfo &blockInfo = pendingBlocks.at(i); if (blockInfo.pieceIndex == int(index) && blockInfo.offset == int(begin) && blockInfo.length == int(length)) { pendingBlocks.removeAt(i); break; } } break; } default: // Unsupported packet type; just ignore it. break; } nextPacketLength = -1; } while (bytesAvailable() > 0); } void PeerWireClient::socketStateChanged(QAbstractSocket::SocketState state) { setLocalAddress(socket.localAddress()); setLocalPort(socket.localPort()); setPeerName(socket.peerName()); setPeerAddress(socket.peerAddress()); setPeerPort(socket.peerPort()); setSocketState(state); } qint64 PeerWireClient::readData(char *data, qint64 size) { int n = qMin<int>(size, incomingBuffer.size()); memcpy(data, incomingBuffer.constData(), n); incomingBuffer.remove(0, n); return n; } qint64 PeerWireClient::readLineData(char *data, qint64 maxlen) { return QIODevice::readLineData(data, maxlen); } qint64 PeerWireClient::writeData(const char *data, qint64 size) { int oldSize = outgoingBuffer.size(); outgoingBuffer.resize(oldSize + size); memcpy(outgoingBuffer.data() + oldSize, data, size); emit readyToTransfer(); return size; }