Home
Web interface
Download
Help
About
Step 1a:
Write your ABS code in the text area:
// ABS case study 'Replication System' by Fredhopper (R)
module ReplicationSystem;

import * from ABS.DC;
import * from ABS.Scheduler;
import * from ABS.StdLib;

// Main class: holds the replication-item and schedule configuration and, from
// run(), creates a ReplicationSystem with it.
// NOTE(review): the "<...>" type arguments in this class were missing from the
// listing and have been restored from how the values are used (Item, Schedule
// and Deadline are the types taken by Schedule(...) and itemMapToSchedule) -
// confirm against the original model.
class ReplicationSystemMain {
    // (schedule name, items) pairs, consumed by itemMapToSchedule.
    List<Pair<String, List<Item>>> businessItems =
        list(Cons(Pair("Business rules", list(Cons(FileItem("config", "config/business.xml"), Nil))), Nil));
    // Not referenced by any method of this class.
    List<Pair<String, List<Item>>> dataItems =
        list(Cons(Pair("Data", list(Cons(LogItem("indices/itemstore/log"), Cons(LogItem("indices/tree/log"), Nil)))),
             Cons(Pair("Data", list(Cons(FileItem("indices/itemstore", "indices/itemstore/i"), Cons(FileItem("indices/tree", "indices/tree/t"), Nil)))), Nil)));
    List<Pair<String, List<Item>>> searchItems =
        list(Cons(Pair("Search", list(Cons(SearchItem("indices/search"), Nil))), Nil));
    // Schedule name -> (sched value, deadline) used when a schedule is created.
    Map<String, Pair<Int, Deadline>> schedulemaps =
        map(Cons(Pair("Business rules", Pair(0, Duration(10))),
            Cons(Pair("Data", Pair(0, Duration(10))),
            Cons(Pair("Search", Pair(0, Duration(10))), Nil))));

    // Collects the configuration and instantiates the replication system.
    Unit run() {
        List<Schedule> schedules = this.getSchedules();
        Set<ClientId> cids = this.getCids();
        Int maxJobs = this.getMaxJobs();
        Int maxUpdates = this.getMaxUpdates();
        new ReplicationSystem(maxUpdates, schedules, maxJobs, cids);
    }

    Int getMaxJobs() {
        return 5;
    }

    Int getMaxUpdates() {
        return 3;
    }

    // Builds the client id set {2, 1} by counting s down to 0.
    Set<ClientId> getCids() {
        Int s = 2;
        Set<ClientId> cs = EmptySet;
        while ((s > 0)) {
            cs = Insert(s, cs);
            s = (s - 1);
        }
        return cs;
    }

    Int getNumberOfClients() {
        return 1;
    }

    // The base schedule map with the "Business rules" entry replaced by (10, Duration(20)).
    Map<String, Pair<Int, Deadline>> getScheduleMap() {
        Map<String, Pair<Int, Deadline>> m = this.getScheduleMapORIGIN_ScheduleDelta();
        m = put(m, "Business rules", Pair(10, Duration(20)));
        return m;
    }

    // Concatenation of the schedules produced by the two ORIGIN_* variants below.
    List<Schedule> getSchedules() {
        List<Schedule> ss = this.getSchedulesORIGIN_ScheduleDelta();
        List<Schedule> ss2 = this.getSchedulesORIGIN_BusinessDelta();
        return concatenate(ss, ss2);
    }

    Map<String, Pair<Int, Deadline>> getScheduleMapORIGIN_ScheduleDelta() {
        return schedulemaps;
    }

    // Schedules built from searchItems only.
    List<Schedule> getSchedulesORIGIN_ScheduleDelta() {
        Map<String, Pair<Int, Deadline>> m = this.getScheduleMap();
        return itemMapToSchedule(Nil, m, searchItems);
    }

    // The search schedules extended with the schedules for businessItems.
    List<Schedule> getSchedulesORIGIN_BusinessDelta() {
        List<Schedule> ss = this.getSchedulesORIGIN_ScheduleDelta();
        Map<String, Pair<Int, Deadline>> m = this.getScheduleMap();
        return itemMapToSchedule(ss, m, businessItems);
    }
}

def List
// concatenates: flattens a list of lists by concatenating its elements in order.
// (The "def List" keyword and result type sit on the previous line; the type
// arguments below were missing from the listing and are restored from usage.)
<A> concatenates<A>(List<List<A>> lists) =
    case lists {
        Nil => Nil;
        Cons(x, xs) => concatenate(x, concatenates(xs));
    };

// join: adds to f every binding of g whose key is not already a key of f.
def Map<A, B> join<A, B>(Map<A, B> f, Map<A, B> g) =
    case g {
        EmptyMap => f;
        InsertAssoc(x, xs) => case contains(keys(f), fst(x)) {
            True => join(f, xs);
            False => InsertAssoc(x, join(f, xs));
        };
    };

data Command = StartSnapShot
             | EndSnapShot
             | ListSchedule
             | SearchSchedule(String ssname)
             | EndSearchFile
             | AppendSearchFile
             | ReceivePatternFile
             | SkipFile
             | ContinueFile
             | OverwriteFile
             | EmptyCommand;

data JobType = Replication | Boot;

data ReplicationItemType = SearchReplicationDirectory
                         | LogReplicationItem
                         | ReplicationFilePattern;

type ClientId = Int;
type TransactionId = Int;

def A fromJustDefault
// fromJustDefault: the value wrapped in m, or a when m is Nothing.
// (Type parameters restored; "def A fromJustDefault" sits on the previous line.)
<A>(Maybe<A> m, A a) =
    case m {
        Just(j) => j;
        Nothing => a;
    };

// fsts: collects the first component of every pair in ps.
def Set<A> fsts<A, B>(Set<Pair<A, B>> ps) =
    case ps {
        EmptySet => EmptySet;
        Insert(x, xs) => Insert(fst(x), fsts(xs));
    };

// snds: collects the second component of every pair in ps.
def Set<B> snds<A, B>(Set<Pair<A, B>> ps) =
    case ps {
        EmptySet => EmptySet;
        Insert(x, xs) => Insert(snd(x), snds(xs));
    };

def Bool range(List
// range (parameter list continued from "def Bool range(List" on the previous
// line; the element type Int is restored from the use of maximum/minimum):
// for an empty list the result is ~strict; otherwise, with
// r = maximum(vals) - minimum(vals), it is r == limit when strict and
// r <= limit when not.
<Int> vals, Int limit, Bool strict) =
    case vals {
        Nil => ~ strict;
        _ => let (Int r) = (maximum(vals) - minimum(vals)) in
            case strict {
                True => (r == limit);
                False => (r <= limit);
            };
    };

// maximum: largest element of l. There is no case for Nil.
def Int maximum(List<Int> l) =
    case l {
        Cons(x, xs) => maximum0(xs, x);
    };

// maximum0: largest of i and the elements of l.
def Int maximum0(List<Int> l, Int i) =
    case l {
        Nil => i;
        Cons(x, xs) => maximum0(xs, max(x, i));
    };

// min: the smaller of a and b (b when they are equal).
def Int min(Int a, Int b) =
    case (a < b) {
        True => a;
        False => b;
    };

// minimum: smallest element of l. There is no case for Nil.
def Int minimum(List<Int> l) =
    case l {
        Cons(x, xs) => minimum0(xs, x);
    };

// minimum0: smallest of i and the elements of l.
def Int minimum0(List<Int> l, Int i) =
    case l {
        Nil => i;
        Cons(x, xs) => minimum0(xs, min(x, i));
    };

def Set
// listToSet: inserts every element of the list into a set.
// (Type parameters restored; "def Set" sits on the previous line.)
<A> listToSet<A>(List<A> a) =
    case a {
        Nil => EmptySet;
        Cons(x, xs) => Insert(x, listToSet(xs));
    };

// setToMap: maps every element of a to the same value b.
def Map<A, B> setToMap<A, B>(Set<A> a, B b) =
    case a {
        EmptySet => EmptyMap;
        Insert(x, xs) => InsertAssoc(Pair(x, b), setToMap(xs, b));
    };

// take: the first i elements of ss (all of them when ss is shorter).
// The count is only tested against 0, so i is expected to be non-negative.
def List<A> take<A>(List<A> ss, Int i) =
    case i {
        0 => Nil;
        _ => case ss {
            Nil => Nil;
            Cons(x, xs) => Cons(x, take(xs, (i - 1)));
        };
    };

// choose: like take, for sets (follows the Insert structure of ss).
def Set<A> choose<A>(Set<A> ss, Int i) =
    case i {
        0 => EmptySet;
        _ => case ss {
            EmptySet => EmptySet;
            Insert(x, xs) => Insert(x, choose(xs, (i - 1)));
        };
    };

// takeMap: like take, for maps (follows the InsertAssoc structure of mp).
def Map<A, B> takeMap<A, B>(Map<A, B> mp, Int i) =
    case i {
        0 => EmptyMap;
        _ => case mp {
            EmptyMap => EmptyMap;
            InsertAssoc(x, xs) => InsertAssoc(x, takeMap(xs, (i - 1)));
        };
    };

// isAppendCommand: True exactly for SkipFile, ContinueFile and OverwriteFile.
def Bool isAppendCommand(Command c) =
    case c {
        SkipFile => True;
        ContinueFile => True;
        OverwriteFile => True;
        _ => False;
    };

def List
// listAt: replaces the element at index i of list by a; an index beyond the
// end leaves the list unchanged.
// (Type parameters restored; "def List" sits on the previous line.)
<A> listAt<A>(List<A> list, A a, Int i) =
    case list {
        Nil => Nil;
        Cons(p, l) => case i {
            0 => Cons(a, l);
            _ => Cons(p, listAt(l, a, (i - 1)));
        };
    };

// setEquals: same size and subset(s, t) holds.
def Bool setEquals<A>(Set<A> s, Set<A> t) =
    ((size(s) == size(t)) && subset(s, t));

// subset: True when every element of t is contained in s
// (note the direction: it is t that is checked against s).
def Bool subset<A>(Set<A> s, Set<A> t) =
    case t {
        EmptySet => True;
        Insert(x, xs) => case contains(s, x) {
            True => subset(s, xs);
            False => False;
        };
    };

// pow: b to the power e by repeated multiplication. Only e == 0 ends the
// recursion, so e must be non-negative.
def Int pow(Int b, Int e) =
    case e {
        0 => 1;
        _ => (b * pow(b, (e - 1)));
    };

[( result() == text )]
def List
// split: cuts text into segments at every one-character string equal to c.
// An empty text gives Nil. The first character always opens the first segment
// and is not compared with c.
// (Element type String restored; "def List" sits on the previous line.)
<String> split(String text, String c) =
    case (strlen(text) == 0) {
        True => Nil;
        False => split2(tailStr(text), c, Cons(headStr(text), Nil));
    };

