File size: 7,923 Bytes
b6a38d7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 |
-- Large Data Transfers (Hogs)
-- There is one hog per socket, which is transferred progressively. Starting another transfer operation cancels the previous one.
-- Other rfn can be called while the hog is being transferred.
-- The transfer is initiated with SendHog(hog) and cancelled with SendHogCancel().
-- The send status can be checked with SendHogStatus() which returns hog, confirmed_transfer.
-- The status of the receive hog is obtained with ReceiveHogStatus() which returns hog, total_hog_size. Note that during the transfer
-- the hog returned is a table with the hog chunks; hog.size contains the total length of the received chunks.
-- Since the size of these hogs can be excessive, SendHogCancel is called immediately after the send callback.
-- Therefore only rfn calls made in the callback can rely on the hog being present and complete on the receiving end.
-- Note that calling SendHogCancel clears the remote hog as well.
-- Number of chunks SendHog pushes up front, before any confirmation has arrived.
HogChunksInAdvance = 4
-- Maximum length of a single chunk passed to rfnHogData (32 * 1024).
HogChunkSize = 32 * 1024
-- Added to RealTime() to compute the upload/download watchdog deadlines
-- (presumably milliseconds - confirm against RealTime/Sleep units).
HogChunkTimeout = 10000 -- chunk confirm timeout
-- DataSocket: a MessageSocket that can additionally transfer one large string
-- ("hog") in each direction, split into chunks that the receiver confirms.
DefineClass.DataSocket = {
__parents = { "MessageSocket" },
-- Download state: false when idle, a table of received chunks (with a .size
-- field) while receiving, and the concatenated string once complete.
hog_download = false,
-- Size announced by rfnHogStart; -1 when no download is active.
hog_download_total = -1,
-- Per-instance table (created in new) used as the Msg/WaitMsg key for downloads.
hog_download_signal = false,
-- Download result stored by rfnHogDownloadEnd for the thread in WaitDownload.
hog_download_data = false,
-- Thread currently inside WaitDownload, or false.
hog_download_thread = false,
-- RealTime() deadline after which the download watchdog stops the download.
hog_download_timeout = false,
-- Watchdog thread created by rfnHogStart.
hog_download_monitor = false,
-- Upload state: the string being sent, or false when idle.
hog_upload = false,
-- Size confirmed by the remote side so far; -1 when no upload is active.
hog_upload_confirmed = -1,
-- Function called once the whole hog has been confirmed, or false.
hog_upload_callback = false,
-- Per-instance table (created in new) used as the Msg/WaitMsg key for uploads.
hog_upload_signal = false,
-- Thread currently inside WaitUpload, or false.
hog_upload_thread = false,
-- Packed parameters stored by rfnHogUploadEnd for the thread in WaitUpload.
hog_upload_data = false,
-- RealTime() deadline after which the upload watchdog stops the upload.
hog_upload_timeout = false,
-- Watchdog thread created by SendHog.
hog_upload_monitor = false,
}
-- Constructs the socket through MessageSocket.new and gives the instance its
-- own pair of signal tables, which serve as the Msg/WaitMsg keys for the
-- upload and download helpers.
function DataSocket:new(object)
	local socket = MessageSocket.new(self, object)
	socket.hog_download_signal = {}
	socket.hog_upload_signal = {}
	return socket
