Fibres¶
General Description¶
The Felix fibration system is implemented by service calls into the run time library.
These calls are basically:
- mk_ioschannel_pair[T]
- read channel
- write (channel, value)
- spawn_fthread
- suicide
- run
Channel construction¶
A channel is a single object; however, we would typically say:
var inp, out = mk_ioschannel_pair[int]();
to create a channel object on the heap, and then cast a pointer to the object to ischannel[int] and a copy to oschannel[int]. These types can be abbreviated %<int and %>int respectively.
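As a minimal sketch of how the two endpoints are used, the following annotates each endpoint with its abbreviated type and passes one value through the channel. The names rd, wr and x are illustrative only, and the sketch assumes the mainline may read directly, since it is itself a coroutine.

```felix
// One channel object, two typed views of it.
var inp, out = mk_ioschannel_pair[int]();

// The same endpoints with their types spelled out using the abbreviations.
var rd : %<int = inp;   // ischannel[int]: the read end
var wr : %>int = out;   // oschannel[int]: the write end

// A fibre writes one value on the write end; the mainline reads it.
spawn_fthread { write (wr, 42); };
var x = read rd;
println$ x;
```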
Spawning a Fibre¶
To create a fibre, we just call:
spawn_fthread p;
where p is any unit procedure. This creates a fibre of control which starts at the entry point of p.
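A small sketch of both ways to supply the unit procedure: by name, or as an anonymous procedure written in place. The counter count and the procedure bump are hypothetical names used only for illustration. The value printed depends on when the scheduler chooses to run each fibre.

```felix
var count = 0;   // illustrative counter, bumped once by each fibre

// A named unit procedure ...
proc bump () { count = count + 1; }
spawn_fthread bump;

// ... or an anonymous one written in place.
spawn_fthread { count = count + 1; };

println$ "fibres run so far: " + count.str;
```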
Termination¶
A fibre terminates when its initial coroutine returns, or when it calls suicide(), starves, or blocks.
Starvation¶
A fibre starves if it is suspended on a channel which is unable to be written. What this means is that no other active procedure owns the pointer to the channel, in other words, the channel is unreachable. In this case, the fibre evaporates automatically because only active fibres are known to the scheduler, and fibres are otherwise anonymous. Thus the continuation and thus all the stack frames of the fibre are unreachable and can be reaped by the garbage collector.
Thus, fibres cannot deadlock, because if they do they no longer exist. Starvation is equivalent to suicide.
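To make starvation concrete, here is a minimal sketch under the rules just described. The flag reached and the procedure starve are hypothetical names. Both endpoints live only in the frame of starve, so once it returns nothing can write on the channel and the suspended reader simply disappears.

```felix
var reached = false;   // set only if the read ever completes

proc starve () {
  // Both endpoints are local to this procedure's frame.
  var inp, out = mk_ioschannel_pair[int]();
  spawn_fthread {
    var x = read inp;          // suspends: nothing ever writes on out
    println$ "got " + x.str;   // never executed
    reached = true;
  };
}

starve();
println$ "mainline continues, reached = " + reached.str;
```

The mainline is not held up by the starved fibre: it carries on and terminates normally.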
Blockage¶
A fibre blocks if it is suspended on a channel which is unable to be read. As with starvation, blocking is equivalent to suicide.
Note that if a procedure stack contains a channel, or data structures which make the channel reachable, then the channel is considered accessible, even if the procedure has no control path which will lead to an I/O attempt on it. For this reason channels should be forgotten except by those using them.
Conversely, when a channel is reachable and a fibre is suspended on it, but the procedure which can reach it never performs I/O on it, that is called a livelock.
Constructing a new Scheduler¶
Felix top level mainline code forms a coroutine which the system spawns automatically. However you can also create a sub-scheduler with the procedure call
run p;
run is a subroutine: it creates a new scheduler object, spawns p on that scheduler, and runs the scheduler until there are no active fibres left on that scheduler.
Note that if p itself spawns new fibres they will become active on the same scheduler as p. However, fibres can migrate between schedulers.
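The following sketch shows why a sub-scheduler is useful: because run returns only when no active fibres remain, the code after it can rely on the fibres having finished. The names worker and total are hypothetical.

```felix
var total = 0;   // illustrative accumulator

proc worker () {
  var inp, out = mk_ioschannel_pair[int]();
  // Writer: sends 0, 1, 2, 3 and then returns.
  spawn_fthread { for i in 0..3 perform write (out, i); };
  // Reader: sums what it receives, then starves once the writer is gone.
  spawn_fthread {
    while true do
      var x = read inp;
      total = total + x;
    done
  };
}

run worker;   // returns when the sub-scheduler has no active fibres left
println$ "total = " + total.str;
```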
Example¶
Here is a simple example.
proc example () {
var inp,out = mk_ioschannel_pair[int]();
spawn_fthread {
for i in 0..9 perform write (out,i);
};
spawn_fthread {
repeat perform println$ read inp;
};
}
example();
println$ "Done";
In the example, we create a channel with a read and write endpoint, and then spawn two fibres. The first one writes 10 numbers and suicides by returning. The second one reads 10 numbers and prints them, then suicides by starvation.
It is important to note that the abstract logic does not specify when the Done is printed. After a spawn, both the spawner and spawnee are active. After a read and write match up, both the reader and writer are active. The implementation is free to choose which of all the active fibres to run next. However Felix runs the spawnee before the spawner, and it runs the reader before the writer, so the Done will actually print last.
The other very important thing to note is that the example procedure knows the channels being used. However, the channel endpoints are stored in its stack frame, which will become unreachable when example returns. Thus, only the reader and writer will have access to the channel at that time, and once the writer has terminated, that leaves the reader starved: it is trying to read from a channel which no active fibre can write on. Thus, the reader becomes unreachable, and so when the mainline terminates the program is finished.
Binding Channels With HOFs.¶
A better way to write the code above is to use Higher Order Functions (HOFs).
fun make() = {
typedef r_t = (inp: %<int);
typedef w_t = (out: %>int);
proc writer (x: w_t) () {
for i in 0..9 perform write (x.out,i);
};
proc reader (y: r_t) () {
repeat perform println$ read y.inp;
};
var i,o = mk_ioschannel_pair[int]();
return reader (inp=i), writer (out=o);
}
proc example () {
var r,w = make();
spawn_fthread r;
spawn_fthread w;
}
example();
println$ "Done";
Here the reader and writer are functions which take a record argument whose fields are the required channels and return a unit procedure.
Syntactic Support¶
The protocol above is supported by special syntax:
chip writer
connector x
pin out: %>int
{
for i in 0..9 perform write (x.out,i);
}
chip reader
connector y
pin inp: %<int
{
repeat perform println$ read y.inp;
}
circuit
connect writer.out, reader.inp
endcircuit
println$ "Done";
The chip constructions above are exactly the same as the procedures in the previous example. The connectors are the record parameters, the pins are the fields of the record.
The circuit statement constructs the channels required to connect the pins automatically, binds them to the parameters, and then spawns the resulting unit procedures as fibres.
Sources, Sinks, and Transducers¶
What is important to note here is that connectors can have any number of pins. Coroutines are not restricted to using one communication channel.
The writer above, with a single output pin, is called a source. The reader above, with a single input pin, is called a sink. And the following shows a transducer:
chip squareit
connector x
pin inp: %<int
pin out: %>int
{
repeat do
var i = read x.inp;
write (x.out, i*i);
done
}
circuit
connect writer.out, squareit.inp
connect squareit.out, reader.inp
endcircuit
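Since a connector can have any number of pins, a chip is not limited to the source, sink and transducer shapes. As a sketch, here is a chip with two input pins and one output pin, wired to two sources and a sink. All chip names (lhs, rhs, addpairs, printer), the accumulator total and the procedure demo are hypothetical. The circuit is placed inside a procedure and run on its own scheduler so that it has finished before the final line executes.

```felix
var total = 0;   // illustrative: sum of everything the sink receives

chip lhs
  connector x
    pin out: %>int
{
  for i in 1..3 perform write (x.out, i);
}

chip rhs
  connector x
    pin out: %>int
{
  for i in 1..3 perform write (x.out, 10 * i);
}

// Two input pins and one output pin on a single connector.
chip addpairs
  connector io
    pin a: %<int
    pin b: %<int
    pin out: %>int
{
  while true do
    var u = read io.a;
    var v = read io.b;
    write (io.out, u + v);
  done
}

chip printer
  connector y
    pin inp: %<int
{
  while true do
    var s = read y.inp;
    total = total + s;
    println$ s;
  done
}

proc demo () {
  circuit
    connect lhs.out, addpairs.a
    connect rhs.out, addpairs.b
    connect addpairs.out, printer.inp
  endcircuit
}
run demo;
println$ "total = " + total.str;
```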
Pipelines¶
When you run a set of coroutines starting with a source, followed by a sequence of transducers, and terminated by a sink, the construction is called a closed pipeline and is precisely a unit procedure.
There are special operators to simplify pipeline construction:
var pipeline = writer |-> squareit |-> reader;
pipeline ();
Pipelines can also be open, if there is no source at the beginning and no sink at the end, or half open, where there is a source at the start but no sink at the end, or no source at the start but a sink at the end.
In fact the pipeline operator is associative:
LHS | RHS | Result |
---|---|---|
Source | Sink | Closed Pipeline |
Source | Transducer | Source |
Transducer | Transducer | Transducer |
Transducer | Sink | Sink |
in particular for any legitimate combination:
a |-> b |-> c
(a |-> b) |-> c
a |-> (b |-> c)
are equivalent.
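As a sketch of what associativity means in practice, the two groupings below should behave identically. The example uses the standard library adaptors iterate, function and procedure. The names square, a and b are hypothetical.

```felix
var a = 0;   // sum produced by the left-grouped pipeline
var b = 0;   // sum produced by the right-grouped pipeline

fun square (x:int) => x * x;

// (source |-> transducer) is a source, which is then joined to a sink.
run (
  (iterate (1,2,3).iterator |-> function square) |->
  procedure (proc (x:int) { a = a + x; })
);

// (transducer |-> sink) is a sink, which is then joined to a source.
run (
  iterate (1,2,3).iterator |->
  (function square |-> procedure (proc (x:int) { b = b + x; }))
);

println$ a.str + " " + b.str;
```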
Library Chips¶
We can simplify our code again by using standard library chips. Here is the whole program again:
proc readit (y:int) { println$ y; }
gen writeit () : opt[int] = {
for i in 0..9 perform yield Some i;
return None[int];
}
fun squareit (x:int) => x * x;
var pipeline = iterate writeit |-> function squareit |-> procedure readit;
pipeline ();
println$ "Done";
The iterate chip is an adaptor that accepts an iterator and produces a source.
The function chip is an adaptor that accepts a function and produces a transducer.
The procedure chip is an adaptor that accepts a procedure with one argument and produces a sink.
Here’s another example:
run (
iterate (1,2,3).iterator |->
function (fun (x:int) => x * x) |->
procedure (proc (x:int) { println$ x; })
);
which prints the squares of the values of the array 1,2,3. It is written as a single statement, using anonymous functions and the standard iterator method for arrays.
More Library Chips¶
sink¶
Universal sink. Reads input forever.
chip sink[T]
connector io
pin inp : %<T
{
while true do
var x = read (io.inp);
C_hack::ignore (x);
done
}
source¶
Writes fixed value forever.
chip source[T] (a:T)
connector io
pin out: %>T
{
while true do
write (io.out, a);
done
}
generator¶
Writes values acquired from a generator.
chip generator[T] (g: 1->T)
connector io
pin out: %>T
{
repeat perform write (io.out, g());
}
iterate¶
Writes values acquired from an iterator, terminates when and if iterator becomes exhausted.
chip iterate[T] (g: 1->opt[T])
connector io
pin out: %>T
{
again:>
var x = g();
match x with
| Some v =>
write (io.out, v);
goto again;
| None => ;
endmatch;
}
source_from_list¶
A specialised source which writes the values of a list. Terminates at the end of the list.
chip source_from_list[T] (a:list[T])
connector io
pin out: %>T
{
for y in a perform write (io.out,y);
}
bound_source_from_list¶
Writes Some x, for each x in the list, then writes an infinite tail of None.
chip bound_source_from_list[T] (a:list[T])
connector io
pin out: %>opt[T]
{
for y in a perform write (io.out,Some y);
while true perform write (io.out,None[T]);
}
function¶
Function adaptor. Converts a function to transducer. Repeatedly reads input, writes result of applying function to it.
chip function[D,C] (f:D->C)
connector io
pin inp: %<D
pin out: %>C
{
while true do
var x = read io.inp;
var y = f x;
write (io.out, y);
done
}
procedure¶
Procedure adaptor. Converts a procedure taking one argument to a sink.
chip procedure[D] (p:D->0)
connector io
pin inp: %<D
{
while true do
var x = read io.inp;
p x;
done
}
filter¶
Converts a predicate and a function to a transducer. Reads a value from input and, if the value satisfies the predicate, applies the function to it and writes the result. Note that the predicate applies to the input of the function, not to its output: its type is D->bool.
chip filter[D,C] (c:D->bool) (f:D->C)
connector io
pin inp: %<D
pin out: %>C
{
while true do
var x = read io.inp;
if c x do
write (io.out, f x);
done
done
}
filter¶
A variant of the two argument filter which reads a value, applies the function to it, and checks the resulting option type. If Some v is returned, it writes v; if None is returned, it does not write anything.
chip filter[D,C] (f:D->opt[C])
connector io
pin inp: %<D
pin out: %>C
{
while true do
var x = read io.inp;
match f x with
| Some y => write (io.out, y);
| None => ;
endmatch;
done
}
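A sketch of both filter variants in use, going by the chip bodies above. In the first pipeline the predicate is tested on each value read, and only accepted values are passed to the function. In the second, a single function decides and transforms at once. The helper names is_even, square and half_if_even, and the two accumulators, are hypothetical.

```felix
var squares_of_evens = 0;   // illustrative accumulators
var halves = 0;

fun is_even (x:int) => x % 2 == 0;
fun square (x:int) => x * x;

// Returns Some result for values to keep, None for values to drop.
fun half_if_even (x:int) : opt[int] =>
  if x % 2 == 0 then Some (x / 2) else None[int] endif
;

// Two argument form: predicate, then function.
run (
  iterate (1,2,3,4,5,6).iterator |->
  filter is_even square |->
  procedure (proc (x:int) { squares_of_evens = squares_of_evens + x; })
);

// One argument form: the option result selects what is written.
run (
  iterate (1,2,3,4,5,6).iterator |->
  filter half_if_even |->
  procedure (proc (x:int) { halves = halves + x; })
);

println$ squares_of_evens.str + " " + halves.str;
```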
sink_to_list¶
This chip accepts a pointer to a variable containing a list. Each value read is prepended to the list.
chip sink_to_list[T] (p: &list[T])
connector io
pin inp : %<T
{
while true do
var x = read (io.inp);
p <- Cons (x,*p);
done
}
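A short usage sketch, assuming the chip can terminate a pipeline like any other sink. The variable name result is hypothetical. Because each value is prepended, the list ends up in the reverse of the order in which the values arrived.

```felix
var result = Empty[int];   // the list the sink will build

run (iterate (1,2,3).iterator |-> sink_to_list &result);

// Values arrived as 1, 2, 3, so the list now reads 3, 2, 1.
println$ result;
```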
sink_to_unique_list¶
A variant of sink_to_list for which the value is prepended to the list if, and only if, it is not already in the list.
chip sink_to_unique_list[T with Eq[T]] (p: &list[T])
connector io
pin inp : %<T
{
while true do
var x = read (io.inp);
if not (x in *p) perform
p <- Cons (x,*p)
;
done
}
buffer¶
A single value buffer, equivalent to a function adaptor passed the identity function.
chip buffer [T]
connector io
pin inp: %<T
pin out: %>T
{
while true do
var x = read io.inp;
write (io.out, x);
done
}
dup¶
Copies input to two outputs.
chip dup [T]
connector io
pin inp: %<T
pin out1: %>T
pin out2: %>T
{
while true do
var x = read io.inp;
write (io.out1, x);
write (io.out2, x);
done
}
debug_buffer¶
A variant of a buffer which also prints diagnostics before reading, after reading and before writing, and after writing.
chip debug_buffer [T with Str[T]] (tag:string)
connector io
pin inp: %<T
pin out: %>T
{
while true do
println$ "Debug buffer [" + tag + "] READ";
var x = read io.inp;
println$ "Debug buffer [" + tag + "] read " + x.str;
write (io.out, x);
println$ "Debug buffer [" + tag + "] written " + x.str;
done
}
oneshot¶
A one shot buffer. Reads one value and writes it, then terminates.
chip oneshot [T]
connector io
pin inp: %<T
pin out: %>T
{
var x = read io.inp;
write (io.out, x);
}
store¶
Repeatedly stores read values into a variable.
chip store[T] (p:&T)
connector io
pin inp: %<T
{
while true do
var x = read io.inp;
p <- x;
done
}
fetch¶
Repeatedly writes the current value of a variable.
chip fetch[T] (p:&T)
connector io
pin out: %>T
{
while true do
write (io.out, *p);
done
}
debug_sink¶
Writes input to standard output.
chip debug_sink [T with Str[T]] (s:string)
connector io
pin inp: %<T
{
while true do
var x = read io.inp;
println$ "Debug sink ["+s+"] "+x.str;
done
}
latch¶
Satisfies all reads on its output channel with the last value read on the input channel. Blocks readers until at least one value is read from its input channel.
chip latch[T]
connector io
pin inp: %<T
pin out: %>T
{
var x = read io.inp;
device w = fetch &x;
device r = store &x;
circuit
wire io.inp to r.inp
wire io.out to w.out
endcircuit
}
Duplex Channels¶
A duplex channel can be used to first send data of type D from one coroutine to another, and then have the second coroutine send data of type C back along the same channel.
This protocol emulates a standard function call where D is the domain of the function and C the codomain. It can be done with two monotyped half-duplex channels as well: using a duplex channel saves one heap allocation and enforces the subroutine call protocol.
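The two-channel alternative mentioned above can be sketched with the ordinary channel calls only. Here D and C are both int, one channel carries the request and the other carries the reply. The names demo, answer and the channel variables are hypothetical, and nothing here uses the duplex channel API itself.

```felix
var answer = 0;   // illustrative: receives the reply

proc demo () {
  var req_in, req_out = mk_ioschannel_pair[int]();   // carries the argument
  var rep_in, rep_out = mk_ioschannel_pair[int]();   // carries the result

  // "Function" side: read an argument, write back its square.
  spawn_fthread {
    while true do
      var x = read req_in;
      write (rep_out, x * x);
    done
  };

  // "Caller" side: send the argument, then wait for the result.
  spawn_fthread {
    write (req_out, 7);
    answer = read rep_in;
  };
}

run demo;
println$ answer;
```

Nothing in the types stops the caller from reading before it writes, which is the ordering a duplex channel is said to enforce.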
Session Typed Channels¶
Underneath, channels are untyped, and I/O operations transfer a single machine address. Therefore, with casts, you can read and write a pointer to any data type safely provided the read and write agree on the type.
Type systems called session types have been developed which can be used to statically enforce agreement on the type of data being communicated, where the type varies over time. However, Felix currently does not support any session types other than duplex channels.