// split2: worker for split. result holds the segments found so far in reverse
// order, its head being the segment currently under construction. There is no
// case for an empty result.
def List<String> split2(String text, String c, List<String> result) =
    case result {
        Cons(h, t) => case (strlen(text) == 0) {
            True => reverse(result);
            False => let (String hd) = headStr(text) in
                case (hd == c) {
                    True => split2(tailStr(text), c, Cons("", result));
                    False => split2(tailStr(text), c, Cons((h + hd), t));
                };
        };
    };

// headStr: the first character of text, as a string.
def String headStr(String text) = substr(text, 0, 1);

// tailStr: text without its first character.
def String tailStr(String text) = substr(text, 1, (strlen(text) - 1));

// isPrefix: True when list2 is a prefix of list1.
def Bool isPrefix<A>(List<A> list1, List<A> list2) =
    case list2 {
        Nil => True;
        Cons(l, ll) => case list1 {
            Nil => False;
            Cons(m, mm) => ((l == m) && isPrefix(mm, ll));
        };
    };

// stringToChar: the list of one-character strings making up s.
def List<String> stringToChar(String s) =
    let (Int l) = strlen(s) in
        case (l == 0) {
            True => Nil;
            False => Cons(headStr(s), stringToChar(tailStr(s)));
        };

// isPrefixText: True when s1 is a prefix of s2.
def Bool isPrefixText(String s1, String s2) =
    isPrefix(stringToChar(s2), stringToChar(s1));

// filter: True when text starts with pattern.
def Bool filter(String pattern, String text) = isPrefixText(pattern, text);

// filters: the elements of ts that start with pattern.
def Set<String> filters(String pattern, Set<String> ts) =
    case ts {
        EmptySet => EmptySet;
        Insert(l, ls) => case filter(pattern, l) {
            True => Insert(l, filters(pattern, ls));
            False => filters(pattern, ls);
        };
    };

type TransactionHistories = List
// Element type of TransactionHistories ("type TransactionHistories = List"
// sits on the previous line). The type arguments in this block were missing
// from the listing and are restored from usage unless noted otherwise.
<Transaction>;

// A transaction id paired with the file changes made by that transaction
// (see DataBaseImpl.update and applyChanges).
type Transaction = Pair<TransactionId, Map<FileId, FileContent>>;

data JobData = JobData(String jschedname,
                       Int waitperiod,
                       Int jdeadline,
                       Int jcost,
                       Int beforetime,
                       Int deadlineafter,
                       Int totaltime,
                       Int jobid);

// updateJobData: copy of the job data with jcost = cost,
// deadlineafter = currentDeadline and
// totaltime = abs(timeValue(current) - previous totaltime).
// There is no case for Nothing.
def Maybe<JobData> updateJobData(Maybe<JobData> jd, Int cost, Int currentDeadline, Time current) =
    case jd {
        Just(JobData(a, b, c, d, e, f, g, h)) =>
            Just(JobData(a, b, c, cost, e, currentDeadline, abs((timeValue(current) - g)), h));
    };

// NOTE(review): the type arguments of TestData were lost when this listing was
// extracted (only the final ">" survives) and nothing visible uses the type, so
// they cannot be inferred. The damaged text is kept as found; restore it from
// the original model.
type TestData = Map
>;

type FileId = String;
type FileSize = Int;
type File = Pair<FileId, FileContent>;
type Directory = Pair<FileId, FileContent>;

// NOTE(review): same damage as TestData - the type arguments of
// ReplicationItem are lost and unused in the visible code; kept as found.
type ReplicationItem = Pair
>;

type FileEntry = Map<FileId, FileContent>;

// entrySet: the set of (key, value) pairs of m.
[( m <= max(m) )]
def Set<Pair<A, B>> entrySet<A, B>(Map<A, B> m) =
    case m {
        EmptyMap => EmptySet;
        InsertAssoc(x, xs) => Insert(x, entrySet(xs));
    };

[( result() == 2 )]
def Pair
// qualifyEntry: prefixes the id of entry e with path, treating e as a
// directory or as a file depending on its content.
// (Type arguments restored from usage; "def Pair" sits on the previous line.)
<FileId, FileContent> qualifyEntry(Pair<FileId, FileContent> e, FileId path) =
    case isDirectory(snd(e)) {
        True => right(fromJust(qualify(Just(Right(e)), path)));
        False => left(fromJust(qualify(Just(Left(e)), path)));
    };

// qualifyFileEntry: applies qualifyEntry to every binding of m.
def Map<FileId, FileContent> qualifyFileEntry(Map<FileId, FileContent> m, FileId path) =
    case m {
        EmptyMap => EmptyMap;
        InsertAssoc(x, xs) => InsertAssoc(qualifyEntry(x, path), qualifyFileEntry(xs, path));
    };

// schedulemap: indexes the schedules by name, skipping NoSchedule.
def Map<String, Schedule> schedulemap(Schedules ss) =
    case ss {
        Nil => EmptyMap;
        Cons(x, xs) => case x {
            NoSchedule => schedulemap(xs);
            _ => InsertAssoc(Pair(schedname(x), x), schedulemap(xs));
        };
    };

data Schedule = Schedule(String schedname, List<Item> items, Int sched, Deadline dline)
              | NoSchedule;

data Item = SearchItem(FileId)
          | FileItem(FileId, String)
          | LogItem(FileId);

def Bool isSearchItem(Item s) =
    case s {
        SearchItem(_) => True;
        _ => False;
    };

def Bool isFileItem(Item s) =
    case s {
        FileItem(_, _) => True;
        _ => False;
    };

def Bool isLogItem(Item s) =
    case s {
        LogItem(_) => True;
        _ => False;
    };

def Maybe
// getSchedule: the first schedule in ss named n, if any.
// (Type arguments in this block restored from usage; "def Maybe" sits on the
// previous line.)
<Schedule> getSchedule(List<Schedule> ss, String n) =
    case ss {
        Nil => Nothing;
        Cons(x, xs) => case (schedname(x) == n) {
            True => Just(x);
            False => getSchedule(xs, n);
        };
    };

// itemMapToSchedule: folds the (name, items) pairs of im into ss.
//  - If ss already has a schedule with that name, it is replaced by one whose
//    items are the new items followed by the old ones.
//  - Otherwise a new schedule is created from the (sched, deadline) pair that
//    ts holds for the name (lookupUnsafe: the name must be a key of ts).
// The name is removed from ts before the remaining pairs are processed.
def List<Schedule> itemMapToSchedule(List<Schedule> ss, Map<String, Pair<Int, Deadline>> ts, List<Pair<String, List<Item>>> im) =
    case im {
        Nil => ss;
        Cons(Pair(x, y), xs) => let (Maybe<Schedule> s) = getSchedule(ss, x) in
            case s {
                Just(k) => itemMapToSchedule(
                    Cons(Schedule(schedname(k), concatenate(y, items(k)), sched(k), dline(k)), without(ss, k)),
                    removeKey(ts, x),
                    xs);
                Nothing => let (Pair<Int, Deadline> p) = lookupUnsafe(ts, x) in
                    itemMapToSchedule(Cons(Schedule(x, y, fst(p), snd(p)), ss), removeKey(ts, x), xs);
            };
    };

// insertReplicationItemsTo: appends items to the first schedule named name.
def Schedules insertReplicationItemsTo(Schedules ss, String name, List<Item> items) =
    case ss {
        Nil => Nil;
        Cons(x, xs) => case (schedname(x) == name) {
            True => Cons(insertReplicationItems(x, items), xs);
            False => Cons(x, insertReplicationItemsTo(xs, name, items));
        };
    };

// insertReplicationItems: s with items appended. There is no case for NoSchedule.
def Schedule insertReplicationItems(Schedule s, List<Item> items) =
    case s {
        Schedule(n, ll, d, e) => Schedule(n, concatenate(ll, items), d, e);
    };

// scheduleItems: the union of the items of all schedules.
def Set<Item> scheduleItems(Schedules ss) =
    case ss {
        Nil => EmptySet;
        Cons(x, xs) => union(listToSet(items(x)), scheduleItems(xs));
    };

// item: Left(id) for search and log items, Right((id, pattern)) for file items.
def Either<FileId, Pair<FileId, String>> item(Item s) =
    case s {
        SearchItem(i) => Left(i);
        FileItem(i, r) => Right(Pair(i, r));
        LogItem(i) => Left(i);
    };

type Schedules = List<Schedule>;

data FileContent = Content(FileSize content)
                 | Entries(FileEntry entries)
                 | NoContent;

// isAncester: True when the path of id1 (without the root) is a prefix of the
// path of id2 (without the root).
def Bool isAncester(FileId id1, FileId id2) =
    isPrefix(deroot(split(id2, fileSep())), deroot(split(id1, fileSep())));

def List
// deroot: drops a leading rootId() component from a path.
// (Type arguments in this block restored from usage; FileId is an alias of
// String, so either spelling type-checks. "def List" sits on the previous line.)
<String> deroot(List<String> path) =
    let (FileId r) = rootId() in
        case path {
            Cons(r, ps) => ps;
            _ => path;
        };

def File file(FileId i, FileSize s) = Pair(i, Content(s));

def Directory rootDir() = emptyDir(rootId());

def Directory emptyDir(FileId i) = Pair(i, Entries(EmptyMap));

def Directory dir(FileId i, FileEntry e) = Pair(i, Entries(e));

def String fileSep() = "/";

def FileId rootId() = "root";

def Bool isFile(FileContent c) =
    case c {
        Content(_) => True;
        _ => False;
    };

// Everything that is not Content(_) counts as a directory, including NoContent.
def Bool isDirectory(FileContent c) = ~ isFile(c);

def FileSize fileContent(File f) = content(snd(f));

def FileEntry dirContent(Directory f) = entries(snd(f));

// getFileId: the id of a file (Left) or directory (Right).
def FileId getFileId(Either<File, Directory> f) =
    case f {
        Left(Pair(id, _)) => id;
        Right(Pair(id, _)) => id;
    };

// getFileContent: the content of a file (Left) or directory (Right).
def FileContent getFileContent(Either<File, Directory> f) =
    case f {
        Left(Pair(_, s)) => s;
        Right(Pair(_, fs)) => fs;
    };

// makeContent: tags the pair as Left when its content is a file, Right otherwise.
def Either<File, Directory> makeContent(Pair<FileId, FileContent> content) =
    case isFile(snd(content)) {
        True => Left(content);
        False => Right(content);
    };

def FileId makePath(FileId dir, FileId f) = ((dir + fileSep()) + f);

// makePaths: joins the components with fileSep(); "" for an empty list.
def FileId makePaths(List<String> fs) =
    case fs {
        Nil => "";
        Cons(f, Nil) => f;
        Cons(f, gs) => ((f + fileSep()) + makePaths(gs));
    };

def Pair<FileId, FileId> splitFileId(FileId f) = Pair(dirName(f), fileName(f));

// fileName: the last path component of f.
def FileId fileName(FileId f) = head(reverse(split(f, fileSep())));

// dirName: f without its last path component.
def FileId dirName(FileId f) = makePaths(reverse(tail(reverse(split(f, fileSep())))));

