Follow me
RSS feed
My sources
My Viadeo

Créer un système de messagerie instantanée avec Ruby, AMQP et Ncurses

Greg | 11 Nov 2011

Projets Dans ce post, nous allons nous amuser à mettre en place un système de messagerie instantanée en utilisant AMQP. AMQP est un protocole permettant de gérer des échanges de messages entre applications. Il s'agit d'un protocole ouvert dont il existe de multiples implémentations. Dans cet exemple, nous utiliserons Ruby pour créer notre système de chat.

Afin de rendre notre client de messagerie un peu plus convivial, nous nous amuserons à mettre en place une interface utilisateur en mode texte avec Ncurses. Aucun rapport avec AMQP bien entendu, il s'agit simplement de s'amuser un peu ;)

Autant vous prévenir tout de suite, je ne vais faire qu'effleurer les deux sujets (AMQP et Ncurses). Mon objectif étant de vous faire toucher du doigt ces différentes technologies, en espérant attirer votre curiosité, et en vous laissant le plaisir d'approfondir ces sujets par vous même.

AMQP

Avant de se lancer dans le développement, il faut comprendre les mécanismes d'AMQP. Pour cela, regardons le schéma1 suivant :

AMQP Model by Wikipedia

Comme vous pouvez le voir, le fonctionnement est très simple. Nous avons des applications qui envoient des messages à des exchanges sur le serveur. Les exchanges se chargent de router les messages vers des queues. Les clients s'étant abonnés à ces queues recevront alors les messages. Bien entendu, les clients produisant et consommant les messages peuvent être les mêmes.

Jusque-là, rien de bien compliqué. Ce qu'il faut bien comprendre, ce sont les différents types d'exchanges et le fonctionnement des queues. Avant de voir cela, voyons sommairement comment mettre en place un client.

La première étape consiste à se connecter au broker. Une fois la connexion établie, nous créons un channel à partir duquel nous mettons en place un exchange et une queue. Pour envoyer un message, il suffit de le publier auprès de l'exchange. A l'autre bout, pour recevoir les messages, nous allons souscrire à la queue. Voici un exemple :

 1 require "rubygems"
 2 require "amqp"
 3 
 4 AMQP.start("amqp://localhost") do |connection|
 5   channel  = AMQP::Channel.new(connection)
 6   queue    = channel.queue("example.hello", :auto_delete => true)
 7   exchange = channel.direct("")
 8 
 9   queue.subscribe do |headers, payload|
10     puts "Message reçu: #{payload}"
11 
12     connection.close {
13       EM.stop { exit }
14     }
15   end
16 
17   exchange.publish "Hello, world!", :routing_key => queue.name
18 end

Nous utilisons la méthode AMQP.start (ligne 4) pour initialiser la connexion. Cette méthode prend en paramètre l'URL de connexion au broker. Nous créons ensuite le channel, la queue et l'exchange (lignes 5 à 7). Pour la queue, vous noterez l'utilisation du paramètre :auto_delete permettant d'indiquer que l'exchange peut être supprimé quand il n'y a plus de queue qui l'utilise. A la ligne 9, nous nous inscrivons auprès de la queue afin de récupérer les messages. La méthode subscribe prend en paramètre un bloc recevant les informations d'entête des messages (headers) ainsi que le corps de ces messages (payload). Dans le bloc, nous affichons simplement le contenu du message et nous fermons la connexion avec le broker. Vous remarquerez que j'utilise un EM.stop lors de la fermeture de la connexion. En effet, la gem amqp utilise EventMachine. Nous aurions d'ailleurs pu utiliser la syntaxe suivante pour notre exemple :

EventMachine.run do
  connection = AMQP.connect("amqp://localhost")
  # ...
end

Pour envoyer un message, nous utilisons la méthode publish (ligne 17). Nous utilisons l'option :routing_key afin d'indiquer à l'exchange comment il doit router le message (vers quelle queue).

Les exchanges et les queues sont gérés par le serveur (aussi appelé broker). Nous n'allons pas écrire ce serveur. Le but de ce post n'étant pas d'implémenter le protocole AMQP2 mais de l'utiliser. Nous allons donc utiliser une implémentation existante. Pour cela vous avez le choix entre3 OpenAMQ, StormMQ, RabbitMQ ou Qpid. Pour la partie client, j'utilise Ruby avec la gem amqp.

