#!/usr/bin/perl -w
#
# This is a DPM database check tool.
# Author: Andrey Kiryanov <kiryanov@cern.ch>
# Version: 2.6
#
use strict;
use Getopt::Std;
use DBI;
# Disable STDOUT buffering so progress dots and "Fix?" prompts appear at once.
$| = 1;
# Script-wide state shared by all stages below; the stage counter starts at 1.
my $stage = 1;
my (%opts, $contact, $user, $pass, $host, $db, $dbh_dpm, $dbh_cns, $sth, $ref);
my (%fss, %fsc, %spaces, %sizes, $init, $run);
# Parse the command line. The usage text is printed both for -h and for an
# unrecognised option, but only the latter is reported as a failure through
# the exit status.
my $getopts_ok = getopts("hrdyns:c:", \%opts);
if(!$getopts_ok or $opts{h}) {
	print STDERR <<EOF;
Usage: dpm-dbck [-h] [-d] [-r] [-y|-n] [-s <n>] [-c <config dir>] [<filesystem>[ ...]]

       Please stop all DPM daemons before processing.

       -h		Show this help.
       -y		Answer 'yes' to all questions (auto fix).
       -n		Answer 'no' to all questions (dry run).
       -s		Number of parallel filesystem traverse streams.
       -c		Specify a directory with DPMCONFIG and NSCONFIG files.
       -d		Remove duplicate replicas of the same file.
       -r		Use rfio (deprecated) instead of sftp for disk node access.

       You can specify a list of filesystems to check. By default all filesystems are checked.
EOF
	exit($getopts_ok ? 0 : 1);
}
die "Options -y and -n are mutually exclusive.\n" if $opts{y} and $opts{n};
# Defaults: configuration directory and number of traverse streams. A -s value
# that is not a plain non-negative integer is silently replaced by the default.
$opts{c} = '/usr/etc' unless defined $opts{c};
$opts{s} = 10 unless defined $opts{s} and $opts{s} =~ /^\d+$/;
# Remaining arguments name the filesystems to check; none means "check all".
$fsc{$_} = 1 foreach @ARGV;
# Hourly timer: run a trivial query on both database handles and re-arm.
# Presumably this keeps idle connections from being dropped while a long
# filesystem traversal is in progress.
$SIG{ALRM} = sub {
	$dbh_cns->do("select 1");
	$dbh_dpm->do("select 1");
	alarm 3600;
};
# Ask whether the problem that was just reported should be repaired.
#
# Prints "Fix? " and then either answers on the user's behalf (-y answers
# yes, -n answers no) or reads one line from STDIN. Only a reply starting
# with 'y' or 'Y' counts as consent; anything else, including an empty line
# or end of input, means "no".
#
# Returns 1 when the fix should be applied, 0 otherwise.
sub fix() {
	print "Fix? ";
	if($opts{y}) {
		print "yes\n";
		return 1;
	}
	if($opts{n}) {
		print "no\n";
		return 0;
	}
	print "[no] ";
	my $resp = <STDIN>;
	# End of input (closed terminal, exhausted pipe) yields undef; treat it as
	# the default answer instead of matching against an undefined value.
	return 0 unless defined $resp;
	return $resp =~ /^[yY]/ ? 1 : 0;
}
# Recursively list a directory tree on a disk node.
#
# Argument: the root to list, in the form handed to sftp/rfdir (the caller
# passes a "server:fs" key).
#
# Up to $opts{s} listing commands run at the same time (sftp by default,
# rfdir with -r). Their output is read one line per slot in round-robin
# order; sub-directories found on the way are queued and listed as slots
# become free. A "." is printed for every directory whose listing is started.
#
# Returns a reference to a hash mapping "<directory>/<name>" of every regular
# file to the size printed in its listing line.
#
# Dies if a listing command cannot be started or if closing its pipe fails
# (which includes a non-zero exit status).
sub traverse($) {
	my (@hlist, @plist, %flist);
	my @dirlist = $_[0];
	# At least one slot is needed to make progress, whatever -s says.
	my $streams = $opts{s} > 0 ? $opts{s} : 1;
	my $i = -1;
	# The doubled braces turn the do-while body into a bare block so that
	# "next" jumps to the loop condition.
	do {{
		$i++;
		# Slots are numbered 0 .. $streams-1, i.e. exactly $streams of them.
		$i = 0 if $i >= $streams;
		unless(exists $hlist[$i]) {
			# Free slot: start listing the next queued directory, if any.
			$plist[$i] = shift @dirlist or next;
			# Single-quote the path for the shell so that spaces and shell
			# metacharacters in directory names are passed through literally.
			(my $qdir = $plist[$i]) =~ s/'/'\\''/g;
			unless($opts{r}) {
				open $hlist[$i], '-|', "echo ls -lf|sftp -oStrictHostKeyChecking=no -b- '${qdir}' 2>/dev/null" or die "Cannot talk to sftp: $!\n";
			} else {
				open $hlist[$i], '-|', "rfdir '${qdir}' 2>/dev/null" or die "Cannot talk to rfdir: $!\n";
			}
			print ".";
			next;
		}
		my $line = readline $hlist[$i];
		if(defined $line) {
			chomp $line;
			# Expected shape of a long listing line: type character, nine
			# permission characters, three fields, the size, three more
			# fields, then the name. Anything else is skipped.
			next unless $line =~ /^(.).{9}\s+(\S+\s+){3}(\d+)(\s+\S+){3}\s+(.+)$/;
			next if $5 eq '.' or $5 eq '..';
			$flist{"$plist[$i]/$5"} = $3 if $1 eq '-';
			push @dirlist, "$plist[$i]/$5" if $1 eq 'd';
		} else {
			# Listing finished: reap the command and free the slot.
			die "Directory listing failed with exit code " . ($? >> 8) . "\n" unless close $hlist[$i];
			delete $hlist[$i];
			# Revisit the same slot next so a queued directory can reuse it.
			$i--;
		}
	}} while $#hlist >= 0 or $#dirlist >= 0;
	return \%flist;
}
# Read the first line of a configuration file and split its contact string,
# "user/password@host[/database]", into (user, password, host, database).
#
#   $file        path of the configuration file
#   $what        "DPM" or "NS"; only selects the wording of the error messages
#   $default_db  database name used when the contact string names none
#
# Dies if the file cannot be opened or the line does not have that shape.
sub read_contact {
	my ($file, $what, $default_db) = @_;
	# Three-argument open: the path is never interpreted as a mode or a pipe.
	open my $fh, '<', $file or die "Cannot read $what config file.\n";
	my $line = <$fh>;
	close $fh;
	# An empty file yields undef; let it fail the pattern below with the
	# intended message instead of triggering "uninitialized" warnings.
	$line = '' unless defined $line;
	chomp $line;
	$line =~ /^([\w-]+)\/([^@]+)@([\w.-]+)(\/(\w+))?$/ or die "Cannot parse $what DB contact string\n";
	return ($1, $2, $3, $5 ? $5 : $default_db);
}
# Connect to the DPM database, then to the name server database. Both handles
# raise exceptions on error (RaiseError) rather than printing them.
($user, $pass, $host, $db) = read_contact($opts{c} . '/DPMCONFIG', 'DPM', 'dpm_db');
$dbh_dpm = DBI->connect("DBI:mysql:database=$db;host=$host", $user, $pass, {RaiseError => 1, PrintError => 0});
($user, $pass, $host, $db) = read_contact($opts{c} . '/NSCONFIG', 'NS', 'cns_db');
$dbh_cns = DBI->connect("DBI:mysql:database=$db;host=$host", $user, $pass, {RaiseError => 1, PrintError => 0});
# Start the hourly timer now that both database handles exist.
alarm 3600;
# Identify the init system from the command name of PID 1.
if(open my $comm, '<', "/proc/1/comm") {
	$init = <$comm>;
	close $comm;
}
# If /proc/1/comm is unreadable, fall through to the "unknown init" branch
# without "uninitialized value" warnings.
$init = '' unless defined $init;
chomp $init;
if($init eq "init") {
	# Either lock file counts. "||" is required here: with the low-precedence
	# "or" the assignment would bind first and the second test would be ignored.
	$run = (-e "/var/lock/subsys/dpm") || (-e "/var/lock/subsys/dpnsdaemon");
} elsif($init eq "systemd") {
	# Exit status 0 means at least one of the two units is active.
	`systemctl -q is-active dpm 2>/dev/null || systemctl -q is-active dpnsdaemon 2>/dev/null`;
	$run = $? == 0;
} else {
	# Unknown init system: cannot tell, so warn to be on the safe side.
	$run = 1;
}
if($run) {
	print STDERR "Warning: it is STRONGLY advised to stop all DPM daemons before processing.\nPress Ctrl-C NOW to cancel.\n";
	sleep 7;
}
print "*** Stage $stage: Checking virtual namespace continuity.\n";
# Entries whose parent_fileid refers to no existing fileid. parent_fileid 0 is
# excluded; the fix below treats the entry with parent_fileid 0 as the root.
$sth = $dbh_cns->prepare("select rowid from Cns_file_metadata where parent_fileid!=0 and parent_fileid not in (select fileid from Cns_file_metadata)");
$sth->execute();
$ref = $sth->fetchall_arrayref;
if(@$ref) {
	print "There are " . scalar(@$ref) . " lost entries\n";
	if(fix()) {
		# Re-attach every lost entry directly under the root entry.
		$sth = $dbh_cns->prepare("select fileid from Cns_file_metadata where parent_fileid=0");
		$sth->execute();
		# List context on purpose: what fetchrow_array returns in scalar
		# context is left to the driver.
		my ($rootid) = $sth->fetchrow_array;
		# Without a root there is nothing to attach to; say so instead of
		# sending a malformed update statement to the server.
		die "Cannot find the namespace root entry (parent_fileid=0).\n" unless defined $rootid;
		$sth = $dbh_cns->prepare("update Cns_file_metadata set parent_fileid=? where rowid=?");
		$sth->execute($rootid, $$_[0]) foreach @$ref;
	}
}
$stage++;
print "*** Stage $stage: Checking filesystems.\n";
# Every filesystem configured in DPM, keyed as "server:fs".
$sth = $dbh_dpm->prepare("select distinct server,fs from dpm_fs");
$sth->execute();
$ref = $sth->fetchall_arrayref;
$fss{"$$_[0]:$$_[1]"} = 1 foreach @$ref;
# Replicas recorded on a host:fs pair that is not configured. The names are
# quoted by the driver; with no filesystems at all the list degenerates to
# (''), so every replica is reported.
my $known_fs_sql = join(",", map { $dbh_cns->quote($_) } (%fss ? keys %fss : ('')));
$sth = $dbh_cns->prepare("select sfn,rowid,status from Cns_file_replica where concat(host,':',fs) not in ($known_fs_sql)");
$sth->execute();
while(my $row = $sth->fetchrow_arrayref) {
	my ($sfn, $rowid, $status) = @$row;
	print "Nonexistent filesystem for " . ($status eq 'D' ? "deleted " : "") . "replica $sfn\n";
	if(fix()) {
		$dbh_cns->do("delete from Cns_file_replica where rowid=?", undef, $rowid);
	}
}
# Compare the database against what is actually on each filesystem.
foreach my $fs (sort keys %fss) {
	# A filesystem list on the command line restricts the check to those.
	if(keys %fsc and !$fsc{$fs}) {
		print "* Skipping check for $fs\n";
		next;
	}
	print "* Retrieving directory tree for $fs ";
	my $flist = traverse($fs);
	print "\n";
	$sth = $dbh_cns->prepare("select b.sfn,a.filesize,b.fileid,b.rowid,b.status from Cns_file_metadata as a,Cns_file_replica as b where a.fileid=b.fileid and concat(b.host,':',b.fs)=?");
	$sth->execute($fs);
	while(my $row = $sth->fetchrow_arrayref) {
		my ($sfn, $size, $fileid, $rowid, $status) = @$row;
		unless(exists $$flist{$sfn}) {
			# The database knows a replica that is not on disk.
			print($status eq 'D' ? "Stuck deleted replica entry for $sfn\n" : "Missing file $sfn\n");
			if(fix()) {
				$dbh_cns->do("delete from Cns_file_replica where rowid=?", undef, $rowid);
			}
			next;
		}
		# A replica with status 'P' and a stored size of 0 is exempt from
		# the size comparison.
		if($$flist{$sfn} != $size and not ($status eq 'P' and $size == 0)) {
			print "File size mismatch for $sfn: stored $size, actual $$flist{$sfn}\n";
			if(fix()) {
				$dbh_cns->do("update Cns_file_metadata set filesize=? where fileid=?", undef, $$flist{$sfn}, $fileid);
			}
		}
		# Accounted for; whatever remains in the list afterwards is stray.
		delete $$flist{$sfn};
	}
	# Files on disk that no replica entry refers to.
	foreach my $stray (sort keys %$flist) {
		print "Stray file $stray\n";
		next unless fix();
		if($opts{r}) {
			(my $qname = $stray) =~ s/'/'\\''/g;
			`rfrm '${qname}' 2>/dev/null`;
			next;
		}
		my ($node, $path) = $stray =~ /^(.*?):(.*)$/;
		unless(defined $path) {
			# Never fall back on captures left over from an earlier match.
			warn "Cannot split $stray into host and path, not removed\n";
			next;
		}
		# Single-quote both parts for the local shell.
		# NOTE(review): sftp still parses the batch line itself, so names
		# containing spaces or quote characters may not be removed - confirm.
		(my $qnode = $node) =~ s/'/'\\''/g;
		(my $qpath = $path) =~ s/'/'\\''/g;
		`echo rm '${qpath}'|sftp -oStrictHostKeyChecking=no -b- '${qnode}' 2>/dev/null`;
	}
}
$stage++;
print "*** Stage $stage: Checking replica counts.\n";
# File entries (filemode bit 32768 set) that are not marked deleted and have
# no replica rows at all.
$sth = $dbh_cns->prepare("select rowid from Cns_file_metadata where filemode&32768=32768 and status!='D' and fileid not in (select distinct fileid from Cns_file_replica)");
$sth->execute();
$ref = $sth->fetchall_arrayref;
if(@$ref) {
	print "There are " . scalar(@$ref) . " file entries with no replicas\n";
	if(fix()) {
		$sth = $dbh_cns->prepare("delete from Cns_file_metadata where rowid=?");
		$sth->execute($$_[0]) foreach @$ref;
	}
}
# The same check for entries that are marked deleted (status 'D').
$sth = $dbh_cns->prepare("select rowid from Cns_file_metadata where filemode&32768=32768 and status='D' and fileid not in (select distinct fileid from Cns_file_replica)");
$sth->execute();
$ref = $sth->fetchall_arrayref;
if(@$ref) {
	print "There are " . scalar(@$ref) . " stuck deleted file entries\n";
	if(fix()) {
		$sth = $dbh_cns->prepare("delete from Cns_file_metadata where rowid=?");
		$sth->execute($$_[0]) foreach @$ref;
	}
}
if($opts{d}) {
	# Files with more than one replica that is not marked deleted.
	$sth = $dbh_cns->prepare("select fileid from Cns_file_replica where status!='D' group by fileid having count(*)>1");
	$sth->execute();
	$ref = $sth->fetchall_arrayref;
	# List exactly the replicas that were counted above. Without the status
	# filter a replica marked deleted could sort first, be kept as the only
	# copy, and cause every live replica to be removed.
	my $replicas = $dbh_cns->prepare("select sfn,r_type,rowid from Cns_file_replica where fileid=? and status!='D' order by r_type");
	foreach my $dup (@$ref) {
		print "Duplicate replicas of the same file:\n";
		$replicas->execute($$dup[0]);
		my $rlist = $replicas->fetchall_arrayref;
		print "$$_[1]\t$$_[0]\n" foreach @$rlist;
		next unless fix();
		my $kept = 0;
		foreach my $rep (@$rlist) {
			my ($sfn, $rtype, $rowid) = @$rep;
			unless($kept) {
				# The first replica in r_type order ('P' sorts before 'S')
				# survives; make it primary if it was secondary.
				$dbh_cns->do("update Cns_file_replica set r_type='P' where rowid=?", undef, $rowid) if $rtype eq 'S';
				$kept = 1;
				next;
			}
			# Every further replica: drop the entry, then the file itself.
			$dbh_cns->do("delete from Cns_file_replica where rowid=?", undef, $rowid);
			if($opts{r}) {
				(my $qname = $sfn) =~ s/'/'\\''/g;
				`rfrm '${qname}' 2>/dev/null`;
				next;
			}
			my ($node, $path) = $sfn =~ /^(.*?):(.*)$/;
			unless(defined $path) {
				# Never fall back on captures left over from an earlier match.
				warn "Cannot split $sfn into host and path, file not removed\n";
				next;
			}
			# Single-quote both parts for the local shell.
			# NOTE(review): sftp still parses the batch line itself, so names
			# containing spaces or quote characters may not be removed - confirm.
			(my $qnode = $node) =~ s/'/'\\''/g;
			(my $qpath = $path) =~ s/'/'\\''/g;
			`echo rm '${qpath}'|sftp -oStrictHostKeyChecking=no -b- '${qnode}' 2>/dev/null`;
		}
	}
}
# Files whose only remaining (non-deleted) replica is of type 'S'. Each group
# holds exactly one row, so min() returns that row's own values while keeping
# the query valid when the server enforces ONLY_FULL_GROUP_BY.
$sth = $dbh_cns->prepare("select min(rowid) from Cns_file_replica where status!='D' group by fileid having count(*)=1 and min(r_type)='S'");
$sth->execute();
$ref = $sth->fetchall_arrayref;
if(@$ref) {
	print "There are " . scalar(@$ref) . " file entries with missing primary replicas\n";
	if(fix()) {
		$sth = $dbh_cns->prepare("update Cns_file_replica set r_type='P' where rowid=?");
		$sth->execute($$_[0]) foreach @$ref;
	}
}
$stage++;
print "*** Stage $stage: Checking reference link counts.\n";
# Directory entries (filemode bit 16384 set) whose stored nlink differs from
# the number of entries that name them as parent.
$sth = $dbh_cns->prepare("select fileid from Cns_file_metadata as b where b.filemode&16384=16384 and b.nlink!=(select count(*) from Cns_file_metadata as a where a.parent_fileid=b.fileid)");
$sth->execute();
$ref = $sth->fetchall_arrayref;
if(my $wrong = @$ref) {
	print "There are $wrong directory entries with wrong reference link counts\n";
	if(fix()) {
		# Recount the children of each affected directory and store the result.
		$sth = $dbh_cns->prepare("select count(*) from Cns_file_metadata where parent_fileid=?");
		foreach my $dir (@$ref) {
			my $fileid = $$dir[0];
			$sth->execute($fileid);
			my $nlink = $sth->fetchrow_array;
			$dbh_cns->do("update Cns_file_metadata set nlink=$nlink where fileid=$fileid");
		}
	}
}
$stage++;
print "*** Stage $stage: Checking space usage accounting.\n";
# Index the space reservations by s_token; each value holds
# [u_token, poolname, g_space, u_space, t_space].
$sth = $dbh_dpm->prepare("select s_token,u_token,poolname,g_space,u_space,t_space from dpm_space_reserv");
$sth->execute();
$ref = $sth->fetchall_arrayref;
foreach my $row (@$ref) {
	my ($s_token, @fields) = @$row;
	$spaces{$s_token} = [@fields];
}
# Sum of file sizes per setname, taken over all replicas that carry one.
$sth = $dbh_cns->prepare("select b.setname,sum(a.filesize) from Cns_file_metadata as a,Cns_file_replica as b where a.fileid=b.fileid and b.setname!='' group by b.setname");
$sth->execute();
$ref = $sth->fetchall_arrayref;
foreach my $row (@$ref) {
	$sizes{$$row[0]} = $$row[1];
}
foreach my $token (keys %spaces) {
	my ($u_token, undef, $g_space, $u_space, $t_space) = @{$spaces{$token}};
	# t_space is expected to equal g_space.
	unless($t_space == $g_space) {
		print "Total space mismatch for token $u_token: stored $t_space, actual $g_space\n";
		if(fix()) {
			$dbh_dpm->do("update dpm_space_reserv set t_space=g_space where s_token='$token'");
		}
	}
	# A token with no files at all uses no space.
	$sizes{$token} = 0 unless defined $sizes{$token};
	my $used = $sizes{$token};
	# Stored usage is g_space minus u_space; nothing to do when it agrees.
	next if $g_space - $u_space == $used;
	if($used > $g_space) {
		print "Calculated used space for token $u_token exceeds token size by ".($used - $g_space)."\n";
		if(fix()) {
			$dbh_dpm->do("update dpm_space_reserv set t_space=$used,g_space=$used,u_space=0 where s_token='$token'");
		}
	} else {
		print "Used space mismatch for token $u_token: stored ".($g_space - $u_space).", calculated $used\n";
		if(fix()) {
			$dbh_dpm->do("update dpm_space_reserv set u_space=g_space-$used where s_token='$token'");
		}
	}
}
$stage++;
print "*** Stage $stage: Checking request log.\n";
# 'get' and 'put' file requests are checked the same way: count the rows whose
# r_token has no match in dpm_req and, on confirmation, delete them.
foreach my $kind ('get', 'put') {
	my $orphans = "from dpm_${kind}_filereq where r_token not in (select r_token from dpm_req)";
	$sth = $dbh_dpm->prepare("select count(*) $orphans");
	$sth->execute();
	$ref = $sth->fetchrow_array;
	next unless $ref;
	print "There are $ref stray '$kind' entries\n";
	$dbh_dpm->do("delete $orphans") if fix();
}
print "*** Check complete.\n";