end
-- Starts uploading `hog` to the remote side.
--   hog           - string to transfer; any other type is rejected.
--   sent_callback - optional function invoked by rfnHogConfirm once the whole
--                   hog has been confirmed.
-- Returns true when the upload was started, nothing otherwise (an upload is
-- already in progress, or `hog` is not a string).
-- The hog size is announced with rfnHogStart, up to HogChunksInAdvance chunks
-- are sent immediately, and a watchdog thread stops the upload with "timeout"
-- if no confirmation refreshes the deadline in time.
function DataSocket:SendHog(hog, sent_callback)
	if self.hog_upload then
		assert(false, "Hog sending still in progress!")
		return
	end
	if type(hog) ~= "string" then
		assert(false, "Trying to send a hog of type " .. type(hog))
		return
	end
	local total = #hog
	self:Log("Uploading hog size", total)
	self.hog_upload = hog
	self.hog_upload_confirmed = 0
	self.hog_upload_callback = sent_callback or false
	self:Send("rfnHogStart", total)
	for i = 0, HogChunksInAdvance - 1 do
		local offset = i * HogChunkSize
		-- The first chunk is always sent, even for an empty hog, so that the
		-- receiver issues the confirmation that completes the transfer. Further
		-- chunks are sent only while data remains; the strict comparison (same
		-- as in rfnHogConfirm) avoids a redundant empty chunk when the hog
		-- length is an exact multiple of HogChunkSize.
		if i == 0 or offset < total then
			self:Send("rfnHogData", string.sub(hog, offset + 1, offset + HogChunkSize))
		end
	end
	DeleteThread(self.hog_upload_monitor)
	self.hog_upload_monitor = CreateRealTimeThread(function()
		self.hog_upload_timeout = RealTime() + HogChunkTimeout
		while self.hog_upload do
			-- rfnHogConfirm pushes the deadline forward, so re-read it on every wake-up.
			local timeout_after = self.hog_upload_timeout - RealTime()
			if timeout_after <= 0 then
				self:StopUpload("timeout")
				break
			end
			Sleep(timeout_after)
		end
	end)
	return true
end
-- Resets the upload state if an upload is active.
--   dont_notify - when truthy, the remote side is not sent rfnSendHogCancel.
-- Does nothing when no upload is in progress.
function DataSocket:SendHogCancel(dont_notify)
	if not self.hog_upload then
		return
	end
	self:Log("Hog upload stopped")
	self.hog_upload = false
	self.hog_upload_confirmed = -1
	self.hog_upload_callback = false
	if dont_notify then
		return
	end
	self:Send("rfnSendHogCancel")
end
-- Resets the download state if a download is active (or a completed hog is held).
--   dont_notify - when truthy, the remote side is not sent rfnReceiveHogCancel.
-- Does nothing when there is no download state to clear.
function DataSocket:ReceiveHogCancel(dont_notify)
	if not self.hog_download then
		return
	end
	self:Log("Hog download stopped")
	self.hog_download = false
	self.hog_download_total = -1
	if dont_notify then
		return
	end
	self:Send("rfnReceiveHogCancel")
end
-- Returns the hog being uploaded (false when idle) and the size confirmed so
-- far (-1 when idle).
function DataSocket:SendHogStatus()
	local hog, confirmed = self.hog_upload, self.hog_upload_confirmed
	return hog, confirmed
end
-- Returns the download state (false, chunk table, or completed string) and the
-- announced total size (-1 when idle).
function DataSocket:ReceiveHogStatus()
	local hog, total = self.hog_download, self.hog_download_total
	return hog, total