Les exchanges

Comme je l'ai indiqué, les exchanges reçoivent et routent les messages. Il existe quatre types d'exchanges :

direct

Cet exchange est utilisé pour de l'échange 1:1. Il utilise une clé de routage pour délivrer les messages à une queue. Pour voir cela, écrivons un client :

# client.rb
require "rubygems"
require "amqp"

who = ARGV[0]

AMQP.start("amqp://localhost") do |connection|
  channel  = AMQP::Channel.new(connection)
  queue    = channel.queue("hello.#{who}", :auto_delete => true)

  queue.subscribe do |headers, payload|
    puts "Message reçu: #{payload}"

    connection.close {
      EM.stop { exit }
    }
  end
end 

Et un serveur :

# server.rb
require "rubygems"
require "amqp"

who = ARGV[0]
puts "Say hello to #{who} via hello.#{who}"

AMQP.start("amqp://localhost") do |connection|
   channel  = AMQP::Channel.new(connection)
   exchange = channel.direct("")

   exchange.publish "Bonjour #{who}!", :routing_key => "hello.#{who}"

   Thread.new do
      sleep 1
      connection.close {
         EM.stop { exit }
      }
   end
end

Nous utilisons l'option :routing_key, lors de l'envoi du message (method publish), pour préciser à quelle queue doit être envoyé le message. Si vous démarrez plusieurs clients, avec, en paramètre des noms différents, et que vous exécutez plusieurs fois le serveur avec ces mêmes noms, vous verrez que chaque client recevra le bon message.

amqp-chat.rb

fanout

Cet exchange envoie les messages vers toutes les queues qui lui sont rattachées, en ignorant les clés de routage. Pour vérifier cela, modifier le client de la façon suivante :

# client.rb
require "rubygems"
require "amqp"

who = ARGV[0]

AMQP.start("amqp://localhost") do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.fanout("public.message")
  
  channel.queue("hello.#{who}", :auto_delete => true).bind(exchange).subscribe do |headers, payload|
    puts "Message reçu: #{payload}"

    connection.close {
      EM.stop { exit }
    }
  end
end

Dans ce code, nous déclarons un exchange avec le nom public.message, et nous utilisons la méthode bind pour lier l'exchange à la queue. Modifiez ensuite le serveur :

# server.rb
require "rubygems"
require "amqp"

who = ARGV[0]
puts "Say hello to #{who} via hello.#{who}"

AMQP.start("amqp://localhost") do |connection|
   channel  = AMQP::Channel.new(connection)
   exchange = channel.fanout("public.message")

   exchange.publish "Bonjour #{who}!", :routing_key => "hello.#{who}"

   Thread.new do
      sleep 1
      connection.close {
         EM.stop { exit }
      }
   end
end

Ici, nous faisons attention d'utiliser l'exchange de même nom. Si vous testez en lançant plusieurs clients, avec des noms différents, vous verrez que dans tous les cas, quand nous envoyons un message, il sera reçu systématiquement par tous les clients.

amqp-chat.rb

topic

Les exchanges de type topic permettent de router des messages vers une ou plusieurs queues en utilisant des clés de routages qui peuvent être décrites sous forme de pattern.

Les patterns utilisés pour décrire les clés de routages peuvent utiliser 2 caractères spéciaux :

Ainsi, la clé de routage user.# matchera aussi bien user.greg que user.greg.private ou encore user. Par contre la clé user.* matchera user.greg, user.mu ou user.bene, mais pas user, ni user.greg.private.

Voici un exemple d'utilisation :

# topic.rb
require "rubygems"
require "amqp"

