21 #include "session_p.h"
23 #include "imapparser_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "protocolhelper_p.h"
29 #include "xdgbasedirs_p.h"
32 #include <klocalizedstring.h>
34 #include <QCoreApplication>
35 #include <QtCore/QDir>
36 #include <QtCore/QQueue>
37 #include <QtCore/QThreadStorage>
38 #include <QtCore/QTimer>
39 #include <QtCore/QThread>
42 #include <QtNetwork/QLocalSocket>
43 #include <QtNetwork/QTcpSocket>
44 #include <QtNetwork/QHostAddress>
45 #include <QApplication>
// Upper bound compared against pipeline.count() in canPipelineNext().
// With a value of 0 the test `pipeline.count() >= PIPELINE_LENGTH` is always true.
50 #define PIPELINE_LENGTH 0
// Capability tokens this client announces; dataReceived() joins them with
// spaces and sends them inside the "1 CAPABILITY (...)" command.
// NOTE(review): only the first entry is visible in this excerpt.
57 static const QList<QByteArray> sCapabilities = QList<QByteArray>()
60 <<
"AKAPPENDSTREAMING"
// Requests processing of the next job without calling doStartNext() directly:
// a zero-timeout single-shot timer invokes the doStartNext() slot on mParent.
64 void SessionPrivate::startNext()
66 QTimer::singleShot(0, mParent, SLOT(doStartNext()));
// NOTE(review): the signature of this routine is not visible in this excerpt;
// presumably it is the reconnect() slot scheduled elsewhere in this file via
// SLOT(reconnect()) — confirm. It determines the server address, creates the
// socket, wires its signals to mParent and starts the connection attempt.

