Java の Comet でサーバープッシュを行う(1)

まず、Tomcat の conf/server.xml にある Connector 要素の protocol 属性を org.apache.coyote.http11.Http11NioProtocol に変更します(Comet を利用するには NIO または APR の HTTP コネクタが必要です)。

    <Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" 
               connectionTimeout="20000" 
               redirectPort="8443" />

以下のような Servlet を用意します。CometProcessor インターフェースの event メソッドが、ブラウザからの接続を待ち受ける場所となります。

/**
 * Comet servlet that parks one pending HTTP response per logged-in user and completes it
 * when a message addressed to that user is queued on the {@link MessageSender}.
 *
 * <p>On BEGIN/READ events the response is registered in {@link #connections} under the
 * {@code username} session attribute. On END/ERROR events the response is answered with an
 * empty JSON object and unregistered. A background {@link MessageSender} thread, published
 * in the servlet context under the attribute name {@code "MessageSender"}, writes queued
 * messages to the registered responses of their recipients.
 */
public class CometServlet extends HttpServlet implements CometProcessor {

  private static final long serialVersionUID = -3319969086920456957L;

  /** Timeout (milliseconds) set on the event for every BEGIN/READ: one hour. */
  private static final int CONNECTION_TIMEOUT_MILLIS = 60 * 60 * 1000;

  /** Pending responses keyed by response object; the value is the owning user name. */
  protected transient ConcurrentHashMap<HttpServletResponse, String> connections =
    null;

  /** Background thread that delivers queued messages; created in {@link #init()}. */
  protected transient MessageSender sender = null;

  /**
   * Creates the connection registry, starts the sender as a daemon thread and publishes it
   * in the servlet context as attribute {@code "MessageSender"}.
   */
  @Override
  public void init() {
    connections = new ConcurrentHashMap<HttpServletResponse, String>();

    MessageSender created = new MessageSender();
    created.setDaemon(true);
    created.start();
    this.sender = created;

    getServletContext().setAttribute("MessageSender", created);
  }

  /**
   * Stops the sender thread and drops all registered connections. Tolerates fields that
   * were never initialised so that it cannot fail with a NullPointerException.
   */
  @Override
  public void destroy() {
    MessageSender current = sender;
    sender = null;
    if (current != null) {
      current.quit();
    }
    if (connections != null) {
      connections.clear();
    }
  }

  /**
   * Handles a Comet event: BEGIN and READ (re-)register the response and refresh the
   * timeout; END and ERROR answer the response with {@code {}} and close the event.
   * Exceptions are logged and not propagated, as in the original implementation.
   *
   * @param event the Comet event delivered by the container
   * @throws IOException declared by the interface; not thrown by this implementation
   * @throws ServletException declared by the interface; not thrown by this implementation
   */
  @Override
  public void event(CometEvent event) throws IOException, ServletException {

    try {
      HttpServletRequest request = event.getHttpServletRequest();
      HttpServletResponse response = event.getHttpServletResponse();

      switch (event.getEventType()) {
        case BEGIN:
        case READ:
          // NOTE(review): the Tomcat Comet docs ask READ handlers to consume the available
          // request input; this handler only re-registers the connection - confirm that
          // clients never send a request body after the initial request.
          open(request, response);
          event.setTimeout(CONNECTION_TIMEOUT_MILLIS);
          break;
        case END:
        case ERROR:
          close(response);
          event.close();
          break;
        default:
          break;
      }
    } catch (Exception e) {
      // Exception rather than Throwable: JVM Errors must not be swallowed here.
      log(e.getMessage(), e);
    }
  }

  /**
   * Registers the response under the session's {@code username} attribute. Requests whose
   * session has no such attribute are not registered.
   *
   * @param request request whose session identifies the user
   * @param response response to keep pending for that user
   */
  protected void open(HttpServletRequest request, HttpServletResponse response) {
    String username = (String) request.getSession().getAttribute("username");
    if (username != null) {
      connections.put(response, username);
    }
  }

