Follow me
RSS feed
My sources
My Viadeo

Server side Faye’s client ou le protocole Bayeux

Greg | 24 Nov 2009

ProjetsDans mon précédent article, je vous ai montré comment mettre en place un Comet avec Capcode. J'ai découvert que certains d'entre vous lisent ce que j'écris1 par le biais de quelques mails dans ma boite aux lettres. L'un d'entre vous m'a même demandé comment faire pour envoyer des évènements sur le bus Comet sans passer par l'application Web. Bonne question, sachant qu'en effet, faye ne propose aucune solution pour cela. Et bien je me suis amusé à en développer une...

Faye

Pour comprendre ce que nous allons faire, il faut prendre le temps de regarder comment fonctionne faye. Mais avant, nous devons choisir la solution à adopter pour écrire notre librairie. En effet, nous avons deux choix possibles : soit nous pouvons parler directement au serveur faye, soit nous pouvons simuler le comportement d'un client. En fonction de ce choix, nous regarderons telle ou telle partie du code de faye. Bon et puisque c'est moi qui choisis2, j'opte pour la seconde solution.

Avant de décortiquer le code, rappelons le cheminement par l'utilisateur lors de l'utilisation de soapbox. Avant tout nous démarrons l'application3. Nous donnons notre username. Ensuite nous indiquons qui nous voulons suivre. Enfin nous racontons notre vie. Ces deux dernières étapes pouvant se faire dans n'importe quel ordre, et autant de fois que nous voulons.

Regardons maintenant le code. Lors de l'accès à l'application (route /) nous recevons la page mise en place par la vue views/index.rhtml. Dans cette vue, ce qui nous intéresse ce sont les lignes suivantes :

1 <script type="text/javascript">
2   Comet = new Faye.Client('/comet');
3   Comet.connect();
4     
5   Soapbox.init(Comet);
6 </script>

Nous créons un client Comet (ligne 41) pour le bus accessible via la route /comet puis nous établissons la connexion (ligne 42). Ensuite nous initialisons l'application (ligne 44). Si nous regardons à quoi correspondent la création et la connexion du client Comet, il faut se plonger dans le code du script comet.js. Ce fichier est généré lors de la création du gem faye à partir de l'ensemble des fichiers contenus dans le répertoire client des sources. Le fichier qui nous intéresse principalement ici est client/client.js. Dans ce fichier vous trouverez le code correspondant à la connexion entre les lignes 75 et 113.

Si nous regardons maintenant le fichier soapbox.js nous pouvons étudier ce qui se passe lors de l'initialisation de l'application. Nous voyons dans ce code que soapbox attend que l'utilisateur saisisse son username. Une fois ceci fait, l'application souscrit au channel /mentioning/<username>, elle masque la zone de saisie du username et affiche les zones de saisie des personnes à suivre et des messages. Elle met ensuite en place les actions correspondantes pour ces deux zones de saisie. La première de ces actions se traduit par la souscription au channel /from/<follow> avec comme callback : accept. La seconde revoie vers la méthode post dont le rôle principal est de publier le message sur le channel /from/<username>.

Outch !

Ce qui est important ici c'est de voir ce dont nous avons besoin pour faire cela. Nous en retiendrons donc la nécessité de développer une méthode de connexion, une méthode de souscription et une méthode de publication. Pour comprendre comment ces méthodes agissent, il suffit de regarder ce qui est fait dans les méthodes correspondantes de Faye.Client.

Je ne vais pas pouvoir, au risque de perdre plus de monde que ceux qui ont déjà lâché prise avant la fin du dernier paragraphe, entrer plus dans les détails. Il va donc falloir me croire sur parole ;) En fait, ce que nous allons faire c'est mettre en place une version cliente du protocole des échanges Comet appelé protocole Bayeux. Je vous engage donc à en lire la documentation si vous souhaitez approfondir le sujet.

Messages

Les échanges entre le client Comet et le bus peuvent se faire de deux façons, soit en long-polling soit en callback-polling. Dans notre cas, nous utilisons la première solution en faisant un envoie en POST du paramètre message, ce dernier ayant pour valeur une chaine JSON.

Mettons cela en place :