def Set
// getFileIdFromDir: the ids of all files below d. Ids are prefixed with the
// id of d unless d is the root. There is no case for non-Entries content.
// (Type arguments in this block restored from usage; "def Set" sits on the
// previous line.)
<FileId> getFileIdFromDir(Directory d) =
    case snd(d) {
        Entries(e) => case (fst(d) == rootId()) {
            True => getFileIdFromEntries1(e);
            False => getFileIdFromEntries(fst(d), e);
        };
    };

// getFileIdFromEntries1: file ids below fe, without a prefix for this level.
def Set<FileId> getFileIdFromEntries1(FileEntry fe) =
    case fe {
        EmptyMap => EmptySet;
        InsertAssoc(Pair(i, c), fs) => case isFile(c) {
            True => Insert(i, getFileIdFromEntries1(fs));
            False => union(getFileIdFromEntries(i, entries(c)), getFileIdFromEntries1(fs));
        };
    };

// getFileIdFromEntries: file ids below fe, each prefixed with id.
def Set<FileId> getFileIdFromEntries(FileId id, FileEntry fe) =
    case fe {
        EmptyMap => EmptySet;
        InsertAssoc(Pair(i, c), fs) => case isFile(c) {
            True => Insert(makePath(id, i), getFileIdFromEntries(id, fs));
            False => union(getFileIdFromEntries(makePath(id, i), entries(c)), getFileIdFromEntries(id, fs));
        };
    };

def Bool hasEntriesIn(Directory d, FileId id) =
    case snd(d) {
        Entries(e) => hasEntry(e, id);
    };

def Bool hasEntry(FileEntry f, FileId id) = isJust(findFromEntry(f, id));

def Maybe<Either<File, Directory>> findFromEntryIn(Directory d, FileId id) =
    case snd(d) {
        Entries(e) => findFromEntry(e, id);
    };

// findFromEntry: looks for an entry named id. A key of f itself wins;
// otherwise the nested directories of f are searched in order and the hit is
// qualified with the name of the directory it was found in.
def Maybe<Either<File, Directory>> findFromEntry(FileEntry f, FileId id) =
    case contains(keys(f), id) {
        True => case lookupUnsafe(f, id) {
            Content(s) => makeMaybeEitherValue(True, id, Content(s));
            Entries(e) => makeMaybeEitherValue(False, id, Entries(e));
        };
        False => case f {
            InsertAssoc(Pair(i, Content(_)), fm) => findFromEntry(fm, id);
            InsertAssoc(Pair(i, Entries(g)), fm) => case findFromEntry(g, id) {
                Nothing => findFromEntry(fm, id);
                r => qualify(r, i);
            };
            EmptyMap => Nothing;
        };
    };

// qualify: prefixes the id inside r with path, keeping the Left/Right tag.
def Maybe<Either<File, Directory>> qualify(Maybe<Either<File, Directory>> r, FileId path) =
    case r {
        Just(h) => let (FileId hi) = makePath(path, getFileId(h)) in
            let (FileContent hc) = getFileContent(h) in
                case h {
                    Left(_) => makeMaybeEitherValue(True, hi, hc);
                    Right(_) => makeMaybeEitherValue(False, hi, hc);
                };
        Nothing => Nothing;
    };

// makeMaybeEitherValue: Just(Left(...)) for files, Just(Right(...)) otherwise.
def Maybe<Either<File, Directory>> makeMaybeEitherValue(Bool isfile, FileId id, FileContent c) =
    case isfile {
        True => Just(Left(Pair(id, c)));
        _ => Just(Right(Pair(id, c)));
    };

def Bool hasQualifiedEntriesIn(Directory d, FileId qualified) =
    case snd(d) {
        Entries(e) => hasQualifiedEntry(e, qualified);
    };

def Bool hasQualifiedEntry(FileEntry f, FileId qualified) = isJust(getFromEntry(f, qualified));

def Maybe
// getFromEntryIn: content stored under the qualified path in d. For a
// non-root d the path is resolved against a map holding d itself, so the path
// has to start with the id of d.
// (Type arguments restored from usage; "def Maybe" sits on the previous line.)
<FileContent> getFromEntryIn(Directory d, FileId qualified) =
    case snd(d) {
        Entries(e) => case (fst(d) == rootId()) {
            True => getFromEntry(e, qualified);
            _ => getFromEntry(InsertAssoc(d, EmptyMap), qualified);
        };
    };

// getFromEntry: walks the path components of qualified through nested Entries.
// Nothing when a component is missing or a file is met before the last component.
def Maybe<FileContent> getFromEntry(FileEntry entry, FileId qualified) =
    let (List<String> paths) = split(qualified, fileSep()) in
        case (length(paths) > 0) {
            True => case contains(keys(entry), head(paths)) {
                True => let (FileContent cc) = lookupUnsafe(entry, head(paths)) in
                    case length(tail(paths)) {
                        0 => Just(cc);
                        _ => case cc {
                            Content(_) => Nothing;
                            Entries(e) => getFromEntry(e, makePaths(tail(paths)));
                        };
                    };
                False => Nothing;
            };
            False => Nothing;
        };

def Directory updateDirWithContent(Directory d, FileId i, FileContent c) =
    updateDirWith(d, makeContent(Pair(i, c)));

def Directory updateDirWithContents(Directory d, Map
// updateDirWithContents (parameter list continued from the previous line):
// applies every (id, content) binding of contents to d.
// (Type arguments in this block restored from usage.)
<FileId, FileContent> contents) =
    case contents {
        EmptyMap => d;
        InsertAssoc(Pair(i, c), cs) => updateDirWithContents(updateDirWithContent(d, i, c), cs);
    };

def Directory updateDirWithFile(Directory d, File f) = updateDirWith(d, Left(f));

def Directory updateDirWithDir(Directory d, Directory f) = updateDirWith(d, Right(f));

// updateDirWith: d with f stored at the path given by f's id.
def Directory updateDirWith(Directory d, Either<File, Directory> f) =
    case snd(d) {
        Entries(e) => Pair(fst(d), Entries(updateFile(e, f)));
    };

def FileEntry updateFile(FileEntry fe, Either<File, Directory> f) =
    updateFile1(fe, getFileContent(f), deroot(split(getFileId(f), fileSep())));

// updateFile1: stores c in fe at path. Missing intermediate directories are
// created, and an intermediate entry that is not a directory is replaced by one.
def FileEntry updateFile1(FileEntry fe, FileContent c, List<String> path) =
    case path {
        Nil => fe;
        Cons(p, Nil) => put(fe, p, c);
        Cons(p, ps) => case contains(keys(fe), p) {
            True => case lookupUnsafe(fe, p) {
                Entries(dc) => put(fe, p, Entries(updateFile1(dc, c, ps)));
                _ => put(fe, p, create(ps, c));
            };
            False => put(fe, p, create(ps, c));
        };
    };

// create: a chain of nested single-entry directories along path, ending in c.
// There is no case for an empty path.
def FileContent create(List<String> path, FileContent c) =
    case path {
        Cons(p, Nil) => Entries(InsertAssoc(Pair(p, c), EmptyMap));
        Cons(p, ps) => Entries(InsertAssoc(Pair(p, create(ps, c)), EmptyMap));
    };

// applyChanges: applies the transactions of th in list order while their id
// is <= id, and stops at the first transaction with a larger id.
def Directory applyChanges(Directory d, TransactionHistories th, TransactionId id) =
    case th {
        Nil => d;
        Cons(x, xs) => case (fst(x) <= id) {
            True => applyChanges(updateDirWithContents(d, snd(x)), xs, id);
            False => d;
        };
    };

def Map
// firstValues: maps every key to the head of its list, or to default when the
// list is empty.
// (Type arguments in this block restored from usage; "def Map" sits on the
// previous line.)
<A, B> firstValues<A, B>(Map<A, List<B>> mp, B default) =
    case mp {
        EmptyMap => EmptyMap;
        InsertAssoc(Pair(x, ls), xs) => case ls {
            Nil => InsertAssoc(Pair(x, default), firstValues(xs, default));
            Cons(y, ys) => InsertAssoc(Pair(x, y), firstValues(xs, default));
        };
    };

interface Tester {
    Unit analyse();
}

interface Updater {
    Unit shutDown();
}

interface Commandee {
    [Atomic] Unit command(Command command);
}

interface Worker extends Commandee {
    ClientId forClient();
}

interface Node {
    DataBase getDataBase();
    Bool isShutdownRequested();
    Unit requestShutDown();
}

interface Network {
    Unit shutDown(SyncClient client);
}

interface ConnectionThread extends Worker {
}

interface ServerNode extends Node {
    UpdatableDataBase getUpdatableDataBase();
    Schedules listSchedules();
    Schedule getSchedule(String name);
}

interface DataBase {
    [Atomic] FileContent getContent(FileId fId);
    [Atomic] Bool hasFile(FileId fId);
    [Atomic] Set<FileId> listFiles();
    [Atomic] Maybe<FileContent> listFilesAt(FileId dir);
    [Atomic] Directory getRoot();
}

interface UpdatableDataBase extends ServerDataBase {
    [Atomic] Unit update(Map<FileId, FileContent> changes);
    [Atomic] TransactionHistories getTransactions();
}

interface ServerDataBase extends DataBase {
    [Atomic] TransactionId refresh();
}

interface ClientDataBase extends DataBase {
    [Atomic] Bool prepareReplicationItem(TransactionId p, Schedule schedule);
    [Atomic] Unit updateFile(FileId fId, FileSize size);
    // Schedule name -> id of the most recently prepared transaction.
    [Atomic] Map<String, TransactionId> lastTransactionIds();
}

interface SyncServerAcceptor {
    [Far] ConnectionThread getConnection(ClientJob job, Int cost);
}

interface Resource {
    Unit consume();
}

class Resource implements Resource {
    Unit consume() {
        duration(1, 1);
    }
}

interface Recorder {
    Unit record(JobData d);
}

class Recorder(ClientId id) implements Recorder {
    List
// Fields of class Recorder (the class header and the "List" of the first field
// sit on the previous line). Element types restored from what record() stores.
<Int> totalTimes = Nil;
    List<Int> deadlines = Nil;
    List<Bool> missedDeadlines = Nil;

    // Prepends the job's deadlineafter, whether that value is positive, and the
    // job's totaltime to the three lists.
    // The second annotation used to read max(dealines), a misspelling of the
    // field name; every other annotation follows the "x <= max(x)" shape.
    [( totalTimes <= max(totalTimes) )] [( deadlines <= max(deadlines) )] [( missedDeadlines <= max(missedDeadlines) )]
    Unit record(JobData jd) {
        Int d = deadlineafter(jd);
        deadlines = Cons(d, deadlines);
        missedDeadlines = Cons((d > 0), missedDeadlines);
        totalTimes = Cons(totaltime(jd), totalTimes);
    }
}

class SyncServerAcceptorImpl([Final] [Near]SyncServer server) implements SyncServerAcceptor {
    Bool accept = True;
    Map
// NOTE(review): the type arguments of this Map field (declared as "Map" on the
// previous line) were lost when the listing was extracted. The field is never
// read or written in this class, so they cannot be inferred; restore them from
// the original model.
current = EmptyMap;
    Int threads = 0;
    // Selects which of the two resources getResource() hands out next.
    Bool rb = True;
    Resource r1;
    Resource r2;

