|
@@ -6,9 +6,9 @@ using Gee;
|
|
|
|
|
|
namespace LibPeer.Protocols.Stp.Sessions {
|
|
|
|
|
|
- const int SEGMENT_PAYLOAD_SIZE = 14000;
|
|
|
+ const int SEGMENT_PAYLOAD_SIZE = 16384;
|
|
|
const int METRIC_WINDOW_SIZE = 4;
|
|
|
- const int64 MAX_WINDOW_SIZE = 9223372036854775808;
|
|
|
+ const int64 MAX_WINDOW_SIZE = 1024;
|
|
|
|
|
|
public class EgressSession : Session {
|
|
|
|
|
@@ -22,8 +22,9 @@ namespace LibPeer.Protocols.Stp.Sessions {
|
|
|
protected AsyncQueue<Payload> payload_queue = new AsyncQueue<Payload>();
|
|
|
|
|
|
private int redundant_resends = 0;
|
|
|
+ private int resends = 0;
|
|
|
private uint64 window_size = METRIC_WINDOW_SIZE;
|
|
|
- private uint64 best_ping = 10000;
|
|
|
+ private uint64 average_ping = 0;
|
|
|
private uint64 worst_ping = 0;
|
|
|
private int64 adjustment_delta = 0;
|
|
|
private uint64 last_send = 0;
|
|
@@ -34,7 +35,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
|
|
|
|
|
|
public EgressSession(InstanceReference target, uint8[] session_id, uint64 ping) {
|
|
|
base(target, session_id, ping);
|
|
|
- best_ping = ping;
|
|
|
+ average_ping = ping;
|
|
|
worst_ping = ping;
|
|
|
open = true;
|
|
|
}
|
|
@@ -71,6 +72,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
|
|
|
private int64 window_time = 0;
|
|
|
private int64 last_window_time = 0;
|
|
|
private uint64 last_window_size = 0;
|
|
|
+ private int last_resends = 0;
|
|
|
|
|
|
private void handle_acknowledgement(Acknowledgement segment) {
|
|
|
// Is this segment still in-flight?
|
|
@@ -94,40 +96,30 @@ namespace LibPeer.Protocols.Stp.Sessions {
|
|
|
// What was the time difference?
|
|
|
var round_trip = (get_monotonic_time()/1000) - segment.timing;
|
|
|
|
|
|
- // Are we currently at metric window size?
|
|
|
- // if(window_size == METRIC_WINDOW_SIZE) {
|
|
|
- // Yes, add round trip time to the list
|
|
|
- segment_trips.add(round_trip);
|
|
|
-
|
|
|
- // Do we have a sample?
|
|
|
- if(segment_trips.size >= METRIC_WINDOW_SIZE) {
|
|
|
- var current_window_time = get_monotonic_time() - window_time;
|
|
|
- // Update the ping based on the average of the metric segments
|
|
|
- uint64 average = segment_trips[0];
|
|
|
- foreach (var ping in segment_trips) {
|
|
|
- average = (average + ping)/2;
|
|
|
- }
|
|
|
-
|
|
|
- var last = (last_window_size / (double)uint64.max(last_window_time, 1));
|
|
|
- var current = (window_size / (double)uint64.max(current_window_time, 1));
|
|
|
-
|
|
|
- last_window_size = window_size;
|
|
|
+ segment_trips.add(round_trip);
|
|
|
+ // Do we have a sample?
|
|
|
+ if(segment_trips.size >= window_size) {
|
|
|
+ var current_window_time = get_monotonic_time() - window_time;
|
|
|
+ // Update the ping based on the average of the metric segments
|
|
|
+ average_ping = segment_trips[0];
|
|
|
+ foreach (var ping in segment_trips) {
|
|
|
+ average_ping = (average_ping + ping)/2;
|
|
|
+ }
|
|
|
|
|
|
- adjust_window_size(last, current);
|
|
|
+ var last = (last_window_time / (double)uint64.max(last_window_size, 1)) * last_resends;
|
|
|
+ var current = (current_window_time / (double)uint64.max(window_size, 1)) * resends;
|
|
|
|
|
|
+ last_window_size = window_size;
|
|
|
|
|
|
+ adjust_window_size(last, current);
|
|
|
|
|
|
+ segment_trips.clear();
|
|
|
+ last_window_time = current_window_time;
|
|
|
+ last_resends = resends;
|
|
|
+ window_time = get_monotonic_time();
|
|
|
+ }
|
|
|
|
|
|
- best_ping = uint64.min(best_ping, average);
|
|
|
- worst_ping = uint64.max(worst_ping, average);
|
|
|
- segment_trips.clear();
|
|
|
- last_window_time = current_window_time;
|
|
|
- window_time = get_monotonic_time();
|
|
|
- }
|
|
|
- // }
|
|
|
- // else {
|
|
|
- // adjust_window_size(round_trip);
|
|
|
- // }
|
|
|
+ worst_ping = uint64.max(worst_ping, round_trip);
|
|
|
|
|
|
}
|
|
|
|
|
@@ -142,14 +134,15 @@ namespace LibPeer.Protocols.Stp.Sessions {
|
|
|
}
|
|
|
|
|
|
// Calculate a maximum time value for segments eligable to be resent
|
|
|
- uint64 max_time = (get_monotonic_time()/1000) - (uint64)(worst_ping + (redundant_resends * 10));
|
|
|
+ uint64 max_time = (get_monotonic_time()/1000) - (uint64)(average_ping + (redundant_resends * window_size));
|
|
|
|
|
|
// Do we have any in-flight segments to resend?
|
|
|
foreach (var segment in in_flight.values) {
|
|
|
// Is the segment timing value less than the max time?
|
|
|
if(segment.timing != 0 && segment.timing < max_time) {
|
|
|
- print(@"***Resend segment $(segment.sequence_number)\n");
|
|
|
+ // print(@"***Resend segment $(segment.sequence_number)\n");
|
|
|
// Resend it
|
|
|
+ resends ++;
|
|
|
segment.reset_timing();
|
|
|
queue_segment(segment);
|
|
|
break;
|
|
@@ -165,33 +158,36 @@ namespace LibPeer.Protocols.Stp.Sessions {
|
|
|
}
|
|
|
|
|
|
private void adjust_window_size(double last, double current) {
|
|
|
- // print(@"current = $(current); compare = $(last);\n");
|
|
|
|
|
|
if(last > current) {
|
|
|
- print("\t++\n");
|
|
|
- if(adjustment_delta < 0) {
|
|
|
+ if(adjustment_delta <= 0) {
|
|
|
adjustment_delta = 1;
|
|
|
}
|
|
|
- else {
|
|
|
- adjustment_delta ++;
|
|
|
+ else if(adjustment_delta < int64.MAX/2) {
|
|
|
+ adjustment_delta *= 2;
|
|
|
}
|
|
|
}
|
|
|
else if(current > last) {
|
|
|
- print("\t --\n");
|
|
|
- if(adjustment_delta > 0) {
|
|
|
+ if(adjustment_delta >= 0) {
|
|
|
adjustment_delta = -1;
|
|
|
}
|
|
|
- else {
|
|
|
- adjustment_delta --;
|
|
|
+ else if(adjustment_delta > int64.MIN/2) {
|
|
|
+ adjustment_delta *= 2;
|
|
|
}
|
|
|
}
|
|
|
else {
|
|
|
- print("\t ==\n");
|
|
|
adjustment_delta = 0;
|
|
|
}
|
|
|
- adjustment_delta = int64.min(adjustment_delta, 4);
|
|
|
- adjustment_delta = int64.max(adjustment_delta, -4);
|
|
|
- window_size += adjustment_delta;
|
|
|
+
|
|
|
+ if(adjustment_delta < 0 && window_size < (adjustment_delta * -1)) {
|
|
|
+ window_size = 0;
|
|
|
+ }
|
|
|
+ else if(adjustment_delta > 0 && (uint64.MAX - window_size) < adjustment_delta ) {
|
|
|
+ window_size = uint64.MAX;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ window_size += adjustment_delta;
|
|
|
+ }
|
|
|
|
|
|
// Is the window size now less than the metric size?
|
|
|
if(window_size < METRIC_WINDOW_SIZE) {
|
|
@@ -203,7 +199,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
|
|
|
// Yes, cap it
|
|
|
window_size = MAX_WINDOW_SIZE;
|
|
|
}
|
|
|
- print(@"WINDOW SIZE: $(window_size)\n");
|
|
|
+ // print(@"WINDOW SIZE: $(window_size), ADJUSTMENT_DELTA: $(adjustment_delta)\n");
|
|
|
}
|
|
|
|
|
|
protected override void close_session(string reason) {
|
|
@@ -242,7 +238,7 @@ namespace LibPeer.Protocols.Stp.Sessions {
|
|
|
segment_trackers.set(next_sequence_number, tracker);
|
|
|
tracker.add_segment();
|
|
|
int payload_size = int.min(data.length, (i+1)*SEGMENT_PAYLOAD_SIZE);
|
|
|
- print(@"data.length: $(data.length); i: $(i); SEGMENT_PAYLOAD_SIZE: $(SEGMENT_PAYLOAD_SIZE); payload_size: $(payload_size)\n");
|
|
|
+ // print(@"data.length: $(data.length); i: $(i); SEGMENT_PAYLOAD_SIZE: $(SEGMENT_PAYLOAD_SIZE); payload_size: $(payload_size)\n");
|
|
|
payload_queue.push(new Payload(next_sequence_number, data[i*SEGMENT_PAYLOAD_SIZE:payload_size].copy()));
|
|
|
next_sequence_number ++;
|
|
|
}
|