In the previous post, I laid down foundations for PeerMonad, where
most of the protocol logic will live.
This article will focus on using that towards our goal — downloading files.
Let’s quickly recap the first actions our client takes. First, it asks the tracker for peers, then it initiates connections to all of them and enters peer loops.
Conceptually, our peer loop boils down to this:
forever (getPeerEvent >>= handleEvent)
The peer loop is entered after handshakes. For peers that have completed at least one piece, the first message they send is a bitfield with their progress. We need this information to know which pieces can be downloaded from this peer, so let’s remember it.
You might recall from the first post that peers start out as
uninterested and choked.
To get to the point where we can send data messages and
download files, the peer has to unchoke us first.
For this purpose, the Interested message is sent out.
Let’s handle the first message:
handleEvent (PWPEvent pwp) = handlePWP pwp

handlePWP (Bitfield field) = do
  peerData <- getPeerData
  newBitField <- runMemory $ do
    len <- BF.length <$> getBitfield
    let newBitField = BF.BitField field len
    modifyAvailability $ PS.addToAvailability newBitField
    return newBitField
  updatePeerData $ peerData { peerBitField = newBitField }
  emit Interested
- Make a BitField out of the raw bytestring with the same length as ours.
- Update the availability cache with this new bitfield.
- Store the bitfield in our local PeerData structure.
- Emit the Interested message by default. This is short-sighted, but should do for now.
When the peer finally unchokes us, we can start downloading:
handlePWP Unchoke = do
  peerData <- getPeerData
  updatePeerData $ peerData { peerChoking = False }
  requestNextChunk
We’ll get into requestNextChunk soon enough, but first, let’s explain
how downloads are split up in the protocol.
Every torrent is divided into pieces with size described in the MetaInfo
dictionary.
The last piece is allowed to be smaller.
Each piece is further split up into chunks/blocks.
Their size is implementation-defined and, from what I could find, is usually
either 2¹⁴ or 2¹⁵ bytes.
There’s some discussion on the wiki describing the history behind
this limit.
2¹⁴ appears to be the safer bet for compatibility reasons.
In this case, the last chunk is allowed to be smaller too.
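To make the size arithmetic concrete, here is a small sketch of the calculations implied above. The helper names match ones that appear later in the request code, but these definitions are my assumptions (and I use plain Word32 indexes instead of the PieceId newtype), not necessarily the exact implementations used by the client.

import Data.Word (Word32)

-- Assumed chunk size of 2^14 bytes (16 KiB).
defaultChunkSize :: Word32
defaultChunkSize = 16384

-- The last piece may be shorter than the nominal piece length.
expectedPieceSize :: Word32 -> Word32 -> Word32 -> Word32
expectedPieceSize totalSize pieceLen pieceIndex
  | (pieceIndex + 1) * pieceLen <= totalSize = pieceLen
  | otherwise                                = totalSize - pieceIndex * pieceLen

-- How many chunks a piece of this size splits into, rounding up
-- because the final chunk may be smaller.
chunksInPiece :: Word32 -> Word32 -> Word32
chunksInPiece pieceLen chunkLen = (pieceLen + chunkLen - 1) `div` chunkLen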
Chunk fields
For holding the download status of chunks within a piece, I created this structure:
data ChunkField = ChunkField
  { missingChunks :: IntSet
  , requestedChunks :: IntSet
  , completedChunks :: IntSet
  }

newChunkField :: Int -> ChunkField
newChunkField n = ChunkField (IntSet.fromList [0..(n-1)])
                             IntSet.empty
                             IntSet.empty
It starts out by storing all chunk indexes in the missing set. From there, we have access to simple operations like marking a chunk as requested or completed, or asking whether all chunks are completed or at least requested. There’s also a function that returns the next missing chunk (if there is one) and marks it as requested for us:
getNextChunk :: ChunkField -> Maybe (ChunkField, ChunkId)
This is the basis of our chunk selection algorithm, which returns missing chunks in ascending order of indexes.
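To give an idea of how these operations fit together, here is a minimal sketch of getNextChunk and the two marking functions used later in the post. It assumes ChunkId wraps a numeric chunk index; the actual module behind the post may differ in details.

import qualified Data.IntSet as IntSet

-- Sketch: take the lowest missing index and move it to the requested set.
getNextChunk :: ChunkField -> Maybe (ChunkField, ChunkId)
getNextChunk (ChunkField missing requested completed) =
  case IntSet.minView missing of
    Nothing -> Nothing
    Just (next, missing') ->
      Just ( ChunkField missing' (IntSet.insert next requested) completed
           , ChunkId (fromIntegral next) )

-- Sketches of the marking functions: move an index between the three sets.
markCompleted :: ChunkField -> ChunkId -> ChunkField
markCompleted (ChunkField m r c) (ChunkId i) =
  let i' = fromIntegral i
  in ChunkField (IntSet.delete i' m) (IntSet.delete i' r) (IntSet.insert i' c)

markMissing :: ChunkField -> ChunkId -> ChunkField
markMissing (ChunkField m r c) (ChunkId i) =
  let i' = fromIntegral i
  in ChunkField (IntSet.insert i' m) (IntSet.delete i' r) (IntSet.delete i' c)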
Now that we can store progress within a piece, describing progress of all pieces that are being downloaded is simple:
type Chunks = Map PieceId (ChunkField, ByteString)
For every piece that is being downloaded, it stores a ChunkField and the
buffer for final data.
Keeping that buffer around can be quite expensive; other, more mature clients
write every chunk to disk and verify the piece later, once it is complete.
For simplicity, we’ll keep it in memory until all chunks are completed.
Summing up: to know which pieces are completed, we use a BitField;
to store the progress of a single piece, we use a ChunkField.
Requesting chunk by chunk
Let’s see the requestNextChunk function.
requestNextChunk :: F PeerMonad ()
requestNextChunk = do
  peerData <- getPeerData
  meta <- getMeta
  when (not (peerChoking peerData) && requestsLive peerData < maxRequestsPerPeer) $ do
    operation <- runMemory $ nextRequestOperation peerData meta
    case operation of
      Just (RequestChunk pieceId chunkId pwpRequest) -> do
        let modifiedPeer = peerData { requestsLive = requestsLive peerData + 1 }
        updatePeerData modifiedPeer
        emit pwpRequest
        requestNextChunk
      Nothing -> return ()
- Some conditions have to be checked at the beginning. The peer must not be choking us and we cannot exceed the maxRequestsPerPeer limit. This limit is important for balancing work and scaling the bandwidth used; I’ll describe it in future posts dedicated to the topic. For now it’s used to forbid peer loops from claiming too many pieces for themselves.
- It relies on another function, nextRequestOperation, to inspect shared memory, make some changes and come up with an operation for us. When it tells us to request a specific chunk, we do just that.
- The request is sent and the whole operation is tried again, claiming more chunks until the limit is reached.
Let’s dig into nextRequestOperation itself.
data RequestOperation = RequestChunk PieceId ChunkId PWP
nextRequestOperation :: PeerData
                     -> MetaInfo
                     -> F MemoryMonad (Maybe RequestOperation)
nextRequestOperation peerData meta = do
  chunks <- getChunks
  avData <- getAvailability
  ourBitField <- getBitfield
  let peersBitField = peerBitField peerData
      infoDict = info meta
      defaultPieceLen = pieceLength infoDict
      totalSize = Meta.length infoDict
      requestedBitField = BF.fromChunkFields (BF.length peersBitField)
                                             (Map.toList (fst <$> chunks))
      completedBitField = BF.union ourBitField requestedBitField
      pendingBitField = BF.difference peersBitField completedBitField
  case PS.getNextPiece pendingBitField avData of
    Nothing -> return Nothing
    Just piece ->
      let pieceLen = expectedPieceSize totalSize defaultPieceLen piece
      in case Map.lookup piece chunks of
           Nothing -> do
             let chunksCount = chunksInPiece pieceLen defaultChunkSize
                 chunkData = B.replicate (fromIntegral pieceLen) 0
                 chunkField = CF.newChunkField (fromIntegral chunksCount)
                 chunkInfo = (chunkField, chunkData)
             claimChunk chunkInfo piece
           Just chunkInfo -> claimChunk chunkInfo piece
- Shared variables needed to make a decision are read.
- Taking our bitfield of completed pieces, we add fully requested ones. The result is subtracted from the peer’s bitfield to get the full set of pieces that we can still request from this peer. getNextPiece implements a rarest-first piece selection algorithm; a simplified sketch follows this list.
- The piece we were told to download is looked up in the map. A new ChunkField is inserted if this piece has not been started yet. Finally, a single chunk is claimed and returned as an operation.
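The post doesn’t show getNextPiece itself, but a simplified rarest-first selection could look like the sketch below. It assumes the availability cache is a vector of per-piece peer counts and that the bitfield module exposes a BF.get accessor for a single bit; both are assumptions on my part.

import           Data.List (minimumBy)
import           Data.Ord (comparing)
import qualified Data.Vector.Unboxed as VU
import           Data.Word (Word32)

-- Simplified rarest-first selection: among pieces we still want
-- (bits set in the pending bitfield), pick the one the fewest peers have.
getNextPiece :: BF.BitField -> VU.Vector Word32 -> Maybe PieceId
getNextPiece pending availability
  | null candidates = Nothing
  | otherwise = Just (PieceId (fst (minimumBy (comparing snd) candidates)))
  where
    candidates =
      [ (fromIntegral i, availability VU.! i)
      | i <- [0 .. VU.length availability - 1]
      , BF.get pending (fromIntegral i) -- assumed single-bit accessor
      ]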
In nextRequestOperation above, claimChunk is used, so let’s take a look at it.
I promise this is the last piece (😃) of requesting logic for now.
claimChunk :: (CF.ChunkField, B.ByteString)
           -> PieceId
           -> F MemoryMonad (Maybe RequestOperation)
claimChunk (chunkField, chunkData) piece@(PieceId pieceId) =
  case CF.getNextChunk chunkField of
    Just (chunkField', chunk@(ChunkId chunkId)) -> do
      let request = Request pieceId
                            (chunkId * defaultChunkSize)
                            (getChunkSize piece chunk)
      modifyChunks $ Map.insert piece (chunkField', chunkData)
      return $ Just $ RequestChunk piece chunk request
    _ -> return Nothing
  where getChunkSize :: PieceId -> ChunkId -> Word32
When we have a chunk to request here, we do just that.
A RequestChunk operation is returned to the caller.
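The body of getChunkSize is elided in the listing above. Written as if it lived in claimChunk’s where clause, so that chunkData (the piece buffer) is in scope, a plausible definition could be:

-- Hypothetical definition: every chunk is defaultChunkSize bytes,
-- except possibly the final chunk of the piece, which may be shorter.
getChunkSize :: PieceId -> ChunkId -> Word32
getChunkSize _ (ChunkId chunkId) =
  let pieceLen = fromIntegral (B.length chunkData)
      chunkOffset = chunkId * defaultChunkSize
  in min defaultChunkSize (pieceLen - chunkOffset)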
Exception handling
The code so far has one fatal flaw: it relies on peers having flawless connections and uptime. In reality, there is a whole class of network errors to deal with, not to mention buggy client implementations or the user at the other end closing the application that is currently seeding to us. All of these things can happen, and we have to prepare for them.
It won’t be all that complicated, though. Forgetting about the resources allocated by the implementation of PeerMonad for now, the peer loop itself operates only on PeerData and shared memory. When an exception happens, there is nothing in PeerData that needs cleaning up — it only stores general information about the peer.
The story is different with shared state. We need to remember which chunks have been requested so that, if the connection is lost, we can mark them as missing again. The piece availability cache should also be updated when the peer loop closes. No additional data is needed here; it can be done by simply taking the peer’s bitfield and removing it from the cache.
To remember which chunks have been requested, let’s add registerActiveChunk
and deregisterActiveChunk as operations to our PeerMonad.
They will take both piece and chunk ids and store them in, or remove them from, the peer’s bookkeeping state.
To retrieve this data, we’ll define getActiveChunks too.
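For reference, these are the signatures I assume for the new operations, based on how they are used later in the post; onError below treats the active chunks as a Map from PieceId to ChunkId (a later section extends each entry with a timestamp).

-- Assumed shapes, derived from usage later in the post.
registerActiveChunk   :: PieceId -> ChunkId -> F PeerMonad ()
deregisterActiveChunk :: PieceId -> ChunkId -> F PeerMonad ()
getActiveChunks       :: F PeerMonad (Map PieceId ChunkId)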
Because events are used for anything that comes in from the outside,
we get a nice property in return: exceptions are not thrown at us immediately,
they arrive as events.
This is useful for any code that looks like this:
(piece, chunk) <- runMemory $ do
  -- claim a chunk
registerActiveChunk piece chunk
Usually, we can only rely on STM to handle exceptions and keep shared state
consistent.
Because errors are delivered as events, we additionally know that no exception
can interrupt us between claiming the chunk and registering it as active
in this example.
This guarantee will help in making the client more reliable.
Finally, our error handler might look like this:
-- | Marks the chunk as missing and deregisters the active chunk.
releaseActiveChunk :: PieceId -> ChunkId -> F PeerMonad ()
releaseActiveChunk pieceId chunkId = do
  runMemory $ modifyChunks $ \chunks ->
    case Map.lookup pieceId chunks of
      Just (cf, d) ->
        let cf' = CF.markMissing cf chunkId
        in Map.insert pieceId (cf', d) chunks
      Nothing -> chunks
  deregisterActiveChunk pieceId chunkId

onError = do
  activeChunks <- getActiveChunks
  let keys = Map.toList activeChunks
  traverse_ (uncurry releaseActiveChunk) keys
Receiving data
We’ve seen how to request data from a peer; it’s now time to process the response.
Let’s start with the function that receives raw data for a chunk.
receiveChunk :: PieceId -> Word32 -> ByteString -> F PeerMonad ()
receiveChunk piece offset d = do
  let chunkIndex = ChunkId (divideSize offset defaultChunkSize)
  wasMarked <- runMemory $ do
    chunks <- getChunks
    case Map.lookup piece chunks of
      Just (chunkField, chunkData) -> do
        do
          -- copy the incoming data into appropriate place in chunkData
          let (ptr, o, len) = BI.toForeignPtr chunkData
              chunkVector = VS.unsafeFromForeignPtr ptr o len
              (ptr', o', len') = BI.toForeignPtr d
              dataVector = VS.unsafeFromForeignPtr ptr' o' len'
              dest = VS.take (B.length d) $ VS.drop (fromIntegral offset) chunkVector
              src = dataVector
          unsafePerformIO $ VS.copy dest src >> return (return ())
        let chunkField' = CF.markCompleted chunkField chunkIndex
        modifyChunks $ Map.insert piece (chunkField', chunkData)
        return True
      _ -> return False -- someone already filled this
  deregisterActiveChunk piece chunkIndex
  pData <- getPeerData
  updatePeerData (pData { requestsLive = requestsLive pData - 1 })
  when wasMarked $ processPiece piece
- Modify the buffer by copying the new data into the appropriate place. This is done by converting bytestrings to storable vectors. I will probably change it in the future; Seq ByteString seems like a good choice, concatenated using a builder.
- Mark the chunk as completed. wasMarked is returned out of the transaction, telling us whether the chunk was marked successfully.
- The active chunk is forgotten, so it won’t be released in case of an exception.
When the chunk was successfully marked as completed, processPiece is called.
Its job is to check if the piece is now completed and proceed accordingly if so:
processPiece :: PieceId -> F PeerMonad ()
processPiece piece@(PieceId pieceId) = do
  meta <- getMeta
  let infoDict = info meta
      defaultPieceLen = pieceLength infoDict
  dataToWrite <- runMemory $ do
    Just (chunkField, d) <- Map.lookup piece <$> getChunks
    let getPieceHash (PieceId n) =
          B.take 20 $ B.drop (fromIntegral n * 20) $ pieces infoDict
    case CF.isCompleted chunkField of
      True -> do
        modifyChunks $ Map.delete piece
        if hash d == getPieceHash piece
          then do
            bitfield <- getBitfield
            let newBitfield = BF.set bitfield pieceId True
            modifyBitfield (const newBitfield)
            modifyAvailability (PS.addToAvailability newBitfield .
                                PS.removeFromAvailability bitfield)
            return (Just d)
          else return Nothing
      False -> return Nothing
  case dataToWrite of
    Just d -> writeData (defaultPieceLen * pieceId) d
    Nothing -> return ()
- When all the chunks are completed, we remove download progress for this piece.
- A hash check is performed to verify the completed piece. I’m using the Crypto.Hash library here.
- Bitfield and availability cache are updated.
- Data is written to disk.
When the piece is not completed, we short-circuit with no data to write and leave this function. If the hash check fails, our bitfield is not updated, but the chunkfield is still removed. This means the piece remains marked as incomplete and its download progress is discarded; it will be retried just like any other missing piece.
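As an aside, the hash d call above compares a raw 20-byte SHA-1 digest against a slice of pieces infoDict. The post mentions the Crypto.Hash library; as one possible way to produce such a digest (using cryptonite’s Crypto.Hash, which is an assumption about the exact package), a small helper could look like this:

import           Crypto.Hash (SHA1 (..), hashWith)
import qualified Data.ByteArray as BA
import qualified Data.ByteString as B

-- Raw 20-byte SHA-1 digest of a piece buffer.
sha1 :: B.ByteString -> B.ByteString
sha1 = BA.convert . hashWith SHA1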
A simple heuristic
Exception handling is needed and will help in many cases, but there is one more thing we should add. Imagine a scenario where, after we request a chunk from a peer, it never replies. The connection is alive and no exceptions were raised, but we still can’t get a reply from the other end.
For this purpose, I extended the registerActiveChunk functionality to record a
timestamp whenever an active chunk is registered.
This allows us to periodically check if a request to a peer timed out.
Since we already have code that releases a chunk and marks it back
as missing, it’s easy to implement.
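The post doesn’t show the check itself, but under the assumption that getActiveChunks now returns a registration timestamp alongside each chunk and that PeerMonad exposes a getTime operation, a periodic sweep could look roughly like this:

import           Data.Foldable (traverse_)
import qualified Data.Map.Strict as Map
import           Data.Time.Clock (NominalDiffTime, diffUTCTime)

-- Hypothetical sweep: release any chunk whose request is older than the limit.
checkRequestTimeouts :: NominalDiffTime -> F PeerMonad ()
checkRequestTimeouts limit = do
  now <- getTime                      -- assumed PeerMonad operation
  activeChunks <- getActiveChunks     -- assumed to carry timestamps now
  let stale = [ (piece, chunk)
              | (piece, (chunk, registeredAt)) <- Map.toList activeChunks
              , now `diffUTCTime` registeredAt > limit
              ]
  traverse_ (uncurry releaseActiveChunk) stale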
Closing thoughts
Using PeerMonad from the previous article, we’ve implemented the download flow for our client. It requests the rarest pieces first, downloading them chunk by chunk. We’ve already made big steps in reliability: peers disconnecting or refusing to serve data won’t stop us from completing the torrent.
A single-file torrent, that is — a limitation I hope to address in the next article.