Skip to content

Commit

Permalink
Version update, use forwarded_ports, removed a lame infinite loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
Takashi Matsuo committed Jun 19, 2014
1 parent c9d7e39 commit 18819d5
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 61 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

<properties>
<appengine.app.version>chat</appengine.app.version>
<appengine.target.version>1.8.9</appengine.target.version>
<appengine.target.version>1.9.6</appengine.target.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.google.appengine.demos.websocketchat.message.ChatMessage;
import com.google.appengine.demos.websocketchat.message.OutgoingMessage;
import com.google.appengine.demos.websocketchat.message.ParticipantListMessage;
import com.google.apphosting.api.ApiProxy;
import com.google.common.base.Throwables;
import com.google.gson.Gson;
import com.googlecode.objectify.Key;
Expand Down Expand Up @@ -71,8 +72,6 @@ public class ChatSocketServer extends WebSocketServer {

private ConcurrentLinkedQueue<String> updateAndSendParticipantListQueue;

private ConcurrentLinkedQueue<String> updateParticipantListQueue;

private ConcurrentLinkedQueue<OutgoingMessage> propagateQueue;

private String hostname;
Expand Down Expand Up @@ -121,13 +120,12 @@ public static class ChatServerBridge implements Runnable {

private static ChatServerBridge chatServerBridge;

private Map<String, Set<String>> globalParticipantsMap;

private CopyOnWriteArrayList<Key<ChatRoomParticipants>> chatRoomParticipantsKeyList;

private ApiProxy.Environment backgroundEnvironment;

private ChatServerBridge() {
namespace = NamespaceManager.get();
globalParticipantsMap = new ConcurrentHashMap<>();
chatRoomParticipantsKeyList = new CopyOnWriteArrayList<>();
}

Expand Down Expand Up @@ -155,8 +153,8 @@ private void removeWebSocketServerNode() throws IOException {
ofy().delete().key(key).now();
}

protected Set<String> getGlobalParticipantSet(String room) {
return globalParticipantsMap.get(room);
protected ApiProxy.Environment getBackgroundEnvironment() {
return backgroundEnvironment;
}

/**
Expand Down Expand Up @@ -191,7 +189,6 @@ public void stop() {
// delete participant list in the datastore
ofy().delete().keys(chatRoomParticipantsKeyList).now();
// initialize variables
globalParticipantsMap = new ConcurrentHashMap<>();
chatRoomParticipantsKeyList = new CopyOnWriteArrayList<>();
chatSocketServer = null;
} catch (IOException|InterruptedException e) {
Expand Down Expand Up @@ -219,23 +216,10 @@ private void updateParticipantListAndDistribute() throws IOException {
Set<String> participantSet = ChatRoomParticipants.getParticipants(room);
ParticipantListMessage participantListMessage = new ParticipantListMessage(room,
participantSet);
globalParticipantsMap.put(room, participantSet);
chatSocketServer.sendToClients(participantListMessage);
}
}

/**
* Pops a name of a chat room from the updateParticipantListQueue and creates the global
* participant list of that given chat room.
*/
private void updateParticipantList() {
if (! chatSocketServer.updateParticipantListQueue.isEmpty()) {
String room = chatSocketServer.updateParticipantListQueue.remove();
Set<String> participantSet = ChatRoomParticipants.getParticipants(room);
globalParticipantsMap.put(room, participantSet);
}
}

/**
* Propagate a message popped from the propagateQueue to other active server nodes.
*
Expand Down Expand Up @@ -292,13 +276,12 @@ public void onError(Exception ex) {
/**
* A main loop of this bridge thread.
*
* <p>The chat server requests us the following 3 things.</p>
* <p>The chat server requests us the following 2 things.</p>
* <ul>
* <li>Update and distribute the participant list in a particular chat room.</li>
* <li>Aggregate the global participant list for a particular chat room.</li>
* <li>Propagate a message to other active server nodes.</li>
* </ul>
* <p>This thread watches the 3 queue on the ChatSocketServer instance,
* <p>This thread watches the 2 queues on the ChatSocketServer instance,
* and handles those requests in the main loop.</p>
* <p>If this loop becomes the performance bottleneck, distribute these work loads into
* multiple thread worker.</p>
Expand All @@ -310,14 +293,16 @@ public void run() {
watcherThread = Thread.currentThread();
LOG.info("Namespace is set to " + namespace + " in thread " + watcherThread.toString());
NamespaceManager.set(namespace);
// Store the environment for later use.
backgroundEnvironment = ApiProxy.getCurrentEnvironment();

while (true) {
if (Thread.currentThread().isInterrupted()) {
LOG.info("ChatServerBridge is stopping.");
return;
} else {
try {
updateParticipantListAndDistribute();
updateParticipantList();
propagateOneMessage();
Thread.sleep(100);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -350,7 +335,6 @@ public ChatSocketServer(int port) {
super(new InetSocketAddress(port));
metaInfoManager = new MetaInfoManager();
updateAndSendParticipantListQueue = new ConcurrentLinkedQueue<>();
updateParticipantListQueue = new ConcurrentLinkedQueue<>();
propagateQueue = new ConcurrentLinkedQueue<>();
}

Expand Down Expand Up @@ -389,28 +373,6 @@ public void onClose(WebSocket conn, int code, String reason, boolean remote) {
}
}

private Set<String> getParticipants(String room) {
// This infinite loop is a workaround for the difficulties for accessing datastore from
// a thread started with a normal Thread interface rather than App Engine's ThreadManager.
// TODO: Find a way to enable datastore access in such threads and stop this somewhat
// dangerous infinite loop.
Set<String> participantSet = ChatServerBridge.getInstance().getGlobalParticipantSet(room);
if (participantSet != null) {
return participantSet;
}
updateParticipantListQueue.add(room);
while(participantSet == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.warning(Throwables.getStackTraceAsString(e));
Thread.currentThread().interrupt();
}
participantSet = ChatServerBridge.getInstance().getGlobalParticipantSet(room);
}
return participantSet;
}

/**
* Handles incoming messages.
*
Expand All @@ -425,10 +387,12 @@ private Set<String> getParticipants(String room) {
public void onMessage(WebSocket conn, String rawMessage) {
// TODO: Make it threadsafe
LOG.info(conn + ": " + rawMessage);
ApiProxy.setEnvironmentForCurrentThread(
ChatServerBridge.getInstance().getBackgroundEnvironment());
ChatMessage message = GSON.fromJson(rawMessage, ChatMessage.class);
if (message.getType().equals(OutgoingMessage.MessageType.ENTER)) {
// Check if there's a participant with the same name in the room.
Set<String> participantSet = getParticipants(message.getRoom());
Set<String> participantSet = ChatRoomParticipants.getParticipants(message.getRoom());
if (participantSet.contains(message.getName())) {
// Adding a trailing underscore until the conflict resolves.
String newName = message.getName() + "_";
Expand Down Expand Up @@ -474,9 +438,6 @@ public void sendToClients(OutgoingMessage message) {
ParticipantListMessage.class);
if (participantListMessage.getType().equals(OutgoingMessage.MessageType.PARTICIPANTS)) {
LOG.info("ParticipantList arrived for the room:" + message.getRoom());
if (! updateParticipantListQueue.contains(message.getRoom())) {
updateParticipantListQueue.add(message.getRoom());
}
}
}
Collection<WebSocket> webSocketConnections = connections();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import com.google.appengine.api.LifecycleManager;
import com.google.appengine.api.NamespaceManager;
import com.google.appengine.api.utils.SystemProperty;
import com.google.appengine.demos.websocketchat.server.ChatSocketServer;
import com.google.appengine.demos.websocketchat.domain.ChatRoomParticipants;
import com.google.appengine.demos.websocketchat.domain.WebSocketServerNode;
import com.google.appengine.demos.websocketchat.server.ChatSocketServer;
import com.google.appengine.demos.websocketchat.server.ChatSocketServerShutdownHook;
import com.googlecode.objectify.ObjectifyService;

Expand All @@ -37,15 +37,11 @@
*/
public class StartupServlet extends HttpServlet {

private static final Logger LOG = Logger.getLogger(StartupServlet.class.getName());

static {
ObjectifyService.register(WebSocketServerNode.class);
ObjectifyService.register(ChatRoomParticipants.class);
}

private static final Logger LOG = Logger.getLogger(StartupServlet.class.getName());

protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
LOG.info("The startup servlet called.");
LifecycleManager.getInstance().setShutdownHook(new ChatSocketServerShutdownHook());
String version = SystemProperty.applicationVersion.get();
Expand All @@ -55,6 +51,10 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
ChatSocketServer.ChatServerBridge chatServerBridge =
ChatSocketServer.ChatServerBridge.getInstance();
chatServerBridge.start();
}

protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
response.setStatus(204);
}
}
1 change: 1 addition & 0 deletions src/main/webapp/WEB-INF/appengine-web.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<vm>true</vm>
<vm-settings>
<setting name="machine_type" value="n1-standard-1"/>
<setting name="forwarded_ports" value="65080"/>
</vm-settings>
<manual-scaling>
<instances>3</instances>
Expand Down
13 changes: 12 additions & 1 deletion src/main/webapp/WEB-INF/web.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
</security-constraint>
<servlet>
<servlet-name>startup</servlet-name>
<servlet-class>com.google.appengine.demos.websocketchat.servlet.StartupServlet</servlet-class>
<servlet-class>com.google.appengine.demos.websocketchat.servlet.StartupServlet
</servlet-class>
<load-on-startup>0</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>startup</servlet-name>
Expand All @@ -55,4 +57,13 @@
<welcome-file-list>
<welcome-file>chat.jsp</welcome-file>
</welcome-file-list>
<security-constraint>
<web-resource-collection>
<web-resource-name>top</web-resource-name>
<url-pattern>/</url-pattern>
</web-resource-collection>
<auth-constraint>
<role-name>*</role-name>
</auth-constraint>
</security-constraint>
</web-app>

0 comments on commit 18819d5

Please sign in to comment.