// Download | Plain Text | Line Numbers


/*
 * Copyright (c) 2010, Manuel Mausz. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   - Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *   - Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 *   - The names of the authors may not be used to endorse or promote products
 *     derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
 
import static java.lang.System.err;
import static java.lang.System.out;
 
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
 
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
 
import java.net.*;
import java.io.*;
 
/*
 * Fileserver implementation for Lab#1 of DSLab WS10
 * See angabe.pdf for details
 *
 * This code is not documented at all. This is intentional.
 *
 * @author Manuel Mausz (0728348)
 */
public class Fileserver
{
  public class ProxyConnection
    extends CommandNetwork
    implements Runnable
  {
    private final SocketChannel sock;
    private final String sharedFilesDir;
    private final ObjectInputStream oin;
    private final ObjectOutputStream oout;
 
    ProxyConnection(SocketChannel sock, String sharedFilesDir)
      throws NoSuchMethodException, IOException
    {
      this.sock = sock;
      this.oout = new ObjectOutputStream(sock.socket().getOutputStream());
      this.oin  = new ObjectInputStream(new BufferedInputStream(sock.socket().getInputStream()));
      this.sharedFilesDir = sharedFilesDir;
 
      setOneCommandMode(true);
      cmdHandler.register("!list",     this, "cmdList");
      cmdHandler.register("!download", this, "cmdDownload");
      cmdHandler.register("unknown",   this, "cmdUnknown");
    }
 
    /*------------------------------------------------------------------------*/
 
    public void cmdList(String cmd, String[] args)
      throws IOException
    {
      if (args.length != 0)
      {
        err.println("Error: Invalid " + cmd + "-command paket from proxy. Ignoring...");
        return;
      }
 
      File file = new File(sharedFilesDir);
      FilenameFilter filter = new FilenameFilter()
      {
        public boolean accept(File dir, String name)
        {
          File file = new File(dir, name);
          return (file.isFile() && file.canRead());
        }
      };
      Utils.sendOutput(oout, file.list(filter));
    }
 
    /*------------------------------------------------------------------------*/
 
    public void cmdDownload(String cmd, String[] args)
      throws IOException
    {
      if (args.length < 2 || args[1].length() <= 0)
      {
        err.println("Error: Invalid " + cmd + "-command paket from proxy. Ignoring...");
        return;
      }
 
      long credits;
      if ((credits = Utils.parseHeaderNum(args, 1)) < 0)
        return;
 
      File file = new File(sharedFilesDir, args[0]);
      if (!file.exists() || !file.getParent().equals(sharedFilesDir))
      {
        Utils.sendError(oout, "File '" + args[0] + "' does not exist");
        return;
      }
      if (!file.isFile())
      {
        Utils.sendError(oout, "File '" + args[0] + "' is not a file");
        return;
      }
      if (!file.canRead())
      {
        Utils.sendError(oout, "File '" + args[0] + "' is not readable");
        return;
      }
 
      long filesize = file.length();
      if (credits < filesize)
      {
        Utils.sendOutput(oout, "You don't have enough credits");
        return;
      }
 
      try
      {
        FileInputStream fin = new FileInputStream(file);
 
        oout.writeUTF(cmd + " " + file.getName() + " " + filesize);
 
        byte[] buffer = new byte[8 * 1024];
        int toread = buffer.length;
        while(filesize > 0)
        {
          if (filesize < toread)
            toread = (int) filesize;
          int count = fin.read(buffer, 0, toread);
          if (count == -1)
            throw new IOException("Error while reading from file");
          oout.write(buffer, 0, count);
          filesize -= count;
        }
      }
      catch(FileNotFoundException e)
      {
        Utils.sendError(oout, "File '" + args[0] + "' is not readable");
      }
      catch(IOException e)
      {
        err.println("Error during file transfer: " + e.getMessage());
      }
    }
 
    /*------------------------------------------------------------------------*/
 
    public void cmdUnknown(String cmd, String[] args)
      throws IOException
    {
      err.println("Error: Unknown data from proxy: " + cmd + " "
          + Utils.join(Arrays.asList(args), " "));
      Utils.sendError(oout, "Unknown command");
    }
 
    /*------------------------------------------------------------------------*/
 
    public void shutdown()
    {
      try
      {
        oout.flush();
      }
      catch(IOException e)
      {}
 
      try
      {
        oin.close();
      }
      catch(IOException e)
      {}
 
      try
      {
        oout.close();
      }
      catch(IOException e)
      {}
 
      try
      {
        if (sock.isOpen())
          sock.close();
      }
      catch(IOException e)
      {}
    }
 
    /*------------------------------------------------------------------------*/
 
    public void run()
    {
      try
      {
        out.println("[" + Thread.currentThread().getId() + "] New connection from tcp:/"
            + sock.socket().getInetAddress() + ":" + sock.socket().getPort());
        run(oin);
        oout.flush();
      }
      catch(CommandHandler.Exception e)
      {
        err.println("Internal Error: " + e.getMessage());
      }
      catch(IOException e)
      {
        /* ignore that exception
         * it's usually a closed connection from client so
         * we can't do anything about it anyway
         */
      }
 
      out.println("[" + Thread.currentThread().getId() + "] Connection closed");
      shutdown();
    }
  }
 