AMQP.start("amqp://localhost") do |connection|
    channel  = AMQP::Channel.new(connection)
    exchange = channel.topic("os.info", :auto_delete => true)

    channel.queue("comp.os.unix").bind(exchange, :routing_key => "comp.os.unix.#").subscribe do |headers, payload|
      puts "Reçu par comp.os.unix: #{payload}"
    end
    channel.queue("comp.os.unix.linux").bind(exchange, :routing_key => "comp.os.unix.linux").subscribe do |headers, payload|
      puts "Reçu par comp.os.unix.linux: #{payload}"
    end
    channel.queue("comp.os.unix.bsd").bind(exchange, :routing_key => "comp.os.unix.bsd").subscribe do |headers, payload|
      puts "Reçu par comp.os.unix.bsd: #{payload}"
    end
    channel.queue("comp.os.mac").bind(exchange, :routing_key => "comp.os.mac").subscribe do |headers, payload|
      puts "Reçu par comp.os.mac: #{payload}"
    end
    channel.queue("comp.os").bind(exchange, :routing_key => "comp.os.*").subscribe do |headers, payload|
      puts "Reçu par comp.os: #{payload}"
    end

    EventMachine.add_timer(1) do
      exchange.publish("Hello Linux!", :routing_key => "comp.os.unix.linux")
      exchange.publish("Hello BSD!", :routing_key => "comp.os.unix.bsd")
      exchange.publish("Hello Mac!", :routing_key => "comp.os.mac")
      exchange.publish("Hello Unix!", :routing_key => "comp.os.unix")
    end

    EM.add_timer(3, Proc.new { connection.close { EventMachine.stop } })
end

amqp-chat.rb

headers

Les exchanges de types headers utilisent un système de routage basé sur les données d'entête des messages pour dispatcher ces derniers vers les queues.

En effet, il est possible d'attacher aux messages envoyés, des informations personnalisées. Voyons le code suivant :

# client.rb
require "rubygems"
require "amqp"

who = ARGV[0]

AMQP.start("amqp://localhost") do |connection|
   channel  = AMQP::Channel.new(connection)
   exchange = channel.fanout("headers.example")

   channel.queue("headers.queue", :auto_delete => true).bind(exchange).subscribe do |headers, payload|
      puts "Message reçu: #{payload}"
      p headers.attributes[:headers]

      connection.close {
         EM.stop { exit }
      }
   end

   exchange.publish "Bonjour #{who}!", :headers => { :toto => "hello", :titi => 42 }

   Thread.new do
      sleep 1
      connection.close {
         EM.stop { exit }
      }
   end
end

Si vous exécutez cet exemple, vous verrez que nous récupérons, via la clé :headers de l'attribut attributes des headers ce qui a été passé via l'option :headers lors de la publication du message.

C'est sur ce type de données que vont se baser les exchanges de type headers. Pour cela, nous utiliserons l'option :arguments lors du rattachement de l'exchange à une queue :

queue.bind(exchange, :arguments => { :toto => hello, :titi => 42 })

Le hash passé comme valeur de l'option :arguments peut également contenir un tuple avec la clé x-match indiquant comment doit se faire le matching. Dans ce tuple, la valeur possible peut-être :

require "rubygems"
require "amqp"

AMQP.start("amqp://localhost") do |connection|
    channel  = AMQP::Channel.new(connection)
    exchange = channel.headers("os.info", :durable => true)

    channel.queue("comp.os.unix").bind(exchange, :arguments => { :os => "unix" }).subscribe do |metadata, payload|
      puts "Reçu par comp.os.unix: #{payload}"
    end
    channel.queue("comp.os.unix.linux").bind(exchange, :arguments => { :os => "unix", :name => "linux" }).subscribe do |metadata, payload|
      puts "Reçu par comp.os.unix.linux: #{payload}"
    end
    channel.queue("comp.os.unix.bsd").bind(exchange, :arguments => { :os => "unix", :name => "bsd" }).subscribe do |metadata, payload|
      puts "Reçu par comp.os.unix.bsd: #{payload}"
    end
    channel.queue("comp.os.mac").bind(exchange, :arguments => { 'x-match' => 'any', :os => "mac", :version => "10.7" }).subscribe do |metadata, payload|
      puts "Reçu par comp.os.mac: #{payload}"
    end
    channel.queue("comp.os").bind(exchange).subscribe do |metadata, payload|
      puts "Reçu par comp.os: #{payload}"
    end

    EventMachine.add_timer(1) do
      exchange.publish("Hello Linux!", :headers => { :os => "unix", :name => "linux" })
      exchange.publish("Hello BSD!", :headers => { :os => "unix", :name => "bsd" })
      exchange.publish("Hello Mac!", :headers => { :os => "mac" })
      exchange.publish("Hello Unix!", :headers => { :os => "unix" })
    end

    EM.add_timer(3, Proc.new { connection.close { EventMachine.stop } })
