From 133ccb3bba0c1721c166f11a156df5f33e41f9d7 Mon Sep 17 00:00:00 2001 From: Humberto Silva Naves Date: Thu, 20 Nov 2008 01:38:02 -0200 Subject: [PATCH] Agora functionando --- pom.xml | 2 +- .../java/org/hnaves/dfs/election/Election.java | 50 ++++++---- src/main/java/org/hnaves/dfs/election/Group.java | 18 ++-- src/main/java/org/hnaves/dfs/election/Node.java | 16 ++-- .../org/hnaves/dfs/election/socket/UDPSocket.java | 4 +- .../java/org/hnaves/dfs/repository/Container.java | 64 +++++++++++++ .../{FileObject.java => ContentItem.java} | 31 +++---- .../java/org/hnaves/dfs/repository/Repository.java | 101 ++++++++++----------- src/main/resources/log4j.xml | 3 + 9 files changed, 183 insertions(+), 106 deletions(-) create mode 100644 src/main/java/org/hnaves/dfs/repository/Container.java rename src/main/java/org/hnaves/dfs/repository/{FileObject.java => ContentItem.java} (77%) rewrite src/main/java/org/hnaves/dfs/repository/Repository.java (72%) diff --git a/pom.xml b/pom.xml index 9a4da50..08975f5 100644 --- a/pom.xml +++ b/pom.xml @@ -88,7 +88,7 @@ true true - org.hnaves.dfs.election.ElectionNode + org.hnaves.dfs.election.Election diff --git a/src/main/java/org/hnaves/dfs/election/Election.java b/src/main/java/org/hnaves/dfs/election/Election.java index 18b6b56..6f0e1a4 100644 --- a/src/main/java/org/hnaves/dfs/election/Election.java +++ b/src/main/java/org/hnaves/dfs/election/Election.java @@ -23,12 +23,14 @@ import org.slf4j.LoggerFactory; public class Election { private final Logger LOG = LoggerFactory.getLogger(this.getClass()); + private static final int DISCOVERY_TIMEOUT = 10000; + private static final int QUERY_COORDINATOR_TIMEOUT = 1000; private static final int INVITATION_TIMEOUT = 5000; private static final int READY_TIMEOUT = 3000; - private static final int ARE_YOU_THERE_TIMEOUT = 1000; private static final int ACCEPT_TIMEOUT = 3000; - private static final int MAIN_THREAD_WAIT = 500; + private static final int TIME_CONSTANT = 100; + private static final int MAIN_THREAD_WAIT = 30000; private final UDPSocket socket; private final Node id; @@ -56,6 +58,7 @@ public class Election { try { Message message = socket.receiveMessage(); if (message != null && !message.getSourceNode().equals(id)) { + LOG.debug("Processing message " + message); processMessage(message); } } catch(Throwable t) { @@ -74,7 +77,7 @@ public class Election { recovery(); if (status == Status.NORMAL && isGroupCoordinator()) check(); - if (status == Status.MERGE && isGroupCoordinator()) + if (status == Status.MERGE) merge(); if (status == Status.NORMAL && !isGroupCoordinator()) timeout(); @@ -171,9 +174,9 @@ public class Election { sendMessage(invitation, nodeId); } } - group = message.getGroup(); + setGroup(message.getGroup()); Accept accept = new Accept(id, group); - sendMessage(accept, group.getCoordinatorId()); + sendMessage(accept, group.getCoordinator()); coordinatorAccepted = false; wait(ACCEPT_TIMEOUT); if (coordinatorAccepted) { @@ -214,7 +217,7 @@ public class Election { } private synchronized void processAcceptReply(AcceptReply reply) throws IOException { - if (status == Status.ELECTION && group.getCoordinatorId().equals(reply.getSourceNode())) { + if (status == Status.ELECTION && group.getCoordinator().equals(reply.getSourceNode())) { coordinatorAccepted = reply.getAnswer(); } } @@ -222,7 +225,7 @@ public class Election { private synchronized void recovery() { setStatus(Status.ELECTION); counter = counter + 1; - group = new Group(id, counter); + setGroup(new Group(id, counter)); upNodes.clear(); setStatus(Status.REORGANIZATION); setStatus(Status.NORMAL); @@ -231,10 +234,11 @@ public class Election { private synchronized void check() throws IOException, InterruptedException { otherCoordinators.clear(); AreYouCoordinator message = new AreYouCoordinator(id); - socket.broadcastMessage(message); + broadcastMessage(message); wait(DISCOVERY_TIMEOUT); if (!otherCoordinators.isEmpty()) { + wait(TIME_CONSTANT * id.getPriority()); setStatus(Status.MERGE); } } @@ -242,7 +246,7 @@ public class Election { private synchronized void merge() throws IOException, InterruptedException { setStatus(Status.ELECTION); counter = counter + 1; - group = new Group(id, counter); + setGroup(new Group(id, counter)); Set nodes = new HashSet(upNodes); upNodes.clear(); nodes.addAll(otherCoordinators); @@ -269,10 +273,12 @@ public class Election { } private synchronized void timeout() throws IOException, InterruptedException { - AreYouThere areYouThereMessage = new AreYouThere(id, group); + AreYouThere message = new AreYouThere(id, group); + isCoordinatorAlive = false; - sendMessage(areYouThereMessage, group.getCoordinatorId()); - wait(ARE_YOU_THERE_TIMEOUT); + sendMessage(message, group.getCoordinator()); + wait(QUERY_COORDINATOR_TIMEOUT); + if (isCoordinatorAlive) { setStatus(Status.RECOVERY); } @@ -283,13 +289,23 @@ public class Election { this.status = status; } + private void setGroup(Group group) { + LOG.debug("Changing to group " + group + " in node " + id); + this.group = group; + } + private void sendMessage(Message message, Node destination) throws IOException { LOG.debug("Node " + id + " sending message " + message + " to " + destination); socket.sendMessage(message, destination); } + private void broadcastMessage(Message message) throws IOException { + LOG.debug("Node " + id + " broadcasting message " + message); + socket.broadcastMessage(message); + } + public Election(Node id) { - this(id, Node.DEFAULT_PORT); + this(id, Node.DEFAULT_UDP_PORT); } public Election(Node id, int port) { @@ -306,7 +322,7 @@ public class Election { } public boolean isGroupCoordinator() { - return id.equals(group.getCoordinatorId()); + return id.equals(group.getCoordinator()); } public Status getStatus() { @@ -329,9 +345,11 @@ public class Election { } public static void main(String[] args) throws InterruptedException { - Election node = new Election(new Node(args[0], new InetSocketAddress(args[1], Node.DEFAULT_PORT))); + Election node = new Election(new Node(args[0], new InetSocketAddress(args[1], Node.DEFAULT_UDP_PORT))); node.start(); - Thread.sleep(100000); + while(true) { + Thread.sleep(1000); + } } } diff --git a/src/main/java/org/hnaves/dfs/election/Group.java b/src/main/java/org/hnaves/dfs/election/Group.java index 44e00cc..9ef6998 100644 --- a/src/main/java/org/hnaves/dfs/election/Group.java +++ b/src/main/java/org/hnaves/dfs/election/Group.java @@ -6,17 +6,17 @@ import java.io.Serializable; public class Group implements Serializable { private static final long serialVersionUID = 1912100815705396637L; - private final Node coordinatorId; + private final Node coordinator; private final int groupNumber; - public Group(Node coordinatorId, int groupNumber) { - if (coordinatorId == null) throw new NullPointerException("Coordinator id is null"); - this.coordinatorId = coordinatorId; + public Group(Node coordinator, int groupNumber) { + if (coordinator == null) throw new NullPointerException("Coordinator is null"); + this.coordinator = coordinator; this.groupNumber = groupNumber; } - public Node getCoordinatorId() { - return coordinatorId; + public Node getCoordinator() { + return coordinator; } public int getGroupNumber() { @@ -29,19 +29,19 @@ public class Group implements Serializable { if (!(obj instanceof Group)) return false; Group other = (Group) obj; return getGroupNumber() == other.getGroupNumber() && - getCoordinatorId().equals(other.getCoordinatorId()); + getCoordinator().equals(other.getCoordinator()); } @Override public int hashCode() { final int PRIME = 17; int hash = getGroupNumber(); - hash = hash * PRIME + getCoordinatorId().hashCode(); + hash = hash * PRIME + getCoordinator().hashCode(); return hash; } @Override public String toString() { - return "Group " + getGroupNumber() + "[" + getCoordinatorId() + "]"; + return "Group " + getGroupNumber() + "[" + getCoordinator() + "]"; } } diff --git a/src/main/java/org/hnaves/dfs/election/Node.java b/src/main/java/org/hnaves/dfs/election/Node.java index 76ada7b..906c392 100644 --- a/src/main/java/org/hnaves/dfs/election/Node.java +++ b/src/main/java/org/hnaves/dfs/election/Node.java @@ -3,9 +3,9 @@ package org.hnaves.dfs.election; import java.io.Serializable; import java.net.InetSocketAddress; -public class Node implements Serializable, Comparable { +public class Node implements Serializable { private static final long serialVersionUID = 6339718337514260251L; - public static int DEFAULT_PORT = 10340; + public static int DEFAULT_UDP_PORT = 10340; private final String name; private final InetSocketAddress address; @@ -32,9 +32,13 @@ public class Node implements Serializable, Comparable { public InetSocketAddress getAddress() { return address; } + + public int getPriority() { + return hashCode() % 31; + } private static InetSocketAddress getLocalHost() { - return new InetSocketAddress(DEFAULT_PORT); + return new InetSocketAddress(DEFAULT_UDP_PORT); } @Override @@ -52,10 +56,6 @@ public class Node implements Serializable, Comparable { @Override public String toString() { - return getAddress().toString(); - } - - public int compareTo(Node o) { - return getAddress().getAddress().getHostAddress().compareTo(o.getAddress().getAddress().getHostAddress()); + return getName() + "@" + getAddress().toString(); } } diff --git a/src/main/java/org/hnaves/dfs/election/socket/UDPSocket.java b/src/main/java/org/hnaves/dfs/election/socket/UDPSocket.java index 8614fbf..a5b96ce 100644 --- a/src/main/java/org/hnaves/dfs/election/socket/UDPSocket.java +++ b/src/main/java/org/hnaves/dfs/election/socket/UDPSocket.java @@ -17,7 +17,7 @@ public class UDPSocket { private final DatagramSocket socket; public UDPSocket() { - this(Node.DEFAULT_PORT); + this(Node.DEFAULT_UDP_PORT); } public UDPSocket(int port) { @@ -57,7 +57,7 @@ public class UDPSocket { } public void broadcastMessage(Message message) throws IOException { - broadcastMessage(message, Node.DEFAULT_PORT); + broadcastMessage(message, Node.DEFAULT_UDP_PORT); } diff --git a/src/main/java/org/hnaves/dfs/repository/Container.java b/src/main/java/org/hnaves/dfs/repository/Container.java new file mode 100644 index 0000000..1038908 --- /dev/null +++ b/src/main/java/org/hnaves/dfs/repository/Container.java @@ -0,0 +1,64 @@ +package org.hnaves.dfs.repository; + +import java.io.File; +import java.io.Serializable; +import java.util.ArrayList; + +import org.hnaves.dfs.election.Node; + +public class Container implements Serializable { + private static final long serialVersionUID = 4190763929963328455L; + private final ArrayList content = new ArrayList(); + + public ContentItem[] getContent() { + return (ContentItem[]) content.toArray(); + } + + public void clearContent() { + content.clear(); + } + + public void addItem(ContentItem item) { + if (item == null) throw new NullPointerException("Item is null"); + content.add(item); + } + + public void merge(Container other) { + if (other == null) throw new NullPointerException("Container is null"); + content.addAll(other.content); + } + + public void updateFiles(Node location, File directory) { + if (directory == null) throw new NullPointerException("Directory is null"); + if (!directory.isDirectory()) throw new IllegalArgumentException("Invalid directory " + directory); + content.clear(); + for(File file : directory.listFiles()) { + if (!file.isFile()) continue; + if (file.getName().startsWith(".")) continue; + ContentItem item = new ContentItem(location, file); + content.add(item); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (!(obj instanceof Container)) return false; + Container other = (Container) obj; + return content.equals(other.content); + } + + @Override + public int hashCode() { + return content.hashCode(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for(ContentItem item : content) { + sb.append(item).append('\n'); + } + return sb.toString(); + } +} diff --git a/src/main/java/org/hnaves/dfs/repository/FileObject.java b/src/main/java/org/hnaves/dfs/repository/ContentItem.java similarity index 77% rename from src/main/java/org/hnaves/dfs/repository/FileObject.java rename to src/main/java/org/hnaves/dfs/repository/ContentItem.java index 1a6611f..e30b268 100644 --- a/src/main/java/org/hnaves/dfs/repository/FileObject.java +++ b/src/main/java/org/hnaves/dfs/repository/ContentItem.java @@ -9,12 +9,11 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; import java.util.Date; import org.hnaves.dfs.election.Node; -public class FileObject implements Serializable { +public class ContentItem implements Serializable { private static final long serialVersionUID = -2943849225159883158L; @@ -22,25 +21,26 @@ public class FileObject implements Serializable { private final Date lastModified; private final String md5Hash; private final int size; - private transient final File localFile; - private final ArrayList remoteLocations = new ArrayList(); + private Node location; - public FileObject(File localFile) { + public ContentItem(Node location, File localFile) { + if (location == null) throw new NullPointerException("Location is null"); if (localFile == null) throw new NullPointerException("Local file is null"); if (!localFile.exists() || !localFile.isFile() || !localFile.canRead()) throw new IllegalArgumentException("Local file is invalid"); - this.localFile = localFile; + this.location = location; this.name = localFile.getName(); this.lastModified = new Date(localFile.lastModified()); this.size = (int) localFile.length(); try { - this.md5Hash = md5Hash(); + this.md5Hash = md5Hash(localFile); } catch (Exception e) { throw new RuntimeException(e); } } - public FileObject(String name, Date lastModified, String md5Hash, int size) { + public ContentItem(Node location, String name, Date lastModified, String md5Hash, int size) { + if (location == null) throw new NullPointerException("Location is null"); if (name == null) throw new NullPointerException("Name is null"); if (md5Hash == null) throw new NullPointerException("MD5 hash is null"); if (lastModified == null) throw new NullPointerException("Last Modified Date is null"); @@ -48,10 +48,9 @@ public class FileObject implements Serializable { this.lastModified = lastModified; this.md5Hash = md5Hash; this.size = size; - this.localFile = null; } - private String md5Hash() throws NoSuchAlgorithmException, IOException { + private String md5Hash(File localFile) throws NoSuchAlgorithmException, IOException { MessageDigest digest = MessageDigest.getInstance("MD5"); FileChannel fcl = new RandomAccessFile(localFile, "r").getChannel(); ByteBuffer bl = fcl.map(FileChannel.MapMode.READ_ONLY, 0, (int) fcl.size()); @@ -76,19 +75,15 @@ public class FileObject implements Serializable { return size; } - public File getLocalFile() { - return localFile; - } - - public Node[] getRemoteLocations() { - return (Node[]) remoteLocations.toArray(); + public Node getLocation() { + return location; } @Override public boolean equals(Object obj) { if (obj == this) return true; - if (! (obj instanceof FileObject)) return false; - FileObject other = (FileObject) obj; + if (! (obj instanceof ContentItem)) return false; + ContentItem other = (ContentItem) obj; return (other.getMd5Hash().equals(this.getMd5Hash())) && (other.getSize() == this.getSize()); } diff --git a/src/main/java/org/hnaves/dfs/repository/Repository.java b/src/main/java/org/hnaves/dfs/repository/Repository.java dissimilarity index 72% index 8845395..afebda6 100644 --- a/src/main/java/org/hnaves/dfs/repository/Repository.java +++ b/src/main/java/org/hnaves/dfs/repository/Repository.java @@ -1,52 +1,49 @@ -package org.hnaves.dfs.repository; - -import java.io.File; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Set; - -public class Repository implements Serializable { - - private static final long serialVersionUID = -1889954527422091907L; - private final ArrayList files = new ArrayList(); - private final Set md5Hashes = new HashSet(); - - private final transient File localCacheDirectory; - - public Repository(File localCacheDirectory) { - if (localCacheDirectory == null) throw new NullPointerException("Local cache directory is null"); - this.localCacheDirectory = localCacheDirectory; - } - - public FileObject[] getFiles() { - return (FileObject[]) files.toArray(); - } - - public boolean addFile(FileObject obj) { - if (obj == null) throw new NullPointerException("File object is null"); - if (md5Hashes.contains(obj.getMd5Hash())) return false; - md5Hashes.add(obj.getMd5Hash()); - files.add(obj); - return true; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) return true; - if (! (obj instanceof Repository)) return false; - Repository other = (Repository) obj; - return super.equals(other); - } - - @Override - public int hashCode() { - return super.hashCode(); - } - - @Override - public String toString() { - return "Repository"; - } - -} +package org.hnaves.dfs.repository; + +import java.io.File; + +import org.hnaves.dfs.election.Node; + +public class Repository { + private final Container localFiles = new Container(); + private final Node location; + private final File repositoryDirectory; + + public Repository(Node location, File repositoryDirectory) { + if (repositoryDirectory == null) throw new NullPointerException("Local cache directory is null"); + if (location == null) throw new NullPointerException("Location is null"); + this.location = location; + this.repositoryDirectory = repositoryDirectory; + } + + public File getRepositoryDirectory() { + return repositoryDirectory; + } + + public Node getLocation() { + return location; + } + + public void updateLocalFiles() { + localFiles.updateFiles(location, repositoryDirectory); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (! (obj instanceof Repository)) return false; + Repository other = (Repository) obj; + return getRepositoryDirectory().equals(other.getRepositoryDirectory()); + } + + @Override + public int hashCode() { + return getRepositoryDirectory().hashCode(); + } + + @Override + public String toString() { + return "Repository[" + getRepositoryDirectory() + "]"; + } + +} diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml index e6782e7..c629bab 100644 --- a/src/main/resources/log4j.xml +++ b/src/main/resources/log4j.xml @@ -15,6 +15,9 @@ + + + -- 2.11.4.GIT