Index: src/java/main/org/apache/zookeeper/KeeperException.java =================================================================== --- src/java/main/org/apache/zookeeper/KeeperException.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/KeeperException.java (working copy) @@ -131,7 +131,9 @@ return new SessionMovedException(); case NOTREADONLY: return new NotReadOnlyException(); - + case EPHEMERALONLOCALSESSION: + return new EphemeralOnLocalSessionException(); + case OK: default: throw new IllegalArgumentException("Invalid exception code"); @@ -313,7 +315,7 @@ OPERATIONTIMEOUT (OperationTimeout), /** Invalid arguments */ BADARGUMENTS (BadArguments), - + /** API errors. * This is never thrown by the server, it shouldn't be used other than * to indicate a range. Specifically error codes greater than this @@ -345,7 +347,11 @@ /** Session moved to another server, so operation is ignored */ SESSIONMOVED (-118), /** State-changing request is passed to read-only server */ - NOTREADONLY (-119); + NOTREADONLY (-119), + /** Attempt to create ephemeral node on a local session */ + EPHEMERALONLOCALSESSION (-120), + /** Unknown session (internal server use only) */ + UNKNOWNSESSION (-122); private static final Map lookup = new HashMap(); @@ -422,6 +428,8 @@ return "Session moved"; case NOTREADONLY: return "Not a read-only call"; + case EPHEMERALONLOCALSESSION: + return "Ephemeral node on local session"; default: return "Unknown error " + code; } @@ -482,7 +490,7 @@ * If this exception was thrown by a multi-request then the (partial) results * and error codes can be retrieved using this getter. * @return A copy of the list of results from the operations in the multi-request. 
- * + * * @since 3.4.0 * */ @@ -663,7 +671,16 @@ super(Code.SESSIONEXPIRED); } } - + + /** + * @see Code#UNKNOWNSESSION + */ + public static class UnknownSessionException extends KeeperException { + public UnknownSessionException() { + super(Code.UNKNOWNSESSION); + } + } + /** * @see Code#SESSIONMOVED */ @@ -683,6 +700,15 @@ } /** + * @see Code#EPHEMERALONLOCALSESSION + */ + public static class EphemeralOnLocalSessionException extends KeeperException { + public EphemeralOnLocalSessionException() { + super(Code.EPHEMERALONLOCALSESSION); + } + } + + /** * @see Code#SYSTEMERROR */ public static class SystemErrorException extends KeeperException { Index: src/java/main/org/apache/zookeeper/cli/CreateCommand.java =================================================================== --- src/java/main/org/apache/zookeeper/cli/CreateCommand.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/cli/CreateCommand.java (working copy) @@ -32,16 +32,16 @@ private static Options options = new Options(); private String[] args; private CommandLine cl; - + { options.addOption(new Option("e", false, "ephemeral")); options.addOption(new Option("s", false, "sequential")); } - + public CreateCommand() { super("create", "[-s] [-e] path [data] [acl]"); } - + @Override public CliCommand parse(String[] cmdArgs) throws ParseException { @@ -54,7 +54,7 @@ return this; } - + @Override public boolean exec() throws KeeperException, InterruptedException { CreateMode flags = CreateMode.PERSISTENT; @@ -74,8 +74,13 @@ if (args.length > 3) { acl = AclParser.parse(args[3]); } - String newPath = zk.create(path, data, acl, flags); - err.println("Created " + newPath); + try { + String newPath = zk.create(path, data, acl, flags); + err.println("Created " + newPath); + } catch(KeeperException.EphemeralOnLocalSessionException e) { + err.println("Unable to create ephemeral node on a local session"); + return false; + } return true; } } Index: 
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java =================================================================== --- src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (working copy) @@ -94,8 +94,13 @@ if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'E', request, ""); } - ProcessTxnResult rc = null; + ProcessTxnResult rc = new ProcessTxnResult(); synchronized (zks.outstandingChanges) { + // Need to process local session requests + rc = zks.processTxn(request); + + // request.hdr is set for write requests, which are the only ones + // that add to outstandingChanges. if (request.getHdr() != null) { TxnHeader hdr = request.getHdr(); Record txn = request.getTxn(); @@ -111,16 +116,15 @@ zks.outstandingChangesForPath.remove(cr.path); } } - - rc = zks.processTxn(hdr, txn); } + // do not add non quorum packets to the queue. - if (Request.isQuorum(request.type)) { + if (request.isQuorum()) { zks.getZKDatabase().addCommittedProposal(request); } } - if (request.getHdr() != null && request.getHdr().getType() == OpCode.closeSession) { + if (request.type == OpCode.closeSession) { ServerCnxnFactory scxn = zks.getServerCnxnFactory(); // this might be possible since // we might just be playing diffs from the leader @@ -145,8 +149,19 @@ Record rsp = null; try { if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) { - throw KeeperException.create(KeeperException.Code.get(( - (ErrorTxn) request.getTxn()).getErr())); + /* + * When local session upgrading is disabled, leader will + * reject the ephemeral node creation due to session expire. 
+ * However, if this is the follower that issue the request, + * it will have the correct error code, so we should use that + * and report to user + */ + if (request.getException() != null) { + throw request.getException(); + } else { + throw KeeperException.create(KeeperException.Code + .get(((ErrorTxn) request.getTxn()).getErr())); + } } KeeperException ke = request.getException(); Index: src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java =================================================================== --- src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (working copy) @@ -337,11 +337,23 @@ switch (type) { case OpCode.create: { - zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CreateRequest createRequest = (CreateRequest)record; if (deserialize) { ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); } + CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); + if (createMode.isEphemeral()) { + // Exception is set when local session failed to upgrade + // so we just need to report the error + if (request.getException() != null) { + throw request.getException(); + } + zks.sessionTracker.checkGlobalSession(request.sessionId, + request.getOwner()); + } else { + zks.sessionTracker.checkSession(request.sessionId, + request.getOwner()); + } String path = createRequest.getPath(); String parentPath = validatePathForCreate(path, request.sessionId); @@ -353,7 +365,6 @@ checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); int parentCVersion = parentRecord.stat.getCversion(); - CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isSequential()) { path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } @@ -390,11 +401,23 @@ break; } case OpCode.create2: { - zks.sessionTracker.checkSession(request.sessionId, 
request.getOwner()); Create2Request createRequest = (Create2Request)record; if (deserialize) { ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); } + CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); + if (createMode.isEphemeral()) { + // Exception is set when local session failed to upgrade + // so we just need to report the error + if (request.getException() != null) { + throw request.getException(); + } + zks.sessionTracker.checkGlobalSession(request.sessionId, + request.getOwner()); + } else { + zks.sessionTracker.checkSession(request.sessionId, + request.getOwner()); + } String path = createRequest.getPath(); String parentPath = validatePathForCreate(path, request.sessionId); @@ -406,7 +429,6 @@ checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); int parentCVersion = parentRecord.stat.getCversion(); - CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isSequential()) { path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } @@ -666,7 +688,10 @@ //create/close session don't require request record case OpCode.createSession: case OpCode.closeSession: - pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); + if (!request.isLocalSession()) { + pRequest2Txn(request.type, zks.getNextZxid(), request, + null, true); + } break; //All the rest don't need to create a Txn - just verify session Index: src/java/main/org/apache/zookeeper/server/Request.java =================================================================== --- src/java/main/org/apache/zookeeper/server/Request.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/Request.java (working copy) @@ -80,6 +80,19 @@ private KeeperException e; + /** + * If this is a create or close request for a local-only session. 
+ */ + private boolean isLocalSession = false; + + public boolean isLocalSession() { + return isLocalSession; + } + + public void setLocalSession(boolean isLocalSession) { + this.isLocalSession = isLocalSession; + } + public Object getOwner() { return owner; } @@ -116,48 +129,49 @@ switch (type) { case OpCode.notification: return false; + case OpCode.check: + case OpCode.closeSession: case OpCode.create: case OpCode.create2: - case OpCode.delete: case OpCode.createSession: + case OpCode.delete: case OpCode.exists: - case OpCode.getData: - case OpCode.check: - case OpCode.multi: - case OpCode.setData: - case OpCode.sync: case OpCode.getACL: - case OpCode.setACL: case OpCode.getChildren: case OpCode.getChildren2: + case OpCode.getData: + case OpCode.multi: case OpCode.ping: - case OpCode.closeSession: + case OpCode.setACL: + case OpCode.setData: case OpCode.setWatches: + case OpCode.sync: return true; default: return false; } } - static boolean isQuorum(int type) { - switch (type) { + public boolean isQuorum() { + switch (this.type) { case OpCode.exists: case OpCode.getACL: case OpCode.getChildren: case OpCode.getChildren2: case OpCode.getData: return false; - case OpCode.error: - case OpCode.closeSession: case OpCode.create: case OpCode.create2: - case OpCode.createSession: + case OpCode.error: case OpCode.delete: case OpCode.setACL: case OpCode.setData: case OpCode.check: case OpCode.multi: return true; + case OpCode.closeSession: + case OpCode.createSession: + return !this.isLocalSession; default: return false; } Index: src/java/main/org/apache/zookeeper/server/SessionTracker.java =================================================================== --- src/java/main/org/apache/zookeeper/server/SessionTracker.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/SessionTracker.java (working copy) @@ -44,7 +44,22 @@ long createSession(int sessionTimeout); - void addSession(long id, int to); + /** + * Add a global session to those being tracked. 
+ * @param id sessionId + * @param to sessionTimeout + * @return whether the session was newly added (if false, already existed) + */ + boolean addGlobalSession(long id, int to); + + /** + * Add a session to those being tracked. The session is added as a local + * session if they are enabled, otherwise as global. + * @param id sessionId + * @param to sessionTimeout + * @return whether the session was newly added (if false, already existed) + */ + boolean addSession(long id, int to); /** * @param sessionId @@ -60,7 +75,7 @@ void setSessionClosing(long sessionId); /** - * + * */ void shutdown(); @@ -69,7 +84,39 @@ */ void removeSession(long sessionId); - void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, SessionMovedException; + /** + * @param sessionId + * @return whether or not the SessionTracker is aware of this session + */ + boolean isTrackingSession(long sessionId); + + /** + * Checks whether the SessionTracker is aware of this session, the session + * is still active, and the owner matches. If the owner wasn't previously + * set, this sets the owner of the session. + * + * UnknownSessionException should never been thrown to the client. 
It is + * only used internally to deal with possible local session from other + * machine + * + * @param sessionId + * @param owner + */ + public void checkSession(long sessionId, Object owner) + throws KeeperException.SessionExpiredException, + KeeperException.SessionMovedException, + KeeperException.UnknownSessionException; + + /** + * Strictly check that a given session is a global session or not + * @param sessionId + * @param owner + * @throws KeeperException.SessionExpiredException + * @throws KeeperException.SessionMovedException + */ + public void checkGlobalSession(long sessionId, Object owner) + throws KeeperException.SessionExpiredException, + KeeperException.SessionMovedException; void setOwner(long id, Object owner) throws SessionExpiredException; Index: src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java =================================================================== --- src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java (working copy) @@ -20,18 +20,15 @@ import java.io.PrintWriter; import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is a full featured SessionTracker. 
It tracks session in grouped by tick @@ -47,8 +44,7 @@ private final ExpiryQueue sessionExpiryQueue; - private final ConcurrentHashMap sessionsWithTimeout; - private final long serverId; + private final ConcurrentMap sessionsWithTimeout; private final AtomicLong nextSessionId = new AtomicLong(); public static class SessionImpl implements Session { @@ -73,6 +69,10 @@ } } + /** + * Generates an initial sessionId. High order byte is serverId, next 5 + * 5 bytes are from timestamp, and low order 2 bytes are 0s. + */ public static long initializeNextSession(long id) { long nextSid = 0; nextSid = (System.currentTimeMillis() << 24) >> 8; @@ -83,15 +83,14 @@ private final SessionExpirer expirer; public SessionTrackerImpl(SessionExpirer expirer, - ConcurrentHashMap sessionsWithTimeout, int tickTime, - long sid) + ConcurrentMap sessionsWithTimeout, int tickTime, + long serverId) { super("SessionTracker"); this.expirer = expirer; this.sessionExpiryQueue = new ExpiryQueue(tickTime); this.sessionsWithTimeout = sessionsWithTimeout; - this.serverId = sid; - this.nextSessionId.set(initializeNextSession(sid)); + this.nextSessionId.set(initializeNextSession(serverId)); for (Entry e : sessionsWithTimeout.entrySet()) { addSession(e.getKey(), e.getValue()); } @@ -103,7 +102,7 @@ pwriter.print("Session "); sessionExpiryQueue.dump(pwriter); } - + @Override public String toString() { StringWriter sw = new StringWriter(); @@ -151,6 +150,10 @@ return true; } + public int getSessionTimeout(long sessionId) { + return sessionsWithTimeout.get(sessionId); + } + synchronized public void setSessionClosing(long sessionId) { if (LOG.isTraceEnabled()) { LOG.info("Session closing: 0x" + Long.toHexString(sessionId)); @@ -192,31 +195,49 @@ return sessionId; } - synchronized public void addSession(long id, int sessionTimeout) { + public boolean addGlobalSession(long id, int sessionTimeout) { + return addSession(id, sessionTimeout); + } + + public synchronized boolean addSession(long id, int 
sessionTimeout) { + boolean added = false; + sessionsWithTimeout.put(id, sessionTimeout); if (sessionsById.get(id) == null) { SessionImpl s = new SessionImpl(id, sessionTimeout); sessionsById.put(id, s); - if (LOG.isTraceEnabled()) { - ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, - "SessionTrackerImpl --- Adding session 0x" - + Long.toHexString(id) + " " + sessionTimeout); - } - } else { - if (LOG.isTraceEnabled()) { - ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, - "SessionTrackerImpl --- Existing session 0x" - + Long.toHexString(id) + " " + sessionTimeout); - } + added = true; + LOG.debug("Adding session 0x" + Long.toHexString(id)); + } + if (LOG.isTraceEnabled()) { + String actionStr = added ? "Adding" : "Existing"; + ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, + "SessionTrackerImpl --- " + actionStr + " session 0x" + + Long.toHexString(id) + " " + sessionTimeout); } touchSession(id, sessionTimeout); + return added; + } + + public boolean isTrackingSession(long sessionId) { + return sessionsById.containsKey(sessionId); } - synchronized public void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException { + public synchronized void checkSession(long sessionId, Object owner) + throws KeeperException.SessionExpiredException, + KeeperException.SessionMovedException, + KeeperException.UnknownSessionException { + LOG.debug("Checking session 0x" + Long.toHexString(sessionId)); SessionImpl session = sessionsById.get(sessionId); - if (session == null || session.isClosing()) { + + if (session == null) { + throw new KeeperException.UnknownSessionException(); + } + + if (session.isClosing()) { throw new KeeperException.SessionExpiredException(); } + if (session.owner == null) { session.owner = owner; } else if (session.owner != owner) { @@ -231,4 +252,14 @@ } session.owner = owner; } + + public void checkGlobalSession(long sessionId, Object owner) + throws 
KeeperException.SessionExpiredException, + KeeperException.SessionMovedException { + try { + checkSession(sessionId, owner); + } catch (KeeperException.UnknownSessionException e) { + throw new KeeperException.SessionExpiredException(); + } + } } Index: src/java/main/org/apache/zookeeper/server/TraceFormatter.java =================================================================== --- src/java/main/org/apache/zookeeper/server/TraceFormatter.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/TraceFormatter.java (working copy) @@ -29,7 +29,7 @@ public class TraceFormatter { - static String op2String(int op) { + public static String op2String(int op) { switch (op) { case OpCode.notification: return "notification"; Index: src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java =================================================================== --- src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (working copy) @@ -263,6 +263,10 @@ return hzxid.get(); } + public SessionTracker getSessionTracker() { + return sessionTracker; + } + long getNextZxid() { return hzxid.incrementAndGet(); } @@ -276,7 +280,9 @@ } private void close(long sessionId) { - submitRequest(null, sessionId, OpCode.closeSession, 0, null, null); + Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null); + setLocalSessionFlag(si); + submitRequest(si); } public void closeSession(long sessionId) { @@ -385,7 +391,7 @@ sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1); } - + protected void startSessionTracker() { ((SessionTrackerImpl)sessionTracker).start(); } @@ -494,13 +500,19 @@ } long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { + if (passwd == null) { + // Possible since it's just deserialized from a packet on the wire. 
+ passwd = new byte[0]; + } long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); - submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); + Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null); + setLocalSessionFlag(si); + submitRequest(si); return sessionId; } @@ -530,6 +542,8 @@ if (checkPasswd(sessionId, passwd)) { revalidateSession(cnxn, sessionId, sessionTimeout); } else { + LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress() + + " for session 0x" + Long.toHexString(sessionId)); finishSessionInit(cnxn, false); } } @@ -594,15 +608,13 @@ } /** - * @param cnxn - * @param sessionId - * @param xid - * @param bb + * If the underlying Zookeeper server support local session, this method + * will set a isLocalSession to true if a request is associated with + * a local session. + * + * @param si */ - private void submitRequest(ServerCnxn cnxn, long sessionId, int type, - int xid, ByteBuffer bb, List authInfo) { - Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo); - submitRequest(si); + protected void setLocalSessionFlag(Request si) { } public void submitRequest(Request si) { @@ -888,6 +900,9 @@ Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); + // Always treat packet from the client as a possible + // local request. + setLocalSessionFlag(si); submitRequest(si); } } @@ -935,17 +950,36 @@ // wrap SASL response token to client inside a Response object. 
return new SetSASLResponse(responseToken); } - + + // entry point for quorum/Learner.java public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { + return processTxn(null, hdr, txn); + } + + // entry point for FinalRequestProcessor.java + public ProcessTxnResult processTxn(Request request) { + return processTxn(request, request.getHdr(), request.getTxn()); + } + + private ProcessTxnResult processTxn(Request request, TxnHeader hdr, + Record txn) { ProcessTxnResult rc; - int opCode = hdr.getType(); - long sessionId = hdr.getClientId(); - rc = getZKDatabase().processTxn(hdr, txn); + int opCode = request != null ? request.type : hdr.getType(); + long sessionId = request != null ? request.sessionId : hdr.getClientId(); + if (hdr != null) { + rc = getZKDatabase().processTxn(hdr, txn); + } else { + rc = new ProcessTxnResult(); + } if (opCode == OpCode.createSession) { - if (txn instanceof CreateSessionTxn) { + if (hdr != null && txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; - sessionTracker.addSession(sessionId, cst - .getTimeOut()); + sessionTracker.addGlobalSession(sessionId, cst.getTimeOut()); + } else if (request != null && request.isLocalSession()) { + request.request.rewind(); + int timeout = request.request.getInt(); + request.request.rewind(); + sessionTracker.addSession(request.sessionId, timeout); } else { LOG.warn("*****>>>>> Got " + txn.getClass() + " " Index: src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (working copy) @@ -133,11 +133,12 @@ case OpCode.setData: case OpCode.multi: case OpCode.setACL: - case OpCode.createSession: - case OpCode.closeSession: return true; case OpCode.sync: - return matchSyncs; + return matchSyncs; + case OpCode.createSession: + case 
OpCode.closeSession: + return !request.isLocalSession(); default: return false; } Index: src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (working copy) @@ -18,15 +18,21 @@ package org.apache.zookeeper.server.quorum; +import java.io.IOException; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.server.ByteBufferInputStream; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.txn.ErrorTxn; /** * This RequestProcessor forwards any requests that modify the state of the @@ -82,12 +88,28 @@ case OpCode.delete: case OpCode.setData: case OpCode.setACL: - case OpCode.createSession: - case OpCode.closeSession: case OpCode.multi: case OpCode.check: + try { + zks.getSessionTracker().setOwner(request.sessionId, request.getOwner()); + } catch (KeeperException e) { + /* + * We cannot do anything when session is already expired + * the request need to go through, otherwise the commit + * processor will get stuck + */ + LOG.warn("Session already expired 0x" + + Long.toHexString(request.sessionId)); + } zks.getFollower().request(request); break; + case OpCode.createSession: + case OpCode.closeSession: + // Don't forward local sessions to the leader. 
+ if (!request.isLocalSession()) { + zks.getFollower().request(request); + } + break; } } } catch (Exception e) { @@ -98,6 +120,25 @@ public void processRequest(Request request) { if (!finished) { + // Before sending the request, check if the request requires a + // global session and what we have is a local session. If so do + // an upgrade. + Request upgradeRequest = null; + try { + upgradeRequest = zks.checkUpgradeSession(request); + } catch (KeeperException ke) { + if (request.getHdr() != null) { + request.getHdr().setType(OpCode.error); + request.setTxn(new ErrorTxn(ke.code().intValue())); + } + request.setException(ke); + LOG.info("Error creating upgrade request", ke); + } catch (IOException ie) { + LOG.error("Unexpected error in upgrade", ie); + } + if (upgradeRequest != null) { + queuedRequests.add(upgradeRequest); + } queuedRequests.add(request); } } Index: src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java (revision 0) +++ src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java (working copy) @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.quorum; + +import java.io.PrintWriter; +import java.util.concurrent.ConcurrentMap; + +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.KeeperException.SessionMovedException; +import org.apache.zookeeper.KeeperException.UnknownSessionException; +import org.apache.zookeeper.server.SessionTrackerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The leader session tracker tracks local and global sessions on the leader. + */ +public class LeaderSessionTracker extends UpgradeableSessionTracker { + private static final Logger LOG = LoggerFactory.getLogger(LeaderSessionTracker.class); + + private final boolean localSessionsEnabled; + private final SessionTrackerImpl globalSessionTracker; + + /** + * Server id of the leader + */ + private final long serverId; + + public LeaderSessionTracker(SessionExpirer expirer, + ConcurrentMap sessionsWithTimeouts, + int tickTime, long id, boolean localSessionsEnabled) { + + this.globalSessionTracker = new SessionTrackerImpl( + expirer, sessionsWithTimeouts, tickTime, id); + + this.localSessionsEnabled = localSessionsEnabled; + if (this.localSessionsEnabled) { + createLocalSessionTracker(expirer, tickTime, id); + } + serverId = id; + } + + public void removeSession(long sessionId) { + if (localSessionTracker != null) { + localSessionTracker.removeSession(sessionId); + } + globalSessionTracker.removeSession(sessionId); + } + + public void start() { + globalSessionTracker.start(); + if (localSessionTracker != null) { + localSessionTracker.start(); + } + } + + public void shutdown() { + if (localSessionTracker != null) { + localSessionTracker.shutdown(); + } + globalSessionTracker.shutdown(); + } + + public boolean isGlobalSession(long sessionId) { + return globalSessionTracker.isTrackingSession(sessionId); + } + + 
public boolean addGlobalSession(long sessionId, int sessionTimeout) { + boolean added = + globalSessionTracker.addSession(sessionId, sessionTimeout); + if (localSessionsEnabled && added) { + // Only do extra logging so we know what kind of session this is + // if we're supporting both kinds of sessions + LOG.info("Adding global session 0x" + Long.toHexString(sessionId)); + } + return added; + } + + public boolean addSession(long sessionId, int sessionTimeout) { + boolean added; + if (localSessionsEnabled && !isGlobalSession(sessionId)) { + added = localSessionTracker.addSession(sessionId, sessionTimeout); + // Check for race condition with session upgrading + if (isGlobalSession(sessionId)) { + added = false; + localSessionTracker.removeSession(sessionId); + } else if (added) { + LOG.info("Adding local session 0x" + Long.toHexString(sessionId)); + } + } else { + added = addGlobalSession(sessionId, sessionTimeout); + } + return added; + } + + public boolean touchSession(long sessionId, int sessionTimeout) { + if (localSessionTracker != null && + localSessionTracker.touchSession(sessionId, sessionTimeout)) { + return true; + } + return globalSessionTracker.touchSession(sessionId, sessionTimeout); + } + + public long createSession(int sessionTimeout) { + if (localSessionsEnabled) { + return localSessionTracker.createSession(sessionTimeout); + } + return globalSessionTracker.createSession(sessionTimeout); + } + + // Returns the serverId from the sessionId (the high order byte) + public static long getServerIdFromSessionId(long sessionId) { + return sessionId >> 56; + } + + public void checkSession(long sessionId, Object owner) + throws SessionExpiredException, SessionMovedException, + UnknownSessionException { + if (localSessionTracker != null) { + try { + localSessionTracker.checkSession(sessionId, owner); + // A session can both be a local and global session during + // upgrade + if (!isGlobalSession(sessionId)) { + return; + } + } catch(UnknownSessionException e) { 
+ // Ignore. We'll check instead whether it's a global session + } + } + try { + globalSessionTracker.checkSession(sessionId, owner); + // if we can get here, it is a valid global session + return; + } catch (UnknownSessionException e) { + // Ignore. This may be local session from other servers. + } + + /* + * if local session is not enabled or it used to be our local session + * throw sessions expires + */ + if (!localSessionsEnabled + || (getServerIdFromSessionId(sessionId) == serverId)) { + throw new SessionExpiredException(); + } + } + + public void checkGlobalSession(long sessionId, Object owner) + throws SessionExpiredException, SessionMovedException { + try { + globalSessionTracker.checkSession(sessionId, owner); + } catch (UnknownSessionException e) { + // For global session, if we don't know it, it is already expired + throw new SessionExpiredException(); + } + } + + public void setOwner(long sessionId, Object owner) + throws SessionExpiredException { + if (localSessionTracker != null) { + try { + localSessionTracker.setOwner(sessionId, owner); + return; + } catch(SessionExpiredException e) { + // Ignore. 
We'll check instead whether it's a global session + } + } + globalSessionTracker.setOwner(sessionId, owner); + } + + public void dumpSessions(PrintWriter pwriter) { + if (localSessionTracker != null) { + pwriter.print("Local "); + localSessionTracker.dumpSessions(pwriter); + } + pwriter.print("Global "); + globalSessionTracker.dumpSessions(pwriter); + } + + public void setSessionClosing(long sessionId) { + // call is no-op if session isn't tracked so safe to call both + if (localSessionTracker != null) { + localSessionTracker.setSessionClosing(sessionId); + } + globalSessionTracker.setSessionClosing(sessionId); + } +} Index: src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (working copy) @@ -20,16 +20,20 @@ import java.io.IOException; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.DataTreeBean; import org.apache.zookeeper.server.FinalRequestProcessor; +import org.apache.zookeeper.server.SessionTrackerImpl; import org.apache.zookeeper.server.PrepRequestProcessor; +import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerCnxn; -import org.apache.zookeeper.server.SessionTrackerImpl; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.txn.ErrorTxn; /** * @@ -39,6 +43,7 @@ * FinalRequestProcessor */ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { + CommitProcessor commitProcessor; /** @@ -54,6 +59,40 @@ return self.leader; } + 
public class LeaderPrepRequestProcessor extends PrepRequestProcessor { + LeaderZooKeeperServer lzks; + + public LeaderPrepRequestProcessor(LeaderZooKeeperServer zks, + RequestProcessor nextProcessor) { + super(zks, nextProcessor); + this.lzks = zks; + } + + @Override + protected void pRequest(Request request) throws RequestProcessorException { + // Check if this is a local session and we are trying to create + // an ephemeral node, in which case we upgrade the session + Request upgradeRequest = null; + try { + upgradeRequest = lzks.checkUpgradeSession(request); + } catch (KeeperException ke) { + if (request.getHdr() != null) { + request.getHdr().setType(OpCode.error); + request.setTxn(new ErrorTxn(ke.code().intValue())); + } + request.setException(ke); + LOG.error("Error creating upgrade request" + ke.getMessage()); + } catch (IOException ie) { + LOG.error("Unexpected error in upgrade", ie); + } + if (upgradeRequest != null) { + super.pRequest(upgradeRequest); + } + + super.pRequest(request); + } + } + @Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); @@ -64,7 +103,8 @@ ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); - firstProcessor = new PrepRequestProcessor(this, proposalProcessor); + firstProcessor = new LeaderPrepRequestProcessor( + this, proposalProcessor); ((PrepRequestProcessor)firstProcessor).start(); } @@ -75,20 +115,23 @@ @Override public void createSessionTracker() { - sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(), - tickTime, self.getId()); - } - - @Override - protected void startSessionTracker() { - ((SessionTrackerImpl)sessionTracker).start(); + sessionTracker = new LeaderSessionTracker( + this, getZKDatabase().getSessionWithTimeOuts(), + tickTime, self.getId(), self.areLocalSessionsEnabled()); } - public boolean touch(long sess, int to) { return 
sessionTracker.touchSession(sess, to); } + public boolean checkIfValidGlobalSession(long sess, int to) { + if (self.areLocalSessionsEnabled() && + !upgradeableSessionTracker.isGlobalSession(sess)) { + return false; + } + return sessionTracker.touchSession(sess, to); + } + @Override protected void registerJMX() { // register with JMX Index: src/java/main/org/apache/zookeeper/server/quorum/Learner.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/Learner.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/Learner.java (working copy) @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; +import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -434,8 +435,7 @@ DataInputStream dis = new DataInputStream(bis); long sessionId = dis.readLong(); boolean valid = dis.readBoolean(); - ServerCnxn cnxn = pendingRevalidations - .remove(sessionId); + ServerCnxn cnxn = pendingRevalidations.remove(sessionId); if (cnxn == null) { LOG.warn("Missing session 0x" + Long.toHexString(sessionId) @@ -455,8 +455,7 @@ // Send back the ping with our session data ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); - HashMap touchTable = zk - .getTouchSnapshot(); + Map touchTable = zk.getTouchSnapshot(); for (Entry entry : touchTable.entrySet()) { dos.writeLong(entry.getKey()); dos.writeInt(entry.getValue()); Index: src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (working copy) @@ -522,7 +522,7 @@ ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); 
dos.writeLong(id); - boolean valid = leader.zk.touch(id, to); + boolean valid = leader.zk.checkIfValidGlobalSession(id, to); if (valid) { try { //set the session owner Index: src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java (working copy) @@ -15,82 +15,203 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.zookeeper.server.quorum; import java.io.PrintWriter; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; -import org.apache.zookeeper.server.SessionTracker; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.KeeperException.SessionMovedException; +import org.apache.zookeeper.KeeperException.UnknownSessionException; import org.apache.zookeeper.server.SessionTrackerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * This is really just a shell of a SessionTracker that tracks session activity - * to be forwarded to the Leader using a PING. + * The learner session tracker is used by learners (followers and observers) to + * track zookeeper sessions which may or may not be echoed to the leader. When + * a new session is created it is saved locally in a wrapped + * LocalSessionTracker. It can subsequently be upgraded to a global session + * as required. If an upgrade is requested the session is removed from local + * collections while keeping the same session ID. 
It is up to the caller to + * queue a session creation request for the leader. + * A secondary function of the learner session tracker is to remember sessions + * which have been touched in this service. This information is passed along + * to the leader with a ping. */ -public class LearnerSessionTracker implements SessionTracker { - SessionExpirer expirer; +public class LearnerSessionTracker extends UpgradeableSessionTracker { + private static final Logger LOG = LoggerFactory.getLogger(LearnerSessionTracker.class); + + private final SessionExpirer expirer; + // Touch table for the global sessions + private final AtomicReference> touchTable = + new AtomicReference>(); + private final long serverId; + private final AtomicLong nextSessionId = new AtomicLong(); - HashMap touchTable = new HashMap(); - long serverId = 1; - long nextSessionId=0; - - private ConcurrentHashMap sessionsWithTimeouts; + private final boolean localSessionsEnabled; + private final ConcurrentMap globalSessionsWithTimeouts; public LearnerSessionTracker(SessionExpirer expirer, - ConcurrentHashMap sessionsWithTimeouts, long id) { + ConcurrentMap sessionsWithTimeouts, + int tickTime, long id, boolean localSessionsEnabled) { this.expirer = expirer; - this.sessionsWithTimeouts = sessionsWithTimeouts; + this.touchTable.set(new ConcurrentHashMap()); + this.globalSessionsWithTimeouts = sessionsWithTimeouts; this.serverId = id; - nextSessionId = SessionTrackerImpl.initializeNextSession(this.serverId); - + nextSessionId.set(SessionTrackerImpl.initializeNextSession(serverId)); + + this.localSessionsEnabled = localSessionsEnabled; + if (this.localSessionsEnabled) { + createLocalSessionTracker(expirer, tickTime, id); + } } - synchronized public void removeSession(long sessionId) { - sessionsWithTimeouts.remove(sessionId); - touchTable.remove(sessionId); + public void removeSession(long sessionId) { + if (localSessionTracker != null) { + localSessionTracker.removeSession(sessionId); + } + 
globalSessionsWithTimeouts.remove(sessionId); + touchTable.get().remove(sessionId); + } + + public void start() { + if (localSessionTracker != null) { + localSessionTracker.start(); + } } public void shutdown() { + if (localSessionTracker != null) { + localSessionTracker.shutdown(); + } } - synchronized public void addSession(long sessionId, int sessionTimeout) { - sessionsWithTimeouts.put(sessionId, sessionTimeout); - touchTable.put(sessionId, sessionTimeout); + public boolean isGlobalSession(long sessionId) { + return globalSessionsWithTimeouts.containsKey(sessionId); } - synchronized public boolean touchSession(long sessionId, int sessionTimeout) { - touchTable.put(sessionId, sessionTimeout); - return true; + public boolean addGlobalSession(long sessionId, int sessionTimeout) { + boolean added = + globalSessionsWithTimeouts.put(sessionId, sessionTimeout) == null; + if (localSessionsEnabled && added) { + // Only do extra logging so we know what kind of session this is + // if we're supporting both kinds of sessions + LOG.info("Adding global session 0x" + Long.toHexString(sessionId)); + } + touchTable.get().put(sessionId, sessionTimeout); + return added; } - synchronized HashMap snapshot() { - HashMap oldTouchTable = touchTable; - touchTable = new HashMap(); - return oldTouchTable; + public boolean addSession(long sessionId, int sessionTimeout) { + boolean added; + if (localSessionsEnabled && !isGlobalSession(sessionId)) { + added = localSessionTracker.addSession(sessionId, sessionTimeout); + // Check for race condition with session upgrading + if (isGlobalSession(sessionId)) { + added = false; + localSessionTracker.removeSession(sessionId); + } else if (added) { + LOG.info("Adding local session 0x" + + Long.toHexString(sessionId)); + } + } else { + added = addGlobalSession(sessionId, sessionTimeout); + } + return added; } + public boolean touchSession(long sessionId, int sessionTimeout) { + if (localSessionsEnabled) { + if 
(localSessionTracker.touchSession(sessionId, sessionTimeout)) { + return true; + } + if (!isGlobalSession(sessionId)) { + return false; + } + } + touchTable.get().put(sessionId, sessionTimeout); + return true; + } - synchronized public long createSession(int sessionTimeout) { - return (nextSessionId++); + public Map snapshot() { + return touchTable.getAndSet(new ConcurrentHashMap()); } - public void checkSession(long sessionId, Object owner) { - // Nothing to do here. Sessions are checked at the Leader + public long createSession(int sessionTimeout) { + if (localSessionsEnabled) { + return localSessionTracker.createSession(sessionTimeout); + } + return nextSessionId.getAndIncrement(); } - - public void setOwner(long sessionId, Object owner) { - // Nothing to do here. Sessions are checked at the Leader + + public void checkSession(long sessionId, Object owner) + throws SessionExpiredException, SessionMovedException { + if (localSessionTracker != null) { + try { + localSessionTracker.checkSession(sessionId, owner); + return; + } catch(UnknownSessionException e) { + // Check whether it's a global session. We can ignore those + // because they are handled at the leader, but if not, rethrow. + // We check local session status first to avoid race condition + // with session upgrading. + if (!isGlobalSession(sessionId)) { + throw new SessionExpiredException(); + } + } + } + } + + public void setOwner(long sessionId, Object owner) + throws SessionExpiredException { + if (localSessionTracker != null) { + try { + localSessionTracker.setOwner(sessionId, owner); + return; + } catch(SessionExpiredException e) { + // Check whether it's a global session. We can ignore those + // because they are handled at the leader, but if not, rethrow. + // We check local session status first to avoid race condition + // with session upgrading. 
+ if (!isGlobalSession(sessionId)) { + throw e; + } + } + } } public void dumpSessions(PrintWriter pwriter) { - // the original class didn't have tostring impl, so just - // dup what we had before - pwriter.println(toString()); + if (localSessionTracker != null) { + pwriter.print("Local "); + localSessionTracker.dumpSessions(pwriter); + } + pwriter.print("Global Sessions("); + pwriter.print(globalSessionsWithTimeouts.size()); + pwriter.println("):"); + ArrayList sessionIds = + new ArrayList(globalSessionsWithTimeouts.keySet()); + Collections.sort(sessionIds); + for (long sessionId : sessionIds) { + pwriter.print("0x"); + pwriter.print(Long.toHexString(sessionId)); + pwriter.print("\t"); + pwriter.print(globalSessionsWithTimeouts.get(sessionId)); + pwriter.println("ms"); + } } public void setSessionClosing(long sessionId) { - // Nothing to do here. + // Global sessions handled on the leader; this call is a no-op if + // not tracked as a local session so safe to call in both cases. + if (localSessionTracker != null) { + localSessionTracker.setSessionClosing(sessionId); + } } } Index: src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (working copy) @@ -18,19 +18,23 @@ package org.apache.zookeeper.server.quorum; import java.io.IOException; -import java.util.HashMap; +import java.util.Collections; +import java.util.Map; import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.server.DataTreeBean; +import org.apache.zookeeper.server.quorum.LearnerSessionTracker; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServerBean; 
import org.apache.zookeeper.server.persistence.FileTxnSnapLog; /** - * Parent class for all ZooKeeperServers for Learners + * Parent class for all ZooKeeperServers for Learners */ -public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { +public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { + public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb, QuorumPeer self) @@ -42,47 +46,50 @@ /** * Abstract method to return the learner associated with this server. * Since the Learner may change under our feet (when QuorumPeer reassigns - * it) we can't simply take a reference here. Instead, we need the - * subclasses to implement this. + * it) we can't simply take a reference here. Instead, we need the + * subclasses to implement this. */ - abstract public Learner getLearner(); - + abstract public Learner getLearner(); + /** * Returns the current state of the session tracker. This is only currently * used by a Learner to build a ping response packet. - * + * */ - protected HashMap getTouchSnapshot() { + protected Map getTouchSnapshot() { if (sessionTracker != null) { return ((LearnerSessionTracker) sessionTracker).snapshot(); } - return new HashMap(); + Map map = Collections.emptyMap(); + return map; } - + /** * Returns the id of the associated QuorumPeer, which will do for a unique - * id of this server. + * id of this server. 
*/ @Override public long getServerId() { return self.getId(); - } - + } + @Override public void createSessionTracker() { - sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(), - self.getId()); + sessionTracker = new LearnerSessionTracker( + this, getZKDatabase().getSessionWithTimeOuts(), + this.tickTime, self.getId(), self.areLocalSessionsEnabled()); } - - @Override - protected void startSessionTracker() {} - + @Override protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { - getLearner().validateSession(cnxn, sessionId, sessionTimeout); + if (upgradeableSessionTracker.isLocalSession(sessionId)) { + super.revalidateSession(cnxn, sessionId, sessionTimeout); + } else { + getLearner().validateSession(cnxn, sessionId, sessionTimeout); + } } - + @Override protected void registerJMX() { // register with JMX Index: src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java (revision 0) +++ src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java (working copy) @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.quorum; + +import java.util.concurrent.ConcurrentMap; + +import org.apache.zookeeper.server.SessionTrackerImpl; + +/** + * Local session tracker. + */ +public class LocalSessionTracker extends SessionTrackerImpl { + public LocalSessionTracker(SessionExpirer expirer, + ConcurrentMap sessionsWithTimeouts, + int tickTime, long id) { + super(expirer, sessionsWithTimeouts, tickTime, id); + } + + public boolean isLocalSession(long sessionId) { + return isTrackingSession(sessionId); + } + + public boolean isGlobalSession(long sessionId) { + return false; + } + + public boolean addGlobalSession(long sessionId, int sessionTimeout) { + throw new UnsupportedOperationException(); + } +} Index: src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (working copy) @@ -18,15 +18,18 @@ package org.apache.zookeeper.server.quorum; +import java.io.IOException; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.txn.ErrorTxn; /** * This RequestProcessor forwards any requests that modify the state of the @@ -90,12 +93,28 @@ case OpCode.delete: case OpCode.setData: case OpCode.setACL: - case OpCode.createSession: - case OpCode.closeSession: case OpCode.multi: case OpCode.check: + try { + zks.getSessionTracker().setOwner(request.sessionId, request.getOwner()); 
+ } catch (KeeperException e) { + /* + * We cannot do anything when session is already expired + * the request need to go through, otherwise the commit + * processor will get stuck + */ + LOG.warn("Session already expired 0x" + + Long.toHexString(request.sessionId)); + } zks.getObserver().request(request); break; + case OpCode.createSession: + case OpCode.closeSession: + // Don't forward local sessions to the leader. + if (!request.isLocalSession()) { + zks.getObserver().request(request); + } + break; } } } catch (Exception e) { @@ -109,6 +128,22 @@ */ public void processRequest(Request request) { if (!finished) { + Request upgradeRequest = null; + try { + upgradeRequest = zks.checkUpgradeSession(request); + } catch (KeeperException ke) { + if (request.getHdr() != null) { + request.getHdr().setType(OpCode.error); + request.setTxn(new ErrorTxn(ke.code().intValue())); + } + request.setException(ke); + LOG.info("Error creating upgrade request", ke); + } catch (IOException ie) { + LOG.error("Unexpected error in upgrade", ie); + } + if (upgradeRequest != null) { + queuedRequests.add(upgradeRequest); + } queuedRequests.add(request); } } Index: src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (working copy) @@ -68,10 +68,10 @@ * call processRequest on the next processor. 
*/ - if(request instanceof LearnerSyncRequest){ + if (request instanceof LearnerSyncRequest){ zks.getLeader().processSync((LearnerSyncRequest)request); } else { - nextProcessor.processRequest(request); + nextProcessor.processRequest(request); if (request.getHdr() != null) { // We need to sync and get consensus on any transactions try { Index: src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (working copy) @@ -368,6 +368,18 @@ protected int tickTime; /** + * Whether learners in this quorum should create new sessions as local. + * False by default to preserve existing behavior. + */ + protected boolean localSessionsEnabled = false; + + /** + * Whether learners in this quorum should upgrade local sessions to + * global. Only matters if local sessions are enabled. + */ + protected boolean localSessionsUpgradingEnabled = true; + + /** * Minimum number of milliseconds to allow for session timeout. * A value of -1 indicates unset, use default. */ @@ -1089,6 +1101,28 @@ return fac.getMaxClientCnxnsPerHost(); } + /** Whether local sessions are enabled */ + public boolean areLocalSessionsEnabled() { + return localSessionsEnabled; + } + + /** Whether to enable local sessions */ + public void enableLocalSessions(boolean flag) { + LOG.info("Local sessions " + (flag ? "enabled" : "disabled")); + localSessionsEnabled = flag; + } + + /** Whether local sessions are allowed to upgrade to global sessions */ + public boolean isLocalSessionsUpgradingEnabled() { + return localSessionsUpgradingEnabled; + } + + /** Whether to allow local sessions to upgrade to global sessions */ + public void enableLocalSessionsUpgrading(boolean flag) { + LOG.info("Local session upgrading " + (flag ? 
"enabled" : "disabled")); + localSessionsUpgradingEnabled = flag; + } + /** minimum session timeout in milliseconds */ public int getMinSessionTimeout() { return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout; @@ -1105,7 +1139,7 @@ return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout; } - /** minimum session timeout in milliseconds */ + /** maximum session timeout in milliseconds */ public void setMaxSessionTimeout(int max) { LOG.info("maxSessionTimeout set to " + max); this.maxSessionTimeout = max; Index: src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (working copy) @@ -62,6 +62,8 @@ protected int minSessionTimeout = -1; /** defaults to -1 if not set explicitly */ protected int maxSessionTimeout = -1; + protected boolean localSessionsEnabled = false; + protected boolean localSessionsUpgradingEnabled = false; protected int initLimit; protected int syncLimit; @@ -171,6 +173,10 @@ dataLogDir = vff.create(value); } else if (key.equals("clientPort")) { clientPort = Integer.parseInt(value); + } else if (key.equals("localSessionsEnabled")) { + localSessionsEnabled = Boolean.parseBoolean(value); + } else if (key.equals("localSessionsUpgradingEnabled")) { + localSessionsUpgradingEnabled = Boolean.parseBoolean(value); } else if (key.equals("clientPortAddress")) { clientPortAddress = value.trim(); } else if (key.equals("tickTime")) { @@ -472,12 +478,16 @@ public int getMaxClientCnxns() { return maxClientCnxns; } public int getMinSessionTimeout() { return minSessionTimeout; } public int getMaxSessionTimeout() { return maxSessionTimeout; } + public boolean areLocalSessionsEnabled() { return localSessionsEnabled; } + public boolean isLocalSessionsUpgradingEnabled() { + return 
localSessionsUpgradingEnabled; + } public int getInitLimit() { return initLimit; } public int getSyncLimit() { return syncLimit; } public int getElectionAlg() { return electionAlg; } - public int getElectionPort() { return electionPort; } - + public int getElectionPort() { return electionPort; } + public int getSnapRetainCount() { return snapRetainCount; } @@ -486,7 +496,7 @@ return purgeInterval; } - public QuorumVerifier getQuorumVerifier() { + public QuorumVerifier getQuorumVerifier() { return quorumVerifier; } Index: src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (working copy) @@ -110,7 +110,7 @@ .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); - + if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { @@ -127,17 +127,20 @@ } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } - + LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); - - quorumPeer = new QuorumPeer(); + + quorumPeer = new QuorumPeer(); quorumPeer.setTxnFactory(new FileTxnSnapLog( config.getDataLogDir(), config.getDataDir())); + quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); + quorumPeer.enableLocalSessionsUpgrading( + config.isLocalSessionsUpgradingEnabled()); //quorumPeer.setQuorumPeers(config.getAllMembers()); quorumPeer.setElectionType(config.getElectionAlg()); quorumPeer.setMyid(config.getServerId()); @@ -153,7 +156,7 @@ quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); quorumPeer.setCnxnFactory(cnxnFactory); 
quorumPeer.setLearnerType(config.getPeerType()); - + quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { Index: src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java (working copy) @@ -17,8 +17,16 @@ */ package org.apache.zookeeper.server.quorum; +import java.io.IOException; import java.io.PrintWriter; +import java.nio.ByteBuffer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.server.ByteBufferInputStream; +import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; @@ -28,7 +36,9 @@ * a quorum. */ public abstract class QuorumZooKeeperServer extends ZooKeeperServer { + protected final QuorumPeer self; + protected UpgradeableSessionTracker upgradeableSessionTracker; protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, @@ -39,6 +49,95 @@ } @Override + protected void startSessionTracker() { + upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker; + upgradeableSessionTracker.start(); + } + + public Request checkUpgradeSession(Request request) + throws IOException, KeeperException { + // If this is a request for a local session and it is to + // create an ephemeral node, then upgrade the session and return + // a new session request for the leader. + // This is called by the request processor thread (either follower + // or observer request processor), which is unique to a learner. 
+ // So will not be called concurrently by two threads. + if (request.type != OpCode.create || + !upgradeableSessionTracker.isLocalSession(request.sessionId)) { + return null; + } + CreateRequest createRequest = new CreateRequest(); + request.request.rewind(); + ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); + request.request.rewind(); + CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); + if (!createMode.isEphemeral()) { + return null; + } + // Uh oh. We need to upgrade before we can proceed. + if (!self.isLocalSessionsUpgradingEnabled()) { + throw new KeeperException.EphemeralOnLocalSessionException(); + } + + return makeUpgradeRequest(request.sessionId); + } + + private Request makeUpgradeRequest(long sessionId) { + // Make sure to atomically check local session status, upgrade + // session, and make the session creation request. This is to + // avoid another thread upgrading the session in parallel. + synchronized (upgradeableSessionTracker) { + if (upgradeableSessionTracker.isLocalSession(sessionId)) { + int timeout = upgradeableSessionTracker.upgradeSession(sessionId); + ByteBuffer to = ByteBuffer.allocate(4); + to.putInt(timeout); + return new Request( + null, sessionId, 0, OpCode.createSession, to, null); + } + } + return null; + } + + /** + * Implements the SessionUpgrader interface, + * + * @param sessionId + */ + public void upgrade(long sessionId) { + Request request = makeUpgradeRequest(sessionId); + if (request != null) { + LOG.info("Upgrading session 0x" + Long.toHexString(sessionId)); + // This must be a global request + submitRequest(request); + } + } + + @Override + protected void setLocalSessionFlag(Request si) { + // We need to set isLocalSession to tree for these type of request + // so that the request processor can process them correctly. + switch (si.type) { + case OpCode.createSession: + if (self.areLocalSessionsEnabled()) { + // All new sessions local by default. 
+ si.setLocalSession(true); + } + break; + case OpCode.closeSession: + String reqType = "global"; + if (upgradeableSessionTracker.isLocalSession(si.sessionId)) { + si.setLocalSession(true); + reqType = "local"; + } + LOG.info("Submitting " + reqType + " closeSession request" + + " for session 0x" + Long.toHexString(si.sessionId)); + break; + default: + break; + } + } + + @Override public void dumpConf(PrintWriter pwriter) { super.dumpConf(pwriter); Index: src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java (revision 1446831) +++ src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java (working copy) @@ -18,6 +18,8 @@ package org.apache.zookeeper.server.quorum; +import java.io.PrintWriter; + import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.DataTreeBean; import org.apache.zookeeper.server.FinalRequestProcessor; @@ -36,11 +38,16 @@ * The very first processor in the chain of request processors is a * ReadOnlyRequestProcessor which drops state-changing requests. 
*/ -public class ReadOnlyZooKeeperServer extends QuorumZooKeeperServer { +public class ReadOnlyZooKeeperServer extends ZooKeeperServer { + protected final QuorumPeer self; private volatile boolean shutdown = false; - ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) { - super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self); + + ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, + ZKDatabase zkDb) { + super(logFactory, self.tickTime, self.minSessionTimeout, + self.maxSessionTimeout, zkDb); + this.self = self; } @Override @@ -141,4 +148,21 @@ super.shutdown(); } + @Override + public void dumpConf(PrintWriter pwriter) { + super.dumpConf(pwriter); + + pwriter.print("initLimit="); + pwriter.println(self.getInitLimit()); + pwriter.print("syncLimit="); + pwriter.println(self.getSyncLimit()); + pwriter.print("electionAlg="); + pwriter.println(self.getElectionType()); + pwriter.print("electionPort="); + pwriter.println(self.getElectionAddress().getPort()); + pwriter.print("quorumPort="); + pwriter.println(self.getQuorumAddress().getPort()); + pwriter.print("peerType="); + pwriter.println(self.getLearnerType().ordinal()); + } } Index: src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java =================================================================== --- src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java (revision 0) +++ src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java (working copy) @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.quorum; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.server.SessionTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A session tracker that supports upgradeable local sessions. + */ +public abstract class UpgradeableSessionTracker implements SessionTracker { + private static final Logger LOG = LoggerFactory.getLogger(UpgradeableSessionTracker.class); + + private ConcurrentHashMap localSessionsWithTimeouts; + protected LocalSessionTracker localSessionTracker; + + public void start() {} + + public void createLocalSessionTracker(SessionExpirer expirer, + int tickTime, long id) { + this.localSessionsWithTimeouts = + new ConcurrentHashMap(); + this.localSessionTracker = new LocalSessionTracker( + expirer, this.localSessionsWithTimeouts, tickTime, id); + } + + public boolean isTrackingSession(long sessionId) { + return isLocalSession(sessionId) || isGlobalSession(sessionId); + } + + public boolean isLocalSession(long sessionId) { + return localSessionTracker != null && + localSessionTracker.isTrackingSession(sessionId); + } + + abstract public boolean isGlobalSession(long sessionId); + + /** + * Upgrades the session to a global session. + * This simply removes the session from the local tracker and marks + * it as global. It is up to the caller to actually + * queue up a transaction for the session. 
+ * + * @param sessionId + * @return session timeout (-1 if not a local session) + */ + public int upgradeSession(long sessionId) { + if (localSessionsWithTimeouts == null) { + return -1; + } + // We won't race another upgrade attempt because only one thread + // will get the timeout from the map + Integer timeout = localSessionsWithTimeouts.remove(sessionId); + if (timeout != null) { + LOG.info("Upgrading session 0x" + Long.toHexString(sessionId)); + // Add as global before removing as local + addGlobalSession(sessionId, timeout); + localSessionTracker.removeSession(sessionId); + return timeout; + } + return -1; + } + + public void checkGlobalSession(long sessionId, Object owner) + throws KeeperException.SessionExpiredException, + KeeperException.SessionMovedException { + throw new UnsupportedOperationException(); + } +} Index: src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java =================================================================== --- src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java (revision 1446831) +++ src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java (working copy) @@ -76,15 +76,19 @@ private class MySessionTracker implements SessionTracker { @Override - public void addSession(long id, int to) { + public boolean addGlobalSession(long id, int to) { // TODO Auto-generated method stub - + return false; + } + @Override + public boolean addSession(long id, int to) { + // TODO Auto-generated method stub + return false; } @Override public void checkSession(long sessionId, Object owner) throws SessionExpiredException, SessionMovedException { // TODO Auto-generated method stub - } @Override public long createSession(int sessionTimeout) { @@ -94,23 +98,27 @@ @Override public void dumpSessions(PrintWriter pwriter) { // TODO Auto-generated method stub - + } @Override public void removeSession(long sessionId) { // TODO Auto-generated method stub - + + } + public int upgradeSession(long 
sessionId) { + // TODO Auto-generated method stub + return 0; } @Override public void setOwner(long id, Object owner) throws SessionExpiredException { // TODO Auto-generated method stub - + } @Override public void shutdown() { // TODO Auto-generated method stub - + } @Override public boolean touchSession(long sessionId, int sessionTimeout) { @@ -121,5 +129,15 @@ public void setSessionClosing(long sessionId) { // TODO Auto-generated method stub } + @Override + public boolean isTrackingSession(long sessionId) { + // TODO Auto-generated method stub + return false; + } + @Override + public void checkGlobalSession(long sessionId, Object owner) + throws SessionExpiredException, SessionMovedException { + // TODO Auto-generated method stub + } } } Index: src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java =================================================================== --- src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (revision 1446831) +++ src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (working copy) @@ -113,7 +113,6 @@ @Test public void testEarlyLeaderAbandonment() throws Exception { ClientBase.setupTestEnv(); - final int SERVER_COUNT = 3; final int clientPorts[] = new int[SERVER_COUNT]; StringBuilder sb = new StringBuilder(); @@ -143,10 +142,12 @@ for (int i = 0; i < SERVER_COUNT; i++) { mt[i].start(); - } + // Recreate a client session since the previous session was not persisted. + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + } + + waitForAll(zk, States.CONNECTED); - waitForAll(zk, States.CONNECTED); - // ok lets find the leader and kill everything else, we have a few // seconds, so it should be plenty of time @@ -182,6 +183,8 @@ } for (int i = 0; i < SERVER_COUNT; i++) { if (i != leader) { + // Recreate a client session since the previous session was not persisted. 
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); waitForOne(zk[i], States.CONNECTED); zk[i].create("/zk" + i, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @@ -306,24 +309,29 @@ } private void waitForOne(ZooKeeper zk, States state) throws InterruptedException { + int iterations = ClientBase.CONNECTION_TIMEOUT / 500; while (zk.getState() != state) { + if (iterations-- == 0) { + throw new RuntimeException("Waiting too long"); + } Thread.sleep(500); } } private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { - int iterations = 10; + int iterations = ClientBase.CONNECTION_TIMEOUT / 1000; boolean someoneNotConnected = true; - while (someoneNotConnected) { + while (someoneNotConnected) { if (iterations-- == 0) { ClientBase.logAllStackTraces(); throw new RuntimeException("Waiting too long"); } someoneNotConnected = false; - for (ZooKeeper zk : zks) { + for (ZooKeeper zk : zks) { if (zk.getState() != state) { someoneNotConnected = true; + break; } } Thread.sleep(1000); Index: src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java =================================================================== --- src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java (revision 0) +++ src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java (working copy) @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; + +import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Due to a race condition or bad client code, the leader may get a request from + * an expired session. We need to make sure that we never allow ephemeral nodes + * to be created in those cases, but we do allow normal nodes to be created.
+ */ +public class LeaderSessionTrackerTest extends ZKTestCase implements Watcher { + + protected static final Logger LOG = LoggerFactory + .getLogger(LeaderSessionTrackerTest.class); + + QuorumUtil qu; + + @Before + public void setUp() throws Exception { + qu = new QuorumUtil(1); + } + + @After + public void tearDown() throws Exception { + qu.shutdownAll(); + } + + @Test + public void testExpiredSessionWithLocalSession() throws Exception { + testCreateEphemeral(true); + } + + @Test + public void testExpiredSessionWithoutLocalSession() throws Exception { + testCreateEphemeral(false); + } + + /** + * When we create an ephemeral node, we need to check against the global + * session, so the leader never accepts requests from an expired session + * (that we no longer track) + * + * This is not the same as SessionInvalidationTest since the session + * is not in closing state + */ + public void testCreateEphemeral(boolean localSessionEnabled) throws Exception { + QuorumUtil qu = new QuorumUtil(1); + if (localSessionEnabled) { + qu.enableLocalSession(true); + } + qu.startAll(); + + QuorumPeer leader = qu.getLeaderQuorumPeer(); + + ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader), + CONNECTION_TIMEOUT, this); + + CreateRequest createRequest = new CreateRequest("/impossible", + new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + createRequest.serialize(boa, "request"); + ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); + + // Mimic sessionId generated by follower's local session tracker + long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer() + .getServerId(); + long fakeSessionId = (sid << 56) + 1; + + LOG.info("Fake session Id: " + Long.toHexString(fakeSessionId)); + + Request request = new Request(null, fakeSessionId, 0, OpCode.create, + bb, new ArrayList()); + + // Submit request directly to leader
leader.getActiveServer().submitRequest(request); + + // Make sure that previous request is finished + zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + Stat stat = zk.exists("/impossible", null); + Assert.assertEquals("Node from fake session get created", null, stat); + + } + + /** + * When local session is enabled, leader will allow persistent node + * to be create for unknown session + */ + @Test + public void testCreatePersistent() throws Exception { + QuorumUtil qu = new QuorumUtil(1); + qu.enableLocalSession(true); + qu.startAll(); + + QuorumPeer leader = qu.getLeaderQuorumPeer(); + + ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader), + CONNECTION_TIMEOUT, this); + + CreateRequest createRequest = new CreateRequest("/success", + new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + createRequest.serialize(boa, "request"); + ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); + + // Mimic sessionId generated by follower's local session tracker + long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer() + .getServerId(); + long locallSession = (sid << 56) + 1; + + LOG.info("Local session Id: " + Long.toHexString(locallSession)); + + Request request = new Request(null, locallSession, 0, OpCode.create, + bb, new ArrayList()); + + // Submit request directly to leader + leader.getActiveServer().submitRequest(request); + + // Make sure that previous request is finished + zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + Stat stat = zk.exists("/success", null); + Assert.assertTrue("Request from local sesson failed", stat != null); + + } + + @Override + public void process(WatchedEvent event) { + } + +} Index: src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java =================================================================== --- 
src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java (revision 0) +++ src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java (working copy) @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.TraceFormatter; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.quorum.Leader.Proposal; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Validate that open/close session requests of a local session do not propagate + * to other machines in the quorum. We verify this by checking that + * these requests don't show up in committedLog on other machines.
+ */ +public class LocalSessionRequestTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory + .getLogger(LocalSessionRequestTest.class); + public static final int CONNECTION_TIMEOUT = 4000; + + private final QuorumBase qb = new QuorumBase(); + + @Before + public void setUp() throws Exception { + LOG.info("STARTING quorum " + getClass().getName()); + qb.localSessionsEnabled = true; + qb.localSessionsUpgradingEnabled = true; + qb.setUp(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + } + + @After + public void tearDown() throws Exception { + LOG.info("STOPPING quorum " + getClass().getName()); + qb.tearDown(); + } + + @Test + public void testLocalSessionsOnFollower() throws Exception { + testOpenCloseSession(false); + } + + @Test + public void testLocalSessionsOnLeader() throws Exception { + testOpenCloseSession(true); + } + + /** + * Walk through the target peer commmittedLog. + * @param sessionId + * @param peerId + */ + private void validateRequestLog(long sessionId, int peerId) { + String session = Long.toHexString(sessionId); + LOG.info("Searching for txn of session 0x " + session + + " on peer " + peerId); + String peerType = peerId == qb.getLeaderIndex() ? "leader" : "follower"; + QuorumPeer peer = qb.getPeerList().get(peerId); + ZKDatabase db = peer.getActiveServer().getZKDatabase(); + for (Proposal p : db.getCommittedLog()) { + Assert.assertFalse("Should not see " + + TraceFormatter.op2String(p.request.type) + + " request from local session 0x" + session + + " on the " + peerType, + p.request.sessionId == sessionId); + } + } + + public void testOpenCloseSession(boolean onLeader) throws Exception { + int leaderIdx = qb.getLeaderIndex(); + Assert.assertFalse("No leader in quorum?", leaderIdx == -1); + int followerIdx = (leaderIdx + 1) % 5; + int testPeerIdx = onLeader ? leaderIdx : followerIdx; + int verifyPeerIdx = onLeader ? 
followerIdx : leaderIdx; + + String hostPorts[] = qb.hostPort.split(","); + + CountdownWatcher watcher = new CountdownWatcher(); + DisconnectableZooKeeper client = new DisconnectableZooKeeper( + hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + long localSessionId1 = client.getSessionId(); + + // Cut the connection, so the server will create closeSession as part + // of expiring the session. + client.dontReconnect(); + client.disconnect(); + watcher.reset(); + + // We don't validate right away, will do another session create first + + ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx], + CONNECTION_TIMEOUT); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + long localSessionId2 = zk.getSessionId(); + + // Send closeSession request. + zk.close(); + watcher.reset(); + + // This should be enough time for the first session to expire and for + // the closeSession request to propagate to other machines (if there is a bug) + // Since it is time sensitive, we have false positive when test + // machine is under load + Thread.sleep(CONNECTION_TIMEOUT * 2); + + // Validate that we don't see any txn from the first session + validateRequestLog(localSessionId1, verifyPeerIdx); + + // Validate that we don't see any txn from the second session + validateRequestLog(localSessionId2, verifyPeerIdx); + + qb.shutdownServers(); + + } +} Index: src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java =================================================================== --- src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java (revision 0) +++ src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java (working copy) @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.util.HashMap; +import java.util.Map.Entry; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests learners configured to use local sessions only. Expected + * behavior is that sessions created on the learner will never be + * made global. Operations requiring a global session (e.g. + * creation of ephemeral nodes) will fail with an error. 
+ */ +public class LocalSessionsOnlyTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(LocalSessionsOnlyTest.class); + public static final int CONNECTION_TIMEOUT = 4000; + + private final QuorumBase qb = new QuorumBase(); + + @Before + public void setUp() throws Exception { + LOG.info("STARTING quorum " + getClass().getName()); + qb.localSessionsEnabled = true; + qb.localSessionsUpgradingEnabled = false; + qb.setUp(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + } + + @After + public void tearDown() throws Exception { + LOG.info("STOPPING quorum " + getClass().getName()); + qb.tearDown(); + } + + @Test + public void testLocalSessionsOnFollower() throws Exception { + testLocalSessions(false); + } + + @Test + public void testLocalSessionsOnLeader() throws Exception { + testLocalSessions(true); + } + + private void testLocalSessions(boolean testLeader) throws Exception { + String nodePrefix = "/testLocalSessions-" + + (testLeader ? "leaderTest-" : "followerTest-"); + int leaderIdx = qb.getLeaderIndex(); + Assert.assertFalse("No leader in quorum?", leaderIdx == -1); + int followerIdx = (leaderIdx + 1) % 5; + int testPeerIdx = testLeader ? leaderIdx : followerIdx; + String hostPorts[] = qb.hostPort.split(","); + + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx], + CONNECTION_TIMEOUT); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + long localSessionId = zk.getSessionId(); + + // Try creating some data. + for (int i = 0; i < 5; i++) { + zk.create(nodePrefix + i, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + // Now, try an ephemeral node. This should fail since we + // cannot create ephemeral nodes on a local session. 
+ try { + zk.create(nodePrefix + "ephemeral", new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + Assert.fail("Ephemeral node creation should fail."); + } catch (KeeperException.EphemeralOnLocalSessionException e) { + } + + // Close the session. + zk.close(); + + // Validate data on both follower and leader + HashMap peers = new HashMap(); + peers.put("leader", leaderIdx); + peers.put("follower", followerIdx); + for (Entry entry: peers.entrySet()) { + watcher.reset(); + // Try reconnecting with a new session. + // The data should be persisted, even though the session was not. + zk = qb.createClient(watcher, hostPorts[entry.getValue()], + CONNECTION_TIMEOUT); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + long newSessionId = zk.getSessionId(); + Assert.assertFalse(newSessionId == localSessionId); + + for (int i = 0; i < 5; i++) { + Assert.assertNotNull("Data not exists in " + entry.getKey(), + zk.exists(nodePrefix + i, null)); + } + + // We may get the correct exception but the txn may go through + Assert.assertNull("Data exists in " + entry.getKey(), + zk.exists(nodePrefix + "ephemeral", null)); + + zk.close(); + } + qb.shutdownServers(); + } +} Index: src/java/test/org/apache/zookeeper/test/QuorumBase.java =================================================================== --- src/java/test/org/apache/zookeeper/test/QuorumBase.java (revision 1446831) +++ src/java/test/org/apache/zookeeper/test/QuorumBase.java (working copy) @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.Set; @@ -33,6 +34,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; import 
org.apache.zookeeper.server.util.OSMXBean; import org.junit.Assert; import org.junit.Test; @@ -60,7 +62,10 @@ protected int portClient3; protected int portClient4; protected int portClient5; - + + protected boolean localSessionsEnabled = false; + protected boolean localSessionsUpgradingEnabled = false; + @Test // This just avoids complaints by junit public void testNull() { @@ -188,6 +193,17 @@ LOG.info("QuorumPeer 4 voting view: " + s4.getVotingView()); LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView()); + s1.enableLocalSessions(localSessionsEnabled); + s2.enableLocalSessions(localSessionsEnabled); + s3.enableLocalSessions(localSessionsEnabled); + s4.enableLocalSessions(localSessionsEnabled); + s5.enableLocalSessions(localSessionsEnabled); + s1.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + s2.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + s3.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + s4.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + s5.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled); + LOG.info("start QuorumPeer 1"); s1.start(); LOG.info("start QuorumPeer 2"); @@ -230,9 +246,33 @@ } JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()])); } - - - public void setupServers() throws IOException { + + public int getLeaderIndex() { + if (s1.getPeerState() == ServerState.LEADING) { + return 0; + } else if (s2.getPeerState() == ServerState.LEADING) { + return 1; + } else if (s3.getPeerState() == ServerState.LEADING) { + return 2; + } else if (s4.getPeerState() == ServerState.LEADING) { + return 3; + } else if (s5.getPeerState() == ServerState.LEADING) { + return 4; + } + return -1; + } + + public ArrayList getPeerList() { + ArrayList peers = new ArrayList(); + peers.add(s1); + peers.add(s2); + peers.add(s3); + peers.add(s4); + peers.add(s5); + return peers; + } + + public void setupServers() throws IOException { setupServer(1); setupServer(2); setupServer(3); @@ -303,7 
+343,7 @@ Assert.assertEquals(portClient5, s5.getClientPort()); } } - + @Override public void tearDown() throws Exception { LOG.info("TearDown started"); @@ -334,6 +374,9 @@ } public static void shutdown(QuorumPeer qp) { + if (qp == null) { + return; + } try { LOG.info("Shutting down quorum peer " + qp.getName()); qp.shutdown(); Index: src/java/test/org/apache/zookeeper/test/QuorumTest.java =================================================================== --- src/java/test/org/apache/zookeeper/test/QuorumTest.java (revision 1446831) +++ src/java/test/org/apache/zookeeper/test/QuorumTest.java (working copy) @@ -304,22 +304,20 @@ while(qu.getPeer(index).peer.leader == null) index++; - ZooKeeper zk = new ZooKeeper( - "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(), - ClientBase.CONNECTION_TIMEOUT, watcher); - watcher.waitForConnected(CONNECTION_TIMEOUT); - // break the quorum qu.shutdown(index); - - // Wait until we disconnect to proceed - watcher.waitForDisconnected(CONNECTION_TIMEOUT); // try to reestablish the quorum qu.start(index); + + // Connect the client after services are restarted (otherwise we would get + // SessionExpiredException as the previous local session was not persisted). 
+ ZooKeeper zk = new ZooKeeper( + "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(), + ClientBase.CONNECTION_TIMEOUT, watcher); try{ - watcher.waitForConnected(30000); + watcher.waitForConnected(CONNECTION_TIMEOUT); } catch(TimeoutException e) { Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds."); } Index: src/java/test/org/apache/zookeeper/test/QuorumUtil.java =================================================================== --- src/java/test/org/apache/zookeeper/test/QuorumUtil.java (revision 1446831) +++ src/java/test/org/apache/zookeeper/test/QuorumUtil.java (working copy) @@ -21,13 +21,13 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.server.quorum.Election; import org.apache.zookeeper.server.quorum.QuorumPeer; @@ -35,6 +35,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.util.OSMXBean; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utility for quorum testing. Setups 2n+1 peers and allows to start/stop all @@ -73,6 +75,8 @@ private int electionAlg; + private boolean localSessionEnabled; + /** * Initializes 2n+1 quorum peers which will form a ZooKeeper ensemble. 
*
@@ -122,6 +126,10 @@
         return peers.get(id);
     }
 
+    public void enableLocalSession(boolean localSessionEnabled) {
+        this.localSessionEnabled = localSessionEnabled;
+    }
+
     public void startAll() throws IOException {
         shutdownAll();
         for (int i = 1; i <= ALL; ++i) {
@@ -181,6 +189,9 @@
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id,
                 tickTime, initLimit, syncLimit);
+        if (localSessionEnabled) {
+            ps.peer.enableLocalSessions(true);
+        }
         Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
 
         ps.peer.start();
@@ -197,6 +208,9 @@
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id,
                 tickTime, initLimit, syncLimit);
+        if (localSessionEnabled) {
+            ps.peer.enableLocalSessions(true);
+        }
         Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
 
         ps.peer.start();
@@ -242,6 +256,38 @@
         return hostPort;
     }
 
+    /**
+     * Returns the client connect string ("127.0.0.1:port") of the given peer.
+     */
+    public String getConnectString(QuorumPeer peer) {
+        return "127.0.0.1:" + peer.getClientPort();
+    }
+
+    /**
+     * Returns the first peer whose {@code leader} is set, or null if none is.
+     */
+    public QuorumPeer getLeaderQuorumPeer() {
+        for (int i = 1; i <= ALL; i++) {
+            if (peers.get(i).peer.leader != null) {
+                return peers.get(i).peer;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns every peer whose {@code leader} is null (every non-leading peer).
+     */
+    public List<QuorumPeer> getFollowerQuorumPeers() {
+        List<QuorumPeer> peerList = new ArrayList<QuorumPeer>(ALL - 1);
+        for (int i = 1; i <= ALL; i++) {
+            if (peers.get(i).peer.leader == null) {
+                peerList.add(peers.get(i).peer);
+            }
+        }
+        return peerList;
+    }
+
     public void tearDown() throws Exception {
         LOG.info("TearDown started");
 
Index: src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
===================================================================
--- src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java	(revision 1446831)
+++ src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java	(working copy)
@@ -80,6 +80,12 @@
 
         watcher.reset();
         qu.shutdown(2);
+        zk.close();
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT);
 
         // read operation during r/o mode
@@ -97,6 +103,13 @@
         qu.start(2);
         Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(
                 "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT));
+        zk.close();
+        watcher.reset();
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT);
 
         zk.setData(node, "We're in the quorum now".getBytes(), -1);
@@ -132,6 +145,15 @@
         // kill peer and wait no more than 5 seconds for read-only server
         // to be started (which should take one tickTime (2 seconds))
         qu.shutdown(2);
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                new Watcher() {
+                    public void process(WatchedEvent event) {
+                        states.add(event.getState());
+                    }
+                }, true);
         long start = System.currentTimeMillis();
         while (!(zk.getState() == States.CONNECTEDREADONLY)) {
             Thread.sleep(200);
@@ -185,7 +207,6 @@
     @SuppressWarnings("deprecation")
     @Test
     public void testSeekForRwServer() throws Exception {
-        // setup the logger to capture all logs
         Layout layout = Logger.getRootLogger().getAppender("CONSOLE")
                 .getLayout();
 
Index: src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
===================================================================
--- src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java	(revision 0)
+++ src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java	(working copy)
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.SessionTracker.Session;
+import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.quorum.LeaderSessionTracker;
+import org.apache.zookeeper.server.quorum.LearnerSessionTracker;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Validate various types of sessions against the leader session tracker and
+ * the learner session tracker.
+ */
+public class SessionTrackerCheckTest extends ZKTestCase {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(SessionTrackerCheckTest.class);
+
+    private final ConcurrentHashMap<Long, Integer> sessionsWithTimeouts =
+            new ConcurrentHashMap<Long, Integer>();
+
+    private static class Expirer implements SessionExpirer {
+        final long sid;
+
+        public Expirer(long sid) {
+            this.sid = sid;
+        }
+
+        public void expire(Session session) {
+        }
+
+        public long getServerId() {
+            return sid;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        sessionsWithTimeouts.clear();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testLearnerSessionTracker() throws Exception {
+        Expirer expirer = new Expirer(1);
+        // With local session on
+        LearnerSessionTracker tracker = new LearnerSessionTracker(expirer,
+                sessionsWithTimeouts, 1000, expirer.sid, true);
+
+        // Unknown session
+        long sessionId = 0xb100ded;
+        try {
+            tracker.checkSession(sessionId, null);
+            Assert.fail("Unknown session should have failed");
+        } catch (SessionExpiredException e) {
+            // Got expected exception
+        }
+
+        // Global session
+        sessionsWithTimeouts.put(sessionId, 10000);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+
+        // Local session
+        sessionId = 0xf005ba11;
+        tracker.addSession(sessionId, 10000);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Local session should not fail " + e);
+        }
+
+        // During session upgrade
+        sessionsWithTimeouts.put(sessionId, 10000);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Session during upgrade should not fail " + e);
+        }
+
+        // With local session off
+        tracker = new LearnerSessionTracker(expirer, sessionsWithTimeouts,
+                1000, expirer.sid, false);
+
+        // Should be noop
+        sessionId = 0xdeadbeef;
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Should not get any exception " + e);
+        }
+
+    }
+
+    @Test
+    public void testLeaderSessionTracker() throws Exception {
+        Expirer expirer = new Expirer(2);
+        // With local session on
+        LeaderSessionTracker tracker = new LeaderSessionTracker(expirer,
+                sessionsWithTimeouts, 1000, expirer.sid, true);
+
+        // Local session from other server
+        long sessionId = ((expirer.sid + 1) << 56) + 1;
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("local session from other server should not fail " + e);
+        }
+
+        // Global session
+        tracker.addGlobalSession(sessionId, 10000);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+        try {
+            tracker.checkGlobalSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+
+        // Local session from the leader
+        sessionId = (expirer.sid << 56) + 1;
+        // Note: these ids place the server id in the top byte (<< 56).
+        tracker.addSession(sessionId, 10000);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Local session on the leader should not fail " + e);
+        }
+
+        // During session upgrade
+        tracker.addGlobalSession(sessionId, 10000);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Session during upgrade should not fail " + e);
+        }
+        try {
+            tracker.checkGlobalSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+
+        // With local session off
+        tracker = new LeaderSessionTracker(expirer, sessionsWithTimeouts, 1000,
+                expirer.sid, false);
+
+        // Global session
+        sessionId = 0xdeadbeef;
+        tracker.addSession(sessionId, 10000);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+        try {
+            tracker.checkGlobalSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+
+        // Local session from other server
+        sessionId = ((expirer.sid + 1) << 56) + 2;
+        try {
+            tracker.checkSession(sessionId, null);
+            Assert.fail("local session from other server should fail");
+        } catch (SessionExpiredException e) {
+            // Got expected exception
+        }
+
+        // Local session from the leader
+        sessionId = (expirer.sid << 56) + 2;
+        try {
+            tracker.checkSession(sessionId, null);
+            Assert.fail("local session from the leader should fail");
+        } catch (SessionExpiredException e) {
+            // Got expected exception
+        }
+
+    }
+
+}
Index: src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java
===================================================================
--- src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java	(revision 0)
+++ src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java	(working copy)
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that session upgrade works from local to global sessions.
+ * Expected behavior is that if global-only sessions are unset,
+ * and no upgrade interval is specified, then sessions will be
+ * created locally to the host. They will be upgraded to global
+ * sessions iff an operation is done on that session which requires
+ * persistence, i.e. creating an ephemeral node.
+ */
+public class SessionUpgradeTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(SessionUpgradeTest.class);
+    // Shadows the statically imported ClientBase.CONNECTION_TIMEOUT within this class.
+    public static final int CONNECTION_TIMEOUT = 4000;
+
+    private final QuorumBase qb = new QuorumBase();
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = true;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionsWithoutEphemeralOnFollower() throws Exception {
+        testLocalSessionsWithoutEphemeral(false);
+    }
+
+    @Test
+    public void testLocalSessionsWithoutEphemeralOnLeader() throws Exception {
+        testLocalSessionsWithoutEphemeral(true);
+    }
+
+    /**
+     * Checks that a session which never creates an ephemeral node stays local:
+     * other servers reject its id, while the creating server accepts a quick
+     * reconnect until the session is explicitly closed.
+     *
+     * @param testLeader create the session on the leader if true, otherwise
+     *        on a follower
+     */
+    private void testLocalSessionsWithoutEphemeral(boolean testLeader)
+            throws Exception {
+        String nodePrefix = "/testLocalSessions-" +
+                (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        String[] hostPorts = qb.hostPort.split(",");
+        int followerIdx = (leaderIdx + 1) % hostPorts.length;
+        int otherFollowerIdx = (leaderIdx + 2) % hostPorts.length;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // Try creating some data.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+
+        long localSessionId = zk.getSessionId();
+        byte[] localSessionPwd = zk.getSessionPasswd().clone();
+
+        // Try connecting with the same session id on a different
+        // server. This should fail since it is a local session.
+        DisconnectableZooKeeper otherZk = null;
+        try {
+            watcher.reset();
+            otherZk = new DisconnectableZooKeeper(
+                    hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+
+            otherZk.create(nodePrefix + "5", new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.fail("Connection on the same session ID should fail.");
+        } catch (KeeperException.SessionExpiredException e) {
+            // Expected.
+        } catch (KeeperException.ConnectionLossException e) {
+            // Expected.
+        } finally {
+            if (otherZk != null) {
+                otherZk.close();
+            }
+        }
+
+        // If we're testing a follower, also check the session id on the
+        // leader. This should also fail
+        if (!testLeader) {
+            DisconnectableZooKeeper leaderZk = null;
+            try {
+                watcher.reset();
+                leaderZk = new DisconnectableZooKeeper(
+                        hostPorts[leaderIdx], CONNECTION_TIMEOUT,
+                        watcher, localSessionId, localSessionPwd);
+
+                leaderZk.create(nodePrefix + "5", new byte[0],
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+                Assert.fail("Connection on the same session ID should fail.");
+            } catch (KeeperException.SessionExpiredException e) {
+                // Expected.
+            } catch (KeeperException.ConnectionLossException e) {
+                // Expected.
+            } finally {
+                if (leaderZk != null) {
+                    leaderZk.close();
+                }
+            }
+        }
+
+        // However, we should be able to disconnect and reconnect to the same
+        // server with the same session id (as long as we do it quickly
+        // before expiration).
+        zk.disconnect();
+
+        watcher.reset();
+        zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher,
+                localSessionId, localSessionPwd);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        zk.create(nodePrefix + "6", new byte[0],
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // If we explicitly close the session, then the session id should no
+        // longer be valid.
+        zk.close();
+        try {
+            watcher.reset();
+            zk = new DisconnectableZooKeeper(
+                    hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+
+            zk.create(nodePrefix + "7", new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.fail("Reconnecting to a closed session ID should fail.");
+        } catch (KeeperException.SessionExpiredException e) {
+            // Expected.
+        } finally {
+            zk.close();
+        }
+    }
+
+    @Test
+    public void testUpgradeWithEphemeralOnFollower() throws Exception {
+        testUpgradeWithEphemeral(false);
+    }
+
+    @Test
+    public void testUpgradeWithEphemeralOnLeader() throws Exception {
+        testUpgradeWithEphemeral(true);
+    }
+
+    /**
+     * Checks that creating ephemeral nodes upgrades the session to a global
+     * one: it can be resumed on another server, its ephemeral nodes survive
+     * that move, and closing the session removes them.
+     *
+     * @param testLeader create the session on the leader if true, otherwise
+     *        on a follower
+     */
+    private void testUpgradeWithEphemeral(boolean testLeader)
+            throws Exception {
+        String nodePrefix = "/testUpgrade-" +
+                (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        String[] hostPorts = qb.hostPort.split(",");
+        int followerIdx = (leaderIdx + 1) % hostPorts.length;
+        int otherFollowerIdx = (leaderIdx + 2) % hostPorts.length;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // Create some ephemeral nodes. This should force the session to
+        // be propagated to the other servers in the ensemble.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+
+        // We should be able to reconnect with the same session id on a
+        // different server, since it has been propagated.
+        long localSessionId = zk.getSessionId();
+        byte[] localSessionPwd = zk.getSessionPasswd().clone();
+
+        zk.disconnect();
+        watcher.reset();
+        zk = new DisconnectableZooKeeper(
+                hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                localSessionId, localSessionPwd);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // The created ephemeral nodes are still around.
+        for (int i = 0; i < 5; i++) {
+            Assert.assertNotNull(zk.exists(nodePrefix + i, null));
+        }
+
+        // When we explicitly close the session, we should not be able to
+        // reconnect with the same session id
+        zk.close();
+
+        try {
+            watcher.reset();
+            zk = new DisconnectableZooKeeper(
+                    hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+            zk.exists(nodePrefix + "0", null);
+            Assert.fail("Reconnecting to a closed session ID should fail.");
+        } catch (KeeperException.SessionExpiredException e) {
+            // Expected.
+        } finally {
+            zk.close();
+        }
+
+        watcher.reset();
+        // And the ephemeral nodes will be gone since the session died.
+        zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            for (int i = 0; i < 5; i++) {
+                Assert.assertNull(zk.exists(nodePrefix + i, null));
+            }
+        } finally {
+            zk.close();
+        }
+    }
+}