Distributed ping pong


I've been taking Martin Kleppmann's distributed systems course, and though I like it and am learning a lot, it is quite theoretical. After all, it's a university course. I feel I need to build a distributed system of my own if I really want the knowledge to sink in.

Searching the Internet, I found no practical tutorials on distributed systems, no "hello world" or "to-do list" on the topic. Until I read a reply that someone left on Quora to someone with the same itch as me:

"Ping-pong is usually the hello world of distributed systems"

Makes sense. Indeed the minimal distributed system is one with just two nodes. So I decided to give it a try. I used Ruby, as it's the language that I feel most comfortable with, and since I am learning a new thing, I want to keep the unknowns to a minimum.

What I want to achieve

  1. I want to have a node acting as server and another acting as client.

  2. The client will send a request message to the server and the server will return a response message to the client. Ping pong.

  3. When I first start a node (starting a node is just starting the app from a specific machine), it tries to connect to a server node, then starts itself as a server, listening to messages from other nodes.

  4. In the app I set a default IP and port as the server address, as well as a default client port.

  5. It is also possible to manually pass a specific address to connect to via command-line options.

The code can be found here; below is a walk-through. Please be aware that I am still a distributed systems noob and this app is basically me playing with the Ruby Socket library.

Node

Recalling point 1:

I want to have a node acting as server and another acting as client.

Let's start with a tiny class to represent a node. A node will have an id and an address, made up of an IP and a port. I would like to send the node details when messaging, so I also need a hash representation of the node to easily transform everything into JSON.

When I create the node, I pass an id and address, but I can also get the IP and port. In tests it looks like this:


require 'node'

RSpec.describe Node do
  subject(:node) { described_class.new(1, '127.0.0.1:8000') }

  it 'has an IP' do
    expect(node.ip).to eq('127.0.0.1')
  end

  it 'has a port' do
    expect(node.port).to eq('8000')
  end

  it 'has a hash representation' do
    expect(node.to_h).to eq(id: node.id, ip: node.ip, port: node.port)
  end
end

The Node class looks like this:


# frozen_string_literal: true

class Node
  attr_reader :id, :address

  def initialize(id, address)
    @id = id
    @address = address
  end

  def ip
    address_parts.first
  end

  def port
    address_parts.last
  end

  def to_h
    { id:, ip:, port: }
  end

  private

  def address_parts
    @address_parts ||= address.split(':')
  end
end

I will now create a node factory to create server and client nodes, and to transform a hash representation of a node into a Node instance.

For the server node, I set the id to -1 for now. The address comes from the options entered in the command line. If no address was entered, a default will be used.

I created a custom Options class to encapsulate the CLI user options (the address of the server as IP:port and the port of the client). It also defines the default values:


Options = Struct.new(:address, :port) do
  def self.default
    new('127.0.0.1:8001', '8000')
  end
end

This will handle point 4:

In the app I set a default IP and port as the server address, as well as a default client port.

The node factory can now create the server node:


class NodeFactory
  DEFAULT_SERVER_ID = -1

  def initialize(options = Options.default)
    @options = options
  end

  def server
    Node.new(DEFAULT_SERVER_ID, options.address)
  end

  # ...
end

The client node will have a randomly generated id. For now, the address is built from the port passed in the CLI (or a default one) and an IP taken from the list of IPs available on the machine (Socket.ip_address_list returns them as Addrinfo objects). I am taking the first IP in the list, which will be localhost in most cases:


class NodeFactory
  # ...

  def client
    Node.new(id, address)
  end

  private

  attr_reader :options

  def id
    Random.rand(99_999_999)
  end

  def address
    ip = Socket.ip_address_list.first.ip_address
    port = options.port
    join(ip, port)
  end

  def join(ip, port)
    [ip, port].join(':')
  end
end

The factory can also create Node objects from a hash representation, which will come in handy when processing node responses. It doesn't matter whether the node is a server or a client, because sometimes the server receives a message from the client and sometimes it's the other way around:


  def from_hash(hash)
    Node.new(hash[:id], join(hash[:ip], hash[:port]))
  end

With this, we finished point 1.

Messages

The client will send a request message to the server and the server will return a response message to the client. Ping pong.

Let's see what a message would look like. To make things simple I will have hardcoded messages for now. So a client node will always send the same hardcoded request message to a server node, and the server node will reply always with the same hardcoded response message. I know, so useful :P

Also, it seems relevant to include information about the node the message is sent from and the node it is sent to.

Finally, a message should have a JSON representation so that I can send it through the network, and I should also be able to parse a JSON message into the objects that formed it.

Tests look like this:


# frozen_string_literal: true

require 'message'
require 'node_factory'

RSpec.describe Message do
  it 'has a JSON representation' do
    allow(Random).to receive(:rand).and_return(1)
    factory = NodeFactory.new
    message = Message.new(factory.client, factory.server, 'contents')

    hash = {
      join_cluster: {
        source: { id: 1, ip: '127.0.0.1', port: '8000' },
        destination: { id: -1, ip: '127.0.0.1', port: '8001' },
        contents: 'contents'
      }
    }
    expect(message.to_json).to eq(hash.to_json)
  end
end

That hash representation of a Node came in handy here. The message class is very simple:


# frozen_string_literal: true

require 'json'

class Message
  attr_reader :from, :to, :contents

  def initialize(from, to, contents)
    @from = from
    @to = to
    @contents = contents
  end

  def to_json(_ = nil)
    {
      join_cluster: {
        source: from.to_h,
        destination: to.to_h,
        contents:
      }
    }.to_json
  end
end

To parse the JSON response from the network into a Message object, we use a Response class that has a message method. The tests look like this:


# frozen_string_literal: true

require 'parsers/response'

RSpec.describe Parsers::Response do
  describe '#message' do
    def json
      {
        join_cluster: {
          source: { id: 1, ip: '127.0.0.1', port: '8000' },
          destination: { id: -1, ip: '127.0.0.1', port: '8001' },
          contents: 'contents'
        }
      }.to_json
    end

    it 'parses JSON into a message' do
      allow(Random).to receive(:rand).and_return(1)
      message = described_class.new(json).message

      expect(message.from.id).to eq(1)
      expect(message.to.id).to eq(-1)
      expect(message.contents).to eq('contents')
    end
  end
end

And the Response class:


# frozen_string_literal: true

require 'json'

require_relative '../message'
require_relative '../node_factory'

module Parsers
  class Response
    def initialize(json)
      @response = JSON.parse(json, symbolize_names: true)
    end

    def message
      from = NodeFactory.new.from_hash(source)
      to = NodeFactory.new.from_hash(destination)

      Message.new(from, to, contents)
    end

    private

    attr_reader :response

    def source
      response[:join_cluster][:source]
    end

    def destination
      response[:join_cluster][:destination]
    end

    def contents
      response[:join_cluster][:contents]
    end
  end
end

Point 2 is done!
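As a quick illustration of the JSON round trip that Message and Parsers::Response rely on, here is a minimal sketch using only plain hashes and the json library. The constant names (SKETCH_PAYLOAD, SKETCH_WIRE) are made up for this example and are not part of the app. It shows why symbolize_names: true matters: without it, the parsed keys come back as strings.

```ruby
require 'json'

# Hypothetical payload with the same shape as the messages above.
SKETCH_PAYLOAD = {
  join_cluster: {
    source: { id: 1, ip: '127.0.0.1', port: '8000' },
    destination: { id: -1, ip: '127.0.0.1', port: '8001' },
    contents: 'contents'
  }
}.freeze

# Serialise to a JSON string, as it would travel over the network.
SKETCH_WIRE = SKETCH_PAYLOAD.to_json

# Default parsing returns string keys...
with_string_keys = JSON.parse(SKETCH_WIRE)
# ...while symbolize_names: true restores the symbol keys.
with_symbol_keys = JSON.parse(SKETCH_WIRE, symbolize_names: true)

puts with_string_keys.key?('join_cluster') # true
puts with_symbol_keys == SKETCH_PAYLOAD    # true
```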

Ping pong

When I first start a node (starting a node is just starting the app from a specific machine), it tries to connect to a server node, then starts itself as a server, listening to messages from other nodes.

I need a class to handle connections in and out, receiving and sending, using Ruby sockets. What I described in point 3 is the logic to wire it all together, and can be translated to tests for the main class of the application, which I called PingPong, like this:


# frozen_string_literal: true

require 'connection'
require 'node_factory'
require 'ping_pong'
require 'ui'

RSpec.describe PingPong do
  subject(:ping_pong) { described_class.new(factory, connection, UI.new) }

  let(:server) { NodeFactory.new.server }
  let(:client) { NodeFactory.new.client }
  let(:factory) { instance_double(NodeFactory, server:, client:) }
  let(:connection) { instance_double(Connection).as_null_object }

  before { allow($stdout).to receive(:puts) }

  describe '#play' do
    it 'creates server node' do
      ping_pong.play
      expect(factory).to have_received(:server)
    end

    it 'creates client node' do
      ping_pong.play
      expect(factory).to have_received(:client)
    end

    it 'prints client node information' do
      expect { ping_pong.play }.to output(/Node #{client.id}/).to_stdout
    end

    it 'tries to connect to the server node' do
      ping_pong.play
      expect(connection).to have_received(:connect_to_cluster).with(client, server)
    end

    it 'makes client the server and starts listening' do
      ping_pong.play
      expect(connection).to have_received(:listen_to_cluster).with(client)
    end
  end
end

UI is just a class to print messages to the terminal. The main class of the application looks like this:


# frozen_string_literal: true

class PingPong
  def initialize(factory, connection, ui)
    @factory = factory
    @connection = connection
    @ui = ui
  end

  def play
    server = factory.server
    client = factory.client
    ui.print_node(client)

    connection.connect_to_cluster(client, server)

    server = client
    connection.listen_to_cluster(server)
  end

  private

  attr_reader :factory, :connection, :ui
end

We create the client and the server from the user options (or defaults), the client tries to connect to the server, and they exchange messages. Then the client node starts itself as a server, listening for other nodes that connect later.

So what happens when we connect_to_cluster? We open a TCP connection between the client machine and the server machine. These are just two computers running this same app.

  • If the client is the first to start the app and there are no other machines running the app, no connection will happen, and it will just start itself as a server.

  • If another machine is running the app and its address corresponds to the address that the client is trying to connect to, they will exchange messages. The client will send a hardcoded request message asking to join the cluster. The other machine will reply with a hardcoded response message.
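The two cases above can be sketched with the Socket library alone. This is an illustration under assumptions, not the app's code: the helper names sketch_try_connect and sketch_unused_port are hypothetical, and the sketch rescues only Errno::ECONNREFUSED (the error raised when nothing is listening on the port), whereas the app rescues StandardError.

```ruby
require 'socket'

# Hypothetical helper: reports whether something is listening at ip:port.
def sketch_try_connect(ip, port)
  socket = TCPSocket.new(ip, port)
  socket.close
  :connected
rescue Errno::ECONNREFUSED
  # Nobody is listening: this is the "first node" case.
  :refused
end

# Hypothetical helper: asks the OS for a free port (port 0), then releases
# it, so that nothing is listening there afterwards.
def sketch_unused_port
  probe = TCPServer.new('127.0.0.1', 0)
  port = probe.addr[1]
  probe.close
  port
end

puts sketch_try_connect('127.0.0.1', sketch_unused_port)
```

When a server is listening on the port, the same helper returns :connected instead.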

A client socket is created in Ruby using TCPSocket.new or TCPSocket.open. Then you can use puts and gets on the socket to send and receive streams through the network. The method send_and_receive could have a better name, but since I am learning, I wanted to make it dead obvious that this is where the message exchange happens between the two nodes.


class Connection
  # ...

  def connect_to_cluster(from, to)
    client = TCPSocket.open(to.ip, to.port)
    ui.print_is_connected(from.id, to.address)
    send_and_receive(client, from, to)
    client.close
  rescue StandardError => e
    ui.print_connection_error(e, from.id)
  end

  private

  attr_reader :ui, :maximum

  def send_and_receive(client, from, to)
    send(client, request_message(from, to))
    receive(client)
  end

  def send(client, message)
    client.puts(message.to_json)
  end

  def receive(client)
    message = Parsers::Response.new(client.gets).message
    ui.print_message(message)
    message.from
  end

  def request_message(from, to)
    Message.new(from, to, ui.request_contents(from))
  end
end

Here is where we use the JSON representation of a message that we mentioned before. Also, the receive method returns the sender node (message.from), but we are not going to use it here. We'll use it when listening.
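To see the puts/gets exchange in isolation, here is a minimal sketch of a single round trip on localhost, assuming one JSON document per line. The method name demo_round_trip and the message contents are made up for this example. Since puts appends a newline and gets reads up to one, each JSON string (which contains no raw newlines) arrives as exactly one message.

```ruby
require 'json'
require 'socket'

# Hypothetical demo: one request and one response over a local TCP socket.
def demo_round_trip
  # Port 0 lets the OS pick a free port, so the sketch does not clash
  # with anything already running.
  server = TCPServer.new('127.0.0.1', 0)
  port = server.addr[1]

  # Server side: receive first, then send.
  responder = Thread.new do
    client = server.accept
    request = JSON.parse(client.gets, symbolize_names: true)
    client.puts({ contents: "pong for #{request[:contents]}" }.to_json)
    client.close
  end

  # Client side: send first, then receive.
  socket = TCPSocket.new('127.0.0.1', port)
  socket.puts({ contents: 'ping' }.to_json)
  reply = JSON.parse(socket.gets, symbolize_names: true)
  socket.close

  responder.join
  server.close
  reply
end

puts demo_round_trip[:contents] # pong for ping
```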

The hardcoded message is in the UI class:


class UI
  # ...

  def request_contents(from)
    "Node #{from.id} is asking to join cluster"
  end
end

If we wanted the user to type something instead of setting a hardcoded message, we could change this method to:

class UI
  # ...

  def request_contents(from)
    print("Node #{from.id}: ")
    gets.chomp
  end
end

The connect_to_cluster method is written from the point of view of the client. So we first send a message and then receive a message. And what happens when we listen_to_cluster? Exactly the same, but in the opposite order, so we first receive and then we send.

But now we need a server, so we use TCPServer instead. And we also want to listen to many clients, so we open the connections inside a loop. I didn't want an infinite loop as we may have a maximum number of clients that we can handle before the server machine uses all its resources. So I set the maximum to an arbitrary number (10). Finally, we don't want the next client to have to wait until the server has finished with the previous client. So we listen to each client in a separate thread.


class Connection
  EXIT_CODE = 130

  def initialize(ui, maximum = 10)
    @ui = ui
    @maximum = maximum
    Thread.abort_on_exception = true
  end

  # ...

  def listen_to_cluster(from)
    server = TCPServer.open(from.ip, from.port)
    ui.print_is_listening(from.id)
    receive_and_send(server, from)
    server.close
  rescue StandardError => e
    ui.print_listening_error(e, from.id)
  rescue Interrupt
    Kernel.exit(EXIT_CODE)
  end

  private

  attr_reader :ui, :maximum

  # ...

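  def receive_and_send(server, from)
    # Accept up to `maximum` clients, handling each one in its own thread.
    threads = Array.new(maximum) do
      Thread.start(server.accept) do |client|
        to = receive(client)
        send(client, response_message(from, to))
        client.close
      end
    end
    # Join at the end: joining inside the loop would make each client
    # wait until the previous one has been served.
    threads.each(&:join)
  end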

  # ...

  def response_message(from, to)
    Message.new(from, to, ui.response_contents(to))
  end
end

Here I am taking the sender node returned by receive (message.from) and using it as the to for the send method.
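The thread-per-client idea can be tried on its own with a small sketch. The names serve_clients and demo_threaded_ping_pong are hypothetical and the messages are plain strings rather than the app's JSON. The point it illustrates is that the handler threads are joined only after all clients have been accepted, so a slow client does not hold up the next one.

```ruby
require 'socket'

# Hypothetical helper: accepts `maximum` clients, each served in its own thread.
def serve_clients(server, maximum)
  threads = Array.new(maximum) do
    # accept blocks in the calling thread; the exchange itself runs in a new one.
    Thread.start(server.accept) do |client|
      line = client.gets
      client.puts("pong: #{line.chomp}")
      client.close
    end
  end
  threads.each(&:join)
end

# Hypothetical demo: several clients talk to one server on localhost.
def demo_threaded_ping_pong(clients = 3)
  server = TCPServer.new('127.0.0.1', 0) # port 0: the OS picks a free port
  port = server.addr[1]
  listener = Thread.new { serve_clients(server, clients) }

  replies = Array.new(clients) do |i|
    Thread.new do
      socket = TCPSocket.new('127.0.0.1', port)
      socket.puts("ping #{i}")
      reply = socket.gets.chomp
      socket.close
      reply
    end
  end.map(&:value) # value waits for each client thread and returns its reply

  listener.join
  server.close
  replies
end

puts demo_threaded_ping_pong(3)
```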

I rescue from Interrupt explicitly because this method blocks while waiting for a node to connect (typical server behaviour), and I want to be able to stop the app with Ctrl + C and exit gracefully. Otherwise, an exception will blow up in our face. 130 is the conventional exit code for a process stopped with Ctrl + C (128 plus 2, the SIGINT signal number).
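The value 130 follows the shell convention of 128 plus the signal number. A tiny sketch to check it, where the constant names are made up for this example:

```ruby
# Signal.list maps signal names to their numbers; Ctrl + C sends SIGINT.
SIGINT_NUMBER = Signal.list.fetch('INT')

# Shell convention: a process ended by a signal reports 128 + signal number.
CTRL_C_EXIT_CODE = 128 + SIGINT_NUMBER

puts CTRL_C_EXIT_CODE # 130
```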

With this, point 3 is done and point 4 was done before.

Command line

It is also possible to manually pass a specific address to connect to via command-line options.

Ruby comes with the handy optparse library, which lets you define custom command-line options and what happens when they are passed to our app.

I encapsulate this logic in an Arguments class which will take the user entered options and use the parser defined with OptionParser.new to return the Options object I mentioned before. You can just print this parser object when you want to print the help and it will generate a nicely formatted output with the options you defined.


module Parsers
  class Arguments
    # ...

    def parse(arguments)
      parser.parse!(arguments)
      options
    rescue StandardError => e
      puts("ERROR: #{e.message}\n\n")
      print_help
    end

    private

    def print_help
      puts(parser)
      Kernel.exit
    end
  end
end

I need to exit with Kernel.exit both when there is an error and when the user asks for the help. Otherwise, the default options would be passed to the app and the app would start running.

I am doing very basic validation of this user-entered input. Usually I would create a Validation class for this, but it would be overkill here, as there are only two options and I don't need super complex validation. For example, for the server address to connect to, I rely on the fact that both IPAddr.new and Integer raise an error if what you pass is not valid:


module Parsers
  class Arguments
    # ...

    def read_address
      parser.on('-a ADDRESS', '--address=ADDRESS', MESSAGES[:address]) do |address|
        ip, port = address.split(':')
        options.address = [IPAddr.new(ip), Integer(port)].join(':')
      end
    end

    # ...
  end
end
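Here is a self-contained sketch of how such a parser could be put together, for trying the validation in isolation. It is not the Arguments class from the repository: SketchOptions, parse_sketch_options, the option descriptions and the handling of -p are assumptions made for this example. Errors raised inside the option blocks propagate out of parse!, which is what lets a rescue like the one above catch them.

```ruby
require 'ipaddr'
require 'optparse'

# Hypothetical stand-in for the Options struct, with the same defaults.
SketchOptions = Struct.new(:address, :port)

# Hypothetical helper: parses argv into a SketchOptions, raising on bad input.
def parse_sketch_options(argv)
  options = SketchOptions.new('127.0.0.1:8001', '8000')

  parser = OptionParser.new do |opts|
    opts.banner = 'Usage: ruby bin/app [options]'

    opts.on('-a ADDRESS', '--address=ADDRESS', 'IP:port of the node to connect to.') do |address|
      ip, port = address.split(':')
      # IPAddr.new raises on an invalid IP; Integer raises on a bad or missing port.
      options.address = [IPAddr.new(ip), Integer(port)].join(':')
    end

    opts.on('-p PORT', '--port=PORT', 'Port to run this node on.') do |port|
      # Assumption: the port is kept as a string, like the default.
      options.port = Integer(port).to_s
    end
  end

  # parse! removes the options it recognises, so work on a copy of argv.
  parser.parse!(argv.dup)
  options
end

options = parse_sketch_options(%w[-a 10.0.0.5:9000 -p 9001])
puts options.address # 10.0.0.5:9000
puts options.port    # 9001
```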

And that's it, we are done.

Running the nodes

With everything in place I can now get our bin/app script ready and wire up everything needed to run the app:


#!/usr/bin/env ruby
# frozen_string_literal: true

$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..', 'lib')

require 'connection'
require 'node_factory'
require 'parsers/arguments'
require 'ping_pong'
require 'ui'

options = Parsers::Arguments.new.parse(ARGV)
factory = NodeFactory.new(options)

ui = UI.new
connection = Connection.new(ui, 5)

PingPong.new(factory, connection, ui).play

You can print the help and exit:


$ bundle exec ruby bin/app -h
Usage: ruby bin/app [options]
    -h, --help             Prints this help.
    -a, --address=ADDRESS  IP:port of the node to connect to. Default is 127.0.0.1:8001.
    -p, --port=PORT        Port to run this node on. Default is 8000.

Trying on my local machine

Let's try it with two nodes on my local machine, listening on localhost on two different ports:


$ bundle exec ruby bin/app

Node 74299309 at 127.0.0.1:8000

ERROR: Connection refused - connect(2) for "127.0.0.1" port 8001.
Node 74299309 couldnt connect to cluster.
Will start node 74299309 as server.

It tries to connect to the server at the default address, but there is none, because this is the first "node". Now that I have this node running as a server at 127.0.0.1:8000, I can run the app in another terminal, but on port 8001:


$ bundle exec ruby bin/app -a 127.0.0.1:8000 -p 8001

Node 99112954 at 127.0.0.1:8001

Node 99112954 established connection with 127.0.0.1:8000.

  Message info:
    from: node at 127.0.0.1:8000
    to:   node at 127.0.0.1:8001
    contents: "Node 99112954 has joined cluster"

Will start node 99112954 as server.

This node found the server, they exchanged the hardcoded messages, and then it set itself up as a server. In the first terminal, the message from port 8001 was received:


      Message info:
        from: node at 127.0.0.1:8001
        to:   node at 127.0.0.1:8000
        contents: "Node 99112954 is asking to join cluster"

You could now connect to port 8001 and the same thing would happen.

Trying on different physical machines

I tried this with my friend's computer. We were both connected to the same WiFi, using the "public" IP each machine got on the local network. This means we cannot use localhost anymore, so we need to change Socket.ip_address_list.first to Socket.ip_address_list[1]. Nodes on my laptop could send messages to and receive messages from nodes on my friend's laptop, and vice versa. It works!!
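Picking the address by position depends on the order in which the machine lists its interfaces. As a possible alternative (not what the app does), the sketch below selects the first IPv4 address that is not loopback and falls back to localhost when there is none. The helper name sketch_lan_ip is hypothetical; ipv4?, ipv4_loopback? and ip_address are Addrinfo methods.

```ruby
require 'socket'

# Hypothetical helper: first non-loopback IPv4 address, or localhost.
def sketch_lan_ip(addresses = Socket.ip_address_list)
  candidate = addresses.find { |info| info.ipv4? && !info.ipv4_loopback? }
  candidate ? candidate.ip_address : '127.0.0.1'
end

# The result depends on the machine's network interfaces.
puts sketch_lan_ip
```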

Next steps

If the distributed systems course is about one thing, it is failure. Every lesson is about all the possible ways things can go wrong in a distributed system. This ping pong app doesn't have many ways to go wrong though: communication always happens exclusively between two nodes, and only once per pair. When I have more time I could try turning one node off or turning the router off and try different recovery scenarios.

The next step would be to build something like a chat, where each node has to send a message to all the other nodes, and where several messages can be sent between the same nodes. This means all nodes need to know about all the other nodes in the cluster. But at the moment I am not doing anything when a node "requests to join": it's just a hardcoded text and nothing happens. The joining node should probably be added to a list of nodes, and this list kept in sync across all nodes. Also, messages would probably need to be delivered in order.

So here is where I see that all the problems mentioned in the course could happen.

Finally, the connection class is the one in charge of deciding what to send when something is received (although for now it always sends the same thing). This should be handled by something else that understands the incoming message and builds a response relevant to it. Like endpoints in an API.

Anyway, I learned stuff doing this and I enjoyed it. Let's try that chat next!