  /*==========================================================================*/
 
  public class TCPSocketReader
    implements Runnable
  {
    private final ServerSocketChannel sschannel;
    private final String sharedFilesDir;
    private final Object mainLock;
    private final ExecutorService pool;
 
    TCPSocketReader(ServerSocketChannel sschannel, String sharedFilesDir,
        Object mainLock)
    {
      this.sschannel      = sschannel;
      this.sharedFilesDir = sharedFilesDir;
      this.mainLock       = mainLock;
      this.pool           = Executors.newCachedThreadPool();
    }
 
    /*------------------------------------------------------------------------*/
 
    public void run()
    {
      try
      {
        while(true)
          pool.execute(new ProxyConnection(sschannel.accept(), sharedFilesDir));
      }
      catch(NoSuchMethodException e)
      {
        err.println("Error: Unable to setup remote command handler");
      }
      catch(IOException e)
      {
        /* ignore that exception
         * thread will shutdown and unlock the main thread
         * which will shutdown the application
         */
      }
 
      pool.shutdown();
      try
      {
        if (!pool.awaitTermination(100, TimeUnit.MILLISECONDS))
          out.println("Trying to shutdown the proxy connections. This may take up to 15 seconds...");
        if (!pool.awaitTermination(5, TimeUnit.SECONDS))
        {
          pool.shutdownNow();
          if (!pool.awaitTermination(5, TimeUnit.SECONDS))
            err.println("Error: Proxy connections did not terminate. You may have to kill that appplication.");
        }
      }
      catch(InterruptedException e)
      {
        pool.shutdownNow();
      }
 
      synchronized(mainLock)
      {
        mainLock.notify();
      }
    }
  }
 
  /*==========================================================================*/
 
  public class Interactive
    extends CommandInteractive
    implements Runnable
  {
    private final InputStream sin;
    private final Object mainLock;
 
    Interactive(InputStream sin, Object mainLock)
      throws NoSuchMethodException
    {
      this.sin      = sin;
      this.mainLock = mainLock;
 
      cmdHandler.register("unknown", this, "cmdUnknown");
      cmdHandler.register("!exit",   this, "cmdExit");
    }
 
    /*------------------------------------------------------------------------*/
 
    public void cmdUnknown(String cmd, String[] args)
    {
      err.println("Unknown command: " + cmd + " "
          + Utils.join(Arrays.asList(args), " "));
    }
 
    /*------------------------------------------------------------------------*/
 
    public void cmdExit(String cmd, String[] args)
    {
      stop();
    }
 
    /*------------------------------------------------------------------------*/
 
    public void printPrompt()
    {
      out.print(">: ");
      out.flush();
    }
 
    /*------------------------------------------------------------------------*/
 
    public void run()
    {
      try
      {
        run(sin);
      }
      catch(CommandHandler.Exception e)
      {
        err.println("Internal Error: " + e.getMessage());
      }
      catch (IOException e)
      {
        /* ignore that exception
         * thread will shutdown and unlock the main thread
         * which will shutdown the application
         */
      }
 
      synchronized(mainLock)
      {
        mainLock.notify();
      }
    }
  }
 
  /*==========================================================================*/
 