end

amqp-chat.rb

Les queues

Avant de continuer, et avancer sur l'objectif de ce post, qui est de faire un système de messagerie instantanée, je voudrais juste faire un aparté sur les queues. Vous avez certainement remarqué que j'ai souvent utilisé l'option :auto_delete lors de la création de queues. Cette option permet de demander à ça que la queue soit détruite quand elle n'est plus utilisée par aucun client. Par défaut ce n'est pas le cas, et c'est plutôt une bonne chose. Il est ainsi possible d'envoyer des messages dans une ou plusieurs queues, qui seront consommés par des clients qui ne sont pas connectés.

Pour illustrer cela, voici le code deux deux clients et un serveur :

# client1.rb
require "rubygems"
require "amqp"

who = ARGV[0]

AMQP.start("amqp://localhost") do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.fanout("public.message")
  
  channel.queue("hello.client1").bind(exchange).subscribe do |headers, payload|
    puts "Message reçu: #{payload}"

    connection.close {
      EM.stop { exit }
    }
  end
end

Ce premier client utilise la queue hello.client1.

# client2.rb
require "rubygems"
require "amqp"

who = ARGV[0]

AMQP.start("amqp://localhost") do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.fanout("public.message")
  
  channel.queue("hello.client2").bind(exchange).subscribe do |headers, payload|
    puts "Message reçu: #{payload}"

    connection.close {
      EM.stop { exit }
    }
  end
end

Ce second client utilise la queue hello.client2.

require "rubygems"
require "amqp"

who = ARGV[0]

AMQP.start("amqp://localhost") do |connection|
   channel  = AMQP::Channel.new(connection)
   exchange = channel.fanout("public.message")
   channel.queue("hello.client1").bind(exchange)
   channel.queue("hello.client2").bind(exchange)

   exchange.publish "[#{Time.now}] Bonjour clients!"

   EM.add_timer(0.5, Proc.new { connection.close { EventMachine.stop } })
end

Le serveur va également créer les queues hello.client1 et hello.client2 avant de publier son message. Ainsi, les queues n'étant jamais détruites, les clients pourront récupérer le message envoyé par le serveur, y compris s'ils sont démarrés après.

amqp-chat.rb

Dans cet exemple, j'utilise RabbitMQ comme broker et la commande rabbitmqctl pour lister l'état des queues.

Première version

Maintenant que nous savons comment utiliser AMQP, nous pouvons écrire une première version de notre système de chat :

 1 #!/urs/bin/env ruby
 2 
 3 require "rubygems"
 4 require "amqp"
 5 
 6 nickname = ARGV[0]
 7 raise "Usage : #{$0} <nickname>" if nickname.nil?
 8 
 9 AMQP.start("amqp://localhost") do |connection|
10    channel  = AMQP::Channel.new(connection)
11    exchange = channel.fanout("amqp.chat.v1")
12 
13    queue = channel.queue(nickname, :auto_delete => true)
14 
15    queue.bind(exchange).subscribe do |headers, payload|
16       puts "#{payload}"
17    end
18 
19    exchange.publish "** #{nickname} enter"
20 
21    Thread.new do
22       while true do
23          message = STDIN.gets.chomp
24          case message
25          when /^\/quit\s*(.*)/
26             exchange.publish "** #{nickname} has quit (#{$1})"
27             connection.close {
28                EventMachine.stop
29             }
30          else
31             exchange.publish "[#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{message}"
32          end
33       end
34    end
35 end

Si vous avez bien lu la première partie de ce post, vous ne devriez pas être trop perdu avec le code ci-dessus. Je me suis contenté de mettre en place un exchange de type fanout. Chaque client se connectant créé une queue ayant comme nom le nickname choisit par l'utilisateur. Je fais ensuite une boucle infinie en capturant la saisie au clavier. Si cette saisie commence par /quit alors nous terminons la session.