module Faye
  class Client
    def initialize( uri_or_string )
      @uri = uri_or_string
      @uri = URI.parse(@uri) if @uri.class == String

      # ...

    end

    # ...

    private
    def send( message )
      res = Net::HTTP.post_form( @uri, { "message" => message.to_json } )
      return JSON.parse( res.body )
    end
  end
end

Je place cette méthode en private car elle n'a d'intérêt que pour les méthodes que nous allons écrire ensuite.

Comme vous pouvez le voir, le retour est également une structure JSON.

"Handshake"

Si vous regardez précisément ce qui se passe lors de l'initialisation de la connexion du client Comet, vous verrez qu'en fait, il n'y a pas de connexion véritable, mais un handshake. Cela consiste en fait à se présenter au bus Comet et à obtenir de sa part, un identifiant. Nous devrons, par la suite toujours, utiliser cet identifiant pour communiquer avec le bus.

Le message du handshake doit contenir au moins les champs suivants :

Un channel est représenté comme un chemin sous la forme /foo/bar/... et doit systématiquement commencer par un /. Il en existe deux types : ceux définis par l'application et ceux réservés par le protocole. Ces derniers commencent toujours par /meta/. Dans le cas du handshake, le channel doit être /meta/handshake.

La version doit correspondre au numéro de version du protocole. 1.0 ici.

Le paramètre supportedConnectionTypes contient la liste des types de transports supportés. Bien que nous ayons dit plus haut que nous emploierons du long-polling, nous positionnerons la valeur ["long-polling", "callback-polling"]. Ceci simplement parce que si nous souhaitons implémenter le callback-polling nous n'aurons pas de modification à faire ;)

Voici donc à quoi ressemblera la méthode de handshake :

module Faye
  class Client
    def initialize( uri_or_string )
      @uri = uri_or_string
      @uri = URI.parse(@uri) if @uri.class == String
      @clientId = nil
      @interval = nil

      # ...

    end

    # ...

    def handshake
      id = Faye.random(32)
      message = {
        "channel" => Faye::Channel::HANDSHAKE,
        "version" => Faye::BAYEUX_VERSION,
        "supportedConnectionTypes" => [ "long-polling", "callback-polling" ],
        "id" => id
      }
      
      response = send( message )[0]
      if response["successful"] and response["id"] == id
        @clientId = response["clientId"]
        @interval = response["advice"]["interval"]
      else
        raise
      end
    end

    # ...

  end
end

Dans cette méthode j'utilise Faye::Channel::HANDSHAKE et Faye::BAYEUX_VERSION, deux constantes déclarées dans faye. Si vous souhaitez être indépendant de ce dernier, vous pouvez remplacer ces valeurs par celle que j'ai indiquée. Vous noterez également que j'ai ajouté dans le message le paramètre optionnel id. Ce paramètre permettra de valider, dans certains cas, que la réponse que nous recevons est bien celle attendu. En effet, si en retour nous retrouvons ce même ID c'est que, à priori tout va bien. Je dis bien "à priori"...

Je vous ai indiqué que la raison d'être de ce handshake était de récupérer un identifiant client. C'est ce que nous faisons en parsant la réponse. Cette réponse peut avoir deux structures différentes en fonctions ou non de la présence d'erreur. Je ne détaillerai pas la structure complète, retenez seulement quand dans le cas présent nous devons avoir récupéré les données suivantes :

Je ne parlerai pas de la valeur interval que je récupère simplement en pensant à une prochaine évolution de ce petit développement...

Connexion

La connexion se fait en envoyant un message contenant les données suivantes :

Voici donc comment nous pouvons implémenter cela :

module Faye
  class Client
    def initialize( uri_or_string )
      @uri = uri_or_string
      @uri = URI.parse(@uri) if @uri.class == String
      @clientId = nil
      @interval = nil

      # ...

    end

    # ...

    def connect
      id = Faye.random(32)
      message = {
        "channel" => Faye::Channel::CONNECT,
        "clientId" => @clientId,
        "connectionType" => "long-polling",
        "id" => id
      }
      r = send( message )
            
      # ...

    end
  end
end