  /**
   * Sends one prepared datagram per invocation. If sending fails, the
   * error is reported and mainLock is notified.
   */
  public class PingTask
    implements Runnable
  {
    private final DatagramSocket sock;
    private final DatagramPacket packet;
    private final Object mainLock;

    /**
     * @param sock     socket used for sending
     * @param packet   the datagram that is sent on every run
     * @param mainLock monitor that is notified when sending fails
     */
    PingTask(DatagramSocket sock, DatagramPacket packet, Object mainLock)
    {
      this.mainLock = mainLock;
      this.packet   = packet;
      this.sock     = sock;
    }

    /*------------------------------------------------------------------------*/

    /** Sends the packet; on failure logs the reason and wakes the waiter. */
    public void run()
    {
      try
      {
        sock.send(packet);
      }
      catch(IOException sendError)
      {
        String reason = sendError.getMessage();
        err.println("Error while sending UDP ping packet: " + reason
            + ". Terminating...");
        synchronized(mainLock)
        {
          mainLock.notify();
        }
      }
    }
  }
 
  /*==========================================================================*/
 
  /* configuration, assigned from the command line arguments in parseArgs() */
  private static String sharedFilesDir;
  private static String proxyHost;
  private static int tcpPort;
  private static int proxyUDPPort;
  private static int alivePeriod;
  /* runtime resources; each stays null until it has been created and is
   * released by shutdown() only when non-null */
  private ScheduledExecutorService scheduler = null;
  private ServerSocketChannel sschannel = null;
  private DatagramSocket dsock = null;
  private Thread tTCPSocketReader = null;
  private Thread tInteractive = null;
  private InputStream stdin = null;
  /* monitor the main thread waits on in run(); the worker threads
   * call notify() on it to trigger the shutdown */
  private final Object mainLock = new Object();
 
  /*--------------------------------------------------------------------------*/
 
  public static void usage()
    throws Utils.Shutdown
  {
    out.println("Usage: Fileserver sharedFilesDir tcpPort proxyHost proxyUDPPort alivePeriod\n");
    out.println("sharedFilesDir\t...the directory that contains all the files clients can download");
    out.println("tcpPort\t\t...the port to be used for instantiating a ServerSocket");
    out.println("proxyHost\t...the host name or an IP address where the Proxy is running");
    out.println("proxyUDPPort\t...the UDP port where the Proxy is listening for fileserver datagrams");
    out.println("alivePeriod\t...the period in ms the fileserver needs to send an isAlive datagram to the Proxy");
 
    // Java is some piece of crap which doesn't allow me to set exitcode w/o
    // using System.exit. Maybe someday Java will be a fully functional
    // programming language, but I wouldn't bet my money
    //System.exit(1);
    throw new Utils.Shutdown("FUCK YOU JAVA");
  }
 
  /*--------------------------------------------------------------------------*/
 
  public void bailout(String error)
    throws Utils.Shutdown
  {
    err.println("Error: " + error);
    shutdown();
 
    // Java is some piece of crap which doesn't allow me to set exitcode w/o
    // using System.exit. Maybe someday Java will be a fully functional
    // programming language, but I wouldn't bet my money
    //System.exit(2);
    throw new Utils.Shutdown("FUCK YOU JAVA");
  }
 
  /*--------------------------------------------------------------------------*/
 
  public void parseArgs(String[] args)
  {
    if (args.length != 5)
      usage();
 
    sharedFilesDir = args[0];
    File sharedir = new File(sharedFilesDir);
    if (!sharedir.isDirectory())
      bailout("sharedFilesDir '" + sharedFilesDir + "' is not a directory");
    if (!sharedir.canRead())
      bailout("sharedFilesDir '" + sharedFilesDir + "' is not readable");
 
    try
    {
      tcpPort = Integer.parseInt(args[1]);
      if (tcpPort <= 0 || tcpPort > 65536)
        bailout("tcpPort must be a valid port number (1 - 65535)");
    }
    catch(NumberFormatException e)
    {
      bailout("tcpPort must be numeric");
    }
 
    proxyHost = args[2];
    if (proxyHost.length() == 0)
      bailout("proxyHost is empty");
 
    try
    {
      proxyUDPPort = Integer.parseInt(args[3]);
      if (proxyUDPPort <= 0 || proxyUDPPort > 65536)
        bailout("proxyUDPPort must be a valid port number (1 - 65535)");
    }
    catch(NumberFormatException e)
    {
      bailout("proxyUDPPort must be numeric");
    }
 
    try
    {
      alivePeriod = Integer.parseInt(args[4]);
      if (alivePeriod <= 0)
        bailout("alivePeriod must be positive");
    }
    catch(NumberFormatException e)
    {
      bailout("alivePeriod must be numeric");
    }
  }
 