    {
        r1 = new Resource();
        r2 = new Resource();
    }

    // Returns r1 and r2 alternately, starting with r1.
    [( accept <= max(accept) )] [( current <= max(current) )] [( threads <= max(threads) )] [( rb <= max(rb) )] [( r1 <= max(r1) )] [( r2 <= max(r2) )]
    Resource getResource() {
        Resource chosen = r2;
        if (rb) {
            chosen = r1;
        }
        rb = ~ rb;
        return chosen;
    }

    // Creates a connection thread for job unless the server has been asked to
    // shut down, in which case null is returned. threads counts the
    // connections created and is passed to each new thread before being bumped.
    [( accept <= max(accept) )] [( current <= max(current) )] [( threads <= max(threads) )] [( rb <= max(rb) )] [( r1 <= max(r1) )] [( r2 <= max(r2) )]
    ConnectionThread getConnection(ClientJob job, Int cost) {
        ConnectionThread connection = null;
        Bool stopping = server.isShutdownRequested();
        if (~ stopping) {
            Resource resource = this.getResource();
            connection = new ConnectionThreadImpl(job, server, resource, threads, cost);
            threads = (threads + 1);
        }
        return connection;
    }
}

class TesterImpl(ServerNode expected, Client actual) implements Tester {
    Schedules schedules = Nil;
    Map
// Fields of class TesterImpl (the class header and the "Map" of this field sit
// on the previous line). Type arguments in this class were missing from the
// listing and are restored from usage.
<String, TransactionId> scheduleResults = EmptyMap;
    // Every (file id, expected content, actual content) triple compared so far.
    Set<Triple<FileId, FileContent, FileContent>> result = EmptySet;

    // Fetches schedules and transaction history from the expected server node
    // and the last transaction ids and root directory from the actual client,
    // then compares them in checkDatas.
    [( schedules <= max(schedules) )] [( scheduleResults <= max(scheduleResults) )] [( result <= max(result) )]
    Unit analyse() {
        Fut<UpdatableDataBase> fe = expected!getUpdatableDataBase();
        UpdatableDataBase e = fe.get;
        Fut<Schedules> schf = expected!listSchedules();
        schedules = schf.get;
        Fut<TransactionHistories> tf = e!getTransactions();
        TransactionHistories transactions = tf.get;
        Fut<ClientDataBase> fa = actual!getClientDataBase();
        ClientDataBase a = fa.get;
        Fut<Map<String, TransactionId>> idf = a!lastTransactionIds();
        scheduleResults = idf.get;
        Fut<Directory> rf = a!getRoot();
        Directory act = rf.get;
        this.checkDatas(scheduleResults, reverse(transactions), act);
    }

    [( schedules <= max(schedules) )] [( scheduleResults <= max(scheduleResults) )] [( result <= max(result) )]
    Unit checkDatas(Map<String, TransactionId> tids, TransactionHistories th, Directory act) {
        Int index=0;
        // NOTE(review): the listing is damaged here. Everything between
        // "while (index" and "is = items(s);" was lost in extraction; the names
        // s, exp and is are used below without a visible declaration, so their
        // declarations were part of the lost text. The surviving tokens are
        // kept verbatim; restore the missing text from the original model.
        while (index
        is = items(s);
        while ((is != Nil)) {
            this!checkData(head(is), exp, act);
            is = tail(is);
        }
        suspend;
        }
        }
    }

    [( schedules <= max(schedules) )] [( scheduleResults <= max(scheduleResults) )] [( result <= max(result) )]
    Bool hasFile(DataBase b, FileId f) {
        Fut<Bool> fb = b!hasFile(f);
        return fb.get;
    }

    // Runs the check for Left items (checkDataORIGIN_DirDelta) and then, for
    // Right items (id, pattern), compares only entries matching the pattern.
    Unit checkData(Item i, Directory exp, Directory act) {
        this.checkDataORIGIN_DirDelta(i, exp, act);
        if (isRight(item(i))) {
            FileId id = fst(right(item(i)));
            String pattern = snd(right(item(i)));
            Bool eh = hasQualifiedEntriesIn(exp, id);
            Bool ah = hasQualifiedEntriesIn(act, id);
            if (((eh != ah) && eh)) {
                // Present only on the expected side: must be a directory with
                // no file matching the pattern.
                FileContent ce = fromJust(getFromEntryIn(exp, id));
                assert ~ isFile(ce);
                assert emptySet(filters(pattern, getFileIdFromDir(dir(id, entries(ce)))));
            } else if (eh) {
                FileContent ce = fromJust(getFromEntryIn(exp, id));
                FileContent ca = fromJust(getFromEntryIn(act, id));
                if (isFile(ce)) {
                    if (filter(pattern, id)) {
                        this.compareFile(file(id, content(ce)), file(id, content(ca)));
                    }
                } else {
                    this.compareDirWithPattern(pattern, dir(id, entries(ce)), dir(id, entries(ca)));
                }
            }
        }
    }

    // Asserts that both id sets have the same size and that every expected id
    // has equal content in ee and ae (NoContent when an id is missing).
    Unit compareEntrySets(Set<FileId> eids, Set<FileId> aids, Map<FileId, FileContent> ee, Map<FileId, FileContent> ae) {
        assert (size(eids) == size(aids));
        while (hasNext(eids)) {
            Pair<Set<FileId>, FileId> nd = next(eids);
            FileId id = snd(nd);
            eids = fst(nd);
            FileContent es = lookupDefault(ee, id, NoContent);
            FileContent as = lookupDefault(ae, id, NoContent);
            result = Insert(Triple(id, es, as), result);
            assert (es == as);
        }
    }

    // Records the comparison and asserts that both files have equal content.
    Unit compareFile(File e, File a) {
        FileId id = getFileId(Left(e));
        FileContent es = getFileContent(Left(e));
        FileContent as = getFileContent(Left(a));
        result = Insert(Triple(id, es, as), result);
        assert (es == as);
    }

    Unit compareDir(Directory e, Directory a) {
        this.compareEntrySets(getFileIdFromDir(e), getFileIdFromDir(a), qualifyFileEntry(entries(snd(e)), fst(e)), qualifyFileEntry(entries(snd(a)), fst(a)));
    }

    Unit checkDataORIGIN_ReplicationItemDelta(Item i, Directory exp, Directory act) {
    }

    // Like compareDir, restricted to the file ids that start with pattern.
    Unit compareDirWithPattern(String pattern, Directory e, Directory a) {
        this.compareEntrySets(filters(pattern, getFileIdFromDir(e)), filters(pattern, getFileIdFromDir(a)), qualifyFileEntry(entries(snd(e)), fst(e)), qualifyFileEntry(entries(snd(a)), fst(a)));
    }

    // For Left items: the entry must exist on both sides or on neither, and
    // when it exists the files or directories are compared.
    Unit checkDataORIGIN_DirDelta(Item i, Directory exp, Directory act) {
        if (isLeft(item(i))) {
            FileId id = left(item(i));
            Bool eh = hasQualifiedEntriesIn(exp, id);
            Bool ah = hasQualifiedEntriesIn(act, id);
            assert (eh == ah);
            if (eh) {
                FileContent ce = fromJust(getFromEntryIn(exp, id));
                FileContent ca = fromJust(getFromEntryIn(act, id));
                if (isFile(ce)) {
                    this.compareFile(file(id, content(ce)), file(id, content(ca)));
                } else {
                    this.compareDir(dir(id, entries(ce)), dir(id, entries(ca)));
                }
            }
        } else {
            this.checkDataORIGIN_ReplicationItemDelta(i, exp, act);
        }
    }
}

class Network([Far] [Final]SyncServer server, Set<[Far]SyncClient> clients, [Far]Updater updater) implements Network {
    Bool ready = False;
    Set
// Fields of class Network (the class header and the "Set" of this field sit on
// the previous line). Type arguments in this class were missing from the
// listing and are restored from usage. The "//" comments are back on lines of
// their own: in the collapsed listing they swallowed the code that followed.
<Tester> testers = EmptySet;

    // Creates one tester per client, then sets ready so that shutDown may proceed.
    [( ready <= max(ready) )] [( testers <= max(testers) )]
    Unit run() {
        Set<SyncClient> cs = clients;
        while (hasNext(cs)) {
            Pair<Set<SyncClient>, SyncClient> nt = next(cs);
            Tester tester = new TesterImpl(server, snd(nt));
            testers = Insert(tester, testers);
            cs = fst(nt);
        }
        //Enrique TRY
        /* Set<SyncClient> cs = clients;
        while (hasNext(clients)) {
            Pair<Set<SyncClient>, SyncClient> nt = next(clients);
            Tester tester = new TesterImpl(server, snd(nt));
            testers = Insert( tester, testers );
            clients = fst(nt);
            suspend;
        }
        clients = cs; */
        ready = True;
    }

    // Removes client from the set of clients. When the last client is gone,
    // shuts the updater down, requests server shutdown (waiting for both) and
    // starts analyse() on every tester without waiting for the results.
    [( ready <= max(ready) )] [( testers <= max(testers) )]
    Unit shutDown(SyncClient client) {
        await ready;
        clients = remove(clients, client);
        if ((clients == EmptySet)) {
            Fut<Unit> ss = updater!shutDown();
            ss.get;
            ss = server!requestShutDown();
            ss.get;
            Set<Tester> ts = testers;
            while (hasNext(ts)) {
                Pair<Set<Tester>, Tester> nt = next(ts);
                Tester tester = snd(nt);
                tester!analyse();
                ts = fst(nt);
            }
            /* Enrique TRY
            while (hasNext(testers)) {
                Pair<Set<Tester>, Tester> nt = next(testers);
                Tester tester = snd(nt);
                tester!analyse();
                testers = fst(nt);
                suspend;
            }*/
        }
    }
}

//class UpdaterImpl([Final]Int updates, SyncServer server) implements Updater {
class UpdaterImpl(Int updates, SyncServer server) implements Updater {
    Bool sd = False;
    List
// Fields of class UpdaterImpl (the class header and the "List" of this field
// sit on the previous line). Type arguments in this class were missing from
// the listing and are restored from usage.
<Map<FileId, FileContent>> histories = Nil;
    // NOTE(review): in the collapsed listing each "//[Final]" comment also
    // swallowed the declaration after it. best, worst and limit are used by
    // run() and makeChange(), so the declarations are assumed live and only the
    // [Final] annotations commented out - confirm against the original model.
    //[Final]
    Int best = 5;
    //[Final]
    Int worst = 10;
    //[Final]
    FileSize limit = 5;
    List<FileId> replicationItems = list(Cons("indices/itemstore/i1",
        Cons("indices/itemstore/i2",
        Cons("indices/itemstore/log/j1",
        Cons("indices/search/s1",
        Cons("indices/search/s2",
        Cons("indices/tree/t1",
        Cons("indices/tree/log/j2",
        Cons("config/random.xml",
        Cons("config/business.xml", Nil))))))))));