Voici une capture de cette première version :

amqp-chat.rb

Seconde version : ajoutons des fonctionnalités

Afin de rendre notre système de messagerie plus intéressant, nous pouvons essayer de lui ajouter des fonctionnalités. Je vous propose les suivantes :

Comme vous pouvez l'imaginer, il va falloir changer de méthode pour gérer différents types de messages. En effet, nous avons besoin de plusieurs queues : une pour les messages publics, une pour les messages privés et une pour les messages systèmes. Je vous propose d'utiliser un exchange de type headers.

exchange = channel.headers("amqp.chat.v2")

Associé à cet exchange, chaque utilisateur créera trois queues, une pour chaque type de message.

La première queue servira à recevoir les messages publics. Elle aura donc un nom du type public.<nickname> et sera rattaché à l'exchange avec comme arguments un simple tuple :type => 'public' :

public_queue = channel.queue("public.#{nickname}", :auto_delete => true)
public_queue.bind(exchange, :arguments => { :type => 'public' })

Pour délivrer un message privé, il nous suffira donc de mettre dans les headers le tuple :type => 'public'.

exchange.publish message, :headers => {:type => 'public'}

Le second type de message est celui de type privé. Ces messages ne sont donc visibles que pour l'utilisateur concerné. Dans ce cas, nous allons créer une queue avec comme arguments un tuple :type => 'private' et un autre avec la clé :nick et comme valeur le nickname de l'utilisateur.

private_queue = channel.queue("private.#{nickname}", :auto_delete => true)
private_queue.bind(exchange, :arguments => { :type => 'private', :nick => nickname })

Pour envoyer un message privé à un utilisateur, il suffira de préciser, dans l'entête du message, que l'exchange doit router le message sur une queue de type private pour le nick de l'utilisateur à qui nous voulons envoyer notre message.

exchange.publish message, :headers => {:type => 'private', :nick => nickname}

Pour les messages systèmes, nous utiliserons simplement une queue avec comme type la valeur system :

system_queue = channel.queue("system.#{nickname}", :auto_delete => true)
system_queue.bind(exchange, :arguments => { :type => 'system' })

Nous utiliserons cette queue pour la commande /who. En effet, pour répondre à cette demande, nous enverrons un message de type system à chaque utilisateur connecté, avec dans ce message, uniquement le nickname de l'utilisateur qui fait la demande. Chaque utilisateur recevant ce message, renverra automatiquement une réponse privée (du genre "<nickname> is here") à l'utilisateur ayant fait la demande.

system_queue.bind(exchange, :arguments => { :type => 'system' }).subscribe do |headers, payload|
   exchange.publish "-- #{nickname} is here", :headers => {:type => 'private', :nick => payload.to_s}
end

Voici le code de notre seconde version :

 1 #!/usr/bin/env ruby
 2 
 3 require "rubygems"
 4 require "amqp"
 5 
 6 def help
 7    puts <<EOH
 8    /help : display this help
 9    /me <message> : send an action message
