#!/usr/bin/env ruby
#
# an arbitrary file synchronizer over tcp
# uses intermediate chunk checksums to optimize transfers
# "the poor man's rsync"
#
# Copyright 2007 Yoann Guillot
# Licensed under the terms of the WTFPL
#

require 'digest/md5'

# Block-level file synchronizer speaking a small binary protocol over a
# stream socket. The same class implements both ends:
#
# * server side: #main_loop / #handle_msg answer requests about the
#   "current" file, selected with a FILE command;
# * client side: #list and #sync drive a remote server.
#
# Every request starts with a 4-byte ASCII opcode. Integers are packed
# with 'N' (32-bit unsigned big-endian), so sizes and offsets must fit in
# 32 bits.
#
#   FILE <256-byte NUL-padded name>   select the file to work on
#   LIST                              -> count, then (length, name) pairs
#   LEN?                              -> size of the current file
#   LEN= <len>                        resize the current file
#   MD5? <off> <len>                  -> 32 hex chars: MD5 of the range
#   BUF? <off> <len>                  -> raw bytes of the range
#   BUF= <off> <len> <bytes>          overwrite the range
class BinSync
  attr_reader :sock
  # when false, the server refuses every request that would modify files
  attr_accessor :allow_push

  def initialize(sock)
    @sock = sock
    @allow_push = true
  end

  # prints s when $VERBOSE is set
  def verb(s)
    puts s if $VERBOSE
  end

  # Reads one 'N'-packed integer from the socket.
  # Raises EOFError if the peer closed the connection mid-integer.
  def read_int
    buf = @sock.read(4)
    raise EOFError, 'connection closed while reading an integer' if buf.nil? || buf.length < 4
    buf.unpack('N').first
  end

  def write_int(i)
    @sock.write [i].pack('N')
  end

  # Reads len bytes from fd, yielding them in chunks of at most 8196 bytes.
  # On Errno::EIO (e.g. an unreadable disk sector) it retries with a
  # 512-byte read; if that fails too, those 512 bytes are skipped and
  # replaced with NULs. Raises EOFError if fd hits end-of-stream before
  # len bytes were delivered.
  def read_chunks(fd, len)
    while len >= 8196
      begin
        buf = fd.read(8196)
      rescue Errno::EIO
        puts $!
        begin
          buf = fd.read(512)
        rescue
          fd.pos += 512
          buf = 0.chr*512
        end
      end
      raise EOFError, "unexpected end of stream (#{len} bytes missing)" if buf.nil?
      len -= buf.length
      yield buf
    end
    if len > 0
      buf = (fd.read(len) rescue (0.chr*len))
      raise EOFError, "unexpected end of stream (#{len} bytes missing)" if buf.nil?
      yield buf
    end
  end

  # hex MD5 of len bytes of file, starting at offset off
  def file_md5(file, off, len)
    md5 = Digest::MD5.new
    File.open(file, 'rb') { |fd|
      fd.pos = off
      read_chunks(fd, len) { |c| md5 << c }
    }
    md5.hexdigest
  end

  # Server side: serves requests until the peer disconnects or a request
  # fails, then shuts down and closes the socket. Ordinary errors
  # (StandardError) only end this session; Interrupt/SystemExit propagate.
  def main_loop
    nil while handle_msg
  rescue StandardError
    verb "E: #$!"
  ensure
    @sock.shutdown rescue nil
    @sock.close
  end

  # Server side: reads and handles one request.
  # Returns false when the peer closed the connection, true otherwise.
  # Raises on protocol errors and on write requests when allow_push is false.
  def handle_msg
    case cmd = @sock.read(4)
    when nil
      verb "closed"
      return false
    when 'FILE'
      name = @sock.read(256)
      raise EOFError, 'connection closed while reading a file name' if name.nil? || name.length < 256
      name = name[0, name.index("\0")] if name.index("\0")
      # keep the client inside the served directory: neutralize parent
      # references and strip leading slashes (absolute paths)
      @curfile = name.gsub('..', '__').sub(%r{\A/+}, '')
      if not File.exist?(@curfile)
        raise 'readonly!' if not @allow_push
        File.open(@curfile, 'w') {}
      end
      verb "syncing #@curfile (#{File.size(@curfile).to_s(16)})"
    when 'LIST'
      l = Dir['*'].sort
      write_int l.length
      l.each { |f|
        # byte length, so that non-ASCII names are framed correctly
        write_int f.bytesize
        @sock.write f
      }
    when 'LEN?'
      write_int File.size(@curfile)
    when 'LEN='
      len = read_int
      verb "resize to #{len}"
      raise 'readonly!' if not @allow_push
      File.truncate(@curfile, len)
      puts 'rsz done' if $DEBUG
    when 'MD5?'
      off = read_int
      len = read_int
      puts "check #{off.to_s 16} #{len}" if $DEBUG
      @sock.write file_md5(@curfile, off, len)
    when 'BUF?'
      off = read_int
      len = read_int
      verb "sending #{off.to_s 16}, #{len}"
      File.open(@curfile, 'rb') { |fd|
        fd.pos = off
        read_chunks(fd, len) { |c| @sock.write c }
      }
    when 'BUF='
      raise 'readonly!' if not @allow_push
      off = read_int
      len = read_int
      verb "rewriting #{off.to_s 16}, #{len}"
      File.open(@curfile, 'rb+') { |fd|
        fd.pos = off
        read_chunks(@sock, len) { |c| fd.write c }
      }
    else
      # silently skipping would desynchronize the stream
      raise "unknown command #{cmd.inspect}"
    end
    true
  end

  # Client side: requests the server's file list (names in its current
  # directory, as returned by Dir['*'] there).
  def list
    @sock.write 'LIST'
    nfile = read_int
    (1..nfile).map {
      len = read_int
      @sock.read(len)
    }
  end

  # Client side: synchronizes the local file with the server's copy.
  #
  # opts:
  #   :mode      - :get (server -> local, default) or :put (local -> server)
  #   :rname     - server-side file name (defaults to file); at most 256 bytes
  #   :blocksize - size of the compared chunks (default 64KiB), must be >= 1
  #   :pipelen   - number of MD5 requests in flight (default 16), must be >= 1
  #
  # Only the chunks whose MD5 differs are transferred.
  def sync(file, opts={})
    blocksize = opts.fetch(:blocksize, 64*1024)
    mode = opts.fetch(:mode, :get)
    rname = opts.fetch(:rname, file)
    pipelen = opts.fetch(:pipelen, 16)
    raise "bad mode #{mode.inspect}, :put or :get ?" if mode != :get && mode != :put
    # a non-positive value would never advance the offset: infinite loop
    raise ArgumentError, "invalid blocksize #{blocksize.inspect}" if blocksize < 1
    raise ArgumentError, "invalid pipeline length #{pipelen.inspect}" if pipelen < 1
    # the name travels in a fixed 256-byte field; a longer one would
    # desynchronize the protocol
    raise ArgumentError, "remote file name too long (max 256 bytes): #{rname}" if rname.bytesize > 256

    verb "syncing #{file}"
    starttime = Time.now

    @sock.write 'FILE'
    @sock.write [rname].pack('a256')
    File.open(file, 'w') {} if mode == :get && !File.exist?(file)

    @sock.write 'LEN?'
    rlen = read_int
    len = File.size(file)
    if len != rlen
      verb "size changed: mine #{len}, his #{rlen}"
      if mode == :get
        # TODO use mylen to avoid useless MD5?
        File.truncate(file, rlen)
        len = rlen
      else
        @sock.write 'LEN='
        write_int len
      end
    end

    @speed ||= 0
    speed_width = 16
    badchk = 0        # differing full-size chunks
    badlast_len = 0   # length of the trailing short chunk, when it differed
    chk = 0
    off = 0
    while off < len
      # pipeline the MD5 requests to avoid paying one round-trip per chunk
      bt = []
      while off < len && bt.length < pipelen
        clen = len - off
        clen = blocksize if clen > blocksize
        md5 = file_md5(file, off, clen)
        puts "check #{off.to_s 16} #{clen}" if $DEBUG
        @sock.write 'MD5?'
        write_int off
        write_int clen
        bt << [off, clen, md5]
        off += clen
      end

      # read all answers
      bt.each { |req|
        rmd5 = @sock.read(32)
        raise EOFError, 'connection closed while waiting for checksums' if rmd5.nil? || rmd5.length < 32
        req << rmd5
      }

      todo = []
      bt.each do |coff, bclen, md5, rmd5|
        chk += 1
        next if md5 == rmd5
        # only the last chunk of the file can be shorter than blocksize
        if bclen != blocksize
          badlast_len = bclen
        else
          badchk += 1
        end
        verb "difference at #{coff.to_s 16}, #{bclen}: mine #{md5}, his #{rmd5}"
        if mode == :get
          # queue the data request now; the answers are read below
          @sock.write 'BUF?'
          write_int coff
          write_int bclen
        end
        todo << [coff, bclen]
      end

      todo.each do |coff, bclen|
        pre_t = Time.now
        if mode == :get
          File.open(file, 'rb+') { |fd|
            fd.pos = coff
            read_chunks(@sock, bclen) { |c| fd.write c }
          }
        else
          @sock.write 'BUF='
          write_int coff
          write_int bclen
          File.open(file, 'rb') { |fd|
            fd.pos = coff
            read_chunks(fd, bclen) { |c| @sock.write c }
          }
        end
        dt = Time.now - pre_t
        dt = 1e-6 if dt <= 0   # coarse clocks: avoid an infinite speed
        @speed = bclen.to_f / dt * 3/4 if @speed == 0  # 3/4 just to see it rise
        @speed = ((speed_width-1)*@speed + bclen.to_f / dt)/speed_width
        eta = ((len-coff)/@speed).to_i
        $stderr.print "#{'%.2f%%' % (coff*100.0/len)} #{'%.2fko/s' % (@speed/1024)} ETA #{eta/3600}h#{(eta/60)%60} \r"
      end
    end

    $stderr.print ' '*40+"\r"
    elapsed = Time.now - starttime
    s = (badchk * blocksize + badlast_len) / 1024.0 / [elapsed, 1e-6].max
    badchk += 1 if badlast_len > 0
    verb "synced #{badchk}/#{chk} segs in #{'%.02f' % elapsed}s (#{(s > 768) ? ('%.2fM' % (s/1024.0)) : ('%.2fk' % s)}o/s)"
  end