  /*--------------------------------------------------------------------------*/
 
  public void shutdown()
  {
    try
    {
      if (scheduler != null)
      {
        scheduler.shutdownNow();
        scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
      }
    }
    catch(InterruptedException e)
    {}
 
    if (dsock != null)
      dsock.close();
 
    try
    {
      if (sschannel != null)
        sschannel.close();
    }
    catch(IOException e)
    {}
 
    try
    {
      if (tTCPSocketReader != null)
        tTCPSocketReader.join();
    }
    catch(InterruptedException e)
    {}
 
    try
    {
      if (tInteractive != null)
      {
        tInteractive.interrupt();
        tInteractive.join();
      }
    }
    catch(InterruptedException e)
    {}
 
    try
    {
      if (stdin != null)
        stdin.close();
    }
    catch(IOException e)
    {}
  }
 
  /*--------------------------------------------------------------------------*/
 
  public void run(String[] args)
  {
    parseArgs(args);
 
    synchronized(mainLock)
    {
      try
      {
        dsock = new DatagramSocket();
        InetAddress proxyaddr = InetAddress.getByName(proxyHost);
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(tcpPort);
        DatagramPacket dpacket = new DatagramPacket(buffer.array(),
            buffer.array().length, proxyaddr, proxyUDPPort);
 
        scheduler = Executors.newScheduledThreadPool(1);
        ScheduledFuture<?> pingTimer = scheduler.scheduleAtFixedRate(
            new PingTask(dsock, dpacket, mainLock),
            0, alivePeriod, TimeUnit.MILLISECONDS);
      }
      catch(SocketException e)
      {
        bailout("Unable to create UDP Socket: " + e.getMessage());
      }
      catch(UnknownHostException e)
      {
        bailout("Unable to resolve hostname: " + e.getMessage());
      }
 
      try
      {
        sschannel = ServerSocketChannel.open();
        sschannel.socket().bind(new InetSocketAddress(tcpPort));
        tTCPSocketReader = new Thread(new TCPSocketReader(sschannel,
              sharedFilesDir, mainLock));
        tTCPSocketReader.start();
        out.println("Listening on tcp:/" + sschannel.socket().getLocalSocketAddress());
      }
      catch(IOException e)
      {
        bailout("Unable to create TCP Socket: " + e.getMessage());
      }
 
      try
      {
        InputStream stdin = java.nio.channels.Channels.newInputStream(
            new FileInputStream(FileDescriptor.in).getChannel());
        tInteractive = new Thread(new Interactive(stdin, mainLock));
        tInteractive.start();
      }
      catch(NoSuchMethodException e)
      {
        bailout("Unable to setup interactive command handler");
      }
 
      out.println("Fileserver startup successful!");
      try
      {
        mainLock.wait();
      }
      catch(InterruptedException e)
      {
        /* if we get interrupted -> ignore */
      }
 
      try
      {
        /* let the threads shutdown */
        Thread.sleep(100);
      }
      catch(InterruptedException e)
      {}
    }
 
    if (tTCPSocketReader != null && !tTCPSocketReader.isAlive())
      bailout("Listening TCP socket closed unexpected. Terminating...");
 
    shutdown();
  }
 
  /*--------------------------------------------------------------------------*/
 
  /**
   * Program entry point: creates a Fileserver and runs it with the given
   * arguments. Utils.Shutdown is the signal used by usage() and bailout()
   * to abort, so it is caught here and needs no further handling.
   *
   * @param args the raw command line arguments
   */
  public static void main(String[] args)
  {
    try
    {
      new Fileserver().run(args);
    }
    catch(Utils.Shutdown ignored)
    {}
  }
}