10    /privmsg <nickname> <message> : send a private message
11    /quit : Quit AMQP chat
12    /who : ask who is here
13 EOH
14 end
15 
16 nickname = ARGV[0]
17 raise "Usage : #{$0} <nickname>" if nickname.nil? 
18 
19 AMQP.start("amqp://localhost") do |connection|
20    channel  = AMQP::Channel.new(connection)
21    exchange = channel.headers("amqp.chat.v2")
22 
23    public_queue = channel.queue("public.#{nickname}", :auto_delete => true) 
24    public_queue.bind(exchange, :arguments => { :type => 'public' }).subscribe do |headers, payload|
25       puts payload.to_s
26    end 
27    private_queue = channel.queue("private.#{nickname}", :auto_delete => true) 
28    private_queue.bind(exchange, :arguments => { :type => 'private', :nick => nickname }).subscribe do |headers, payload|
29       puts payload.to_s
30    end 
31    system_queue = channel.queue("system.#{nickname}", :auto_delete => true) 
32    system_queue.bind(exchange, :arguments => { :type => 'system' }).subscribe do |headers, payload|
33       exchange.publish "-- #{nickname} is here", :headers => {:type => 'private', :nick => payload.to_s}
34    end 
35 
36    exchange.publish "** #{nickname} enter", :headers => {:type => 'public'}
37 
38    Thread.new do
39       while true
40          message = STDIN.gets.chomp
41          case message
42          when /^\/quit\s*(.*)/
43             exchange.publish "** #{nickname} has quit (#{$1})", :headers => {:type => 'public'}
44             connection.close {
45                EventMachine.stop
46             }
47             break
48          when /^\/me\s*(.*)/
49             exchange.publish "** #{nickname} #{$1}", :headers => {:type => "public"}
50          when /^\/privmsg\s*([^\s]*)\s*(.*)/
51             exchange.publish ">> [#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{$2}", :headers => {:type => 'private', :nick => $1}
52             exchange.publish ">> [#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{$2}", :headers => {:type => 'private', :nick => nickname}
53          when /^\/who/
54             exchange.publish nickname, :headers => {:type => "system"}
55          when /^\/help/
56             help
57          else
58             exchange.publish "[#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{message}", :headers => {:type => 'public'}
59          end
60       end
61    end
62 end

Et voici une capture :

amqp-chat.rb

Troisième version : une interface en Ncurses

Nous avons une version fonctionnelle, mais pas vraiment conviviale. En effet, si nous recevons un message, alors que nous sommes en train d'en rédiger un, c'est un peu le foutoir.

Nous allons améliorer cela en mettant en place une interface utilisateur grâce à Ncurses, et plus précisément, dans notre cas, à ncurses-ruby.

  1 #!/usr/bin/env ruby
  2 
  3 require "rubygems"
  4 require "ncurses"
  5 require "amqp"
  6 
  7 class ChatGui
  8    def read_line(y, x,
  9                  window     = Ncurses.stdscr,
 10                  max_len    = (window.getmaxx - x - 1), 
 11                  string     = "", 
 12                  cursor_pos = 0)
 13       loop do
 14          window.mvaddstr(y,x,string)
 15          window.move(y,x+cursor_pos)
 16          ch = window.getch
 17          case ch
 18          when Ncurses::KEY_ENTER, ?\n.ord, ?\r.ord
 19             return string
 20          when Ncurses::KEY_BACKSPACE, 127
 21             string = string[0...([0, cursor_pos-1].max)] + string[cursor_pos..-1]
 22             cursor_pos = [0, cursor_pos-1].max
 23             window.mvaddstr(y, x+string.length, " ")
 24          when (" "[0].ord..255)
 25             if (cursor_pos < max_len)
 26                string[cursor_pos,0] = ch.chr
 27                cursor_pos += 1
 28             else
 29                Ncurses.beep
 30             end 
 31          else
 32             Ncurses.beep
 33          end 
 34       end 
 35    end
 36 
 37    def add_message(message)
 38       @messages += message.split("\n")
 39       if @messages.size > @max_messages
 40          @messages.shift
 41       end
 42 
 43       refresh_messages_window
 44    end
 45 
 46    def refresh_messages_window
 47       @messages_window.clear
 48       y = 0
 49       @messages.each do |message|
 50          @messages_window.mvaddstr(y, 0, message)
 51          y = y + 1
 52       end
 53       @messages_window.refresh
 54    end
 55 
 56    def initialize(nick)
 57       @messages = []
 58       Ncurses.initscr
 59       Ncurses.cbreak
 60       Ncurses.noecho
 61       Ncurses.keypad(Ncurses.stdscr, true)
 62 
 63       @window = Ncurses.stdscr
 64       @maxy = @window.getmaxy - 1
 65       @maxx = @window.getmaxx - 1
 66 
 67       @prompt_window = Ncurses.newwin(2, @maxx, @maxy - 2, 0)
 68       @prompt = "#{nick} >"
 69 
 70       @messages_window = Ncurses.newwin(@maxy - 2, @maxx, 0, 0)
 71       @max_messages = @messages_window.getmaxy
 72    end
 73 
 74    def run(&b)
 75       loop do
 76          # refresh_messages_window
 77 
 78          @prompt_window.mvaddstr(0, 0, "-"*@maxx)
 79          @prompt_window.mvaddstr(1, 0, @prompt)
 80          message = read_line(1, @prompt.length + 1, @prompt_window)
 81          yield message
 82          @prompt_window.clear
 83       end
 84    end
 85 
 86    def quit
 87       Ncurses.endwin
 88    end
 89 end
 90 
 91 # -- main --
 92 
 93 def help(gui)
 94    gui.add_message <<EOH
 95    /help : display this help
 96    /me <message> : send an action message
 97    /privmsg <nickname> <message> : send a private message
 98    /quit : Quit AMQP chat
 99    /who : ask who is here
