Friday, August 5, 2016

Writing an Asynchronous Echo Server

Echo servers are basically the "Hello, World!" of network programming, so I'm going to step through building an asynchronous echo server using AnyEvent, although any bare-bones event loop, like EV, could be used.

This is less about building an echo server and more about thinking asynchronously, and about the complications that arise when writing asynchronous code from the ground up. In IO-heavy applications, an asynchronous solution will perform much better than if you, for example, went with a forking model.

The forking model is often very tempting as it has a much lower barrier to entry, but it's also very memory hungry, as each forked process ends up with a copy of the parent process's memory. The other problem with the forking model in an IO-heavy application is that when disk and/or network IO is the bottleneck, adding more processes, which will only attempt more IO operations, is more likely to compound the problem than fix it. It's often the wrong tool for the job, despite many people using it as such.

Knowing when your code is IO-bound is something you'll most likely discover with a profiling tool. In the Perl world, Devel::NYTProf is the de facto standard for profiling, and I can't count the number of times it's helped me identify performance bottlenecks.

Before I get into any code, I just want to mention that all of what I'm about to discuss could be managed by AnyEvent::Handle (or your favourite event loop framework's equivalent), but, as a learning exercise, I'm doing it the hard way, to appreciate what AnyEvent::Handle abstracts away for us. The take-home message should be: use AnyEvent::Handle. If you don't use it, hopefully this will give you a taste of what you're in for.

Gotta Start Somewhere

Let's start with a pretty basic implementation of a non-blocking echo server.


#!/usr/bin/env perl

use warnings;
use strict;

use AnyEvent;
use IO::Socket;
use Socket qw/SOMAXCONN/;
use POSIX qw/EAGAIN EWOULDBLOCK EPIPE/;

use constant {
  SYSREAD_MAX => 8192,
};

my $listen = IO::Socket::INET->new(
  Listen    => SOMAXCONN,
  LocalAddr => 'localhost',
  LocalPort => 5000,
  ReuseAddr => 1,
  Blocking  => 0
) or die $!;

print "Listening on port 5000...\n";

my %clients;

my $w = AnyEvent->io(
  fh   => $listen,
  poll => 'r',
  cb   => sub {
    my $client = $listen->accept;
    $client->blocking(0);

    printf "Client connection from %s\n", $client->peerhost;

    $clients{$client}->{r} = AnyEvent->io(
      fh   => $client,
      poll => 'r',
      cb   => sub { read_data($client) }
    );
  }
);

AE::cv->recv;

sub read_data {
  my ($client) = @_;

  my $bytes = sysread $client, my $buf, SYSREAD_MAX;

  if ( not defined $bytes ) {
    if ( ( $! == EAGAIN ) or ( $! == EWOULDBLOCK ) ) {
      return;
    }
    # Any other read error is fatal for this connection
    disconnect($client);
    return;
  }
  elsif ( $bytes == 0 ) {
    disconnect($client);
    return;
  }

  chomp( my $chomped = $buf );
  printf "Read %d bytes from %s: %s\n", $bytes, $client->peerhost, $chomped;

  my $w; $w = AnyEvent->io(
    fh   => $client,
    poll => 'w',
    cb   => sub {
      write_data( $client, $buf );
      undef $w;
    }
  );
}

sub write_data {
  my ( $client, $buf ) = @_;

  my $bytes = syswrite $client, $buf, length($buf);

  if ( not defined $bytes ) {
    if ( ( $! == EAGAIN ) or ( $! == EWOULDBLOCK ) ) {
      return;
    }
    # EPIPE, or any other write error, means the client has gone away
    disconnect($client);
    return;
  }
  else {
    printf "Wrote %d bytes to client %s\n", $bytes, $client->peerhost;
  }
}

sub disconnect {
  my ($client) = @_;

  printf "Client %s disconnected\n", $client->peerhost;
  delete $clients{$client};
  $client->close;
}

The big issues are:

  1. The read_data() function, after reading data from the client, will blindly create more and more watchers to write data back to the client. These watchers aren't guaranteed to be run in the order they were spawned, which means we run the risk of writing data in the wrong order. The number of watchers we create will be non-deterministic, which means memory usage may also go up. In the same way that we only have one read watcher, it'd be great to only have one write watcher.
  2. The write_data() function presumes that because we asked syswrite() to write X bytes to the socket, X bytes were actually written. Because this is a non-blocking socket, that's not guaranteed to be the case, and if it happens while many write_data() events are already scheduled, we need to finish sending what's left of the current buffer first, otherwise we risk writing data in the wrong order.

Another big issue, which doesn't apply to an echo server but would to, for example, an HTTP server, is that the read_data() function will sysread() some data, presume it's read the entire input, and then act on that input. At the moment the code reads, at most, 8192 bytes from the client, but a full HTTP request (e.g. a file upload) may easily exceed that.

Buffers

A way to solve these issues is with read and write buffers, plus one watcher to act on each buffer, so that each client connection results in at most two watchers: one to act on the read buffer, and one to act on the write buffer.


#!/usr/bin/env perl

use warnings;
use strict;

use AnyEvent;
use IO::Socket;
use Socket qw/SOMAXCONN/;
use POSIX qw/EAGAIN EWOULDBLOCK EPIPE/;

use constant {
  SYSREAD_MAX  => 8192,
  SYSWRITE_MAX => 8192,
};

my $listen = IO::Socket::INET->new(
  Listen    => SOMAXCONN,
  LocalAddr => 'localhost',
  LocalPort => 5000,
  ReuseAddr => 1,
  Blocking  => 0
) or die $!;

print "Listening on port 5000...\n";

my %clients;

my $w = AnyEvent->io(
  fh   => $listen,
  poll => 'r',
  cb   => sub {
    my $client = $listen->accept;
    $client->blocking(0);

    printf "Client connection from %s\n", $client->peerhost;

    # Each connection gets a read buffer, a write buffer, and read/write
    # watchers.
    $clients{$client}->{rbuf} = '';
    $clients{$client}->{r}    = AnyEvent->io(
      fh   => $client,
      poll => 'r',
      cb   => sub { read_data($client) }
    );

    $clients{$client}->{wbuf} = '';
    $clients{$client}->{w}    = AnyEvent->io(
      fh   => $client,
      poll => 'w',
      cb   => sub { write_data($client) }
    );
  }
);

AE::cv->recv;

What's changed is that each client socket now gets exactly one read watcher, one write watcher, and its own read and write buffers. Otherwise, everything's the same.


sub read_data {
  my ($client) = @_;

  # Read data from the client and append it to the read buffer
  my $bytes = sysread $client, $clients{$client}->{rbuf}, SYSREAD_MAX,
    length( $clients{$client}->{rbuf} );

  if ( not defined $bytes ) {
    if ( ( $! == EAGAIN ) or ( $! == EWOULDBLOCK ) ) {
      return;
    }
    # Any other read error is fatal for this connection
    disconnect($client);
    return;
  }
  elsif ( $bytes == 0 ) {
    disconnect($client);
    return;
  }

  printf "Read %d bytes from %s. Read buffer: %s\n", $bytes,
    $client->peerhost, $clients{$client}->{rbuf};

  while ( ( my $i = index( $clients{$client}->{rbuf}, "\n" ) ) >= 0 ) {
    my $msg = substr( $clients{$client}->{rbuf}, 0, $i + 1, '' );
    push_write( $client, $msg );
  }
}

The main changes here are that we sysread right onto the end of the read buffer, and then we process what's in the read buffer. For an echo server, we presume that one "message" is any data that has been terminated by a newline character. So when we have received a full message, we queue it to be sent back to the client with the push_write function.
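That framing loop is easy to exercise in isolation. Here's a minimal sketch, with a hypothetical extract_messages() helper (not part of the server) that works on a buffer passed by reference:

```perl
#!/usr/bin/env perl
use warnings;
use strict;

# Hypothetical helper: pull every complete newline-terminated message
# off the front of a buffer, leaving any partial message behind.
sub extract_messages {
  my ($rbuf) = @_;    # reference to the read buffer
  my @msgs;
  while ( ( my $i = index( $$rbuf, "\n" ) ) >= 0 ) {
    # 4-arg substr removes the message (including its "\n") from the buffer
    push @msgs, substr( $$rbuf, 0, $i + 1, '' );
  }
  return @msgs;
}

my $buf  = "hello\nworld\npart";
my @msgs = extract_messages( \$buf );

print scalar(@msgs), " complete messages\n";    # 2 complete messages
print "leftover: $buf\n";                       # leftover: part
```

Note how the partial message ("part") stays in the buffer, waiting for a later sysread to complete it.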


sub push_write {
  my ( $client, $msg ) = @_;

  $clients{$client}->{wbuf} .= $msg;
}

All this does is append to the write buffer. There is already a write watcher associated with this client socket, which will consume the write buffer when it's scheduled to run by the event loop.


sub write_data {
  my ($client) = @_;

  # Nothing in the write buffer?
  return unless $clients{$client}->{wbuf};

  my $bytes = syswrite $client, $clients{$client}->{wbuf}, SYSWRITE_MAX;

  if ( not defined $bytes ) {
    if ( ( $! == EAGAIN ) or ( $! == EWOULDBLOCK ) ) {
      return;
    }
    # EPIPE, or any other write error, means the client has gone away
    disconnect($client);
    return;
  }
  else {
    # $bytes were successfully sent to the client, so we can trim that
    # much data off the front of the write buffer.
    substr( $clients{$client}->{wbuf}, 0, $bytes ) = '';
    printf "Wrote %d bytes to client %s\n", $bytes, $client->peerhost;
  }
}

All this code does is attempt to write the contents of the write buffer to the client socket. When a chunk of data has been written successfully to the socket, the write buffer is trimmed of that data.
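The trimming idiom itself is easy to check in isolation; here's a minimal sketch with a pretend partial write:

```perl
#!/usr/bin/env perl
use warnings;
use strict;

my $wbuf  = "hello world\n";
my $bytes = 5;    # pretend syswrite() only managed to write 5 bytes

# Trim exactly what was written off the front of the buffer; the rest
# stays queued for the next time the socket is writable.
substr( $wbuf, 0, $bytes ) = '';

print "remaining: $wbuf";    # remaining:  world
```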


sub disconnect {
  my ($client) = @_;

  printf "Client %s disconnected\n", $client->peerhost;
  delete $clients{$client};
  $client->close;
}

And this function hasn't changed at all.

That works pretty damn well and we've kept the memory usage per connection as consistent as possible.

More Issues to Consider

We've got the basics down, but there's more (there's always more).

Lingering

All we've dealt with here is an asynchronous echo server. Asynchronous client code has its own issues.

In the code above, a call to push_write() simply appends data to the write buffer, but that doesn't mean it's been successfully written to the socket yet. So if, in a client application, we wanted to disconnect from the server after a bunch of calls to push_write(), we shouldn't close the connection until the write buffer has been completely flushed. One solution to this is to introduce lingering, as it's called in the TCP world (see the SO_LINGER socket option).

Lingering in this case means that, for a number of seconds, the connection will hang around attempting to flush the write buffer before closing the connection. It's a simple idea that adds more complexity to our code.
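For reference, here's how the kernel-level SO_LINGER option can be set and read back. Note this is distinct from the application-level lingering described above, where the event loop keeps running until the write buffer drains (AnyEvent::Handle implements the latter via its linger option). This is just a sketch; the 5 second timeout is an arbitrary choice, and the packed layout assumes the usual two-int struct linger:

```perl
#!/usr/bin/env perl
use warnings;
use strict;

use Socket qw/AF_INET SOCK_STREAM SOL_SOCKET SO_LINGER/;

socket( my $sock, AF_INET, SOCK_STREAM, 0 ) or die $!;

# struct linger { int l_onoff; int l_linger; } — enable lingering with a
# 5 second timeout, so close() blocks until unsent data is flushed (or
# the timeout expires).
setsockopt( $sock, SOL_SOCKET, SO_LINGER, pack( 'll', 1, 5 ) ) or die $!;

my ( $onoff, $timeout ) =
  unpack( 'll', getsockopt( $sock, SOL_SOCKET, SO_LINGER ) );
printf "SO_LINGER: onoff=%d timeout=%d\n", $onoff, $timeout;
close $sock;
```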

Buffer Sizes

Another potential issue, when on a very slow network for example, is that the buffer sizes may grow out of control, so they may need to be capped.
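One way to sketch a cap, with hypothetical limits and a hypothetical buffer_within_limit() helper (the server would call disconnect($client) whenever the check fails after growing a buffer):

```perl
#!/usr/bin/env perl
use warnings;
use strict;

# Hypothetical caps — tune these to your application.
use constant {
  RBUF_MAX => 1024 * 1024,    # 1 MiB
  WBUF_MAX => 1024 * 1024,
};

# Check a buffer (passed by reference) against its limit.
sub buffer_within_limit {
  my ( $buf_ref, $max ) = @_;
  return length($$buf_ref) <= $max;
}

my $wbuf = 'x' x ( 2 * 1024 * 1024 );    # a 2 MiB backlog from a slow peer
printf "within limit: %s\n",
  buffer_within_limit( \$wbuf, WBUF_MAX ) ? 'yes' : 'no';   # within limit: no
```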

TLS, Corking, Delays and More

The list goes on...

There are many socket options and behaviours that you may want to utilise depending on the nature of your application, so support for these options would need to be included.
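As one example, Nagle's algorithm can be disabled with the TCP_NODELAY option, so small writes go out immediately instead of being coalesced (AnyEvent::Handle exposes this as its no_delay option). A minimal sketch:

```perl
#!/usr/bin/env perl
use warnings;
use strict;

use Socket qw/AF_INET SOCK_STREAM IPPROTO_TCP TCP_NODELAY/;

socket( my $sock, AF_INET, SOCK_STREAM, IPPROTO_TCP ) or die $!;

# Disable Nagle's algorithm — often what you want for a chatty,
# latency-sensitive protocol.
setsockopt( $sock, IPPROTO_TCP, TCP_NODELAY, 1 ) or die $!;

my $nodelay = unpack 'i', getsockopt( $sock, IPPROTO_TCP, TCP_NODELAY );
print "TCP_NODELAY: $nodelay\n";
close $sock;
```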

Factoring Everything Out

The thing that's clear from the code above is that most of it is handling generic asynchronous programming issues, and only a tiny amount of it is actually specific to the functionality of an echo server.

The first time I used AnyEvent::Handle, I didn't understand why it was designed the way it was, but when the time came to implement many of these features in a proprietary event loop framework where I couldn't use AnyEvent, I soon realised I was reinventing the wheel and was finally able to appreciate AnyEvent::Handle's design.

Just for reference, here's what the echo server looks like when written with AnyEvent::Socket and AnyEvent::Handle.


#!/usr/bin/env perl

use warnings;
use strict;

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;

print "Listening on port 5000...\n";

tcp_server undef, 5000, sub {
  my ( $fh, $host, $port ) = @_;

  printf "Client connection from %s\n", $host;

  my $hdl;
  my $disconnect_f = sub {
    printf "Client %s disconnected\n", $host;
    $hdl->destroy;
  };

  $hdl = AnyEvent::Handle->new(
    fh       => $fh,
    on_eof   => $disconnect_f,
    on_error => $disconnect_f,
    on_read  => sub {
      $hdl->push_read( line => sub {
        my ( $hdl, $line ) = @_;
        printf "Read from %s: %s\n", $host, $line;
        $hdl->push_write("$line\n");
      } );
    }
  );
};

AE::cv->recv;

So, like most things, I guess the takeaway here is: don't reinvent the wheel if you don't have to, and if you do, learn from those who came before you.