    // Applies up to `updates` random changes to the server database (without
    // limit when updates < 0), waiting between best and worst time units after
    // each one, until shutDown() is called; then marks itself shut down.
    // NOTE(review): the annotation line below starts with "//" in the listing;
    // it is assumed that the whole annotation line, and only it, was commented out.
    //[( sd <= max(sd) )] [( histories <= max(histories) )] [( best <= max(best) )] [( worst <= max(worst) )] [( limit <= max(limit) )] [( replicationItems <= max(replicationItems) )]
    Unit run() {
        Fut<UpdatableDataBase> fd = server!getUpdatableDataBase();
        UpdatableDataBase db = fd.get;
        Map<FileId, FileContent> changes = EmptyMap;
        Int count = 0;
        while ((~ sd && ((updates < 0) || (count < updates)))) {
            changes = this.makeChange();
            histories = Cons(changes, histories);
            if ((changes != EmptyMap)) {
                Fut<Unit> u = db!update(changes);
                u.get;
            }
            await duration(best, worst);
            count = (count + 1);
        }
        /*while (( ~ sd && ( ( updates < 0 ) || ( count < updates ) ) )) {
            changes = this.makeChange();
            histories = Cons( changes, histories );
            if ( ( changes != EmptyMap ) ){
                Fut<Unit> u = db!update(changes);
                u.get;
            }
            await duration(best, worst);
            count = ( count + 1 );
        }*/
        this.shutDown();
    }

    // Picks a random selection of files and assigns each a random size.
    [( sd <= max(sd) )] [( histories <= max(histories) )] [( best <= max(best) )] [( worst <= max(worst) )] [( limit <= max(limit) )] [( replicationItems <= max(replicationItems) )]
    Map<FileId, FileContent> makeChange() {
        List<FileId> fs = this.chooseFile();
        Map<FileId, FileContent> result = this.assignContent(fs, limit);
        return result;
    }

    [( sd <= max(sd) )] [( histories <= max(histories) )] [( best <= max(best) )] [( worst <= max(worst) )] [( limit <= max(limit) )] [( replicationItems <= max(replicationItems) )]
    Unit shutDown() {
        sd = True;
    }

    // Maps every file id in w to a file of size random(limit) + 1.
    [( sd <= max(sd) )] [( histories <= max(histories) )] [( best <= max(best) )] [( worst <= max(worst) )] [( limit <= max(limit) )] [( replicationItems <= max(replicationItems) )]
    Map<FileId, FileContent> assignContent(List<FileId> w, FileSize limit) {
        Map<FileId, FileContent> result = EmptyMap;
        while ((w != Nil)) {
            Int rand = random(limit);
            result = InsertAssoc(file(head(w), (rand + 1)), result);
            w = tail(w);
        }
        return result;
    }

    // Walks the replicationItems field and keeps each item when random(2) is 0.
    // NOTE(review): the loop consumes the field itself, so every later call
    // finds it empty and returns Nil; the commented-out variant iterated over
    // a local copy. Left unchanged because the edit looks deliberate.
    [( sd <= max(sd) )] [( histories <= max(histories) )] [( best <= max(best) )] [( worst <= max(worst) )] [( limit <= max(limit) )] [( replicationItems <= max(replicationItems) )]
    List<FileId> chooseFile() {
        /* List<FileId> files = replicationItems;
        List<FileId> result = Nil;
        while (( files != Nil )) {
            Int rand = random(2);
            if ( ( rand == 0 ) ){
                result = Cons( head(files), result );
            }
            files = tail(files);
        } */
        List<FileId> result = Nil;
        while ((replicationItems != Nil)) {
            Int rand = random(2);
            if ((rand == 0)) {
                result = Cons(head(replicationItems), result);
            }
            replicationItems = tail(replicationItems);
            suspend;
        }
        return result;
    }
}

class DataBaseImpl implements ServerDataBase, ClientDataBase, UpdatableDataBase {
    Int count = 0;
    Map
// Fields of class DataBaseImpl (the class header and the "Map" of this field
// sit on the previous line). Type arguments in this class were missing from
// the listing and are restored from usage.
// transactions: schedule name -> transaction ids prepared for it, newest first.
<String, List<TransactionId>> transactions = EmptyMap;
    // All transactions applied through update(), newest first.
    TransactionHistories histories = Nil;
    // Latest transaction; the id is -1 until the first update.
    Pair<TransactionId, Map<FileId, FileContent>> currentTransaction = Pair(- 1, EmptyMap);
    Directory rdir = rootDir();

    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] TransactionHistories getTransactions() {
        return histories;
    }

    // Applies changes to the root directory and records them as a new
    // transaction whose id is the previous id plus one.
    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] Unit update(Map<FileId, FileContent> changes) {
        rdir = updateDirWithContents(rdir, changes);
        currentTransaction = Pair((fst(currentTransaction) + 1), changes);
        histories = Cons(currentTransaction, histories);
    }

    // Counts the call and returns the id of the current transaction.
    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] TransactionId refresh() {
        count = (count + 1);
        return fst(currentTransaction);
    }

    // Content stored under the qualified path; asserts that it exists.
    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] FileContent getContent(FileId qualified) {
        Maybe<FileContent> result = Nothing;
        if ((qualified == rootId())) {
            result = Just(getFileContent(Right(rdir)));
        } else {
            result = getFromEntryIn(rdir, qualified);
        }
        assert (result != Nothing);
        return fromJust(result);
    }

    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] Bool hasFile(FileId qualified) {
        return hasQualifiedEntriesIn(rdir, qualified);
    }

    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] Directory getRoot() {
        return rdir;
    }

    // Records transaction id p for the schedule's name. Returns True when p
    // was not yet recorded for that name, False otherwise.
    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] Bool prepareReplicationItem(TransactionId p, Schedule schedule) {
        Bool result = False;
        String name = schedname(schedule);
        List<TransactionId> tids = lookupDefault(transactions, name, Nil);
        if (~ contains(set(tids), p)) {
            transactions = put(transactions, name, Cons(p, tids));
            result = True;
        }
        return result;
    }

    // Newest recorded transaction id per schedule name (-1 for an empty list).
    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] Map<String, TransactionId> lastTransactionIds() {
        return firstValues(transactions, - 1);
    }

    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] Unit updateFile(FileId qualified, FileSize size) {
        rdir = updateDirWithFile(rdir, file(qualified, size));
    }

    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )] [( result() <= max(qualifiedDir) )]
    [Atomic] Maybe<FileContent> listFilesAt(FileId qualifiedDir) {
        return getFromEntryIn(rdir, qualifiedDir);
    }

    // Qualified ids of all files below the root directory.
    [( count <= max(count) )] [( transactions <= max(transactions) )] [( histories <= max(histories) )] [( currentTransaction <= max(currentTransaction) )] [( rdir <= max(rdir) )]
    [Atomic] Set<FileId> listFiles() {
        Set<FileId> allqualified = getFileIdFromDir(rdir);
        return allqualified;
    }
}

class SyncServerImpl(Schedules schedules, Set
clients) implements SyncServer { Bool shutDown = False; [Far] SyncServerClientCoordinator coordinator; [Near] SyncServerAcceptor acceptor; [Near] UpdatableDataBase db; Map
scheduleMap = schedulemap(schedules); ReplicationSnapshot snapshot; { db = new local DataBaseImpl(); } [( shutDown <= max(shutDown) )] [( coordinator <= max(coordinator) )] [( acceptor <= max(acceptor) )] [( db <= max(db) )] [( scheduleMap <= max(scheduleMap) )] DataBase getDataBase(){ return db; } [( shutDown <= max(shutDown) )] [( coordinator <= max(coordinator) )] [( acceptor <= max(acceptor) )] [( db <= max(db) )] [( scheduleMap <= max(scheduleMap) )] Schedule getSchedule(String name){ assert contains(keys(scheduleMap), name); return lookupUnsafe(scheduleMap, name); } [( shutDown <= max(shutDown) )] [( coordinator <= max(coordinator) )] [( acceptor <= max(acceptor) )] [( db <= max(db) )] [( scheduleMap <= max(scheduleMap) )] Schedules listSchedules(){ return schedules; } [( shutDown <= max(shutDown) )] [( coordinator <= max(coordinator) )] [( acceptor <= max(acceptor) )] [( db <= max(db) )] [( scheduleMap <= max(scheduleMap) )] UpdatableDataBase getUpdatableDataBase(){ return db; } [( shutDown <= max(shutDown) )] [( coordinator <= max(coordinator) )] [( acceptor <= max(acceptor) )] [( db <= max(db) )] [( scheduleMap <= max(scheduleMap) )] Bool isShutdownRequested(){ return shutDown; } [( shutDown <= max(shutDown) )] [( coordinator <= max(coordinator) )] [( acceptor <= max(acceptor) )] [( db <= max(db) )] [( scheduleMap <= max(scheduleMap) )] Unit requestShutDown(){ shutDown = True; } [( shutDown <= max(shutDown) )] [( coordinator <= max(coordinator) )] [( acceptor <= max(acceptor) )] [( db <= max(db) )] [( scheduleMap <= max(scheduleMap) )] SyncServerClientCoordinator getCoordinator(){ await ( coordinator != null ); return coordinator; } [( shutDown <= max(shutDown) )] [( coordinator <= max(coordinator) )] [( acceptor <= max(acceptor) )] [( db <= max(db) )] [( scheduleMap <= max(scheduleMap) )] SyncServerAcceptor getAcceptor(){ await ( acceptor != null ); return acceptor; } Unit run(){ snapshot = new local ReplicationSnapshotImpl(db, schedules); coordinator = 
new SyncServerClientCoordinatorImpl(this, clients); Fut
f = coordinator!setSnapshot(snapshot); f.get; acceptor = new local SyncServerAcceptorImpl(this); } ReplicationSnapshot getReplicationSnapshot(){ return snapshot; } }

/*
 * Active class that wires the replication system together: one SyncServer,
 * one SyncClient per client id, an Updater and a Network.
 *
 * maxUpdates - forwarded to UpdaterImpl
 * schedules  - forwarded to SyncServerImpl
 * maxJobs    - forwarded to every SyncClientImpl
 * cids       - client ids; not [Final] because run() consumes this set
 *
 * NOTE(review): the generic type arguments in this block were missing from the
 * text and have been reconstructed from how the values are used below
 * (next()/snd()/fst() on the sets, return types of the called methods).
 * Confirm against the upstream model.
 */
class ReplicationSystem([Final]Int maxUpdates, [Final]List<Schedule> schedules, [Final]Int maxJobs, Set<ClientId> cids) {
    // Alternative header with a [Final] client id set, kept for reference:
    // class ReplicationSystem([Final]Int maxUpdates, [Final]List<Schedule> schedules, [Final]Int maxJobs, [Final]Set<ClientId> cids) {

    // Creates the server on the deployment component returned by getServerDeployment().
    SyncServer getSyncServer() {
        DeploymentComponent s = this.getServerDeployment();
        [DC : s] SyncServer syncserver = new SyncServerImpl(schedules, cids);
        return syncserver;
    }

