module ReplicationSystem.Server.SyncServerAcceptor;
export *;
import * from ReplicationSystem.Environment.DataTypes;
import * from ReplicationSystem.Client.Interfaces;
import * from ReplicationSystem.Server.Interfaces;
import * from ReplicationSystem.Server.ConnectionThread;
import * from ReplicationSystem.Schedulers;

[Plain]
class SyncServerAcceptorImpl([Final] [Near] SyncServer server) implements SyncServerAcceptor {

  Int threads = 0; // for debugging

  ConnectionThread getConnection(ClientJob job) {
    ConnectionThread thread = null;
    // Shutdown flag
    Bool shutdown = this.server.isShutdownRequested();
    if (~ shutdown) {
      // allocate resources to connection thread
      thread = new ConnectionThreadImpl(job,server,threads);
      threads = threads + 1; // for debugging
    }
    return thread;
  }

  Unit finish(ConnectionThread thread) { }
}

module ReplicationSystem.Server.BaseReplicationItem;
export *;
import * from ReplicationSystem.Environment.Interfaces;
import * from ReplicationSystem.Environment.ReplicationSnapshot.Interfaces;
import * from ReplicationSystem.Environment.DataTypes;
import * from ReplicationSystem.Environment.Files;

interface InternalItem extends BasicReplicationItem {
  [Atomic] Directory getState();
  [Atomic] Unit setState(Directory dir);
}

class BasicReplicationItemImpl(FileId qualified, ServerDataBase db) implements InternalItem {

  Directory snapshot = updateDirWithDir(rootDir(),emptyDir(qualified));

  FileEntry getContents() { return dirContent(snapshot); }

  FileId getAbsoluteDir() { return qualified; }

  [Atomic] Unit cleanup() { this.snapshot = updateDirWithDir(rootDir(),emptyDir(qualified)); }

  [Atomic] Directory getState() { return snapshot; }

  [Atomic] Unit setState(Directory dir) { this.snapshot = dir; }
}

// For testing
class ReplicationFilePattern(FileId qualified, String pattern, ServerDataBase db) implements ServerReplicationItem {

  InternalItem internal;

  { internal = new local BasicReplicationItemImpl(qualified,db); }

  FileEntry getContents() { return internal.getContents(); }

  Command getCommand() { return ReceivePatternFile; }

  ReplicationItemType getType() { return ReplicationFilePattern; }

  FileId getAbsoluteDir() { return internal.getAbsoluteDir(); }

  [Atomic] Unit refresh() {
    // We know snapshot cannot be accessed during the execution of this method
    // by another task.
    Directory snapshot = internal.getState();
    // get all file names for the newest check points
    Maybe<FileContent> ffs = db.listFilesAt(qualified);
    if (ffs != Nothing) {
      FileContent content = fromJust(ffs);
      assert isDirectory(content);
      FileEntry es = entries(content);
      Set<Pair<FileId,FileContent>> entryset = entrySet(es);
      while (hasNext(entryset)) {
        Pair<Set<Pair<FileId,FileContent>>,Pair<FileId,FileContent>> nt = next(entryset);
        Pair<FileId,FileContent> entry = qualifyEntry(snd(nt),qualified);
        FileId fid = fst(entry);
        if (isAncester(qualified,fid) && filter(pattern,fid)) {
          snapshot = updateDirWithContent(snapshot,fid,snd(entry));
        }
        entryset = fst(nt);
      }
    }
    internal.setState(snapshot);
  }

  [Atomic] Unit cleanup() { internal.cleanup(); }
}

// For testing
class ReplicationLogItem(FileId qualified, ServerDataBase db) implements ServerReplicationItem {

  Directory snapshot = rootDir();
  InternalItem internal;

  { internal = new local BasicReplicationItemImpl(qualified,db); }

  FileEntry getContents() { return internal.getContents(); }

  Command getCommand() { return AppendSearchFile; }

  ReplicationItemType getType() { return LogReplicationItem; }

  FileId getAbsoluteDir() { return internal.getAbsoluteDir(); }

  [Atomic] Unit refresh() {
    // We know snapshot cannot be accessed during the execution of this method
    // by another task.
    Directory snapshot = internal.getState();
    // get content for this replication item
    Maybe<FileContent>
ffs = db.listFilesAt(qualified); if (ffs != Nothing) { FileContent content = fromJust(ffs); assert isDirectory(content); //it must be a directory snapshot = updateDirWithDir(snapshot,dir(qualified,entries(content))); } internal.setState(snapshot); } [Atomic] Unit cleanup() { internal.cleanup(); } }module ReplicationSystem.Client.Interfaces; export *; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.Interfaces; import * from ReplicationSystem.Interfaces; import * from ReplicationSystem.Server.Interfaces; import * from Replication.Network; interface Client extends Node { ClientDataBase getClientDataBase(); } interface ClientConnector extends Client { Unit setAcceptor(ServerAcceptor acceptor); Unit setNetwork(Network network); } // CSP model SyncClient(n) // Java class com.fredhopper.application.SyncClient interface SyncClient extends Client, ClientStateMachine { [Far] ServerAcceptor getAcceptor(); } interface InternalClient extends SyncClient { /* * Existing java implementation does not have client id * the notion of an identifier for each client is required * since the ABS model should guarantee data * consistency as well as deadlock freedom */ ClientId getId(); /* * The number of jobs spawned off so far */ [Atomic] Int jobCount(); /* * Notify a new local job can be started. */ Unit nextJob(); /* * Schedule jobs */ Unit scheduleJob(JobType jb, Schedule schedule); /* * Notify client this job is finished. */ Unit finishJob(ClientJob job); } //interfaces for concurrent clients interface ConcurrentInternalClient extends InternalClient, ConcurrentSyncClient { } //interfaces for concurrent clients interface ConcurrentSyncClient extends SyncClient, ConcurrentClientStateMachine { //notify the next job for this schedule can proceed Unit nextJobFor(Schedule s); } //interfaces for concurrent clients interface ConcurrentClientStateMachine extends ClientStateMachine { Unit waitToReplicateFromBoot(Schedules ss); Unit waitToReplicateM(Schedule s); Unit replicateM(Schedule s); Unit endM(Schedule s); } interface ClientStateMachine { /* view job { * call_ClientStateMachine.start s, * call_ClientStateMachine.boot b, * call_ClientStateMachine.waitToBoot wb, * call_ClientStateMachine.replicate r, * call_ClientStateMachine.waitToReplicate wr, * call_ClientStateMachine.end e * } */ /* * START ::= s WB * WB ::= wb B * | wb E * B ::= b WB * | b WR * | b E * WR ::= wr R * | wr WB * | wr E * R ::= r WR * | wr WB * | wr E * E ::= e */ Unit waitToBoot(); Unit boot(); Unit start(); Unit waitToReplicate(); Unit replicate(); Unit end(); } // Java class com.fredhopper.replication.client.ClientReplicationJob // Java class com.fredhopper.replication.client.ClientBootJob interface ClientJob extends Worker { /* view job { * resolve_ClientJob.registerReplicationItems r, * call_ClientJob.processFile p, * call_ClientJob.processContent c * } */ /* * Bool START.retVal; Int START.callNums; Int T.callNums; * * START ::= r T START.retVal = r.return; START.callNums = T.callNums; * T ::= p c T' T.callNums = 2 + T'.callNums; * | p T' T.callNums = 1 + T'.callNums; * | /\ T.callNums = 0; */ //@ invariant not(retVal) ==> callNums == 0; Bool registerReplicationItems(TransactionId id); /* * CSP model ProcessCommand' * com.fredhopper.replication.client.ClientReplicationJob.receiveItemFragment(DataInputStream, int, ClientReplicationItem) */ Maybe
processFile(FileId id); Unit processContent(File file); Unit receiveSchedule(Schedules schedules); Unit executeJob(); }module ReplicationSystem.Client.ClientJob; export ClientJobImpl; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.StateMachine; import * from ReplicationSystem.Schedulers; import * from ReplicationSystem.Server.Interfaces; import * from ReplicationSystem.Client.Interfaces; import * from ReplicationSystem.Environment.Interfaces; // CSP model ClientReplicationJob(n) // Java class com.fredhopper.replication.client.ClientReplicationJob class ClientJobImpl( [Final] Int maxJobs, //default maximum number of client jobs allowed per client [Far] [Final] InternalClient client, [Final] JobType job, [Final] Schedule schedule, [Final] Int id) implements ClientJob { Command start = EmptyCommand; Command command = EmptyCommand; Schedules schedules = EmptySet; ClientId clientId = -1; TransactionId transactionId = -1; //debugging ConnectionThread thread = null; [Far] ClientDataBase db; ConnectionThread getConnectionThread() { Fut
<ServerAcceptor> fs = client!getAcceptor();
    ServerAcceptor acceptor = fs.get;
    // Acquire a connection
    Fut<ConnectionThread> ft = acceptor!getConnection(this);
    await ft?;
    ConnectionThread t = ft.get;
    return t;
  }

  Unit clientDB() {
    Fut<ClientDataBase> fd = client!getClientDataBase();
    this.db = fd.get;
  }

  // starts the next set of replication jobs
  Unit establishSchedule() {
    Fut<Int> jcf = client!jobCount();
    Int stats = jcf.get;
    if (maxJobs > 0 && stats >= maxJobs) {
      this.shutDownClient();
    } else {
      Schedules ss = schedules;
      while (hasNext(ss)) {
        Pair<Schedules,Schedule> nt = next(ss);
        ss = fst(nt);
        Schedule s = snd(nt);
        client!scheduleJob(Replication,s);
      }
    }
  }

  Unit executeJob() {
    Fut<ClientId> fut = client!getId();
    clientId = fut.get;
    // set data base
    this.clientDB();
    // Acquire a connection
    thread = this.getConnectionThread();
    if (thread != null) {
      // Connection successful!
      if (job == Boot) {
        this.becomeState(Booting);
        thread!command(ListSchedule);
        await schedules != EmptySet;
        // establish the next schedule triggers!
        this.establishSchedule();
      } else {
        this.becomeState(WorkOnReplicate);
        thread!command(SearchSchedule(schedname(schedule)));
        await schedules != EmptySet;
        // establish the next schedule triggers!
        this.establishSchedule();
        // wait for current job to start then end
        await start == StartSnapShot;
        await command == EndSnapShot;
      }
      // Switch to the proper state after finishing.
      // From all states we can go to the end state to shutdown.
      Fut<Bool> sd = client!isShutdownRequested();
      Bool shutDown = sd.get;
      if (~shutDown) {
        this.becomeState(WaitToReplicate);
      }
    } else {
      this.becomeState(End);
    }
    Fut<Unit> fj = client!finishJob(this);
    fj.get;
    // allow next job to proceed
    client!nextJob();
  }

  Unit becomeState(State state) {
    if (state == WaitToBoot) {
      Fut<Unit> unit = client!waitToBoot();
      unit.get;
    } else if (state == Booting) {
      Fut<Unit> unit = client!boot();
      unit.get;
    } else if (state == WorkOnReplicate) {
      Fut<Unit> unit = client!replicate();
      unit.get;
    } else if (state == WaitToReplicate) {
      Fut<Unit> unit = client!waitToReplicate();
      unit.get;
    } else if (state == End) {
      Fut<Unit> unit = client!end();
      unit.get;
    }
  }

  ClientId forClient() { return clientId; }

  Unit shutDownClient() {
    Fut<Bool> bf = client!isShutdownRequested();
    await bf?;
    Bool bool = bf.get;
    if (~bool) {
      Fut<Unit> unit = client!requestShutDown();
      await unit?;
      unit.get;
      this.becomeState(End);
    }
  }

  Bool registerReplicationItems(TransactionId id) {
    Fut<Bool> reg = db!prepareReplicationItem(id,schedule);
    Bool rg = reg.get;
    if (rg) { transactionId = id; }
    return rg;
  }

  Bool hasFile(FileId id) {
    Fut<Bool> he = db!hasFile(id);
    await he?;
    return he.get;
  }

  /*
   * CSP model Register
   * com.fredhopper.replication.client.ClientReplicationJob.receiveItemFragment(DataInputStream, int, ClientReplicationItem)
   */
  Maybe<FileSize> processFile(FileId id) {
    Maybe<FileSize> result = Nothing;
    Bool hasfile = this.hasFile(id);
    if (hasfile) {
      Fut<FileContent> contentf = db!getContent(id);
      await contentf?;
      FileContent content = contentf.get;
      if (isFile(content)) {
        FileSize size = content(content);
        result = Just(size);
      }
    }
    return result;
  }

  Unit overwrite(File file) {
    FileId id = fst(file);
    FileSize size = fileContent(file);
    Fut<Unit> u = db!updateFile(id,size);
    await u?;
  }

  Unit continue(File file) {
    FileId id = fst(file);
    FileSize size = fileContent(file);
    Bool he = this.hasFile(id);
    FileSize fsize = 0;
    if (he) {
      Fut<FileContent> s = db!getContent(fst(file));
      await s?;
      FileContent c = s.get;
      fsize = content(c);
    }
    size = size + fsize;
    Fut<Unit> u = db!updateFile(id,size);
    await u?;
  }

  Unit processContent(File file) {
    await isAppendCommand(command);
    if (command == SkipFile) {
      skip;
    } else if (command == OverwriteFile) {
      this.overwrite(file);
    } else if (command == ContinueFile) {
      this.continue(file);
    }
  }

  Unit command(Command c) {
    if (c == StartSnapShot) { start = c; } else { command = c; }
  }

  Unit receiveSchedule(Schedules schedules) {
    String sn = "";
    Schedules ss = schedules;
    while (hasNext(ss)) {
      Pair<Schedules,Schedule>
nt = next(ss); ss = fst(nt); sn = sn + schedname(snd(nt)) + ", "; } this.schedules = schedules; } }module ReplicationSystem.Client; export *; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Environment.StateMachine; import * from ReplicationSystem.Client.Interfaces; import * from ReplicationSystem.Interfaces; import * from ReplicationSystem.Server.Interfaces; import * from ReplicationSystem.Environment.Interfaces; import * from ReplicationSystem.Environment.DataBases; import * from ReplicationSystem.Client.ClientJob; import * from Replication.Network; import * from ReplicationSystem.Schedulers; // Implementation of SyncClient // Java class com.fredhopper.application.SyncClient [COG] class SyncClientImpl( [Final] Int maxJobs, [Final] ClientId id) implements InternalClient, ClientConnector { StateMachine machine = stateMachine(); State state = Start; Network network; ServerAcceptor acceptor; ClientDataBase db; Bool shutDown = False; Bool next = False; Set
<ClientJob> jobRecords = EmptySet;
  // measurement
  List<ClientJob> jobHistories = Nil;
  List<JobData> jobDatas = Nil;
  List<Schedule> hit = Nil;
  List<Schedule>
missed = Nil; String currentState = ""; Int currentTransactionId = -1; { // initialize the client side data base db = new local DataBaseImpl(); } [Atomic] Int jobCount() { return length(jobHistories); } Unit scheduleJob(JobType jb, Schedule schedule) { // wait for its time to be initiated // and the next available slot this.waitFor(schedule); // only proceed if a shutdown // request has not been made. if (~shutDown) { // block subsequent onces this.setNext(schedule); this.makeJob(jb,schedule); hit = Cons(schedule,hit); } else { //record those schedules that are missed missed = Cons(schedule,missed); } } Unit setNext(Schedule schedule) { next = False; } Unit waitFor(Schedule schedule) { await next || shutDown; } Unit makeJob(JobType jb, Schedule schedule) { ClientJob job = new ClientJobImpl(maxJobs,this,jb,schedule,length(jobHistories)); job!executeJob(); jobHistories = Cons(job,jobHistories); jobRecords = Insert(job,jobRecords); } Unit finishJob(ClientJob job) { jobRecords = remove(jobRecords,job); } Unit nextJob() { next = True; } ClientId getId() { return id; } Bool isShutdownRequested() { return shutDown; } Unit requestShutDown() { shutDown = True; await jobRecords == EmptySet; network!shutDown(this); } ServerAcceptor getAcceptor() { return acceptor; } Unit run() { // Makes a transition this.becomesState(WaitToBoot); // wait for acceptor to be ready await acceptor != null; // starts a boot job this.makeJob(Boot,NoSchedule); } ClientDataBase getClientDataBase() { return db; } DataBase getDataBase() { return db; } Unit becomesState(State state) { Set
tos = lookupDefault(machine,this.state,EmptySet); assert tos != EmptySet; // this is an end state assert contains(tos,state); // cannot proceed to specified state this.state = state; } Unit setAcceptor([Far] ServerAcceptor acc) { acceptor = acc; } Unit setNetwork(Network network) { this.network = network; } Unit waitToBoot() { this.becomesState(WaitToBoot); } Unit boot() { this.becomesState(Booting); } Unit start() { this.becomesState(Booting); } Unit waitToReplicate() { this.becomesState(WaitToReplicate); } Unit replicate() { this.becomesState(WorkOnReplicate); } Unit end() { this.becomesState(End); this!requestShutDown(); } }module ReplicationSystem.Server.ConnectionThread; export ConnectionThreadImpl; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.ReplicationSnapshot.Interfaces; import * from ReplicationSystem.Client.Interfaces; import * from ReplicationSystem.Server.Interfaces; import * from ReplicationSystem.Schedulers; import * from ReplicationSystem.Server.SyncServerAcceptor; [COG] class ConnectionThreadImpl( [Far] ClientJob job, [Far] SyncServer server, Int id) implements ConnectionThread { SyncServerClientCoordinator coord; Maybe
<Command> cmd = Nothing;
  Schedules schedules = EmptySet;

  ReplicationSnapshot startReplicationUpdate() {
    // expects only one schedule per replication job
    assert size(schedules) == 1;
    Schedule schedule = snd(next(schedules));
    // register and refresh snapshot
    Fut<Unit> rp = coord!startReplicationUpdate(this);
    await rp?;
    Fut<ReplicationSnapshot> sp = server!getReplicationSnapshot();
    return sp.get;
  }

  Unit finishReplicationUpdate() {
    Fut<Unit> rp = this.coord!finishReplicationUpdate(this);
    await rp?;
  }

  Unit run() {
    Fut<SyncServerClientCoordinator>
c = server!getCoordinator(); await c?; this.coord = c.get; // wait for client's command await this.cmd != Nothing; // Send schedules schedules = this.sendSchedule(); if (cmd != Just(ListSchedule)) { // Get replication items ReplicationSnapshot snapshot = this.startReplicationUpdate(); Fut
idf = snapshot!getIndexingId(); await idf?; TransactionId tid = idf.get; Fut
b = job!registerReplicationItems(tid); await b?; Bool register = b.get; Set
> filesets = EmptySet; if (register) { Fut
> nis = snapshot!getItems(ssname(fromJust(cmd))); await nis?; Set
newitems = nis.get; filesets = this.registerItems(newitems); } // start snapshot Fut
rp = this.job!command(StartSnapShot); await rp?; while (hasNext(filesets)) { Pair
>,Set
> nfs = next(filesets); filesets = fst(nfs); Set
fileset = snd(nfs); this.transferItems(fileset); } // end snapshot rp = this.job!command(EndSnapShot); await rp?; // tidy up this.finishReplicationUpdate(); } } // send one or more schedules to client job Schedules sendSchedule() { assert isJust(cmd); Schedules results = EmptySet; if (cmd == Just(ListSchedule)) { Fut
ssf = server!listSchedules(); await ssf?; results = ssf.get; } else { Fut
ssf = server!getSchedule(ssname(fromJust(cmd))); await ssf?; Schedule s = ssf.get; results = Insert(s,results); } Fut
rp = this.job!receiveSchedule(results); await rp?; return results; } ClientId forClient() { Fut
id = job!forClient(); return id.get; } Unit command(Command c) { this.cmd = Just(c); } /* * Register replication items with client * Returns a set of files to be replicated */ Set
> registerItems(Set
items) { Set
> regs = EmptySet; //iterate over possible check points while (hasNext(items)) { Pair
,ServerReplicationItem> nis = next(items); items = fst(nis); ServerReplicationItem item = snd(nis); // For now convert to a set // will convert it into directory Fut
entryf = item!getContents(); await entryf?; FileEntry entry = entryf.get; Set
result = EmptySet; Set
ids = getFileIdFromEntries1(entry); while (hasNext(ids)) { Pair
,FileId> nids = next(ids); FileId id = snd(nids); Maybe
content = getFromEntry(entry,id); result = Insert(Pair(id,fromJust(content)),result); ids = fst(nids); } regs = Insert(result,regs); } return regs; } Unit transferItems(Set
fileset) { while (hasNext(fileset)) { Pair
,File> nf = next(fileset); fileset = fst(nf); File file = snd(nf); FileSize tsize = fileContent(file); Fut
rp = job!command(AppendSearchFile); await rp?; Fut
> fs = job!processFile(fst(file)); await fs?; Maybe
content = fs.get; FileSize size = 0; if (isJust(content)) { size = fromJust(content); } if (size > tsize) { rp = job!command(OverwriteFile); await rp?; rp = job!processContent(file); await rp?; } else { // find out how much is still need to be replicated if (tsize - size > 0) { rp = job!command(ContinueFile); await rp?; file = file(fst(file),tsize - size); rp = job!processContent(file); await rp?; } else { rp = job!command(SkipFile); await rp?; } } } Fut
rp = job!command(EndSearchFile); await rp?; } }module ReplicationSystem.Server.SyncServerClientCoordinator; export SyncServerClientCoordinatorImpl; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.ReplicationSnapshot.Interfaces; import * from ReplicationSystem.Server.Interfaces; // CSP model CoordinatorProcess // Java class com.fredhopper.replication.server.SyncServerClientCoordinator [COG] class SyncServerClientCoordinatorImpl([Far] SyncServer server, Set
<ClientId> clients) implements SyncServerClientCoordinatorSpec {

  // Diagnostic only ...
  // Mimic internal choice
  //Bool internal = False;

  // Diagnostic only ...
  // Keep track of current list of clients (used for invariant checking)
  //Set<ClientId> currentClients = EmptySet;

  // Diagnostic only ...
  //trace of clients activity
  //initialise with empty trace
  //List<List<ClientId>> traces = Cons(Nil,Nil);

  Int count = 0;
  //Maybe<TransactionId> cps = Nothing;
  Bool shutDown = False;
  SyncServerAcceptor acceptor;
  Set<ConnectionThread>
threads = EmptySet; [Far] ReplicationSnapshot snapshot; //take this out from the constructor so that it //can be modified Unit setSnapshot(ReplicationSnapshot snapshot) { this.snapshot = snapshot; } Unit process() { //commented out for abstraction /* // get SyncServerAcceptor Fut
fs = this.server!getAcceptor(); await fs?; this.acceptor = fs.get; Bool shutdown = this.isServerShutdownRequested(); while (~shutdown) { //try polling on the return boolean value Fut
fd = acceptor!isAcceptingConnection(); await fd?; Bool accept = fd.get; // There is a consideration about how long // a worker should have been working // This is abstracted in this model and // so we use a flag to model this. if (accept) { if (~ emptySet(threads) && internal) { Fut
unit = this.acceptor!suspendConnection(); await unit?; internal = False; } else { internal = True; } } else { if (emptySet(threads)) { Fut
unit = this.acceptor!resumingConnection(); await unit?; } } shutdown = this.isServerShutdownRequested(); } // Shutdown sequence await threads == EmptySet; Fut
unit = this.acceptor!resumingConnection(); await unit?;*/ } //Bool isServerShutdownRequested() { // Fut
fd = server!isShutdownRequested(); await fd?; // return fd.get; //} // Setting up a replication session Unit startReplicationUpdate(ConnectionThread thread) { threads = Insert(thread,threads); if (size(threads) == 1) { this.refreshSnapShot(); } } // Tidy up after a replication session Unit finishReplicationUpdate(ConnectionThread thread) { if (contains(threads,thread)) { if (size(threads) == 1) { this.clearSnapshot(); } threads = remove(threads,thread); } } Unit clearSnapshot() { Fut
unit = snapshot!clearSnapshot(); unit.get; } Unit refreshSnapShot() { count = count + 1; //for debug //Bool makeChange = this.isNotWaitingForMoreClient(); // advance check point only after data // has been replicated to all clients Fut
unit = snapshot!refreshSnapshot(); unit.get; // check point to see if any items has been reloaded /*TransactionId cp = this.getCurrentTransactionId(); if (cps == Nothing || cp > fromJust(cps)) { cps = Just(cp); }*/ // check point to see if an update has happened //Fut
rf = snapshot!hasUpdated(); //Bool up = rf.get; //shutDown = ~up; //reset the current client lists //if refresh is successful and is not waiting //for more clients. /*if (makeChange) { if (cps == Nothing || cp > fromJust(cps)) { cps = Just(cp); traces = appendright(traces,Nil); } if (~shutDown) { currentClients = clients; } else { unit = server!requestShutDown(); unit.get; } }*/ } /*ClientId getClientId([Far] ConnectionThread thread) { Fut
fid = thread!forClient(); return fid.get; } Unit removeClient(ConnectionThread thread) { ClientId id = this.getClientId(thread); this.currentClients = remove(currentClients,id); } Unit addClient(ConnectionThread thread) { ClientId id = this.getClientId(thread); assert ~contains(currentClients,id); currentClients= Insert(id,currentClients); }*/ /* * We need to introduce a mechanism to determine * the number of SyncClients that will be connected to * this implementation of the SyncServer * * Note that this method is not exposed through any interface */ /*Bool isNotWaitingForMoreClient() { //return this.currentClients == EmptySet; //need redesigning here! return True; }*/ TransactionId getCurrentTransactionId() { Fut
cf = snapshot!getIndexingId(); return cf.get; } /* * Indicate client specified by cid is finished its * replication session */ //Unit finishClient(ClientId cid) { // TransactionId cp = this.getCurrentTransactionId(); // currentClients = remove(currentClients,cid); // assert length(traces) > cp - 1; // traces = setAt(traces,appendright(nth(traces,cp-1),cid),cp-1); //} //Int getCount() { // return count; //} }module ReplicationSystem.Environment.Interfaces; export *; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Environment.DataTypes; /* * Common operations to all data base: * 1. get the file size (content) of a file (id) * 2. check if a file exists in this data base * 3. list all files in the data base * 4. get the content of the specified subdirectory. * 5. get the root directory of this data base */ interface DataBase { [Atomic] FileContent getContent(FileId fId); [Atomic] Bool hasFile(FileId fId); [Atomic] Set
<FileId> listFiles();
  [Atomic] Maybe<FileContent> listFilesAt(FileId dir);
  [Atomic] Directory getRoot();
}

/*
 * update this data base with a set of changes.
 */
interface UpdatableDataBase extends ServerDataBase {
  [Atomic] Unit update(Map<FileId,FileContent> changes);
  [Atomic] TransactionHistories getTransactions();
}

/*
 * A data base on the server can perform the following:
 * 1. refresh data base and returns the most recent transaction id
 */
interface ServerDataBase extends DataBase {
  [Atomic] TransactionId refresh();
}

/*
 * A client data base cannot be refreshed
 * but can perform the following:
 * 1. prepare a new local client replication item;
 * 2. update both internal data base and file store with new local files
 * 3. get the last transaction ids
 */
interface ClientDataBase extends DataBase {
  [Atomic] Bool prepareReplicationItem(TransactionId p, Schedule schedule);
  [Atomic] Unit updateFile(FileId fId, FileSize size);
  [Atomic] Map<String,TransactionId> lastTransactionIds();
}

interface Updater {
  Unit shutDown();
}

module ReplicationSystem.Environment.DataBases;
export *;
import * from ReplicationSystem.Environment.DataTypes;
import * from ReplicationSystem.Environment.Files;
import * from ReplicationSystem.Environment.Interfaces;

def Map<A, B> firstValues<A, B>(Map<A, List<B>> mp, B default) =
  case mp {
    EmptyMap => EmptyMap;
    InsertAssoc(Pair(x,ls),xs) =>
      case ls {
        Nil => InsertAssoc(Pair(x,default),firstValues(xs,default));
        Cons(y,ys) => InsertAssoc(Pair(x,y),firstValues(xs,default));
      };
  };

/*
 * A simple model of a database mimicking
 * a sequence of updates of files
 */
[Plain]
class DataBaseImpl implements ServerDataBase, ClientDataBase, UpdatableDataBase {
  Int count = 0; // for debug
  // client data base
  Map<String, List<TransactionId>
> transactions = EmptyMap; // server data base // history of transactions TransactionHistories histories = Nil; // server data base // current transaction Pair
> currentTransaction = Pair(-1,EmptyMap); // Begin with the root location (id = 0) Directory rdir = rootDir(); [Atomic] TransactionHistories getTransactions() { return histories; } [Atomic] Unit update(Map
changes) { rdir = updateDirWithContents(rdir,changes); currentTransaction = Pair(fst(currentTransaction) + 1,changes); histories = Cons(currentTransaction,histories); } [Atomic] TransactionId refresh() { count = count + 1; return fst(currentTransaction); } // Returns 0 if file not found. [Atomic] FileContent getContent(FileId qualified) { Maybe
result = Nothing; if (qualified == rootId()) { result = Just(getFileContent(Right(rdir))); } else { result = getFromEntryIn(rdir,qualified); } assert result != Nothing; return fromJust(result); } [Atomic] Bool hasFile(FileId qualified) { return hasQualifiedEntriesIn(rdir,qualified); } // Updates file store // ClientDataBase [Atomic] Directory getRoot() { return rdir; } [Atomic] Bool prepareReplicationItem(TransactionId p, Schedule schedule) { Bool result = False; String name = schedname(schedule); List
tids = lookupDefault(transactions,name,Nil); if (~ contains(set(tids),p)) { transactions = put(transactions,name,Cons(p,tids)); result = True; } return result; } [Atomic] Map
lastTransactionIds() { return firstValues(transactions,-1); } [Atomic] Unit updateFile(FileId qualified, FileSize size) { rdir = updateDirWithFile(rdir,file(qualified,size)); } [Atomic] Maybe
listFilesAt(FileId qualifiedDir) { return getFromEntryIn(rdir,qualifiedDir); } [Atomic] Set
listFiles() { Set
allqualified = getFileIdFromDir(rdir); return allqualified; } }module ReplicationSystem.Environment.DataTypes; export *; import * from ABS.DC; def List
<A> concatenates<A>(List<List<A>> lists) =
  case lists {
    Nil => Nil;
    Cons(x,xs) => concatenate(x,concatenates(xs));
  };

def Map<A, B> join<A, B>(Map<A, B> f, Map<A, B>
g) = case g { EmptyMap => f; InsertAssoc(x,xs) => case contains(keys(f),fst(x)) { True => join(f,xs); False => InsertAssoc(x,join(f,xs)); }; }; data Command = StartSnapShot | EndSnapShot | ListSchedule | SearchSchedule(String ssname) | EndSearchFile | AppendSearchFile | ReceivePatternFile | SkipFile | ContinueFile | OverwriteFile | EmptyCommand; def String commandToString(Command c) = case c { StartSnapShot => "StartSnapShot"; EndSnapShot => "EndSnapShot"; ListSchedule => "ListSchedule"; SearchSchedule(ss) => "Schedule for " + ss; EndSearchFile => "EndSearchFile"; AppendSearchFile => "AppendSearchFile"; ReceivePatternFile => "ReceivePatternFile"; SkipFile => "SkipFile"; ContinueFile => "ContinueFile"; OverwriteFile => "OverwriteFile"; EmptyCommand => "EmptyCommand"; }; data JobType = Replication | Boot; data ReplicationItemType = SearchReplicationDirectory | LogReplicationItem | ReplicationFilePattern; type ClientId = Int; // CSP set CheckPoint // Java class com.fredhopper.search.fred.Checkpoint // For Java method com.fredhopper.replication.server.item.SearchReplicationDirectory.isValid(String, long) // type CheckPoint = Int; type TransactionId = Int; // Function on Maybe def A fromJustDefault
(Maybe
m, A a) = case m { Just(j) => j; Nothing => a; }; // Functions on set of pairs def Set
fsts
(Set
> ps) = case ps { EmptySet => EmptySet; Insert(x,xs) => Insert(fst(x),fsts(xs)); }; def Set
snds
(Set
> ps) = case ps { EmptySet => EmptySet; Insert(x,xs) => Insert(snd(x),snds(xs)); }; def Bool range(List
vals, Int limit, Bool strict) = case vals { Nil => ~strict; _ => let (Int r) = maximum(vals) - minimum(vals) in case strict { True => r == limit; False => r <= limit; }; }; def Int maximum(List
l) = case l { Cons(x,xs) => maximum0(xs,x); }; def Int maximum0(List
l, Int i) = case l { Nil => i; Cons(x,xs) => maximum0(xs,max(x,i)); }; //def Int min(Int a, Int b) = // case a < b { True => a; False => b; }; def Int minimum(List
l) = case l { Cons(x,xs) => minimum0(xs,x); }; def Int minimum0(List
l, Int i) = case l { Nil => i; Cons(x,xs) => minimum0(xs,min(x,i)); }; // Functions on set def Set
listToSet
(List
a) = case a { Nil => EmptySet; Cons(x,xs) => Insert(x,listToSet(xs)); }; def Map
setToMap
(Set
a, B b) = case a { EmptySet => EmptyMap; Insert(x,xs) => InsertAssoc(Pair(x,b),setToMap(xs,b)); }; // Take first i elements from list ss def List
take
(List
ss,Int i) = case i { 0 => Nil; _ => case ss { Nil => Nil; Cons(x,xs) => Cons(x,take(xs,i-1)); }; }; // Choose i elements from set ss def Set
choose
(Set
ss,Int i) = case i { 0 => EmptySet; _ => case ss { EmptySet => EmptySet; Insert(x,xs) => Insert(x,choose(xs,i-1)); }; }; // Choose i elements from map mp def Map
takeMap
(Map
mp,Int i) = case i { 0 => EmptyMap; _ => case mp { EmptyMap => EmptyMap; InsertAssoc(x,xs) => InsertAssoc(x,takeMap(xs,i-1)); }; }; // Functions on data Command def Bool isAppendCommand(Command c) = case c { SkipFile => True; ContinueFile => True; OverwriteFile => True; _ => False; }; // Insert 'a' at position i of list. def List
setAt
(List
list, A a, Int i) = case list { Nil => Nil; Cons(p,l) => case i { 0 => Cons(a,l); _ => Cons(p,setAt(l,a,i-1)); }; }; def Bool setEquals
(Set
s, Set
t) = size(s) == size(t) && subset(s,t); // t is a subset of s def Bool subset
(Set
s, Set
t) = case t { EmptySet => True; Insert(x,xs) => case contains(s,x) { True => subset(s,xs); False => False; }; }; // def Int pow(Int b, Int e) = case e { 0 => 1; _ => b * pow(b,e-1); }; // String operations // c must have length 1 currently // COSTABS ANNOTATION // Here termsize(Cons(a,b)) = 1 + termsize(a) + termsize(b); // termsize(Nil) = 1; [text >= 1][result() >= 1][result() <= 2*text] def List
split(String text, String c) = case strlen(text) == 0 { True => Nil; False => split2(tailStr(text),c,Cons(headStr(text),Nil)); }; def List
split2(String text, String c, List
result) = case result { Cons(h,t) => case strlen(text) == 0 { True => reverse(result); False => let (String hd) = headStr(text) in case hd == c { True => split2(tailStr(text),c,Cons("",result)); False => split2(tailStr(text),c,Cons(h + hd,t)); }; }; }; def String headStr(String text) = substr(text,0,1); def String tailStr(String text) = substr(text,1,strlen(text)-1); // if list2 is a prefix of list1 def Bool isPrefix
(List
list1, List
list2) = case list2 { Nil => True; Cons(l,ll) => case list1 { Nil => False; Cons(m,mm) => (l == m) && isPrefix(mm,ll); }; }; // COSTABS ANNOTATION [s >= 0][result() <= 2*s+1] def List
stringToChar(String s) = let (Int l) = strlen(s) in case l == 0 { True => Nil; False => Cons(headStr(s),stringToChar(tailStr(s))); }; def Bool isPrefixText(String s1, String s2) = isPrefix(stringToChar(s2),stringToChar(s1)); def Bool filter(String pattern, String text) = isPrefixText(pattern,text); // filters a set of strings against some pattern def Set
filters(String pattern, Set
ts) = case ts { EmptySet => EmptySet; Insert(l,ls) => case filter(pattern,l) { True => Insert(l,filters(pattern,ls)); False => filters(pattern,ls); }; }; data DCData = InfCPU | CPU(Int cpu); //DeploymentComponent def Set
modifyCPU(Set
ds, Int cpu) = case ds { EmptySet => Insert(CPU(cpu),EmptySet); Insert(x,xs) => case x { CPU(_) => Insert(CPU(cpu),xs); _ => Insert(x,modifyCPU(xs,cpu)); }; }; module ReplicationSystem.Environment.Files; export *; import * from ABS.Scheduler; import * from ReplicationSystem.Environment.DataTypes; // transaction histories type TransactionHistories = List
<Transaction>; type Transaction = Pair<TransactionId, Map<FileId, FileContent>
>; data JobData = JobData( String jschedname, //schedule name Int waitperiod, //waiting period Int jdeadline, //deadline Int jcost, //cost Int beforetime, //remaining time before execution Int deadlineafter, //deadline after execution Int totaltime, //total time spent Int jobid //id ); def Maybe
updateJobData(Maybe
jd, Int cost, Int currentDeadline, Time current) = case jd { Just(JobData(a,b,c,d,e,f,g,h)) => Just(JobData(a,b,c,cost,e,currentDeadline,abs(timeValue(current)-g),h)); }; // test data type TestData = Map
>; // CSP set FileId // Used for identifying the file to be replicated type FileId = String; // CSP set FileSize // Java method java.io.File.length() // Used for identifying the state of the client-side file type FileSize = Int; // CSP name type File type File = Pair
; type Directory = Pair
; // CSP name type Item // Java class com.fredhopper.replication.server.item.ServerReplicationItem type ReplicationItem = Pair
>; // A File system structure // internally file entry is organised hierarchically // e.g. file 123 is organised 1/2/123 where 1 and 2 are directories // default root directory id is 0 // This will be extended once we allowed file ids as strings like "12/34/56". type FileEntry = Map
; // Get a set of entries from this map def Set
> entrySet
(Map
m) = case m { EmptyMap => EmptySet; InsertAssoc(x,xs) => Insert(x,entrySet(xs)); }; // qualify an entry with the specified path. def Pair
qualifyEntry(Pair
e, FileId path) = case isDirectory(snd(e)) { True => right(fromJust(qualify(Just(Right(e)),path))); False => left(fromJust(qualify(Just(Left(e)),path))); }; def Map
qualifyFileEntry(Map
m, FileId path) = case m { EmptyMap => EmptyMap; InsertAssoc(x,xs) => InsertAssoc(qualifyEntry(x,path),qualifyFileEntry(xs,path)); }; def Map
schedulemap(Schedules ss) = case ss { EmptySet => EmptyMap; Insert(x,xs) => case x { NoSchedule => schedulemap(xs); _ => InsertAssoc(Pair(schedname(x),x),schedulemap(xs)); }; }; def String businesses() = "Business rules"; def String datas() = "Data"; def String search() = "Search"; // Java com.fredhopper.replication.server.SyncServerSchedule data Schedule = Schedule( String schedname, List
items, Int sched, Deadline dline) | NoSchedule; data Item = SearchItem(FileId) | //top level directory FileItem(FileId, String) | //top level directory, pattern, checkpoint LogItem(FileId); //for now log item is the same as search item def Bool isSearchItem(Item s) = case s { SearchItem(_) => True; _ => False; }; def Bool isFileItem(Item s) = case s { FileItem(_,_) => True; _ => False; }; def Bool isLogItem(Item s) = case s { LogItem(_) => True; _ => False; }; def Pair
, Schedule> getScheduleSet(Set
ss, String n) = getSSet(ss, ss, n); def Pair
, Schedule> getSSet(Set
re, Set
ss, String n) = case ss { EmptySet => Pair(re, NoSchedule); Insert(x,xs) => case schedname(x) == n { True => Pair(remove(re, x), x); False => getSSet(re, xs, n); }; }; def Maybe
getSchedule(List
ss, String n) = case ss { Nil => Nothing; Cons(x,xs) => case schedname(x) == n { True => Just(x); False => getSchedule(xs,n); }; }; /* @param ss a list of existing schedules, @param ts a map of schedule name to scheduling and deadlines @param im a list of schedule name to replication items @return a list of schedules ss' with replication items described in im added @ensures (\forall Pair
> i; contains(listToSet(im),i) ==> \exists Schedule s; contains(listToSet(\result),s) && schedname(s) == fst(i)); */ def List
itemMapToSchedule( List
ss, Map
> ts, List
>> im) = case im { Nil => ss; Cons(Pair(x,y),xs) => let (Maybe
s) = getSchedule(ss,x) in case s { Just(k) => itemMapToSchedule( Cons(Schedule(schedname(k),concatenate(y,items(k)),sched(k),dline(k)), without(ss,k)),removeKey(ts,x),xs); Nothing => let (Pair
p) = lookupUnsafe(ts,x) in itemMapToSchedule( Cons(Schedule(x,y,fst(p),snd(p)),ss), removeKey(ts,x),xs); }; }; def Schedules insertReplicationItemsTo(Schedules ss, String name, List
items) = case ss { EmptySet => EmptySet; Insert(x,xs) => case schedname(x) == name { True => Insert(insertReplicationItems(x,items),xs); False => Insert(x,insertReplicationItemsTo(xs,name,items)); }; }; def Schedule insertReplicationItems(Schedule s, List
items) = case s { Schedule(n,ll,d,e) => Schedule(n,concatenate(ll,items),d,e); }; def Set
scheduleItems(Schedules ss) = case ss { EmptySet => EmptySet; Insert(x,xs) => union(listToSet(items(x)),scheduleItems(xs)); }; def Either
> item(Item s) = case s { SearchItem(i) => Left(i); FileItem(i,r) => Right(Pair(i,r)); LogItem(i) => Left(i); }; // a set of schedules type Schedules = Set
; // seems cannot use type synonym Either data FileContent = Content(FileSize content) | Entries(FileEntry entries) | NoContent; // if id1 is an ancester of id2 def Bool isAncester(FileId id1, FileId id2) = isPrefix(deroot(split(id2,fileSep())),deroot(split(id1,fileSep()))); def List
deroot(List
path) = let (FileId r) = rootId() in case path { Cons(r,ps) => ps; _ => path; }; def File file(FileId i, FileSize s) = Pair(i,Content(s)); def Directory rootDir() = emptyDir(rootId()); def Directory emptyDir(FileId i) = Pair(i,Entries(EmptyMap)); def Directory dir(FileId i, FileEntry e) = Pair(i,Entries(e)); def String fileSep() = "/"; def FileId rootId() = "root"; def Bool isFile(FileContent c) = case c { Content(_) => True; _ => False; }; def Bool isDirectory(FileContent c) = ~isFile(c); def List
nameToPath(String name) = split(name,fileSep()); // partial def FileSize fileContent(File f) = content(snd(f)); // partial def FileEntry dirContent(Directory f) = entries(snd(f)); def FileId getFileId(Either
f) = case f { Left(Pair(id,_)) => id; Right(Pair(id,_)) => id; }; def FileContent getFileContent(Either
f) = case f { Left(Pair(_,s)) => s; Right(Pair(_,fs)) => fs; }; def Either
makeContent(Pair
content) = case isFile(snd(content)) { True => Left(content); False => Right(content); }; // given a/b and c returns a/b/c def FileId makePath(FileId dir, FileId f) = dir + fileSep() + f; def FileId makePaths(List
fs) = case fs { Nil => ""; Cons(f,Nil) => f; Cons(f,gs) => f + fileSep() + makePaths(gs); }; // given a/b/c returns (a/b,c) def Pair
splitFileId(FileId f) = Pair(dirName(f),fileName(f)); // given a/b/c returns c def FileId fileName(FileId f) = head(reverse(split(f,fileSep()))); // given a/b/c returns a/b def FileId dirName(FileId f) = makePaths(reverse(tail(reverse(split(f,fileSep()))))); // get fully qualified file ids from the suppied directory recursively def Set
getFileIdFromDir(Directory d) = case snd(d) { Entries(e) => case fst(d) == rootId() { True => getFileIdFromEntries1(e); False => getFileIdFromEntries(fst(d),e); }; }; def Set
getFileIdFromEntries1(FileEntry fe) = case fe { EmptyMap => EmptySet; InsertAssoc(Pair(i,c),fs) => case isFile(c) { True => Insert(i,getFileIdFromEntries1(fs)); False => union(getFileIdFromEntries(i,entries(c)),getFileIdFromEntries1(fs)); }; }; def Set
getFileIdFromEntries(FileId id, FileEntry fe) = case fe { EmptyMap => EmptySet; InsertAssoc(Pair(i,c),fs) => case isFile(c) { True => Insert(makePath(id,i),getFileIdFromEntries(id,fs)); False => union(getFileIdFromEntries(makePath(id,i),entries(c)),getFileIdFromEntries(id,fs)); }; }; // find a file given the file name def Bool hasEntriesIn(Directory d, FileId id) = case snd(d) { Entries(e) => hasEntry(e,id); }; def Bool hasEntry(FileEntry f, FileId id) = isJust(findFromEntry(f,id)); // find either a file or a directory (if it exists) given // the file name def Maybe
> findFromEntryIn(Directory d, FileId id) = case snd(d) { Entries(e) => findFromEntry(e,id); }; // find either a file or a directory (if it exists) given // the file name def Maybe
> findFromEntry(FileEntry f, FileId id) = case contains(keys(f),id) { True => case lookupUnsafe(f,id) { Content(s) => makeMaybeEitherValue(True,id,Content(s)); // leaf Entries(e) => makeMaybeEitherValue(False,id,Entries(e)); // leaf }; False => case f { InsertAssoc(Pair(i,Content(_)),fm) => findFromEntry(fm,id); InsertAssoc(Pair(i,Entries(g)),fm) => case findFromEntry(g,id) { Nothing => findFromEntry(fm,id); //next path r => qualify(r,i); }; EmptyMap => Nothing; // end of listing }; }; // prefix id of 'r' with 'path' def Maybe
> qualify(Maybe
> r, FileId path) = case r { Just(h) => let (FileId hi) = makePath(path,getFileId(h)) in let (FileContent hc) = getFileContent(h) in case h { Left(_) => makeMaybeEitherValue(True,hi,hc); Right(_) => makeMaybeEitherValue(False,hi,hc); }; Nothing => Nothing; }; def Maybe
> makeMaybeEitherValue(Bool isfile, FileId id, FileContent c) = case isfile { True => Just(Left(Pair(id,c))); _ => Just(Right(Pair(id,c))); }; def Bool hasQualifiedEntriesInList(Directory d, List
qualified) = case snd(d) { Entries(e) => hasQualifiedEntryInList(e,qualified); }; def Bool hasQualifiedEntryInList(FileEntry f, List
qualified) = isJust(getFromEntryList(f,qualified)); // find a file given its fully qualified path def Bool hasQualifiedEntriesIn(Directory d, FileId qualified) = case snd(d) { Entries(e) => hasQualifiedEntry(e,qualified); }; def Bool hasQualifiedEntry(FileEntry f, FileId qualified) = isJust(getFromEntry(f,qualified)); // get content at the specified fully qualified path // content may be the size of a file at the specified fully qualified path // or the set of files (filename-size pairs) contained in the directory at // the specified fully qualified path def Maybe
getFromEntryIn(Directory d, FileId qualified) = getFromEntryInList(d, split(qualified,fileSep())); def Maybe
getFromEntryInList(Directory d, List
qualified) = case snd(d) { Entries(e) => case fst(d) == rootId() { True => getFromEntryList(e,qualified); //root id '0' is disregard _ => getFromEntryList(InsertAssoc(d,EmptyMap),qualified); }; }; // get the content (if it exists) from a fully qualified path def Maybe
getFromEntry(FileEntry entry, FileId qualified) = getFromEntryList(entry, split(qualified,fileSep())); def Maybe
getFromEntryList(FileEntry entry, List
paths) = if length(paths) <= 0 then Nothing else if ~contains(keys(entry),head(paths)) then Nothing else let (FileContent cc) = lookupUnsafe(entry,head(paths)) in if length(tail(paths)) == 0 then Just(cc) // at node else case cc { // qualified is of form 'a/b/...' // but at this level 'a' is a file and not a directory Content(_) => Nothing; // else we are on the right track // go to the subdirectory Entries(e) => getFromEntryList(e,tail(paths)); }; def Directory updateDirWithContent(Directory d, FileId i, FileContent c) = updateDirWith(d,makeContent(Pair(i,c))); def Directory updateDirWithContents(Directory d, Map
contents) = case contents { EmptyMap => d; InsertAssoc(Pair(i,c),cs) => updateDirWithContents(updateDirWithContent(d,i,c),cs); }; // Update a directory with a file (a file is a pair of full qualified path and size) def Directory updateDirWithFile(Directory d, File f) = updateDirWith(d,Left(f)); // Update a directory with a directory def Directory updateDirWithDir(Directory d, Directory f) = updateDirWith(d,Right(f)); // Update a directory with either a directory or a file // COSTABS ANNOTATION [d >= 0][f>=0][result() >=0][result() <= d+2*f+1] def Directory updateDirWith(Directory d, Either
f) = case snd(d) { Entries(e) => Pair(fst(d),Entries(updateFile(e,f))); }; def FileEntry updateFile(FileEntry fe, Either
f) = updateFile1(fe,getFileContent(f),deroot(split(getFileId(f),fileSep()))); def FileEntry updateFile1(FileEntry fe, FileContent c, List
path) = case path { Nil => fe; Cons(p,Nil) => put(fe,p,c); Cons(p,ps) => case contains(keys(fe),p) { True => case lookupUnsafe(fe,p) { Entries(dc) => put(fe,p,Entries(updateFile1(dc,c,ps))); //dir _ => put(fe,p,create(ps,c)); //file }; False => put(fe,p,create(ps,c)); }; }; def FileContent create(List
path, FileContent c) = case path { Cons(p,Nil) => Entries(InsertAssoc(Pair(p,c),EmptyMap)); Cons(p,ps) => Entries(InsertAssoc(Pair(p,create(ps,c)),EmptyMap)); }; //apply all changes from th upto and include id to d //histories must be in accending order [(1,c1),(2,c2),...,(n,cn)] def Directory applyChanges(Directory d, TransactionHistories th, TransactionId id) = case th { Nil => d; Cons(x,xs) => case fst(x) <= id { True => applyChanges(updateDirWithContents(d,snd(x)),xs,id); False => d; }; };module ReplicationSystem.Installation; import * from ReplicationSystem.Main; { new ReplicationSystemMain(); }module ReplicationSystem.Interfaces; export *; import * from ReplicationSystem.Environment.Interfaces; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Environment.DataTypes; // Tester interface Tester { Unit analyse(); } // Common Interfaces interface Commandee { Unit command(Command command); } interface Worker extends Commandee { /* * Existing java implementation does not have client id * the notion of an identifier for each client is required * since the ABS model should guarantee data * consistency as well as deadlock freedom */ ClientId forClient(); } // One can shut down a node or ask if the node has been shut down. // Both client and server are nodes interface Node { /* view shutdown { * call_Node.isShutdownRequested i, * call_Node.requestShutDown r * } */ /* * START ::= F * F ::= i F' * | r T' * T ::= i T' * | r T' */ DataBase getDataBase(); Bool isShutdownRequested(); Unit requestShutDown(); }module ReplicationSystem.Main; export *; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.ReplicationSystem; class ReplicationSystemMain { [Final] List
>> businessItems = list[Pair(businesses(),list[FileItem("config","config/business.xml")])]; [Final] List
>> dataItems = list[Pair(datas(),list[LogItem("indices/itemstore/log"),LogItem("indices/tree/log")]), Pair(datas(),list[FileItem("indices/itemstore","indices/itemstore/i"), FileItem("indices/tree","indices/tree/t")])]; [Final] List
>> searchItems = list[Pair(search(),list[SearchItem("indices/search")])]; [Final] Map
> scheduleMap = map[Pair(businesses(), Pair(0,Duration(10))), Pair(datas(), Pair(0,Duration(10))), Pair(search(), Pair(0,Duration(10)))]; Unit run() { List
schedules = this.getSchedules(); Set
cids = this.getCids(); Int maxJobs = this.getMaxJobs(); Int maxUpdates = this.getMaxUpdates(); new ReplicationSystem(maxUpdates,schedules,maxJobs,cids); } Int getMaxJobs() { return 7; } Int getMaxUpdates() { return 10; } Map
> getScheduleMap() { return scheduleMap; } List
getSchedules() { Map
> m = this.getScheduleMap(); return itemMapToSchedule(Nil,m,concatenates(list[searchItems,businessItems,dataItems])); } Set
getCids() { Set
cs = EmptySet; Int c = this.getNumberOfClients(); while (c > 0) { cs = Insert(c,cs); c = c - 1; } return cs; } Int getNumberOfClients() { return 1; } }module Replication.Network; export *; import * from ReplicationSystem.Client.Interfaces; import * from ReplicationSystem.Server.Interfaces; import * from ReplicationSystem.Environment.Interfaces; import * from ReplicationSystem.Tests; import * from ReplicationSystem.Environment.Files; interface Network { Unit shutDown(ClientConnector client); } class Network( [Far] [Final] SyncServer server, Set<[Far] ClientConnector> clients, [Far] Updater updater ) implements Network { Bool ready = False; Set
testers = EmptySet; Unit run() { Set
cs = clients; while (hasNext(cs)) { Pair
,ClientConnector> nt = next(cs); Tester tester = new TesterImpl(server,snd(nt)); testers = Insert(tester,testers); cs = fst(nt); } ready = True; } Unit shutDown(ClientConnector client) { await ready; clients = remove(clients,client); if (clients == EmptySet) { Fut
ss = updater!shutDown(); ss.get; ss = server!requestShutDown(); ss.get; Set
ts = testers; while (hasNext(ts)) { Pair
,Tester> nt = next(ts); Tester tester = snd(nt); tester!analyse(); ts = fst(nt); } } } }module ReplicationSystem.Server.ReplicationSnapshot; export *; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.Interfaces; import * from ReplicationSystem.Environment.ReplicationSnapshot.Interfaces; import SearchDirectoryItem from ReplicationSystem.Server.ReplicationItem; //For testing import * from ReplicationSystem.Server.BaseReplicationItem; class ReplicationSnapshotImpl( ServerDataBase db, Schedules schedules) implements ReplicationSnapshot { Int count = 0; Int update = 0; //the transaction id after refreshing snapshot; TransactionId tid = -1; // if snapshot is cleaned Bool clean = True; Map
> repItems = EmptyMap; Set
getItems(String name) { return lookupDefault(repItems,name,EmptySet); } /* * Updating replication snapshot */ [Atomic] Unit refreshSnapshot() { count = count + 1; //for debug if (clean) { tid = db.refresh(); update = update + 1; //for debug this.createReplicationItems(); Set
names = keys(repItems); while (hasNext(names)) { Pair
,String> nn = next(names); Set
titems = lookupUnsafe(repItems,snd(nn)); while (hasNext(titems)) { Pair
,ServerReplicationItem> ni = next(titems); ServerReplicationItem item = snd(ni); item.refresh(); titems = fst(ni); } names = fst(nn); } clean = False; } } [Atomic] Unit createReplicationItems() { Schedules tsc = schedules; while (hasNext(tsc)) { Pair
ns = next(tsc); this.replicationItems(snd(ns)); tsc = fst(ns); } } [Atomic] Unit replicationItems(Schedule schedule) { List
is = items(schedule); Set
sitems = EmptySet; while (is != Nil) { ServerReplicationItem r = this.replicationItem(head(is)); sitems = Insert(r,sitems); is = tail(is); } repItems = InsertAssoc(Pair(schedname(schedule),sitems),repItems); } [Atomic] ServerReplicationItem replicationItem(Item i) { ServerReplicationItem item = null; if (isSearchItem(i)) { item = new local SearchDirectoryItem(left(item(i)),this.db); } //for testing if (item == null && isFileItem(i)) { Pair
it = right(item(i)); item = new local ReplicationFilePattern(fst(it),snd(it),this.db); } //for testing if (item == null && isLogItem(i)) { item = new local ReplicationLogItem(left(item(i)),this.db); } return item; } //Clear snapshot Unit clearSnapshot() { repItems = EmptyMap; clean = True; } Int getIndexingId() { return tid; } }module ReplicationSystem.ReplicationSystem; export *; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Environment.Interfaces; import * from ReplicationSystem.Server.Interfaces; import * from ReplicationSystem.Client.Interfaces; import * from ReplicationSystem.Environment.DataBases; import * from ReplicationSystem.Client; import * from ReplicationSystem.Server; import * from ReplicationSystem.Server.Update; import * from Replication.Network; import * from ReplicationSystem.Tests; //ReplicationSystem simulator //@param maxUpdates maximum number of updates //@param schedules list of schedules //@param cids list of clients class ReplicationSystem( [Final] Int maxUpdates, [Final] List
schedules, [Final] Int maxJobs, [Final] Set
cids) { Unit run() { // One SyncServer SyncServer syncserver = new SyncServerImpl( listToSet(schedules), cids); Set<[Far] ClientConnector> syncclients = EmptySet; Set
iterator = cids; while (hasNext(iterator)) { Pair
,ClientId> nt = next(iterator); ClientConnector syncclient = new SyncClientImpl(maxJobs,snd(nt)); syncclients = insertElement(syncclients,syncclient); iterator = fst(nt); } //inject changes Updater updater = new UpdaterImpl(maxUpdates,syncserver); Network network = new Network(syncserver,syncclients,updater); Fut
acc = syncserver!getAcceptor(); await acc?; [Far] SyncServerAcceptor acceptor = acc.get; Set
clientIterator = syncclients; while (hasNext(clientIterator)) { Pair
,ClientConnector> nt = next(clientIterator); ClientConnector syncclient = snd(nt); //make sure clients have access to the network Fut
fu = syncclient!setNetwork(network); fu.get; syncclient!setAcceptor(acceptor); clientIterator = fst(nt); } } } module ReplicationSystem.Schedulers; export *; import * from ABS.Scheduler; import * from ReplicationSystem.Environment.Files; //lenient deadlines consume less resource? def Int defaultScheduleCost(Schedule s) = case s { NoSchedule => 1; Schedule(n,_,_,i) => case n { "Data" => 6;// - (durationValue(i)/100 * 5); "Business rules" => 6;// - (durationValue(i)/100 * 3); "Search" => 6;// - (durationValue(i)/100 * 2); }; }; // according to schedule priorities def Process scheduleHighestCostScheduler(List
<Process> q) = scheduleCostSchedulerH(head(q),tail(q));

def Process scheduleCostSchedulerH(Process h, List<Process> t) =
  case t {
    Nil => h;
    Cons(h2,t2) => case value(h) > value(h2) {
      True => scheduleCostSchedulerH(h,t2);
      False => scheduleCostSchedulerH(h2,t2);
    };
  };

def Process scheduleLoweestCostScheduler(List<Process> q) = scheduleCostSchedulerL(head(q),tail(q));

def Process scheduleCostSchedulerL(Process h, List<Process> t) =
  case t {
    Nil => h;
    Cons(h2,t2) => case value(h) < value(h2) {
      True => scheduleCostSchedulerL(h,t2);
      False => scheduleCostSchedulerL(h2,t2);
    };
  };

// earliest deadline first scheduling
def Process edf(List<Process> q) = edfH(head(q),tail(q));

def Process edfH(Process h, List<Process> t) =
  case t {
    Nil => h;
    Cons(h2,t2) => case durationLessThan(procDeadline(h),procDeadline(h2)) {
      True => edfH(h,t2);
      False => edfH(h2,t2);
    };
  };

// highest cost first
def Process hcf(List<Process> q) = hcfH(head(q),tail(q));

def Process hcfH(Process h, List<Process>
t) = case t { Nil => h; Cons(h2,t2) => case durationLessThan(cost(h),cost(h2)) { False => hcfH(h,t2); True => hcfH(h2,t2); }; };module ReplicationSystem.Server.ReplicationItem; export *; import * from ReplicationSystem.Environment.Interfaces; import * from ReplicationSystem.Environment.ReplicationSnapshot.Interfaces; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Server.BaseReplicationItem; // @param qualified is an absolute path from 'root' in db // @param db points to the database that stores the whole file structure and it responsible for update class SearchDirectoryItem(FileId qualified, ServerDataBase db) implements ServerReplicationItem { InternalItem internal; { internal = new local BasicReplicationItemImpl(qualified,db); } FileEntry getContents() { return internal.getContents(); } Command getCommand() { return AppendSearchFile; } ReplicationItemType getType() { return SearchReplicationDirectory; } FileId getAbsoluteDir() { return internal.getAbsoluteDir(); } [Atomic] Unit refresh() { //We know snapshot cannot be access during the execution of this method //by another task. Directory snapshot = internal.getState(); //get content for this replication item Maybe
ffs = db.listFilesAt(qualified); if (ffs != Nothing) { FileContent content = fromJust(ffs); assert isDirectory(content); //it must be a directory snapshot = updateDirWithDir(snapshot,dir(qualified,entries(content))); } internal.setState(snapshot); } [Atomic] Unit cleanup() { internal.cleanup(); } }module ReplicationSystem.Server.Interfaces; export *; import * from ReplicationSystem.Environment.Interfaces; import * from ReplicationSystem.Environment.Files; import * from ReplicationSystem.Environment.DataTypes; import * from ReplicationSystem.Environment.ReplicationSnapshot.Interfaces; import ClientJob from ReplicationSystem.Client.Interfaces; import Node, Worker from ReplicationSystem.Interfaces; // CSP model CoordinatorProcess // Java class com.fredhopper.replication.server.SyncServerClientCoordinator interface SyncServerClientCoordinator { /* view coordinator { * call_SyncServerClientCoordinator.startReplicationUpdate s, * call_SyncServerClientCoordinator.finishReplicationUpdate f * } */ /* * START ::= S START.threads = S.threads * S ::= s S' S.threads = insertElement(s.worker,S'.threads) * | f S' S.threads = remove(f.worker,S'.threads) * | /\ threads = EmptySet */ Unit process(); //@ requires z = coordinator.threads(); //@ ensures equals(z,insertElement(worker,coordinator.threads())); Unit startReplicationUpdate(ConnectionThread worker); //@ requires z = coordinator.threads(); //@ ensures equals(z,remove(worker,coordinator.threads())); Unit finishReplicationUpdate(ConnectionThread worker); /* * Indicate client specified by cid is finished its * replication session */ //Unit finishClient(ClientId cid); /* * Setting snapshot. */ Unit setSnapshot(ReplicationSnapshot snapshot); } interface ConcurrentSyncServerClientCoordinator extends SyncServerClientCoordinator { /* * Setting snapshots. */ Unit setSnapshots(Map
module ReplicationSystem.Server.Interfaces;
export *;
import * from ReplicationSystem.Environment.Interfaces;
import * from ReplicationSystem.Environment.Files;
import * from ReplicationSystem.Environment.DataTypes;
import * from ReplicationSystem.Environment.ReplicationSnapshot.Interfaces;
import ClientJob from ReplicationSystem.Client.Interfaces;
import Node, Worker from ReplicationSystem.Interfaces;

// CSP model CoordinatorProcess
// Java class com.fredhopper.replication.server.SyncServerClientCoordinator
interface SyncServerClientCoordinator {
  /* view coordinator {
   *   call_SyncServerClientCoordinator.startReplicationUpdate s,
   *   call_SyncServerClientCoordinator.finishReplicationUpdate f
   * }
   */
  /*
   * START ::= S        START.threads = S.threads
   * S     ::= s S'     S.threads = insertElement(s.worker,S'.threads)
   *         | f S'     S.threads = remove(f.worker,S'.threads)
   *         | /\       threads = EmptySet
   */

  Unit process();

  //@ requires z = coordinator.threads();
  //@ ensures equals(z,insertElement(worker,coordinator.threads()));
  Unit startReplicationUpdate(ConnectionThread worker);

  //@ requires z = coordinator.threads();
  //@ ensures equals(z,remove(worker,coordinator.threads()));
  Unit finishReplicationUpdate(ConnectionThread worker);

  /*
   * Indicate that the client specified by cid has finished its
   * replication session
   */
  //Unit finishClient(ClientId cid);

  /*
   * Setting snapshot.
   */
  Unit setSnapshot(ReplicationSnapshot snapshot);
}

interface ConcurrentSyncServerClientCoordinator extends SyncServerClientCoordinator {
  /*
   * Setting snapshots.
   */
  Unit setSnapshots(Map<Schedule,ReplicationSnapshot> snapshots);
  Unit startReplicationUpdateFor(Schedule s, ConnectionThread worker);
  Unit finishReplicationUpdateFor(Schedule s, ConnectionThread worker);
  //TransactionId getCurrentTransactionIdFor(Schedule s);
}

interface ConcurrentSyncServerClientCoordinatorSpec extends ConcurrentSyncServerClientCoordinator, SyncServerClientCoordinatorSpec { }

// unexposed interface for specification
interface SyncServerClientCoordinatorSpec extends SyncServerClientCoordinator {
  /* view coordSpec {
   *   call_SyncServerClientCoordinatorSpec.refreshSnapShot r,
   *   call_SyncServerClientCoordinatorSpec.clearSnapshot c
   * }
   */
  /*
   * START ::= S        START.threads = S.threads
   * S     ::= s S'     S.threads = insertElement(s.worker,S'.threads)
   *         | s r S'   S.threads = insertElement(s.worker,S'.threads)
   *         | f S'     S.threads = remove(f.worker,S'.threads)
   *         | f c S'   S.threads = remove(f.worker,S'.threads)
   *         | /\       threads = EmptySet
   */

  //@ requires size(coordSpec.threads()) == 1;
  Unit refreshSnapShot();

  //@ requires size(coordSpec.threads()) == 0;
  Unit clearSnapshot();
}

interface InternalCoordinator {
  Unit process();
}

// CSP model AcceptorThreadRun(t)
// Java class com.fredhopper.replication.server.SyncServerAcceptorThread
interface ServerAcceptor {
  [Far] ConnectionThread getConnection(ClientJob job);
  Unit finish(ConnectionThread thread);
}

interface SyncServerAcceptor extends ServerAcceptor {
  /* view syncaccepthist {
   *   resolve_ServerAcceptor.getConnection g,
   *   resolve_SyncServerAcceptor.isAcceptingConnection a,
   *   call_SyncServerAcceptor.suspendConnection s,
   *   call_SyncServerAcceptor.resumingConnection f
   * }
   */
  /*
   * START ::= R        START.suspend = False;
   * R     ::= a R      a.retVal = ~ R.suspend; R.suspend = R.suspend;
   *         | s S      R.suspend = True;
   *         | r R'     R.suspend = False;
   *         | g R'     g.retVal = new ConnectionThread(g.job); R.suspend = R'.suspend;
   * S     ::= a S      a.retVal = S.suspend; S.suspend = S.suspend;
   *         | s S      S.suspend = True;
   *         | r R      S.suspend = False;
   *         | g S'     g.retVal = null; S.suspend = S'.suspend;
   */

  //@ ensures result == syncaccepthist.suspend();
  //Bool isAcceptingConnection();

  //@ ensures syncaccepthist.suspend() == True;
  //Unit suspendConnection();

  //@ ensures syncaccepthist.suspend() == False;
  //Unit resumingConnection();
}

//syncserver in the concurrent setting
interface ConcurrentSyncServer extends SyncServer {
  [Near] ReplicationSnapshot getReplicationSnapshotFor(Schedule schedule);
  [Far] ConcurrentSyncServerClientCoordinator getConcurrentCoordinator();
}

// Java class com.fredhopper.replication.server.ConnectionThread
// CSP model ConnectionThreadRun(n)
interface ConnectionThread extends Worker { }

// Exposes schedule for testing
interface ServerNode extends Node {
  UpdatableDataBase getUpdatableDataBase();
  Schedules listSchedules();
  Schedule getSchedule(String name);
}

// CSP model SyncServer(n)
// Java class com.fredhopper.application.SyncServer
// Java class com.fredhopper.replication.server.ReplicationSnapshot
interface SyncServer extends ServerNode {
  SyncServerAcceptor getAcceptor();
  [Far] SyncServerClientCoordinator getCoordinator();
  [Near] ReplicationSnapshot getReplicationSnapshot();
}

interface InternalServer extends SyncServer {
  Unit debug();
}
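// Illustrative sketch (not part of the original model): the history grammar above
// requires every startReplicationUpdate to be matched by a finishReplicationUpdate
// for the same worker, so that the coordinator's thread set eventually returns to
// EmptySet. The class ExampleCoordinatorClient and its method are hypothetical and
// only demonstrate that pairing from a worker's point of view.
class ExampleCoordinatorClient([Far] SyncServerClientCoordinator coordinator) {
  Unit replicateOnce(ConnectionThread worker) {
    Fut<Unit> started = coordinator!startReplicationUpdate(worker);
    started.get;
    // ... the worker would transfer its replication items here ...
    Fut<Unit> finished = coordinator!finishReplicationUpdate(worker);
    finished.get;
  }
}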
module ReplicationSystem.Server;
export *;
import * from ReplicationSystem.Environment.Files;
import * from ReplicationSystem.Environment.DataTypes;
import * from ReplicationSystem.Environment.ReplicationSnapshot.Interfaces;
import * from ReplicationSystem.Server.Interfaces;
import * from ReplicationSystem.Environment.Interfaces;
import * from ReplicationSystem.Server.SyncServerAcceptor;
import * from ReplicationSystem.Server.SyncServerClientCoordinator;
import * from ReplicationSystem.Environment.DataBases;
import * from ReplicationSystem.Server.ReplicationSnapshot;

[COG] class SyncServerImpl(Schedules schedules, Set<ClientId> clients) implements SyncServer {

  Bool shutDown = False;

  [Far] SyncServerClientCoordinator coordinator;
  [Near] SyncServerAcceptor acceptor;
  [Near] UpdatableDataBase db;
  [Near] ReplicationSnapshot snapshot;

  Map<String,Schedule> scheduleMap = schedulemap(schedules);

  {
    db = new local DataBaseImpl();
    snapshot = new local ReplicationSnapshotImpl(db,schedules);
  }

  Unit run() {
    coordinator = new SyncServerClientCoordinatorImpl(this,clients);
    Fut<Unit> f = coordinator!setSnapshot(snapshot);
    f.get;
    acceptor = new local SyncServerAcceptorImpl(this);
  }

  DataBase getDataBase() { return db; }

  Schedule getSchedule(String name) {
    assert contains(keys(scheduleMap),name);
    return lookupUnsafe(scheduleMap,name);
  }

  Schedules listSchedules() { return schedules; }
  ReplicationSnapshot getReplicationSnapshot() { return snapshot; }
  UpdatableDataBase getUpdatableDataBase() { return db; }
  Bool isShutdownRequested() { return shutDown; }
  Unit requestShutDown() { this.shutDown = True; }

  SyncServerClientCoordinator getCoordinator() {
    await coordinator != null;
    return this.coordinator;
  }

  SyncServerAcceptor getAcceptor() {
    await acceptor != null;
    return this.acceptor;
  }
}
module ReplicationSystem.Environment.ReplicationSnapshot.Interfaces;
export *;
import * from ReplicationSystem.Environment.Files;
import * from ReplicationSystem.Environment.DataTypes;

interface ReplicationSnapshot {
  /*
   * view snapshotspec {
   *   call_ReplicationSnapshot.refreshSnapshot r,
   *   call_ReplicationSnapshot.clearSnapshot c,
   *   call_ReplicationSnapshot.getItems i
   * }
   */
  /*
   * START ::= R        START.updated = S.updated
   * R     ::= r C      S.updated = True;
   *         | /\       S.updated = False;
   * C     ::= i C      S.updated = True;
   *         | c R      S.updated = False;
   */

  [Atomic] Unit refreshSnapshot();

  /*
   * Cleaning replication snapshot
   */
  Unit clearSnapshot();

  Int getIndexingId();

  Set<ServerReplicationItem> getItems(String name);

  /*
   * Only exists at model level
   */
  //@ ensures \result == snapshotspec.updated();
  //Bool hasUpdated();
}

interface BasicReplicationItem {
  FileEntry getContents();
  [Atomic] Unit cleanup();
  FileId getAbsoluteDir();
}

/*
 * Represents an item to be replicated to
 * the sync clients. Global for the SyncServer
 *
 * Items could be abstracted as data type
 * but data types cannot be modified by deltas!
 */
interface ServerReplicationItem extends BasicReplicationItem {
  Command getCommand();
  ReplicationItemType getType();
  [Atomic] Unit refresh();
}
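// Illustrative sketch (not part of the original model): the view grammar above says
// a well-formed client first refreshes the snapshot, may then query items any number
// of times, and finally clears the snapshot before the next refresh. The class
// ExampleSnapshotReader and its schedule-name parameter are hypothetical.
class ExampleSnapshotReader(ReplicationSnapshot snapshot) {
  Set<FileId> collectDirs(String scheduleName) {
    Set<FileId> dirs = EmptySet;
    snapshot.refreshSnapshot();
    Set<ServerReplicationItem> items = snapshot.getItems(scheduleName);
    while (hasNext(items)) {
      Pair<Set<ServerReplicationItem>,ServerReplicationItem> nt = next(items);
      ServerReplicationItem item = snd(nt);
      dirs = insertElement(dirs, item.getAbsoluteDir());
      items = fst(nt);
    }
    snapshot.clearSnapshot();
    return dirs;
  }
}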
module ReplicationSystem.Environment.StateMachine;
export *;
import * from ReplicationSystem.Environment.Files;

data State = Start
          | WaitToBoot
          | Booting
          | WaitToReplicate
          | WorkOnReplicate
          | End
          | ManyWaitToReplicate(Schedule)
          | ManyWorkOnReplicate(Schedule)
          | ManyEnd(Schedule);

def String stateToString(State s) = case s {
    Start => "Start";
    WaitToBoot => "WaitToBoot";
    Booting => "Booting";
    WaitToReplicate => "WaitToReplicate";
    WorkOnReplicate => "WorkOnReplicate";
    End => "End";
  };

type StateMachine = Map<State, Set<State>>;
type ManyState = Map<Schedule, State>;

def Map<State, Set<State>> stateMachine() =
  let (Pair<State, Set<State>> start) = Pair(Start, set[WaitToBoot]) in
  let (Pair<State, Set<State>> waitToBoot) = Pair(WaitToBoot, set[Booting,End]) in
  let (Pair<State, Set<State>> booting) = Pair(Booting, set[WaitToBoot,WaitToReplicate,End]) in
  let (Pair<State, Set<State>> waitToReplicate) = Pair(WaitToReplicate, set[WaitToBoot,WorkOnReplicate,End]) in
  let (Pair<State, Set<State>> workOnReplicate) = Pair(WorkOnReplicate, set[WaitToBoot,WaitToReplicate,End]) in
  map[start,waitToBoot,booting,waitToReplicate,workOnReplicate];

def Map<State, Set<State>> concurrentStateMachine() =
  let (Pair<State, Set<State>> start) = Pair(Start, set[WaitToBoot]) in
  let (Pair<State, Set<State>> waitToBoot) = Pair(WaitToBoot, set[Booting,End]) in
  let (Pair<State, Set<State>> booting) = Pair(Booting, set[WaitToBoot,WaitToReplicate,End]) in
  map[start,waitToBoot,booting];
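// Illustrative helper (not part of the original model): a small query over the
// transition relation above, showing how the maps returned by stateMachine() and
// concurrentStateMachine() are meant to be read. The function name
// isLegalTransition is hypothetical.
def Bool isLegalTransition(StateMachine sm, State source, State target) =
  case lookup(sm, source) {
    Nothing => False;
    Just(successors) => contains(successors, target);
  };
// For example, isLegalTransition(stateMachine(), WaitToReplicate, WorkOnReplicate)
// is True, while the same query on concurrentStateMachine() is False, since the
// concurrent machine has no entry for WaitToReplicate.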
def Map<Schedule, State> setWaitToReplicate(Set