winrt: Rework socket handling
Buffer handling is now completely moved to the worker. Instead of moving data around all the time, the worker is responsible for buffer handling. When reads happen, the data that is read is used directly from the worker and its buffer is updated. With the previous approach it was possible, that transfers never completed. It was possible, that new data was read between calls of bytesAvailable and read and the availability of that data was never communicated to the user. If a read that does not read all the data happens, we signal, that there is still data available, so that the user is notified about that fact. At the same time we avoid unnecessary readyRead calls by blocking them until a read happens. To make future debugging sessions easier, categorized logging (including verbose) was added to the socket engine. Task-number: QTBUG-65556 Change-Id: I12020ffcccf8eb3efec9c36dc5b0e6c0ebef7eb5 Reviewed-by: Andre de la Rocha <andre.rocha@qt.io> Reviewed-by: Maurice Kalinowski <maurice.kalinowski@qt.io>bb10
parent
8e93988cce
commit
f3db6971ee
|
|
@ -87,6 +87,9 @@ typedef IAsyncOperationWithProgress<IBuffer *, UINT32> IAsyncBufferOperation;
|
|||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
Q_LOGGING_CATEGORY(lcNetworkSocket, "qt.network.socket");
|
||||
Q_LOGGING_CATEGORY(lcNetworkSocketVerbose, "qt.network.socket.verbose");
|
||||
|
||||
#if _MSC_VER >= 1900
|
||||
static HRESULT qt_winrt_try_create_thread_network_context(QString host, ComPtr<IThreadNetworkContext> &context)
|
||||
{
|
||||
|
|
@ -167,11 +170,14 @@ public:
|
|||
SocketEngineWorker(QNativeSocketEnginePrivate *engine)
|
||||
: enginePrivate(engine)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << engine;
|
||||
}
|
||||
|
||||
~SocketEngineWorker()
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
if (Q_UNLIKELY(initialReadOp)) {
|
||||
qCDebug(lcNetworkSocket) << Q_FUNC_INFO << "Closing initial read operation";
|
||||
ComPtr<IAsyncInfo> info;
|
||||
HRESULT hr = initialReadOp.As(&info);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
|
|
@ -184,6 +190,7 @@ public:
|
|||
}
|
||||
|
||||
if (readOp) {
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << "Closing read operation";
|
||||
ComPtr<IAsyncInfo> info;
|
||||
HRESULT hr = readOp.As(&info);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
|
|
@ -196,6 +203,7 @@ public:
|
|||
}
|
||||
|
||||
if (connectOp) {
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << "Closing connect operation";
|
||||
ComPtr<IAsyncInfo> info;
|
||||
HRESULT hr = connectOp.As(&info);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
|
|
@ -210,30 +218,13 @@ public:
|
|||
|
||||
signals:
|
||||
void connectOpFinished(bool success, QAbstractSocket::SocketError error, WinRTSocketEngine::ErrorString errorString);
|
||||
void newDatagramsReceived(const QList<WinRtDatagram> &datagram);
|
||||
void newDataReceived(const QVector<QByteArray> &data);
|
||||
void newDataReceived();
|
||||
void socketErrorOccured(QAbstractSocket::SocketError error);
|
||||
|
||||
public slots:
|
||||
Q_INVOKABLE void notifyAboutNewDatagrams()
|
||||
{
|
||||
QMutexLocker locker(&mutex);
|
||||
QList<WinRtDatagram> datagrams = pendingDatagrams;
|
||||
pendingDatagrams.clear();
|
||||
emit newDatagramsReceived(datagrams);
|
||||
}
|
||||
|
||||
Q_INVOKABLE void notifyAboutNewData()
|
||||
{
|
||||
QMutexLocker locker(&mutex);
|
||||
const QVector<QByteArray> newData = std::move(pendingData);
|
||||
pendingData.clear();
|
||||
emit newDataReceived(newData);
|
||||
}
|
||||
|
||||
public:
|
||||
void startReading()
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
ComPtr<IBuffer> buffer;
|
||||
HRESULT hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
|
|
@ -249,6 +240,7 @@ public:
|
|||
|
||||
HRESULT onConnectOpFinished(IAsyncAction *action, AsyncStatus)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
HRESULT hr = action->GetResults();
|
||||
if (FAILED(hr)) {
|
||||
if (hr == HRESULT_FROM_WIN32(WSAETIMEDOUT)) {
|
||||
|
|
@ -287,6 +279,7 @@ public:
|
|||
|
||||
HRESULT OnNewDatagramReceived(IDatagramSocket *, IDatagramSocketMessageReceivedEventArgs *args)
|
||||
{
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO;
|
||||
WinRtDatagram datagram;
|
||||
QHostAddress returnAddress;
|
||||
ComPtr<IHostName> remoteHost;
|
||||
|
|
@ -311,10 +304,11 @@ public:
|
|||
datagram.data.resize(length);
|
||||
hr = reader->ReadBytes(length, reinterpret_cast<BYTE *>(datagram.data.data()));
|
||||
RETURN_OK_IF_FAILED("Could not read datagram");
|
||||
|
||||
QMutexLocker locker(&mutex);
|
||||
// Notify the engine about new datagrams being present at the next event loop iteration
|
||||
if (pendingDatagrams.isEmpty())
|
||||
QMetaObject::invokeMethod(this, "notifyAboutNewDatagrams", Qt::QueuedConnection);
|
||||
if (emitDataReceived)
|
||||
emit newDataReceived();
|
||||
pendingDatagrams << datagram;
|
||||
|
||||
return S_OK;
|
||||
|
|
@ -322,6 +316,7 @@ public:
|
|||
|
||||
HRESULT onReadyRead(IAsyncBufferOperation *asyncInfo, AsyncStatus status)
|
||||
{
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO;
|
||||
if (asyncInfo == initialReadOp.Get()) {
|
||||
initialReadOp.Reset();
|
||||
} else if (asyncInfo == readOp.Get()) {
|
||||
|
|
@ -334,6 +329,7 @@ public:
|
|||
// that the connection was closed. The socket cannot be closed here, as the subsequent read
|
||||
// might fail then.
|
||||
if (status == Error || status == Canceled) {
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << "Remote host closed";
|
||||
emit socketErrorOccured(QAbstractSocket::RemoteHostClosedError);
|
||||
return S_OK;
|
||||
}
|
||||
|
|
@ -358,6 +354,7 @@ public:
|
|||
// the closing of the socket won't be communicated to the caller. So only the error is set. The
|
||||
// actual socket close happens inside of read.
|
||||
if (!bufferLength) {
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << "Remote host closed";
|
||||
emit socketErrorOccured(QAbstractSocket::RemoteHostClosedError);
|
||||
return S_OK;
|
||||
}
|
||||
|
|
@ -378,10 +375,10 @@ public:
|
|||
}
|
||||
|
||||
QByteArray newData(reinterpret_cast<const char*>(data), qint64(bufferLength));
|
||||
|
||||
QMutexLocker readLocker(&mutex);
|
||||
if (pendingData.isEmpty())
|
||||
QMetaObject::invokeMethod(this, "notifyAboutNewData", Qt::QueuedConnection);
|
||||
pendingData << newData;
|
||||
emit newDataReceived();
|
||||
pendingData.append(newData);
|
||||
readLocker.unlock();
|
||||
|
||||
hr = QEventDispatcherWinRT::runOnXamlThread([buffer, this]() {
|
||||
|
|
@ -433,7 +430,8 @@ private:
|
|||
ComPtr<IStreamSocket> tcpSocket;
|
||||
|
||||
QList<WinRtDatagram> pendingDatagrams;
|
||||
QVector<QByteArray> pendingData;
|
||||
bool emitDataReceived = true;
|
||||
QByteArray pendingData;
|
||||
|
||||
// Protects pendingData/pendingDatagrams which are accessed from native callbacks
|
||||
QMutex mutex;
|
||||
|
|
@ -509,6 +507,7 @@ static AsyncStatus opStatus(const ComPtr<T> &op)
|
|||
|
||||
static qint64 writeIOStream(ComPtr<IOutputStream> stream, const char *data, qint64 len)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << Q_FUNC_INFO << data << len;
|
||||
ComPtr<IBuffer> buffer;
|
||||
HRESULT hr = g->bufferFactory->Create(len, &buffer);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
|
|
@ -533,6 +532,7 @@ static qint64 writeIOStream(ComPtr<IOutputStream> stream, const char *data, qint
|
|||
QNativeSocketEngine::QNativeSocketEngine(QObject *parent)
|
||||
: QAbstractSocketEngine(*new QNativeSocketEnginePrivate(), parent)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << parent;
|
||||
qRegisterMetaType<WinRtDatagram>();
|
||||
qRegisterMetaType<WinRTSocketEngine::ErrorString>();
|
||||
Q_D(QNativeSocketEngine);
|
||||
|
|
@ -546,20 +546,20 @@ QNativeSocketEngine::QNativeSocketEngine(QObject *parent)
|
|||
connect(this, SIGNAL(writeReady()), SLOT(writeNotification()), Qt::QueuedConnection);
|
||||
connect(d->worker, &SocketEngineWorker::connectOpFinished,
|
||||
this, &QNativeSocketEngine::handleConnectOpFinished, Qt::QueuedConnection);
|
||||
connect(d->worker, &SocketEngineWorker::newDatagramsReceived, this, &QNativeSocketEngine::handleNewDatagrams, Qt::QueuedConnection);
|
||||
connect(d->worker, &SocketEngineWorker::newDataReceived,
|
||||
this, &QNativeSocketEngine::handleNewData, Qt::QueuedConnection);
|
||||
connect(d->worker, &SocketEngineWorker::newDataReceived, this, &QNativeSocketEngine::handleNewData, Qt::QueuedConnection);
|
||||
connect(d->worker, &SocketEngineWorker::socketErrorOccured,
|
||||
this, &QNativeSocketEngine::handleTcpError, Qt::QueuedConnection);
|
||||
}
|
||||
|
||||
QNativeSocketEngine::~QNativeSocketEngine()
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
close();
|
||||
}
|
||||
|
||||
bool QNativeSocketEngine::initialize(QAbstractSocket::SocketType type, QAbstractSocket::NetworkLayerProtocol protocol)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << type << protocol;
|
||||
Q_D(QNativeSocketEngine);
|
||||
if (isValid())
|
||||
close();
|
||||
|
|
@ -575,6 +575,7 @@ bool QNativeSocketEngine::initialize(QAbstractSocket::SocketType type, QAbstract
|
|||
|
||||
bool QNativeSocketEngine::initialize(qintptr socketDescriptor, QAbstractSocket::SocketState socketState)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << socketDescriptor << socketState;
|
||||
Q_D(QNativeSocketEngine);
|
||||
|
||||
if (isValid())
|
||||
|
|
@ -622,18 +623,21 @@ bool QNativeSocketEngine::isValid() const
|
|||
|
||||
bool QNativeSocketEngine::connectToHost(const QHostAddress &address, quint16 port)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << address << port;
|
||||
const QString addressString = address.toString();
|
||||
return connectToHostByName(addressString, port);
|
||||
}
|
||||
|
||||
bool QNativeSocketEngine::connectToHostByName(const QString &name, quint16 port)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << name << port;
|
||||
Q_D(QNativeSocketEngine);
|
||||
HRESULT hr;
|
||||
|
||||
#if _MSC_VER >= 1900
|
||||
ComPtr<IThreadNetworkContext> networkContext;
|
||||
if (!qEnvironmentVariableIsEmpty("QT_WINRT_USE_THREAD_NETWORK_CONTEXT")) {
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << "Creating network context";
|
||||
hr = qt_winrt_try_create_thread_network_context(name, networkContext);
|
||||
if (FAILED(hr)) {
|
||||
setError(QAbstractSocket::ConnectionRefusedError, QLatin1String("Could not create thread network context."));
|
||||
|
|
@ -668,6 +672,7 @@ bool QNativeSocketEngine::connectToHostByName(const QString &name, quint16 port)
|
|||
|
||||
#if _MSC_VER >= 1900
|
||||
if (networkContext != nullptr) {
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << "Closing network context";
|
||||
ComPtr<IClosable> networkContextCloser;
|
||||
hr = networkContext.As(&networkContextCloser);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
|
|
@ -691,6 +696,7 @@ bool QNativeSocketEngine::connectToHostByName(const QString &name, quint16 port)
|
|||
|
||||
bool QNativeSocketEngine::bind(const QHostAddress &address, quint16 port)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << address << port;
|
||||
Q_D(QNativeSocketEngine);
|
||||
HRESULT hr;
|
||||
// runOnXamlThread may only return S_OK (will assert otherwise) so no need to check its result.
|
||||
|
|
@ -773,6 +779,7 @@ bool QNativeSocketEngine::bind(const QHostAddress &address, quint16 port)
|
|||
|
||||
bool QNativeSocketEngine::listen()
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
Q_D(QNativeSocketEngine);
|
||||
Q_CHECK_VALID_SOCKETLAYER(QNativeSocketEngine::listen(), false);
|
||||
Q_CHECK_STATE(QNativeSocketEngine::listen(), QAbstractSocket::BoundState, false);
|
||||
|
|
@ -787,6 +794,7 @@ bool QNativeSocketEngine::listen()
|
|||
|
||||
int QNativeSocketEngine::accept()
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
Q_D(QNativeSocketEngine);
|
||||
Q_CHECK_VALID_SOCKETLAYER(QNativeSocketEngine::accept(), -1);
|
||||
Q_CHECK_STATE(QNativeSocketEngine::accept(), QAbstractSocket::ListeningState, -1);
|
||||
|
|
@ -810,6 +818,7 @@ int QNativeSocketEngine::accept()
|
|||
|
||||
void QNativeSocketEngine::close()
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
Q_D(QNativeSocketEngine);
|
||||
|
||||
if (d->closingDown)
|
||||
|
|
@ -878,16 +887,14 @@ void QNativeSocketEngine::close()
|
|||
|
||||
bool QNativeSocketEngine::joinMulticastGroup(const QHostAddress &groupAddress, const QNetworkInterface &iface)
|
||||
{
|
||||
Q_UNUSED(groupAddress);
|
||||
Q_UNUSED(iface);
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << groupAddress << iface;
|
||||
Q_UNIMPLEMENTED();
|
||||
return false;
|
||||
}
|
||||
|
||||
bool QNativeSocketEngine::leaveMulticastGroup(const QHostAddress &groupAddress, const QNetworkInterface &iface)
|
||||
{
|
||||
Q_UNUSED(groupAddress);
|
||||
Q_UNUSED(iface);
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << groupAddress << iface;
|
||||
Q_UNIMPLEMENTED();
|
||||
return false;
|
||||
}
|
||||
|
|
@ -900,7 +907,7 @@ QNetworkInterface QNativeSocketEngine::multicastInterface() const
|
|||
|
||||
bool QNativeSocketEngine::setMulticastInterface(const QNetworkInterface &iface)
|
||||
{
|
||||
Q_UNUSED(iface);
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << iface;
|
||||
Q_UNIMPLEMENTED();
|
||||
return false;
|
||||
}
|
||||
|
|
@ -911,11 +918,16 @@ qint64 QNativeSocketEngine::bytesAvailable() const
|
|||
if (d->socketType != QAbstractSocket::TcpSocket)
|
||||
return -1;
|
||||
|
||||
return d->bytesAvailable;
|
||||
QMutexLocker locker(&d->worker->mutex);
|
||||
const qint64 bytesAvailable = d->worker->pendingData.length();
|
||||
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO << bytesAvailable;
|
||||
return bytesAvailable;
|
||||
}
|
||||
|
||||
qint64 QNativeSocketEngine::read(char *data, qint64 maxlen)
|
||||
{
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO << maxlen;
|
||||
Q_D(QNativeSocketEngine);
|
||||
if (d->socketType != QAbstractSocket::TcpSocket)
|
||||
return -1;
|
||||
|
|
@ -923,37 +935,37 @@ qint64 QNativeSocketEngine::read(char *data, qint64 maxlen)
|
|||
// There will be a read notification when the socket was closed by the remote host. If that
|
||||
// happens and there isn't anything left in the buffer, we have to return -1 in order to signal
|
||||
// the closing of the socket.
|
||||
QMutexLocker mutexLocker(&d->readMutex);
|
||||
if (d->pendingData.isEmpty() && d->socketState != QAbstractSocket::ConnectedState) {
|
||||
QMutexLocker mutexLocker(&d->worker->mutex);
|
||||
if (d->worker->pendingData.isEmpty() && d->socketState != QAbstractSocket::ConnectedState) {
|
||||
close();
|
||||
return -1;
|
||||
}
|
||||
|
||||
QByteArray readData;
|
||||
qint64 leftToMaxLen = maxlen;
|
||||
while (leftToMaxLen > 0 && !d->pendingData.isEmpty()) {
|
||||
QByteArray pendingData = d->pendingData.takeFirst();
|
||||
// Do not read the whole data. Put the rest of it back into the "queue"
|
||||
if (leftToMaxLen < pendingData.length()) {
|
||||
readData += pendingData.left(leftToMaxLen);
|
||||
pendingData = pendingData.remove(0, maxlen);
|
||||
d->pendingData.prepend(pendingData);
|
||||
break;
|
||||
} else {
|
||||
readData += pendingData;
|
||||
leftToMaxLen -= pendingData.length();
|
||||
}
|
||||
const int copyLength = qMin(maxlen, qint64(d->worker->pendingData.length()));
|
||||
if (maxlen >= d->worker->pendingData.length()) {
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO << "Reading full buffer";
|
||||
readData = d->worker->pendingData;
|
||||
d->worker->pendingData.clear();
|
||||
d->emitReadReady = true;
|
||||
} else {
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO << "Reading part of the buffer ("
|
||||
<< copyLength << "of" << d->worker->pendingData.length() << "bytes";
|
||||
readData = d->worker->pendingData.left(maxlen);
|
||||
d->worker->pendingData.remove(0, maxlen);
|
||||
if (d->notifyOnRead)
|
||||
emit readReady();
|
||||
}
|
||||
const int copyLength = qMin(maxlen, qint64(readData.length()));
|
||||
d->bytesAvailable -= copyLength;
|
||||
mutexLocker.unlock();
|
||||
|
||||
memcpy(data, readData, copyLength);
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO << "Read" << copyLength << "bytes";
|
||||
return copyLength;
|
||||
}
|
||||
|
||||
qint64 QNativeSocketEngine::write(const char *data, qint64 len)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << data << len;
|
||||
Q_D(QNativeSocketEngine);
|
||||
if (!isValid())
|
||||
return -1;
|
||||
|
|
@ -978,16 +990,17 @@ qint64 QNativeSocketEngine::write(const char *data, qint64 len)
|
|||
qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QIpPacketHeader *header,
|
||||
PacketHeaderOptions)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << maxlen;
|
||||
#ifndef QT_NO_UDPSOCKET
|
||||
Q_D(QNativeSocketEngine);
|
||||
QMutexLocker locker(&d->readMutex);
|
||||
if (d->socketType != QAbstractSocket::UdpSocket || d->pendingDatagrams.isEmpty()) {
|
||||
QMutexLocker locker(&d->worker->mutex);
|
||||
if (d->socketType != QAbstractSocket::UdpSocket || d->worker->pendingDatagrams.isEmpty()) {
|
||||
if (header)
|
||||
header->clear();
|
||||
return -1;
|
||||
}
|
||||
|
||||
WinRtDatagram datagram = d->pendingDatagrams.takeFirst();
|
||||
WinRtDatagram datagram = d->worker->pendingDatagrams.takeFirst();
|
||||
if (header)
|
||||
*header = datagram.header;
|
||||
|
||||
|
|
@ -996,10 +1009,16 @@ qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QIpPacketHea
|
|||
if (maxlen < datagram.data.length()) {
|
||||
readOrigin = datagram.data.left(maxlen);
|
||||
datagram.data = datagram.data.remove(0, maxlen);
|
||||
d->pendingDatagrams.prepend(datagram);
|
||||
d->worker->pendingDatagrams.prepend(datagram);
|
||||
} else {
|
||||
readOrigin = datagram.data;
|
||||
}
|
||||
if (d->worker->pendingDatagrams.isEmpty()) {
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << "That's all folks";
|
||||
d->worker->emitDataReceived = true;
|
||||
d->emitReadReady = true;
|
||||
}
|
||||
|
||||
locker.unlock();
|
||||
memcpy(data, readOrigin, qMin(maxlen, qint64(datagram.data.length())));
|
||||
return readOrigin.length();
|
||||
|
|
@ -1013,6 +1032,7 @@ qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QIpPacketHea
|
|||
|
||||
qint64 QNativeSocketEngine::writeDatagram(const char *data, qint64 len, const QIpPacketHeader &header)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << data << len;
|
||||
#ifndef QT_NO_UDPSOCKET
|
||||
Q_D(QNativeSocketEngine);
|
||||
if (d->socketType != QAbstractSocket::UdpSocket)
|
||||
|
|
@ -1051,18 +1071,18 @@ qint64 QNativeSocketEngine::writeDatagram(const char *data, qint64 len, const QI
|
|||
bool QNativeSocketEngine::hasPendingDatagrams() const
|
||||
{
|
||||
Q_D(const QNativeSocketEngine);
|
||||
QMutexLocker locker(&d->readMutex);
|
||||
return d->pendingDatagrams.length() > 0;
|
||||
QMutexLocker locker(&d->worker->mutex);
|
||||
return d->worker->pendingDatagrams.length() > 0;
|
||||
}
|
||||
|
||||
qint64 QNativeSocketEngine::pendingDatagramSize() const
|
||||
{
|
||||
Q_D(const QNativeSocketEngine);
|
||||
QMutexLocker locker(&d->readMutex);
|
||||
if (d->pendingDatagrams.isEmpty())
|
||||
QMutexLocker locker(&d->worker->mutex);
|
||||
if (d->worker->pendingDatagrams.isEmpty())
|
||||
return -1;
|
||||
|
||||
return d->pendingDatagrams.at(0).data.length();
|
||||
return d->worker->pendingDatagrams.at(0).data.length();
|
||||
}
|
||||
|
||||
qint64 QNativeSocketEngine::bytesToWrite() const
|
||||
|
|
@ -1078,6 +1098,7 @@ qint64 QNativeSocketEngine::receiveBufferSize() const
|
|||
|
||||
void QNativeSocketEngine::setReceiveBufferSize(qint64 bufferSize)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << bufferSize;
|
||||
Q_D(QNativeSocketEngine);
|
||||
d->setOption(QAbstractSocketEngine::ReceiveBufferSocketOption, bufferSize);
|
||||
}
|
||||
|
|
@ -1090,6 +1111,7 @@ qint64 QNativeSocketEngine::sendBufferSize() const
|
|||
|
||||
void QNativeSocketEngine::setSendBufferSize(qint64 bufferSize)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << bufferSize;
|
||||
Q_D(QNativeSocketEngine);
|
||||
d->setOption(QAbstractSocketEngine::SendBufferSocketOption, bufferSize);
|
||||
}
|
||||
|
|
@ -1102,12 +1124,14 @@ int QNativeSocketEngine::option(QAbstractSocketEngine::SocketOption option) cons
|
|||
|
||||
bool QNativeSocketEngine::setOption(QAbstractSocketEngine::SocketOption option, int value)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << option << value;
|
||||
Q_D(QNativeSocketEngine);
|
||||
return d->setOption(option, value);
|
||||
}
|
||||
|
||||
bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << msecs;
|
||||
Q_D(QNativeSocketEngine);
|
||||
Q_CHECK_VALID_SOCKETLAYER(QNativeSocketEngine::waitForRead(), false);
|
||||
Q_CHECK_NOT_STATE(QNativeSocketEngine::waitForRead(),
|
||||
|
|
@ -1124,8 +1148,8 @@ bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut)
|
|||
return true;
|
||||
|
||||
// If we are a client, we are ready to read if our buffer has data
|
||||
QMutexLocker locker(&d->readMutex);
|
||||
if (!d->pendingData.isEmpty())
|
||||
QMutexLocker locker(&d->worker->mutex);
|
||||
if (!d->worker->pendingData.isEmpty())
|
||||
return true;
|
||||
|
||||
// Nothing to do, wait for more events
|
||||
|
|
@ -1142,7 +1166,7 @@ bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut)
|
|||
|
||||
bool QNativeSocketEngine::waitForWrite(int msecs, bool *timedOut)
|
||||
{
|
||||
Q_UNUSED(msecs);
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << msecs;
|
||||
Q_UNUSED(timedOut);
|
||||
Q_D(QNativeSocketEngine);
|
||||
if (d->socketState == QAbstractSocket::ConnectingState) {
|
||||
|
|
@ -1157,11 +1181,9 @@ bool QNativeSocketEngine::waitForWrite(int msecs, bool *timedOut)
|
|||
|
||||
bool QNativeSocketEngine::waitForReadOrWrite(bool *readyToRead, bool *readyToWrite, bool checkRead, bool checkWrite, int msecs, bool *timedOut)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << checkRead << checkWrite << msecs;
|
||||
Q_UNUSED(readyToRead);
|
||||
Q_UNUSED(readyToWrite);
|
||||
Q_UNUSED(checkRead);
|
||||
Q_UNUSED(checkWrite);
|
||||
Q_UNUSED(msecs);
|
||||
Q_UNUSED(timedOut);
|
||||
return false;
|
||||
}
|
||||
|
|
@ -1174,6 +1196,7 @@ bool QNativeSocketEngine::isReadNotificationEnabled() const
|
|||
|
||||
void QNativeSocketEngine::setReadNotificationEnabled(bool enable)
|
||||
{
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO << enable;
|
||||
Q_D(QNativeSocketEngine);
|
||||
d->notifyOnRead = enable;
|
||||
}
|
||||
|
|
@ -1186,6 +1209,7 @@ bool QNativeSocketEngine::isWriteNotificationEnabled() const
|
|||
|
||||
void QNativeSocketEngine::setWriteNotificationEnabled(bool enable)
|
||||
{
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO << enable;
|
||||
Q_D(QNativeSocketEngine);
|
||||
d->notifyOnWrite = enable;
|
||||
if (enable && d->socketState == QAbstractSocket::ConnectedState) {
|
||||
|
|
@ -1203,12 +1227,14 @@ bool QNativeSocketEngine::isExceptionNotificationEnabled() const
|
|||
|
||||
void QNativeSocketEngine::setExceptionNotificationEnabled(bool enable)
|
||||
{
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO << enable;
|
||||
Q_D(QNativeSocketEngine);
|
||||
d->notifyOnException = enable;
|
||||
}
|
||||
|
||||
void QNativeSocketEngine::establishRead()
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
Q_D(QNativeSocketEngine);
|
||||
|
||||
HRESULT hr;
|
||||
|
|
@ -1222,6 +1248,7 @@ void QNativeSocketEngine::establishRead()
|
|||
|
||||
void QNativeSocketEngine::handleConnectOpFinished(bool success, QAbstractSocket::SocketError error, WinRTSocketEngine::ErrorString errorString)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << success << error << errorString;
|
||||
Q_D(QNativeSocketEngine);
|
||||
disconnect(d->worker, &SocketEngineWorker::connectOpFinished,
|
||||
this, &QNativeSocketEngine::handleConnectOpFinished);
|
||||
|
|
@ -1246,29 +1273,24 @@ void QNativeSocketEngine::handleConnectOpFinished(bool success, QAbstractSocket:
|
|||
establishRead();
|
||||
}
|
||||
|
||||
void QNativeSocketEngine::handleNewDatagrams(const QList<WinRtDatagram> &datagrams)
|
||||
void QNativeSocketEngine::handleNewData()
|
||||
{
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO;
|
||||
Q_D(QNativeSocketEngine);
|
||||
QMutexLocker locker(&d->readMutex);
|
||||
d->pendingDatagrams.append(datagrams);
|
||||
if (d->notifyOnRead)
|
||||
emit readReady();
|
||||
}
|
||||
|
||||
void QNativeSocketEngine::handleNewData(const QVector<QByteArray> &data)
|
||||
{
|
||||
Q_D(QNativeSocketEngine);
|
||||
QMutexLocker locker(&d->readMutex);
|
||||
d->pendingData.append(data);
|
||||
for (const QByteArray &newData : data)
|
||||
d->bytesAvailable += newData.length();
|
||||
locker.unlock();
|
||||
if (d->notifyOnRead)
|
||||
readNotification();
|
||||
if (d->notifyOnRead && d->emitReadReady) {
|
||||
if (d->socketType == QAbstractSocket::UdpSocket && !d->worker->emitDataReceived)
|
||||
return;
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO << "Emitting readReady";
|
||||
emit readReady();
|
||||
d->worker->emitDataReceived = false;
|
||||
d->emitReadReady = false;
|
||||
}
|
||||
}
|
||||
|
||||
void QNativeSocketEngine::handleTcpError(QAbstractSocket::SocketError error)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << error;
|
||||
Q_D(QNativeSocketEngine);
|
||||
WinRTSocketEngine::ErrorString errorString;
|
||||
switch (error) {
|
||||
|
|
@ -1287,6 +1309,7 @@ void QNativeSocketEngine::handleTcpError(QAbstractSocket::SocketError error)
|
|||
|
||||
bool QNativeSocketEnginePrivate::createNewSocket(QAbstractSocket::SocketType socketType, QAbstractSocket::NetworkLayerProtocol &socketProtocol)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << socketType << socketProtocol;
|
||||
Q_UNUSED(socketProtocol);
|
||||
HRESULT hr;
|
||||
|
||||
|
|
@ -1343,10 +1366,12 @@ QNativeSocketEnginePrivate::QNativeSocketEnginePrivate()
|
|||
, sslSocket(nullptr)
|
||||
, connectionToken( { -1 } )
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
}
|
||||
|
||||
QNativeSocketEnginePrivate::~QNativeSocketEnginePrivate()
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
if (socketDescriptor == -1 || connectionToken.value == -1)
|
||||
return;
|
||||
|
||||
|
|
@ -1362,6 +1387,7 @@ QNativeSocketEnginePrivate::~QNativeSocketEnginePrivate()
|
|||
|
||||
void QNativeSocketEnginePrivate::setError(QAbstractSocket::SocketError error, WinRTSocketEngine::ErrorString errorString) const
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << error << errorString;
|
||||
if (hasSetSocketError) {
|
||||
// Only set socket errors once for one engine; expect the
|
||||
// socket to recreate its engine after an error. Note: There's
|
||||
|
|
@ -1523,6 +1549,7 @@ int QNativeSocketEnginePrivate::option(QAbstractSocketEngine::SocketOption opt)
|
|||
|
||||
bool QNativeSocketEnginePrivate::setOption(QAbstractSocketEngine::SocketOption opt, int v)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO << opt << v;
|
||||
ComPtr<IStreamSocketControl> control;
|
||||
if (socketType == QAbstractSocket::TcpSocket) {
|
||||
if (FAILED(tcpSocket()->get_Control(&control))) {
|
||||
|
|
@ -1583,6 +1610,7 @@ bool QNativeSocketEnginePrivate::setOption(QAbstractSocketEngine::SocketOption o
|
|||
|
||||
bool QNativeSocketEnginePrivate::fetchConnectionParameters()
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
localPort = 0;
|
||||
localAddress.clear();
|
||||
peerPort = 0;
|
||||
|
|
@ -1659,6 +1687,7 @@ bool QNativeSocketEnginePrivate::fetchConnectionParameters()
|
|||
|
||||
HRESULT QNativeSocketEnginePrivate::handleClientConnection(IStreamSocketListener *listener, IStreamSocketListenerConnectionReceivedEventArgs *args)
|
||||
{
|
||||
qCDebug(lcNetworkSocket) << this << Q_FUNC_INFO;
|
||||
Q_Q(QNativeSocketEngine);
|
||||
Q_UNUSED(listener)
|
||||
IStreamSocket *socket;
|
||||
|
|
@ -1672,6 +1701,7 @@ HRESULT QNativeSocketEnginePrivate::handleClientConnection(IStreamSocketListener
|
|||
|
||||
HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, IDatagramSocketMessageReceivedEventArgs *args)
|
||||
{
|
||||
qCDebug(lcNetworkSocketVerbose) << this << Q_FUNC_INFO;
|
||||
Q_Q(QNativeSocketEngine);
|
||||
Q_UNUSED(socket);
|
||||
|
||||
|
|
|
|||
|
|
@ -54,6 +54,7 @@
|
|||
#include <QtNetwork/private/qtnetworkglobal_p.h>
|
||||
#include <QtCore/QEventLoop>
|
||||
#include <QtCore/QBuffer>
|
||||
#include <QtCore/QLoggingCategory>
|
||||
#include <QtCore/QMutex>
|
||||
#include <QtCore/QAtomicInteger>
|
||||
#include "QtNetwork/qhostaddress.h"
|
||||
|
|
@ -63,6 +64,9 @@
|
|||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
Q_DECLARE_LOGGING_CATEGORY(lcNetworkSocket)
|
||||
Q_DECLARE_LOGGING_CATEGORY(lcNetworkSocketVerbose)
|
||||
|
||||
namespace WinRTSocketEngine {
|
||||
enum ErrorString {
|
||||
NonBlockingInitFailedErrorString,
|
||||
|
|
@ -178,8 +182,7 @@ private slots:
|
|||
void establishRead();
|
||||
void handleConnectOpFinished(bool success, QAbstractSocket::SocketError error,
|
||||
WinRTSocketEngine::ErrorString errorString);
|
||||
void handleNewDatagrams(const QList<WinRtDatagram> &datagram);
|
||||
void handleNewData(const QVector<QByteArray> &data);
|
||||
void handleNewData();
|
||||
void handleTcpError(QAbstractSocket::SocketError error);
|
||||
|
||||
private:
|
||||
|
|
@ -218,26 +221,14 @@ private:
|
|||
{ return reinterpret_cast<ABI::Windows::Networking::Sockets::IDatagramSocket *>(socketDescriptor); }
|
||||
Microsoft::WRL::ComPtr<ABI::Windows::Networking::Sockets::IStreamSocketListener> tcpListener;
|
||||
|
||||
// In case of TCP readMutex protects readBytes and bytesAvailable. In case of UDP it is
|
||||
// pendingDatagrams. They are written inside native callbacks (handleReadyRead and
|
||||
// handleNewDatagrams/putIntoPendingDatagramsList)
|
||||
mutable QMutex readMutex;
|
||||
|
||||
// Protected by readMutex. Written in handleReadyRead (native callback)
|
||||
QAtomicInteger<int> bytesAvailable;
|
||||
|
||||
// Protected by readMutex. Written in handleNewData/putIntoPendingData (native callback)
|
||||
QVector<QByteArray> pendingData;
|
||||
|
||||
// Protected by readMutex. Written in handleNewDatagrams/putIntoPendingDatagramsList
|
||||
QList<WinRtDatagram> pendingDatagrams;
|
||||
|
||||
QList<ABI::Windows::Networking::Sockets::IStreamSocket *> pendingConnections;
|
||||
QList<ABI::Windows::Networking::Sockets::IStreamSocket *> currentConnections;
|
||||
QEventLoop eventLoop;
|
||||
QAbstractSocket *sslSocket;
|
||||
EventRegistrationToken connectionToken;
|
||||
|
||||
bool emitReadReady = true;
|
||||
|
||||
HRESULT handleNewDatagram(ABI::Windows::Networking::Sockets::IDatagramSocket *socket,
|
||||
ABI::Windows::Networking::Sockets::IDatagramSocketMessageReceivedEventArgs *args);
|
||||
HRESULT handleClientConnection(ABI::Windows::Networking::Sockets::IStreamSocketListener *tcpListener,
|
||||
|
|
|
|||
Loading…
Reference in New Issue