    // Creates the client for `id` on the deployment component returned by getClientDeployment().
    SyncClient getSyncClient(ClientId id) {
        DeploymentComponent c = this.getClientDeployment();
        [DC : c] SyncClient syncclient = new SyncClientImpl(maxJobs, id);
        return syncclient;
    }

    // Builds the whole system, then hands the network and the server's
    // acceptor to every client.
    Unit run() {
        SyncServer syncserver = this.getSyncServer();
        Set<[Far]SyncClient> syncclients = EmptySet;
        // Consumes `cids` element by element; `suspend` releases the
        // scheduler after each client has been created.
        while (hasNext(cids)) {
            Pair<Set<ClientId>, ClientId> nt = next(cids);
            SyncClient syncclient = this.getSyncClient(snd(nt));
            syncclients = insertElement(syncclients, syncclient);
            cids = fst(nt);
            suspend;
        }
        /* Variant that iterates over a copy and leaves `cids` untouched
           (goes with the [Final] header above):
        Set<ClientId> iterator = cids;
        while (hasNext(iterator)) {
            Pair<Set<ClientId>, ClientId> nt = next(iterator);
            SyncClient syncclient = this.getSyncClient(snd(nt));
            syncclients = insertElement(syncclients, syncclient);
            iterator = fst(nt);
        }*/
        Updater updater = new UpdaterImpl(maxUpdates, syncserver);
        Network network = new Network(syncserver, syncclients, updater);
        Fut<SyncServerAcceptor> acc = syncserver!getAcceptor();
        await acc?;
        [Far] SyncServerAcceptor acceptor = acc.get;
        Set<SyncClient> clientIterator = syncclients;
        while (hasNext(clientIterator)) {
            Pair<Set<SyncClient>, SyncClient> nt = next(clientIterator);
            SyncClient syncclient = snd(nt);
            // The network is set synchronously (blocking get) before the
            // acceptor is sent asynchronously.
            Fut<Unit> fu = syncclient!setNetwork(network);
            fu.get;
            syncclient!setAcceptor(acceptor);
            clientIterator = fst(nt);
        }
    }

    // New deployment component "s1" with CPU( 1 ).
    DeploymentComponent getServerDeployment() {
        DeploymentComponent s = new DeploymentComponent("s1", CPU( 1 ));
        return s;
    }

    // New deployment component "c1" with CPU( 1 ); called once per client.
    DeploymentComponent getClientDeployment() {
        DeploymentComponent c = new DeploymentComponent("c1", CPU( 1 ));
        return c;
    }

    // Returns a new local deployment component `name` with CPU( cpu ).
    // The `dc` argument is not used.
    DeploymentComponent changeDC(DeploymentComponent dc, String name, Int cpu) {
        DeploymentComponent deployment = new local DeploymentComponent(name, CPU( cpu ));
        return deployment;
    }
}

// Cost of a schedule; every listed case yields 5.
// The inner match has no default branch, so a schedule name other than
// "Data", "Business rules" or "Search" matches no case.
def Int defaultScheduleCost(Schedule s) =
    case s {
        NoSchedule => 5;
        Schedule( n, _, _, i ) => case n {
            "Data" => 5;
            "Business rules" => 5;
            "Search" => 5;
        };
    };

// Scheduler: picks the queued process with the highest value().
// Uses head(q)/tail(q) directly - assumes a non-empty queue; TODO confirm.
def Process scheduleHighestCostScheduler(List<Process>
q) = scheduleCostSchedulerH(head(q), tail(q)); // tail of scheduleHighestCostScheduler

// Helper: walks `t` keeping in `h` the process with the larger value().
// On equal values the later list element replaces `h`.
def Process scheduleCostSchedulerH(Process h, List<Process> t) =
    case t {
        Nil => h;
        Cons( h2, t2 ) => case ( value(h) > value(h2) ) {
            True => scheduleCostSchedulerH(h, t2);
            False => scheduleCostSchedulerH(h2, t2);
        };
    };

// Scheduler: picks the queued process with the lowest value().
// Uses head(q)/tail(q) directly - assumes a non-empty queue; TODO confirm.
def Process scheduleLoweestCostScheduler(List<Process> q) =
    scheduleCostSchedulerL(head(q), tail(q));

// Helper: walks `t` keeping in `h` the process with the smaller value().
// On equal values the later list element replaces `h`.
def Process scheduleCostSchedulerL(Process h, List<Process> t) =
    case t {
        Nil => h;
        Cons( h2, t2 ) => case ( value(h) < value(h2) ) {
            True => scheduleCostSchedulerL(h, t2);
            False => scheduleCostSchedulerL(h2, t2);
        };
    };

// Scheduler: earliest deadline first, compared via procdeadline().
// Uses head(q)/tail(q) directly - assumes a non-empty queue; TODO confirm.
def Process edf(List<Process> q) = edfH(head(q), tail(q));

// Helper: keeps `h` only while durationLessThan(procdeadline(h), procdeadline(h2))
// holds; otherwise continues with `h2`.
def Process edfH(Process h, List<Process> t) =
    case t {
        Nil => h;
        Cons( h2, t2 ) => case durationLessThan(procdeadline(h), procdeadline(h2)) {
            True => edfH(h, t2);
            False => edfH(h2, t2);
        };
    };

// Scheduler: highest cost first, compared via cost().
// Uses head(q)/tail(q) directly - assumes a non-empty queue; TODO confirm.
def Process hcf(List<Process> q) = hcfH(head(q), tail(q));

// Helper: switches to `h2` only when durationLessThan(cost(h), cost(h2))
// holds; otherwise `h` is kept.
def Process hcfH(Process h, List<Process> t) =
    case t {
        Nil => h;
        Cons( h2, t2 ) => case durationLessThan(cost(h), cost(h2)) {
            False => hcfH(h, t2);
            True => hcfH(h2, t2);
        };
    };

