OSDN Git Service

1.3.0
[pgdbmsstats/pg_dbms_stats.git] / import.c
1 /*
2  * import.c
3  *
4  * Copyright (c) 2012, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
5  */
6 #include "postgres.h"
7
8 #include "access/xact.h"
9 #include "catalog/namespace.h"
10 #include "catalog/pg_type.h"
11 #include "commands/copy.h"
12 #include "executor/spi.h"
13 #include "libpq/pqformat.h"
14 #include "mb/pg_wchar.h"
15 #include "tcop/tcopprot.h"
16 #include "utils/builtins.h"
17 #include "utils/lsyscache.h"
18 #include "utils/syscache.h"
19 #if PG_VERSION_NUM >= 90200
20 #include "catalog/pg_class.h"
21 #endif
22
23 #include "pg_dbms_stats.h"
24
25 #if PG_VERSION_NUM >= 90200
26 #define RELATION_PARAM_NUM      9
27 #else
28 #define RELATION_PARAM_NUM      8
29 #endif
30
31 extern PGDLLIMPORT bool standard_conforming_strings;
32
33 PG_FUNCTION_INFO_V1(dbms_stats_import);
34
35 Datum   dbms_stats_import(PG_FUNCTION_ARGS);
36
37 static void get_args(FunctionCallInfo fcinfo, char **nspname, char **relname,
38                                 char **attname, char **filename);
39 static void spi_exec_utility(const char *query);
40 static void spi_exec_query(const char *query, int nargs, Oid *argtypes,
41                                 SPIPlanPtr *plan, Datum *values, const char *nulls, int result);
42 static void import_stats_from_file(char *filename, char *nspname, char *relname,
43                                 char *attname);
44
45 /*
46  * dbms_stats_import
47  *   Import exported statistics from stdin or a file.
48  *
49  *   Order of arguments:
50  *     1) schema name
51  *     2) relation oid
52  *     3) attribute name
53  *     4) absolute path of source file, or 'stdin' (case insensitive)
54  */
55 Datum
56 dbms_stats_import(PG_FUNCTION_ARGS)
57 {
58         char               *nspname;
59         char               *relname;
60         char               *attname;
61         char               *filename;   /* filename, or NULL for STDIN */
62         int                             ret;
63         int                             i;
64         uint32                  r_num;
65         HeapTuple          *r_tups;
66         TupleDesc               r_tupdesc;
67         SPIPlanPtr              r_upd_plan = NULL;
68         SPIPlanPtr              r_ins_plan = NULL;
69         SPIPlanPtr              c_sel_plan = NULL;
70         SPIPlanPtr              c_del_plan = NULL;
71         SPIPlanPtr              c_ins_plan = NULL;
72
73         /* get validated arguments */
74         get_args(fcinfo, &nspname, &relname, &attname, &filename);
75
76         /* for debug use */
77         elog(DEBUG3, "%s() f=%s n=%s r=%s a=%s", __FUNCTION__,
78                  filename ? filename : "(null)",
79                  nspname ? nspname : "(null)",
80                  relname ? relname : "(null)",
81                  attname ? attname : "(null)");
82
83         /* connect to SPI */
84         ret = SPI_connect();
85         if (ret != SPI_OK_CONNECT)
86                 elog(ERROR, "pg_dbms_stats: SPI_connect => %d", ret);
87
88         /* lock dummy statistics tables. */
89         spi_exec_utility("LOCK dbms_stats._relation_stats_locked"
90                                                 " IN SHARE UPDATE EXCLUSIVE MODE");
91         spi_exec_utility("LOCK dbms_stats._column_stats_locked"
92                                                 " IN SHARE UPDATE EXCLUSIVE MODE");
93
94         /*
95          * Create a temp table to save the statistics to import.
96          * This table should fit with the content of export files.
97          */
98         spi_exec_utility("CREATE TEMP TABLE dbms_stats_work_stats ("
99                                          "nspname          name   NOT NULL,"
100                                          "relname          name   NOT NULL,"
101                                          "relpages         int4   NOT NULL,"
102                                          "reltuples        float4 NOT NULL,"
103 #if PG_VERSION_NUM >= 90200
104                                          "relallvisible    int4   NOT NULL,"
105 #endif
106                                          "curpages         int4   NOT NULL,"
107                                          "last_analyze     timestamp with time zone,"
108                                          "last_autoanalyze timestamp with time zone,"
109                                          "attname          name,"
110                                          "nspname_of_typename name,"
111                                          "typname name,"
112                                          "atttypmod int4,"
113                                          "stainherit       bool,"
114                                          "stanullfrac      float4,"
115                                          "stawidth         int4,"
116                                          "stadistinct      float4,"
117                                          "stakind1         int2,"
118                                          "stakind2         int2,"
119                                          "stakind3         int2,"
120                                          "stakind4         int2,"
121 #if PG_VERSION_NUM >= 90200
122                                          "stakind5         int2,"
123 #endif
124                                          "staop1           oid,"
125                                          "staop2           oid,"
126                                          "staop3           oid,"
127                                          "staop4           oid,"
128 #if PG_VERSION_NUM >= 90200
129                                          "staop5           oid,"
130 #endif
131                                          "stanumbers1      float4[],"
132                                          "stanumbers2      float4[],"
133                                          "stanumbers3      float4[],"
134                                          "stanumbers4      float4[],"
135 #if PG_VERSION_NUM >= 90200
136                                          "stanumbers5      float4[],"
137 #endif
138                                          "stavalues1       dbms_stats.anyarray,"
139                                          "stavalues2       dbms_stats.anyarray,"
140                                          "stavalues3       dbms_stats.anyarray,"
141                                          "stavalues4       dbms_stats.anyarray"
142 #if PG_VERSION_NUM >= 90200
143                                         ",stavalues5       dbms_stats.anyarray"
144 #endif
145                                          ")");
146
147         /* load the statistics from export file to the temp table */
148         import_stats_from_file(filename, nspname, relname, attname);
149
150         /* Determine the Oid of local table from the tablename and schemaname. */
151         /* TODO 可視ページ数に対応する */
152         ret = SPI_execute("SELECT DISTINCT w.nspname, w.relname, c.oid, "
153                                                          "w.relpages, w.reltuples, "
154                                                          "w.curpages, w.last_analyze, w.last_autoanalyze "
155 #if PG_VERSION_NUM >= 90200
156                                                          ",w.relallvisible "
157 #endif
158                                                 "FROM pg_catalog.pg_class c "
159                                                 "JOIN pg_catalog.pg_namespace n "
160                                                   "ON (c.relnamespace = n.oid) "
161                                            "RIGHT JOIN dbms_stats_work_stats w "
162                                                   "ON (w.relname = c.relname AND w.nspname = n.nspname) "
163                                            "ORDER BY 1, 2", false, 0);
164         if (ret != SPI_OK_SELECT)
165                 elog(ERROR, "pg_dbms_stats: SPI_execute => %d", ret);
166
167         /*
168          * If there is no record in the staging table after loading source and
169          * deleting unnecessary records, we treat it as an error.
170          */
171         if (SPI_processed == 0)
172                 elog(ERROR, "no per-table statistic data to be imported");
173
174         /* */
175         r_num = SPI_processed;
176         r_tups = SPI_tuptable->vals;
177         r_tupdesc = SPI_tuptable->tupdesc;
178         for (i = 0; i < r_num; i++)
179         {
180                 bool    isnull;
181                 Datum   w_nspname;
182                 Datum   w_relname;
183                 Datum   w_relid;
184                 Datum   values[9];
185                 char    nulls[9] = {'t', 't', 't', 't', 't', 't', 't', 't', 't'};
186                 Oid             r_types[9] = {NAMEOID, NAMEOID, INT4OID, FLOAT4OID, INT4OID,
187                                                           TIMESTAMPTZOID, TIMESTAMPTZOID, OIDOID, INT4OID};
188                 Oid             c_types[5] = {OIDOID, INT2OID, NAMEOID, NAMEOID,
189                                                           NAMEOID};
190                 uint32          c_num;
191                 TupleDesc       c_tupdesc;
192                 HeapTuple  *c_tups;
193                 int                     j;
194
195                 values[0] = w_nspname = SPI_getbinval(r_tups[i], r_tupdesc, 1, &isnull);
196                 values[1] = w_relname = SPI_getbinval(r_tups[i], r_tupdesc, 2, &isnull);
197                 values[7] = w_relid = SPI_getbinval(r_tups[i], r_tupdesc, 3, &isnull);
198                 if (isnull)
199                 {
200                         elog(WARNING, "relation \"%s.%s\" does not exist",
201                                         DatumGetName(w_nspname)->data,
202                                         DatumGetName(w_relname)->data);
203                         continue;
204                 }
205
206                 values[2] = SPI_getbinval(r_tups[i], r_tupdesc, 4, &isnull);
207                 values[3] = SPI_getbinval(r_tups[i], r_tupdesc, 5, &isnull);
208                 values[4] = SPI_getbinval(r_tups[i], r_tupdesc, 6, &isnull);
209                 values[5] = SPI_getbinval(r_tups[i], r_tupdesc, 7, &isnull);
210                 nulls[5] = isnull ? 'n' : 't';
211                 values[6] = SPI_getbinval(r_tups[i], r_tupdesc, 8, &isnull);
212                 nulls[6] = isnull ? 'n' : 't';
213                 values[8] = SPI_getbinval(r_tups[i], r_tupdesc, 9, &isnull);
214
215                 /*
216                  * First we try UPDATE with the oid.  When no record matched, try
217                  * INSERT.  We can't use DELETE-then-INSERT method because we have FK
218                  * on _relation_stats_locked so DELETE would delete child records in
219                  * _column_stats_locked undesirably.
220                  */
221                 spi_exec_query("UPDATE dbms_stats._relation_stats_locked SET "
222                                 "relname = quote_ident($1) || '.' || quote_ident($2), "
223                                 "relpages = $3, reltuples = $4, "
224 #if PG_VERSION_NUM >= 90200
225                                 "relallvisible = $9, "
226 #endif
227                                 "curpages = $5, last_analyze = $6, last_autoanalyze = $7 "
228                                 "WHERE relid = $8",
229                                 RELATION_PARAM_NUM, r_types, &r_upd_plan, values, nulls,
230                                 SPI_OK_UPDATE);
231                 if (SPI_processed == 0)
232                 {
233                         spi_exec_query("INSERT INTO dbms_stats._relation_stats_locked "
234                                         "(relname, relpages, reltuples, curpages, "
235                                         "last_analyze, last_autoanalyze, relid"
236 #if PG_VERSION_NUM >= 90200
237                                         ", relallvisible"
238 #endif
239                                         ") VALUES (quote_ident($1) || '.' || quote_ident($2), "
240                                         "$3, $4, $5, $6, $7, $8"
241 #if PG_VERSION_NUM >= 90200
242                                         ", $9"
243 #endif
244                                         ")",
245                                         RELATION_PARAM_NUM, r_types, &r_ins_plan, values, nulls,
246                                         SPI_OK_INSERT);
247                         /*  If we failed to insert, we can't proceed. */
248                         if (SPI_processed != 1)
249                                 elog(ERROR, "failed to insert import data");
250                 }
251
252                 elog(DEBUG2, "\"%s.%s\" relation statistic import",
253                         DatumGetName(w_nspname)->data, DatumGetName(w_relname)->data);
254
255                 /*
256                  * Determine the attnum of the attribute with given name, and load
257                  * statistics from temp table into dbms._column_stats_locked.
258                  */
259                 spi_exec_query("SELECT w.stainherit, w.attname, a.attnum, "
260                                                           "w.nspname_of_typename, tn.nspname, "
261                                                           "w.typname, t.typname, w.atttypmod, a.atttypmod "
262                                                  "FROM pg_catalog.pg_class c "
263                                                  "JOIN pg_catalog.pg_namespace cn "
264                                                    "ON (cn.oid = c.relnamespace) "
265                                                  "JOIN pg_catalog.pg_attribute a "
266                                                    "ON (a.attrelid = c.oid) "
267                                                  "JOIN pg_catalog.pg_type t "
268                                                    "ON (t.oid = a.atttypid) "
269                                                  "JOIN pg_catalog.pg_namespace tn "
270                                                    "ON (tn.oid = t.typnamespace) "
271                                                 "RIGHT JOIN dbms_stats_work_stats w "
272                                                    "ON (w.nspname = cn.nspname AND w.relname = c.relname "
273                                                            "AND (w.attname = a.attname OR w.attname = '')) "
274                                                 "WHERE w.nspname = $1 AND w.relname = $2 "
275                                                   "AND a.attnum > 0"
276                                                 "ORDER BY 1, 3, 2",
277                                 2, r_types, &c_sel_plan, values, NULL, SPI_OK_SELECT);
278
279                 /* This query ought to return at least one record. */
280                 if (SPI_processed == 0)
281                         elog(ERROR, "no per-column statistic data to be imported");
282
283                 values[0] = w_relid;
284                 values[2] = w_nspname;
285                 values[3] = w_relname;
286
287                 c_num = SPI_processed;
288                 c_tups = SPI_tuptable->vals;
289                 c_tupdesc = SPI_tuptable->tupdesc;
290                 for (j = 0; j < c_num; j++)
291                 {
292                         char   *w_typnamespace;
293                         char   *a_typnamespace;
294                         char   *w_typname;
295                         char   *a_typname;
296                         int             w_typmod;
297                         int             a_typmod;
298
299                         /*
300                          * If we have only per-relation statistics in source, all of
301                          * column_stats_effective for per-column statistics are NULL.
302                          */
303                         (void) SPI_getbinval(c_tups[j], c_tupdesc, 1, &isnull);
304                         if (isnull)
305                                 continue;
306
307                         /*
308                          * If there is no column with given name, we skip the rest of
309                          * import process.
310                          */
311                         values[4] = SPI_getbinval(c_tups[j], c_tupdesc, 2, &isnull);
312                         values[1] = SPI_getbinval(c_tups[j], c_tupdesc, 3, &isnull);
313                         if (isnull)
314                         {
315                                 elog(WARNING, "column \"%s\" of \"%s.%s\" does not exist",
316                                         DatumGetName(values[4])->data,
317                                                 DatumGetName(w_nspname)->data,
318                                                 DatumGetName(w_relname)->data);
319                                 continue;
320                         }
321
322                         /*
323                          * If the destination column has different data type from source
324                          * column, we stop importing to avoid corrupted statistics.
325                          */
326                         w_typnamespace = DatumGetName(SPI_getbinval(c_tups[j], c_tupdesc, 4,
327                                                 &isnull))->data;
328                         a_typnamespace = DatumGetName(SPI_getbinval(c_tups[j], c_tupdesc, 5,
329                                                 &isnull))->data;
330                         w_typname = DatumGetName(SPI_getbinval(c_tups[j], c_tupdesc, 6,
331                                                 &isnull))->data;
332                         a_typname = DatumGetName(SPI_getbinval(c_tups[j], c_tupdesc, 7,
333                                                 &isnull))->data;
334                         if (strcmp(w_typnamespace, a_typnamespace) != 0 ||
335                                 strcmp(w_typname, a_typname) != 0)
336                         {
337                                 ereport(WARNING,
338                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
339                                                  errmsg("column \"%s\" is of type \"%s.%s\""
340                                                                 " but import data is of type \"%s.%s\"",
341                                                                 DatumGetName(values[4])->data,
342                                                                 a_typnamespace, a_typname,
343                                                                 w_typnamespace, w_typname)));
344                                 continue;
345                         }
346
347                         /*
348                          * If the atttypmod of the destination column is different from the
349                          * one of source, column, we stop importing to avoid corrupted
350                          * statistics.
351                          */
352                         w_typmod = DatumGetInt32(SPI_getbinval(c_tups[j], c_tupdesc, 8,
353                                                 &isnull));
354                         a_typmod = DatumGetInt32(SPI_getbinval(c_tups[j], c_tupdesc, 9,
355                                                 &isnull));
356                         if (w_typmod != a_typmod)
357                         {
358                                 ereport(WARNING,
359                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
360                                                  errmsg("column \"%s\" is of atttypmod %d"
361                                                                 " but import data is of atttypmod %d",
362                                                                 DatumGetName(values[4])->data,
363                                                                 a_typmod, a_typmod)));
364                                 continue;
365                         }
366
367                         /*
368                          * First delete old dummy statistics, and import new one.  We use
369                          * DELETE-then-INSERT method here to simplify codes.
370                          */
371                         spi_exec_query("DELETE FROM dbms_stats._column_stats_locked "
372                                         "WHERE starelid = $1 AND staattnum = $2", 2, c_types,
373                                         &c_del_plan, values, NULL, SPI_OK_DELETE);
374
375                         spi_exec_query("INSERT INTO dbms_stats._column_stats_locked "
376                                 "SELECT $1, $2, "
377                                 "stainherit, stanullfrac, stawidth, stadistinct, "
378                                 "stakind1, stakind2, stakind3, stakind4, "
379 #if PG_VERSION_NUM >= 90200
380                                 "stakind5, "
381 #endif
382                                 "staop1, staop2, staop3, staop4, "
383 #if PG_VERSION_NUM >= 90200
384                                 "staop5, "
385 #endif
386                                 "stanumbers1, stanumbers2, stanumbers3, stanumbers4, "
387 #if PG_VERSION_NUM >= 90200
388                                 "stanumbers5, "
389 #endif
390                                 "stavalues1, stavalues2, stavalues3, stavalues4 "
391 #if PG_VERSION_NUM >= 90200
392                                 ", stavalues5 "
393 #endif
394                                 "FROM dbms_stats_work_stats "
395                                 "WHERE nspname = $3 AND relname = $4 "
396                                 "AND attname = $5 "
397                                 "ORDER BY 3",
398                                 5, c_types, &c_ins_plan, values, NULL, SPI_OK_INSERT);
399
400                         elog(DEBUG2, "\"%s.%s.%s\" column statistic import",
401                                 DatumGetName(w_nspname)->data,
402                                 DatumGetName(w_relname)->data, DatumGetName(values[4])->data);
403                 }
404
405                 if (c_num == 0)
406                         elog(DEBUG2, "\"%s.%s\" column statistic no data",
407                                 DatumGetName(w_nspname)->data, DatumGetName(w_relname)->data);
408         }
409
410         /* release the cached plan */
411         SPI_freeplan(r_upd_plan);
412         SPI_freeplan(r_ins_plan);
413         SPI_freeplan(c_sel_plan);
414         SPI_freeplan(c_del_plan);
415         SPI_freeplan(c_ins_plan);
416
417         /* delete the temp table */
418         spi_exec_utility("DROP TABLE dbms_stats_work_stats");
419
420         /* disconnect SPI */
421         ret = SPI_finish();
422         if (ret != SPI_OK_FINISH)
423                 elog(ERROR, "pg_dbms_stats: SPI_finish => %d", ret);
424
425         /*
426          * Recover the protocol state because it has been invalidated by our
427          * COPY-from-stdin.
428          */
429         if (filename == NULL)
430                 pq_puttextmessage('C', "dbms_stats_import");
431
432         PG_RETURN_VOID();
433 }
434
435 /*
436  * spi_exec_utility
437  *   Execute given utility command via SPI.
438  */
439 static void
440 spi_exec_utility(const char *query)
441 {
442         int     ret;
443
444         ret = SPI_exec(query, 0);
445         if (ret != SPI_OK_UTILITY)
446                 elog(ERROR, "pg_dbms_stats: SPI_exec => %d", ret);
447 }
448
449 /*
450  * spi_exec_query
451  *   Execute given SQL command via SPI.
452  *   The plan will be cached by SPI_prepare if it hasn't been.
453  */
454 static void
455 spi_exec_query(const char *query, int nargs, Oid *argtypes, SPIPlanPtr *plan,
456                                 Datum *values, const char *nulls, int result)
457 {
458         int     ret;
459
460         if (*plan == NULL)
461                 *plan = SPI_prepare(query, nargs, argtypes);
462
463         ret = SPI_execute_plan(*plan, values, nulls, false, 0);
464         if (ret != result)
465                 elog(ERROR, "pg_dbms_stats: SPI_execute_plan => %d", ret);
466 }
467
468 static char *
469 get_text_arg(FunctionCallInfo fcinfo, int n, bool is_name)
470 {
471         text   *arg;
472         char   *s;
473         int             len;
474         char   *result;
475
476         arg = PG_GETARG_TEXT_PP(n);
477         s = text_to_cstring(arg);
478         PG_FREE_IF_COPY(arg, n);
479
480         if (!is_name)
481                 return s;
482
483         len = strlen(s);
484
485         /* Truncate oversize input */
486         if (len >= NAMEDATALEN)
487                 len = pg_mbcliplen(s, len, NAMEDATALEN - 1);
488
489         /* We use palloc0 here to ensure result is zero-padded */
490         result = (char *) palloc0(NAMEDATALEN);
491         memcpy(result, s, len);
492         pfree(s);
493
494         return result;
495 }
496
497 /*
498  * get_args
499  *   Retrieve arguments from FunctionCallInfo and validate them.  We assume
500  *   that order of arguments is:
501  *     1) schema name
502  *     2) relation oid
503  *     3) attribute name
504  *     4) absolute path of source file, or 'stdin' (case insensitive)
505  */
506 static void
507 get_args(FunctionCallInfo fcinfo, char **nspname, char **relname,
508                 char **attname, char **filename)
509 {
510         Oid                             nspid;
511         Oid                             relid;
512         AttrNumber              attnum;
513         HeapTuple               tp;
514         Form_pg_class   reltup;
515         char                    relkind;
516
517         *nspname = *relname = *attname = *filename = NULL;
518
519         /*
520          * First of all, check whether combination of arguments is consistent.
521          *
522          * 1) relid and attname can't be used with schemaname.
523          * 2) relid is required when attname is given.
524          */
525         if (!PG_ARGISNULL(0) && (!PG_ARGISNULL(1) || !PG_ARGISNULL(2)))
526                 elog(ERROR, "relid and attnum can not be used with schemaname");
527         else if (PG_ARGISNULL(1) && !PG_ARGISNULL(2))
528                 elog(ERROR, "relation is required");
529
530         /* filepath validation */
531         if (!PG_ARGISNULL(3))
532         {
533                 *filename = get_text_arg(fcinfo, 3, false);
534
535                 /*
536                  * If given filepath is "stdin", clear filename to tell caller to
537                  * import from standard input.  Note that we accept only absolute path
538                  * for security reason.
539                  */
540                 if (pg_strcasecmp(*filename, "stdin") == 0)
541                         *filename = NULL;
542                 else if (!is_absolute_path(*filename))
543                         ereport(ERROR,
544                                         (errcode(ERRCODE_INVALID_NAME),
545                                          errmsg("relative path not allowed for dbms_stats_export"
546                                                         " to file")));
547         }
548
549         /* schemaname validation */
550         if (!PG_ARGISNULL(0))
551         {
552                 *nspname = get_text_arg(fcinfo, 0, true);
553
554                 /* check that a schema with given name exists */
555                 get_namespace_oid(*nspname, false);
556
557                 /* check that given schema is not one of system schemas */
558                 if (dbms_stats_is_system_schema_internal(*nspname))
559                         elog(ERROR, "\"%s\" is a system catalog", *nspname);
560         }
561
562         /* table oid validation */
563         if (!PG_ARGISNULL(1))
564         {
565                 relid = PG_GETARG_OID(1);
566                 tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
567                 if (!HeapTupleIsValid(tp))
568                         elog(ERROR, "relid %d does not exist", relid);
569
570                 /* check that the target is an ordinary table or an index */
571                 reltup = (Form_pg_class) GETSTRUCT(tp);
572                 *relname = pstrdup(reltup->relname.data);
573                 relkind = reltup->relkind;
574                 nspid = reltup->relnamespace;
575                 ReleaseSysCache(tp);
576
577                 if (relkind != RELKIND_RELATION && relkind != RELKIND_INDEX
578 #if PG_VERSION_NUM >= 90200
579                         && relkind != RELKIND_FOREIGN_TABLE
580 #endif
581                 )
582                         elog(ERROR, "relkind of \"%s\" is \"%c\", can not import",
583                                 get_rel_name(relid), relkind);
584
585                 /* check that the relation is not in one of system schemas */
586                 *nspname = get_namespace_name(nspid);
587                 if (dbms_stats_is_system_schema_internal(*nspname))
588                         elog(ERROR, "\"%s\" is a system catalog", *nspname);
589
590                 /* attribute name validation */
591                 if (!PG_ARGISNULL(2))
592                 {
593                         *attname = get_text_arg(fcinfo, 2, true);
594                         attnum = get_attnum(relid, *attname);
595                         if (!AttributeNumberIsValid(attnum))
596                                 elog(ERROR, "column \"%s\" of \"%s.%s\" does not exist", *attname, *nspname, *relname);
597                 }
598         }
599 }
600
601 /*
602  * appendLiteral - Format a string as a SQL literal, append to buf
603  *
604  * This function was copied from simple_quote_literal() in
605  * src/backend/utils/adt/ruleutils.c
606  */
607 static void
608 appendLiteral(StringInfo buf, const char *val)
609 {
610         const char *valptr;
611
612         /*
613          * We form the string literal according to the prevailing setting of
614          * standard_conforming_strings; we never use E''. User is responsible for
615          * making sure result is used correctly.
616          */
617         appendStringInfoChar(buf, '\'');
618         for (valptr = val; *valptr; valptr++)
619         {
620                 char            ch = *valptr;
621
622                 if (SQL_STR_DOUBLE(ch, !standard_conforming_strings))
623                         appendStringInfoChar(buf, ch);
624                 appendStringInfoChar(buf, ch);
625         }
626         appendStringInfoChar(buf, '\'');
627 }
628
629 /*
630  * import_stats_from_file
631  *       load data from file or stdin into work table, and delete unnecessary
632  *       records.
633  */
634 static void
635 import_stats_from_file(char *filename, char *nspname, char *relname,
636         char *attname)
637 {
638         StringInfoData  buf;
639         List               *parsetree_list;
640         uint64                  processed;
641         Datum                   values[3];
642         Oid                             argtypes[3] = { CSTRINGOID, CSTRINGOID, CSTRINGOID };
643         char                    nulls[3] = { 'n', 'n', 'n' };
644         int                             nargs;
645         int                             ret;
646
647         /* for debug use */
648         elog(DEBUG3, "%s() f=%s n=%s r=%s a=%s", __FUNCTION__,
649                  filename ? filename : "(null)",
650                  nspname ? nspname : "(null)",
651                  relname ? relname : "(null)",
652                  attname ? attname : "(null)");
653
654         /*
655          * Construct COPY statement.  NULL for filename indicates that source is
656          * stdin.
657          */
658         initStringInfo(&buf);
659         appendStringInfoString(&buf, "COPY dbms_stats_work_stats FROM ");
660         if (filename == NULL)
661                 appendStringInfoString(&buf, "stdin");
662         else
663                 appendLiteral(&buf, filename);
664
665         appendStringInfoString(&buf, " (FORMAT 'binary')");
666
667         /* Execute COPY FROM command. */
668         parsetree_list = pg_parse_query(buf.data);
669         processed = DoCopy((CopyStmt *)linitial(parsetree_list), buf.data);
670
671         if (processed == 0)
672                 elog(ERROR, "no data to be imported");
673
674         /*
675          * Delete the statistics other than the specified object's statistic from
676          * the temp table.  We can skip DELETEing staging data when schemaname is
677          * NULL, because it means database-wise import.
678          */
679         if (nspname == NULL)
680                 return;
681
682         resetStringInfo(&buf);
683         appendStringInfoString(&buf,
684                                                    "DELETE FROM dbms_stats_work_stats "
685                                                    " WHERE nspname <> $1::text ");
686         values[0] = CStringGetDatum(nspname);
687         nulls[0] = 't';
688         nargs = 1;
689
690         if (relname != NULL)
691         {
692                 values[1] = CStringGetDatum(relname);
693                 nulls[1] = 't';
694                 nargs++;
695                 appendStringInfoString(&buf, " OR (relname <> $2::text) ");
696
697                 if (attname != NULL)
698                 {
699                         values[2] = CStringGetDatum(attname);
700                         nulls[2] = 't';
701                         nargs++;
702                         appendStringInfoString(&buf, " OR (attname <> $3::text) ");
703                 }
704         }
705
706         ret = SPI_execute_with_args(buf.data, nargs, argtypes, values, nulls,
707                                                                  false, 0);
708         if (ret != SPI_OK_DELETE)
709                 elog(ERROR, "pg_dbms_stats: SPI_execute_with_args => %d", ret);
710 }
711
712 #ifdef UNIT_TEST
713 void test_import(int *passed, int *total);
714 static void test_spi_exec_query(int *passed, int *total);
715 static void test_spi_exec_utility(int *passed, int *total);
716 static void test_appendLiteral(int *passed, int *total);
717
718 #define StringEq(actual, expected)      \
719                 (strcmp((actual), (expected)) == 0 ? 1 : \
720                         (elog(WARNING, "%s-%d failed: [%s]", \
721                                 __FUNCTION__, caseno, (actual)), 0))
722
723 /*
724  * Test appendLiteral function
725  */
726 static void
727 test_appendLiteral(int *passed, int *total)
728 {
729         bool                    org_standard_conforming_strings;
730         int                             caseno = 0;
731         StringInfoData  buf;
732
733         /* Backup current GUC parameters */
734         NewGUCNestLevel();
735         org_standard_conforming_strings = standard_conforming_strings;
736
737         /* Initialize resources for tests */
738         initStringInfo(&buf);
739
740         /*
741          * *-*-1:
742          *   - no special char
743          */
744         caseno++;
745         resetStringInfo(&buf);
746         appendStringInfoString(&buf, "BEFORE");
747         appendLiteral(&buf, "\"abc 123\tあいう\n\"");
748         if (StringEq(buf.data, "BEFORE'\"abc 123\tあいう\n\"'"))
749         {
750                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
751                 (*passed)++;
752         }
753
754         /*
755          * *-*-2:
756          *   - contains special chars (single quote, back slash),
757          *   - standard_conforming_strings is true
758          */
759         caseno++;
760         resetStringInfo(&buf);
761         appendStringInfoString(&buf, "BEFORE");
762         standard_conforming_strings = true;
763         appendLiteral(&buf, "'abc 123\tあいう\n\\");
764         if (StringEq(buf.data, "BEFORE'''abc 123\tあいう\n\\'"))
765         {
766                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
767                 (*passed)++;
768         }
769
770         /*
771          * *-*-3:
772          *   - contains special chars (single quote, back slash),
773          *   - standard_conforming_strings is false
774          */
775         caseno++;
776         resetStringInfo(&buf);
777         appendStringInfoString(&buf, "BEFORE");
778         standard_conforming_strings = false;
779         appendLiteral(&buf, "'abc 123\tあいう\n\\");
780         if (StringEq(buf.data, "BEFORE'''abc 123\tあいう\n\\\\'"))
781         {
782                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
783                 (*passed)++;
784         }
785
786         /*
787          * *-*-4:
788          *   - empty string
789          */
790         caseno++;
791         resetStringInfo(&buf);
792         appendStringInfoString(&buf, "BEFORE");
793         appendLiteral(&buf, "");
794         if (StringEq(buf.data, "BEFORE''"))
795         {
796                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
797                 (*passed)++;
798         }
799
800         /* report # of tests */
801         *total += caseno;
802
803         /* Restore GUC parameters */
804         standard_conforming_strings = org_standard_conforming_strings;
805 }
806
807 static void
808 test_spi_exec_query(int *passed, int *total)
809 {
810         int                             rc;
811         volatile int    caseno = 0;
812         SPIPlanPtr              ptr = NULL;
813         SPIPlanPtr              org_ptr;
814
815         /* Initialize */
816         rc = SPI_connect();
817         if (rc != SPI_OK_CONNECT)
818                 elog(ERROR, "could not connect SPI: %s", SPI_result_code_string(rc));
819
820         /*
821          * *-*-1
822          *   - plan is not cached
823          */
824         caseno++;
825         BeginInternalSubTransaction("test");
826         PG_TRY();
827         {
828                 spi_exec_query("SELECT 1", 0, NULL, &ptr, NULL, NULL, SPI_OK_SELECT);
829                 if (ptr != NULL && SPI_processed == 1)
830                 {
831                         elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
832                         (*passed)++;
833                 }
834                 ReleaseCurrentSubTransaction();
835         }
836         PG_CATCH();
837         {
838                 elog(WARNING, "*-*-%d failed", caseno);
839                 RollbackAndReleaseCurrentSubTransaction();
840                 SPI_restore_connection();
841         }
842         PG_END_TRY();
843
844         /*
845          * *-*-2
846          *   - plan is cached
847          */
848         caseno++;
849         BeginInternalSubTransaction("test");
850         PG_TRY();
851         {
852                 org_ptr = ptr;
853                 spi_exec_query(NULL, 0, NULL, &ptr, NULL, NULL, SPI_OK_SELECT);
854                 if (ptr == org_ptr && SPI_processed == 1)
855                 {
856                         elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
857                         (*passed)++;
858                 }
859                 ReleaseCurrentSubTransaction();
860         }
861         PG_CATCH();
862         {
863                 elog(WARNING, "*-*-%d failed", caseno);
864                 RollbackAndReleaseCurrentSubTransaction();
865                 FlushErrorState();
866                 SPI_restore_connection();
867         }
868         PG_END_TRY();
869         SPI_freeplan(ptr);
870         ptr = NULL;
871
872         /*
873          * *-*-3
874          *   - query error
875          */
876         caseno++;
877         BeginInternalSubTransaction("test");
878         PG_TRY();
879         {
880                 spi_exec_query("SELECT 1 / 0",
881                                            0, NULL, &ptr, NULL, NULL, SPI_OK_SELECT);
882                 elog(WARNING, "*-*-%d failed", caseno);
883                 ReleaseCurrentSubTransaction();
884         }
885         PG_CATCH();
886         {
887                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
888                 (*passed)++;
889                 RollbackAndReleaseCurrentSubTransaction();
890                 FlushErrorState();
891                 SPI_restore_connection();
892         }
893         PG_END_TRY();
894         SPI_freeplan(ptr);
895         ptr = NULL;
896
897         /*
898          * *-*-4
899          *   - query success
900          */
901         caseno++;
902         BeginInternalSubTransaction("test");
903         PG_TRY();
904         {
905                 spi_exec_query("SELECT 1", 0, NULL, &ptr, NULL, NULL, SPI_OK_SELECT);
906                 if (ptr != NULL && SPI_processed == 1)
907                 {
908                         elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
909                         (*passed)++;
910                 }
911                 ReleaseCurrentSubTransaction();
912         }
913         PG_CATCH();
914         {
915                 elog(WARNING, "*-*-%d failed", caseno);
916                 PG_RE_THROW();
917                 RollbackAndReleaseCurrentSubTransaction();
918                 SPI_restore_connection();
919         }
920         PG_END_TRY();
921         SPI_freeplan(ptr);
922         ptr = NULL;
923
924         /* report # of tests */
925         (*total) += caseno;
926
927         /* Cleanup */
928         rc = SPI_finish();
929         if (rc != SPI_OK_FINISH && rc != SPI_ERROR_UNCONNECTED)
930                 elog(ERROR, "could not finish SPI: %s", SPI_result_code_string(rc));
931 }
932
933 static void
934 test_spi_exec_utility(int *passed, int *total)
935 {
936         int                             rc;
937         volatile int    caseno = 0;
938
939         /* Initialize */
940         rc = SPI_connect();
941         if (rc != SPI_OK_CONNECT)
942                 elog(ERROR, "could not connect SPI: %s", SPI_result_code_string(rc));
943
944         /*
945          * *-*-1
946          *   - query error
947          */
948         caseno++;
949         BeginInternalSubTransaction("test");
950         PG_TRY();
951         {
952                 spi_exec_utility("RESET dummy_parameter");
953                 elog(WARNING, "*-*-%d failed", caseno);
954                 ReleaseCurrentSubTransaction();
955         }
956         PG_CATCH();
957         {
958                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
959                 (*passed)++;
960                 RollbackAndReleaseCurrentSubTransaction();
961                 FlushErrorState();
962                 SPI_restore_connection();
963         }
964         PG_END_TRY();
965
966         /*
967          * *-*-2
968          *   - query success
969          */
970         caseno++;
971         BeginInternalSubTransaction("test");
972         PG_TRY();
973         {
974                 spi_exec_utility("RESET client_min_messages");
975                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
976                 (*passed)++;
977                 ReleaseCurrentSubTransaction();
978         }
979         PG_CATCH();
980         {
981                 elog(WARNING, "*-*-%d failed", caseno);
982                 RollbackAndReleaseCurrentSubTransaction();
983                 SPI_restore_connection();
984         }
985         PG_END_TRY();
986
987         /* report # of tests */
988         (*total) += caseno;
989
990         /* Cleanup */
991         rc = SPI_finish();
992         if (rc != SPI_OK_FINISH && rc != SPI_ERROR_UNCONNECTED)
993                 elog(ERROR, "could not finish SPI: %s", SPI_result_code_string(rc));
994 }
995
996 /*
997  * Unit test entry point for import.c.  This will be called by PG_init()
998  * function, after initialization for this extension is completed .
999  * This funciton should add the numbers of tests passed and the total number of
1000  * tests to parameter passed and total respectively.
1001  */
1002 void
1003 test_import(int *passed, int *total)
1004 {
1005         int local_passed = 0;
1006         int local_total = 0;
1007
1008         elog(WARNING, "==========");
1009
1010         /* Do tests here */
1011         test_appendLiteral(&local_passed, &local_total);
1012         test_spi_exec_query(&local_passed, &local_total);
1013         test_spi_exec_utility(&local_passed, &local_total);
1014
1015         elog(WARNING, "%s %d/%d passed", __FUNCTION__, local_passed, local_total);
1016         *passed += local_passed;
1017         *total += local_total;
1018 }
1019
1020 #endif