Connecting to the Bitcoin network with Scala, Akka, and scodec
The Bitcoin network is made up of nodes that communicate over TCP using a set of different message types. Besides sending and receiving messages, a node in the extended Bitcoin network can do other things at the same time, such as mining, validating transactions, running a wallet, etc.
In the reference client implementation, there is one thread that handles socket communication and one thread that processes individual peer messages. Incoming peer messages are read into a CDataStream
buffer for each connected node, which is then processed using the ProcessMessage
function. ProcessMessage
deserializes messages as they are being processed.
In my Scala Bitcoin client implementation, I want to use case classes to represent the different message types that will be sent and received by the client. There will have to be a way to convert raw TCP data back and forth into instances of message classes that can be sent and received by the message-handling Akka actors.
The Message Codec
I use the scodec
library to define the codec that will encode and decode messages.
In the Bitcoin protocol, there is a common message header at the beginning of each message that includes a command string that tells the type of payload that will follow. I will need to create a polymorphic codec so that I can handle messages containing all of the different payload types. This is possible using a Companion Type System
, following the example given by Kifi.
I start by defining a message trait:
trait Message { self =>
type E >: self.type <: Message
def companion: MessageCompanion[E]
def instance: E = self
}
along with it’s companion trait:
trait MessageCompanion[E <: Message] {
def codec(version: Int): Codec[E]
def command: String
}
and the companion object:
object MessageCompanion {
val all: Set[MessageCompanion[_ <: Message]] = Set(Addr, Alert, Block, GetAddr, GetBlocks,
GetData, GetHeaders, Headers, Inv, MemPool, NotFound, Ping, Pong, Reject,
Tx, Verack, Version)
val byCommand: Map[ByteVector, MessageCompanion[_ <: Message]] = {
require(all.map(_.command).size == all.size, "Type headers must be unique.")
all.map { companion => Message.padCommand(companion.command) -> companion }.toMap
}
}
Now we can use the information from the message header to implement a scodec Codec
for the full message:
def codec(magic: Long, version: Int): Codec[Message] = {
def encode(msg: Message) = {
val c = msg.companion.codec(version)
for {
magic <- uint32L.encode(magic)
command <- bytes(12).encode(padCommand(msg.companion.command))
payload <- c.encode(msg)
length <- uint32L.encode(payload.length / 8)
chksum <- uint32L.encode(Util.checksum(payload.toByteVector))
} yield magic ++ command ++ length ++ chksum ++ payload
}
def decode(bits: BitVector) =
for {
metadata <- decodeHeader(bits, magic, version)
(command, length, chksum, payload, rest) = metadata
msg <- decodePayload(payload, version, chksum, command)
} yield msg
Codec[Message](encode _, decode _)
}
The encode
function is straightforward. It first gets the correct Codec
for the message type that it is encoding. Then it encodes the different parts of the header and payload, and then it concatenates them all in the correct order.
The decode
function is more complicated. It doesn’t know which message type Codec
to use until it has started decoding the message header. So we first use a decodeHeader
function to extract the metadata from the header. Then we continue the for-comprehension (flatMap) with a second function called decodePayload
to decode the remainder of the message.
decodePayload
uses the information we’ve gathered from the header to decode the message payload:
def decodePayload(payload: BitVector, version: Int, chksum: Long, command: ByteVector) = {
val cmd = MessageCompanion.byCommand(command)
cmd.codec(version).decode(payload).flatMap { p =>
if (!p.remainder.isEmpty)
Failure(scodec.Err("payload length did not match."))
else if (Util.checksum(payload.toByteVector) == chksum) {
Successful(p)
} else {
Failure(scodec.Err("checksum did not match."))
}
}
}
and this is where the companion type becomes useful. Using just the command string from the header, we can use the byCommand
function to get the correct message type, and therefore also the correct codec for it’s payload.
Now, defining a new message type is easy and requires no changes to the Message
trait. It only requires an implementation of the Message
trait and an implemenation of the parametrized MessageCompanion
trait. For example, the GetAddr
message:
case class GetAddr() extends Message {
type E = GetAddr
def companion = GetAddr
}
object GetAddr extends MessageCompanion[GetAddr] {
def codec(version: Int): Codec[GetAddr] = provide(GetAddr())
def command = "getaddr"
}
The Akka Extension
Now that I have the codec to serialize and deserialize peer messages, I can create the TCP peer connections to other nodes in the Bitcoin network.
The Akka library includes a useful extension for creating TCP connections. It handles the low-level resources, and provides a simple interface for receiving and sending data. My goal is to create a second Akka extension, on top of the existing TCP extension, that will send and receive Bitcoin messages instead of raw binary data.
A real Bitcoin node should be able to both initiate and accept connections with other nodes in the network. I use Akka’s Tcp.Connect
command for outbound connections and Tcp.Bind
command for inbound connections.
When a new outbound connection needs to be initiated, a SocketClient
actor is spawned that attempts to open a socket connection with the remote address. If it succeeds, then it spawns a PeerConnection
actor using the new socket.
class SocketClient(magic: Long, socketManager: ActorRef) extends Actor {
import context.system
def receive: Receive = {
case connect: BTC.Connect =>
val s = sender()
socketManager ! Tcp.Connect(connect.remoteAddress, timeout = Some(5 seconds))
context.become(connecting(s, connect))
}
def connecting(handler: ActorRef, cmd: BTC.Connect): Receive = {
case Tcp.CommandFailed(_: Tcp.Connect) =>
handler ! BTC.CommandFailed(cmd)
context stop self
case c @ Tcp.Connected(remote, local) =>
val s = sender()
val bc = context.actorOf(PeerConnection.props(magic, handler, s, c, false), name = "PeerConnection")
context.watch(bc)
bc ! PeerConnection.StartConnection
case _: Terminated =>
context.stop(self)
}
}
A similar thing happens for accepting incoming connections. A SocketServer
actor binds to a port and spawns a new PeerConnection
every time that a new socket is connected on the bound port.
class SocketServer(magic: Long, socketManager: ActorRef) extends Actor {
import context.system
var i = 0
def receive: Receive = {
case bind: BTC.Bind =>
socketManager ! Tcp.Bind(self, bind.localAddress)
context.become(binding(bind.handler, bind))
}
def binding(handler: ActorRef, cmd: BTC.Bind): Receive = {
case b @ Tcp.Bound(localAddress) =>
handler ! BTC.Bound
context.become(bound(handler, cmd))
case Tcp.CommandFailed(_: Tcp.Bind) =>
handler ! BTC.CommandFailed(cmd)
context stop self
}
def bound(handler: ActorRef, cmd: BTC.Bind): Receive = {
case c @ Tcp.Connected(remote, local) =>
val s = sender()
val bc = context.actorOf(PeerConnection.props(magic, handler, s, c, true), name = s"PeerConnection-$i")
bc ! PeerConnection.StartConnection
i += 1
}
}
The Peer Handshake
In the peer connection handshake, a node sends a Version message to the remote node. It then receives a Version message and a Verack message from the remote node. When these messages are receive, the local node sends the remote client a Verack message, and the connection is officially established.
Now, to encode this in an Akka actor, I represent the different states of the handshake as a finite state machine using the Akka become
method:
def receive: Receive = {
case StartConnection =>
socketHandler ! SocketHandler.HandleSocket()
if (!inbound)
socketHandler ! OutgoingMessage(localV)
context.become(awaitingVersion)
case ConnectTimeout =>
context.stop(self)
}
def awaitingVersion: Receive = {
case IncomingMessage(remoteV: Version) =>
if (inbound)
socketHandler ! OutgoingMessage(localV)
socketHandler ! OutgoingMessage(Verack())
context.become(awaitingVerack(remoteV))
case ConnectTimeout =>
context.stop(self)
}
def awaitingVerack(remoteV: Version): Receive = {
case IncomingMessage(verack: Verack) =>
val verNum = reconcileVersions(localV, remoteV)
socketHandler ! SocketHandler.SetVersion(verNum)
handler ! BTC.Connected(remoteV, c, inbound)
context.become(connected(remoteV))
case ConnectTimeout =>
context.stop(self)
}
def connected(remoteV: Version): Receive = {
case msg: IncomingMessage =>
stash()
case BTC.Register(ref: ActorRef) =>
context.watch(ref)
connectTimer.cancel
context.become(registered(ref, remoteV))
unstashAll()
case _: Terminated =>
context.stop(self)
case ConnectTimeout =>
context.stop(self)
}
The nice thing about this representation is that the same exact receive methods are used, regardless of whether the node is making inbound or an outbound connection. All that is required is a single inbound
parameter that modifies the behavior in two of the states.