Distributed chat


I've been doing Martin Kleppmann's distributed systems course, and though I'm enjoying it and learning a lot, it is too theoretical for me. After all, it's a university course. I feel like I need to build a distributed system of my own if I really want the knowledge to sink in.

I first built a distributed ping-pong, after realizing that the simplest distributed system is one with just two nodes. But it was very basic: each node only sent one message, and that message was hardcoded. At the end of that post I promised to build a chat next.

In my notes about distributed systems I talk about a multiplayer network game, where the conclusion was that the best design was a central server with clients connecting to it. So my chat has a main server app running in the background and different clients that can connect to it and disconnect from it in order to send messages to each other.

The code can be found here. Below is a walk-through, although it's not as detailed as the one for the ping-pong. Please be aware that I am still a distributed systems noob and this app is basically me playing with the Ruby Socket library.

Main differences

I split the code I had in the Connection class (in the ping-pong repo) into a Server class and a Client class. Before, each node could act as either a server or a client, while here I have a central server with clients that connect to it.

There are fewer options too: I start the app as a server by passing -s, or as a client by passing nothing. The server and the clients use the same IP and port (the server listens on it, the clients connect to it), which is localhost:3000 by default, or you can pass a custom address with the -a option. So here we always have to start the server first, and then start the clients in a different terminal / device / whatever.

Finally, the Options struct acts as a factory that will create an instance of a server or client depending on the parameters we passed to the command line.
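To make the factory idea concrete, here is a minimal sketch built on Ruby's standard OptionParser. It is not the repo's Parsers::Arguments class: the Options fields, the parse_arguments helper and the injected builders are hypothetical, and the builders return strings instead of real Server/Client instances so the sketch runs without opening sockets.

```ruby
require 'optparse'

# Hypothetical sketch: the real Parsers::Arguments class in the repo may differ.
Options = Struct.new(:server, :host, :port) do
  # Factory method: builds a server node with -s, a client node otherwise.
  # Builders are injected so this sketch runs without opening sockets.
  def node(server_builder:, client_builder:)
    builder = server ? server_builder : client_builder
    builder.call(host, port)
  end
end

def parse_arguments(argv)
  options = Options.new(false, 'localhost', 3000) # defaults: client of localhost:3000
  OptionParser.new do |opts|
    opts.on('-s', 'Run as the server') { options.server = true }
    opts.on('-a ADDRESS', 'Address as host:port') do |address|
      host, port = address.split(':')
      options.host = host
      options.port = Integer(port)
    end
  end.parse(argv)
  options
end

options = parse_arguments(['-s', '-a', '127.0.0.1:4000'])
kind = options.node(
  server_builder: ->(host, port) { "server on #{host}:#{port}" },
  client_builder: ->(host, port) { "client of #{host}:#{port}" }
)
p kind # => "server on 127.0.0.1:4000"
```

In the real app the builders would wrap something like TCPServer and TCPSocket, which is where the "start the server first" rule comes from.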

This repository has two commits, one without curses and the other with curses. The first commit is the one with no curses (pun intended).

According to Wikipedia, curses is:

...a terminal control library for Unix systems, enabling the construction of text user interface apps.

Basically, it allows you to build nice terminal layouts for your CLI apps, with sections and menus and sidebars and colors and whatnot.

This turned out to be more complicated than I had thought, and derailed the project a bit. But why did I need curses anyway? Let's take a look at the first commit.

First commit

The Client class starts two threads, one to send messages to the server:


  def send
    Thread.new do
      kernel.loop do
        message = ui.gets
        server.puts(message)
      end
    end
  end

and one to receive messages from the server:


  def receive
    Thread.new do
      kernel.loop do
        message = server.gets.chomp
        ui.puts(message)
      end
    end
  end

The server here is again a Ruby TCPSocket instance, and ui is a custom class for writing to and reading from the terminal. Each loop runs in its own thread so that the client is not blocked while the user is typing a message, and can keep receiving messages while they type.
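A minimal sketch of why the two threads matter, using IO.pipe as a stand-in for both the keyboard and the server socket (an assumption for the demo; the real client uses a TCPSocket and the UI class). The sending thread stays blocked waiting for a line that never comes, while the receiving thread keeps delivering messages.

```ruby
# IO.pipe stands in for both the keyboard and the server socket.
keyboard, keyboard_writer = IO.pipe # nothing is written: the user is still typing
from_server, server_writer = IO.pipe

incoming = Queue.new

sender = Thread.new { keyboard.gets }            # blocks until the user hits Enter
receiver = Thread.new do
  2.times { incoming << from_server.gets.chomp } # keeps working in the meantime
end

server_writer.puts('alice: hi')
server_writer.puts('bob: hello')
receiver.join

messages = Array.new(incoming.size) { incoming.pop }
still_typing = sender.alive?
p messages     # => ["alice: hi", "bob: hello"]
p still_typing # => true

# Clean up: stop the blocked thread before closing the pipes it reads from.
sender.kill
sender.join
[keyboard, keyboard_writer, from_server, server_writer].each(&:close)
```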

You may have noticed the reference to kernel.loop. Why not just loop? This makes the class easy to test: we can pass a fake kernel whose loop method only runs a fixed number of iterations, so the tests don't loop forever (which is exactly what we want in real life). If we don't pass anything, it uses Ruby's Kernel module:


class Client
  def initialize(server, ui, kernel = Kernel)
    @server = server
    @ui = ui
    @kernel = kernel

    Thread.abort_on_exception = true
    Thread.report_on_exception = false
  end

  # ...
end

In tests I use the FakeKernel class:


# frozen_string_literal: true

class FakeKernel
  def initialize(times)
    @times = times
  end

  def loop(&block)
    times.times(&block)
  end

  private

  attr_reader :times
end

Which is used like this in the client's test class:


subject(:client) { described_class.new(server, UI.new, FakeKernel.new(2)) }
let(:server) { instance_double(TCPSocket, puts: nil) }
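To see the injected kernel in isolation, here is a small sketch outside RSpec. Echo is a hypothetical class that follows the same pattern as Client, StringIO stands in for the socket and the UI, and FakeKernel is a trimmed-down copy of the class above.

```ruby
require 'stringio'

# Trimmed-down copy of FakeKernel: runs the block a fixed number of times.
class FakeKernel
  def initialize(times)
    @times = times
  end

  def loop(&block)
    @times.times(&block)
  end
end

# Hypothetical class, only here to show the injection pattern.
class Echo
  def initialize(input, output, kernel = Kernel)
    @input = input
    @output = output
    @kernel = kernel
  end

  def run
    @kernel.loop { @output.puts(@input.gets.upcase) }
  end
end

input = StringIO.new("hi\nthere\nnever read\n")
output = StringIO.new
Echo.new(input, output, FakeKernel.new(2)).run
p output.string # => "HI\nTHERE\n"
```

With the default Kernel, the same run method would keep looping until something breaks out of it.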

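When the server closes the connection, server.gets returns nil, and calling chomp on nil raises a NoMethodError on the line message = server.gets.chomp. That most likely means the server has disconnected, so we gracefully terminate the client after telling the user what happened. The exception is raised inside the receive thread, but because the client sets Thread.abort_on_exception = true, it is re-raised in the main thread, where run can rescue it: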


def run
  ui.ask_for_username
  [receive, send].each(&:join)
rescue NoMethodError
  ui.print_error(UI::SERVER_DIED_ERROR)
end

This could probably be improved to handle more kinds of errors, but for this pet project it's enough.
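One possible improvement, sketched here as an alternative rather than what the repo does: check for nil explicitly instead of rescuing NoMethodError. Rescuing NoMethodError also catches unrelated bugs (a misspelled method, for example) and reports them as "server died". The receive_until_closed helper is hypothetical, and StringIO stands in for the socket.

```ruby
require 'stringio'

# `server` is anything that responds to #gets (a TCPSocket, or a StringIO in this demo).
def receive_until_closed(server, display)
  while (line = server.gets) # gets returns nil once the other end closes the connection
    display << line.chomp
  end
  :server_disconnected
end

display = []
status = receive_until_closed(StringIO.new("alice: hi\nbob: bye\n"), display)
p display # => ["alice: hi", "bob: bye"]
p status  # => :server_disconnected
```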

The full Client class:


# frozen_string_literal: true

class Client
  def initialize(server, ui, kernel = Kernel)
    @server = server
    @ui = ui
    @kernel = kernel

    Thread.abort_on_exception = true
    Thread.report_on_exception = false
  end

  def run
    ui.ask_for_username
    [receive, send].each(&:join)
  rescue NoMethodError
    ui.print_error(UI::SERVER_DIED_ERROR)
  end

  private

  attr_reader :server, :ui, :kernel

  def receive
    Thread.new do
      kernel.loop do
        message = server.gets.chomp
        ui.puts(message)
      end
    end
  end

  def send
    Thread.new do
      kernel.loop do
        message = ui.gets
        server.puts(message)
      end
    end
  end
end

The server handles each accepted client connection in its own thread, so that several clients can be connected at the same time:


class Server
  # ...

  def run
    threads = []
    kernel.loop do
      threads << Thread.start(server.accept) { |client| connect(client) }
    end
    threads.each(&:join)
  end

  #...
end
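Here is a self-contained sketch of the same thread-per-connection pattern with a real TCPServer on an ephemeral port of 127.0.0.1. It only greets each client instead of running a chat, and accepts exactly two connections so that it terminates (the real server loops forever).

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0) # port 0: let the OS pick a free port
port = server.addr[1]

acceptor = Thread.new do
  2.times do # a real server would loop forever here
    # accept runs in the acceptor thread; each connection gets its own handler thread
    Thread.start(server.accept) do |client|
      name = client.gets.chomp
      client.puts("hello #{name}")
      client.close
    end
  end
end

replies = %w[alice bob].map do |name|
  Thread.new do
    socket = TCPSocket.new('127.0.0.1', port)
    socket.puts(name)
    reply = socket.gets.chomp
    socket.close
    reply
  end
end.map(&:value)

acceptor.join
server.close
p replies # => ["hello alice", "hello bob"]
```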

As mentioned at the end of the ping pong post, the server keeps track of all the clients, for which I created a ClientList class:


require_relative 'client_list'

class Server
  attr_reader :clients

  def initialize(server, ui, kernel = Kernel)
    @server = server
    @ui = ui
    @kernel = kernel

    @clients = ClientList.new(Mutex.new)
  end

  #...
end

The ClientList class keeps the list of clients up to date and uses a Ruby Mutex so that two client threads writing to the list at the same time don't step on each other. The clients are kept in a hash keyed by username, and a client is only added when neither its username nor its socket is already in the list (this design has room for improvement, but for now it's fine):


class ClientList
  def initialize(mutex)
    @mutex = mutex
    @clients = {}
  end

  def add(username, client)
    return if clients[username]

    return if clients.values.find { |repeated| repeated == client }

    mutex.synchronize { clients[username] = client }
  end

  #...
end

The broadcastable clients are all the clients in the list except the one sending the message:


def broadcastable(username)
  clients.except(username)
end

In the course, when a node broadcasts a message, it also delivers it to itself. It would be interesting to implement that here: instead of printing the user's message directly in their own client, we would wait until it has been sent to the server, and print it when it comes back, just like the messages from the other clients.
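A minimal sketch of both behaviors. The broadcast helper and its include_sender flag are hypothetical, StringIO objects stand in for client sockets, and it uses Hash#reject rather than Hash#except (which needs Ruby 3.0 or later).

```ruby
require 'stringio'

# `clients` maps usernames to anything with #puts (sockets in the real app).
def broadcast(clients, sender, message, include_sender: false)
  recipients = include_sender ? clients : clients.reject { |name, _| name == sender }
  recipients.each_value { |socket| socket.puts("#{sender}: #{message}") }
end

clients = { alice: StringIO.new, bob: StringIO.new }
broadcast(clients, :alice, 'hi')                         # current behaviour: sender skipped
broadcast(clients, :alice, 'echo', include_sender: true) # course-style: sender included
p clients[:alice].string # => "alice: echo\n"
p clients[:bob].string   # => "alice: hi\nalice: echo\n"
```

With include_sender: true, the sender's client would only display its own message once the server has echoed it back.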

The ClientList class in full:


# frozen_string_literal: true

class ClientList
  def initialize(mutex)
    @mutex = mutex
    @clients = {}
  end

  def add(username, client)
    return if clients[username]

    return if clients.values.find { |repeated| repeated == client }

    mutex.synchronize { clients[username] = client }
  end

  def remove(username)
    mutex.synchronize { clients.delete(username) }
  end

  def broadcastable(username)
    clients.except(username)
  end

  def users
    clients.keys
  end

  private

  attr_reader :clients, :mutex
end
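One detail worth noting: in add, the two duplicate checks run outside the mutex, so in principle two threads registering the same username at the same moment could both pass the checks before either one writes. A minimal sketch of an atomic version follows, with the checks moved inside synchronize. SafeClientList is a hypothetical name, not a class from the repo.

```ruby
# Same idea as ClientList#add, but check and write happen under one lock.
class SafeClientList
  def initialize(mutex = Mutex.new)
    @mutex = mutex
    @clients = {}
  end

  # Returns the client if it was added, nil if the username or socket is taken.
  def add(username, client)
    @mutex.synchronize do
      next nil if @clients.key?(username) || @clients.value?(client)

      @clients[username] = client
    end
  end

  def users
    @mutex.synchronize { @clients.keys }
  end
end

list = SafeClientList.new
results = Array.new(10) { |i| Thread.new { list.add(:alice, "socket-#{i}") } }.map(&:value)
p results.compact.size # => 1
p list.users           # => [:alice]
```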

Coming back to the server, in the connect method, we receive the username from each client and then add it to the client list:


def connect(client)
  received = client.gets
  return error(client) unless received

  username = received.chomp.to_sym
  return duplicate_error(client) unless clients.add(username, client)

  report_user_joined(username, client)
  start_chat(username, client)
end

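If a client disconnects before sending its username (client.gets returns nil), we recover gracefully: we close the socket and kill the thread that was handling that client. We do the same if the username or the socket is a duplicate, except that we first tell the client what went wrong: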


def duplicate_error(client)
  client.puts(UI::DUPLICATE_CLIENT_ERROR)
  error(client)
end

def error(client)
  client.flush
  client.close
  Thread.kill(Thread.current)
  nil
end

If none of these things happen, we consider the connection successful, so we report to all clients in the list that a new user has joined the chat.

Then we start the chat, which is a loop that receives messages from the client and broadcasts them to all the clients in the list:


def start_chat(username, client)
  kernel.loop do
    received = client.gets
    break client_error(username, client) unless received

    message = ui.format_message(received, username)
    broadcast(username, message)
  end
end

If a client crashes or the user leaves (by pressing Ctrl + C), we remove the client from our list and tell everyone that the user left:


def client_error(username, client)
  clients.remove(username)
  report_user_left(username)
  error(client)
end

Finally, the Chat class wires everything up. We pass it a node, which is an instance of either the Client class or the Server class, depending on the options we passed on the command line. Pressing Ctrl + C raises an Interrupt exception, which lets us exit gracefully with exit code 130 (the conventional status for a process stopped with Ctrl + C: 128 plus 2, the signal number of SIGINT). Any other errors are caught by the StandardError rescue block. Using Kernel.exit instead of just exit is, again, so that we can fake it easily in tests:


# frozen_string_literal: true

class Chat
  EXIT_CODE = 130

  def initialize(node, ui)
    @node = node
    @ui = ui
  end

  def run
    node.run
  rescue Interrupt
    Kernel.exit(EXIT_CODE)
  rescue StandardError => e
    ui.print_error(e.message)
  end

  private

  attr_reader :node, :ui
end
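Besides faking Kernel, there is another option worth knowing about: Kernel.exit works by raising SystemExit, so a test can rescue it and inspect the status directly. A minimal sketch, where run_chat is a hypothetical stand-in for Chat#run with a node that gets interrupted:

```ruby
# 130 = 128 + the signal number of SIGINT (2), the usual status after Ctrl + C.
EXIT_CODE = 128 + Signal.list.fetch('INT')

# Hypothetical stand-in for Chat#run with a node that gets interrupted.
def run_chat
  raise Interrupt # simulates the user pressing Ctrl + C
rescue Interrupt
  Kernel.exit(EXIT_CODE)
end

# Kernel.exit raises SystemExit, so the status can be checked without mocks.
status =
  begin
    run_chat
    nil
  rescue SystemExit => e
    e.status
  end
p status # => 130
```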

And here is the executable that runs it all, either as a client or as a server (the options object acts as the factory that creates the right node from the command-line arguments):


#!/usr/bin/env ruby
# frozen_string_literal: true

$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..', 'lib')

require 'chat'
require 'parsers/arguments'
require 'ui'

options = Parsers::Arguments.new.parse(ARGV)
Chat.new(options.node, UI.new).run

Second commit

When I ran the chat, I noticed a small bug. Messages you receive from the server and the characters you type are both printed to the same terminal. In other words, you may be typing a sentence, and if a message arrives before you finish, it is printed in the middle of your sentence. Yes, from a usability point of view it's a mess, although when you hit Enter your message is still printed and sent correctly.

This is because we listen to the server in one thread and read the user's input in another, independent thread. So when something arrives, it doesn't wait for you to finish typing. It just prints it there.

So how can we separate the "input" area from the "display-chat" area? The second commit introduces the curses gem to solve this by splitting the terminal into separate areas. But then why did I say this earlier:

This turned out to be more complicated than I had thought, and derailed the project a bit.

Well, it turns out that if what you want to build with curses is a chat input, you have to micromanage the basic concept of typing: moving the cursor left or right, inserting characters, deleting characters, etc.

You also have to micromanage resizing elements when the terminal is resized, or making the input window taller when you type a long message and shrinking the chat display window to accommodate it. Scrolling also has to be implemented by hand... I started working on this but couldn't get it to work completely, so I decided to leave it for later, when I am more inspired or have time to do more research. So for now the chat only implements basic typing (don't type long messages lol). This micromanagement took the longest when implementing the curses UI for this chat.

Other than that, my layout is very simple: a header at the top, a chat display area that shows all the messages, and an input section at the bottom where the user types. This did solve the problem of messages being printed at the cursor position while you are typing, and it works very nicely.

The main change is a new ClientUI class that encapsulates all the curses logic, plus a bunch of helper classes in the curses_ui folder that deal with curses details.

For the micromanaging logic, I had to keep both an Input class and a Message class. Input handles what the user sees in the terminal, while Message builds the message that will be sent over the network. The Message class exists because I couldn't find a way to get the text back out of a curses window: it seems to behave like the canvas element in HTML, where whatever you draw becomes pixels you can't read back. Also, the Input class manipulates a curses window while the Message class manipulates a plain string, so the code in each is different even though it implements the same logic.
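To show what the string side of that logic involves, here is a minimal sketch of a line buffer that tracks text plus a cursor position, with no curses involved. LineBuffer is a hypothetical class in the spirit of Message; the repo's actual Message class may be organized differently.

```ruby
# Hypothetical string-side line editor: text plus a cursor position.
class LineBuffer
  attr_reader :text, :cursor

  def initialize
    @text = +'' # unfrozen, so it can be edited in place
    @cursor = 0
  end

  def insert(char)
    @text.insert(@cursor, char)
    @cursor += char.length
  end

  def move_left
    @cursor -= 1 if @cursor.positive?
  end

  def move_right
    @cursor += 1 if @cursor < @text.length
  end

  def move_home
    @cursor = 0
  end

  def move_end
    @cursor = @text.length
  end

  # Deletes the character before the cursor.
  def backspace
    return if @cursor.zero?

    @cursor -= 1
    @text.slice!(@cursor)
  end

  # Deletes the character under the cursor.
  def delete
    @text.slice!(@cursor) if @cursor < @text.length
  end
end

line = LineBuffer.new
'helo'.each_char { |c| line.insert(c) }
line.move_left
line.insert('l') # fixes the typo in the middle: "hello"
line.move_home
line.delete      # removes the "h"
line.move_end
line.backspace   # removes the trailing "o"
p line.text   # => "ell"
p line.cursor # => 3
```

The curses side has to mirror each of these operations on the window, which is why the same logic ends up written twice.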

My least favorite piece of code is in the ClientUI class. This class orchestrates all the elements in the layout: header, chatbox and input. Because I have to check the user's input character by character and do one thing or another depending on what was typed, I ended up with a while loop that looks like this:


def read_message # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
  while (key = input.read)
    return enter if key.enter?

    next move_left if key.left?
    next move_right if key.right?
    next if key.up?
    next if key.down?
    next move_home if key.home?
    next move_end if key.end?
    next backspace if key.backspace?
    next delete if key.delete?

    insert(key.to_s)
  end
end

This is the micromanaging I was talking about. If you don't do this, the left arrow doesn't do anything, and the same goes for the other keys. The loop builds the message character by character. But just look at all those # rubocop:disable comments.

My eyes cry blood 😭.

Taking into account the logic that needs to happen here, I dunno how to improve this without adding a bunch of classes, which will just be "more code".

With all this, insertion still works a little funny... oh well, too much time invested in this already 🙄.

Next steps

I haven't really played yet with what happens when problems appear, like the ones covered in the course.

I can recover from "nothing coming from the server" (the client is told to finish gracefully) or "client disconnected" (the server updates the client list), but not from anything else. Slow networks, for example. Or latency caused by nodes being in different countries: how do I ensure the order of the messages in that case? Different users might see them in a different order. Or resending a message when it couldn't be delivered for some reason. Etc., etc.

Using curses was useful but time-consuming, so I'll have to experiment with those things another time. The important thing is that I learned things from this experiment. And I built a chat that works 🙂.

I am not too happy with the tests in the second commit, though: there is a lot of mocking because curses owns the terminal, which interferes with the test output and even with debugging with pry. I need more time to research whether there is a way to test curses apps without so many mocks. Also, having sockets, threads, infinite loops and curses windows all together made this app really challenging to test properly!

I also feel like the server could be refactored. I am passing client around everywhere, which is usually a sign that a class is hiding inside the server. But when I attempted a refactor, I didn't get anywhere because I didn't find the right abstraction. I thought I could extract a User class for every client, taking the client as a constructor argument, but that didn't clean up or simplify anything.