// Server-side snapshot of the items to replicate (the interface body
// continues on the next line with getItems).
interface ReplicationSnapshot {
    Unit refreshSnapshot();
    Unit clearSnapshot();
    Int getIndexingId();
    Set<ServerReplicationItem>
// NOTE(review): generic type arguments are missing throughout this span; each
// physical line break below sits where a `<...>` argument was lost. The code
// is left untouched here.
//
// This line holds, in order:
//  - the last member of interface ReplicationSnapshot: getItems(name);
//  - interface BasicReplicationItem: getContents(), atomic cleanup(), getAbsoluteDir();
//  - interface ServerReplicationItem: adds getCommand(), getType(), atomic refresh();
//  - the header and first fields of class ReplicationSnapshotImpl(db, schedules):
//    count and update start at 0, tid starts at -1, clean starts True.
getItems(String name);} interface BasicReplicationItem { FileEntry getContents(); [Atomic] Unit cleanup(); FileId getAbsoluteDir();} interface ServerReplicationItem extends BasicReplicationItem { Command getCommand(); ReplicationItemType getType(); [Atomic] Unit refresh();} class ReplicationSnapshotImpl(ServerDataBase db, Schedules schedules) implements ReplicationSnapshot { Int count = 0; Int update = 0; TransactionId tid = - 1; Bool clean = True; Map
// repItems: map keyed by the name passed to getItems; starts empty.
> repItems = EmptyMap; Set
// getItems(name): the set stored under `name`, or EmptySet when absent.
// refreshSnapshot(): always increments count. Only when `clean` is True does it
// refresh the data base (result stored in tid), increment update, rebuild
// repItems via createReplicationItems(), call refresh() on every stored item,
// and set clean to False.
getItems(String name){ return lookupDefault(repItems, name, EmptySet); } Unit refreshSnapshot(){ count = ( count + 1 ); if ( clean ){ tid = db.refresh(); update = ( update + 1 ); this.createReplicationItems(); Set
// Outer loop: iterates over the keys of repItems.
names = keys(repItems); while (hasNext(names)) { Pair
, String> nn = next(names); Set
// Inner loop: iterates over the items stored under the current key.
titems = lookupUnsafe(repItems, snd(nn)); while (hasNext(titems)) { Pair
// createReplicationItems() starts at the end of this line.
, ServerReplicationItem> ni = next(titems); ServerReplicationItem item = snd(ni); item.refresh(); titems = fst(ni); } names = fst(nn); } clean = False; } } Unit createReplicationItems(){ Int index=0; Fut
// NOTE(review): the text after `while (index` is missing from this copy. The
// rest of the loop condition, the loop body, and the head of the definition
// that the following `is = items(schedule)` belongs to cannot be recovered
// from what is visible; restore from the upstream model.
y; while (index
is = items(schedule); Set
// Remainder of the (headless) definition: turns every element of `is` into a
// ServerReplicationItem via replicationItem() and stores the resulting set in
// repItems under schedname(schedule).
//
// replicationItem(i): delegates to replicationItemORIGIN_FileDelta; if that
// yields null and `i` is a log item, creates a ReplicationLogItem.
// clearSnapshot(): empties repItems and sets clean back to True.
// getIndexingId(): returns tid.
// replicationItemORIGIN_ReplicationItemDelta(i): always returns null.
// replicationItemORIGIN_DirDelta(i): delegates to ..._ReplicationItemDelta; if
// that yields null and `i` is a search item, creates a SearchDirectoryItem.
// replicationItemORIGIN_FileDelta(i): delegates to ..._DirDelta; if that yields
// null and `i` is a file item, creates a ReplicationFilePattern (continues on
// the next line).
// If no branch matches, the chain returns null and that null is inserted into
// the set above.
sitems = EmptySet; while (( is != Nil )) { ServerReplicationItem r = this.replicationItem(head(is)); sitems = Insert( r, sitems ); is = tail(is); } repItems = InsertAssoc( Pair( schedname(schedule), sitems ), repItems ); } [Atomic] ServerReplicationItem replicationItem(Item i){ ServerReplicationItem item = this.replicationItemORIGIN_FileDelta(i); if ( ( ( item == null ) && isLogItem(i) ) ){ item = new local ReplicationLogItem(left(item(i)), db); } return item; } Unit clearSnapshot(){ repItems = EmptyMap; clean = True; } Int getIndexingId(){ return tid; } [Atomic] ServerReplicationItem replicationItemORIGIN_ReplicationItemDelta(Item i){ ServerReplicationItem item = null; return item; } [Atomic] ServerReplicationItem replicationItemORIGIN_DirDelta(Item i){ ServerReplicationItem item = this.replicationItemORIGIN_ReplicationItemDelta(i); if ( ( ( item == null ) && isSearchItem(i) ) ){ item = new local SearchDirectoryItem(left(item(i)), db); } return item; } [Atomic] ServerReplicationItem replicationItemORIGIN_FileDelta(Item i){ ServerReplicationItem item = this.replicationItemORIGIN_DirDelta(i); if ( ( ( item == null ) && isFileItem(i) ) ){ Pair
it = right(item(i)); item = new local ReplicationFilePattern(fst(it), snd(it), db); } return item; } }

// Internal view of a replication item: direct access to its Directory state.
interface InternalItem extends BasicReplicationItem {
    [Atomic] Directory getState();
    [Atomic] Unit setState(Directory dir);
}

/*
 * Holds the Directory snapshot for one qualified file id.
 * The snapshot starts as the root directory updated with an empty directory
 * for `qualified`; cleanup() resets it to exactly that value.
 * `db` is not used by this class.
 */
class BasicReplicationItemImpl(FileId qualified, ServerDataBase db) implements InternalItem {
    Directory snapshot = updateDirWithDir(rootDir(), emptyDir(qualified));

    // Content of the current snapshot.
    FileEntry getContents() { return dirContent(snapshot); }

    FileId getAbsoluteDir() { return qualified; }

    [( qualified <= max(qualified) )]
    [Atomic] Unit cleanup() {
        snapshot = updateDirWithDir(rootDir(), emptyDir(qualified));
    }

    [( result() <= max(snapshot) )]
    [Atomic] Directory getState() { return snapshot; }

    [Atomic] Unit setState(Directory dir) { snapshot = dir; }
}

/*
 * Replication item for a search directory. Delegates state handling to a
 * local BasicReplicationItemImpl and reports AppendSearchFile /
 * SearchReplicationDirectory as its command and type.
 */
class SearchDirectoryItem(FileId qualified, ServerDataBase db) implements ServerReplicationItem {
    InternalItem internal;

    { internal = new local BasicReplicationItemImpl(qualified, db); }

    FileEntry getContents() { return internal.getContents(); }

    Command getCommand() { return AppendSearchFile; }

    ReplicationItemType getType() { return SearchReplicationDirectory; }

    FileId getAbsoluteDir() { return internal.getAbsoluteDir(); }

    // Re-reads the directory listing at `qualified` from the data base and
    // merges it into the stored snapshot. When the data base reports nothing
    // for `qualified`, the snapshot is written back unchanged.
    [Atomic] Unit refresh() {
        Directory snapshot = internal.getState();
        // Type argument reconstructed from `FileContent content = fromJust(ffs)`.
        Maybe<FileContent> ffs = db.listFilesAt(qualified);
        if ( ( ffs != Nothing ) ) {
            FileContent content = fromJust(ffs);
            assert isDirectory(content);
            snapshot = updateDirWithDir(snapshot, dir(qualified, entries(content)));
        }
        internal.setState(snapshot);
    }

    [Atomic] Unit cleanup() { internal.cleanup(); }
}

// Transition table: maps a state to the set of states it may move to.
// Type arguments reconstructed from its use in SyncClientImpl.becomesState
// (lookupDefault(machine, <State>, EmptySet) followed by contains(tos, <State>)).
type StateMachine = Map<State, Set<State>
>; // closes the StateMachine type alias started on the previous line

// A single replication job as seen by the server-side connection thread.
// NOTE(review): type arguments in this span were missing and are reconstructed
// from the implementing classes (ClientJobImpl.processFile returns
// Just( FileSize ); SyncClientImpl.finishJob receives the JobData built in
// ClientJobImpl). Confirm against the upstream model.
interface ClientJob extends Worker {
    Bool registerReplicationItems(TransactionId id);
    Maybe<FileSize> processFile(FileId id);
    Unit processContent(File file);
    Unit receiveSchedule(Schedules schedules);
    Unit executeJob();
}

interface Client extends Node {
    ClientDataBase getClientDataBase();
}

// Client that can be connected to a server acceptor and a network.
interface SyncClient extends Client, ClientStateMachine {
    [Far] SyncServerAcceptor getAcceptor();
    Unit setAcceptor(SyncServerAcceptor acceptor);
    Unit setNetwork(Network network);
}

interface SyncServerClientCoordinatorSpec extends SyncServerClientCoordinator {
    Unit refreshSnapShot();
    Unit clearSnapshot();
}

// Operations a client offers to the jobs it has started.
interface CommonInternalClient extends SyncClient {
    ClientId getId();
    [Atomic] Unit setMaximumTransactionId(Int id);
    // (number of jobs started so far, current maximum transaction id);
    // both components are Int in SyncClientImpl.
    [Atomic] Pair<Int, Int> jobCountAndMaximumTransactionId();
    Unit scheduleJob(JobType jb, Schedule schedule);
    Unit finishJob(ClientJob job, Maybe<JobData>
jobData);}

/*
 * Client-side active object. Boots by starting a Boot job once an acceptor is
 * known, starts further jobs via scheduleJob(), records finished job data and
 * tracks its life-cycle state in `state`.
 *
 * maxJobs - forwarded to every ClientJobImpl
 * id      - this client's id, also given to the Recorder
 *
 * NOTE(review): the type arguments of the collection fields were missing and
 * are reconstructed from the values stored in them below.
 */
class SyncClientImpl([Final]Int maxJobs, [Final]ClientId id) implements InternalClient, SyncClient {
    Network network;
    SyncServerAcceptor acceptor;
    ClientDataBase db;
    Recorder recorder;
    Bool shutDown = False;
    Set<ClientJob> jobRecords = EmptySet;   // jobs started and not yet finished
    List<ClientJob> jobHistories = Nil;     // every job started, newest first
    List<JobData> jobDatas = Nil;           // data of finished jobs, newest first
    List<Schedule> hit = Nil;               // schedules for which a job was started
    List<Schedule> missed = Nil;            // schedules skipped because of shutdown
    Int currentTransactionId = - 1;
    StateMachine machine = stateMachine();
    State state = Start;
    Bool next = False;                      // set by nextJob(), cleared by setNext()

    {
        db = new local DataBaseImpl();
        recorder = new local Recorder(id);
    }

    // The parameter `id` shadows the class parameter of the same name.
    [Atomic] Unit setMaximumTransactionId(Int id) { currentTransactionId = id; }

    // (number of jobs started so far, current transaction id)
    [Atomic] Pair<Int, Int> jobCountAndMaximumTransactionId() {
        return Pair( length(jobHistories), currentTransactionId );
    }

    // Waits for the schedule's start time and a nextJob() signal (or shutdown),
    // then either starts the job or records the schedule as missed.
    Unit scheduleJob(JobType jb, Schedule schedule) {
        this.waitFor(schedule);
        if ( ~ shutDown ) {
            this.setNext(schedule);
            [Deadline : dline(schedule)] this.makeJob(jb, schedule);
            hit = Cons( schedule, hit );
        } else {
            missed = Cons( schedule, missed );
        }
    }

    // Creates a ClientJobImpl numbered with the current history length, starts
    // it asynchronously and records it.
    // The first annotation used to read max(maxjobs); the class parameter is
    // spelled maxJobs, matching the `x <= max(x)` pattern of its neighbours.
    [( maxJobs <= max(maxJobs) )]
    [( jobHistories <= max(jobHistories) )]
    [( jobDatas <= max(jobDatas) )]
    Unit makeJob(JobType jb, Schedule schedule) {
        ClientJob job = new ClientJobImpl(maxJobs, this, jb, schedule, length(jobHistories));
        [Deadline : deadline()] job!executeJob();
        jobHistories = Cons( job, jobHistories );
        jobRecords = Insert( job, jobRecords );
    }

    // Called by a job when it is done; stores and records its data if present
    // and removes the job from the set of running jobs.
    Unit finishJob(ClientJob job, Maybe<JobData> jobData) {
        if ( isJust(jobData) ) {
            jobDatas = Cons( fromJust(jobData), jobDatas );
            recorder.record(fromJust(jobData));
        }
        jobRecords = remove(jobRecords, job);
    }

    ClientId getId() { return id; }

    Bool isShutdownRequested() { return shutDown; }

    // Sets the shutdown flag, waits until no job is running any more, then
    // notifies the network.
    Unit requestShutDown() {
        shutDown = True;
        await ( jobRecords == EmptySet );
        network!shutDown(this);
    }

    SyncServerAcceptor getAcceptor() { return acceptor; }

    // Enters WaitToBoot, waits for setAcceptor(), then starts the Boot job.
    Unit run() {
        this.waitToBoot();
        await ( acceptor != null );
        this.makeJob(Boot, NoSchedule);
    }

    ClientDataBase getClientDataBase() { return db; }

    DataBase getDataBase() { return db; }

    Unit setAcceptor([Far]SyncServerAcceptor acc) { acceptor = acc; }

    Unit setNetwork(Network n) { network = n; }

    // Clears the `next` signal; the schedule argument is not used.
    Unit setNext(Schedule schedule) { next = False; }

    // Suspends for sched(schedule) time units and until either nextJob() has
    // been called or shutdown was requested.
    Unit waitFor(Schedule schedule) {
        Int wait = sched(schedule);
        await duration(wait, wait) & ( next || shutDown );
    }

    Unit nextJob() { next = True; }

    // Moves the client to `state`, asserting that the transition table allows
    // it from the CURRENT state.
    // The parameter shadows the field of the same name, so the field must be
    // addressed as this.state. The unqualified form looked up the target
    // state's successors and assigned the parameter to itself, so the field
    // never left Start.
    [( machine <= max(machine) )]
    Unit becomesState(State state) {
        Set<State> tos = lookupDefault(machine, this.state, EmptySet);
        assert ( tos != EmptySet );
        assert contains(tos, state);
        this.state = state;
    }

    Unit waitToBoot() { this.becomesState(WaitToBoot); }

    Unit boot() { this.becomesState(Booting); }

    // NOTE(review): same target state as boot(); confirm this is intended.
    Unit start() { this.becomesState(Booting); }

    Unit waitToReplicate() { this.becomesState(WaitToReplicate); }

    Unit replicate() { this.becomesState(WorkOnReplicate); }

    Unit end() { this.becomesState(End); }
}

class ClientJobImpl([Final]Int maxJobs, [Far] [Final]InternalClient client, [Final]JobType job, [Final]Schedule schedule, [Final]Int id) implements ClientJob { Int br = 0; Int ar = 0; Maybe
// ClientJobImpl, fields and first methods (the class header is on the
// previous line).
// NOTE(review): generic type arguments are missing throughout this span; each
// physical line break sits where a `<...>` argument was lost. The code is left
// untouched here.
//
// deadline: starts as Nothing; executeJob() later stores
// Just( durationValue(d) ) for non-Boot jobs with a finite deadline.
deadline = Nothing; Maybe
// jd: measurement record, filled by beginMeasurement() and updated in executeJob().
// start / command: last commands received through command().
// schedules: awaited against Nil in executeJob().
// clientId and transactionId start at -1; thread starts as null.
// getConnectionThread(): fetches the client's acceptor (blocking get), asks it
// for a connection for this job with cost defaultScheduleCost(schedule), awaits
// the answer and returns it.
jd = Nothing; Command start = EmptyCommand; Command command = EmptyCommand; Schedules schedules = Nil; ClientId clientId = - 1; TransactionId transactionId = - 1; ConnectionThread thread = null; [Far] ClientDataBase db; ConnectionThread getConnectionThread(){ Fut
fs = client!getAcceptor(); SyncServerAcceptor acceptor = fs.get; Fut
// clientDB(): stores the client's data base in `db` (blocking get).
t = acceptor!getConnection(this, defaultScheduleCost(schedule)); await t?; return t.get; } Unit clientDB(){ Fut
// establishSchedule(): reads (job count, maximum transaction id) from the client.
fd = client!getClientDataBase(); db = fd.get; } Unit establishSchedule(){ Fut
> jcf = client!jobCountAndMaximumTransactionId(); Pair
// If the job count has reached maxJobs the client is shut down; otherwise a
// loop over `index` starts.
// NOTE(review): the text after `while (index` is missing from this copy. The
// loop, the end of establishSchedule() and the head of the method that the
// following `af = dc!available()` belongs to cannot be recovered from what is
// visible; restore from the upstream model.
stats = jcf.get; if ( ( fst(stats) >= maxJobs ) ){ this.shutDownClient(); } else { Int index=0; while (index
// (headless method): returns capacity(dd) of the data obtained from dc!available().
// consumeResource(): executes `skip` while consume <= cost, i.e. cost + 1
// times for non-negative cost, sampling now() and this.resource() before and
// after. Asserts that cost is 0, or the resource reading dropped, or time
// advanced. Returns cost.
// beginMeasurement(): for non-Boot jobs, initialises jd with a JobData built
// from the schedule, the current deadline() and now().
// executeJob(): entry point of the job; continues over the next lines.
af = dc!available(); DCData dd = af.get; return capacity(dd); } Int consumeResource(){ Int cost = defaultScheduleCost(schedule); Time bt = now(); br = this.resource(); Int consume = 0; while (( consume <= cost )) { skip; consume = ( consume + 1 ); } Time at = now(); ar = this.resource(); assert ( ( ( cost == 0 ) || ( ar < br ) ) || ( timeDifference(at, bt) > 0 ) ); return cost; } Unit beginMeasurement(){ if ( ( job != Boot ) ){ Deadline beginning = deadline(); Time beforetime = now(); jd = Just( JobData( schedname(schedule), sched(schedule), durationValue(dline(schedule)), 0, durationValue(beginning), 0, timeValue(beforetime), id ) ); } } Unit executeJob(){ [Deadline : deadline()] this.beginMeasurement(); Fut
// executeJob(), continued: obtains the client id, the client data base and a
// connection thread. With a non-null thread:
//  - Boot job: enters Booting, sends ListSchedule, waits until the `schedules`
//    field is non-empty, then calls establishSchedule() asynchronously.
//  - other jobs: enters WorkOnReplicate, sends SearchSchedule for this
//    schedule's name, waits for `schedules`, calls establishSchedule()
//    asynchronously, then waits for StartSnapShot in `start` and EndSnapShot
//    in `command`.
fut = client!getId(); clientId = fut.get; this.clientDB(); thread = this.getConnectionThread(); Int cost = defaultScheduleCost(schedule); if ( ( thread != null ) ){ if ( ( job == Boot ) ){ this.becomeState(Booting); thread!command(ListSchedule); await ( schedules != Nil ); this!establishSchedule(); } else { this.becomeState(WorkOnReplicate); thread!command(SearchSchedule( schedname(schedule) )); await ( schedules != Nil ); this!establishSchedule(); await ( start == StartSnapShot ); await ( command == EndSnapShot ); } Fut
// executeJob(), end: unless shutdown was requested the client goes to
// WaitToReplicate; the client is signalled via nextJob(). For non-Boot jobs
// with a finite deadline(), `deadline` and jd are updated. finishJob is always
// sent to the client, also when no thread was obtained.
// forClient(): returns clientId.
// shutDownClient(): starts on this line.
sd = client!isShutdownRequested(); Bool shutDown = sd.get; if ( ~ shutDown ){ this.becomeState(WaitToReplicate); } this.nextJob(); if ( ( job != Boot ) ){ Duration d = deadline(); if ( ~ isDurationInfinite(d) ){ deadline = Just( durationValue(d) ); jd = updateJobData(jd, cost, fromJust(deadline), now()); } } } client!finishJob(this, jd); } ClientId forClient(){ return clientId; } Unit shutDownClient(){ Fut
// shutDownClient(): if the client has not been asked to shut down yet,
// requests it, waits for completion and moves the client to End.
bf = client!isShutdownRequested(); await bf?; Bool bool = bf.get; if ( ~ bool ){ Fut
// registerReplicationItems(id): asks the client data base to prepare the
// replication item for (id, schedule).
unit = client!requestShutDown(); await unit?; unit.get; this.becomeState(End); } } Bool registerReplicationItems(TransactionId id){ Fut
// On success, remembers `id` as transactionId and reports it to the client.
// The parameter `id` shadows the class parameter of the same name.
reg = db!prepareReplicationItem(id, schedule); Bool rg = reg.get; if ( rg ){ transactionId = id; Fut
// hasFile(id): whether the client data base has a file `id`.
u = client!setMaximumTransactionId(id); u.get; } return rg; } Bool hasFile(FileId id){ Fut
he = db!hasFile(id); await he?; return he.get; } Maybe
// processFile(id): Just( size ) when the data base has `id` and its content is
// a file; Nothing otherwise.
processFile(FileId id){ Maybe
result = Nothing; Bool hasfile = this.hasFile(id); if ( hasfile ){ Fut
// overwrite(file): stores the file's size under its id, replacing any
// previous value.
contentf = db!getContent(id); await contentf?; FileContent content = contentf.get; if ( isFile(content) ){ FileSize size = content(content); result = Just( size ); } } return result; } Unit overwrite(File file){ FileId id = fst(file); FileSize size = fileContent(file); Fut
// continue(file): adds the file's size to the size already stored under its
// id (0 when the data base has no such file) and stores the sum.
u = db!updateFile(id, size); await u?; } Unit continue(File file){ FileId id = fst(file); FileSize size = fileContent(file); Bool he = this.hasFile(id); FileSize fsize = 0; if ( he ){ Fut
s = db!getContent(fst(file)); await s?; FileContent c = s.get; fsize = content(c); } size = ( size + fsize ); Fut
u = db!updateFile(id, size); await u?; }

    // Applies the pending file command to `file`. Waits until the current
    // command satisfies isAppendCommand; SkipFile does nothing, OverwriteFile
    // and ContinueFile delegate to overwrite() / continue(). Any other
    // command is ignored.
    Unit processContent(File file) {
        await isAppendCommand(command);
        if ( ( command == SkipFile ) ) {
            skip;
        } else if ( ( command == OverwriteFile ) ) {
            this.overwrite(file);
        } else if ( ( command == ContinueFile ) ) {
            this.continue(file);
        }
    }

    // Records a command from the connection thread. StartSnapShot is kept in
    // `start`; every other command goes to `command`.
    [Atomic] Unit command(Command c) {
        if ( ( c == StartSnapShot ) ) {
            start = c;
        } else {
            command = c;
        }
    }

    // Stores the schedules sent by the connection thread.
    // The parameter shadows the field of the same name, so the field must be
    // addressed as this.schedules. The unqualified assignment only copied the
    // parameter onto itself, leaving the field at Nil, so executeJob()'s
    // `await ( schedules != Nil )` could never proceed.
    Unit receiveSchedule(Schedules schedules) {
        this.schedules = schedules;
    }

    // Signals the client that the next job may start.
    Unit nextJob() {
        client!nextJob();
    }

    // Asks the client to enter `state` and blocks until it has done so.
    // States other than the five listed are ignored.
    // The futures are Fut<Unit>: the called client methods are declared Unit.
    Unit becomeState(State state) {
        if ( ( state == WaitToBoot ) ) {
            Fut<Unit> unit = client!waitToBoot();
            unit.get;
        } else if ( ( state == Booting ) ) {
            Fut<Unit> unit = client!boot();
            unit.get;
        } else if ( ( state == WorkOnReplicate ) ) {
            Fut<Unit> unit = client!replicate();
            unit.get;
        } else if ( ( state == WaitToReplicate ) ) {
            Fut<Unit> unit = client!waitToReplicate();
            unit.get;
        } else if ( ( state == End ) ) {
            Fut<Unit> unit = client!end();
            unit.get;
        }
    }
}

class ConnectionThreadImpl([Far]ClientJob job, [Far]SyncServer server, [Far]Resource res, Int id, Int cost) implements ConnectionThread { SyncServerClientCoordinator coord; Maybe
cmd = Nothing; Schedules schedules = Nil; Int resource(){ DeploymentComponent dc = thisDC(); Fut
af = dc!available(); DCData dd = af.get; return capacity(dd); } Unit consumeResource(Int i){ while (( i > 0 )) { Fut
fr = res!consume(); fr.get; i = ( i - 1 ); } } Unit run(){ Fut
c = server!getCoordinator(); await c?; coord = c.get; await ( cmd != Nothing ); schedules = this.sendSchedule(); if ( ( cmd != Just( ListSchedule ) ) ){ this.consumeResource(1); ReplicationSnapshot snapshot = this.startReplicationUpdate(); Fut
idf = snapshot!getIndexingId(); await idf?; TransactionId tid = idf.get; this.consumeResource(2); Fut
b = job!registerReplicationItems(tid); await b?; Bool register = b.get; Set
> filesets = EmptySet; if ( register ){ Fut
> nis = snapshot!getItems(ssname(fromJust(cmd))); await nis?; Set
newitems = nis.get; filesets = this.registerItems(newitems); } this.consumeResource(2); Fut
rp = job!command(StartSnapShot); await rp?; while (hasNext(filesets)) { Pair
>, Set
> nfs = next(filesets); filesets = fst(nfs); Set
fileset = snd(nfs); this.transferItems(fileset); } this.consumeResource(1); rp = job!command(EndSnapShot); await rp?; this.consumeResource(1); this.finishReplicationUpdate(); } } Schedules sendSchedule(){ assert isJust(cmd); Schedules results = Nil; if ( ( cmd == Just( ListSchedule ) ) ){ Fut
ssf = server!listSchedules(); await ssf?; results = ssf.get; } else { Fut
ssf = server!getSchedule(ssname(fromJust(cmd))); await ssf?; Schedule s = ssf.get; results = Cons( s, results ); } Fut
rp = job!receiveSchedule(results); await rp?; return results; } ClientId forClient(){ Fut
id = job!forClient(); return id.get; } [Atomic] Unit command(Command c){ cmd = Just( c ); } Set
> registerItems(Set
items){ Set
> regs = EmptySet; while (hasNext(items)) { Pair
, ServerReplicationItem> nis = next(items); items = fst(nis); ServerReplicationItem item = snd(nis); Fut
entryf = item!getContents(); await entryf?; FileEntry entry = entryf.get; Set
result = EmptySet; Set
ids = getFileIdFromEntries1(entry); while (hasNext(ids)) { Pair
, FileId> nids = next(ids); FileId id = snd(nids); Maybe
content = getFromEntry(entry, id); result = Insert( Pair( id, fromJust(content) ), result ); ids = fst(nids); } regs = Insert( result, regs ); } return regs; } Unit transferItems(Set