end

if __FILE__ == $0
  require 'optparse'
  require 'socket'

  opts = {}
  s = nil
  $VERBOSE = true
  OptionParser.new { |opt|
    opt.on('-r', '--ro', 'readonly server') { opts[:readonly] = true }
    opt.on('-l h:p', '--listen host:port', 'listens on host:port') { |a| s = TCPServer.new(*a.split(':')) }
    opt.on('-c h:p', '--connect host:port', 'connects to host:port') { |a| s = TCPSocket.open(*a.split(':')) }
    opt.on('-p', '--put', 'put files to server instead of get') { opts[:mode] = :put }
    opt.on('-P len', '--pipe len', 'number of requests to queue (avoid rtt)') { |a| opts[:pipelen] = a.to_i }
    opt.on('-b bs', '--blocksize blocksize', 'set transfert block size (client only)') { |a|
      # optional k / m suffix (KiB / MiB); [-1, 1] is a String on every Ruby
      opts[:blocksize] =
        case a.downcase[-1, 1]
        when 'k' then 1024 * a.to_i
        when 'm' then 1024*1024 * a.to_i
        else a.to_i
        end
    }
    opt.on('-v', '--verbose', 'verbose mode') { $VERBOSE = true }
    opt.on('-q', '--quiet', 'non verbose mode') { $VERBOSE = false }
  }.parse!(ARGV)

  abort 'specify either --listen host:port or --connect host:port' if not s

  if s.kind_of? TCPServer
    # server: optional argument = directory to serve
    if dir = ARGV.shift
      Dir.chdir(dir)
    end
    puts "waiting connexions on port #{s.addr[1]}" if $VERBOSE
    loop do
      a = s.accept
      puts "incoming connexion from #{a.peeraddr[3].inspect}" if $VERBOSE
      bs = BinSync.new(a)
      bs.allow_push = false if opts[:readonly]
      bs.main_loop
    end
  else
    # client: sync the given files/directories, or every top-level file
    bs = BinSync.new(s)
    bsend = proc { |f|
      if File.directory? f
        puts "walking #{f}" if $VERBOSE
        Dir["#{f}/*"].each { |sf| bsend[sf] }
      else
        puts f
        bs.sync(f, opts)
      end
    }
    if not ARGV.empty?
      ARGV
    elsif opts[:mode] == :put
      Dir['*']
    else
      # only accept plain names from the server: a hostile peer must not
      # make us write outside the current directory
      bs.list.reject { |f| f.empty? || f.include?('/') || f == '.' || f == '..' }
    end.each { |f| bsend[f] }
  end
end