postgres/contrib/dblink/dblink.c

285 lines
6.7 KiB
C

/*
* dblink.c
*
* Functions returning results from a remote database
*
* Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
#include "dblink.h"
PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
{
PGconn *conn = NULL;
PGresult *res = NULL;
dblink_results *results;
char *optstr;
char *sqlstatement;
char *curstr = "DECLARE mycursor CURSOR FOR ";
char *execstatement;
char *msg;
int ntuples = 0;
ReturnSetInfo *rsi;
if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
elog(ERROR, "dblink: NULL arguments are not permitted");
}
if (fcinfo->resultinfo == NULL || ! IsA(fcinfo->resultinfo, ReturnSetInfo)) {
elog(ERROR, "dblink: function called in context that does not accept a set result");
}
optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
if (fcinfo->flinfo->fn_extra == NULL) {
conn = PQconnectdb(optstr);
if (PQstatus(conn) == CONNECTION_BAD)
{
msg = pstrdup(PQerrorMessage(conn));
PQfinish(conn);
elog(ERROR, "dblink: connection error: %s", msg);
}
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: begin error: %s", msg);
}
PQclear(res);
execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
if (execstatement != NULL) {
strcpy(execstatement, curstr);
strcat(execstatement, sqlstatement);
strcat(execstatement, "\0");
} else {
elog(ERROR, "dblink: insufficient memory" );
}
res = PQexec(conn, execstatement);
if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: sql error: %s", msg);
} else {
/*
* got results, start fetching them
*/
PQclear(res);
res = PQexec(conn, "FETCH ALL in mycursor");
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) {
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: sql error: %s", msg);
}
ntuples = PQntuples(res);
if (ntuples > 0) {
results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
results->tup_num = 0;
results->res = res;
res = NULL;
fcinfo->flinfo->fn_extra = (void *) results;
results = NULL;
results = fcinfo->flinfo->fn_extra;
/* close the cursor */
res = PQexec(conn, "CLOSE mycursor");
PQclear(res);
/* commit the transaction */
res = PQexec(conn, "COMMIT");
PQclear(res);
/* close the connection to the database and cleanup */
PQfinish(conn);
rsi = (ReturnSetInfo *)fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_POINTER(results);
} else {
PQclear(res);
/* close the cursor */
res = PQexec(conn, "CLOSE mycursor");
PQclear(res);
/* commit the transaction */
res = PQexec(conn, "COMMIT");
PQclear(res);
/* close the connection to the database and cleanup */
PQfinish(conn);
rsi = (ReturnSetInfo *)fcinfo->resultinfo;
rsi->isDone = ExprEndResult ;
PG_RETURN_NULL();
}
}
} else {
/*
* check for more results
*/
results = fcinfo->flinfo->fn_extra;
results->tup_num++;
ntuples = PQntuples(results->res);
if (results->tup_num < ntuples) {
/*
* fetch them if available
*/
rsi = (ReturnSetInfo *)fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_POINTER(results);
} else {
/*
* or if no more, clean things up
*/
results = fcinfo->flinfo->fn_extra;
PQclear(results->res);
rsi = (ReturnSetInfo *)fcinfo->resultinfo;
rsi->isDone = ExprEndResult ;
PG_RETURN_NULL();
}
}
PG_RETURN_NULL();
}
/*
* dblink_tok
* parse dblink output string
* return fldnum item (0 based)
* based on provided field separator
*/
PG_FUNCTION_INFO_V1(dblink_tok);
Datum
dblink_tok(PG_FUNCTION_ARGS)
{
dblink_results *results;
int fldnum;
text *result_text;
char *result;
int nfields = 0;
int text_len = 0;
if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
elog(ERROR, "dblink: NULL arguments are not permitted");
}
results = (dblink_results *) PG_GETARG_POINTER(0);
if (results == NULL) {
elog(ERROR, "dblink: function called with invalid result pointer");
}
fldnum = PG_GETARG_INT32(1);
if (fldnum < 0) {
elog(ERROR, "dblink: field number < 0 not permitted");
}
nfields = PQnfields(results->res);
if (fldnum > (nfields - 1)) {
elog(ERROR, "dblink: field number %d does not exist", fldnum);
}
if (PQgetisnull(results->res, results->tup_num, fldnum) == 1) {
PG_RETURN_NULL();
} else {
text_len = PQgetlength(results->res, results->tup_num, fldnum);
result = (char *) palloc(text_len + 1);
if (result != NULL) {
strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum));
strcat(result, "\0");
} else {
elog(ERROR, "dblink: insufficient memory" );
}
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
PG_RETURN_TEXT_P(result_text);
}
}
/*
* internal functions
*/
/*
* init_dblink_results
* - create an empty dblink_results data structure
*/
dblink_results *
init_dblink_results(MemoryContext fn_mcxt)
{
MemoryContext oldcontext;
dblink_results *retval;
oldcontext = MemoryContextSwitchTo(fn_mcxt);
retval = (dblink_results *) palloc(sizeof(dblink_results));
MemSet(retval, 0, sizeof(dblink_results));
retval->tup_num = -1;
retval->res = NULL;
MemoryContextSwitchTo(oldcontext);
return retval;
}