end
-- Remote confirmation that `size` bytes of the current upload have been received.
-- Refreshes the upload deadline, sends the next chunk beyond the window that is
-- already in flight, and finishes the upload once everything is confirmed.
function DataSocket:rfnHogConfirm(size)
-- Confirmations that arrive after the upload was stopped are ignored.
if not self.hog_upload then
return
end
-- Any confirmation counts as progress for the watchdog started in SendHog.
self.hog_upload_timeout = RealTime() + HogChunkTimeout
-- Each confirmation must advance by exactly one chunk, or reach the full length.
assert(self.hog_upload_confirmed + HogChunkSize == size or #self.hog_upload == size)
-- Number of bytes covered by the chunks sent ahead of confirmation.
local delta = HogChunkSize * HogChunksInAdvance
-- Send the chunk that follows the in-flight window, if any data is left.
if self.hog_upload_confirmed + delta < #self.hog_upload then
self:Send("rfnHogData", string.sub(self.hog_upload, 1 + self.hog_upload_confirmed + delta, size + delta))
end
self.hog_upload_confirmed = size
if self.hog_upload_confirmed == #self.hog_upload then
-- Everything is confirmed: run the callback while the hog state is still set,
-- then clear it (SendHogCancel also notifies the remote side).
if self.hog_upload_callback then
self.hog_upload_callback(self, self)
end
self:SendHogCancel()
end
end
-- Remote announcement that a hog of `size` bytes is about to be sent.
-- Resets the download state to an empty chunk table and (re)starts the
-- watchdog thread that stops the download with "timeout" when the deadline
-- passes without being refreshed.
function DataSocket:rfnHogStart(size)
	self:Log("Hog download started", size)
	self.hog_download = { size = 0 }
	self.hog_download_total = size
	DeleteThread(self.hog_download_monitor)
	local function watchdog()
		self.hog_download_timeout = RealTime() + HogChunkTimeout
		while self.hog_download do
			-- rfnHogData moves the deadline, so it is re-read after every sleep.
			local remaining = self.hog_download_timeout - RealTime()
			if remaining > 0 then
				Sleep(remaining)
			else
				self:StopDownload("timeout")
				return
			end
		end
	end
	self.hog_download_monitor = CreateRealTimeThread(watchdog)
end
-- Remote delivery of the next hog chunk.
-- Appends the chunk, confirms the total received so far, and replaces the
-- chunk table with the concatenated string once the announced size is reached.
-- Chunks are ignored unless a download is currently collecting chunks.
function DataSocket:rfnHogData(data)
	local chunks = self.hog_download
	if type(chunks) == "table" then
		-- Receiving a chunk counts as progress for the download watchdog.
		self.hog_download_timeout = RealTime() + HogChunkTimeout
		chunks[#chunks + 1] = data
		chunks.size = chunks.size + #data
		self:Send("rfnHogConfirm", chunks.size)
		if chunks.size == self.hog_download_total then
			local whole = table.concat(chunks)
			self.hog_download = whole
			assert(#self.hog_download == self.hog_download_total)
		end
	end
end
-- Remote notice that the sender cancelled its upload: stop the local download.
function DataSocket:rfnSendHogCancel()
	local reason = "cancelled"
	self:StopDownload(reason)
end
-- Remote notice that the receiver cancelled its download: stop the local upload.
function DataSocket:rfnReceiveHogCancel()
	local reason = "cancelled"
	self:StopUpload(reason)
end
-- HOG UPLOAD HELPERS ------------------------------------------------------------------------
-- Uploads `data` as a hog and blocks the calling thread until the upload ends.
--   data                  - string to upload (validated by SendHog).
--   upload_server_handler - forwarded to the remote side in rfnHogUploadEnd once
--                           the whole hog has been confirmed.
--   ...                   - extra parameters forwarded after the handler.
-- Returns an error string ("disconnected", "busy", "failed", or the error passed
-- to StopUpload) on failure; otherwise the unpacked values that the remote side
-- supplied through rfnHogUploadEnd.
function DataSocket:WaitUpload(data, upload_server_handler, ...)
	if not self:IsConnected() then
		return "disconnected"
	end
	if IsValidThread(self.hog_upload_thread) then
		assert(false, "another upload in progress!")
		return "busy"
	end
	self.hog_upload_data = false
	self.hog_upload_thread = CurrentThread()
	local handler_params = pack_params(...)
	local started = self:SendHog(data, function()
		self:Send("rfnHogUploadEnd", upload_server_handler, unpack_params(handler_params))
	end)
	if not started then
		-- Release the slot first; otherwise every later WaitUpload would report
		-- "busy" for as long as this thread stays alive.
		self.hog_upload_thread = false
		assert(false, "Upload not started!")
		return "failed"
	end
	-- Woken by StopUpload, which passes the error (if any) through the signal.
	local ok, local_error = WaitMsg(self.hog_upload_signal)
	local upload_result = self.hog_upload_data
	self.hog_upload_data = false
	self.hog_upload_thread = false
	if not upload_result then
		return local_error or "failed"
	end
	return unpack_params(upload_result)
end
-- Remote reply that ends an upload: stores the received values for the thread
-- blocked in WaitUpload and wakes it without an error.
function DataSocket:rfnHogUploadEnd(...)
	local packed = pack_params(...)
	if not packed then
		packed = {}
	end
	self.hog_upload_data = packed
	self:StopUpload()
end
-- Stops the current upload and wakes the thread waiting in WaitUpload.
--   error - optional reason; when given, the remote side is notified of the
--           cancellation and the reason is passed along with the signal.
function DataSocket:StopUpload(error)
	local silent = not error
	self:SendHogCancel(silent)
	Msg(self.hog_upload_signal, error)
end
-- Returns the upload progress as a percentage, or nothing when no upload is
-- active and no thread is waiting in WaitUpload.
-- While a hog is being sent the value is confirmed * 100 / #hog. Once the hog
-- state has been cleared but a thread is still waiting, 100 is reported.
function DataSocket:UploadProgress()
	local data, confirmed = self:SendHogStatus()
	if not data and not IsValidThread(self.hog_upload_thread) then
		return
	end
	-- A zero-length hog has nothing left to send; report it as complete rather
	-- than dividing by zero.
	if data and #data ~= 0 then
		return confirmed * 100 / #data
	end
	return 100
end
-- HOG DOWNLOAD HELPERS ------------------------------------------------------------------------
-- Asks the remote side to send a hog and blocks the calling thread until the
-- download ends.
--   download_server_handler - forwarded to the remote rfnHogDownloadStart.
--   ...                     - extra parameters forwarded after the handler.
-- Returns an error string ("disconnected", "busy", "failed", the error returned
-- by the rfnHogDownloadStart call, or the error passed to StopDownload) on
-- failure; otherwise the result of unpack_params on the data stored by
-- rfnHogDownloadEnd.
function DataSocket:WaitDownload(download_server_handler, ...)
	if not self:IsConnected() then
		return "disconnected"
	end
	if IsValidThread(self.hog_download_thread) then
		assert(false, "another download in progress!")
		return "busy"
	end
	self.hog_download_thread = CurrentThread()
	self.hog_download_data = false
	local call_err = self:Call("rfnHogDownloadStart", download_server_handler, ...)
	if call_err then
		-- Release the slot; otherwise every later WaitDownload would report
		-- "busy" for as long as this thread stays alive.
		self.hog_download_thread = false
		return call_err
	end
	-- Woken by StopDownload, which passes the error (if any) through the signal.
	local ok, wait_err = WaitMsg(self.hog_download_signal)
	local data = self.hog_download_data
	self.hog_download_data = false
	self.hog_download_thread = false
	if not data then
		return wait_err or "failed"
	end
	return unpack_params(data)
end
-- Returns the download progress as a percentage, or nothing when there is no
-- download state and no thread is waiting in WaitDownload.
-- While chunks are being collected the value is received * 100 / total; a
-- completed hog (string) reports 100; a waiting thread without any hog state
-- reports 0.
function DataSocket:DownloadProgress()
	local data, total = self:ReceiveHogStatus()
	if not data and not IsValidThread(self.hog_download_thread) then
		return
	end
	if type(data) == "table" then
		-- A zero-length hog has nothing left to receive; report it as complete
		-- rather than dividing by zero.
		if total ~= 0 then
			return data.size * 100 / total
		end
		return 100
	end
	if type(data) == "string" then
		return 100
	end
	return 0
end
-- Stops the current download and wakes the thread waiting in WaitDownload.
--   error - optional reason; when given, the remote side is notified of the
--           cancellation and the reason is passed along with the signal.
function DataSocket:StopDownload(error)
	local silent = not error
	self:ReceiveHogCancel(silent)
	Msg(self.hog_download_signal, error)
end
-- Remote notice that the hog has been fully sent: hands the received hog to
-- the thread blocked in WaitDownload and clears the download state without
-- notifying the remote side.
function DataSocket:rfnHogDownloadEnd()
	local hog, expected_size = self:ReceiveHogStatus()
	-- The hog is expected to be present and to match the announced size.
	assert(hog and #hog == expected_size)
	self.hog_download_data = hog
	self:StopDownload()
end
-- Disconnect hook: aborts both transfer directions with "disconnected" (which
-- wakes any thread in WaitUpload/WaitDownload) and then defers to the
-- MessageSocket implementation.
function DataSocket:OnDisconnect(reason)
	local stop_reason = "disconnected"
	self:StopUpload(stop_reason)
	self:StopDownload(stop_reason)
	MessageSocket.OnDisconnect(self, reason)
end