A ce niveau, si vous faites un petit test, vous vous rendrez compte que la connexion attend une réponse de la part du serveur. En effet cette connexion attend que le bus Comet envoie des données. C'est ainsi que nous simulons le push ! Ce sont ces données que nous devrons donc traiter en fonction des souscriptions.

Pour savoir si le retour est exact, nous devons retrouver dans la structure JSON le champ successful à true et le champ id avec la même valeur que celle envoyée.

Si tout se passe bien, alors nous pouvons récupérer les données du message dans la structure data :

module Faye
  class Client
    def initialize( uri_or_string )
      @uri = uri_or_string
      @uri = URI.parse(@uri) if @uri.class == String
      @clientId = nil
      @interval = nil

      # ...

    end

    # ...

    def connect
      id = Faye.random(32)
      message = {
        "channel" => Faye::Channel::CONNECT,
        "clientId" => @clientId,
        "connectionType" => "long-polling",
        "id" => id
      }
      r = send( message )
            
      if r[0]["id"] == id and r[0]["successful"] == true

        # traitement du message contenu dans r[1]["data"] pour la souscription au channel r[1]["channel"]
        # ...

      elsif r[0]["successful"] == false

        # ...

      end
      
      # ...

    end

    # ...

  end
end

Nous verrons au paragraphe souscription comment traiter le message de réponse. Avant cela, détaillons ce que nous devons faire en cas d'échec. Le protocole Bayeux nous indique que dans un tel cas, il faut refaire un handshake puis refaire la connexion. Cependant, comme la connexion elle-même est bloquante, dans le sens où elle attend une réponse, il serait bon de l'isoler dans un thread afin de permettre le déroulement de notre programme. De plus, si la connexion est un succès, une fois le message traité, il faut en rouvrir une de façon à se remettre en attente d'un nouveau push du serveur. Pour cela nous pouvons placer la connexion dans une boucle infinie.

Voici donc comment nous allons gérer cela :

module Faye
  class Client
    def initialize( uri_or_string )
      @uri = uri_or_string
      @uri = URI.parse(@uri) if @uri.class == String
      @clientId = nil
      @interval = nil
      @connection = nil

      # ...

    end

    # ...

    def connect
      @connection.kill unless @connection.nil?
      @connection = Thread.new {
        faild = false
        while true
          id = Faye.random(32)
          message = {
            "channel" => Faye::Channel::CONNECT,
            "clientId" => @clientId,
            "connectionType" => "long-polling",
            "id" => id
          }
          r = send( message )
            
          if r[0]["id"] == id and r[0]["successful"] == true

            # traitement du message contenu dans r[1]["data"] pour la souscription au channel r[1]["channel"]
            # ...

          elsif r[0]["successful"] == false
            faild = true
            break
          end
        end
        
        if faild
          handshake()
          connect()
        end
      }
    end

    # ...

  end
end

Souscription

La souscription se fait pour un ou plusieurs channels avec, en second paramètre, la méthode qui doit être utilisée pour traiter les massages venant de ces channels. Nous avons déjà parlé des channels. Bien entendu dans le cas présent il s'agira de channels spécifiques à l'application. Concernant le callback, nous utiliserons un block.

Le message envoyé au bus Comet doit contenir les informations suivantes :

Comme vous pouvez le voir, nulle part nous ne stockons le callback à utiliser pour traiter les messages renvoyés. C'est normal puisque le bus Comet n'en à que faire puisqu'il seront exécutés côté client. Il nous appartient donc de les stocker. Nous ferons cela en mettant en place un hashage ayant pour clé le channel et comme valeur le block.

module Faye
  class Client    
    def initialize( uri_or_string )
      @uri = uri_or_string
      @uri = URI.parse(@uri) if @uri.class == String
      @clientId = nil
      @interval = nil
      @connection = nil
      @subscriptions = {}
    end

    # ...

    def subscribe( channels, &block )
      channels = [channels] unless channels.class == Array
      if block
        channels.each do |c|
          @subscriptions[c] = block
        end
      end
      message = {
        "channel" => Faye::Channel::SUBSCRIBE,
        "clientId" => @clientId,
        "subscription" => channels,
        "id" => Faye.random(32)
      }
      
      r = send(message)
    end

    # ...

  end