100 EOH
101 end
102 
103 nickname = ARGV[0]
104 raise "Usage : #{$0} <nickname>" if nickname.nil? 
105 
106 gui = ChatGui.new(nickname)
107 
108 AMQP.start("amqp://localhost") do |connection|
109    channel  = AMQP::Channel.new(connection)
110    exchange = channel.headers("amqp.chat.v3")
111 
112    public_queue = channel.queue("public.#{nickname}", :auto_delete => true) 
113    public_queue.bind(exchange, :arguments => { :type => 'public' }).subscribe do |headers, payload|
114       gui.add_message payload.to_s
115    end
116    private_queue = channel.queue("private.#{nickname}", :auto_delete => true) 
117    private_queue.bind(exchange, :arguments => { :type => 'private', :nick => nickname }).subscribe do |headers, payload|
118       gui.add_message payload.to_s
119    end
120    system_queue = channel.queue("system.#{nickname}", :auto_delete => true) 
121    system_queue.bind(exchange, :arguments => { :type => 'system' }).subscribe do |headers, payload|
122       exchange.publish "-- #{nickname} is here", :headers => {:type => 'private', :nick => payload.to_s}
123    end
124 
125    exchange.publish "** #{nickname} enter", :headers => {:type => 'public'}
126 
127    Thread.new do
128       gui.run do |message|
129          case message 
130          when /^\/quit\s*(.*)/
131             exchange.publish "** #{nickname} has quit (#{$1})", :headers => {:type => 'public'}
132             connection.close { 
133                EventMachine.stop 
134             }
135             break
136          when /^\/me\s*(.*)/
137             exchange.publish "** #{nickname} #{$1}", :headers => {:type => "public"}
138          when /^\/privmsg\s*([^\s]*)\s*(.*)/
139             exchange.publish ">> [#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{$2}", :headers => {:type => 'private', :nick => $1}
140             exchange.publish ">> [#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{$2}", :headers => {:type => 'private', :nick => nickname}
141          when /^\/who/
142             exchange.publish nickname, :headers => {:type => "system"}
143          when /^\/help/
144             help gui
145          else
146             exchange.publish "[#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{message}", :headers => {:type => 'public'}
147          end
148       end
149    end
150 end
151 
152 gui.quit

La classe ChatGui permet de gérer la partie interface avec Ncurses. Pour cela, nous mettons en place deux fenêtres, @prompt_window et @messages_window servant respectivement à la saisie des messages et à l'affichage de ces derniers. La saisie des messages est gérée par la méthode read_line. L'ajout de nouveaux messages et le rafraichissement de la fenêtre @messages_window sont dévolus à la méthode add_message. La méthode run prend en paramètre un bloc recevant en argument le message venant d'être saisi par l'utilisateur.

Le reste du code n'est rien de plus qu'une adaptation de notre seconde version ;)

Voici ce que donne cette troisième version :

amqp-chat.rb

Si vous souhaitez vous amuser, sachez que le code est également disponible ici.


1 Source : http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol
2 Si jamais vous souhaiter poussez le jeu un peu plus loin, voici la spécification d'AMQP 1.0
3 N'hésitez pas à compléter cette liste en commentaire.

Copyright © 2009 - 2011 Grégoire Lejeune.
All documents licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 2.5 License, except ones with specified licence.
Powered by Jekyll.