  /**
   * Completes a pending response with an empty JSON object (unless it is already
   * committed) and removes it from the registry.
   *
   * @param response response to complete and unregister
   */
  protected void close(HttpServletResponse response) {
    writeAndRelease(response, "{}");
  }

  /**
   * Writes {@code payload} as UTF-8 to the response if it is not yet committed, then
   * always unregisters the response. Failures are logged, never propagated, because the
   * peer may already have gone away.
   */
  private void writeAndRelease(HttpServletResponse response, String payload) {
    if (response == null) {
      return;
    }
    try {
      if (!response.isCommitted()) {
        response.setContentType("text/json");
        response.setCharacterEncoding("UTF-8");
        ServletOutputStream os = response.getOutputStream();
        os.write(payload.getBytes("utf-8"));
        os.flush();
        os.close();
      }
    } catch (Exception e) {
      log(e.getMessage(), e);
    } finally {
      // Written or not, this response must not be used for another message.
      connections.remove(response);
    }
  }

  /** A text payload together with the user names it is addressed to. */
  public class Message {

    private final List<String> recipients;

    private final String message;

    /**
     * @param recipients user names that should receive the message
     * @param message payload written verbatim to each recipient's response
     */
    public Message(List<String> recipients, String message) {
      this.recipients = recipients;
      this.message = message;
    }

    /**
     * @return message
     */
    public String getMessage() {
      return message;
    }

    /**
     * @return recipients
     */
    public List<String> getRecipients() {
      return recipients;
    }

  }

  /**
   * Background thread that takes queued {@link Message}s and writes each one to every
   * registered response whose user name is among the message's recipients.
   */
  public class MessageSender extends Thread {

    private final BlockingQueue<Message> messages =
      new LinkedBlockingQueue<Message>();

    /**
     * Loop flag. Volatile because {@link #quit()} is called from a different thread than
     * the one executing {@link #run()}.
     */
    protected volatile boolean running = true;

    /** Requests termination: clears the flag, drops queued messages, wakes the thread. */
    public void quit() {
      running = false;
      messages.clear();
      this.interrupt();
    }

    /**
     * Queues a message for delivery. If the calling thread is interrupted the message is
     * dropped and the thread's interrupt status is restored for the caller to observe.
     *
     * @param recipients user names that should receive the message
     * @param message payload to send
     */
    public synchronized void sendMessage(List<String> recipients, String message) {
      try {
        messages.put(new Message(recipients, message));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Delivery loop: waits for a message, delivers it, pauses 100 ms, and repeats until
     * {@link #quit()} is called.
     */
    @Override
    public void run() {
      while (running) {
        try {
          Message message = messages.poll(1000, TimeUnit.SECONDS);
          if (message != null) {
            deliver(message);
          }
          Thread.sleep(100);
        } catch (InterruptedException e) {
          // quit() interrupts this thread to cut the wait short; the loop condition then
          // ends the thread. An interrupt while still running is unexpected: log it and
          // keep serving, as the original loop did.
          if (running) {
            log(e.getMessage(), e);
          }
        } catch (Exception e) {
          // Keep the sender alive across per-message failures.
          log(e.getMessage(), e);
        }
      }
    }

    /** Writes the message to every registered connection owned by one of its recipients. */
    private void deliver(Message message) {
      List<String> recipients = message.getRecipients();
      if (recipients == null || recipients.isEmpty()) {
        return;
      }
      String payload = message.getMessage();
      // ConcurrentHashMap iteration is weakly consistent, so entries may be removed by
      // writeAndRelease (or by the container thread) while this loop is running.
      for (Entry<HttpServletResponse, String> entry : connections.entrySet()) {
        if (recipients.contains(entry.getValue())) {
          writeAndRelease(entry.getKey(), payload);
        }
      }
    }
  }
}

Java の Comet でサーバープッシュを行う(2)へつづく。