end

Je ne prends pas la peine de gérer le retour. Ce n'est pas bien !

Connexion

Nous pouvons maintenant revenir sur la connexion afin de traiter les messages. Il suffit donc de passer le message au block correspond au channel qui l'a poussé :

    # ...

    def connect
      @connection.kill unless @connection.nil?
      @connection = Thread.new {
        faild = false
        while true
          id = Faye.random(32)
          message = {
            "channel" => Faye::Channel::CONNECT,
            "clientId" => @clientId,
            "connectionType" => "long-polling",
            "id" => id
          }
          r = send( message )
            
          if r[0]["id"] == id and r[0]["successful"] == true
            @subscriptions[r[1]["channel"]].call( r[1]["data"])
          elsif r[0]["successful"] == false
            faild = true
            break
          end
        end
        
        if faild
          handshake()
          connect()
        end
      }
    end

    # ...

Publication

La publication se fait par l'envoi d'un message contenant les champs suivants :

Ceci nous donne donc le code suivant :

module Faye
  class Client

    # ...

    def publish( channel, data )
      message = [
        {
          "channel" => channel,
          "data" => data, 
          "clientId" => @clientId,
          "id" => Faye.random(32)
        }
      ]
      r = send(message)[0]
    end

    # ...

  end
end

Encore une fois, je ne prends pas le temps de traiter la réponse... Encore une fois, ce n'est pas bien !!!

... et leur contraire

Là où il y a connexion, il y a forcement "déconnexion", de même la où il y a souscription, il y a désabonnement. Je pense que vous avez compris le principe, et je me permets donc de vous livrer sans autre explication les méthodes correspondantes :

module Faye
  class Client

    # ...

    def disconnect
      unless @connection.nil?
        @connection.kill 
        message = {
          "channel" => Faye::Channel::DISCONNECT,
          "clientId" => @clientId,
          "id" => Faye.random(32)
        }
        r = send( message )
      end
    end

    # ...

    def unsubscribe( channels )
      channels = [channels] unless channels.class == Array
      channels.each do |c|
        @subscriptions.delete(c)
      end
      message = {
        "channel" => Faye::Channel::UNSUBSCRIBE,
        "clientId" => @clientId,
        "subscription" => channels,
        "id" => Faye.random(32)
      }
      
      r = send(message)
    end

    # ...

  end
end

Let's play !

Maintenant que nous avons notre librairie cliente, nous pouvons développer un petit client en ligne de commande pour soapbox.

x = Faye::Client.new( 'http://localhost:3000/comet' )

puts "-- handshake"
x.handshake

puts "-- subscriptions"
x.subscribe( "/mentioning/daemon" )

x.subscribe( "/from/greg" ) { |r|
  puts "#{r["user"]} : #{r["message"]}"
}

puts "-- connect"
x.connect

msg = ""  
while msg != "quit"
  msg = $stdin.readline.chomp
  unless msg == "quit"
    channel = "/from/daemon"
    data = { "user" => "daemon", "message" => msg }
    r = x.publish( channel, data )
    unless r["successful"]
      puts "=> Message not send !"
    end
  end
end

x.disconnect

Bon, OK, c'est très très rudimentaire, mais vous avez compris ! Et le principal c'est que cela fonctionne :

soapbox-console

A+

Attention, n'oubliez pas la remarque faite sur la page du projet faye : it's a toy. Donc, tout comme ce qui précède, n'utilisez pas cela en production4. Et bien cela s'applique, à plus forte raison, à ce qui est écrit ci-dessus. Maintenant, si vous êtes joueur, vous pouvez récupérer cela dans les sources de Capcode ou ici.

Notez que si je me suis attaché à faye, ce petit client devrait pouvoir fonctionner avec toute solution Comet respectant le protocole Bayeux.

1 Merci !
2 :P
3 Ca peut paraitre idiot, cela n'en reste pas moins vrai !
4 On vous aura prévenu !

Copyright © 2009 - 2011 Grégoire Lejeune.
All documents licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 2.5 License, except ones with specified licence.
Powered by Jekyll.