myspace / CommonLua /Libs /Network /DataSocket.lua
sirnii's picture
Upload 1816 files
b6a38d7 verified
raw
history blame
7.92 kB
-- Large Data Transfers (Hogs)
-- There is one hog per socket which is progressivelly transfered. Starting another transfer operation cancels the previous one.
-- Other rfn can be called while the hog is being transfered.
-- The transfer is initiated with SendHog(hog) and cancelled with SendHogCancel().
-- The send status can be checked with SendHogStatus() which returns hog, confirmed_transfer.
-- The status of the receive hog is obtained with ReceiveHogStatus() which returns hog, total_hog_size. Note that during the transfer
-- the hog returned is a table whith the hog chunks. hog.size contains the total length of the received chunks
-- Since the size of these hogs can be excessive, SendHogCancel is called immediatelly after the send callback.
-- Therefore only rfn calls made in the callback can rely on the hog being present and complete on the receiving end.
-- Note that calling SendHogCancel clears the remote hog as well.
HogChunksInAdvance = 4
HogChunkSize = 32 * 1024
HogChunkTimeout = 10000 -- chunk confirm timeout
DefineClass.DataSocket = {
__parents = { "MessageSocket" },
hog_download = false,
hog_download_total = -1,
hog_download_signal = false,
hog_download_data = false,
hog_download_thread = false,
hog_download_timeout = false,
hog_download_monitor = false,
hog_upload = false,
hog_upload_confirmed = -1,
hog_upload_callback = false,
hog_upload_signal = false,
hog_upload_thread = false,
hog_upload_data = false,
hog_upload_timeout = false,
hog_upload_monitor = false,
}
function DataSocket:new(object)
object = MessageSocket.new(self, object)
object.hog_download_signal = {}
object.hog_upload_signal = {}
return object
end
function DataSocket:SendHog(hog, sent_callback)
if self.hog_upload then
assert(false, "Hog sending still in progress!")
return
end
if type(hog) ~= "string" then
assert(false, "Trying to send a hog of type " .. type(hog))
return
end
self:Log("Uploading hog size", #hog)
self.hog_upload = hog
self.hog_upload_confirmed = 0
self.hog_upload_callback = sent_callback or false
self:Send("rfnHogStart", #hog)
for i = 0, HogChunksInAdvance - 1 do
if i * HogChunkSize <= #hog then
self:Send("rfnHogData", string.sub(hog, 1 + i * HogChunkSize, (i + 1) * HogChunkSize))
end
end
DeleteThread(self.hog_upload_monitor)
self.hog_upload_monitor = CreateRealTimeThread(function()
self.hog_upload_timeout = RealTime() + HogChunkTimeout
while self.hog_upload do
local timout_after = self.hog_upload_timeout - RealTime()
if timout_after <= 0 then
self:StopUpload("timeout")
break
end
Sleep(timout_after)
end
end)
return true
end
function DataSocket:SendHogCancel(dont_notify)
if self.hog_upload then
self:Log("Hog upload stopped")
self.hog_upload = false
self.hog_upload_confirmed = -1
self.hog_upload_callback = false
if not dont_notify then
self:Send("rfnSendHogCancel")
end
end
end
function DataSocket:ReceiveHogCancel(dont_notify)
if self.hog_download then
self:Log("Hog download stopped")
self.hog_download = false
self.hog_download_total = -1
if not dont_notify then
self:Send("rfnReceiveHogCancel")
end
end
end
function DataSocket:SendHogStatus()
return self.hog_upload, self.hog_upload_confirmed
end
function DataSocket:ReceiveHogStatus()
return self.hog_download, self.hog_download_total
end
function DataSocket:rfnHogConfirm(size)
if not self.hog_upload then
return
end
self.hog_upload_timeout = RealTime() + HogChunkTimeout
assert(self.hog_upload_confirmed + HogChunkSize == size or #self.hog_upload == size)
local delta = HogChunkSize * HogChunksInAdvance
if self.hog_upload_confirmed + delta < #self.hog_upload then
self:Send("rfnHogData", string.sub(self.hog_upload, 1 + self.hog_upload_confirmed + delta, size + delta))
end
self.hog_upload_confirmed = size
if self.hog_upload_confirmed == #self.hog_upload then
if self.hog_upload_callback then
self.hog_upload_callback(self, self)
end
self:SendHogCancel()
end
end
function DataSocket:rfnHogStart(size)
self:Log("Hog download started", size)
self.hog_download = { size = 0 }
self.hog_download_total = size
DeleteThread(self.hog_download_monitor)
self.hog_download_monitor = CreateRealTimeThread(function()
self.hog_download_timeout = RealTime() + HogChunkTimeout
while self.hog_download do
local timout_after = self.hog_download_timeout - RealTime()
if timout_after <= 0 then
self:StopDownload("timeout")
break
end
Sleep(timout_after)
end
end)
end
function DataSocket:rfnHogData(data)
local hog = self.hog_download
if type(hog) ~= "table" then
return
end
self.hog_download_timeout = RealTime() + HogChunkTimeout
hog[#hog + 1] = data
hog.size = hog.size + #data
self:Send("rfnHogConfirm", hog.size)
if hog.size == self.hog_download_total then
self.hog_download = table.concat(hog)
assert(#self.hog_download == self.hog_download_total)
end
end
function DataSocket:rfnSendHogCancel()
self:StopDownload("cancelled")
end
function DataSocket:rfnReceiveHogCancel()
self:StopUpload("cancelled")
end
-- HOG UPLOAD HELPERS ------------------------------------------------------------------------
function DataSocket:WaitUpload(data, upload_server_handler, ...)
if not self:IsConnected() then
return "disconnected"
end
if IsValidThread(self.hog_upload_thread) then
assert(false, "another upload in progress!")
return "busy"
end
self.hog_upload_data = false
self.hog_upload_thread = CurrentThread()
local handler_params = pack_params(...)
local started = self:SendHog(data, function()
self:Send("rfnHogUploadEnd", upload_server_handler, unpack_params(handler_params))
end)
if not started then
assert(false, "Upload not started!")
return "failed"
end
local ok, local_error = WaitMsg(self.hog_upload_signal)
local upload_result = self.hog_upload_data
self.hog_upload_data = false
self.hog_upload_thread = false
if not upload_result then
return local_error or "failed"
end
return unpack_params(upload_result)
end
function DataSocket:rfnHogUploadEnd(...)
self.hog_upload_data = pack_params(...) or {}
self:StopUpload()
end
function DataSocket:StopUpload(error)
self:SendHogCancel(not error)
Msg(self.hog_upload_signal, error)
end
function DataSocket:UploadProgress()
local data, progress = self:SendHogStatus()
if data or IsValidThread(self.hog_upload_thread) then
return data and progress * 100 / #data or 100 or 0
end
end
-- HOG DOWNLOAD HELPERS ------------------------------------------------------------------------
function DataSocket:WaitDownload(download_server_handler, ...)
if not self:IsConnected() then
return "disconnected"
end
if IsValidThread(self.hog_download_thread) then
assert(false, "another download in progress!")
return "busy"
end
self.hog_download_thread = CurrentThread()
self.hog_download_data = false
local error = self:Call("rfnHogDownloadStart", download_server_handler, ...)
if error then
return error
end
local ok, error = WaitMsg(self.hog_download_signal)
local data = self.hog_download_data
self.hog_download_data = false
self.hog_download_thread = false
if not data then
return error or "failed"
end
return unpack_params(data)
end
function DataSocket:DownloadProgress()
local data, total = self:ReceiveHogStatus()
if data or IsValidThread(self.hog_download_thread) then
return type(data) == "table" and data.size * 100 / total or type(data) == "string" and 100 or 0
end
end
function DataSocket:StopDownload(error)
self:ReceiveHogCancel(not error)
Msg(self.hog_download_signal, error)
end
function DataSocket:rfnHogDownloadEnd()
local data, size = self:ReceiveHogStatus()
assert(data and #data == size)
self.hog_download_data = data
self:StopDownload()
end
function DataSocket:OnDisconnect(reason)
self:StopUpload("disconnected")
self:StopDownload("disconnected")
MessageSocket.OnDisconnect(self, reason)
end