// Check whether the existing socket is a local socket that is already
// connected or in the middle of connecting (the branch body is not visible here).
71 QLocalSocket *localSocket = qobject_cast<QLocalSocket *>(socket);
72 if (localSocket && (localSocket->state() == QLocalSocket::ConnectedState
73 || localSocket->state() == QLocalSocket::ConnectingState)) {
// Same check for a TCP socket.
78 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket *>(socket);
79 if (tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
80 || tcpSocket->state() == QTcpSocket::ConnectingState)) {
// Address (host name, socket path or pipe name) to connect to.
86 QString serverAddress;
// First source: the AKONADI_SERVER_ADDRESS environment variable. As parsed
// below it has the form "<protocol>:<key>=<value>,<key>=<value>,...".
91 const QByteArray serverAddressEnvVar = qgetenv(
"AKONADI_SERVER_ADDRESS");
92 if (!serverAddressEnvVar.isEmpty()) {
// Everything before the first ':' is the protocol name.
93 const int pos = serverAddressEnvVar.indexOf(
':');
94 const QByteArray protocol = serverAddressEnvVar.left(pos);
// The remainder is a comma-separated list of key=value options.
95 QMap<QString, QString> options;
96 foreach (
const QString &entry, QString::fromLatin1(serverAddressEnvVar.mid(pos + 1)).split(QLatin1Char(
','))) {
97 const QStringList pair = entry.split(QLatin1Char(
'='));
// Entries that do not split into exactly two parts are treated specially
// (the handling itself is not visible in this excerpt).
98 if (pair.size() != 2) {
101 options.insert(pair.first(), pair.last());
103 kDebug() << protocol << options;
// Pick the address option that matches the protocol:
//   tcp  -> "host" (plus numeric "port"), unix -> "path", pipe -> "name".
105 if (protocol ==
"tcp") {
106 serverAddress = options.value(QLatin1String(
"host"));
107 port = options.value(QLatin1String(
"port")).toUInt();
109 }
else if (protocol ==
"unix") {
110 serverAddress = options.value(QLatin1String(
"path"));
111 }
else if (protocol ==
"pipe") {
112 serverAddress = options.value(QLatin1String(
"name"));
// Second source: if no address was obtained above, read it from the
// connection config file.
117 if (serverAddress.isEmpty()) {
119 const QFileInfo fileInfo(connectionConfigFile);
// Only logs a debug message when the file is missing.
120 if (!fileInfo.exists()) {
121 kDebug() <<
"Akonadi Client Session: connection config file '"
122 "akonadi/akonadiconnectionrc' can not be found in"
123 << XdgBaseDirs::homePath(
"config") <<
"nor in any of"
124 << XdgBaseDirs::systemPathList(
"config");
// The config file is parsed as an INI file.
126 const QSettings connectionSettings(connectionConfigFile, QSettings::IniFormat);
// Windows: named pipe from "Data/NamedPipe", defaulting to "Akonadi".
// NOTE(review): the remaining preprocessor lines of this conditional are not
// visible here; the "Data/UnixPath" lookup below presumably belongs to the
// non-Windows branch — confirm.
128 #ifdef Q_OS_WIN //krazy:exclude=cpp
129 serverAddress = connectionSettings.value(QLatin1String(
"Data/NamedPipe"), QLatin1String(
"Akonadi")).toString();
// Socket path from "Data/UnixPath", defaulting to
// "<xdg data save dir>/akonadiserver.socket".
131 const QString defaultSocketDir = Internal::xdgSaveDir(
"data");
132 serverAddress = connectionSettings.value(QLatin1String(
"Data/UnixPath"), QString(defaultSocketDir + QLatin1String(
"/akonadiserver.socket"))).toString();
// Create a local socket parented to mParent and route its error signal to
// the QLocalSocket overload of socketError().
140 socket = localSocket =
new QLocalSocket(mParent);
141 mParent->connect(localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)));
// Create a TCP socket parented to mParent and route its error signal to the
// QAbstractSocket overload of socketError(). The condition choosing between
// the two socket kinds is not visible in this excerpt.
143 socket = tcpSocket =
new QTcpSocket(mParent);
144 mParent->connect(tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)));
// Signals common to both socket kinds.
146 mParent->connect(socket, SIGNAL(disconnected()), SLOT(socketDisconnected()));
147 mParent->connect(socket, SIGNAL(readyRead()), SLOT(dataReceived()));
// Start the actual connection attempt.
151 kDebug() <<
"connectToServer" << serverAddress;
153 localSocket->connectToServer(serverAddress);
155 tcpSocket->connectToHost(serverAddress, port);
// NOTE(review): the enclosing signature is not visible in this excerpt.
// Returns "<xdg config save dir>/akonadiconnectionrc".
163 return Internal::xdgSaveDir(
"config") + QLatin1String(
"/akonadiconnectionrc");
// Error handler for the local-socket case (connected to
// QLocalSocket::error). The error code argument is unnamed and unused; the
// message is taken from the socket's errorString(), and the error is then
// handled like a disconnect.
166 void SessionPrivate::socketError(QLocalSocket::LocalSocketError)
// The signal must originate from the session's own socket.
168 Q_ASSERT(mParent->sender() == socket);
169 kWarning() <<
"Socket error occurred:" << qobject_cast<QLocalSocket *>(socket)->errorString();
170 socketDisconnected();
// Error handler for the TCP case (connected to QAbstractSocket::error).
// The error code argument is unnamed and unused; the message is taken from
// the socket's errorString(), and the error is then handled like a disconnect.
173 void SessionPrivate::socketError(QAbstractSocket::SocketError)
// The signal must originate from the session's own socket.
175 Q_ASSERT(mParent->sender() == socket);
176 kWarning() <<
"Socket error occurred:" << qobject_cast<QTcpSocket *>(socket)->errorString();
177 socketDisconnected();
// Called when the socket reports disconnection; socketError() also calls it.
// NOTE(review): only one statement of the body is visible in this excerpt.
180 void SessionPrivate::socketDisconnected()
// Tell the current job (through its private object) that the connection is gone.
183 currentJob->d_ptr->lostConnection();
// Slot for the socket's readyRead() signal: feeds incoming data to the
// parser and reacts to complete responses (greeting, LOGIN and CAPABILITY
// replies are handled here, other responses go to the current job).
// NOTE(review): several lines of this function (braces, else/return
// statements) are not visible in this excerpt.
188 void SessionPrivate::dataReceived()
190 while (socket->bytesAvailable() > 0) {
// The parser is waiting for continuation data: read at most
// continuationSize() - 1 bytes and hand them over as a block.
191 if (parser->continuationSize() > 1) {
192 const QByteArray data = socket->read(qMin(socket->bytesAvailable(), parser->continuationSize() - 1));
193 parser->parseBlock(data);
194 }
// Otherwise consume one full line, if one is available.
else if (socket->canReadLine()) {
// parseNextLine() returning false is handled in a branch whose body is not
// visible here (presumably the response is not complete yet — confirm).
195 if (!parser->parseNextLine(socket->readLine())) {
// Mirror the server response into the session log file, prefixed "S: ".
200 logFile->write(
"S: " + parser->data());
// Tag "0" is the tag this function uses for the LOGIN command below.
204 if (parser->tag() == QByteArray(
"0")) {
// Login accepted: announce the client capabilities under tag "1".
205 if (parser->data().startsWith(
"OK")) {
206 writeData(
"1 CAPABILITY (" + ImapParser::join(sCapabilities,
" ") +
")");
// Login failure path: warn and schedule reconnect() after 1000 ms.
208 kWarning() <<
"Unable to login to Akonadi server:" << parser->data();
210 QTimer::singleShot(1000, mParent, SLOT(
reconnect()));
// Tag "1" is the tag used for the CAPABILITY command above.
215 if (parser->tag() == QByteArray(
"1")) {
// (The body of the "OK" case is not visible in this excerpt.)
216 if (parser->data().startsWith(
"OK")) {
220 kDebug() <<
"Unhandled server capability response:" << parser->data();
// Untagged "OK Akonadi" response: the server greeting.
225 if (parser->tag() ==
"*" && parser->data().startsWith(
"OK Akonadi")) {
226 const int pos = parser->data().indexOf(
"[PROTOCOL");
// Parse the number following "[PROTOCOL" (pos + 9 skips those nine
// characters) and record it locally and via Internal::setServerProtocolVersion().
229 ImapParser::parseNumber(parser->data(), tmp, 0, pos + 9);
230 protocolVersion = tmp;
231 Internal::setServerProtocolVersion(tmp);
233 kDebug() <<
"Server protocol version is:" << protocolVersion;
// Log in with the quoted session id, using tag "0".
235 writeData(
"0 LOGIN " + ImapParser::quote(sessionId) +
'\n');
// Hand the response to the current job (the guarding condition is not
// visible in this excerpt).
240 currentJob->d_ptr->handleResponse(parser->tag(), parser->data());
// Decides whether another queued job may be added to the pipeline.
// NOTE(review): the return statements of the first branch and the final
// fallback are not visible in this excerpt.
252 bool SessionPrivate::canPipelineNext()
// Nothing queued, or the pipeline has reached PIPELINE_LENGTH entries.
// With PIPELINE_LENGTH defined as 0 the second test is always true.
254 if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
// Empty pipeline with a running job: depends on whether that job has
// finished writing.
257 if (pipeline.isEmpty() && currentJob) {
258 return currentJob->d_ptr->mWriteFinished;
// Non-empty pipeline: depends on whether the most recently pipelined job
// has finished writing.
260 if (!pipeline.isEmpty()) {
261 return pipeline.last()->d_ptr->mWriteFinished;
// Slot scheduled by startNext(): moves jobs into the pipeline when allowed
// and selects/starts the next current job.
// NOTE(review): several lines (returns, else keywords, the origin of
// nextJob) are not visible in this excerpt.
266 void SessionPrivate::doStartNext()
// Nothing to do while disconnected or when there are no jobs at all.
268 if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
// Pipelining is permitted: append nextJob to the pipeline.
271 if (canPipelineNext()) {
273 pipeline.enqueue(nextJob);
// Choose the new current job: the head of the pipeline if there is one ...
280 if (!pipeline.isEmpty()) {
281 currentJob = pipeline.dequeue();
// ... and (presumably in the else branch — confirm) the head of the queue,
// which is then started.
283 currentJob = queue.dequeue();
284 startJob(currentJob);
// Starts the given job, after checking the server protocol version.
// NOTE(review): the lines between the visible statements (error code,
// else branch) are not visible in this excerpt.
288 void SessionPrivate::startJob(
Job *job)
// Server protocol older than minimumProtocolVersion(): the job gets a
// translated error text instead.
290 if (protocolVersion < minimumProtocolVersion()) {
292 job->setErrorText(i18n(
"Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion()));
// Start the job through its private object.
295 job->d_ptr->startQueued();
// Slot connected to a job's result(KJob*) signal; jobDestroyed() also calls it.
// NOTE(review): most of the body is not visible in this excerpt.
304 void SessionPrivate::jobDone(KJob *job)
// Only the finished current job triggers a change of currentJob.
308 if (job == currentJob) {
// Distinguishes an empty pipeline from one with pending jobs (the body of
// this branch is not visible here).
309 if (pipeline.isEmpty()) {
// Promote the next pipelined job to current job.
313 currentJob = pipeline.dequeue();
// Called with the job that has finished writing its command.
// NOTE(review): only the signature and the sanity check are visible in this
// excerpt; the remaining lines of the body lie outside it.
324 void SessionPrivate::jobWriteFinished(
Akonadi::Job *job)
// Sanity check: the reporting job must be either the current job while
// nothing is pipelined, or the most recently pipelined job. The second
// operand has to be a comparison (==); an assignment would overwrite `job`
// and make the check pass for any non-null pipeline.last(). last() must
// only be called on a non-empty queue, hence the isEmpty() guard.
326 Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (!pipeline.isEmpty() && job == pipeline.last()));
// Slot connected to a job's destroyed(QObject*) signal.
// NOTE(review): lines between the signature and the call are not visible here.
332 void SessionPrivate::jobDestroyed(QObject *job)
// Reuse the jobDone() bookkeeping; the pointer is cast from QObject* to KJob*
// with static_cast.
335 jobDone(
static_cast<KJob *
>(job));
// NOTE(review): the enclosing signature is not visible in this excerpt.
// Wires a job's result() and destroyed() signals to the jobDone() and
// jobDestroyed() slots on mParent.
341 QObject::connect(job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)));
343 QObject::connect(job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)));
// NOTE(review): the enclosing signature and the actual socket write are not
// visible in this excerpt.
// Mirror outgoing data into the session log file, prefixed "C: " ...
355 logFile->write(
"C: " + data);
// ... and terminate the log entry with a newline if the data has none.
356 if (!data.endsWith(
'\n')) {
357 logFile->write(
"\n");
// Warning (with backtrace) emitted on the path where the session is not
// connected; the guarding condition is not visible here.
365 kWarning() <<
"Trying to write while session is disconnected!" << kBacktrace();
// NOTE(review): the enclosing signature, the guarding condition and at least
// one statement inside the loop are not visible in this excerpt.
// Kills every job in the queue; KJob::EmitResult makes each killed job emit
// its result signal.
376 Q_FOREACH (
Job *job, queue) {
378 job->kill(KJob::EmitResult);
// NOTE(review): the enclosing signature is not visible in this excerpt.
// Forwards an item revision change (itemId, oldRevision -> newRevision) to
// the private object of every queued job.
387 foreach (
Job *job, queue) {
388 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
// Constructor taking the parent Session.
// NOTE(review): the initializer list and body are not visible in this excerpt.
394 SessionPrivate::SessionPrivate(
Session *parent)
// Initializes the session state: creates the parser, determines the session
// id and optionally opens a per-session protocol log file.
// NOTE(review): several lines (conditions, remaining setup) are not visible
// in this excerpt.
404 void SessionPrivate::init(
const QByteArray &
id)
407 parser =
new ImapParser();
// Generated session id: "<application name>-<qrand() value>". The condition
// under which this is used is not visible here — presumably when `id` is
// empty; confirm.
412 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
413 +
'-' + QByteArray::number(qrand());
// Logging is enabled by setting the AKONADI_SESSION_LOGFILE environment
// variable; the file is named "<value>.<pid>.<session id>".
426 const QByteArray sessionLogFile = qgetenv(
"AKONADI_SESSION_LOGFILE");
427 if (!sessionLogFile.isEmpty()) {
428 logFile =
new QFile(QString::fromLatin1(
"%1.%2.%3").arg(QString::fromLatin1(sessionLogFile))
429 .arg(QString::number(QApplication::applicationPid()))
430 .arg(QString::fromLatin1(sessionId)),
// The log is opened write-only and truncated; failure only produces a warning
// as far as visible here.
432 if (!logFile->open(QIODevice::WriteOnly | QIODevice::Truncate)) {
433 kWarning() <<
"Failed to open Akonadi Session log file" << logFile->fileName();
// NOTE(review): the enclosing signature and surrounding statements are not
// visible in this excerpt.
// Disconnect all of the socket's signals from mParent ...
447 socket->disconnect(mParent);
// ... and invoke reconnect() on mParent through a queued connection, i.e.
// from the event loop rather than synchronously.
451 QMetaObject::invokeMethod(mParent,
"reconnect", Qt::QueuedConnection);
// Per-thread storage for the default Session pointer used by the
// default-session functions below.
479 static QThreadStorage<Session *> instances;
// NOTE(review): the enclosing signature is not visible in this excerpt; the
// assertion text names it SessionPrivate::createDefaultSession.
// Preconditions: a non-empty session id, and no default session stored yet
// for the calling thread.
483 Q_ASSERT_X(!sessionId.isEmpty(),
"SessionPrivate::createDefaultSession",
484 "You tried to create a default session with empty session id!");
485 Q_ASSERT_X(!instances.hasLocalData(),
"SessionPrivate::createDefaultSession",
486 "You tried to create a default session twice!");
// Create the session with the given id and store it for the calling thread.
488 instances.setLocalData(
new Session(sessionId));
// NOTE(review): the enclosing signature is not visible in this excerpt.
// Stores `session` as the calling thread's entry in `instances`.
493 instances.setLocalData(session);
// NOTE(review): the enclosing signature is not visible in this excerpt.
// Lazily creates a default-constructed Session for the calling thread if
// none is stored yet, then returns the stored pointer.
498 if (!instances.hasLocalData()) {
499 instances.setLocalData(
new Session());
501 return instances.localData();
// NOTE(review): the enclosing signature and some surrounding lines are not
// visible in this excerpt; the code works on the private object through `d`.
// Kill all jobs that are still waiting in the queue.
506 foreach (
Job *job, d->queue) {
507 job->kill(KJob::EmitResult);
// Kill all pipelined jobs; their mStarted flag is reset to false first.
510 foreach (
Job *job, d->pipeline) {
511 job->d_ptr->mStarted =
false;
512 job->kill(KJob::EmitResult);
// Same treatment for the current job (the guarding condition is not visible
// in this excerpt).
516 d->currentJob->d_ptr->mStarted =
false;
517 d->currentJob->kill(KJob::EmitResult);
522 #include "moc_session.cpp"